1 /*****************************************************************************
2
3 Copyright (c) 1994, 2021, Oracle and/or its affiliates.
4 Copyright (c) 2012, Facebook Inc.
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License, version 2.0,
8 as published by the Free Software Foundation.
9
10 This program is also distributed with certain software (including
11 but not limited to OpenSSL) that is licensed under separate terms,
12 as designated in a particular file or component or in included license
13 documentation. The authors of MySQL hereby grant you an additional
14 permission to link the program and your derivative works with the
15 separately licensed software that they have included with MySQL.
16
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License, version 2.0, for more details.
21
22 You should have received a copy of the GNU General Public License along with
23 this program; if not, write to the Free Software Foundation, Inc.,
24 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
25
26 *****************************************************************************/
27
28 /********************************************************************//**
29 @file page/page0cur.cc
30 The page cursor
31
32 Created 10/4/1994 Heikki Tuuri
33 *************************************************************************/
34
35 #include "ha_prototypes.h"
36
37 #include "page0cur.h"
38 #ifdef UNIV_NONINL
39 #include "page0cur.ic"
40 #endif
41
42 #include "page0zip.h"
43 #include "btr0btr.h"
44 #include "mtr0log.h"
45 #include "log0recv.h"
46 #ifndef UNIV_HOTBACKUP
47 #include "rem0cmp.h"
48 #include "gis0rtree.h"
49
50 #include <algorithm>
51
52 #ifdef PAGE_CUR_ADAPT
53 # ifdef UNIV_SEARCH_PERF_STAT
54 static ulint page_cur_short_succ = 0;
55 # endif /* UNIV_SEARCH_PERF_STAT */
56
57 /*******************************************************************//**
58 This is a linear congruential generator PRNG. Returns a pseudo random
59 number between 0 and 2^64-1 inclusive. The formula and the constants
60 being used are:
61 X[n+1] = (a * X[n] + c) mod m
62 where:
63 X[0] = ut_time_monotonic_us()
64 a = 1103515245 (3^5 * 5 * 7 * 129749)
65 c = 12345 (3 * 5 * 823)
66 m = 18446744073709551616 (2^64)
67
68 @return number between 0 and 2^64-1 */
69 static
70 ib_uint64_t
page_cur_lcg_prng(void)71 page_cur_lcg_prng(void)
72 /*===================*/
73 {
74 #define LCG_a 1103515245
75 #define LCG_c 12345
76 static ib_uint64_t lcg_current = 0;
77 static ibool initialized = FALSE;
78
79 if (!initialized) {
80 lcg_current = (ib_uint64_t) ut_time_monotonic_us();
81 initialized = TRUE;
82 }
83
84 /* no need to "% 2^64" explicitly because lcg_current is
85 64 bit and this will be done anyway */
86 lcg_current = LCG_a * lcg_current + LCG_c;
87
88 return(lcg_current);
89 }
90
91 /** Try a search shortcut based on the last insert.
92 @param[in] block index page
93 @param[in] index index tree
94 @param[in] tuple search key
95 @param[in,out] iup_matched_fields already matched fields in the
96 upper limit record
97 @param[in,out] ilow_matched_fields already matched fields in the
98 lower limit record
99 @param[out] cursor page cursor
100 @return true on success */
101 UNIV_INLINE
102 bool
page_cur_try_search_shortcut(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,ulint * iup_matched_fields,ulint * ilow_matched_fields,page_cur_t * cursor)103 page_cur_try_search_shortcut(
104 const buf_block_t* block,
105 const dict_index_t* index,
106 const dtuple_t* tuple,
107 ulint* iup_matched_fields,
108 ulint* ilow_matched_fields,
109 page_cur_t* cursor)
110 {
111 const rec_t* rec;
112 const rec_t* next_rec;
113 ulint low_match;
114 ulint up_match;
115 ibool success = FALSE;
116 const page_t* page = buf_block_get_frame(block);
117 mem_heap_t* heap = NULL;
118 ulint offsets_[REC_OFFS_NORMAL_SIZE];
119 ulint* offsets = offsets_;
120 rec_offs_init(offsets_);
121
122 ut_ad(dtuple_check_typed(tuple));
123
124 rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
125 offsets = rec_get_offsets(rec, index, offsets,
126 dtuple_get_n_fields(tuple), &heap);
127
128 ut_ad(rec);
129 ut_ad(page_rec_is_user_rec(rec));
130
131 low_match = up_match = std::min(*ilow_matched_fields,
132 *iup_matched_fields);
133
134 if (cmp_dtuple_rec_with_match(tuple, rec, offsets, &low_match) < 0) {
135 goto exit_func;
136 }
137
138 next_rec = page_rec_get_next_const(rec);
139 if (!page_rec_is_supremum(next_rec)) {
140 offsets = rec_get_offsets(next_rec, index, offsets,
141 dtuple_get_n_fields(tuple), &heap);
142
143 if (cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
144 &up_match) >= 0) {
145 goto exit_func;
146 }
147
148 *iup_matched_fields = up_match;
149 }
150
151 page_cur_position(rec, block, cursor);
152
153 *ilow_matched_fields = low_match;
154
155 #ifdef UNIV_SEARCH_PERF_STAT
156 page_cur_short_succ++;
157 #endif
158 success = TRUE;
159 exit_func:
160 if (UNIV_LIKELY_NULL(heap)) {
161 mem_heap_free(heap);
162 }
163 return(success);
164 }
165
166 /** Try a search shortcut based on the last insert.
167 @param[in] block index page
168 @param[in] index index tree
169 @param[in] tuple search key
170 @param[in,out] iup_matched_fields already matched fields in the
171 upper limit record
172 @param[in,out] iup_matched_bytes already matched bytes in the
173 first partially matched field in the upper limit record
174 @param[in,out] ilow_matched_fields already matched fields in the
175 lower limit record
176 @param[in,out] ilow_matched_bytes already matched bytes in the
177 first partially matched field in the lower limit record
178 @param[out] cursor page cursor
179 @return true on success */
180 UNIV_INLINE
181 bool
page_cur_try_search_shortcut_bytes(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,ulint * iup_matched_fields,ulint * iup_matched_bytes,ulint * ilow_matched_fields,ulint * ilow_matched_bytes,page_cur_t * cursor)182 page_cur_try_search_shortcut_bytes(
183 const buf_block_t* block,
184 const dict_index_t* index,
185 const dtuple_t* tuple,
186 ulint* iup_matched_fields,
187 ulint* iup_matched_bytes,
188 ulint* ilow_matched_fields,
189 ulint* ilow_matched_bytes,
190 page_cur_t* cursor)
191 {
192 const rec_t* rec;
193 const rec_t* next_rec;
194 ulint low_match;
195 ulint low_bytes;
196 ulint up_match;
197 ulint up_bytes;
198 ibool success = FALSE;
199 const page_t* page = buf_block_get_frame(block);
200 mem_heap_t* heap = NULL;
201 ulint offsets_[REC_OFFS_NORMAL_SIZE];
202 ulint* offsets = offsets_;
203 rec_offs_init(offsets_);
204
205 ut_ad(dtuple_check_typed(tuple));
206
207 rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
208 offsets = rec_get_offsets(rec, index, offsets,
209 dtuple_get_n_fields(tuple), &heap);
210
211 ut_ad(rec);
212 ut_ad(page_rec_is_user_rec(rec));
213 if (ut_pair_cmp(*ilow_matched_fields, *ilow_matched_bytes,
214 *iup_matched_fields, *iup_matched_bytes) < 0) {
215 up_match = low_match = *ilow_matched_fields;
216 up_bytes = low_bytes = *ilow_matched_bytes;
217 } else {
218 up_match = low_match = *iup_matched_fields;
219 up_bytes = low_bytes = *iup_matched_bytes;
220 }
221
222 if (cmp_dtuple_rec_with_match_bytes(
223 tuple, rec, index, offsets, &low_match, &low_bytes) < 0) {
224 goto exit_func;
225 }
226
227 next_rec = page_rec_get_next_const(rec);
228 if (!page_rec_is_supremum(next_rec)) {
229 offsets = rec_get_offsets(next_rec, index, offsets,
230 dtuple_get_n_fields(tuple), &heap);
231
232 if (cmp_dtuple_rec_with_match_bytes(
233 tuple, next_rec, index, offsets,
234 &up_match, &up_bytes)
235 >= 0) {
236 goto exit_func;
237 }
238
239 *iup_matched_fields = up_match;
240 *iup_matched_bytes = up_bytes;
241 }
242
243 page_cur_position(rec, block, cursor);
244
245 *ilow_matched_fields = low_match;
246 *ilow_matched_bytes = low_bytes;
247
248 #ifdef UNIV_SEARCH_PERF_STAT
249 page_cur_short_succ++;
250 #endif
251 success = TRUE;
252 exit_func:
253 if (UNIV_LIKELY_NULL(heap)) {
254 mem_heap_free(heap);
255 }
256 return(success);
257 }
258 #endif
259
260 #ifdef PAGE_CUR_LE_OR_EXTENDS
261 /****************************************************************//**
262 Checks if the nth field in a record is a character type field which extends
263 the nth field in tuple, i.e., the field is longer or equal in length and has
264 common first characters.
265 @return TRUE if rec field extends tuple field */
266 static
267 ibool
page_cur_rec_field_extends(const dtuple_t * tuple,const rec_t * rec,const ulint * offsets,ulint n)268 page_cur_rec_field_extends(
269 /*=======================*/
270 const dtuple_t* tuple, /*!< in: data tuple */
271 const rec_t* rec, /*!< in: record */
272 const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
273 ulint n) /*!< in: compare nth field */
274 {
275 const dtype_t* type;
276 const dfield_t* dfield;
277 const byte* rec_f;
278 ulint rec_f_len;
279
280 ut_ad(rec_offs_validate(rec, NULL, offsets));
281 dfield = dtuple_get_nth_field(tuple, n);
282
283 type = dfield_get_type(dfield);
284
285 rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
286
287 if (type->mtype == DATA_VARCHAR
288 || type->mtype == DATA_CHAR
289 || type->mtype == DATA_FIXBINARY
290 || type->mtype == DATA_BINARY
291 || type->mtype == DATA_BLOB
292 || DATA_GEOMETRY_MTYPE(type->mtype)
293 || type->mtype == DATA_VARMYSQL
294 || type->mtype == DATA_MYSQL) {
295
296 if (dfield_get_len(dfield) != UNIV_SQL_NULL
297 && rec_f_len != UNIV_SQL_NULL
298 && rec_f_len >= dfield_get_len(dfield)
299 && !cmp_data_data(type->mtype, type->prtype,
300 dfield_get_data(dfield),
301 dfield_get_len(dfield),
302 rec_f, dfield_get_len(dfield))) {
303
304 return(TRUE);
305 }
306 }
307
308 return(FALSE);
309 }
310 #endif /* PAGE_CUR_LE_OR_EXTENDS */
311
312 /** If key is fixed length then populate offset directly from
313 cached version.
314 @param[in] rec B-Tree record for which offset needs to be
315 populated.
316 @param[in,out] index index handler
317 @param[in] tuple data tuple
318 @param[in,out] offsets default offsets array
319 @param[in,out] heap heap
320 @return reference to populate offsets. */
static
ulint*
populate_offsets(
	const rec_t*	rec,
	const dtuple_t*	tuple,
	dict_index_t*	index,
	ulint*		offsets,
	mem_heap_t**	heap)
{
	/* Only intrinsic (session-private, uncontended) tables use
	this per-index offsets cache, so there is no locking here. */
	ut_ad(dict_table_is_intrinsic(index->table));

	bool rec_has_null_values	= false;

	if (index->rec_cache.key_has_null_cols) {
		/* Check if record has null value. The null bitmap of
		the compact record format lives immediately before the
		extra bytes and grows downwards in memory. */
		const byte*	nulls = rec - (1 + REC_N_NEW_EXTRA_BYTES);
		ulint		n_bytes_to_scan
			= UT_BITS_IN_BYTES(index->n_nullable);
		/* assumes n_nullable > 0 when key_has_null_cols is set,
		otherwise n_bytes_to_scan - 1 below would wrap --
		TODO confirm against the index build code. */
		byte		null_mask = 0xff;
		ulint		bits_examined = 0;

		/* Scan all full bitmap bytes except the last one. */
		for (ulint i = 0; i < n_bytes_to_scan - 1; i++) {
			if (*nulls & null_mask) {
				rec_has_null_values = true;
				break;
			}
			--nulls;
			bits_examined += 8;
		}

		/* Check the remaining bits of the last (partial) byte. */
		if (!rec_has_null_values) {
			null_mask >>= (8 - (index->n_nullable - bits_examined));
			rec_has_null_values = *nulls & null_mask;
		}

		if (rec_has_null_values) {
			/* NULLs change field offsets, so the cached
			fixed-length offsets are not valid: compute
			fresh offsets for this record only. */
			offsets = rec_get_offsets(
				rec, index, offsets,
				dtuple_get_n_fields_cmp(tuple), heap);

			return(offsets);
		}
	}

	/* Check if offsets are cached else cache them first.
	There are queries that will first verify if key is present using index
	search and then initiate insert. If offsets are cached during index
	search it would be based on key part only but during insert that looks
	out for exact location to insert key + db_row_id both columns would
	be used and so re-compute offsets in such case. */
	if (!index->rec_cache.offsets_cached
	    || (rec_offs_n_fields(index->rec_cache.offsets)
		< dtuple_get_n_fields_cmp(tuple))) {

		offsets = rec_get_offsets(
			rec, index, offsets,
			dtuple_get_n_fields_cmp(tuple), heap);

		/* Reallocate if our offset array is not big
		enough to hold the needed size. */
		ulint	sz1 = index->rec_cache.sz_of_offsets;
		ulint	sz2 = offsets[0];	/* offsets[0] holds the
						allocated size of the
						offsets array */
		if (sz1 < sz2) {
			index->rec_cache.offsets = static_cast<ulint*>(
				mem_heap_alloc(
					index->heap, sizeof(ulint) * sz2));
			index->rec_cache.sz_of_offsets =
				static_cast<uint32_t>(sz2);
		}

		memcpy(index->rec_cache.offsets,
		       offsets, (sizeof(ulint) * sz2));
		index->rec_cache.offsets_cached = true;
	}

	/* NOTE(review): this is an assignment ('='), not a comparison,
	deliberately placed inside ut_ad(): presumably offsets[2] is the
	debug-only slot holding the record pointer, which only exists
	(and only needs updating) in UNIV_DEBUG builds where ut_ad()
	expands; in release builds the statement compiles away.
	Verify against rec_offs_validate() before changing. */
	ut_ad(index->rec_cache.offsets[2] = (ulint) rec);

	return(index->rec_cache.offsets);
}
401
402 /****************************************************************//**
403 Searches the right position for a page cursor. */
void
page_cur_search_with_match(
/*=======================*/
	const buf_block_t*	block,	/*!< in: buffer block */
	const dict_index_t*	index,	/*!< in/out: record descriptor */
	const dtuple_t*		tuple,	/*!< in: data tuple */
	page_cur_mode_t		mode,	/*!< in: PAGE_CUR_L,
					PAGE_CUR_LE, PAGE_CUR_G, or
					PAGE_CUR_GE */
	ulint*			iup_matched_fields,
					/*!< in/out: already matched
					fields in upper limit record */
	ulint*			ilow_matched_fields,
					/*!< in/out: already matched
					fields in lower limit record */
	page_cur_t*		cursor,	/*!< out: page cursor */
	rtr_info_t*		rtr_info)/*!< in/out: rtree search stack */
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t*	slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		low_matched_fields;
	ulint		cur_matched_fields;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
	if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
		ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
		      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE
		      || dict_index_is_spatial(index));
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));

