1 /*****************************************************************************
2
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2018, 2021, MariaDB Corporation.
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *****************************************************************************/
20
21 /********************************************************************//**
22 @file page/page0cur.cc
23 The page cursor
24
25 Created 10/4/1994 Heikki Tuuri
26 *************************************************************************/
27
28 #include "page0cur.h"
29 #include "page0zip.h"
30 #include "btr0btr.h"
31 #include "mtr0log.h"
32 #include "log0recv.h"
33 #include "rem0cmp.h"
34 #include "gis0rtree.h"
35
36 #include <algorithm>
37
38 #ifdef BTR_CUR_HASH_ADAPT
39 # ifdef UNIV_SEARCH_PERF_STAT
40 static ulint page_cur_short_succ;
41 # endif /* UNIV_SEARCH_PERF_STAT */
42
43 /** Try a search shortcut based on the last insert.
44 @param[in] block index page
45 @param[in] index index tree
46 @param[in] tuple search key
47 @param[in,out] iup_matched_fields already matched fields in the
48 upper limit record
49 @param[in,out] ilow_matched_fields already matched fields in the
50 lower limit record
51 @param[out] cursor page cursor
52 @return true on success */
53 UNIV_INLINE
54 bool
page_cur_try_search_shortcut(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,ulint * iup_matched_fields,ulint * ilow_matched_fields,page_cur_t * cursor)55 page_cur_try_search_shortcut(
56 const buf_block_t* block,
57 const dict_index_t* index,
58 const dtuple_t* tuple,
59 ulint* iup_matched_fields,
60 ulint* ilow_matched_fields,
61 page_cur_t* cursor)
62 {
63 const rec_t* rec;
64 const rec_t* next_rec;
65 ulint low_match;
66 ulint up_match;
67 ibool success = FALSE;
68 const page_t* page = buf_block_get_frame(block);
69 mem_heap_t* heap = NULL;
70 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
71 rec_offs* offsets = offsets_;
72 rec_offs_init(offsets_);
73
74 ut_ad(dtuple_check_typed(tuple));
75 ut_ad(page_is_leaf(page));
76
77 rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
78 offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
79 dtuple_get_n_fields(tuple), &heap);
80
81 ut_ad(rec);
82 ut_ad(page_rec_is_user_rec(rec));
83
84 low_match = up_match = std::min(*ilow_matched_fields,
85 *iup_matched_fields);
86
87 if (cmp_dtuple_rec_with_match(tuple, rec, offsets, &low_match) < 0) {
88 goto exit_func;
89 }
90
91 next_rec = page_rec_get_next_const(rec);
92 if (!page_rec_is_supremum(next_rec)) {
93 offsets = rec_get_offsets(next_rec, index, offsets,
94 index->n_core_fields,
95 dtuple_get_n_fields(tuple), &heap);
96
97 if (cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
98 &up_match) >= 0) {
99 goto exit_func;
100 }
101
102 *iup_matched_fields = up_match;
103 }
104
105 page_cur_position(rec, block, cursor);
106
107 *ilow_matched_fields = low_match;
108
109 #ifdef UNIV_SEARCH_PERF_STAT
110 page_cur_short_succ++;
111 #endif
112 success = TRUE;
113 exit_func:
114 if (UNIV_LIKELY_NULL(heap)) {
115 mem_heap_free(heap);
116 }
117 return(success);
118 }
119
120 /** Try a search shortcut based on the last insert.
121 @param[in] block index page
122 @param[in] index index tree
123 @param[in] tuple search key
124 @param[in,out] iup_matched_fields already matched fields in the
125 upper limit record
126 @param[in,out] iup_matched_bytes already matched bytes in the
127 first partially matched field in the upper limit record
128 @param[in,out] ilow_matched_fields already matched fields in the
129 lower limit record
130 @param[in,out] ilow_matched_bytes already matched bytes in the
131 first partially matched field in the lower limit record
132 @param[out] cursor page cursor
133 @return true on success */
134 UNIV_INLINE
135 bool
page_cur_try_search_shortcut_bytes(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,ulint * iup_matched_fields,ulint * iup_matched_bytes,ulint * ilow_matched_fields,ulint * ilow_matched_bytes,page_cur_t * cursor)136 page_cur_try_search_shortcut_bytes(
137 const buf_block_t* block,
138 const dict_index_t* index,
139 const dtuple_t* tuple,
140 ulint* iup_matched_fields,
141 ulint* iup_matched_bytes,
142 ulint* ilow_matched_fields,
143 ulint* ilow_matched_bytes,
144 page_cur_t* cursor)
145 {
146 const rec_t* rec;
147 const rec_t* next_rec;
148 ulint low_match;
149 ulint low_bytes;
150 ulint up_match;
151 ulint up_bytes;
152 ibool success = FALSE;
153 const page_t* page = buf_block_get_frame(block);
154 mem_heap_t* heap = NULL;
155 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
156 rec_offs* offsets = offsets_;
157 rec_offs_init(offsets_);
158
159 ut_ad(dtuple_check_typed(tuple));
160 ut_ad(page_is_leaf(page));
161
162 rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
163 offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
164 dtuple_get_n_fields(tuple), &heap);
165
166 ut_ad(rec);
167 ut_ad(page_rec_is_user_rec(rec));
168 if (ut_pair_cmp(*ilow_matched_fields, *ilow_matched_bytes,
169 *iup_matched_fields, *iup_matched_bytes) < 0) {
170 up_match = low_match = *ilow_matched_fields;
171 up_bytes = low_bytes = *ilow_matched_bytes;
172 } else {
173 up_match = low_match = *iup_matched_fields;
174 up_bytes = low_bytes = *iup_matched_bytes;
175 }
176
177 if (cmp_dtuple_rec_with_match_bytes(
178 tuple, rec, index, offsets, &low_match, &low_bytes) < 0) {
179 goto exit_func;
180 }
181
182 next_rec = page_rec_get_next_const(rec);
183 if (!page_rec_is_supremum(next_rec)) {
184 offsets = rec_get_offsets(next_rec, index, offsets,
185 index->n_core_fields,
186 dtuple_get_n_fields(tuple), &heap);
187
188 if (cmp_dtuple_rec_with_match_bytes(
189 tuple, next_rec, index, offsets,
190 &up_match, &up_bytes)
191 >= 0) {
192 goto exit_func;
193 }
194
195 *iup_matched_fields = up_match;
196 *iup_matched_bytes = up_bytes;
197 }
198
199 page_cur_position(rec, block, cursor);
200
201 *ilow_matched_fields = low_match;
202 *ilow_matched_bytes = low_bytes;
203
204 #ifdef UNIV_SEARCH_PERF_STAT
205 page_cur_short_succ++;
206 #endif
207 success = TRUE;
208 exit_func:
209 if (UNIV_LIKELY_NULL(heap)) {
210 mem_heap_free(heap);
211 }
212 return(success);
213 }
214 #endif /* BTR_CUR_HASH_ADAPT */
215
216 #ifdef PAGE_CUR_LE_OR_EXTENDS
217 /****************************************************************//**
218 Checks if the nth field in a record is a character type field which extends
219 the nth field in tuple, i.e., the field is longer or equal in length and has
220 common first characters.
221 @return TRUE if rec field extends tuple field */
222 static
223 ibool
page_cur_rec_field_extends(const dtuple_t * tuple,const rec_t * rec,const rec_offs * offsets,ulint n)224 page_cur_rec_field_extends(
225 /*=======================*/
226 const dtuple_t* tuple, /*!< in: data tuple */
227 const rec_t* rec, /*!< in: record */
228 const rec_offs* offsets,/*!< in: array returned by rec_get_offsets() */
229 ulint n) /*!< in: compare nth field */
230 {
231 const dtype_t* type;
232 const dfield_t* dfield;
233 const byte* rec_f;
234 ulint rec_f_len;
235
236 ut_ad(rec_offs_validate(rec, NULL, offsets));
237 dfield = dtuple_get_nth_field(tuple, n);
238
239 type = dfield_get_type(dfield);
240
241 rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
242
243 if (type->mtype == DATA_VARCHAR
244 || type->mtype == DATA_CHAR
245 || type->mtype == DATA_FIXBINARY
246 || type->mtype == DATA_BINARY
247 || type->mtype == DATA_BLOB
248 || DATA_GEOMETRY_MTYPE(type->mtype)
249 || type->mtype == DATA_VARMYSQL
250 || type->mtype == DATA_MYSQL) {
251
252 if (dfield_get_len(dfield) != UNIV_SQL_NULL
253 && rec_f_len != UNIV_SQL_NULL
254 && rec_f_len >= dfield_get_len(dfield)
255 && !cmp_data_data(type->mtype, type->prtype,
256 dfield_get_data(dfield),
257 dfield_get_len(dfield),
258 rec_f, dfield_get_len(dfield))) {
259
260 return(TRUE);
261 }
262 }
263
264 return(FALSE);
265 }
266 #endif /* PAGE_CUR_LE_OR_EXTENDS */
267
268 /****************************************************************//**
269 Searches the right position for a page cursor. */
270 void
page_cur_search_with_match(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode,ulint * iup_matched_fields,ulint * ilow_matched_fields,page_cur_t * cursor,rtr_info_t * rtr_info)271 page_cur_search_with_match(
272 /*=======================*/
273 const buf_block_t* block, /*!< in: buffer block */
274 const dict_index_t* index, /*!< in/out: record descriptor */
275 const dtuple_t* tuple, /*!< in: data tuple */
276 page_cur_mode_t mode, /*!< in: PAGE_CUR_L,
277 PAGE_CUR_LE, PAGE_CUR_G, or
278 PAGE_CUR_GE */
279 ulint* iup_matched_fields,
280 /*!< in/out: already matched
281 fields in upper limit record */
282 ulint* ilow_matched_fields,
283 /*!< in/out: already matched
284 fields in lower limit record */
285 page_cur_t* cursor, /*!< out: page cursor */
286 rtr_info_t* rtr_info)/*!< in/out: rtree search stack */
287 {
288 ulint up;
289 ulint low;
290 ulint mid;
291 const page_t* page;
292 const page_dir_slot_t* slot;
293 const rec_t* up_rec;
294 const rec_t* low_rec;
295 const rec_t* mid_rec;
296 ulint up_matched_fields;
297 ulint low_matched_fields;
298 ulint cur_matched_fields;
299 int cmp;
300 #ifdef UNIV_ZIP_DEBUG
301 const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
302 #endif /* UNIV_ZIP_DEBUG */
303 mem_heap_t* heap = NULL;
304 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
305 rec_offs* offsets = offsets_;
306 rec_offs_init(offsets_);
307
308 ut_ad(dtuple_validate(tuple));
309 #ifdef UNIV_DEBUG
310 # ifdef PAGE_CUR_DBG
311 if (mode != PAGE_CUR_DBG)
312 # endif /* PAGE_CUR_DBG */
313 # ifdef PAGE_CUR_LE_OR_EXTENDS
314 if (mode != PAGE_CUR_LE_OR_EXTENDS)
315 # endif /* PAGE_CUR_LE_OR_EXTENDS */
316 ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
317 || mode == PAGE_CUR_G || mode == PAGE_CUR_GE
318 || dict_index_is_spatial(index));
319 #endif /* UNIV_DEBUG */
320 page = buf_block_get_frame(block);
321 #ifdef UNIV_ZIP_DEBUG
322 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
323 #endif /* UNIV_ZIP_DEBUG */
324
325 ut_d(page_check_dir(page));
326 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
327
328 #ifdef BTR_CUR_HASH_ADAPT
329 if (n_core
330 && page_get_direction(page) == PAGE_RIGHT
331 && page_header_get_offs(page, PAGE_LAST_INSERT)
332 && mode == PAGE_CUR_LE
333 && !index->is_spatial()
334 && page_header_get_field(page, PAGE_N_DIRECTION) > 3
335 && page_cur_try_search_shortcut(
336 block, index, tuple,
337 iup_matched_fields, ilow_matched_fields, cursor)) {
338 return;
339 }
340 # ifdef PAGE_CUR_DBG
341 if (mode == PAGE_CUR_DBG) {
342 mode = PAGE_CUR_LE;
343 }
344 # endif
345 #endif /* BTR_CUR_HASH_ADAPT */
346
347 /* If the mode is for R-tree indexes, use the special MBR
348 related compare functions */
349 if (index->is_spatial() && mode > PAGE_CUR_LE) {
350 /* For leaf level insert, we still use the traditional
351 compare function for now */
352 if (mode == PAGE_CUR_RTREE_INSERT && n_core) {
353 mode = PAGE_CUR_LE;
354 } else {
355 rtr_cur_search_with_match(
356 block, (dict_index_t*)index, tuple, mode,
357 cursor, rtr_info);
358 return;
359 }
360 }
361
362 /* The following flag does not work for non-latin1 char sets because
363 cmp_full_field does not tell how many bytes matched */
364 #ifdef PAGE_CUR_LE_OR_EXTENDS
365 ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
366 #endif /* PAGE_CUR_LE_OR_EXTENDS */
367
368 /* If mode PAGE_CUR_G is specified, we are trying to position the
369 cursor to answer a query of the form "tuple < X", where tuple is
370 the input parameter, and X denotes an arbitrary physical record on
371 the page. We want to position the cursor on the first X which
372 satisfies the condition. */
373
374 up_matched_fields = *iup_matched_fields;
375 low_matched_fields = *ilow_matched_fields;
376
377 /* Perform binary search. First the search is done through the page
378 directory, after that as a linear search in the list of records
379 owned by the upper limit directory slot. */
380
381 low = 0;
382 up = ulint(page_dir_get_n_slots(page)) - 1;
383
384 /* Perform binary search until the lower and upper limit directory
385 slots come to the distance 1 of each other */
386
387 while (up - low > 1) {
388 mid = (low + up) / 2;
389 slot = page_dir_get_nth_slot(page, mid);
390 mid_rec = page_dir_slot_get_rec(slot);
391
392 cur_matched_fields = std::min(low_matched_fields,
393 up_matched_fields);
394
395 offsets = offsets_;
396 offsets = rec_get_offsets(
397 mid_rec, index, offsets, n_core,
398 dtuple_get_n_fields_cmp(tuple), &heap);
399
400 cmp = cmp_dtuple_rec_with_match(
401 tuple, mid_rec, offsets, &cur_matched_fields);
402
403 if (cmp > 0) {
404 low_slot_match:
405 low = mid;
406 low_matched_fields = cur_matched_fields;
407
408 } else if (cmp) {
409 #ifdef PAGE_CUR_LE_OR_EXTENDS
410 if (mode == PAGE_CUR_LE_OR_EXTENDS
411 && page_cur_rec_field_extends(
412 tuple, mid_rec, offsets,
413 cur_matched_fields)) {
414
415 goto low_slot_match;
416 }
417 #endif /* PAGE_CUR_LE_OR_EXTENDS */
418 up_slot_match:
419 up = mid;
420 up_matched_fields = cur_matched_fields;
421
422 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
423 #ifdef PAGE_CUR_LE_OR_EXTENDS
424 || mode == PAGE_CUR_LE_OR_EXTENDS
425 #endif /* PAGE_CUR_LE_OR_EXTENDS */
426 ) {
427 goto low_slot_match;
428 } else {
429
430 goto up_slot_match;
431 }
432 }
433
434 slot = page_dir_get_nth_slot(page, low);
435 low_rec = page_dir_slot_get_rec(slot);
436 slot = page_dir_get_nth_slot(page, up);
437 up_rec = page_dir_slot_get_rec(slot);
438
439 /* Perform linear search until the upper and lower records come to
440 distance 1 of each other. */
441
442 while (page_rec_get_next_const(low_rec) != up_rec) {
443
444 mid_rec = page_rec_get_next_const(low_rec);
445
446 cur_matched_fields = std::min(low_matched_fields,
447 up_matched_fields);
448
449 offsets = offsets_;
450 offsets = rec_get_offsets(
451 mid_rec, index, offsets, n_core,
452 dtuple_get_n_fields_cmp(tuple), &heap);
453
454 cmp = cmp_dtuple_rec_with_match(
455 tuple, mid_rec, offsets, &cur_matched_fields);
456
457 if (cmp > 0) {
458 low_rec_match:
459 low_rec = mid_rec;
460 low_matched_fields = cur_matched_fields;
461
462 } else if (cmp) {
463 #ifdef PAGE_CUR_LE_OR_EXTENDS
464 if (mode == PAGE_CUR_LE_OR_EXTENDS
465 && page_cur_rec_field_extends(
466 tuple, mid_rec, offsets,
467 cur_matched_fields)) {
468
469 goto low_rec_match;
470 }
471 #endif /* PAGE_CUR_LE_OR_EXTENDS */
472 up_rec_match:
473 up_rec = mid_rec;
474 up_matched_fields = cur_matched_fields;
475 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
476 #ifdef PAGE_CUR_LE_OR_EXTENDS
477 || mode == PAGE_CUR_LE_OR_EXTENDS
478 #endif /* PAGE_CUR_LE_OR_EXTENDS */
479 ) {
480 if (!cmp && !cur_matched_fields) {
481 #ifdef UNIV_DEBUG
482 mtr_t mtr;
483 mtr_start(&mtr);
484
485 /* We got a match, but cur_matched_fields is
486 0, it must have REC_INFO_MIN_REC_FLAG */
487 ulint rec_info = rec_get_info_bits(mid_rec,
488 rec_offs_comp(offsets));
489 ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
490 ut_ad(!page_has_prev(page));
491 mtr_commit(&mtr);
492 #endif
493
494 cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
495 }
496
497 goto low_rec_match;
498 } else {
499
500 goto up_rec_match;
501 }
502 }
503
504 if (mode <= PAGE_CUR_GE) {
505 page_cur_position(up_rec, block, cursor);
506 } else {
507 page_cur_position(low_rec, block, cursor);
508 }
509
510 *iup_matched_fields = up_matched_fields;
511 *ilow_matched_fields = low_matched_fields;
512 if (UNIV_LIKELY_NULL(heap)) {
513 mem_heap_free(heap);
514 }
515 }
516
517 #ifdef BTR_CUR_HASH_ADAPT
518 /** Search the right position for a page cursor.
519 @param[in] block buffer block
520 @param[in] index index tree
521 @param[in] tuple key to be searched for
522 @param[in] mode search mode
523 @param[in,out] iup_matched_fields already matched fields in the
524 upper limit record
525 @param[in,out] iup_matched_bytes already matched bytes in the
526 first partially matched field in the upper limit record
527 @param[in,out] ilow_matched_fields already matched fields in the
528 lower limit record
529 @param[in,out] ilow_matched_bytes already matched bytes in the
530 first partially matched field in the lower limit record
531 @param[out] cursor page cursor */
532 void
page_cur_search_with_match_bytes(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode,ulint * iup_matched_fields,ulint * iup_matched_bytes,ulint * ilow_matched_fields,ulint * ilow_matched_bytes,page_cur_t * cursor)533 page_cur_search_with_match_bytes(
534 const buf_block_t* block,
535 const dict_index_t* index,
536 const dtuple_t* tuple,
537 page_cur_mode_t mode,
538 ulint* iup_matched_fields,
539 ulint* iup_matched_bytes,
540 ulint* ilow_matched_fields,
541 ulint* ilow_matched_bytes,
542 page_cur_t* cursor)
543 {
544 ulint up;
545 ulint low;
546 ulint mid;
547 const page_t* page;
548 const page_dir_slot_t* slot;
549 const rec_t* up_rec;
550 const rec_t* low_rec;
551 const rec_t* mid_rec;
552 ulint up_matched_fields;
553 ulint up_matched_bytes;
554 ulint low_matched_fields;
555 ulint low_matched_bytes;
556 ulint cur_matched_fields;
557 ulint cur_matched_bytes;
558 int cmp;
559 #ifdef UNIV_ZIP_DEBUG
560 const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
561 #endif /* UNIV_ZIP_DEBUG */
562 mem_heap_t* heap = NULL;
563 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
564 rec_offs* offsets = offsets_;
565 rec_offs_init(offsets_);
566
567 ut_ad(dtuple_validate(tuple));
568 ut_ad(!(tuple->info_bits & REC_INFO_MIN_REC_FLAG));
569 #ifdef UNIV_DEBUG
570 # ifdef PAGE_CUR_DBG
571 if (mode != PAGE_CUR_DBG)
572 # endif /* PAGE_CUR_DBG */
573 # ifdef PAGE_CUR_LE_OR_EXTENDS
574 if (mode != PAGE_CUR_LE_OR_EXTENDS)
575 # endif /* PAGE_CUR_LE_OR_EXTENDS */
576 ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
577 || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
578 #endif /* UNIV_DEBUG */
579 page = buf_block_get_frame(block);
580 #ifdef UNIV_ZIP_DEBUG
581 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
582 #endif /* UNIV_ZIP_DEBUG */
583
584 ut_d(page_check_dir(page));
585
586 #ifdef BTR_CUR_HASH_ADAPT
587 if (page_is_leaf(page)
588 && page_get_direction(page) == PAGE_RIGHT
589 && page_header_get_offs(page, PAGE_LAST_INSERT)
590 && mode == PAGE_CUR_LE
591 && page_header_get_field(page, PAGE_N_DIRECTION) > 3
592 && page_cur_try_search_shortcut_bytes(
593 block, index, tuple,
594 iup_matched_fields, iup_matched_bytes,
595 ilow_matched_fields, ilow_matched_bytes,
596 cursor)) {
597 return;
598 }
599 # ifdef PAGE_CUR_DBG
600 if (mode == PAGE_CUR_DBG) {
601 mode = PAGE_CUR_LE;
602 }
603 # endif
604 #endif /* BTR_CUR_HASH_ADAPT */
605
606 /* The following flag does not work for non-latin1 char sets because
607 cmp_full_field does not tell how many bytes matched */
608 #ifdef PAGE_CUR_LE_OR_EXTENDS
609 ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
610 #endif /* PAGE_CUR_LE_OR_EXTENDS */
611
612 /* If mode PAGE_CUR_G is specified, we are trying to position the
613 cursor to answer a query of the form "tuple < X", where tuple is
614 the input parameter, and X denotes an arbitrary physical record on
615 the page. We want to position the cursor on the first X which
616 satisfies the condition. */
617
618 up_matched_fields = *iup_matched_fields;
619 up_matched_bytes = *iup_matched_bytes;
620 low_matched_fields = *ilow_matched_fields;
621 low_matched_bytes = *ilow_matched_bytes;
622
623 /* Perform binary search. First the search is done through the page
624 directory, after that as a linear search in the list of records
625 owned by the upper limit directory slot. */
626
627 low = 0;
628 up = ulint(page_dir_get_n_slots(page)) - 1;
629
630 /* Perform binary search until the lower and upper limit directory
631 slots come to the distance 1 of each other */
632 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
633
634 while (up - low > 1) {
635 mid = (low + up) / 2;
636 slot = page_dir_get_nth_slot(page, mid);
637 mid_rec = page_dir_slot_get_rec(slot);
638
639 ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
640 low_matched_fields, low_matched_bytes,
641 up_matched_fields, up_matched_bytes);
642
643 offsets = rec_get_offsets(
644 mid_rec, index, offsets_, n_core,
645 dtuple_get_n_fields_cmp(tuple), &heap);
646
647 cmp = cmp_dtuple_rec_with_match_bytes(
648 tuple, mid_rec, index, offsets,
649 &cur_matched_fields, &cur_matched_bytes);
650
651 if (cmp > 0) {
652 low_slot_match:
653 low = mid;
654 low_matched_fields = cur_matched_fields;
655 low_matched_bytes = cur_matched_bytes;
656
657 } else if (cmp) {
658 #ifdef PAGE_CUR_LE_OR_EXTENDS
659 if (mode == PAGE_CUR_LE_OR_EXTENDS
660 && page_cur_rec_field_extends(
661 tuple, mid_rec, offsets,
662 cur_matched_fields)) {
663
664 goto low_slot_match;
665 }
666 #endif /* PAGE_CUR_LE_OR_EXTENDS */
667 up_slot_match:
668 up = mid;
669 up_matched_fields = cur_matched_fields;
670 up_matched_bytes = cur_matched_bytes;
671
672 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
673 #ifdef PAGE_CUR_LE_OR_EXTENDS
674 || mode == PAGE_CUR_LE_OR_EXTENDS
675 #endif /* PAGE_CUR_LE_OR_EXTENDS */
676 ) {
677 goto low_slot_match;
678 } else {
679
680 goto up_slot_match;
681 }
682 }
683
684 slot = page_dir_get_nth_slot(page, low);
685 low_rec = page_dir_slot_get_rec(slot);
686 slot = page_dir_get_nth_slot(page, up);
687 up_rec = page_dir_slot_get_rec(slot);
688
689 /* Perform linear search until the upper and lower records come to
690 distance 1 of each other. */
691
692 while (page_rec_get_next_const(low_rec) != up_rec) {
693
694 mid_rec = page_rec_get_next_const(low_rec);
695
696 ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
697 low_matched_fields, low_matched_bytes,
698 up_matched_fields, up_matched_bytes);
699
700 if (UNIV_UNLIKELY(rec_get_info_bits(
701 mid_rec,
702 dict_table_is_comp(index->table))
703 & REC_INFO_MIN_REC_FLAG)) {
704 ut_ad(!page_has_prev(page_align(mid_rec)));
705 ut_ad(!page_rec_is_leaf(mid_rec)
706 || rec_is_metadata(mid_rec, index));
707 cmp = 1;
708 goto low_rec_match;
709 }
710
711 offsets = rec_get_offsets(
712 mid_rec, index, offsets_, n_core,
713 dtuple_get_n_fields_cmp(tuple), &heap);
714
715 cmp = cmp_dtuple_rec_with_match_bytes(
716 tuple, mid_rec, index, offsets,
717 &cur_matched_fields, &cur_matched_bytes);
718
719 if (cmp > 0) {
720 low_rec_match:
721 low_rec = mid_rec;
722 low_matched_fields = cur_matched_fields;
723 low_matched_bytes = cur_matched_bytes;
724
725 } else if (cmp) {
726 #ifdef PAGE_CUR_LE_OR_EXTENDS
727 if (mode == PAGE_CUR_LE_OR_EXTENDS
728 && page_cur_rec_field_extends(
729 tuple, mid_rec, offsets,
730 cur_matched_fields)) {
731
732 goto low_rec_match;
733 }
734 #endif /* PAGE_CUR_LE_OR_EXTENDS */
735 up_rec_match:
736 up_rec = mid_rec;
737 up_matched_fields = cur_matched_fields;
738 up_matched_bytes = cur_matched_bytes;
739 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
740 #ifdef PAGE_CUR_LE_OR_EXTENDS
741 || mode == PAGE_CUR_LE_OR_EXTENDS
742 #endif /* PAGE_CUR_LE_OR_EXTENDS */
743 ) {
744 goto low_rec_match;
745 } else {
746
747 goto up_rec_match;
748 }
749 }
750
751 if (mode <= PAGE_CUR_GE) {
752 page_cur_position(up_rec, block, cursor);
753 } else {
754 page_cur_position(low_rec, block, cursor);
755 }
756
757 *iup_matched_fields = up_matched_fields;
758 *iup_matched_bytes = up_matched_bytes;
759 *ilow_matched_fields = low_matched_fields;
760 *ilow_matched_bytes = low_matched_bytes;
761 if (UNIV_LIKELY_NULL(heap)) {
762 mem_heap_free(heap);
763 }
764 }
765 #endif /* BTR_CUR_HASH_ADAPT */
766
767 /***********************************************************//**
768 Positions a page cursor on a randomly chosen user record on a page. If there
769 are no user records, sets the cursor on the infimum record. */
770 void
page_cur_open_on_rnd_user_rec(buf_block_t * block,page_cur_t * cursor)771 page_cur_open_on_rnd_user_rec(
772 /*==========================*/
773 buf_block_t* block, /*!< in: page */
774 page_cur_t* cursor) /*!< out: page cursor */
775 {
776 const ulint n_recs = page_get_n_recs(block->frame);
777
778 page_cur_set_before_first(block, cursor);
779
780 if (UNIV_UNLIKELY(n_recs == 0)) {
781
782 return;
783 }
784
785 cursor->rec = page_rec_get_nth(block->frame,
786 ut_rnd_interval(n_recs) + 1);
787 }
788
789 /** Write a redo log record of inserting a record into an index page.
790 @param[in] insert_rec inserted record
791 @param[in] rec_size rec_get_size(insert_rec)
792 @param[in] cursor_rec predecessor of insert_rec
793 @param[in,out] index index tree
794 @param[in,out] mtr mini-transaction */
795 void
page_cur_insert_rec_write_log(const rec_t * insert_rec,ulint rec_size,const rec_t * cursor_rec,dict_index_t * index,mtr_t * mtr)796 page_cur_insert_rec_write_log(
797 const rec_t* insert_rec,
798 ulint rec_size,
799 const rec_t* cursor_rec,
800 dict_index_t* index,
801 mtr_t* mtr)
802 {
803 ulint cur_rec_size;
804 ulint extra_size;
805 ulint cur_extra_size;
806 const byte* ins_ptr;
807 const byte* log_end;
808 ulint i;
809
810 if (index->table->is_temporary()) {
811 mtr->set_modified();
812 ut_ad(mtr->get_log_mode() == MTR_LOG_NO_REDO);
813 return;
814 }
815
816 ut_a(rec_size < srv_page_size);
817 ut_ad(mtr->is_named_space(index->table->space));
818 ut_ad(page_align(insert_rec) == page_align(cursor_rec));
819 ut_ad(!page_rec_is_comp(insert_rec)
820 == !dict_table_is_comp(index->table));
821
822 const ulint n_core = page_rec_is_leaf(cursor_rec)
823 ? index->n_core_fields : 0;
824
825 {
826 mem_heap_t* heap = NULL;
827 rec_offs cur_offs_[REC_OFFS_NORMAL_SIZE];
828 rec_offs ins_offs_[REC_OFFS_NORMAL_SIZE];
829
830 rec_offs* cur_offs;
831 rec_offs* ins_offs;
832
833 rec_offs_init(cur_offs_);
834 rec_offs_init(ins_offs_);
835
836 cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
837 n_core, ULINT_UNDEFINED, &heap);
838 ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
839 n_core, ULINT_UNDEFINED, &heap);
840
841 extra_size = rec_offs_extra_size(ins_offs);
842 cur_extra_size = rec_offs_extra_size(cur_offs);
843 ut_ad(rec_size == rec_offs_size(ins_offs));
844 cur_rec_size = rec_offs_size(cur_offs);
845
846 if (UNIV_LIKELY_NULL(heap)) {
847 mem_heap_free(heap);
848 }
849 }
850
851 ins_ptr = insert_rec - extra_size;
852
853 i = 0;
854
855 if (cur_extra_size == extra_size) {
856 ulint min_rec_size = ut_min(cur_rec_size, rec_size);
857
858 const byte* cur_ptr = cursor_rec - cur_extra_size;
859
860 /* Find out the first byte in insert_rec which differs from
861 cursor_rec; skip the bytes in the record info */
862
863 do {
864 if (*ins_ptr == *cur_ptr) {
865 i++;
866 ins_ptr++;
867 cur_ptr++;
868 } else if ((i < extra_size)
869 && (i >= extra_size
870 - page_rec_get_base_extra_size
871 (insert_rec))) {
872 i = extra_size;
873 ins_ptr = insert_rec;
874 cur_ptr = cursor_rec;
875 } else {
876 break;
877 }
878 } while (i < min_rec_size);
879 }
880
881 byte* log_ptr;
882
883 if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
884
885 if (page_rec_is_comp(insert_rec)) {
886 log_ptr = mlog_open_and_write_index(
887 mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
888 2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
889 if (UNIV_UNLIKELY(!log_ptr)) {
890 /* Logging in mtr is switched off
891 during crash recovery: in that case
892 mlog_open returns NULL */
893 return;
894 }
895 } else {
896 log_ptr = mlog_open(mtr, 11
897 + 2 + 5 + 1 + 5 + 5
898 + MLOG_BUF_MARGIN);
899 if (UNIV_UNLIKELY(!log_ptr)) {
900 /* Logging in mtr is switched off
901 during crash recovery: in that case
902 mlog_open returns NULL */
903 return;
904 }
905
906 log_ptr = mlog_write_initial_log_record_fast(
907 insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
908 }
909
910 log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
911 /* Write the cursor rec offset as a 2-byte ulint */
912 mach_write_to_2(log_ptr, page_offset(cursor_rec));
913 log_ptr += 2;
914 } else {
915 log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
916 if (!log_ptr) {
917 /* Logging in mtr is switched off during crash
918 recovery: in that case mlog_open returns NULL */
919 return;
920 }
921 log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
922 }
923
924 if (page_rec_is_comp(insert_rec)) {
925 if (UNIV_UNLIKELY
926 (rec_get_info_and_status_bits(insert_rec, TRUE)
927 != rec_get_info_and_status_bits(cursor_rec, TRUE))) {
928
929 goto need_extra_info;
930 }
931 } else {
932 if (UNIV_UNLIKELY
933 (rec_get_info_and_status_bits(insert_rec, FALSE)
934 != rec_get_info_and_status_bits(cursor_rec, FALSE))) {
935
936 goto need_extra_info;
937 }
938 }
939
940 if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
941 need_extra_info:
942 /* Write the record end segment length
943 and the extra info storage flag */
944 log_ptr += mach_write_compressed(log_ptr,
945 2 * (rec_size - i) + 1);
946
947 /* Write the info bits */
948 mach_write_to_1(log_ptr,
949 rec_get_info_and_status_bits(
950 insert_rec,
951 page_rec_is_comp(insert_rec)));
952 log_ptr++;
953
954 /* Write the record origin offset */
955 log_ptr += mach_write_compressed(log_ptr, extra_size);
956
957 /* Write the mismatch index */
958 log_ptr += mach_write_compressed(log_ptr, i);
959
960 ut_a(i < srv_page_size);
961 ut_a(extra_size < srv_page_size);
962 } else {
963 /* Write the record end segment length
964 and the extra info storage flag */
965 log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
966 }
967
968 /* Write to the log the inserted index record end segment which
969 differs from the cursor record */
970
971 rec_size -= i;
972
973 if (log_ptr + rec_size <= log_end) {
974 memcpy(log_ptr, ins_ptr, rec_size);
975 mlog_close(mtr, log_ptr + rec_size);
976 } else {
977 mlog_close(mtr, log_ptr);
978 ut_a(rec_size < srv_page_size);
979 mlog_catenate_string(mtr, ins_ptr, rec_size);
980 }
981 }
982
983 /***********************************************************//**
984 Parses a log record of a record insert on a page.
985 @return end of log record or NULL */
986 byte*
page_cur_parse_insert_rec(ibool is_short,const byte * ptr,const byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)987 page_cur_parse_insert_rec(
988 /*======================*/
989 ibool is_short,/*!< in: TRUE if short inserts */
990 const byte* ptr, /*!< in: buffer */
991 const byte* end_ptr,/*!< in: buffer end */
992 buf_block_t* block, /*!< in: page or NULL */
993 dict_index_t* index, /*!< in: record descriptor */
994 mtr_t* mtr) /*!< in: mtr or NULL */
995 {
996 ulint origin_offset = 0; /* remove warning */
997 ulint end_seg_len;
998 ulint mismatch_index = 0; /* remove warning */
999 page_t* page;
1000 rec_t* cursor_rec;
1001 byte buf1[1024];
1002 byte* buf;
1003 const byte* ptr2 = ptr;
1004 ulint info_and_status_bits = 0; /* remove warning */
1005 page_cur_t cursor;
1006 mem_heap_t* heap = NULL;
1007 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
1008 rec_offs* offsets = offsets_;
1009 rec_offs_init(offsets_);
1010
1011 page = block ? buf_block_get_frame(block) : NULL;
1012
1013 if (is_short) {
1014 cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
1015 } else {
1016 ulint offset;
1017
1018 /* Read the cursor rec offset as a 2-byte ulint */
1019
1020 if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
1021
1022 return(NULL);
1023 }
1024
1025 offset = mach_read_from_2(ptr);
1026 ptr += 2;
1027
1028 cursor_rec = page + offset;
1029
1030 if (offset >= srv_page_size) {
1031
1032 recv_sys->found_corrupt_log = TRUE;
1033
1034 return(NULL);
1035 }
1036 }
1037
1038 end_seg_len = mach_parse_compressed(&ptr, end_ptr);
1039
1040 if (ptr == NULL) {
1041
1042 return(NULL);
1043 }
1044
1045 if (end_seg_len >= srv_page_size << 1) {
1046 recv_sys->found_corrupt_log = TRUE;
1047
1048 return(NULL);
1049 }
1050
1051 if (end_seg_len & 0x1UL) {
1052 /* Read the info bits */
1053
1054 if (end_ptr < ptr + 1) {
1055
1056 return(NULL);
1057 }
1058
1059 info_and_status_bits = mach_read_from_1(ptr);
1060 ptr++;
1061
1062 origin_offset = mach_parse_compressed(&ptr, end_ptr);
1063
1064 if (ptr == NULL) {
1065
1066 return(NULL);
1067 }
1068
1069 ut_a(origin_offset < srv_page_size);
1070
1071 mismatch_index = mach_parse_compressed(&ptr, end_ptr);
1072
1073 if (ptr == NULL) {
1074
1075 return(NULL);
1076 }
1077
1078 ut_a(mismatch_index < srv_page_size);
1079 }
1080
1081 if (end_ptr < ptr + (end_seg_len >> 1)) {
1082
1083 return(NULL);
1084 }
1085
1086 if (!block) {
1087
1088 return(const_cast<byte*>(ptr + (end_seg_len >> 1)));
1089 }
1090
1091 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1092 ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
1093
1094 /* Read from the log the inserted index record end segment which
1095 differs from the cursor record */
1096
1097 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
1098
1099 offsets = rec_get_offsets(cursor_rec, index, offsets, n_core,
1100 ULINT_UNDEFINED, &heap);
1101
1102 if (!(end_seg_len & 0x1UL)) {
1103 info_and_status_bits = rec_get_info_and_status_bits(
1104 cursor_rec, page_is_comp(page));
1105 origin_offset = rec_offs_extra_size(offsets);
1106 mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
1107 }
1108
1109 end_seg_len >>= 1;
1110
1111 if (mismatch_index + end_seg_len < sizeof buf1) {
1112 buf = buf1;
1113 } else {
1114 buf = static_cast<byte*>(
1115 ut_malloc_nokey(mismatch_index + end_seg_len));
1116 }
1117
1118 /* Build the inserted record to buf */
1119
1120 if (UNIV_UNLIKELY(mismatch_index >= srv_page_size)) {
1121
1122 ib::fatal() << "is_short " << is_short << ", "
1123 << "info_and_status_bits " << info_and_status_bits
1124 << ", offset " << page_offset(cursor_rec) << ","
1125 " o_offset " << origin_offset << ", mismatch index "
1126 << mismatch_index << ", end_seg_len " << end_seg_len
1127 << " parsed len " << (ptr - ptr2);
1128 }
1129
1130 ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
1131 ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
1132
1133 if (page_is_comp(page)) {
1134 rec_set_heap_no_new(buf + origin_offset,
1135 PAGE_HEAP_NO_USER_LOW);
1136 rec_set_info_and_status_bits(buf + origin_offset,
1137 info_and_status_bits);
1138 } else {
1139 rec_set_heap_no_old(buf + origin_offset,
1140 PAGE_HEAP_NO_USER_LOW);
1141 rec_set_info_bits_old(buf + origin_offset,
1142 info_and_status_bits);
1143 }
1144
1145 page_cur_position(cursor_rec, block, &cursor);
1146
1147 offsets = rec_get_offsets(buf + origin_offset, index, offsets,
1148 n_core, ULINT_UNDEFINED, &heap);
1149 if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
1150 buf + origin_offset,
1151 index, offsets, mtr))) {
1152 /* The redo log record should only have been written
1153 after the write was successful. */
1154 ut_error;
1155 }
1156
1157 if (buf != buf1) {
1158
1159 ut_free(buf);
1160 }
1161
1162 if (UNIV_LIKELY_NULL(heap)) {
1163 mem_heap_free(heap);
1164 }
1165
1166 return(const_cast<byte*>(ptr + end_seg_len));
1167 }
1168
1169 /** Reset PAGE_DIRECTION and PAGE_N_DIRECTION.
1170 @param[in,out] ptr the PAGE_DIRECTION_B field
1171 @param[in,out] page index tree page frame
1172 @param[in] page_zip compressed page descriptor, or NULL */
1173 static inline
1174 void
page_direction_reset(byte * ptr,page_t * page,page_zip_des_t * page_zip)1175 page_direction_reset(byte* ptr, page_t* page, page_zip_des_t* page_zip)
1176 {
1177 ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
1178 page_ptr_set_direction(ptr, PAGE_NO_DIRECTION);
1179 if (page_zip) {
1180 page_zip_write_header(page_zip, ptr, 1, NULL);
1181 }
1182 ptr = PAGE_HEADER + PAGE_N_DIRECTION + page;
1183 *reinterpret_cast<uint16_t*>(ptr) = 0;
1184 if (page_zip) {
1185 page_zip_write_header(page_zip, ptr, 2, NULL);
1186 }
1187 }
1188
1189 /** Increment PAGE_N_DIRECTION.
1190 @param[in,out] ptr the PAGE_DIRECTION_B field
1191 @param[in,out] page index tree page frame
1192 @param[in] page_zip compressed page descriptor, or NULL
1193 @param[in] dir PAGE_RIGHT or PAGE_LEFT */
1194 static inline
1195 void
page_direction_increment(byte * ptr,page_t * page,page_zip_des_t * page_zip,uint dir)1196 page_direction_increment(
1197 byte* ptr,
1198 page_t* page,
1199 page_zip_des_t* page_zip,
1200 uint dir)
1201 {
1202 ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
1203 ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT);
1204 page_ptr_set_direction(ptr, dir);
1205 if (page_zip) {
1206 page_zip_write_header(page_zip, ptr, 1, NULL);
1207 }
1208 page_header_set_field(
1209 page, page_zip, PAGE_N_DIRECTION,
1210 1U + page_header_get_field(page, PAGE_N_DIRECTION));
1211 }
1212
1213 /************************************************************//**
1214 Allocates a block of memory from the heap of an index page.
1215 @return pointer to start of allocated buffer, or NULL if allocation fails */
1216 static
1217 byte*
page_mem_alloc_heap(page_t * page,page_zip_des_t * page_zip,ulint need,ulint * heap_no)1218 page_mem_alloc_heap(
1219 /*================*/
1220 page_t* page, /*!< in/out: index page */
1221 page_zip_des_t* page_zip,/*!< in/out: compressed page with enough
1222 space available for inserting the record,
1223 or NULL */
1224 ulint need, /*!< in: total number of bytes needed */
1225 ulint* heap_no)/*!< out: this contains the heap number
1226 of the allocated record
1227 if allocation succeeds */
1228 {
1229 byte* block;
1230 ulint avl_space;
1231
1232 ut_ad(page && heap_no);
1233
1234 avl_space = page_get_max_insert_size(page, 1);
1235
1236 if (avl_space >= need) {
1237 const ulint h = page_dir_get_n_heap(page);
1238 if (UNIV_UNLIKELY(h >= 8191)) {
1239 /* At the minimum record size of 5+2 bytes,
1240 we can only reach this condition when using
1241 innodb_page_size=64k. */
1242 ut_ad(srv_page_size == 65536);
1243 return(NULL);
1244 }
1245 *heap_no = h;
1246
1247 block = page_header_get_ptr(page, PAGE_HEAP_TOP);
1248
1249 page_header_set_ptr(page, page_zip, PAGE_HEAP_TOP,
1250 block + need);
1251 page_dir_set_n_heap(page, page_zip, 1 + *heap_no);
1252
1253 return(block);
1254 }
1255
1256 return(NULL);
1257 }
1258
1259 /***********************************************************//**
1260 Inserts a record next to page cursor on an uncompressed page.
1261 Returns pointer to inserted record if succeed, i.e., enough
1262 space available, NULL otherwise. The cursor stays at the same position.
1263 @return pointer to record if succeed, NULL otherwise */
1264 rec_t*
page_cur_insert_rec_low(rec_t * current_rec,dict_index_t * index,const rec_t * rec,rec_offs * offsets,mtr_t * mtr)1265 page_cur_insert_rec_low(
1266 /*====================*/
1267 rec_t* current_rec,/*!< in: pointer to current record after
1268 which the new record is inserted */
1269 dict_index_t* index, /*!< in: record descriptor */
1270 const rec_t* rec, /*!< in: pointer to a physical record */
1271 rec_offs* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1272 mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
1273 {
1274 byte* insert_buf;
1275 ulint rec_size;
1276 page_t* page; /*!< the relevant page */
1277 rec_t* last_insert; /*!< cursor position at previous
1278 insert */
1279 rec_t* free_rec; /*!< a free record that was reused,
1280 or NULL */
1281 rec_t* insert_rec; /*!< inserted record */
1282 ulint heap_no; /*!< heap number of the inserted
1283 record */
1284
1285 ut_ad(rec_offs_validate(rec, index, offsets));
1286
1287 page = page_align(current_rec);
1288 ut_ad(dict_table_is_comp(index->table)
1289 == (ibool) !!page_is_comp(page));
1290 ut_ad(fil_page_index_page_check(page));
1291 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
1292 || index->is_dummy
1293 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
1294
1295 ut_ad(!page_rec_is_supremum(current_rec));
1296
1297 /* 1. Get the size of the physical record in the page */
1298 rec_size = rec_offs_size(offsets);
1299
1300 #ifdef HAVE_valgrind
1301 {
1302 const void* rec_start __attribute__((unused))
1303 = rec - rec_offs_extra_size(offsets);
1304 ulint extra_size __attribute__((unused))
1305 = rec_offs_extra_size(offsets)
1306 - (rec_offs_comp(offsets)
1307 ? REC_N_NEW_EXTRA_BYTES
1308 : REC_N_OLD_EXTRA_BYTES);
1309
1310 /* All data bytes of the record must be valid. */
1311 MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
1312 /* The variable-length header must be valid. */
1313 MEM_CHECK_DEFINED(rec_start, extra_size);
1314 }
1315 #endif /* HAVE_valgrind */
1316
1317 /* 2. Try to find suitable space from page memory management */
1318
1319 free_rec = page_header_get_ptr(page, PAGE_FREE);
1320 if (UNIV_LIKELY_NULL(free_rec)) {
1321 /* Try to allocate from the head of the free list. */
1322 rec_offs foffsets_[REC_OFFS_NORMAL_SIZE];
1323 rec_offs* foffsets = foffsets_;
1324 mem_heap_t* heap = NULL;
1325
1326 rec_offs_init(foffsets_);
1327
1328 foffsets = rec_get_offsets(
1329 free_rec, index, foffsets,
1330 page_is_leaf(page) ? index->n_core_fields : 0,
1331 ULINT_UNDEFINED, &heap);
1332 if (rec_offs_size(foffsets) < rec_size) {
1333 if (UNIV_LIKELY_NULL(heap)) {
1334 mem_heap_free(heap);
1335 }
1336
1337 goto use_heap;
1338 }
1339
1340 insert_buf = free_rec - rec_offs_extra_size(foffsets);
1341
1342 if (page_is_comp(page)) {
1343 heap_no = rec_get_heap_no_new(free_rec);
1344 page_mem_alloc_free(page, NULL,
1345 rec_get_next_ptr(free_rec, TRUE),
1346 rec_size);
1347 } else {
1348 heap_no = rec_get_heap_no_old(free_rec);
1349 page_mem_alloc_free(page, NULL,
1350 rec_get_next_ptr(free_rec, FALSE),
1351 rec_size);
1352 }
1353
1354 if (UNIV_LIKELY_NULL(heap)) {
1355 mem_heap_free(heap);
1356 }
1357 } else {
1358 use_heap:
1359 free_rec = NULL;
1360 insert_buf = page_mem_alloc_heap(page, NULL,
1361 rec_size, &heap_no);
1362
1363 if (UNIV_UNLIKELY(insert_buf == NULL)) {
1364 return(NULL);
1365 }
1366 }
1367
1368 /* 3. Create the record */
1369 insert_rec = rec_copy(insert_buf, rec, offsets);
1370 rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);
1371
1372 /* 4. Insert the record in the linked list of records */
1373 ut_ad(current_rec != insert_rec);
1374
1375 {
1376 /* next record after current before the insertion */
1377 rec_t* next_rec = page_rec_get_next(current_rec);
1378 #ifdef UNIV_DEBUG
1379 if (page_is_comp(page)) {
1380 switch (rec_get_status(current_rec)) {
1381 case REC_STATUS_ORDINARY:
1382 case REC_STATUS_NODE_PTR:
1383 case REC_STATUS_COLUMNS_ADDED:
1384 case REC_STATUS_INFIMUM:
1385 break;
1386 case REC_STATUS_SUPREMUM:
1387 ut_ad(!"wrong status on current_rec");
1388 }
1389 switch (rec_get_status(insert_rec)) {
1390 case REC_STATUS_ORDINARY:
1391 case REC_STATUS_NODE_PTR:
1392 case REC_STATUS_COLUMNS_ADDED:
1393 break;
1394 case REC_STATUS_INFIMUM:
1395 case REC_STATUS_SUPREMUM:
1396 ut_ad(!"wrong status on insert_rec");
1397 }
1398 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1399 }
1400 #endif
1401 page_rec_set_next(insert_rec, next_rec);
1402 page_rec_set_next(current_rec, insert_rec);
1403 }
1404
1405 page_header_set_field(page, NULL, PAGE_N_RECS,
1406 1U + page_get_n_recs(page));
1407
1408 /* 5. Set the n_owned field in the inserted record to zero,
1409 and set the heap_no field */
1410 if (page_is_comp(page)) {
1411 rec_set_n_owned_new(insert_rec, NULL, 0);
1412 rec_set_heap_no_new(insert_rec, heap_no);
1413 } else {
1414 rec_set_n_owned_old(insert_rec, 0);
1415 rec_set_heap_no_old(insert_rec, heap_no);
1416 }
1417
1418 MEM_CHECK_DEFINED(rec_get_start(insert_rec, offsets),
1419 rec_offs_size(offsets));
1420 /* 6. Update the last insertion info in page header */
1421
1422 last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1423 ut_ad(!last_insert || !page_is_comp(page)
1424 || rec_get_node_ptr_flag(last_insert)
1425 == rec_get_node_ptr_flag(insert_rec));
1426
1427 if (!dict_index_is_spatial(index)) {
1428 byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
1429 if (UNIV_UNLIKELY(last_insert == NULL)) {
1430 no_direction:
1431 page_direction_reset(ptr, page, NULL);
1432 } else if (last_insert == current_rec
1433 && page_ptr_get_direction(ptr) != PAGE_LEFT) {
1434 page_direction_increment(ptr, page, NULL, PAGE_RIGHT);
1435 } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
1436 && page_rec_get_next(insert_rec) == last_insert) {
1437 page_direction_increment(ptr, page, NULL, PAGE_LEFT);
1438 } else {
1439 goto no_direction;
1440 }
1441 }
1442
1443 page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
1444
1445 /* 7. It remains to update the owner record. */
1446 {
1447 rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
1448 ulint n_owned;
1449 if (page_is_comp(page)) {
1450 n_owned = rec_get_n_owned_new(owner_rec);
1451 rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
1452 } else {
1453 n_owned = rec_get_n_owned_old(owner_rec);
1454 rec_set_n_owned_old(owner_rec, n_owned + 1);
1455 }
1456
1457 /* 8. Now we have incremented the n_owned field of the owner
1458 record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1459 we have to split the corresponding directory slot in two. */
1460
1461 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
1462 page_dir_split_slot(
1463 page, NULL,
1464 page_dir_find_owner_slot(owner_rec));
1465 }
1466 }
1467
1468 /* 9. Write log record of the insert */
1469 if (UNIV_LIKELY(mtr != NULL)) {
1470 page_cur_insert_rec_write_log(insert_rec, rec_size,
1471 current_rec, index, mtr);
1472 }
1473
1474 return(insert_rec);
1475 }
1476
1477 /***********************************************************//**
1478 Inserts a record next to page cursor on a compressed and uncompressed
1479 page. Returns pointer to inserted record if succeed, i.e.,
1480 enough space available, NULL otherwise.
1481 The cursor stays at the same position.
1482
1483 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1484 if this is a compressed leaf page in a secondary index.
1485 This has to be done either within the same mini-transaction,
1486 or by invoking ibuf_reset_free_bits() before mtr_commit().
1487
1488 @return pointer to record if succeed, NULL otherwise */
1489 rec_t*
page_cur_insert_rec_zip(page_cur_t * cursor,dict_index_t * index,const rec_t * rec,rec_offs * offsets,mtr_t * mtr)1490 page_cur_insert_rec_zip(
1491 /*====================*/
1492 page_cur_t* cursor, /*!< in/out: page cursor */
1493 dict_index_t* index, /*!< in: record descriptor */
1494 const rec_t* rec, /*!< in: pointer to a physical record */
1495 rec_offs* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1496 mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
1497 {
1498 byte* insert_buf;
1499 ulint rec_size;
1500 page_t* page; /*!< the relevant page */
1501 rec_t* last_insert; /*!< cursor position at previous
1502 insert */
1503 rec_t* free_rec; /*!< a free record that was reused,
1504 or NULL */
1505 rec_t* insert_rec; /*!< inserted record */
1506 ulint heap_no; /*!< heap number of the inserted
1507 record */
1508 page_zip_des_t* page_zip;
1509
1510 page_zip = page_cur_get_page_zip(cursor);
1511 ut_ad(page_zip);
1512
1513 ut_ad(rec_offs_validate(rec, index, offsets));
1514
1515 page = page_cur_get_page(cursor);
1516 ut_ad(dict_table_is_comp(index->table));
1517 ut_ad(page_is_comp(page));
1518 ut_ad(fil_page_index_page_check(page));
1519 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
1520 || index->is_dummy
1521 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
1522 ut_ad(!page_get_instant(page));
1523 ut_ad(!page_cur_is_after_last(cursor));
1524 #ifdef UNIV_ZIP_DEBUG
1525 ut_a(page_zip_validate(page_zip, page, index));
1526 #endif /* UNIV_ZIP_DEBUG */
1527
1528 /* 1. Get the size of the physical record in the page */
1529 rec_size = rec_offs_size(offsets);
1530
1531 #ifdef HAVE_valgrind
1532 {
1533 const void* rec_start __attribute__((unused))
1534 = rec - rec_offs_extra_size(offsets);
1535 ulint extra_size __attribute__((unused))
1536 = rec_offs_extra_size(offsets)
1537 - (rec_offs_comp(offsets)
1538 ? REC_N_NEW_EXTRA_BYTES
1539 : REC_N_OLD_EXTRA_BYTES);
1540
1541 /* All data bytes of the record must be valid. */
1542 MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
1543 /* The variable-length header must be valid. */
1544 MEM_CHECK_DEFINED(rec_start, extra_size);
1545 }
1546 #endif /* HAVE_valgrind */
1547
1548 const bool reorg_before_insert = page_has_garbage(page)
1549 && rec_size > page_get_max_insert_size(page, 1)
1550 && rec_size <= page_get_max_insert_size_after_reorganize(
1551 page, 1);
1552
1553 /* 2. Try to find suitable space from page memory management */
1554 if (!page_zip_available(page_zip, dict_index_is_clust(index),
1555 rec_size, 1)
1556 || reorg_before_insert) {
1557 /* The values can change dynamically. */
1558 bool log_compressed = page_zip_log_pages;
1559 ulint level = page_zip_level;
1560 #ifdef UNIV_DEBUG
1561 rec_t* cursor_rec = page_cur_get_rec(cursor);
1562 #endif /* UNIV_DEBUG */
1563
1564 /* If we are not writing compressed page images, we
1565 must reorganize the page before attempting the
1566 insert. */
1567 if (recv_recovery_is_on()) {
1568 /* Insert into the uncompressed page only.
1569 The page reorganization or creation that we
1570 would attempt outside crash recovery would
1571 have been covered by a previous redo log record. */
1572 } else if (page_is_empty(page)) {
1573 ut_ad(page_cur_is_before_first(cursor));
1574
1575 /* This is an empty page. Recreate it to
1576 get rid of the modification log. */
1577 page_create_zip(page_cur_get_block(cursor), index,
1578 page_header_get_field(page, PAGE_LEVEL),
1579 0, NULL, mtr);
1580 ut_ad(!page_header_get_ptr(page, PAGE_FREE));
1581
1582 if (page_zip_available(
1583 page_zip, dict_index_is_clust(index),
1584 rec_size, 1)) {
1585 goto use_heap;
1586 }
1587
1588 /* The cursor should remain on the page infimum. */
1589 return(NULL);
1590 } else if (!page_zip->m_nonempty && !page_has_garbage(page)) {
1591 /* The page has been freshly compressed, so
1592 reorganizing it will not help. */
1593 } else if (log_compressed && !reorg_before_insert) {
1594 /* Insert into uncompressed page only, and
1595 try page_zip_reorganize() afterwards. */
1596 } else if (btr_page_reorganize_low(
1597 recv_recovery_is_on(), level,
1598 cursor, index, mtr)) {
1599 ut_ad(!page_header_get_ptr(page, PAGE_FREE));
1600
1601 if (page_zip_available(
1602 page_zip, dict_index_is_clust(index),
1603 rec_size, 1)) {
1604 /* After reorganizing, there is space
1605 available. */
1606 goto use_heap;
1607 }
1608 } else {
1609 ut_ad(cursor->rec == cursor_rec);
1610 return(NULL);
1611 }
1612
1613 /* Try compressing the whole page afterwards. */
1614 insert_rec = page_cur_insert_rec_low(
1615 cursor->rec, index, rec, offsets, NULL);
1616
1617 /* If recovery is on, this implies that the compression
1618 of the page was successful during runtime. Had that not
1619 been the case or had the redo logging of compressed
1620 pages been enabled during runtime then we'd have seen
1621 a MLOG_ZIP_PAGE_COMPRESS redo record. Therefore, we
1622 know that we don't need to reorganize the page. We,
1623 however, do need to recompress the page. That will
1624 happen when the next redo record is read which must
1625 be of type MLOG_ZIP_PAGE_COMPRESS_NO_DATA and it must
1626 contain a valid compression level value.
1627 This implies that during recovery from this point till
1628 the next redo is applied the uncompressed and
1629 compressed versions are not identical and
1630 page_zip_validate will fail but that is OK because
1631 we call page_zip_validate only after processing
1632 all changes to a page under a single mtr during
1633 recovery. */
1634 if (insert_rec == NULL) {
1635 /* Out of space.
1636 This should never occur during crash recovery,
1637 because the MLOG_COMP_REC_INSERT should only
1638 be logged after a successful operation. */
1639 ut_ad(!recv_recovery_is_on());
1640 ut_ad(!index->is_dummy);
1641 } else if (recv_recovery_is_on()) {
1642 /* This should be followed by
1643 MLOG_ZIP_PAGE_COMPRESS_NO_DATA,
1644 which should succeed. */
1645 rec_offs_make_valid(insert_rec, index,
1646 page_is_leaf(page), offsets);
1647 } else {
1648 ulint pos = page_rec_get_n_recs_before(insert_rec);
1649 ut_ad(pos > 0);
1650
1651 if (!log_compressed) {
1652 if (page_zip_compress(
1653 page_zip, page, index,
1654 level, NULL, NULL)) {
1655 page_cur_insert_rec_write_log(
1656 insert_rec, rec_size,
1657 cursor->rec, index, mtr);
1658 page_zip_compress_write_log_no_data(
1659 level, page, index, mtr);
1660
1661 rec_offs_make_valid(
1662 insert_rec, index,
1663 page_is_leaf(page), offsets);
1664 return(insert_rec);
1665 }
1666
1667 /* Page compress failed. If this happened on a
1668 leaf page, put the data size into the sample
1669 buffer. */
1670 if (page_is_leaf(page)) {
1671 ulint occupied = page_get_data_size(page)
1672 + page_dir_calc_reserved_space(
1673 page_get_n_recs(page));
1674 index->stat_defrag_data_size_sample[
1675 index->stat_defrag_sample_next_slot] =
1676 occupied;
1677 index->stat_defrag_sample_next_slot =
1678 (index->stat_defrag_sample_next_slot
1679 + 1) % STAT_DEFRAG_DATA_SIZE_N_SAMPLE;
1680 }
1681
1682 ut_ad(cursor->rec
1683 == (pos > 1
1684 ? page_rec_get_nth(
1685 page, pos - 1)
1686 : page + PAGE_NEW_INFIMUM));
1687 } else {
1688 /* We are writing entire page images
1689 to the log. Reduce the redo log volume
1690 by reorganizing the page at the same time. */
1691 if (page_zip_reorganize(
1692 cursor->block, index, mtr)) {
1693 /* The page was reorganized:
1694 Seek to pos. */
1695 if (pos > 1) {
1696 cursor->rec = page_rec_get_nth(
1697 page, pos - 1);
1698 } else {
1699 cursor->rec = page
1700 + PAGE_NEW_INFIMUM;
1701 }
1702
1703 insert_rec = page + rec_get_next_offs(
1704 cursor->rec, TRUE);
1705 rec_offs_make_valid(
1706 insert_rec, index,
1707 page_is_leaf(page), offsets);
1708 return(insert_rec);
1709 }
1710
1711 /* Theoretically, we could try one
1712 last resort of btr_page_reorganize_low()
1713 followed by page_zip_available(), but
1714 that would be very unlikely to
1715 succeed. (If the full reorganized page
1716 failed to compress, why would it
1717 succeed to compress the page, plus log
1718 the insert of this record? */
1719 }
1720
1721 /* Out of space: restore the page */
1722 if (!page_zip_decompress(page_zip, page, FALSE)) {
1723 ut_error; /* Memory corrupted? */
1724 }
1725 ut_ad(page_validate(page, index));
1726 insert_rec = NULL;
1727 }
1728
1729 return(insert_rec);
1730 }
1731
1732 free_rec = page_header_get_ptr(page, PAGE_FREE);
1733 if (UNIV_LIKELY_NULL(free_rec)) {
1734 /* Try to allocate from the head of the free list. */
1735 lint extra_size_diff;
1736 rec_offs foffsets_[REC_OFFS_NORMAL_SIZE];
1737 rec_offs* foffsets = foffsets_;
1738 mem_heap_t* heap = NULL;
1739
1740 rec_offs_init(foffsets_);
1741
1742 foffsets = rec_get_offsets(free_rec, index, foffsets,
1743 page_rec_is_leaf(free_rec)
1744 ? index->n_core_fields : 0,
1745 ULINT_UNDEFINED, &heap);
1746 if (rec_offs_size(foffsets) < rec_size) {
1747 too_small:
1748 if (UNIV_LIKELY_NULL(heap)) {
1749 mem_heap_free(heap);
1750 }
1751
1752 goto use_heap;
1753 }
1754
1755 insert_buf = free_rec - rec_offs_extra_size(foffsets);
1756
1757 /* On compressed pages, do not relocate records from
1758 the free list. If extra_size would grow, use the heap. */
1759 extra_size_diff = lint(rec_offs_extra_size(offsets)
1760 - rec_offs_extra_size(foffsets));
1761
1762 if (UNIV_UNLIKELY(extra_size_diff < 0)) {
1763 /* Add an offset to the extra_size. */
1764 if (rec_offs_size(foffsets)
1765 < rec_size - ulint(extra_size_diff)) {
1766
1767 goto too_small;
1768 }
1769
1770 insert_buf -= extra_size_diff;
1771 } else if (UNIV_UNLIKELY(extra_size_diff)) {
1772 /* Do not allow extra_size to grow */
1773
1774 goto too_small;
1775 }
1776
1777 heap_no = rec_get_heap_no_new(free_rec);
1778 page_mem_alloc_free(page, page_zip,
1779 rec_get_next_ptr(free_rec, TRUE),
1780 rec_size);
1781
1782 if (!page_is_leaf(page)) {
1783 /* Zero out the node pointer of free_rec,
1784 in case it will not be overwritten by
1785 insert_rec. */
1786
1787 ut_ad(rec_size > REC_NODE_PTR_SIZE);
1788
1789 if (rec_offs_extra_size(foffsets)
1790 + rec_offs_data_size(foffsets) > rec_size) {
1791
1792 memset(rec_get_end(free_rec, foffsets)
1793 - REC_NODE_PTR_SIZE, 0,
1794 REC_NODE_PTR_SIZE);
1795 }
1796 } else if (dict_index_is_clust(index)) {
1797 /* Zero out the DB_TRX_ID and DB_ROLL_PTR
1798 columns of free_rec, in case it will not be
1799 overwritten by insert_rec. */
1800
1801 ulint trx_id_col;
1802 ulint trx_id_offs;
1803 ulint len;
1804
1805 trx_id_col = dict_index_get_sys_col_pos(index,
1806 DATA_TRX_ID);
1807 ut_ad(trx_id_col > 0);
1808 ut_ad(trx_id_col != ULINT_UNDEFINED);
1809
1810 trx_id_offs = rec_get_nth_field_offs(foffsets,
1811 trx_id_col, &len);
1812 ut_ad(len == DATA_TRX_ID_LEN);
1813
1814 if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
1815 + rec_offs_extra_size(foffsets) > rec_size) {
1816 /* We will have to zero out the
1817 DB_TRX_ID and DB_ROLL_PTR, because
1818 they will not be fully overwritten by
1819 insert_rec. */
1820
1821 memset(free_rec + trx_id_offs, 0,
1822 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1823 }
1824
1825 ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
1826 == rec_get_nth_field(free_rec, foffsets,
1827 trx_id_col + 1, &len));
1828 ut_ad(len == DATA_ROLL_PTR_LEN);
1829 }
1830
1831 if (UNIV_LIKELY_NULL(heap)) {
1832 mem_heap_free(heap);
1833 }
1834 } else {
1835 use_heap:
1836 free_rec = NULL;
1837 insert_buf = page_mem_alloc_heap(page, page_zip,
1838 rec_size, &heap_no);
1839
1840 if (UNIV_UNLIKELY(insert_buf == NULL)) {
1841 return(NULL);
1842 }
1843
1844 page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
1845 }
1846
1847 /* 3. Create the record */
1848 insert_rec = rec_copy(insert_buf, rec, offsets);
1849 rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);
1850
1851 /* 4. Insert the record in the linked list of records */
1852 ut_ad(cursor->rec != insert_rec);
1853
1854 {
1855 /* next record after current before the insertion */
1856 const rec_t* next_rec = page_rec_get_next_low(
1857 cursor->rec, TRUE);
1858 ut_ad(rec_get_status(cursor->rec)
1859 <= REC_STATUS_INFIMUM);
1860 ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
1861 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1862
1863 page_rec_set_next(insert_rec, next_rec);
1864 page_rec_set_next(cursor->rec, insert_rec);
1865 }
1866
1867 page_header_set_field(page, page_zip, PAGE_N_RECS,
1868 1U + page_get_n_recs(page));
1869
1870 /* 5. Set the n_owned field in the inserted record to zero,
1871 and set the heap_no field */
1872 rec_set_n_owned_new(insert_rec, NULL, 0);
1873 rec_set_heap_no_new(insert_rec, heap_no);
1874
1875 MEM_CHECK_DEFINED(rec_get_start(insert_rec, offsets),
1876 rec_offs_size(offsets));
1877
1878 page_zip_dir_insert(page_zip, cursor->rec, free_rec, insert_rec);
1879
1880 /* 6. Update the last insertion info in page header */
1881
1882 last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1883 ut_ad(!last_insert
1884 || rec_get_node_ptr_flag(last_insert)
1885 == rec_get_node_ptr_flag(insert_rec));
1886
1887 if (!dict_index_is_spatial(index)) {
1888 byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
1889 if (UNIV_UNLIKELY(last_insert == NULL)) {
1890 no_direction:
1891 page_direction_reset(ptr, page, page_zip);
1892 } else if (last_insert == cursor->rec
1893 && page_ptr_get_direction(ptr) != PAGE_LEFT) {
1894 page_direction_increment(ptr, page, page_zip,
1895 PAGE_RIGHT);
1896 } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
1897 && page_rec_get_next(insert_rec) == last_insert) {
1898 page_direction_increment(ptr, page, page_zip,
1899 PAGE_LEFT);
1900 } else {
1901 goto no_direction;
1902 }
1903 }
1904
1905 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
1906
1907 /* 7. It remains to update the owner record. */
1908 {
1909 rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
1910 ulint n_owned;
1911
1912 n_owned = rec_get_n_owned_new(owner_rec);
1913 rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
1914
1915 /* 8. Now we have incremented the n_owned field of the owner
1916 record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1917 we have to split the corresponding directory slot in two. */
1918
1919 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
1920 page_dir_split_slot(
1921 page, page_zip,
1922 page_dir_find_owner_slot(owner_rec));
1923 }
1924 }
1925
1926 page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
1927
1928 /* 9. Write log record of the insert */
1929 if (UNIV_LIKELY(mtr != NULL)) {
1930 page_cur_insert_rec_write_log(insert_rec, rec_size,
1931 cursor->rec, index, mtr);
1932 }
1933
1934 return(insert_rec);
1935 }
1936
1937 /**********************************************************//**
1938 Writes a log record of copying a record list end to a new created page.
1939 @return 4-byte field where to write the log data length, or NULL if
1940 logging is disabled */
1941 UNIV_INLINE
1942 byte*
page_copy_rec_list_to_created_page_write_log(page_t * page,dict_index_t * index,mtr_t * mtr)1943 page_copy_rec_list_to_created_page_write_log(
1944 /*=========================================*/
1945 page_t* page, /*!< in: index page */
1946 dict_index_t* index, /*!< in: record descriptor */
1947 mtr_t* mtr) /*!< in: mtr */
1948 {
1949 byte* log_ptr;
1950
1951 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1952 ut_ad(mtr->is_named_space(index->table->space));
1953
1954 log_ptr = mlog_open_and_write_index(mtr, page, index,
1955 page_is_comp(page)
1956 ? MLOG_COMP_LIST_END_COPY_CREATED
1957 : MLOG_LIST_END_COPY_CREATED, 4);
1958 if (UNIV_LIKELY(log_ptr != NULL)) {
1959 mlog_close(mtr, log_ptr + 4);
1960 }
1961
1962 return(log_ptr);
1963 }
1964
1965 /**********************************************************//**
1966 Parses a log record of copying a record list end to a new created page.
1967 @return end of log record or NULL */
1968 byte*
page_parse_copy_rec_list_to_created_page(byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1969 page_parse_copy_rec_list_to_created_page(
1970 /*=====================================*/
1971 byte* ptr, /*!< in: buffer */
1972 byte* end_ptr,/*!< in: buffer end */
1973 buf_block_t* block, /*!< in: page or NULL */
1974 dict_index_t* index, /*!< in: record descriptor */
1975 mtr_t* mtr) /*!< in: mtr or NULL */
1976 {
1977 byte* rec_end;
1978 ulint log_data_len;
1979 page_t* page;
1980 page_zip_des_t* page_zip;
1981
1982 ut_ad(index->is_dummy);
1983
1984 if (ptr + 4 > end_ptr) {
1985
1986 return(NULL);
1987 }
1988
1989 log_data_len = mach_read_from_4(ptr);
1990 ptr += 4;
1991
1992 rec_end = ptr + log_data_len;
1993
1994 if (rec_end > end_ptr) {
1995
1996 return(NULL);
1997 }
1998
1999 if (!block) {
2000
2001 return(rec_end);
2002 }
2003
2004 ut_ad(fil_page_index_page_check(block->frame));
2005 /* This function is never invoked on the clustered index root page,
2006 except in the redo log apply of
2007 page_copy_rec_list_end_to_created_page() which was logged by.
2008 page_copy_rec_list_to_created_page_write_log().
2009 For other pages, this field must be zero-initialized. */
2010 ut_ad(!page_get_instant(block->frame)
2011 || !page_has_siblings(block->frame));
2012
2013 while (ptr < rec_end) {
2014 ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
2015 block, index, mtr);
2016 }
2017
2018 ut_a(ptr == rec_end);
2019
2020 page = buf_block_get_frame(block);
2021 page_zip = buf_block_get_page_zip(block);
2022
2023 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
2024
2025 if (!dict_index_is_spatial(index)) {
2026 page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + page,
2027 page, page_zip);
2028 }
2029
2030 return(rec_end);
2031 }
2032
2033 /*************************************************************//**
2034 Copies records from page to a newly created page, from a given record onward,
2035 including that record. Infimum and supremum records are not copied.
2036
2037 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
2038 if this is a compressed leaf page in a secondary index.
2039 This has to be done either within the same mini-transaction,
2040 or by invoking ibuf_reset_free_bits() before mtr_commit(). */
2041 void
page_copy_rec_list_end_to_created_page(page_t * new_page,rec_t * rec,dict_index_t * index,mtr_t * mtr)2042 page_copy_rec_list_end_to_created_page(
2043 /*===================================*/
2044 page_t* new_page, /*!< in/out: index page to copy to */
2045 rec_t* rec, /*!< in: first record to copy */
2046 dict_index_t* index, /*!< in: record descriptor */
2047 mtr_t* mtr) /*!< in: mtr */
2048 {
2049 page_dir_slot_t* slot = 0; /* remove warning */
2050 byte* heap_top;
2051 rec_t* insert_rec = 0; /* remove warning */
2052 rec_t* prev_rec;
2053 ulint count;
2054 ulint n_recs;
2055 ulint slot_index;
2056 ulint rec_size;
2057 byte* log_ptr;
2058 ulint log_data_len;
2059 mem_heap_t* heap = NULL;
2060 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
2061 rec_offs* offsets = offsets_;
2062 rec_offs_init(offsets_);
2063
2064 ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
2065 ut_ad(page_align(rec) != new_page);
2066 ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
2067 ut_ad(fil_page_index_page_check(new_page));
2068 /* This function is never invoked on the clustered index root page,
2069 except in btr_lift_page_up(). */
2070 ut_ad(!page_get_instant(new_page) || !page_has_siblings(new_page));
2071
2072 if (page_rec_is_infimum(rec)) {
2073
2074 rec = page_rec_get_next(rec);
2075 }
2076
2077 if (page_rec_is_supremum(rec)) {
2078
2079 return;
2080 }
2081
2082 #ifdef UNIV_DEBUG
2083 /* To pass the debug tests we have to set these dummy values
2084 in the debug version */
2085 page_dir_set_n_slots(new_page, NULL, srv_page_size / 2);
2086 page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
2087 new_page + srv_page_size - 1);
2088 #endif
2089 log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
2090 index, mtr);
2091
2092 log_data_len = mtr->get_log()->size();
2093
2094 /* Individual inserts are logged in a shorter form */
2095
2096 const mtr_log_t log_mode = index->table->is_temporary()
2097 || !index->is_readable() /* IMPORT TABLESPACE */
2098 ? mtr_get_log_mode(mtr)
2099 : mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
2100
2101 prev_rec = page_get_infimum_rec(new_page);
2102 if (page_is_comp(new_page)) {
2103 heap_top = new_page + PAGE_NEW_SUPREMUM_END;
2104 } else {
2105 heap_top = new_page + PAGE_OLD_SUPREMUM_END;
2106 }
2107 count = 0;
2108 slot_index = 0;
2109 n_recs = 0;
2110
2111 const ulint n_core = page_is_leaf(new_page)
2112 ? index->n_core_fields : 0;
2113
2114 do {
2115 offsets = rec_get_offsets(rec, index, offsets, n_core,
2116 ULINT_UNDEFINED, &heap);
2117 insert_rec = rec_copy(heap_top, rec, offsets);
2118
2119 if (page_is_comp(new_page)) {
2120 rec_set_next_offs_new(prev_rec,
2121 page_offset(insert_rec));
2122
2123 rec_set_n_owned_new(insert_rec, NULL, 0);
2124 rec_set_heap_no_new(insert_rec,
2125 PAGE_HEAP_NO_USER_LOW + n_recs);
2126 } else {
2127 rec_set_next_offs_old(prev_rec,
2128 page_offset(insert_rec));
2129
2130 rec_set_n_owned_old(insert_rec, 0);
2131 rec_set_heap_no_old(insert_rec,
2132 PAGE_HEAP_NO_USER_LOW + n_recs);
2133 }
2134
2135 count++;
2136 n_recs++;
2137
2138 if (UNIV_UNLIKELY
2139 (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {
2140
2141 slot_index++;
2142
2143 slot = page_dir_get_nth_slot(new_page, slot_index);
2144
2145 page_dir_slot_set_rec(slot, insert_rec);
2146 page_dir_slot_set_n_owned(slot, NULL, count);
2147
2148 count = 0;
2149 }
2150
2151 rec_size = rec_offs_size(offsets);
2152
2153 ut_ad(heap_top < new_page + srv_page_size);
2154
2155 heap_top += rec_size;
2156
2157 rec_offs_make_valid(insert_rec, index, n_core != 0, offsets);
2158 page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
2159 index, mtr);
2160 prev_rec = insert_rec;
2161 rec = page_rec_get_next(rec);
2162 } while (!page_rec_is_supremum(rec));
2163
2164 ut_ad(n_recs);
2165
2166 if ((slot_index > 0) && (count + 1
2167 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
2168 <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
2169 /* We can merge the two last dir slots. This operation is
2170 here to make this function imitate exactly the equivalent
2171 task made using page_cur_insert_rec, which we use in database
2172 recovery to reproduce the task performed by this function.
2173 To be able to check the correctness of recovery, it is good
2174 that it imitates exactly. */
2175
2176 count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
2177
2178 page_dir_slot_set_n_owned(slot, NULL, 0);
2179
2180 slot_index--;
2181 }
2182
2183 if (UNIV_LIKELY_NULL(heap)) {
2184 mem_heap_free(heap);
2185 }
2186
2187 /* Restore the log mode */
2188
2189 mtr_set_log_mode(mtr, log_mode);
2190
2191 log_data_len = mtr->get_log()->size() - log_data_len;
2192
2193 ut_a(log_data_len < 100U << srv_page_size_shift);
2194
2195 if (log_ptr != NULL) {
2196 mach_write_to_4(log_ptr, log_data_len);
2197 }
2198
2199 if (page_is_comp(new_page)) {
2200 rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
2201 } else {
2202 rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
2203 }
2204
2205 slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
2206
2207 page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
2208 page_dir_slot_set_n_owned(slot, NULL, count + 1);
2209
2210 page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
2211 page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
2212 page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
2213 page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
2214
2215 *reinterpret_cast<uint16_t*>(PAGE_HEADER + PAGE_LAST_INSERT + new_page)
2216 = 0;
2217 page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + new_page,
2218 new_page, NULL);
2219 }
2220
2221 /***********************************************************//**
2222 Writes log record of a record delete on a page. */
2223 UNIV_INLINE
2224 void
page_cur_delete_rec_write_log(rec_t * rec,const dict_index_t * index,mtr_t * mtr)2225 page_cur_delete_rec_write_log(
2226 /*==========================*/
2227 rec_t* rec, /*!< in: record to be deleted */
2228 const dict_index_t* index, /*!< in: record descriptor */
2229 mtr_t* mtr) /*!< in: mini-transaction handle */
2230 {
2231 byte* log_ptr;
2232
2233 ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2234 ut_ad(mtr->is_named_space(index->table->space));
2235
2236 log_ptr = mlog_open_and_write_index(mtr, rec, index,
2237 page_rec_is_comp(rec)
2238 ? MLOG_COMP_REC_DELETE
2239 : MLOG_REC_DELETE, 2);
2240
2241 if (!log_ptr) {
2242 /* Logging in mtr is switched off during crash recovery:
2243 in that case mlog_open returns NULL */
2244 return;
2245 }
2246
2247 /* Write the cursor rec offset as a 2-byte ulint */
2248 mach_write_to_2(log_ptr, page_offset(rec));
2249
2250 mlog_close(mtr, log_ptr + 2);
2251 }
2252
2253 /***********************************************************//**
2254 Parses log record of a record delete on a page.
2255 @return pointer to record end or NULL */
2256 byte*
page_cur_parse_delete_rec(byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)2257 page_cur_parse_delete_rec(
2258 /*======================*/
2259 byte* ptr, /*!< in: buffer */
2260 byte* end_ptr,/*!< in: buffer end */
2261 buf_block_t* block, /*!< in: page or NULL */
2262 dict_index_t* index, /*!< in: record descriptor */
2263 mtr_t* mtr) /*!< in: mtr or NULL */
2264 {
2265 ulint offset;
2266 page_cur_t cursor;
2267
2268 if (end_ptr < ptr + 2) {
2269
2270 return(NULL);
2271 }
2272
2273 /* Read the cursor rec offset as a 2-byte ulint */
2274 offset = mach_read_from_2(ptr);
2275 ptr += 2;
2276
2277 if (UNIV_UNLIKELY(offset >= srv_page_size)) {
2278 recv_sys->found_corrupt_log = true;
2279 return NULL;
2280 }
2281
2282 if (block) {
2283 page_t* page = buf_block_get_frame(block);
2284 mem_heap_t* heap = NULL;
2285 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
2286 rec_t* rec = page + offset;
2287 rec_offs_init(offsets_);
2288
2289 page_cur_position(rec, block, &cursor);
2290 ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
2291
2292 page_cur_delete_rec(&cursor, index,
2293 rec_get_offsets(rec, index, offsets_,
2294 page_rec_is_leaf(rec)
2295 ? index->n_core_fields : 0,
2296 ULINT_UNDEFINED, &heap),
2297 mtr);
2298 if (UNIV_LIKELY_NULL(heap)) {
2299 mem_heap_free(heap);
2300 }
2301 }
2302
2303 return(ptr);
2304 }
2305
2306 /***********************************************************//**
2307 Deletes a record at the page cursor. The cursor is moved to the next
2308 record after the deleted one. */
2309 void
page_cur_delete_rec(page_cur_t * cursor,const dict_index_t * index,const rec_offs * offsets,mtr_t * mtr)2310 page_cur_delete_rec(
2311 /*================*/
2312 page_cur_t* cursor, /*!< in/out: a page cursor */
2313 const dict_index_t* index, /*!< in: record descriptor */
2314 const rec_offs* offsets,/*!< in: rec_get_offsets(
2315 cursor->rec, index) */
2316 mtr_t* mtr) /*!< in: mini-transaction handle
2317 or NULL */
2318 {
2319 page_dir_slot_t* cur_dir_slot;
2320 page_dir_slot_t* prev_slot;
2321 page_t* page;
2322 page_zip_des_t* page_zip;
2323 rec_t* current_rec;
2324 rec_t* prev_rec = NULL;
2325 rec_t* next_rec;
2326 ulint cur_slot_no;
2327 ulint cur_n_owned;
2328 rec_t* rec;
2329
2330 page = page_cur_get_page(cursor);
2331 page_zip = page_cur_get_page_zip(cursor);
2332
2333 /* page_zip_validate() will fail here when
2334 btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
2335 Then, both "page_zip" and "page" would have the min-rec-mark
2336 set on the smallest user record, but "page" would additionally
2337 have it set on the smallest-but-one record. Because sloppy
2338 page_zip_validate_low() only ignores min-rec-flag differences
2339 in the smallest user record, it cannot be used here either. */
2340
2341 current_rec = cursor->rec;
2342 ut_ad(rec_offs_validate(current_rec, index, offsets));
2343 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
2344 ut_ad(fil_page_index_page_check(page));
2345 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
2346 || index->is_dummy
2347 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
2348 ut_ad(!mtr || mtr->is_named_space(index->table->space));
2349
2350 /* The record must not be the supremum or infimum record. */
2351 ut_ad(page_rec_is_user_rec(current_rec));
2352
2353 if (page_get_n_recs(page) == 1 && !recv_recovery_is_on()) {
2354 /* Empty the page, unless we are applying the redo log
2355 during crash recovery. During normal operation, the
2356 page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
2357 MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
2358 ut_ad(page_is_leaf(page));
2359 /* Usually, this should be the root page,
2360 and the whole index tree should become empty.
2361 However, this could also be a call in
2362 btr_cur_pessimistic_update() to delete the only
2363 record in the page and to insert another one. */
2364 page_cur_move_to_next(cursor);
2365 ut_ad(page_cur_is_after_last(cursor));
2366 page_create_empty(page_cur_get_block(cursor),
2367 const_cast<dict_index_t*>(index), mtr);
2368 return;
2369 }
2370
2371 /* Save to local variables some data associated with current_rec */
2372 cur_slot_no = page_dir_find_owner_slot(current_rec);
2373 ut_ad(cur_slot_no > 0);
2374 cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
2375 cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
2376
2377 /* 0. Write the log record */
2378 if (mtr != 0) {
2379 page_cur_delete_rec_write_log(current_rec, index, mtr);
2380 }
2381
2382 /* 1. Reset the last insert info in the page header and increment
2383 the modify clock for the frame */
2384
2385 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
2386
2387 /* The page gets invalid for optimistic searches: increment the
2388 frame modify clock only if there is an mini-transaction covering
2389 the change. During IMPORT we allocate local blocks that are not
2390 part of the buffer pool. */
2391
2392 if (mtr != 0) {
2393 buf_block_modify_clock_inc(page_cur_get_block(cursor));
2394 }
2395
2396 /* 2. Find the next and the previous record. Note that the cursor is
2397 left at the next record. */
2398
2399 ut_ad(cur_slot_no > 0);
2400 prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);
2401
2402 rec = (rec_t*) page_dir_slot_get_rec(prev_slot);
2403
2404 /* rec now points to the record of the previous directory slot. Look
2405 for the immediate predecessor of current_rec in a loop. */
2406
2407 while (current_rec != rec) {
2408 prev_rec = rec;
2409 rec = page_rec_get_next(rec);
2410 }
2411
2412 page_cur_move_to_next(cursor);
2413 next_rec = cursor->rec;
2414
2415 /* 3. Remove the record from the linked list of records */
2416
2417 page_rec_set_next(prev_rec, next_rec);
2418
2419 /* 4. If the deleted record is pointed to by a dir slot, update the
2420 record pointer in slot. In the following if-clause we assume that
2421 prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
2422 >= 2. */
2423
2424 compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
2425 ut_ad(cur_n_owned > 1);
2426
2427 if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
2428 page_dir_slot_set_rec(cur_dir_slot, prev_rec);
2429 }
2430
2431 /* 5. Update the number of owned records of the slot */
2432
2433 page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
2434
2435 /* 6. Free the memory occupied by the record */
2436 page_mem_free(page, page_zip, current_rec, index, offsets);
2437
2438 /* 7. Now we have decremented the number of owned records of the slot.
2439 If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
2440 slots. */
2441
2442 if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
2443 page_dir_balance_slot(page, page_zip, cur_slot_no);
2444 }
2445 }
2446
2447 #ifdef UNIV_COMPILE_TEST_FUNCS
2448
2449 /*******************************************************************//**
2450 Print the first n numbers, generated by ut_rnd_gen() to make sure
2451 (visually) that it works properly. */
2452 void
test_ut_rnd_gen(int n)2453 test_ut_rnd_gen(
2454 int n) /*!< in: print first n numbers */
2455 {
2456 int i;
2457 unsigned long long rnd;
2458
2459 for (i = 0; i < n; i++) {
2460 rnd = ut_rnd_gen();
2461 printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
2462 rnd,
2463 rnd % 2,
2464 rnd % 3,
2465 rnd % 5,
2466 rnd % 7,
2467 rnd % 11);
2468 }
2469 }
2470
2471 #endif /* UNIV_COMPILE_TEST_FUNCS */
2472