1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2008, Google Inc.
5 Copyright (c) 2012, Facebook Inc.
6 
7 Portions of this file contain modifications contributed and copyrighted by
8 Google, Inc. Those modifications are gratefully acknowledged and are described
9 briefly in the InnoDB documentation. The contributions by Google are
10 incorporated with their permission, and subject to the conditions contained in
11 the file COPYING.Google.
12 
13 This program is free software; you can redistribute it and/or modify
14 it under the terms of the GNU General Public License, version 2.0,
15 as published by the Free Software Foundation.
16 
17 This program is also distributed with certain software (including
18 but not limited to OpenSSL) that is licensed under separate terms,
19 as designated in a particular file or component or in included license
20 documentation.  The authors of MySQL hereby grant you an additional
21 permission to link the program and your derivative works with the
22 separately licensed software that they have included with MySQL.
23 
24 This program is distributed in the hope that it will be useful,
25 but WITHOUT ANY WARRANTY; without even the implied warranty of
26 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27 GNU General Public License, version 2.0, for more details.
28 
29 You should have received a copy of the GNU General Public License along with
30 this program; if not, write to the Free Software Foundation, Inc.,
31 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
32 
33 *****************************************************************************/
34 
35 /**************************************************//**
36 @file btr/btr0cur.cc
37 The index tree cursor
38 
All changes that row operations make to a B-tree or the records
there must go through this module! Undo log records are written here
for every modify or insert of a clustered index record.
42 
43 			NOTE!!!
44 To make sure we do not run out of disk space during a pessimistic
45 insert or update, we have to reserve 2 x the height of the index tree
46 many pages in the tablespace before we start the operation, because
47 if leaf splitting has been started, it is difficult to undo, except
48 by crashing the database and doing a roll-forward.
49 
50 Created 10/16/1994 Heikki Tuuri
51 *******************************************************/
52 
53 #include "btr0cur.h"
54 
55 #ifdef UNIV_NONINL
56 #include "btr0cur.ic"
57 #endif
58 
59 #include "row0upd.h"
60 #ifndef UNIV_HOTBACKUP
61 #include "mtr0log.h"
62 #include "page0page.h"
63 #include "page0zip.h"
64 #include "rem0rec.h"
65 #include "rem0cmp.h"
66 #include "buf0lru.h"
67 #include "btr0btr.h"
68 #include "btr0sea.h"
69 #include "row0log.h"
70 #include "row0purge.h"
71 #include "row0upd.h"
72 #include "trx0rec.h"
73 #include "trx0roll.h" /* trx_is_recv() */
74 #include "que0que.h"
75 #include "row0row.h"
76 #include "srv0srv.h"
77 #include "ibuf0ibuf.h"
78 #include "lock0lock.h"
79 #include "zlib.h"
80 
81 /** Buffered B-tree operation types, introduced as part of delete buffering. */
82 enum btr_op_t {
83 	BTR_NO_OP = 0,			/*!< Not buffered */
84 	BTR_INSERT_OP,			/*!< Insert, do not ignore UNIQUE */
85 	BTR_INSERT_IGNORE_UNIQUE_OP,	/*!< Insert, ignoring UNIQUE */
86 	BTR_DELETE_OP,			/*!< Purge a delete-marked record */
87 	BTR_DELMARK_OP			/*!< Mark a record for deletion */
88 };
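/* The btr_op_t value used by a search is derived in
btr_cur_search_to_nth_level() from the BTR_INSERT, BTR_DELETE and
BTR_DELETE_MARK flags that a caller may OR into latch_mode; see the
switch on those flags in that function. */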
89 
90 #ifdef UNIV_DEBUG
91 /** If the following is set to TRUE, this module prints a lot of
92 trace information of individual record operations */
93 UNIV_INTERN ibool	btr_cur_print_record_ops = FALSE;
94 #endif /* UNIV_DEBUG */
95 
96 /** Number of searches down the B-tree in btr_cur_search_to_nth_level(). */
97 UNIV_INTERN ulint	btr_cur_n_non_sea	= 0;
98 /** Number of successful adaptive hash index lookups in
99 btr_cur_search_to_nth_level(). */
100 UNIV_INTERN ulint	btr_cur_n_sea		= 0;
101 /** Old value of btr_cur_n_non_sea.  Copied by
102 srv_refresh_innodb_monitor_stats().  Referenced by
103 srv_printf_innodb_monitor(). */
104 UNIV_INTERN ulint	btr_cur_n_non_sea_old	= 0;
105 /** Old value of btr_cur_n_sea.  Copied by
106 srv_refresh_innodb_monitor_stats().  Referenced by
107 srv_printf_innodb_monitor(). */
108 UNIV_INTERN ulint	btr_cur_n_sea_old	= 0;
109 
110 #ifdef UNIV_DEBUG
111 /* Flag to limit optimistic insert records */
112 UNIV_INTERN uint	btr_cur_limit_optimistic_insert_debug = 0;
113 #endif /* UNIV_DEBUG */
114 
115 /** In the optimistic insert, if the insert does not fit, but this much space
116 can be released by page reorganize, then it is reorganized */
117 #define BTR_CUR_PAGE_REORGANIZE_LIMIT	(UNIV_PAGE_SIZE / 32)
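/* With the default 16 KiB UNIV_PAGE_SIZE this limit works out to
512 bytes. */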
118 
119 /** The structure of a BLOB part header */
120 /* @{ */
121 /*--------------------------------------*/
122 #define BTR_BLOB_HDR_PART_LEN		0	/*!< BLOB part len on this
123 						page */
124 #define BTR_BLOB_HDR_NEXT_PAGE_NO	4	/*!< next BLOB part page no,
125 						FIL_NULL if none */
126 /*--------------------------------------*/
127 #define BTR_BLOB_HDR_SIZE		8	/*!< Size of a BLOB
128 						part header, in bytes */
129 
130 /** Estimated table level stats from sampled value.
131 @param value		sampled stats
132 @param index		index being sampled
133 @param sample		number of sampled rows
134 @param ext_size		external stored data size
135 @param not_empty	table not empty
136 @return estimated table wide stats from sampled value */
137 #define BTR_TABLE_STATS_FROM_SAMPLE(value, index, sample, ext_size, not_empty)\
138 	(((value) * (ib_int64_t) index->stat_n_leaf_pages		\
139 	  + (sample) - 1 + (ext_size) + (not_empty)) / ((sample) + (ext_size)))
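/* Worked example of the macro above: a sampled value of 40 obtained
from sample = 8 leaf pages of an index with stat_n_leaf_pages = 100,
with ext_size = 0 and not_empty = 1, scales up to
(40 * 100 + 8 - 1 + 0 + 1) / (8 + 0) = 501 for the whole index; the
"+ (sample) - 1" term makes the division round up. */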
140 
141 /* @} */
142 #endif /* !UNIV_HOTBACKUP */
143 
144 /** A BLOB field reference full of zero, for use in assertions and tests.
145 Initially, BLOB field references are set to zero, in
146 dtuple_convert_big_rec(). */
147 const byte field_ref_zero[BTR_EXTERN_FIELD_REF_SIZE] = {
148 	0, 0, 0, 0, 0,
149 	0, 0, 0, 0, 0,
150 	0, 0, 0, 0, 0,
151 	0, 0, 0, 0, 0,
152 };
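/* A BLOB pointer is treated as not yet set as long as it still compares
equal to this array, typically via
memcmp(ref, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE) == 0. */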
153 
154 #ifndef UNIV_HOTBACKUP
155 /*******************************************************************//**
156 Marks all extern fields in a record as owned by the record. This function
157 should be called if the delete mark of a record is removed: a not delete
158 marked record always owns all its extern fields. */
159 static
160 void
161 btr_cur_unmark_extern_fields(
162 /*=========================*/
163 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
164 				part will be updated, or NULL */
165 	rec_t*		rec,	/*!< in/out: record in a clustered index */
166 	dict_index_t*	index,	/*!< in: index of the page */
167 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
168 	mtr_t*		mtr);	/*!< in: mtr, or NULL if not logged */
169 /*******************************************************************//**
170 Adds path information to the cursor for the current page, for which
171 the binary search has been performed. */
172 static
173 void
174 btr_cur_add_path_info(
175 /*==================*/
176 	btr_cur_t*	cursor,		/*!< in: cursor positioned on a page */
177 	ulint		height,		/*!< in: height of the page in tree;
178 					0 means leaf node */
179 	ulint		root_height);	/*!< in: root node height in tree */
180 /***********************************************************//**
181 Frees the externally stored fields for a record, if the field is mentioned
182 in the update vector. */
183 static
184 void
185 btr_rec_free_updated_extern_fields(
186 /*===============================*/
187 	dict_index_t*	index,	/*!< in: index of rec; the index tree MUST be
188 				X-latched */
189 	rec_t*		rec,	/*!< in: record */
190 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
191 				part will be updated, or NULL */
192 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
193 	const upd_t*	update,	/*!< in: update vector */
194 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
195 	mtr_t*		mtr);	/*!< in: mini-transaction handle which contains
196 				an X-latch to record page and to the tree */
197 /***********************************************************//**
198 Frees the externally stored fields for a record. */
199 static
200 void
201 btr_rec_free_externally_stored_fields(
202 /*==================================*/
203 	dict_index_t*	index,	/*!< in: index of the data, the index
204 				tree MUST be X-latched */
205 	rec_t*		rec,	/*!< in: record */
206 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
207 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
208 				part will be updated, or NULL */
209 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
210 	mtr_t*		mtr);	/*!< in: mini-transaction handle which contains
211 				an X-latch to record page and to the index
212 				tree */
213 #endif /* !UNIV_HOTBACKUP */
214 
215 /******************************************************//**
216 The following function is used to set the deleted bit of a record. */
217 UNIV_INLINE
218 void
btr_rec_set_deleted_flag(
220 /*=====================*/
221 	rec_t*		rec,	/*!< in/out: physical record */
222 	page_zip_des_t*	page_zip,/*!< in/out: compressed page (or NULL) */
223 	ulint		flag)	/*!< in: nonzero if delete marked */
224 {
225 	if (page_rec_is_comp(rec)) {
226 		rec_set_deleted_flag_new(rec, page_zip, flag);
227 	} else {
228 		ut_ad(!page_zip);
229 		rec_set_deleted_flag_old(rec, flag);
230 	}
231 }
232 
233 #ifndef UNIV_HOTBACKUP
234 /*==================== B-TREE SEARCH =========================*/
235 
236 /********************************************************************//**
237 Latches the leaf page or pages requested. */
238 static
239 void
btr_cur_latch_leaves(
241 /*=================*/
242 	page_t*		page,		/*!< in: leaf page where the search
243 					converged */
244 	ulint		space,		/*!< in: space id */
245 	ulint		zip_size,	/*!< in: compressed page size in bytes
246 					or 0 for uncompressed pages */
247 	ulint		page_no,	/*!< in: page number of the leaf */
248 	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
249 	btr_cur_t*	cursor,		/*!< in: cursor */
250 	mtr_t*		mtr)		/*!< in: mtr */
251 {
252 	ulint		mode;
253 	ulint		sibling_mode;
254 	ulint		left_page_no;
255 	ulint		right_page_no;
256 	buf_block_t*	get_block;
257 
258 	ut_ad(page && mtr);
259 
260 	switch (latch_mode) {
261 	case BTR_SEARCH_LEAF:
262 	case BTR_MODIFY_LEAF:
263 		mode = latch_mode == BTR_SEARCH_LEAF ? RW_S_LATCH : RW_X_LATCH;
264 		get_block = btr_block_get(
265 			space, zip_size, page_no, mode, cursor->index, mtr);
266 
267 		SRV_CORRUPT_TABLE_CHECK(get_block, return;);
268 
269 #ifdef UNIV_BTR_DEBUG
270 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
271 #endif /* UNIV_BTR_DEBUG */
272 		get_block->check_index_page_at_flush = TRUE;
273 		return;
274 	case BTR_SEARCH_TREE:
275 	case BTR_MODIFY_TREE:
276 		if (UNIV_UNLIKELY(latch_mode == BTR_SEARCH_TREE)) {
277 			mode = RW_S_LATCH;
278 			sibling_mode = RW_NO_LATCH;
279 		} else {
280 			mode = sibling_mode = RW_X_LATCH;
281 		}
282 		/* Fetch and possibly latch also brothers from left to right */
283 		left_page_no = btr_page_get_prev(page, mtr);
284 
285 		if (left_page_no != FIL_NULL) {
286 			get_block = btr_block_get(
287 				space, zip_size, left_page_no,
288 				sibling_mode, cursor->index, mtr);
289 
290 			SRV_CORRUPT_TABLE_CHECK(get_block, return;);
291 
292 #ifdef UNIV_BTR_DEBUG
293 			ut_a(page_is_comp(get_block->frame)
294 			     == page_is_comp(page));
295 
			/* For fake_change mode we avoid a detailed validation,
			as it operates in a tweaked format in which the
			validation may fail. */
299 			ut_a(sibling_mode == RW_NO_LATCH
300 			     || btr_page_get_next(get_block->frame, mtr)
301 				== page_get_page_no(page));
302 #endif /* UNIV_BTR_DEBUG */
303 			if (sibling_mode == RW_NO_LATCH) {
304 				/* btr_block_get() called with RW_NO_LATCH will
305 				fix the read block in the buffer.  This serves
306 				no purpose for the fake changes prefetching,
307 				thus we unfix the sibling blocks immediately.*/
308 				mtr_memo_release(mtr, get_block,
309 						 MTR_MEMO_BUF_FIX);
310 			} else {
311 				get_block->check_index_page_at_flush = TRUE;
312 			}
313 		}
314 
315 		get_block = btr_block_get(
316 			space, zip_size, page_no,
317 			mode, cursor->index, mtr);
318 
319 		SRV_CORRUPT_TABLE_CHECK(get_block, return;);
320 
321 #ifdef UNIV_BTR_DEBUG
322 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
323 #endif /* UNIV_BTR_DEBUG */
324 		get_block->check_index_page_at_flush = TRUE;
325 
326 		right_page_no = btr_page_get_next(page, mtr);
327 
328 		if (right_page_no != FIL_NULL) {
329 			get_block = btr_block_get(
330 				space, zip_size, right_page_no,
331 				sibling_mode, cursor->index, mtr);
332 
333 			SRV_CORRUPT_TABLE_CHECK(get_block, return;);
334 
335 #ifdef UNIV_BTR_DEBUG
336 			ut_a(page_is_comp(get_block->frame)
337 			     == page_is_comp(page));
338 			ut_a(btr_page_get_prev(get_block->frame, mtr)
339 			     == page_get_page_no(page));
340 #endif /* UNIV_BTR_DEBUG */
341 			if (sibling_mode == RW_NO_LATCH) {
342 				mtr_memo_release(mtr, get_block,
343 						 MTR_MEMO_BUF_FIX);
344 			} else {
345 				get_block->check_index_page_at_flush = TRUE;
346 			}
347 		}
348 
349 		return;
350 
351 	case BTR_SEARCH_PREV:
352 	case BTR_MODIFY_PREV:
353 		mode = latch_mode == BTR_SEARCH_PREV ? RW_S_LATCH : RW_X_LATCH;
354 		/* latch also left brother */
355 		left_page_no = btr_page_get_prev(page, mtr);
356 
357 		if (left_page_no != FIL_NULL) {
358 			get_block = btr_block_get(
359 				space, zip_size,
360 				left_page_no, mode, cursor->index, mtr);
361 			cursor->left_block = get_block;
362 
363 			SRV_CORRUPT_TABLE_CHECK(get_block, return;);
364 
365 #ifdef UNIV_BTR_DEBUG
366 			ut_a(page_is_comp(get_block->frame)
367 			     == page_is_comp(page));
368 			ut_a(btr_page_get_next(get_block->frame, mtr)
369 			     == page_get_page_no(page));
370 #endif /* UNIV_BTR_DEBUG */
371 			get_block->check_index_page_at_flush = TRUE;
372 		}
373 
374 		get_block = btr_block_get(
375 			space, zip_size, page_no, mode, cursor->index, mtr);
376 
377 		SRV_CORRUPT_TABLE_CHECK(get_block, return;);
378 
379 #ifdef UNIV_BTR_DEBUG
380 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
381 #endif /* UNIV_BTR_DEBUG */
382 		get_block->check_index_page_at_flush = TRUE;
383 		return;
384 	}
385 
386 	ut_error;
387 }
388 
389 /********************************************************************//**
390 Searches an index tree and positions a tree cursor on a given level.
391 NOTE: n_fields_cmp in tuple must be set so that it cannot be compared
392 to node pointer page number fields on the upper levels of the tree!
Note that if mode is PAGE_CUR_LE, which is used in inserts, then
cursor->up_match and cursor->low_match both will have sensible values.
If mode is PAGE_CUR_GE, then up_match will have a sensible value.

If mode is PAGE_CUR_LE, the cursor is left at the place where an insert of the
search tuple should be performed in the B-tree. InnoDB does an insert
immediately after the cursor. Thus, the cursor may end up on a user record,
or on a page infimum record. */
401 UNIV_INTERN
402 void
btr_cur_search_to_nth_level(
404 /*========================*/
405 	dict_index_t*	index,	/*!< in: index */
406 	ulint		level,	/*!< in: the tree level of search */
407 	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
408 				tuple must be set so that it cannot get
409 				compared to the node ptr page number field! */
410 	ulint		mode,	/*!< in: PAGE_CUR_L, ...;
411 				Inserts should always be made using
412 				PAGE_CUR_LE to search the position! */
413 	ulint		latch_mode, /*!< in: BTR_SEARCH_LEAF, ..., ORed with
414 				at most one of BTR_INSERT, BTR_DELETE_MARK,
415 				BTR_DELETE, or BTR_ESTIMATE;
416 				cursor->left_block is used to store a pointer
417 				to the left neighbor page, in the cases
418 				BTR_SEARCH_PREV and BTR_MODIFY_PREV;
				NOTE that if has_search_latch
				is != 0, we may not have a latch set
				on the cursor page; we assume
				the caller uses its search latch
				to protect the record! */
424 	btr_cur_t*	cursor, /*!< in/out: tree cursor; the cursor page is
425 				s- or x-latched, but see also above! */
426 	ulint		has_search_latch,/*!< in: info on the latch mode the
427 				caller currently has on btr_search_latch:
428 				RW_S_LATCH, or 0 */
429 	const char*	file,	/*!< in: file name */
430 	ulint		line,	/*!< in: line where called */
431 	mtr_t*		mtr)	/*!< in: mtr */
432 {
433 	page_t*		page;
434 	buf_block_t*	block;
435 	ulint		space;
436 	buf_block_t*	guess;
437 	ulint		height;
438 	ulint		page_no;
439 	ulint		up_match;
440 	ulint		up_bytes;
441 	ulint		low_match;
442 	ulint		low_bytes;
443 	ulint		savepoint;
444 	ulint		rw_latch;
445 	ulint		page_mode;
446 	ulint		buf_mode;
447 	ulint		estimate;
448 	ulint		zip_size;
449 	page_cur_t*	page_cursor;
450 	btr_op_t	btr_op;
451 	ulint		root_height = 0; /* remove warning */
452 
453 #ifdef BTR_CUR_ADAPT
454 	btr_search_t*	info;
455 #endif
456 	mem_heap_t*	heap		= NULL;
457 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
458 	ulint*		offsets		= offsets_;
459 	rec_offs_init(offsets_);
	/* Currently, PAGE_CUR_LE is the only search mode used for searches
	that end on the upper levels of the tree */
462 
463 	ut_ad(level == 0 || mode == PAGE_CUR_LE);
464 	ut_ad(dict_index_check_search_tuple(index, tuple));
465 	ut_ad(!dict_index_is_ibuf(index) || ibuf_inside(mtr));
466 	ut_ad(dtuple_check_typed(tuple));
467 	ut_ad(!(index->type & DICT_FTS));
468 	ut_ad(index->page != FIL_NULL);
469 
470 	UNIV_MEM_INVALID(&cursor->up_match, sizeof cursor->up_match);
471 	UNIV_MEM_INVALID(&cursor->up_bytes, sizeof cursor->up_bytes);
472 	UNIV_MEM_INVALID(&cursor->low_match, sizeof cursor->low_match);
473 	UNIV_MEM_INVALID(&cursor->low_bytes, sizeof cursor->low_bytes);
474 #ifdef UNIV_DEBUG
475 	cursor->up_match = ULINT_UNDEFINED;
476 	cursor->low_match = ULINT_UNDEFINED;
477 #endif
478 
479 	ibool	s_latch_by_caller;
480 
481 	s_latch_by_caller = latch_mode & BTR_ALREADY_S_LATCHED;
482 
483 	ut_ad(!s_latch_by_caller
484 	      || mtr_memo_contains(mtr, dict_index_get_lock(index),
485 				   MTR_MEMO_S_LOCK));
486 
487 	/* These flags are mutually exclusive, they are lumped together
488 	with the latch mode for historical reasons. It's possible for
489 	none of the flags to be set. */
490 	switch (UNIV_EXPECT(latch_mode
491 			    & (BTR_INSERT | BTR_DELETE | BTR_DELETE_MARK),
492 			    0)) {
493 	case 0:
494 		btr_op = BTR_NO_OP;
495 		break;
496 	case BTR_INSERT:
497 		btr_op = (latch_mode & BTR_IGNORE_SEC_UNIQUE)
498 			? BTR_INSERT_IGNORE_UNIQUE_OP
499 			: BTR_INSERT_OP;
500 		break;
501 	case BTR_DELETE:
502 		btr_op = BTR_DELETE_OP;
503 		ut_a(cursor->purge_node);
504 		break;
505 	case BTR_DELETE_MARK:
506 		btr_op = BTR_DELMARK_OP;
507 		break;
508 	default:
509 		/* only one of BTR_INSERT, BTR_DELETE, BTR_DELETE_MARK
510 		should be specified at a time */
511 		ut_error;
512 	}
513 
514 	/* Operations on the insert buffer tree cannot be buffered. */
515 	ut_ad(btr_op == BTR_NO_OP || !dict_index_is_ibuf(index));
516 	/* Operations on the clustered index cannot be buffered. */
517 	ut_ad(btr_op == BTR_NO_OP || !dict_index_is_clust(index));
518 
519 	estimate = latch_mode & BTR_ESTIMATE;
520 
521 	/* Turn the flags unrelated to the latch mode off. */
522 	latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
523 
524 	ut_ad(!s_latch_by_caller
525 	      || latch_mode == BTR_SEARCH_LEAF
526 	      || latch_mode == BTR_MODIFY_LEAF);
527 
528 	cursor->flag = BTR_CUR_BINARY;
529 	cursor->index = index;
530 
531 #ifndef BTR_CUR_ADAPT
532 	guess = NULL;
533 #else
534 	info = btr_search_get_info(index);
535 
536 	guess = info->root_guess;
537 
538 #ifdef BTR_CUR_HASH_ADAPT
539 
540 # ifdef UNIV_SEARCH_PERF_STAT
541 	info->n_searches++;
542 # endif
543 	if (rw_lock_get_writer(btr_search_get_latch(cursor->index)) ==
544 	    RW_LOCK_NOT_LOCKED
545 	    && latch_mode <= BTR_MODIFY_LEAF
546 	    && info->last_hash_succ
547 	    && !estimate
548 # ifdef PAGE_CUR_LE_OR_EXTENDS
549 	    && mode != PAGE_CUR_LE_OR_EXTENDS
550 # endif /* PAGE_CUR_LE_OR_EXTENDS */
551 	    /* If !has_search_latch, we do a dirty read of
552 	    btr_search_enabled below, and btr_search_guess_on_hash()
553 	    will have to check it again. */
554 	    && UNIV_LIKELY(btr_search_enabled)
555 	    && btr_search_guess_on_hash(index, info, tuple, mode,
556 					latch_mode, cursor,
557 					has_search_latch, mtr)) {
558 
559 		/* Search using the hash index succeeded */
560 
561 		ut_ad(cursor->up_match != ULINT_UNDEFINED
562 		      || mode != PAGE_CUR_GE);
563 		ut_ad(cursor->up_match != ULINT_UNDEFINED
564 		      || mode != PAGE_CUR_LE);
565 		ut_ad(cursor->low_match != ULINT_UNDEFINED
566 		      || mode != PAGE_CUR_LE);
567 		btr_cur_n_sea++;
568 
569 		return;
570 	}
571 # endif /* BTR_CUR_HASH_ADAPT */
572 #endif /* BTR_CUR_ADAPT */
573 	btr_cur_n_non_sea++;
574 
575 	/* If the hash search did not succeed, do binary search down the
576 	tree */
577 
578 	if (has_search_latch) {
579 		/* Release possible search latch to obey latching order */
580 		rw_lock_s_unlock(btr_search_get_latch(cursor->index));
581 	}
582 
583 	/* Store the position of the tree latch we push to mtr so that we
584 	know how to release it when we have latched leaf node(s) */
585 
586 	savepoint = mtr_set_savepoint(mtr);
587 
588 	switch (latch_mode) {
589 	case BTR_MODIFY_TREE:
590 		mtr_x_lock(dict_index_get_lock(index), mtr);
591 		break;
592 	case BTR_CONT_MODIFY_TREE:
593 		/* Do nothing */
594 		ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
595 					MTR_MEMO_X_LOCK));
596 		break;
597 	default:
598 		if (!s_latch_by_caller) {
599 			mtr_s_lock(dict_index_get_lock(index), mtr);
600 		}
601 	}
602 
603 	page_cursor = btr_cur_get_page_cur(cursor);
604 
605 	space = dict_index_get_space(index);
606 	page_no = dict_index_get_page(index);
607 
608 	up_match = 0;
609 	up_bytes = 0;
610 	low_match = 0;
611 	low_bytes = 0;
612 
613 	height = ULINT_UNDEFINED;
614 
615 	/* We use these modified search modes on non-leaf levels of the
616 	B-tree. These let us end up in the right B-tree leaf. In that leaf
617 	we use the original search mode. */
618 
619 	switch (mode) {
620 	case PAGE_CUR_GE:
621 		page_mode = PAGE_CUR_L;
622 		break;
623 	case PAGE_CUR_G:
624 		page_mode = PAGE_CUR_LE;
625 		break;
626 	default:
627 #ifdef PAGE_CUR_LE_OR_EXTENDS
628 		ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
629 		      || mode == PAGE_CUR_LE_OR_EXTENDS);
630 #else /* PAGE_CUR_LE_OR_EXTENDS */
631 		ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE);
632 #endif /* PAGE_CUR_LE_OR_EXTENDS */
633 		page_mode = mode;
634 		break;
635 	}
636 
637 	/* Loop and search until we arrive at the desired level */
638 
639 search_loop:
640 	buf_mode = BUF_GET;
641 	rw_latch = RW_NO_LATCH;
642 
643 	if (height != 0) {
644 		/* We are about to fetch the root or a non-leaf page. */
645 	} else if (latch_mode <= BTR_MODIFY_LEAF) {
646 		rw_latch = latch_mode;
647 
648 		if (btr_op != BTR_NO_OP
649 		    && ibuf_should_try(index, btr_op != BTR_INSERT_OP)) {
650 
651 			/* Try to buffer the operation if the leaf
652 			page is not in the buffer pool. */
653 
654 			buf_mode = btr_op == BTR_DELETE_OP
655 				? BUF_GET_IF_IN_POOL_OR_WATCH
656 				: BUF_GET_IF_IN_POOL;
657 		}
658 	}
659 
660 	zip_size = dict_table_zip_size(index->table);
661 
662 retry_page_get:
663 	block = buf_page_get_gen(
664 		space, zip_size, page_no, rw_latch, guess, buf_mode,
665 		file, line, mtr);
666 
667 	if (block == NULL) {
668 		SRV_CORRUPT_TABLE_CHECK(buf_mode == BUF_GET_IF_IN_POOL ||
669 					buf_mode == BUF_GET_IF_IN_POOL_OR_WATCH,
670 			{
671 				page_cursor->block = 0;
672 				page_cursor->rec = 0;
673 				if (estimate) {
674 
675 					cursor->path_arr->nth_rec =
676 						ULINT_UNDEFINED;
677 				}
678 
679 				goto func_exit;
680 			});
681 
		/* This must be a search to perform an insert, delete
		mark, or delete; try using the insert/delete buffer */
684 
685 		ut_ad(height == 0);
686 		ut_ad(cursor->thr);
687 
688 		switch (btr_op) {
689 		case BTR_INSERT_OP:
690 		case BTR_INSERT_IGNORE_UNIQUE_OP:
691 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL);
692 
693 			if (ibuf_insert(IBUF_OP_INSERT, tuple, index,
694 					space, zip_size, page_no,
695 					cursor->thr)) {
696 
697 				cursor->flag = BTR_CUR_INSERT_TO_IBUF;
698 
699 				goto func_exit;
700 			}
701 			break;
702 
703 		case BTR_DELMARK_OP:
704 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL);
705 
706 			if (ibuf_insert(IBUF_OP_DELETE_MARK, tuple,
707 					index, space, zip_size,
708 					page_no, cursor->thr)) {
709 
710 				cursor->flag = BTR_CUR_DEL_MARK_IBUF;
711 
712 				goto func_exit;
713 			}
714 
715 			break;
716 
717 		case BTR_DELETE_OP:
718 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL_OR_WATCH);
719 
720 			if (!row_purge_poss_sec(cursor->purge_node,
721 						index, tuple)) {
722 
723 				/* The record cannot be purged yet. */
724 				cursor->flag = BTR_CUR_DELETE_REF;
725 			} else if (ibuf_insert(IBUF_OP_DELETE, tuple,
726 					       index, space, zip_size,
727 					       page_no,
728 					       cursor->thr)) {
729 
730 				/* The purge was buffered. */
731 				cursor->flag = BTR_CUR_DELETE_IBUF;
732 			} else {
733 				/* The purge could not be buffered. */
734 				buf_pool_watch_unset(space, page_no);
735 				break;
736 			}
737 
738 			buf_pool_watch_unset(space, page_no);
739 			goto func_exit;
740 
741 		default:
742 			ut_error;
743 		}
744 
745 		/* Insert to the insert/delete buffer did not succeed, we
746 		must read the page from disk. */
747 
748 		buf_mode = BUF_GET;
749 
750 		goto retry_page_get;
751 	}
752 
753 	block->check_index_page_at_flush = TRUE;
754 	page = buf_block_get_frame(block);
755 
756 	SRV_CORRUPT_TABLE_CHECK(page,
757 	{
758 		page_cursor->block = 0;
759 		page_cursor->rec = 0;
760 
761 		if (estimate) {
762 
763 			cursor->path_arr->nth_rec = ULINT_UNDEFINED;
764 		}
765 
766 		goto func_exit;
767 	});
768 
769 	if (rw_latch != RW_NO_LATCH) {
770 #ifdef UNIV_ZIP_DEBUG
771 		const page_zip_des_t*	page_zip
772 			= buf_block_get_page_zip(block);
773 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
774 #endif /* UNIV_ZIP_DEBUG */
775 
776 		buf_block_dbg_add_level(
777 			block, dict_index_is_ibuf(index)
778 			? SYNC_IBUF_TREE_NODE : SYNC_TREE_NODE);
779 	}
780 
781 	ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
782 	ut_ad(index->id == btr_page_get_index_id(page));
783 
784 	if (UNIV_UNLIKELY(height == ULINT_UNDEFINED)) {
785 		/* We are in the root node */
786 
787 		height = btr_page_get_level(page, mtr);
788 		root_height = height;
789 		cursor->tree_height = root_height + 1;
790 
791 #ifdef BTR_CUR_ADAPT
792 		if (block != guess) {
793 			info->root_guess = block;
794 		}
795 #endif
796 	}
797 
798 	if (height == 0) {
799 		if (rw_latch == RW_NO_LATCH) {
800 
801 			btr_cur_latch_leaves(
802 				page, space, zip_size, page_no, latch_mode,
803 				cursor, mtr);
804 		}
805 
806 		switch (latch_mode) {
807 		case BTR_MODIFY_TREE:
808 		case BTR_CONT_MODIFY_TREE:
809 			break;
810 		default:
811 			if (!s_latch_by_caller) {
812 				/* Release the tree s-latch */
813 				mtr_release_s_latch_at_savepoint(
814 					mtr, savepoint,
815 					dict_index_get_lock(index));
816 			}
817 		}
818 
819 		page_mode = mode;
820 	}
821 
822 	page_cur_search_with_match(
823 		block, index, tuple, page_mode, &up_match, &up_bytes,
824 		&low_match, &low_bytes, page_cursor);
825 
826 	if (estimate) {
827 		btr_cur_add_path_info(cursor, height, root_height);
828 	}
829 
830 	/* If this is the desired level, leave the loop */
831 
832 	ut_ad(height == btr_page_get_level(page_cur_get_page(page_cursor),
833 					   mtr));
834 
835 	if (level != height) {
836 
837 		const rec_t*	node_ptr;
838 		ut_ad(height > 0);
839 
840 		height--;
841 		guess = NULL;
842 
843 		node_ptr = page_cur_get_rec(page_cursor);
844 
845 		offsets = rec_get_offsets(
846 			node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
847 
848 		/* Go to the child node */
849 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
850 
851 		if (UNIV_UNLIKELY(height == 0 && dict_index_is_ibuf(index))) {
852 			/* We're doing a search on an ibuf tree and we're one
853 			level above the leaf page. */
854 
855 			ut_ad(level == 0);
856 
857 			buf_mode = BUF_GET;
858 			rw_latch = RW_NO_LATCH;
859 			goto retry_page_get;
860 		}
861 
862 		goto search_loop;
863 	}
864 
865 	if (level != 0) {
866 		/* x-latch the page */
867 		buf_block_t*	child_block = btr_block_get(
868 			space, zip_size, page_no, RW_X_LATCH, index, mtr);
869 
870 		page = buf_block_get_frame(child_block);
871 		btr_assert_not_corrupted(child_block, index);
872 	} else {
873 		cursor->low_match = low_match;
874 		cursor->low_bytes = low_bytes;
875 		cursor->up_match = up_match;
876 		cursor->up_bytes = up_bytes;
877 
878 #ifdef BTR_CUR_ADAPT
879 		/* We do a dirty read of btr_search_enabled here.  We
880 		will properly check btr_search_enabled again in
881 		btr_search_build_page_hash_index() before building a
882 		page hash index, while holding btr_search_latch. */
883 		if (btr_search_enabled) {
884 			btr_search_info_update(index, cursor);
885 		}
886 #endif
887 		ut_ad(cursor->up_match != ULINT_UNDEFINED
888 		      || mode != PAGE_CUR_GE);
889 		ut_ad(cursor->up_match != ULINT_UNDEFINED
890 		      || mode != PAGE_CUR_LE);
891 		ut_ad(cursor->low_match != ULINT_UNDEFINED
892 		      || mode != PAGE_CUR_LE);
893 	}
894 
895 func_exit:
896 
897 	if (UNIV_LIKELY_NULL(heap)) {
898 		mem_heap_free(heap);
899 	}
900 
901 	if (has_search_latch) {
902 
903 		rw_lock_s_lock(btr_search_get_latch(cursor->index));
904 	}
905 }
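/* A minimal usage sketch (hypothetical call site; real callers normally
go through the btr_pcur_* and row_* layers): to position a cursor on the
leaf page for an insert one would do

	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    BTR_MODIFY_LEAF, &cursor, 0,
				    __FILE__, __LINE__, &mtr);

and then attempt btr_cur_optimistic_insert() at the cursor position. */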
906 
907 /*****************************************************************//**
908 Opens a cursor at either end of an index. */
909 UNIV_INTERN
910 void
btr_cur_open_at_index_side_func(
912 /*============================*/
913 	bool		from_left,	/*!< in: true if open to the low end,
914 					false if to the high end */
915 	dict_index_t*	index,		/*!< in: index */
916 	ulint		latch_mode,	/*!< in: latch mode */
917 	btr_cur_t*	cursor,		/*!< in/out: cursor */
918 	ulint		level,		/*!< in: level to search for
919 					(0=leaf). */
920 	const char*	file,		/*!< in: file name */
921 	ulint		line,		/*!< in: line where called */
922 	mtr_t*		mtr)		/*!< in/out: mini-transaction */
923 {
924 	page_cur_t*	page_cursor;
925 	ulint		page_no;
926 	ulint		space;
927 	ulint		zip_size;
928 	ulint		height;
929 	ulint		root_height = 0; /* remove warning */
930 	rec_t*		node_ptr;
931 	ulint		estimate;
932 	ulint		savepoint;
933 	mem_heap_t*	heap		= NULL;
934 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
935 	ulint*		offsets		= offsets_;
936 	rec_offs_init(offsets_);
937 
938 	estimate = latch_mode & BTR_ESTIMATE;
939 	latch_mode &= ~BTR_ESTIMATE;
940 
941 	ut_ad(level != ULINT_UNDEFINED);
942 
943 	/* Store the position of the tree latch we push to mtr so that we
944 	know how to release it when we have latched the leaf node */
945 
946 	savepoint = mtr_set_savepoint(mtr);
947 
948 	switch (latch_mode) {
949 	case BTR_CONT_MODIFY_TREE:
950 		break;
951 	case BTR_MODIFY_TREE:
952 		mtr_x_lock(dict_index_get_lock(index), mtr);
953 		break;
954 	case BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED:
955 	case BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED:
956 		ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
957 					MTR_MEMO_S_LOCK));
958 		break;
959 	default:
960 		mtr_s_lock(dict_index_get_lock(index), mtr);
961 	}
962 
963 	page_cursor = btr_cur_get_page_cur(cursor);
964 	cursor->index = index;
965 
966 	space = dict_index_get_space(index);
967 	zip_size = dict_table_zip_size(index->table);
968 	page_no = dict_index_get_page(index);
969 
970 	height = ULINT_UNDEFINED;
971 
972 	for (;;) {
973 		buf_block_t*	block;
974 		page_t*		page;
975 		block = buf_page_get_gen(space, zip_size, page_no,
976 					 RW_NO_LATCH, NULL, BUF_GET,
977 					 file, line, mtr);
978 		page = buf_block_get_frame(block);
979 
980 		SRV_CORRUPT_TABLE_CHECK(page,
981 		{
982 			page_cursor->block = 0;
983 			page_cursor->rec = 0;
984 
985 			if (estimate) {
986 
987 				cursor->path_arr->nth_rec =
988 					ULINT_UNDEFINED;
989 			}
990 			/* Can't use break with the macro */
991 			goto exit_loop;
992 		});
993 
994 		ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
995 
996 		ut_ad(index->id == btr_page_get_index_id(page));
997 
998 		block->check_index_page_at_flush = TRUE;
999 
1000 		if (height == ULINT_UNDEFINED) {
1001 			/* We are in the root node */
1002 
1003 			height = btr_page_get_level(page, mtr);
1004 			root_height = height;
1005 			ut_a(height >= level);
1006 		} else {
1007 			/* TODO: flag the index corrupted if this fails */
1008 			ut_ad(height == btr_page_get_level(page, mtr));
1009 		}
1010 
1011 		if (height == level) {
1012 			btr_cur_latch_leaves(
1013 				page, space, zip_size, page_no,
1014 				latch_mode & ~BTR_ALREADY_S_LATCHED,
1015 				cursor, mtr);
1016 
1017 			if (height == 0) {
1018 				/* In versions <= 3.23.52 we had
1019 				forgotten to release the tree latch
1020 				here. If in an index scan we had to
1021 				scan far to find a record visible to
1022 				the current transaction, that could
1023 				starve others waiting for the tree
1024 				latch. */
1025 
1026 				switch (latch_mode) {
1027 				case BTR_MODIFY_TREE:
1028 				case BTR_CONT_MODIFY_TREE:
1029 				case BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED:
1030 				case BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED:
1031 					break;
1032 				default:
1033 					/* Release the tree s-latch */
1034 
1035 					mtr_release_s_latch_at_savepoint(
1036 						mtr, savepoint,
1037 						dict_index_get_lock(index));
1038 				}
1039 			}
1040 		}
1041 
1042 		if (from_left) {
1043 			page_cur_set_before_first(block, page_cursor);
1044 		} else {
1045 			page_cur_set_after_last(block, page_cursor);
1046 		}
1047 
1048 		if (height == level) {
1049 			if (estimate) {
1050 				btr_cur_add_path_info(cursor, height,
1051 						      root_height);
1052 			}
1053 
1054 			break;
1055 		}
1056 
1057 		ut_ad(height > 0);
1058 
1059 		if (from_left) {
1060 			page_cur_move_to_next(page_cursor);
1061 		} else {
1062 			page_cur_move_to_prev(page_cursor);
1063 		}
1064 
1065 		if (estimate) {
1066 			btr_cur_add_path_info(cursor, height, root_height);
1067 		}
1068 
1069 		height--;
1070 
1071 		node_ptr = page_cur_get_rec(page_cursor);
1072 		offsets = rec_get_offsets(node_ptr, cursor->index, offsets,
1073 					  ULINT_UNDEFINED, &heap);
1074 		/* Go to the child node */
1075 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
1076 	}
1077 
1078 exit_loop:
1079 	if (UNIV_LIKELY_NULL(heap)) {
1080 		mem_heap_free(heap);
1081 	}
1082 }
1083 
1084 /**********************************************************************//**
1085 Positions a cursor at a randomly chosen position within a B-tree. */
1086 UNIV_INTERN
1087 void
btr_cur_open_at_rnd_pos_func(
1089 /*=========================*/
1090 	dict_index_t*	index,		/*!< in: index */
1091 	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
1092 	btr_cur_t*	cursor,		/*!< in/out: B-tree cursor */
1093 	const char*	file,		/*!< in: file name */
1094 	ulint		line,		/*!< in: line where called */
1095 	mtr_t*		mtr)		/*!< in: mtr */
1096 {
1097 	page_cur_t*	page_cursor;
1098 	ulint		page_no;
1099 	ulint		space;
1100 	ulint		zip_size;
1101 	ulint		height;
1102 	rec_t*		node_ptr;
1103 	mem_heap_t*	heap		= NULL;
1104 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1105 	ulint*		offsets		= offsets_;
1106 	rec_offs_init(offsets_);
1107 
1108 	switch (latch_mode) {
1109 	case BTR_MODIFY_TREE:
1110 		mtr_x_lock(dict_index_get_lock(index), mtr);
1111 		break;
1112 	default:
1113 		ut_ad(latch_mode != BTR_CONT_MODIFY_TREE);
1114 		mtr_s_lock(dict_index_get_lock(index), mtr);
1115 	}
1116 
1117 	page_cursor = btr_cur_get_page_cur(cursor);
1118 	cursor->index = index;
1119 
1120 	space = dict_index_get_space(index);
1121 	zip_size = dict_table_zip_size(index->table);
1122 	page_no = dict_index_get_page(index);
1123 
1124 	height = ULINT_UNDEFINED;
1125 
1126 	for (;;) {
1127 		buf_block_t*	block;
1128 		page_t*		page;
1129 
1130 		block = buf_page_get_gen(space, zip_size, page_no,
1131 					 RW_NO_LATCH, NULL, BUF_GET,
1132 					 file, line, mtr);
1133 		page = buf_block_get_frame(block);
1134 
1135 		SRV_CORRUPT_TABLE_CHECK(page,
1136 		{
1137 			page_cursor->block = 0;
1138 			page_cursor->rec = 0;
1139 
1140 			goto exit_loop;
1141 		});
1142 
1143 		ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
1144 
1145 		ut_ad(index->id == btr_page_get_index_id(page));
1146 
1147 		if (height == ULINT_UNDEFINED) {
1148 			/* We are in the root node */
1149 
1150 			height = btr_page_get_level(page, mtr);
1151 		}
1152 
1153 		if (height == 0) {
1154 			btr_cur_latch_leaves(page, space, zip_size, page_no,
1155 					     latch_mode, cursor, mtr);
1156 		}
1157 
1158 		page_cur_open_on_rnd_user_rec(block, page_cursor);
1159 
1160 		if (height == 0) {
1161 
1162 			break;
1163 		}
1164 
1165 		ut_ad(height > 0);
1166 
1167 		height--;
1168 
1169 		node_ptr = page_cur_get_rec(page_cursor);
1170 		offsets = rec_get_offsets(node_ptr, cursor->index, offsets,
1171 					  ULINT_UNDEFINED, &heap);
1172 		/* Go to the child node */
1173 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
1174 	}
1175 
1176 exit_loop:
1177 	if (UNIV_LIKELY_NULL(heap)) {
1178 		mem_heap_free(heap);
1179 	}
1180 }
1181 
1182 /*==================== B-TREE INSERT =========================*/
1183 
1184 /*************************************************************//**
1185 Inserts a record if there is enough space, or if enough space can
1186 be freed by reorganizing. Differs from btr_cur_optimistic_insert because
no heuristic is applied to whether it pays to use CPU time for
1188 reorganizing the page or not.
1189 
1190 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1191 if this is a compressed leaf page in a secondary index.
1192 This has to be done either within the same mini-transaction,
1193 or by invoking ibuf_reset_free_bits() before mtr_commit().
1194 
1195 @return	pointer to inserted record if succeed, else NULL */
1196 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1197 rec_t*
btr_cur_insert_if_possible(
1199 /*=======================*/
1200 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert;
1201 				cursor stays valid */
1202 	const dtuple_t*	tuple,	/*!< in: tuple to insert; the size info need not
1203 				have been stored to tuple */
1204 	ulint**		offsets,/*!< out: offsets on *rec */
1205 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1206 	ulint		n_ext,	/*!< in: number of externally stored columns */
1207 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1208 {
1209 	page_cur_t*	page_cursor;
1210 	rec_t*		rec;
1211 
1212 	ut_ad(dtuple_check_typed(tuple));
1213 
1214 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
1215 				MTR_MEMO_PAGE_X_FIX));
1216 	page_cursor = btr_cur_get_page_cur(cursor);
1217 
1218 	/* Now, try the insert */
1219 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
1220 				    offsets, heap, n_ext, mtr);
1221 
1222 	/* If the record did not fit, reorganize.
1223 	For compressed pages, page_cur_tuple_insert()
1224 	attempted this already. */
1225 	if (!rec && !page_cur_get_page_zip(page_cursor)
1226 	    && btr_page_reorganize(page_cursor, cursor->index, mtr)) {
1227 		rec = page_cur_tuple_insert(
1228 			page_cursor, tuple, cursor->index,
1229 			offsets, heap, n_ext, mtr);
1230 	}
1231 
1232 	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
1233 	return(rec);
1234 }
1235 
1236 /*************************************************************//**
1237 For an insert, checks the locks and does the undo logging if desired.
1238 @return	DB_SUCCESS, DB_WAIT_LOCK, DB_FAIL, or error number */
1239 UNIV_INLINE MY_ATTRIBUTE((warn_unused_result, nonnull(2,3,5,6)))
1240 dberr_t
btr_cur_ins_lock_and_undo(
1242 /*======================*/
1243 	ulint		flags,	/*!< in: undo logging and locking flags: if
1244 				not zero, the parameters index and thr
1245 				should be specified */
1246 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert */
1247 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1248 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1249 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
1250 	ibool*		inherit)/*!< out: TRUE if the inserted new record maybe
1251 				should inherit LOCK_GAP type locks from the
1252 				successor record */
1253 {
1254 	dict_index_t*	index;
1255 	dberr_t		err;
1256 	rec_t*		rec;
1257 	roll_ptr_t	roll_ptr;
1258 
1259 	if (UNIV_UNLIKELY(thr && thr_get_trx(thr)->fake_changes)) {
1260 		/* skip LOCK, UNDO */
1261 		return(DB_SUCCESS);
1262 	}
1263 
1264 	/* Check if we have to wait for a lock: enqueue an explicit lock
1265 	request if yes */
1266 
1267 	rec = btr_cur_get_rec(cursor);
1268 	index = cursor->index;
1269 
1270 	ut_ad(!dict_index_is_online_ddl(index)
1271 	      || dict_index_is_clust(index)
1272 	      || (flags & BTR_CREATE_FLAG));
1273 
1274 	err = lock_rec_insert_check_and_lock(flags, rec,
1275 					     btr_cur_get_block(cursor),
1276 					     index, thr, mtr, inherit);
1277 
1278 	if (err != DB_SUCCESS
1279 	    || !dict_index_is_clust(index) || dict_index_is_ibuf(index)) {
1280 
1281 		return(err);
1282 	}
1283 
1284 	err = trx_undo_report_row_operation(flags, TRX_UNDO_INSERT_OP,
1285 					    thr, index, entry,
1286 					    NULL, 0, NULL, NULL,
1287 					    &roll_ptr);
1288 	if (err != DB_SUCCESS) {
1289 
1290 		return(err);
1291 	}
1292 
1293 	/* Now we can fill in the roll ptr field in entry */
1294 
1295 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
1296 
1297 		row_upd_index_entry_sys_field(entry, index,
1298 					      DATA_ROLL_PTR, roll_ptr);
1299 	}
1300 
1301 	return(DB_SUCCESS);
1302 }
1303 
1304 #ifdef UNIV_DEBUG
1305 /*************************************************************//**
1306 Report information about a transaction. */
1307 static
1308 void
btr_cur_trx_report(
1310 /*===============*/
1311 	trx_id_t		trx_id,	/*!< in: transaction id */
1312 	const dict_index_t*	index,	/*!< in: index */
1313 	const char*		op)	/*!< in: operation */
1314 {
1315 	fprintf(stderr, "Trx with id " TRX_ID_FMT " going to ", trx_id);
1316 	fputs(op, stderr);
1317 	dict_index_name_print(stderr, NULL, index);
1318 	putc('\n', stderr);
1319 }
1320 #endif /* UNIV_DEBUG */
1321 
1322 /*************************************************************//**
1323 Tries to perform an insert to a page in an index tree, next to cursor.
1324 It is assumed that mtr holds an x-latch on the page. The operation does
1325 not succeed if there is too little space on the page. If there is just
1326 one record on the page, the insert will always succeed; this is to
1327 prevent trying to split a page with just one record.
1328 @return	DB_SUCCESS, DB_WAIT_LOCK, DB_FAIL, or error number */
1329 UNIV_INTERN
1330 dberr_t
btr_cur_optimistic_insert(
1332 /*======================*/
1333 	ulint		flags,	/*!< in: undo logging and locking flags: if not
1334 				zero, the parameters index and thr should be
1335 				specified */
1336 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert;
1337 				cursor stays valid */
1338 	ulint**		offsets,/*!< out: offsets on *rec */
1339 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1340 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1341 	rec_t**		rec,	/*!< out: pointer to inserted record if
1342 				succeed */
1343 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
1344 				be stored externally by the caller, or
1345 				NULL */
1346 	ulint		n_ext,	/*!< in: number of externally stored columns */
1347 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1348 	mtr_t*		mtr)	/*!< in/out: mini-transaction;
1349 				if this function returns DB_SUCCESS on
1350 				a leaf page of a secondary index in a
1351 				compressed tablespace, the caller must
1352 				mtr_commit(mtr) before latching
1353 				any further pages */
1354 {
1355 	big_rec_t*	big_rec_vec	= NULL;
1356 	dict_index_t*	index;
1357 	page_cur_t*	page_cursor;
1358 	buf_block_t*	block;
1359 	page_t*		page;
1360 	rec_t*		dummy;
1361 	ibool		leaf;
1362 	ibool		reorg;
1363 	ibool		inherit = TRUE;
1364 	ulint		zip_size;
1365 	ulint		rec_size;
1366 	dberr_t		err;
1367 
1368 	*big_rec = NULL;
1369 
1370 	block = btr_cur_get_block(cursor);
1371 
1372 	SRV_CORRUPT_TABLE_CHECK(block, return(DB_CORRUPTION););
1373 
1374 	page = buf_block_get_frame(block);
1375 	index = cursor->index;
1376 
1377 	ut_ad((thr && thr_get_trx(thr)->fake_changes)
1378 	      || mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1379 	ut_ad(!dict_index_is_online_ddl(index)
1380 	      || dict_index_is_clust(index)
1381 	      || (flags & BTR_CREATE_FLAG));
1382 	ut_ad(dtuple_check_typed(entry));
1383 
1384 	zip_size = buf_block_get_zip_size(block);
1385 #ifdef UNIV_DEBUG_VALGRIND
1386 	if (zip_size) {
1387 		UNIV_MEM_ASSERT_RW(page, UNIV_PAGE_SIZE);
1388 		UNIV_MEM_ASSERT_RW(block->page.zip.data, zip_size);
1389 	}
1390 #endif /* UNIV_DEBUG_VALGRIND */
1391 
1392 #ifdef UNIV_DEBUG
1393 	if (btr_cur_print_record_ops && thr) {
1394 		btr_cur_trx_report(thr_get_trx(thr)->id, index, "insert ");
1395 		dtuple_print(stderr, entry);
1396 	}
1397 #endif /* UNIV_DEBUG */
1398 
1399 	leaf = page_is_leaf(page);
1400 
1401 	/* Calculate the record size when entry is converted to a record */
1402 	rec_size = rec_get_converted_size(index, entry, n_ext);
1403 
1404 	if (page_zip_rec_needs_ext(rec_size, page_is_comp(page),
1405 				   dtuple_get_n_fields(entry), zip_size)) {
1406 
1407 		/* The record is so big that we have to store some fields
1408 		externally on separate database pages */
1409 		big_rec_vec = dtuple_convert_big_rec(index, entry, &n_ext);
1410 
1411 		if (UNIV_UNLIKELY(big_rec_vec == NULL)) {
1412 
1413 			return(DB_TOO_BIG_RECORD);
1414 		}
1415 
1416 		rec_size = rec_get_converted_size(index, entry, n_ext);
1417 	}
1418 
1419 	if (zip_size) {
1420 		/* Estimate the free space of an empty compressed page.
1421 		Subtract one byte for the encoded heap_no in the
1422 		modification log. */
1423 		ulint	free_space_zip = page_zip_empty_size(
1424 			cursor->index->n_fields, zip_size);
1425 		ulint	n_uniq = dict_index_get_n_unique_in_tree(index);
1426 
1427 		ut_ad(dict_table_is_comp(index->table));
1428 
1429 		if (free_space_zip == 0) {
1430 too_big:
1431 			if (big_rec_vec) {
1432 				dtuple_convert_back_big_rec(
1433 					index, entry, big_rec_vec);
1434 			}
1435 
1436 			return(DB_TOO_BIG_RECORD);
1437 		}
1438 
1439 		/* Subtract one byte for the encoded heap_no in the
1440 		modification log. */
1441 		free_space_zip--;
1442 
1443 		/* There should be enough room for two node pointer
1444 		records on an empty non-leaf page.  This prevents
1445 		infinite page splits. */
1446 
1447 		if (entry->n_fields >= n_uniq
1448 		    && (REC_NODE_PTR_SIZE
1449 			+ rec_get_converted_size_comp_prefix(
1450 				index, entry->fields, n_uniq, NULL)
1451 			/* On a compressed page, there is
1452 			a two-byte entry in the dense
1453 			page directory for every record.
1454 			But there is no record header. */
1455 			- (REC_N_NEW_EXTRA_BYTES - 2)
1456 			> free_space_zip / 2)) {
1457 			goto too_big;
1458 		}
1459 	}
1460 
1461 	LIMIT_OPTIMISTIC_INSERT_DEBUG(page_get_n_recs(page),
1462 				      goto fail);
1463 
1464 	if (leaf && zip_size
1465 	    && (page_get_data_size(page) + rec_size
1466 		>= dict_index_zip_pad_optimal_page_size(index))) {
		/* If the compression padding heuristic tells us that the
		insertion would result in a too densely packed page, one
		that is likely to cause a compression failure, then do not
		attempt an optimistic insertion. */
1471 fail:
1472 		err = DB_FAIL;
1473 fail_err:
1474 
1475 		if (big_rec_vec) {
1476 			dtuple_convert_back_big_rec(index, entry, big_rec_vec);
1477 		}
1478 
1479 		return(err);
1480 	}
1481 
1482 	ulint	max_size = page_get_max_insert_size_after_reorganize(page, 1);
1483 
1484 	if (page_has_garbage(page)) {
1485 		if ((max_size < rec_size
1486 		     || max_size < BTR_CUR_PAGE_REORGANIZE_LIMIT)
1487 		    && page_get_n_recs(page) > 1
1488 		    && page_get_max_insert_size(page, 1) < rec_size) {
1489 
1490 			goto fail;
1491 		}
1492 	} else if (max_size < rec_size) {
1493 		goto fail;
1494 	}
1495 
1496 	/* If there have been many consecutive inserts to the
1497 	clustered index leaf page of an uncompressed table, check if
1498 	we have to split the page to reserve enough free space for
1499 	future updates of records. */
1500 
1501 	if (leaf && !zip_size && dict_index_is_clust(index)
1502 	    && page_get_n_recs(page) >= 2
1503 	    && dict_index_get_space_reserve() + rec_size > max_size
1504 	    && (btr_page_get_split_rec_to_right(cursor, &dummy)
1505 		|| btr_page_get_split_rec_to_left(cursor, &dummy))) {
1506 		goto fail;
1507 	}
1508 
1509 	/* Check locks and write to the undo log, if specified */
1510 	err = btr_cur_ins_lock_and_undo(flags, cursor, entry,
1511 					thr, mtr, &inherit);
1512 
1513 	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
1514 
1515 		goto fail_err;
1516 	}
1517 
1518 	if (UNIV_UNLIKELY(thr && thr_get_trx(thr)->fake_changes)) {
1519 		/* skip CHANGE, LOG */
1520 		*big_rec = big_rec_vec;
1521 		return(err); /* == DB_SUCCESS */
1522 	}
1523 
1524 	page_cursor = btr_cur_get_page_cur(cursor);
1525 
1526 	/* Now, try the insert */
1527 
1528 	{
1529 		const rec_t* page_cursor_rec = page_cur_get_rec(page_cursor);
1530 		*rec = page_cur_tuple_insert(page_cursor, entry, index,
1531 					     offsets, heap, n_ext, mtr);
1532 		reorg = page_cursor_rec != page_cur_get_rec(page_cursor);
1533 	}
1534 
1535 	if (*rec) {
1536 	} else if (zip_size) {
1537 		/* Reset the IBUF_BITMAP_FREE bits, because
1538 		page_cur_tuple_insert() will have attempted page
1539 		reorganize before failing. */
1540 		if (leaf && !dict_index_is_clust(index)) {
1541 			ibuf_reset_free_bits(block);
1542 		}
1543 
1544 		goto fail;
1545 	} else {
1546 		ut_ad(!reorg);
1547 
1548 		/* If the record did not fit, reorganize */
1549 		if (!btr_page_reorganize(page_cursor, index, mtr)) {
1550 			ut_ad(0);
1551 			goto fail;
1552 		}
1553 
1554 		ut_ad(page_get_max_insert_size(page, 1) == max_size);
1555 
1556 		reorg = TRUE;
1557 
1558 		*rec = page_cur_tuple_insert(page_cursor, entry, index,
1559 					     offsets, heap, n_ext, mtr);
1560 
1561 		if (UNIV_UNLIKELY(!*rec)) {
1562 			fputs("InnoDB: Error: cannot insert tuple ", stderr);
1563 			dtuple_print(stderr, entry);
1564 			fputs(" into ", stderr);
1565 			dict_index_name_print(stderr, thr_get_trx(thr), index);
1566 			fprintf(stderr, "\nInnoDB: max insert size %lu\n",
1567 				(ulong) max_size);
1568 			ut_error;
1569 		}
1570 	}
1571 
1572 #ifdef BTR_CUR_HASH_ADAPT
1573 	if (!reorg && leaf && (cursor->flag == BTR_CUR_HASH)) {
1574 		btr_search_update_hash_node_on_insert(cursor);
1575 	} else {
1576 		btr_search_update_hash_on_insert(cursor);
1577 	}
1578 #endif
1579 
1580 	if (!(flags & BTR_NO_LOCKING_FLAG) && inherit) {
1581 
1582 		lock_update_insert(block, *rec);
1583 	}
1584 
1585 	if (leaf && !dict_index_is_clust(index)) {
1586 		/* Update the free bits of the B-tree page in the
1587 		insert buffer bitmap. */
1588 
1589 		/* The free bits in the insert buffer bitmap must
1590 		never exceed the free space on a page.  It is safe to
1591 		decrement or reset the bits in the bitmap in a
1592 		mini-transaction that is committed before the
1593 		mini-transaction that affects the free space. */
1594 
1595 		/* It is unsafe to increment the bits in a separately
1596 		committed mini-transaction, because in crash recovery,
1597 		the free bits could momentarily be set too high. */
1598 
1599 		if (zip_size) {
1600 			/* Update the bits in the same mini-transaction. */
1601 			ibuf_update_free_bits_zip(block, mtr);
1602 		} else {
1603 			/* Decrement the bits in a separate
1604 			mini-transaction. */
1605 			ibuf_update_free_bits_if_full(
1606 				block, max_size,
1607 				rec_size + PAGE_DIR_SLOT_SIZE);
1608 		}
1609 	}
1610 
1611 	*big_rec = big_rec_vec;
1612 
1613 	return(DB_SUCCESS);
1614 }
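/* Callers generally try btr_cur_optimistic_insert() first and fall back
to btr_cur_pessimistic_insert() only when it returns DB_FAIL, because the
pessimistic path x-latches the whole index tree and may split pages (see
e.g. the row_ins_* functions in row0ins.cc). */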
1615 
1616 /*************************************************************//**
1617 Performs an insert on a page of an index tree. It is assumed that mtr
1618 holds an x-latch on the tree and on the cursor page. If the insert is
1619 made on the leaf level, to avoid deadlocks, mtr must also own x-latches
1620 to brothers of page, if those brothers exist.
1621 @return	DB_SUCCESS or error number */
1622 UNIV_INTERN
1623 dberr_t
btr_cur_pessimistic_insert(
1625 /*=======================*/
1626 	ulint		flags,	/*!< in: undo logging and locking flags: if not
1627 				zero, the parameter thr should be
1628 				specified; if no undo logging is specified,
1629 				then the caller must have reserved enough
1630 				free extents in the file space so that the
1631 				insertion will certainly succeed */
1632 	btr_cur_t*	cursor,	/*!< in: cursor after which to insert;
1633 				cursor stays valid */
1634 	ulint**		offsets,/*!< out: offsets on *rec */
1635 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap
1636 				that can be emptied, or NULL */
1637 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1638 	rec_t**		rec,	/*!< out: pointer to inserted record if
1639 				succeed */
1640 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
1641 				be stored externally by the caller, or
1642 				NULL */
1643 	ulint		n_ext,	/*!< in: number of externally stored columns */
1644 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1645 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1646 {
1647 	dict_index_t*	index		= cursor->index;
1648 	ulint		zip_size	= dict_table_zip_size(index->table);
1649 	big_rec_t*	big_rec_vec	= NULL;
1650 	dberr_t		err;
1651 	ibool		inherit = FALSE;
1652 	ibool		success;
1653 	ulint		n_reserved	= 0;
1654 
1655 	ut_ad(dtuple_check_typed(entry));
1656 
1657 	*big_rec = NULL;
1658 
1659 	ut_ad((thr && thr_get_trx(thr)->fake_changes) || mtr_memo_contains(mtr,
1660 				dict_index_get_lock(btr_cur_get_index(cursor)),
1661 				MTR_MEMO_X_LOCK));
	ut_ad((thr && thr_get_trx(thr)->fake_changes)
	      || mtr_memo_contains(mtr, btr_cur_get_block(cursor),
				   MTR_MEMO_PAGE_X_FIX));
1664 	ut_ad(!dict_index_is_online_ddl(index)
1665 	      || dict_index_is_clust(index)
1666 	      || (flags & BTR_CREATE_FLAG));
1667 
1668 	cursor->flag = BTR_CUR_BINARY;
1669 
1670 	/* Check locks and write to undo log, if specified */
1671 
1672 	err = btr_cur_ins_lock_and_undo(flags, cursor, entry,
1673 					thr, mtr, &inherit);
1674 
1675 	if (err != DB_SUCCESS) {
1676 
1677 		return(err);
1678 	}
1679 
1680 	if (!(flags & BTR_NO_UNDO_LOG_FLAG)) {
1681 
1682 		ut_a(cursor->tree_height != ULINT_UNDEFINED);
1683 
1684 		/* First reserve enough free space for the file segments
1685 		of the index tree, so that the insert will not fail because
1686 		of lack of space */
1687 
1688 		ulint	n_extents = cursor->tree_height / 16 + 3;
1689 
1690 		success = fsp_reserve_free_extents(&n_reserved, index->space,
1691 						   n_extents, FSP_NORMAL, mtr);
1692 		if (!success) {
1693 			return(DB_OUT_OF_FILE_SPACE);
1694 		}
1695 	}
1696 
1697 	if (page_zip_rec_needs_ext(rec_get_converted_size(index, entry, n_ext),
1698 				   dict_table_is_comp(index->table),
1699 				   dtuple_get_n_fields(entry),
1700 				   zip_size)) {
1701 		/* The record is so big that we have to store some fields
1702 		externally on separate database pages */
1703 
1704 		if (UNIV_LIKELY_NULL(big_rec_vec)) {
1705 			/* This should never happen, but we handle
1706 			the situation in a robust manner. */
1707 			ut_ad(0);
1708 			dtuple_convert_back_big_rec(index, entry, big_rec_vec);
1709 		}
1710 
1711 		big_rec_vec = dtuple_convert_big_rec(index, entry, &n_ext);
1712 
1713 		if (big_rec_vec == NULL) {
1714 
1715 			if (n_reserved > 0) {
1716 				fil_space_release_free_extents(index->space,
1717 							       n_reserved);
1718 			}
1719 			return(DB_TOO_BIG_RECORD);
1720 		}
1721 	}
1722 
1723 	if (UNIV_UNLIKELY(thr && thr_get_trx(thr)->fake_changes)) {
1724 		/* skip CHANGE, LOG */
1725 		if (n_reserved > 0) {
1726 			fil_space_release_free_extents(index->space,
1727 						       n_reserved);
1728 		}
1729 		*big_rec = big_rec_vec;
1730 		return(DB_SUCCESS);
1731 	}
1732 
1733 	if (dict_index_get_page(index)
1734 	    == buf_block_get_page_no(btr_cur_get_block(cursor))) {
1735 
1736 		/* The page is the root page */
1737 		*rec = btr_root_raise_and_insert(
1738 			flags, cursor, offsets, heap, entry, n_ext, mtr);
1739 	} else {
1740 		*rec = btr_page_split_and_insert(
1741 			flags, cursor, offsets, heap, entry, n_ext, mtr);
1742 	}
1743 
1744 	if (*rec == NULL && os_has_said_disk_full) {
1745 		return(DB_OUT_OF_FILE_SPACE);
1746 	}
1747 
1748 	ut_ad(page_rec_get_next(btr_cur_get_rec(cursor)) == *rec);
1749 
1750 	if (!(flags & BTR_NO_LOCKING_FLAG)) {
1751 		/* The cursor might have been moved to another page,
1752 		so the max trx id field should be updated only after
1753 		the cursor position has been fixed. */
1754 		if (!dict_index_is_clust(index)) {
1755 			page_update_max_trx_id(
1756 				btr_cur_get_block(cursor),
1757 				btr_cur_get_page_zip(cursor),
1758 				thr_get_trx(thr)->id, mtr);
1759 		}
1760 		if (!page_rec_is_infimum(btr_cur_get_rec(cursor))
1761 		    || btr_page_get_prev(
1762 			buf_nonnull_block_get_frame(
1763 				btr_cur_get_block(cursor)), mtr)
1764 		       == FIL_NULL) {
1765 			/* After a split-and-insert we must always call
1766 			lock_update_insert(). */
1767 			inherit = TRUE;
1768 		}
1769 	}
1770 
1771 #ifdef BTR_CUR_ADAPT
1772 	btr_search_update_hash_on_insert(cursor);
1773 #endif
1774 	if (inherit && !(flags & BTR_NO_LOCKING_FLAG)) {
1775 
1776 		lock_update_insert(btr_cur_get_block(cursor), *rec);
1777 	}
1778 
1779 	if (n_reserved > 0) {
1780 		fil_space_release_free_extents(index->space, n_reserved);
1781 	}
1782 
1783 	*big_rec = big_rec_vec;
1784 
1785 	return(DB_SUCCESS);
1786 }
1787 
1788 /*==================== B-TREE UPDATE =========================*/
1789 
1790 /*************************************************************//**
1791 For an update, checks the locks and does the undo logging.
1792 @return	DB_SUCCESS, DB_WAIT_LOCK, or error number */
1793 UNIV_INLINE MY_ATTRIBUTE((warn_unused_result))
1794 dberr_t
1795 btr_cur_upd_lock_and_undo(
1796 /*======================*/
1797 	ulint		flags,	/*!< in: undo logging and locking flags */
1798 	btr_cur_t*	cursor,	/*!< in: cursor on record to update */
1799 	const ulint*	offsets,/*!< in: rec_get_offsets() on cursor */
1800 	const upd_t*	update,	/*!< in: update vector */
1801 	ulint		cmpl_info,/*!< in: compiler info on secondary index
1802 				updates */
1803 	que_thr_t*	thr,	/*!< in: query thread
1804 				(can be NULL if BTR_NO_LOCKING_FLAG) */
1805 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
1806 	roll_ptr_t*	roll_ptr)/*!< out: roll pointer */
1807 {
1808 	dict_index_t*	index;
1809 	const rec_t*	rec;
1810 	dberr_t		err;
1811 
1812 	ut_ad((thr != NULL) || (flags & BTR_NO_LOCKING_FLAG));
1813 
1814 	if (UNIV_UNLIKELY(thr && thr_get_trx(thr)->fake_changes)) {
1815 		/* skip LOCK, UNDO */
1816 		return(DB_SUCCESS);
1817 	}
1818 
1819 	rec = btr_cur_get_rec(cursor);
1820 	index = cursor->index;
1821 
1822 	ut_ad(rec_offs_validate(rec, index, offsets));
1823 
1824 	if (!dict_index_is_clust(index)) {
1825 		ut_ad(dict_index_is_online_ddl(index)
1826 		      == !!(flags & BTR_CREATE_FLAG));
1827 
1828 		/* We do undo logging only when we update a clustered index
1829 		record */
1830 		return(lock_sec_rec_modify_check_and_lock(
1831 			       flags, btr_cur_get_block(cursor), rec,
1832 			       index, thr, mtr));
1833 	}
1834 
1835 	/* Check if we have to wait for a lock: enqueue an explicit lock
1836 	request if yes */
1837 
1838 	if (!(flags & BTR_NO_LOCKING_FLAG)) {
1839 		err = lock_clust_rec_modify_check_and_lock(
1840 			flags, btr_cur_get_block(cursor), rec, index,
1841 			offsets, thr);
1842 		if (err != DB_SUCCESS) {
1843 			return(err);
1844 		}
1845 	}
1846 
1847 	/* Append the info about the update in the undo log */
1848 
1849 	return(trx_undo_report_row_operation(
1850 		       flags, TRX_UNDO_MODIFY_OP, thr,
1851 		       index, NULL, update,
1852 		       cmpl_info, rec, offsets, roll_ptr));
1853 }
1854 
1855 /***********************************************************//**
1856 Writes a redo log record of updating a record in-place. */
1857 UNIV_INTERN
1858 void
1859 btr_cur_update_in_place_log(
1860 /*========================*/
1861 	ulint		flags,		/*!< in: flags */
1862 	const rec_t*	rec,		/*!< in: record */
1863 	dict_index_t*	index,		/*!< in: index of the record */
1864 	const upd_t*	update,		/*!< in: update vector */
1865 	trx_id_t	trx_id,		/*!< in: transaction id */
1866 	roll_ptr_t	roll_ptr,	/*!< in: roll ptr */
1867 	mtr_t*		mtr)		/*!< in: mtr */
1868 {
1869 	byte*		log_ptr;
1870 	const page_t*	page	= page_align(rec);
1871 	ut_ad(flags < 256);
1872 	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1873 
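	/* The reserved size covers the flags byte, the system fields
	written by row_upd_write_sys_vals_to_log() (DATA_ROLL_PTR_LEN plus
	up to 14 bytes for the compressed field position and trx id), the
	2-byte record offset, and a margin for the update vector written
	by row_upd_index_write_log(). */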
1874 	log_ptr = mlog_open_and_write_index(mtr, rec, index, page_is_comp(page)
1875 					    ? MLOG_COMP_REC_UPDATE_IN_PLACE
1876 					    : MLOG_REC_UPDATE_IN_PLACE,
1877 					    1 + DATA_ROLL_PTR_LEN + 14 + 2
1878 					    + MLOG_BUF_MARGIN);
1879 
1880 	if (!log_ptr) {
1881 		/* Logging in mtr is switched off during crash recovery */
1882 		return;
1883 	}
1884 
1885 	/* For secondary indexes, we could skip writing the dummy system fields
1886 	to the redo log, but that would require changing the redo log parsing of
1887 	MLOG_REC_UPDATE_IN_PLACE/MLOG_COMP_REC_UPDATE_IN_PLACE or adding a new
1888 	redo log record type. For now, just write dummy system fields to the
1889 	redo log if we are updating a secondary index record.
1890 	*/
1891 	mach_write_to_1(log_ptr, flags);
1892 	log_ptr++;
1893 
1894 	if (dict_index_is_clust(index)) {
1895 		log_ptr = row_upd_write_sys_vals_to_log(
1896 				index, trx_id, roll_ptr, log_ptr, mtr);
1897 	} else {
1898 		/* Dummy system fields for a secondary index */
1899 		/* TRX_ID Position */
1900 		log_ptr += mach_write_compressed(log_ptr, 0);
1901 		/* ROLL_PTR */
1902 		trx_write_roll_ptr(log_ptr, 0);
1903 		log_ptr += DATA_ROLL_PTR_LEN;
1904 		/* TRX_ID */
1905 		log_ptr += mach_ull_write_compressed(log_ptr, 0);
1906 	}
1907 
1908 	mach_write_to_2(log_ptr, page_offset(rec));
1909 	log_ptr += 2;
1910 
1911 	row_upd_index_write_log(update, log_ptr, mtr);
1912 }
1913 #endif /* !UNIV_HOTBACKUP */
1914 
1915 /***********************************************************//**
1916 Parses a redo log record of updating a record in-place.
1917 @return	end of log record or NULL */
1918 UNIV_INTERN
1919 byte*
1920 btr_cur_parse_update_in_place(
1921 /*==========================*/
1922 	byte*		ptr,	/*!< in: buffer */
1923 	byte*		end_ptr,/*!< in: buffer end */
1924 	page_t*		page,	/*!< in/out: page or NULL */
1925 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
1926 	dict_index_t*	index)	/*!< in: index corresponding to page */
1927 {
1928 	ulint		flags;
1929 	rec_t*		rec;
1930 	upd_t*		update;
1931 	ulint		pos;
1932 	trx_id_t	trx_id;
1933 	roll_ptr_t	roll_ptr;
1934 	ulint		rec_offset;
1935 	mem_heap_t*	heap;
1936 	ulint*		offsets;
1937 
1938 	if (end_ptr < ptr + 1) {
1939 
1940 		return(NULL);
1941 	}
1942 
1943 	flags = mach_read_from_1(ptr);
1944 	ptr++;
1945 
1946 	ptr = row_upd_parse_sys_vals(ptr, end_ptr, &pos, &trx_id, &roll_ptr);
1947 
1948 	if (ptr == NULL) {
1949 
1950 		return(NULL);
1951 	}
1952 
1953 	if (end_ptr < ptr + 2) {
1954 
1955 		return(NULL);
1956 	}
1957 
1958 	rec_offset = mach_read_from_2(ptr);
1959 	ptr += 2;
1960 
1961 	ut_a(rec_offset <= UNIV_PAGE_SIZE);
1962 
1963 	heap = mem_heap_create(256);
1964 
1965 	ptr = row_upd_index_parse(ptr, end_ptr, heap, &update);
1966 
1967 	if (!ptr || !page) {
1968 
1969 		goto func_exit;
1970 	}
1971 
1972 	ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
1973 	rec = page + rec_offset;
1974 
1975 	/* We do not need to reserve btr_search_latch, as the page is only
1976 	being recovered, and there cannot be a hash index to it. */
1977 
1978 	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
1979 
1980 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
1981 		row_upd_rec_sys_fields_in_recovery(rec, page_zip, offsets,
1982 						   pos, trx_id, roll_ptr);
1983 	}
1984 
1985 	row_upd_rec_in_place(rec, index, offsets, update, page_zip);
1986 
1987 func_exit:
1988 	mem_heap_free(heap);
1989 
1990 	return(ptr);
1991 }
1992 
1993 #ifndef UNIV_HOTBACKUP
1994 /*************************************************************//**
1995 See if there is enough space in the page modification log to log
1996 an update-in-place.
1997 
1998 @retval false if out of space; IBUF_BITMAP_FREE will be reset
1999 outside mtr if the page was recompressed
2000 @retval	true if there is enough space
2001 
2002 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE if this is
2003 a secondary index leaf page. This has to be done either within the
2004 same mini-transaction, or by invoking ibuf_reset_free_bits() before
2005 mtr_commit(mtr). */
2006 UNIV_INTERN
2007 bool
2008 btr_cur_update_alloc_zip_func(
2009 /*==========================*/
2010 	page_zip_des_t*	page_zip,/*!< in/out: compressed page */
2011 	page_cur_t*	cursor,	/*!< in/out: B-tree page cursor */
2012 	dict_index_t*	index,	/*!< in: the index corresponding to cursor */
2013 #ifdef UNIV_DEBUG
2014 	ulint*		offsets,/*!< in/out: offsets of the cursor record */
2015 #endif /* UNIV_DEBUG */
2016 	ulint		length,	/*!< in: size needed */
2017 	bool		create,	/*!< in: true=delete-and-insert,
2018 				false=update-in-place */
2019 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
2020 	trx_t*		trx)	/*!< in: NULL or transaction */
2021 {
2022 	const page_t*	page = page_cur_get_page(cursor);
2023 
2024 	ut_ad(page_zip == page_cur_get_page_zip(cursor));
2025 	ut_ad(page_zip);
2026 	ut_ad(!dict_index_is_ibuf(index));
2027 	ut_ad(rec_offs_validate(page_cur_get_rec(cursor), index, offsets));
2028 
2029 	if (page_zip_available(page_zip, dict_index_is_clust(index),
2030 			       length, create)) {
2031 		return(true);
2032 	}
2033 
2034 	if (!page_zip->m_nonempty && !page_has_garbage(page)) {
2035 		/* The page has been freshly compressed, so
2036 		reorganizing it will not help. */
2037 		return(false);
2038 	}
2039 
2040 	if (create && page_is_leaf(page)
2041 	    && (length + page_get_data_size(page)
2042 		>= dict_index_zip_pad_optimal_page_size(index))) {
2043 		return(false);
2044 	}
2045 
2046 	if (UNIV_UNLIKELY(trx && trx->fake_changes)) {
2047 		/* Don't call page_zip_compress_write_log_no_data(), as it
2048 		contains an assertion that would fail here. Assume there won't
2049 		be a compression failure. */
2050 
2051 		return(true);
2052 	}
2053 
2054 	if (!btr_page_reorganize(cursor, index, mtr)) {
2055 		goto out_of_space;
2056 	}
2057 
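	/* btr_page_reorganize() may have moved the record within the page;
	re-attach the cached offsets to the record's new location. */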
2058 	rec_offs_make_valid(page_cur_get_rec(cursor), index, offsets);
2059 
2060 	/* After recompressing a page, we must make sure that the free
2061 	bits in the insert buffer bitmap will not exceed the free
2062 	space on the page.  Because this function will not attempt
2063 	recompression unless page_zip_available() fails above, it is
2064 	safe to reset the free bits if page_zip_available() fails
2065 	again, below.  The free bits can safely be reset in a separate
2066 	mini-transaction.  If page_zip_available() succeeds below, we
2067 	can be sure that the btr_page_reorganize() above did not reduce
2068 	the free space available on the page. */
2069 
2070 	if (page_zip_available(page_zip, dict_index_is_clust(index),
2071 			       length, create)) {
2072 		return(true);
2073 	}
2074 
2075 out_of_space:
2076 	ut_ad(rec_offs_validate(page_cur_get_rec(cursor), index, offsets));
2077 
2078 	/* Out of space: reset the free bits. */
2079 	if (!dict_index_is_clust(index) && page_is_leaf(page)) {
2080 		ibuf_reset_free_bits(page_cur_get_block(cursor));
2081 	}
2082 
2083 	return(false);
2084 }
2085 
2086 /*************************************************************//**
2087 Updates a record when the update causes no size changes in its fields.
2088 We assume here that the ordering fields of the record do not change.
2089 @return locking or undo log related error code, or
2090 @retval DB_SUCCESS on success
2091 @retval DB_ZIP_OVERFLOW if there is not enough space left
2092 on the compressed page (IBUF_BITMAP_FREE was reset outside mtr) */
2093 UNIV_INTERN
2094 dberr_t
2095 btr_cur_update_in_place(
2096 /*====================*/
2097 	ulint		flags,	/*!< in: undo logging and locking flags */
2098 	btr_cur_t*	cursor,	/*!< in: cursor on the record to update;
2099 				cursor stays valid and positioned on the
2100 				same record */
2101 	ulint*		offsets,/*!< in/out: offsets on cursor->page_cur.rec */
2102 	const upd_t*	update,	/*!< in: update vector */
2103 	ulint		cmpl_info,/*!< in: compiler info on secondary index
2104 				updates */
2105 	que_thr_t*	thr,	/*!< in: query thread */
2106 	trx_id_t	trx_id,	/*!< in: transaction id */
2107 	mtr_t*		mtr)	/*!< in/out: mini-transaction; if this
2108 				is a secondary index, the caller must
2109 				mtr_commit(mtr) before latching any
2110 				further pages */
2111 {
2112 	dict_index_t*	index;
2113 	buf_block_t*	block;
2114 	page_zip_des_t*	page_zip;
2115 	dberr_t		err;
2116 	rec_t*		rec;
2117 	roll_ptr_t	roll_ptr	= 0;
2118 	ulint		was_delete_marked;
2119 	ibool		is_hashed;
2120 	trx_t*		trx;
2121 
2122 	rec = btr_cur_get_rec(cursor);
2123 	index = cursor->index;
2124 	ut_ad(rec_offs_validate(rec, index, offsets));
2125 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2126 	/* The insert buffer tree should never be updated in place. */
2127 	ut_ad(!dict_index_is_ibuf(index));
2128 	ut_ad(dict_index_is_online_ddl(index) == !!(flags & BTR_CREATE_FLAG)
2129 	      || dict_index_is_clust(index));
2130 	ut_ad(thr_get_trx(thr)->id == trx_id
2131 	      || (flags & ~(BTR_KEEP_POS_FLAG | BTR_KEEP_IBUF_BITMAP))
2132 	      == (BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG
2133 		  | BTR_CREATE_FLAG | BTR_KEEP_SYS_FLAG));
2134 	ut_ad(fil_page_get_type(btr_cur_get_page(cursor)) == FIL_PAGE_INDEX);
2135 	ut_ad(btr_page_get_index_id(btr_cur_get_page(cursor)) == index->id);
2136 
2137 #ifdef UNIV_DEBUG
2138 	if (btr_cur_print_record_ops) {
2139 		btr_cur_trx_report(trx_id, index, "update ");
2140 		rec_print_new(stderr, rec, offsets);
2141 	}
2142 #endif /* UNIV_DEBUG */
2143 
2144 	block = btr_cur_get_block(cursor);
2145 	page_zip = buf_block_get_page_zip(block);
2146 	trx = thr_get_trx(thr);
2147 
2148 	/* Check that enough space is available on the compressed page. */
2149 	if (page_zip) {
2150 		if (!btr_cur_update_alloc_zip(
2151 			    page_zip, btr_cur_get_page_cur(cursor),
2152 			    index, offsets, rec_offs_size(offsets),
2153 			    false, mtr, trx)) {
2154 			return(DB_ZIP_OVERFLOW);
2155 		}
2156 
2157 		rec = btr_cur_get_rec(cursor);
2158 	}
2159 
2160 	/* Do lock checking and undo logging */
2161 	err = btr_cur_upd_lock_and_undo(flags, cursor, offsets,
2162 					update, cmpl_info,
2163 					thr, mtr, &roll_ptr);
2164 	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
2165 		/* We may need to update the IBUF_BITMAP_FREE
2166 		bits after a reorganize that was done in
2167 		btr_cur_update_alloc_zip(). */
2168 		goto func_exit;
2169 	}
2170 
2171 	if (UNIV_UNLIKELY(trx->fake_changes)) {
2172 		/* skip CHANGE, LOG */
2173 		return(err); /* == DB_SUCCESS */
2174 	}
2175 
2176 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
2177 		row_upd_rec_sys_fields(rec, NULL, index, offsets,
2178 				       thr_get_trx(thr), roll_ptr);
2179 	}
2180 
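	/* Remember whether the record was delete-marked, so that we can
	detect below whether this update clears the delete mark. */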
2181 	was_delete_marked = rec_get_deleted_flag(
2182 		rec, page_is_comp(buf_block_get_frame(block)));
2183 
2184 	is_hashed = (block->index != NULL);
2185 
2186 	if (is_hashed) {
2187 		/* TO DO: Can we skip this if none of the first
2188 		index->search_info->curr_n_fields fields
2189 		are being updated? */
2190 
2191 		/* The function row_upd_changes_ord_field_binary() works only
2192 		if the update vector was built for a clustered index; we must
2193 		NOT call it if the index is a secondary index. */
2194 
2195 		if (!dict_index_is_clust(index)
2196 		    || row_upd_changes_ord_field_binary(index, update, thr,
2197 							NULL, NULL)) {
2198 
2199 			/* Remove possible hash index pointer to this record */
2200 			btr_search_update_hash_on_delete(cursor);
2201 		}
2202 
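		/* Exclude adaptive hash index readers while the record
		is being modified in place. */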
2203 		rw_lock_x_lock(btr_search_get_latch(cursor->index));
2204 	}
2205 
2206 	row_upd_rec_in_place(rec, index, offsets, update, page_zip);
2207 
2208 	if (is_hashed) {
2209 		rw_lock_x_unlock(btr_search_get_latch(cursor->index));
2210 	}
2211 
2212 	btr_cur_update_in_place_log(flags, rec, index, update,
2213 				    trx_id, roll_ptr, mtr);
2214 
2215 	if (was_delete_marked
2216 	    && !rec_get_deleted_flag(
2217 		    rec, page_is_comp(buf_block_get_frame(block)))) {
2218 		/* The new updated record owns its possible externally
2219 		stored fields */
2220 
2221 		btr_cur_unmark_extern_fields(page_zip,
2222 					     rec, index, offsets, mtr);
2223 	}
2224 
2225 	ut_ad(err == DB_SUCCESS);
2226 
2227 func_exit:
2228 	if (page_zip
2229 	    && !(flags & BTR_KEEP_IBUF_BITMAP)
2230 	    && !dict_index_is_clust(index)
2231 	    && page_is_leaf(buf_nonnull_block_get_frame(block))) {
2232 		/* Update the free bits in the insert buffer. */
2233 		ibuf_update_free_bits_zip(block, mtr);
2234 	}
2235 
2236 	return(err);
2237 }
2238 
2239 /*************************************************************//**
2240 Tries to update a record on a page in an index tree. It is assumed that mtr
2241 holds an x-latch on the page. The operation does not succeed if there is too
2242 little space on the page or if the update would result in too empty a page,
2243 so that tree compression is recommended. We assume here that the ordering
2244 fields of the record do not change.
2245 @return error code, including
2246 @retval DB_SUCCESS on success
2247 @retval DB_OVERFLOW if the updated record does not fit
2248 @retval DB_UNDERFLOW if the page would become too empty
2249 @retval DB_ZIP_OVERFLOW if there is not enough space left
2250 on the compressed page (IBUF_BITMAP_FREE was reset outside mtr) */
2251 UNIV_INTERN
2252 dberr_t
2253 btr_cur_optimistic_update(
2254 /*======================*/
2255 	ulint		flags,	/*!< in: undo logging and locking flags */
2256 	btr_cur_t*	cursor,	/*!< in: cursor on the record to update;
2257 				cursor stays valid and positioned on the
2258 				same record */
2259 	ulint**		offsets,/*!< out: offsets on cursor->page_cur.rec */
2260 	mem_heap_t**	heap,	/*!< in/out: pointer to NULL or memory heap */
2261 	const upd_t*	update,	/*!< in: update vector; this must also
2262 				contain trx id and roll ptr fields */
2263 	ulint		cmpl_info,/*!< in: compiler info on secondary index
2264 				updates */
2265 	que_thr_t*	thr,	/*!< in: query thread */
2266 	trx_id_t	trx_id,	/*!< in: transaction id */
2267 	mtr_t*		mtr)	/*!< in/out: mini-transaction; if this
2268 				is a secondary index, the caller must
2269 				mtr_commit(mtr) before latching any
2270 				further pages */
2271 {
2272 	dict_index_t*	index;
2273 	page_cur_t*	page_cursor;
2274 	dberr_t		err;
2275 	buf_block_t*	block;
2276 	page_t*		page;
2277 	page_zip_des_t*	page_zip;
2278 	rec_t*		rec;
2279 	ulint		max_size;
2280 	ulint		new_rec_size;
2281 	ulint		old_rec_size;
2282 	ulint		max_ins_size = 0;
2283 	dtuple_t*	new_entry;
2284 	roll_ptr_t	roll_ptr;
2285 	ulint		i;
2286 	ulint		n_ext;
2287 
2288 	block = btr_cur_get_block(cursor);
2289 	page = buf_block_get_frame(block);
2290 	rec = btr_cur_get_rec(cursor);
2291 	index = cursor->index;
2292 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2293 	ut_ad(thr_get_trx(thr)->fake_changes
2294 	      || mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2295 	/* The insert buffer tree should never be updated in place. */
2296 	ut_ad(!dict_index_is_ibuf(index));
2297 	ut_ad(dict_index_is_online_ddl(index) == !!(flags & BTR_CREATE_FLAG)
2298 	      || dict_index_is_clust(index));
2299 	ut_ad(thr_get_trx(thr)->id == trx_id
2300 	      || (flags & ~(BTR_KEEP_POS_FLAG | BTR_KEEP_IBUF_BITMAP))
2301 	      == (BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG
2302 		  | BTR_CREATE_FLAG | BTR_KEEP_SYS_FLAG));
2303 	ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
2304 	ut_ad(btr_page_get_index_id(page) == index->id);
2305 
2306 	*offsets = rec_get_offsets(rec, index, *offsets,
2307 				   ULINT_UNDEFINED, heap);
2308 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
2309 	ut_a(!rec_offs_any_null_extern(rec, *offsets)
2310 	     || trx_is_recv(thr_get_trx(thr)));
2311 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
2312 
2313 #ifdef UNIV_DEBUG
2314 	if (btr_cur_print_record_ops) {
2315 		btr_cur_trx_report(trx_id, index, "update ");
2316 		rec_print_new(stderr, rec, *offsets);
2317 	}
2318 #endif /* UNIV_DEBUG */
2319 
2320 	if (!row_upd_changes_field_size_or_external(index, *offsets, update)) {
2321 
2322 		/* The simplest and the most common case: the update does not
2323 		change the size of any field and none of the updated fields is
2324 		externally stored in rec or update, and there is enough space
2325 		on the compressed page to log the update. */
2326 
2327 		return(btr_cur_update_in_place(
2328 			       flags, cursor, *offsets, update,
2329 			       cmpl_info, thr, trx_id, mtr));
2330 	}
2331 
2332 	if (rec_offs_any_extern(*offsets)) {
2333 any_extern:
2334 		/* Externally stored fields are treated in pessimistic
2335 		update */
2336 
2337 		return(DB_OVERFLOW);
2338 	}
2339 
2340 	for (i = 0; i < upd_get_n_fields(update); i++) {
2341 		if (dfield_is_ext(&upd_get_nth_field(update, i)->new_val)) {
2342 
2343 			goto any_extern;
2344 		}
2345 	}
2346 
2347 	page_cursor = btr_cur_get_page_cur(cursor);
2348 
2349 	if (!*heap) {
2350 		*heap = mem_heap_create(
2351 			rec_offs_size(*offsets)
2352 			+ DTUPLE_EST_ALLOC(rec_offs_n_fields(*offsets)));
2353 	}
2354 
2355 	new_entry = row_rec_to_index_entry(rec, index, *offsets,
2356 					   &n_ext, *heap);
2357 	/* We checked above that there are no externally stored fields. */
2358 	ut_a(!n_ext);
2359 
2360 	/* The page containing the clustered index record
2361 	corresponding to new_entry is latched in mtr.
2362 	Thus the following call is safe. */
2363 	row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
2364 						     FALSE, *heap);
2365 	old_rec_size = rec_offs_size(*offsets);
2366 	new_rec_size = rec_get_converted_size(index, new_entry, 0);
2367 
2368 	page_zip = buf_block_get_page_zip(block);
2369 #ifdef UNIV_ZIP_DEBUG
2370 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2371 #endif /* UNIV_ZIP_DEBUG */
2372 
2373 	if (page_zip) {
2374 		if (!btr_cur_update_alloc_zip(
2375 			    page_zip, page_cursor, index, *offsets,
2376 			    new_rec_size, true, mtr, thr_get_trx(thr))) {
2377 			return(DB_ZIP_OVERFLOW);
2378 		}
2379 
2380 		rec = page_cur_get_rec(page_cursor);
2381 	}
2382 
2383 	if (UNIV_UNLIKELY(new_rec_size
2384 			  >= (page_get_free_space_of_empty(page_is_comp(page))
2385 			      / 2))) {
2386 		/* We may need to update the IBUF_BITMAP_FREE
2387 		bits after a reorganize that was done in
2388 		btr_cur_update_alloc_zip(). */
2389 		err = DB_OVERFLOW;
2390 		goto func_exit;
2391 	}
2392 
2393 	if (UNIV_UNLIKELY(page_get_data_size(page)
2394 			  - old_rec_size + new_rec_size
2395 			  < BTR_CUR_PAGE_COMPRESS_LIMIT)) {
2396 		/* We may need to update the IBUF_BITMAP_FREE
2397 		bits after a reorganize that was done in
2398 		btr_cur_update_alloc_zip(). */
2399 
2400 		/* The page would become too empty */
2401 		err = DB_UNDERFLOW;
2402 		goto func_exit;
2403 	}
2404 
2405 	/* We do not attempt to reorganize if the page is compressed.
2406 	This is because the page may fail to compress after reorganization. */
2407 	max_size = page_zip
2408 		? page_get_max_insert_size(page, 1)
2409 		: (old_rec_size
2410 		   + page_get_max_insert_size_after_reorganize(page, 1));
2411 
2412 	if (!page_zip) {
2413 		max_ins_size = page_get_max_insert_size_after_reorganize(page, 1);
2414 	}
2415 
2416 	if (!(((max_size >= BTR_CUR_PAGE_REORGANIZE_LIMIT)
2417 	       && (max_size >= new_rec_size))
2418 	      || (page_get_n_recs(page) <= 1))) {
2419 
2420 		/* We may need to update the IBUF_BITMAP_FREE
2421 		bits after a reorganize that was done in
2422 		btr_cur_update_alloc_zip(). */
2423 
2424 		/* There was not enough space, or it did not pay to
2425 		reorganize: for simplicity, we decide what to do assuming a
2426 		reorganization is needed, though it might not be necessary */
2427 
2428 		err = DB_OVERFLOW;
2429 		goto func_exit;
2430 	}
2431 
2432 	/* Do lock checking and undo logging */
2433 	err = btr_cur_upd_lock_and_undo(flags, cursor, *offsets,
2434 					update, cmpl_info,
2435 					thr, mtr, &roll_ptr);
2436 	if (err != DB_SUCCESS) {
2437 		/* We may need to update the IBUF_BITMAP_FREE
2438 		bits after a reorganize that was done in
2439 		btr_cur_update_alloc_zip(). */
2440 		goto func_exit;
2441 	}
2442 
2443 	if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2444 		/* skip CHANGE, LOG */
2445 		ut_ad(err == DB_SUCCESS);
2446 		return(DB_SUCCESS);
2447 	}
2448 
2449 	/* Ok, we may do the replacement. Store on the page infimum the
2450 	explicit locks on rec, before deleting rec (see the comment in
2451 	btr_cur_pessimistic_update). */
2452 
2453 	lock_rec_store_on_page_infimum(block, rec);
2454 
2455 	btr_search_update_hash_on_delete(cursor);
2456 
2457 	page_cur_delete_rec(page_cursor, index, *offsets, mtr);
2458 
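	/* Position the page cursor on the predecessor of the deleted
	record, so that the updated entry will be re-inserted in the
	same place. */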
2459 	page_cur_move_to_prev(page_cursor);
2460 
2461 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
2462 		row_upd_index_entry_sys_field(new_entry, index, DATA_ROLL_PTR,
2463 					      roll_ptr);
2464 		row_upd_index_entry_sys_field(new_entry, index, DATA_TRX_ID,
2465 					      trx_id);
2466 	}
2467 
2468 	/* There are no externally stored columns in new_entry */
2469 	rec = btr_cur_insert_if_possible(
2470 		cursor, new_entry, offsets, heap, 0/*n_ext*/, mtr);
2471 	ut_a(rec); /* <- We calculated above the insert would fit */
2472 
2473 	/* Restore the old explicit lock state on the record */
2474 
2475 	lock_rec_restore_from_page_infimum(block, rec, block);
2476 
2477 	page_cur_move_to_next(page_cursor);
2478 	ut_ad(err == DB_SUCCESS);
2479 
2480 func_exit:
2481 	if (!(flags & BTR_KEEP_IBUF_BITMAP)
2482 	    && !dict_index_is_clust(index)
2483 	    && page_is_leaf(page)) {
2484 
2485 		if (page_zip) {
2486 			ibuf_update_free_bits_zip(block, mtr);
2487 		} else {
2488 			ibuf_update_free_bits_low(block, max_ins_size, mtr);
2489 		}
2490 	}
2491 
2492 	return(err);
2493 }
2494 
2495 /*************************************************************//**
2496 If, in a split, a new supremum record was created as the predecessor of the
2497 updated record, the supremum record must inherit exactly the locks on the
2498 updated record. In the split it may have inherited locks from the successor
2499 of the updated record, which is not correct. This function restores the
2500 right locks for the new supremum. */
2501 static
2502 void
2503 btr_cur_pess_upd_restore_supremum(
2504 /*==============================*/
2505 	buf_block_t*	block,	/*!< in: buffer block of rec */
2506 	const rec_t*	rec,	/*!< in: updated record */
2507 	mtr_t*		mtr)	/*!< in: mtr */
2508 {
2509 	page_t*		page;
2510 	buf_block_t*	prev_block;
2511 	ulint		space;
2512 	ulint		zip_size;
2513 	ulint		prev_page_no;
2514 
2515 	page = buf_block_get_frame(block);
2516 
2517 	if (page_rec_get_next(page_get_infimum_rec(page)) != rec) {
2518 		/* Updated record is not the first user record on its page */
2519 
2520 		return;
2521 	}
2522 
2523 	space = buf_block_get_space(block);
2524 	zip_size = buf_block_get_zip_size(block);
2525 	prev_page_no = btr_page_get_prev(page, mtr);
2526 
2527 	ut_ad(prev_page_no != FIL_NULL);
2528 	prev_block = buf_page_get_with_no_latch(space, zip_size,
2529 						prev_page_no, mtr);
2530 #ifdef UNIV_BTR_DEBUG
2531 	ut_a(btr_page_get_next(prev_block->frame, mtr)
2532 	     == page_get_page_no(page));
2533 #endif /* UNIV_BTR_DEBUG */
2534 
2535 	/* We must already have an x-latch on prev_block! */
2536 	ut_ad(mtr_memo_contains(mtr, prev_block, MTR_MEMO_PAGE_X_FIX));
2537 
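	/* Make the supremum of the previous page inherit exactly the locks
	on the updated record, discarding anything it may have inherited
	from other records during the page split. */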
2538 	lock_rec_reset_and_inherit_gap_locks(prev_block, block,
2539 					     PAGE_HEAP_NO_SUPREMUM,
2540 					     page_rec_get_heap_no(rec));
2541 }
2542 
2543 /*************************************************************//**
2544 Check if the total length of the modified blob for the row is within 10%
2545 of the total redo log size.  This constraint on the blob length is to
2546 avoid overwriting the redo logs beyond the last checkpoint lsn.
2547 @return	DB_SUCCESS or DB_TOO_BIG_FOR_REDO. */
2548 static
2549 dberr_t
2550 btr_check_blob_limit(const big_rec_t*	big_rec_vec)
2551 {
2552 	const	ib_uint64_t redo_size = srv_n_log_files * srv_log_file_size
2553 		* UNIV_PAGE_SIZE;
2554 	const	ib_uint64_t redo_10p = redo_size / 10;
2555 	ib_uint64_t	total_blob_len = 0;
2556 	dberr_t	err = DB_SUCCESS;
2557 
2558 	/* Calculate the total number of bytes for blob data */
2559 	for (ulint i = 0; i < big_rec_vec->n_fields; i++) {
2560 		total_blob_len += big_rec_vec->fields[i].len;
2561 	}
2562 
2563 	if (total_blob_len > redo_10p) {
2564 		ib_logf(IB_LOG_LEVEL_ERROR, "The total blob data"
2565 			" length (" UINT64PF ") is greater than"
2566 			" 10%% of the total redo log size (" UINT64PF
2567 			"). Please increase total redo log size.",
2568 			total_blob_len, redo_size);
2569 		err = DB_TOO_BIG_FOR_REDO;
2570 	}
2571 
2572 	return(err);
2573 }
2574 
2575 /*************************************************************//**
2576 Performs an update of a record on a page of a tree. It is assumed
2577 that mtr holds an x-latch on the tree and on the cursor page. If the
2578 update is made on the leaf level, to avoid deadlocks, mtr must also
2579 own x-latches on the brothers of the page, if those brothers exist. We assume
2580 here that the ordering fields of the record do not change.
2581 @return	DB_SUCCESS or error code */
2582 UNIV_INTERN
2583 dberr_t
2584 btr_cur_pessimistic_update(
2585 /*=======================*/
2586 	ulint		flags,	/*!< in: undo logging, locking, and rollback
2587 				flags */
2588 	btr_cur_t*	cursor,	/*!< in/out: cursor on the record to update;
2589 				cursor may become invalid if *big_rec == NULL
2590 				|| !(flags & BTR_KEEP_POS_FLAG) */
2591 	ulint**		offsets,/*!< out: offsets on cursor->page_cur.rec */
2592 	mem_heap_t**	offsets_heap,
2593 				/*!< in/out: pointer to memory heap
2594 				that can be emptied, or NULL */
2595 	mem_heap_t*	entry_heap,
2596 				/*!< in/out: memory heap for allocating
2597 				big_rec and the index tuple */
2598 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
2599 				be stored externally by the caller, or NULL */
2600 	const upd_t*	update,	/*!< in: update vector; this is also allowed to
2601 				contain trx id and roll ptr fields, but
2602 				the values in the update vector have no effect */
2603 	ulint		cmpl_info,/*!< in: compiler info on secondary index
2604 				updates */
2605 	que_thr_t*	thr,	/*!< in: query thread */
2606 	trx_id_t	trx_id,	/*!< in: transaction id */
2607 	mtr_t*		mtr)	/*!< in/out: mini-transaction; must be
2608 				committed before latching any further pages */
2609 {
2610 	big_rec_t*	big_rec_vec	= NULL;
2611 	big_rec_t*	dummy_big_rec;
2612 	dict_index_t*	index;
2613 	buf_block_t*	block;
2614 	page_t*		page;
2615 	page_zip_des_t*	page_zip;
2616 	rec_t*		rec;
2617 	page_cur_t*	page_cursor;
2618 	dberr_t		err;
2619 	dberr_t		optim_err;
2620 	roll_ptr_t	roll_ptr;
2621 	ibool		was_first;
2622 	ulint		n_reserved	= 0;
2623 	ulint		n_ext;
2624 	trx_t*		trx;
2625 	ulint		max_ins_size	= 0;
2626 
2627 	*offsets = NULL;
2628 	*big_rec = NULL;
2629 
2630 	block = btr_cur_get_block(cursor);
2631 	page = buf_block_get_frame(block);
2632 	page_zip = buf_block_get_page_zip(block);
2633 	index = cursor->index;
2634 
2635 	ut_ad(thr_get_trx(thr)->fake_changes
2636 	      || mtr_memo_contains(mtr, dict_index_get_lock(index),
2637 				   MTR_MEMO_X_LOCK));
2638 	ut_ad(thr_get_trx(thr)->fake_changes
2639 	      || mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2640 #ifdef UNIV_ZIP_DEBUG
2641 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2642 #endif /* UNIV_ZIP_DEBUG */
2643 	/* The insert buffer tree should never be updated in place. */
2644 	ut_ad(!dict_index_is_ibuf(index));
2645 	ut_ad(dict_index_is_online_ddl(index) == !!(flags & BTR_CREATE_FLAG)
2646 	      || dict_index_is_clust(index));
2647 	ut_ad(thr_get_trx(thr)->id == trx_id
2648 	      || (flags & ~BTR_KEEP_POS_FLAG)
2649 	      == (BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG
2650 		  | BTR_CREATE_FLAG | BTR_KEEP_SYS_FLAG));
2651 
2652 	err = optim_err = btr_cur_optimistic_update(
2653 		flags | BTR_KEEP_IBUF_BITMAP,
2654 		cursor, offsets, offsets_heap, update,
2655 		cmpl_info, thr, trx_id, mtr);
2656 
2657 	switch (err) {
2658 	case DB_ZIP_OVERFLOW:
2659 	case DB_UNDERFLOW:
2660 	case DB_OVERFLOW:
2661 		break;
2662 	default:
2663 	err_exit:
2664 		/* We suppressed this with BTR_KEEP_IBUF_BITMAP.
2665 		For DB_ZIP_OVERFLOW, the IBUF_BITMAP_FREE bits were
2666 		already reset by btr_cur_update_alloc_zip() if the
2667 		page was recompressed. */
2668 		if (page_zip
2669 		    && optim_err != DB_ZIP_OVERFLOW
2670 		    && !dict_index_is_clust(index)
2671 		    && page_is_leaf(page)) {
2672 			ibuf_update_free_bits_zip(block, mtr);
2673 		}
2674 
2675 		return(err);
2676 	}
2677 
2678 	/* Do lock checking and undo logging */
2679 	err = btr_cur_upd_lock_and_undo(flags, cursor, *offsets,
2680 					update, cmpl_info,
2681 					thr, mtr, &roll_ptr);
2682 	if (err != DB_SUCCESS) {
2683 		goto err_exit;
2684 	}
2685 
2686 	if (optim_err == DB_OVERFLOW) {
2687 		ulint	reserve_flag;
2688 		ulint	n_extents;
2689 
2690 		/* First reserve enough free space for the file segments
2691 		of the index tree, so that the update will not fail because
2692 		of lack of space */
2693 		if (UNIV_UNLIKELY(cursor->tree_height == ULINT_UNDEFINED)) {
2694 			/* When the tree height is uninitialized due to fake
2695 			changes, reserve some hardcoded number of extents.  */
2696 			ut_a(thr_get_trx(thr)->fake_changes);
2697 			n_extents = 3;
2698 		} else {
2700 			n_extents = cursor->tree_height / 16 + 3;
2701 		}
2702 
2703 		if (flags & BTR_NO_UNDO_LOG_FLAG) {
2704 			reserve_flag = FSP_CLEANING;
2705 		} else {
2706 			reserve_flag = FSP_NORMAL;
2707 		}
2708 
2709 		if (!fsp_reserve_free_extents(&n_reserved, index->space,
2710 					      n_extents, reserve_flag, mtr)) {
2711 			err = DB_OUT_OF_FILE_SPACE;
2712 			goto err_exit;
2713 		}
2714 	}
2715 
2716 	rec = btr_cur_get_rec(cursor);
2717 
2718 	*offsets = rec_get_offsets(
2719 		rec, index, *offsets, ULINT_UNDEFINED, offsets_heap);
2720 
2721 	dtuple_t*	new_entry = row_rec_to_index_entry(
2722 		rec, index, *offsets, &n_ext, entry_heap);
2723 
2724 	/* The page containing the clustered index record
2725 	corresponding to new_entry is latched in mtr.  If the
2726 	clustered index record is delete-marked, then its externally
2727 	stored fields cannot have been purged yet, because then the
2728 	purge would also have removed the clustered index record
2729 	itself.  Thus the following call is safe. */
2730 	row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
2731 						     FALSE, entry_heap);
2732 
2733 	trx = thr_get_trx(thr);
2734 
2735 	if (!(flags & BTR_KEEP_SYS_FLAG) && UNIV_LIKELY(!trx->fake_changes)) {
2736 		row_upd_index_entry_sys_field(new_entry, index, DATA_ROLL_PTR,
2737 					      roll_ptr);
2738 		row_upd_index_entry_sys_field(new_entry, index, DATA_TRX_ID,
2739 					      trx_id);
2740 	}
2741 
2742 	if ((flags & BTR_NO_UNDO_LOG_FLAG) && rec_offs_any_extern(*offsets)) {
2743 		/* We are in a transaction rollback undoing a row
2744 		update: we must free possible externally stored fields
2745 		which got new values in the update, if they are not
2746 		inherited values. They can be inherited if we have
2747 		updated the primary key to another value, and then
2748 		update it back again. */
2749 
2750 		ut_ad(big_rec_vec == NULL);
2751 
2752 		/* fake_changes never causes undo logging, so we never reach here */
2753 		ut_ad(!(trx->fake_changes));
2754 
2755 		btr_rec_free_updated_extern_fields(
2756 			index, rec, page_zip, *offsets, update,
2757 			trx_is_recv(thr_get_trx(thr))
2758 			? RB_RECOVERY : RB_NORMAL, mtr);
2759 	}
2760 
2761 	/* We have to set appropriate extern storage bits in the new
2762 	record to be inserted: we have to remember which fields were such */
2763 
2764 	ut_ad(!page_is_comp(page) || !rec_get_node_ptr_flag(rec));
2765 	ut_ad(rec_offs_validate(rec, index, *offsets));
2766 	n_ext += btr_push_update_extern_fields(new_entry, update, entry_heap);
2767 
2768 	if (page_zip) {
2769 		ut_ad(page_is_comp(page));
2770 		if (page_zip_rec_needs_ext(
2771 			    rec_get_converted_size(index, new_entry, n_ext),
2772 			    TRUE,
2773 			    dict_index_get_n_fields(index),
2774 			    page_zip_get_size(page_zip))) {
2775 
2776 			goto make_external;
2777 		}
2778 	} else if (page_zip_rec_needs_ext(
2779 			   rec_get_converted_size(index, new_entry, n_ext),
2780 			   page_is_comp(page), 0, 0)) {
2781 make_external:
2782 		big_rec_vec = dtuple_convert_big_rec(index, new_entry, &n_ext);
2783 		if (UNIV_UNLIKELY(big_rec_vec == NULL)) {
2784 
2785 			/* We cannot goto return_after_reservations,
2786 			because we may need to update the
2787 			IBUF_BITMAP_FREE bits, which was suppressed by
2788 			BTR_KEEP_IBUF_BITMAP. */
2789 #ifdef UNIV_ZIP_DEBUG
2790 			ut_a(!page_zip
2791 			     || page_zip_validate(page_zip, page, index));
2792 #endif /* UNIV_ZIP_DEBUG */
2793 			if (n_reserved > 0) {
2794 				fil_space_release_free_extents(
2795 					index->space, n_reserved);
2796 			}
2797 
2798 			err = DB_TOO_BIG_RECORD;
2799 			goto err_exit;
2800 		}
2801 
2802 		ut_ad(page_is_leaf(page));
2803 		ut_ad(dict_index_is_clust(index));
2804 		ut_ad(flags & BTR_KEEP_POS_FLAG);
2805 	}
2806 
2807 	if (UNIV_UNLIKELY(trx->fake_changes)) {
2808 		/* skip CHANGE, LOG */
2809 		err = DB_SUCCESS;
2810 		goto return_after_reservations;
2811 	}
2812 
2813 	if (big_rec_vec) {
2814 
2815 		err = btr_check_blob_limit(big_rec_vec);
2816 
2817 		if (err != DB_SUCCESS) {
2818 			if (n_reserved > 0) {
2819 				fil_space_release_free_extents(
2820 					index->space, n_reserved);
2821 			}
2822 			goto err_exit;
2823 		}
2824 	}
2825 
2826 	if (!page_zip) {
2827 		max_ins_size = page_get_max_insert_size_after_reorganize(page, 1);
2828 	}
2829 
2830 	/* Store state of explicit locks on rec on the page infimum record,
2831 	before deleting rec. The page infimum acts as a dummy carrier of the
2832 	locks, taking care also of lock releases, before we can move the locks
2833 	back on the actual record. There is a special case: the insert may
2834 	be made on the root page and cause a call of
2835 	btr_root_raise_and_insert(). Therefore we cannot delete in the lock
2836 	system the lock structs set on the root page, even if the root
2837 	page ends up carrying just node pointers. */
2838 
2839 	lock_rec_store_on_page_infimum(block, rec);
2840 
2841 	btr_search_update_hash_on_delete(cursor);
2842 
2843 #ifdef UNIV_ZIP_DEBUG
2844 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2845 #endif /* UNIV_ZIP_DEBUG */
2846 	page_cursor = btr_cur_get_page_cur(cursor);
2847 
2848 	page_cur_delete_rec(page_cursor, index, *offsets, mtr);
2849 
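	/* As in btr_cur_optimistic_update(), reposition the page cursor
	on the predecessor of the deleted record before attempting to
	re-insert the updated entry. */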
2850 	page_cur_move_to_prev(page_cursor);
2851 
2852 	rec = btr_cur_insert_if_possible(cursor, new_entry,
2853 					 offsets, offsets_heap, n_ext, mtr);
2854 
2855 	if (rec) {
2856 		page_cursor->rec = rec;
2857 
2858 		lock_rec_restore_from_page_infimum(btr_cur_get_block(cursor),
2859 						   rec, block);
2860 
2861 		if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
2862 			/* The new inserted record owns its possible externally
2863 			stored fields */
2864 			btr_cur_unmark_extern_fields(
2865 				page_zip, rec, index, *offsets, mtr);
2866 		}
2867 
2868 		bool adjust = big_rec_vec && (flags & BTR_KEEP_POS_FLAG);
2869 
2870 		if (btr_cur_compress_if_useful(cursor, adjust, mtr)) {
2871 			if (adjust) {
2872 				rec_offs_make_valid(
2873 					page_cursor->rec, index, *offsets);
2874 			}
2875 		} else if (!dict_index_is_clust(index)
2876 			   && page_is_leaf(page)) {
2877 
2878 			/* Update the free bits in the insert buffer.
2879 			This is the same block which was skipped by
2880 			BTR_KEEP_IBUF_BITMAP. */
2881 			if (page_zip) {
2882 				ibuf_update_free_bits_zip(block, mtr);
2883 			} else {
2884 				ibuf_update_free_bits_low(block, max_ins_size,
2885 							  mtr);
2886 			}
2887 		}
2888 
2889 		err = DB_SUCCESS;
2890 		goto return_after_reservations;
2891 	} else {
2892 		/* If the page is compressed and it initially
2893 		compresses very well, and there is a subsequent insert
2894 		of a badly-compressing record, it is possible for
2895 		btr_cur_optimistic_update() to return DB_UNDERFLOW and
2896 		btr_cur_insert_if_possible() to return FALSE. */
2897 		ut_a(page_zip || optim_err != DB_UNDERFLOW);
2898 
2899 		/* Out of space: reset the free bits.
2900 		This is the same block which was skipped by
2901 		BTR_KEEP_IBUF_BITMAP. */
2902 		if (!dict_index_is_clust(index) && page_is_leaf(page)) {
2903 			ibuf_reset_free_bits(block);
2904 		}
2905 	}
2906 
2907 	if (big_rec_vec) {
2908 		ut_ad(page_is_leaf(page));
2909 		ut_ad(dict_index_is_clust(index));
2910 		ut_ad(flags & BTR_KEEP_POS_FLAG);
2911 
2912 		/* btr_page_split_and_insert() in
2913 		btr_cur_pessimistic_insert() invokes
2914 		mtr_memo_release(mtr, index->lock, MTR_MEMO_X_LOCK).
2915 		We must keep the index->lock when we created a
2916 		big_rec, so that row_upd_clust_rec() can store the
2917 		big_rec in the same mini-transaction. */
2918 
2919 		mtr_x_lock(dict_index_get_lock(index), mtr);
2920 	}
2921 
2922 	/* Was the record to be updated positioned as the first user
2923 	record on its page? */
2924 	was_first = page_cur_is_before_first(page_cursor);
2925 
2926 	/* Lock checks and undo logging were already performed by
2927 	btr_cur_upd_lock_and_undo(). We do not try
2928 	btr_cur_optimistic_insert() because
2929 	btr_cur_insert_if_possible() already failed above. */
2930 
2931 	err = btr_cur_pessimistic_insert(BTR_NO_UNDO_LOG_FLAG
2932 					 | BTR_NO_LOCKING_FLAG
2933 					 | BTR_KEEP_SYS_FLAG,
2934 					 cursor, offsets, offsets_heap,
2935 					 new_entry, &rec,
2936 					 &dummy_big_rec, n_ext, NULL, mtr);
2937 	ut_a(rec);
2938 	ut_a(err == DB_SUCCESS);
2939 	ut_a(dummy_big_rec == NULL);
2940 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
2941 	page_cursor->rec = rec;
2942 
2943 	if (dict_index_is_sec_or_ibuf(index)) {
2944 		/* Update PAGE_MAX_TRX_ID in the index page header.
2945 		It was not updated by btr_cur_pessimistic_insert()
2946 		because of BTR_NO_LOCKING_FLAG. */
2947 		buf_block_t*	rec_block;
2948 
2949 		rec_block = btr_cur_get_block(cursor);
2950 
2951 		page_update_max_trx_id(rec_block,
2952 				       buf_block_get_page_zip(rec_block),
2953 				       trx_id, mtr);
2954 	}
2955 
2956 	if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
2957 		/* The new inserted record owns its possible externally
2958 		stored fields */
2959 		buf_block_t*	rec_block = btr_cur_get_block(cursor);
2960 
2961 #ifdef UNIV_ZIP_DEBUG
2962 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2963 		page = buf_block_get_frame(rec_block);
2964 #endif /* UNIV_ZIP_DEBUG */
2965 		page_zip = buf_block_get_page_zip(rec_block);
2966 
2967 		btr_cur_unmark_extern_fields(page_zip,
2968 					     rec, index, *offsets, mtr);
2969 	}
2970 
2971 	lock_rec_restore_from_page_infimum(btr_cur_get_block(cursor),
2972 					   rec, block);
2973 
2974 	/* If necessary, restore also the correct lock state for a new,
2975 	preceding supremum record created in a page split. While the old
2976 	record was nonexistent, the supremum might have inherited its locks
2977 	from a wrong record. */
2978 
2979 	if (!was_first) {
2980 		btr_cur_pess_upd_restore_supremum(btr_cur_get_block(cursor),
2981 						  rec, mtr);
2982 	}
2983 
2984 return_after_reservations:
2985 #ifdef UNIV_ZIP_DEBUG
2986 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2987 #endif /* UNIV_ZIP_DEBUG */
2988 
2989 	if (n_reserved > 0) {
2990 		fil_space_release_free_extents(index->space, n_reserved);
2991 	}
2992 
2993 	*big_rec = big_rec_vec;
2994 
2995 	return(err);
2996 }
2997 
2998 /*==================== B-TREE DELETE MARK AND UNMARK ===============*/
2999 
3000 /****************************************************************//**
3001 Writes the redo log record for delete marking or unmarking of an index
3002 record. */
3003 UNIV_INLINE
3004 void
3005 btr_cur_del_mark_set_clust_rec_log(
3006 /*===============================*/
3007 	rec_t*		rec,	/*!< in: record */
3008 	dict_index_t*	index,	/*!< in: index of the record */
3009 	trx_id_t	trx_id,	/*!< in: transaction id */
3010 	roll_ptr_t	roll_ptr,/*!< in: roll ptr to the undo log record */
3011 	mtr_t*		mtr)	/*!< in: mtr */
3012 {
3013 	byte*	log_ptr;
3014 
3015 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
3016 
3017 	log_ptr = mlog_open_and_write_index(mtr, rec, index,
3018 					    page_rec_is_comp(rec)
3019 					    ? MLOG_COMP_REC_CLUST_DELETE_MARK
3020 					    : MLOG_REC_CLUST_DELETE_MARK,
3021 					    1 + 1 + DATA_ROLL_PTR_LEN
3022 					    + 14 + 2);
3023 
3024 	if (!log_ptr) {
3025 		/* Logging in mtr is switched off during crash recovery */
3026 		return;
3027 	}
3028 
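	/* Write the flags (0) and the delete-mark value (TRUE) */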
3029 	*log_ptr++ = 0;
3030 	*log_ptr++ = 1;
3031 
3032 	log_ptr = row_upd_write_sys_vals_to_log(
3033 		index, trx_id, roll_ptr, log_ptr, mtr);
3034 	mach_write_to_2(log_ptr, page_offset(rec));
3035 	log_ptr += 2;
3036 
3037 	mlog_close(mtr, log_ptr);
3038 }
3039 #endif /* !UNIV_HOTBACKUP */
3040 
3041 /****************************************************************//**
3042 Parses the redo log record for delete marking or unmarking of a clustered
3043 index record.
3044 @return	end of log record or NULL */
3045 UNIV_INTERN
3046 byte*
3047 btr_cur_parse_del_mark_set_clust_rec(
3048 /*=================================*/
3049 	byte*		ptr,	/*!< in: buffer */
3050 	byte*		end_ptr,/*!< in: buffer end */
3051 	page_t*		page,	/*!< in/out: page or NULL */
3052 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
3053 	dict_index_t*	index)	/*!< in: index corresponding to page */
3054 {
3055 	ulint		flags;
3056 	ulint		val;
3057 	ulint		pos;
3058 	trx_id_t	trx_id;
3059 	roll_ptr_t	roll_ptr;
3060 	ulint		offset;
3061 	rec_t*		rec;
3062 
3063 	ut_ad(!page
3064 	      || !!page_is_comp(page) == dict_table_is_comp(index->table));
3065 
3066 	if (end_ptr < ptr + 2) {
3067 
3068 		return(NULL);
3069 	}
3070 
3071 	flags = mach_read_from_1(ptr);
3072 	ptr++;
3073 	val = mach_read_from_1(ptr);
3074 	ptr++;
3075 
3076 	ptr = row_upd_parse_sys_vals(ptr, end_ptr, &pos, &trx_id, &roll_ptr);
3077 
3078 	if (ptr == NULL) {
3079 
3080 		return(NULL);
3081 	}
3082 
3083 	if (end_ptr < ptr + 2) {
3084 
3085 		return(NULL);
3086 	}
3087 
3088 	offset = mach_read_from_2(ptr);
3089 	ptr += 2;
3090 
3091 	ut_a(offset <= UNIV_PAGE_SIZE);
3092 
3093 	if (page) {
3094 		rec = page + offset;
3095 
3096 		/* We do not need to reserve btr_search_latch, as the page
3097 		is only being recovered, and there cannot be a hash index to
3098 		it. Besides, these fields are being updated in place
3099 		and the adaptive hash index does not depend on them. */
3100 
3101 		btr_rec_set_deleted_flag(rec, page_zip, val);
3102 
3103 		if (!(flags & BTR_KEEP_SYS_FLAG)) {
3104 			mem_heap_t*	heap		= NULL;
3105 			ulint		offsets_[REC_OFFS_NORMAL_SIZE];
3106 			rec_offs_init(offsets_);
3107 
3108 			row_upd_rec_sys_fields_in_recovery(
3109 				rec, page_zip,
3110 				rec_get_offsets(rec, index, offsets_,
3111 						ULINT_UNDEFINED, &heap),
3112 				pos, trx_id, roll_ptr);
3113 			if (UNIV_LIKELY_NULL(heap)) {
3114 				mem_heap_free(heap);
3115 			}
3116 		}
3117 	}
3118 
3119 	return(ptr);
3120 }
3121 
3122 #ifndef UNIV_HOTBACKUP
3123 /***********************************************************//**
3124 Marks a clustered index record deleted. Writes an undo log record for
3125 this delete marking to the undo log. Writes the id of the deleting
3126 transaction to the trx id field, and a pointer to the created undo
3127 log record to the roll ptr field.
3128 @return	DB_SUCCESS, DB_LOCK_WAIT, or error number */
3129 UNIV_INTERN
3130 dberr_t
3131 btr_cur_del_mark_set_clust_rec(
3132 /*===========================*/
3133 	buf_block_t*	block,	/*!< in/out: buffer block of the record */
3134 	rec_t*		rec,	/*!< in/out: record */
3135 	dict_index_t*	index,	/*!< in: clustered index of the record */
3136 	const ulint*	offsets,/*!< in: rec_get_offsets(rec) */
3137 	que_thr_t*	thr,	/*!< in: query thread */
3138 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3139 {
3140 	roll_ptr_t	roll_ptr;
3141 	dberr_t		err;
3142 	page_zip_des_t*	page_zip;
3143 	trx_t*		trx;
3144 
3145 	ut_ad(dict_index_is_clust(index));
3146 	ut_ad(rec_offs_validate(rec, index, offsets));
3147 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
3148 	ut_ad(buf_block_get_frame(block) == page_align(rec));
3149 	ut_ad(page_is_leaf(page_align(rec)));
3150 
3151 #ifdef UNIV_DEBUG
3152 	if (btr_cur_print_record_ops && (thr != NULL)) {
3153 		btr_cur_trx_report(thr_get_trx(thr)->id, index, "del mark ");
3154 		rec_print_new(stderr, rec, offsets);
3155 	}
3156 #endif /* UNIV_DEBUG */
3157 
3158 	ut_ad(dict_index_is_clust(index));
3159 	ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
3160 
3161 	if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
3162 		/* skip LOCK, UNDO, CHANGE, LOG */
3163 		return(DB_SUCCESS);
3164 	}
3165 
3166 	err = lock_clust_rec_modify_check_and_lock(BTR_NO_LOCKING_FLAG, block,
3167 						   rec, index, offsets, thr);
3168 
3169 	if (err != DB_SUCCESS) {
3170 
3171 		return(err);
3172 	}
3173 
3174 	err = trx_undo_report_row_operation(0, TRX_UNDO_MODIFY_OP, thr,
3175 					    index, NULL, NULL, 0, rec, offsets,
3176 					    &roll_ptr);
3177 	if (err != DB_SUCCESS) {
3178 
3179 		return(err);
3180 	}
3181 
3182 	/* The btr_search_latch is not needed here, because
3183 	the adaptive hash index does not depend on the delete-mark
3184 	and the delete-mark is being updated in place. */
3185 
3186 	page_zip = buf_block_get_page_zip(block);
3187 
3188 	btr_blob_dbg_set_deleted_flag(rec, index, offsets, TRUE);
3189 	btr_rec_set_deleted_flag(rec, page_zip, TRUE);
3190 
3191 	trx = thr_get_trx(thr);
3192 
3193 	if (dict_index_is_online_ddl(index)) {
3194 		row_log_table_delete(rec, index, offsets, NULL);
3195 	}
3196 
3197 	row_upd_rec_sys_fields(rec, page_zip, index, offsets, trx, roll_ptr);
3198 
3199 	btr_cur_del_mark_set_clust_rec_log(rec, index, trx->id,
3200 					   roll_ptr, mtr);
3201 
3202 	return(err);
3203 }
3204 
3205 /****************************************************************//**
3206 Writes the redo log record for a delete mark setting of a secondary
3207 index record. */
3208 UNIV_INLINE
3209 void
3210 btr_cur_del_mark_set_sec_rec_log(
3211 /*=============================*/
3212 	rec_t*		rec,	/*!< in: record */
3213 	ibool		val,	/*!< in: value to set */
3214 	mtr_t*		mtr)	/*!< in: mtr */
3215 {
3216 	byte*	log_ptr;
3217 	ut_ad(val <= 1);
3218 
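	/* Reserve space for the initial log record (up to 11 bytes),
	the delete-mark value (1 byte), and the record offset (2 bytes). */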
3219 	log_ptr = mlog_open(mtr, 11 + 1 + 2);
3220 
3221 	if (!log_ptr) {
3222 		/* Logging in mtr is switched off during crash recovery:
3223 		in that case mlog_open returns NULL */
3224 		return;
3225 	}
3226 
3227 	log_ptr = mlog_write_initial_log_record_fast(
3228 		rec, MLOG_REC_SEC_DELETE_MARK, log_ptr, mtr);
3229 	mach_write_to_1(log_ptr, val);
3230 	log_ptr++;
3231 
3232 	mach_write_to_2(log_ptr, page_offset(rec));
3233 	log_ptr += 2;
3234 
3235 	mlog_close(mtr, log_ptr);
3236 }
3237 #endif /* !UNIV_HOTBACKUP */
3238 
3239 /****************************************************************//**
3240 Parses the redo log record for delete marking or unmarking of a secondary
3241 index record.
3242 @return	end of log record or NULL */
3243 UNIV_INTERN
3244 byte*
3245 btr_cur_parse_del_mark_set_sec_rec(
3246 /*===============================*/
3247 	byte*		ptr,	/*!< in: buffer */
3248 	byte*		end_ptr,/*!< in: buffer end */
3249 	page_t*		page,	/*!< in/out: page or NULL */
3250 	page_zip_des_t*	page_zip)/*!< in/out: compressed page, or NULL */
3251 {
3252 	ulint	val;
3253 	ulint	offset;
3254 	rec_t*	rec;
3255 
3256 	if (end_ptr < ptr + 3) {
3257 
3258 		return(NULL);
3259 	}
3260 
3261 	val = mach_read_from_1(ptr);
3262 	ptr++;
3263 
3264 	offset = mach_read_from_2(ptr);
3265 	ptr += 2;
3266 
3267 	ut_a(offset <= UNIV_PAGE_SIZE);
3268 
3269 	if (page) {
3270 		rec = page + offset;
3271 
3272 		/* We do not need to reserve btr_search_latch, as the page
3273 		is only being recovered, and there cannot be a hash index to
3274 		it. Besides, the delete-mark flag is being updated in place
3275 		and the adaptive hash index does not depend on it. */
3276 
3277 		btr_rec_set_deleted_flag(rec, page_zip, val);
3278 	}
3279 
3280 	return(ptr);
3281 }
3282 
3283 #ifndef UNIV_HOTBACKUP
3284 /***********************************************************//**
3285 Sets a secondary index record delete mark to TRUE or FALSE.
3286 @return	DB_SUCCESS, DB_LOCK_WAIT, or error number */
3287 UNIV_INTERN
3288 dberr_t
3289 btr_cur_del_mark_set_sec_rec(
3290 /*=========================*/
3291 	ulint		flags,	/*!< in: locking flag */
3292 	btr_cur_t*	cursor,	/*!< in: cursor */
3293 	ibool		val,	/*!< in: value to set */
3294 	que_thr_t*	thr,	/*!< in: query thread */
3295 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3296 {
3297 	buf_block_t*	block;
3298 	rec_t*		rec;
3299 	dberr_t		err;
3300 
3301 	if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
3302 		/* skip LOCK, CHANGE, LOG */
3303 		return(DB_SUCCESS);
3304 	}
3305 
3306 	block = btr_cur_get_block(cursor);
3307 	rec = btr_cur_get_rec(cursor);
3308 
3309 #ifdef UNIV_DEBUG
3310 	if (btr_cur_print_record_ops && (thr != NULL)) {
3311 		btr_cur_trx_report(thr_get_trx(thr)->id, cursor->index,
3312 				   "del mark ");
3313 		rec_print(stderr, rec, cursor->index);
3314 	}
3315 #endif /* UNIV_DEBUG */
3316 
3317 	err = lock_sec_rec_modify_check_and_lock(flags,
3318 						 btr_cur_get_block(cursor),
3319 						 rec, cursor->index, thr, mtr);
3320 	if (err != DB_SUCCESS) {
3321 
3322 		return(err);
3323 	}
3324 
3325 	ut_ad(!!page_rec_is_comp(rec)
3326 	      == dict_table_is_comp(cursor->index->table));
3327 
3328 	/* We do not need to reserve btr_search_latch, as the
3329 	delete-mark flag is being updated in place and the adaptive
3330 	hash index does not depend on it. */
3331 	btr_rec_set_deleted_flag(rec, buf_block_get_page_zip(block), val);
3332 
3333 	btr_cur_del_mark_set_sec_rec_log(rec, val, mtr);
3334 
3335 	return(DB_SUCCESS);
3336 }
3337 
3338 /***********************************************************//**
3339 Sets a secondary index record's delete mark to the given value. This
3340 function is only used by the insert buffer merge mechanism. */
3341 UNIV_INTERN
3342 void
3343 btr_cur_set_deleted_flag_for_ibuf(
3344 /*==============================*/
3345 	rec_t*		rec,		/*!< in/out: record */
3346 	page_zip_des_t*	page_zip,	/*!< in/out: compressed page
3347 					corresponding to rec, or NULL
3348 					when the tablespace is
3349 					uncompressed */
3350 	ibool		val,		/*!< in: value to set */
3351 	mtr_t*		mtr)		/*!< in/out: mini-transaction */
3352 {
3353 	/* We do not need to reserve btr_search_latch, as the page
3354 	has just been read to the buffer pool and there cannot be
3355 	a hash index to it.  Besides, the delete-mark flag is being
3356 	updated in place and the adaptive hash index does not depend
3357 	on it. */
3358 
3359 	btr_rec_set_deleted_flag(rec, page_zip, val);
3360 
3361 	btr_cur_del_mark_set_sec_rec_log(rec, val, mtr);
3362 }
3363 
3364 /*==================== B-TREE RECORD REMOVE =========================*/
3365 
3366 /*************************************************************//**
3367 Tries to compress a page of the tree if it seems useful. It is assumed
3368 that mtr holds an x-latch on the tree and on the cursor page. To avoid
3369 deadlocks, mtr must also own x-latches on the brothers of the page, if
3370 those brothers exist. NOTE: it is assumed that the caller has reserved enough
3371 free extents so that the compression will always succeed if done!
3372 @return	TRUE if compression occurred */
3373 UNIV_INTERN
3374 ibool
3375 btr_cur_compress_if_useful(
3376 /*=======================*/
3377 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to compress;
3378 				cursor does not stay valid if !adjust and
3379 				compression occurs */
3380 	ibool		adjust,	/*!< in: TRUE if should adjust the
3381 				cursor position even if compression occurs */
3382 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3383 {
3384 	ut_ad(mtr_memo_contains(mtr,
3385 				dict_index_get_lock(btr_cur_get_index(cursor)),
3386 				MTR_MEMO_X_LOCK));
3387 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
3388 				MTR_MEMO_PAGE_X_FIX));
3389 
3390 	return(btr_cur_compress_recommendation(cursor, mtr)
3391 	       && btr_compress(cursor, adjust, mtr));
3392 }
3393 
3394 /*******************************************************//**
3395 Removes the record on which the tree cursor is positioned on a leaf page.
3396 It is assumed that the mtr has an x-latch on the page where the cursor is
3397 positioned, but no latch on the whole tree.
3398 @return	TRUE if success, i.e., the page did not become too empty */
3399 UNIV_INTERN
3400 ibool
3401 btr_cur_optimistic_delete_func(
3402 /*===========================*/
3403 	btr_cur_t*	cursor,	/*!< in: cursor on leaf page, on the record to
3404 				delete; cursor stays valid: if deletion
3405 				succeeds, on function exit it points to the
3406 				successor of the deleted record */
3407 #ifdef UNIV_DEBUG
3408 	ulint		flags,	/*!< in: BTR_CREATE_FLAG or 0 */
3409 #endif /* UNIV_DEBUG */
3410 	mtr_t*		mtr)	/*!< in: mtr; if this function returns
3411 				TRUE on a leaf page of a secondary
3412 				index, the mtr must be committed
3413 				before latching any further pages */
3414 {
3415 	buf_block_t*	block;
3416 	rec_t*		rec;
3417 	mem_heap_t*	heap		= NULL;
3418 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
3419 	ulint*		offsets		= offsets_;
3420 	ibool		no_compress_needed;
3421 	rec_offs_init(offsets_);
3422 
3423 	ut_ad(flags == 0 || flags == BTR_CREATE_FLAG);
3424 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
3425 				MTR_MEMO_PAGE_X_FIX));
3426 	/* This is intended only for leaf page deletions */
3427 
3428 	block = btr_cur_get_block(cursor);
3429 
3430 	SRV_CORRUPT_TABLE_CHECK(block, return(DB_CORRUPTION););
3431 
3432 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
3433 	ut_ad(!dict_index_is_online_ddl(cursor->index)
3434 	      || dict_index_is_clust(cursor->index)
3435 	      || (flags & BTR_CREATE_FLAG));
3436 
3437 	rec = btr_cur_get_rec(cursor);
3438 	offsets = rec_get_offsets(rec, cursor->index, offsets,
3439 				  ULINT_UNDEFINED, &heap);
3440 
3441 	no_compress_needed = !rec_offs_any_extern(offsets)
3442 		&& btr_cur_can_delete_without_compress(
3443 			cursor, rec_offs_size(offsets), mtr);
3444 
3445 	if (no_compress_needed) {
3446 
3447 		page_t*		page	= buf_block_get_frame(block);
3448 		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
3449 
3450 		lock_update_delete(block, rec);
3451 
3452 		btr_search_update_hash_on_delete(cursor);
3453 
3454 		if (page_zip) {
3455 #ifdef UNIV_ZIP_DEBUG
3456 			ut_a(page_zip_validate(page_zip, page, cursor->index));
3457 #endif /* UNIV_ZIP_DEBUG */
3458 			page_cur_delete_rec(btr_cur_get_page_cur(cursor),
3459 					    cursor->index, offsets, mtr);
3460 #ifdef UNIV_ZIP_DEBUG
3461 			ut_a(page_zip_validate(page_zip, page, cursor->index));
3462 #endif /* UNIV_ZIP_DEBUG */
3463 
3464 			/* On compressed pages, the IBUF_BITMAP_FREE
3465 			space is not affected by deleting (purging)
3466 			records, because it is defined as the minimum
3467 			of space available *without* reorganize, and
3468 			space available in the modification log. */
3469 		} else {
3470 			const ulint	max_ins
3471 				= page_get_max_insert_size_after_reorganize(
3472 					page, 1);
3473 
3474 			page_cur_delete_rec(btr_cur_get_page_cur(cursor),
3475 					    cursor->index, offsets, mtr);
3476 
3477 			/* The change buffer does not handle inserts
3478 			into non-leaf pages, into clustered indexes,
3479 			or into the change buffer. */
3480 			if (page_is_leaf(page)
3481 			    && !dict_index_is_clust(cursor->index)
3482 			    && !dict_index_is_ibuf(cursor->index)) {
3483 				ibuf_update_free_bits_low(block, max_ins, mtr);
3484 			}
3485 		}
3486 	}
3487 
3488 	if (UNIV_LIKELY_NULL(heap)) {
3489 		mem_heap_free(heap);
3490 	}
3491 
3492 	return(no_compress_needed);
3493 }
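
/* Illustrative sketch (not part of the original source): the usual caller
pattern for the function above is to try the optimistic (in-page) delete
first and fall back to the pessimistic path only when it reports that the
page would become too empty or the record carries externally stored
fields. A hedged outline, with latching and error handling omitted and a
hypothetical local cursor/mtr assumed: */
#if 0
	if (!btr_cur_optimistic_delete(&cursor, 0, &mtr)) {
		/* Restart with an x-latch on the whole tree and reserved
		free extents, then call btr_cur_pessimistic_delete(). */
	}
#endif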
3494 
3495 /*************************************************************//**
3496 Removes the record on which the tree cursor is positioned. Tries
3497 to compress the page if its fillfactor drops below a threshold
3498 or if it is the only page on the level. It is assumed that mtr holds
3499 an x-latch on the tree and on the cursor page. To avoid deadlocks,
3500 mtr must also own x-latches on the brothers of the page, if those
3501 brothers exist.
3502 @return	TRUE if compression occurred */
3503 UNIV_INTERN
3504 ibool
3505 btr_cur_pessimistic_delete(
3506 /*=======================*/
3507 	dberr_t*	err,	/*!< out: DB_SUCCESS or DB_OUT_OF_FILE_SPACE;
3508 				the latter may occur because we may have
3509 				to update node pointers on upper levels,
3510 				and in the case of variable length keys
3511 				these may actually grow in size */
3512 	ibool		has_reserved_extents, /*!< in: TRUE if the
3513 				caller has already reserved enough free
3514 				extents so that he knows that the operation
3515 				will succeed */
3516 	btr_cur_t*	cursor,	/*!< in: cursor on the record to delete;
3517 				if compression does not occur, the cursor
3518 				stays valid: it points to successor of
3519 				deleted record on function exit */
3520 	ulint		flags,	/*!< in: BTR_CREATE_FLAG or 0 */
3521 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
3522 	mtr_t*		mtr)	/*!< in: mtr */
3523 {
3524 	buf_block_t*	block;
3525 	page_t*		page;
3526 	page_zip_des_t*	page_zip;
3527 	dict_index_t*	index;
3528 	rec_t*		rec;
3529 	ulint		n_reserved	= 0;
3530 	ibool		success;
3531 	ibool		ret		= FALSE;
3532 	ulint		level;
3533 	mem_heap_t*	heap;
3534 	ulint*		offsets;
3535 
3536 	block = btr_cur_get_block(cursor);
3537 	page = buf_block_get_frame(block);
3538 	index = btr_cur_get_index(cursor);
3539 
3540 	ut_ad(flags == 0 || flags == BTR_CREATE_FLAG);
3541 	ut_ad(!dict_index_is_online_ddl(index)
3542 	      || dict_index_is_clust(index)
3543 	      || (flags & BTR_CREATE_FLAG));
3544 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
3545 				MTR_MEMO_X_LOCK));
3546 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3547 	if (!has_reserved_extents) {
3548 		/* First reserve enough free space for the file segments
3549 		of the index tree, so that the node pointer updates will
3550 		not fail because of lack of space */
3551 
3552 		ut_a(cursor->tree_height != ULINT_UNDEFINED);
3553 
3554 		ulint	n_extents = cursor->tree_height / 32 + 1;
3555 
3556 		success = fsp_reserve_free_extents(&n_reserved,
3557 						   index->space,
3558 						   n_extents,
3559 						   FSP_CLEANING, mtr);
3560 		if (!success) {
3561 			*err = DB_OUT_OF_FILE_SPACE;
3562 
3563 			return(FALSE);
3564 		}
3565 	}
3566 
3567 	heap = mem_heap_create(1024);
3568 	rec = btr_cur_get_rec(cursor);
3569 	page_zip = buf_block_get_page_zip(block);
3570 #ifdef UNIV_ZIP_DEBUG
3571 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3572 #endif /* UNIV_ZIP_DEBUG */
3573 
3574 	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
3575 
3576 	if (rec_offs_any_extern(offsets)) {
3577 		btr_rec_free_externally_stored_fields(index,
3578 						      rec, offsets, page_zip,
3579 						      rb_ctx, mtr);
3580 #ifdef UNIV_ZIP_DEBUG
3581 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3582 #endif /* UNIV_ZIP_DEBUG */
3583 	}
3584 
3585 	if (UNIV_UNLIKELY(page_get_n_recs(page) < 2)
3586 	    && UNIV_UNLIKELY(dict_index_get_page(index)
3587 			     != buf_block_get_page_no(block))) {
3588 
3589 		/* If there is only one record, drop the whole page in
3590 		btr_discard_page, if this is not the root page */
3591 
3592 		btr_discard_page(cursor, mtr);
3593 
3594 		ret = TRUE;
3595 
3596 		goto return_after_reservations;
3597 	}
3598 
3599 	if (flags == 0) {
3600 		lock_update_delete(block, rec);
3601 	}
3602 
3603 	level = btr_page_get_level(page, mtr);
3604 
3605 	if (level > 0
3606 	    && UNIV_UNLIKELY(rec == page_rec_get_next(
3607 				     page_get_infimum_rec(page)))) {
3608 
3609 		rec_t*	next_rec = page_rec_get_next(rec);
3610 
3611 		if (btr_page_get_prev(page, mtr) == FIL_NULL) {
3612 
3613 			/* If we delete the leftmost node pointer on a
3614 			non-leaf level, we must mark the new leftmost node
3615 			pointer as the predefined minimum record */
3616 
3617 			/* This will make page_zip_validate() fail until
3618 			page_cur_delete_rec() completes.  This is harmless,
3619 			because everything will take place within a single
3620 			mini-transaction and because writing to the redo log
3621 			is an atomic operation (performed by mtr_commit()). */
3622 			btr_set_min_rec_mark(next_rec, mtr);
3623 		} else {
3624 			/* Otherwise, if we delete the leftmost node pointer
3625 			on a page, we have to change the father node pointer
3626 			so that it is equal to the new leftmost node pointer
3627 			on the page */
3628 
3629 			btr_node_ptr_delete(index, block, mtr);
3630 
3631 			dtuple_t*	node_ptr = dict_index_build_node_ptr(
3632 				index, next_rec, buf_block_get_page_no(block),
3633 				heap, level);
3634 
3635 			btr_insert_on_non_leaf_level(
3636 				flags, index, level + 1, node_ptr, mtr);
3637 		}
3638 	}
3639 
3640 	btr_search_update_hash_on_delete(cursor);
3641 
3642 	page_cur_delete_rec(btr_cur_get_page_cur(cursor), index, offsets, mtr);
3643 #ifdef UNIV_ZIP_DEBUG
3644 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3645 #endif /* UNIV_ZIP_DEBUG */
3646 
3647 	ut_ad(btr_check_node_ptr(index, block, mtr));
3648 
3649 return_after_reservations:
3650 	*err = DB_SUCCESS;
3651 
3652 	mem_heap_free(heap);
3653 
3654 	if (ret == FALSE) {
3655 		ret = btr_cur_compress_if_useful(cursor, FALSE, mtr);
3656 	}
3657 
3658 	if (n_reserved > 0) {
3659 		fil_space_release_free_extents(index->space, n_reserved);
3660 	}
3661 
3662 	return(ret);
3663 }
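
/* Illustrative sketch (not part of the original source): the free-extent
reservation made at the top of btr_cur_pessimistic_delete() is derived
from the tree height as n_extents = tree_height / 32 + 1. A hypothetical
worked example with made-up numbers: */
#if 0
	ulint	tree_height = 3;			/* a three-level tree */
	ulint	n_extents   = tree_height / 32 + 1;	/* = 1 extent reserved */

	/* Only if fsp_reserve_free_extents() cannot provide this many
	extents does the function give up with DB_OUT_OF_FILE_SPACE before
	touching the page. */
#endif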
3664 
3665 /*******************************************************************//**
3666 Adds path information to the cursor for the current page, for which
3667 the binary search has been performed. */
3668 static
3669 void
3670 btr_cur_add_path_info(
3671 /*==================*/
3672 	btr_cur_t*	cursor,		/*!< in: cursor positioned on a page */
3673 	ulint		height,		/*!< in: height of the page in tree;
3674 					0 means leaf node */
3675 	ulint		root_height)	/*!< in: root node height in tree */
3676 {
3677 	btr_path_t*	slot;
3678 	const rec_t*	rec;
3679 	const page_t*	page;
3680 
3681 	ut_a(cursor->path_arr);
3682 
3683 	if (root_height >= BTR_PATH_ARRAY_N_SLOTS - 1) {
3684 		/* Do nothing; return empty path */
3685 
3686 		slot = cursor->path_arr;
3687 		slot->nth_rec = ULINT_UNDEFINED;
3688 
3689 		return;
3690 	}
3691 
3692 	if (height == 0) {
3693 		/* Mark end of slots for path */
3694 		slot = cursor->path_arr + root_height + 1;
3695 		slot->nth_rec = ULINT_UNDEFINED;
3696 	}
3697 
3698 	rec = btr_cur_get_rec(cursor);
3699 
3700 	slot = cursor->path_arr + (root_height - height);
3701 
3702 	page = page_align(rec);
3703 
3704 	slot->nth_rec = page_rec_get_n_recs_before(rec);
3705 	slot->n_recs = page_get_n_recs(page);
3706 	slot->page_no = page_get_page_no(page);
3707 	slot->page_level = btr_page_get_level_low(page);
3708 }
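
/* Illustrative sketch (not part of the original source): for a tree whose
root_height is 2, the three calls made during one search fill
cursor->path_arr as follows (slot index = root_height - height), and the
height == 0 call also writes a terminator slot one past the leaf slot: */
#if 0
	btr_path_t*	path = cursor->path_arr;

	ut_ad(path[0].page_level == 2);			/* root page */
	ut_ad(path[1].page_level == 1);			/* intermediate page */
	ut_ad(path[2].page_level == 0);			/* leaf page */
	ut_ad(path[3].nth_rec == ULINT_UNDEFINED);	/* end-of-path marker */
#endif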
3709 
3710 /*******************************************************************//**
3711 Estimate the number of rows between slot1 and slot2 for any level on a
3712 B-tree. This function starts from slot1->page and reads a few pages to
3713 the right, counting their records. If we reach slot2->page quickly then
3714 we know exactly how many records there are between slot1 and slot2 and
3715 we set is_n_rows_exact to TRUE. If we cannot reach slot2->page quickly
3716 then we calculate the average number of records in the pages scanned
3717 so far and assume that all pages that we did not scan up to slot2->page
3718 contain the same number of records, then we multiply that average by
3719 the number of pages between slot1->page and slot2->page (which is
3720 n_rows_on_prev_level). In this case we set is_n_rows_exact to FALSE.
3721 @return	number of rows (exact or estimated) */
3722 static
3723 ib_int64_t
3724 btr_estimate_n_rows_in_range_on_level(
3725 /*==================================*/
3726 	dict_index_t*	index,			/*!< in: index */
3727 	btr_path_t*	slot1,			/*!< in: left border */
3728 	btr_path_t*	slot2,			/*!< in: right border */
3729 	ib_int64_t	n_rows_on_prev_level,	/*!< in: number of rows
3730 						on the previous level for the
3731 						same descend paths; used to
3732 						determine the number of pages
3733 						on this level */
3734 	ibool*		is_n_rows_exact)	/*!< out: TRUE if the returned
3735 						value is exact i.e. not an
3736 						estimation */
3737 {
3738 	ulint		space;
3739 	ib_int64_t	n_rows;
3740 	ulint		n_pages_read;
3741 	ulint		page_no;
3742 	ulint		zip_size;
3743 	ulint		level;
3744 
3745 	space = dict_index_get_space(index);
3746 
3747 	n_rows = 0;
3748 	n_pages_read = 0;
3749 
3750 	/* Assume by default that we will scan all pages between
3751 	slot1->page_no and slot2->page_no */
3752 	*is_n_rows_exact = TRUE;
3753 
3754 	/* add records from slot1->page_no which are to the right of
3755 	the record which serves as a left border of the range, if any */
3756 	if (slot1->nth_rec < slot1->n_recs) {
3757 		n_rows += slot1->n_recs - slot1->nth_rec;
3758 	}
3759 
3760 	/* add records from slot2->page_no which are to the left of
3761 	the record which serves as a right border of the range, if any */
3762 	if (slot2->nth_rec > 1) {
3763 		n_rows += slot2->nth_rec - 1;
3764 	}
3765 
3766 	/* count the records in the pages between slot1->page_no and
3767 	slot2->page_no (non inclusive), if any */
3768 
3769 	zip_size = fil_space_get_zip_size(space);
3770 
3771 	/* Do not read more than this number of pages in order not to hurt
3772 	performance with this code which is just an estimation. If we read
3773 	this many pages before reaching slot2->page_no then we estimate the
3774 	average from the pages scanned so far */
3775 #	define N_PAGES_READ_LIMIT	10
3776 
3777 	page_no = slot1->page_no;
3778 	level = slot1->page_level;
3779 
3780 	do {
3781 		mtr_t		mtr;
3782 		page_t*		page;
3783 		buf_block_t*	block;
3784 
3785 		mtr_start(&mtr);
3786 
3787 		/* Fetch the page. Because we are not holding the
3788 		index->lock, the tree may have changed and we may be
3789 		attempting to read a page that is no longer part of
3790 		the B-tree. We pass BUF_GET_POSSIBLY_FREED in order to
3791 		silence a debug assertion about this. */
3792 		block = buf_page_get_gen(space, zip_size, page_no, RW_S_LATCH,
3793 					 NULL, BUF_GET_POSSIBLY_FREED,
3794 					 __FILE__, __LINE__, &mtr);
3795 
3796 		page = buf_block_get_frame(block);
3797 
3798 		/* It is possible that the tree has been reorganized in the
3799 		meantime and this is a different page. If this happens the
3800 		calculated estimate will be bogus, which is not fatal as
3801 		this is only an estimate. We are sure that a page with
3802 		page_no exists because InnoDB never frees pages, only
3803 		reuses them. */
3804 		if (fil_page_get_type(page) != FIL_PAGE_INDEX
3805 		    || btr_page_get_index_id(page) != index->id
3806 		    || btr_page_get_level_low(page) != level) {
3807 
3808 			/* The page got reused for something else */
3809 			mtr_commit(&mtr);
3810 			goto inexact;
3811 		}
3812 
3813 		/* It is possible but highly unlikely that the page was
3814 		originally written by an old version of InnoDB that did
3815 		not initialize FIL_PAGE_TYPE on other than B-tree pages.
3816 		For example, this could be an almost-empty BLOB page
3817 		that happens to contain the magic values in the fields
3818 		that we checked above. */
3819 
3820 		n_pages_read++;
3821 
3822 		if (page_no != slot1->page_no) {
3823 			/* Do not count the records on slot1->page_no,
3824 			we already counted them before this loop. */
3825 			n_rows += page_get_n_recs(page);
3826 		}
3827 
3828 		page_no = btr_page_get_next(page, &mtr);
3829 
3830 		mtr_commit(&mtr);
3831 
3832 		if (n_pages_read == N_PAGES_READ_LIMIT
3833 		    || page_no == FIL_NULL) {
3834 			/* Either we read too many pages or
3835 			we reached the end of the level without passing
3836 			through slot2->page_no, the tree must have changed
3837 			in the meantime */
3838 			goto inexact;
3839 		}
3840 
3841 	} while (page_no != slot2->page_no);
3842 
3843 	return(n_rows);
3844 
3845 inexact:
3846 
3847 	*is_n_rows_exact = FALSE;
3848 
3849 	/* We stopped before reaching slot2->page */
3850 
3851 	if (n_pages_read > 0) {
3852 		/* The number of pages on this level is
3853 		n_rows_on_prev_level, multiply it by the
3854 		average number of recs per page so far */
3855 		n_rows = n_rows_on_prev_level
3856 			* n_rows / n_pages_read;
3857 	} else {
3858 		/* The tree changed before we could even
3859 		start with slot1->page_no */
3860 		n_rows = 10;
3861 	}
3862 
3863 	return(n_rows);
3864 }
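
/* Illustrative sketch (not part of the original source): a hypothetical
worked example of the "inexact" branch above. Suppose the walk was cut off
after N_PAGES_READ_LIMIT = 10 pages, during which 2000 records were
counted, and the previous (higher) level told us there are about 50 pages
on this level: */
#if 0
	ib_int64_t	n_rows_on_prev_level = 50;	/* pages on this level */
	ib_int64_t	n_rows		     = 2000;	/* records seen so far */
	ulint		n_pages_read	     = 10;

	/* 50 pages * (2000 / 10) records per page = 10000 estimated rows */
	n_rows = n_rows_on_prev_level * n_rows / n_pages_read;
#endif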
3865 
3866 /** If the tree gets changed too much between the two dives for the left
3867 and right boundary then btr_estimate_n_rows_in_range_low() will retry
3868 that many times before giving up and returning the value stored in
3869 rows_in_range_arbitrary_ret_val. */
3870 static const unsigned	rows_in_range_max_retries = 4;
3871 
3872 /** We pretend that a range has that many records if the tree keeps changing
3873 for rows_in_range_max_retries retries while we try to estimate the records
3874 in a given range. */
3875 static const int64_t	rows_in_range_arbitrary_ret_val = 10;
3876 
3877 /** Estimates the number of rows in a given index range.
3878 @param[in]	index		index
3879 @param[in]	tuple1		range start, may also be empty tuple
3880 @param[in]	mode1		search mode for range start
3881 @param[in]	tuple2		range end, may also be empty tuple
3882 @param[in]	mode2		search mode for range end
3883 @param[in]	nth_attempt	if the tree gets modified too much while
3884 we are trying to analyze it, then we will retry (this function will call
3885 itself, incrementing this parameter)
3886 @return estimated number of rows; if after rows_in_range_max_retries
3887 retries the tree keeps changing, then we will just return
3888 rows_in_range_arbitrary_ret_val as a result (if
3889 nth_attempt >= rows_in_range_max_retries and the tree is modified between
3890 the two dives). */
3891 static
3892 int64_t
3893 btr_estimate_n_rows_in_range_low(
3894 	dict_index_t*	index,
3895 	const dtuple_t*	tuple1,
3896 	ulint		mode1,
3897 	const dtuple_t*	tuple2,
3898 	ulint		mode2,
3899 	unsigned	nth_attempt)
3900 {
3901 	btr_path_t	path1[BTR_PATH_ARRAY_N_SLOTS];
3902 	btr_path_t	path2[BTR_PATH_ARRAY_N_SLOTS];
3903 	btr_cur_t	cursor;
3904 	btr_path_t*	slot1;
3905 	btr_path_t*	slot2;
3906 	ibool		diverged;
3907 	ibool		diverged_lot;
3908 	ulint		divergence_level;
3909 	ib_int64_t	n_rows;
3910 	ibool		is_n_rows_exact;
3911 	ulint		i;
3912 	mtr_t		mtr;
3913 	ib_int64_t	table_n_rows;
3914 
3915 	table_n_rows = dict_table_get_n_rows(index->table);
3916 
3917 	mtr_start(&mtr);
3918 
3919 	cursor.path_arr = path1;
3920 
3921 	if (dtuple_get_n_fields(tuple1) > 0) {
3922 
3923 		btr_cur_search_to_nth_level(index, 0, tuple1, mode1,
3924 					    BTR_SEARCH_LEAF | BTR_ESTIMATE,
3925 					    &cursor, 0,
3926 					    __FILE__, __LINE__, &mtr);
3927 	} else {
3928 		btr_cur_open_at_index_side(true, index,
3929 					   BTR_SEARCH_LEAF | BTR_ESTIMATE,
3930 					   &cursor, 0, &mtr);
3931 	}
3932 
3933 	mtr_commit(&mtr);
3934 
3935 #ifdef UNIV_DEBUG
3936 	if (!strcmp(index->name, "iC")) {
3937 		DEBUG_SYNC_C("btr_estimate_n_rows_in_range_between_dives");
3938 	}
3939 #endif
3940 
3941 	mtr_start(&mtr);
3942 
3943 	cursor.path_arr = path2;
3944 
3945 	if (dtuple_get_n_fields(tuple2) > 0) {
3946 
3947 		btr_cur_search_to_nth_level(index, 0, tuple2, mode2,
3948 					    BTR_SEARCH_LEAF | BTR_ESTIMATE,
3949 					    &cursor, 0,
3950 					    __FILE__, __LINE__, &mtr);
3951 	} else {
3952 		btr_cur_open_at_index_side(false, index,
3953 					   BTR_SEARCH_LEAF | BTR_ESTIMATE,
3954 					   &cursor, 0, &mtr);
3955 	}
3956 
3957 	mtr_commit(&mtr);
3958 
3959 	/* We have the path information for the range in path1 and path2 */
3960 
3961 	n_rows = 1;
3962 	is_n_rows_exact = TRUE;
3963 	diverged = FALSE;	    /* This becomes true when the path is not
3964 				    the same any more */
3965 	diverged_lot = FALSE;	    /* This becomes true when the paths are
3966 				    not the same or adjacent any more */
3967 	divergence_level = 1000000; /* This is the level where paths diverged
3968 				    a lot */
3969 	for (i = 0; ; i++) {
3970 		ut_ad(i < BTR_PATH_ARRAY_N_SLOTS);
3971 
3972 		slot1 = path1 + i;
3973 		slot2 = path2 + i;
3974 
3975 		if (slot1->nth_rec == ULINT_UNDEFINED
3976 		    || slot2->nth_rec == ULINT_UNDEFINED) {
3977 
3978 			if (i > divergence_level + 1 && !is_n_rows_exact) {
3979 				/* In trees whose height is > 1 our algorithm
3980 				tends to underestimate: multiply the estimate
3981 				by 2: */
3982 
3983 				n_rows = n_rows * 2;
3984 			}
3985 
3986 			DBUG_EXECUTE_IF("bug14007649", return(n_rows););
3987 
3988 			/* Do not estimate the number of rows in the range
3989 			to over 1 / 2 of the estimated rows in the whole
3990 			table */
3991 
3992 			if (n_rows > table_n_rows / 2 && !is_n_rows_exact) {
3993 
3994 				n_rows = table_n_rows / 2;
3995 
3996 				/* If there are just 0 or 1 rows in the table,
3997 				then we estimate all rows are in the range */
3998 
3999 				if (n_rows == 0) {
4000 					n_rows = table_n_rows;
4001 				}
4002 			}
4003 
4004 			return(n_rows);
4005 		}
4006 
4007 		if (!diverged && slot1->nth_rec != slot2->nth_rec) {
4008 
4009 			/* If both slots do not point to the same page or if
4010 			the paths have crossed and the same page on both
4011 			apparently contains a different number of records,
4012 			this means that the tree must have changed between
4013 			the dive for slot1 and the dive for slot2 at the
4014 			beginning of this function. */
4015 			if (slot1->page_no != slot2->page_no
4016 			    || slot1->page_level != slot2->page_level
4017 			    || (slot1->nth_rec >= slot2->nth_rec
4018 				&& slot1->n_recs != slot2->n_recs)) {
4019 
4020 				/* If the tree keeps changing even after a
4021 				few attempts, then just return some arbitrary
4022 				number. */
4023 				if (nth_attempt >= rows_in_range_max_retries) {
4024 					return(rows_in_range_arbitrary_ret_val);
4025 				}
4026 
4027 				const int64_t	ret =
4028 					btr_estimate_n_rows_in_range_low(
4029 						index, tuple1, mode1,
4030 						tuple2, mode2, nth_attempt + 1);
4031 
4032 				return(ret);
4033 			}
4034 
4035 			diverged = TRUE;
4036 
4037 			if (slot1->nth_rec < slot2->nth_rec) {
4038 				n_rows = slot2->nth_rec - slot1->nth_rec;
4039 
4040 				if (n_rows > 1) {
4041 					diverged_lot = TRUE;
4042 					divergence_level = i;
4043 				}
4044 			} else {
4045 				/* It is possible that
4046 				slot1->nth_rec >= slot2->nth_rec
4047 				if, for example, we have a single page
4048 				tree which contains (inf, 5, 6, supr)
4049 				and we select where x > 20 and x < 30;
4050 				in this case slot1->nth_rec will point
4051 				to the supr record and slot2->nth_rec
4052 				will point to 6 */
4053 				return(0);
4054 			}
4055 
4056 		} else if (diverged && !diverged_lot) {
4057 
4058 			if (slot1->nth_rec < slot1->n_recs
4059 			    || slot2->nth_rec > 1) {
4060 
4061 				diverged_lot = TRUE;
4062 				divergence_level = i;
4063 
4064 				n_rows = 0;
4065 
4066 				if (slot1->nth_rec < slot1->n_recs) {
4067 					n_rows += slot1->n_recs
4068 						- slot1->nth_rec;
4069 				}
4070 
4071 				if (slot2->nth_rec > 1) {
4072 					n_rows += slot2->nth_rec - 1;
4073 				}
4074 			}
4075 		} else if (diverged_lot) {
4076 
4077 			n_rows = btr_estimate_n_rows_in_range_on_level(
4078 				index, slot1, slot2, n_rows,
4079 				&is_n_rows_exact);
4080 		}
4081 	}
4082 }
4083 
4084 /** Estimates the number of rows in a given index range.
4085 @param[in]	index	index
4086 @param[in]	tuple1	range start, may also be empty tuple
4087 @param[in]	mode1	search mode for range start
4088 @param[in]	tuple2	range end, may also be empty tuple
4089 @param[in]	mode2	search mode for range end
4090 @return estimated number of rows */
4091 int64_t
4092 btr_estimate_n_rows_in_range(
4093 	dict_index_t*	index,
4094 	const dtuple_t*	tuple1,
4095 	ulint		mode1,
4096 	const dtuple_t*	tuple2,
4097 	ulint		mode2)
4098 {
4099 	const int64_t	ret = btr_estimate_n_rows_in_range_low(
4100 		index, tuple1, mode1, tuple2, mode2, 1 /* first attempt */);
4101 
4102 	return(ret);
4103 }
4104 
4105 /*******************************************************************//**
4106 Record the number of non_null key values in a given index for
4107 each n-column prefix of the index where 1 <= n <= dict_index_get_n_unique(index).
4108 The estimates are eventually stored in the array:
4109 index->stat_n_non_null_key_vals[], which is indexed from 0 to n-1. */
4110 static
4111 void
4112 btr_record_not_null_field_in_rec(
4113 /*=============================*/
4114 	ulint		n_unique,	/*!< in: dict_index_get_n_unique(index),
4115 					number of columns that uniquely
4116 					determine an index entry */
4117 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index),
4118 					its size could be for all fields or
4119 					that of "n_unique" */
4120 	ib_uint64_t*	n_not_null)	/*!< in/out: array to record number of
4121 					not null rows for n-column prefix */
4122 {
4123 	ulint	i;
4124 
4125 	ut_ad(rec_offs_n_fields(offsets) >= n_unique);
4126 
4127 	if (n_not_null == NULL) {
4128 		return;
4129 	}
4130 
4131 	for (i = 0; i < n_unique; i++) {
4132 		if (rec_offs_nth_sql_null(offsets, i)) {
4133 			break;
4134 		}
4135 
4136 		n_not_null[i]++;
4137 	}
4138 }
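
/* Illustrative sketch (not part of the original source): the loop above
stops at the first SQL NULL, so only the leading all-non-NULL prefixes of
a record are counted. A hypothetical trace for n_unique == 3: */
#if 0
	ib_uint64_t	n_not_null[3] = {0, 0, 0};

	/* record (10, NULL, 30): column 0 is not NULL, column 1 is NULL,
	so only the 1-column prefix counter is incremented */
	n_not_null[0]++;			/* -> {1, 0, 0} */

	/* record (10, 20, 30): no NULLs among the first three columns,
	so all three prefix counters are incremented */
	n_not_null[0]++;
	n_not_null[1]++;
	n_not_null[2]++;			/* -> {2, 1, 1} */
#endif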
4139 
4140 /*******************************************************************//**
4141 Estimates the number of different key values in a given index, for
4142 each n-column prefix of the index where 1 <= n <= dict_index_get_n_unique(index).
4143 The estimates are stored in the array index->stat_n_diff_key_vals[] (indexed
4144 0..n_uniq-1) and the number of pages that were sampled is saved in
4145 index->stat_n_sample_sizes[].
4146 If innodb_stats_method is nulls_ignored, we also record the number of
4147 non-null values for each prefix and store the estimates in the
4148 array index->stat_n_non_null_key_vals. */
4149 UNIV_INTERN
4150 void
4151 btr_estimate_number_of_different_key_vals(
4152 /*======================================*/
4153 	dict_index_t*	index)	/*!< in: index */
4154 {
4155 	btr_cur_t	cursor;
4156 	page_t*		page;
4157 	rec_t*		rec;
4158 	ulint		n_cols;
4159 	ulint		matched_fields;
4160 	ulint		matched_bytes;
4161 	ib_uint64_t*	n_diff;
4162 	ib_uint64_t*	n_not_null;
4163 	ibool		stats_null_not_equal;
4164 	ullint		n_sample_pages; /* number of pages to sample */
4165 	ulint		not_empty_flag	= 0;
4166 	ulint		total_external_size = 0;
4167 	ulint		i;
4168 	ulint		j;
4169 	ullint		add_on;
4170 	mtr_t		mtr;
4171 	mem_heap_t*	heap		= NULL;
4172 	ulint*		offsets_rec	= NULL;
4173 	ulint*		offsets_next_rec = NULL;
4174 
4175 	n_cols = dict_index_get_n_unique(index);
4176 
4177 	heap = mem_heap_create((sizeof *n_diff + sizeof *n_not_null)
4178 			       * n_cols
4179 			       + dict_index_get_n_fields(index)
4180 			       * (sizeof *offsets_rec
4181 				  + sizeof *offsets_next_rec));
4182 
4183 	n_diff = (ib_uint64_t*) mem_heap_zalloc(
4184 		heap, n_cols * sizeof(ib_int64_t));
4185 
4186 	n_not_null = NULL;
4187 
4188 	/* Check srv_innodb_stats_method setting, and decide whether we
4189 	need to record non-null value and also decide if NULL is
4190 	considered equal (by setting stats_null_not_equal value) */
4191 	switch (srv_innodb_stats_method) {
4192 	case SRV_STATS_NULLS_IGNORED:
4193 		n_not_null = (ib_uint64_t*) mem_heap_zalloc(
4194 			heap, n_cols * sizeof *n_not_null);
4195 		/* fall through */
4196 
4197 	case SRV_STATS_NULLS_UNEQUAL:
4198 		/* for both SRV_STATS_NULLS_IGNORED and SRV_STATS_NULLS_UNEQUAL
4199 		case, we will treat NULLs as unequal value */
4200 		stats_null_not_equal = TRUE;
4201 		break;
4202 
4203 	case SRV_STATS_NULLS_EQUAL:
4204 		stats_null_not_equal = FALSE;
4205 		break;
4206 
4207 	default:
4208 		ut_error;
4209         }
4210 
4211 	/* It makes no sense to test more pages than are contained
4212 	in the index, thus we lower the number if it is too high */
4213 	if (srv_stats_transient_sample_pages > index->stat_index_size) {
4214 		if (index->stat_index_size > 0) {
4215 			n_sample_pages = index->stat_index_size;
4216 		} else {
4217 			n_sample_pages = 1;
4218 		}
4219 	} else {
4220 		n_sample_pages = srv_stats_transient_sample_pages;
4221 	}
4222 
4223 	/* We sample some pages in the index to get an estimate */
4224 
4225 	for (i = 0; i < n_sample_pages; i++) {
4226 		mtr_start(&mtr);
4227 
4228 		btr_cur_open_at_rnd_pos(index, BTR_SEARCH_LEAF, &cursor, &mtr);
4229 
4230 		/* Count the number of different key values for each prefix of
4231 		the key on this index page. If the prefix does not determine
4232 		the index record uniquely in the B-tree, then we subtract one
4233 		because otherwise our algorithm would give a wrong estimate
4234 		for an index where there is just one key value. */
4235 
4236 		page = btr_cur_get_page(&cursor);
4237 
4238 		SRV_CORRUPT_TABLE_CHECK(page, goto exit_loop;);
4239 		DBUG_EXECUTE_IF("ib_corrupt_page_while_stats_calc",
4240 				page = NULL;);
4241 
4242 		SRV_CORRUPT_TABLE_CHECK(page,
4243 		{
4244 			mtr_commit(&mtr);
4245 			goto exit_loop;
4246 		});
4247 
4248 		rec = page_rec_get_next(page_get_infimum_rec(page));
4249 
4250 		if (!page_rec_is_supremum(rec)) {
4251 			not_empty_flag = 1;
4252 			offsets_rec = rec_get_offsets(rec, index, offsets_rec,
4253 						      ULINT_UNDEFINED, &heap);
4254 
4255 			if (n_not_null != NULL) {
4256 				btr_record_not_null_field_in_rec(
4257 					n_cols, offsets_rec, n_not_null);
4258 			}
4259 		}
4260 
4261 		while (!page_rec_is_supremum(rec)) {
4262 			rec_t*	next_rec = page_rec_get_next(rec);
4263 			if (page_rec_is_supremum(next_rec)) {
4264 				total_external_size +=
4265 					btr_rec_get_externally_stored_len(
4266 						rec, offsets_rec);
4267 				break;
4268 			}
4269 
4270 			matched_fields = 0;
4271 			matched_bytes = 0;
4272 			offsets_next_rec = rec_get_offsets(next_rec, index,
4273 							   offsets_next_rec,
4274 							   ULINT_UNDEFINED,
4275 							   &heap);
4276 
4277 			cmp_rec_rec_with_match(rec, next_rec,
4278 					       offsets_rec, offsets_next_rec,
4279 					       index, stats_null_not_equal,
4280 					       &matched_fields,
4281 					       &matched_bytes);
4282 
4283 			for (j = matched_fields; j < n_cols; j++) {
4284 				/* We add one if this index record has
4285 				a different prefix from the previous */
4286 
4287 				n_diff[j]++;
4288 			}
4289 
4290 			if (n_not_null != NULL) {
4291 				btr_record_not_null_field_in_rec(
4292 					n_cols, offsets_next_rec, n_not_null);
4293 			}
4294 
4295 			total_external_size
4296 				+= btr_rec_get_externally_stored_len(
4297 					rec, offsets_rec);
4298 
4299 			rec = next_rec;
4300 			/* Initialize offsets_rec for the next round
4301 			and assign the old offsets_rec buffer to
4302 			offsets_next_rec. */
4303 			{
4304 				ulint*	offsets_tmp = offsets_rec;
4305 				offsets_rec = offsets_next_rec;
4306 				offsets_next_rec = offsets_tmp;
4307 			}
4308 		}
4309 
4310 
4311 		if (n_cols == dict_index_get_n_unique_in_tree(index)) {
4312 
4313 			/* If there is more than one leaf page in the tree,
4314 			we add one because we know that the first record
4315 			on the page certainly had a different prefix than the
4316 			last record on the previous index page in the
4317 			alphabetical order. Before this fix, if there was
4318 			just one big record on each clustered index page, the
4319 			algorithm grossly underestimated the number of rows
4320 			in the table. */
4321 
4322 			if (btr_page_get_prev(page, &mtr) != FIL_NULL
4323 			    || btr_page_get_next(page, &mtr) != FIL_NULL) {
4324 
4325 				n_diff[n_cols - 1]++;
4326 			}
4327 		}
4328 
4329 		mtr_commit(&mtr);
4330 	}
4331 
4332 exit_loop:
4333 	/* If we saw k borders between different key values on
4334 	n_sample_pages leaf pages, we can estimate how many
4335 	there will be in index->stat_n_leaf_pages */
4336 
4337 	/* We must take into account that our sample actually represents
4338 	also the pages used for external storage of fields (those pages are
4339 	included in index->stat_n_leaf_pages) */
4340 
4341 	for (j = 0; j < n_cols; j++) {
4342 		index->stat_n_diff_key_vals[j]
4343 			= BTR_TABLE_STATS_FROM_SAMPLE(
4344 				n_diff[j], index, n_sample_pages,
4345 				total_external_size, not_empty_flag);
4346 
4347 		/* If the tree is small, smaller than
4348 		10 * n_sample_pages + total_external_size, then
4349 		the above estimate is ok. For bigger trees it is common that we
4350 		do not see any borders between key values in the few pages
4351 		we pick. But still there may be n_sample_pages
4352 		different key values, or even more. Let us try to approximate
4353 		that: */
4354 
4355 		add_on = index->stat_n_leaf_pages
4356 			/ (10 * (n_sample_pages
4357 				 + total_external_size));
4358 
4359 		if (add_on > n_sample_pages) {
4360 			add_on = n_sample_pages;
4361 		}
4362 
4363 		index->stat_n_diff_key_vals[j] += add_on;
4364 
4365 		index->stat_n_sample_sizes[j] = n_sample_pages;
4366 
4367 		/* Update the stat_n_non_null_key_vals[] with our
4368 		sampled result. stat_n_non_null_key_vals[] is created
4369 		and initialized to zero in dict_index_add_to_cache(),
4370 		along with stat_n_diff_key_vals[] array */
4371 		if (n_not_null != NULL) {
4372 			index->stat_n_non_null_key_vals[j] =
4373 				 BTR_TABLE_STATS_FROM_SAMPLE(
4374 					n_not_null[j], index, n_sample_pages,
4375 					total_external_size, not_empty_flag);
4376 		}
4377 	}
4378 
4379 	mem_heap_free(heap);
4380 }
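
/* Illustrative sketch (not part of the original source): a hypothetical
worked example of the add_on correction above, with 8000 leaf pages,
n_sample_pages = 20 and no externally stored fields: */
#if 0
	ullint	n_sample_pages		= 20;
	ulint	total_external_size	= 0;
	ulint	stat_n_leaf_pages	= 8000;

	/* 8000 / (10 * (20 + 0)) = 40, which is then capped at
	n_sample_pages = 20 before being added to the estimate */
	ullint	add_on = stat_n_leaf_pages
		/ (10 * (n_sample_pages + total_external_size));

	if (add_on > n_sample_pages) {
		add_on = n_sample_pages;
	}
#endif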
4381 
4382 /*================== EXTERNAL STORAGE OF BIG FIELDS ===================*/
4383 
4384 /***********************************************************//**
4385 Gets the offset of the pointer to the externally stored part of a field.
4386 @return	offset of the pointer to the externally stored part */
4387 static
4388 ulint
4389 btr_rec_get_field_ref_offs(
4390 /*=======================*/
4391 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4392 	ulint		n)	/*!< in: index of the external field */
4393 {
4394 	ulint	field_ref_offs;
4395 	ulint	local_len;
4396 
4397 	ut_a(rec_offs_nth_extern(offsets, n));
4398 	field_ref_offs = rec_get_nth_field_offs(offsets, n, &local_len);
4399 	ut_a(local_len != UNIV_SQL_NULL);
4400 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
4401 
4402 	return(field_ref_offs + local_len - BTR_EXTERN_FIELD_REF_SIZE);
4403 }
4404 
4405 /** Gets a pointer to the externally stored part of a field.
4406 @param rec	record
4407 @param offsets	rec_get_offsets(rec)
4408 @param n	index of the externally stored field
4409 @return pointer to the externally stored part */
4410 #define btr_rec_get_field_ref(rec, offsets, n)			\
4411 	((rec) + btr_rec_get_field_ref_offs(offsets, n))
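
/* Illustrative sketch (not part of the original source): as computed by
btr_rec_get_field_ref_offs() above, the BTR_EXTERN_FIELD_REF_SIZE-byte
external field reference always occupies the last bytes of the locally
stored part of the column. The snippet assumes rec, offsets and field
number n as in the helpers above: */
#if 0
	ulint	local_len;
	byte*	data = rec_get_nth_field(rec, offsets, n, &local_len);

	/* The reference is the tail of the locally stored prefix. */
	byte*	field_ref = data + local_len - BTR_EXTERN_FIELD_REF_SIZE;

	ut_ad(field_ref == btr_rec_get_field_ref(rec, offsets, n));
#endif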
4412 
4413 /** Gets the externally stored size of a record, in units of a database page.
4414 @param[in]	rec	record
4415 @param[in]	offsets	array returned by rec_get_offsets()
4416 @return	size of the externally stored part, in units of a database page */
4417 
4418 ulint
4419 btr_rec_get_externally_stored_len(
4420 	const rec_t*	rec,
4421 	const ulint*	offsets)
4422 {
4423 	ulint	n_fields;
4424 	ulint	total_extern_len = 0;
4425 	ulint	i;
4426 
4427 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
4428 
4429 	if (!rec_offs_any_extern(offsets)) {
4430 		return(0);
4431 	}
4432 
4433 	n_fields = rec_offs_n_fields(offsets);
4434 
4435 	for (i = 0; i < n_fields; i++) {
4436 		if (rec_offs_nth_extern(offsets, i)) {
4437 
4438 			ulint	extern_len = mach_read_from_4(
4439 				btr_rec_get_field_ref(rec, offsets, i)
4440 				+ BTR_EXTERN_LEN + 4);
4441 
4442 			total_extern_len += ut_calc_align(extern_len,
4443 							  UNIV_PAGE_SIZE);
4444 		}
4445 	}
4446 
4447 	return(total_extern_len / UNIV_PAGE_SIZE);
4448 }
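
/* Illustrative sketch (not part of the original source): a hypothetical
worked example of the page-unit conversion above, with the default 16 KiB
page size. A single externally stored column of 70000 bytes contributes: */
#if 0
	ulint	extern_len = 70000;

	/* ut_calc_align() rounds up to a multiple of the page size:
	70000 -> 81920 bytes, i.e. ceil(70000 / 16384) * 16384 */
	ulint	aligned = ut_calc_align(extern_len, UNIV_PAGE_SIZE);

	/* 81920 / 16384 = 5 pages counted by
	btr_rec_get_externally_stored_len() for this field */
	ulint	pages = aligned / UNIV_PAGE_SIZE;
#endif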
4449 
4450 /*******************************************************************//**
4451 Sets the ownership bit of an externally stored field in a record. */
4452 static
4453 void
4454 btr_cur_set_ownership_of_extern_field(
4455 /*==================================*/
4456 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
4457 				part will be updated, or NULL */
4458 	rec_t*		rec,	/*!< in/out: clustered index record */
4459 	dict_index_t*	index,	/*!< in: index of the page */
4460 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4461 	ulint		i,	/*!< in: field number */
4462 	ibool		val,	/*!< in: value to set */
4463 	mtr_t*		mtr)	/*!< in: mtr, or NULL if not logged */
4464 {
4465 	byte*	data;
4466 	ulint	local_len;
4467 	ulint	byte_val;
4468 
4469 	data = rec_get_nth_field(rec, offsets, i, &local_len);
4470 	ut_ad(rec_offs_nth_extern(offsets, i));
4471 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
4472 
4473 	local_len -= BTR_EXTERN_FIELD_REF_SIZE;
4474 
4475 	byte_val = mach_read_from_1(data + local_len + BTR_EXTERN_LEN);
4476 
4477 	if (val) {
4478 		byte_val = byte_val & (~BTR_EXTERN_OWNER_FLAG);
4479 	} else {
4480 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4481 		ut_a(!(byte_val & BTR_EXTERN_OWNER_FLAG));
4482 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4483 		byte_val = byte_val | BTR_EXTERN_OWNER_FLAG;
4484 	}
4485 
4486 	if (page_zip) {
4487 		mach_write_to_1(data + local_len + BTR_EXTERN_LEN, byte_val);
4488 		page_zip_write_blob_ptr(page_zip, rec, index, offsets, i, mtr);
4489 	} else if (mtr != NULL) {
4490 
4491 		mlog_write_ulint(data + local_len + BTR_EXTERN_LEN, byte_val,
4492 				 MLOG_1BYTE, mtr);
4493 	} else {
4494 		mach_write_to_1(data + local_len + BTR_EXTERN_LEN, byte_val);
4495 	}
4496 
4497 	btr_blob_dbg_owner(rec, index, offsets, i, val);
4498 }
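
/* Illustrative sketch (not part of the original source): the ownership bit
manipulated above lives in the first byte of the BTR_EXTERN_LEN field of
the column reference; taking ownership clears BTR_EXTERN_OWNER_FLAG and
disowning sets it. Assuming data and local_len as in the helper above: */
#if 0
	ulint	byte_val = mach_read_from_1(data + local_len + BTR_EXTERN_LEN);

	byte_val &= ~BTR_EXTERN_OWNER_FLAG;	/* val == TRUE: this record owns the BLOB */
	byte_val |=  BTR_EXTERN_OWNER_FLAG;	/* val == FALSE: the BLOB is disowned */
#endif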
4499 
4500 /*******************************************************************//**
4501 Marks non-updated off-page fields as disowned by this record. The ownership
4502 must be transferred to the updated record which is inserted elsewhere in the
4503 index tree. In purge, only the owner of an externally stored field is
4504 allowed to free the field. */
4505 UNIV_INTERN
4506 void
4507 btr_cur_disown_inherited_fields(
4508 /*============================*/
4509 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
4510 				part will be updated, or NULL */
4511 	rec_t*		rec,	/*!< in/out: record in a clustered index */
4512 	dict_index_t*	index,	/*!< in: index of the page */
4513 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4514 	const upd_t*	update,	/*!< in: update vector */
4515 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
4516 {
4517 	ulint	i;
4518 
4519 	ut_ad(rec_offs_validate(rec, index, offsets));
4520 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
4521 	ut_ad(rec_offs_any_extern(offsets));
4522 	ut_ad(mtr);
4523 
4524 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
4525 		if (rec_offs_nth_extern(offsets, i)
4526 		    && !upd_get_field_by_field_no(update, i)) {
4527 			btr_cur_set_ownership_of_extern_field(
4528 				page_zip, rec, index, offsets, i, FALSE, mtr);
4529 		}
4530 	}
4531 }
4532 
4533 /*******************************************************************//**
4534 Marks all extern fields in a record as owned by the record. This function
4535 should be called if the delete mark of a record is removed: a record that
4536 is not delete-marked always owns all its extern fields. */
4537 static
4538 void
4539 btr_cur_unmark_extern_fields(
4540 /*=========================*/
4541 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
4542 				part will be updated, or NULL */
4543 	rec_t*		rec,	/*!< in/out: record in a clustered index */
4544 	dict_index_t*	index,	/*!< in: index of the page */
4545 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4546 	mtr_t*		mtr)	/*!< in: mtr, or NULL if not logged */
4547 {
4548 	ulint	n;
4549 	ulint	i;
4550 
4551 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
4552 	n = rec_offs_n_fields(offsets);
4553 
4554 	if (!rec_offs_any_extern(offsets)) {
4555 
4556 		return;
4557 	}
4558 
4559 	for (i = 0; i < n; i++) {
4560 		if (rec_offs_nth_extern(offsets, i)) {
4561 
4562 			btr_cur_set_ownership_of_extern_field(
4563 				page_zip, rec, index, offsets, i, TRUE, mtr);
4564 		}
4565 	}
4566 }
4567 
4568 /*******************************************************************//**
4569 Flags the data tuple fields that are marked as extern storage in the
4570 update vector.  We use this function to remember which fields we must
4571 mark as extern storage in a record inserted for an update.
4572 @return	number of flagged external columns */
4573 UNIV_INTERN
4574 ulint
4575 btr_push_update_extern_fields(
4576 /*==========================*/
4577 	dtuple_t*	tuple,	/*!< in/out: data tuple */
4578 	const upd_t*	update,	/*!< in: update vector */
4579 	mem_heap_t*	heap)	/*!< in: memory heap */
4580 {
4581 	ulint			n_pushed	= 0;
4582 	ulint			n;
4583 	const upd_field_t*	uf;
4584 
4585 	ut_ad(tuple);
4586 	ut_ad(update);
4587 
4588 	uf = update->fields;
4589 	n = upd_get_n_fields(update);
4590 
4591 	for (; n--; uf++) {
4592 		if (dfield_is_ext(&uf->new_val)) {
4593 			dfield_t*	field
4594 				= dtuple_get_nth_field(tuple, uf->field_no);
4595 
4596 			if (!dfield_is_ext(field)) {
4597 				dfield_set_ext(field);
4598 				n_pushed++;
4599 			}
4600 
4601 			switch (uf->orig_len) {
4602 				byte*	data;
4603 				ulint	len;
4604 				byte*	buf;
4605 			case 0:
4606 				break;
4607 			case BTR_EXTERN_FIELD_REF_SIZE:
4608 				/* Restore the original locally stored
4609 				part of the column.  In the undo log,
4610 				InnoDB writes a longer prefix of externally
4611 				stored columns, so that column prefixes
4612 				in secondary indexes can be reconstructed. */
4613 				dfield_set_data(field, (byte*) dfield_get_data(field)
4614 						+ dfield_get_len(field)
4615 						- BTR_EXTERN_FIELD_REF_SIZE,
4616 						BTR_EXTERN_FIELD_REF_SIZE);
4617 				dfield_set_ext(field);
4618 				break;
4619 			default:
4620 				/* Reconstruct the original locally
4621 				stored part of the column.  The data
4622 				will have to be copied. */
4623 				ut_a(uf->orig_len > BTR_EXTERN_FIELD_REF_SIZE);
4624 
4625 				data = (byte*) dfield_get_data(field);
4626 				len = dfield_get_len(field);
4627 
4628 				buf = (byte*) mem_heap_alloc(heap,
4629 							     uf->orig_len);
4630 				/* Copy the locally stored prefix. */
4631 				memcpy(buf, data,
4632 				       uf->orig_len
4633 				       - BTR_EXTERN_FIELD_REF_SIZE);
4634 				/* Copy the BLOB pointer. */
4635 				memcpy(buf + uf->orig_len
4636 				       - BTR_EXTERN_FIELD_REF_SIZE,
4637 				       data + len - BTR_EXTERN_FIELD_REF_SIZE,
4638 				       BTR_EXTERN_FIELD_REF_SIZE);
4639 
4640 				dfield_set_data(field, buf, uf->orig_len);
4641 				dfield_set_ext(field);
4642 			}
4643 		}
4644 	}
4645 
4646 	return(n_pushed);
4647 }
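
/* Illustrative sketch (not part of the original source): the "default"
branch above rebuilds the locally stored column value in a heap buffer of
uf->orig_len bytes, laid out as the original local prefix followed by the
BTR_EXTERN_FIELD_REF_SIZE-byte BLOB pointer taken from the end of the
current value: */
#if 0
	/* buf layout:
	   [ orig_len - 20 bytes of local prefix | 20-byte BLOB pointer ]
	     copied from data                      copied from
	                                           data + len - 20        */
#endif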
4648 
4649 /*******************************************************************//**
4650 Returns the length of a BLOB part stored on the header page.
4651 @return	part length */
4652 static
4653 ulint
4654 btr_blob_get_part_len(
4655 /*==================*/
4656 	const byte*	blob_header)	/*!< in: blob header */
4657 {
4658 	return(mach_read_from_4(blob_header + BTR_BLOB_HDR_PART_LEN));
4659 }
4660 
4661 /*******************************************************************//**
4662 Returns the page number where the next BLOB part is stored.
4663 @return	page number or FIL_NULL if no more pages */
4664 static
4665 ulint
4666 btr_blob_get_next_page_no(
4667 /*======================*/
4668 	const byte*	blob_header)	/*!< in: blob header */
4669 {
4670 	return(mach_read_from_4(blob_header + BTR_BLOB_HDR_NEXT_PAGE_NO));
4671 }
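
/* Illustrative sketch (not part of the original source): an uncompressed
BLOB is stored as a singly linked chain of pages. Each page carries a
small header at FIL_PAGE_DATA with the length of the part stored on that
page (BTR_BLOB_HDR_PART_LEN) and the number of the next page
(BTR_BLOB_HDR_NEXT_PAGE_NO, FIL_NULL at the end of the chain), which the
two helpers above read. A minimal read loop could look like the following,
where first_blob_page_no is a hypothetical start page and fetching and
latching each page into "page" is left out: */
#if 0
	ulint	page_no	  = first_blob_page_no;
	ulint	total_len = 0;

	while (page_no != FIL_NULL) {
		const byte*	blob_header = page + FIL_PAGE_DATA;

		total_len += btr_blob_get_part_len(blob_header);
		page_no = btr_blob_get_next_page_no(blob_header);
		/* ...fetch and latch the next page into "page" here... */
	}
#endif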
4672 
4673 /*******************************************************************//**
4674 Deallocate a buffer block that was reserved for a BLOB part. */
4675 static
4676 void
4677 btr_blob_free(
4678 /*==========*/
4679 	buf_block_t*	block,	/*!< in: buffer block */
4680 	ibool		all,	/*!< in: TRUE=remove also the compressed page
4681 				if there is one */
4682 	mtr_t*		mtr)	/*!< in: mini-transaction to commit */
4683 {
4684 	buf_pool_t*	buf_pool = buf_pool_from_block(block);
4685 	ulint		space	= buf_block_get_space(block);
4686 	ulint		page_no	= buf_block_get_page_no(block);
4687 	bool		freed	= false;
4688 
4689 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4690 
4691 	mtr_commit(mtr);
4692 
4693 	mutex_enter(&buf_pool->LRU_list_mutex);
4694 	mutex_enter(&block->mutex);
4695 
4696 	/* Only free the block if it is still allocated to
4697 	the same file page. */
4698 
4699 	if (buf_block_get_state(block)
4700 	    == BUF_BLOCK_FILE_PAGE
4701 	    && buf_block_get_space(block) == space
4702 	    && buf_block_get_page_no(block) == page_no) {
4703 
4704 		freed = buf_LRU_free_page(&block->page, all);
4705 
4706 		if (!freed && all && block->page.zip.data
4707 		    /* Now, buf_LRU_free_page() may release mutexes
4708 		    temporarily */
4709 		    && buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE
4710 		    && buf_block_get_space(block) == space
4711 		    && buf_block_get_page_no(block) == page_no) {
4712 
4713 			/* Attempt to deallocate the uncompressed page
4714 			if the whole block cannot be deallocated. */
4715 			freed = buf_LRU_free_page(&block->page, false);
4716 		}
4717 	}
4718 
4719 	if (!freed) {
4720 		mutex_exit(&buf_pool->LRU_list_mutex);
4721 	}
4722 
4723 	mutex_exit(&block->mutex);
4724 }
4725 
4726 /*******************************************************************//**
4727 Stores the fields in big_rec_vec to the tablespace and puts pointers to
4728 them in rec.  The extern flags in rec will have to be set beforehand.
4729 The fields are stored on pages allocated from leaf node
4730 file segment of the index tree.
4731 @return	DB_SUCCESS or DB_OUT_OF_FILE_SPACE or DB_TOO_BIG_FOR_REDO */
4732 UNIV_INTERN
4733 dberr_t
4734 btr_store_big_rec_extern_fields(
4735 /*============================*/
4736 	dict_index_t*	index,		/*!< in: index of rec; the index tree
4737 					MUST be X-latched */
4738 	buf_block_t*	rec_block,	/*!< in/out: block containing rec */
4739 	rec_t*		rec,		/*!< in/out: record */
4740 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index);
4741 					the "external storage" flags in offsets
4742 					will not correspond to rec when
4743 					this function returns */
4744 	const big_rec_t*big_rec_vec,	/*!< in: vector containing fields
4745 					to be stored externally */
4746 	mtr_t*		btr_mtr,	/*!< in: mtr containing the
4747 					latches to the clustered index */
4748 	enum blob_op	op)		/*!< in: operation code */
4749 {
4750 	ulint		rec_page_no;
4751 	byte*		field_ref;
4752 	ulint		extern_len;
4753 	ulint		store_len;
4754 	ulint		page_no;
4755 	ulint		space_id;
4756 	ulint		zip_size;
4757 	ulint		prev_page_no;
4758 	ulint		hint_page_no;
4759 	ulint		i;
4760 	mtr_t		mtr;
4761 	mtr_t*		alloc_mtr;
4762 	mem_heap_t*	heap = NULL;
4763 	page_zip_des_t*	page_zip;
4764 	z_stream	c_stream;
4765 	buf_block_t**	freed_pages	= NULL;
4766 	ulint		n_freed_pages	= 0;
4767 	dberr_t		error		= DB_SUCCESS;
4768 
4769 	ut_ad(rec_offs_validate(rec, index, offsets));
4770 	ut_ad(rec_offs_any_extern(offsets));
4771 	ut_ad(btr_mtr);
4772 	ut_ad(mtr_memo_contains(btr_mtr, dict_index_get_lock(index),
4773 				MTR_MEMO_X_LOCK));
4774 	ut_ad(mtr_memo_contains(btr_mtr, rec_block, MTR_MEMO_PAGE_X_FIX));
4775 	ut_ad(buf_block_get_frame(rec_block) == page_align(rec));
4776 	ut_a(dict_index_is_clust(index));
4777 
4778 	page_zip = buf_block_get_page_zip(rec_block);
4779 	ut_a(dict_table_zip_size(index->table)
4780 	     == buf_block_get_zip_size(rec_block));
4781 
4782 	space_id = buf_block_get_space(rec_block);
4783 	zip_size = buf_block_get_zip_size(rec_block);
4784 	rec_page_no = buf_block_get_page_no(rec_block);
4785 	ut_a(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX);
4786 
4787 	error = btr_check_blob_limit(big_rec_vec);
4788 
4789 	if (error != DB_SUCCESS) {
4790 		ut_ad(op == BTR_STORE_INSERT);
4791 		return(error);
4792 	}
4793 
4794 	if (page_zip) {
4795 		int	err;
4796 
4797 		/* Zlib deflate needs 128 kilobytes for the default
4798 		window size, plus 512 << memLevel, plus a few
4799 		kilobytes for small objects.  We use reduced memLevel
4800 		to limit the memory consumption, and preallocate the
4801 		heap, hoping to avoid memory fragmentation. */
4802 		heap = mem_heap_create(250000);
4803 		page_zip_set_alloc(&c_stream, heap);
4804 
4805 		err = deflateInit2(&c_stream, page_zip_level,
4806 				   Z_DEFLATED, 15, 7, Z_DEFAULT_STRATEGY);
4807 		ut_a(err == Z_OK);
4808 	}
4809 
4810 	if (btr_blob_op_is_update(op)) {
4811 		/* Avoid reusing pages that have been previously freed
4812 		in btr_mtr. */
4813 		if (btr_mtr->n_freed_pages) {
4814 			if (heap == NULL) {
4815 				heap = mem_heap_create(
4816 					btr_mtr->n_freed_pages
4817 					* sizeof *freed_pages);
4818 			}
4819 
4820 			freed_pages = static_cast<buf_block_t**>(
4821 				mem_heap_alloc(
4822 					heap,
4823 					btr_mtr->n_freed_pages
4824 					* sizeof *freed_pages));
4825 			n_freed_pages = 0;
4826 		}
4827 
4828 		/* Because btr_mtr will be committed after mtr, it is
4829 		possible that the tablespace has been extended when
4830 		the B-tree record was updated or inserted, or it will
4831 		be extended while allocating pages for big_rec.
4832 
4833 		TODO: In mtr (not btr_mtr), write a redo log record
4834 		about extending the tablespace to its current size,
4835 		and remember the current size. Whenever the tablespace
4836 		grows as pages are allocated, write further redo log
4837 		records to mtr. (Currently tablespace extension is not
4838 		covered by the redo log. If it were, the record would
4839 		only be written to btr_mtr, which is committed after
4840 		mtr.) */
4841 		alloc_mtr = btr_mtr;
4842 	} else {
4843 		/* Use the local mtr for allocations. */
4844 		alloc_mtr = &mtr;
4845 	}
4846 
4847 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4848 	/* All pointers to externally stored columns in the record
4849 	must either be zero or they must be pointers to inherited
4850 	columns, owned by this record or an earlier record version. */
4851 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
4852 		if (!rec_offs_nth_extern(offsets, i)) {
4853 			continue;
4854 		}
4855 		field_ref = btr_rec_get_field_ref(rec, offsets, i);
4856 
4857 		ut_a(!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG));
4858 		/* Either this must be an update in place,
4859 		or the BLOB must be inherited, or the BLOB pointer
4860 		must be zero (will be written in this function). */
4861 		ut_a(op == BTR_STORE_UPDATE
4862 		     || (field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_INHERITED_FLAG)
4863 		     || !memcmp(field_ref, field_ref_zero,
4864 				BTR_EXTERN_FIELD_REF_SIZE));
4865 	}
4866 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4867 	/* We have to create a file segment to the tablespace
4868 	for each field and put the pointer to the field in rec */
4869 
4870 	for (i = 0; i < big_rec_vec->n_fields; i++) {
4871 		field_ref = btr_rec_get_field_ref(
4872 			rec, offsets, big_rec_vec->fields[i].field_no);
4873 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4874 		/* A zero BLOB pointer should have been initially inserted. */
4875 		ut_a(!memcmp(field_ref, field_ref_zero,
4876 			     BTR_EXTERN_FIELD_REF_SIZE));
4877 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4878 		extern_len = big_rec_vec->fields[i].len;
4879 		UNIV_MEM_ASSERT_RW(big_rec_vec->fields[i].data,
4880 				   extern_len);
4881 
4882 		ut_a(extern_len > 0);
4883 
4884 		prev_page_no = FIL_NULL;
4885 
4886 		if (page_zip) {
4887 			int	err = deflateReset(&c_stream);
4888 			ut_a(err == Z_OK);
4889 
4890 			c_stream.next_in = (Bytef*)
4891 				big_rec_vec->fields[i].data;
4892 			c_stream.avail_in = static_cast<uInt>(extern_len);
4893 		}
4894 
4895 		for (;;) {
4896 			buf_block_t*	block;
4897 			page_t*		page;
4898 
4899 			mtr_start(&mtr);
4900 
4901 			if (prev_page_no == FIL_NULL) {
4902 				hint_page_no = 1 + rec_page_no;
4903 			} else {
4904 				hint_page_no = prev_page_no + 1;
4905 			}
4906 
4907 alloc_another:
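			/* hint_page_no is only a hint to btr_page_alloc():
			try to place the first BLOB page right after the
			page of the clustered index record, and each
			further BLOB page right after the previous one,
			so that the chain stays close together. */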
4908 			block = btr_page_alloc(index, hint_page_no,
4909 					       FSP_NO_DIR, 0, alloc_mtr, &mtr);
4910 			if (UNIV_UNLIKELY(block == NULL)) {
4911 				mtr_commit(&mtr);
4912 				error = DB_OUT_OF_FILE_SPACE;
4913 				goto func_exit;
4914 			}
4915 
4916 			if (rw_lock_get_x_lock_count(&block->lock) > 1) {
4917 				/* This page must have been freed in
4918 				btr_mtr previously. Put it aside, and
4919 				allocate another page for the BLOB data. */
4920 				ut_ad(alloc_mtr == btr_mtr);
4921 				ut_ad(btr_blob_op_is_update(op));
4922 				ut_ad(n_freed_pages < btr_mtr->n_freed_pages);
4923 				freed_pages[n_freed_pages++] = block;
4924 				goto alloc_another;
4925 			}
4926 
4927 			page_no = buf_block_get_page_no(block);
4928 			page = buf_block_get_frame(block);
4929 
4930 			if (prev_page_no != FIL_NULL) {
4931 				buf_block_t*	prev_block;
4932 				page_t*		prev_page;
4933 
4934 				prev_block = buf_page_get(space_id, zip_size,
4935 							  prev_page_no,
4936 							  RW_X_LATCH, &mtr);
4937 				buf_block_dbg_add_level(prev_block,
4938 							SYNC_EXTERN_STORAGE);
4939 				prev_page = buf_block_get_frame(prev_block);
4940 
4941 				if (page_zip) {
4942 					mlog_write_ulint(
4943 						prev_page + FIL_PAGE_NEXT,
4944 						page_no, MLOG_4BYTES, &mtr);
4945 					memcpy(buf_block_get_page_zip(
4946 						       prev_block)
4947 					       ->data + FIL_PAGE_NEXT,
4948 					       prev_page + FIL_PAGE_NEXT, 4);
4949 				} else {
4950 					mlog_write_ulint(
4951 						prev_page + FIL_PAGE_DATA
4952 						+ BTR_BLOB_HDR_NEXT_PAGE_NO,
4953 						page_no, MLOG_4BYTES, &mtr);
4954 				}
4955 
4956 			} else if (dict_index_is_online_ddl(index)) {
4957 				row_log_table_blob_alloc(index, page_no);
4958 			}
4959 
4960 			if (page_zip) {
4961 				int		err;
4962 				page_zip_des_t*	blob_page_zip;
4963 
4964 				/* Write FIL_PAGE_TYPE to the redo log
4965 				separately, before logging any other
4966 				changes to the page, so that the debug
4967 				assertions in
4968 				recv_parse_or_apply_log_rec_body() can
4969 				be made simpler.  Before InnoDB Plugin
4970 				1.0.4, the initialization of
4971 				FIL_PAGE_TYPE was logged as part of
4972 				the mlog_log_string() below. */
4973 
4974 				mlog_write_ulint(page + FIL_PAGE_TYPE,
4975 						 prev_page_no == FIL_NULL
4976 						 ? FIL_PAGE_TYPE_ZBLOB
4977 						 : FIL_PAGE_TYPE_ZBLOB2,
4978 						 MLOG_2BYTES, &mtr);
4979 
4980 				c_stream.next_out = page
4981 					+ FIL_PAGE_DATA;
4982 				c_stream.avail_out
4983 					= static_cast<uInt>(page_zip_get_size(page_zip))
4984 					- FIL_PAGE_DATA;
4985 
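				/* Z_STREAM_END means that the whole field
				has been compressed; Z_OK with no output
				space left means that this page is full
				and another BLOB page is needed. */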
4986 				err = deflate(&c_stream, Z_FINISH);
4987 				ut_a(err == Z_OK || err == Z_STREAM_END);
4988 				ut_a(err == Z_STREAM_END
4989 				     || c_stream.avail_out == 0);
4990 
4991 				/* Write the "next BLOB page" pointer */
4992 				mlog_write_ulint(page + FIL_PAGE_NEXT,
4993 						 FIL_NULL, MLOG_4BYTES, &mtr);
4994 				/* Initialize the unused "prev page" pointer */
4995 				mlog_write_ulint(page + FIL_PAGE_PREV,
4996 						 FIL_NULL, MLOG_4BYTES, &mtr);
4997 				/* Write a back pointer to the record
4998 				into the otherwise unused area.  This
4999 				information could be useful in
5000 				debugging.  Later, we might want to
5001 				implement the possibility to relocate
5002 				BLOB pages.  Then, we would need to be
5003 				able to adjust the BLOB pointer in the
5004 				record.  We do not store the heap
5005 				number of the record, because it can
5006 				change in page_zip_reorganize() or
5007 				btr_page_reorganize().  However, also
5008 				the page number of the record may
5009 				change when B-tree nodes are split or
5010 				merged. */
5011 				mlog_write_ulint(page
5012 						 + FIL_PAGE_FILE_FLUSH_LSN,
5013 						 space_id,
5014 						 MLOG_4BYTES, &mtr);
5015 				mlog_write_ulint(page
5016 						 + FIL_PAGE_FILE_FLUSH_LSN + 4,
5017 						 rec_page_no,
5018 						 MLOG_4BYTES, &mtr);
5019 
5020 				/* Zero out the unused part of the page. */
5021 				memset(page + page_zip_get_size(page_zip)
5022 				       - c_stream.avail_out,
5023 				       0, c_stream.avail_out);
5024 				mlog_log_string(page + FIL_PAGE_FILE_FLUSH_LSN,
5025 						page_zip_get_size(page_zip)
5026 						- FIL_PAGE_FILE_FLUSH_LSN,
5027 						&mtr);
5028 				/* Copy the page to compressed storage,
5029 				because it will be flushed to disk
5030 				from there. */
5031 				blob_page_zip = buf_block_get_page_zip(block);
5032 				ut_ad(blob_page_zip);
5033 				ut_ad(page_zip_get_size(blob_page_zip)
5034 				      == page_zip_get_size(page_zip));
5035 				memcpy(blob_page_zip->data, page,
5036 				       page_zip_get_size(page_zip));
5037 
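				/* If the stream is unfinished and this is
				not the first BLOB page, the BLOB pointer
				in the record needs no update yet; just
				continue with the next page.  Otherwise
				fall through to update the pointer: its
				location is written while on the first
				page, and the final length once the
				stream has ended. */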
5038 				if (err == Z_OK && prev_page_no != FIL_NULL) {
5039 
5040 					goto next_zip_page;
5041 				}
5042 
5043 				if (alloc_mtr == &mtr) {
5044 					rec_block = buf_page_get(
5045 						space_id, zip_size,
5046 						rec_page_no,
5047 						RW_X_LATCH, &mtr);
5048 					buf_block_dbg_add_level(
5049 						rec_block,
5050 						SYNC_NO_ORDER_CHECK);
5051 				}
5052 
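				/* BTR_EXTERN_LEN is 8 bytes: the high
				4 bytes are unused (BLOBs are smaller
				than 4 GB), and the low 4 bytes hold
				the length of the externally stored
				data.  While the stream is unfinished
				the length is left as 0, which marks
				the BLOB as incomplete. */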
5053 				if (err == Z_STREAM_END) {
5054 					mach_write_to_4(field_ref
5055 							+ BTR_EXTERN_LEN, 0);
5056 					mach_write_to_4(field_ref
5057 							+ BTR_EXTERN_LEN + 4,
5058 							c_stream.total_in);
5059 				} else {
5060 					memset(field_ref + BTR_EXTERN_LEN,
5061 					       0, 8);
5062 				}
5063 
5064 				if (prev_page_no == FIL_NULL) {
5065 					btr_blob_dbg_add_blob(
5066 						rec, big_rec_vec->fields[i]
5067 						.field_no, page_no, index,
5068 						"store");
5069 
5070 					mach_write_to_4(field_ref
5071 							+ BTR_EXTERN_SPACE_ID,
5072 							space_id);
5073 
5074 					mach_write_to_4(field_ref
5075 							+ BTR_EXTERN_PAGE_NO,
5076 							page_no);
5077 
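					/* For compressed BLOBs the offset
					in the BLOB pointer is
					FIL_PAGE_NEXT: the next-page
					pointer is read from there and
					the compressed payload from
					FIL_PAGE_DATA onwards. */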
5078 					mach_write_to_4(field_ref
5079 							+ BTR_EXTERN_OFFSET,
5080 							FIL_PAGE_NEXT);
5081 				}
5082 
5083 				page_zip_write_blob_ptr(
5084 					page_zip, rec, index, offsets,
5085 					big_rec_vec->fields[i].field_no,
5086 					alloc_mtr);
5087 
5088 next_zip_page:
5089 				prev_page_no = page_no;
5090 
5091 				/* Commit mtr and release the
5092 				uncompressed page frame to save memory. */
5093 				btr_blob_free(block, FALSE, &mtr);
5094 
5095 				if (err == Z_STREAM_END) {
5096 					break;
5097 				}
5098 			} else {
5099 				mlog_write_ulint(page + FIL_PAGE_TYPE,
5100 						 FIL_PAGE_TYPE_BLOB,
5101 						 MLOG_2BYTES, &mtr);
5102 
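				/* An uncompressed BLOB page stores, at
				FIL_PAGE_DATA, an 8-byte header: the
				length of the data on this page
				(BTR_BLOB_HDR_PART_LEN) and the next
				BLOB page number or FIL_NULL
				(BTR_BLOB_HDR_NEXT_PAGE_NO).  The BLOB
				bytes follow the header. */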
5103 				if (extern_len > (UNIV_PAGE_SIZE
5104 						  - FIL_PAGE_DATA
5105 						  - BTR_BLOB_HDR_SIZE
5106 						  - FIL_PAGE_DATA_END)) {
5107 					store_len = UNIV_PAGE_SIZE
5108 						- FIL_PAGE_DATA
5109 						- BTR_BLOB_HDR_SIZE
5110 						- FIL_PAGE_DATA_END;
5111 				} else {
5112 					store_len = extern_len;
5113 				}
5114 
5115 				mlog_write_string(page + FIL_PAGE_DATA
5116 						  + BTR_BLOB_HDR_SIZE,
5117 						  (const byte*)
5118 						  big_rec_vec->fields[i].data
5119 						  + big_rec_vec->fields[i].len
5120 						  - extern_len,
5121 						  store_len, &mtr);
5122 				mlog_write_ulint(page + FIL_PAGE_DATA
5123 						 + BTR_BLOB_HDR_PART_LEN,
5124 						 store_len, MLOG_4BYTES, &mtr);
5125 				mlog_write_ulint(page + FIL_PAGE_DATA
5126 						 + BTR_BLOB_HDR_NEXT_PAGE_NO,
5127 						 FIL_NULL, MLOG_4BYTES, &mtr);
5128 
5129 				extern_len -= store_len;
5130 
5131 				if (alloc_mtr == &mtr) {
5132 					rec_block = buf_page_get(
5133 						space_id, zip_size,
5134 						rec_page_no,
5135 						RW_X_LATCH, &mtr);
5136 					buf_block_dbg_add_level(
5137 						rec_block,
5138 						SYNC_NO_ORDER_CHECK);
5139 				}
5140 
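				/* The high 4 bytes of BTR_EXTERN_LEN are
				unused; the low 4 bytes record how many
				bytes of the field have been stored
				externally so far, growing page by page
				until the whole field is written. */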
5141 				mlog_write_ulint(field_ref + BTR_EXTERN_LEN, 0,
5142 						 MLOG_4BYTES, alloc_mtr);
5143 				mlog_write_ulint(field_ref
5144 						 + BTR_EXTERN_LEN + 4,
5145 						 big_rec_vec->fields[i].len
5146 						 - extern_len,
5147 						 MLOG_4BYTES, alloc_mtr);
5148 
5149 				if (prev_page_no == FIL_NULL) {
5150 					btr_blob_dbg_add_blob(
5151 						rec, big_rec_vec->fields[i]
5152 						.field_no, page_no, index,
5153 						"store");
5154 
5155 					mlog_write_ulint(field_ref
5156 							 + BTR_EXTERN_SPACE_ID,
5157 							 space_id, MLOG_4BYTES,
5158 							 alloc_mtr);
5159 
5160 					mlog_write_ulint(field_ref
5161 							 + BTR_EXTERN_PAGE_NO,
5162 							 page_no, MLOG_4BYTES,
5163 							 alloc_mtr);
5164 
5165 					mlog_write_ulint(field_ref
5166 							 + BTR_EXTERN_OFFSET,
5167 							 FIL_PAGE_DATA,
5168 							 MLOG_4BYTES,
5169 							 alloc_mtr);
5170 				}
5171 
5172 				prev_page_no = page_no;
5173 
5174 				mtr_commit(&mtr);
5175 
5176 				if (extern_len == 0) {
5177 					break;
5178 				}
5179 			}
5180 		}
5181 
5182 		DBUG_EXECUTE_IF("btr_store_big_rec_extern",
5183 				error = DB_OUT_OF_FILE_SPACE;
5184 				goto func_exit;);
5185 	}
5186 
5187 func_exit:
5188 	if (page_zip) {
5189 		deflateEnd(&c_stream);
5190 	}
5191 
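	/* Return the pages that were set aside in the allocation loop
	because they had already been freed in btr_mtr: free them back
	to the file space management in btr_mtr. */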
5192 	if (n_freed_pages) {
5193 		ulint	i;
5194 
5195 		ut_ad(alloc_mtr == btr_mtr);
5196 		ut_ad(btr_blob_op_is_update(op));
5197 
5198 		for (i = 0; i < n_freed_pages; i++) {
5199 			btr_page_free_low(index, freed_pages[i], 0, alloc_mtr);
5200 		}
5201 	}
5202 
5203 	if (heap != NULL) {
5204 		mem_heap_free(heap);
5205 	}
5206 
5207 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
5208 	/* All pointers to externally stored columns in the record
5209 	must be valid. */
5210 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
5211 		if (!rec_offs_nth_extern(offsets, i)) {
5212 			continue;
5213 		}
5214 
5215 		field_ref = btr_rec_get_field_ref(rec, offsets, i);
5216 
5217 		/* The pointer must not be zero if the operation
5218 		succeeded. */
5219 		ut_a(0 != memcmp(field_ref, field_ref_zero,
5220 				 BTR_EXTERN_FIELD_REF_SIZE)
5221 		     || error != DB_SUCCESS);
5222 		/* The column must not be disowned by this record. */
5223 		ut_a(!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG));
5224 	}
5225 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
5226 	return(error);
5227 }
5228 
5229 /*******************************************************************//**
5230 Check the FIL_PAGE_TYPE on an uncompressed BLOB page. */
5231 static
5232 void
5233 btr_check_blob_fil_page_type(
5234 /*=========================*/
5235 	ulint		space_id,	/*!< in: space id */
5236 	ulint		page_no,	/*!< in: page number */
5237 	const page_t*	page,		/*!< in: page */
5238 	ibool		read)		/*!< in: TRUE=read, FALSE=purge */
5239 {
5240 	ulint	type = fil_page_get_type(page);
5241 
5242 	ut_a(space_id == page_get_space_id(page));
5243 	ut_a(page_no == page_get_page_no(page));
5244 
5245 	if (UNIV_UNLIKELY(type != FIL_PAGE_TYPE_BLOB)) {
5246 		ulint	flags = fil_space_get_flags(space_id);
5247 
5248 #ifndef UNIV_DEBUG /* Improve debug test coverage */
5249 		if (dict_tf_get_format(flags) == UNIV_FORMAT_A) {
5250 			/* Old versions of InnoDB did not initialize
5251 			FIL_PAGE_TYPE on BLOB pages.  Do not print
5252 			anything about the type mismatch when reading
5253 			a BLOB page that is in Antelope format.*/
5254 			return;
5255 		}
5256 #endif /* !UNIV_DEBUG */
5257 
5258 		ut_print_timestamp(stderr);
5259 		fprintf(stderr,
5260 			"  InnoDB: FIL_PAGE_TYPE=%lu"
5261 			" on BLOB %s space %lu page %lu flags %lx\n",
5262 			(ulong) type, read ? "read" : "purge",
5263 			(ulong) space_id, (ulong) page_no, (ulong) flags);
5264 		ut_error;
5265 	}
5266 }
5267 
5268 /*******************************************************************//**
5269 Frees the space in an externally stored field to the file space
5270 management, if the field in data is owned by the externally stored field.
5271 In a rollback we may have the additional condition that the field must
5272 not be inherited. */
5273 UNIV_INTERN
5274 void
5275 btr_free_externally_stored_field(
5276 /*=============================*/
5277 	dict_index_t*	index,		/*!< in: index of the data, the index
5278 					tree MUST be X-latched; if the tree
5279 					height is 1, then also the root page
5280 					must be X-latched! (this is relevant
5281 					in the case this function is called
5282 					from purge where 'data' is located on
5283 					an undo log page, not an index
5284 					page) */
5285 	byte*		field_ref,	/*!< in/out: field reference */
5286 	const rec_t*	rec,		/*!< in: record containing field_ref, for
5287 					page_zip_write_blob_ptr(), or NULL */
5288 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index),
5289 					or NULL */
5290 	page_zip_des_t*	page_zip,	/*!< in: compressed page corresponding
5291 					to rec, or NULL if rec == NULL */
5292 	ulint		i,		/*!< in: field number of field_ref;
5293 					ignored if rec == NULL */
5294 	enum trx_rb_ctx	rb_ctx,		/*!< in: rollback context */
5295 	mtr_t*		local_mtr MY_ATTRIBUTE((unused))) /*!< in: mtr
5296 					containing the latch to data and an
5297 					X-latch to the index tree */
5298 {
5299 	page_t*		page;
5300 	const ulint	space_id	= mach_read_from_4(
5301 		field_ref + BTR_EXTERN_SPACE_ID);
5302 	const ulint	start_page	= mach_read_from_4(
5303 		field_ref + BTR_EXTERN_PAGE_NO);
5304 	ulint		rec_zip_size = dict_table_zip_size(index->table);
5305 	ulint		ext_zip_size;
5306 	ulint		page_no;
5307 	ulint		next_page_no;
5308 	mtr_t		mtr;
5309 
5310 	ut_ad(dict_index_is_clust(index));
5311 	ut_ad(mtr_memo_contains(local_mtr, dict_index_get_lock(index),
5312 				MTR_MEMO_X_LOCK));
5313 	ut_ad(mtr_memo_contains_page(local_mtr, field_ref,
5314 				     MTR_MEMO_PAGE_X_FIX));
5315 	ut_ad(!rec || rec_offs_validate(rec, index, offsets));
5316 	ut_ad(!rec || field_ref == btr_rec_get_field_ref(rec, offsets, i));
5317 
5318 	if (UNIV_UNLIKELY(!memcmp(field_ref, field_ref_zero,
5319 				  BTR_EXTERN_FIELD_REF_SIZE))) {
5320 		/* In the rollback, we may encounter a clustered index
5321 		record with some unwritten off-page columns. There is
5322 		nothing to free then. */
5323 		ut_a(rb_ctx != RB_NONE);
5324 		return;
5325 	}
5326 
5327 	ut_ad(space_id == index->space);
5328 
5329 	if (UNIV_UNLIKELY(space_id != dict_index_get_space(index))) {
5330 		ext_zip_size = fil_space_get_zip_size(space_id);
5331 		/* This must be an undo log record in the system tablespace,
5332 		that is, in row_purge_upd_exist_or_extern().
5333 		Currently, externally stored records are stored in the
5334 		same tablespace as the referring records. */
5335 		ut_ad(!page_get_space_id(page_align(field_ref)));
5336 		ut_ad(!rec);
5337 		ut_ad(!page_zip);
5338 	} else {
5339 		ext_zip_size = rec_zip_size;
5340 	}
5341 
5342 	if (!rec) {
5343 		/* This is a call from row_purge_upd_exist_or_extern(). */
5344 		ut_ad(!page_zip);
5345 		rec_zip_size = 0;
5346 	}
5347 
5348 #ifdef UNIV_BLOB_DEBUG
5349 	if (!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG)
5350 	    && !((field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_INHERITED_FLAG)
5351 		 && (rb_ctx == RB_NORMAL || rb_ctx == RB_RECOVERY))) {
5352 		/* This off-page column will be freed.
5353 		Check that no references remain. */
5354 
5355 		btr_blob_dbg_t	b;
5356 
5357 		b.blob_page_no = start_page;
5358 
5359 		if (rec) {
5360 			/* Remove the reference from the record to the
5361 			BLOB. If the BLOB were not freed, the
5362 			reference would be removed when the record is
5363 			removed. Freeing the BLOB will overwrite the
5364 			BTR_EXTERN_PAGE_NO in the field_ref of the
5365 			record with FIL_NULL, which would make the
5366 			btr_blob_dbg information inconsistent with the
5367 			record. */
5368 			b.ref_page_no = page_get_page_no(page_align(rec));
5369 			b.ref_heap_no = page_rec_get_heap_no(rec);
5370 			b.ref_field_no = i;
5371 			btr_blob_dbg_rbt_delete(index, &b, "free");
5372 		}
5373 
5374 		btr_blob_dbg_assert_empty(index, b.blob_page_no);
5375 	}
5376 #endif /* UNIV_BLOB_DEBUG */
5377 
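	/* Free the BLOB pages one at a time.  Each iteration frees one
	page and, in the same mini-transaction, advances the BLOB pointer
	to the next page, so that a crash between iterations leaves the
	pointer at the not-yet-freed remainder of the chain. */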
5378 	for (;;) {
5379 #ifdef UNIV_SYNC_DEBUG
5380 		buf_block_t*	rec_block;
5381 #endif /* UNIV_SYNC_DEBUG */
5382 		buf_block_t*	ext_block;
5383 
5384 		mtr_start(&mtr);
5385 
5386 #ifdef UNIV_SYNC_DEBUG
5387 		rec_block =
5388 #endif /* UNIV_SYNC_DEBUG */
5389 		buf_page_get(page_get_space_id(page_align(field_ref)),
5390 			     rec_zip_size,
5391 			     page_get_page_no(page_align(field_ref)),
5392 			     RW_X_LATCH, &mtr);
5393 		buf_block_dbg_add_level(rec_block, SYNC_NO_ORDER_CHECK);
5394 		page_no = mach_read_from_4(field_ref + BTR_EXTERN_PAGE_NO);
5395 
5396 		if (/* There is no external storage data */
5397 		    page_no == FIL_NULL
5398 		    /* This field does not own the externally stored field */
5399 		    || (mach_read_from_1(field_ref + BTR_EXTERN_LEN)
5400 			& BTR_EXTERN_OWNER_FLAG)
5401 		    /* Rollback and inherited field */
5402 		    || ((rb_ctx == RB_NORMAL || rb_ctx == RB_RECOVERY)
5403 			&& (mach_read_from_1(field_ref + BTR_EXTERN_LEN)
5404 			    & BTR_EXTERN_INHERITED_FLAG))) {
5405 
5406 			/* Do not free */
5407 			mtr_commit(&mtr);
5408 
5409 			return;
5410 		}
5411 
5412 		if (page_no == start_page && dict_index_is_online_ddl(index)) {
5413 			row_log_table_blob_free(index, start_page);
5414 		}
5415 
5416 		ext_block = buf_page_get(space_id, ext_zip_size, page_no,
5417 					 RW_X_LATCH, &mtr);
5418 		buf_block_dbg_add_level(ext_block, SYNC_EXTERN_STORAGE);
5419 		page = buf_block_get_frame(ext_block);
5420 
5421 		if (ext_zip_size) {
5422 			/* Note that page_zip will be NULL
5423 			in row_purge_upd_exist_or_extern(). */
5424 			switch (fil_page_get_type(page)) {
5425 			case FIL_PAGE_TYPE_ZBLOB:
5426 			case FIL_PAGE_TYPE_ZBLOB2:
5427 				break;
5428 			default:
5429 				ut_error;
5430 			}
5431 			next_page_no = mach_read_from_4(page + FIL_PAGE_NEXT);
5432 
5433 			btr_page_free_low(index, ext_block, 0, &mtr);
5434 
5435 			if (page_zip != NULL) {
5436 				mach_write_to_4(field_ref + BTR_EXTERN_PAGE_NO,
5437 						next_page_no);
5438 				mach_write_to_4(field_ref + BTR_EXTERN_LEN + 4,
5439 						0);
5440 				page_zip_write_blob_ptr(page_zip, rec, index,
5441 							offsets, i, &mtr);
5442 			} else {
5443 				mlog_write_ulint(field_ref
5444 						 + BTR_EXTERN_PAGE_NO,
5445 						 next_page_no,
5446 						 MLOG_4BYTES, &mtr);
5447 				mlog_write_ulint(field_ref
5448 						 + BTR_EXTERN_LEN + 4, 0,
5449 						 MLOG_4BYTES, &mtr);
5450 			}
5451 		} else {
5452 			ut_a(!page_zip);
5453 			btr_check_blob_fil_page_type(space_id, page_no, page,
5454 						     FALSE);
5455 
5456 			next_page_no = mach_read_from_4(
5457 				page + FIL_PAGE_DATA
5458 				+ BTR_BLOB_HDR_NEXT_PAGE_NO);
5459 
5460 			/* We must supply the page level (= 0) as an argument
5461 			because we did not store it on the page (we save the
5462 			space overhead of an index page header). */
5463 
5464 			btr_page_free_low(index, ext_block, 0, &mtr);
5465 
5466 			mlog_write_ulint(field_ref + BTR_EXTERN_PAGE_NO,
5467 					 next_page_no,
5468 					 MLOG_4BYTES, &mtr);
5469 			/* Zero out the BLOB length.  If the server
5470 			crashes during the execution of this function,
5471 			trx_rollback_or_clean_all_recovered() could
5472 			dereference the half-deleted BLOB, fetching a
5473 			wrong prefix for the BLOB. */
5474 			mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
5475 					 0,
5476 					 MLOG_4BYTES, &mtr);
5477 		}
5478 
5479 		/* Commit mtr and release the BLOB block to save memory. */
5480 		btr_blob_free(ext_block, TRUE, &mtr);
5481 	}
5482 }
5483 
5484 /***********************************************************//**
5485 Frees the externally stored fields for a record. */
5486 static
5487 void
5488 btr_rec_free_externally_stored_fields(
5489 /*==================================*/
5490 	dict_index_t*	index,	/*!< in: index of the data, the index
5491 				tree MUST be X-latched */
5492 	rec_t*		rec,	/*!< in/out: record */
5493 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
5494 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
5495 				part will be updated, or NULL */
5496 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
5497 	mtr_t*		mtr)	/*!< in: mini-transaction handle which contains
5498 				an X-latch to record page and to the index
5499 				tree */
5500 {
5501 	ulint	n_fields;
5502 	ulint	i;
5503 
5504 	ut_ad(rec_offs_validate(rec, index, offsets));
5505 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
5506 	/* Free possible externally stored fields in the record */
5507 
5508 	ut_ad(dict_table_is_comp(index->table) == !!rec_offs_comp(offsets));
5509 	n_fields = rec_offs_n_fields(offsets);
5510 
5511 	for (i = 0; i < n_fields; i++) {
5512 		if (rec_offs_nth_extern(offsets, i)) {
5513 			btr_free_externally_stored_field(
5514 				index, btr_rec_get_field_ref(rec, offsets, i),
5515 				rec, offsets, page_zip, i, rb_ctx, mtr);
5516 		}
5517 	}
5518 }
5519 
5520 /***********************************************************//**
5521 Frees the externally stored fields of a record that are mentioned
5522 in the update vector. */
5523 static
5524 void
5525 btr_rec_free_updated_extern_fields(
5526 /*===============================*/
5527 	dict_index_t*	index,	/*!< in: index of rec; the index tree MUST be
5528 				X-latched */
5529 	rec_t*		rec,	/*!< in/out: record */
5530 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
5531 				part will be updated, or NULL */
5532 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
5533 	const upd_t*	update,	/*!< in: update vector */
5534 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
5535 	mtr_t*		mtr)	/*!< in: mini-transaction handle which contains
5536 				an X-latch to record page and to the tree */
5537 {
5538 	ulint	n_fields;
5539 	ulint	i;
5540 
5541 	ut_ad(rec_offs_validate(rec, index, offsets));
5542 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
5543 
5544 	/* Free possible externally stored fields in the record */
5545 
5546 	n_fields = upd_get_n_fields(update);
5547 
5548 	for (i = 0; i < n_fields; i++) {
5549 		const upd_field_t* ufield = upd_get_nth_field(update, i);
5550 
5551 		if (rec_offs_nth_extern(offsets, ufield->field_no)) {
5552 			ulint	len;
5553 			byte*	data = rec_get_nth_field(
5554 				rec, offsets, ufield->field_no, &len);
5555 			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
5556 
5557 			btr_free_externally_stored_field(
5558 				index, data + len - BTR_EXTERN_FIELD_REF_SIZE,
5559 				rec, offsets, page_zip,
5560 				ufield->field_no, rb_ctx, mtr);
5561 		}
5562 	}
5563 }
5564 
5565 /*******************************************************************//**
5566 Copies the prefix of an uncompressed BLOB.  The clustered index record
5567 that points to this BLOB must be protected by a lock or a page latch.
5568 @return	number of bytes written to buf */
5569 static
5570 ulint
5571 btr_copy_blob_prefix(
5572 /*=================*/
5573 	byte*		buf,	/*!< out: the externally stored part of
5574 				the field, or a prefix of it */
5575 	ulint		len,	/*!< in: length of buf, in bytes */
5576 	ulint		space_id,/*!< in: space id of the BLOB pages */
5577 	ulint		page_no,/*!< in: page number of the first BLOB page */
5578 	ulint		offset)	/*!< in: offset on the first BLOB page */
5579 {
5580 	ulint	copied_len	= 0;
5581 
5582 	for (;;) {
5583 		mtr_t		mtr;
5584 		buf_block_t*	block;
5585 		const page_t*	page;
5586 		const byte*	blob_header;
5587 		ulint		part_len;
5588 		ulint		copy_len;
5589 
5590 		mtr_start(&mtr);
5591 
5592 		block = buf_page_get(space_id, 0, page_no, RW_S_LATCH, &mtr);
5593 		buf_block_dbg_add_level(block, SYNC_EXTERN_STORAGE);
5594 		page = buf_block_get_frame(block);
5595 
5596 		btr_check_blob_fil_page_type(space_id, page_no, page, TRUE);
5597 
5598 		blob_header = page + offset;
5599 		part_len = btr_blob_get_part_len(blob_header);
5600 		copy_len = ut_min(part_len, len - copied_len);
5601 
5602 		memcpy(buf + copied_len,
5603 		       blob_header + BTR_BLOB_HDR_SIZE, copy_len);
5604 		copied_len += copy_len;
5605 
5606 		page_no = btr_blob_get_next_page_no(blob_header);
5607 
5608 		mtr_commit(&mtr);
5609 
5610 		if (page_no == FIL_NULL || copy_len != part_len) {
5611 			UNIV_MEM_ASSERT_RW(buf, copied_len);
5612 			return(copied_len);
5613 		}
5614 
5615 		/* On all BLOB pages except the first, the BLOB header
5616 		is always at the start of the page data: */
5617 
5618 		offset = FIL_PAGE_DATA;
5619 
5620 		ut_ad(copied_len <= len);
5621 	}
5622 }
5623 
5624 /*******************************************************************//**
5625 Copies the prefix of a compressed BLOB.  The clustered index record
5626 that points to this BLOB must be protected by a lock or a page latch.
5627 @return	number of bytes written to buf */
5628 static
5629 ulint
5630 btr_copy_zblob_prefix(
5631 /*==================*/
5632 	byte*		buf,	/*!< out: the externally stored part of
5633 				the field, or a prefix of it */
5634 	ulint		len,	/*!< in: length of buf, in bytes */
5635 	ulint		zip_size,/*!< in: compressed BLOB page size */
5636 	ulint		space_id,/*!< in: space id of the BLOB pages */
5637 	ulint		page_no,/*!< in: page number of the first BLOB page */
5638 	ulint		offset)	/*!< in: offset on the first BLOB page */
5639 {
5640 	ulint		page_type = FIL_PAGE_TYPE_ZBLOB;
5641 	mem_heap_t*	heap;
5642 	int		err;
5643 	z_stream	d_stream;
5644 
5645 	d_stream.next_out = buf;
5646 	d_stream.avail_out = static_cast<uInt>(len);
5647 	d_stream.next_in = Z_NULL;
5648 	d_stream.avail_in = 0;
5649 
5650 	/* Zlib inflate needs 32 kilobytes for the default
5651 	window size, plus a few kilobytes for small objects. */
5652 	heap = mem_heap_create(40000);
5653 	page_zip_set_alloc(&d_stream, heap);
5654 
5655 	ut_ad(ut_is_2pow(zip_size));
5656 	ut_ad(zip_size >= UNIV_ZIP_SIZE_MIN);
5657 	ut_ad(zip_size <= UNIV_ZIP_SIZE_MAX);
5658 	ut_ad(space_id);
5659 
5660 	err = inflateInit(&d_stream);
5661 	ut_a(err == Z_OK);
5662 
5663 	for (;;) {
5664 		buf_page_t*	bpage;
5665 		ulint		next_page_no;
5666 
5667 		/* There is no latch on bpage directly.  Instead,
5668 		bpage is protected by the B-tree page latch that
5669 		is being held on the clustered index record, or,
5670 		in row_merge_copy_blobs(), by an exclusive table lock. */
5671 		bpage = buf_page_get_zip(space_id, zip_size, page_no);
5672 
5673 		if (UNIV_UNLIKELY(!bpage)) {
5674 			ut_print_timestamp(stderr);
5675 			fprintf(stderr,
5676 				"  InnoDB: Cannot load"
5677 				" compressed BLOB"
5678 				" page %lu space %lu\n",
5679 				(ulong) page_no, (ulong) space_id);
5680 			goto func_exit;
5681 		}
5682 
5683 		if (UNIV_UNLIKELY
5684 		    (fil_page_get_type(bpage->zip.data) != page_type)) {
5685 			ut_print_timestamp(stderr);
5686 			fprintf(stderr,
5687 				"  InnoDB: Unexpected type %lu of"
5688 				" compressed BLOB"
5689 				" page %lu space %lu\n",
5690 				(ulong) fil_page_get_type(bpage->zip.data),
5691 				(ulong) page_no, (ulong) space_id);
5692 			ut_ad(0);
5693 			goto end_of_blob;
5694 		}
5695 
5696 		next_page_no = mach_read_from_4(bpage->zip.data + offset);
5697 
5698 		if (UNIV_LIKELY(offset == FIL_PAGE_NEXT)) {
5699 			/* When the BLOB begins at page header,
5700 			the compressed data payload does not
5701 			immediately follow the next page pointer. */
5702 			offset = FIL_PAGE_DATA;
5703 		} else {
5704 			offset += 4;
5705 		}
5706 
5707 		d_stream.next_in = bpage->zip.data + offset;
5708 		d_stream.avail_in = static_cast<uInt>(zip_size - offset);
5709 
5710 		err = inflate(&d_stream, Z_NO_FLUSH);
5711 		switch (err) {
5712 		case Z_OK:
5713 			if (!d_stream.avail_out) {
5714 				goto end_of_blob;
5715 			}
5716 			break;
5717 		case Z_STREAM_END:
5718 			if (next_page_no == FIL_NULL) {
5719 				goto end_of_blob;
5720 			}
5721 			/* fall through */
5722 		default:
5723 inflate_error:
5724 			ut_print_timestamp(stderr);
5725 			fprintf(stderr,
5726 				"  InnoDB: inflate() of"
5727 				" compressed BLOB"
5728 				" page %lu space %lu returned %d (%s)\n",
5729 				(ulong) page_no, (ulong) space_id,
5730 				err, d_stream.msg);
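			/* fall through */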
5731 		case Z_BUF_ERROR:
5732 			goto end_of_blob;
5733 		}
5734 
5735 		if (next_page_no == FIL_NULL) {
5736 			if (!d_stream.avail_in) {
5737 				ut_print_timestamp(stderr);
5738 				fprintf(stderr,
5739 					"  InnoDB: unexpected end of"
5740 					" compressed BLOB"
5741 					" page %lu space %lu\n",
5742 					(ulong) page_no,
5743 					(ulong) space_id);
5744 			} else {
5745 				err = inflate(&d_stream, Z_FINISH);
5746 				switch (err) {
5747 				case Z_STREAM_END:
5748 				case Z_BUF_ERROR:
5749 					break;
5750 				default:
5751 					goto inflate_error;
5752 				}
5753 			}
5754 
5755 end_of_blob:
5756 			buf_page_release_zip(bpage);
5757 			goto func_exit;
5758 		}
5759 
5760 		buf_page_release_zip(bpage);
5761 
5762 		/* On all BLOB pages except the first,
5763 		the BLOB header is always at the page header: */
5764 
5765 		page_no = next_page_no;
5766 		offset = FIL_PAGE_NEXT;
5767 		page_type = FIL_PAGE_TYPE_ZBLOB2;
5768 	}
5769 
5770 func_exit:
5771 	inflateEnd(&d_stream);
5772 	mem_heap_free(heap);
5773 	UNIV_MEM_ASSERT_RW(buf, d_stream.total_out);
5774 	return(d_stream.total_out);
5775 }
5776 
5777 /*******************************************************************//**
5778 Copies the prefix of an externally stored field of a record.  The
5779 clustered index record that points to this BLOB must be protected by a
5780 lock or a page latch.
5781 @return	number of bytes written to buf */
5782 static
5783 ulint
5784 btr_copy_externally_stored_field_prefix_low(
5785 /*========================================*/
5786 	byte*		buf,	/*!< out: the externally stored part of
5787 				the field, or a prefix of it */
5788 	ulint		len,	/*!< in: length of buf, in bytes */
5789 	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
5790 				zero for uncompressed BLOBs */
5791 	ulint		space_id,/*!< in: space id of the first BLOB page */
5792 	ulint		page_no,/*!< in: page number of the first BLOB page */
5793 	ulint		offset)	/*!< in: offset on the first BLOB page */
5794 {
5795 	if (UNIV_UNLIKELY(len == 0)) {
5796 		return(0);
5797 	}
5798 
5799 	if (zip_size) {
5800 		return(btr_copy_zblob_prefix(buf, len, zip_size,
5801 					     space_id, page_no, offset));
5802 	} else {
5803 		return(btr_copy_blob_prefix(buf, len, space_id,
5804 					    page_no, offset));
5805 	}
5806 }
5807 
5808 /*******************************************************************//**
5809 Copies the prefix of an externally stored field of a record.  The
5810 clustered index record must be protected by a lock or a page latch.
5811 @return the length of the copied field, or 0 if the column was being
5812 or has been deleted */
5813 UNIV_INTERN
5814 ulint
5815 btr_copy_externally_stored_field_prefix(
5816 /*====================================*/
5817 	byte*		buf,	/*!< out: the field, or a prefix of it */
5818 	ulint		len,	/*!< in: length of buf, in bytes */
5819 	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
5820 				zero for uncompressed BLOBs */
5821 	const byte*	data,	/*!< in: 'internally' stored part of the
5822 				field containing also the reference to
5823 				the external part; must be protected by
5824 				a lock or a page latch */
5825 	ulint		local_len)/*!< in: length of data, in bytes */
5826 {
5827 	ulint	space_id;
5828 	ulint	page_no;
5829 	ulint	offset;
5830 
5831 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
5832 
5833 	local_len -= BTR_EXTERN_FIELD_REF_SIZE;
5834 
5835 	if (UNIV_UNLIKELY(local_len >= len)) {
5836 		memcpy(buf, data, len);
5837 		return(len);
5838 	}
5839 
5840 	memcpy(buf, data, local_len);
5841 	data += local_len;
5842 
5843 	ut_a(memcmp(data, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));
5844 
5845 	if (!mach_read_from_4(data + BTR_EXTERN_LEN + 4)) {
5846 		/* The externally stored part of the column has been
5847 		(partially) deleted.  Signal the half-deleted BLOB
5848 		to the caller. */
5849 
5850 		return(0);
5851 	}
5852 
5853 	space_id = mach_read_from_4(data + BTR_EXTERN_SPACE_ID);
5854 
5855 	page_no = mach_read_from_4(data + BTR_EXTERN_PAGE_NO);
5856 
5857 	offset = mach_read_from_4(data + BTR_EXTERN_OFFSET);
5858 
5859 	return(local_len
5860 	       + btr_copy_externally_stored_field_prefix_low(buf + local_len,
5861 							     len - local_len,
5862 							     zip_size,
5863 							     space_id, page_no,
5864 							     offset));
5865 }
5866 
5867 /*******************************************************************//**
5868 Copies an externally stored field of a record to mem heap.  The
5869 clustered index record must be protected by a lock or a page latch.
5870 @return	the whole field copied to heap */
5871 UNIV_INTERN
5872 byte*
5873 btr_copy_externally_stored_field(
5874 /*=============================*/
5875 	ulint*		len,	/*!< out: length of the whole field */
5876 	const byte*	data,	/*!< in: 'internally' stored part of the
5877 				field containing also the reference to
5878 				the external part; must be protected by
5879 				a lock or a page latch */
5880 	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
5881 				zero for uncompressed BLOBs */
5882 	ulint		local_len,/*!< in: length of data */
5883 	mem_heap_t*	heap)	/*!< in: mem heap */
5884 {
5885 	ulint	space_id;
5886 	ulint	page_no;
5887 	ulint	offset;
5888 	ulint	extern_len;
5889 	byte*	buf;
5890 
5891 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
5892 
5893 	local_len -= BTR_EXTERN_FIELD_REF_SIZE;
5894 
5895 	space_id = mach_read_from_4(data + local_len + BTR_EXTERN_SPACE_ID);
5896 
5897 	page_no = mach_read_from_4(data + local_len + BTR_EXTERN_PAGE_NO);
5898 
5899 	offset = mach_read_from_4(data + local_len + BTR_EXTERN_OFFSET);
5900 
5901 	/* Currently a BLOB cannot be bigger than 4 GB; we
5902 	leave the 4 upper bytes in the length field unused */
5903 
5904 	extern_len = mach_read_from_4(data + local_len + BTR_EXTERN_LEN + 4);
5905 
5906 	buf = (byte*) mem_heap_alloc(heap, local_len + extern_len);
5907 
5908 	memcpy(buf, data, local_len);
5909 	*len = local_len
5910 		+ btr_copy_externally_stored_field_prefix_low(buf + local_len,
5911 							      extern_len,
5912 							      zip_size,
5913 							      space_id,
5914 							      page_no, offset);
5915 
5916 	return(buf);
5917 }
5918 
5919 /*******************************************************************//**
5920 Copies an externally stored field of a record to mem heap.
5921 @return	the field copied to heap, or NULL if the field is incomplete */
5922 UNIV_INTERN
5923 byte*
5924 btr_rec_copy_externally_stored_field(
5925 /*=================================*/
5926 	const rec_t*	rec,	/*!< in: record in a clustered index;
5927 				must be protected by a lock or a page latch */
5928 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
5929 	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
5930 				zero for uncompressed BLOBs */
5931 	ulint		no,	/*!< in: field number */
5932 	ulint*		len,	/*!< out: length of the field */
5933 	mem_heap_t*	heap)	/*!< in: mem heap */
5934 {
5935 	ulint		local_len;
5936 	const byte*	data;
5937 
5938 	ut_a(rec_offs_nth_extern(offsets, no));
5939 
5940 	/* An externally stored field can contain some initial
5941 	data from the field, and in the last 20 bytes it has the
5942 	space id, page number, and offset where the rest of the
5943 	field data is stored, and the data length in addition to
5944 	the data stored locally. We may need to store some data
5945 	locally to get the local record length above the 128 byte
5946 	limit so that field offsets are stored in two bytes, and
5947 	the extern bit is available in those two bytes. */
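
	/* The 20-byte BLOB pointer (BTR_EXTERN_FIELD_REF_SIZE) consists
	of the space id (4 bytes), the page number of the first BLOB
	page (4 bytes), the byte offset on that page (4 bytes), and the
	length of the externally stored part (8 bytes, of which the high
	4 bytes are currently unused). */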
5948 
5949 	data = rec_get_nth_field(rec, offsets, no, &local_len);
5950 
5951 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
5952 
5953 	if (UNIV_UNLIKELY
5954 	    (!memcmp(data + local_len - BTR_EXTERN_FIELD_REF_SIZE,
5955 		     field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE))) {
5956 		/* The externally stored field was not written yet.
5957 		This record should only be seen by
5958 		recv_recovery_rollback_active() or any
5959 		TRX_ISO_READ_UNCOMMITTED transactions. */
5960 		return(NULL);
5961 	}
5962 
5963 	return(btr_copy_externally_stored_field(len, data,
5964 						zip_size, local_len, heap));
5965 }
5966 #endif /* !UNIV_HOTBACKUP */
5967