#ifdef PAGE_CUR_ADAPT
	/* Adaptive shortcut: on a leaf page being filled in ascending
	(rightward) order, the sought position is very likely right at
	the last inserted record, so try that before a full search. */
	if (page_is_leaf(page)
	    && (mode == PAGE_CUR_LE)
	    && !dict_index_is_spatial(index)
	    && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
	    && (page_header_get_ptr(page, PAGE_LAST_INSERT))
	    && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {

		if (page_cur_try_search_shortcut(
			    block, index, tuple,
			    iup_matched_fields,
			    ilow_matched_fields,
			    cursor)) {
			return;
		}
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif

	/* If the mode is for R-tree indexes, use the special MBR
	related compare functions */
	if (dict_index_is_spatial(index) && mode > PAGE_CUR_LE) {
		/* For leaf level insert, we still use the traditional
		compare function for now */
		if (mode == PAGE_CUR_RTREE_INSERT && page_is_leaf(page)){
			mode = PAGE_CUR_LE;
		} else {
			rtr_cur_search_with_match(
				block, (dict_index_t*)index, tuple, mode,
				cursor, rtr_info);
			return;
		}
	}

	/* The following flag does not work for non-latin1 char sets because
	cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	low_matched_fields = *ilow_matched_fields;

	/* Perform binary search. First the search is done through the page
	directory, after that as a linear search in the list of records
	owned by the upper limit directory slot. */

	low = 0;
	up = page_dir_get_n_slots(page) - 1;

	/* Perform binary search until the lower and upper limit directory
	slots come to the distance 1 of each other */

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		/* Fields already matched by BOTH limits need not be
		compared again against the middle record. */
		cur_matched_fields = std::min(low_matched_fields,
					      up_matched_fields);

		offsets = offsets_;
		/* Intrinsic tables with fixed-length keys may reuse
		per-index cached offsets instead of recomputing them. */
		if (index->rec_cache.fixed_len_key) {
			offsets = populate_offsets(
				mid_rec, tuple,
				const_cast<dict_index_t*>(index),
				offsets, &heap);
		} else {
			offsets = rec_get_offsets(
				mid_rec, index, offsets,
				dtuple_get_n_fields_cmp(tuple), &heap);

		}

		cmp = cmp_dtuple_rec_with_match(
			tuple, mid_rec, offsets, &cur_matched_fields);

		if (cmp > 0) {
low_slot_match:
			/* tuple > mid_rec: search continues in the
			upper half; mid becomes the new lower limit. */
			low = mid;
			low_matched_fields = cur_matched_fields;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			/* tuple < mid_rec (or equal with an up-biased
			mode): mid becomes the new upper limit. */
			up = mid;
			up_matched_fields = cur_matched_fields;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform linear search until the upper and lower records come to
	distance 1 of each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		cur_matched_fields = std::min(low_matched_fields,
					      up_matched_fields);

		offsets = offsets_;
		if (index->rec_cache.fixed_len_key) {
			offsets = populate_offsets(
				mid_rec, tuple,
				const_cast<dict_index_t*>(index),
				offsets, &heap);
		} else {
			offsets = rec_get_offsets(
				mid_rec, index, offsets,
				dtuple_get_n_fields_cmp(tuple), &heap);

		}

		cmp = cmp_dtuple_rec_with_match(
			tuple, mid_rec, offsets, &cur_matched_fields);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			if (!cmp && !cur_matched_fields) {
#ifdef UNIV_DEBUG
				mtr_t	mtr;
				mtr_start(&mtr);

				/* We got a match, but cur_matched_fields is
				0, it must have REC_INFO_MIN_REC_FLAG */
				ulint   rec_info = rec_get_info_bits(mid_rec,
                                                     rec_offs_comp(offsets));
				ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
				ut_ad(btr_page_get_prev(page, &mtr) == FIL_NULL);
				mtr_commit(&mtr);
#endif

				cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
			}

			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	/* G/GE modes position on the upper limit (first record not
	less than / greater than tuple); L/LE on the lower limit. */
	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*ilow_matched_fields = low_matched_fields;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
669
670 /** Search the right position for a page cursor.
671 @param[in] block buffer block
672 @param[in] index index tree
673 @param[in] tuple key to be searched for
674 @param[in] mode search mode
675 @param[in,out] iup_matched_fields already matched fields in the
676 upper limit record
677 @param[in,out] iup_matched_bytes already matched bytes in the
678 first partially matched field in the upper limit record
679 @param[in,out] ilow_matched_fields already matched fields in the
680 lower limit record
681 @param[in,out] ilow_matched_bytes already matched bytes in the
682 first partially matched field in the lower limit record
683 @param[out] cursor page cursor */
void
page_cur_search_with_match_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	page_cur_mode_t		mode,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t*	slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		up_matched_bytes;
	ulint		low_matched_fields;
	ulint		low_matched_bytes;
	ulint		cur_matched_fields;
	ulint		cur_matched_bytes;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
	if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
		ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
		      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));

#ifdef PAGE_CUR_ADAPT
	/* Adaptive shortcut: on a leaf page being filled in ascending
	(rightward) order, try the last inserted record first. */
	if (page_is_leaf(page)
	    && (mode == PAGE_CUR_LE)
	    && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
	    && (page_header_get_ptr(page, PAGE_LAST_INSERT))
	    && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {

		if (page_cur_try_search_shortcut_bytes(
			    block, index, tuple,
			    iup_matched_fields, iup_matched_bytes,
			    ilow_matched_fields, ilow_matched_bytes,
			    cursor)) {
			return;
		}
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif

	/* The following flag does not work for non-latin1 char sets because
	cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	up_matched_bytes  = *iup_matched_bytes;
	low_matched_fields = *ilow_matched_fields;
	low_matched_bytes  = *ilow_matched_bytes;

	/* Perform binary search. First the search is done through the page
	directory, after that as a linear search in the list of records
	owned by the upper limit directory slot. */

	low = 0;
	up = page_dir_get_n_slots(page) - 1;

	/* Perform binary search until the lower and upper limit directory
	slots come to the distance 1 of each other */

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		/* The (fields, bytes) prefix matched by BOTH limits
		need not be compared again against the middle record. */
		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		offsets = rec_get_offsets(
			mid_rec, index, offsets_,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_slot_match:
			/* tuple > mid_rec: mid becomes the new lower
			limit of the slot range. */
			low = mid;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			/* tuple < mid_rec (or equal with an up-biased
			mode): mid becomes the new upper limit. */
			up = mid;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform linear search until the upper and lower records come to
	distance 1 of each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		offsets = rec_get_offsets(
			mid_rec, index, offsets_,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			if (!cmp && !cur_matched_fields) {
#ifdef UNIV_DEBUG
				mtr_t	mtr;
				mtr_start(&mtr);

				/* We got a match, but cur_matched_fields is
				0, it must have REC_INFO_MIN_REC_FLAG */
				ulint   rec_info = rec_get_info_bits(mid_rec,
                                                     rec_offs_comp(offsets));
				ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
				ut_ad(btr_page_get_prev(page, &mtr) == FIL_NULL);
				mtr_commit(&mtr);
#endif

				cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
			}

			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	/* G/GE modes position on the upper limit record; L/LE on the
	lower limit record. */
	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*iup_matched_bytes  = up_matched_bytes;
	*ilow_matched_fields = low_matched_fields;
	*ilow_matched_bytes  = low_matched_bytes;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
923
924 /***********************************************************//**
925 Positions a page cursor on a randomly chosen user record on a page. If there
926 are no user records, sets the cursor on the infimum record. */
927 void
page_cur_open_on_rnd_user_rec(buf_block_t * block,page_cur_t * cursor)928 page_cur_open_on_rnd_user_rec(
929 /*==========================*/
930 buf_block_t* block, /*!< in: page */
931 page_cur_t* cursor) /*!< out: page cursor */
932 {
933 ulint rnd;
934 ulint n_recs = page_get_n_recs(buf_block_get_frame(block));
935
936 page_cur_set_before_first(block, cursor);
937
938 if (UNIV_UNLIKELY(n_recs == 0)) {
939
940 return;
941 }
942
943 rnd = (ulint) (page_cur_lcg_prng() % n_recs);
944
945 do {
946 page_cur_move_to_next(cursor);
947 } while (rnd--);
948 }
949
950 /***********************************************************//**
951 Writes the log record of a record insert on a page. */
952 static
953 void
page_cur_insert_rec_write_log(rec_t * insert_rec,ulint rec_size,rec_t * cursor_rec,dict_index_t * index,mtr_t * mtr)954 page_cur_insert_rec_write_log(
955 /*==========================*/
956 rec_t* insert_rec, /*!< in: inserted physical record */
957 ulint rec_size, /*!< in: insert_rec size */
958 rec_t* cursor_rec, /*!< in: record the
959 cursor is pointing to */
960 dict_index_t* index, /*!< in: record descriptor */
961 mtr_t* mtr) /*!< in: mini-transaction handle */
962 {
963 ulint cur_rec_size;
964 ulint extra_size;
965 ulint cur_extra_size;
966 const byte* ins_ptr;
967 const byte* log_end;
968 ulint i;
969
970 /* Avoid REDO logging to save on costly IO because
971 temporary tables are not recovered during crash recovery. */
972 if (dict_table_is_temporary(index->table)) {
973 byte* log_ptr = mlog_open(mtr, 0);
974 if (log_ptr == NULL) {
975 return;
976 }
977 mlog_close(mtr, log_ptr);
978 log_ptr = NULL;
979 }
980
981 ut_a(rec_size < UNIV_PAGE_SIZE);
982 ut_ad(mtr->is_named_space(index->space));
983 ut_ad(page_align(insert_rec) == page_align(cursor_rec));
984 ut_ad(!page_rec_is_comp(insert_rec)
985 == !dict_table_is_comp(index->table));
986
987 {
988 mem_heap_t* heap = NULL;
989 ulint cur_offs_[REC_OFFS_NORMAL_SIZE];
990 ulint ins_offs_[REC_OFFS_NORMAL_SIZE];
991
992 ulint* cur_offs;
993 ulint* ins_offs;
994
995 rec_offs_init(cur_offs_);
996 rec_offs_init(ins_offs_);
997
998 cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
999 ULINT_UNDEFINED, &heap);
1000 ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
1001 ULINT_UNDEFINED, &heap);
1002
1003 extra_size = rec_offs_extra_size(ins_offs);
1004 cur_extra_size = rec_offs_extra_size(cur_offs);
1005 ut_ad(rec_size == rec_offs_size(ins_offs));
1006 cur_rec_size = rec_offs_size(cur_offs);
1007
1008 if (UNIV_LIKELY_NULL(heap)) {
1009 mem_heap_free(heap);
1010 }
1011 }
1012
1013 ins_ptr = insert_rec - extra_size;
1014
1015 i = 0;
1016
1017 if (cur_extra_size == extra_size) {
1018 ulint min_rec_size = ut_min(cur_rec_size, rec_size);
1019
1020 const byte* cur_ptr = cursor_rec - cur_extra_size;
1021
1022 /* Find out the first byte in insert_rec which differs from
1023 cursor_rec; skip the bytes in the record info */
1024
1025 do {
1026 if (*ins_ptr == *cur_ptr) {
1027 i++;
1028 ins_ptr++;
1029 cur_ptr++;
1030 } else if ((i < extra_size)
1031 && (i >= extra_size
1032 - page_rec_get_base_extra_size
1033 (insert_rec))) {
1034 i = extra_size;
1035 ins_ptr = insert_rec;
1036 cur_ptr = cursor_rec;
1037 } else {
1038 break;
1039 }
1040 } while (i < min_rec_size);
1041 }
1042
1043 byte* log_ptr;
1044
1045 if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
1046
1047 if (page_rec_is_comp(insert_rec)) {
1048 log_ptr = mlog_open_and_write_index(
1049 mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
1050 2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
1051 if (UNIV_UNLIKELY(!log_ptr)) {
1052 /* Logging in mtr is switched off
1053 during crash recovery: in that case
1054 mlog_open returns NULL */
1055 return;
1056 }
1057 } else {
1058 log_ptr = mlog_open(mtr, 11
1059 + 2 + 5 + 1 + 5 + 5
1060 + MLOG_BUF_MARGIN);
1061 if (UNIV_UNLIKELY(!log_ptr)) {
1062 /* Logging in mtr is switched off
1063 during crash recovery: in that case
1064 mlog_open returns NULL */
1065 return;
1066 }
1067
1068 log_ptr = mlog_write_initial_log_record_fast(
1069 insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
1070 }
1071
1072 log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
1073 /* Write the cursor rec offset as a 2-byte ulint */
1074 mach_write_to_2(log_ptr, page_offset(cursor_rec));
1075 log_ptr += 2;
1076 } else {
1077 log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
1078 if (!log_ptr) {
1079 /* Logging in mtr is switched off during crash
1080 recovery: in that case mlog_open returns NULL */
1081 return;
1082 }
1083 log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
1084 }
1085
1086 if (page_rec_is_comp(insert_rec)) {
1087 if (UNIV_UNLIKELY
1088 (rec_get_info_and_status_bits(insert_rec, TRUE)
1089 != rec_get_info_and_status_bits(cursor_rec, TRUE))) {
1090
1091 goto need_extra_info;
1092 }
1093 } else {
1094 if (UNIV_UNLIKELY
1095 (rec_get_info_and_status_bits(insert_rec, FALSE)
1096 != rec_get_info_and_status_bits(cursor_rec, FALSE))) {
1097
1098 goto need_extra_info;
1099 }
1100 }
1101
1102 if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
1103 need_extra_info:
1104 /* Write the record end segment length
1105 and the extra info storage flag */
1106 log_ptr += mach_write_compressed(log_ptr,
1107 2 * (rec_size - i) + 1);
1108
1109 /* Write the info bits */
1110 mach_write_to_1(log_ptr,
1111 rec_get_info_and_status_bits(
1112 insert_rec,
1113 page_rec_is_comp(insert_rec)));
1114 log_ptr++;
1115
1116 /* Write the record origin offset */
1117 log_ptr += mach_write_compressed(log_ptr, extra_size);
1118
1119 /* Write the mismatch index */
1120 log_ptr += mach_write_compressed(log_ptr, i);
1121
1122 ut_a(i < UNIV_PAGE_SIZE);
1123 ut_a(extra_size < UNIV_PAGE_SIZE);
1124 } else {
1125 /* Write the record end segment length
1126 and the extra info storage flag */
1127 log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
1128 }
1129
1130 /* Write to the log the inserted index record end segment which
1131 differs from the cursor record */
1132
1133 rec_size -= i;
1134
1135 if (log_ptr + rec_size <= log_end) {
1136 memcpy(log_ptr, ins_ptr, rec_size);
1137 mlog_close(mtr, log_ptr + rec_size);
1138 } else {
1139 mlog_close(mtr, log_ptr);
1140 ut_a(rec_size < UNIV_PAGE_SIZE);
1141 mlog_catenate_string(mtr, ins_ptr, rec_size);
1142 }
1143 }
1144 #else /* !UNIV_HOTBACKUP */
1145 # define page_cur_insert_rec_write_log(ins_rec,size,cur,index,mtr) ((void) 0)
1146 #endif /* !UNIV_HOTBACKUP */
1147
1148 /***********************************************************//**
1149 Parses a log record of a record insert on a page.
1150 @return end of log record or NULL */
1151 byte*
page_cur_parse_insert_rec(ibool is_short,const byte * ptr,const byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1152 page_cur_parse_insert_rec(
1153 /*======================*/
1154 ibool is_short,/*!< in: TRUE if short inserts */
1155 const byte* ptr, /*!< in: buffer */
1156 const byte* end_ptr,/*!< in: buffer end */
1157 buf_block_t* block, /*!< in: page or NULL */
1158 dict_index_t* index, /*!< in: record descriptor */
1159 mtr_t* mtr) /*!< in: mtr or NULL */
1160 {
1161 ulint origin_offset = 0; /* remove warning */
1162 ulint end_seg_len;
1163 ulint mismatch_index = 0; /* remove warning */
1164 page_t* page;
1165 rec_t* cursor_rec;
1166 byte buf1[1024];
1167 byte* buf;
1168 const byte* ptr2 = ptr;
1169 ulint info_and_status_bits = 0; /* remove warning */
1170 page_cur_t cursor;
1171 mem_heap_t* heap = NULL;
1172 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1173 ulint* offsets = offsets_;
1174 rec_offs_init(offsets_);
1175
1176 page = block ? buf_block_get_frame(block) : NULL;
1177
1178 if (is_short) {
1179 cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
1180 } else {
1181 ulint offset;
1182
1183 /* Read the cursor rec offset as a 2-byte ulint */
1184
1185 if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
1186
1187 return(NULL);
1188 }
1189
1190 offset = mach_read_from_2(ptr);
1191 ptr += 2;
1192
1193 cursor_rec = page + offset;
1194
1195 if (offset >= UNIV_PAGE_SIZE) {
1196
1197 recv_sys->found_corrupt_log = TRUE;
1198
1199 return(NULL);
1200 }
1201 }
1202
1203 end_seg_len = mach_parse_compressed(&ptr, end_ptr);
1204
1205 if (ptr == NULL) {
1206
1207 return(NULL);
1208 }
1209
1210 if (end_seg_len >= UNIV_PAGE_SIZE << 1) {
1211 recv_sys->found_corrupt_log = TRUE;
1212
1213 return(NULL);
1214 }
1215
1216 if (end_seg_len & 0x1UL) {
1217 /* Read the info bits */
1218
1219 if (end_ptr < ptr + 1) {
1220
1221 return(NULL);
1222 }
1223
1224 info_and_status_bits = mach_read_from_1(ptr);
1225 ptr++;
1226
1227 origin_offset = mach_parse_compressed(&ptr, end_ptr);
1228
1229 if (ptr == NULL) {
1230
1231 return(NULL);
1232 }
1233
1234 ut_a(origin_offset < UNIV_PAGE_SIZE);
1235
1236 mismatch_index = mach_parse_compressed(&ptr, end_ptr);
1237
1238 if (ptr == NULL) {
1239
1240 return(NULL);
1241 }
1242
1243 ut_a(mismatch_index < UNIV_PAGE_SIZE);
1244 }
1245
1246 if (end_ptr < ptr + (end_seg_len >> 1)) {
1247
1248 return(NULL);
1249 }
1250
1251 if (!block) {
1252
1253 return(const_cast<byte*>(ptr + (end_seg_len >> 1)));
1254 }
1255
1256 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1257 ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
1258
1259 /* Read from the log the inserted index record end segment which
1260 differs from the cursor record */
1261
1262 offsets = rec_get_offsets(cursor_rec, index, offsets,
1263 ULINT_UNDEFINED, &heap);
1264
1265 if (!(end_seg_len & 0x1UL)) {
1266 info_and_status_bits = rec_get_info_and_status_bits(
1267 cursor_rec, page_is_comp(page));
1268 origin_offset = rec_offs_extra_size(offsets);
1269 mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
1270 }
1271
1272 end_seg_len >>= 1;
1273
1274 if (mismatch_index + end_seg_len < sizeof buf1) {
1275 buf = buf1;
1276 } else {
1277 buf = static_cast<byte*>(
1278 ut_malloc_nokey(mismatch_index + end_seg_len));
1279 }
1280
1281 /* Build the inserted record to buf */
1282
1283 if (UNIV_UNLIKELY(mismatch_index >= UNIV_PAGE_SIZE)) {
1284
1285 ib::fatal() << "is_short " << is_short << ", "
1286 << "info_and_status_bits " << info_and_status_bits
1287 << ", offset " << page_offset(cursor_rec) << ","
1288 " o_offset " << origin_offset << ", mismatch index "
1289 << mismatch_index << ", end_seg_len " << end_seg_len
1290 << " parsed len " << (ptr - ptr2);
1291 }
1292
1293 ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
1294 ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
1295
1296 if (page_is_comp(page)) {
1297 rec_set_info_and_status_bits(buf + origin_offset,
1298 info_and_status_bits);
1299 } else {
1300 rec_set_info_bits_old(buf + origin_offset,
1301 info_and_status_bits);
1302 }
1303
1304 page_cur_position(cursor_rec, block, &cursor);
1305
1306 offsets = rec_get_offsets(buf + origin_offset, index, offsets,
1307 ULINT_UNDEFINED, &heap);
1308 if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
1309 buf + origin_offset,
1310 index, offsets, mtr))) {
1311 /* The redo log record should only have been written
1312 after the write was successful. */
1313 ut_error;
1314 }
1315
1316 if (buf != buf1) {
1317
1318 ut_free(buf);
1319 }
1320
1321 if (UNIV_LIKELY_NULL(heap)) {
1322 mem_heap_free(heap);
1323 }
1324
1325 return(const_cast<byte*>(ptr + end_seg_len));
1326 }
1327
1328 /***********************************************************//**
1329 Inserts a record next to page cursor on an uncompressed page.
1330 Returns pointer to inserted record if succeed, i.e., enough
1331 space available, NULL otherwise. The cursor stays at the same position.
1332 @return pointer to record if succeed, NULL otherwise */
rec_t*
page_cur_insert_rec_low(
/*====================*/
	rec_t*		current_rec,/*!< in: pointer to current record after
				which the new record is inserted */
	dict_index_t*	index,	/*!< in: record descriptor */
	const rec_t*	rec,	/*!< in: pointer to a physical record */
	ulint*		offsets,/*!< in/out: rec_get_offsets(rec, index) */
	mtr_t*		mtr)	/*!< in: mini-transaction handle, or NULL */
{
	byte*		insert_buf;
	ulint		rec_size;
	page_t*		page;		/*!< the relevant page */
	rec_t*		last_insert;	/*!< cursor position at previous
					insert */
	rec_t*		free_rec;	/*!< a free record that was reused,
					or NULL */
	rec_t*		insert_rec;	/*!< inserted record */
	ulint		heap_no;	/*!< heap number of the inserted
					record */

	ut_ad(rec_offs_validate(rec, index, offsets));

	page = page_align(current_rec);
	ut_ad(dict_table_is_comp(index->table)
	      == (ibool) !!page_is_comp(page));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || recv_recovery_is_on()
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));

	ut_ad(!page_rec_is_supremum(current_rec));

	/* 1. Get the size of the physical record in the page */
	rec_size = rec_offs_size(offsets);

