/*****************************************************************************

Copyright (c) 1994, 2021, Oracle and/or its affiliates.
Copyright (c) 2012, Facebook Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.

This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation.  The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License, version 2.0, for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA

*****************************************************************************/

/********************************************************************//**
@file page/page0cur.cc
The page cursor

Created 10/4/1994 Heikki Tuuri
*************************************************************************/
#include "ha_prototypes.h"

#include "page0cur.h"
#ifdef UNIV_NONINL
#include "page0cur.ic"
#endif

#include "page0zip.h"
#include "btr0btr.h"
#include "mtr0log.h"
#include "log0recv.h"
#ifndef UNIV_HOTBACKUP
#include "rem0cmp.h"
#include "gis0rtree.h"

#include <algorithm>

#ifdef PAGE_CUR_ADAPT
# ifdef UNIV_SEARCH_PERF_STAT
static ulint	page_cur_short_succ	= 0;
# endif /* UNIV_SEARCH_PERF_STAT */

/*******************************************************************//**
This is a linear congruential generator PRNG. Returns a pseudo random
number between 0 and 2^64-1 inclusive. The formula and the constants
being used are:
X[n+1] = (a * X[n] + c) mod m
where:
X[0] = ut_time_monotonic_us()
a = 1103515245 (3^5 * 5 * 7 * 129749)
c = 12345 (3 * 5 * 823)
m = 18446744073709551616 (2^64)

@return number between 0 and 2^64-1 */
static
ib_uint64_t
page_cur_lcg_prng(void)
/*===================*/
{
#define LCG_a	1103515245
#define LCG_c	12345
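	/* Note: LCG_a and LCG_c are the multiplier/increment pair of the
	classic ANSI C rand() LCG; only the modulus differs, being 2^64
	here instead of 2^31. */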
	static ib_uint64_t	lcg_current = 0;
	static ibool		initialized = FALSE;

	if (!initialized) {
		lcg_current = (ib_uint64_t) ut_time_monotonic_us();
		initialized = TRUE;
	}

	/* no need to "% 2^64" explicitly because lcg_current is
	64 bit and this will be done anyway */
	lcg_current = LCG_a * lcg_current + LCG_c;

	return(lcg_current);
}

/** Try a search shortcut based on the last insert.
@param[in]	block			index page
@param[in]	index			index tree
@param[in]	tuple			search key
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[out]	cursor			page cursor
@return true on success */
UNIV_INLINE
bool
page_cur_try_search_shortcut(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	ulint*			iup_matched_fields,
	ulint*			ilow_matched_fields,
	page_cur_t*		cursor)
{
	const rec_t*	rec;
	const rec_t*	next_rec;
	ulint		low_match;
	ulint		up_match;
	ibool		success		= FALSE;
	const page_t*	page		= buf_block_get_frame(block);
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_check_typed(tuple));

	rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
	offsets = rec_get_offsets(rec, index, offsets,
				  dtuple_get_n_fields(tuple), &heap);

	ut_ad(rec);
	ut_ad(page_rec_is_user_rec(rec));

	low_match = up_match = std::min(*ilow_matched_fields,
					*iup_matched_fields);

	if (cmp_dtuple_rec_with_match(tuple, rec, offsets, &low_match) < 0) {
		goto exit_func;
	}

	next_rec = page_rec_get_next_const(rec);
	if (!page_rec_is_supremum(next_rec)) {
		offsets = rec_get_offsets(next_rec, index, offsets,
					  dtuple_get_n_fields(tuple), &heap);

		if (cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
					      &up_match) >= 0) {
			goto exit_func;
		}

		*iup_matched_fields = up_match;
	}

	page_cur_position(rec, block, cursor);

	*ilow_matched_fields = low_match;

#ifdef UNIV_SEARCH_PERF_STAT
	page_cur_short_succ++;
#endif
	success = TRUE;
exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(success);
}

/** Try a search shortcut based on the last insert.
@param[in]	block			index page
@param[in]	index			index tree
@param[in]	tuple			search key
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	iup_matched_bytes	already matched bytes in the
first partially matched field in the upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[in,out]	ilow_matched_bytes	already matched bytes in the
first partially matched field in the lower limit record
@param[out]	cursor			page cursor
@return true on success */
UNIV_INLINE
bool
page_cur_try_search_shortcut_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	const rec_t*	rec;
	const rec_t*	next_rec;
	ulint		low_match;
	ulint		low_bytes;
	ulint		up_match;
	ulint		up_bytes;
	ibool		success		= FALSE;
	const page_t*	page		= buf_block_get_frame(block);
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_check_typed(tuple));

	rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
	offsets = rec_get_offsets(rec, index, offsets,
				  dtuple_get_n_fields(tuple), &heap);

	ut_ad(rec);
	ut_ad(page_rec_is_user_rec(rec));
	if (ut_pair_cmp(*ilow_matched_fields, *ilow_matched_bytes,
			*iup_matched_fields, *iup_matched_bytes) < 0) {
		up_match = low_match = *ilow_matched_fields;
		up_bytes = low_bytes = *ilow_matched_bytes;
	} else {
		up_match = low_match = *iup_matched_fields;
		up_bytes = low_bytes = *iup_matched_bytes;
	}

	if (cmp_dtuple_rec_with_match_bytes(
		    tuple, rec, index, offsets, &low_match, &low_bytes) < 0) {
		goto exit_func;
	}

	next_rec = page_rec_get_next_const(rec);
	if (!page_rec_is_supremum(next_rec)) {
		offsets = rec_get_offsets(next_rec, index, offsets,
					  dtuple_get_n_fields(tuple), &heap);

		if (cmp_dtuple_rec_with_match_bytes(
			    tuple, next_rec, index, offsets,
			    &up_match, &up_bytes)
		    >= 0) {
			goto exit_func;
		}

		*iup_matched_fields = up_match;
		*iup_matched_bytes = up_bytes;
	}

	page_cur_position(rec, block, cursor);

	*ilow_matched_fields = low_match;
	*ilow_matched_bytes = low_bytes;

#ifdef UNIV_SEARCH_PERF_STAT
	page_cur_short_succ++;
#endif
	success = TRUE;
exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(success);
}
#endif

#ifdef PAGE_CUR_LE_OR_EXTENDS
/****************************************************************//**
Checks if the nth field in a record is a character type field which extends
the nth field in tuple, i.e., the field is longer or equal in length and has
common first characters.
@return TRUE if rec field extends tuple field */
static
ibool
page_cur_rec_field_extends(
/*=======================*/
	const dtuple_t*	tuple,	/*!< in: data tuple */
	const rec_t*	rec,	/*!< in: record */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		n)	/*!< in: compare nth field */
{
	const dtype_t*	type;
	const dfield_t*	dfield;
	const byte*	rec_f;
	ulint		rec_f_len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
	dfield = dtuple_get_nth_field(tuple, n);

	type = dfield_get_type(dfield);

	rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);

	if (type->mtype == DATA_VARCHAR
	    || type->mtype == DATA_CHAR
	    || type->mtype == DATA_FIXBINARY
	    || type->mtype == DATA_BINARY
	    || type->mtype == DATA_BLOB
	    || DATA_GEOMETRY_MTYPE(type->mtype)
	    || type->mtype == DATA_VARMYSQL
	    || type->mtype == DATA_MYSQL) {

		if (dfield_get_len(dfield) != UNIV_SQL_NULL
		    && rec_f_len != UNIV_SQL_NULL
		    && rec_f_len >= dfield_get_len(dfield)
		    && !cmp_data_data(type->mtype, type->prtype,
				      dfield_get_data(dfield),
				      dfield_get_len(dfield),
				      rec_f, dfield_get_len(dfield))) {

			return(TRUE);
		}
	}

	return(FALSE);
}
#endif /* PAGE_CUR_LE_OR_EXTENDS */

/** If the key is of fixed length, populate the offsets directly from
the cached version.
@param[in]	rec	B-tree record for which the offsets need to be
			populated
@param[in]	tuple	data tuple
@param[in,out]	index	index handler
@param[in,out]	offsets	default offsets array
@param[in,out]	heap	heap
@return pointer to the populated offsets */
static
ulint*
populate_offsets(
	const rec_t*		rec,
	const dtuple_t*		tuple,
	dict_index_t*		index,
	ulint*			offsets,
	mem_heap_t**		heap)
{
	ut_ad(dict_table_is_intrinsic(index->table));

	bool rec_has_null_values	= false;

	if (index->rec_cache.key_has_null_cols) {
		/* Check if record has null value. */
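		/* In the new-style (COMPACT) record format the null-flag
		bitmap is stored immediately before the fixed-size header
		of REC_N_NEW_EXTRA_BYTES and grows away from the record
		origin; hence the scan below starts at the byte just in
		front of that header and walks backwards (--nulls). */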
		const byte*	nulls = rec - (1 + REC_N_NEW_EXTRA_BYTES);
		ulint		n_bytes_to_scan
			= UT_BITS_IN_BYTES(index->n_nullable);
		byte		null_mask = 0xff;
		ulint		bits_examined = 0;

		for (ulint i = 0; i < n_bytes_to_scan - 1; i++) {
			if (*nulls & null_mask) {
				rec_has_null_values = true;
				break;
			}
			--nulls;
			bits_examined += 8;
		}

		if (!rec_has_null_values) {
			null_mask >>= (8 - (index->n_nullable - bits_examined));
			rec_has_null_values = *nulls & null_mask;
		}

		if (rec_has_null_values) {

			offsets = rec_get_offsets(
				rec, index, offsets,
				dtuple_get_n_fields_cmp(tuple), heap);

			return(offsets);
		}
	}

	/* Check if the offsets are cached; if not, cache them first.
	Some queries first verify that the key is present using an index
	search and only then initiate the insert. Offsets cached during
	such a search are based on the key columns only, whereas the
	insert, which looks for the exact location of key + db_row_id,
	uses both; recompute the offsets in that case. */
	if (!index->rec_cache.offsets_cached
	    || (rec_offs_n_fields(index->rec_cache.offsets)
		< dtuple_get_n_fields_cmp(tuple))) {

		offsets = rec_get_offsets(
			rec, index, offsets,
			dtuple_get_n_fields_cmp(tuple), heap);

		/* Reallocate if our offset array is not big
		enough to hold the needed size. */
		ulint sz1 = index->rec_cache.sz_of_offsets;
		ulint sz2 = offsets[0];
		if (sz1 < sz2) {
			index->rec_cache.offsets = static_cast<ulint*>(
				mem_heap_alloc(
					index->heap, sizeof(ulint) * sz2));
			index->rec_cache.sz_of_offsets =
				static_cast<uint32_t>(sz2);
		}

		memcpy(index->rec_cache.offsets,
		       offsets, (sizeof(ulint) * sz2));
		index->rec_cache.offsets_cached = true;
	}

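	/* Note: the '=' below appears to be an intentional assignment, not
	a mistyped '==': in debug builds it refreshes the record pointer
	that rec_offs_make_valid() keeps in offsets[2] for
	rec_offs_validate(), and the ut_ad() then asserts that the pointer
	is non-NULL. In release builds ut_ad() compiles away entirely. */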
	ut_ad(index->rec_cache.offsets[2] = (ulint) rec);

	return(index->rec_cache.offsets);
}

/****************************************************************//**
Searches the right position for a page cursor. */
void
page_cur_search_with_match(
/*=======================*/
	const buf_block_t*	block,	/*!< in: buffer block */
	const dict_index_t*	index,	/*!< in/out: record descriptor */
	const dtuple_t*		tuple,	/*!< in: data tuple */
	page_cur_mode_t		mode,	/*!< in: PAGE_CUR_L,
					PAGE_CUR_LE, PAGE_CUR_G, or
					PAGE_CUR_GE */
	ulint*			iup_matched_fields,
					/*!< in/out: already matched
					fields in upper limit record */
	ulint*			ilow_matched_fields,
					/*!< in/out: already matched
					fields in lower limit record */
	page_cur_t*		cursor,	/*!< out: page cursor */
	rtr_info_t*		rtr_info)/*!< in/out: rtree search stack */
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t* slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		low_matched_fields;
	ulint		cur_matched_fields;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
		if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
			ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
			      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE
			      || dict_index_is_spatial(index));
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));

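	/* Adaptive shortcut (PAGE_CUR_ADAPT): on a leaf page, when searching
	with PAGE_CUR_LE (the mode used when positioning an insert) and the
	page header records a run of more than three inserts in ascending
	(rightward) order, the sought key very likely belongs right after
	the record at PAGE_LAST_INSERT, so that record is probed first
	before falling back to the full binary search below. */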
#ifdef PAGE_CUR_ADAPT
	if (page_is_leaf(page)
	    && (mode == PAGE_CUR_LE)
	    && !dict_index_is_spatial(index)
	    && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
	    && (page_header_get_ptr(page, PAGE_LAST_INSERT))
	    && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {

		if (page_cur_try_search_shortcut(
			    block, index, tuple,
			    iup_matched_fields,
			    ilow_matched_fields,
			    cursor)) {
			return;
		}
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif

	/* If the mode is for R-tree indexes, use the special MBR
	related compare functions */
	if (dict_index_is_spatial(index) && mode > PAGE_CUR_LE) {
		/* For leaf level insert, we still use the traditional
		compare function for now */
		if (mode == PAGE_CUR_RTREE_INSERT && page_is_leaf(page)) {
			mode = PAGE_CUR_LE;
		} else {
			rtr_cur_search_with_match(
				block, (dict_index_t*)index, tuple, mode,
				cursor, rtr_info);
			return;
		}
	}

	/* The following flag does not work for non-latin1 char sets because
	cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	low_matched_fields = *ilow_matched_fields;

	/* Perform binary search. First the search is done through the page
	directory, after that as a linear search in the list of records
	owned by the upper limit directory slot. */
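	/* Each directory slot owns between PAGE_DIR_SLOT_MIN_N_OWNED (4)
	and PAGE_DIR_SLOT_MAX_N_OWNED (8) records, so once the binary
	search has narrowed the range down to two adjacent slots, the
	linear phase only has to examine a handful of records. */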

	low = 0;
	up = page_dir_get_n_slots(page) - 1;

	/* Perform binary search until the lower and upper limit directory
	slots come to the distance 1 of each other */

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		cur_matched_fields = std::min(low_matched_fields,
					      up_matched_fields);

		offsets = offsets_;
		if (index->rec_cache.fixed_len_key) {
			offsets = populate_offsets(
				mid_rec, tuple,
				const_cast<dict_index_t*>(index),
				offsets, &heap);
		} else {
			offsets = rec_get_offsets(
				mid_rec, index, offsets,
				dtuple_get_n_fields_cmp(tuple), &heap);
		}

		cmp = cmp_dtuple_rec_with_match(
			tuple, mid_rec, offsets, &cur_matched_fields);

		if (cmp > 0) {
low_slot_match:
			low = mid;
			low_matched_fields = cur_matched_fields;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			up = mid;
			up_matched_fields = cur_matched_fields;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform linear search until the upper and lower records come to
	distance 1 of each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		cur_matched_fields = std::min(low_matched_fields,
					      up_matched_fields);

		offsets = offsets_;
		if (index->rec_cache.fixed_len_key) {
			offsets = populate_offsets(
				mid_rec, tuple,
				const_cast<dict_index_t*>(index),
				offsets, &heap);
		} else {
			offsets = rec_get_offsets(
				mid_rec, index, offsets,
				dtuple_get_n_fields_cmp(tuple), &heap);
		}

		cmp = cmp_dtuple_rec_with_match(
			tuple, mid_rec, offsets, &cur_matched_fields);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			if (!cmp && !cur_matched_fields) {
#ifdef UNIV_DEBUG
				mtr_t	mtr;
				mtr_start(&mtr);

				/* We got a match, but cur_matched_fields is
				0; the record must have the
				REC_INFO_MIN_REC_FLAG set. */
				ulint	rec_info = rec_get_info_bits(
					mid_rec, rec_offs_comp(offsets));
				ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
				ut_ad(btr_page_get_prev(page, &mtr) == FIL_NULL);
				mtr_commit(&mtr);
#endif

				cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
			}

			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*ilow_matched_fields = low_matched_fields;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}

/** Search the right position for a page cursor.
@param[in]	block			buffer block
@param[in]	index			index tree
@param[in]	tuple			key to be searched for
@param[in]	mode			search mode
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	iup_matched_bytes	already matched bytes in the
first partially matched field in the upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[in,out]	ilow_matched_bytes	already matched bytes in the
first partially matched field in the lower limit record
@param[out]	cursor			page cursor */
void
page_cur_search_with_match_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	page_cur_mode_t		mode,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t* slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		up_matched_bytes;
	ulint		low_matched_fields;
	ulint		low_matched_bytes;
	ulint		cur_matched_fields;
	ulint		cur_matched_bytes;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
		if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
			ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
			      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));

#ifdef PAGE_CUR_ADAPT
	if (page_is_leaf(page)
	    && (mode == PAGE_CUR_LE)
	    && (page_header_get_field(page, PAGE_N_DIRECTION) > 3)
	    && (page_header_get_ptr(page, PAGE_LAST_INSERT))
	    && (page_header_get_field(page, PAGE_DIRECTION) == PAGE_RIGHT)) {

		if (page_cur_try_search_shortcut_bytes(
			    block, index, tuple,
			    iup_matched_fields, iup_matched_bytes,
			    ilow_matched_fields, ilow_matched_bytes,
			    cursor)) {
			return;
		}
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif

	/* The following flag does not work for non-latin1 char sets because
	cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	up_matched_bytes   = *iup_matched_bytes;
	low_matched_fields = *ilow_matched_fields;
	low_matched_bytes  = *ilow_matched_bytes;

	/* Perform binary search. First the search is done through the page
	directory, after that as a linear search in the list of records
	owned by the upper limit directory slot. */

	low = 0;
	up = page_dir_get_n_slots(page) - 1;

	/* Perform binary search until the lower and upper limit directory
	slots come to the distance 1 of each other */

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		offsets = rec_get_offsets(
			mid_rec, index, offsets_,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_slot_match:
			low = mid;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			up = mid;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform linear search until the upper and lower records come to
	distance 1 of each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		offsets = rec_get_offsets(
			mid_rec, index, offsets_,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			if (!cmp && !cur_matched_fields) {
#ifdef UNIV_DEBUG
				mtr_t	mtr;
				mtr_start(&mtr);

				/* We got a match, but cur_matched_fields is
				0; the record must have the
				REC_INFO_MIN_REC_FLAG set. */
				ulint	rec_info = rec_get_info_bits(
					mid_rec, rec_offs_comp(offsets));
				ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
				ut_ad(btr_page_get_prev(page, &mtr) == FIL_NULL);
				mtr_commit(&mtr);
#endif

				cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
			}

			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*iup_matched_bytes   = up_matched_bytes;
	*ilow_matched_fields = low_matched_fields;
	*ilow_matched_bytes  = low_matched_bytes;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}

/***********************************************************//**
Positions a page cursor on a randomly chosen user record on a page. If there
are no user records, sets the cursor on the infimum record. */
void
page_cur_open_on_rnd_user_rec(
/*==========================*/
	buf_block_t*	block,	/*!< in: page */
	page_cur_t*	cursor)	/*!< out: page cursor */
{
	ulint	rnd;
	ulint	n_recs = page_get_n_recs(buf_block_get_frame(block));

	page_cur_set_before_first(block, cursor);

	if (UNIV_UNLIKELY(n_recs == 0)) {

		return;
	}

	rnd = (ulint) (page_cur_lcg_prng() % n_recs);

	do {
		page_cur_move_to_next(cursor);
	} while (rnd--);
}

/***********************************************************//**
Writes the log record of a record insert on a page. */
static
void
page_cur_insert_rec_write_log(
/*==========================*/
	rec_t*		insert_rec,	/*!< in: inserted physical record */
	ulint		rec_size,	/*!< in: insert_rec size */
	rec_t*		cursor_rec,	/*!< in: record the
					cursor is pointing to */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mini-transaction handle */
{
	ulint	cur_rec_size;
	ulint	extra_size;
	ulint	cur_extra_size;
	const byte* ins_ptr;
	const byte* log_end;
	ulint	i;

	/* Avoid REDO logging to save on costly IO because
	temporary tables are not recovered during crash recovery. */
	if (dict_table_is_temporary(index->table)) {
		byte*	log_ptr = mlog_open(mtr, 0);
		if (log_ptr == NULL) {
			return;
		}
		mlog_close(mtr, log_ptr);
		log_ptr = NULL;
	}

	ut_a(rec_size < UNIV_PAGE_SIZE);
	ut_ad(mtr->is_named_space(index->space));
	ut_ad(page_align(insert_rec) == page_align(cursor_rec));
	ut_ad(!page_rec_is_comp(insert_rec)
	      == !dict_table_is_comp(index->table));

	{
		mem_heap_t*	heap		= NULL;
		ulint		cur_offs_[REC_OFFS_NORMAL_SIZE];
		ulint		ins_offs_[REC_OFFS_NORMAL_SIZE];

		ulint*		cur_offs;
		ulint*		ins_offs;

		rec_offs_init(cur_offs_);
		rec_offs_init(ins_offs_);

		cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
					   ULINT_UNDEFINED, &heap);
		ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
					   ULINT_UNDEFINED, &heap);

		extra_size = rec_offs_extra_size(ins_offs);
		cur_extra_size = rec_offs_extra_size(cur_offs);
		ut_ad(rec_size == rec_offs_size(ins_offs));
		cur_rec_size = rec_offs_size(cur_offs);

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	}

	ins_ptr = insert_rec - extra_size;

	i = 0;

	if (cur_extra_size == extra_size) {
		ulint		min_rec_size = ut_min(cur_rec_size, rec_size);

		const byte*	cur_ptr = cursor_rec - cur_extra_size;

		/* Find out the first byte in insert_rec which differs from
		cursor_rec; skip the bytes in the record info */
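		/* A mismatch inside the fixed-size part of the record
		header is tolerated: in that case the common prefix is
		extended to the record origin, because those header bytes
		(next pointer, n_owned, heap number, info bits) are rebuilt
		by the insert routine during recovery anyway and need not
		be logged byte for byte. */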

		do {
			if (*ins_ptr == *cur_ptr) {
				i++;
				ins_ptr++;
				cur_ptr++;
			} else if ((i < extra_size)
				   && (i >= extra_size
				       - page_rec_get_base_extra_size
				       (insert_rec))) {
				i = extra_size;
				ins_ptr = insert_rec;
				cur_ptr = cursor_rec;
			} else {
				break;
			}
		} while (i < min_rec_size);
	}

	byte*	log_ptr;

	if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {

		if (page_rec_is_comp(insert_rec)) {
			log_ptr = mlog_open_and_write_index(
				mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
				2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
			if (UNIV_UNLIKELY(!log_ptr)) {
				/* Logging in mtr is switched off
				during crash recovery: in that case
				mlog_open returns NULL */
				return;
			}
		} else {
			log_ptr = mlog_open(mtr, 11
					    + 2 + 5 + 1 + 5 + 5
					    + MLOG_BUF_MARGIN);
			if (UNIV_UNLIKELY(!log_ptr)) {
				/* Logging in mtr is switched off
				during crash recovery: in that case
				mlog_open returns NULL */
				return;
			}

			log_ptr = mlog_write_initial_log_record_fast(
				insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
		}

		log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
		/* Write the cursor rec offset as a 2-byte ulint */
		mach_write_to_2(log_ptr, page_offset(cursor_rec));
		log_ptr += 2;
	} else {
		log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
		if (!log_ptr) {
			/* Logging in mtr is switched off during crash
			recovery: in that case mlog_open returns NULL */
			return;
		}
		log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
	}

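	/* The log record body written below is mirrored by
	page_cur_parse_insert_rec():
	- the end-segment length, compressed, shifted left by one bit, with
	  the low bit set when the extra-info fields are present;
	- if that bit is set: one byte of info and status bits, the record
	  origin offset (compressed) and the mismatch index (compressed);
	- finally the last (rec_size - i) bytes of the inserted record. */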
	if (page_rec_is_comp(insert_rec)) {
		if (UNIV_UNLIKELY
		    (rec_get_info_and_status_bits(insert_rec, TRUE)
		     != rec_get_info_and_status_bits(cursor_rec, TRUE))) {

			goto need_extra_info;
		}
	} else {
		if (UNIV_UNLIKELY
		    (rec_get_info_and_status_bits(insert_rec, FALSE)
		     != rec_get_info_and_status_bits(cursor_rec, FALSE))) {

			goto need_extra_info;
		}
	}

	if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
need_extra_info:
		/* Write the record end segment length
		and the extra info storage flag */
		log_ptr += mach_write_compressed(log_ptr,
						 2 * (rec_size - i) + 1);

		/* Write the info bits */
		mach_write_to_1(log_ptr,
				rec_get_info_and_status_bits(
					insert_rec,
					page_rec_is_comp(insert_rec)));
		log_ptr++;

		/* Write the record origin offset */
		log_ptr += mach_write_compressed(log_ptr, extra_size);

		/* Write the mismatch index */
		log_ptr += mach_write_compressed(log_ptr, i);

		ut_a(i < UNIV_PAGE_SIZE);
		ut_a(extra_size < UNIV_PAGE_SIZE);
	} else {
		/* Write the record end segment length
		and the extra info storage flag */
		log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
	}

	/* Write to the log the inserted index record end segment which
	differs from the cursor record */

	rec_size -= i;

	if (log_ptr + rec_size <= log_end) {
		memcpy(log_ptr, ins_ptr, rec_size);
		mlog_close(mtr, log_ptr + rec_size);
	} else {
		mlog_close(mtr, log_ptr);
		ut_a(rec_size < UNIV_PAGE_SIZE);
		mlog_catenate_string(mtr, ins_ptr, rec_size);
	}
}
#else /* !UNIV_HOTBACKUP */
# define page_cur_insert_rec_write_log(ins_rec,size,cur,index,mtr) ((void) 0)
#endif /* !UNIV_HOTBACKUP */

/***********************************************************//**
Parses a log record of a record insert on a page.
@return end of log record or NULL */
byte*
page_cur_parse_insert_rec(
/*======================*/
	ibool		is_short,/*!< in: TRUE if short inserts */
	const byte*	ptr,	/*!< in: buffer */
	const byte*	end_ptr,/*!< in: buffer end */
	buf_block_t*	block,	/*!< in: page or NULL */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
{
	ulint	origin_offset		= 0; /* remove warning */
	ulint	end_seg_len;
	ulint	mismatch_index		= 0; /* remove warning */
	page_t*	page;
	rec_t*	cursor_rec;
	byte	buf1[1024];
	byte*	buf;
	const byte*	ptr2		= ptr;
	ulint		info_and_status_bits = 0; /* remove warning */
	page_cur_t	cursor;
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	page = block ? buf_block_get_frame(block) : NULL;

	if (is_short) {
		cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
	} else {
		ulint	offset;

		/* Read the cursor rec offset as a 2-byte ulint */

		if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {

			return(NULL);
		}

		offset = mach_read_from_2(ptr);
		ptr += 2;

		cursor_rec = page + offset;

		if (offset >= UNIV_PAGE_SIZE) {

			recv_sys->found_corrupt_log = TRUE;

			return(NULL);
		}
	}

	end_seg_len = mach_parse_compressed(&ptr, end_ptr);

	if (ptr == NULL) {

		return(NULL);
	}

	if (end_seg_len >= UNIV_PAGE_SIZE << 1) {
		recv_sys->found_corrupt_log = TRUE;

		return(NULL);
	}

	if (end_seg_len & 0x1UL) {
		/* Read the info bits */

		if (end_ptr < ptr + 1) {

			return(NULL);
		}

		info_and_status_bits = mach_read_from_1(ptr);
		ptr++;

		origin_offset = mach_parse_compressed(&ptr, end_ptr);

		if (ptr == NULL) {

			return(NULL);
		}

		ut_a(origin_offset < UNIV_PAGE_SIZE);

		mismatch_index = mach_parse_compressed(&ptr, end_ptr);

		if (ptr == NULL) {

			return(NULL);
		}

		ut_a(mismatch_index < UNIV_PAGE_SIZE);
	}

	if (end_ptr < ptr + (end_seg_len >> 1)) {

		return(NULL);
	}

	if (!block) {

		return(const_cast<byte*>(ptr + (end_seg_len >> 1)));
	}

	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
	ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));

	/* Read from the log the inserted index record end segment which
	differs from the cursor record */

	offsets = rec_get_offsets(cursor_rec, index, offsets,
				  ULINT_UNDEFINED, &heap);

	if (!(end_seg_len & 0x1UL)) {
		info_and_status_bits = rec_get_info_and_status_bits(
			cursor_rec, page_is_comp(page));
		origin_offset = rec_offs_extra_size(offsets);
		mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
	}

	end_seg_len >>= 1;

	if (mismatch_index + end_seg_len < sizeof buf1) {
		buf = buf1;
	} else {
		buf = static_cast<byte*>(
			ut_malloc_nokey(mismatch_index + end_seg_len));
	}

	/* Build the inserted record to buf */

	if (UNIV_UNLIKELY(mismatch_index >= UNIV_PAGE_SIZE)) {

		ib::fatal() << "is_short " << is_short << ", "
			<< "info_and_status_bits " << info_and_status_bits
			<< ", offset " << page_offset(cursor_rec) << ","
			" o_offset " << origin_offset << ", mismatch index "
			<< mismatch_index << ", end_seg_len " << end_seg_len
			<< " parsed len " << (ptr - ptr2);
	}

	ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
	ut_memcpy(buf + mismatch_index, ptr, end_seg_len);

	if (page_is_comp(page)) {
		rec_set_info_and_status_bits(buf + origin_offset,
					     info_and_status_bits);
	} else {
		rec_set_info_bits_old(buf + origin_offset,
				      info_and_status_bits);
	}

	page_cur_position(cursor_rec, block, &cursor);

	offsets = rec_get_offsets(buf + origin_offset, index, offsets,
				  ULINT_UNDEFINED, &heap);
	if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
					       buf + origin_offset,
					       index, offsets, mtr))) {
		/* The redo log record should only have been written
		after the write was successful. */
		ut_error;
	}

	if (buf != buf1) {

		ut_free(buf);
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	return(const_cast<byte*>(ptr + end_seg_len));
}

/***********************************************************//**
Inserts a record next to the page cursor on an uncompressed page.
Returns a pointer to the inserted record on success, i.e., if enough
space is available; NULL otherwise. The cursor stays at the same
position.
@return pointer to the inserted record on success, NULL otherwise */
rec_t*
page_cur_insert_rec_low(
/*====================*/
	rec_t*		current_rec,/*!< in: pointer to current record after
				which the new record is inserted */
	dict_index_t*	index,	/*!< in: record descriptor */
	const rec_t*	rec,	/*!< in: pointer to a physical record */
	ulint*		offsets,/*!< in/out: rec_get_offsets(rec, index) */
	mtr_t*		mtr)	/*!< in: mini-transaction handle, or NULL */
{
	byte*		insert_buf;
	ulint		rec_size;
	page_t*		page;		/*!< the relevant page */
	rec_t*		last_insert;	/*!< cursor position at previous
					insert */
	rec_t*		free_rec;	/*!< a free record that was reused,
					or NULL */
	rec_t*		insert_rec;	/*!< inserted record */
	ulint		heap_no;	/*!< heap number of the inserted
					record */

	ut_ad(rec_offs_validate(rec, index, offsets));

	page = page_align(current_rec);
	ut_ad(dict_table_is_comp(index->table)
	      == (ibool) !!page_is_comp(page));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || recv_recovery_is_on()
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));

	ut_ad(!page_rec_is_supremum(current_rec));

	/* 1. Get the size of the physical record in the page */
	rec_size = rec_offs_size(offsets);

#ifdef UNIV_DEBUG_VALGRIND
	{
		const void*	rec_start
			= rec - rec_offs_extra_size(offsets);
		ulint		extra_size
			= rec_offs_extra_size(offsets)
			- (rec_offs_comp(offsets)
			   ? REC_N_NEW_EXTRA_BYTES
			   : REC_N_OLD_EXTRA_BYTES);

		/* All data bytes of the record must be valid. */
		UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
		/* The variable-length header must be valid. */
		UNIV_MEM_ASSERT_RW(rec_start, extra_size);
	}
#endif /* UNIV_DEBUG_VALGRIND */

	/* 2. Try to find suitable space from page memory management */

	free_rec = page_header_get_ptr(page, PAGE_FREE);
	if (UNIV_LIKELY_NULL(free_rec)) {
		/* Try to allocate from the head of the free list. */
		ulint		foffsets_[REC_OFFS_NORMAL_SIZE];
		ulint*		foffsets	= foffsets_;
		mem_heap_t*	heap		= NULL;

		rec_offs_init(foffsets_);

		foffsets = rec_get_offsets(
			free_rec, index, foffsets, ULINT_UNDEFINED, &heap);
		if (rec_offs_size(foffsets) < rec_size) {
			if (UNIV_LIKELY_NULL(heap)) {
				mem_heap_free(heap);
			}

			goto use_heap;
		}

		insert_buf = free_rec - rec_offs_extra_size(foffsets);

		if (page_is_comp(page)) {
			heap_no = rec_get_heap_no_new(free_rec);
			page_mem_alloc_free(page, NULL,
					rec_get_next_ptr(free_rec, TRUE),
					rec_size);
		} else {
			heap_no = rec_get_heap_no_old(free_rec);
			page_mem_alloc_free(page, NULL,
					rec_get_next_ptr(free_rec, FALSE),
					rec_size);
		}

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	} else {
use_heap:
		free_rec = NULL;
		insert_buf = page_mem_alloc_heap(page, NULL,
						 rec_size, &heap_no);

		if (UNIV_UNLIKELY(insert_buf == NULL)) {
			return(NULL);
		}
	}

	/* 3. Create the record */
	insert_rec = rec_copy(insert_buf, rec, offsets);
	rec_offs_make_valid(insert_rec, index, offsets);

	/* 4. Insert the record in the linked list of records */
	ut_ad(current_rec != insert_rec);

	{
		/* next record after current before the insertion */
		rec_t*	next_rec = page_rec_get_next(current_rec);
#ifdef UNIV_DEBUG
		if (page_is_comp(page)) {
			ut_ad(rec_get_status(current_rec)
				<= REC_STATUS_INFIMUM);
			ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
			ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
		}
#endif
		page_rec_set_next(insert_rec, next_rec);
		page_rec_set_next(current_rec, insert_rec);
	}

	page_header_set_field(page, NULL, PAGE_N_RECS,
			      1 + page_get_n_recs(page));

	/* 5. Set the n_owned field in the inserted record to zero,
	and set the heap_no field */
	if (page_is_comp(page)) {
		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec, heap_no);
	} else {
		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec, heap_no);
	}

	UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
			   rec_offs_size(offsets));
	/* 6. Update the last insertion info in page header */
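	/* PAGE_DIRECTION and PAGE_N_DIRECTION track the pattern of recent
	inserts on the page; they feed the PAGE_CUR_ADAPT search shortcut
	above and the choice of the split point when the page eventually
	fills up and is split in btr0btr. */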

	last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
	ut_ad(!last_insert || !page_is_comp(page)
	      || rec_get_node_ptr_flag(last_insert)
	      == rec_get_node_ptr_flag(insert_rec));

	if (!dict_index_is_spatial(index)) {
		if (UNIV_UNLIKELY(last_insert == NULL)) {
			page_header_set_field(page, NULL, PAGE_DIRECTION,
					      PAGE_NO_DIRECTION);
			page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);

		} else if ((last_insert == current_rec)
			   && (page_header_get_field(page, PAGE_DIRECTION)
			       != PAGE_LEFT)) {

			page_header_set_field(page, NULL, PAGE_DIRECTION,
					      PAGE_RIGHT);
			page_header_set_field(page, NULL, PAGE_N_DIRECTION,
					      page_header_get_field(
						page, PAGE_N_DIRECTION) + 1);

		} else if ((page_rec_get_next(insert_rec) == last_insert)
			   && (page_header_get_field(page, PAGE_DIRECTION)
			       != PAGE_RIGHT)) {

			page_header_set_field(page, NULL, PAGE_DIRECTION,
					      PAGE_LEFT);
			page_header_set_field(page, NULL, PAGE_N_DIRECTION,
					      page_header_get_field(
						page, PAGE_N_DIRECTION) + 1);
		} else {
			page_header_set_field(page, NULL, PAGE_DIRECTION,
					      PAGE_NO_DIRECTION);
			page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
		}
	}

	page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);

	/* 7. It remains to update the owner record. */
	{
		rec_t*	owner_rec	= page_rec_find_owner_rec(insert_rec);
		ulint	n_owned;
		if (page_is_comp(page)) {
			n_owned = rec_get_n_owned_new(owner_rec);
			rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
		} else {
			n_owned = rec_get_n_owned_old(owner_rec);
			rec_set_n_owned_old(owner_rec, n_owned + 1);
		}

		/* 8. Now we have incremented the n_owned field of the owner
		record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
		we have to split the corresponding directory slot in two. */

		if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
			page_dir_split_slot(
				page, NULL,
				page_dir_find_owner_slot(owner_rec));
		}
	}

	/* 9. Write log record of the insert */
	if (UNIV_LIKELY(mtr != NULL)) {
		page_cur_insert_rec_write_log(insert_rec, rec_size,
					      current_rec, index, mtr);
	}

	return(insert_rec);
}

/** Inserts a record next to the page cursor on an uncompressed page.
@param[in]	current_rec	pointer to the current record, after which
				the new record is inserted
@param[in]	index		record descriptor
@param[in]	tuple		pointer to a data tuple
@param[in]	n_ext		number of externally stored columns
@param[in]	mtr		mini-transaction handle, or NULL

@return pointer to the inserted record on success, NULL otherwise */
rec_t*
page_cur_direct_insert_rec_low(
	rec_t*		current_rec,
	dict_index_t*	index,
	const dtuple_t*	tuple,
	ulint		n_ext,
	mtr_t*		mtr)
{
	byte*		insert_buf;
	ulint		rec_size;
	page_t*		page;		/*!< the relevant page */
	rec_t*		last_insert;	/*!< cursor position at previous
					insert */
	rec_t*		free_rec;	/*!< a free record that was reused,
					or NULL */
	rec_t*		insert_rec;	/*!< inserted record */
	ulint		heap_no;	/*!< heap number of the inserted
					record */

	page = page_align(current_rec);

	ut_ad(dict_table_is_comp(index->table)
	      == (ibool) !!page_is_comp(page));

	ut_ad(fil_page_index_page_check(page));

	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID)
	      == index->id);

	ut_ad(!page_rec_is_supremum(current_rec));

	/* 1. Get the size of the physical record in the page */
	rec_size = index->rec_cache.rec_size;

	/* 2. Try to find suitable space from page memory management */
	free_rec = page_header_get_ptr(page, PAGE_FREE);
	if (free_rec) {
		/* Try to allocate from the head of the free list. */
		ulint		foffsets_[REC_OFFS_NORMAL_SIZE];
		ulint*		foffsets	= foffsets_;
		mem_heap_t*	heap		= NULL;

		rec_offs_init(foffsets_);

		foffsets = rec_get_offsets(
			free_rec, index, foffsets, ULINT_UNDEFINED, &heap);
		if (rec_offs_size(foffsets) < rec_size) {
			if (heap != NULL) {
				mem_heap_free(heap);
				heap = NULL;
			}

			free_rec = NULL;
			insert_buf = page_mem_alloc_heap(
				page, NULL, rec_size, &heap_no);

			if (insert_buf == NULL) {
				return(NULL);
			}
		} else {
			insert_buf = free_rec - rec_offs_extra_size(foffsets);

			if (page_is_comp(page)) {
				heap_no = rec_get_heap_no_new(free_rec);
				page_mem_alloc_free(
					page, NULL,
					rec_get_next_ptr(free_rec, TRUE),
					rec_size);
			} else {
				heap_no = rec_get_heap_no_old(free_rec);
				page_mem_alloc_free(
					page, NULL,
					rec_get_next_ptr(free_rec, FALSE),
					rec_size);
			}

			if (heap != NULL) {
				mem_heap_free(heap);
				heap = NULL;
			}
		}
	} else {
		free_rec = NULL;
		insert_buf = page_mem_alloc_heap(page, NULL,
						 rec_size, &heap_no);

		if (insert_buf == NULL) {
			return(NULL);
		}
	}

1645 	/* 3. Create the record */
1646 	insert_rec = rec_convert_dtuple_to_rec(insert_buf, index, tuple, n_ext);
1647 
1648 	/* 4. Insert the record in the linked list of records */
1649 	ut_ad(current_rec != insert_rec);
1650 
1651 	{
1652 		/* next record after current before the insertion */
1653 		rec_t*	next_rec = page_rec_get_next(current_rec);
1654 #ifdef UNIV_DEBUG
1655 		if (page_is_comp(page)) {
1656 			ut_ad(rec_get_status(current_rec)
1657 				<= REC_STATUS_INFIMUM);
1658 			ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
1659 			ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1660 		}
1661 #endif
1662 		page_rec_set_next(insert_rec, next_rec);
1663 		page_rec_set_next(current_rec, insert_rec);
1664 	}
1665 
1666 	page_header_set_field(page, NULL, PAGE_N_RECS,
1667 			      1 + page_get_n_recs(page));
1668 
1669 	/* 5. Set the n_owned field in the inserted record to zero,
1670 	and set the heap_no field */
1671 	if (page_is_comp(page)) {
1672 		rec_set_n_owned_new(insert_rec, NULL, 0);
1673 		rec_set_heap_no_new(insert_rec, heap_no);
1674 	} else {
1675 		rec_set_n_owned_old(insert_rec, 0);
1676 		rec_set_heap_no_old(insert_rec, heap_no);
1677 	}
1678 
1679 	/* 6. Update the last insertion info in page header */
1680 
1681 	last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1682 	ut_ad(!last_insert || !page_is_comp(page)
1683 	      || rec_get_node_ptr_flag(last_insert)
1684 	      == rec_get_node_ptr_flag(insert_rec));
1685 
1686 	if (last_insert == NULL) {
1687 		page_header_set_field(page, NULL, PAGE_DIRECTION,
1688 				      PAGE_NO_DIRECTION);
1689 		page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
1690 
1691 	} else if ((last_insert == current_rec)
1692 		   && (page_header_get_field(page, PAGE_DIRECTION)
1693 		       != PAGE_LEFT)) {
1694 
1695 		page_header_set_field(page, NULL, PAGE_DIRECTION,
1696 							PAGE_RIGHT);
1697 		page_header_set_field(page, NULL, PAGE_N_DIRECTION,
1698 				      page_header_get_field(
1699 					      page, PAGE_N_DIRECTION) + 1);
1700 
1701 	} else if ((page_rec_get_next(insert_rec) == last_insert)
1702 		   && (page_header_get_field(page, PAGE_DIRECTION)
1703 		       != PAGE_RIGHT)) {
1704 
1705 		page_header_set_field(page, NULL, PAGE_DIRECTION,
1706 							PAGE_LEFT);
1707 		page_header_set_field(page, NULL, PAGE_N_DIRECTION,
1708 				      page_header_get_field(
1709 					      page, PAGE_N_DIRECTION) + 1);
1710 	} else {
1711 		page_header_set_field(page, NULL, PAGE_DIRECTION,
1712 							PAGE_NO_DIRECTION);
1713 		page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
1714 	}
1715 
1716 	page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
1717 
1718 	/* 7. It remains to update the owner record. */
1719 	{
1720 		rec_t*	owner_rec	= page_rec_find_owner_rec(insert_rec);
1721 		ulint	n_owned;
1722 		if (page_is_comp(page)) {
1723 			n_owned = rec_get_n_owned_new(owner_rec);
1724 			rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
1725 		} else {
1726 			n_owned = rec_get_n_owned_old(owner_rec);
1727 			rec_set_n_owned_old(owner_rec, n_owned + 1);
1728 		}
1729 
1730 		/* 8. Now we have incremented the n_owned field of the owner
1731 		record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1732 		we have to split the corresponding directory slot in two. */
1733 
1734 		if (n_owned == PAGE_DIR_SLOT_MAX_N_OWNED) {
1735 			page_dir_split_slot(
1736 				page, NULL,
1737 				page_dir_find_owner_slot(owner_rec));
1738 		}
1739 	}

	/* 9. Open the mtr just for the sake of setting its modification
	flag to true; if the flag were not set, no flush would be done
	for the block. */
	byte*	log_ptr = mlog_open(mtr, 0);
	ut_ad(log_ptr == NULL);
	if (log_ptr != NULL) {
		/* To keep the compiler happy. */
		mlog_close(mtr, log_ptr);
	}

	return(insert_rec);
}
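
#ifdef UNIV_COMPILE_TEST_FUNCS
/*******************************************************************//**
Illustrative sketch only: a hypothetical helper (not used by the
storage engine) that restates the PAGE_DIRECTION heuristic applied in
step 6 above as a pure function. PAGE_RIGHT is kept while we keep
inserting right after the previous insertion point, PAGE_LEFT while we
keep inserting just before it, and PAGE_NO_DIRECTION otherwise.
@return new value for PAGE_DIRECTION */
static
ulint
page_cur_direction_sketch(
/*======================*/
	const rec_t*	last_insert,	/*!< in: PAGE_LAST_INSERT, or NULL */
	const rec_t*	current_rec,	/*!< in: predecessor of the new rec */
	const rec_t*	next_rec,	/*!< in: successor of the new rec */
	ulint		old_direction)	/*!< in: old PAGE_DIRECTION value */
{
	if (last_insert == NULL) {
		return(PAGE_NO_DIRECTION);
	} else if (last_insert == current_rec
		   && old_direction != PAGE_LEFT) {
		return(PAGE_RIGHT);
	} else if (last_insert == next_rec
		   && old_direction != PAGE_RIGHT) {
		return(PAGE_LEFT);
	}

	return(PAGE_NO_DIRECTION);
}
#endif /* UNIV_COMPILE_TEST_FUNCS */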

/***********************************************************//**
Inserts a record next to the page cursor on a compressed page,
updating both the compressed page and its uncompressed copy. Returns
a pointer to the inserted record on success, i.e., if there is enough
space available; otherwise returns NULL.
The cursor stays at the same position.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the inserted record on success, NULL otherwise */
rec_t*
page_cur_insert_rec_zip(
/*====================*/
	page_cur_t*	cursor,	/*!< in/out: page cursor */
	dict_index_t*	index,	/*!< in: record descriptor */
	const rec_t*	rec,	/*!< in: pointer to a physical record */
	ulint*		offsets,/*!< in/out: rec_get_offsets(rec, index) */
	mtr_t*		mtr)	/*!< in: mini-transaction handle, or NULL */
{
	byte*		insert_buf;
	ulint		rec_size;
	page_t*		page;		/*!< the relevant page */
	rec_t*		last_insert;	/*!< cursor position at previous
					insert */
	rec_t*		free_rec;	/*!< a free record that was reused,
					or NULL */
	rec_t*		insert_rec;	/*!< inserted record */
	ulint		heap_no;	/*!< heap number of the inserted
					record */
	page_zip_des_t*	page_zip;

	page_zip = page_cur_get_page_zip(cursor);
	ut_ad(page_zip);

	ut_ad(rec_offs_validate(rec, index, offsets));

	page = page_cur_get_page(cursor);
	ut_ad(dict_table_is_comp(index->table));
	ut_ad(page_is_comp(page));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index))
	      || recv_recovery_is_on());

	ut_ad(!page_cur_is_after_last(cursor));
#ifdef UNIV_ZIP_DEBUG
	ut_a(page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	/* 1. Get the size of the physical record in the page */
	rec_size = rec_offs_size(offsets);

#ifdef UNIV_DEBUG_VALGRIND
	{
		const void*	rec_start
			= rec - rec_offs_extra_size(offsets);
		ulint		extra_size
			= rec_offs_extra_size(offsets)
			- (rec_offs_comp(offsets)
			   ? REC_N_NEW_EXTRA_BYTES
			   : REC_N_OLD_EXTRA_BYTES);

		/* All data bytes of the record must be valid. */
		UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
		/* The variable-length header must be valid. */
		UNIV_MEM_ASSERT_RW(rec_start, extra_size);
	}
#endif /* UNIV_DEBUG_VALGRIND */

	const bool reorg_before_insert = page_has_garbage(page)
		&& rec_size > page_get_max_insert_size(page, 1)
		&& rec_size <= page_get_max_insert_size_after_reorganize(
			page, 1);

	/* 2. Try to find suitable space from page memory management */
	if (!page_zip_available(page_zip, dict_index_is_clust(index),
				rec_size, 1)
	    || reorg_before_insert) {
		/* The values can change dynamically. */
		bool	log_compressed	= page_zip_log_pages;
		ulint	level		= page_zip_level;
#ifdef UNIV_DEBUG
		rec_t*	cursor_rec	= page_cur_get_rec(cursor);
#endif /* UNIV_DEBUG */

		/* If we are not writing compressed page images, we
		must reorganize the page before attempting the
		insert. */
		if (recv_recovery_is_on()) {
			/* Insert into the uncompressed page only.
			The page reorganization or creation that we
			would attempt outside crash recovery would
			have been covered by a previous redo log record. */
		} else if (page_is_empty(page)) {
			ut_ad(page_cur_is_before_first(cursor));

			/* This is an empty page. Recreate it to
			get rid of the modification log. */
			page_create_zip(page_cur_get_block(cursor), index,
					page_header_get_field(page, PAGE_LEVEL),
					0, NULL, mtr);
			ut_ad(!page_header_get_ptr(page, PAGE_FREE));

			if (page_zip_available(
				    page_zip, dict_index_is_clust(index),
				    rec_size, 1)) {
				goto use_heap;
			}

			/* The cursor should remain on the page infimum. */
			return(NULL);
		} else if (!page_zip->m_nonempty && !page_has_garbage(page)) {
			/* The page has been freshly compressed, so
			reorganizing it will not help. */
		} else if (log_compressed && !reorg_before_insert) {
			/* Insert into uncompressed page only, and
			try page_zip_reorganize() afterwards. */
		} else if (btr_page_reorganize_low(
				   recv_recovery_is_on(), level,
				   cursor, index, mtr)) {
			ut_ad(!page_header_get_ptr(page, PAGE_FREE));

			if (page_zip_available(
				    page_zip, dict_index_is_clust(index),
				    rec_size, 1)) {
				/* After reorganizing, there is space
				available. */
				goto use_heap;
			}
		} else {
			ut_ad(cursor->rec == cursor_rec);
			return(NULL);
		}

		/* Try compressing the whole page afterwards. */
		insert_rec = page_cur_insert_rec_low(
			cursor->rec, index, rec, offsets, NULL);

		/* If recovery is on, this implies that the compression
		of the page was successful during runtime. Had that not
		been the case, or had the redo logging of compressed
		pages been enabled during runtime, we would have seen
		a MLOG_ZIP_PAGE_COMPRESS redo record. Therefore, we
		know that we do not need to reorganize the page. We do,
		however, need to recompress it. That will happen when
		the next redo record is read; it must be of type
		MLOG_ZIP_PAGE_COMPRESS_NO_DATA and contain a valid
		compression level value. This implies that during
		recovery, from this point until the next redo record is
		applied, the uncompressed and compressed versions are
		not identical, and page_zip_validate() would fail; that
		is OK, because during recovery we call
		page_zip_validate() only after processing all changes
		to a page under a single mtr. */
		if (insert_rec == NULL) {
			/* Out of space.
			This should never occur during crash recovery,
			because the MLOG_COMP_REC_INSERT should only
			be logged after a successful operation. */
			ut_ad(!recv_recovery_is_on());
		} else if (recv_recovery_is_on()) {
			/* This should be followed by
			MLOG_ZIP_PAGE_COMPRESS_NO_DATA,
			which should succeed. */
			rec_offs_make_valid(insert_rec, index, offsets);
		} else {
			ulint	pos = page_rec_get_n_recs_before(insert_rec);
			ut_ad(pos > 0);

			if (!log_compressed) {
				if (page_zip_compress(
					    page_zip, page, index,
					    level, NULL, NULL)) {
					page_cur_insert_rec_write_log(
						insert_rec, rec_size,
						cursor->rec, index, mtr);
					page_zip_compress_write_log_no_data(
						level, page, index, mtr);

					rec_offs_make_valid(
						insert_rec, index, offsets);
					return(insert_rec);
				}

				ut_ad(cursor->rec
				      == (pos > 1
					  ? page_rec_get_nth(
						  page, pos - 1)
					  : page + PAGE_NEW_INFIMUM));
			} else {
				/* We are writing entire page images
				to the log. Reduce the redo log volume
				by reorganizing the page at the same time. */
				if (page_zip_reorganize(
					    cursor->block, index, mtr)) {
					/* The page was reorganized:
					Seek to pos. */
					if (pos > 1) {
						cursor->rec = page_rec_get_nth(
							page, pos - 1);
					} else {
						cursor->rec = page
							+ PAGE_NEW_INFIMUM;
					}

					insert_rec = page + rec_get_next_offs(
						cursor->rec, TRUE);
					rec_offs_make_valid(
						insert_rec, index, offsets);
					return(insert_rec);
				}

				/* Theoretically, we could try one
				last resort of btr_page_reorganize_low()
				followed by page_zip_available(), but
				that would be very unlikely to succeed.
				(If the fully reorganized page failed
				to compress, why would compressing the
				page plus logging the insert of this
				record succeed?) */
			}

			/* Out of space: restore the page */
			if (!page_zip_decompress(page_zip, page, FALSE)) {
				ut_error; /* Memory corrupted? */
			}
			ut_ad(page_validate(page, index));
			insert_rec = NULL;
		}

		return(insert_rec);
	}

	free_rec = page_header_get_ptr(page, PAGE_FREE);
	if (UNIV_LIKELY_NULL(free_rec)) {
		/* Try to allocate from the head of the free list. */
		lint		extra_size_diff;
		ulint		foffsets_[REC_OFFS_NORMAL_SIZE];
		ulint*		foffsets	= foffsets_;
		mem_heap_t*	heap		= NULL;

		rec_offs_init(foffsets_);

		foffsets = rec_get_offsets(free_rec, index, foffsets,
					   ULINT_UNDEFINED, &heap);
		if (rec_offs_size(foffsets) < rec_size) {
too_small:
			if (UNIV_LIKELY_NULL(heap)) {
				mem_heap_free(heap);
			}

			goto use_heap;
		}

		insert_buf = free_rec - rec_offs_extra_size(foffsets);

		/* On compressed pages, do not relocate records from
		the free list.  If extra_size would grow, use the heap. */
		extra_size_diff
			= rec_offs_extra_size(offsets)
			- rec_offs_extra_size(foffsets);

		if (UNIV_UNLIKELY(extra_size_diff < 0)) {
			/* The new record needs fewer extra bytes than
			the freed one. Shift insert_buf forward so that
			the record origin coincides with free_rec, but
			only if the freed slot can absorb the
			difference. */
			if (rec_offs_size(foffsets)
			    < rec_size - extra_size_diff) {

				goto too_small;
			}

			insert_buf -= extra_size_diff;
		} else if (UNIV_UNLIKELY(extra_size_diff)) {
			/* Do not allow extra_size to grow */

			goto too_small;
		}

		heap_no = rec_get_heap_no_new(free_rec);
		page_mem_alloc_free(page, page_zip,
				    rec_get_next_ptr(free_rec, TRUE),
				    rec_size);

		if (!page_is_leaf(page)) {
			/* Zero out the node pointer of free_rec,
			in case it will not be overwritten by
			insert_rec. */

			ut_ad(rec_size > REC_NODE_PTR_SIZE);

			if (rec_offs_extra_size(foffsets)
			    + rec_offs_data_size(foffsets) > rec_size) {

				memset(rec_get_end(free_rec, foffsets)
				       - REC_NODE_PTR_SIZE, 0,
				       REC_NODE_PTR_SIZE);
			}
		} else if (dict_index_is_clust(index)) {
			/* Zero out the DB_TRX_ID and DB_ROLL_PTR
			columns of free_rec, in case it will not be
			overwritten by insert_rec. */

			ulint	trx_id_col;
			ulint	trx_id_offs;
			ulint	len;

			trx_id_col = dict_index_get_sys_col_pos(index,
								DATA_TRX_ID);
			ut_ad(trx_id_col > 0);
			ut_ad(trx_id_col != ULINT_UNDEFINED);

			trx_id_offs = rec_get_nth_field_offs(foffsets,
							     trx_id_col, &len);
			ut_ad(len == DATA_TRX_ID_LEN);

			if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
			    + rec_offs_extra_size(foffsets) > rec_size) {
				/* We will have to zero out the
				DB_TRX_ID and DB_ROLL_PTR, because
				they will not be fully overwritten by
				insert_rec. */

				memset(free_rec + trx_id_offs, 0,
				       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
			}

			ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
			      == rec_get_nth_field(free_rec, foffsets,
						   trx_id_col + 1, &len));
			ut_ad(len == DATA_ROLL_PTR_LEN);
		}

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	} else {
use_heap:
		free_rec = NULL;
		insert_buf = page_mem_alloc_heap(page, page_zip,
						 rec_size, &heap_no);

		if (UNIV_UNLIKELY(insert_buf == NULL)) {
			return(NULL);
		}

		page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
	}

	/* 3. Create the record */
	insert_rec = rec_copy(insert_buf, rec, offsets);
	rec_offs_make_valid(insert_rec, index, offsets);

	/* 4. Insert the record in the linked list of records */
	ut_ad(cursor->rec != insert_rec);

	{
		/* next record after current before the insertion */
		const rec_t*	next_rec = page_rec_get_next_low(
			cursor->rec, TRUE);
		ut_ad(rec_get_status(cursor->rec)
		      <= REC_STATUS_INFIMUM);
		ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
		ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);

		page_rec_set_next(insert_rec, next_rec);
		page_rec_set_next(cursor->rec, insert_rec);
	}

	page_header_set_field(page, page_zip, PAGE_N_RECS,
			      1 + page_get_n_recs(page));

	/* 5. Set the n_owned field in the inserted record to zero,
	and set the heap_no field */
	rec_set_n_owned_new(insert_rec, NULL, 0);
	rec_set_heap_no_new(insert_rec, heap_no);

	UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
			   rec_offs_size(offsets));

	page_zip_dir_insert(page_zip, cursor->rec, free_rec, insert_rec);

	/* 6. Update the last insertion info in page header */

	last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
	ut_ad(!last_insert
	      || rec_get_node_ptr_flag(last_insert)
	      == rec_get_node_ptr_flag(insert_rec));

	if (!dict_index_is_spatial(index)) {
		if (UNIV_UNLIKELY(last_insert == NULL)) {
			page_header_set_field(page, page_zip, PAGE_DIRECTION,
					      PAGE_NO_DIRECTION);
			page_header_set_field(page, page_zip,
					      PAGE_N_DIRECTION, 0);

		} else if ((last_insert == cursor->rec)
			   && (page_header_get_field(page, PAGE_DIRECTION)
			       != PAGE_LEFT)) {

			page_header_set_field(page, page_zip, PAGE_DIRECTION,
					      PAGE_RIGHT);
			page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
					      page_header_get_field(
						page, PAGE_N_DIRECTION) + 1);

		} else if ((page_rec_get_next(insert_rec) == last_insert)
			   && (page_header_get_field(page, PAGE_DIRECTION)
			       != PAGE_RIGHT)) {

			page_header_set_field(page, page_zip, PAGE_DIRECTION,
					      PAGE_LEFT);
			page_header_set_field(page, page_zip, PAGE_N_DIRECTION,
					      page_header_get_field(
						page, PAGE_N_DIRECTION) + 1);
		} else {
			page_header_set_field(page, page_zip, PAGE_DIRECTION,
					      PAGE_NO_DIRECTION);
			page_header_set_field(page, page_zip,
					      PAGE_N_DIRECTION, 0);
		}
	}

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);

	/* 7. It remains to update the owner record. */
	{
		rec_t*	owner_rec	= page_rec_find_owner_rec(insert_rec);
		ulint	n_owned;

		n_owned = rec_get_n_owned_new(owner_rec);
		rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);

		/* 8. Now we have incremented the n_owned field of the owner
		record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
		we have to split the corresponding directory slot in two. */

		if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
			page_dir_split_slot(
				page, page_zip,
				page_dir_find_owner_slot(owner_rec));
		}
	}

	page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);

	/* 9. Write log record of the insert */
	if (UNIV_LIKELY(mtr != NULL)) {
		page_cur_insert_rec_write_log(insert_rec, rec_size,
					      cursor->rec, index, mtr);
	}

	return(insert_rec);
}
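
#ifdef UNIV_COMPILE_TEST_FUNCS
/*******************************************************************//**
Illustrative sketch only: a hypothetical caller (not part of the
storage engine; it assumes ibuf0ibuf.h is visible here) showing the
IMPORTANT contract stated above: after inserting into a compressed
leaf page of a secondary index, the caller resets the insert buffer
free-space bits before committing the mini-transaction. */
static
void
page_cur_insert_rec_zip_caller_sketch(
/*==================================*/
	page_cur_t*	cursor,	/*!< in/out: positioned page cursor */
	dict_index_t*	index,	/*!< in: secondary index */
	const rec_t*	rec,	/*!< in: physical record to insert */
	ulint*		offsets,/*!< in/out: rec_get_offsets(rec, index) */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	rec_t*	insert_rec = page_cur_insert_rec_zip(
		cursor, index, rec, offsets, mtr);

	if (insert_rec != NULL
	    && page_is_leaf(page_cur_get_page(cursor))
	    && !dict_index_is_clust(index)) {
		/* Keep IBUF_BITMAP_FREE consistent with the page
		before the mtr commits. */
		ibuf_reset_free_bits(page_cur_get_block(cursor));
	}

	mtr_commit(mtr);
}
#endif /* UNIV_COMPILE_TEST_FUNCS */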

#ifndef UNIV_HOTBACKUP
/**********************************************************//**
Writes a log record of copying a record list end to a newly created page.
@return 4-byte field where to write the log data length, or NULL if
logging is disabled */
UNIV_INLINE
byte*
page_copy_rec_list_to_created_page_write_log(
/*=========================================*/
	page_t*		page,	/*!< in: index page */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
{
	byte*	log_ptr;

	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
	ut_ad(mtr->is_named_space(index->space));

	log_ptr = mlog_open_and_write_index(mtr, page, index,
					    page_is_comp(page)
					    ? MLOG_COMP_LIST_END_COPY_CREATED
					    : MLOG_LIST_END_COPY_CREATED, 4);
	if (UNIV_LIKELY(log_ptr != NULL)) {
		mlog_close(mtr, log_ptr + 4);
	}

	return(log_ptr);
}
#endif /* !UNIV_HOTBACKUP */

/**********************************************************//**
Parses a log record of copying a record list end to a newly created page.
@return end of log record or NULL */
byte*
page_parse_copy_rec_list_to_created_page(
/*=====================================*/
	byte*		ptr,	/*!< in: buffer */
	byte*		end_ptr,/*!< in: buffer end */
	buf_block_t*	block,	/*!< in: page or NULL */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
{
	byte*		rec_end;
	ulint		log_data_len;
	page_t*		page;
	page_zip_des_t*	page_zip;

	if (ptr + 4 > end_ptr) {

		return(NULL);
	}

	log_data_len = mach_read_from_4(ptr);
	ptr += 4;

	rec_end = ptr + log_data_len;

	if (rec_end > end_ptr) {

		return(NULL);
	}

	if (!block) {

		return(rec_end);
	}

	while (ptr < rec_end) {
		ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
						block, index, mtr);
	}

	ut_a(ptr == rec_end);

	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);

	if (!dict_index_is_spatial(index)) {
		page_header_set_field(page, page_zip, PAGE_DIRECTION,
				      PAGE_NO_DIRECTION);
		page_header_set_field(page, page_zip, PAGE_N_DIRECTION, 0);
	}

	return(rec_end);
}
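
#ifdef UNIV_COMPILE_TEST_FUNCS
/*******************************************************************//**
Illustrative sketch only: a hypothetical helper (not used by recovery)
that makes the log body layout consumed above explicit: a 4-byte total
length, followed by a run of short insert record bodies that
page_cur_parse_insert_rec() consumes one by one. This helper merely
computes where the run ends, mirroring the bounds checks above.
@return end of the record run, or NULL if the buffer is incomplete */
static
const byte*
page_copy_created_log_end_sketch(
/*=============================*/
	const byte*	ptr,	/*!< in: start of the log body */
	const byte*	end_ptr)/*!< in: end of the parse buffer */
{
	if (ptr + 4 > end_ptr) {
		/* The 4-byte length field is incomplete. */
		return(NULL);
	}

	const ulint	log_data_len = mach_read_from_4(ptr);
	const byte*	rec_end = ptr + 4 + log_data_len;

	/* The run of insert records must fit in the buffer. */
	return(rec_end > end_ptr ? NULL : rec_end);
}
#endif /* UNIV_COMPILE_TEST_FUNCS */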

#ifndef UNIV_HOTBACKUP
/*************************************************************//**
Copies records from page to a newly created page, from a given record onward,
including that record. Infimum and supremum records are not copied.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit(). */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
	page_t*		new_page,	/*!< in/out: index page to copy to */
	rec_t*		rec,		/*!< in: first record to copy */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_dir_slot_t* slot = 0; /* remove warning */
	byte*	heap_top;
	rec_t*	insert_rec = 0; /* remove warning */
	rec_t*	prev_rec;
	ulint	count;
	ulint	n_recs;
	ulint	slot_index;
	ulint	rec_size;
	byte*	log_ptr;
	ulint	log_data_len;
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
	ut_ad(page_align(rec) != new_page);
	ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));

	if (page_rec_is_infimum(rec)) {

		rec = page_rec_get_next(rec);
	}

	if (page_rec_is_supremum(rec)) {

		return;
	}

#ifdef UNIV_DEBUG
	/* To pass the debug tests we have to set these dummy values
	in the debug version */
	page_dir_set_n_slots(new_page, NULL, UNIV_PAGE_SIZE / 2);
	page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
			    new_page + UNIV_PAGE_SIZE - 1);
#endif

	log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
							       index, mtr);

	log_data_len = mtr->get_log()->size();

	/* Individual inserts are logged in a shorter form */

	mtr_log_t	log_mode;

	if (dict_table_is_temporary(index->table)
	    || index->table->ibd_file_missing /* IMPORT TABLESPACE */) {
		log_mode = mtr_get_log_mode(mtr);
	} else {
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);
	}

	prev_rec = page_get_infimum_rec(new_page);
	if (page_is_comp(new_page)) {
		heap_top = new_page + PAGE_NEW_SUPREMUM_END;
	} else {
		heap_top = new_page + PAGE_OLD_SUPREMUM_END;
	}
	count = 0;
	slot_index = 0;
	n_recs = 0;

	do {
		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		insert_rec = rec_copy(heap_top, rec, offsets);

		if (page_is_comp(new_page)) {
			rec_set_next_offs_new(prev_rec,
					      page_offset(insert_rec));

			rec_set_n_owned_new(insert_rec, NULL, 0);
			rec_set_heap_no_new(insert_rec,
					    PAGE_HEAP_NO_USER_LOW + n_recs);
		} else {
			rec_set_next_offs_old(prev_rec,
					      page_offset(insert_rec));

			rec_set_n_owned_old(insert_rec, 0);
			rec_set_heap_no_old(insert_rec,
					    PAGE_HEAP_NO_USER_LOW + n_recs);
		}

		count++;
		n_recs++;

		if (UNIV_UNLIKELY
		    (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {

			slot_index++;

			slot = page_dir_get_nth_slot(new_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}

		rec_size = rec_offs_size(offsets);

		ut_ad(heap_top < new_page + UNIV_PAGE_SIZE);

		heap_top += rec_size;

		rec_offs_make_valid(insert_rec, index, offsets);
		page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
					      index, mtr);
		prev_rec = insert_rec;
		rec = page_rec_get_next(rec);
	} while (!page_rec_is_supremum(rec));

	if ((slot_index > 0) && (count + 1
				 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
				 <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the two last dir slots. This operation
		is here so that this function imitates exactly the
		equivalent task done using page_cur_insert_rec, which is
		what database recovery uses to reproduce the work of
		this function. Exact imitation makes it possible to
		check the correctness of recovery. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	log_data_len = mtr->get_log()->size() - log_data_len;

	ut_a(log_data_len < 100 * UNIV_PAGE_SIZE);

	if (log_ptr != NULL) {
		mach_write_to_4(log_ptr, log_data_len);
	}

	if (page_is_comp(new_page)) {
		rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
	} else {
		rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
	}

	slot = page_dir_get_nth_slot(new_page, 1 + slot_index);

	page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
	page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
	page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
	page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);

	page_header_set_ptr(new_page, NULL, PAGE_LAST_INSERT, NULL);

	page_header_set_field(new_page, NULL, PAGE_DIRECTION,
			      PAGE_NO_DIRECTION);
	page_header_set_field(new_page, NULL, PAGE_N_DIRECTION, 0);

	/* Restore the log mode */

	mtr_set_log_mode(mtr, log_mode);
}
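
#ifdef UNIV_COMPILE_TEST_FUNCS
/*******************************************************************//**
Illustrative sketch only: a hypothetical helper (not used by the
storage engine) that restates the directory bookkeeping of the copy
loop above. A directory slot is completed after every
(PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 copied records; the supremum slot
owns the remainder plus the supremum itself, and the last full slot is
merged into it when the merged count still fits. */
static
void
page_copy_created_slot_math_sketch(
/*===============================*/
	ulint	n_recs,		/*!< in: number of user records copied */
	ulint*	n_full_slots,	/*!< out: completed directory slots */
	ulint*	supremum_owned)	/*!< out: n_owned of the supremum slot */
{
	const ulint	step = (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

	*n_full_slots = n_recs / step;
	*supremum_owned = n_recs % step + 1;

	/* Mirror the merge of the two last slots performed above. */
	if (*n_full_slots > 0
	    && *supremum_owned + step <= PAGE_DIR_SLOT_MAX_N_OWNED) {
		(*n_full_slots)--;
		*supremum_owned += step;
	}
}
#endif /* UNIV_COMPILE_TEST_FUNCS */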

/***********************************************************//**
Writes a log record of a record delete on a page. */
UNIV_INLINE
void
page_cur_delete_rec_write_log(
/*==========================*/
	rec_t*			rec,	/*!< in: record to be deleted */
	const dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*			mtr)	/*!< in: mini-transaction handle */
{
	byte*	log_ptr;

	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
	ut_ad(mtr->is_named_space(index->space));

	log_ptr = mlog_open_and_write_index(mtr, rec, index,
					    page_rec_is_comp(rec)
					    ? MLOG_COMP_REC_DELETE
					    : MLOG_REC_DELETE, 2);

	if (!log_ptr) {
		/* Logging in mtr is switched off during crash recovery:
		in that case mlog_open returns NULL */
		return;
	}

	/* Write the cursor rec offset as a 2-byte ulint */
	mach_write_to_2(log_ptr, page_offset(rec));

	mlog_close(mtr, log_ptr + 2);
}
#else /* !UNIV_HOTBACKUP */
# define page_cur_delete_rec_write_log(rec,index,mtr) ((void) 0)
#endif /* !UNIV_HOTBACKUP */
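
#ifdef UNIV_COMPILE_TEST_FUNCS
/*******************************************************************//**
Illustrative sketch only: a hypothetical helper showing the variable
part of an MLOG_(COMP_)REC_DELETE record as written above. Everything
else (type, space id, page number, index description) is emitted by
mlog_open_and_write_index(); the body proper is just the 2-byte page
offset of the record to be deleted.
@return number of bytes written, always 2 */
static
ulint
page_cur_delete_rec_log_body_sketch(
/*================================*/
	const rec_t*	rec,	/*!< in: record to be deleted */
	byte*		buf)	/*!< out: buffer of at least 2 bytes */
{
	mach_write_to_2(buf, page_offset(rec));

	return(2);
}
#endif /* UNIV_COMPILE_TEST_FUNCS */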

/***********************************************************//**
Parses a log record of a record delete on a page.
@return pointer to record end or NULL */
byte*
page_cur_parse_delete_rec(
/*======================*/
	byte*		ptr,	/*!< in: buffer */
	byte*		end_ptr,/*!< in: buffer end */
	buf_block_t*	block,	/*!< in: page or NULL */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
{
	ulint		offset;
	page_cur_t	cursor;

	if (end_ptr < ptr + 2) {

		return(NULL);
	}

	/* Read the cursor rec offset as a 2-byte ulint */
	offset = mach_read_from_2(ptr);
	ptr += 2;

	ut_a(offset <= UNIV_PAGE_SIZE);

	if (block) {
		page_t*		page		= buf_block_get_frame(block);
		mem_heap_t*	heap		= NULL;
		ulint		offsets_[REC_OFFS_NORMAL_SIZE];
		rec_t*		rec		= page + offset;
		rec_offs_init(offsets_);

		page_cur_position(rec, block, &cursor);
		ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));

		page_cur_delete_rec(&cursor, index,
				    rec_get_offsets(rec, index, offsets_,
						    ULINT_UNDEFINED, &heap),
				    mtr);
		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	}

	return(ptr);
}

/***********************************************************//**
Deletes a record at the page cursor. The cursor is moved to the next
record after the deleted one. */
void
page_cur_delete_rec(
/*================*/
	page_cur_t*		cursor,	/*!< in/out: a page cursor */
	const dict_index_t*	index,	/*!< in: record descriptor */
	const ulint*		offsets,/*!< in: rec_get_offsets(
					cursor->rec, index) */
	mtr_t*			mtr)	/*!< in: mini-transaction handle
					or NULL */
{
	page_dir_slot_t* cur_dir_slot;
	page_dir_slot_t* prev_slot;
	page_t*		page;
	page_zip_des_t*	page_zip;
	rec_t*		current_rec;
	rec_t*		prev_rec	= NULL;
	rec_t*		next_rec;
	ulint		cur_slot_no;
	ulint		cur_n_owned;
	rec_t*		rec;

	page = page_cur_get_page(cursor);
	page_zip = page_cur_get_page_zip(cursor);

	/* page_zip_validate() will fail here when
	btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
	Then, both "page_zip" and "page" would have the min-rec-mark
	set on the smallest user record, but "page" would additionally
	have it set on the smallest-but-one record.  Because sloppy
	page_zip_validate_low() only ignores min-rec-flag differences
	in the smallest user record, it cannot be used here either. */

	current_rec = cursor->rec;
	ut_ad(rec_offs_validate(current_rec, index, offsets));
	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index))
	      || recv_recovery_is_on());
	ut_ad(mtr == NULL || mtr->is_named_space(index->space));

	/* The record must not be the supremum or infimum record. */
	ut_ad(page_rec_is_user_rec(current_rec));

	if (page_get_n_recs(page) == 1 && !recv_recovery_is_on()) {
		/* Empty the page, unless we are applying the redo log
		during crash recovery. During normal operation, the
		page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
		MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
		ut_ad(page_is_leaf(page));
		/* Usually, this should be the root page,
		and the whole index tree should become empty.
		However, this could also be a call in
		btr_cur_pessimistic_update() to delete the only
		record in the page and to insert another one. */
		page_cur_move_to_next(cursor);
		ut_ad(page_cur_is_after_last(cursor));
		page_create_empty(page_cur_get_block(cursor),
				  const_cast<dict_index_t*>(index), mtr);
		return;
	}

	/* Save to local variables some data associated with current_rec */
	cur_slot_no = page_dir_find_owner_slot(current_rec);
	ut_ad(cur_slot_no > 0);
	cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
	cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);

	/* 0. Write the log record */
	if (mtr != 0) {
		page_cur_delete_rec_write_log(current_rec, index, mtr);
	}

	/* 1. Reset the last insert info in the page header and increment
	the modify clock for the frame */

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);

	/* The page gets invalid for optimistic searches: increment the
	frame modify clock only if there is a mini-transaction covering
	the change. During IMPORT we allocate local blocks that are not
	part of the buffer pool. */

	if (mtr != 0) {
		buf_block_modify_clock_inc(page_cur_get_block(cursor));
	}

	/* 2. Find the next and the previous record. Note that the cursor is
	left at the next record. */

	ut_ad(cur_slot_no > 0);
	prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);

	rec = (rec_t*) page_dir_slot_get_rec(prev_slot);

	/* rec now points to the record of the previous directory slot. Look
	for the immediate predecessor of current_rec in a loop. */

	while (current_rec != rec) {
		prev_rec = rec;
		rec = page_rec_get_next(rec);
	}

	page_cur_move_to_next(cursor);
	next_rec = cursor->rec;

	/* 3. Remove the record from the linked list of records */

	page_rec_set_next(prev_rec, next_rec);

	/* 4. If the deleted record is pointed to by a dir slot, update the
	record pointer in slot. In the following if-clause we assume that
	prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
	>= 2. */

#if PAGE_DIR_SLOT_MIN_N_OWNED < 2
# error "PAGE_DIR_SLOT_MIN_N_OWNED < 2"
#endif
	ut_ad(cur_n_owned > 1);

	if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
		page_dir_slot_set_rec(cur_dir_slot, prev_rec);
	}

	/* 5. Update the number of owned records of the slot */

	page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);

	/* 6. Free the memory occupied by the record */
	page_mem_free(page, page_zip, current_rec, index, offsets);

	/* 7. Now we have decremented the number of owned records of the slot.
	If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
	slots. */

	if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
		page_dir_balance_slot(page, page_zip, cur_slot_no);
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
}
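
#ifdef UNIV_COMPILE_TEST_FUNCS
/*******************************************************************//**
Illustrative sketch only: a hypothetical helper (the engine inlines
this logic in step 2 above) showing how the predecessor of a record is
found. The record list is singly linked, so we start from the record
owned by the previous directory slot and walk forward.
@return predecessor of target in the record list */
static
rec_t*
page_rec_get_prev_sketch(
/*=====================*/
	page_t*	page,	/*!< in: index page */
	rec_t*	target)	/*!< in: user record on the page */
{
	ulint	slot_no = page_dir_find_owner_slot(target);
	ut_ad(slot_no > 0);

	rec_t*	rec = (rec_t*) page_dir_slot_get_rec(
		page_dir_get_nth_slot(page, slot_no - 1));
	rec_t*	prev_rec = NULL;

	while (rec != target) {
		prev_rec = rec;
		rec = page_rec_get_next(rec);
	}

	return(prev_rec);
}
#endif /* UNIV_COMPILE_TEST_FUNCS */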

#ifdef UNIV_COMPILE_TEST_FUNCS

/*******************************************************************//**
Prints the first n numbers generated by page_cur_lcg_prng(), to allow
a visual check that it works properly. */
void
test_page_cur_lcg_prng(
/*===================*/
	int	n)	/*!< in: print first n numbers */
{
	int			i;
	unsigned long long	rnd;

	for (i = 0; i < n; i++) {
		rnd = page_cur_lcg_prng();
		printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
		       rnd,
		       rnd % 2,
		       rnd % 3,
		       rnd % 5,
		       rnd % 7,
		       rnd % 11);
	}
}

#endif /* UNIV_COMPILE_TEST_FUNCS */