/*****************************************************************************

Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
Copyright (c) 2018, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/********************************************************************//**
@file page/page0cur.cc
The page cursor

Created 10/4/1994 Heikki Tuuri
*************************************************************************/

#include "page0cur.h"
#include "page0zip.h"
#include "btr0btr.h"
#include "mtr0log.h"
#include "log0recv.h"
#include "rem0cmp.h"
#include "gis0rtree.h"

#include <algorithm>

#ifdef BTR_CUR_HASH_ADAPT
# ifdef UNIV_SEARCH_PERF_STAT
static ulint	page_cur_short_succ;
# endif /* UNIV_SEARCH_PERF_STAT */

/** Try a search shortcut based on the last insert.
@param[in]	block			index page
@param[in]	index			index tree
@param[in]	tuple			search key
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[out]	cursor			page cursor
@return true on success */
UNIV_INLINE
bool
page_cur_try_search_shortcut(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	ulint*			iup_matched_fields,
	ulint*			ilow_matched_fields,
	page_cur_t*		cursor)
{
	const rec_t*	rec;
	const rec_t*	next_rec;
	ulint		low_match;
	ulint		up_match;
	ibool		success		= FALSE;
	const page_t*	page		= buf_block_get_frame(block);
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_check_typed(tuple));
	ut_ad(page_is_leaf(page));

	rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  dtuple_get_n_fields(tuple), &heap);

	ut_ad(rec);
	ut_ad(page_rec_is_user_rec(rec));

	low_match = up_match = std::min(*ilow_matched_fields,
					*iup_matched_fields);

	if (cmp_dtuple_rec_with_match(tuple, rec, offsets, &low_match) < 0) {
		goto exit_func;
	}

	next_rec = page_rec_get_next_const(rec);
	if (!page_rec_is_supremum(next_rec)) {
		offsets = rec_get_offsets(next_rec, index, offsets,
					  index->n_core_fields,
					  dtuple_get_n_fields(tuple), &heap);

		if (cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
					      &up_match) >= 0) {
			goto exit_func;
		}

		*iup_matched_fields = up_match;
	}

	page_cur_position(rec, block, cursor);

	*ilow_matched_fields = low_match;

#ifdef UNIV_SEARCH_PERF_STAT
	page_cur_short_succ++;
#endif
	success = TRUE;
exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(success);
}
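
/* Illustrative sketch (comment only, not compiled): the shortcut pays off
for workloads that insert in ascending key order, which the caller detects
from the page header before calling this function. A caller-side guard,
mirroring the one in page_cur_search_with_match() below and using the
placeholder variables up_fields, low_fields and cur, could look like:

	if (page_is_leaf(page)
	    && page_get_direction(page) == PAGE_RIGHT
	    && page_header_get_offs(page, PAGE_LAST_INSERT)
	    && mode == PAGE_CUR_LE
	    && page_header_get_field(page, PAGE_N_DIRECTION) > 3
	    && page_cur_try_search_shortcut(block, index, tuple,
					    &up_fields, &low_fields, &cur)) {
		return;	// cursor is now positioned on PAGE_LAST_INSERT
	}
*/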

/** Try a search shortcut based on the last insert.
@param[in]	block			index page
@param[in]	index			index tree
@param[in]	tuple			search key
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	iup_matched_bytes	already matched bytes in the
first partially matched field in the upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[in,out]	ilow_matched_bytes	already matched bytes in the
first partially matched field in the lower limit record
@param[out]	cursor			page cursor
@return true on success */
UNIV_INLINE
bool
page_cur_try_search_shortcut_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	const rec_t*	rec;
	const rec_t*	next_rec;
	ulint		low_match;
	ulint		low_bytes;
	ulint		up_match;
	ulint		up_bytes;
	ibool		success		= FALSE;
	const page_t*	page		= buf_block_get_frame(block);
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_check_typed(tuple));
	ut_ad(page_is_leaf(page));

	rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  dtuple_get_n_fields(tuple), &heap);

	ut_ad(rec);
	ut_ad(page_rec_is_user_rec(rec));
	if (ut_pair_cmp(*ilow_matched_fields, *ilow_matched_bytes,
			*iup_matched_fields, *iup_matched_bytes) < 0) {
		up_match = low_match = *ilow_matched_fields;
		up_bytes = low_bytes = *ilow_matched_bytes;
	} else {
		up_match = low_match = *iup_matched_fields;
		up_bytes = low_bytes = *iup_matched_bytes;
	}

	if (cmp_dtuple_rec_with_match_bytes(
		    tuple, rec, index, offsets, &low_match, &low_bytes) < 0) {
		goto exit_func;
	}

	next_rec = page_rec_get_next_const(rec);
	if (!page_rec_is_supremum(next_rec)) {
		offsets = rec_get_offsets(next_rec, index, offsets,
					  index->n_core_fields,
					  dtuple_get_n_fields(tuple), &heap);

		if (cmp_dtuple_rec_with_match_bytes(
			    tuple, next_rec, index, offsets,
			    &up_match, &up_bytes)
		    >= 0) {
			goto exit_func;
		}

		*iup_matched_fields = up_match;
		*iup_matched_bytes = up_bytes;
	}

	page_cur_position(rec, block, cursor);

	*ilow_matched_fields = low_match;
	*ilow_matched_bytes = low_bytes;

#ifdef UNIV_SEARCH_PERF_STAT
	page_cur_short_succ++;
#endif
	success = TRUE;
exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(success);
}
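
/* Note on the (fields, bytes) bookkeeping above: ut_pair_cmp() is assumed
to order the pairs lexicographically, so the code seeds both search limits
with the smaller of the two already-matched prefixes. Invented example
values:

	ut_pair_cmp(2, 7, 3, 0) < 0	// 2 fields < 3 fields
	ut_pair_cmp(3, 1, 3, 4) < 0	// equal fields, 1 byte < 4 bytes

Starting from the minimum is safe because every record between the lower
and upper limit shares at least that prefix with the search tuple. */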
#endif /* BTR_CUR_HASH_ADAPT */

#ifdef PAGE_CUR_LE_OR_EXTENDS
/****************************************************************//**
Checks if the nth field in a record is a character type field which extends
the nth field in tuple, i.e., the field is longer or equal in length and has
common first characters.
@return TRUE if rec field extends tuple field */
static
ibool
page_cur_rec_field_extends(
/*=======================*/
	const dtuple_t*	tuple,	/*!< in: data tuple */
	const rec_t*	rec,	/*!< in: record */
	const rec_offs*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		n)	/*!< in: compare nth field */
{
	const dtype_t*	type;
	const dfield_t*	dfield;
	const byte*	rec_f;
	ulint		rec_f_len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
	dfield = dtuple_get_nth_field(tuple, n);

	type = dfield_get_type(dfield);

	rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);

	if (type->mtype == DATA_VARCHAR
	    || type->mtype == DATA_CHAR
	    || type->mtype == DATA_FIXBINARY
	    || type->mtype == DATA_BINARY
	    || type->mtype == DATA_BLOB
	    || DATA_GEOMETRY_MTYPE(type->mtype)
	    || type->mtype == DATA_VARMYSQL
	    || type->mtype == DATA_MYSQL) {

		if (dfield_get_len(dfield) != UNIV_SQL_NULL
		    && rec_f_len != UNIV_SQL_NULL
		    && rec_f_len >= dfield_get_len(dfield)
		    && !cmp_data_data(type->mtype, type->prtype,
				      dfield_get_data(dfield),
				      dfield_get_len(dfield),
				      rec_f, dfield_get_len(dfield))) {

			return(TRUE);
		}
	}

	return(FALSE);
}
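
/* Worked example for the test above (assuming a latin1-style collation
where the prefix comparison is byte-wise): with a tuple field "abc" of
length 3 and a record field "abcde" of length 5, the record field is at
least as long and its first 3 bytes compare equal, so the function returns
TRUE: the record field "extends" the tuple field. For a record field "abd"
the prefix comparison fails and the result is FALSE. */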
#endif /* PAGE_CUR_LE_OR_EXTENDS */

/****************************************************************//**
Searches the right position for a page cursor. */
void
page_cur_search_with_match(
/*=======================*/
	const buf_block_t*	block,	/*!< in: buffer block */
	const dict_index_t*	index,	/*!< in/out: record descriptor */
	const dtuple_t*		tuple,	/*!< in: data tuple */
	page_cur_mode_t		mode,	/*!< in: PAGE_CUR_L,
					PAGE_CUR_LE, PAGE_CUR_G, or
					PAGE_CUR_GE */
	ulint*			iup_matched_fields,
					/*!< in/out: already matched
					fields in upper limit record */
	ulint*			ilow_matched_fields,
					/*!< in/out: already matched
					fields in lower limit record */
	page_cur_t*		cursor,	/*!< out: page cursor */
	rtr_info_t*		rtr_info)/*!< in/out: rtree search stack */
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t* slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		low_matched_fields;
	ulint		cur_matched_fields;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
		if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
			ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
			      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE
			      || dict_index_is_spatial(index));
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));
	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;

#ifdef BTR_CUR_HASH_ADAPT
	if (n_core
	    && page_get_direction(page) == PAGE_RIGHT
	    && page_header_get_offs(page, PAGE_LAST_INSERT)
	    && mode == PAGE_CUR_LE
	    && !index->is_spatial()
	    && page_header_get_field(page, PAGE_N_DIRECTION) > 3
	    && page_cur_try_search_shortcut(
		    block, index, tuple,
		    iup_matched_fields, ilow_matched_fields, cursor)) {
		return;
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif /* BTR_CUR_HASH_ADAPT */

	/* If the mode is for R-tree indexes, use the special MBR-related
	compare functions */
	if (index->is_spatial() && mode > PAGE_CUR_LE) {
		/* For leaf-level insert, we still use the traditional
		compare function for now */
		if (mode == PAGE_CUR_RTREE_INSERT && n_core) {
			mode = PAGE_CUR_LE;
		} else {
			rtr_cur_search_with_match(
				block, (dict_index_t*)index, tuple, mode,
				cursor, rtr_info);
			return;
		}
	}

	/* The following flag does not work for non-latin1 character sets
	because cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	low_matched_fields = *ilow_matched_fields;

	/* Perform a binary search. First the search is done through the page
	directory, and after that as a linear search in the list of records
	owned by the upper limit directory slot. */

	low = 0;
	up = ulint(page_dir_get_n_slots(page)) - 1;

	/* Perform binary search until the lower and upper limit directory
	slots are within a distance of 1 from each other */

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		cur_matched_fields = std::min(low_matched_fields,
					      up_matched_fields);

		offsets = offsets_;
		offsets = rec_get_offsets(
			mid_rec, index, offsets, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match(
			tuple, mid_rec, offsets, &cur_matched_fields);

		if (cmp > 0) {
low_slot_match:
			low = mid;
			low_matched_fields = cur_matched_fields;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			up = mid;
			up_matched_fields = cur_matched_fields;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform a linear search until the upper and lower records are
	adjacent to each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		cur_matched_fields = std::min(low_matched_fields,
					      up_matched_fields);

		offsets = offsets_;
		offsets = rec_get_offsets(
			mid_rec, index, offsets, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match(
			tuple, mid_rec, offsets, &cur_matched_fields);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			if (!cmp && !cur_matched_fields) {
#ifdef UNIV_DEBUG
				mtr_t	mtr;
				mtr_start(&mtr);

				/* We got a match, but cur_matched_fields is
				0, so the record must carry
				REC_INFO_MIN_REC_FLAG */
				ulint   rec_info = rec_get_info_bits(mid_rec,
                                                     rec_offs_comp(offsets));
				ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
				ut_ad(!page_has_prev(page));
				mtr_commit(&mtr);
#endif

				cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
			}

			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*ilow_matched_fields = low_matched_fields;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
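
/* Shape of the search above, on a hypothetical page whose directory slots
point to records r0 < r4 < r8 < r12 (each slot owning 4 records), when
looking up a key between r9 and r10 with PAGE_CUR_LE:

	binary search over slots:  low slot -> r8, up slot -> r12
	linear scan of the list:   r8 -> r9 -> r10; ends with low_rec = r9

The matched-fields counters carried between iterations let every
cmp_dtuple_rec_with_match() call skip the key prefix that is already known
to be equal in both search limits. */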

#ifdef BTR_CUR_HASH_ADAPT
/** Search the right position for a page cursor.
@param[in]	block			buffer block
@param[in]	index			index tree
@param[in]	tuple			key to be searched for
@param[in]	mode			search mode
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	iup_matched_bytes	already matched bytes in the
first partially matched field in the upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[in,out]	ilow_matched_bytes	already matched bytes in the
first partially matched field in the lower limit record
@param[out]	cursor			page cursor */
void
page_cur_search_with_match_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	page_cur_mode_t		mode,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t* slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		up_matched_bytes;
	ulint		low_matched_fields;
	ulint		low_matched_bytes;
	ulint		cur_matched_fields;
	ulint		cur_matched_bytes;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
	ut_ad(!(tuple->info_bits & REC_INFO_MIN_REC_FLAG));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
		if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
			ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
			      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));

#ifdef BTR_CUR_HASH_ADAPT
	if (page_is_leaf(page)
	    && page_get_direction(page) == PAGE_RIGHT
	    && page_header_get_offs(page, PAGE_LAST_INSERT)
	    && mode == PAGE_CUR_LE
	    && page_header_get_field(page, PAGE_N_DIRECTION) > 3
	    && page_cur_try_search_shortcut_bytes(
		    block, index, tuple,
		    iup_matched_fields, iup_matched_bytes,
		    ilow_matched_fields, ilow_matched_bytes,
		    cursor)) {
		return;
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif /* BTR_CUR_HASH_ADAPT */

	/* The following flag does not work for non-latin1 character sets
	because cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	up_matched_bytes  = *iup_matched_bytes;
	low_matched_fields = *ilow_matched_fields;
	low_matched_bytes  = *ilow_matched_bytes;

	/* Perform a binary search. First the search is done through the page
	directory, and after that as a linear search in the list of records
	owned by the upper limit directory slot. */

	low = 0;
	up = ulint(page_dir_get_n_slots(page)) - 1;

	/* Perform binary search until the lower and upper limit directory
	slots are within a distance of 1 from each other */
	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		offsets = rec_get_offsets(
			mid_rec, index, offsets_, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_slot_match:
			low = mid;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			up = mid;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform a linear search until the upper and lower records are
	adjacent to each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		if (UNIV_UNLIKELY(rec_get_info_bits(
					  mid_rec,
					  dict_table_is_comp(index->table))
				  & REC_INFO_MIN_REC_FLAG)) {
			ut_ad(!page_has_prev(page_align(mid_rec)));
			ut_ad(!page_rec_is_leaf(mid_rec)
			      || rec_is_metadata(mid_rec, *index));
			cmp = 1;
			goto low_rec_match;
		}

		offsets = rec_get_offsets(
			mid_rec, index, offsets_, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*iup_matched_bytes   = up_matched_bytes;
	*ilow_matched_fields = low_matched_fields;
	*ilow_matched_bytes  = low_matched_bytes;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
#endif /* BTR_CUR_HASH_ADAPT */

/***********************************************************//**
Positions a page cursor on a randomly chosen user record on a page. If there
are no user records, sets the cursor on the infimum record. */
void
page_cur_open_on_rnd_user_rec(
/*==========================*/
	buf_block_t*	block,	/*!< in: page */
	page_cur_t*	cursor)	/*!< out: page cursor */
{
	const ulint	n_recs = page_get_n_recs(block->frame);

	page_cur_set_before_first(block, cursor);

	if (UNIV_UNLIKELY(n_recs == 0)) {

		return;
	}

	cursor->rec = page_rec_get_nth(block->frame,
				       ut_rnd_interval(n_recs) + 1);
}

/**
Set the number of owned records.
@param[in,out]  rec     record in block.frame
@param[in]      n_owned number of records skipped in the sparse page directory
@param[in]      comp    whether ROW_FORMAT is COMPACT or DYNAMIC */
static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp)
{
  rec-= comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED;
  *rec= static_cast<byte>((*rec & ~REC_N_OWNED_MASK) |
                          (n_owned << REC_N_OWNED_SHIFT));
}
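
/* Example of the n_owned encoding above, assuming the usual definitions
REC_N_OWNED_MASK == 0xf and REC_N_OWNED_SHIFT == 0: storing n_owned = 5
into a status byte whose current value is 0x73 yields
(0x73 & ~0xf) | (5 << 0) = 0x75. Only the low nibble changes; the
neighbouring flag bits in the byte are preserved. */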

/**
Split a directory slot which owns too many records.
@param[in,out]  block   index page
@param[in,out]  slot    the slot that needs to be split */
static void page_dir_split_slot(const buf_block_t &block,
                                page_dir_slot_t *slot)
{
  ut_ad(slot <= &block.frame[srv_page_size - PAGE_EMPTY_DIR_START]);
  slot= my_assume_aligned<2>(slot);

  const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;

  ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
  static_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 >=
                PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility");

  /* Find a record approximately in the middle. */
  const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);

  for (ulint i= n_owned / 2; i--; )
    rec= page_rec_get_next_const(rec);

  /* Add a directory slot immediately below this one. */
  constexpr uint16_t n_slots_f= PAGE_N_DIR_SLOTS + PAGE_HEADER;
  byte *n_slots_p= my_assume_aligned<2>(n_slots_f + block.frame);
  const uint16_t n_slots= mach_read_from_2(n_slots_p);

  page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*>
          (block.frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) -
           n_slots * PAGE_DIR_SLOT_SIZE);
  ut_ad(slot >= last_slot);
  memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
                     slot - last_slot);

  const ulint half_owned= n_owned / 2;

  mach_write_to_2(n_slots_p, n_slots + 1);

  mach_write_to_2(slot, rec - block.frame);
  const bool comp= page_is_comp(block.frame) != 0;
  page_rec_set_n_owned(page_dir_slot_get_rec(slot), half_owned, comp);
  page_rec_set_n_owned(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE),
                       n_owned - half_owned, comp);
}
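
/* Numeric sketch of the split above, assuming PAGE_DIR_SLOT_MAX_N_OWNED
is 8 and PAGE_DIR_SLOT_MIN_N_OWNED is 4: the slot has come to own
8 + 1 = 9 records, the loop walks 9 / 2 = 4 records forward from the
previous slot's record, and the directory is shifted to make room for a
new slot pointing there. Afterwards one slot owns 4 records and the other
owns the remaining 5, both within [PAGE_DIR_SLOT_MIN_N_OWNED,
PAGE_DIR_SLOT_MAX_N_OWNED]. */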

/**
Split a directory slot which owns too many records.
@param[in,out]  block   index page (ROW_FORMAT=COMPRESSED)
@param[in]      s       the slot that needs to be split
@param[in,out]  mtr     mini-transaction */
static void page_zip_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
{
  ut_ad(block->page.zip.data);
  ut_ad(page_is_comp(block->frame));
  ut_ad(s);

  page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, s);
  const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;

  ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
  static_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 >=
                PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility");

  /* Find a record approximately in the middle of the records
  owned by the slot. */

  const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);

  for (ulint i= n_owned / 2; i--; )
    rec= page_rec_get_next_const(rec);

  /* Add a directory slot immediately below this one. */
  constexpr uint16_t n_slots_f= PAGE_N_DIR_SLOTS + PAGE_HEADER;
  byte *n_slots_p= my_assume_aligned<2>(n_slots_f + block->frame);
  const uint16_t n_slots= mach_read_from_2(n_slots_p);

  page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*>
          (block->frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) -
           n_slots * PAGE_DIR_SLOT_SIZE);
  memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
                     slot - last_slot);

  const ulint half_owned= n_owned / 2;

  mtr->write<2>(*block, n_slots_p, 1U + n_slots);

  /* Log changes to the compressed page header and the dense page directory. */
  memcpy_aligned<2>(&block->page.zip.data[n_slots_f], n_slots_p, 2);
  mach_write_to_2(slot, page_offset(rec));
  page_rec_set_n_owned<true>(block, page_dir_slot_get_rec(slot), half_owned,
                             true, mtr);
  page_rec_set_n_owned<true>(block,
                             page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE),
                             n_owned - half_owned, true, mtr);
}

/**
Try to balance an underfilled directory slot with an adjacent one,
so that there are at least the minimum number of records owned by the slot;
this may result in merging the two slots.
@param[in,out]	block		ROW_FORMAT=COMPRESSED page
@param[in]	s		the slot to be balanced
@param[in,out]	mtr		mini-transaction */
static void page_zip_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
{
	ut_ad(block->page.zip.data);
	ut_ad(page_is_comp(block->frame));
	ut_ad(s > 0);

	const ulint n_slots = page_dir_get_n_slots(block->frame);

	if (UNIV_UNLIKELY(s + 1 == n_slots)) {
		/* The last directory slot cannot be balanced. */
		return;
	}

	ut_ad(s < n_slots);

	page_dir_slot_t* slot = page_dir_get_nth_slot(block->frame, s);
	rec_t* const up_rec = const_cast<rec_t*>
		(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE));
	rec_t* const slot_rec = const_cast<rec_t*>
		(page_dir_slot_get_rec(slot));
	const ulint up_n_owned = rec_get_n_owned_new(up_rec);

	ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(slot))
	      == PAGE_DIR_SLOT_MIN_N_OWNED - 1);

	if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
		compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1
				    <= PAGE_DIR_SLOT_MAX_N_OWNED);
		/* Merge the slots. */
		page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr);
		page_rec_set_n_owned<true>(block, up_rec, up_n_owned
					   + (PAGE_DIR_SLOT_MIN_N_OWNED - 1),
					   true, mtr);
		/* Shift the slots */
		page_dir_slot_t* last_slot = page_dir_get_nth_slot(
			block->frame, n_slots - 1);
		memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot,
				   slot - last_slot);
		constexpr uint16_t n_slots_f = PAGE_N_DIR_SLOTS + PAGE_HEADER;
		byte *n_slots_p= my_assume_aligned<2>
			(n_slots_f + block->frame);
		mtr->write<2>(*block, n_slots_p, n_slots - 1);
		memcpy_aligned<2>(n_slots_f + block->page.zip.data,
				  n_slots_p, 2);
		memset_aligned<2>(last_slot, 0, 2);
		return;
	}

	/* Transfer one record to the underfilled slot */
	page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr);
	rec_t* new_rec = rec_get_next_ptr(slot_rec, TRUE);
	page_rec_set_n_owned<true>(block, new_rec,
				   PAGE_DIR_SLOT_MIN_N_OWNED,
				   true, mtr);
	mach_write_to_2(slot, page_offset(new_rec));
	page_rec_set_n_owned(up_rec, up_n_owned - 1, true);
}

/**
Try to balance an underfilled directory slot with an adjacent one,
so that there are at least the minimum number of records owned by the slot;
this may result in merging the two slots.
@param[in,out]	block		index page
@param[in]	s		the slot to be balanced */
static void page_dir_balance_slot(const buf_block_t &block, ulint s)
{
	const bool comp= page_is_comp(block.frame);
	ut_ad(!block.page.zip.data);
	ut_ad(s > 0);

	const ulint n_slots = page_dir_get_n_slots(block.frame);

	if (UNIV_UNLIKELY(s + 1 == n_slots)) {
		/* The last directory slot cannot be balanced. */
		return;
	}

	ut_ad(s < n_slots);

	page_dir_slot_t* slot = page_dir_get_nth_slot(block.frame, s);
	rec_t* const up_rec = const_cast<rec_t*>
		(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE));
	rec_t* const slot_rec = const_cast<rec_t*>
		(page_dir_slot_get_rec(slot));
	const ulint up_n_owned = comp
		? rec_get_n_owned_new(up_rec)
		: rec_get_n_owned_old(up_rec);

	ut_ad(page_dir_slot_get_n_owned(slot)
	      == PAGE_DIR_SLOT_MIN_N_OWNED - 1);

	if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
		compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1
				    <= PAGE_DIR_SLOT_MAX_N_OWNED);
		/* Merge the slots. */
		page_rec_set_n_owned(slot_rec, 0, comp);
		page_rec_set_n_owned(up_rec, up_n_owned
				     + (PAGE_DIR_SLOT_MIN_N_OWNED - 1), comp);
		/* Shift the slots */
		page_dir_slot_t* last_slot = page_dir_get_nth_slot(
			block.frame, n_slots - 1);
		memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot,
				   slot - last_slot);
		memset_aligned<2>(last_slot, 0, 2);
		constexpr uint16_t n_slots_f = PAGE_N_DIR_SLOTS + PAGE_HEADER;
		byte *n_slots_p= my_assume_aligned<2>
			(n_slots_f + block.frame);
		mach_write_to_2(n_slots_p, n_slots - 1);
		return;
	}

	/* Transfer one record to the underfilled slot */
	rec_t* new_rec;

	if (comp) {
		page_rec_set_n_owned(slot_rec, 0, true);
		new_rec = rec_get_next_ptr(slot_rec, TRUE);
		page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED, true);
		page_rec_set_n_owned(up_rec, up_n_owned - 1, true);
	} else {
		page_rec_set_n_owned(slot_rec, 0, false);
		new_rec = rec_get_next_ptr(slot_rec, FALSE);
		page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED,
				     false);
		page_rec_set_n_owned(up_rec, up_n_owned - 1, false);
	}

	mach_write_to_2(slot, page_offset(new_rec));
}
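
/* Numeric sketch of the balancing above, assuming
PAGE_DIR_SLOT_MIN_N_OWNED is 4 and PAGE_DIR_SLOT_MAX_N_OWNED is 8: the
slot has dropped to owning 4 - 1 = 3 records. If the neighbouring slot
owns no more than 4 records, the two are merged into one slot owning at
most 3 + 4 = 7 <= 8 records and the directory shrinks by one slot.
Otherwise one record is transferred, leaving the underfilled slot with
the minimum of 4 and the neighbour with one record fewer. */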

/** Allocate space for inserting an index record.
@tparam compressed  whether to also update the ROW_FORMAT=COMPRESSED page
@param[in,out]	block		index page
@param[in]	need		number of bytes needed
@param[out]	heap_no		record heap number
@return	pointer to the start of the allocated buffer
@retval	NULL	if allocation fails */
template<bool compressed=false>
static byte* page_mem_alloc_heap(buf_block_t *block, ulint need,
                                 ulint *heap_no)
{
  ut_ad(!compressed || block->page.zip.data);

  byte *heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER +
                                       block->frame);

  const uint16_t top= mach_read_from_2(heap_top);

  if (need > page_get_max_insert_size(block->frame, 1))
    return NULL;

  byte *n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER + block->frame);

  const uint16_t h= mach_read_from_2(n_heap);
  if (UNIV_UNLIKELY((h + 1) & 0x6000))
  {
    /* At the minimum record size of 5+2 bytes, we can only reach this
    condition when using innodb_page_size=64k. */
    ut_ad((h & 0x7fff) == 8191);
    ut_ad(srv_page_size == 65536);
    return NULL;
  }

  *heap_no= h & 0x7fff;
  ut_ad(*heap_no < srv_page_size / REC_N_NEW_EXTRA_BYTES);
  compile_time_assert(UNIV_PAGE_SIZE_MAX / REC_N_NEW_EXTRA_BYTES < 0x3fff);

  mach_write_to_2(heap_top, top + need);
  mach_write_to_2(n_heap, h + 1);

  if (compressed)
  {
    ut_ad(h & 0x8000);
    memcpy_aligned<4>(&block->page.zip.data[PAGE_HEAP_TOP + PAGE_HEADER],
                      heap_top, 4);
  }

  return &block->frame[top];
}
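
/* Illustration of the bookkeeping above (all offsets invented): if
PAGE_HEAP_TOP is 1200, PAGE_N_HEAP is 42 and need = 30 bytes fit, the
function returns frame + 1200, advances PAGE_HEAP_TOP to 1230, bumps
PAGE_N_HEAP to 43 and reports *heap_no = 42. Bit 15 of PAGE_N_HEAP is the
ROW_FORMAT flag, which is why the heap number is masked with 0x7fff and
the overflow check tests the 0x6000 bits of the incremented value. */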

/** Write log for inserting a B-tree or R-tree record in
ROW_FORMAT=REDUNDANT.
@param block      B-tree or R-tree page
@param reuse      false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
@param prev_rec   byte offset of the predecessor of the record to insert,
                  starting from PAGE_OLD_INFIMUM
@param info_bits  info_bits of the record
@param n_fields_s number of fields << 1 | rec_get_1byte_offs_flag()
@param hdr_c      number of common record header bytes with prev_rec
@param data_c     number of common data bytes with prev_rec
@param hdr        record header bytes to copy to the log
@param hdr_l      number of copied record header bytes
@param data       record payload bytes to copy to the log
@param data_l     number of copied record data bytes */
inline void mtr_t::page_insert(const buf_block_t &block, bool reuse,
                               ulint prev_rec, byte info_bits,
                               ulint n_fields_s, size_t hdr_c, size_t data_c,
                               const byte *hdr, size_t hdr_l,
                               const byte *data, size_t data_l)
{
  ut_ad(!block.page.zip.data);
  ut_ad(m_log_mode == MTR_LOG_ALL);
  ut_d(ulint n_slots= page_dir_get_n_slots(block.frame));
  ut_ad(n_slots >= 2);
  ut_d(const byte *page_end= page_dir_get_nth_slot(block.frame, n_slots - 1));
  ut_ad(&block.frame[prev_rec + PAGE_OLD_INFIMUM] <= page_end);
  ut_ad(block.frame + page_header_get_offs(block.frame, PAGE_HEAP_TOP) <=
        page_end);
  ut_ad(fil_page_index_page_check(block.frame));
  ut_ad(!(~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG) & info_bits));
  ut_ad(n_fields_s >= 2);
  ut_ad((n_fields_s >> 1) <= REC_MAX_N_FIELDS);
  ut_ad(data_l + data_c <= REDUNDANT_REC_MAX_DATA_SIZE);

  set_modified(block);

  static_assert(REC_INFO_MIN_REC_FLAG == 0x10, "compatibility");
  static_assert(REC_INFO_DELETED_FLAG == 0x20, "compatibility");
  n_fields_s= (n_fields_s - 2) << 2 | info_bits >> 4;

  size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
  static_assert((REC_MAX_N_FIELDS << 1 | 1) <= MIN_3BYTE, "compatibility");
  len+= n_fields_s < MIN_2BYTE ? 1 : 2;
  len+= hdr_c < MIN_2BYTE ? 1 : 2;
  static_assert(REDUNDANT_REC_MAX_DATA_SIZE <= MIN_3BYTE, "compatibility");
  len+= data_c < MIN_2BYTE ? 1 : 2;
  len+= hdr_l + data_l;

  const bool small= len < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5);
  byte *l= log_write<EXTENDED>(block.page.id(), &block.page, len, small);

  if (UNIV_LIKELY(small))
  {
    ut_d(const byte * const end = l + len);
    *l++= reuse ? INSERT_REUSE_REDUNDANT : INSERT_HEAP_REDUNDANT;
    l= mlog_encode_varint(l, prev_rec);
    l= mlog_encode_varint(l, n_fields_s);
    l= mlog_encode_varint(l, hdr_c);
    l= mlog_encode_varint(l, data_c);
    ::memcpy(l, hdr, hdr_l);
    l+= hdr_l;
    ::memcpy(l, data, data_l);
    l+= data_l;
    ut_ad(end == l);
    m_log.close(l);
  }
  else
  {
    m_log.close(l);
    l= m_log.open(len - hdr_l - data_l);
    ut_d(const byte * const end = l + len - hdr_l - data_l);
    *l++= reuse ? INSERT_REUSE_REDUNDANT : INSERT_HEAP_REDUNDANT;
    l= mlog_encode_varint(l, prev_rec);
    l= mlog_encode_varint(l, n_fields_s);
    l= mlog_encode_varint(l, hdr_c);
    l= mlog_encode_varint(l, data_c);
    ut_ad(end == l);
    m_log.close(l);
    m_log.push(hdr, static_cast<uint32_t>(hdr_l));
    m_log.push(data, static_cast<uint32_t>(data_l));
  }

  m_last_offset= FIL_PAGE_TYPE;
}
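
/* Layout of the INSERT_HEAP_REDUNDANT/INSERT_REUSE_REDUNDANT record
written above, as a descriptive sketch inferred from the code (not a
normative format definition):

	subtype byte
	varint	prev_rec	predecessor offset from PAGE_OLD_INFIMUM
	varint	n_fields_s	field count, offset-size flag and info bits
	varint	hdr_c		header bytes shared with prev_rec
	varint	data_c		data bytes shared with prev_rec
	hdr_l bytes		remaining record header
	data_l bytes		remaining record payload

Recovery can rebuild the full record by combining the hdr_c and data_c
bytes it copies from prev_rec with the literal bytes in the log. */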

/** Write log for inserting a B-tree or R-tree record in
ROW_FORMAT=COMPACT or ROW_FORMAT=DYNAMIC.
@param block       B-tree or R-tree page
@param reuse       false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
@param prev_rec    byte offset of the predecessor of the record to insert,
                   starting from PAGE_NEW_INFIMUM
@param info_status rec_get_info_and_status_bits()
@param shift       unless !reuse: number of bytes the PAGE_FREE is moving
@param hdr_c       number of common record header bytes with prev_rec
@param data_c      number of common data bytes with prev_rec
@param hdr         record header bytes to copy to the log
@param hdr_l       number of copied record header bytes
@param data        record payload bytes to copy to the log
@param data_l      number of copied record data bytes */
inline void mtr_t::page_insert(const buf_block_t &block, bool reuse,
                               ulint prev_rec, byte info_status,
                               ssize_t shift, size_t hdr_c, size_t data_c,
                               const byte *hdr, size_t hdr_l,
                               const byte *data, size_t data_l)
{
  ut_ad(!block.page.zip.data);
  ut_ad(m_log_mode == MTR_LOG_ALL);
  ut_d(ulint n_slots= page_dir_get_n_slots(block.frame));
  ut_ad(n_slots >= 2);
  ut_d(const byte *page_end= page_dir_get_nth_slot(block.frame, n_slots - 1));
  ut_ad(&block.frame[prev_rec + PAGE_NEW_INFIMUM] <= page_end);
  ut_ad(block.frame + page_header_get_offs(block.frame, PAGE_HEAP_TOP) <=
        page_end);
  ut_ad(fil_page_index_page_check(block.frame));
  ut_ad(hdr_l + hdr_c + data_l + data_c <=
        static_cast<size_t>(page_end - &block.frame[PAGE_NEW_SUPREMUM_END]));
  ut_ad(reuse || shift == 0);
#ifdef UNIV_DEBUG
  switch (~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG) & info_status) {
  default:
    ut_ad(0);
    break;
  case REC_STATUS_NODE_PTR:
    ut_ad(!page_is_leaf(block.frame));
    break;
  case REC_STATUS_INSTANT:
  case REC_STATUS_ORDINARY:
    ut_ad(page_is_leaf(block.frame));
  }
#endif

  set_modified(block);

  static_assert(REC_INFO_MIN_REC_FLAG == 0x10, "compatibility");
  static_assert(REC_INFO_DELETED_FLAG == 0x20, "compatibility");
  static_assert(REC_STATUS_INSTANT == 4, "compatibility");

  const size_t enc_hdr_l= hdr_l << 3 |
    (info_status & REC_STATUS_INSTANT) | info_status >> 4;
  size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
  static_assert(REC_MAX_N_FIELDS * 2 < MIN_3BYTE, "compatibility");
  if (reuse)
  {
    if (shift < 0)
      shift= -shift << 1 | 1;
    else
      shift<<= 1;
    len+= static_cast<size_t>(shift) < MIN_2BYTE
      ? 1 : static_cast<size_t>(shift) < MIN_3BYTE ? 2 : 3;
  }
  ut_ad(hdr_c + hdr_l <= REC_MAX_N_FIELDS * 2);
  len+= hdr_c < MIN_2BYTE ? 1 : 2;
  len+= enc_hdr_l < MIN_2BYTE ? 1 : enc_hdr_l < MIN_3BYTE ? 2 : 3;
  len+= data_c < MIN_2BYTE ? 1 : data_c < MIN_3BYTE ? 2 : 3;
  len+= hdr_l + data_l;

  const bool small= len < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5);
  byte *l= log_write<EXTENDED>(block.page.id(), &block.page, len, small);

  if (UNIV_LIKELY(small))
  {
    ut_d(const byte * const end = l + len);
    *l++= reuse ? INSERT_REUSE_DYNAMIC : INSERT_HEAP_DYNAMIC;
    l= mlog_encode_varint(l, prev_rec);
    if (reuse)
      l= mlog_encode_varint(l, shift);
    l= mlog_encode_varint(l, enc_hdr_l);
    l= mlog_encode_varint(l, hdr_c);
    l= mlog_encode_varint(l, data_c);
    ::memcpy(l, hdr, hdr_l);
    l+= hdr_l;
    ::memcpy(l, data, data_l);
    l+= data_l;
    ut_ad(end == l);
    m_log.close(l);
  }
  else
  {
    m_log.close(l);
    l= m_log.open(len - hdr_l - data_l);
    ut_d(const byte * const end = l + len - hdr_l - data_l);
    *l++= reuse ? INSERT_REUSE_DYNAMIC : INSERT_HEAP_DYNAMIC;
    l= mlog_encode_varint(l, prev_rec);
    if (reuse)
      l= mlog_encode_varint(l, shift);
    l= mlog_encode_varint(l, enc_hdr_l);
    l= mlog_encode_varint(l, hdr_c);
    l= mlog_encode_varint(l, data_c);
    ut_ad(end == l);
    m_log.close(l);
    m_log.push(hdr, static_cast<uint32_t>(hdr_l));
    m_log.push(data, static_cast<uint32_t>(data_l));
  }

  m_last_offset= FIL_PAGE_TYPE;
}
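
/* Example of the enc_hdr_l packing above: hdr_l occupies the bits from 3
upwards, bit 2 carries REC_STATUS_INSTANT, and bits 0..1 receive
REC_INFO_MIN_REC_FLAG and REC_INFO_DELETED_FLAG shifted down from bits
4..5 of info_status. For hdr_l = 6 on a delete-marked ordinary record
(info_status = 0x20), enc_hdr_l = 6 << 3 | 0 | 0x20 >> 4 = 0x32. */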
1280 
1281 /***********************************************************//**
1282 Inserts a record next to page cursor on an uncompressed page.
1283 Returns pointer to inserted record if succeed, i.e., enough
1284 space available, NULL otherwise. The cursor stays at the same position.
1285 @return pointer to record if succeed, NULL otherwise */
1286 rec_t*
page_cur_insert_rec_low(const page_cur_t * cur,dict_index_t * index,const rec_t * rec,rec_offs * offsets,mtr_t * mtr)1287 page_cur_insert_rec_low(
1288 /*====================*/
1289 	const page_cur_t*cur,	/*!< in: page cursor */
1290 	dict_index_t*	index,	/*!< in: record descriptor */
1291 	const rec_t*	rec,	/*!< in: record to insert after cur */
1292 	rec_offs*	offsets,/*!< in/out: rec_get_offsets(rec, index) */
1293 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1294 {
1295   buf_block_t* block= cur->block;
1296 
1297   ut_ad(rec_offs_validate(rec, index, offsets));
1298   ut_ad(rec_offs_n_fields(offsets) > 0);
1299   ut_ad(index->table->not_redundant() == !!page_is_comp(block->frame));
1300   ut_ad(!!page_is_comp(block->frame) == !!rec_offs_comp(offsets));
1301   ut_ad(fil_page_index_page_check(block->frame));
1302   ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + block->frame) ==
1303         index->id ||
1304         mtr->is_inside_ibuf());
1305   ut_ad(page_dir_get_n_slots(block->frame) >= 2);
1306 
1307   ut_ad(!page_rec_is_supremum(cur->rec));
1308 
1309   /* We should not write log for ROW_FORMAT=COMPRESSED pages here. */
1310   ut_ad(mtr->get_log_mode() != MTR_LOG_ALL ||
1311         !(index->table->flags & DICT_TF_MASK_ZIP_SSIZE));
1312 
1313   /* 1. Get the size of the physical record in the page */
1314   const ulint rec_size= rec_offs_size(offsets);
1315 
1316 #ifdef HAVE_MEM_CHECK
1317   {
1318     const void *rec_start __attribute__((unused))=
1319       rec - rec_offs_extra_size(offsets);
1320     ulint extra_size __attribute__((unused))=
1321       rec_offs_extra_size(offsets) -
1322       (page_is_comp(block->frame)
1323        ? REC_N_NEW_EXTRA_BYTES
1324        : REC_N_OLD_EXTRA_BYTES);
1325     /* All data bytes of the record must be valid. */
1326     MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
1327     /* The variable-length header must be valid. */
1328     MEM_CHECK_DEFINED(rec_start, extra_size);
1329   }
1330 #endif /* HAVE_MEM_CHECK */
1331 
1332   /* 2. Try to find suitable space from page memory management */
1333   bool reuse= false;
1334   ssize_t free_offset= 0;
1335   ulint heap_no;
1336   byte *insert_buf;
1337 
1338   const bool comp= page_is_comp(block->frame);
1339   const ulint extra_size= rec_offs_extra_size(offsets);
1340 
1341   if (rec_t* free_rec= page_header_get_ptr(block->frame, PAGE_FREE))
1342   {
1343     /* Try to reuse the head of PAGE_FREE. */
1344     rec_offs foffsets_[REC_OFFS_NORMAL_SIZE];
1345     mem_heap_t *heap= nullptr;
1346 
1347     rec_offs_init(foffsets_);
1348 
1349     rec_offs *foffsets= rec_get_offsets(free_rec, index, foffsets_,
1350                                         page_is_leaf(block->frame)
1351                                         ? index->n_core_fields : 0,
1352                                         ULINT_UNDEFINED, &heap);
1353     const ulint fextra_size= rec_offs_extra_size(foffsets);
1354     insert_buf= free_rec - fextra_size;
1355     const bool too_small= (fextra_size + rec_offs_data_size(foffsets)) <
1356       rec_size;
1357     if (UNIV_LIKELY_NULL(heap))
1358       mem_heap_free(heap);
1359 
1360     if (too_small)
1361       goto use_heap;
1362 
1363     byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
1364                                           block->frame);
1365     if (comp)
1366     {
1367       heap_no= rec_get_heap_no_new(free_rec);
1368       uint16_t next= mach_read_from_2(free_rec - REC_NEXT);
1369       mach_write_to_2(page_free, next
1370                       ? static_cast<uint16_t>(free_rec + next - block->frame)
1371                       : 0);
1372     }
1373     else
1374     {
1375       heap_no= rec_get_heap_no_old(free_rec);
1376       memcpy(page_free, free_rec - REC_NEXT, 2);
1377     }
1378 
1379     static_assert(PAGE_GARBAGE == PAGE_FREE + 2, "compatibility");
1380 
1381     byte *page_garbage= my_assume_aligned<2>(page_free + 2);
1382     ut_ad(mach_read_from_2(page_garbage) >= rec_size);
1383     mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) - rec_size);
1384     reuse= true;
1385     free_offset= extra_size - fextra_size;
1386   }
1387   else
1388   {
1389 use_heap:
1390     insert_buf= page_mem_alloc_heap(block, rec_size, &heap_no);
1391 
1392     if (UNIV_UNLIKELY(!insert_buf))
1393       return nullptr;
1394   }
1395 
1396   ut_ad(cur->rec != insert_buf + extra_size);
1397 
1398   rec_t *next_rec= block->frame + rec_get_next_offs(cur->rec, comp);
1399   ut_ad(next_rec != block->frame);
1400 
1401   /* Update page header fields */
1402   byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
1403                                                block->frame);
1404   const uint16_t last_insert= mach_read_from_2(page_last_insert);
1405   ut_ad(!last_insert || !comp ||
1406         rec_get_node_ptr_flag(block->frame + last_insert) ==
1407         rec_get_node_ptr_flag(rec));
1408 
1409   /* Write PAGE_LAST_INSERT */
1410   mach_write_to_2(page_last_insert, page_offset(insert_buf + extra_size));
1411 
1412   /* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
1413   if (block->frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
1414   {
1415     byte *dir= &block->frame[PAGE_DIRECTION_B + PAGE_HEADER];
1416     byte *n= my_assume_aligned<2>
1417       (&block->frame[PAGE_N_DIRECTION + PAGE_HEADER]);
1418     if (UNIV_UNLIKELY(!last_insert))
1419     {
1420 no_direction:
1421       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION);
1422       memset(n, 0, 2);
1423     }
1424     else if (block->frame + last_insert == cur->rec &&
1425              (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
1426     {
1427       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_RIGHT);
1428 inc_dir:
1429       mach_write_to_2(n, mach_read_from_2(n) + 1);
1430     }
1431     else if (next_rec == block->frame + last_insert &&
1432              (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
1433     {
1434       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_LEFT);
1435       goto inc_dir;
1436     }
1437     else
1438       goto no_direction;
1439   }
1440 
1441   /* Update PAGE_N_RECS. */
1442   byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
1443                                           block->frame);
1444 
1445   mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
1446 
1447   /* Update the preceding record header, the 'owner' record and
1448   prepare the record to insert. */
1449   rec_t *insert_rec= insert_buf + extra_size;
1450   const ulint data_size= rec_offs_data_size(offsets);
1451   memcpy(insert_buf, rec - extra_size, extra_size + data_size);
1452   size_t hdr_common= 0;
1453   ulint n_owned;
1454   const byte info_status= static_cast<byte>
1455     (rec_get_info_and_status_bits(rec, comp));
1456   ut_ad(!(rec_get_info_bits(rec, comp) &
1457           ~(REC_INFO_DELETED_FLAG | REC_INFO_MIN_REC_FLAG)));
1458 
1459   if (comp)
1460   {
1461 #ifdef UNIV_DEBUG
1462     switch (rec_get_status(cur->rec)) {
1463     case REC_STATUS_ORDINARY:
1464     case REC_STATUS_NODE_PTR:
1465     case REC_STATUS_INSTANT:
1466     case REC_STATUS_INFIMUM:
1467       break;
1468     case REC_STATUS_SUPREMUM:
1469       ut_ad("wrong status on cur->rec" == 0);
1470     }
1471     switch (rec_get_status(rec)) {
1472     case REC_STATUS_NODE_PTR:
1473       ut_ad(!page_is_leaf(block->frame));
1474       break;
1475     case REC_STATUS_INSTANT:
1476       ut_ad(index->is_instant());
1477       ut_ad(page_is_leaf(block->frame));
1478       if (!rec_is_metadata(rec, true))
1479         break;
1480       ut_ad(cur->rec == &block->frame[PAGE_NEW_INFIMUM]);
1481       break;
1482     case REC_STATUS_ORDINARY:
1483       ut_ad(page_is_leaf(block->frame));
1484       ut_ad(!(rec_get_info_bits(rec, true) & ~REC_INFO_DELETED_FLAG));
1485       break;
1486     case REC_STATUS_INFIMUM:
1487     case REC_STATUS_SUPREMUM:
1488       ut_ad("wrong status on rec" == 0);
1489     }
1490     ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1491 #endif
1492 
1493     rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
1494                         REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1495     insert_rec[-REC_NEW_STATUS]= rec[-REC_NEW_STATUS];
1496     rec_set_bit_field_2(insert_rec, heap_no,
1497                         REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
1498     mach_write_to_2(insert_rec - REC_NEXT,
1499                     static_cast<uint16_t>(next_rec - insert_rec));
1500     mach_write_to_2(cur->rec - REC_NEXT,
1501                     static_cast<uint16_t>(insert_rec - cur->rec));
1502     while (!(n_owned= rec_get_n_owned_new(next_rec)))
1503     {
1504       next_rec= block->frame + rec_get_next_offs(next_rec, true);
1505       ut_ad(next_rec != block->frame);
1506     }
1507     rec_set_bit_field_1(next_rec, n_owned + 1, REC_NEW_N_OWNED,
1508                         REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1509     if (mtr->get_log_mode() != MTR_LOG_ALL)
1510     {
1511       mtr->set_modified(*block);
1512       goto copied;
1513     }
1514 
1515     const byte * const c_start= cur->rec - extra_size;
1516     if (extra_size > REC_N_NEW_EXTRA_BYTES &&
1517         c_start >=
1518         &block->frame[PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES])
1519     {
1520       /* Find common header bytes with the preceding record. */
1521       const byte *r= rec - (REC_N_NEW_EXTRA_BYTES + 1);
1522       for (const byte *c= cur->rec - (REC_N_NEW_EXTRA_BYTES + 1);
1523            *r == *c && c-- != c_start; r--);
1524       hdr_common= static_cast<size_t>((rec - (REC_N_NEW_EXTRA_BYTES + 1)) - r);
1525       ut_ad(hdr_common <= extra_size - REC_N_NEW_EXTRA_BYTES);
1526     }
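    /* The comparison starts at the byte just below the fixed
    REC_N_NEW_EXTRA_BYTES header bytes and proceeds towards lower
    addresses, so hdr_common is the length of the matching run adjacent
    to the fixed header; only the remaining header bytes are logged. */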
1527   }
1528   else
1529   {
1530 #ifdef UNIV_DEBUG
1531     if (!page_is_leaf(block->frame));
1532     else if (rec_is_metadata(rec, false))
1533     {
1534       ut_ad(index->is_instant());
1535       ut_ad(cur->rec == &block->frame[PAGE_OLD_INFIMUM]);
1536     }
1537 #endif
1538     rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
1539                         REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1540     rec_set_bit_field_2(insert_rec, heap_no,
1541                         REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
1542     memcpy(insert_rec - REC_NEXT, cur->rec - REC_NEXT, 2);
1543     mach_write_to_2(cur->rec - REC_NEXT, page_offset(insert_rec));
1544     while (!(n_owned= rec_get_n_owned_old(next_rec)))
1545     {
1546       next_rec= block->frame + rec_get_next_offs(next_rec, false);
1547       ut_ad(next_rec != block->frame);
1548     }
1549     rec_set_bit_field_1(next_rec, n_owned + 1, REC_OLD_N_OWNED,
1550                         REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1551     if (mtr->get_log_mode() != MTR_LOG_ALL)
1552     {
1553       mtr->set_modified(*block);
1554       goto copied;
1555     }
1556 
1557     ut_ad(extra_size > REC_N_OLD_EXTRA_BYTES);
1558     const byte * const c_start= cur->rec - extra_size;
1559     if (c_start >=
1560         &block->frame[PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES])
1561     {
1562       /* Find common header bytes with the preceding record. */
1563       const byte *r= rec - (REC_N_OLD_EXTRA_BYTES + 1);
1564       for (const byte *c= cur->rec - (REC_N_OLD_EXTRA_BYTES + 1);
1565            *r == *c && c-- != c_start; r--);
1566       hdr_common= static_cast<size_t>((rec - (REC_N_OLD_EXTRA_BYTES + 1)) - r);
1567       ut_ad(hdr_common <= extra_size - REC_N_OLD_EXTRA_BYTES);
1568     }
1569   }
1570 
1571   /* Insert the record, possibly copying from the preceding record. */
1572   ut_ad(mtr->get_log_mode() == MTR_LOG_ALL);
1573 
1574   {
1575     const byte *r= rec;
1576     const byte *c= cur->rec;
1577     const byte *c_end= cur->rec + data_size;
1578     if (c <= insert_buf && c_end > insert_buf)
1579       c_end= insert_buf;
1580     else
1581       c_end= std::min<const byte*>(c_end, block->frame + srv_page_size -
1582                                    PAGE_DIR - PAGE_DIR_SLOT_SIZE *
1583                                    page_dir_get_n_slots(block->frame));
1584     size_t data_common;
1585     /* Copy common data bytes of the preceding record. */
1586     for (; c != c_end && *r == *c; c++, r++);
1587     data_common= static_cast<size_t>(r - rec);
1588 
1589     if (comp)
1590       mtr->page_insert(*block, reuse,
1591                        cur->rec - block->frame - PAGE_NEW_INFIMUM,
1592                        info_status, free_offset, hdr_common, data_common,
1593                        insert_buf,
1594                        extra_size - hdr_common - REC_N_NEW_EXTRA_BYTES,
1595                        r, data_size - data_common);
1596     else
1597       mtr->page_insert(*block, reuse,
1598                        cur->rec - block->frame - PAGE_OLD_INFIMUM,
1599                        info_status, rec_get_n_fields_old(insert_rec) << 1 |
1600                        rec_get_1byte_offs_flag(insert_rec),
1601                        hdr_common, data_common,
1602                        insert_buf,
1603                        extra_size - hdr_common - REC_N_OLD_EXTRA_BYTES,
1604                        r, data_size - data_common);
1605   }
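  /* Only the bytes that differ from the preceding record are logged:
  hdr_common header bytes and data_common data bytes are omitted from
  the INSERT record, and recovery re-reads them from the predecessor
  that is already on the page.  E.g. if adjacent records share a 3-byte
  data prefix, those 3 bytes never enter the redo log. */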
1606 
1607 copied:
1608   ut_ad(!memcmp(insert_buf, rec - extra_size, extra_size -
1609                 (comp ? REC_N_NEW_EXTRA_BYTES : REC_N_OLD_EXTRA_BYTES)));
1610   ut_ad(!memcmp(insert_rec, rec, data_size));
1611   /* We have incremented the n_owned field of the owner record.
1612   If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED, we have to split the
1613   corresponding directory slot in two. */
1614 
1615   if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
1616   {
1617     const auto owner= page_dir_find_owner_slot(next_rec);
1618     page_dir_split_slot(*block, page_dir_get_nth_slot(block->frame, owner));
1619   }
1620 
1621   rec_offs_make_valid(insert_buf + extra_size, index,
1622                       page_is_leaf(block->frame), offsets);
1623   return insert_buf + extra_size;
1624 }
1625 
1626 /** Add a slot to the dense page directory.
1627 @param[in,out]  block   ROW_FORMAT=COMPRESSED page
1628 @param[in]      index   the index that the page belongs to
1629 @param[in,out]  mtr     mini-transaction */
1630 static inline void page_zip_dir_add_slot(buf_block_t *block,
1631                                          const dict_index_t *index, mtr_t *mtr)
1632 {
1633   page_zip_des_t *page_zip= &block->page.zip;
1634 
1635   ut_ad(page_is_comp(page_zip->data));
1636   MEM_CHECK_DEFINED(page_zip->data, page_zip_get_size(page_zip));
1637 
1638   /* Read the old n_dense (n_heap has already been incremented). */
1639   ulint n_dense= page_dir_get_n_heap(page_zip->data) - (PAGE_HEAP_NO_USER_LOW +
1640                                                         1U);
1641 
1642   byte *dir= page_zip->data + page_zip_get_size(page_zip) -
1643     PAGE_ZIP_DIR_SLOT_SIZE * n_dense;
1644   byte *stored= dir;
1645 
1646   if (!page_is_leaf(page_zip->data))
1647   {
1648     ut_ad(!page_zip->n_blobs);
1649     stored-= n_dense * REC_NODE_PTR_SIZE;
1650   }
1651   else if (index->is_clust())
1652   {
1653     /* Move the BLOB pointer array backwards to make space for the
1654     columns DB_TRX_ID,DB_ROLL_PTR and the dense directory slot. */
1655 
1656     stored-= n_dense * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1657     byte *externs= stored - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
1658     byte *dst= externs - PAGE_ZIP_CLUST_LEAF_SLOT_SIZE;
1659     ut_ad(!memcmp(dst, field_ref_zero, PAGE_ZIP_CLUST_LEAF_SLOT_SIZE));
1660     if (const ulint len = ulint(stored - externs))
1661     {
1662       memmove(dst, externs, len);
1663       mtr->memmove(*block, dst - page_zip->data, externs - page_zip->data,
1664                    len);
1665     }
1666   }
1667   else
1668   {
1669     stored-= page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
1670     ut_ad(!memcmp(stored - PAGE_ZIP_DIR_SLOT_SIZE, field_ref_zero,
1671                   PAGE_ZIP_DIR_SLOT_SIZE));
1672   }
1673 
1674   /* Move the uncompressed area backwards to make space
1675   for one directory slot. */
1676   if (const ulint len = ulint(dir - stored))
1677   {
1678     byte* dst = stored - PAGE_ZIP_DIR_SLOT_SIZE;
1679     memmove(dst, stored, len);
1680     mtr->memmove(*block, dst - page_zip->data, stored - page_zip->data, len);
1681   }
1682 }
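/* For reference, the uncompressed trailer of a ROW_FORMAT=COMPRESSED
page grows downwards from the end of the frame, roughly:
  ... free space | BLOB pointers | DB_TRX_ID,DB_ROLL_PTR columns or
  node pointers | dense directory (one slot per heap record)
page_zip_dir_add_slot() shifts everything below the dense directory
towards lower addresses by PAGE_ZIP_DIR_SLOT_SIZE bytes. */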
1683 
1684 /***********************************************************//**
1685 Inserts a record next to the page cursor on a compressed page and its
1686 uncompressed copy. Returns a pointer to the inserted record if there is
1687 enough space available, NULL otherwise.
1688 The cursor stays at the same position.
1689 
1690 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1691 if this is a compressed leaf page in a secondary index.
1692 This has to be done either within the same mini-transaction,
1693 or by invoking ibuf_reset_free_bits() before mtr_commit().
1694 
1695 @return pointer to the inserted record on success, NULL otherwise */
1696 rec_t*
1697 page_cur_insert_rec_zip(
1698 /*====================*/
1699 	page_cur_t*	cursor,	/*!< in/out: page cursor */
1700 	dict_index_t*	index,	/*!< in: record descriptor */
1701 	const rec_t*	rec,	/*!< in: pointer to a physical record */
1702 	rec_offs*	offsets,/*!< in/out: rec_get_offsets(rec, index) */
1703 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1704 {
1705   page_zip_des_t * const page_zip= page_cur_get_page_zip(cursor);
1706   ut_ad(page_zip);
1707   ut_ad(rec_offs_validate(rec, index, offsets));
1708 
1709   ut_ad(index->table->not_redundant());
1710   ut_ad(page_is_comp(cursor->block->frame));
1711   ut_ad(rec_offs_comp(offsets));
1712   ut_ad(fil_page_get_type(cursor->block->frame) == FIL_PAGE_INDEX ||
1713         fil_page_get_type(cursor->block->frame) == FIL_PAGE_RTREE);
1714   ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + cursor->block->frame) ==
1715         index->id || mtr->is_inside_ibuf());
1716   ut_ad(!page_get_instant(cursor->block->frame));
1717   ut_ad(!page_cur_is_after_last(cursor));
1718 #ifdef UNIV_ZIP_DEBUG
1719   ut_a(page_zip_validate(page_zip, cursor->block->frame, index));
1720 #endif /* UNIV_ZIP_DEBUG */
1721 
1722   /* 1. Get the size of the physical record in the page */
1723   const ulint rec_size= rec_offs_size(offsets);
1724 
1725 #ifdef HAVE_MEM_CHECK
1726   {
1727     const void *rec_start __attribute__((unused))=
1728       rec - rec_offs_extra_size(offsets);
1729     ulint extra_size __attribute__((unused))=
1730       rec_offs_extra_size(offsets) - REC_N_NEW_EXTRA_BYTES;
1731     /* All data bytes of the record must be valid. */
1732     MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
1733     /* The variable-length header must be valid. */
1734     MEM_CHECK_DEFINED(rec_start, extra_size);
1735   }
1736 #endif /* HAVE_MEM_CHECK */
1737   const bool reorg_before_insert= page_has_garbage(cursor->block->frame) &&
1738     rec_size > page_get_max_insert_size(cursor->block->frame, 1) &&
1739     rec_size <= page_get_max_insert_size_after_reorganize(cursor->block->frame,
1740                                                           1);
1741   constexpr uint16_t page_free_f= PAGE_FREE + PAGE_HEADER;
1742   byte* const page_free = my_assume_aligned<4>(page_free_f +
1743                                                cursor->block->frame);
1744   uint16_t free_rec= 0;
1745 
1746   /* 2. Try to find suitable space from page memory management */
1747   ulint heap_no;
1748   byte *insert_buf;
1749 
1750   if (reorg_before_insert ||
1751       !page_zip_available(page_zip, index->is_clust(), rec_size, 1))
1752   {
1753     /* SET GLOBAL might be executed concurrently. Sample the value once. */
1754     ulint level= page_zip_level;
1755 #ifdef UNIV_DEBUG
1756     const rec_t * const cursor_rec= page_cur_get_rec(cursor);
1757 #endif /* UNIV_DEBUG */
1758 
1759     if (page_is_empty(cursor->block->frame))
1760     {
1761       ut_ad(page_cur_is_before_first(cursor));
1762 
1763       /* This is an empty page. Recreate to remove the modification log. */
1764       page_create_zip(cursor->block, index,
1765                       page_header_get_field(cursor->block->frame, PAGE_LEVEL),
1766                       0, mtr);
1767       ut_ad(!page_header_get_ptr(cursor->block->frame, PAGE_FREE));
1768 
1769       if (page_zip_available(page_zip, index->is_clust(), rec_size, 1))
1770         goto use_heap;
1771 
1772       /* The cursor should remain on the page infimum. */
1773       return nullptr;
1774     }
1775 
1776     if (page_zip->m_nonempty || page_has_garbage(cursor->block->frame))
1777     {
1778       ulint pos= page_rec_get_n_recs_before(cursor->rec);
1779 
1780       if (!page_zip_reorganize(cursor->block, index, level, mtr, true))
1781       {
1782         ut_ad(cursor->rec == cursor_rec);
1783         return nullptr;
1784       }
1785 
1786       if (pos)
1787         cursor->rec= page_rec_get_nth(cursor->block->frame, pos);
1788       else
1789         ut_ad(cursor->rec == page_get_infimum_rec(cursor->block->frame));
1790 
1791       ut_ad(!page_header_get_ptr(cursor->block->frame, PAGE_FREE));
1792 
1793       if (page_zip_available(page_zip, index->is_clust(), rec_size, 1))
1794         goto use_heap;
1795     }
1796 
1797     /* Try compressing the whole page afterwards. */
1798     const mtr_log_t log_mode= mtr->set_log_mode(MTR_LOG_NONE);
1799     rec_t *insert_rec= page_cur_insert_rec_low(cursor, index, rec, offsets,
1800                                                mtr);
1801     mtr->set_log_mode(log_mode);
1802 
1803     if (insert_rec)
1804     {
1805       ulint pos= page_rec_get_n_recs_before(insert_rec);
1806       ut_ad(pos > 0);
1807 
1808       /* We are writing entire page images to the log.  Reduce the redo
1809       log volume by reorganizing the page at the same time. */
1810       if (page_zip_reorganize(cursor->block, index, level, mtr))
1811       {
1812         /* The page was reorganized: Seek to pos. */
1813         cursor->rec= pos > 1
1814           ? page_rec_get_nth(cursor->block->frame, pos - 1)
1815           : cursor->block->frame + PAGE_NEW_INFIMUM;
1816         insert_rec= cursor->block->frame + rec_get_next_offs(cursor->rec, 1);
1817         rec_offs_make_valid(insert_rec, index,
1818                             page_is_leaf(cursor->block->frame), offsets);
1819         return insert_rec;
1820       }
1821 
1822       /* Theoretically, we could try one last resort of
1823       page_zip_reorganize() followed by page_zip_available(), but that
1824       would be very unlikely to succeed. (If compressing the fully
1825       reorganized page already failed, why would compressing it again,
1826       plus logging the insert of this record, succeed?) */
1827 
1828       /* Out of space: restore the page */
1829       if (!page_zip_decompress(page_zip, cursor->block->frame, false))
1830         ut_error; /* Memory corrupted? */
1831       ut_ad(page_validate(cursor->block->frame, index));
1832       insert_rec= nullptr;
1833     }
1834     return insert_rec;
1835   }
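  /* To recap the slow path above: (1) an empty page is re-created to
  discard the modification log; (2) otherwise the page is reorganized
  to reclaim space; (3) as a last resort, the record is inserted into
  the uncompressed copy with redo logging disabled, and
  page_zip_reorganize() recompresses and logs the whole page, restoring
  it via page_zip_decompress() if recompression fails. */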
1836 
1837   free_rec= mach_read_from_2(page_free);
1838   if (free_rec)
1839   {
1840     /* Try to allocate from the head of the free list. */
1841     rec_offs foffsets_[REC_OFFS_NORMAL_SIZE];
1842     mem_heap_t *heap= nullptr;
1843 
1844     rec_offs_init(foffsets_);
1845 
1846     rec_offs *foffsets= rec_get_offsets(cursor->block->frame + free_rec, index,
1847                                         foffsets_,
1848                                         page_is_leaf(cursor->block->frame)
1849                                         ? index->n_core_fields : 0,
1850                                         ULINT_UNDEFINED, &heap);
1851     insert_buf= cursor->block->frame + free_rec -
1852       rec_offs_extra_size(foffsets);
1853 
1854     if (rec_offs_size(foffsets) < rec_size)
1855     {
1856 too_small:
1857       if (UNIV_LIKELY_NULL(heap))
1858         mem_heap_free(heap);
1859       free_rec= 0;
1860       goto use_heap;
1861     }
1862 
1863     /* On compressed pages, do not relocate records from
1864     the free list. If extra_size would grow, use the heap. */
1865     const ssize_t extra_size_diff= lint(rec_offs_extra_size(offsets) -
1866                                         rec_offs_extra_size(foffsets));
1867 
1868     if (UNIV_UNLIKELY(extra_size_diff < 0))
1869     {
1870       /* The new header is smaller: advance insert_buf, keeping the record origin at free_rec. */
1871       if (rec_offs_size(foffsets) < rec_size - ssize_t(extra_size_diff))
1872         goto too_small;
1873 
1874       insert_buf-= extra_size_diff;
1875     }
1876     else if (UNIV_UNLIKELY(extra_size_diff))
1877       /* Do not allow extra_size to grow */
1878       goto too_small;
1879 
1880     byte *const free_rec_ptr= cursor->block->frame + free_rec;
1881     heap_no= rec_get_heap_no_new(free_rec_ptr);
1882     int16_t next_rec= mach_read_from_2(free_rec_ptr - REC_NEXT);
1883     /* With innodb_page_size=64k, int16_t would be unsafe to use here,
1884     but that cannot be used with ROW_FORMAT=COMPRESSED. */
1885     static_assert(UNIV_ZIP_SIZE_SHIFT_MAX == 14, "compatibility");
1886     if (next_rec)
1887     {
1888       next_rec= static_cast<int16_t>(next_rec + free_rec);
1889       ut_ad(int{PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES} <= next_rec);
1890       ut_ad(static_cast<uint16_t>(next_rec) < srv_page_size);
1891     }
1892 
1893     byte *hdr= my_assume_aligned<4>(&page_zip->data[page_free_f]);
1894     mach_write_to_2(hdr, static_cast<uint16_t>(next_rec));
1895     const byte *const garbage= my_assume_aligned<2>(page_free + 2);
1896     ut_ad(mach_read_from_2(garbage) >= rec_size);
1897     mach_write_to_2(my_assume_aligned<2>(hdr + 2),
1898                     mach_read_from_2(garbage) - rec_size);
1899     static_assert(PAGE_GARBAGE == PAGE_FREE + 2, "compatibility");
1900     mtr->memcpy(*cursor->block, page_free, hdr, 4);
1901 
1902     if (!page_is_leaf(cursor->block->frame))
1903     {
1904       /* Zero out the node pointer of free_rec, in case it will not be
1905       overwritten by insert_rec. */
1906       ut_ad(rec_size > REC_NODE_PTR_SIZE);
1907 
1908       if (rec_offs_size(foffsets) > rec_size)
1909         memset(rec_get_end(free_rec_ptr, foffsets) -
1910                REC_NODE_PTR_SIZE, 0, REC_NODE_PTR_SIZE);
1911     }
1912     else if (index->is_clust())
1913     {
1914       /* Zero out DB_TRX_ID,DB_ROLL_PTR in free_rec, in case they will
1915       not be overwritten by insert_rec. */
1916 
1917       ulint len;
1918       ulint trx_id_offs= rec_get_nth_field_offs(foffsets, index->db_trx_id(),
1919                                                 &len);
1920       ut_ad(len == DATA_TRX_ID_LEN);
1921 
1922       if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs +
1923           rec_offs_extra_size(foffsets) > rec_size)
1924         memset(free_rec_ptr + trx_id_offs, 0,
1925                DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1926 
1927       ut_ad(free_rec_ptr + trx_id_offs + DATA_TRX_ID_LEN ==
1928             rec_get_nth_field(free_rec_ptr, foffsets, index->db_roll_ptr(),
1929                               &len));
1930       ut_ad(len == DATA_ROLL_PTR_LEN);
1931     }
1932 
1933     if (UNIV_LIKELY_NULL(heap))
1934       mem_heap_free(heap);
1935   }
1936   else
1937   {
1938 use_heap:
1939     ut_ad(!free_rec);
1940     insert_buf= page_mem_alloc_heap<true>(cursor->block, rec_size, &heap_no);
1941 
1942     if (UNIV_UNLIKELY(!insert_buf))
1943       return insert_buf;
1944 
1945     static_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2, "compatibility");
1946     mtr->memcpy(*cursor->block, PAGE_HEAP_TOP + PAGE_HEADER, 4);
1947     page_zip_dir_add_slot(cursor->block, index, mtr);
1948   }
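  /* A record taken from PAGE_FREE is reused strictly in place; records
  are never relocated on a compressed page.  The header may shrink but
  not grow: e.g. old extra_size=7 and new extra_size=5 give
  extra_size_diff=-2, so insert_buf is advanced by 2 bytes and the
  record origin stays exactly at the old free_rec offset. */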
1949 
1950   /* 3. Create the record */
1951   byte *insert_rec= rec_copy(insert_buf, rec, offsets);
1952   rec_offs_make_valid(insert_rec, index, page_is_leaf(cursor->block->frame),
1953                       offsets);
1954 
1955   /* 4. Insert the record in the linked list of records */
1956   ut_ad(cursor->rec != insert_rec);
1957 
1958   /* next record after current before the insertion */
1959   const rec_t* next_rec = page_rec_get_next_low(cursor->rec, TRUE);
1960   ut_ad(rec_get_status(cursor->rec) <= REC_STATUS_INFIMUM);
1961   ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
1962   ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1963 
1964   mach_write_to_2(insert_rec - REC_NEXT, static_cast<uint16_t>
1965                   (next_rec - insert_rec));
1966   mach_write_to_2(cursor->rec - REC_NEXT, static_cast<uint16_t>
1967                   (insert_rec - cursor->rec));
1968   byte *n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
1969                                      cursor->block->frame);
1970   mtr->write<2>(*cursor->block, n_recs, 1U + mach_read_from_2(n_recs));
1971   memcpy_aligned<2>(&page_zip->data[PAGE_N_RECS + PAGE_HEADER], n_recs, 2);
1972 
1973   /* 5. Set the n_owned field in the inserted record to zero,
1974   and set the heap_no field */
1975   rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
1976                       REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1977   rec_set_bit_field_2(insert_rec, heap_no, REC_NEW_HEAP_NO,
1978                       REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
1979 
1980   MEM_CHECK_DEFINED(rec_get_start(insert_rec, offsets),
1981                     rec_offs_size(offsets));
1982 
1983   /* 6. Update the last insertion info in page header */
1984   byte *last_insert= my_assume_aligned<4>(PAGE_LAST_INSERT + PAGE_HEADER +
1985                                           page_zip->data);
1986   const uint16_t last_insert_rec= mach_read_from_2(last_insert);
1987   ut_ad(!last_insert_rec ||
1988         rec_get_node_ptr_flag(cursor->block->frame + last_insert_rec) ==
1989         rec_get_node_ptr_flag(insert_rec));
1990   mach_write_to_2(last_insert, page_offset(insert_rec));
1991 
1992   if (!index->is_spatial())
1993   {
1994     byte *dir= &page_zip->data[PAGE_HEADER + PAGE_DIRECTION_B];
1995     ut_ad(!(*dir & ~((1U << 3) - 1)));
1996     byte *n= my_assume_aligned<2>
1997       (&page_zip->data[PAGE_HEADER + PAGE_N_DIRECTION]);
1998     if (UNIV_UNLIKELY(!last_insert_rec))
1999     {
2000 no_direction:
2001       *dir= PAGE_NO_DIRECTION;
2002       memset(n, 0, 2);
2003     }
2004     else if (*dir != PAGE_LEFT &&
2005              cursor->block->frame + last_insert_rec == cursor->rec)
2006     {
2007       *dir= PAGE_RIGHT;
2008 inc_dir:
2009       mach_write_to_2(n, mach_read_from_2(n) + 1);
2010     }
2011     else if (*dir != PAGE_RIGHT && page_rec_get_next(insert_rec) ==
2012              cursor->block->frame + last_insert_rec)
2013     {
2014       *dir= PAGE_LEFT;
2015       goto inc_dir;
2016     }
2017     else
2018       goto no_direction;
2019   }
2020 
2021   /* Write the header fields in one record. */
2022   mtr->memcpy(*cursor->block,
2023               my_assume_aligned<8>(PAGE_LAST_INSERT + PAGE_HEADER +
2024                                    cursor->block->frame),
2025               my_assume_aligned<8>(PAGE_LAST_INSERT + PAGE_HEADER +
2026                                    page_zip->data),
2027               PAGE_N_RECS - PAGE_LAST_INSERT + 2);
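  /* PAGE_LAST_INSERT, PAGE_DIRECTION_B and PAGE_N_DIRECTION were
  updated in the compressed copy above; the mtr->memcpy() copies the
  whole PAGE_LAST_INSERT..PAGE_N_RECS range back to the uncompressed
  frame and logs it as a single write. */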
2028 
2029   /* 7. It remains to update the owner record. */
2030   ulint n_owned;
2031 
2032   while (!(n_owned = rec_get_n_owned_new(next_rec)))
2033     next_rec= page_rec_get_next_low(next_rec, true);
2034 
2035   rec_set_bit_field_1(const_cast<rec_t*>(next_rec), n_owned + 1,
2036                       REC_NEW_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
2037 
2038   page_zip_dir_insert(cursor, free_rec, insert_rec, mtr);
2039 
2040   /* 8. Now we have incremented the n_owned field of the owner
2041   record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
2042   we have to split the corresponding directory slot in two. */
2043   if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
2044     page_zip_dir_split_slot(cursor->block,
2045                             page_dir_find_owner_slot(next_rec), mtr);
2046 
2047   page_zip_write_rec(cursor->block, insert_rec, index, offsets, 1, mtr);
2048   return insert_rec;
2049 }
2050 
2051 /** Prepend a record to the PAGE_FREE list, or shrink PAGE_HEAP_TOP.
2052 @param[in,out]  block        index page
2053 @param[in,out]  rec          record being deleted
2054 @param[in]      data_size    record payload size, in bytes
2055 @param[in]      extra_size   record header size, in bytes */
2056 static void page_mem_free(const buf_block_t &block, rec_t *rec,
2057                           size_t data_size, size_t extra_size)
2058 {
2059   ut_ad(page_align(rec) == block.frame);
2060   ut_ad(!block.page.zip.data);
2061   const rec_t *free= page_header_get_ptr(block.frame, PAGE_FREE);
2062 
2063   const uint16_t n_heap= uint16_t(page_header_get_field(block.frame,
2064                                                         PAGE_N_HEAP) - 1);
2065   ut_ad(page_get_n_recs(block.frame) < (n_heap & 0x7fff));
2066   const bool deleting_top= n_heap == ((n_heap & 0x8000)
2067                                       ? (rec_get_heap_no_new(rec) | 0x8000)
2068                                       : rec_get_heap_no_old(rec));
2069 
2070   if (deleting_top)
2071   {
2072     byte *page_heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER +
2073                                               block.frame);
2074     const uint16_t heap_top= mach_read_from_2(page_heap_top);
2075     const size_t extra_savings= heap_top - page_offset(rec + data_size);
2076     ut_ad(extra_savings < heap_top);
2077 
2078     /* When deleting the record at the top of the heap, do not add it
2079     to PAGE_FREE. Instead, lower PAGE_HEAP_TOP and decrement PAGE_N_HEAP. */
2080     mach_write_to_2(page_heap_top, page_offset(rec - extra_size));
2081     mach_write_to_2(my_assume_aligned<2>(page_heap_top + 2), n_heap);
2082     static_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2, "compatibility");
2083     if (extra_savings)
2084     {
2085       byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
2086                                                block.frame);
2087       uint16_t garbage= mach_read_from_2(page_garbage);
2088       ut_ad(garbage >= extra_savings);
2089       mach_write_to_2(page_garbage, garbage - extra_savings);
2090     }
2091   }
2092   else
2093   {
2094     byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
2095                                           block.frame);
2096     byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
2097                                              block.frame);
2098     mach_write_to_2(page_free, page_offset(rec));
2099     mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) +
2100                     extra_size + data_size);
2101   }
2102 
2103   memset_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER + block.frame, 0, 2);
2104   byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
2105                                           block.frame);
2106   mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) - 1);
2107 
2108   const byte* const end= rec + data_size;
2109 
2110   if (!deleting_top)
2111   {
2112     uint16_t next= free
2113       ? ((n_heap & 0x8000)
2114          ? static_cast<uint16_t>(free - rec)
2115          : static_cast<uint16_t>(free - block.frame))
2116       : uint16_t{0};
2117     mach_write_to_2(rec - REC_NEXT, next);
2118   }
2119   else
2120     rec-= extra_size;
2121 
2122   memset(rec, 0, end - rec);
2123 }
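/* Example: freeing the record with the highest heap number lowers
PAGE_HEAP_TOP to the start of its header and decrements PAGE_N_HEAP,
reclaiming the space immediately; freeing any other record prepends it
to PAGE_FREE and adds its full size to PAGE_GARBAGE.  In both cases
PAGE_LAST_INSERT is reset and the freed data bytes are zeroed. */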
2124 
2125 /***********************************************************//**
2126 Deletes a record at the page cursor. The cursor is moved to the next
2127 record after the deleted one. */
2128 void
2129 page_cur_delete_rec(
2130 /*================*/
2131 	page_cur_t*		cursor,	/*!< in/out: a page cursor */
2132 	const dict_index_t*	index,	/*!< in: record descriptor */
2133 	const rec_offs*		offsets,/*!< in: rec_get_offsets(
2134 					cursor->rec, index) */
2135 	mtr_t*			mtr)	/*!< in/out: mini-transaction */
2136 {
2137 	page_dir_slot_t* cur_dir_slot;
2138 	rec_t*		current_rec;
2139 	rec_t*		prev_rec	= NULL;
2140 	rec_t*		next_rec;
2141 	ulint		cur_slot_no;
2142 	ulint		cur_n_owned;
2143 	rec_t*		rec;
2144 
2145 	/* page_zip_validate() will fail here when
2146 	btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
2147 	Then, both "page_zip" and "block->frame" would have the min-rec-mark
2148 	set on the smallest user record, but "block->frame" would additionally
2149 	have it set on the smallest-but-one record.  Because sloppy
2150 	page_zip_validate_low() only ignores min-rec-flag differences
2151 	in the smallest user record, it cannot be used here either. */
2152 
2153 	current_rec = cursor->rec;
2154 	buf_block_t* const block = cursor->block;
2155 	ut_ad(rec_offs_validate(current_rec, index, offsets));
2156 	ut_ad(!!page_is_comp(block->frame) == index->table->not_redundant());
2157 	ut_ad(fil_page_index_page_check(block->frame));
2158 	ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + block->frame)
2159 	      == index->id
2160 	      || mtr->is_inside_ibuf());
2161 	ut_ad(mtr->is_named_space(index->table->space));
2162 
2163 	/* The record must not be the supremum or infimum record. */
2164 	ut_ad(page_rec_is_user_rec(current_rec));
2165 
2166 	if (page_get_n_recs(block->frame) == 1
2167 	    && !rec_is_alter_metadata(current_rec, *index)) {
2168 		/* Empty the page. */
2169 		ut_ad(page_is_leaf(block->frame));
2170 		/* Usually, this should be the root page,
2171 		and the whole index tree should become empty.
2172 		However, this could also be a call in
2173 		btr_cur_pessimistic_update() to delete the only
2174 		record in the page and to insert another one. */
2175 		page_cur_move_to_next(cursor);
2176 		ut_ad(page_cur_is_after_last(cursor));
2177 		page_create_empty(page_cur_get_block(cursor),
2178 				  const_cast<dict_index_t*>(index), mtr);
2179 		return;
2180 	}
2181 
2182 	/* Save to local variables some data associated with current_rec */
2183 	cur_slot_no = page_dir_find_owner_slot(current_rec);
2184 	ut_ad(cur_slot_no > 0);
2185 	cur_dir_slot = page_dir_get_nth_slot(block->frame, cur_slot_no);
2186 	cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
2187 
2188 	/* The page gets invalid for btr_pcur_restore_pos().
2189 	We avoid invoking buf_block_modify_clock_inc(block) because its
2190 	consistency checks would fail for the dummy block that is being
2191 	used during IMPORT TABLESPACE. */
2192 	block->modify_clock++;
2193 
2194 	/* Find the next and the previous record. Note that the cursor is
2195 	left at the next record. */
2196 
2197 	rec = const_cast<rec_t*>
2198 		(page_dir_slot_get_rec(cur_dir_slot + PAGE_DIR_SLOT_SIZE));
2199 
2200 	/* rec now points to the record of the previous directory slot. Look
2201 	for the immediate predecessor of current_rec in a loop. */
2202 
2203 	while (current_rec != rec) {
2204 		prev_rec = rec;
2205 		rec = page_rec_get_next(rec);
2206 	}
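	/* This walk is bounded: a directory slot owns at most
	PAGE_DIR_SLOT_MAX_N_OWNED records, so only that many next-record
	pointers may be followed to find the predecessor of current_rec. */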
2207 
2208 	page_cur_move_to_next(cursor);
2209 	next_rec = cursor->rec;
2210 
2211 	/* Remove the record from the linked list of records */
2212 	/* If the deleted record is pointed to by a dir slot, update the
2213 	record pointer in slot. In the following if-clause we assume that
2214 	prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
2215 	>= 2. */
2216 	/* Update the number of owned records of the slot */
2217 
2218 	compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
2219 	ut_ad(cur_n_owned > 1);
2220 
2221 	rec_t* slot_rec = const_cast<rec_t*>
2222 		(page_dir_slot_get_rec(cur_dir_slot));
2223 
2224 	if (UNIV_LIKELY_NULL(block->page.zip.data)) {
2225 		ut_ad(page_is_comp(block->frame));
2226 		if (current_rec == slot_rec) {
2227 			page_zip_rec_set_owned(block, prev_rec, 1, mtr);
2228 			page_zip_rec_set_owned(block, slot_rec, 0, mtr);
2229 			slot_rec = prev_rec;
2230 			mach_write_to_2(cur_dir_slot, page_offset(slot_rec));
2231 		} else if (cur_n_owned == 1
2232 			   && !page_rec_is_supremum(slot_rec)) {
2233 			page_zip_rec_set_owned(block, slot_rec, 0, mtr);
2234 		}
2235 
2236 		mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
2237 				(next_rec - prev_rec));
2238 		slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>(
2239 			(slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK)
2240 			| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
2241 
2242 		page_header_reset_last_insert(block, mtr);
2243 		page_zip_dir_delete(block, rec, index, offsets,
2244 				    page_header_get_ptr(block->frame,
2245 							PAGE_FREE),
2246 				    mtr);
2247 		if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
2248 			page_zip_dir_balance_slot(block, cur_slot_no, mtr);
2249 		}
2250 		return;
2251 	}
2252 
2253 	if (current_rec == slot_rec) {
2254 		slot_rec = prev_rec;
2255 		mach_write_to_2(cur_dir_slot, page_offset(slot_rec));
2256 	}
2257 
2258 	const size_t data_size = rec_offs_data_size(offsets);
2259 	const size_t extra_size = rec_offs_extra_size(offsets);
2260 
2261 	if (page_is_comp(block->frame)) {
2262 		mtr->page_delete(*block, page_offset(prev_rec)
2263 				 - PAGE_NEW_INFIMUM,
2264 				 extra_size - REC_N_NEW_EXTRA_BYTES,
2265 				 data_size);
2266 		mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
2267 				(next_rec - prev_rec));
2268 		slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>(
2269 			(slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK)
2270 			| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
2271 	} else {
2272 		mtr->page_delete(*block, page_offset(prev_rec)
2273 				 - PAGE_OLD_INFIMUM);
2274 		memcpy(prev_rec - REC_NEXT, current_rec - REC_NEXT, 2);
2275 		slot_rec[-REC_OLD_N_OWNED] = static_cast<byte>(
2276 			(slot_rec[-REC_OLD_N_OWNED] & ~REC_N_OWNED_MASK)
2277 			| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
2278 	}
2279 
2280 	page_mem_free(*block, current_rec, data_size, extra_size);
2281 
2282 	/* Now we have decremented the number of owned records of the slot.
2283 	If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
2284 	slots. */
2285 
2286 	if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
2287 		page_dir_balance_slot(*block, cur_slot_no);
2288 	}
2289 
2290 	ut_ad(page_is_comp(block->frame)
2291 	      ? page_simple_validate_new(block->frame)
2292 	      : page_simple_validate_old(block->frame));
2293 }
2294 
2295 /** Apply an INSERT_HEAP_REDUNDANT or INSERT_REUSE_REDUNDANT record that was
2296 written by page_cur_insert_rec_low() for a ROW_FORMAT=REDUNDANT page.
2297 @param block      B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
2298 @param reuse      false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
2299 @param prev       byte offset of the predecessor, relative to PAGE_OLD_INFIMUM
2300 @param enc_hdr    encoded fixed-size header bits
2301 @param hdr_c      number of common record header bytes with prev
2302 @param data_c     number of common data bytes with prev
2303 @param data       literal header and data bytes
2304 @param data_len   length of the literal data, in bytes
2305 @return whether the operation failed (inconsistency was noticed) */
2306 bool page_apply_insert_redundant(const buf_block_t &block, bool reuse,
2307                                  ulint prev, ulint enc_hdr,
2308                                  size_t hdr_c, size_t data_c,
2309                                  const void *data, size_t data_len)
2310 {
2311   const uint16_t n_slots= page_dir_get_n_slots(block.frame);
2312   byte *page_n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER +
2313                                           block.frame);
2314   const uint16_t h= mach_read_from_2(page_n_heap);
2315   const page_id_t id(block.page.id());
2316   if (UNIV_UNLIKELY(n_slots < 2 || h < n_slots || h < PAGE_HEAP_NO_USER_LOW ||
2317                     h >= srv_page_size / REC_N_OLD_EXTRA_BYTES ||
2318                     !fil_page_index_page_check(block.frame) ||
2319                     page_get_page_no(block.frame) != id.page_no() ||
2320                     mach_read_from_2(my_assume_aligned<2>
2321                                      (PAGE_OLD_SUPREMUM - REC_NEXT +
2322                                       block.frame))))
2323   {
2324 corrupted:
2325     ib::error() << (reuse
2326                     ? "Not applying INSERT_REUSE_REDUNDANT"
2327                     " due to corruption on "
2328                     : "Not applying INSERT_HEAP_REDUNDANT"
2329                     " due to corruption on ")
2330                 << id;
2331     return true;
2332   }
2333 
2334   byte * const last_slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
2335   byte * const page_heap_top= my_assume_aligned<2>
2336     (PAGE_HEAP_TOP + PAGE_HEADER + block.frame);
2337   const byte *const heap_bot= &block.frame[PAGE_OLD_SUPREMUM_END];
2338   byte *heap_top= block.frame + mach_read_from_2(page_heap_top);
2339   if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > last_slot))
2340     goto corrupted;
2341   if (UNIV_UNLIKELY(mach_read_from_2(last_slot) != PAGE_OLD_SUPREMUM))
2342     goto corrupted;
2343   if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) !=
2344                                      PAGE_OLD_INFIMUM))
2345     goto corrupted;
2346   rec_t * const prev_rec= block.frame + PAGE_OLD_INFIMUM + prev;
2347   if (!prev);
2348   else if (UNIV_UNLIKELY(heap_bot + (REC_N_OLD_EXTRA_BYTES + 1) > prev_rec ||
2349                          prev_rec > heap_top))
2350     goto corrupted;
2351   const ulint pn_fields= rec_get_bit_field_2(prev_rec, REC_OLD_N_FIELDS,
2352                                              REC_OLD_N_FIELDS_MASK,
2353                                              REC_OLD_N_FIELDS_SHIFT);
2354   if (UNIV_UNLIKELY(pn_fields == 0 || pn_fields > REC_MAX_N_FIELDS))
2355     goto corrupted;
2356   const ulint pextra_size= REC_N_OLD_EXTRA_BYTES +
2357     (rec_get_1byte_offs_flag(prev_rec) ? pn_fields : pn_fields * 2);
2358   if (prev_rec == &block.frame[PAGE_OLD_INFIMUM]);
2359   else if (UNIV_UNLIKELY(prev_rec - pextra_size < heap_bot))
2360     goto corrupted;
2361   if (UNIV_UNLIKELY(hdr_c && prev_rec - hdr_c < heap_bot))
2362     goto corrupted;
2363   const ulint pdata_size= rec_get_data_size_old(prev_rec);
2364   if (UNIV_UNLIKELY(prev_rec + pdata_size > heap_top))
2365     goto corrupted;
2366   rec_t * const next_rec= block.frame + mach_read_from_2(prev_rec - REC_NEXT);
2367   if (next_rec == block.frame + PAGE_OLD_SUPREMUM);
2368   else if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > next_rec ||
2369                          next_rec > heap_top))
2370     goto corrupted;
2371   const bool is_short= (enc_hdr >> 2) & 1;
2372   const ulint n_fields= (enc_hdr >> 3) + 1;
2373   if (UNIV_UNLIKELY(n_fields > REC_MAX_N_FIELDS))
2374     goto corrupted;
2375   const ulint extra_size= REC_N_OLD_EXTRA_BYTES +
2376     (is_short ? n_fields : n_fields * 2);
2377   hdr_c+= REC_N_OLD_EXTRA_BYTES;
2378   if (UNIV_UNLIKELY(hdr_c > extra_size))
2379     goto corrupted;
2380   if (UNIV_UNLIKELY(extra_size - hdr_c > data_len))
2381     goto corrupted;
2382   /* We buffer all changes to the record header locally, so that
2383   the page is not modified before all consistency checks have
2384   passed. */
2385   alignas(2) byte insert_buf[REC_N_OLD_EXTRA_BYTES + REC_MAX_N_FIELDS * 2];
2386 
2387   ulint n_owned;
2388   rec_t *owner_rec= next_rec;
2389   for (ulint ns= PAGE_DIR_SLOT_MAX_N_OWNED;
2390        !(n_owned= rec_get_n_owned_old(owner_rec)); )
2391   {
2392     owner_rec= block.frame + mach_read_from_2(owner_rec - REC_NEXT);
2393     if (owner_rec == &block.frame[PAGE_OLD_SUPREMUM]);
2394     else if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > owner_rec ||
2395                            owner_rec > heap_top))
2396       goto corrupted;
2397     if (!ns--)
2398       goto corrupted; /* Corrupted (cyclic?) next-record list */
2399   }
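  /* The owner search above is bounded by PAGE_DIR_SLOT_MAX_N_OWNED
  steps, because no directory slot may own more records than that; a
  longer chain without a nonzero n_owned indicates a corrupted
  (possibly cyclic) next-record list. */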
2400 
2401   page_dir_slot_t *owner_slot= last_slot;
2402 
2403   if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED)
2404     goto corrupted;
2405   else
2406   {
2407     mach_write_to_2(insert_buf, owner_rec - block.frame);
2408     static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
2409     const page_dir_slot_t * const first_slot=
2410       page_dir_get_nth_slot(block.frame, 0);
2411 
2412     while (memcmp_aligned<2>(owner_slot, insert_buf, 2))
2413       if ((owner_slot+= 2) == first_slot)
2414         goto corrupted;
2415   }
2416 
2417   memcpy(insert_buf, data, extra_size - hdr_c);
2418   byte *insert_rec= &insert_buf[extra_size];
2419   memcpy(insert_rec - hdr_c, prev_rec - hdr_c, hdr_c);
2420   rec_set_bit_field_1(insert_rec, (enc_hdr & 3) << 4,
2421                       REC_OLD_INFO_BITS, REC_INFO_BITS_MASK,
2422                       REC_INFO_BITS_SHIFT);
2423   rec_set_1byte_offs_flag(insert_rec, is_short);
2424   rec_set_n_fields_old(insert_rec, n_fields);
2425   rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
2426                       REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
2427 
2428   const ulint data_size= rec_get_data_size_old(insert_rec);
2429   if (UNIV_UNLIKELY(data_c > data_size))
2430     goto corrupted;
2431   if (UNIV_UNLIKELY(extra_size - hdr_c + data_size - data_c != data_len))
2432     goto corrupted;
2433 
2434   /* Perform final consistency checks and then apply the change to the page. */
2435   byte *buf;
2436   if (reuse)
2437   {
2438     byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
2439                                           block.frame);
2440     rec_t *free_rec= block.frame + mach_read_from_2(page_free);
2441     if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > free_rec ||
2442                       free_rec > heap_top))
2443       goto corrupted;
2444     const ulint fn_fields= rec_get_n_fields_old(free_rec);
2445     const ulint fextra_size= REC_N_OLD_EXTRA_BYTES +
2446       (rec_get_1byte_offs_flag(free_rec) ? fn_fields : fn_fields * 2);
2447     if (UNIV_UNLIKELY(free_rec - fextra_size < heap_bot))
2448       goto corrupted;
2449     const ulint fdata_size= rec_get_data_size_old(free_rec);
2450     if (UNIV_UNLIKELY(free_rec + fdata_size > heap_top))
2451       goto corrupted;
2452     if (UNIV_UNLIKELY(extra_size + data_size > fextra_size + fdata_size))
2453       goto corrupted;
2454     byte *page_garbage= my_assume_aligned<2>(page_free + 2);
2455     if (UNIV_UNLIKELY(mach_read_from_2(page_garbage) <
2456                       fextra_size + fdata_size))
2457       goto corrupted;
2458     buf= free_rec - fextra_size;
2459     const rec_t *const next_free= block.frame +
2460       mach_read_from_2(free_rec - REC_NEXT);
2461     if (next_free == block.frame);
2462     else if (UNIV_UNLIKELY(next_free < &heap_bot[REC_N_OLD_EXTRA_BYTES + 1] ||
2463                            heap_top < next_free))
2464       goto corrupted;
2465     mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) -
2466                     extra_size - data_size);
2467     rec_set_bit_field_2(insert_rec, rec_get_heap_no_old(free_rec),
2468                         REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
2469     memcpy(page_free, free_rec - REC_NEXT, 2);
2470   }
2471   else
2472   {
2473     if (UNIV_UNLIKELY(heap_top + extra_size + data_size > last_slot))
2474       goto corrupted;
2475     rec_set_bit_field_2(insert_rec, h,
2476                         REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
2477     mach_write_to_2(page_n_heap, h + 1);
2478     mach_write_to_2(page_heap_top,
2479                     mach_read_from_2(page_heap_top) + extra_size + data_size);
2480     buf= heap_top;
2481   }
2482 
2483   ut_ad(data_size - data_c == data_len - (extra_size - hdr_c));
2484   byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
2485                                                block.frame);
2486   const uint16_t last_insert= mach_read_from_2(page_last_insert);
2487   memcpy(buf, insert_buf, extra_size);
2488   buf+= extra_size;
2489   mach_write_to_2(page_last_insert, buf - block.frame);
2490   memcpy(prev_rec - REC_NEXT, page_last_insert, 2);
2491   memcpy(buf, prev_rec, data_c);
2492   memcpy(buf + data_c, static_cast<const byte*>(data) + (extra_size - hdr_c),
2493          data_len - (extra_size - hdr_c));
2494   rec_set_bit_field_1(owner_rec, n_owned + 1, REC_OLD_N_OWNED,
2495                       REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
2496 
2497   /* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
2498   if (block.frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
2499   {
2500     byte *dir= &block.frame[PAGE_DIRECTION_B + PAGE_HEADER];
2501     byte *n_dir= my_assume_aligned<2>
2502       (&block.frame[PAGE_N_DIRECTION + PAGE_HEADER]);
2503     if (UNIV_UNLIKELY(!last_insert))
2504     {
2505 no_direction:
2506       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION);
2507       memset(n_dir, 0, 2);
2508     }
2509     else if (block.frame + last_insert == prev_rec &&
2510              (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
2511     {
2512       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_RIGHT);
2513 inc_dir:
2514       mach_write_to_2(n_dir, mach_read_from_2(n_dir) + 1);
2515     }
2516     else if (next_rec == block.frame + last_insert &&
2517              (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
2518     {
2519       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_LEFT);
2520       goto inc_dir;
2521     }
2522     else
2523       goto no_direction;
2524   }
2525 
2526   /* Update PAGE_N_RECS. */
2527   byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
2528                                           block.frame);
2529 
2530   mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
2531 
2532   if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
2533     page_dir_split_slot(block, owner_slot);
2534   ut_ad(page_simple_validate_old(block.frame));
2535   return false;
2536 }
2537 
2538 /** Apply an INSERT_HEAP_DYNAMIC or INSERT_REUSE_DYNAMIC record that was
2539 written by page_cur_insert_rec_low() for a ROW_FORMAT=COMPACT or DYNAMIC page.
2540 @param block      B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
2541 @param reuse      false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
2542 @param prev       byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
2543 @param shift      if reuse: number of bytes the PAGE_FREE record is moving (direction in bit 0)
2544 @param enc_hdr_l  number of copied record header bytes, plus record type bits
2545 @param hdr_c      number of common record header bytes with prev
2546 @param data_c     number of common data bytes with prev
2547 @param data       literal header and data bytes
2548 @param data_len   length of the literal data, in bytes
2549 @return whether the operation failed (inconsistency was noticed) */
2550 bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
2551                                ulint prev, ulint shift, ulint enc_hdr_l,
2552                                size_t hdr_c, size_t data_c,
2553                                const void *data, size_t data_len)
2554 {
2555   const uint16_t n_slots= page_dir_get_n_slots(block.frame);
2556   byte *page_n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER +
2557                                           block.frame);
2558   ulint h= mach_read_from_2(page_n_heap);
2559   const page_id_t id(block.page.id());
2560   if (UNIV_UNLIKELY(n_slots < 2 || h < (PAGE_HEAP_NO_USER_LOW | 0x8000) ||
2561                     (h & 0x7fff) >= srv_page_size / REC_N_NEW_EXTRA_BYTES ||
2562                     (h & 0x7fff) < n_slots ||
2563                     !fil_page_index_page_check(block.frame) ||
2564                     page_get_page_no(block.frame) != id.page_no() ||
2565                     mach_read_from_2(my_assume_aligned<2>
2566                                      (PAGE_NEW_SUPREMUM - REC_NEXT +
2567                                       block.frame)) ||
2568                     ((enc_hdr_l & REC_STATUS_INSTANT) &&
2569                      !page_is_leaf(block.frame)) ||
2570                     (enc_hdr_l >> 3) > data_len))
2571   {
2572 corrupted:
2573     ib::error() << (reuse
2574                     ? "Not applying INSERT_REUSE_DYNAMIC"
2575                     " due to corruption on "
2576                     : "Not applying INSERT_HEAP_DYNAMIC"
2577                     " due to corruption on ")
2578                 << id;
2579     return true;
2580   }
2581 
2582   byte * const last_slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
2583   byte * const page_heap_top= my_assume_aligned<2>
2584     (PAGE_HEAP_TOP + PAGE_HEADER + block.frame);
2585   const byte *const heap_bot= &block.frame[PAGE_NEW_SUPREMUM_END];
2586   byte *heap_top= block.frame + mach_read_from_2(page_heap_top);
2587   if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > last_slot))
2588     goto corrupted;
2589   if (UNIV_UNLIKELY(mach_read_from_2(last_slot) != PAGE_NEW_SUPREMUM))
2590     goto corrupted;
2591   if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) !=
2592                                      PAGE_NEW_INFIMUM))
2593     goto corrupted;
2594 
2595   uint16_t n= static_cast<uint16_t>(PAGE_NEW_INFIMUM + prev);
2596   rec_t *prev_rec= block.frame + n;
2597   n= static_cast<uint16_t>(n + mach_read_from_2(prev_rec - REC_NEXT));
2598   if (!prev);
2599   else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > prev_rec ||
2600                          prev_rec > heap_top))
2601     goto corrupted;
2602 
2603   rec_t * const next_rec= block.frame + n;
2604   if (next_rec == block.frame + PAGE_NEW_SUPREMUM);
2605   else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > next_rec ||
2606                          next_rec > heap_top))
2607     goto corrupted;
2608 
2609   ulint n_owned;
2610   rec_t *owner_rec= next_rec;
2611   n= static_cast<uint16_t>(next_rec - block.frame);
2612 
2613   for (ulint ns= PAGE_DIR_SLOT_MAX_N_OWNED;
2614        !(n_owned= rec_get_n_owned_new(owner_rec)); )
2615   {
2616     n= static_cast<uint16_t>(n + mach_read_from_2(owner_rec - REC_NEXT));
2617     owner_rec= block.frame + n;
2618     if (n == PAGE_NEW_SUPREMUM);
2619     else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > owner_rec ||
2620                            owner_rec > heap_top))
2621       goto corrupted;
2622     if (!ns--)
2623       goto corrupted; /* Corrupted (cyclic?) next-record list */
2624   }
2625 
2626   page_dir_slot_t* owner_slot= last_slot;
2627 
2628   if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED)
2629     goto corrupted;
2630   else
2631   {
2632     static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
2633     alignas(2) byte slot_buf[2];
2634     mach_write_to_2(slot_buf, owner_rec - block.frame);
2635     const page_dir_slot_t * const first_slot=
2636       page_dir_get_nth_slot(block.frame, 0);
2637 
2638     while (memcmp_aligned<2>(owner_slot, slot_buf, 2))
2639       if ((owner_slot+= 2) == first_slot)
2640         goto corrupted;
2641   }
2642 
2643   const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_c + (enc_hdr_l >> 3);
2644   const ulint data_size= data_c + data_len - (enc_hdr_l >> 3);
2645 
2646   /* Perform final consistency checks and then apply the change to the page. */
2647   byte *buf;
2648   if (reuse)
2649   {
2650     byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
2651                                           block.frame);
2652     rec_t *free_rec= block.frame + mach_read_from_2(page_free);
2653     if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > free_rec ||
2654                       free_rec > heap_top))
2655       goto corrupted;
2656     buf= free_rec - extra_size;
2657     if (shift & 1)
2658       buf-= shift >> 1;
2659     else
2660       buf+= shift >> 1;
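    /* shift is sign-and-magnitude encoded: bit 0 is the direction and
    the remaining bits are the distance, so shift=5 yields buf-=2 and
    shift=4 yields buf+=2. */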
2661 
2662     if (UNIV_UNLIKELY(heap_bot > buf ||
2663                       &buf[extra_size + data_size] > heap_top))
2664       goto corrupted;
2665     byte *page_garbage= my_assume_aligned<2>(page_free + 2);
2666     if (UNIV_UNLIKELY(mach_read_from_2(page_garbage) < extra_size + data_size))
2667       goto corrupted;
2668     if ((n= mach_read_from_2(free_rec - REC_NEXT)) != 0)
2669     {
2670       n= static_cast<uint16_t>(n + free_rec - block.frame);
2671       if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
2672                         heap_top < block.frame + n))
2673         goto corrupted;
2674     }
2675     mach_write_to_2(page_free, n);
2676     mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) -
2677                     (extra_size + data_size));
2678     h= rec_get_heap_no_new(free_rec);
2679   }
2680   else
2681   {
2682     if (UNIV_UNLIKELY(heap_top + extra_size + data_size > last_slot))
2683       goto corrupted;
2684     mach_write_to_2(page_n_heap, h + 1);
2685     h&= 0x7fff;
2686     mach_write_to_2(page_heap_top,
2687                     mach_read_from_2(page_heap_top) + extra_size + data_size);
2688     buf= heap_top;
2689   }
2690 
2691   memcpy(buf, data, (enc_hdr_l >> 3));
2692   buf+= enc_hdr_l >> 3;
2693   data_len-= enc_hdr_l >> 3;
2694   data= &static_cast<const byte*>(data)[enc_hdr_l >> 3];
2695 
2696   memcpy(buf, prev_rec - REC_N_NEW_EXTRA_BYTES - hdr_c, hdr_c);
2697   buf+= hdr_c;
2698   *buf++= static_cast<byte>((enc_hdr_l & 3) << 4); /* info_bits; n_owned=0 */
2699   *buf++= static_cast<byte>(h >> 5); /* MSB of heap number */
2700   h= (h & ((1U << 5) - 1)) << 3;
2701   static_assert(REC_STATUS_ORDINARY == 0, "compatibility");
2702   static_assert(REC_STATUS_INSTANT == 4, "compatibility");
2703   if (page_is_leaf(block.frame))
2704     h|= enc_hdr_l & REC_STATUS_INSTANT;
2705   else
2706   {
2707     ut_ad(!(enc_hdr_l & REC_STATUS_INSTANT)); /* Checked at the start */
2708     h|= REC_STATUS_NODE_PTR;
2709   }
2710   *buf++= static_cast<byte>(h); /* LSB of heap number, and status */
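  /* The two bytes just written encode a 13-bit heap number and a 3-bit
  status: MSB = h>>5, LSB = (h&31)<<3 | status.  E.g. heap number 3 for
  an ordinary leaf record yields 0x00,0x18, and for REC_STATUS_INSTANT
  (4, per the static_asserts above) 0x00,0x1c. */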
2711   static_assert(REC_NEXT == 2, "compatibility");
2712   buf+= REC_NEXT;
2713   mach_write_to_2(buf - REC_NEXT, static_cast<uint16_t>(next_rec - buf));
2714   byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
2715                                                block.frame);
2716   const uint16_t last_insert= mach_read_from_2(page_last_insert);
2717   mach_write_to_2(page_last_insert, buf - block.frame);
2718   mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>(buf - prev_rec));
2719   memcpy(buf, prev_rec, data_c);
2720   buf+= data_c;
2721   memcpy(buf, data, data_len);
2722 
2723   rec_set_bit_field_1(owner_rec, n_owned + 1, REC_NEW_N_OWNED,
2724                       REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
2725 
2726   /* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
2727   if (block.frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
2728   {
2729     byte *dir= &block.frame[PAGE_DIRECTION_B + PAGE_HEADER];
2730     byte *n_dir= my_assume_aligned<2>
2731       (&block.frame[PAGE_N_DIRECTION + PAGE_HEADER]);
2732     if (UNIV_UNLIKELY(!last_insert))
2733     {
2734 no_direction:
2735       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION);
2736       memset(n_dir, 0, 2);
2737     }
2738     else if (block.frame + last_insert == prev_rec &&
2739              (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
2740     {
2741       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_RIGHT);
2742 inc_dir:
2743       mach_write_to_2(n_dir, mach_read_from_2(n_dir) + 1);
2744     }
2745     else if (next_rec == block.frame + last_insert &&
2746              (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
2747     {
2748       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_LEFT);
2749       goto inc_dir;
2750     }
2751     else
2752       goto no_direction;
2753   }
2754 
2755   /* Update PAGE_N_RECS. */
2756   byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
2757                                           block.frame);
2758 
2759   mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
2760 
2761   if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
2762     page_dir_split_slot(block, owner_slot);
2763   ut_ad(page_simple_validate_new(block.frame));
2764   return false;
2765 }
2766 
2767 /** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by
2768 page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page.
2769 @param block    B-tree or R-tree page in ROW_FORMAT=REDUNDANT
2770 @param prev     byte offset of the predecessor, relative to PAGE_OLD_INFIMUM
2771 @return whether the operation failed (inconsistency was noticed) */
2772 bool page_apply_delete_redundant(const buf_block_t &block, ulint prev)
{
  const uint16_t n_slots= page_dir_get_n_slots(block.frame);
  ulint n_recs= page_get_n_recs(block.frame);
  const page_id_t id(block.page.id());

  if (UNIV_UNLIKELY(!n_recs || n_slots < 2 ||
                    !fil_page_index_page_check(block.frame) ||
                    page_get_page_no(block.frame) != id.page_no() ||
                    mach_read_from_2(my_assume_aligned<2>
                                     (PAGE_OLD_SUPREMUM - REC_NEXT +
                                      block.frame)) ||
                    page_is_comp(block.frame)))
  {
corrupted:
    ib::error() << "Not applying DELETE_ROW_FORMAT_REDUNDANT"
                   " due to corruption on " << id;
    return true;
  }

  byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
  rec_t *prev_rec= block.frame + PAGE_OLD_INFIMUM + prev;
  if (UNIV_UNLIKELY(prev_rec > slot))
    goto corrupted;
  uint16_t n= mach_read_from_2(prev_rec - REC_NEXT);
  rec_t *rec= block.frame + n;
  if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
                    slot < rec))
    goto corrupted;
  const ulint extra_size= REC_N_OLD_EXTRA_BYTES + rec_get_n_fields_old(rec) *
    (rec_get_1byte_offs_flag(rec) ? 1 : 2);
  const ulint data_size= rec_get_data_size_old(rec);
  if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + extra_size ||
                    slot < rec + data_size))
    goto corrupted;

  n= mach_read_from_2(rec - REC_NEXT);
  rec_t *next= block.frame + n;
  if (n == PAGE_OLD_SUPREMUM);
  else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
                         slot < next))
    goto corrupted;

  rec_t *s= rec;
  ulint slot_owned;
  for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_old(s)); )
  {
    n= mach_read_from_2(s - REC_NEXT);
    s= block.frame + n;
    if (n == PAGE_OLD_SUPREMUM);
    else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
                           slot < s))
      goto corrupted;
    if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */
      goto corrupted;
  }
  slot_owned--;

  /* The first slot always points to the infimum record.
  Find the directory slot pointing to s. */
  const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2);
  alignas(2) byte slot_offs[2];
  mach_write_to_2(slot_offs, s - block.frame);
  static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");

  while (memcmp_aligned<2>(slot, slot_offs, 2))
    if ((slot+= 2) == first_slot)
      goto corrupted;

  if (rec == s)
  {
    s= prev_rec;
    mach_write_to_2(slot, s - block.frame);
  }

  memcpy(prev_rec - REC_NEXT, rec - REC_NEXT, 2);
  s-= REC_OLD_N_OWNED;
  *s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) |
                        slot_owned << REC_N_OWNED_SHIFT);
  page_mem_free(block, rec, data_size, extra_size);

  if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED)
    page_dir_balance_slot(block, (first_slot - slot) / 2);

  ut_ad(page_simple_validate_old(block.frame));
  return false;
}
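
#ifdef UNIV_COMPILE_TEST_FUNCS
/** Illustrative sketch only, not called by the server: compute the header
(extra) size of a ROW_FORMAT=REDUNDANT record the same way
page_apply_delete_redundant() does above. A REDUNDANT record header
consists of the fixed REC_N_OLD_EXTRA_BYTES prefix plus one end-of-field
offset per field, each 1 or 2 bytes wide depending on the record's
1-byte-offsets flag. The plain parameters stand in for the rec_t*
accessors rec_get_n_fields_old() and rec_get_1byte_offs_flag(). */
static ulint test_old_extra_size(ulint n_fields, bool one_byte_offs)
{
  return REC_N_OLD_EXTRA_BYTES + n_fields * (one_byte_offs ? 1 : 2);
}
#endif /* UNIV_COMPILE_TEST_FUNCS */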

/** Apply a DELETE_ROW_FORMAT_DYNAMIC record that was written by
page_cur_delete_rec() for a ROW_FORMAT=COMPACT or DYNAMIC page.
@param block      B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
@param prev       byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
@param hdr_size   record header size, excluding REC_N_NEW_EXTRA_BYTES
@param data_size  data payload size, in bytes
@return whether the operation failed (an inconsistency was noticed) */
bool page_apply_delete_dynamic(const buf_block_t &block, ulint prev,
                               size_t hdr_size, size_t data_size)
{
  const uint16_t n_slots= page_dir_get_n_slots(block.frame);
  ulint n_recs= page_get_n_recs(block.frame);
  const page_id_t id(block.page.id());

  if (UNIV_UNLIKELY(!n_recs || n_slots < 2 ||
                    !fil_page_index_page_check(block.frame) ||
                    page_get_page_no(block.frame) != id.page_no() ||
                    mach_read_from_2(my_assume_aligned<2>
                                     (PAGE_NEW_SUPREMUM - REC_NEXT +
                                      block.frame)) ||
                    !page_is_comp(block.frame)))
  {
corrupted:
    ib::error() << "Not applying DELETE_ROW_FORMAT_DYNAMIC"
                   " due to corruption on " << id;
    return true;
  }

  byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
  uint16_t n= static_cast<uint16_t>(PAGE_NEW_INFIMUM + prev);
  rec_t *prev_rec= block.frame + n;
  if (UNIV_UNLIKELY(prev_rec > slot))
    goto corrupted;
  n= static_cast<uint16_t>(n + mach_read_from_2(prev_rec - REC_NEXT));
  rec_t *rec= block.frame + n;
  if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
                    slot < rec))
    goto corrupted;
  const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_size;
  if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + extra_size ||
                    slot < rec + data_size))
    goto corrupted;
  n= static_cast<uint16_t>(n + mach_read_from_2(rec - REC_NEXT));
  rec_t *next= block.frame + n;
  if (n == PAGE_NEW_SUPREMUM);
  else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
                         slot < next))
    goto corrupted;

  rec_t *s= rec;
  n= static_cast<uint16_t>(rec - block.frame);
  ulint slot_owned;
  for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_new(s)); )
  {
    const uint16_t next= mach_read_from_2(s - REC_NEXT);
    if (UNIV_UNLIKELY(next < REC_N_NEW_EXTRA_BYTES ||
                      next > static_cast<uint16_t>(-REC_N_NEW_EXTRA_BYTES)))
      goto corrupted;
    n= static_cast<uint16_t>(n + next);
    s= block.frame + n;
    if (n == PAGE_NEW_SUPREMUM);
    else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
                           slot < s))
      goto corrupted;
    if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */
      goto corrupted;
  }
  slot_owned--;

  /* The first slot always points to the infimum record.
  Find the directory slot pointing to s. */
  const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2);
  alignas(2) byte slot_offs[2];
  mach_write_to_2(slot_offs, s - block.frame);
  static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");

  while (memcmp_aligned<2>(slot, slot_offs, 2))
    if ((slot+= 2) == first_slot)
      goto corrupted;

  if (rec == s)
  {
    s= prev_rec;
    mach_write_to_2(slot, s - block.frame);
  }

  mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>(next - prev_rec));
  s-= REC_NEW_N_OWNED;
  *s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) |
                        slot_owned << REC_N_OWNED_SHIFT);
  page_mem_free(block, rec, data_size, extra_size);

  if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED)
    page_dir_balance_slot(block, (first_slot - slot) / 2);

  ut_ad(page_simple_validate_new(block.frame));
  return false;
}
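
#ifdef UNIV_COMPILE_TEST_FUNCS
/** Illustrative sketch only, not called by the server: find the page
directory slot that points to a given record offset, scanning from the
last slot towards the first slot, like the memcmp_aligned<2>() loops in
the two delete-apply functions above. The page directory grows downwards
from the end of the page, and the first slot (at the highest address)
always points to the infimum record. This sketch returns nullptr where
the functions above would branch to their "corrupted" label. */
static const byte *test_find_dir_slot(const page_t *frame, ulint n_slots,
                                      uint16_t rec_offset)
{
  const byte *const first_slot= frame + srv_page_size - (PAGE_DIR + 2);
  const byte *slot= first_slot - 2 * (n_slots - 1);
  alignas(2) byte slot_offs[2];
  mach_write_to_2(slot_offs, rec_offset);
  while (memcmp_aligned<2>(slot, slot_offs, 2))
    if ((slot+= 2) == first_slot)
      return nullptr; /* No user-record slot matched. */
  return slot;
}
#endif /* UNIV_COMPILE_TEST_FUNCS */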

#ifdef UNIV_COMPILE_TEST_FUNCS

/*******************************************************************//**
Print the first n numbers generated by ut_rnd_gen(), to check visually
that it works properly. */
void
test_ut_rnd_gen(
	int	n)	/*!< in: print first n numbers */
{
	int			i;
	unsigned long long	rnd;

	for (i = 0; i < n; i++) {
		rnd = ut_rnd_gen();
		printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
		       rnd,
		       rnd % 2,
		       rnd % 3,
		       rnd % 5,
		       rnd % 7,
		       rnd % 11);
	}
}

#endif /* UNIV_COMPILE_TEST_FUNCS */