#ifdef UNIV_DEBUG_VALGRIND
	{
		const void*	rec_start
			= rec - rec_offs_extra_size(offsets);
		ulint		extra_size
			= rec_offs_extra_size(offsets)
			- (rec_offs_comp(offsets)
			   ? REC_N_NEW_EXTRA_BYTES
			   : REC_N_OLD_EXTRA_BYTES);

		/* All data bytes of the record must be valid. */
		UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
		/* The variable-length header must be valid. */
		UNIV_MEM_ASSERT_RW(rec_start, extra_size);
	}
#endif /* UNIV_DEBUG_VALGRIND */

	/* 2. Try to find suitable space from page memory management */

	free_rec = page_header_get_ptr(page, PAGE_FREE);
	if (UNIV_LIKELY_NULL(free_rec)) {
		/* Try to allocate from the head of the free list.
		Only the list head is considered; if it is too small
		for the new record, fall back to the heap instead of
		scanning further. */
		ulint		foffsets_[REC_OFFS_NORMAL_SIZE];
		ulint*		foffsets	= foffsets_;
		mem_heap_t*	heap		= NULL;

		rec_offs_init(foffsets_);

		foffsets = rec_get_offsets(
			free_rec, index, foffsets, ULINT_UNDEFINED, &heap);
		if (rec_offs_size(foffsets) < rec_size) {
			if (UNIV_LIKELY_NULL(heap)) {
				mem_heap_free(heap);
			}

			goto use_heap;
		}

		/* insert_buf points to the start of the reused slot,
		including the record's extra (header) bytes. */
		insert_buf = free_rec - rec_offs_extra_size(foffsets);

		if (page_is_comp(page)) {
			heap_no = rec_get_heap_no_new(free_rec);
			page_mem_alloc_free(page, NULL,
					rec_get_next_ptr(free_rec, TRUE),
					rec_size);
		} else {
			heap_no = rec_get_heap_no_old(free_rec);
			page_mem_alloc_free(page, NULL,
					rec_get_next_ptr(free_rec, FALSE),
					rec_size);
		}

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	} else {
use_heap:
		free_rec = NULL;
		insert_buf = page_mem_alloc_heap(page, NULL,
						 rec_size, &heap_no);

		if (UNIV_UNLIKELY(insert_buf == NULL)) {
			/* Not enough space on the page. */
			return(NULL);
		}
	}

	/* 3. Create the record */
	insert_rec = rec_copy(insert_buf, rec, offsets);
	rec_offs_make_valid(insert_rec, index, offsets);

	/* 4. Insert the record in the linked list of records */
	ut_ad(current_rec != insert_rec);

	{
		/* next record after current before the insertion */
		rec_t*	next_rec = page_rec_get_next(current_rec);
#ifdef UNIV_DEBUG
		if (page_is_comp(page)) {
			ut_ad(rec_get_status(current_rec)
				<= REC_STATUS_INFIMUM);
			ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
			ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
		}
#endif
		page_rec_set_next(insert_rec, next_rec);
		page_rec_set_next(current_rec, insert_rec);
	}

	page_header_set_field(page, NULL, PAGE_N_RECS,
			      1 + page_get_n_recs(page));

	/* 5. Set the n_owned field in the inserted record to zero,
	and set the heap_no field */
	if (page_is_comp(page)) {
		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec, heap_no);
	} else {
		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec, heap_no);
	}

	UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
			   rec_offs_size(offsets));
	/* 6. Update the last insertion info in page header */

	last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
	ut_ad(!last_insert || !page_is_comp(page)
	      || rec_get_node_ptr_flag(last_insert)
	      == rec_get_node_ptr_flag(insert_rec));

	/* PAGE_DIRECTION / PAGE_N_DIRECTION track whether consecutive
	inserts move rightwards or leftwards on the page; they are not
	maintained for spatial indexes. */
	if (!dict_index_is_spatial(index)) {
		if (UNIV_UNLIKELY(last_insert == NULL)) {
			page_header_set_field(page, NULL, PAGE_DIRECTION,
					      PAGE_NO_DIRECTION);
			page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);

		} else if ((last_insert == current_rec)
			   && (page_header_get_field(page, PAGE_DIRECTION)
			       != PAGE_LEFT)) {

			/* Inserted right after the previous insert:
			an ascending (rightward) pattern. */
			page_header_set_field(page, NULL, PAGE_DIRECTION,
					      PAGE_RIGHT);
			page_header_set_field(page, NULL, PAGE_N_DIRECTION,
					      page_header_get_field(
						      page, PAGE_N_DIRECTION) + 1);

		} else if ((page_rec_get_next(insert_rec) == last_insert)
			   && (page_header_get_field(page, PAGE_DIRECTION)
			       != PAGE_RIGHT)) {

			/* Inserted right before the previous insert:
			a descending (leftward) pattern. */
			page_header_set_field(page, NULL, PAGE_DIRECTION,
					      PAGE_LEFT);
			page_header_set_field(page, NULL, PAGE_N_DIRECTION,
					      page_header_get_field(
						      page, PAGE_N_DIRECTION) + 1);
		} else {
			page_header_set_field(page, NULL, PAGE_DIRECTION,
					      PAGE_NO_DIRECTION);
			page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
		}
	}

	page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);

	/* 7. It remains to update the owner record. */
	{
		rec_t*	owner_rec	= page_rec_find_owner_rec(insert_rec);
		ulint	n_owned;
		if (page_is_comp(page)) {
			n_owned = rec_get_n_owned_new(owner_rec);
			rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
		} else {
			n_owned = rec_get_n_owned_old(owner_rec);
			rec_set_n_owned_old(owner_rec, n_owned + 1);
		}

		/* 8. Now we have incremented the n_owned field of the owner
		record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
		we have to split the corresponding directory slot in two. */

		if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
			page_dir_split_slot(
				page, NULL,
				page_dir_find_owner_slot(owner_rec));
		}
	}

	/* 9. Write log record of the insert */
	if (UNIV_LIKELY(mtr != NULL)) {
		page_cur_insert_rec_write_log(insert_rec, rec_size,
					      current_rec, index, mtr);
	}

	return(insert_rec);
}
1544
1545 /** Inserts a record next to page cursor on an uncompressed page.
1546 @param[in] current_rec pointer to current record after which
1547 the new record is inserted.
1548 @param[in] index record descriptor
1549 @param[in] tuple pointer to a data tuple
1550 @param[in] n_ext number of externally stored columns
1551 @param[in] mtr mini-transaction handle, or NULL
1552
1553 @return pointer to record if succeed, NULL otherwise */
rec_t*
page_cur_direct_insert_rec_low(
	rec_t*		current_rec,
	dict_index_t*	index,
	const dtuple_t*	tuple,
	ulint		n_ext,
	mtr_t*		mtr)
{
	byte*		insert_buf;
	ulint		rec_size;
	page_t*		page;		/*!< the relevant page */
	rec_t*		last_insert;	/*!< cursor position at previous
					insert */
	rec_t*		free_rec;	/*!< a free record that was reused,
					or NULL */
	rec_t*		insert_rec;	/*!< inserted record */
	ulint		heap_no;	/*!< heap number of the inserted
					record */

	page = page_align(current_rec);

	ut_ad(dict_table_is_comp(index->table)
	      == (ibool) !!page_is_comp(page));

	ut_ad(fil_page_index_page_check(page));

	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID)
	      == index->id);

	ut_ad(!page_rec_is_supremum(current_rec));

	/* 1. Get the size of the physical record in the page.
	Unlike page_cur_insert_rec_low(), this takes the precomputed
	size from the index record cache instead of rec_get_offsets(). */
	rec_size = index->rec_cache.rec_size;

	/* 2. Try to find suitable space from page memory management:
	reuse the head of the free list if it is big enough, otherwise
	allocate from the page heap. */
	free_rec = page_header_get_ptr(page, PAGE_FREE);
	if (free_rec) {
		/* Try to allocate from the head of the free list. */
		ulint		foffsets_[REC_OFFS_NORMAL_SIZE];
		ulint*		foffsets	= foffsets_;
		mem_heap_t*	heap		= NULL;

		rec_offs_init(foffsets_);

		foffsets = rec_get_offsets(
			free_rec, index, foffsets, ULINT_UNDEFINED, &heap);
		if (rec_offs_size(foffsets) < rec_size) {
			/* The free-list head is too small: allocate
			from the page heap instead. */
			if (heap != NULL) {
				mem_heap_free(heap);
				heap = NULL;
			}

			free_rec = NULL;
			insert_buf = page_mem_alloc_heap(
				page, NULL, rec_size, &heap_no);

			if (insert_buf == NULL) {
				return(NULL);
			}
		} else {
			/* insert_buf points to the start of the reused
			slot, including the record header bytes. */
			insert_buf = free_rec - rec_offs_extra_size(foffsets);

			if (page_is_comp(page)) {
				heap_no = rec_get_heap_no_new(free_rec);
				page_mem_alloc_free(
					page, NULL,
					rec_get_next_ptr(free_rec, TRUE),
					rec_size);
			} else {
				heap_no = rec_get_heap_no_old(free_rec);
				page_mem_alloc_free(
					page, NULL,
					rec_get_next_ptr(free_rec, FALSE),
					rec_size);
			}

			if (heap != NULL) {
				mem_heap_free(heap);
				heap = NULL;
			}
		}
	} else {
		free_rec = NULL;
		insert_buf = page_mem_alloc_heap(page, NULL,
						 rec_size, &heap_no);

		if (insert_buf == NULL) {
			return(NULL);
		}
	}

	/* 3. Create the record: convert the data tuple directly into
	physical record format in the allocated space. */
	insert_rec = rec_convert_dtuple_to_rec(insert_buf, index, tuple, n_ext);

	/* 4. Insert the record in the linked list of records */
	ut_ad(current_rec != insert_rec);

	{
		/* next record after current before the insertion */
		rec_t*	next_rec = page_rec_get_next(current_rec);
#ifdef UNIV_DEBUG
		if (page_is_comp(page)) {
			ut_ad(rec_get_status(current_rec)
				<= REC_STATUS_INFIMUM);
			ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
			ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
		}
#endif
		page_rec_set_next(insert_rec, next_rec);
		page_rec_set_next(current_rec, insert_rec);
	}

	page_header_set_field(page, NULL, PAGE_N_RECS,
			      1 + page_get_n_recs(page));

	/* 5. Set the n_owned field in the inserted record to zero,
	and set the heap_no field */
	if (page_is_comp(page)) {
		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec, heap_no);
	} else {
		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec, heap_no);
	}

	/* 6. Update the last insertion info in page header:
	PAGE_DIRECTION / PAGE_N_DIRECTION track whether consecutive
	inserts move rightwards or leftwards on the page. */

	last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
	ut_ad(!last_insert || !page_is_comp(page)
	      || rec_get_node_ptr_flag(last_insert)
	      == rec_get_node_ptr_flag(insert_rec));

	if (last_insert == NULL) {
		page_header_set_field(page, NULL, PAGE_DIRECTION,
				      PAGE_NO_DIRECTION);
		page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);

	} else if ((last_insert == current_rec)
		   && (page_header_get_field(page, PAGE_DIRECTION)
		       != PAGE_LEFT)) {

		/* Inserted right after the previous insert:
		an ascending (rightward) pattern. */
		page_header_set_field(page, NULL, PAGE_DIRECTION,
				      PAGE_RIGHT);
		page_header_set_field(page, NULL, PAGE_N_DIRECTION,
				      page_header_get_field(
					      page, PAGE_N_DIRECTION) + 1);

	} else if ((page_rec_get_next(insert_rec) == last_insert)
		   && (page_header_get_field(page, PAGE_DIRECTION)
		       != PAGE_RIGHT)) {

		/* Inserted right before the previous insert:
		a descending (leftward) pattern. */
		page_header_set_field(page, NULL, PAGE_DIRECTION,
				      PAGE_LEFT);
		page_header_set_field(page, NULL, PAGE_N_DIRECTION,
				      page_header_get_field(
					      page, PAGE_N_DIRECTION) + 1);
	} else {
		page_header_set_field(page, NULL, PAGE_DIRECTION,
				      PAGE_NO_DIRECTION);
		page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
	}

	page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);

	/* 7. It remains to update the owner record. */
	{
		rec_t*	owner_rec	= page_rec_find_owner_rec(insert_rec);
		ulint	n_owned;
		if (page_is_comp(page)) {
			n_owned = rec_get_n_owned_new(owner_rec);
			rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
		} else {
			n_owned = rec_get_n_owned_old(owner_rec);
			rec_set_n_owned_old(owner_rec, n_owned + 1);
		}

		/* 8. Now we have incremented the n_owned field of the owner
		record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
		we have to split the corresponding directory slot in two. */

		if (n_owned == PAGE_DIR_SLOT_MAX_N_OWNED) {
			page_dir_split_slot(
				page, NULL,
				page_dir_find_owner_slot(owner_rec));
		}
	}

	/* 9. Open the mtr for name sake to set the modification flag
	to true failing which no flush would be done. */
	byte*	log_ptr = mlog_open(mtr, 0);
	ut_ad(log_ptr == NULL);
	if (log_ptr != NULL) {
		/* To keep compiler happy. */
		mlog_close(mtr, log_ptr);
	}

	return(insert_rec);
}
1752
1753 /***********************************************************//**
1754 Inserts a record next to page cursor on a compressed and uncompressed
1755 page. Returns pointer to inserted record if succeed, i.e.,
1756 enough space available, NULL otherwise.
1757 The cursor stays at the same position.
1758
1759 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1760 if this is a compressed leaf page in a secondary index.
1761 This has to be done either within the same mini-transaction,
1762 or by invoking ibuf_reset_free_bits() before mtr_commit().
1763
1764 @return pointer to record if succeed, NULL otherwise */
1765 rec_t*
page_cur_insert_rec_zip(page_cur_t * cursor,dict_index_t * index,const rec_t * rec,ulint * offsets,mtr_t * mtr)1766 page_cur_insert_rec_zip(
1767 /*====================*/
1768 page_cur_t* cursor, /*!< in/out: page cursor */
1769 dict_index_t* index, /*!< in: record descriptor */
1770 const rec_t* rec, /*!< in: pointer to a physical record */
1771 ulint* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1772 mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
1773 {
1774 byte* insert_buf;
1775 ulint rec_size;
1776 page_t* page; /*!< the relevant page */
1777 rec_t* last_insert; /*!< cursor position at previous
1778 insert */
1779 rec_t* free_rec; /*!< a free record that was reused,
1780 or NULL */
1781 rec_t* insert_rec; /*!< inserted record */
1782 ulint heap_no; /*!< heap number of the inserted
1783 record */
1784 page_zip_des_t* page_zip;
1785
1786 page_zip = page_cur_get_page_zip(cursor);
1787 ut_ad(page_zip);
1788
1789 ut_ad(rec_offs_validate(rec, index, offsets));
1790
1791 page = page_cur_get_page(cursor);
1792 ut_ad(dict_table_is_comp(index->table));
1793 ut_ad(page_is_comp(page));
1794 ut_ad(fil_page_index_page_check(page));
1795 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
1796 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index))
1797 || recv_recovery_is_on());
1798
1799 ut_ad(!page_cur_is_after_last(cursor));
1800 #ifdef UNIV_ZIP_DEBUG
1801 ut_a(page_zip_validate(page_zip, page, index));
1802 #endif /* UNIV_ZIP_DEBUG */
1803
1804 /* 1. Get the size of the physical record in the page */
1805 rec_size = rec_offs_size(offsets);
1806
1807 #ifdef UNIV_DEBUG_VALGRIND
1808 {
1809 const void* rec_start
1810 = rec - rec_offs_extra_size(offsets);
1811 ulint extra_size
1812 = rec_offs_extra_size(offsets)
1813 - (rec_offs_comp(offsets)
1814 ? REC_N_NEW_EXTRA_BYTES
1815 : REC_N_OLD_EXTRA_BYTES);
1816
1817 /* All data bytes of the record must be valid. */
1818 UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
1819 /* The variable-length header must be valid. */
1820 UNIV_MEM_ASSERT_RW(rec_start, extra_size);
1821 }
1822 #endif /* UNIV_DEBUG_VALGRIND */
1823
1824 const bool reorg_before_insert = page_has_garbage(page)
1825 && rec_size > page_get_max_insert_size(page, 1)
1826 && rec_size <= page_get_max_insert_size_after_reorganize(
1827 page, 1);
1828
1829 /* 2. Try to find suitable space from page memory management */
1830 if (!page_zip_available(page_zip, dict_index_is_clust(index),
1831 rec_size, 1)
1832 || reorg_before_insert) {
1833 /* The values can change dynamically. */
1834 bool log_compressed = page_zip_log_pages;
1835 ulint level = page_zip_level;
1836 #ifdef UNIV_DEBUG
1837 rec_t* cursor_rec = page_cur_get_rec(cursor);
1838 #endif /* UNIV_DEBUG */
1839
1840 /* If we are not writing compressed page images, we
1841 must reorganize the page before attempting the
1842 insert. */
1843 if (recv_recovery_is_on()) {
1844 /* Insert into the uncompressed page only.
1845 The page reorganization or creation that we
1846 would attempt outside crash recovery would
1847 have been covered by a previous redo log record. */
1848 } else if (page_is_empty(page)) {
1849 ut_ad(page_cur_is_before_first(cursor));
1850
1851 /* This is an empty page. Recreate it to
1852 get rid of the modification log. */
1853 page_create_zip(page_cur_get_block(cursor), index,
1854 page_header_get_field(page, PAGE_LEVEL),
1855 0, NULL, mtr);
1856 ut_ad(!page_header_get_ptr(page, PAGE_FREE));
1857
1858 if (page_zip_available(
1859 page_zip, dict_index_is_clust(index),
1860 rec_size, 1)) {
1861 goto use_heap;
1862 }
1863
1864 /* The cursor should remain on the page infimum. */
1865 return(NULL);
1866 } else if (!page_zip->m_nonempty && !page_has_garbage(page)) {
1867 /* The page has been freshly compressed, so
1868 reorganizing it will not help. */
1869 } else if (log_compressed && !reorg_before_insert) {
1870 /* Insert into uncompressed page only, and
1871 try page_zip_reorganize() afterwards. */
1872 } else if (btr_page_reorganize_low(
1873 recv_recovery_is_on(), level,
1874 cursor, index, mtr)) {
1875 ut_ad(!page_header_get_ptr(page, PAGE_FREE));
1876
1877 if (page_zip_available(
1878 page_zip, dict_index_is_clust(index),
1879 rec_size, 1)) {
1880 /* After reorganizing, there is space
1881 available. */
1882 goto use_heap;
1883 }
1884 } else {
1885 ut_ad(cursor->rec == cursor_rec);
1886 return(NULL);
1887 }
1888
1889 /* Try compressing the whole page afterwards. */
1890 insert_rec = page_cur_insert_rec_low(
1891 cursor->rec, index, rec, offsets, NULL);
1892
1893 /* If recovery is on, this implies that the compression
1894 of the page was successful during runtime. Had that not
1895 been the case or had the redo logging of compressed
1896 pages been enabled during runtime then we'd have seen
1897 a MLOG_ZIP_PAGE_COMPRESS redo record. Therefore, we
1898 know that we don't need to reorganize the page. We,
1899 however, do need to recompress the page. That will
1900 happen when the next redo record is read which must
1901 be of type MLOG_ZIP_PAGE_COMPRESS_NO_DATA and it must
1902 contain a valid compression level value.
1903 This implies that during recovery from this point till
1904 the next redo is applied the uncompressed and
1905 compressed versions are not identical and
1906 page_zip_validate will fail but that is OK because
1907 we call page_zip_validate only after processing
1908 all changes to a page under a single mtr during
1909 recovery. */
1910 if (insert_rec == NULL) {
1911 /* Out of space.
1912 This should never occur during crash recovery,
1913 because the MLOG_COMP_REC_INSERT should only
1914 be logged after a successful operation. */
1915 ut_ad(!recv_recovery_is_on());
1916 } else if (recv_recovery_is_on()) {
1917 /* This should be followed by
1918 MLOG_ZIP_PAGE_COMPRESS_NO_DATA,
1919 which should succeed. */
1920 rec_offs_make_valid(insert_rec, index, offsets);
1921 } else {
1922 ulint pos = page_rec_get_n_recs_before(insert_rec);
1923 ut_ad(pos > 0);
1924
1925 if (!log_compressed) {
1926 if (page_zip_compress(
1927 page_zip, page, index,
1928 level, NULL, NULL)) {
1929 page_cur_insert_rec_write_log(
1930 insert_rec, rec_size,
1931 cursor->rec, index, mtr);
1932 page_zip_compress_write_log_no_data(
1933 level, page, index, mtr);
1934
1935 rec_offs_make_valid(
1936 insert_rec, index, offsets);
1937 return(insert_rec);
1938 }
1939
1940 ut_ad(cursor->rec
1941 == (pos > 1
1942 ? page_rec_get_nth(
1943 page, pos - 1)
1944 : page + PAGE_NEW_INFIMUM));
1945 } else {
1946 /* We are writing entire page images
1947 to the log. Reduce the redo log volume
1948 by reorganizing the page at the same time. */
1949 if (page_zip_reorganize(
1950 cursor->block, index, mtr)) {
1951 /* The page was reorganized:
1952 Seek to pos. */
1953 if (pos > 1) {
1954 cursor->rec = page_rec_get_nth(
1955 page, pos - 1);
1956 } else {
1957 cursor->rec = page
1958 + PAGE_NEW_INFIMUM;
1959 }
1960
1961 insert_rec = page + rec_get_next_offs(
1962 cursor->rec, TRUE);
1963 rec_offs_make_valid(
1964 insert_rec, index, offsets);
1965 return(insert_rec);
1966 }
1967
1968 /* Theoretically, we could try one
1969 last resort of btr_page_reorganize_low()
1970 followed by page_zip_available(), but
1971 that would be very unlikely to
1972 succeed. (If the full reorganized page
1973 failed to compress, why would it
1974 succeed to compress the page, plus log
1975 the insert of this record? */
1976 }
1977
1978 /* Out of space: restore the page */
1979 if (!page_zip_decompress(page_zip, page, FALSE)) {
1980 ut_error; /* Memory corrupted? */
1981 }
1982 ut_ad(page_validate(page, index));
1983 insert_rec = NULL;
1984 }
1985
1986 return(insert_rec);
1987 }
1988
1989 free_rec = page_header_get_ptr(page, PAGE_FREE);
1990 if (UNIV_LIKELY_NULL(free_rec)) {
1991 /* Try to allocate from the head of the free list. */
1992 lint extra_size_diff;
1993 ulint foffsets_[REC_OFFS_NORMAL_SIZE];
1994 ulint* foffsets = foffsets_;
1995 mem_heap_t* heap = NULL;
1996
1997 rec_offs_init(foffsets_);
1998
1999 foffsets = rec_get_offsets(free_rec, index, foffsets,
2000 ULINT_UNDEFINED, &heap);
2001 if (rec_offs_size(foffsets) < rec_size) {
2002 too_small:
2003 if (UNIV_LIKELY_NULL(heap)) {
2004 mem_heap_free(heap);
2005 }
2006
2007 goto use_heap;
2008 }
2009
2010 insert_buf = free_rec - rec_offs_extra_size(foffsets);
2011
2012 /* On compressed pages, do not relocate records from
2013 the free list. If extra_size would grow, use the heap. */
2014 extra_size_diff
2015 = rec_offs_extra_size(offsets)
2016 - rec_offs_extra_size(foffsets);
2017
2018 if (UNIV_UNLIKELY(extra_size_diff < 0)) {
2019 /* Add an offset to the extra_size. */
2020 if (rec_offs_size(foffsets)
2021 < rec_size - extra_size_diff) {
2022
2023 goto too_small;
2024 }
2025
2026 insert_buf -= extra_size_diff;
2027 } else if (UNIV_UNLIKELY(extra_size_diff)) {
2028 /* Do not allow extra_size to grow */
2029
2030 goto too_small;
2031 }
2032
2033 heap_no = rec_get_heap_no_new(free_rec);
2034 page_mem_alloc_free(page, page_zip,
2035 rec_get_next_ptr(free_rec, TRUE),
2036 rec_size);
2037
2038 if (!page_is_leaf(page)) {
2039 /* Zero out the node pointer of free_rec,
2040 in case it will not be overwritten by
2041 insert_rec. */
2042
2043 ut_ad(rec_size > REC_NODE_PTR_SIZE);
2044
2045 if (rec_offs_extra_size(foffsets)
2046 + rec_offs_data_size(foffsets) > rec_size) {
2047
2048 memset(rec_get_end(free_rec, foffsets)
2049 - REC_NODE_PTR_SIZE, 0,
2050 REC_NODE_PTR_SIZE);
2051 }
2052 } else if (dict_index_is_clust(index)) {
2053 /* Zero out the DB_TRX_ID and DB_ROLL_PTR
2054 columns of free_rec, in case it will not be
2055 overwritten by insert_rec. */
2056
2057 ulint trx_id_col;
2058 ulint trx_id_offs;
2059 ulint len;
2060
2061 trx_id_col = dict_index_get_sys_col_pos(index,
2062 DATA_TRX_ID);
2063 ut_ad(trx_id_col > 0);
2064 ut_ad(trx_id_col != ULINT_UNDEFINED);
2065
2066 trx_id_offs = rec_get_nth_field_offs(foffsets,
2067 trx_id_col, &len);
2068 ut_ad(len == DATA_TRX_ID_LEN);
2069
2070 if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
2071 + rec_offs_extra_size(foffsets) > rec_size) {
2072 /* We will have to zero out the
2073 DB_TRX_ID and DB_ROLL_PTR, because
2074 they will not be fully overwritten by
2075 insert_rec. */
2076
2077 memset(free_rec + trx_id_offs, 0,
2078 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
2079 }
2080
2081 ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
2082 == rec_get_nth_field(free_rec, foffsets,
2083 trx_id_col + 1, &len));
2084 ut_ad(len == DATA_ROLL_PTR_LEN);
2085 }
2086
2087 if (UNIV_LIKELY_NULL(heap)) {
2088 mem_heap_free(heap);
2089 }
2090 } else {
2091 use_heap:
2092 free_rec = NULL;
2093 insert_buf = page_mem_alloc_heap(page, page_zip,
2094 rec_size, &heap_no);
2095
2096 if (UNIV_UNLIKELY(insert_buf == NULL)) {
2097 return(NULL);
2098 }
2099
2100 page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
2101 }
2102
2103 /* 3. Create the record */
2104 insert_rec = rec_copy(insert_buf, rec, offsets);
2105 rec_offs_make_valid(insert_rec, index, offsets);
2106
2107 /* 4. Insert the record in the linked list of records */
2108 ut_ad(cursor->rec != insert_rec);
2109
2110 {
2111 /* next record after current before the insertion */
2112 const rec_t* next_rec = page_rec_get_next_low(
2113 cursor->rec, TRUE);
2114 ut_ad(rec_get_status(cursor->rec)
2115 <= REC_STATUS_INFIMUM);
2116 ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
2117 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
2118
2119 page_rec_set_next(insert_rec, next_rec);
2120 page_rec_set_next(cursor->rec, insert_rec);
2121 }
2122
2123 page_header_set_field(page, page_zip, PAGE_N_RECS,
2124 1 + page_get_n_recs(page));
2125
2126 /* 5. Set the n_owned field in the inserted record to zero,
2127 and set the heap_no field */
2128 rec_set_n_owned_new(insert_rec, NULL, 0);
2129 rec_set_heap_no_new(insert_rec, heap_no);
2130
2131 UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
2132 rec_offs_size(offsets));
2133
2134 page_zip_dir_insert(page_zip, cursor->rec, free_rec, insert_rec);
2135
2136 /* 6. Update the last insertion info in page header */
2137
2138 last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
2139 ut_ad(!last_insert
2140 || rec_get_node_ptr_flag(last_insert)
2141 == rec_get_node_ptr_flag(insert_rec));
2142
2143 if (!dict_index_is_spatial(index)) {
2144 if (UNIV_UNLIKELY(last_insert == NULL)) {
2145 page_header_set_field(page, page_zip, PAGE_DIRECTION,
2146 PAGE_NO_DIRECTION);
2147 page_header_set_field(page, page_zip,
2148 PAGE_N_DIRECTION, 0);
2149
2150 } else if ((last_insert == cursor->rec)
2151 && (page_header_get_field(page, PAGE_DIRECTION)
2152 != PAGE_LEFT)) {
2153
2154 page_header_set_field(page, page_zip, PAGE_DIRECTION,
2155 PAGE_RIGHT);
2156 page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
2157 page_header_get_field(
2158 page, PAGE_N_DIRECTION) + 1);
2159
2160 } else if ((page_rec_get_next(insert_rec) == last_insert)
2161 && (page_header_get_field(page, PAGE_DIRECTION)
2162 != PAGE_RIGHT)) {
2163
2164 page_header_set_field(page, page_zip, PAGE_DIRECTION,
2165 PAGE_LEFT);
2166 page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
2167 page_header_get_field(
2168 page, PAGE_N_DIRECTION) + 1);
2169 } else {
2170 page_header_set_field(page, page_zip, PAGE_DIRECTION,
2171 PAGE_NO_DIRECTION);
2172 page_header_set_field(page, page_zip,
2173 PAGE_N_DIRECTION, 0);
2174 }
2175 }
2176
2177 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
2178
2179 /* 7. It remains to update the owner record. */
2180 {
2181 rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
2182 ulint n_owned;
2183
2184 n_owned = rec_get_n_owned_new(owner_rec);
2185 rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
2186
2187 /* 8. Now we have incremented the n_owned field of the owner
2188 record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
2189 we have to split the corresponding directory slot in two. */
2190
2191 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
2192 page_dir_split_slot(
2193 page, page_zip,
2194 page_dir_find_owner_slot(owner_rec));
2195 }
2196 }
2197
2198 page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
2199
2200 /* 9. Write log record of the insert */
2201 if (UNIV_LIKELY(mtr != NULL)) {
2202 page_cur_insert_rec_write_log(insert_rec, rec_size,
2203 cursor->rec, index, mtr);
2204 }
2205
2206 return(insert_rec);
2207 }
2208
2209 #ifndef UNIV_HOTBACKUP
2210 /**********************************************************//**
2211 Writes a log record of copying a record list end to a new created page.
2212 @return 4-byte field where to write the log data length, or NULL if
2213 logging is disabled */
2214 UNIV_INLINE
2215 byte*
page_copy_rec_list_to_created_page_write_log(page_t * page,dict_index_t * index,mtr_t * mtr)2216 page_copy_rec_list_to_created_page_write_log(
2217 /*=========================================*/
2218 page_t* page, /*!< in: index page */
2219 dict_index_t* index, /*!< in: record descriptor */
2220 mtr_t* mtr) /*!< in: mtr */
2221 {
2222 byte* log_ptr;
2223
2224 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
2225 ut_ad(mtr->is_named_space(index->space));
2226
2227 log_ptr = mlog_open_and_write_index(mtr, page, index,
2228 page_is_comp(page)
2229 ? MLOG_COMP_LIST_END_COPY_CREATED
2230 : MLOG_LIST_END_COPY_CREATED, 4);
2231 if (UNIV_LIKELY(log_ptr != NULL)) {
2232 mlog_close(mtr, log_ptr + 4);
2233 }
2234
2235 return(log_ptr);
2236 }
2237 #endif /* !UNIV_HOTBACKUP */
2238
2239 /**********************************************************//**
2240 Parses a log record of copying a record list end to a new created page.
2241 @return end of log record or NULL */
2242 byte*
page_parse_copy_rec_list_to_created_page(byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)2243 page_parse_copy_rec_list_to_created_page(
2244 /*=====================================*/
2245 byte* ptr, /*!< in: buffer */
2246 byte* end_ptr,/*!< in: buffer end */
2247 buf_block_t* block, /*!< in: page or NULL */
2248 dict_index_t* index, /*!< in: record descriptor */
2249 mtr_t* mtr) /*!< in: mtr or NULL */
2250 {
2251 byte* rec_end;
2252 ulint log_data_len;
2253 page_t* page;
2254 page_zip_des_t* page_zip;
2255
2256 if (ptr + 4 > end_ptr) {
2257
2258 return(NULL);
2259 }
2260
2261 log_data_len = mach_read_from_4(ptr);
2262 ptr += 4;
2263
2264 rec_end = ptr + log_data_len;
2265
2266 if (rec_end > end_ptr) {
2267
2268 return(NULL);
2269 }
2270
2271 if (!block) {
2272
2273 return(rec_end);
2274 }
2275
2276 while (ptr < rec_end) {
2277 ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
2278 block, index, mtr);
2279 }
2280
2281 ut_a(ptr == rec_end);
2282
2283 page = buf_block_get_frame(block);
2284 page_zip = buf_block_get_page_zip(block);
2285
2286 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
2287
2288 if (!dict_index_is_spatial(index)) {
2289 page_header_set_field(page, page_zip, PAGE_DIRECTION,
2290 PAGE_NO_DIRECTION);
2291 page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
2292 }
2293
2294 return(rec_end);
2295 }
2296
2297 #ifndef UNIV_HOTBACKUP
2298 /*************************************************************//**
2299 Copies records from page to a newly created page, from a given record onward,
2300 including that record. Infimum and supremum records are not copied.
2301
2302 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
2303 if this is a compressed leaf page in a secondary index.
2304 This has to be done either within the same mini-transaction,
2305 or by invoking ibuf_reset_free_bits() before mtr_commit(). */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
	page_t*		new_page,	/*!< in/out: index page to copy to */
	rec_t*		rec,		/*!< in: first record to copy */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_dir_slot_t* slot = 0; /* remove warning */
	byte*		heap_top;
	rec_t*		insert_rec = 0; /* remove warning */
	rec_t*		prev_rec;
	ulint		count;
	ulint		n_recs;
	ulint		slot_index;
	ulint		rec_size;
	byte*		log_ptr;
	ulint		log_data_len;
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	/* The destination page must be freshly created (no user
	records yet) and distinct from the source page. */
	ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
	ut_ad(page_align(rec) != new_page);
	ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));

	if (page_rec_is_infimum(rec)) {

		rec = page_rec_get_next(rec);
	}

	if (page_rec_is_supremum(rec)) {
		/* Nothing to copy. */
		return;
	}

