/*****************************************************************************

Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
Copyright (c) 2018, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/********************************************************************//**
@file page/page0cur.cc
The page cursor

Created 10/4/1994 Heikki Tuuri
*************************************************************************/

#include "page0cur.h"
#include "page0zip.h"
#include "btr0btr.h"
#include "mtr0log.h"
#include "log0recv.h"
#include "rem0cmp.h"
#include "gis0rtree.h"

#include <algorithm>

#ifdef BTR_CUR_HASH_ADAPT
# ifdef UNIV_SEARCH_PERF_STAT
static ulint	page_cur_short_succ;
# endif /* UNIV_SEARCH_PERF_STAT */

/** Try a search shortcut based on the last insert.
@param[in]	block			index page
@param[in]	index			index tree
@param[in]	tuple			search key
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[out]	cursor			page cursor
@return true on success */
UNIV_INLINE
bool
page_cur_try_search_shortcut(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	ulint*			iup_matched_fields,
	ulint*			ilow_matched_fields,
	page_cur_t*		cursor)
{
	const rec_t*	rec;
	const rec_t*	next_rec;
	ulint		low_match;
	ulint		up_match;
	ibool		success		= FALSE;
	const page_t*	page		= buf_block_get_frame(block);
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_check_typed(tuple));
	ut_ad(page_is_leaf(page));

	rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  dtuple_get_n_fields(tuple), &heap);

	ut_ad(rec);
	ut_ad(page_rec_is_user_rec(rec));

	low_match = up_match = std::min(*ilow_matched_fields,
					*iup_matched_fields);

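	/* The shortcut succeeds if the search key collates not less
	than the last inserted record but strictly less than its
	successor; then the cursor can be positioned on the last
	insert without a binary search through the page directory. */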
	if (cmp_dtuple_rec_with_match(tuple, rec, offsets, &low_match) < 0) {
		goto exit_func;
	}

	next_rec = page_rec_get_next_const(rec);
	if (!page_rec_is_supremum(next_rec)) {
		offsets = rec_get_offsets(next_rec, index, offsets,
					  index->n_core_fields,
					  dtuple_get_n_fields(tuple), &heap);

		if (cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
					      &up_match) >= 0) {
			goto exit_func;
		}

		*iup_matched_fields = up_match;
	}

	page_cur_position(rec, block, cursor);

	*ilow_matched_fields = low_match;

#ifdef UNIV_SEARCH_PERF_STAT
	page_cur_short_succ++;
#endif
	success = TRUE;
exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(success);
}

/** Try a search shortcut based on the last insert.
@param[in]	block			index page
@param[in]	index			index tree
@param[in]	tuple			search key
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	iup_matched_bytes	already matched bytes in the
first partially matched field in the upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[in,out]	ilow_matched_bytes	already matched bytes in the
first partially matched field in the lower limit record
@param[out]	cursor			page cursor
@return true on success */
UNIV_INLINE
bool
page_cur_try_search_shortcut_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	const rec_t*	rec;
	const rec_t*	next_rec;
	ulint		low_match;
	ulint		low_bytes;
	ulint		up_match;
	ulint		up_bytes;
	ibool		success		= FALSE;
	const page_t*	page		= buf_block_get_frame(block);
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_check_typed(tuple));
	ut_ad(page_is_leaf(page));

	rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  dtuple_get_n_fields(tuple), &heap);

	ut_ad(rec);
	ut_ad(page_rec_is_user_rec(rec));
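	/* Seed the match counters with the smaller of the prefixes
	already matched by the lower and upper limit records: a prefix
	that both limits share with the search key must also be shared
	by any record between them. */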
	if (ut_pair_cmp(*ilow_matched_fields, *ilow_matched_bytes,
			*iup_matched_fields, *iup_matched_bytes) < 0) {
		up_match = low_match = *ilow_matched_fields;
		up_bytes = low_bytes = *ilow_matched_bytes;
	} else {
		up_match = low_match = *iup_matched_fields;
		up_bytes = low_bytes = *iup_matched_bytes;
	}

	if (cmp_dtuple_rec_with_match_bytes(
		    tuple, rec, index, offsets, &low_match, &low_bytes) < 0) {
		goto exit_func;
	}

	next_rec = page_rec_get_next_const(rec);
	if (!page_rec_is_supremum(next_rec)) {
		offsets = rec_get_offsets(next_rec, index, offsets,
					  index->n_core_fields,
					  dtuple_get_n_fields(tuple), &heap);

		if (cmp_dtuple_rec_with_match_bytes(
			    tuple, next_rec, index, offsets,
			    &up_match, &up_bytes)
		    >= 0) {
			goto exit_func;
		}

		*iup_matched_fields = up_match;
		*iup_matched_bytes = up_bytes;
	}

	page_cur_position(rec, block, cursor);

	*ilow_matched_fields = low_match;
	*ilow_matched_bytes = low_bytes;

#ifdef UNIV_SEARCH_PERF_STAT
	page_cur_short_succ++;
#endif
	success = TRUE;
exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(success);
}
#endif /* BTR_CUR_HASH_ADAPT */

#ifdef PAGE_CUR_LE_OR_EXTENDS
/****************************************************************//**
Checks whether the nth field in a record is a character type field that
extends the nth field in tuple, i.e., the record field is at least as long
and begins with exactly the same characters.
@return TRUE if rec field extends tuple field */
static
ibool
page_cur_rec_field_extends(
/*=======================*/
	const dtuple_t*	tuple,	/*!< in: data tuple */
	const rec_t*	rec,	/*!< in: record */
	const rec_offs*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		n)	/*!< in: compare nth field */
{
	const dtype_t*	type;
	const dfield_t*	dfield;
	const byte*	rec_f;
	ulint		rec_f_len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
	dfield = dtuple_get_nth_field(tuple, n);

	type = dfield_get_type(dfield);

	rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);

	if (type->mtype == DATA_VARCHAR
	    || type->mtype == DATA_CHAR
	    || type->mtype == DATA_FIXBINARY
	    || type->mtype == DATA_BINARY
	    || type->mtype == DATA_BLOB
	    || DATA_GEOMETRY_MTYPE(type->mtype)
	    || type->mtype == DATA_VARMYSQL
	    || type->mtype == DATA_MYSQL) {

		if (dfield_get_len(dfield) != UNIV_SQL_NULL
		    && rec_f_len != UNIV_SQL_NULL
		    && rec_f_len >= dfield_get_len(dfield)
		    && !cmp_data_data(type->mtype, type->prtype,
				      dfield_get_data(dfield),
				      dfield_get_len(dfield),
				      rec_f, dfield_get_len(dfield))) {

			return(TRUE);
		}
	}

	return(FALSE);
}
#endif /* PAGE_CUR_LE_OR_EXTENDS */

/****************************************************************//**
Searches the right position for a page cursor. */
void
page_cur_search_with_match(
/*=======================*/
	const buf_block_t*	block,	/*!< in: buffer block */
	const dict_index_t*	index,	/*!< in/out: record descriptor */
	const dtuple_t*		tuple,	/*!< in: data tuple */
	page_cur_mode_t		mode,	/*!< in: PAGE_CUR_L,
					PAGE_CUR_LE, PAGE_CUR_G, or
					PAGE_CUR_GE */
	ulint*			iup_matched_fields,
					/*!< in/out: already matched
					fields in upper limit record */
	ulint*			ilow_matched_fields,
					/*!< in/out: already matched
					fields in lower limit record */
	page_cur_t*		cursor,	/*!< out: page cursor */
	rtr_info_t*		rtr_info)/*!< in/out: rtree search stack */
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t* slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		low_matched_fields;
	ulint		cur_matched_fields;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
		if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
			ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
			      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE
			      || dict_index_is_spatial(index));
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));
	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;

#ifdef BTR_CUR_HASH_ADAPT
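	/* If this leaf page has been receiving a run of purely
	ascending inserts (more than 3 in a row, as tracked by
	PAGE_N_DIRECTION) and this is a "less than or equal" search,
	the key is likely to belong right after the last insert, so try
	that position before falling back to the binary search below. */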
	if (n_core
	    && page_get_direction(page) == PAGE_RIGHT
	    && page_header_get_offs(page, PAGE_LAST_INSERT)
	    && mode == PAGE_CUR_LE
	    && !index->is_spatial()
	    && page_header_get_field(page, PAGE_N_DIRECTION) > 3
	    && page_cur_try_search_shortcut(
		    block, index, tuple,
		    iup_matched_fields, ilow_matched_fields, cursor)) {
		return;
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif /* BTR_CUR_HASH_ADAPT */

	/* If the mode is for R-tree indexes, use the special MBR
	related compare functions */
	if (index->is_spatial() && mode > PAGE_CUR_LE) {
		/* For leaf level insert, we still use the traditional
		compare function for now */
		if (mode == PAGE_CUR_RTREE_INSERT && n_core) {
			mode = PAGE_CUR_LE;
		} else {
			rtr_cur_search_with_match(
				block, (dict_index_t*)index, tuple, mode,
				cursor, rtr_info);
			return;
		}
	}

	/* The following flag does not work for non-latin1 char sets because
	cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	low_matched_fields = *ilow_matched_fields;

	/* Perform binary search. First the search is done through the page
	directory, after that as a linear search in the list of records
	owned by the upper limit directory slot. */
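	/* Each directory slot owns only a few records (at most
	PAGE_DIR_SLOT_MAX_N_OWNED), so the linear phase that follows
	the binary search touches just a handful of records. */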

	low = 0;
	up = ulint(page_dir_get_n_slots(page)) - 1;

	/* Perform a binary search until the lower and upper limit directory
	slots are adjacent to each other */

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		cur_matched_fields = std::min(low_matched_fields,
					      up_matched_fields);

		offsets = offsets_;
		offsets = rec_get_offsets(
			mid_rec, index, offsets, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match(
			tuple, mid_rec, offsets, &cur_matched_fields);

		if (cmp > 0) {
low_slot_match:
			low = mid;
			low_matched_fields = cur_matched_fields;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			up = mid;
			up_matched_fields = cur_matched_fields;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform a linear search until the upper and lower records are
	adjacent to each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		cur_matched_fields = std::min(low_matched_fields,
					      up_matched_fields);

		offsets = offsets_;
		offsets = rec_get_offsets(
			mid_rec, index, offsets, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match(
			tuple, mid_rec, offsets, &cur_matched_fields);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			if (!cmp && !cur_matched_fields) {
#ifdef UNIV_DEBUG
				mtr_t	mtr;
				mtr_start(&mtr);

				/* We got a match, but cur_matched_fields
				is 0; the record must carry
				REC_INFO_MIN_REC_FLAG */
				ulint	rec_info = rec_get_info_bits(
					mid_rec, rec_offs_comp(offsets));
				ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
				ut_ad(!page_has_prev(page));
				mtr_commit(&mtr);
#endif

				cur_matched_fields
					= dtuple_get_n_fields_cmp(tuple);
			}

			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*ilow_matched_fields = low_matched_fields;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}

#ifdef BTR_CUR_HASH_ADAPT
/** Search the right position for a page cursor.
@param[in]	block			buffer block
@param[in]	index			index tree
@param[in]	tuple			key to be searched for
@param[in]	mode			search mode
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	iup_matched_bytes	already matched bytes in the
first partially matched field in the upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[in,out]	ilow_matched_bytes	already matched bytes in the
first partially matched field in the lower limit record
@param[out]	cursor			page cursor */
void
page_cur_search_with_match_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	page_cur_mode_t		mode,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t* slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		up_matched_bytes;
	ulint		low_matched_fields;
	ulint		low_matched_bytes;
	ulint		cur_matched_fields;
	ulint		cur_matched_bytes;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
	ut_ad(!(tuple->info_bits & REC_INFO_MIN_REC_FLAG));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
		if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
			ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
			      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));

#ifdef BTR_CUR_HASH_ADAPT
	if (page_is_leaf(page)
	    && page_get_direction(page) == PAGE_RIGHT
	    && page_header_get_offs(page, PAGE_LAST_INSERT)
	    && mode == PAGE_CUR_LE
	    && page_header_get_field(page, PAGE_N_DIRECTION) > 3
	    && page_cur_try_search_shortcut_bytes(
		    block, index, tuple,
		    iup_matched_fields, iup_matched_bytes,
		    ilow_matched_fields, ilow_matched_bytes,
		    cursor)) {
		return;
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif /* BTR_CUR_HASH_ADAPT */

	/* The following flag does not work for non-latin1 char sets because
	cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	up_matched_bytes  = *iup_matched_bytes;
	low_matched_fields = *ilow_matched_fields;
	low_matched_bytes  = *ilow_matched_bytes;

	/* Perform binary search. First the search is done through the page
	directory, after that as a linear search in the list of records
	owned by the upper limit directory slot. */

	low = 0;
	up = ulint(page_dir_get_n_slots(page)) - 1;

	/* Perform a binary search until the lower and upper limit directory
	slots are adjacent to each other */
	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		offsets = rec_get_offsets(
			mid_rec, index, offsets_, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_slot_match:
			low = mid;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			up = mid;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform a linear search until the upper and lower records are
	adjacent to each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		if (UNIV_UNLIKELY(rec_get_info_bits(
					  mid_rec,
					  dict_table_is_comp(index->table))
				  & REC_INFO_MIN_REC_FLAG)) {
			ut_ad(!page_has_prev(page_align(mid_rec)));
			ut_ad(!page_rec_is_leaf(mid_rec)
			      || rec_is_metadata(mid_rec, index));
			cmp = 1;
			goto low_rec_match;
		}

		offsets = rec_get_offsets(
			mid_rec, index, offsets_, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*iup_matched_bytes   = up_matched_bytes;
	*ilow_matched_fields = low_matched_fields;
	*ilow_matched_bytes  = low_matched_bytes;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
#endif /* BTR_CUR_HASH_ADAPT */

/***********************************************************//**
Positions a page cursor on a randomly chosen user record on a page. If there
are no user records, sets the cursor on the infimum record. */
void
page_cur_open_on_rnd_user_rec(
/*==========================*/
	buf_block_t*	block,	/*!< in: page */
	page_cur_t*	cursor)	/*!< out: page cursor */
{
	const ulint	n_recs = page_get_n_recs(block->frame);

	page_cur_set_before_first(block, cursor);

	if (UNIV_UNLIKELY(n_recs == 0)) {

		return;
	}

	cursor->rec = page_rec_get_nth(block->frame,
				       ut_rnd_interval(n_recs) + 1);
}

/** Write a redo log record of inserting a record into an index page.
@param[in]	insert_rec	inserted record
@param[in]	rec_size	rec_get_size(insert_rec)
@param[in]	cursor_rec	predecessor of insert_rec
@param[in,out]	index		index tree
@param[in,out]	mtr		mini-transaction */
void
page_cur_insert_rec_write_log(
	const rec_t*	insert_rec,
	ulint		rec_size,
	const rec_t*	cursor_rec,
	dict_index_t*	index,
	mtr_t*		mtr)
{
	ulint	cur_rec_size;
	ulint	extra_size;
	ulint	cur_extra_size;
	const byte* ins_ptr;
	const byte* log_end;
	ulint	i;

	if (index->table->is_temporary()) {
		mtr->set_modified();
		ut_ad(mtr->get_log_mode() == MTR_LOG_NO_REDO);
		return;
	}

	ut_a(rec_size < srv_page_size);
	ut_ad(mtr->is_named_space(index->table->space));
	ut_ad(page_align(insert_rec) == page_align(cursor_rec));
	ut_ad(!page_rec_is_comp(insert_rec)
	      == !dict_table_is_comp(index->table));

	const ulint n_core = page_rec_is_leaf(cursor_rec)
		? index->n_core_fields : 0;

	{
		mem_heap_t*	heap		= NULL;
		rec_offs	cur_offs_[REC_OFFS_NORMAL_SIZE];
		rec_offs	ins_offs_[REC_OFFS_NORMAL_SIZE];

		rec_offs*	cur_offs;
		rec_offs*	ins_offs;

		rec_offs_init(cur_offs_);
		rec_offs_init(ins_offs_);

		cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
					   n_core, ULINT_UNDEFINED, &heap);
		ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
					   n_core, ULINT_UNDEFINED, &heap);

		extra_size = rec_offs_extra_size(ins_offs);
		cur_extra_size = rec_offs_extra_size(cur_offs);
		ut_ad(rec_size == rec_offs_size(ins_offs));
		cur_rec_size = rec_offs_size(cur_offs);

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	}

	ins_ptr = insert_rec - extra_size;

	i = 0;

	if (cur_extra_size == extra_size) {
		ulint		min_rec_size = ut_min(cur_rec_size, rec_size);

		const byte*	cur_ptr = cursor_rec - cur_extra_size;

		/* Find out the first byte in insert_rec which differs from
		cursor_rec; skip the bytes in the record info */

		do {
			if (*ins_ptr == *cur_ptr) {
				i++;
				ins_ptr++;
				cur_ptr++;
			} else if ((i < extra_size)
				   && (i >= extra_size
				       - page_rec_get_base_extra_size
				       (insert_rec))) {
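				/* The mismatch is in the fixed-size
				part of the record header, which is
				reconstructed on the parsing side
				anyway; restart the comparison at the
				record origin. */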
				i = extra_size;
				ins_ptr = insert_rec;
				cur_ptr = cursor_rec;
			} else {
				break;
			}
		} while (i < min_rec_size);
	}

	byte*	log_ptr;

	if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {

		if (page_rec_is_comp(insert_rec)) {
			log_ptr = mlog_open_and_write_index(
				mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
				2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
			if (UNIV_UNLIKELY(!log_ptr)) {
				/* Logging in mtr is switched off
				during crash recovery: in that case
				mlog_open returns NULL */
				return;
			}
		} else {
			log_ptr = mlog_open(mtr, 11
					    + 2 + 5 + 1 + 5 + 5
					    + MLOG_BUF_MARGIN);
			if (UNIV_UNLIKELY(!log_ptr)) {
				/* Logging in mtr is switched off
				during crash recovery: in that case
				mlog_open returns NULL */
				return;
			}

			log_ptr = mlog_write_initial_log_record_fast(
				insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
		}

		log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
		/* Write the cursor rec offset as a 2-byte ulint */
		mach_write_to_2(log_ptr, page_offset(cursor_rec));
		log_ptr += 2;
	} else {
		log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
		if (!log_ptr) {
			/* Logging in mtr is switched off during crash
			recovery: in that case mlog_open returns NULL */
			return;
		}
		log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
	}

	if (page_rec_is_comp(insert_rec)) {
		if (UNIV_UNLIKELY
		    (rec_get_info_and_status_bits(insert_rec, TRUE)
		     != rec_get_info_and_status_bits(cursor_rec, TRUE))) {

			goto need_extra_info;
		}
	} else {
		if (UNIV_UNLIKELY
		    (rec_get_info_and_status_bits(insert_rec, FALSE)
		     != rec_get_info_and_status_bits(cursor_rec, FALSE))) {

			goto need_extra_info;
		}
	}

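	/* The end segment length is logged shifted left by one bit. Its
	least significant bit flags whether the info bits, the record
	origin offset and the mismatch index follow explicitly; when the
	flag is 0, the parser derives them from the cursor record. */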
	if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
need_extra_info:
		/* Write the record end segment length
		and the extra info storage flag */
		log_ptr += mach_write_compressed(log_ptr,
						 2 * (rec_size - i) + 1);

		/* Write the info bits */
		mach_write_to_1(log_ptr,
				rec_get_info_and_status_bits(
					insert_rec,
					page_rec_is_comp(insert_rec)));
		log_ptr++;

		/* Write the record origin offset */
		log_ptr += mach_write_compressed(log_ptr, extra_size);

		/* Write the mismatch index */
		log_ptr += mach_write_compressed(log_ptr, i);

		ut_a(i < srv_page_size);
		ut_a(extra_size < srv_page_size);
	} else {
		/* Write the record end segment length
		and the extra info storage flag */
		log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
	}

	/* Write to the log the inserted index record end segment which
	differs from the cursor record */

	rec_size -= i;

	if (log_ptr + rec_size <= log_end) {
		memcpy(log_ptr, ins_ptr, rec_size);
		mlog_close(mtr, log_ptr + rec_size);
	} else {
		mlog_close(mtr, log_ptr);
		ut_a(rec_size < srv_page_size);
		mlog_catenate_string(mtr, ins_ptr, rec_size);
	}
}

/***********************************************************//**
Parses a log record of a record insert on a page.
@return end of log record or NULL */
byte*
page_cur_parse_insert_rec(
/*======================*/
	ibool		is_short,/*!< in: TRUE if short inserts */
	const byte*	ptr,	/*!< in: buffer */
	const byte*	end_ptr,/*!< in: buffer end */
	buf_block_t*	block,	/*!< in: page or NULL */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
{
	ulint	origin_offset		= 0; /* remove warning */
	ulint	end_seg_len;
	ulint	mismatch_index		= 0; /* remove warning */
	page_t*	page;
	rec_t*	cursor_rec;
	byte	buf1[1024];
	byte*	buf;
	const byte*	ptr2		= ptr;
	ulint		info_and_status_bits = 0; /* remove warning */
	page_cur_t	cursor;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	page = block ? buf_block_get_frame(block) : NULL;

	if (is_short) {
		cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
	} else {
		ulint	offset;

		/* Read the cursor rec offset as a 2-byte ulint */

		if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {

			return(NULL);
		}

		offset = mach_read_from_2(ptr);
		ptr += 2;

		cursor_rec = page + offset;

		if (offset >= srv_page_size) {

			recv_sys->found_corrupt_log = TRUE;

			return(NULL);
		}
	}

	end_seg_len = mach_parse_compressed(&ptr, end_ptr);

	if (ptr == NULL) {

		return(NULL);
	}

	if (end_seg_len >= srv_page_size << 1) {
		recv_sys->found_corrupt_log = TRUE;

		return(NULL);
	}

	if (end_seg_len & 0x1UL) {
		/* Read the info bits */

		if (end_ptr < ptr + 1) {

			return(NULL);
		}

		info_and_status_bits = mach_read_from_1(ptr);
		ptr++;

		origin_offset = mach_parse_compressed(&ptr, end_ptr);

		if (ptr == NULL) {

			return(NULL);
		}

		ut_a(origin_offset < srv_page_size);

		mismatch_index = mach_parse_compressed(&ptr, end_ptr);

		if (ptr == NULL) {

			return(NULL);
		}

		ut_a(mismatch_index < srv_page_size);
	}

	if (end_ptr < ptr + (end_seg_len >> 1)) {

		return(NULL);
	}

	if (!block) {

		return(const_cast<byte*>(ptr + (end_seg_len >> 1)));
	}

	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
	ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));

	/* Read from the log the inserted index record end segment which
	differs from the cursor record */

	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;

	offsets = rec_get_offsets(cursor_rec, index, offsets, n_core,
				  ULINT_UNDEFINED, &heap);

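	/* If the extra-info flag (the low bit of end_seg_len) is not
	set, the info bits, the record origin offset and the mismatch
	index were omitted from the log record; recompute them from the
	cursor record, mirroring page_cur_insert_rec_write_log(). */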
	if (!(end_seg_len & 0x1UL)) {
		info_and_status_bits = rec_get_info_and_status_bits(
			cursor_rec, page_is_comp(page));
		origin_offset = rec_offs_extra_size(offsets);
		mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
	}

	end_seg_len >>= 1;

	if (mismatch_index + end_seg_len < sizeof buf1) {
		buf = buf1;
	} else {
		buf = static_cast<byte*>(
			ut_malloc_nokey(mismatch_index + end_seg_len));
	}

	/* Build the inserted record to buf */

	if (UNIV_UNLIKELY(mismatch_index >= srv_page_size)) {

		ib::fatal() << "is_short " << is_short << ", "
			<< "info_and_status_bits " << info_and_status_bits
			<< ", offset " << page_offset(cursor_rec) << ","
			" o_offset " << origin_offset << ", mismatch index "
			<< mismatch_index << ", end_seg_len " << end_seg_len
			<< " parsed len " << (ptr - ptr2);
	}

	ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
	ut_memcpy(buf + mismatch_index, ptr, end_seg_len);

	if (page_is_comp(page)) {
		rec_set_heap_no_new(buf + origin_offset,
				    PAGE_HEAP_NO_USER_LOW);
		rec_set_info_and_status_bits(buf + origin_offset,
					     info_and_status_bits);
	} else {
		rec_set_heap_no_old(buf + origin_offset,
				    PAGE_HEAP_NO_USER_LOW);
		rec_set_info_bits_old(buf + origin_offset,
				      info_and_status_bits);
	}

	page_cur_position(cursor_rec, block, &cursor);

	offsets = rec_get_offsets(buf + origin_offset, index, offsets,
				  n_core, ULINT_UNDEFINED, &heap);
	if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
					       buf + origin_offset,
					       index, offsets, mtr))) {
		/* The redo log record should only have been written
		after the write was successful. */
		ut_error;
	}

	if (buf != buf1) {

		ut_free(buf);
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	return(const_cast<byte*>(ptr + end_seg_len));
}

/** Reset PAGE_DIRECTION and PAGE_N_DIRECTION.
@param[in,out]	ptr		the PAGE_DIRECTION_B field
@param[in,out]	page		index tree page frame
@param[in]	page_zip	compressed page descriptor, or NULL */
static inline
void
page_direction_reset(byte* ptr, page_t* page, page_zip_des_t* page_zip)
{
	ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
	page_ptr_set_direction(ptr, PAGE_NO_DIRECTION);
	if (page_zip) {
		page_zip_write_header(page_zip, ptr, 1, NULL);
	}
	ptr = PAGE_HEADER + PAGE_N_DIRECTION + page;
	*reinterpret_cast<uint16_t*>(ptr) = 0;
	if (page_zip) {
		page_zip_write_header(page_zip, ptr, 2, NULL);
	}
}

/** Increment PAGE_N_DIRECTION.
@param[in,out]	ptr		the PAGE_DIRECTION_B field
@param[in,out]	page		index tree page frame
@param[in]	page_zip	compressed page descriptor, or NULL
@param[in]	dir		PAGE_RIGHT or PAGE_LEFT */
static inline
void
page_direction_increment(
	byte*		ptr,
	page_t*		page,
	page_zip_des_t*	page_zip,
	uint		dir)
{
	ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
	ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT);
	page_ptr_set_direction(ptr, dir);
	if (page_zip) {
		page_zip_write_header(page_zip, ptr, 1, NULL);
	}
	page_header_set_field(
		page, page_zip, PAGE_N_DIRECTION,
		1U + page_header_get_field(page, PAGE_N_DIRECTION));
}

/************************************************************//**
Allocates a block of memory from the heap of an index page.
@return	pointer to start of allocated buffer, or NULL if allocation fails */
static
byte*
page_mem_alloc_heap(
/*================*/
	page_t*		page,	/*!< in/out: index page */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page with enough
				space available for inserting the record,
				or NULL */
	ulint		need,	/*!< in: total number of bytes needed */
	ulint*		heap_no)/*!< out: this contains the heap number
				of the allocated record
				if allocation succeeds */
{
	byte*	block;
	ulint	avl_space;

	ut_ad(page && heap_no);

	avl_space = page_get_max_insert_size(page, 1);

	if (avl_space >= need) {
		const ulint h = page_dir_get_n_heap(page);
		if (UNIV_UNLIKELY(h >= 8191)) {
			/* At the minimum record size of 5+2 bytes,
			we can only reach this condition when using
			innodb_page_size=64k. */
			ut_ad(srv_page_size == 65536);
			return(NULL);
		}
		*heap_no = h;

		block = page_header_get_ptr(page, PAGE_HEAP_TOP);

		page_header_set_ptr(page, page_zip, PAGE_HEAP_TOP,
				    block + need);
		page_dir_set_n_heap(page, page_zip, 1 + *heap_no);

		return(block);
	}

	return(NULL);
}

/***********************************************************//**
Inserts a record next to page cursor on an uncompressed page.
Returns a pointer to the inserted record on success, i.e., when enough
space is available, NULL otherwise. The cursor stays at the same position.
@return pointer to record on success, NULL otherwise */
rec_t*
page_cur_insert_rec_low(
/*====================*/
	rec_t*		current_rec,/*!< in: pointer to current record after
				which the new record is inserted */
	dict_index_t*	index,	/*!< in: record descriptor */
	const rec_t*	rec,	/*!< in: pointer to a physical record */
	rec_offs*	offsets,/*!< in/out: rec_get_offsets(rec, index) */
	mtr_t*		mtr)	/*!< in: mini-transaction handle, or NULL */
{
	byte*		insert_buf;
	ulint		rec_size;
	page_t*		page;		/*!< the relevant page */
	rec_t*		last_insert;	/*!< cursor position at previous
					insert */
	rec_t*		free_rec;	/*!< a free record that was reused,
					or NULL */
	rec_t*		insert_rec;	/*!< inserted record */
	ulint		heap_no;	/*!< heap number of the inserted
					record */

	ut_ad(rec_offs_validate(rec, index, offsets));

	page = page_align(current_rec);
	ut_ad(dict_table_is_comp(index->table)
	      == (ibool) !!page_is_comp(page));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || index->is_dummy
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));

	ut_ad(!page_rec_is_supremum(current_rec));

	/* 1. Get the size of the physical record in the page */
	rec_size = rec_offs_size(offsets);

#ifdef HAVE_valgrind
	{
		const void*	rec_start __attribute__((unused))
			= rec - rec_offs_extra_size(offsets);
		ulint		extra_size __attribute__((unused))
			= rec_offs_extra_size(offsets)
			- (rec_offs_comp(offsets)
			   ? REC_N_NEW_EXTRA_BYTES
			   : REC_N_OLD_EXTRA_BYTES);

		/* All data bytes of the record must be valid. */
		MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
		/* The variable-length header must be valid. */
		MEM_CHECK_DEFINED(rec_start, extra_size);
	}
#endif /* HAVE_valgrind */

	/* 2. Try to find suitable space from page memory management */

	free_rec = page_header_get_ptr(page, PAGE_FREE);
	if (UNIV_LIKELY_NULL(free_rec)) {
		/* Try to allocate from the head of the free list. */
		rec_offs	foffsets_[REC_OFFS_NORMAL_SIZE];
		rec_offs*	foffsets	= foffsets_;
		mem_heap_t*	heap		= NULL;

		rec_offs_init(foffsets_);

		foffsets = rec_get_offsets(
			free_rec, index, foffsets,
			page_is_leaf(page) ? index->n_core_fields : 0,
			ULINT_UNDEFINED, &heap);
		if (rec_offs_size(foffsets) < rec_size) {
			if (UNIV_LIKELY_NULL(heap)) {
				mem_heap_free(heap);
			}

			goto use_heap;
		}

		insert_buf = free_rec - rec_offs_extra_size(foffsets);

		if (page_is_comp(page)) {
			heap_no = rec_get_heap_no_new(free_rec);
			page_mem_alloc_free(page, NULL,
					rec_get_next_ptr(free_rec, TRUE),
					rec_size);
		} else {
			heap_no = rec_get_heap_no_old(free_rec);
			page_mem_alloc_free(page, NULL,
					rec_get_next_ptr(free_rec, FALSE),
					rec_size);
		}

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	} else {
use_heap:
		free_rec = NULL;
		insert_buf = page_mem_alloc_heap(page, NULL,
						 rec_size, &heap_no);

		if (UNIV_UNLIKELY(insert_buf == NULL)) {
			return(NULL);
		}
	}

	/* 3. Create the record */
	insert_rec = rec_copy(insert_buf, rec, offsets);
	rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);

	/* 4. Insert the record in the linked list of records */
	ut_ad(current_rec != insert_rec);

	{
		/* next record after current before the insertion */
		rec_t*	next_rec = page_rec_get_next(current_rec);
#ifdef UNIV_DEBUG
		if (page_is_comp(page)) {
			switch (rec_get_status(current_rec)) {
			case REC_STATUS_ORDINARY:
			case REC_STATUS_NODE_PTR:
			case REC_STATUS_COLUMNS_ADDED:
			case REC_STATUS_INFIMUM:
				break;
			case REC_STATUS_SUPREMUM:
				ut_ad(!"wrong status on current_rec");
			}
			switch (rec_get_status(insert_rec)) {
			case REC_STATUS_ORDINARY:
			case REC_STATUS_NODE_PTR:
			case REC_STATUS_COLUMNS_ADDED:
				break;
			case REC_STATUS_INFIMUM:
			case REC_STATUS_SUPREMUM:
				ut_ad(!"wrong status on insert_rec");
			}
			ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
		}
#endif
		page_rec_set_next(insert_rec, next_rec);
		page_rec_set_next(current_rec, insert_rec);
	}

	page_header_set_field(page, NULL, PAGE_N_RECS,
			      1U + page_get_n_recs(page));

	/* 5. Set the n_owned field in the inserted record to zero,
	and set the heap_no field */
	if (page_is_comp(page)) {
		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec, heap_no);
	} else {
		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec, heap_no);
	}

	MEM_CHECK_DEFINED(rec_get_start(insert_rec, offsets),
			  rec_offs_size(offsets));
	/* 6. Update the last insertion info in page header */

	last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
	ut_ad(!last_insert || !page_is_comp(page)
	      || rec_get_node_ptr_flag(last_insert)
	      == rec_get_node_ptr_flag(insert_rec));

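	/* Maintain the PAGE_DIRECTION and PAGE_N_DIRECTION statistics
	that the last-insert search shortcut relies on: keep counting
	while consecutive inserts continue in the same direction, and
	reset the counter otherwise. */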
	if (!dict_index_is_spatial(index)) {
		byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
		if (UNIV_UNLIKELY(last_insert == NULL)) {
no_direction:
			page_direction_reset(ptr, page, NULL);
		} else if (last_insert == current_rec
			   && page_ptr_get_direction(ptr) != PAGE_LEFT) {
			page_direction_increment(ptr, page, NULL, PAGE_RIGHT);
		} else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
			   && page_rec_get_next(insert_rec) == last_insert) {
			page_direction_increment(ptr, page, NULL, PAGE_LEFT);
		} else {
			goto no_direction;
		}
	}

	page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);

	/* 7. It remains to update the owner record. */
	{
		rec_t*	owner_rec	= page_rec_find_owner_rec(insert_rec);
		ulint	n_owned;
		if (page_is_comp(page)) {
			n_owned = rec_get_n_owned_new(owner_rec);
			rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
		} else {
			n_owned = rec_get_n_owned_old(owner_rec);
			rec_set_n_owned_old(owner_rec, n_owned + 1);
		}

		/* 8. Now we have incremented the n_owned field of the owner
		record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
		we have to split the corresponding directory slot in two. */

		if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
			page_dir_split_slot(
				page, NULL,
				page_dir_find_owner_slot(owner_rec));
		}
	}

	/* 9. Write log record of the insert */
	if (UNIV_LIKELY(mtr != NULL)) {
		page_cur_insert_rec_write_log(insert_rec, rec_size,
					      current_rec, index, mtr);
	}

	return(insert_rec);
}

/***********************************************************//**
Inserts a record next to page cursor on a compressed and uncompressed
page. Returns a pointer to the inserted record on success, i.e., when
enough space is available, NULL otherwise.
The cursor stays at the same position.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to record on success, NULL otherwise */
rec_t*
page_cur_insert_rec_zip(
/*====================*/
	page_cur_t*	cursor,	/*!< in/out: page cursor */
	dict_index_t*	index,	/*!< in: record descriptor */
	const rec_t*	rec,	/*!< in: pointer to a physical record */
	rec_offs*	offsets,/*!< in/out: rec_get_offsets(rec, index) */
	mtr_t*		mtr)	/*!< in: mini-transaction handle, or NULL */
{
	byte*		insert_buf;
	ulint		rec_size;
	page_t*		page;		/*!< the relevant page */
	rec_t*		last_insert;	/*!< cursor position at previous
					insert */
	rec_t*		free_rec;	/*!< a free record that was reused,
					or NULL */
	rec_t*		insert_rec;	/*!< inserted record */
	ulint		heap_no;	/*!< heap number of the inserted
					record */
	page_zip_des_t*	page_zip;

	page_zip = page_cur_get_page_zip(cursor);
	ut_ad(page_zip);

	ut_ad(rec_offs_validate(rec, index, offsets));

	page = page_cur_get_page(cursor);
	ut_ad(dict_table_is_comp(index->table));
	ut_ad(page_is_comp(page));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || index->is_dummy
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
	ut_ad(!page_get_instant(page));
	ut_ad(!page_cur_is_after_last(cursor));
#ifdef UNIV_ZIP_DEBUG
	ut_a(page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	/* 1. Get the size of the physical record in the page */
	rec_size = rec_offs_size(offsets);

#ifdef HAVE_valgrind
	{
		const void*	rec_start __attribute__((unused))
			= rec - rec_offs_extra_size(offsets);
		ulint		extra_size __attribute__((unused))
			= rec_offs_extra_size(offsets)
			- (rec_offs_comp(offsets)
			   ? REC_N_NEW_EXTRA_BYTES
			   : REC_N_OLD_EXTRA_BYTES);

		/* All data bytes of the record must be valid. */
		MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
		/* The variable-length header must be valid. */
		MEM_CHECK_DEFINED(rec_start, extra_size);
	}
#endif /* HAVE_valgrind */

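	/* Reorganize the page before the insert if the record does not
	fit as-is but would fit once the garbage left by deleted
	records is reclaimed. */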
	const bool reorg_before_insert = page_has_garbage(page)
		&& rec_size > page_get_max_insert_size(page, 1)
		&& rec_size <= page_get_max_insert_size_after_reorganize(
			page, 1);

	/* 2. Try to find suitable space from page memory management */
	if (!page_zip_available(page_zip, dict_index_is_clust(index),
				rec_size, 1)
	    || reorg_before_insert) {
		/* The values can change dynamically. */
		bool	log_compressed	= page_zip_log_pages;
		ulint	level		= page_zip_level;
#ifdef UNIV_DEBUG
		rec_t*	cursor_rec	= page_cur_get_rec(cursor);
#endif /* UNIV_DEBUG */

		/* If we are not writing compressed page images, we
		must reorganize the page before attempting the
		insert. */
		if (recv_recovery_is_on()) {
			/* Insert into the uncompressed page only.
			The page reorganization or creation that we
			would attempt outside crash recovery would
			have been covered by a previous redo log record. */
		} else if (page_is_empty(page)) {
			ut_ad(page_cur_is_before_first(cursor));

			/* This is an empty page. Recreate it to
			get rid of the modification log. */
			page_create_zip(page_cur_get_block(cursor), index,
					page_header_get_field(page, PAGE_LEVEL),
					0, NULL, mtr);
			ut_ad(!page_header_get_ptr(page, PAGE_FREE));

			if (page_zip_available(
				    page_zip, dict_index_is_clust(index),
				    rec_size, 1)) {
				goto use_heap;
			}

			/* The cursor should remain on the page infimum. */
			return(NULL);
		} else if (!page_zip->m_nonempty && !page_has_garbage(page)) {
			/* The page has been freshly compressed, so
			reorganizing it will not help. */
		} else if (log_compressed && !reorg_before_insert) {
			/* Insert into uncompressed page only, and
			try page_zip_reorganize() afterwards. */
		} else if (btr_page_reorganize_low(
				   recv_recovery_is_on(), level,
				   cursor, index, mtr)) {
			ut_ad(!page_header_get_ptr(page, PAGE_FREE));

			if (page_zip_available(
				    page_zip, dict_index_is_clust(index),
				    rec_size, 1)) {
				/* After reorganizing, there is space
				available. */
				goto use_heap;
			}
		} else {
			ut_ad(cursor->rec == cursor_rec);
			return(NULL);
		}

		/* Try compressing the whole page afterwards. */
		insert_rec = page_cur_insert_rec_low(
			cursor->rec, index, rec, offsets, NULL);

		/* If recovery is on, this implies that the compression
		of the page was successful during runtime. Had that not
		been the case or had the redo logging of compressed
		pages been enabled during runtime then we'd have seen
		a MLOG_ZIP_PAGE_COMPRESS redo record. Therefore, we
		know that we don't need to reorganize the page. We,
		however, do need to recompress the page. That will
		happen when the next redo record is read which must
		be of type MLOG_ZIP_PAGE_COMPRESS_NO_DATA and it must
		contain a valid compression level value.
		This implies that during recovery from this point till
		the next redo is applied the uncompressed and
		compressed versions are not identical and
		page_zip_validate will fail but that is OK because
		we call page_zip_validate only after processing
		all changes to a page under a single mtr during
		recovery. */
		if (insert_rec == NULL) {
			/* Out of space.
			This should never occur during crash recovery,
			because the MLOG_COMP_REC_INSERT should only
			be logged after a successful operation. */
			ut_ad(!recv_recovery_is_on());
			ut_ad(!index->is_dummy);
		} else if (recv_recovery_is_on()) {
			/* This should be followed by
			MLOG_ZIP_PAGE_COMPRESS_NO_DATA,
			which should succeed. */
			rec_offs_make_valid(insert_rec, index,
					    page_is_leaf(page), offsets);
		} else {
			ulint	pos = page_rec_get_n_recs_before(insert_rec);
			ut_ad(pos > 0);

			if (!log_compressed) {
				if (page_zip_compress(
					    page_zip, page, index,
					    level, NULL, NULL)) {
					page_cur_insert_rec_write_log(
						insert_rec, rec_size,
						cursor->rec, index, mtr);
					page_zip_compress_write_log_no_data(
						level, page, index, mtr);

					rec_offs_make_valid(
						insert_rec, index,
						page_is_leaf(page), offsets);
					return(insert_rec);
				}

				/* Page compress failed. If this happened on a
				leaf page, put the data size into the sample
				buffer. */
				if (page_is_leaf(page)) {
					ulint occupied = page_get_data_size(page)
						+ page_dir_calc_reserved_space(
								page_get_n_recs(page));
					index->stat_defrag_data_size_sample[
						index->stat_defrag_sample_next_slot] =
								occupied;
					index->stat_defrag_sample_next_slot =
						(index->stat_defrag_sample_next_slot
						 + 1) % STAT_DEFRAG_DATA_SIZE_N_SAMPLE;
				}

				ut_ad(cursor->rec
				      == (pos > 1
					  ? page_rec_get_nth(
						  page, pos - 1)
					  : page + PAGE_NEW_INFIMUM));
			} else {
				/* We are writing entire page images
				to the log. Reduce the redo log volume
				by reorganizing the page at the same time. */
				if (page_zip_reorganize(
					    cursor->block, index, mtr)) {
					/* The page was reorganized:
					Seek to pos. */
					if (pos > 1) {
						cursor->rec = page_rec_get_nth(
							page, pos - 1);
					} else {
						cursor->rec = page
							+ PAGE_NEW_INFIMUM;
					}

					insert_rec = page + rec_get_next_offs(
						cursor->rec, TRUE);
					rec_offs_make_valid(
						insert_rec, index,
						page_is_leaf(page), offsets);
					return(insert_rec);
				}

1711 				/* Theoretically, we could try one
1712 				last resort of btr_page_reorganize_low()
1713 				followed by page_zip_available(), but
1714 				that would be very unlikely to
1715 				succeed. (If the full reorganized page
1716 				failed to compress, why would it
1717 				succeed to compress the page, plus log
1718 				the insert of this record? */
			}

			/* Out of space: restore the page */
			if (!page_zip_decompress(page_zip, page, FALSE)) {
				ut_error; /* Memory corrupted? */
			}
			ut_ad(page_validate(page, index));
			insert_rec = NULL;
		}

		return(insert_rec);
	}

	free_rec = page_header_get_ptr(page, PAGE_FREE);
	if (UNIV_LIKELY_NULL(free_rec)) {
		/* Try to allocate from the head of the free list. */
		lint	extra_size_diff;
		rec_offs	foffsets_[REC_OFFS_NORMAL_SIZE];
		rec_offs*	foffsets	= foffsets_;
		mem_heap_t*	heap		= NULL;

		rec_offs_init(foffsets_);

		foffsets = rec_get_offsets(free_rec, index, foffsets,
					   page_rec_is_leaf(free_rec)
					   ? index->n_core_fields : 0,
					   ULINT_UNDEFINED, &heap);
		if (rec_offs_size(foffsets) < rec_size) {
too_small:
			if (UNIV_LIKELY_NULL(heap)) {
				mem_heap_free(heap);
			}

			goto use_heap;
		}

		insert_buf = free_rec - rec_offs_extra_size(foffsets);

		/* On compressed pages, do not relocate records from
		the free list.  If extra_size would grow, use the heap. */
		extra_size_diff = lint(rec_offs_extra_size(offsets)
				       - rec_offs_extra_size(foffsets));

		if (UNIV_UNLIKELY(extra_size_diff < 0)) {
			/* Add an offset to the extra_size. */
			if (rec_offs_size(foffsets)
			    < rec_size - ulint(extra_size_diff)) {

				goto too_small;
			}

			insert_buf -= extra_size_diff;
		} else if (UNIV_UNLIKELY(extra_size_diff)) {
			/* Do not allow extra_size to grow */

			goto too_small;
		}
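
		/* Worked example (illustrative numbers, not from the
		source): if free_rec carries 8 bytes of extra size and
		the new record needs only 6, then extra_size_diff == -2
		and insert_buf is advanced by 2 bytes, so that the
		origin of the new record lands exactly on free_rec,
		honouring the no-relocation rule stated above. */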

		heap_no = rec_get_heap_no_new(free_rec);
		page_mem_alloc_free(page, page_zip,
				    rec_get_next_ptr(free_rec, TRUE),
				    rec_size);

		if (!page_is_leaf(page)) {
			/* Zero out the node pointer of free_rec,
			in case it will not be overwritten by
			insert_rec. */

			ut_ad(rec_size > REC_NODE_PTR_SIZE);

			if (rec_offs_extra_size(foffsets)
			    + rec_offs_data_size(foffsets) > rec_size) {

				memset(rec_get_end(free_rec, foffsets)
				       - REC_NODE_PTR_SIZE, 0,
				       REC_NODE_PTR_SIZE);
			}
		} else if (dict_index_is_clust(index)) {
			/* Zero out the DB_TRX_ID and DB_ROLL_PTR
			columns of free_rec, in case it will not be
			overwritten by insert_rec. */

			ulint	trx_id_col;
			ulint	trx_id_offs;
			ulint	len;

			trx_id_col = dict_index_get_sys_col_pos(index,
								DATA_TRX_ID);
			ut_ad(trx_id_col > 0);
			ut_ad(trx_id_col != ULINT_UNDEFINED);

			trx_id_offs = rec_get_nth_field_offs(foffsets,
							     trx_id_col, &len);
			ut_ad(len == DATA_TRX_ID_LEN);

			if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
			    + rec_offs_extra_size(foffsets) > rec_size) {
				/* We will have to zero out the
				DB_TRX_ID and DB_ROLL_PTR, because
				they will not be fully overwritten by
				insert_rec. */

				memset(free_rec + trx_id_offs, 0,
				       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
			}

			ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
			      == rec_get_nth_field(free_rec, foffsets,
						   trx_id_col + 1, &len));
			ut_ad(len == DATA_ROLL_PTR_LEN);
		}

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	} else {
use_heap:
		free_rec = NULL;
		insert_buf = page_mem_alloc_heap(page, page_zip,
						 rec_size, &heap_no);

		if (UNIV_UNLIKELY(insert_buf == NULL)) {
			return(NULL);
		}

		page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
	}

	/* 3. Create the record */
	insert_rec = rec_copy(insert_buf, rec, offsets);
	rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);

	/* 4. Insert the record in the linked list of records */
	ut_ad(cursor->rec != insert_rec);

	{
		/* next record after current before the insertion */
		const rec_t*	next_rec = page_rec_get_next_low(
			cursor->rec, TRUE);
		ut_ad(rec_get_status(cursor->rec)
		      <= REC_STATUS_INFIMUM);
		ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
		ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);

		page_rec_set_next(insert_rec, next_rec);
		page_rec_set_next(cursor->rec, insert_rec);
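
		/* Before:  cursor->rec -> next_rec
		   After:   cursor->rec -> insert_rec -> next_rec */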
	}

	page_header_set_field(page, page_zip, PAGE_N_RECS,
			      1U + page_get_n_recs(page));

	/* 5. Set the n_owned field in the inserted record to zero,
	and set the heap_no field */
	rec_set_n_owned_new(insert_rec, NULL, 0);
	rec_set_heap_no_new(insert_rec, heap_no);

	MEM_CHECK_DEFINED(rec_get_start(insert_rec, offsets),
			  rec_offs_size(offsets));

	page_zip_dir_insert(page_zip, cursor->rec, free_rec, insert_rec);

	/* 6. Update the last insertion info in page header */

	last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
	ut_ad(!last_insert
	      || rec_get_node_ptr_flag(last_insert)
	      == rec_get_node_ptr_flag(insert_rec));

	if (!dict_index_is_spatial(index)) {
		byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
		if (UNIV_UNLIKELY(last_insert == NULL)) {
no_direction:
			page_direction_reset(ptr, page, page_zip);
		} else if (last_insert == cursor->rec
			   && page_ptr_get_direction(ptr) != PAGE_LEFT) {
			page_direction_increment(ptr, page, page_zip,
						 PAGE_RIGHT);
		} else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
			   && page_rec_get_next(insert_rec) == last_insert) {
			page_direction_increment(ptr, page, page_zip,
						 PAGE_LEFT);
		} else {
			goto no_direction;
		}
	}
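
	/* A sketch of the heuristic above: an ascending sequence of
	inserts, each placed right after the previous one, keeps
	PAGE_DIRECTION at PAGE_RIGHT; a descending sequence keeps
	PAGE_LEFT; any other pattern resets the direction. The B-tree
	page split heuristics in btr0btr.cc may consult this state. */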

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);

	/* 7. It remains to update the owner record. */
	{
		rec_t*	owner_rec	= page_rec_find_owner_rec(insert_rec);
		ulint	n_owned;

		n_owned = rec_get_n_owned_new(owner_rec);
		rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);

		/* 8. Now we have incremented the n_owned field of the owner
		record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
		we have to split the corresponding directory slot in two. */

		if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
			page_dir_split_slot(
				page, page_zip,
				page_dir_find_owner_slot(owner_rec));
		}
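
		/* E.g., assuming the usual PAGE_DIR_SLOT_MAX_N_OWNED
		value of 8: once a slot would own 9 records, it is
		split into two roughly equal halves, keeping the
		per-slot linear scans of the search code short. */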
	}

	page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);

	/* 9. Write log record of the insert */
	if (UNIV_LIKELY(mtr != NULL)) {
		page_cur_insert_rec_write_log(insert_rec, rec_size,
					      cursor->rec, index, mtr);
	}

	return(insert_rec);
}
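
/* Call-path sketch (illustrative, not a specification):
page_cur_tuple_insert() converts a data tuple into a physical record
and dispatches to the compressed-page insert above when the block has
a page_zip descriptor, or to page_cur_insert_rec_low() otherwise. */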

/**********************************************************//**
Writes a log record of copying a record list end to a newly created page.
@return 4-byte field where to write the log data length, or NULL if
logging is disabled */
UNIV_INLINE
byte*
page_copy_rec_list_to_created_page_write_log(
/*=========================================*/
	page_t*		page,	/*!< in: index page */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
{
	byte*	log_ptr;

	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
	ut_ad(mtr->is_named_space(index->table->space));

	log_ptr = mlog_open_and_write_index(mtr, page, index,
					    page_is_comp(page)
					    ? MLOG_COMP_LIST_END_COPY_CREATED
					    : MLOG_LIST_END_COPY_CREATED, 4);
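
	/* The 4 bytes reserved above are filled in later by the caller
	(page_copy_rec_list_end_to_created_page()) with the combined
	length of the individual insert log records that follow, so
	that recovery knows where the list ends. */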
	if (UNIV_LIKELY(log_ptr != NULL)) {
		mlog_close(mtr, log_ptr + 4);
	}

	return(log_ptr);
}

/**********************************************************//**
Parses a log record of copying a record list end to a newly created page.
@return end of log record or NULL */
byte*
page_parse_copy_rec_list_to_created_page(
/*=====================================*/
	byte*		ptr,	/*!< in: buffer */
	byte*		end_ptr,/*!< in: buffer end */
	buf_block_t*	block,	/*!< in: page or NULL */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
{
	byte*		rec_end;
	ulint		log_data_len;
	page_t*		page;
	page_zip_des_t*	page_zip;

	ut_ad(index->is_dummy);

	if (ptr + 4 > end_ptr) {

		return(NULL);
	}

	log_data_len = mach_read_from_4(ptr);
	ptr += 4;

	rec_end = ptr + log_data_len;

	if (rec_end > end_ptr) {

		return(NULL);
	}

	if (!block) {

		return(rec_end);
	}

	ut_ad(fil_page_index_page_check(block->frame));
	/* This function is never invoked on the clustered index root page,
	except in the redo log apply of
	page_copy_rec_list_end_to_created_page(), which was logged by
	page_copy_rec_list_to_created_page_write_log().
	For other pages, the field read by page_get_instant() must be
	zero. */
	ut_ad(!page_get_instant(block->frame)
	      || !page_has_siblings(block->frame));

	while (ptr < rec_end) {
		ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
						block, index, mtr);
	}

	ut_a(ptr == rec_end);

	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);

	if (!dict_index_is_spatial(index)) {
		page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + page,
				     page, page_zip);
	}

	return(rec_end);
}

/*************************************************************//**
Copies records from page to a newly created page, from a given record onward,
including that record. Infimum and supremum records are not copied.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit(). */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
	page_t*		new_page,	/*!< in/out: index page to copy to */
	rec_t*		rec,		/*!< in: first record to copy */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_dir_slot_t* slot = 0; /* remove warning */
	byte*	heap_top;
	rec_t*	insert_rec = 0; /* remove warning */
	rec_t*	prev_rec;
	ulint	count;
	ulint	n_recs;
	ulint	slot_index;
	ulint	rec_size;
	byte*	log_ptr;
	ulint	log_data_len;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
	ut_ad(page_align(rec) != new_page);
	ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
	ut_ad(fil_page_index_page_check(new_page));
	/* This function is never invoked on the clustered index root page,
	except in btr_lift_page_up(). */
	ut_ad(!page_get_instant(new_page) || !page_has_siblings(new_page));

	if (page_rec_is_infimum(rec)) {

		rec = page_rec_get_next(rec);
	}

	if (page_rec_is_supremum(rec)) {

		return;
	}

#ifdef UNIV_DEBUG
	/* To pass the debug tests we have to set these dummy values
	in the debug version */
	page_dir_set_n_slots(new_page, NULL, srv_page_size / 2);
	page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
			    new_page + srv_page_size - 1);
#endif
	log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
							       index, mtr);

	log_data_len = mtr->get_log()->size();

	/* Individual inserts are logged in a shorter form */

	const mtr_log_t	log_mode = index->table->is_temporary()
	    || !index->is_readable() /* IMPORT TABLESPACE */
		? mtr_get_log_mode(mtr)
		: mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);

	prev_rec = page_get_infimum_rec(new_page);
	if (page_is_comp(new_page)) {
		heap_top = new_page + PAGE_NEW_SUPREMUM_END;
	} else {
		heap_top = new_page + PAGE_OLD_SUPREMUM_END;
	}
	count = 0;
	slot_index = 0;
	n_recs = 0;

	const ulint n_core = page_is_leaf(new_page)
		? index->n_core_fields : 0;

	do {
		offsets = rec_get_offsets(rec, index, offsets, n_core,
					  ULINT_UNDEFINED, &heap);
		insert_rec = rec_copy(heap_top, rec, offsets);

		if (page_is_comp(new_page)) {
			rec_set_next_offs_new(prev_rec,
					      page_offset(insert_rec));

			rec_set_n_owned_new(insert_rec, NULL, 0);
			rec_set_heap_no_new(insert_rec,
					    PAGE_HEAP_NO_USER_LOW + n_recs);
		} else {
			rec_set_next_offs_old(prev_rec,
					      page_offset(insert_rec));

			rec_set_n_owned_old(insert_rec, 0);
			rec_set_heap_no_old(insert_rec,
					    PAGE_HEAP_NO_USER_LOW + n_recs);
		}

		count++;
		n_recs++;

		if (UNIV_UNLIKELY
		    (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {

			slot_index++;

			slot = page_dir_get_nth_slot(new_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}
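
		/* With the usual PAGE_DIR_SLOT_MAX_N_OWNED value of 8,
		this assigns a directory slot to every fourth copied
		record, each owning exactly four records; the tail is
		fixed up after the loop. */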

		rec_size = rec_offs_size(offsets);

		ut_ad(heap_top < new_page + srv_page_size);

		heap_top += rec_size;

		rec_offs_make_valid(insert_rec, index, n_core != 0, offsets);
		page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
					      index, mtr);
		prev_rec = insert_rec;
		rec = page_rec_get_next(rec);
	} while (!page_rec_is_supremum(rec));

	ut_ad(n_recs);

	if ((slot_index > 0) && (count + 1
				 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
				 <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the two last dir slots. This operation
		is here so that this function imitates exactly the
		equivalent task performed using page_cur_insert_rec(),
		which database recovery uses to reproduce the work of
		this function. Exact imitation makes it possible to
		verify the correctness of recovery. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}
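
	/* For example, with PAGE_DIR_SLOT_MAX_N_OWNED == 8 the merge
	fires only when count <= 3, so the last slot (set up below)
	will own at most 3 + 4 + 1 == 8 records, counting the
	supremum. */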

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	/* Restore the log mode */

	mtr_set_log_mode(mtr, log_mode);

	log_data_len = mtr->get_log()->size() - log_data_len;

	ut_a(log_data_len < 100U << srv_page_size_shift);

	if (log_ptr != NULL) {
		mach_write_to_4(log_ptr, log_data_len);
	}

	if (page_is_comp(new_page)) {
		rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
	} else {
		rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
	}

	slot = page_dir_get_nth_slot(new_page, 1 + slot_index);

	page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
	page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
	page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
	page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);

	*reinterpret_cast<uint16_t*>(PAGE_HEADER + PAGE_LAST_INSERT + new_page)
		= 0;
	page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + new_page,
			     new_page, NULL);
}

/***********************************************************//**
Writes a log record of a record delete on a page. */
UNIV_INLINE
void
page_cur_delete_rec_write_log(
/*==========================*/
	rec_t*			rec,	/*!< in: record to be deleted */
	const dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*			mtr)	/*!< in: mini-transaction handle */
{
	byte*	log_ptr;

	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
	ut_ad(mtr->is_named_space(index->table->space));

	log_ptr = mlog_open_and_write_index(mtr, rec, index,
					    page_rec_is_comp(rec)
					    ? MLOG_COMP_REC_DELETE
					    : MLOG_REC_DELETE, 2);

	if (!log_ptr) {
		/* Logging in mtr is switched off during crash recovery:
		in that case mlog_open() returns NULL */
		return;
	}

	/* Write the cursor rec offset as a 2-byte ulint */
	mach_write_to_2(log_ptr, page_offset(rec));

	mlog_close(mtr, log_ptr + 2);
}

/***********************************************************//**
Parses a log record of a record delete on a page.
@return pointer to record end or NULL */
byte*
page_cur_parse_delete_rec(
/*======================*/
	byte*		ptr,	/*!< in: buffer */
	byte*		end_ptr,/*!< in: buffer end */
	buf_block_t*	block,	/*!< in: page or NULL */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
{
	ulint		offset;
	page_cur_t	cursor;

	if (end_ptr < ptr + 2) {

		return(NULL);
	}

	/* Read the cursor rec offset as a 2-byte ulint */
	offset = mach_read_from_2(ptr);
	ptr += 2;

	if (UNIV_UNLIKELY(offset >= srv_page_size)) {
		recv_sys->found_corrupt_log = true;
		return NULL;
	}

	if (block) {
		page_t*		page		= buf_block_get_frame(block);
		mem_heap_t*	heap		= NULL;
		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
		rec_t*		rec		= page + offset;
		rec_offs_init(offsets_);

		page_cur_position(rec, block, &cursor);
		ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));

		page_cur_delete_rec(&cursor, index,
				    rec_get_offsets(rec, index, offsets_,
						    page_rec_is_leaf(rec)
						    ? index->n_core_fields : 0,
						    ULINT_UNDEFINED, &heap),
				    mtr);
		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	}

	return(ptr);
}

/***********************************************************//**
Deletes a record at the page cursor. The cursor is moved to the next
record after the deleted one. */
void
page_cur_delete_rec(
/*================*/
	page_cur_t*		cursor,	/*!< in/out: a page cursor */
	const dict_index_t*	index,	/*!< in: record descriptor */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(
					cursor->rec, index) */
	mtr_t*			mtr)	/*!< in: mini-transaction handle
					or NULL */
{
	page_dir_slot_t* cur_dir_slot;
	page_dir_slot_t* prev_slot;
	page_t*		page;
	page_zip_des_t*	page_zip;
	rec_t*		current_rec;
	rec_t*		prev_rec	= NULL;
	rec_t*		next_rec;
	ulint		cur_slot_no;
	ulint		cur_n_owned;
	rec_t*		rec;

	page = page_cur_get_page(cursor);
	page_zip = page_cur_get_page_zip(cursor);

	/* page_zip_validate() will fail here when
	btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
	Then, both "page_zip" and "page" would have the min-rec-mark
	set on the smallest user record, but "page" would additionally
	have it set on the smallest-but-one record.  Because sloppy
	page_zip_validate_low() only ignores min-rec-flag differences
	in the smallest user record, it cannot be used here either. */

	current_rec = cursor->rec;
	ut_ad(rec_offs_validate(current_rec, index, offsets));
	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || index->is_dummy
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
	ut_ad(!mtr || mtr->is_named_space(index->table->space));

	/* The record must not be the supremum or infimum record. */
	ut_ad(page_rec_is_user_rec(current_rec));

	if (page_get_n_recs(page) == 1 && !recv_recovery_is_on()) {
		/* Empty the page, unless we are applying the redo log
		during crash recovery. During normal operation,
		page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
		MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
		ut_ad(page_is_leaf(page));
		/* Usually, this should be the root page,
		and the whole index tree should become empty.
		However, this could also be a call in
		btr_cur_pessimistic_update() to delete the only
		record in the page and to insert another one. */
		page_cur_move_to_next(cursor);
		ut_ad(page_cur_is_after_last(cursor));
		page_create_empty(page_cur_get_block(cursor),
				  const_cast<dict_index_t*>(index), mtr);
		return;
	}

	/* Save to local variables some data associated with current_rec */
	cur_slot_no = page_dir_find_owner_slot(current_rec);
	ut_ad(cur_slot_no > 0);
	cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
	cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);

	/* 0. Write the log record */
	if (mtr != 0) {
		page_cur_delete_rec_write_log(current_rec, index, mtr);
	}

	/* 1. Reset the last insert info in the page header and increment
	the modify clock for the frame */

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);

	/* The page becomes invalid for optimistic searches: increment
	the frame modify clock only if there is a mini-transaction
	covering the change. During IMPORT we allocate local blocks
	that are not part of the buffer pool. */

	if (mtr != 0) {
		buf_block_modify_clock_inc(page_cur_get_block(cursor));
	}

	/* 2. Find the next and the previous record. Note that the cursor is
	left at the next record. */

	ut_ad(cur_slot_no > 0);
	prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);

	rec = (rec_t*) page_dir_slot_get_rec(prev_slot);

	/* rec now points to the record owned by the previous directory
	slot. Look for the immediate predecessor of current_rec in a loop. */

	while (current_rec != rec) {
		prev_rec = rec;
		rec = page_rec_get_next(rec);
	}
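
	/* This linear scan is bounded: a directory slot owns at most
	PAGE_DIR_SLOT_MAX_N_OWNED records, so only a few iterations
	are needed to find the predecessor. */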

	page_cur_move_to_next(cursor);
	next_rec = cursor->rec;

	/* 3. Remove the record from the linked list of records */

	page_rec_set_next(prev_rec, next_rec);

	/* 4. If the deleted record is pointed to by a dir slot, update the
	record pointer in the slot. In the following if-clause we assume
	that prev_rec is owned by the same slot, i.e.,
	PAGE_DIR_SLOT_MIN_N_OWNED >= 2. */

	compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
	ut_ad(cur_n_owned > 1);

	if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
		page_dir_slot_set_rec(cur_dir_slot, prev_rec);
	}

	/* 5. Update the number of owned records of the slot */

	page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);

	/* 6. Free the memory occupied by the record */
	page_mem_free(page, page_zip, current_rec, index, offsets);

	/* 7. Now we have decremented the number of owned records of the slot.
	If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
	slots. */

	if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
		page_dir_balance_slot(page, page_zip, cur_slot_no);
	}
}

#ifdef UNIV_COMPILE_TEST_FUNCS

/*******************************************************************//**
Print the first n numbers generated by ut_rnd_gen(), to make sure
(visually) that it works properly. */
void
test_ut_rnd_gen(
	int	n)	/*!< in: print first n numbers */
{
	int			i;
	unsigned long long	rnd;

	for (i = 0; i < n; i++) {
		rnd = ut_rnd_gen();
		printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
		       rnd,
		       rnd % 2,
		       rnd % 3,
		       rnd % 5,
		       rnd % 7,
		       rnd % 11);
	}
}
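
/*******************************************************************//**
Illustrative sketch (not used by InnoDB proper): simulate the n_owned
bookkeeping of a single page directory slot, as maintained by the
insert code above and by page_cur_delete_rec(). The split and balance
points use the real PAGE_DIR_SLOT_MAX_N_OWNED and
PAGE_DIR_SLOT_MIN_N_OWNED constants; the redistribution printed here
is a simplification of page_dir_split_slot() and
page_dir_balance_slot(). */
void
test_page_dir_slot_thresholds(
	int	n_inserts,	/*!< in: number of inserts to simulate */
	int	n_deletes)	/*!< in: number of deletes to simulate */
{
	ulint	n_owned	= PAGE_DIR_SLOT_MIN_N_OWNED;
	int	i;

	for (i = 0; i < n_inserts; i++) {
		if (n_owned == PAGE_DIR_SLOT_MAX_N_OWNED) {
			/* The slot would own one record too many:
			split it in two halves, and keep tracking
			one half. */
			printf("insert %d: split; halves own %lu and"
			       " %lu records\n", i,
			       (ulong) ((n_owned + 1) / 2),
			       (ulong) (n_owned + 1 - (n_owned + 1) / 2));
			n_owned = (n_owned + 1) / 2;
		} else {
			n_owned++;
		}
	}

	for (i = 0; i < n_deletes; i++) {
		if (--n_owned < PAGE_DIR_SLOT_MIN_N_OWNED) {
			/* The slot owns too few records: InnoDB would
			borrow a record from an adjacent slot or merge
			with it; here we model only the borrow. */
			printf("delete %d: balance back to %lu\n",
			       i, (ulong) PAGE_DIR_SLOT_MIN_N_OWNED);
			n_owned = PAGE_DIR_SLOT_MIN_N_OWNED;
		}
	}

	printf("final n_owned=%lu\n", (ulong) n_owned);
}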

#endif /* UNIV_COMPILE_TEST_FUNCS */