#ifdef UNIV_DEBUG
	/* To pass the debug tests we have to set these dummy values
	in the debug version */
	page_dir_set_n_slots(new_page, NULL, UNIV_PAGE_SIZE / 2);
	page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
			    new_page + UNIV_PAGE_SIZE - 1);
#endif

	/* Write the MLOG_(COMP_)LIST_END_COPY_CREATED header; the
	4-byte length field it reserves is filled in below, once we
	know how much insert log data was generated. */
	log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
							       index, mtr);

	/* Remember the mtr log size so we can compute how many bytes
	the individual inserts add to it. */
	log_data_len = mtr->get_log()->size();

	/* Individual inserts are logged in a shorter form */

	mtr_log_t	log_mode;

	if (dict_table_is_temporary(index->table)
	    || index->table->ibd_file_missing /* IMPORT TABLESPACE */) {
		/* Keep the current mode; these pages are not redo-logged
		normally. */
		log_mode = mtr_get_log_mode(mtr);
	} else {
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
	}

	prev_rec = page_get_infimum_rec(new_page);
	/* Records are placed contiguously starting right after the
	supremum record of the empty page. */
	if (page_is_comp(new_page)) {
		heap_top = new_page + PAGE_NEW_SUPREMUM_END;
	} else {
		heap_top = new_page + PAGE_OLD_SUPREMUM_END;
	}
	count = 0;
	slot_index = 0;
	n_recs = 0;

	/* Copy each record, link it after the previous one, and build
	the page directory as we go. */
	do {
		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		insert_rec = rec_copy(heap_top, rec, offsets);

		if (page_is_comp(new_page)) {
			rec_set_next_offs_new(prev_rec,
					      page_offset(insert_rec));

			rec_set_n_owned_new(insert_rec, NULL, 0);
			rec_set_heap_no_new(insert_rec,
					    PAGE_HEAP_NO_USER_LOW + n_recs);
		} else {
			rec_set_next_offs_old(prev_rec,
					      page_offset(insert_rec));

			rec_set_n_owned_old(insert_rec, 0);
			rec_set_heap_no_old(insert_rec,
					    PAGE_HEAP_NO_USER_LOW + n_recs);
		}

		count++;
		n_recs++;

		/* Open a new directory slot every
		(PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 records. */
		if (UNIV_UNLIKELY
		    (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {

			slot_index++;

			slot = page_dir_get_nth_slot(new_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}

		rec_size = rec_offs_size(offsets);

		ut_ad(heap_top < new_page + UNIV_PAGE_SIZE);

		heap_top += rec_size;

		rec_offs_make_valid(insert_rec, index, offsets);
		/* Log this insert in the short form enabled above. */
		page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
					      index, mtr);
		prev_rec = insert_rec;
		rec = page_rec_get_next(rec);
	} while (!page_rec_is_supremum(rec));

	if ((slot_index > 0) && (count + 1
				 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
				 <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the two last dir slots. This operation is
		here to make this function imitate exactly the equivalent
		task made using page_cur_insert_rec, which we use in database
		recovery to reproduce the task performed by this function.
		To be able to check the correctness of recovery, it is good
		that it imitates exactly. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	/* Now compute the number of log bytes produced by the inserts
	and store it in the 4-byte field reserved by the header record. */
	log_data_len = mtr->get_log()->size() - log_data_len;

	ut_a(log_data_len < 100 * UNIV_PAGE_SIZE);

	if (log_ptr != NULL) {
		mach_write_to_4(log_ptr, log_data_len);
	}

	/* Terminate the record list at the supremum record. */
	if (page_is_comp(new_page)) {
		rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
	} else {
		rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
	}

	/* The last directory slot points to supremum and owns the
	remaining records. */
	slot = page_dir_get_nth_slot(new_page, 1 + slot_index);

	page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	/* Finalize the page header fields (replacing the dummy debug
	values set above, where applicable). */
	page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
	page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
	page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
	page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);

	page_header_set_ptr(new_page, NULL, PAGE_LAST_INSERT, NULL);

	page_header_set_field(new_page, NULL, PAGE_DIRECTION,
			      PAGE_NO_DIRECTION);
	page_header_set_field(new_page, NULL, PAGE_N_DIRECTION, 0);

	/* Restore the log mode */

	mtr_set_log_mode(mtr, log_mode);
}
2482
2483 /***********************************************************//**
2484 Writes log record of a record delete on a page. */
2485 UNIV_INLINE
2486 void
page_cur_delete_rec_write_log(rec_t * rec,const dict_index_t * index,mtr_t * mtr)2487 page_cur_delete_rec_write_log(
2488 /*==========================*/
2489 rec_t* rec, /*!< in: record to be deleted */
2490 const dict_index_t* index, /*!< in: record descriptor */
2491 mtr_t* mtr) /*!< in: mini-transaction handle */
2492 {
2493 byte* log_ptr;
2494
2495 ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2496 ut_ad(mtr->is_named_space(index->space));
2497
2498 log_ptr = mlog_open_and_write_index(mtr, rec, index,
2499 page_rec_is_comp(rec)
2500 ? MLOG_COMP_REC_DELETE
2501 : MLOG_REC_DELETE, 2);
2502
2503 if (!log_ptr) {
2504 /* Logging in mtr is switched off during crash recovery:
2505 in that case mlog_open returns NULL */
2506 return;
2507 }
2508
2509 /* Write the cursor rec offset as a 2-byte ulint */
2510 mach_write_to_2(log_ptr, page_offset(rec));
2511
2512 mlog_close(mtr, log_ptr + 2);
2513 }
2514 #else /* !UNIV_HOTBACKUP */
2515 # define page_cur_delete_rec_write_log(rec,index,mtr) ((void) 0)
2516 #endif /* !UNIV_HOTBACKUP */
2517
2518 /***********************************************************//**
2519 Parses log record of a record delete on a page.
2520 @return pointer to record end or NULL */
2521 byte*
page_cur_parse_delete_rec(byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)2522 page_cur_parse_delete_rec(
2523 /*======================*/
2524 byte* ptr, /*!< in: buffer */
2525 byte* end_ptr,/*!< in: buffer end */
2526 buf_block_t* block, /*!< in: page or NULL */
2527 dict_index_t* index, /*!< in: record descriptor */
2528 mtr_t* mtr) /*!< in: mtr or NULL */
2529 {
2530 ulint offset;
2531 page_cur_t cursor;
2532
2533 if (end_ptr < ptr + 2) {
2534
2535 return(NULL);
2536 }
2537
2538 /* Read the cursor rec offset as a 2-byte ulint */
2539 offset = mach_read_from_2(ptr);
2540 ptr += 2;
2541
2542 ut_a(offset <= UNIV_PAGE_SIZE);
2543
2544 if (block) {
2545 page_t* page = buf_block_get_frame(block);
2546 mem_heap_t* heap = NULL;
2547 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2548 rec_t* rec = page + offset;
2549 rec_offs_init(offsets_);
2550
2551 page_cur_position(rec, block, &cursor);
2552 ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
2553
2554 page_cur_delete_rec(&cursor, index,
2555 rec_get_offsets(rec, index, offsets_,
2556 ULINT_UNDEFINED, &heap),
2557 mtr);
2558 if (UNIV_LIKELY_NULL(heap)) {
2559 mem_heap_free(heap);
2560 }
2561 }
2562
2563 return(ptr);
2564 }
2565
2566 /***********************************************************//**
2567 Deletes a record at the page cursor. The cursor is moved to the next
2568 record after the deleted one. */
void
page_cur_delete_rec(
/*================*/
	page_cur_t*		cursor,	/*!< in/out: a page cursor */
	const dict_index_t*	index,	/*!< in: record descriptor */
	const ulint*		offsets,/*!< in: rec_get_offsets(
					cursor->rec, index) */
	mtr_t*			mtr)	/*!< in: mini-transaction handle
					or NULL */
{
	page_dir_slot_t* cur_dir_slot;
	page_dir_slot_t* prev_slot;
	page_t*		page;
	page_zip_des_t*	page_zip;
	rec_t*		current_rec;
	rec_t*		prev_rec	= NULL;
	rec_t*		next_rec;
	ulint		cur_slot_no;
	ulint		cur_n_owned;
	rec_t*		rec;

	page = page_cur_get_page(cursor);
	page_zip = page_cur_get_page_zip(cursor);

	/* page_zip_validate() will fail here when
	btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
	Then, both "page_zip" and "page" would have the min-rec-mark
	set on the smallest user record, but "page" would additionally
	have it set on the smallest-but-one record. Because sloppy
	page_zip_validate_low() only ignores min-rec-flag differences
	in the smallest user record, it cannot be used here either. */

	current_rec = cursor->rec;
	ut_ad(rec_offs_validate(current_rec, index, offsets));
	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index))
	      || recv_recovery_is_on());
	ut_ad(mtr == NULL || mtr->is_named_space(index->space));

	/* The record must not be the supremum or infimum record. */
	ut_ad(page_rec_is_user_rec(current_rec));

	if (page_get_n_recs(page) == 1 && !recv_recovery_is_on()) {
		/* Empty the page, unless we are applying the redo log
		during crash recovery. During normal operation, the
		page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
		MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
		ut_ad(page_is_leaf(page));
		/* Usually, this should be the root page,
		and the whole index tree should become empty.
		However, this could also be a call in
		btr_cur_pessimistic_update() to delete the only
		record in the page and to insert another one. */
		page_cur_move_to_next(cursor);
		ut_ad(page_cur_is_after_last(cursor));
		page_create_empty(page_cur_get_block(cursor),
				  const_cast<dict_index_t*>(index), mtr);
		return;
	}

	/* Save to local variables some data associated with current_rec */
	cur_slot_no = page_dir_find_owner_slot(current_rec);
	/* A user record is never owned by slot 0 (the infimum slot). */
	ut_ad(cur_slot_no > 0);
	cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
	cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);

	/* 0. Write the log record */
	if (mtr != 0) {
		page_cur_delete_rec_write_log(current_rec, index, mtr);
	}

	/* 1. Reset the last insert info in the page header and increment
	the modify clock for the frame */

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);

	/* The page gets invalid for optimistic searches: increment the
	frame modify clock only if there is an mini-transaction covering
	the change. During IMPORT we allocate local blocks that are not
	part of the buffer pool. */

	if (mtr != 0) {
		buf_block_modify_clock_inc(page_cur_get_block(cursor));
	}

	/* 2. Find the next and the previous record. Note that the cursor is
	left at the next record. */

	ut_ad(cur_slot_no > 0);
	prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);

	rec = (rec_t*) page_dir_slot_get_rec(prev_slot);

	/* rec now points to the record of the previous directory slot. Look
	for the immediate predecessor of current_rec in a loop. */

	while (current_rec != rec) {
		prev_rec = rec;
		rec = page_rec_get_next(rec);
	}

	page_cur_move_to_next(cursor);
	next_rec = cursor->rec;

	/* 3. Remove the record from the linked list of records */

	page_rec_set_next(prev_rec, next_rec);

	/* 4. If the deleted record is pointed to by a dir slot, update the
	record pointer in slot. In the following if-clause we assume that
	prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
	>= 2. */

#if PAGE_DIR_SLOT_MIN_N_OWNED < 2
# error "PAGE_DIR_SLOT_MIN_N_OWNED < 2"
#endif
	ut_ad(cur_n_owned > 1);

	if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
		page_dir_slot_set_rec(cur_dir_slot, prev_rec);
	}

	/* 5. Update the number of owned records of the slot */

	page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);

	/* 6. Free the memory occupied by the record */
	page_mem_free(page, page_zip, current_rec, index, offsets);

	/* 7. Now we have decremented the number of owned records of the slot.
	If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
	slots. */

	if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
		page_dir_balance_slot(page, page_zip, cur_slot_no);
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
}
2712
2713 #ifdef UNIV_COMPILE_TEST_FUNCS
2714
2715 /*******************************************************************//**
2716 Print the first n numbers, generated by page_cur_lcg_prng() to make sure
2717 (visually) that it works properly. */
2718 void
test_page_cur_lcg_prng(int n)2719 test_page_cur_lcg_prng(
2720 /*===================*/
2721 int n) /*!< in: print first n numbers */
2722 {
2723 int i;
2724 unsigned long long rnd;
2725
2726 for (i = 0; i < n; i++) {
2727 rnd = page_cur_lcg_prng();
2728 printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
2729 rnd,
2730 rnd % 2,
2731 rnd % 3,
2732 rnd % 5,
2733 rnd % 7,
2734 rnd % 11);
2735 }
2736 }
2737
2738 #endif /* UNIV_COMPILE_TEST_FUNCS */
2739