1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2008, Google Inc.
5 Copyright (c) 2012, Facebook Inc.
6 
7 Portions of this file contain modifications contributed and copyrighted by
8 Google, Inc. Those modifications are gratefully acknowledged and are described
9 briefly in the InnoDB documentation. The contributions by Google are
10 incorporated with their permission, and subject to the conditions contained in
11 the file COPYING.Google.
12 
13 This program is free software; you can redistribute it and/or modify
14 it under the terms of the GNU General Public License, version 2.0,
15 as published by the Free Software Foundation.
16 
17 This program is also distributed with certain software (including
18 but not limited to OpenSSL) that is licensed under separate terms,
19 as designated in a particular file or component or in included license
20 documentation.  The authors of MySQL hereby grant you an additional
21 permission to link the program and your derivative works with the
22 separately licensed software that they have included with MySQL.
23 
24 This program is distributed in the hope that it will be useful,
25 but WITHOUT ANY WARRANTY; without even the implied warranty of
26 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27 GNU General Public License, version 2.0, for more details.
28 
29 You should have received a copy of the GNU General Public License along with
30 this program; if not, write to the Free Software Foundation, Inc.,
31 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
32 
33 *****************************************************************************/
34 
35 /**************************************************//**
36 @file btr/btr0cur.cc
37 The index tree cursor
38 
39 All changes that row operations make to a B-tree or the records
40 there must go through this module! Undo log records are written here
41 for every modify or insert of a clustered index record.
42 
43 			NOTE!!!
44 To make sure we do not run out of disk space during a pessimistic
45 insert or update, we have to reserve as many pages in the tablespace as
46 2 x the height of the index tree before we start the operation, because
47 if leaf splitting has been started, it is difficult to undo, except
48 by crashing the database and doing a roll-forward.
49 
50 Created 10/16/1994 Heikki Tuuri
51 *******************************************************/
52 
53 #include "btr0cur.h"
54 
55 #ifdef UNIV_NONINL
56 #include "btr0cur.ic"
57 #endif
58 
59 #include "row0upd.h"
60 #ifndef UNIV_HOTBACKUP
61 #include "mtr0log.h"
62 #include "page0page.h"
63 #include "page0zip.h"
64 #include "rem0rec.h"
65 #include "rem0cmp.h"
66 #include "buf0lru.h"
67 #include "btr0btr.h"
68 #include "btr0sea.h"
69 #include "row0log.h"
70 #include "row0purge.h"
71 #include "row0upd.h"
72 #include "trx0rec.h"
73 #include "trx0roll.h" /* trx_is_recv() */
74 #include "que0que.h"
75 #include "row0row.h"
76 #include "srv0srv.h"
77 #include "ibuf0ibuf.h"
78 #include "lock0lock.h"
79 #include "zlib.h"
80 
81 /** Buffered B-tree operation types, introduced as part of delete buffering. */
82 enum btr_op_t {
83 	BTR_NO_OP = 0,			/*!< Not buffered */
84 	BTR_INSERT_OP,			/*!< Insert, do not ignore UNIQUE */
85 	BTR_INSERT_IGNORE_UNIQUE_OP,	/*!< Insert, ignoring UNIQUE */
86 	BTR_DELETE_OP,			/*!< Purge a delete-marked record */
87 	BTR_DELMARK_OP			/*!< Mark a record for deletion */
88 };
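/* Illustrative sketch (not part of the original code): the mapping from
latch_mode flags to btr_op_t is performed in btr_cur_search_to_nth_level().
For example, a caller requesting a buffered secondary index insert that
ignores UNIQUE checks would pass

	BTR_MODIFY_LEAF | BTR_INSERT | BTR_IGNORE_SEC_UNIQUE

in latch_mode, which decodes to btr_op = BTR_INSERT_IGNORE_UNIQUE_OP,
while a plain BTR_MODIFY_LEAF decodes to BTR_NO_OP. */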
89 
90 #ifdef UNIV_DEBUG
91 /** If the following is set to TRUE, this module prints a lot of
92 trace information of individual record operations */
93 UNIV_INTERN ibool	btr_cur_print_record_ops = FALSE;
94 #endif /* UNIV_DEBUG */
95 
96 /** Number of searches down the B-tree in btr_cur_search_to_nth_level(). */
97 UNIV_INTERN ulint	btr_cur_n_non_sea	= 0;
98 /** Number of successful adaptive hash index lookups in
99 btr_cur_search_to_nth_level(). */
100 UNIV_INTERN ulint	btr_cur_n_sea		= 0;
101 /** Old value of btr_cur_n_non_sea.  Copied by
102 srv_refresh_innodb_monitor_stats().  Referenced by
103 srv_printf_innodb_monitor(). */
104 UNIV_INTERN ulint	btr_cur_n_non_sea_old	= 0;
105 /** Old value of btr_cur_n_sea.  Copied by
106 srv_refresh_innodb_monitor_stats().  Referenced by
107 srv_printf_innodb_monitor(). */
108 UNIV_INTERN ulint	btr_cur_n_sea_old	= 0;
109 
110 #ifdef UNIV_DEBUG
111 /* Flag to limit optimistic insert records */
112 UNIV_INTERN uint	btr_cur_limit_optimistic_insert_debug = 0;
113 #endif /* UNIV_DEBUG */
114 
115 /** In an optimistic insert, if the record does not fit but at least this much
116 space can be released by reorganizing the page, then the page is reorganized */
117 #define BTR_CUR_PAGE_REORGANIZE_LIMIT	(UNIV_PAGE_SIZE / 32)
118 
119 /** The structure of a BLOB part header */
120 /* @{ */
121 /*--------------------------------------*/
122 #define BTR_BLOB_HDR_PART_LEN		0	/*!< BLOB part len on this
123 						page */
124 #define BTR_BLOB_HDR_NEXT_PAGE_NO	4	/*!< next BLOB part page no,
125 						FIL_NULL if none */
126 /*--------------------------------------*/
127 #define BTR_BLOB_HDR_SIZE		8	/*!< Size of a BLOB
128 						part header, in bytes */
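/* Example (illustrative only): given a pointer "blob_hdr" to a BLOB part
header on an externally stored page, the fields defined above would be read
with the standard byte-order accessors, e.g.

	part_len     = mach_read_from_4(blob_hdr + BTR_BLOB_HDR_PART_LEN);
	next_page_no = mach_read_from_4(blob_hdr + BTR_BLOB_HDR_NEXT_PAGE_NO);

where next_page_no == FIL_NULL marks the last BLOB part. */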
129 
130 /** Estimates table-level stats from a sampled value.
131 @param value		sampled stats
132 @param index		index being sampled
133 @param sample		number of sampled rows
134 @param ext_size		external stored data size
135 @param not_empty	table not empty
136 @return estimated table-wide stats from the sampled value */
137 #define BTR_TABLE_STATS_FROM_SAMPLE(value, index, sample, ext_size, not_empty)\
138 	(((value) * (ib_int64_t) index->stat_n_leaf_pages		\
139 	  + (sample) - 1 + (ext_size) + (not_empty)) / ((sample) + (ext_size)))
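/* Worked example (hypothetical numbers): if a sample of 20 leaf pages yields
value = 100, with ext_size = 0, not_empty = 1, and the index has
stat_n_leaf_pages = 1000, the table-wide estimate is
(100 * 1000 + 20 - 1 + 0 + 1) / (20 + 0) = 5001. */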
140 
141 /* @} */
142 #endif /* !UNIV_HOTBACKUP */
143 
144 /** A BLOB field reference full of zero, for use in assertions and tests.
145 Initially, BLOB field references are set to zero, in
146 dtuple_convert_big_rec(). */
147 const byte field_ref_zero[BTR_EXTERN_FIELD_REF_SIZE] = {
148 	0, 0, 0, 0, 0,
149 	0, 0, 0, 0, 0,
150 	0, 0, 0, 0, 0,
151 	0, 0, 0, 0, 0,
152 };
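/* Typical use of field_ref_zero (illustrative): checking whether a 20-byte
external field reference is still unset, e.g.

	if (!memcmp(field_ref, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE)) {
		/* the BLOB pointer has not been written yet */
	}
*/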
153 
154 #ifndef UNIV_HOTBACKUP
155 /*******************************************************************//**
156 Marks all extern fields in a record as owned by the record. This function
157 should be called if the delete mark of a record is removed: a record that
158 is not delete-marked always owns all its extern fields. */
159 static
160 void
161 btr_cur_unmark_extern_fields(
162 /*=========================*/
163 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
164 				part will be updated, or NULL */
165 	rec_t*		rec,	/*!< in/out: record in a clustered index */
166 	dict_index_t*	index,	/*!< in: index of the page */
167 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
168 	mtr_t*		mtr);	/*!< in: mtr, or NULL if not logged */
169 /*******************************************************************//**
170 Adds path information to the cursor for the current page, for which
171 the binary search has been performed. */
172 static
173 void
174 btr_cur_add_path_info(
175 /*==================*/
176 	btr_cur_t*	cursor,		/*!< in: cursor positioned on a page */
177 	ulint		height,		/*!< in: height of the page in tree;
178 					0 means leaf node */
179 	ulint		root_height);	/*!< in: root node height in tree */
180 /***********************************************************//**
181 Frees the externally stored fields for a record, if the field is mentioned
182 in the update vector. */
183 static
184 void
185 btr_rec_free_updated_extern_fields(
186 /*===============================*/
187 	dict_index_t*	index,	/*!< in: index of rec; the index tree MUST be
188 				X-latched */
189 	rec_t*		rec,	/*!< in: record */
190 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
191 				part will be updated, or NULL */
192 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
193 	const upd_t*	update,	/*!< in: update vector */
194 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
195 	mtr_t*		mtr);	/*!< in: mini-transaction handle which contains
196 				an X-latch to record page and to the tree */
197 /***********************************************************//**
198 Frees the externally stored fields for a record. */
199 static
200 void
201 btr_rec_free_externally_stored_fields(
202 /*==================================*/
203 	dict_index_t*	index,	/*!< in: index of the data, the index
204 				tree MUST be X-latched */
205 	rec_t*		rec,	/*!< in: record */
206 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
207 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
208 				part will be updated, or NULL */
209 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
210 	mtr_t*		mtr);	/*!< in: mini-transaction handle which contains
211 				an X-latch to record page and to the index
212 				tree */
213 #endif /* !UNIV_HOTBACKUP */
214 
215 /******************************************************//**
216 The following function is used to set the deleted bit of a record. */
217 UNIV_INLINE
218 void
219 btr_rec_set_deleted_flag(
220 /*=====================*/
221 	rec_t*		rec,	/*!< in/out: physical record */
222 	page_zip_des_t*	page_zip,/*!< in/out: compressed page (or NULL) */
223 	ulint		flag)	/*!< in: nonzero if delete marked */
224 {
225 	if (page_rec_is_comp(rec)) {
226 		rec_set_deleted_flag_new(rec, page_zip, flag);
227 	} else {
228 		ut_ad(!page_zip);
229 		rec_set_deleted_flag_old(rec, flag);
230 	}
231 }
232 
233 #ifndef UNIV_HOTBACKUP
234 /*==================== B-TREE SEARCH =========================*/
235 
236 /********************************************************************//**
237 Latches the leaf page or pages requested. */
238 static
239 void
240 btr_cur_latch_leaves(
241 /*=================*/
242 	page_t*		page,		/*!< in: leaf page where the search
243 					converged */
244 	ulint		space,		/*!< in: space id */
245 	ulint		zip_size,	/*!< in: compressed page size in bytes
246 					or 0 for uncompressed pages */
247 	ulint		page_no,	/*!< in: page number of the leaf */
248 	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
249 	btr_cur_t*	cursor,		/*!< in: cursor */
250 	mtr_t*		mtr)		/*!< in: mtr */
251 {
252 	ulint		mode;
253 	ulint		left_page_no;
254 	ulint		right_page_no;
255 	buf_block_t*	get_block;
256 
257 	ut_ad(page && mtr);
258 
259 	switch (latch_mode) {
260 	case BTR_SEARCH_LEAF:
261 	case BTR_MODIFY_LEAF:
262 		mode = latch_mode == BTR_SEARCH_LEAF ? RW_S_LATCH : RW_X_LATCH;
263 		get_block = btr_block_get(
264 			space, zip_size, page_no, mode, cursor->index, mtr);
265 #ifdef UNIV_BTR_DEBUG
266 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
267 #endif /* UNIV_BTR_DEBUG */
268 		get_block->check_index_page_at_flush = TRUE;
269 		return;
270 	case BTR_MODIFY_TREE:
271 		/* x-latch also brothers from left to right */
272 		left_page_no = btr_page_get_prev(page, mtr);
273 
274 		if (left_page_no != FIL_NULL) {
275 			get_block = btr_block_get(
276 				space, zip_size, left_page_no,
277 				RW_X_LATCH, cursor->index, mtr);
278 #ifdef UNIV_BTR_DEBUG
279 			ut_a(page_is_comp(get_block->frame)
280 			     == page_is_comp(page));
281 			ut_a(btr_page_get_next(get_block->frame, mtr)
282 			     == page_get_page_no(page));
283 #endif /* UNIV_BTR_DEBUG */
284 			get_block->check_index_page_at_flush = TRUE;
285 		}
286 
287 		get_block = btr_block_get(
288 			space, zip_size, page_no,
289 			RW_X_LATCH, cursor->index, mtr);
290 #ifdef UNIV_BTR_DEBUG
291 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
292 #endif /* UNIV_BTR_DEBUG */
293 		get_block->check_index_page_at_flush = TRUE;
294 
295 		right_page_no = btr_page_get_next(page, mtr);
296 
297 		if (right_page_no != FIL_NULL) {
298 			get_block = btr_block_get(
299 				space, zip_size, right_page_no,
300 				RW_X_LATCH, cursor->index, mtr);
301 #ifdef UNIV_BTR_DEBUG
302 			ut_a(page_is_comp(get_block->frame)
303 			     == page_is_comp(page));
304 			ut_a(btr_page_get_prev(get_block->frame, mtr)
305 			     == page_get_page_no(page));
306 #endif /* UNIV_BTR_DEBUG */
307 			get_block->check_index_page_at_flush = TRUE;
308 		}
309 
310 		return;
311 
312 	case BTR_SEARCH_PREV:
313 	case BTR_MODIFY_PREV:
314 		mode = latch_mode == BTR_SEARCH_PREV ? RW_S_LATCH : RW_X_LATCH;
315 		/* latch also left brother */
316 		left_page_no = btr_page_get_prev(page, mtr);
317 
318 		if (left_page_no != FIL_NULL) {
319 			get_block = btr_block_get(
320 				space, zip_size,
321 				left_page_no, mode, cursor->index, mtr);
322 			cursor->left_block = get_block;
323 #ifdef UNIV_BTR_DEBUG
324 			ut_a(page_is_comp(get_block->frame)
325 			     == page_is_comp(page));
326 			ut_a(btr_page_get_next(get_block->frame, mtr)
327 			     == page_get_page_no(page));
328 #endif /* UNIV_BTR_DEBUG */
329 			get_block->check_index_page_at_flush = TRUE;
330 		}
331 
332 		get_block = btr_block_get(
333 			space, zip_size, page_no, mode, cursor->index, mtr);
334 #ifdef UNIV_BTR_DEBUG
335 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
336 #endif /* UNIV_BTR_DEBUG */
337 		get_block->check_index_page_at_flush = TRUE;
338 		return;
339 	}
340 
341 	ut_error;
342 }
343 
344 /********************************************************************//**
345 Searches an index tree and positions a tree cursor on a given level.
346 NOTE: n_fields_cmp in tuple must be set so that it cannot be compared
347 to node pointer page number fields on the upper levels of the tree!
348 Note that if mode is PAGE_CUR_LE, which is used in inserts, then
349 cursor->up_match and cursor->low_match both will have sensible values.
350 If mode is PAGE_CUR_GE, then up_match will have a sensible value.
351 
352 If mode is PAGE_CUR_LE, cursor is left at the place where an insert of the
353 search tuple should be performed in the B-tree. InnoDB does an insert
354 immediately after the cursor. Thus, the cursor may end up on a user record,
355 or on a page infimum record. */
356 UNIV_INTERN
357 void
358 btr_cur_search_to_nth_level(
359 /*========================*/
360 	dict_index_t*	index,	/*!< in: index */
361 	ulint		level,	/*!< in: the tree level of search */
362 	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
363 				tuple must be set so that it cannot get
364 				compared to the node ptr page number field! */
365 	ulint		mode,	/*!< in: PAGE_CUR_L, ...;
366 				Inserts should always be made using
367 				PAGE_CUR_LE to search the position! */
368 	ulint		latch_mode, /*!< in: BTR_SEARCH_LEAF, ..., ORed with
369 				at most one of BTR_INSERT, BTR_DELETE_MARK,
370 				BTR_DELETE, or BTR_ESTIMATE;
371 				cursor->left_block is used to store a pointer
372 				to the left neighbor page, in the cases
373 				BTR_SEARCH_PREV and BTR_MODIFY_PREV;
374 				NOTE that if has_search_latch
375 				is != 0, we may not have a latch set
376 				on the cursor page; we assume
377 				the caller uses its search latch
378 				to protect the record! */
379 	btr_cur_t*	cursor, /*!< in/out: tree cursor; the cursor page is
380 				s- or x-latched, but see also above! */
381 	ulint		has_search_latch,/*!< in: info on the latch mode the
382 				caller currently has on btr_search_latch:
383 				RW_S_LATCH, or 0 */
384 	const char*	file,	/*!< in: file name */
385 	ulint		line,	/*!< in: line where called */
386 	mtr_t*		mtr)	/*!< in: mtr */
387 {
388 	page_t*		page;
389 	buf_block_t*	block;
390 	ulint		space;
391 	buf_block_t*	guess;
392 	ulint		height;
393 	ulint		page_no;
394 	ulint		up_match;
395 	ulint		up_bytes;
396 	ulint		low_match;
397 	ulint		low_bytes;
398 	ulint		savepoint;
399 	ulint		rw_latch;
400 	ulint		page_mode;
401 	ulint		buf_mode;
402 	ulint		estimate;
403 	ulint		zip_size;
404 	page_cur_t*	page_cursor;
405 	btr_op_t	btr_op;
406 	ulint		root_height = 0; /* remove warning */
407 
408 #ifdef BTR_CUR_ADAPT
409 	btr_search_t*	info;
410 #endif
411 	mem_heap_t*	heap		= NULL;
412 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
413 	ulint*		offsets		= offsets_;
414 	rec_offs_init(offsets_);
415 	/* Currently, PAGE_CUR_LE is the only search mode used for searches
416 	ending on the upper levels */
417 
418 	ut_ad(level == 0 || mode == PAGE_CUR_LE);
419 	ut_ad(dict_index_check_search_tuple(index, tuple));
420 	ut_ad(!dict_index_is_ibuf(index) || ibuf_inside(mtr));
421 	ut_ad(dtuple_check_typed(tuple));
422 	ut_ad(!(index->type & DICT_FTS));
423 	ut_ad(index->page != FIL_NULL);
424 
425 	UNIV_MEM_INVALID(&cursor->up_match, sizeof cursor->up_match);
426 	UNIV_MEM_INVALID(&cursor->up_bytes, sizeof cursor->up_bytes);
427 	UNIV_MEM_INVALID(&cursor->low_match, sizeof cursor->low_match);
428 	UNIV_MEM_INVALID(&cursor->low_bytes, sizeof cursor->low_bytes);
429 #ifdef UNIV_DEBUG
430 	cursor->up_match = ULINT_UNDEFINED;
431 	cursor->low_match = ULINT_UNDEFINED;
432 #endif
433 
434 	ibool	s_latch_by_caller;
435 
436 	s_latch_by_caller = latch_mode & BTR_ALREADY_S_LATCHED;
437 
438 	ut_ad(!s_latch_by_caller
439 	      || mtr_memo_contains(mtr, dict_index_get_lock(index),
440 				   MTR_MEMO_S_LOCK));
441 
442 	/* These flags are mutually exclusive, they are lumped together
443 	with the latch mode for historical reasons. It's possible for
444 	none of the flags to be set. */
445 	switch (UNIV_EXPECT(latch_mode
446 			    & (BTR_INSERT | BTR_DELETE | BTR_DELETE_MARK),
447 			    0)) {
448 	case 0:
449 		btr_op = BTR_NO_OP;
450 		break;
451 	case BTR_INSERT:
452 		btr_op = (latch_mode & BTR_IGNORE_SEC_UNIQUE)
453 			? BTR_INSERT_IGNORE_UNIQUE_OP
454 			: BTR_INSERT_OP;
455 		break;
456 	case BTR_DELETE:
457 		btr_op = BTR_DELETE_OP;
458 		ut_a(cursor->purge_node);
459 		break;
460 	case BTR_DELETE_MARK:
461 		btr_op = BTR_DELMARK_OP;
462 		break;
463 	default:
464 		/* only one of BTR_INSERT, BTR_DELETE, BTR_DELETE_MARK
465 		should be specified at a time */
466 		ut_error;
467 	}
468 
469 	/* Operations on the insert buffer tree cannot be buffered. */
470 	ut_ad(btr_op == BTR_NO_OP || !dict_index_is_ibuf(index));
471 	/* Operations on the clustered index cannot be buffered. */
472 	ut_ad(btr_op == BTR_NO_OP || !dict_index_is_clust(index));
473 
474 	estimate = latch_mode & BTR_ESTIMATE;
475 
476 	/* Turn the flags unrelated to the latch mode off. */
477 	latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
478 
479 	ut_ad(!s_latch_by_caller
480 	      || latch_mode == BTR_SEARCH_LEAF
481 	      || latch_mode == BTR_MODIFY_LEAF);
482 
483 	cursor->flag = BTR_CUR_BINARY;
484 	cursor->index = index;
485 
486 #ifndef BTR_CUR_ADAPT
487 	guess = NULL;
488 #else
489 	info = btr_search_get_info(index);
490 
491 	guess = info->root_guess;
492 
493 #ifdef BTR_CUR_HASH_ADAPT
494 
495 # ifdef UNIV_SEARCH_PERF_STAT
496 	info->n_searches++;
497 # endif
498 	if (rw_lock_get_writer(&btr_search_latch) == RW_LOCK_NOT_LOCKED
499 	    && latch_mode <= BTR_MODIFY_LEAF
500 	    && info->last_hash_succ
501 	    && !estimate
502 # ifdef PAGE_CUR_LE_OR_EXTENDS
503 	    && mode != PAGE_CUR_LE_OR_EXTENDS
504 # endif /* PAGE_CUR_LE_OR_EXTENDS */
505 	    /* If !has_search_latch, we do a dirty read of
506 	    btr_search_enabled below, and btr_search_guess_on_hash()
507 	    will have to check it again. */
508 	    && UNIV_LIKELY(btr_search_enabled)
509 	    && btr_search_guess_on_hash(index, info, tuple, mode,
510 					latch_mode, cursor,
511 					has_search_latch, mtr)) {
512 
513 		/* Search using the hash index succeeded */
514 
515 		ut_ad(cursor->up_match != ULINT_UNDEFINED
516 		      || mode != PAGE_CUR_GE);
517 		ut_ad(cursor->up_match != ULINT_UNDEFINED
518 		      || mode != PAGE_CUR_LE);
519 		ut_ad(cursor->low_match != ULINT_UNDEFINED
520 		      || mode != PAGE_CUR_LE);
521 		btr_cur_n_sea++;
522 
523 		return;
524 	}
525 # endif /* BTR_CUR_HASH_ADAPT */
526 #endif /* BTR_CUR_ADAPT */
527 	btr_cur_n_non_sea++;
528 
529 	/* If the hash search did not succeed, do binary search down the
530 	tree */
531 
532 	if (has_search_latch) {
533 		/* Release possible search latch to obey latching order */
534 		rw_lock_s_unlock(&btr_search_latch);
535 	}
536 
537 	/* Store the position of the tree latch we push to mtr so that we
538 	know how to release it when we have latched leaf node(s) */
539 
540 	savepoint = mtr_set_savepoint(mtr);
541 
542 	switch (latch_mode) {
543 	case BTR_MODIFY_TREE:
544 		mtr_x_lock(dict_index_get_lock(index), mtr);
545 		break;
546 	case BTR_CONT_MODIFY_TREE:
547 		/* Do nothing */
548 		ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
549 					MTR_MEMO_X_LOCK));
550 		break;
551 	default:
552 		if (!s_latch_by_caller) {
553 			mtr_s_lock(dict_index_get_lock(index), mtr);
554 		}
555 	}
556 
557 	page_cursor = btr_cur_get_page_cur(cursor);
558 
559 	space = dict_index_get_space(index);
560 	page_no = dict_index_get_page(index);
561 
562 	up_match = 0;
563 	up_bytes = 0;
564 	low_match = 0;
565 	low_bytes = 0;
566 
567 	height = ULINT_UNDEFINED;
568 
569 	/* We use these modified search modes on non-leaf levels of the
570 	B-tree. These let us end up in the right B-tree leaf. In that leaf
571 	we use the original search mode. */
572 
573 	switch (mode) {
574 	case PAGE_CUR_GE:
575 		page_mode = PAGE_CUR_L;
576 		break;
577 	case PAGE_CUR_G:
578 		page_mode = PAGE_CUR_LE;
579 		break;
580 	default:
581 #ifdef PAGE_CUR_LE_OR_EXTENDS
582 		ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
583 		      || mode == PAGE_CUR_LE_OR_EXTENDS);
584 #else /* PAGE_CUR_LE_OR_EXTENDS */
585 		ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE);
586 #endif /* PAGE_CUR_LE_OR_EXTENDS */
587 		page_mode = mode;
588 		break;
589 	}
590 
591 	/* Loop and search until we arrive at the desired level */
592 
593 search_loop:
594 	buf_mode = BUF_GET;
595 	rw_latch = RW_NO_LATCH;
596 
597 	if (height != 0) {
598 		/* We are about to fetch the root or a non-leaf page. */
599 	} else if (latch_mode <= BTR_MODIFY_LEAF) {
600 		rw_latch = latch_mode;
601 
602 		if (btr_op != BTR_NO_OP
603 		    && ibuf_should_try(index, btr_op != BTR_INSERT_OP)) {
604 
605 			/* Try to buffer the operation if the leaf
606 			page is not in the buffer pool. */
607 
608 			buf_mode = btr_op == BTR_DELETE_OP
609 				? BUF_GET_IF_IN_POOL_OR_WATCH
610 				: BUF_GET_IF_IN_POOL;
611 		}
612 	}
613 
614 	zip_size = dict_table_zip_size(index->table);
615 
616 retry_page_get:
617 	block = buf_page_get_gen(
618 		space, zip_size, page_no, rw_latch, guess, buf_mode,
619 		file, line, mtr);
620 
621 	if (block == NULL) {
622 		/* This must be a search to perform an insert/delete
623 		mark/delete; try using the insert/delete buffer */
624 
625 		ut_ad(height == 0);
626 		ut_ad(cursor->thr);
627 
628 		switch (btr_op) {
629 		case BTR_INSERT_OP:
630 		case BTR_INSERT_IGNORE_UNIQUE_OP:
631 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL);
632 
633 			if (ibuf_insert(IBUF_OP_INSERT, tuple, index,
634 					space, zip_size, page_no,
635 					cursor->thr)) {
636 
637 				cursor->flag = BTR_CUR_INSERT_TO_IBUF;
638 
639 				goto func_exit;
640 			}
641 			break;
642 
643 		case BTR_DELMARK_OP:
644 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL);
645 
646 			if (ibuf_insert(IBUF_OP_DELETE_MARK, tuple,
647 					index, space, zip_size,
648 					page_no, cursor->thr)) {
649 
650 				cursor->flag = BTR_CUR_DEL_MARK_IBUF;
651 
652 				goto func_exit;
653 			}
654 
655 			break;
656 
657 		case BTR_DELETE_OP:
658 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL_OR_WATCH);
659 
660 			if (!row_purge_poss_sec(cursor->purge_node,
661 						index, tuple)) {
662 
663 				/* The record cannot be purged yet. */
664 				cursor->flag = BTR_CUR_DELETE_REF;
665 			} else if (ibuf_insert(IBUF_OP_DELETE, tuple,
666 					       index, space, zip_size,
667 					       page_no,
668 					       cursor->thr)) {
669 
670 				/* The purge was buffered. */
671 				cursor->flag = BTR_CUR_DELETE_IBUF;
672 			} else {
673 				/* The purge could not be buffered. */
674 				buf_pool_watch_unset(space, page_no);
675 				break;
676 			}
677 
678 			buf_pool_watch_unset(space, page_no);
679 			goto func_exit;
680 
681 		default:
682 			ut_error;
683 		}
684 
685 		/* Insert to the insert/delete buffer did not succeed, we
686 		must read the page from disk. */
687 
688 		buf_mode = BUF_GET;
689 
690 		goto retry_page_get;
691 	}
692 
693 	block->check_index_page_at_flush = TRUE;
694 	page = buf_block_get_frame(block);
695 
696 	if (rw_latch != RW_NO_LATCH) {
697 #ifdef UNIV_ZIP_DEBUG
698 		const page_zip_des_t*	page_zip
699 			= buf_block_get_page_zip(block);
700 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
701 #endif /* UNIV_ZIP_DEBUG */
702 
703 		buf_block_dbg_add_level(
704 			block, dict_index_is_ibuf(index)
705 			? SYNC_IBUF_TREE_NODE : SYNC_TREE_NODE);
706 	}
707 
708 	ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
709 	ut_ad(index->id == btr_page_get_index_id(page));
710 
711 	if (UNIV_UNLIKELY(height == ULINT_UNDEFINED)) {
712 		/* We are in the root node */
713 
714 		height = btr_page_get_level(page, mtr);
715 		root_height = height;
716 		cursor->tree_height = root_height + 1;
717 
718 #ifdef BTR_CUR_ADAPT
719 		if (block != guess) {
720 			info->root_guess = block;
721 		}
722 #endif
723 	}
724 
725 	if (height == 0) {
726 		if (rw_latch == RW_NO_LATCH) {
727 
728 			btr_cur_latch_leaves(
729 				page, space, zip_size, page_no, latch_mode,
730 				cursor, mtr);
731 		}
732 
733 		switch (latch_mode) {
734 		case BTR_MODIFY_TREE:
735 		case BTR_CONT_MODIFY_TREE:
736 			break;
737 		default:
738 			if (!s_latch_by_caller) {
739 				/* Release the tree s-latch */
740 				mtr_release_s_latch_at_savepoint(
741 					mtr, savepoint,
742 					dict_index_get_lock(index));
743 			}
744 		}
745 
746 		page_mode = mode;
747 	}
748 
749 	page_cur_search_with_match(
750 		block, index, tuple, page_mode, &up_match, &up_bytes,
751 		&low_match, &low_bytes, page_cursor);
752 
753 	if (estimate) {
754 		btr_cur_add_path_info(cursor, height, root_height);
755 	}
756 
757 	/* If this is the desired level, leave the loop */
758 
759 	ut_ad(height == btr_page_get_level(page_cur_get_page(page_cursor),
760 					   mtr));
761 
762 	if (level != height) {
763 
764 		const rec_t*	node_ptr;
765 		ut_ad(height > 0);
766 
767 		height--;
768 		guess = NULL;
769 
770 		node_ptr = page_cur_get_rec(page_cursor);
771 
772 		offsets = rec_get_offsets(
773 			node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
774 
775 		/* Go to the child node */
776 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
777 
778 		if (UNIV_UNLIKELY(height == 0 && dict_index_is_ibuf(index))) {
779 			/* We're doing a search on an ibuf tree and we're one
780 			level above the leaf page. */
781 
782 			ut_ad(level == 0);
783 
784 			buf_mode = BUF_GET;
785 			rw_latch = RW_NO_LATCH;
786 			goto retry_page_get;
787 		}
788 
789 		goto search_loop;
790 	}
791 
792 	if (level != 0) {
793 		/* x-latch the page */
794 		buf_block_t*	child_block = btr_block_get(
795 			space, zip_size, page_no, RW_X_LATCH, index, mtr);
796 
797 		page = buf_block_get_frame(child_block);
798 		btr_assert_not_corrupted(child_block, index);
799 	} else {
800 		cursor->low_match = low_match;
801 		cursor->low_bytes = low_bytes;
802 		cursor->up_match = up_match;
803 		cursor->up_bytes = up_bytes;
804 
805 #ifdef BTR_CUR_ADAPT
806 		/* We do a dirty read of btr_search_enabled here.  We
807 		will properly check btr_search_enabled again in
808 		btr_search_build_page_hash_index() before building a
809 		page hash index, while holding btr_search_latch. */
810 		if (btr_search_enabled) {
811 			btr_search_info_update(index, cursor);
812 		}
813 #endif
814 		ut_ad(cursor->up_match != ULINT_UNDEFINED
815 		      || mode != PAGE_CUR_GE);
816 		ut_ad(cursor->up_match != ULINT_UNDEFINED
817 		      || mode != PAGE_CUR_LE);
818 		ut_ad(cursor->low_match != ULINT_UNDEFINED
819 		      || mode != PAGE_CUR_LE);
820 	}
821 
822 func_exit:
823 
824 	if (UNIV_LIKELY_NULL(heap)) {
825 		mem_heap_free(heap);
826 	}
827 
828 	if (has_search_latch) {
829 
830 		rw_lock_s_lock(&btr_search_latch);
831 	}
832 }
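/* Usage sketch (illustrative only, not a complete caller): positioning a
cursor for an insert on the leaf level typically looks like

	mtr_start(&mtr);
	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    BTR_MODIFY_LEAF, &cursor, 0,
				    __FILE__, __LINE__, &mtr);
	rec = btr_cur_get_rec(&cursor);
	...
	mtr_commit(&mtr);

i.e. level 0 (leaf), PAGE_CUR_LE so that the insert goes immediately after
the cursor, and has_search_latch == 0 because the caller holds no adaptive
hash index latch. */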
833 
834 /*****************************************************************//**
835 Opens a cursor at either end of an index. */
836 UNIV_INTERN
837 void
838 btr_cur_open_at_index_side_func(
839 /*============================*/
840 	bool		from_left,	/*!< in: true if open to the low end,
841 					false if to the high end */
842 	dict_index_t*	index,		/*!< in: index */
843 	ulint		latch_mode,	/*!< in: latch mode */
844 	btr_cur_t*	cursor,		/*!< in/out: cursor */
845 	ulint		level,		/*!< in: level to search for
846 					(0=leaf). */
847 	const char*	file,		/*!< in: file name */
848 	ulint		line,		/*!< in: line where called */
849 	mtr_t*		mtr)		/*!< in/out: mini-transaction */
850 {
851 	page_cur_t*	page_cursor;
852 	ulint		page_no;
853 	ulint		space;
854 	ulint		zip_size;
855 	ulint		height;
856 	ulint		root_height = 0; /* remove warning */
857 	rec_t*		node_ptr;
858 	ulint		estimate;
859 	ulint		savepoint;
860 	mem_heap_t*	heap		= NULL;
861 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
862 	ulint*		offsets		= offsets_;
863 	rec_offs_init(offsets_);
864 
865 	estimate = latch_mode & BTR_ESTIMATE;
866 	latch_mode &= ~BTR_ESTIMATE;
867 
868 	ut_ad(level != ULINT_UNDEFINED);
869 
870 	/* Store the position of the tree latch we push to mtr so that we
871 	know how to release it when we have latched the leaf node */
872 
873 	savepoint = mtr_set_savepoint(mtr);
874 
875 	switch (latch_mode) {
876 	case BTR_CONT_MODIFY_TREE:
877 		break;
878 	case BTR_MODIFY_TREE:
879 		mtr_x_lock(dict_index_get_lock(index), mtr);
880 		break;
881 	case BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED:
882 	case BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED:
883 		ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
884 					MTR_MEMO_S_LOCK));
885 		break;
886 	default:
887 		mtr_s_lock(dict_index_get_lock(index), mtr);
888 	}
889 
890 	page_cursor = btr_cur_get_page_cur(cursor);
891 	cursor->index = index;
892 
893 	space = dict_index_get_space(index);
894 	zip_size = dict_table_zip_size(index->table);
895 	page_no = dict_index_get_page(index);
896 
897 	height = ULINT_UNDEFINED;
898 
899 	for (;;) {
900 		buf_block_t*	block;
901 		page_t*		page;
902 		block = buf_page_get_gen(space, zip_size, page_no,
903 					 RW_NO_LATCH, NULL, BUF_GET,
904 					 file, line, mtr);
905 		page = buf_block_get_frame(block);
906 		ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
907 		ut_ad(index->id == btr_page_get_index_id(page));
908 
909 		block->check_index_page_at_flush = TRUE;
910 
911 		if (height == ULINT_UNDEFINED) {
912 			/* We are in the root node */
913 
914 			height = btr_page_get_level(page, mtr);
915 			root_height = height;
916 			ut_a(height >= level);
917 		} else {
918 			/* TODO: flag the index corrupted if this fails */
919 			ut_ad(height == btr_page_get_level(page, mtr));
920 		}
921 
922 		if (height == level) {
923 			btr_cur_latch_leaves(
924 				page, space, zip_size, page_no,
925 				latch_mode & ~BTR_ALREADY_S_LATCHED,
926 				cursor, mtr);
927 
928 			if (height == 0) {
929 				/* In versions <= 3.23.52 we had
930 				forgotten to release the tree latch
931 				here. If in an index scan we had to
932 				scan far to find a record visible to
933 				the current transaction, that could
934 				starve others waiting for the tree
935 				latch. */
936 
937 				switch (latch_mode) {
938 				case BTR_MODIFY_TREE:
939 				case BTR_CONT_MODIFY_TREE:
940 				case BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED:
941 				case BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED:
942 					break;
943 				default:
944 					/* Release the tree s-latch */
945 
946 					mtr_release_s_latch_at_savepoint(
947 						mtr, savepoint,
948 						dict_index_get_lock(index));
949 				}
950 			}
951 		}
952 
953 		if (from_left) {
954 			page_cur_set_before_first(block, page_cursor);
955 		} else {
956 			page_cur_set_after_last(block, page_cursor);
957 		}
958 
959 		if (height == level) {
960 			if (estimate) {
961 				btr_cur_add_path_info(cursor, height,
962 						      root_height);
963 			}
964 
965 			break;
966 		}
967 
968 		ut_ad(height > 0);
969 
970 		if (from_left) {
971 			page_cur_move_to_next(page_cursor);
972 		} else {
973 			page_cur_move_to_prev(page_cursor);
974 		}
975 
976 		if (estimate) {
977 			btr_cur_add_path_info(cursor, height, root_height);
978 		}
979 
980 		height--;
981 
982 		node_ptr = page_cur_get_rec(page_cursor);
983 		offsets = rec_get_offsets(node_ptr, cursor->index, offsets,
984 					  ULINT_UNDEFINED, &heap);
985 		/* Go to the child node */
986 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
987 	}
988 
989 	if (heap) {
990 		mem_heap_free(heap);
991 	}
992 }
993 
994 /**********************************************************************//**
995 Positions a cursor at a randomly chosen position within a B-tree. */
996 UNIV_INTERN
997 void
998 btr_cur_open_at_rnd_pos_func(
999 /*=========================*/
1000 	dict_index_t*	index,		/*!< in: index */
1001 	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
1002 	btr_cur_t*	cursor,		/*!< in/out: B-tree cursor */
1003 	const char*	file,		/*!< in: file name */
1004 	ulint		line,		/*!< in: line where called */
1005 	mtr_t*		mtr)		/*!< in: mtr */
1006 {
1007 	page_cur_t*	page_cursor;
1008 	ulint		page_no;
1009 	ulint		space;
1010 	ulint		zip_size;
1011 	ulint		height;
1012 	rec_t*		node_ptr;
1013 	mem_heap_t*	heap		= NULL;
1014 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1015 	ulint*		offsets		= offsets_;
1016 	rec_offs_init(offsets_);
1017 
1018 	switch (latch_mode) {
1019 	case BTR_MODIFY_TREE:
1020 		mtr_x_lock(dict_index_get_lock(index), mtr);
1021 		break;
1022 	default:
1023 		ut_ad(latch_mode != BTR_CONT_MODIFY_TREE);
1024 		mtr_s_lock(dict_index_get_lock(index), mtr);
1025 	}
1026 
1027 	page_cursor = btr_cur_get_page_cur(cursor);
1028 	cursor->index = index;
1029 
1030 	space = dict_index_get_space(index);
1031 	zip_size = dict_table_zip_size(index->table);
1032 	page_no = dict_index_get_page(index);
1033 
1034 	height = ULINT_UNDEFINED;
1035 
1036 	for (;;) {
1037 		buf_block_t*	block;
1038 		page_t*		page;
1039 
1040 		block = buf_page_get_gen(space, zip_size, page_no,
1041 					 RW_NO_LATCH, NULL, BUF_GET,
1042 					 file, line, mtr);
1043 		page = buf_block_get_frame(block);
1044 		ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
1045 		ut_ad(index->id == btr_page_get_index_id(page));
1046 
1047 		if (height == ULINT_UNDEFINED) {
1048 			/* We are in the root node */
1049 
1050 			height = btr_page_get_level(page, mtr);
1051 		}
1052 
1053 		if (height == 0) {
1054 			btr_cur_latch_leaves(page, space, zip_size, page_no,
1055 					     latch_mode, cursor, mtr);
1056 		}
1057 
1058 		page_cur_open_on_rnd_user_rec(block, page_cursor);
1059 
1060 		if (height == 0) {
1061 
1062 			break;
1063 		}
1064 
1065 		ut_ad(height > 0);
1066 
1067 		height--;
1068 
1069 		node_ptr = page_cur_get_rec(page_cursor);
1070 		offsets = rec_get_offsets(node_ptr, cursor->index, offsets,
1071 					  ULINT_UNDEFINED, &heap);
1072 		/* Go to the child node */
1073 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
1074 	}
1075 
1076 	if (UNIV_LIKELY_NULL(heap)) {
1077 		mem_heap_free(heap);
1078 	}
1079 }
1080 
1081 /*==================== B-TREE INSERT =========================*/
1082 
1083 /*************************************************************//**
1084 Inserts a record if there is enough space, or if enough space can
1085 be freed by reorganizing. Differs from btr_cur_optimistic_insert because
1086 no heuristic is applied to whether it pays to use CPU time for
1087 reorganizing the page or not.
1088 
1089 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1090 if this is a compressed leaf page in a secondary index.
1091 This has to be done either within the same mini-transaction,
1092 or by invoking ibuf_reset_free_bits() before mtr_commit().
1093 
1094 @return	pointer to the inserted record if successful, else NULL */
1095 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1096 rec_t*
1097 btr_cur_insert_if_possible(
1098 /*=======================*/
1099 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert;
1100 				cursor stays valid */
1101 	const dtuple_t*	tuple,	/*!< in: tuple to insert; the size info need not
1102 				have been stored to tuple */
1103 	ulint**		offsets,/*!< out: offsets on *rec */
1104 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1105 	ulint		n_ext,	/*!< in: number of externally stored columns */
1106 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1107 {
1108 	page_cur_t*	page_cursor;
1109 	rec_t*		rec;
1110 
1111 	ut_ad(dtuple_check_typed(tuple));
1112 
1113 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
1114 				MTR_MEMO_PAGE_X_FIX));
1115 	page_cursor = btr_cur_get_page_cur(cursor);
1116 
1117 	/* Now, try the insert */
1118 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
1119 				    offsets, heap, n_ext, mtr);
1120 
1121 	/* If the record did not fit, reorganize.
1122 	For compressed pages, page_cur_tuple_insert()
1123 	attempted this already. */
1124 	if (!rec && !page_cur_get_page_zip(page_cursor)
1125 	    && btr_page_reorganize(page_cursor, cursor->index, mtr)) {
1126 		rec = page_cur_tuple_insert(
1127 			page_cursor, tuple, cursor->index,
1128 			offsets, heap, n_ext, mtr);
1129 	}
1130 
1131 	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
1132 	return(rec);
1133 }
1134 
1135 /*************************************************************//**
1136 For an insert, checks the locks and does the undo logging if desired.
1137 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_FAIL, or error number */
1138 UNIV_INLINE MY_ATTRIBUTE((warn_unused_result, nonnull(2,3,5,6)))
1139 dberr_t
1140 btr_cur_ins_lock_and_undo(
1141 /*======================*/
1142 	ulint		flags,	/*!< in: undo logging and locking flags: if
1143 				not zero, the parameters index and thr
1144 				should be specified */
1145 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert */
1146 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1147 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1148 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
1149 	ibool*		inherit)/*!< out: TRUE if the new inserted record might need
1150 				to inherit LOCK_GAP type locks from the
1151 				successor record */
1152 {
1153 	dict_index_t*	index;
1154 	dberr_t		err;
1155 	rec_t*		rec;
1156 	roll_ptr_t	roll_ptr;
1157 
1158 	/* Check if we have to wait for a lock: enqueue an explicit lock
1159 	request if yes */
1160 
1161 	rec = btr_cur_get_rec(cursor);
1162 	index = cursor->index;
1163 
1164 	ut_ad(!dict_index_is_online_ddl(index)
1165 	      || dict_index_is_clust(index)
1166 	      || (flags & BTR_CREATE_FLAG));
1167 
1168 	err = lock_rec_insert_check_and_lock(flags, rec,
1169 					     btr_cur_get_block(cursor),
1170 					     index, thr, mtr, inherit);
1171 
1172 	if (err != DB_SUCCESS
1173 	    || !dict_index_is_clust(index) || dict_index_is_ibuf(index)) {
1174 
1175 		return(err);
1176 	}
1177 
1178 	err = trx_undo_report_row_operation(flags, TRX_UNDO_INSERT_OP,
1179 					    thr, index, entry,
1180 					    NULL, 0, NULL, NULL,
1181 					    &roll_ptr);
1182 	if (err != DB_SUCCESS) {
1183 
1184 		return(err);
1185 	}
1186 
1187 	/* Now we can fill in the roll ptr field in entry */
1188 
1189 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
1190 
1191 		row_upd_index_entry_sys_field(entry, index,
1192 					      DATA_ROLL_PTR, roll_ptr);
1193 	}
1194 
1195 	return(DB_SUCCESS);
1196 }
1197 
1198 #ifdef UNIV_DEBUG
1199 /*************************************************************//**
1200 Report information about a transaction. */
1201 static
1202 void
1203 btr_cur_trx_report(
1204 /*===============*/
1205 	trx_id_t		trx_id,	/*!< in: transaction id */
1206 	const dict_index_t*	index,	/*!< in: index */
1207 	const char*		op)	/*!< in: operation */
1208 {
1209 	fprintf(stderr, "Trx with id " TRX_ID_FMT " going to ", trx_id);
1210 	fputs(op, stderr);
1211 	dict_index_name_print(stderr, NULL, index);
1212 	putc('\n', stderr);
1213 }
1214 #endif /* UNIV_DEBUG */
1215 
1216 /*************************************************************//**
1217 Tries to perform an insert to a page in an index tree, next to cursor.
1218 It is assumed that mtr holds an x-latch on the page. The operation does
1219 not succeed if there is too little space on the page. If there is just
1220 one record on the page, the insert will always succeed; this is to
1221 prevent trying to split a page with just one record.
1222 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_FAIL, or error number */
1223 UNIV_INTERN
1224 dberr_t
1225 btr_cur_optimistic_insert(
1226 /*======================*/
1227 	ulint		flags,	/*!< in: undo logging and locking flags: if not
1228 				zero, the parameters index and thr should be
1229 				specified */
1230 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert;
1231 				cursor stays valid */
1232 	ulint**		offsets,/*!< out: offsets on *rec */
1233 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1234 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1235 	rec_t**		rec,	/*!< out: pointer to inserted record if
1236 				succeed */
1237 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
1238 				be stored externally by the caller, or
1239 				NULL */
1240 	ulint		n_ext,	/*!< in: number of externally stored columns */
1241 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1242 	mtr_t*		mtr)	/*!< in/out: mini-transaction;
1243 				if this function returns DB_SUCCESS on
1244 				a leaf page of a secondary index in a
1245 				compressed tablespace, the caller must
1246 				mtr_commit(mtr) before latching
1247 				any further pages */
1248 {
1249 	big_rec_t*	big_rec_vec	= NULL;
1250 	dict_index_t*	index;
1251 	page_cur_t*	page_cursor;
1252 	buf_block_t*	block;
1253 	page_t*		page;
1254 	rec_t*		dummy;
1255 	ibool		leaf;
1256 	ibool		reorg;
1257 	ibool		inherit = TRUE;
1258 	ulint		zip_size;
1259 	ulint		rec_size;
1260 	dberr_t		err;
1261 
1262 	*big_rec = NULL;
1263 
1264 	block = btr_cur_get_block(cursor);
1265 	page = buf_block_get_frame(block);
1266 	index = cursor->index;
1267 
1268 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1269 	ut_ad(!dict_index_is_online_ddl(index)
1270 	      || dict_index_is_clust(index)
1271 	      || (flags & BTR_CREATE_FLAG));
1272 	ut_ad(dtuple_check_typed(entry));
1273 
1274 	zip_size = buf_block_get_zip_size(block);
1275 #ifdef UNIV_DEBUG_VALGRIND
1276 	if (zip_size) {
1277 		UNIV_MEM_ASSERT_RW(page, UNIV_PAGE_SIZE);
1278 		UNIV_MEM_ASSERT_RW(block->page.zip.data, zip_size);
1279 	}
1280 #endif /* UNIV_DEBUG_VALGRIND */
1281 
1282 #ifdef UNIV_DEBUG
1283 	if (btr_cur_print_record_ops && thr) {
1284 		btr_cur_trx_report(thr_get_trx(thr)->id, index, "insert ");
1285 		dtuple_print(stderr, entry);
1286 	}
1287 #endif /* UNIV_DEBUG */
1288 
1289 	leaf = page_is_leaf(page);
1290 
1291 	/* Calculate the record size when entry is converted to a record */
1292 	rec_size = rec_get_converted_size(index, entry, n_ext);
1293 
1294 	if (page_zip_rec_needs_ext(rec_size, page_is_comp(page),
1295 				   dtuple_get_n_fields(entry), zip_size)) {
1296 
1297 		/* The record is so big that we have to store some fields
1298 		externally on separate database pages */
1299 		big_rec_vec = dtuple_convert_big_rec(index, entry, &n_ext);
1300 
1301 		if (UNIV_UNLIKELY(big_rec_vec == NULL)) {
1302 
1303 			return(DB_TOO_BIG_RECORD);
1304 		}
1305 
1306 		rec_size = rec_get_converted_size(index, entry, n_ext);
1307 	}
1308 
1309 	if (zip_size) {
1310 		/* Estimate the free space of an empty compressed page.
1311 		Subtract one byte for the encoded heap_no in the
1312 		modification log. */
1313 		ulint	free_space_zip = page_zip_empty_size(
1314 			cursor->index->n_fields, zip_size);
1315 		ulint	n_uniq = dict_index_get_n_unique_in_tree(index);
1316 
1317 		ut_ad(dict_table_is_comp(index->table));
1318 
1319 		if (free_space_zip == 0) {
1320 too_big:
1321 			if (big_rec_vec) {
1322 				dtuple_convert_back_big_rec(
1323 					index, entry, big_rec_vec);
1324 			}
1325 
1326 			return(DB_TOO_BIG_RECORD);
1327 		}
1328 
1329 		/* Subtract one byte for the encoded heap_no in the
1330 		modification log. */
1331 		free_space_zip--;
1332 
1333 		/* There should be enough room for two node pointer
1334 		records on an empty non-leaf page.  This prevents
1335 		infinite page splits. */
1336 
1337 		if (entry->n_fields >= n_uniq
1338 		    && (REC_NODE_PTR_SIZE
1339 			+ rec_get_converted_size_comp_prefix(
1340 				index, entry->fields, n_uniq, NULL)
1341 			/* On a compressed page, there is
1342 			a two-byte entry in the dense
1343 			page directory for every record.
1344 			But there is no record header. */
1345 			- (REC_N_NEW_EXTRA_BYTES - 2)
1346 			> free_space_zip / 2)) {
1347 			goto too_big;
1348 		}
1349 	}
1350 
1351 	LIMIT_OPTIMISTIC_INSERT_DEBUG(page_get_n_recs(page),
1352 				      goto fail);
1353 
1354 	if (leaf && zip_size
1355 	    && (page_get_data_size(page) + rec_size
1356 		>= dict_index_zip_pad_optimal_page_size(index))) {
1357 		/* If compression padding tells us that insertion will
1358 		result in a too tightly packed page, i.e. one that is likely to
1359 		cause a compression failure, then do not attempt an optimistic
1360 		insertion. */
1361 fail:
1362 		err = DB_FAIL;
1363 fail_err:
1364 
1365 		if (big_rec_vec) {
1366 			dtuple_convert_back_big_rec(index, entry, big_rec_vec);
1367 		}
1368 
1369 		return(err);
1370 	}
1371 
1372 	ulint	max_size = page_get_max_insert_size_after_reorganize(page, 1);
1373 
1374 	if (page_has_garbage(page)) {
1375 		if ((max_size < rec_size
1376 		     || max_size < BTR_CUR_PAGE_REORGANIZE_LIMIT)
1377 		    && page_get_n_recs(page) > 1
1378 		    && page_get_max_insert_size(page, 1) < rec_size) {
1379 
1380 			goto fail;
1381 		}
1382 	} else if (max_size < rec_size) {
1383 		goto fail;
1384 	}
1385 
1386 	/* If there have been many consecutive inserts to the
1387 	clustered index leaf page of an uncompressed table, check if
1388 	we have to split the page to reserve enough free space for
1389 	future updates of records. */
1390 
1391 	if (leaf && !zip_size && dict_index_is_clust(index)
1392 	    && page_get_n_recs(page) >= 2
1393 	    && dict_index_get_space_reserve() + rec_size > max_size
1394 	    && (btr_page_get_split_rec_to_right(cursor, &dummy)
1395 		|| btr_page_get_split_rec_to_left(cursor, &dummy))) {
1396 		goto fail;
1397 	}
1398 
1399 	/* Check locks and write to the undo log, if specified */
1400 	err = btr_cur_ins_lock_and_undo(flags, cursor, entry,
1401 					thr, mtr, &inherit);
1402 
1403 	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
1404 
1405 		goto fail_err;
1406 	}
1407 
1408 	page_cursor = btr_cur_get_page_cur(cursor);
1409 
1410 	/* Now, try the insert */
1411 
1412 	{
1413 		const rec_t* page_cursor_rec = page_cur_get_rec(page_cursor);
1414 		*rec = page_cur_tuple_insert(page_cursor, entry, index,
1415 					     offsets, heap, n_ext, mtr);
1416 		reorg = page_cursor_rec != page_cur_get_rec(page_cursor);
1417 	}
1418 
1419 	if (*rec) {
1420 	} else if (zip_size) {
1421 		/* Reset the IBUF_BITMAP_FREE bits, because
1422 		page_cur_tuple_insert() will have attempted page
1423 		reorganize before failing. */
1424 		if (leaf && !dict_index_is_clust(index)) {
1425 			ibuf_reset_free_bits(block);
1426 		}
1427 
1428 		goto fail;
1429 	} else {
1430 		ut_ad(!reorg);
1431 
1432 		/* If the record did not fit, reorganize */
1433 		if (!btr_page_reorganize(page_cursor, index, mtr)) {
1434 			ut_ad(0);
1435 			goto fail;
1436 		}
1437 
1438 		ut_ad(page_get_max_insert_size(page, 1) == max_size);
1439 
1440 		reorg = TRUE;
1441 
1442 		*rec = page_cur_tuple_insert(page_cursor, entry, index,
1443 					     offsets, heap, n_ext, mtr);
1444 
1445 		if (UNIV_UNLIKELY(!*rec)) {
1446 			fputs("InnoDB: Error: cannot insert tuple ", stderr);
1447 			dtuple_print(stderr, entry);
1448 			fputs(" into ", stderr);
1449 			dict_index_name_print(stderr, thr_get_trx(thr), index);
1450 			fprintf(stderr, "\nInnoDB: max insert size %lu\n",
1451 				(ulong) max_size);
1452 			ut_error;
1453 		}
1454 	}
1455 
1456 #ifdef BTR_CUR_HASH_ADAPT
1457 	if (!reorg && leaf && (cursor->flag == BTR_CUR_HASH)) {
1458 		btr_search_update_hash_node_on_insert(cursor);
1459 	} else {
1460 		btr_search_update_hash_on_insert(cursor);
1461 	}
1462 #endif
1463 
1464 	if (!(flags & BTR_NO_LOCKING_FLAG) && inherit) {
1465 
1466 		lock_update_insert(block, *rec);
1467 	}
1468 
1469 	if (leaf && !dict_index_is_clust(index)) {
1470 		/* Update the free bits of the B-tree page in the
1471 		insert buffer bitmap. */
1472 
1473 		/* The free bits in the insert buffer bitmap must
1474 		never exceed the free space on a page.  It is safe to
1475 		decrement or reset the bits in the bitmap in a
1476 		mini-transaction that is committed before the
1477 		mini-transaction that affects the free space. */
1478 
1479 		/* It is unsafe to increment the bits in a separately
1480 		committed mini-transaction, because in crash recovery,
1481 		the free bits could momentarily be set too high. */
1482 
1483 		if (zip_size) {
1484 			/* Update the bits in the same mini-transaction. */
1485 			ibuf_update_free_bits_zip(block, mtr);
1486 		} else {
1487 			/* Decrement the bits in a separate
1488 			mini-transaction. */
1489 			ibuf_update_free_bits_if_full(
1490 				block, max_size,
1491 				rec_size + PAGE_DIR_SLOT_SIZE);
1492 		}
1493 	}
1494 
1495 	*big_rec = big_rec_vec;
1496 
1497 	return(DB_SUCCESS);
1498 }
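/* Caller pattern sketch (illustrative): row-level insert code typically tries
the optimistic path first and falls back to the pessimistic one when the page
would have to be split, roughly

	err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &heap,
					entry, &rec, &big_rec, n_ext,
					thr, &mtr);
	if (err == DB_FAIL) {
		err = btr_cur_pessimistic_insert(flags, &cursor, &offsets,
						 &heap, entry, &rec,
						 &big_rec, n_ext, thr, &mtr);
	}

DB_FAIL here only means "did not fit"; real errors (locks, undo logging) are
returned as-is. In real callers the cursor is re-positioned with
BTR_MODIFY_TREE before the pessimistic call, since that path needs the tree
x-latch. */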
1499 
1500 /*************************************************************//**
1501 Performs an insert on a page of an index tree. It is assumed that mtr
1502 holds an x-latch on the tree and on the cursor page. If the insert is
1503 made on the leaf level, to avoid deadlocks, mtr must also own x-latches
1504 to brothers of page, if those brothers exist.
1505 @return	DB_SUCCESS or error number */
1506 UNIV_INTERN
1507 dberr_t
1508 btr_cur_pessimistic_insert(
1509 /*=======================*/
1510 	ulint		flags,	/*!< in: undo logging and locking flags: if not
1511 				zero, the parameter thr should be
1512 				specified; if no undo logging is specified,
1513 				then the caller must have reserved enough
1514 				free extents in the file space so that the
1515 				insertion will certainly succeed */
1516 	btr_cur_t*	cursor,	/*!< in: cursor after which to insert;
1517 				cursor stays valid */
1518 	ulint**		offsets,/*!< out: offsets on *rec */
1519 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap
1520 				that can be emptied, or NULL */
1521 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1522 	rec_t**		rec,	/*!< out: pointer to inserted record if
1523 				succeed */
1524 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
1525 				be stored externally by the caller, or
1526 				NULL */
1527 	ulint		n_ext,	/*!< in: number of externally stored columns */
1528 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1529 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1530 {
1531 	dict_index_t*	index		= cursor->index;
1532 	ulint		zip_size	= dict_table_zip_size(index->table);
1533 	big_rec_t*	big_rec_vec	= NULL;
1534 	dberr_t		err;
1535 	ibool		inherit = FALSE;
1536 	ibool		success;
1537 	ulint		n_reserved	= 0;
1538 
1539 	ut_ad(dtuple_check_typed(entry));
1540 
1541 	*big_rec = NULL;
1542 
1543 	ut_ad(mtr_memo_contains(mtr,
1544 				dict_index_get_lock(btr_cur_get_index(cursor)),
1545 				MTR_MEMO_X_LOCK));
1546 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
1547 				MTR_MEMO_PAGE_X_FIX));
1548 	ut_ad(!dict_index_is_online_ddl(index)
1549 	      || dict_index_is_clust(index)
1550 	      || (flags & BTR_CREATE_FLAG));
1551 
1552 	cursor->flag = BTR_CUR_BINARY;
1553 
1554 	/* Check locks and write to undo log, if specified */
1555 
1556 	err = btr_cur_ins_lock_and_undo(flags, cursor, entry,
1557 					thr, mtr, &inherit);
1558 
1559 	if (err != DB_SUCCESS) {
1560 
1561 		return(err);
1562 	}
1563 
1564 	if (!(flags & BTR_NO_UNDO_LOG_FLAG)) {
1565 		/* First reserve enough free space for the file segments
1566 		of the index tree, so that the insert will not fail because
1567 		of lack of space */
1568 
1569 		ulint	n_extents = cursor->tree_height / 16 + 3;
1570 
1571 		success = fsp_reserve_free_extents(&n_reserved, index->space,
1572 						   n_extents, FSP_NORMAL, mtr);
1573 		if (!success) {
1574 			return(DB_OUT_OF_FILE_SPACE);
1575 		}
1576 	}
1577 
1578 	if (page_zip_rec_needs_ext(rec_get_converted_size(index, entry, n_ext),
1579 				   dict_table_is_comp(index->table),
1580 				   dtuple_get_n_fields(entry),
1581 				   zip_size)) {
1582 		/* The record is so big that we have to store some fields
1583 		externally on separate database pages */
1584 
1585 		if (UNIV_LIKELY_NULL(big_rec_vec)) {
1586 			/* This should never happen, but we handle
1587 			the situation in a robust manner. */
1588 			ut_ad(0);
1589 			dtuple_convert_back_big_rec(index, entry, big_rec_vec);
1590 		}
1591 
1592 		big_rec_vec = dtuple_convert_big_rec(index, entry, &n_ext);
1593 
1594 		if (big_rec_vec == NULL) {
1595 
1596 			if (n_reserved > 0) {
1597 				fil_space_release_free_extents(index->space,
1598 							       n_reserved);
1599 			}
1600 			return(DB_TOO_BIG_RECORD);
1601 		}
1602 	}
1603 
1604 	if (dict_index_get_page(index)
1605 	    == buf_block_get_page_no(btr_cur_get_block(cursor))) {
1606 
1607 		/* The page is the root page */
1608 		*rec = btr_root_raise_and_insert(
1609 			flags, cursor, offsets, heap, entry, n_ext, mtr);
1610 	} else {
1611 		*rec = btr_page_split_and_insert(
1612 			flags, cursor, offsets, heap, entry, n_ext, mtr);
1613 	}
1614 
1615 	if (*rec == NULL && os_has_said_disk_full) {
1616 		return(DB_OUT_OF_FILE_SPACE);
1617 	}
1618 
1619 	ut_ad(page_rec_get_next(btr_cur_get_rec(cursor)) == *rec);
1620 
1621 	if (!(flags & BTR_NO_LOCKING_FLAG)) {
1622 		/* The cursor might be moved to the other page,
1623 		and the max trx id field should be updated after
1624 		the cursor was fixed. */
1625 		if (!dict_index_is_clust(index)) {
1626 			page_update_max_trx_id(
1627 				btr_cur_get_block(cursor),
1628 				btr_cur_get_page_zip(cursor),
1629 				thr_get_trx(thr)->id, mtr);
1630 		}
1631 		if (!page_rec_is_infimum(btr_cur_get_rec(cursor))
1632 		    || btr_page_get_prev(
1633 			buf_block_get_frame(
1634 				btr_cur_get_block(cursor)), mtr)
1635 		       == FIL_NULL) {
1636 			/* After a split and insert we always need to
1637 			call lock_update_insert(). */
1638 			inherit = TRUE;
1639 		}
1640 	}
1641 
1642 #ifdef BTR_CUR_ADAPT
1643 	btr_search_update_hash_on_insert(cursor);
1644 #endif
1645 	if (inherit && !(flags & BTR_NO_LOCKING_FLAG)) {
1646 
1647 		lock_update_insert(btr_cur_get_block(cursor), *rec);
1648 	}
1649 
1650 	if (n_reserved > 0) {
1651 		fil_space_release_free_extents(index->space, n_reserved);
1652 	}
1653 
1654 	*big_rec = big_rec_vec;
1655 
1656 	return(DB_SUCCESS);
1657 }
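/* Note on space reservation (illustrative arithmetic): with
cursor->tree_height == 3, the code above reserves
n_extents = 3 / 16 + 3 = 3 free extents via fsp_reserve_free_extents() before
splitting, in line with the file-level comment about reserving enough pages so
that a half-done leaf split never runs out of space. */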
1658 
1659 /*==================== B-TREE UPDATE =========================*/
1660 
1661 /*************************************************************//**
1662 For an update, checks the locks and does the undo logging.
1663 @return	DB_SUCCESS, DB_LOCK_WAIT, or error number */
1664 UNIV_INLINE MY_ATTRIBUTE((warn_unused_result, nonnull(2,3,6,7)))
1665 dberr_t
1666 btr_cur_upd_lock_and_undo(
1667 /*======================*/
1668 	ulint		flags,	/*!< in: undo logging and locking flags */
1669 	btr_cur_t*	cursor,	/*!< in: cursor on record to update */
1670 	const ulint*	offsets,/*!< in: rec_get_offsets() on cursor */
1671 	const upd_t*	update,	/*!< in: update vector */
1672 	ulint		cmpl_info,/*!< in: compiler info on secondary index
1673 				updates */
1674 	que_thr_t*	thr,	/*!< in: query thread
1675 				(can be NULL if BTR_NO_LOCKING_FLAG) */
1676 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
1677 	roll_ptr_t*	roll_ptr)/*!< out: roll pointer */
1678 {
1679 	dict_index_t*	index;
1680 	const rec_t*	rec;
1681 	dberr_t		err;
1682 
1683 	ut_ad((thr != NULL) || (flags & BTR_NO_LOCKING_FLAG));
1684 
1685 	rec = btr_cur_get_rec(cursor);
1686 	index = cursor->index;
1687 
1688 	ut_ad(rec_offs_validate(rec, index, offsets));
1689 
1690 	if (!dict_index_is_clust(index)) {
1691 		ut_ad(dict_index_is_online_ddl(index)
1692 		      == !!(flags & BTR_CREATE_FLAG));
1693 
1694 		/* We do undo logging only when we update a clustered index
1695 		record */
1696 		return(lock_sec_rec_modify_check_and_lock(
1697 			       flags, btr_cur_get_block(cursor), rec,
1698 			       index, thr, mtr));
1699 	}
1700 
1701 	/* Check if we have to wait for a lock: enqueue an explicit lock
1702 	request if yes */
1703 
1704 	if (!(flags & BTR_NO_LOCKING_FLAG)) {
1705 		err = lock_clust_rec_modify_check_and_lock(
1706 			flags, btr_cur_get_block(cursor), rec, index,
1707 			offsets, thr);
1708 		if (err != DB_SUCCESS) {
1709 			return(err);
1710 		}
1711 	}
1712 
1713 	/* Append the info about the update in the undo log */
1714 
1715 	return(trx_undo_report_row_operation(
1716 		       flags, TRX_UNDO_MODIFY_OP, thr,
1717 		       index, NULL, update,
1718 		       cmpl_info, rec, offsets, roll_ptr));
1719 }
1720 
1721 /***********************************************************//**
1722 Writes a redo log record of updating a record in-place. */
1723 UNIV_INTERN
1724 void
1725 btr_cur_update_in_place_log(
1726 /*========================*/
1727 	ulint		flags,		/*!< in: flags */
1728 	const rec_t*	rec,		/*!< in: record */
1729 	dict_index_t*	index,		/*!< in: index of the record */
1730 	const upd_t*	update,		/*!< in: update vector */
1731 	trx_id_t	trx_id,		/*!< in: transaction id */
1732 	roll_ptr_t	roll_ptr,	/*!< in: roll ptr */
1733 	mtr_t*		mtr)		/*!< in: mtr */
1734 {
1735 	byte*		log_ptr;
1736 	const page_t*	page	= page_align(rec);
1737 	ut_ad(flags < 256);
1738 	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1739 
1740 	log_ptr = mlog_open_and_write_index(mtr, rec, index, page_is_comp(page)
1741 					    ? MLOG_COMP_REC_UPDATE_IN_PLACE
1742 					    : MLOG_REC_UPDATE_IN_PLACE,
1743 					    1 + DATA_ROLL_PTR_LEN + 14 + 2
1744 					    + MLOG_BUF_MARGIN);
1745 
1746 	if (!log_ptr) {
1747 		/* Logging in mtr is switched off during crash recovery */
1748 		return;
1749 	}
1750 
1751 	/* For secondary indexes, we could skip writing the dummy system fields
1752 	to the redo log, but then we would have to change the redo log parsing
1753 	of MLOG_REC_UPDATE_IN_PLACE/MLOG_COMP_REC_UPDATE_IN_PLACE or add a new
1754 	redo log record type. For now, just write the dummy sys fields to the
1755 	redo log when updating a secondary index record.
1756 	*/
1757 	mach_write_to_1(log_ptr, flags);
1758 	log_ptr++;
1759 
1760 	if (dict_index_is_clust(index)) {
1761 		log_ptr = row_upd_write_sys_vals_to_log(
1762 				index, trx_id, roll_ptr, log_ptr, mtr);
1763 	} else {
1764 		/* Dummy system fields for a secondary index */
1765 		/* TRX_ID Position */
1766 		log_ptr += mach_write_compressed(log_ptr, 0);
1767 		/* ROLL_PTR */
1768 		trx_write_roll_ptr(log_ptr, 0);
1769 		log_ptr += DATA_ROLL_PTR_LEN;
1770 		/* TRX_ID */
1771 		log_ptr += mach_ull_write_compressed(log_ptr, 0);
1772 	}
1773 
1774 	mach_write_to_2(log_ptr, page_offset(rec));
1775 	log_ptr += 2;
1776 
1777 	row_upd_index_write_log(update, log_ptr, mtr);
1778 }
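/* For reference (derived from the code above, not from separate
documentation): the body of an MLOG_REC_UPDATE_IN_PLACE or
MLOG_COMP_REC_UPDATE_IN_PLACE record is written as

	flags			1 byte
	TRX_ID field position	compressed (0 for a secondary index)
	roll ptr		DATA_ROLL_PTR_LEN bytes (0 for a secondary
				index)
	trx id			compressed (0 for a secondary index)
	record offset in page	2 bytes
	update vector		row_upd_index_write_log()

and btr_cur_parse_update_in_place() below reads the fields back in the
same order during recovery. */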
1779 #endif /* UNIV_HOTBACKUP */
1780 
1781 /***********************************************************//**
1782 Parses a redo log record of updating a record in-place.
1783 @return	end of log record or NULL */
1784 UNIV_INTERN
1785 byte*
1786 btr_cur_parse_update_in_place(
1787 /*==========================*/
1788 	byte*		ptr,	/*!< in: buffer */
1789 	byte*		end_ptr,/*!< in: buffer end */
1790 	page_t*		page,	/*!< in/out: page or NULL */
1791 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
1792 	dict_index_t*	index)	/*!< in: index corresponding to page */
1793 {
1794 	ulint		flags;
1795 	rec_t*		rec;
1796 	upd_t*		update;
1797 	ulint		pos;
1798 	trx_id_t	trx_id;
1799 	roll_ptr_t	roll_ptr;
1800 	ulint		rec_offset;
1801 	mem_heap_t*	heap;
1802 	ulint*		offsets;
1803 
1804 	if (end_ptr < ptr + 1) {
1805 
1806 		return(NULL);
1807 	}
1808 
1809 	flags = mach_read_from_1(ptr);
1810 	ptr++;
1811 
1812 	ptr = row_upd_parse_sys_vals(ptr, end_ptr, &pos, &trx_id, &roll_ptr);
1813 
1814 	if (ptr == NULL) {
1815 
1816 		return(NULL);
1817 	}
1818 
1819 	if (end_ptr < ptr + 2) {
1820 
1821 		return(NULL);
1822 	}
1823 
1824 	rec_offset = mach_read_from_2(ptr);
1825 	ptr += 2;
1826 
1827 	ut_a(rec_offset <= UNIV_PAGE_SIZE);
1828 
1829 	heap = mem_heap_create(256);
1830 
1831 	ptr = row_upd_index_parse(ptr, end_ptr, heap, &update);
1832 
1833 	if (!ptr || !page) {
1834 
1835 		goto func_exit;
1836 	}
1837 
1838 	ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
1839 	rec = page + rec_offset;
1840 
1841 	/* We do not need to reserve btr_search_latch, as the page is only
1842 	being recovered, and there cannot be a hash index to it. */
1843 
1844 	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
1845 
1846 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
1847 		row_upd_rec_sys_fields_in_recovery(rec, page_zip, offsets,
1848 						   pos, trx_id, roll_ptr);
1849 	}
1850 
1851 	row_upd_rec_in_place(rec, index, offsets, update, page_zip);
1852 
1853 func_exit:
1854 	mem_heap_free(heap);
1855 
1856 	return(ptr);
1857 }
1858 
1859 #ifndef UNIV_HOTBACKUP
1860 /*************************************************************//**
1861 See if there is enough space in the page modification log to log
1862 an update-in-place.
1863 
1864 @retval false if out of space; IBUF_BITMAP_FREE will be reset
1865 outside mtr if the page was recompressed
1866 @retval	true if there is enough space
1867 
1868 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE if this is
1869 a secondary index leaf page. This has to be done either within the
1870 same mini-transaction, or by invoking ibuf_reset_free_bits() before
1871 mtr_commit(mtr). */
1872 UNIV_INTERN
1873 bool
1874 btr_cur_update_alloc_zip_func(
1875 /*==========================*/
1876 	page_zip_des_t*	page_zip,/*!< in/out: compressed page */
1877 	page_cur_t*	cursor,	/*!< in/out: B-tree page cursor */
1878 	dict_index_t*	index,	/*!< in: the index corresponding to cursor */
1879 #ifdef UNIV_DEBUG
1880 	ulint*		offsets,/*!< in/out: offsets of the cursor record */
1881 #endif /* UNIV_DEBUG */
1882 	ulint		length,	/*!< in: size needed */
1883 	bool		create,	/*!< in: true=delete-and-insert,
1884 				false=update-in-place */
1885 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1886 {
1887 	const page_t*	page = page_cur_get_page(cursor);
1888 
1889 	ut_ad(page_zip == page_cur_get_page_zip(cursor));
1890 	ut_ad(page_zip);
1891 	ut_ad(!dict_index_is_ibuf(index));
1892 	ut_ad(rec_offs_validate(page_cur_get_rec(cursor), index, offsets));
1893 
1894 	if (page_zip_available(page_zip, dict_index_is_clust(index),
1895 			       length, create)) {
1896 		return(true);
1897 	}
1898 
1899 	if (!page_zip->m_nonempty && !page_has_garbage(page)) {
1900 		/* The page has been freshly compressed, so
1901 		reorganizing it will not help. */
1902 		return(false);
1903 	}
1904 
1905 	if (create && page_is_leaf(page)
1906 	    && (length + page_get_data_size(page)
1907 		>= dict_index_zip_pad_optimal_page_size(index))) {
1908 		return(false);
1909 	}
1910 
1911 	if (!btr_page_reorganize(cursor, index, mtr)) {
1912 		goto out_of_space;
1913 	}
1914 
1915 	rec_offs_make_valid(page_cur_get_rec(cursor), index, offsets);
1916 
1917 	/* After recompressing a page, we must make sure that the free
1918 	bits in the insert buffer bitmap will not exceed the free
1919 	space on the page.  Because this function will not attempt
1920 	recompression unless page_zip_available() fails above, it is
1921 	safe to reset the free bits if page_zip_available() fails
1922 	again, below.  The free bits can safely be reset in a separate
1923 	mini-transaction.  If page_zip_available() succeeds below, we
1924 	can be sure that the btr_page_reorganize() above did not reduce
1925 	the free space available on the page. */
1926 
1927 	if (page_zip_available(page_zip, dict_index_is_clust(index),
1928 			       length, create)) {
1929 		return(true);
1930 	}
1931 
1932 out_of_space:
1933 	ut_ad(rec_offs_validate(page_cur_get_rec(cursor), index, offsets));
1934 
1935 	/* Out of space: reset the free bits. */
1936 	if (!dict_index_is_clust(index) && page_is_leaf(page)) {
1937 		ibuf_reset_free_bits(page_cur_get_block(cursor));
1938 	}
1939 
1940 	return(false);
1941 }
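/* Callers in this file reach the function above through the
btr_cur_update_alloc_zip() wrapper (declared in btr0cur.h), which
presumably drops the offsets argument in non-debug builds.  A minimal
sketch of such a call, mirroring btr_cur_update_in_place() below, would
be:

	if (page_zip
	    && !btr_cur_update_alloc_zip(
		    page_zip, btr_cur_get_page_cur(cursor), index,
		    offsets, rec_offs_size(offsets), false, mtr)) {
		return(DB_ZIP_OVERFLOW);
	}
*/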
1942 
1943 /*************************************************************//**
1944 Updates a record when the update causes no size changes in its fields.
1945 We assume here that the ordering fields of the record do not change.
1946 @return locking or undo log related error code, or
1947 @retval DB_SUCCESS on success
1948 @retval DB_ZIP_OVERFLOW if there is not enough space left
1949 on the compressed page (IBUF_BITMAP_FREE was reset outside mtr) */
1950 UNIV_INTERN
1951 dberr_t
1952 btr_cur_update_in_place(
1953 /*====================*/
1954 	ulint		flags,	/*!< in: undo logging and locking flags */
1955 	btr_cur_t*	cursor,	/*!< in: cursor on the record to update;
1956 				cursor stays valid and positioned on the
1957 				same record */
1958 	ulint*		offsets,/*!< in/out: offsets on cursor->page_cur.rec */
1959 	const upd_t*	update,	/*!< in: update vector */
1960 	ulint		cmpl_info,/*!< in: compiler info on secondary index
1961 				updates */
1962 	que_thr_t*	thr,	/*!< in: query thread */
1963 	trx_id_t	trx_id,	/*!< in: transaction id */
1964 	mtr_t*		mtr)	/*!< in/out: mini-transaction; if this
1965 				is a secondary index, the caller must
1966 				mtr_commit(mtr) before latching any
1967 				further pages */
1968 {
1969 	dict_index_t*	index;
1970 	buf_block_t*	block;
1971 	page_zip_des_t*	page_zip;
1972 	dberr_t		err;
1973 	rec_t*		rec;
1974 	roll_ptr_t	roll_ptr	= 0;
1975 	ulint		was_delete_marked;
1976 	ibool		is_hashed;
1977 
1978 	rec = btr_cur_get_rec(cursor);
1979 	index = cursor->index;
1980 	ut_ad(rec_offs_validate(rec, index, offsets));
1981 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
1982 	/* The insert buffer tree should never be updated in place. */
1983 	ut_ad(!dict_index_is_ibuf(index));
1984 	ut_ad(dict_index_is_online_ddl(index) == !!(flags & BTR_CREATE_FLAG)
1985 	      || dict_index_is_clust(index));
1986 	ut_ad(thr_get_trx(thr)->id == trx_id
1987 	      || (flags & ~(BTR_KEEP_POS_FLAG | BTR_KEEP_IBUF_BITMAP))
1988 	      == (BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG
1989 		  | BTR_CREATE_FLAG | BTR_KEEP_SYS_FLAG));
1990 	ut_ad(fil_page_get_type(btr_cur_get_page(cursor)) == FIL_PAGE_INDEX);
1991 	ut_ad(btr_page_get_index_id(btr_cur_get_page(cursor)) == index->id);
1992 
1993 #ifdef UNIV_DEBUG
1994 	if (btr_cur_print_record_ops) {
1995 		btr_cur_trx_report(trx_id, index, "update ");
1996 		rec_print_new(stderr, rec, offsets);
1997 	}
1998 #endif /* UNIV_DEBUG */
1999 
2000 	block = btr_cur_get_block(cursor);
2001 	page_zip = buf_block_get_page_zip(block);
2002 
2003 	/* Check that enough space is available on the compressed page. */
2004 	if (page_zip) {
2005 		if (!btr_cur_update_alloc_zip(
2006 			    page_zip, btr_cur_get_page_cur(cursor),
2007 			    index, offsets, rec_offs_size(offsets),
2008 			    false, mtr)) {
2009 			return(DB_ZIP_OVERFLOW);
2010 		}
2011 
2012 		rec = btr_cur_get_rec(cursor);
2013 	}
2014 
2015 	/* Do lock checking and undo logging */
2016 	err = btr_cur_upd_lock_and_undo(flags, cursor, offsets,
2017 					update, cmpl_info,
2018 					thr, mtr, &roll_ptr);
2019 	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
2020 		/* We may need to update the IBUF_BITMAP_FREE
2021 		bits after a reorganize that was done in
2022 		btr_cur_update_alloc_zip(). */
2023 		goto func_exit;
2024 	}
2025 
2026 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
2027 		row_upd_rec_sys_fields(rec, NULL, index, offsets,
2028 				       thr_get_trx(thr), roll_ptr);
2029 	}
2030 
2031 	was_delete_marked = rec_get_deleted_flag(
2032 		rec, page_is_comp(buf_block_get_frame(block)));
2033 
2034 	is_hashed = (block->index != NULL);
2035 
2036 	if (is_hashed) {
2037 		/* TO DO: Can we skip this if none of the first
2038 		index->search_info->curr_n_fields fields
2039 		are being updated? */
2040 
2041 		/* The function row_upd_changes_ord_field_binary() works only
2042 		if the update vector was built for a clustered index; we must
2043 		NOT call it if the index is secondary. */
2044 
2045 		if (!dict_index_is_clust(index)
2046 		    || row_upd_changes_ord_field_binary(index, update, thr,
2047 							NULL, NULL)) {
2048 
2049 			/* Remove possible hash index pointer to this record */
2050 			btr_search_update_hash_on_delete(cursor);
2051 		}
2052 
2053 		rw_lock_x_lock(&btr_search_latch);
2054 	}
2055 
2056 	row_upd_rec_in_place(rec, index, offsets, update, page_zip);
2057 
2058 	if (is_hashed) {
2059 		rw_lock_x_unlock(&btr_search_latch);
2060 	}
2061 
2062 	btr_cur_update_in_place_log(flags, rec, index, update,
2063 				    trx_id, roll_ptr, mtr);
2064 
2065 	if (was_delete_marked
2066 	    && !rec_get_deleted_flag(
2067 		    rec, page_is_comp(buf_block_get_frame(block)))) {
2068 		/* The new updated record owns its possible externally
2069 		stored fields */
2070 
2071 		btr_cur_unmark_extern_fields(page_zip,
2072 					     rec, index, offsets, mtr);
2073 	}
2074 
2075 	ut_ad(err == DB_SUCCESS);
2076 
2077 func_exit:
2078 	if (page_zip
2079 	    && !(flags & BTR_KEEP_IBUF_BITMAP)
2080 	    && !dict_index_is_clust(index)
2081 	    && page_is_leaf(buf_block_get_frame(block))) {
2082 		/* Update the free bits in the insert buffer. */
2083 		ibuf_update_free_bits_zip(block, mtr);
2084 	}
2085 
2086 	return(err);
2087 }
2088 
2089 /*************************************************************//**
2090 Tries to update a record on a page in an index tree. It is assumed that mtr
2091 holds an x-latch on the page. The operation does not succeed if there is too
2092 little space on the page or if the update would result in too empty a page,
2093 so that tree compression is recommended. We assume here that the ordering
2094 fields of the record do not change.
2095 @return error code, including
2096 @retval DB_SUCCESS on success
2097 @retval DB_OVERFLOW if the updated record does not fit
2098 @retval DB_UNDERFLOW if the page would become too empty
2099 @retval DB_ZIP_OVERFLOW if there is not enough space left
2100 on the compressed page (IBUF_BITMAP_FREE was reset outside mtr) */
2101 UNIV_INTERN
2102 dberr_t
2103 btr_cur_optimistic_update(
2104 /*======================*/
2105 	ulint		flags,	/*!< in: undo logging and locking flags */
2106 	btr_cur_t*	cursor,	/*!< in: cursor on the record to update;
2107 				cursor stays valid and positioned on the
2108 				same record */
2109 	ulint**		offsets,/*!< out: offsets on cursor->page_cur.rec */
2110 	mem_heap_t**	heap,	/*!< in/out: pointer to NULL or memory heap */
2111 	const upd_t*	update,	/*!< in: update vector; this must also
2112 				contain trx id and roll ptr fields */
2113 	ulint		cmpl_info,/*!< in: compiler info on secondary index
2114 				updates */
2115 	que_thr_t*	thr,	/*!< in: query thread */
2116 	trx_id_t	trx_id,	/*!< in: transaction id */
2117 	mtr_t*		mtr)	/*!< in/out: mini-transaction; if this
2118 				is a secondary index, the caller must
2119 				mtr_commit(mtr) before latching any
2120 				further pages */
2121 {
2122 	dict_index_t*	index;
2123 	page_cur_t*	page_cursor;
2124 	dberr_t		err;
2125 	buf_block_t*	block;
2126 	page_t*		page;
2127 	page_zip_des_t*	page_zip;
2128 	rec_t*		rec;
2129 	ulint		max_size;
2130 	ulint		new_rec_size;
2131 	ulint		old_rec_size;
2132 	ulint		max_ins_size = 0;
2133 	dtuple_t*	new_entry;
2134 	roll_ptr_t	roll_ptr;
2135 	ulint		i;
2136 	ulint		n_ext;
2137 
2138 	block = btr_cur_get_block(cursor);
2139 	page = buf_block_get_frame(block);
2140 	rec = btr_cur_get_rec(cursor);
2141 	index = cursor->index;
2142 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2143 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2144 	/* The insert buffer tree should never be updated in place. */
2145 	ut_ad(!dict_index_is_ibuf(index));
2146 	ut_ad(dict_index_is_online_ddl(index) == !!(flags & BTR_CREATE_FLAG)
2147 	      || dict_index_is_clust(index));
2148 	ut_ad(thr_get_trx(thr)->id == trx_id
2149 	      || (flags & ~(BTR_KEEP_POS_FLAG | BTR_KEEP_IBUF_BITMAP))
2150 	      == (BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG
2151 		  | BTR_CREATE_FLAG | BTR_KEEP_SYS_FLAG));
2152 	ut_ad(fil_page_get_type(page) == FIL_PAGE_INDEX);
2153 	ut_ad(btr_page_get_index_id(page) == index->id);
2154 
2155 	*offsets = rec_get_offsets(rec, index, *offsets,
2156 				   ULINT_UNDEFINED, heap);
2157 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
2158 	ut_a(!rec_offs_any_null_extern(rec, *offsets)
2159 	     || trx_is_recv(thr_get_trx(thr)));
2160 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
2161 
2162 #ifdef UNIV_DEBUG
2163 	if (btr_cur_print_record_ops) {
2164 		btr_cur_trx_report(trx_id, index, "update ");
2165 		rec_print_new(stderr, rec, *offsets);
2166 	}
2167 #endif /* UNIV_DEBUG */
2168 
2169 	if (!row_upd_changes_field_size_or_external(index, *offsets, update)) {
2170 
2171 		/* The simplest and the most common case: the update does not
2172 		change the size of any field and none of the updated fields is
2173 		externally stored in rec or update, and there is enough space
2174 		on the compressed page to log the update. */
2175 
2176 		return(btr_cur_update_in_place(
2177 			       flags, cursor, *offsets, update,
2178 			       cmpl_info, thr, trx_id, mtr));
2179 	}
2180 
2181 	if (rec_offs_any_extern(*offsets)) {
2182 any_extern:
2183 		/* Externally stored fields are treated in pessimistic
2184 		update */
2185 
2186 		return(DB_OVERFLOW);
2187 	}
2188 
2189 	for (i = 0; i < upd_get_n_fields(update); i++) {
2190 		if (dfield_is_ext(&upd_get_nth_field(update, i)->new_val)) {
2191 
2192 			goto any_extern;
2193 		}
2194 	}
2195 
2196 	page_cursor = btr_cur_get_page_cur(cursor);
2197 
2198 	if (!*heap) {
2199 		*heap = mem_heap_create(
2200 			rec_offs_size(*offsets)
2201 			+ DTUPLE_EST_ALLOC(rec_offs_n_fields(*offsets)));
2202 	}
2203 
2204 	new_entry = row_rec_to_index_entry(rec, index, *offsets,
2205 					   &n_ext, *heap);
2206 	/* We checked above that there are no externally stored fields. */
2207 	ut_a(!n_ext);
2208 
2209 	/* The page containing the clustered index record
2210 	corresponding to new_entry is latched in mtr.
2211 	Thus the following call is safe. */
2212 	row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
2213 						     FALSE, *heap);
2214 	old_rec_size = rec_offs_size(*offsets);
2215 	new_rec_size = rec_get_converted_size(index, new_entry, 0);
2216 
2217 	page_zip = buf_block_get_page_zip(block);
2218 #ifdef UNIV_ZIP_DEBUG
2219 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2220 #endif /* UNIV_ZIP_DEBUG */
2221 
2222 	if (page_zip) {
2223 		if (!btr_cur_update_alloc_zip(
2224 			    page_zip, page_cursor, index, *offsets,
2225 			    new_rec_size, true, mtr)) {
2226 			return(DB_ZIP_OVERFLOW);
2227 		}
2228 
2229 		rec = page_cur_get_rec(page_cursor);
2230 	}
2231 
2232 	if (UNIV_UNLIKELY(new_rec_size
2233 			  >= (page_get_free_space_of_empty(page_is_comp(page))
2234 			      / 2))) {
2235 		/* We may need to update the IBUF_BITMAP_FREE
2236 		bits after a reorganize that was done in
2237 		btr_cur_update_alloc_zip(). */
2238 		err = DB_OVERFLOW;
2239 		goto func_exit;
2240 	}
2241 
2242 	if (UNIV_UNLIKELY(page_get_data_size(page)
2243 			  - old_rec_size + new_rec_size
2244 			  < BTR_CUR_PAGE_COMPRESS_LIMIT)) {
2245 		/* We may need to update the IBUF_BITMAP_FREE
2246 		bits after a reorganize that was done in
2247 		btr_cur_update_alloc_zip(). */
2248 
2249 		/* The page would become too empty */
2250 		err = DB_UNDERFLOW;
2251 		goto func_exit;
2252 	}
2253 
2254 	/* We do not attempt to reorganize if the page is compressed.
2255 	This is because the page may fail to compress after reorganization. */
2256 	max_size = page_zip
2257 		? page_get_max_insert_size(page, 1)
2258 		: (old_rec_size
2259 		   + page_get_max_insert_size_after_reorganize(page, 1));
2260 
2261 	if (!page_zip) {
2262 		max_ins_size = page_get_max_insert_size_after_reorganize(page, 1);
2263 	}
2264 
2265 	if (!(((max_size >= BTR_CUR_PAGE_REORGANIZE_LIMIT)
2266 	       && (max_size >= new_rec_size))
2267 	      || (page_get_n_recs(page) <= 1))) {
2268 
2269 		/* We may need to update the IBUF_BITMAP_FREE
2270 		bits after a reorganize that was done in
2271 		btr_cur_update_alloc_zip(). */
2272 
2273 		/* There was not enough space, or it did not pay to
2274 		reorganize: for simplicity, we decide what to do assuming a
2275 		reorganization is needed, though it might not be necessary */
2276 
2277 		err = DB_OVERFLOW;
2278 		goto func_exit;
2279 	}
2280 
2281 	/* Do lock checking and undo logging */
2282 	err = btr_cur_upd_lock_and_undo(flags, cursor, *offsets,
2283 					update, cmpl_info,
2284 					thr, mtr, &roll_ptr);
2285 	if (err != DB_SUCCESS) {
2286 		/* We may need to update the IBUF_BITMAP_FREE
2287 		bits after a reorganize that was done in
2288 		btr_cur_update_alloc_zip(). */
2289 		goto func_exit;
2290 	}
2291 
2292 	/* Ok, we may do the replacement. Store on the page infimum the
2293 	explicit locks on rec, before deleting rec (see the comment in
2294 	btr_cur_pessimistic_update). */
2295 
2296 	lock_rec_store_on_page_infimum(block, rec);
2297 
2298 	btr_search_update_hash_on_delete(cursor);
2299 
2300 	page_cur_delete_rec(page_cursor, index, *offsets, mtr);
2301 
2302 	page_cur_move_to_prev(page_cursor);
2303 
2304 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
2305 		row_upd_index_entry_sys_field(new_entry, index, DATA_ROLL_PTR,
2306 					      roll_ptr);
2307 		row_upd_index_entry_sys_field(new_entry, index, DATA_TRX_ID,
2308 					      trx_id);
2309 	}
2310 
2311 	/* There are no externally stored columns in new_entry */
2312 	rec = btr_cur_insert_if_possible(
2313 		cursor, new_entry, offsets, heap, 0/*n_ext*/, mtr);
2314 	ut_a(rec); /* <- We calculated above the insert would fit */
2315 
2316 	/* Restore the old explicit lock state on the record */
2317 
2318 	lock_rec_restore_from_page_infimum(block, rec, block);
2319 
2320 	page_cur_move_to_next(page_cursor);
2321 	ut_ad(err == DB_SUCCESS);
2322 
2323 func_exit:
2324 	if (!(flags & BTR_KEEP_IBUF_BITMAP)
2325 	    && !dict_index_is_clust(index)
2326 	    && page_is_leaf(page)) {
2327 
2328 		if (page_zip) {
2329 			ibuf_update_free_bits_zip(block, mtr);
2330 		} else {
2331 			ibuf_update_free_bits_low(block, max_ins_size, mtr);
2332 		}
2333 	}
2334 
2335 	return(err);
2336 }
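/* Illustrative sketch of a hypothetical caller (the real callers in
row0upd.cc also reposition the cursor, restart the mini-transaction and
store any returned big_rec): the optimistic path is tried first and the
pessimistic path is used only when the record no longer fits in place:

	err = btr_cur_optimistic_update(flags, cursor, &offsets, &heap,
					update, cmpl_info, thr, trx_id, mtr);
	switch (err) {
	case DB_OVERFLOW:
	case DB_UNDERFLOW:
	case DB_ZIP_OVERFLOW:
		(re-latch the tree with BTR_MODIFY_TREE, then)
		err = btr_cur_pessimistic_update(
			flags | BTR_KEEP_POS_FLAG, cursor, &offsets,
			&offsets_heap, entry_heap, &big_rec, update,
			cmpl_info, thr, trx_id, mtr);
		break;
	default:
		break;
	}

Here offsets_heap, entry_heap and big_rec are hypothetical caller-side
variables, not names defined in this file. */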
2337 
2338 /*************************************************************//**
2339 If, in a split, a new supremum record was created as the predecessor of the
2340 updated record, the supremum record must inherit exactly the locks on the
2341 updated record. In the split it may have inherited locks from the successor
2342 of the updated record, which is not correct. This function restores the
2343 right locks for the new supremum. */
2344 static
2345 void
2346 btr_cur_pess_upd_restore_supremum(
2347 /*==============================*/
2348 	buf_block_t*	block,	/*!< in: buffer block of rec */
2349 	const rec_t*	rec,	/*!< in: updated record */
2350 	mtr_t*		mtr)	/*!< in: mtr */
2351 {
2352 	page_t*		page;
2353 	buf_block_t*	prev_block;
2354 	ulint		space;
2355 	ulint		zip_size;
2356 	ulint		prev_page_no;
2357 
2358 	page = buf_block_get_frame(block);
2359 
2360 	if (page_rec_get_next(page_get_infimum_rec(page)) != rec) {
2361 		/* Updated record is not the first user record on its page */
2362 
2363 		return;
2364 	}
2365 
2366 	space = buf_block_get_space(block);
2367 	zip_size = buf_block_get_zip_size(block);
2368 	prev_page_no = btr_page_get_prev(page, mtr);
2369 
2370 	ut_ad(prev_page_no != FIL_NULL);
2371 	prev_block = buf_page_get_with_no_latch(space, zip_size,
2372 						prev_page_no, mtr);
2373 #ifdef UNIV_BTR_DEBUG
2374 	ut_a(btr_page_get_next(prev_block->frame, mtr)
2375 	     == page_get_page_no(page));
2376 #endif /* UNIV_BTR_DEBUG */
2377 
2378 	/* We must already have an x-latch on prev_block! */
2379 	ut_ad(mtr_memo_contains(mtr, prev_block, MTR_MEMO_PAGE_X_FIX));
2380 
2381 	lock_rec_reset_and_inherit_gap_locks(prev_block, block,
2382 					     PAGE_HEAP_NO_SUPREMUM,
2383 					     page_rec_get_heap_no(rec));
2384 }
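/* Concrete example of the situation handled above: after a page split the
updated record may become the first user record of its page; during the
split the supremum of the preceding page may then have inherited gap locks
from the successor of the updated record, which is wrong, so the gap locks
are re-inherited here from the updated record itself. */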
2385 
2386 /*************************************************************//**
2387 Check if the total length of the modified blob for the row is within 10%
2388 of the total redo log size.  This constraint on the blob length is to
2389 avoid overwriting the redo logs beyond the last checkpoint lsn.
2390 @return	DB_SUCCESS or DB_TOO_BIG_FOR_REDO. */
2391 static
2392 dberr_t
2393 btr_check_blob_limit(const big_rec_t*	big_rec_vec)
2394 {
2395 	const	ib_uint64_t redo_size = srv_n_log_files * srv_log_file_size
2396 		* UNIV_PAGE_SIZE;
2397 	const	ib_uint64_t redo_10p = redo_size / 10;
2398 	ib_uint64_t	total_blob_len = 0;
2399 	dberr_t	err = DB_SUCCESS;
2400 
2401 	/* Calculate the total number of bytes for blob data */
2402 	for (ulint i = 0; i < big_rec_vec->n_fields; i++) {
2403 		total_blob_len += big_rec_vec->fields[i].len;
2404 	}
2405 
2406 	if (total_blob_len > redo_10p) {
2407 		ib_logf(IB_LOG_LEVEL_ERROR, "The total blob data"
2408 			" length (" UINT64PF ") is greater than"
2409 			" 10%% of the total redo log size (" UINT64PF
2410 			"). Please increase total redo log size.",
2411 			total_blob_len, redo_size);
2412 		err = DB_TOO_BIG_FOR_REDO;
2413 	}
2414 
2415 	return(err);
2416 }
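/* Worked example with purely illustrative numbers: with srv_n_log_files = 2,
srv_log_file_size = 3072 pages and the default 16 KiB UNIV_PAGE_SIZE,
redo_size = 2 * 3072 * 16384 bytes = 96 MiB and redo_10p = 9.6 MiB, so an
update whose modified BLOB fields together exceed about 9.6 MiB is refused
with DB_TOO_BIG_FOR_REDO. */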
2417 
2418 /*************************************************************//**
2419 Performs an update of a record on a page of a tree. It is assumed
2420 that mtr holds an x-latch on the tree and on the cursor page. If the
2421 update is made on the leaf level, to avoid deadlocks, mtr must also
2422 own x-latches to brothers of page, if those brothers exist. We assume
2423 here that the ordering fields of the record do not change.
2424 @return	DB_SUCCESS or error code */
2425 UNIV_INTERN
2426 dberr_t
2427 btr_cur_pessimistic_update(
2428 /*=======================*/
2429 	ulint		flags,	/*!< in: undo logging, locking, and rollback
2430 				flags */
2431 	btr_cur_t*	cursor,	/*!< in/out: cursor on the record to update;
2432 				cursor may become invalid if *big_rec == NULL
2433 				|| !(flags & BTR_KEEP_POS_FLAG) */
2434 	ulint**		offsets,/*!< out: offsets on cursor->page_cur.rec */
2435 	mem_heap_t**	offsets_heap,
2436 				/*!< in/out: pointer to memory heap
2437 				that can be emptied, or NULL */
2438 	mem_heap_t*	entry_heap,
2439 				/*!< in/out: memory heap for allocating
2440 				big_rec and the index tuple */
2441 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
2442 				be stored externally by the caller, or NULL */
2443 	const upd_t*	update,	/*!< in: update vector; this is also allowed
2444 				to contain trx id and roll ptr fields, but
2445 				the values in the update vector have no effect */
2446 	ulint		cmpl_info,/*!< in: compiler info on secondary index
2447 				updates */
2448 	que_thr_t*	thr,	/*!< in: query thread */
2449 	trx_id_t	trx_id,	/*!< in: transaction id */
2450 	mtr_t*		mtr)	/*!< in/out: mini-transaction; must be
2451 				committed before latching any further pages */
2452 {
2453 	big_rec_t*	big_rec_vec	= NULL;
2454 	big_rec_t*	dummy_big_rec;
2455 	dict_index_t*	index;
2456 	buf_block_t*	block;
2457 	page_t*		page;
2458 	page_zip_des_t*	page_zip;
2459 	rec_t*		rec;
2460 	page_cur_t*	page_cursor;
2461 	dberr_t		err;
2462 	dberr_t		optim_err;
2463 	roll_ptr_t	roll_ptr;
2464 	ibool		was_first;
2465 	ulint		n_reserved	= 0;
2466 	ulint		n_ext;
2467 	ulint		max_ins_size	= 0;
2468 
2469 	*offsets = NULL;
2470 	*big_rec = NULL;
2471 
2472 	block = btr_cur_get_block(cursor);
2473 	page = buf_block_get_frame(block);
2474 	page_zip = buf_block_get_page_zip(block);
2475 	index = cursor->index;
2476 
2477 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
2478 				MTR_MEMO_X_LOCK));
2479 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2480 #ifdef UNIV_ZIP_DEBUG
2481 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2482 #endif /* UNIV_ZIP_DEBUG */
2483 	/* The insert buffer tree should never be updated in place. */
2484 	ut_ad(!dict_index_is_ibuf(index));
2485 	ut_ad(dict_index_is_online_ddl(index) == !!(flags & BTR_CREATE_FLAG)
2486 	      || dict_index_is_clust(index));
2487 	ut_ad(thr_get_trx(thr)->id == trx_id
2488 	      || (flags & ~BTR_KEEP_POS_FLAG)
2489 	      == (BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG
2490 		  | BTR_CREATE_FLAG | BTR_KEEP_SYS_FLAG));
2491 
2492 	err = optim_err = btr_cur_optimistic_update(
2493 		flags | BTR_KEEP_IBUF_BITMAP,
2494 		cursor, offsets, offsets_heap, update,
2495 		cmpl_info, thr, trx_id, mtr);
2496 
2497 	switch (err) {
2498 	case DB_ZIP_OVERFLOW:
2499 	case DB_UNDERFLOW:
2500 	case DB_OVERFLOW:
2501 		break;
2502 	default:
2503 	err_exit:
2504 		/* We suppressed this with BTR_KEEP_IBUF_BITMAP.
2505 		For DB_ZIP_OVERFLOW, the IBUF_BITMAP_FREE bits were
2506 		already reset by btr_cur_update_alloc_zip() if the
2507 		page was recompressed. */
2508 		if (page_zip
2509 		    && optim_err != DB_ZIP_OVERFLOW
2510 		    && !dict_index_is_clust(index)
2511 		    && page_is_leaf(page)) {
2512 			ibuf_update_free_bits_zip(block, mtr);
2513 		}
2514 
2515 		return(err);
2516 	}
2517 
2518 	/* Do lock checking and undo logging */
2519 	err = btr_cur_upd_lock_and_undo(flags, cursor, *offsets,
2520 					update, cmpl_info,
2521 					thr, mtr, &roll_ptr);
2522 	if (err != DB_SUCCESS) {
2523 		goto err_exit;
2524 	}
2525 
2526 	if (optim_err == DB_OVERFLOW) {
2527 		ulint	reserve_flag;
2528 
2529 		/* First reserve enough free space for the file segments
2530 		of the index tree, so that the update will not fail because
2531 		of lack of space */
2532 
2533 		ulint	n_extents = cursor->tree_height / 16 + 3;
2534 
2535 		if (flags & BTR_NO_UNDO_LOG_FLAG) {
2536 			reserve_flag = FSP_CLEANING;
2537 		} else {
2538 			reserve_flag = FSP_NORMAL;
2539 		}
2540 
2541 		if (!fsp_reserve_free_extents(&n_reserved, index->space,
2542 					      n_extents, reserve_flag, mtr)) {
2543 			err = DB_OUT_OF_FILE_SPACE;
2544 			goto err_exit;
2545 		}
2546 	}
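	/* Explanatory note (not in the original source): FSP_CLEANING is
	chosen above when undo logging is disabled because such an update
	is part of a rollback or purge; those "cleaning" operations are
	believed to get a more lenient free-space reservation than
	FSP_NORMAL so that they are not refused where an ordinary user
	operation would be. */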
2547 
2548 	rec = btr_cur_get_rec(cursor);
2549 
2550 	*offsets = rec_get_offsets(
2551 		rec, index, *offsets, ULINT_UNDEFINED, offsets_heap);
2552 
2553 	dtuple_t*	new_entry = row_rec_to_index_entry(
2554 		rec, index, *offsets, &n_ext, entry_heap);
2555 
2556 	/* The page containing the clustered index record
2557 	corresponding to new_entry is latched in mtr.  If the
2558 	clustered index record is delete-marked, then its externally
2559 	stored fields cannot have been purged yet, because then the
2560 	purge would also have removed the clustered index record
2561 	itself.  Thus the following call is safe. */
2562 	row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
2563 						     FALSE, entry_heap);
2564 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
2565 		row_upd_index_entry_sys_field(new_entry, index, DATA_ROLL_PTR,
2566 					      roll_ptr);
2567 		row_upd_index_entry_sys_field(new_entry, index, DATA_TRX_ID,
2568 					      trx_id);
2569 	}
2570 
2571 	if ((flags & BTR_NO_UNDO_LOG_FLAG) && rec_offs_any_extern(*offsets)) {
2572 		/* We are in a transaction rollback undoing a row
2573 		update: we must free possible externally stored fields
2574 		which got new values in the update, if they are not
2575 		inherited values. They can be inherited if we have
2576 		updated the primary key to another value, and then
2577 		update it back again. */
2578 
2579 		ut_ad(big_rec_vec == NULL);
2580 
2581 		btr_rec_free_updated_extern_fields(
2582 			index, rec, page_zip, *offsets, update,
2583 			trx_is_recv(thr_get_trx(thr))
2584 			? RB_RECOVERY : RB_NORMAL, mtr);
2585 	}
2586 
2587 	/* We have to set appropriate extern storage bits in the new
2588 	record to be inserted: we have to remember which fields were external */
2589 
2590 	ut_ad(!page_is_comp(page) || !rec_get_node_ptr_flag(rec));
2591 	ut_ad(rec_offs_validate(rec, index, *offsets));
2592 	n_ext += btr_push_update_extern_fields(new_entry, update, entry_heap);
2593 
2594 	if (page_zip) {
2595 		ut_ad(page_is_comp(page));
2596 		if (page_zip_rec_needs_ext(
2597 			    rec_get_converted_size(index, new_entry, n_ext),
2598 			    TRUE,
2599 			    dict_index_get_n_fields(index),
2600 			    page_zip_get_size(page_zip))) {
2601 
2602 			goto make_external;
2603 		}
2604 	} else if (page_zip_rec_needs_ext(
2605 			   rec_get_converted_size(index, new_entry, n_ext),
2606 			   page_is_comp(page), 0, 0)) {
2607 make_external:
2608 		big_rec_vec = dtuple_convert_big_rec(index, new_entry, &n_ext);
2609 		if (UNIV_UNLIKELY(big_rec_vec == NULL)) {
2610 
2611 			/* We cannot goto return_after_reservations,
2612 			because we may need to update the
2613 			IBUF_BITMAP_FREE bits, which was suppressed by
2614 			BTR_KEEP_IBUF_BITMAP. */
2615 #ifdef UNIV_ZIP_DEBUG
2616 			ut_a(!page_zip
2617 			     || page_zip_validate(page_zip, page, index));
2618 #endif /* UNIV_ZIP_DEBUG */
2619 			if (n_reserved > 0) {
2620 				fil_space_release_free_extents(
2621 					index->space, n_reserved);
2622 			}
2623 
2624 			err = DB_TOO_BIG_RECORD;
2625 			goto err_exit;
2626 		}
2627 
2628 		ut_ad(page_is_leaf(page));
2629 		ut_ad(dict_index_is_clust(index));
2630 		ut_ad(flags & BTR_KEEP_POS_FLAG);
2631 	}
2632 
2633 	if (big_rec_vec) {
2634 
2635 		err = btr_check_blob_limit(big_rec_vec);
2636 
2637 		if (err != DB_SUCCESS) {
2638 			if (n_reserved > 0) {
2639 				fil_space_release_free_extents(
2640 					index->space, n_reserved);
2641 			}
2642 			goto err_exit;
2643 		}
2644 	}
2645 
2646 	if (!page_zip) {
2647 		max_ins_size = page_get_max_insert_size_after_reorganize(page, 1);
2648 	}
2649 
2650 	/* Store state of explicit locks on rec on the page infimum record,
2651 	before deleting rec. The page infimum acts as a dummy carrier of the
2652 	locks, taking care also of lock releases, before we can move the locks
2653 	back on the actual record. There is a special case: the insert
2654 	may be made on the root page and cause a call to
2655 	btr_root_raise_and_insert(). Therefore we cannot, in the lock
2656 	system, delete the lock structs set on the root page even if the
2657 	root page carries just node pointers. */
2658 
2659 	lock_rec_store_on_page_infimum(block, rec);
2660 
2661 	btr_search_update_hash_on_delete(cursor);
2662 
2663 #ifdef UNIV_ZIP_DEBUG
2664 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2665 #endif /* UNIV_ZIP_DEBUG */
2666 	page_cursor = btr_cur_get_page_cur(cursor);
2667 
2668 	page_cur_delete_rec(page_cursor, index, *offsets, mtr);
2669 
2670 	page_cur_move_to_prev(page_cursor);
2671 
2672 	rec = btr_cur_insert_if_possible(cursor, new_entry,
2673 					 offsets, offsets_heap, n_ext, mtr);
2674 
2675 	if (rec) {
2676 		page_cursor->rec = rec;
2677 
2678 		lock_rec_restore_from_page_infimum(btr_cur_get_block(cursor),
2679 						   rec, block);
2680 
2681 		if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
2682 			/* The new inserted record owns its possible externally
2683 			stored fields */
2684 			btr_cur_unmark_extern_fields(
2685 				page_zip, rec, index, *offsets, mtr);
2686 		}
2687 
2688 		bool adjust = big_rec_vec && (flags & BTR_KEEP_POS_FLAG);
2689 
2690 		if (btr_cur_compress_if_useful(cursor, adjust, mtr)) {
2691 			if (adjust) {
2692 				rec_offs_make_valid(
2693 					page_cursor->rec, index, *offsets);
2694 			}
2695 		} else if (!dict_index_is_clust(index)
2696 			   && page_is_leaf(page)) {
2697 
2698 			/* Update the free bits in the insert buffer.
2699 			This is the same block which was skipped by
2700 			BTR_KEEP_IBUF_BITMAP. */
2701 			if (page_zip) {
2702 				ibuf_update_free_bits_zip(block, mtr);
2703 			} else {
2704 				ibuf_update_free_bits_low(block, max_ins_size,
2705 							  mtr);
2706 			}
2707 		}
2708 
2709 		err = DB_SUCCESS;
2710 		goto return_after_reservations;
2711 	} else {
2712 		/* If the page is compressed and it initially
2713 		compresses very well, and there is a subsequent insert
2714 		of a badly-compressing record, it is possible for
2715 		btr_cur_optimistic_update() to return DB_UNDERFLOW and
2716 		btr_cur_insert_if_possible() to return FALSE. */
2717 		ut_a(page_zip || optim_err != DB_UNDERFLOW);
2718 
2719 		/* Out of space: reset the free bits.
2720 		This is the same block which was skipped by
2721 		BTR_KEEP_IBUF_BITMAP. */
2722 		if (!dict_index_is_clust(index) && page_is_leaf(page)) {
2723 			ibuf_reset_free_bits(block);
2724 		}
2725 	}
2726 
2727 	if (big_rec_vec) {
2728 		ut_ad(page_is_leaf(page));
2729 		ut_ad(dict_index_is_clust(index));
2730 		ut_ad(flags & BTR_KEEP_POS_FLAG);
2731 
2732 		/* btr_page_split_and_insert() in
2733 		btr_cur_pessimistic_insert() invokes
2734 		mtr_memo_release(mtr, index->lock, MTR_MEMO_X_LOCK).
2735 		We must keep the index->lock when we created a
2736 		big_rec, so that row_upd_clust_rec() can store the
2737 		big_rec in the same mini-transaction. */
2738 
2739 		mtr_x_lock(dict_index_get_lock(index), mtr);
2740 	}
2741 
2742 	/* Was the record to be updated positioned as the first user
2743 	record on its page? */
2744 	was_first = page_cur_is_before_first(page_cursor);
2745 
2746 	/* Lock checks and undo logging were already performed by
2747 	btr_cur_upd_lock_and_undo(). We do not try
2748 	btr_cur_optimistic_insert() because
2749 	btr_cur_insert_if_possible() already failed above. */
2750 
2751 	err = btr_cur_pessimistic_insert(BTR_NO_UNDO_LOG_FLAG
2752 					 | BTR_NO_LOCKING_FLAG
2753 					 | BTR_KEEP_SYS_FLAG,
2754 					 cursor, offsets, offsets_heap,
2755 					 new_entry, &rec,
2756 					 &dummy_big_rec, n_ext, NULL, mtr);
2757 	ut_a(rec);
2758 	ut_a(err == DB_SUCCESS);
2759 	ut_a(dummy_big_rec == NULL);
2760 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
2761 	page_cursor->rec = rec;
2762 
2763 	if (dict_index_is_sec_or_ibuf(index)) {
2764 		/* Update PAGE_MAX_TRX_ID in the index page header.
2765 		It was not updated by btr_cur_pessimistic_insert()
2766 		because of BTR_NO_LOCKING_FLAG. */
2767 		buf_block_t*	rec_block;
2768 
2769 		rec_block = btr_cur_get_block(cursor);
2770 
2771 		page_update_max_trx_id(rec_block,
2772 				       buf_block_get_page_zip(rec_block),
2773 				       trx_id, mtr);
2774 	}
2775 
2776 	if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
2777 		/* The new inserted record owns its possible externally
2778 		stored fields */
2779 		buf_block_t*	rec_block = btr_cur_get_block(cursor);
2780 
2781 #ifdef UNIV_ZIP_DEBUG
2782 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2783 		page = buf_block_get_frame(rec_block);
2784 #endif /* UNIV_ZIP_DEBUG */
2785 		page_zip = buf_block_get_page_zip(rec_block);
2786 
2787 		btr_cur_unmark_extern_fields(page_zip,
2788 					     rec, index, *offsets, mtr);
2789 	}
2790 
2791 	lock_rec_restore_from_page_infimum(btr_cur_get_block(cursor),
2792 					   rec, block);
2793 
2794 	/* If necessary, restore also the correct lock state for a new,
2795 	preceding supremum record created in a page split. While the old
2796 	record was nonexistent, the supremum might have inherited its locks
2797 	from a wrong record. */
2798 
2799 	if (!was_first) {
2800 		btr_cur_pess_upd_restore_supremum(btr_cur_get_block(cursor),
2801 						  rec, mtr);
2802 	}
2803 
2804 return_after_reservations:
2805 #ifdef UNIV_ZIP_DEBUG
2806 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2807 #endif /* UNIV_ZIP_DEBUG */
2808 
2809 	if (n_reserved > 0) {
2810 		fil_space_release_free_extents(index->space, n_reserved);
2811 	}
2812 
2813 	*big_rec = big_rec_vec;
2814 
2815 	return(err);
2816 }
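/* Usage note for callers (a sketch of the expected contract, inferred from
the comments above rather than stated elsewhere): if *big_rec is returned
non-NULL, the externally stored fields have not been written yet.  The
caller must keep holding index->lock (see the BTR_KEEP_POS_FLAG comment
above) and store the fields with btr_store_big_rec_extern_fields() in the
same mini-transaction context, as row_upd_clust_rec() does, before the
big_rec vector is freed. */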
2817 
2818 /*==================== B-TREE DELETE MARK AND UNMARK ===============*/
2819 
2820 /****************************************************************//**
2821 Writes the redo log record for delete marking or unmarking of an index
2822 record. */
2823 UNIV_INLINE
2824 void
2825 btr_cur_del_mark_set_clust_rec_log(
2826 /*===============================*/
2827 	rec_t*		rec,	/*!< in: record */
2828 	dict_index_t*	index,	/*!< in: index of the record */
2829 	trx_id_t	trx_id,	/*!< in: transaction id */
2830 	roll_ptr_t	roll_ptr,/*!< in: roll ptr to the undo log record */
2831 	mtr_t*		mtr)	/*!< in: mtr */
2832 {
2833 	byte*	log_ptr;
2834 
2835 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2836 
2837 	log_ptr = mlog_open_and_write_index(mtr, rec, index,
2838 					    page_rec_is_comp(rec)
2839 					    ? MLOG_COMP_REC_CLUST_DELETE_MARK
2840 					    : MLOG_REC_CLUST_DELETE_MARK,
2841 					    1 + 1 + DATA_ROLL_PTR_LEN
2842 					    + 14 + 2);
2843 
2844 	if (!log_ptr) {
2845 		/* Logging in mtr is switched off during crash recovery */
2846 		return;
2847 	}
2848 
2849 	*log_ptr++ = 0;
2850 	*log_ptr++ = 1;
2851 
2852 	log_ptr = row_upd_write_sys_vals_to_log(
2853 		index, trx_id, roll_ptr, log_ptr, mtr);
2854 	mach_write_to_2(log_ptr, page_offset(rec));
2855 	log_ptr += 2;
2856 
2857 	mlog_close(mtr, log_ptr);
2858 }
2859 #endif /* !UNIV_HOTBACKUP */
2860 
2861 /****************************************************************//**
2862 Parses the redo log record for delete marking or unmarking of a clustered
2863 index record.
2864 @return	end of log record or NULL */
2865 UNIV_INTERN
2866 byte*
2867 btr_cur_parse_del_mark_set_clust_rec(
2868 /*=================================*/
2869 	byte*		ptr,	/*!< in: buffer */
2870 	byte*		end_ptr,/*!< in: buffer end */
2871 	page_t*		page,	/*!< in/out: page or NULL */
2872 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
2873 	dict_index_t*	index)	/*!< in: index corresponding to page */
2874 {
2875 	ulint		flags;
2876 	ulint		val;
2877 	ulint		pos;
2878 	trx_id_t	trx_id;
2879 	roll_ptr_t	roll_ptr;
2880 	ulint		offset;
2881 	rec_t*		rec;
2882 
2883 	ut_ad(!page
2884 	      || !!page_is_comp(page) == dict_table_is_comp(index->table));
2885 
2886 	if (end_ptr < ptr + 2) {
2887 
2888 		return(NULL);
2889 	}
2890 
2891 	flags = mach_read_from_1(ptr);
2892 	ptr++;
2893 	val = mach_read_from_1(ptr);
2894 	ptr++;
2895 
2896 	ptr = row_upd_parse_sys_vals(ptr, end_ptr, &pos, &trx_id, &roll_ptr);
2897 
2898 	if (ptr == NULL) {
2899 
2900 		return(NULL);
2901 	}
2902 
2903 	if (end_ptr < ptr + 2) {
2904 
2905 		return(NULL);
2906 	}
2907 
2908 	offset = mach_read_from_2(ptr);
2909 	ptr += 2;
2910 
2911 	ut_a(offset <= UNIV_PAGE_SIZE);
2912 
2913 	if (page) {
2914 		rec = page + offset;
2915 
2916 		/* We do not need to reserve btr_search_latch, as the page
2917 		is only being recovered, and there cannot be a hash index to
2918 		it. Besides, these fields are being updated in place
2919 		and the adaptive hash index does not depend on them. */
2920 
2921 		btr_rec_set_deleted_flag(rec, page_zip, val);
2922 
2923 		if (!(flags & BTR_KEEP_SYS_FLAG)) {
2924 			mem_heap_t*	heap		= NULL;
2925 			ulint		offsets_[REC_OFFS_NORMAL_SIZE];
2926 			rec_offs_init(offsets_);
2927 
2928 			row_upd_rec_sys_fields_in_recovery(
2929 				rec, page_zip,
2930 				rec_get_offsets(rec, index, offsets_,
2931 						ULINT_UNDEFINED, &heap),
2932 				pos, trx_id, roll_ptr);
2933 			if (UNIV_LIKELY_NULL(heap)) {
2934 				mem_heap_free(heap);
2935 			}
2936 		}
2937 	}
2938 
2939 	return(ptr);
2940 }
2941 
2942 #ifndef UNIV_HOTBACKUP
2943 /***********************************************************//**
2944 Marks a clustered index record deleted. Writes an undo log record about
2945 the delete marking to the undo log. Writes the id of the deleting
2946 transaction to the trx id field, and a pointer to the created undo log
2947 record to the roll ptr field.
2948 @return	DB_SUCCESS, DB_LOCK_WAIT, or error number */
2949 UNIV_INTERN
2950 dberr_t
2951 btr_cur_del_mark_set_clust_rec(
2952 /*===========================*/
2953 	buf_block_t*	block,	/*!< in/out: buffer block of the record */
2954 	rec_t*		rec,	/*!< in/out: record */
2955 	dict_index_t*	index,	/*!< in: clustered index of the record */
2956 	const ulint*	offsets,/*!< in: rec_get_offsets(rec) */
2957 	que_thr_t*	thr,	/*!< in: query thread */
2958 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
2959 {
2960 	roll_ptr_t	roll_ptr;
2961 	dberr_t		err;
2962 	page_zip_des_t*	page_zip;
2963 	trx_t*		trx;
2964 
2965 	ut_ad(dict_index_is_clust(index));
2966 	ut_ad(rec_offs_validate(rec, index, offsets));
2967 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2968 	ut_ad(buf_block_get_frame(block) == page_align(rec));
2969 	ut_ad(page_is_leaf(page_align(rec)));
2970 
2971 #ifdef UNIV_DEBUG
2972 	if (btr_cur_print_record_ops && (thr != NULL)) {
2973 		btr_cur_trx_report(thr_get_trx(thr)->id, index, "del mark ");
2974 		rec_print_new(stderr, rec, offsets);
2975 	}
2976 #endif /* UNIV_DEBUG */
2977 
2978 	ut_ad(dict_index_is_clust(index));
2979 	ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2980 
2981 	err = lock_clust_rec_modify_check_and_lock(BTR_NO_LOCKING_FLAG, block,
2982 						   rec, index, offsets, thr);
2983 
2984 	if (err != DB_SUCCESS) {
2985 
2986 		return(err);
2987 	}
2988 
2989 	err = trx_undo_report_row_operation(0, TRX_UNDO_MODIFY_OP, thr,
2990 					    index, NULL, NULL, 0, rec, offsets,
2991 					    &roll_ptr);
2992 	if (err != DB_SUCCESS) {
2993 
2994 		return(err);
2995 	}
2996 
2997 	/* The btr_search_latch is not needed here, because
2998 	the adaptive hash index does not depend on the delete-mark
2999 	and the delete-mark is being updated in place. */
3000 
3001 	page_zip = buf_block_get_page_zip(block);
3002 
3003 	btr_blob_dbg_set_deleted_flag(rec, index, offsets, TRUE);
3004 	btr_rec_set_deleted_flag(rec, page_zip, TRUE);
3005 
3006 	trx = thr_get_trx(thr);
3007 
3008 	if (dict_index_is_online_ddl(index)) {
3009 		row_log_table_delete(rec, index, offsets, NULL);
3010 	}
3011 
3012 	row_upd_rec_sys_fields(rec, page_zip, index, offsets, trx, roll_ptr);
3013 
3014 	btr_cur_del_mark_set_clust_rec_log(rec, index, trx->id,
3015 					   roll_ptr, mtr);
3016 
3017 	return(err);
3018 }
3019 
3020 /****************************************************************//**
3021 Writes the redo log record for a delete mark setting of a secondary
3022 index record. */
3023 UNIV_INLINE
3024 void
3025 btr_cur_del_mark_set_sec_rec_log(
3026 /*=============================*/
3027 	rec_t*		rec,	/*!< in: record */
3028 	ibool		val,	/*!< in: value to set */
3029 	mtr_t*		mtr)	/*!< in: mtr */
3030 {
3031 	byte*	log_ptr;
3032 	ut_ad(val <= 1);
3033 
3034 	log_ptr = mlog_open(mtr, 11 + 1 + 2);
3035 
3036 	if (!log_ptr) {
3037 		/* Logging in mtr is switched off during crash recovery:
3038 		in that case mlog_open returns NULL */
3039 		return;
3040 	}
3041 
3042 	log_ptr = mlog_write_initial_log_record_fast(
3043 		rec, MLOG_REC_SEC_DELETE_MARK, log_ptr, mtr);
3044 	mach_write_to_1(log_ptr, val);
3045 	log_ptr++;
3046 
3047 	mach_write_to_2(log_ptr, page_offset(rec));
3048 	log_ptr += 2;
3049 
3050 	mlog_close(mtr, log_ptr);
3051 }
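/* For reference (derived from the code above): the MLOG_REC_SEC_DELETE_MARK
body is simply

	delete-mark value	1 byte
	record offset in page	2 bytes

following the initial log record header (at most 11 bytes), and it is read
back by btr_cur_parse_del_mark_set_sec_rec() below. */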
3052 #endif /* !UNIV_HOTBACKUP */
3053 
3054 /****************************************************************//**
3055 Parses the redo log record for delete marking or unmarking of a secondary
3056 index record.
3057 @return	end of log record or NULL */
3058 UNIV_INTERN
3059 byte*
3060 btr_cur_parse_del_mark_set_sec_rec(
3061 /*===============================*/
3062 	byte*		ptr,	/*!< in: buffer */
3063 	byte*		end_ptr,/*!< in: buffer end */
3064 	page_t*		page,	/*!< in/out: page or NULL */
3065 	page_zip_des_t*	page_zip)/*!< in/out: compressed page, or NULL */
3066 {
3067 	ulint	val;
3068 	ulint	offset;
3069 	rec_t*	rec;
3070 
3071 	if (end_ptr < ptr + 3) {
3072 
3073 		return(NULL);
3074 	}
3075 
3076 	val = mach_read_from_1(ptr);
3077 	ptr++;
3078 
3079 	offset = mach_read_from_2(ptr);
3080 	ptr += 2;
3081 
3082 	ut_a(offset <= UNIV_PAGE_SIZE);
3083 
3084 	if (page) {
3085 		rec = page + offset;
3086 
3087 		/* We do not need to reserve btr_search_latch, as the page
3088 		is only being recovered, and there cannot be a hash index to
3089 		it. Besides, the delete-mark flag is being updated in place
3090 		and the adaptive hash index does not depend on it. */
3091 
3092 		btr_rec_set_deleted_flag(rec, page_zip, val);
3093 	}
3094 
3095 	return(ptr);
3096 }
3097 
3098 #ifndef UNIV_HOTBACKUP
3099 /***********************************************************//**
3100 Sets a secondary index record delete mark to TRUE or FALSE.
3101 @return	DB_SUCCESS, DB_LOCK_WAIT, or error number */
3102 UNIV_INTERN
3103 dberr_t
3104 btr_cur_del_mark_set_sec_rec(
3105 /*=========================*/
3106 	ulint		flags,	/*!< in: locking flag */
3107 	btr_cur_t*	cursor,	/*!< in: cursor */
3108 	ibool		val,	/*!< in: value to set */
3109 	que_thr_t*	thr,	/*!< in: query thread */
3110 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3111 {
3112 	buf_block_t*	block;
3113 	rec_t*		rec;
3114 	dberr_t		err;
3115 
3116 	block = btr_cur_get_block(cursor);
3117 	rec = btr_cur_get_rec(cursor);
3118 
3119 #ifdef UNIV_DEBUG
3120 	if (btr_cur_print_record_ops && (thr != NULL)) {
3121 		btr_cur_trx_report(thr_get_trx(thr)->id, cursor->index,
3122 				   "del mark ");
3123 		rec_print(stderr, rec, cursor->index);
3124 	}
3125 #endif /* UNIV_DEBUG */
3126 
3127 	err = lock_sec_rec_modify_check_and_lock(flags,
3128 						 btr_cur_get_block(cursor),
3129 						 rec, cursor->index, thr, mtr);
3130 	if (err != DB_SUCCESS) {
3131 
3132 		return(err);
3133 	}
3134 
3135 	ut_ad(!!page_rec_is_comp(rec)
3136 	      == dict_table_is_comp(cursor->index->table));
3137 
3138 	/* We do not need to reserve btr_search_latch, as the
3139 	delete-mark flag is being updated in place and the adaptive
3140 	hash index does not depend on it. */
3141 	btr_rec_set_deleted_flag(rec, buf_block_get_page_zip(block), val);
3142 
3143 	btr_cur_del_mark_set_sec_rec_log(rec, val, mtr);
3144 
3145 	return(DB_SUCCESS);
3146 }
3147 
3148 /***********************************************************//**
3149 Sets a secondary index record's delete mark to the given value. This
3150 function is only used by the insert buffer merge mechanism. */
3151 UNIV_INTERN
3152 void
3153 btr_cur_set_deleted_flag_for_ibuf(
3154 /*==============================*/
3155 	rec_t*		rec,		/*!< in/out: record */
3156 	page_zip_des_t*	page_zip,	/*!< in/out: compressed page
3157 					corresponding to rec, or NULL
3158 					when the tablespace is
3159 					uncompressed */
3160 	ibool		val,		/*!< in: value to set */
3161 	mtr_t*		mtr)		/*!< in/out: mini-transaction */
3162 {
3163 	/* We do not need to reserve btr_search_latch, as the page
3164 	has just been read to the buffer pool and there cannot be
3165 	a hash index to it.  Besides, the delete-mark flag is being
3166 	updated in place and the adaptive hash index does not depend
3167 	on it. */
3168 
3169 	btr_rec_set_deleted_flag(rec, page_zip, val);
3170 
3171 	btr_cur_del_mark_set_sec_rec_log(rec, val, mtr);
3172 }
3173 
3174 /*==================== B-TREE RECORD REMOVE =========================*/
3175 
3176 /*************************************************************//**
3177 Tries to compress a page of the tree if it seems useful. It is assumed
3178 that mtr holds an x-latch on the tree and on the cursor page. To avoid
3179 deadlocks, mtr must also own x-latches to brothers of page, if those
3180 brothers exist. NOTE: it is assumed that the caller has reserved enough
3181 free extents so that the compression will always succeed if done!
3182 @return	TRUE if compression occurred */
3183 UNIV_INTERN
3184 ibool
3185 btr_cur_compress_if_useful(
3186 /*=======================*/
3187 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to compress;
3188 				cursor does not stay valid if !adjust and
3189 				compression occurs */
3190 	ibool		adjust,	/*!< in: TRUE if should adjust the
3191 				cursor position even if compression occurs */
3192 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3193 {
3194 	ut_ad(mtr_memo_contains(mtr,
3195 				dict_index_get_lock(btr_cur_get_index(cursor)),
3196 				MTR_MEMO_X_LOCK));
3197 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
3198 				MTR_MEMO_PAGE_X_FIX));
3199 
3200 	return(btr_cur_compress_recommendation(cursor, mtr)
3201 	       && btr_compress(cursor, adjust, mtr));
3202 }
3203 
3204 /*******************************************************//**
3205 Removes the record on which the tree cursor is positioned on a leaf page.
3206 It is assumed that the mtr has an x-latch on the page where the cursor is
3207 positioned, but no latch on the whole tree.
3208 @return	TRUE if success, i.e., the page did not become too empty */
3209 UNIV_INTERN
3210 ibool
3211 btr_cur_optimistic_delete_func(
3212 /*===========================*/
3213 	btr_cur_t*	cursor,	/*!< in: cursor on leaf page, on the record to
3214 				delete; cursor stays valid: if deletion
3215 				succeeds, on function exit it points to the
3216 				successor of the deleted record */
3217 #ifdef UNIV_DEBUG
3218 	ulint		flags,	/*!< in: BTR_CREATE_FLAG or 0 */
3219 #endif /* UNIV_DEBUG */
3220 	mtr_t*		mtr)	/*!< in: mtr; if this function returns
3221 				TRUE on a leaf page of a secondary
3222 				index, the mtr must be committed
3223 				before latching any further pages */
3224 {
3225 	buf_block_t*	block;
3226 	rec_t*		rec;
3227 	mem_heap_t*	heap		= NULL;
3228 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
3229 	ulint*		offsets		= offsets_;
3230 	ibool		no_compress_needed;
3231 	rec_offs_init(offsets_);
3232 
3233 	ut_ad(flags == 0 || flags == BTR_CREATE_FLAG);
3234 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
3235 				MTR_MEMO_PAGE_X_FIX));
3236 	/* This is intended only for leaf page deletions */
3237 
3238 	block = btr_cur_get_block(cursor);
3239 
3240 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
3241 	ut_ad(!dict_index_is_online_ddl(cursor->index)
3242 	      || dict_index_is_clust(cursor->index)
3243 	      || (flags & BTR_CREATE_FLAG));
3244 
3245 	rec = btr_cur_get_rec(cursor);
3246 	offsets = rec_get_offsets(rec, cursor->index, offsets,
3247 				  ULINT_UNDEFINED, &heap);
3248 
3249 	no_compress_needed = !rec_offs_any_extern(offsets)
3250 		&& btr_cur_can_delete_without_compress(
3251 			cursor, rec_offs_size(offsets), mtr);
3252 
3253 	if (no_compress_needed) {
3254 
3255 		page_t*		page	= buf_block_get_frame(block);
3256 		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
3257 
3258 		lock_update_delete(block, rec);
3259 
3260 		btr_search_update_hash_on_delete(cursor);
3261 
3262 		if (page_zip) {
3263 #ifdef UNIV_ZIP_DEBUG
3264 			ut_a(page_zip_validate(page_zip, page, cursor->index));
3265 #endif /* UNIV_ZIP_DEBUG */
3266 			page_cur_delete_rec(btr_cur_get_page_cur(cursor),
3267 					    cursor->index, offsets, mtr);
3268 #ifdef UNIV_ZIP_DEBUG
3269 			ut_a(page_zip_validate(page_zip, page, cursor->index));
3270 #endif /* UNIV_ZIP_DEBUG */
3271 
3272 			/* On compressed pages, the IBUF_BITMAP_FREE
3273 			space is not affected by deleting (purging)
3274 			records, because it is defined as the minimum
3275 			of space available *without* reorganize, and
3276 			space available in the modification log. */
3277 		} else {
3278 			const ulint	max_ins
3279 				= page_get_max_insert_size_after_reorganize(
3280 					page, 1);
3281 
3282 			page_cur_delete_rec(btr_cur_get_page_cur(cursor),
3283 					    cursor->index, offsets, mtr);
3284 
3285 			/* The change buffer does not handle inserts
3286 			into non-leaf pages, into clustered indexes,
3287 			or into the change buffer. */
3288 			if (page_is_leaf(page)
3289 			    && !dict_index_is_clust(cursor->index)
3290 			    && !dict_index_is_ibuf(cursor->index)) {
3291 				ibuf_update_free_bits_low(block, max_ins, mtr);
3292 			}
3293 		}
3294 	}
3295 
3296 	if (UNIV_LIKELY_NULL(heap)) {
3297 		mem_heap_free(heap);
3298 	}
3299 
3300 	return(no_compress_needed);
3301 }
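/* A minimal usage sketch (illustrative only; assumes the cursor has
already been positioned on the record with an x-fix on its leaf page in
mtr, and that the usual btr_cur_optimistic_delete() wrapper of the
function above is used):

	if (btr_cur_optimistic_delete(&cursor, 0, &mtr)) {
		... the record was removed; on a secondary index leaf
		    page the mtr must now be committed before any
		    further pages are latched ...
	} else {
		... the page would become too empty: fall back to
		    btr_cur_pessimistic_delete(), which may merge or
		    discard pages ...
	}
*/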
3302 
3303 /*************************************************************//**
3304 Removes the record on which the tree cursor is positioned. Tries
3305 to compress the page if its fillfactor drops below a threshold
3306 or if it is the only page on the level. It is assumed that mtr holds
3307 an x-latch on the tree and on the cursor page. To avoid deadlocks,
3308 mtr must also own x-latches on the brothers of the page, if those brothers
3309 exist.
3310 @return	TRUE if compression occurred */
3311 UNIV_INTERN
3312 ibool
3313 btr_cur_pessimistic_delete(
3314 /*=======================*/
3315 	dberr_t*	err,	/*!< out: DB_SUCCESS or DB_OUT_OF_FILE_SPACE;
3316 				the latter may occur because we may have
3317 				to update node pointers on upper levels,
3318 				and in the case of variable length keys
3319 				these may actually grow in size */
3320 	ibool		has_reserved_extents, /*!< in: TRUE if the
3321 				caller has already reserved enough free
3322 				extents so that it is known that the operation
3323 				will succeed */
3324 	btr_cur_t*	cursor,	/*!< in: cursor on the record to delete;
3325 				if compression does not occur, the cursor
3326 				stays valid: it points to successor of
3327 				deleted record on function exit */
3328 	ulint		flags,	/*!< in: BTR_CREATE_FLAG or 0 */
3329 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
3330 	mtr_t*		mtr)	/*!< in: mtr */
3331 {
3332 	buf_block_t*	block;
3333 	page_t*		page;
3334 	page_zip_des_t*	page_zip;
3335 	dict_index_t*	index;
3336 	rec_t*		rec;
3337 	ulint		n_reserved	= 0;
3338 	ibool		success;
3339 	ibool		ret		= FALSE;
3340 	ulint		level;
3341 	mem_heap_t*	heap;
3342 	ulint*		offsets;
3343 
3344 	block = btr_cur_get_block(cursor);
3345 	page = buf_block_get_frame(block);
3346 	index = btr_cur_get_index(cursor);
3347 
3348 	ut_ad(flags == 0 || flags == BTR_CREATE_FLAG);
3349 	ut_ad(!dict_index_is_online_ddl(index)
3350 	      || dict_index_is_clust(index)
3351 	      || (flags & BTR_CREATE_FLAG));
3352 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
3353 				MTR_MEMO_X_LOCK));
3354 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3355 	if (!has_reserved_extents) {
3356 		/* First reserve enough free space for the file segments
3357 		of the index tree, so that the node pointer updates will
3358 		not fail because of lack of space */
3359 
3360 		ulint	n_extents = cursor->tree_height / 32 + 1;
3361 
3362 		success = fsp_reserve_free_extents(&n_reserved,
3363 						   index->space,
3364 						   n_extents,
3365 						   FSP_CLEANING, mtr);
3366 		if (!success) {
3367 			*err = DB_OUT_OF_FILE_SPACE;
3368 
3369 			return(FALSE);
3370 		}
3371 	}
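	/* Example of the reservation above: with cursor->tree_height == 3
	the formula yields 3 / 32 + 1 == 1 extent, i.e. 64 pages with the
	default 16KB page size, which is ample for the node pointer
	updates on the few upper levels of such a tree. */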
3372 
3373 	heap = mem_heap_create(1024);
3374 	rec = btr_cur_get_rec(cursor);
3375 	page_zip = buf_block_get_page_zip(block);
3376 #ifdef UNIV_ZIP_DEBUG
3377 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3378 #endif /* UNIV_ZIP_DEBUG */
3379 
3380 	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
3381 
3382 	if (rec_offs_any_extern(offsets)) {
3383 		btr_rec_free_externally_stored_fields(index,
3384 						      rec, offsets, page_zip,
3385 						      rb_ctx, mtr);
3386 #ifdef UNIV_ZIP_DEBUG
3387 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3388 #endif /* UNIV_ZIP_DEBUG */
3389 	}
3390 
3391 	if (UNIV_UNLIKELY(page_get_n_recs(page) < 2)
3392 	    && UNIV_UNLIKELY(dict_index_get_page(index)
3393 			     != buf_block_get_page_no(block))) {
3394 
3395 		/* If there is only one record, drop the whole page in
3396 		btr_discard_page, if this is not the root page */
3397 
3398 		btr_discard_page(cursor, mtr);
3399 
3400 		ret = TRUE;
3401 
3402 		goto return_after_reservations;
3403 	}
3404 
3405 	if (flags == 0) {
3406 		lock_update_delete(block, rec);
3407 	}
3408 
3409 	level = btr_page_get_level(page, mtr);
3410 
3411 	if (level > 0
3412 	    && UNIV_UNLIKELY(rec == page_rec_get_next(
3413 				     page_get_infimum_rec(page)))) {
3414 
3415 		rec_t*	next_rec = page_rec_get_next(rec);
3416 
3417 		if (btr_page_get_prev(page, mtr) == FIL_NULL) {
3418 
3419 			/* If we delete the leftmost node pointer on a
3420 			non-leaf level, we must mark the new leftmost node
3421 			pointer as the predefined minimum record */
3422 
3423 			/* This will make page_zip_validate() fail until
3424 			page_cur_delete_rec() completes.  This is harmless,
3425 			because everything will take place within a single
3426 			mini-transaction and because writing to the redo log
3427 			is an atomic operation (performed by mtr_commit()). */
3428 			btr_set_min_rec_mark(next_rec, mtr);
3429 		} else {
3430 			/* Otherwise, if we delete the leftmost node pointer
3431 			on a page, we have to change the father node pointer
3432 			so that it is equal to the new leftmost node pointer
3433 			on the page */
3434 
3435 			btr_node_ptr_delete(index, block, mtr);
3436 
3437 			dtuple_t*	node_ptr = dict_index_build_node_ptr(
3438 				index, next_rec, buf_block_get_page_no(block),
3439 				heap, level);
3440 
3441 			btr_insert_on_non_leaf_level(
3442 				flags, index, level + 1, node_ptr, mtr);
3443 		}
3444 	}
3445 
3446 	btr_search_update_hash_on_delete(cursor);
3447 
3448 	page_cur_delete_rec(btr_cur_get_page_cur(cursor), index, offsets, mtr);
3449 #ifdef UNIV_ZIP_DEBUG
3450 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3451 #endif /* UNIV_ZIP_DEBUG */
3452 
3453 	ut_ad(btr_check_node_ptr(index, block, mtr));
3454 
3455 return_after_reservations:
3456 	*err = DB_SUCCESS;
3457 
3458 	mem_heap_free(heap);
3459 
3460 	if (ret == FALSE) {
3461 		ret = btr_cur_compress_if_useful(cursor, FALSE, mtr);
3462 	}
3463 
3464 	if (n_reserved > 0) {
3465 		fil_space_release_free_extents(index->space, n_reserved);
3466 	}
3467 
3468 	return(ret);
3469 }
3470 
3471 /*******************************************************************//**
3472 Adds path information to the cursor for the current page, for which
3473 the binary search has been performed. */
3474 static
3475 void
3476 btr_cur_add_path_info(
3477 /*==================*/
3478 	btr_cur_t*	cursor,		/*!< in: cursor positioned on a page */
3479 	ulint		height,		/*!< in: height of the page in tree;
3480 					0 means leaf node */
3481 	ulint		root_height)	/*!< in: root node height in tree */
3482 {
3483 	btr_path_t*	slot;
3484 	const rec_t*	rec;
3485 	const page_t*	page;
3486 
3487 	ut_a(cursor->path_arr);
3488 
3489 	if (root_height >= BTR_PATH_ARRAY_N_SLOTS - 1) {
3490 		/* Do nothing; return empty path */
3491 
3492 		slot = cursor->path_arr;
3493 		slot->nth_rec = ULINT_UNDEFINED;
3494 
3495 		return;
3496 	}
3497 
3498 	if (height == 0) {
3499 		/* Mark end of slots for path */
3500 		slot = cursor->path_arr + root_height + 1;
3501 		slot->nth_rec = ULINT_UNDEFINED;
3502 	}
3503 
3504 	rec = btr_cur_get_rec(cursor);
3505 
3506 	slot = cursor->path_arr + (root_height - height);
3507 
3508 	page = page_align(rec);
3509 
3510 	slot->nth_rec = page_rec_get_n_recs_before(rec);
3511 	slot->n_recs = page_get_n_recs(page);
3512 	slot->page_no = page_get_page_no(page);
3513 	slot->page_level = btr_page_get_level_low(page);
3514 }
3515 
3516 /*******************************************************************//**
3517 Estimate the number of rows between slot1 and slot2 for any level on a
3518 B-tree. This function starts from slot1->page and reads a few pages to
3519 the right, counting their records. If we reach slot2->page quickly then
3520 we know exactly how many records there are between slot1 and slot2 and
3521 we set is_n_rows_exact to TRUE. If we cannot reach slot2->page quickly
3522 then we calculate the average number of records in the pages scanned
3523 so far and assume that all pages that we did not scan up to slot2->page
3524 contain the same number of records, and then we multiply that average by
3525 the number of pages between slot1->page and slot2->page (which is
3526 n_rows_on_prev_level). In this case we set is_n_rows_exact to FALSE.
3527 @return	number of rows (exact or estimated) */
3528 static
3529 ib_int64_t
3530 btr_estimate_n_rows_in_range_on_level(
3531 /*==================================*/
3532 	dict_index_t*	index,			/*!< in: index */
3533 	btr_path_t*	slot1,			/*!< in: left border */
3534 	btr_path_t*	slot2,			/*!< in: right border */
3535 	ib_int64_t	n_rows_on_prev_level,	/*!< in: number of rows
3536 						on the previous level for the
3537 						same descend paths; used to
3538 						determine the number of pages
3539 						on this level */
3540 	ibool*		is_n_rows_exact)	/*!< out: TRUE if the returned
3541 						value is exact i.e. not an
3542 						estimation */
3543 {
3544 	ulint		space;
3545 	ib_int64_t	n_rows;
3546 	ulint		n_pages_read;
3547 	ulint		page_no;
3548 	ulint		zip_size;
3549 	ulint		level;
3550 
3551 	space = dict_index_get_space(index);
3552 
3553 	n_rows = 0;
3554 	n_pages_read = 0;
3555 
3556 	/* Assume by default that we will scan all pages between
3557 	slot1->page_no and slot2->page_no */
3558 	*is_n_rows_exact = TRUE;
3559 
3560 	/* add records from slot1->page_no which are to the right of
3561 	the record which serves as a left border of the range, if any */
3562 	if (slot1->nth_rec < slot1->n_recs) {
3563 		n_rows += slot1->n_recs - slot1->nth_rec;
3564 	}
3565 
3566 	/* add records from slot2->page_no which are to the left of
3567 	the record which serves as a right border of the range, if any */
3568 	if (slot2->nth_rec > 1) {
3569 		n_rows += slot2->nth_rec - 1;
3570 	}
3571 
3572 	/* count the records in the pages between slot1->page_no and
3573 	slot2->page_no (non inclusive), if any */
3574 
3575 	zip_size = fil_space_get_zip_size(space);
3576 
3577 	/* Do not read more than this number of pages in order not to hurt
3578 	performance with this code which is just an estimation. If we read
3579 	this many pages before reaching slot2->page_no then we estimate the
3580 	average from the pages scanned so far */
3581 #	define N_PAGES_READ_LIMIT	10
3582 
3583 	page_no = slot1->page_no;
3584 	level = slot1->page_level;
3585 
3586 	do {
3587 		mtr_t		mtr;
3588 		page_t*		page;
3589 		buf_block_t*	block;
3590 
3591 		mtr_start(&mtr);
3592 
3593 		/* Fetch the page. Because we are not holding the
3594 		index->lock, the tree may have changed and we may be
3595 		attempting to read a page that is no longer part of
3596 		the B-tree. We pass BUF_GET_POSSIBLY_FREED in order to
3597 		silence a debug assertion about this. */
3598 		block = buf_page_get_gen(space, zip_size, page_no, RW_S_LATCH,
3599 					 NULL, BUF_GET_POSSIBLY_FREED,
3600 					 __FILE__, __LINE__, &mtr);
3601 
3602 		page = buf_block_get_frame(block);
3603 
3604 		/* It is possible that the tree has been reorganized in the
3605 		meantime and this is a different page. If this happens the
3606 		calculated estimate will be bogus, which is not fatal as
3607 		this is only an estimate. We are sure that a page with
3608 		page_no exists because InnoDB never frees pages, only
3609 		reuses them. */
3610 		if (fil_page_get_type(page) != FIL_PAGE_INDEX
3611 		    || btr_page_get_index_id(page) != index->id
3612 		    || btr_page_get_level_low(page) != level) {
3613 
3614 			/* The page got reused for something else */
3615 			mtr_commit(&mtr);
3616 			goto inexact;
3617 		}
3618 
3619 		/* It is possible but highly unlikely that the page was
3620 		originally written by an old version of InnoDB that did
3621 		not initialize FIL_PAGE_TYPE on other than B-tree pages.
3622 		For example, this could be an almost-empty BLOB page
3623 		that happens to contain the magic values in the fields
3624 		that we checked above. */
3625 
3626 		n_pages_read++;
3627 
3628 		if (page_no != slot1->page_no) {
3629 			/* Do not count the records on slot1->page_no,
3630 			we already counted them before this loop. */
3631 			n_rows += page_get_n_recs(page);
3632 		}
3633 
3634 		page_no = btr_page_get_next(page, &mtr);
3635 
3636 		mtr_commit(&mtr);
3637 
3638 		if (n_pages_read == N_PAGES_READ_LIMIT
3639 		    || page_no == FIL_NULL) {
3640 			/* Either we read too many pages or
3641 			we reached the end of the level without passing
3642 			through slot2->page_no, the tree must have changed
3643 			in the meantime */
3644 			goto inexact;
3645 		}
3646 
3647 	} while (page_no != slot2->page_no);
3648 
3649 	return(n_rows);
3650 
3651 inexact:
3652 
3653 	*is_n_rows_exact = FALSE;
3654 
3655 	/* We stopped scanning before reaching slot2->page */
3656 
3657 	if (n_pages_read > 0) {
3658 		/* The number of pages on this level is
3659 		n_rows_on_prev_level, multiply it by the
3660 		average number of recs per page so far */
3661 		n_rows = n_rows_on_prev_level
3662 			* n_rows / n_pages_read;
3663 	} else {
3664 		/* The tree changed before we could even
3665 		start with slot1->page_no */
3666 		n_rows = 10;
3667 	}
3668 
3669 	return(n_rows);
3670 }
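/* A worked example of the inexact path above (illustrative numbers):
if the scan read n_pages_read == 10 pages containing n_rows == 800
records before giving up, and n_rows_on_prev_level == 25 (i.e. about 25
pages are expected on this level), the returned estimate is
25 * 800 / 10 == 2000 rows. */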
3671 
3672 /*******************************************************************//**
3673 Estimates the number of rows in a given index range.
3674 @return	estimated number of rows */
3675 UNIV_INTERN
3676 ib_int64_t
3677 btr_estimate_n_rows_in_range(
3678 /*=========================*/
3679 	dict_index_t*	index,	/*!< in: index */
3680 	const dtuple_t*	tuple1,	/*!< in: range start, may also be empty tuple */
3681 	ulint		mode1,	/*!< in: search mode for range start */
3682 	const dtuple_t*	tuple2,	/*!< in: range end, may also be empty tuple */
3683 	ulint		mode2)	/*!< in: search mode for range end */
3684 {
3685 	btr_path_t	path1[BTR_PATH_ARRAY_N_SLOTS];
3686 	btr_path_t	path2[BTR_PATH_ARRAY_N_SLOTS];
3687 	btr_cur_t	cursor;
3688 	btr_path_t*	slot1;
3689 	btr_path_t*	slot2;
3690 	ibool		diverged;
3691 	ibool		diverged_lot;
3692 	ulint		divergence_level;
3693 	ib_int64_t	n_rows;
3694 	ibool		is_n_rows_exact;
3695 	ulint		i;
3696 	mtr_t		mtr;
3697 	ib_int64_t	table_n_rows;
3698 
3699 	table_n_rows = dict_table_get_n_rows(index->table);
3700 
3701 	mtr_start(&mtr);
3702 
3703 	cursor.path_arr = path1;
3704 
3705 	if (dtuple_get_n_fields(tuple1) > 0) {
3706 
3707 		btr_cur_search_to_nth_level(index, 0, tuple1, mode1,
3708 					    BTR_SEARCH_LEAF | BTR_ESTIMATE,
3709 					    &cursor, 0,
3710 					    __FILE__, __LINE__, &mtr);
3711 	} else {
3712 		btr_cur_open_at_index_side(true, index,
3713 					   BTR_SEARCH_LEAF | BTR_ESTIMATE,
3714 					   &cursor, 0, &mtr);
3715 	}
3716 
3717 	mtr_commit(&mtr);
3718 
3719 	mtr_start(&mtr);
3720 
3721 	cursor.path_arr = path2;
3722 
3723 	if (dtuple_get_n_fields(tuple2) > 0) {
3724 
3725 		btr_cur_search_to_nth_level(index, 0, tuple2, mode2,
3726 					    BTR_SEARCH_LEAF | BTR_ESTIMATE,
3727 					    &cursor, 0,
3728 					    __FILE__, __LINE__, &mtr);
3729 	} else {
3730 		btr_cur_open_at_index_side(false, index,
3731 					   BTR_SEARCH_LEAF | BTR_ESTIMATE,
3732 					   &cursor, 0, &mtr);
3733 	}
3734 
3735 	mtr_commit(&mtr);
3736 
3737 	/* We have the path information for the range in path1 and path2 */
3738 
3739 	n_rows = 1;
3740 	is_n_rows_exact = TRUE;
3741 	diverged = FALSE;	    /* This becomes true when the path is not
3742 				    the same any more */
3743 	diverged_lot = FALSE;	    /* This becomes true when the paths are
3744 				    not the same or adjacent any more */
3745 	divergence_level = 1000000; /* This is the level where paths diverged
3746 				    a lot */
3747 	for (i = 0; ; i++) {
3748 		ut_ad(i < BTR_PATH_ARRAY_N_SLOTS);
3749 
3750 		slot1 = path1 + i;
3751 		slot2 = path2 + i;
3752 
3753 		if (slot1->nth_rec == ULINT_UNDEFINED
3754 		    || slot2->nth_rec == ULINT_UNDEFINED) {
3755 
3756 			if (i > divergence_level + 1 && !is_n_rows_exact) {
3757 				/* In trees whose height is > 1 our algorithm
3758 				tends to underestimate: multiply the estimate
3759 				by 2: */
3760 
3761 				n_rows = n_rows * 2;
3762 			}
3763 
3764 			DBUG_EXECUTE_IF("bug14007649", return(n_rows););
3765 
3766 			/* Do not estimate the number of rows in the range
3767 			to more than 1 / 2 of the estimated rows in the whole
3768 			table */
3769 
3770 			if (n_rows > table_n_rows / 2 && !is_n_rows_exact) {
3771 
3772 				n_rows = table_n_rows / 2;
3773 
3774 				/* If there are just 0 or 1 rows in the table,
3775 				then we estimate all rows are in the range */
3776 
3777 				if (n_rows == 0) {
3778 					n_rows = table_n_rows;
3779 				}
3780 			}
3781 
3782 			return(n_rows);
3783 		}
3784 
3785 		if (!diverged && slot1->nth_rec != slot2->nth_rec) {
3786 
3787 			diverged = TRUE;
3788 
3789 			if (slot1->nth_rec < slot2->nth_rec) {
3790 				n_rows = slot2->nth_rec - slot1->nth_rec;
3791 
3792 				if (n_rows > 1) {
3793 					diverged_lot = TRUE;
3794 					divergence_level = i;
3795 				}
3796 			} else {
3797 				/* It is possible that
3798 				slot1->nth_rec >= slot2->nth_rec
3799 				if, for example, we have a single page
3800 				tree which contains (inf, 5, 6, supr)
3801 				and we select where x > 20 and x < 30;
3802 				in this case slot1->nth_rec will point
3803 				to the supr record and slot2->nth_rec
3804 				will point to 6 */
3805 				n_rows = 0;
3806 			}
3807 
3808 		} else if (diverged && !diverged_lot) {
3809 
3810 			if (slot1->nth_rec < slot1->n_recs
3811 			    || slot2->nth_rec > 1) {
3812 
3813 				diverged_lot = TRUE;
3814 				divergence_level = i;
3815 
3816 				n_rows = 0;
3817 
3818 				if (slot1->nth_rec < slot1->n_recs) {
3819 					n_rows += slot1->n_recs
3820 						- slot1->nth_rec;
3821 				}
3822 
3823 				if (slot2->nth_rec > 1) {
3824 					n_rows += slot2->nth_rec - 1;
3825 				}
3826 			}
3827 		} else if (diverged_lot) {
3828 
3829 			n_rows = btr_estimate_n_rows_in_range_on_level(
3830 				index, slot1, slot2, n_rows,
3831 				&is_n_rows_exact);
3832 		}
3833 	}
3834 }
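/* A minimal caller sketch (illustrative only; the construction of the
border tuples is assumed and not shown). PAGE_CUR_GE for the range start
and PAGE_CUR_L for the range end roughly correspond to an inclusive lower
bound and an exclusive upper bound:

	ib_int64_t	rows;

	rows = btr_estimate_n_rows_in_range(index,
					    tuple_low, PAGE_CUR_GE,
					    tuple_high, PAGE_CUR_L);
*/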
3835 
3836 /*******************************************************************//**
3837 Record the number of non_null key values in a given index for
3838 each n-column prefix of the index where 1 <= n <= dict_index_get_n_unique(index).
3839 The estimates are eventually stored in the array:
3840 index->stat_n_non_null_key_vals[], which is indexed from 0 to n-1. */
3841 static
3842 void
3843 btr_record_not_null_field_in_rec(
3844 /*=============================*/
3845 	ulint		n_unique,	/*!< in: dict_index_get_n_unique(index),
3846 					number of columns that uniquely determine
3847 					an index entry */
3848 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index),
3849 					its size may cover all fields or only
3850 					the first "n_unique" fields */
3851 	ib_uint64_t*	n_not_null)	/*!< in/out: array to record number of
3852 					not null rows for n-column prefix */
3853 {
3854 	ulint	i;
3855 
3856 	ut_ad(rec_offs_n_fields(offsets) >= n_unique);
3857 
3858 	if (n_not_null == NULL) {
3859 		return;
3860 	}
3861 
3862 	for (i = 0; i < n_unique; i++) {
3863 		if (rec_offs_nth_sql_null(offsets, i)) {
3864 			break;
3865 		}
3866 
3867 		n_not_null[i]++;
3868 	}
3869 }
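/* Example of the counting above: for an index entry (5, NULL, 7) with
n_unique == 3, only n_not_null[0] is incremented, because the loop stops
at the first SQL NULL column; a fully non-NULL entry (5, 6, 7) increments
n_not_null[0], n_not_null[1] and n_not_null[2]. */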
3870 
3871 /*******************************************************************//**
3872 Estimates the number of different key values in a given index, for
3873 each n-column prefix of the index where 1 <= n <= dict_index_get_n_unique(index).
3874 The estimates are stored in the array index->stat_n_diff_key_vals[] (indexed
3875 0..n_uniq-1) and the number of pages that were sampled is saved in
3876 index->stat_n_sample_sizes[].
3877 If innodb_stats_method is nulls_ignored, we also record the number of
3878 non-null values for each prefix and store the estimates in the
3879 array index->stat_n_non_null_key_vals[].
3880 UNIV_INTERN
3881 void
3882 btr_estimate_number_of_different_key_vals(
3883 /*======================================*/
3884 	dict_index_t*	index)	/*!< in: index */
3885 {
3886 	btr_cur_t	cursor;
3887 	page_t*		page;
3888 	rec_t*		rec;
3889 	ulint		n_cols;
3890 	ulint		matched_fields;
3891 	ulint		matched_bytes;
3892 	ib_uint64_t*	n_diff;
3893 	ib_uint64_t*	n_not_null;
3894 	ibool		stats_null_not_equal;
3895 	ullint		n_sample_pages; /* number of pages to sample */
3896 	ulint		not_empty_flag	= 0;
3897 	ulint		total_external_size = 0;
3898 	ulint		i;
3899 	ulint		j;
3900 	ullint		add_on;
3901 	mtr_t		mtr;
3902 	mem_heap_t*	heap		= NULL;
3903 	ulint*		offsets_rec	= NULL;
3904 	ulint*		offsets_next_rec = NULL;
3905 
3906 	n_cols = dict_index_get_n_unique(index);
3907 
3908 	heap = mem_heap_create((sizeof *n_diff + sizeof *n_not_null)
3909 			       * n_cols
3910 			       + dict_index_get_n_fields(index)
3911 			       * (sizeof *offsets_rec
3912 				  + sizeof *offsets_next_rec));
3913 
3914 	n_diff = (ib_uint64_t*) mem_heap_zalloc(
3915 		heap, n_cols * sizeof(ib_int64_t));
3916 
3917 	n_not_null = NULL;
3918 
3919 	/* Check srv_innodb_stats_method setting, and decide whether we
3920 	need to record non-null value and also decide if NULL is
3921 	considered equal (by setting stats_null_not_equal value) */
3922 	switch (srv_innodb_stats_method) {
3923 	case SRV_STATS_NULLS_IGNORED:
3924 		n_not_null = (ib_uint64_t*) mem_heap_zalloc(
3925 			heap, n_cols * sizeof *n_not_null);
3926 		/* fall through */
3927 
3928 	case SRV_STATS_NULLS_UNEQUAL:
3929 		/* for both SRV_STATS_NULLS_IGNORED and SRV_STATS_NULLS_UNEQUAL
3930 		case, we will treat NULLs as unequal value */
3931 		stats_null_not_equal = TRUE;
3932 		break;
3933 
3934 	case SRV_STATS_NULLS_EQUAL:
3935 		stats_null_not_equal = FALSE;
3936 		break;
3937 
3938 	default:
3939 		ut_error;
3940 	}
3941 
3942 	/* It makes no sense to test more pages than are contained
3943 	in the index, thus we lower the number if it is too high */
3944 	if (srv_stats_transient_sample_pages > index->stat_index_size) {
3945 		if (index->stat_index_size > 0) {
3946 			n_sample_pages = index->stat_index_size;
3947 		} else {
3948 			n_sample_pages = 1;
3949 		}
3950 	} else {
3951 		n_sample_pages = srv_stats_transient_sample_pages;
3952 	}
3953 
3954 	/* We sample some pages in the index to get an estimate */
3955 
3956 	for (i = 0; i < n_sample_pages; i++) {
3957 		mtr_start(&mtr);
3958 
3959 		btr_cur_open_at_rnd_pos(index, BTR_SEARCH_LEAF, &cursor, &mtr);
3960 
3961 		/* Count the number of different key values for each prefix of
3962 		the key on this index page. If the prefix does not determine
3963 		the index record uniquely in the B-tree, then we subtract one
3964 		because otherwise our algorithm would give a wrong estimate
3965 		for an index where there is just one key value. */
3966 
3967 		page = btr_cur_get_page(&cursor);
3968 
3969 		rec = page_rec_get_next(page_get_infimum_rec(page));
3970 
3971 		if (!page_rec_is_supremum(rec)) {
3972 			not_empty_flag = 1;
3973 			offsets_rec = rec_get_offsets(rec, index, offsets_rec,
3974 						      ULINT_UNDEFINED, &heap);
3975 
3976 			if (n_not_null != NULL) {
3977 				btr_record_not_null_field_in_rec(
3978 					n_cols, offsets_rec, n_not_null);
3979 			}
3980 		}
3981 
3982 		while (!page_rec_is_supremum(rec)) {
3983 			rec_t*	next_rec = page_rec_get_next(rec);
3984 			if (page_rec_is_supremum(next_rec)) {
3985 				total_external_size +=
3986 					btr_rec_get_externally_stored_len(
3987 						rec, offsets_rec);
3988 				break;
3989 			}
3990 
3991 			matched_fields = 0;
3992 			matched_bytes = 0;
3993 			offsets_next_rec = rec_get_offsets(next_rec, index,
3994 							   offsets_next_rec,
3995 							   ULINT_UNDEFINED,
3996 							   &heap);
3997 
3998 			cmp_rec_rec_with_match(rec, next_rec,
3999 					       offsets_rec, offsets_next_rec,
4000 					       index, stats_null_not_equal,
4001 					       &matched_fields,
4002 					       &matched_bytes);
4003 
4004 			for (j = matched_fields; j < n_cols; j++) {
4005 				/* We add one if this index record has
4006 				a different prefix from the previous */
4007 
4008 				n_diff[j]++;
4009 			}
4010 
4011 			if (n_not_null != NULL) {
4012 				btr_record_not_null_field_in_rec(
4013 					n_cols, offsets_next_rec, n_not_null);
4014 			}
4015 
4016 			total_external_size
4017 				+= btr_rec_get_externally_stored_len(
4018 					rec, offsets_rec);
4019 
4020 			rec = next_rec;
4021 			/* Initialize offsets_rec for the next round
4022 			and assign the old offsets_rec buffer to
4023 			offsets_next_rec. */
4024 			{
4025 				ulint*	offsets_tmp = offsets_rec;
4026 				offsets_rec = offsets_next_rec;
4027 				offsets_next_rec = offsets_tmp;
4028 			}
4029 		}
4030 
4031 
4032 		if (n_cols == dict_index_get_n_unique_in_tree(index)) {
4033 
4034 			/* If there is more than one leaf page in the tree,
4035 			we add one because we know that the first record
4036 			on the page certainly had a different prefix than the
4037 			last record on the previous index page in the
4038 			alphabetical order. Before this fix, if there was
4039 			just one big record on each clustered index page, the
4040 			algorithm grossly underestimated the number of rows
4041 			in the table. */
4042 
4043 			if (btr_page_get_prev(page, &mtr) != FIL_NULL
4044 			    || btr_page_get_next(page, &mtr) != FIL_NULL) {
4045 
4046 				n_diff[n_cols - 1]++;
4047 			}
4048 		}
4049 
4050 		mtr_commit(&mtr);
4051 	}
4052 
4053 	/* If we saw k borders between different key values on
4054 	n_sample_pages leaf pages, we can estimate how many
4055 	there will be in index->stat_n_leaf_pages */
4056 
4057 	/* We must take into account that our sample actually represents
4058 	also the pages used for external storage of fields (those pages are
4059 	included in index->stat_n_leaf_pages) */
4060 
4061 	for (j = 0; j < n_cols; j++) {
4062 		index->stat_n_diff_key_vals[j]
4063 			= BTR_TABLE_STATS_FROM_SAMPLE(
4064 				n_diff[j], index, n_sample_pages,
4065 				total_external_size, not_empty_flag);
4066 
4067 		/* If the tree is small, smaller than
4068 		10 * n_sample_pages + total_external_size, then
4069 		the above estimate is ok. For bigger trees it is common that we
4070 		do not see any borders between key values in the few pages
4071 		we pick. But still there may be n_sample_pages
4072 		different key values, or even more. Let us try to approximate
4073 		that: */
4074 
4075 		add_on = index->stat_n_leaf_pages
4076 			/ (10 * (n_sample_pages
4077 				 + total_external_size));
4078 
4079 		if (add_on > n_sample_pages) {
4080 			add_on = n_sample_pages;
4081 		}
4082 
4083 		index->stat_n_diff_key_vals[j] += add_on;
4084 
4085 		index->stat_n_sample_sizes[j] = n_sample_pages;
4086 
4087 		/* Update the stat_n_non_null_key_vals[] with our
4088 		sampled result. stat_n_non_null_key_vals[] is created
4089 		and initialized to zero in dict_index_add_to_cache(),
4090 		along with stat_n_diff_key_vals[] array */
4091 		if (n_not_null != NULL) {
4092 			index->stat_n_non_null_key_vals[j] =
4093 				 BTR_TABLE_STATS_FROM_SAMPLE(
4094 					n_not_null[j], index, n_sample_pages,
4095 					total_external_size, not_empty_flag);
4096 		}
4097 	}
4098 
4099 	mem_heap_free(heap);
4100 }
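/* A worked example of the add_on correction above (illustrative numbers):
with index->stat_n_leaf_pages == 1000, n_sample_pages == 8 and
total_external_size == 0, add_on == 1000 / (10 * 8) == 12, which is then
capped to n_sample_pages == 8 before being added to each
stat_n_diff_key_vals[] entry. */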
4101 
4102 /*================== EXTERNAL STORAGE OF BIG FIELDS ===================*/
4103 
4104 /***********************************************************//**
4105 Gets the offset of the pointer to the externally stored part of a field.
4106 @return	offset of the pointer to the externally stored part */
4107 static
4108 ulint
4109 btr_rec_get_field_ref_offs(
4110 /*=======================*/
4111 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4112 	ulint		n)	/*!< in: index of the external field */
4113 {
4114 	ulint	field_ref_offs;
4115 	ulint	local_len;
4116 
4117 	ut_a(rec_offs_nth_extern(offsets, n));
4118 	field_ref_offs = rec_get_nth_field_offs(offsets, n, &local_len);
4119 	ut_a(local_len != UNIV_SQL_NULL);
4120 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
4121 
4122 	return(field_ref_offs + local_len - BTR_EXTERN_FIELD_REF_SIZE);
4123 }
4124 
4125 /** Gets a pointer to the externally stored part of a field.
4126 @param rec	record
4127 @param offsets	rec_get_offsets(rec)
4128 @param n	index of the externally stored field
4129 @return pointer to the externally stored part */
4130 #define btr_rec_get_field_ref(rec, offsets, n)			\
4131 	((rec) + btr_rec_get_field_ref_offs(offsets, n))
4132 
4133 /** Gets the externally stored size of a record, in units of a database page.
4134 @param[in]	rec	record
4135 @param[in]	offsets	array returned by rec_get_offsets()
4136 @return	size of the externally stored part, in units of a database page */
4137 
4138 ulint
4139 btr_rec_get_externally_stored_len(
4140 	const rec_t*	rec,
4141 	const ulint*	offsets)
4142 {
4143 	ulint	n_fields;
4144 	ulint	total_extern_len = 0;
4145 	ulint	i;
4146 
4147 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
4148 
4149 	if (!rec_offs_any_extern(offsets)) {
4150 		return(0);
4151 	}
4152 
4153 	n_fields = rec_offs_n_fields(offsets);
4154 
4155 	for (i = 0; i < n_fields; i++) {
4156 		if (rec_offs_nth_extern(offsets, i)) {
4157 
4158 			ulint	extern_len = mach_read_from_4(
4159 				btr_rec_get_field_ref(rec, offsets, i)
4160 				+ BTR_EXTERN_LEN + 4);
4161 
4162 			total_extern_len += ut_calc_align(extern_len,
4163 							  UNIV_PAGE_SIZE);
4164 		}
4165 	}
4166 
4167 	return(total_extern_len / UNIV_PAGE_SIZE);
4168 }
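/* Example of the rounding above: a record with two externally stored
columns of 70000 and 5000 bytes contributes
ut_calc_align(70000, UNIV_PAGE_SIZE) + ut_calc_align(5000, UNIV_PAGE_SIZE)
== 81920 + 16384 bytes with the default 16KB page size, so the function
returns 98304 / 16384 == 6 pages. */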
4169 
4170 /*******************************************************************//**
4171 Sets the ownership bit of an externally stored field in a record. */
4172 static
4173 void
4174 btr_cur_set_ownership_of_extern_field(
4175 /*==================================*/
4176 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
4177 				part will be updated, or NULL */
4178 	rec_t*		rec,	/*!< in/out: clustered index record */
4179 	dict_index_t*	index,	/*!< in: index of the page */
4180 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4181 	ulint		i,	/*!< in: field number */
4182 	ibool		val,	/*!< in: value to set */
4183 	mtr_t*		mtr)	/*!< in: mtr, or NULL if not logged */
4184 {
4185 	byte*	data;
4186 	ulint	local_len;
4187 	ulint	byte_val;
4188 
4189 	data = rec_get_nth_field(rec, offsets, i, &local_len);
4190 	ut_ad(rec_offs_nth_extern(offsets, i));
4191 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
4192 
4193 	local_len -= BTR_EXTERN_FIELD_REF_SIZE;
4194 
4195 	byte_val = mach_read_from_1(data + local_len + BTR_EXTERN_LEN);
4196 
4197 	if (val) {
4198 		byte_val = byte_val & (~BTR_EXTERN_OWNER_FLAG);
4199 	} else {
4200 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4201 		ut_a(!(byte_val & BTR_EXTERN_OWNER_FLAG));
4202 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4203 		byte_val = byte_val | BTR_EXTERN_OWNER_FLAG;
4204 	}
4205 
4206 	if (page_zip) {
4207 		mach_write_to_1(data + local_len + BTR_EXTERN_LEN, byte_val);
4208 		page_zip_write_blob_ptr(page_zip, rec, index, offsets, i, mtr);
4209 	} else if (mtr != NULL) {
4210 
4211 		mlog_write_ulint(data + local_len + BTR_EXTERN_LEN, byte_val,
4212 				 MLOG_1BYTE, mtr);
4213 	} else {
4214 		mach_write_to_1(data + local_len + BTR_EXTERN_LEN, byte_val);
4215 	}
4216 
4217 	btr_blob_dbg_owner(rec, index, offsets, i, val);
4218 }
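/* Note on the flag sense above: BTR_EXTERN_OWNER_FLAG is set when the
record does NOT own the externally stored column, so val == TRUE clears
the flag (the record becomes the owner) and val == FALSE sets it (the
record disowns the column). */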
4219 
4220 /*******************************************************************//**
4221 Marks non-updated off-page fields as disowned by this record. The ownership
4222 must be transferred to the updated record which is inserted elsewhere in the
4223 index tree. In purge, only the owner of an externally stored field is allowed
4224 to free the field. */
4225 UNIV_INTERN
4226 void
4227 btr_cur_disown_inherited_fields(
4228 /*============================*/
4229 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
4230 				part will be updated, or NULL */
4231 	rec_t*		rec,	/*!< in/out: record in a clustered index */
4232 	dict_index_t*	index,	/*!< in: index of the page */
4233 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4234 	const upd_t*	update,	/*!< in: update vector */
4235 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
4236 {
4237 	ulint	i;
4238 
4239 	ut_ad(rec_offs_validate(rec, index, offsets));
4240 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
4241 	ut_ad(rec_offs_any_extern(offsets));
4242 	ut_ad(mtr);
4243 
4244 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
4245 		if (rec_offs_nth_extern(offsets, i)
4246 		    && !upd_get_field_by_field_no(update, i)) {
4247 			btr_cur_set_ownership_of_extern_field(
4248 				page_zip, rec, index, offsets, i, FALSE, mtr);
4249 		}
4250 	}
4251 }
4252 
4253 /*******************************************************************//**
4254 Marks all extern fields in a record as owned by the record. This function
4255 should be called if the delete mark of a record is removed: a record that
4256 is not delete-marked always owns all its extern fields. */
4257 static
4258 void
4259 btr_cur_unmark_extern_fields(
4260 /*=========================*/
4261 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
4262 				part will be updated, or NULL */
4263 	rec_t*		rec,	/*!< in/out: record in a clustered index */
4264 	dict_index_t*	index,	/*!< in: index of the page */
4265 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4266 	mtr_t*		mtr)	/*!< in: mtr, or NULL if not logged */
4267 {
4268 	ulint	n;
4269 	ulint	i;
4270 
4271 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
4272 	n = rec_offs_n_fields(offsets);
4273 
4274 	if (!rec_offs_any_extern(offsets)) {
4275 
4276 		return;
4277 	}
4278 
4279 	for (i = 0; i < n; i++) {
4280 		if (rec_offs_nth_extern(offsets, i)) {
4281 
4282 			btr_cur_set_ownership_of_extern_field(
4283 				page_zip, rec, index, offsets, i, TRUE, mtr);
4284 		}
4285 	}
4286 }
4287 
4288 /*******************************************************************//**
4289 Flags the data tuple fields that are marked as extern storage in the
4290 update vector.  We use this function to remember which fields we must
4291 mark as extern storage in a record inserted for an update.
4292 @return	number of flagged external columns */
4293 UNIV_INTERN
4294 ulint
4295 btr_push_update_extern_fields(
4296 /*==========================*/
4297 	dtuple_t*	tuple,	/*!< in/out: data tuple */
4298 	const upd_t*	update,	/*!< in: update vector */
4299 	mem_heap_t*	heap)	/*!< in: memory heap */
4300 {
4301 	ulint			n_pushed	= 0;
4302 	ulint			n;
4303 	const upd_field_t*	uf;
4304 
4305 	ut_ad(tuple);
4306 	ut_ad(update);
4307 
4308 	uf = update->fields;
4309 	n = upd_get_n_fields(update);
4310 
4311 	for (; n--; uf++) {
4312 		if (dfield_is_ext(&uf->new_val)) {
4313 			dfield_t*	field
4314 				= dtuple_get_nth_field(tuple, uf->field_no);
4315 
4316 			if (!dfield_is_ext(field)) {
4317 				dfield_set_ext(field);
4318 				n_pushed++;
4319 			}
4320 
4321 			switch (uf->orig_len) {
4322 				byte*	data;
4323 				ulint	len;
4324 				byte*	buf;
4325 			case 0:
4326 				break;
4327 			case BTR_EXTERN_FIELD_REF_SIZE:
4328 				/* Restore the original locally stored
4329 				part of the column.  In the undo log,
4330 				InnoDB writes a longer prefix of externally
4331 				stored columns, so that column prefixes
4332 				in secondary indexes can be reconstructed. */
4333 				dfield_set_data(field, (byte*) dfield_get_data(field)
4334 						+ dfield_get_len(field)
4335 						- BTR_EXTERN_FIELD_REF_SIZE,
4336 						BTR_EXTERN_FIELD_REF_SIZE);
4337 				dfield_set_ext(field);
4338 				break;
4339 			default:
4340 				/* Reconstruct the original locally
4341 				stored part of the column.  The data
4342 				will have to be copied. */
4343 				ut_a(uf->orig_len > BTR_EXTERN_FIELD_REF_SIZE);
4344 
4345 				data = (byte*) dfield_get_data(field);
4346 				len = dfield_get_len(field);
4347 
4348 				buf = (byte*) mem_heap_alloc(heap,
4349 							     uf->orig_len);
4350 				/* Copy the locally stored prefix. */
4351 				memcpy(buf, data,
4352 				       uf->orig_len
4353 				       - BTR_EXTERN_FIELD_REF_SIZE);
4354 				/* Copy the BLOB pointer. */
4355 				memcpy(buf + uf->orig_len
4356 				       - BTR_EXTERN_FIELD_REF_SIZE,
4357 				       data + len - BTR_EXTERN_FIELD_REF_SIZE,
4358 				       BTR_EXTERN_FIELD_REF_SIZE);
4359 
4360 				dfield_set_data(field, buf, uf->orig_len);
4361 				dfield_set_ext(field);
4362 			}
4363 		}
4364 	}
4365 
4366 	return(n_pushed);
4367 }
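/* Example of the orig_len handling above (illustrative sizes): if the
column was originally stored with a 700-byte local prefix plus the
20-byte BLOB pointer (uf->orig_len == 720), the default branch rebuilds a
720-byte value from the prefix of the new, longer value and its BLOB
pointer; if only the 20-byte pointer was stored locally
(uf->orig_len == BTR_EXTERN_FIELD_REF_SIZE), the second branch simply
points the field at its last 20 bytes. */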
4368 
4369 /*******************************************************************//**
4370 Returns the length of a BLOB part stored on the header page.
4371 @return	part length */
4372 static
4373 ulint
4374 btr_blob_get_part_len(
4375 /*==================*/
4376 	const byte*	blob_header)	/*!< in: blob header */
4377 {
4378 	return(mach_read_from_4(blob_header + BTR_BLOB_HDR_PART_LEN));
4379 }
4380 
4381 /*******************************************************************//**
4382 Returns the page number where the next BLOB part is stored.
4383 @return	page number or FIL_NULL if no more pages */
4384 static
4385 ulint
4386 btr_blob_get_next_page_no(
4387 /*======================*/
4388 	const byte*	blob_header)	/*!< in: blob header */
4389 {
4390 	return(mach_read_from_4(blob_header + BTR_BLOB_HDR_NEXT_PAGE_NO));
4391 }
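/* Layout assumed by the two accessors above, for uncompressed BLOB pages
written by btr_store_big_rec_extern_fields() below: starting at
FIL_PAGE_DATA the page carries BTR_BLOB_HDR_PART_LEN (length of the BLOB
part stored on this page) and BTR_BLOB_HDR_NEXT_PAGE_NO (next page in the
chain, or FIL_NULL), and the BLOB data itself starts at
FIL_PAGE_DATA + BTR_BLOB_HDR_SIZE. */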
4392 
4393 /*******************************************************************//**
4394 Deallocate a buffer block that was reserved for a BLOB part. */
4395 static
4396 void
4397 btr_blob_free(
4398 /*==========*/
4399 	buf_block_t*	block,	/*!< in: buffer block */
4400 	ibool		all,	/*!< in: TRUE=remove also the compressed page
4401 				if there is one */
4402 	mtr_t*		mtr)	/*!< in: mini-transaction to commit */
4403 {
4404 	buf_pool_t*	buf_pool = buf_pool_from_block(block);
4405 	ulint		space	= buf_block_get_space(block);
4406 	ulint		page_no	= buf_block_get_page_no(block);
4407 
4408 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4409 
4410 	mtr_commit(mtr);
4411 
4412 	buf_pool_mutex_enter(buf_pool);
4413 
4414 	/* Only free the block if it is still allocated to
4415 	the same file page. */
4416 
4417 	if (buf_block_get_state(block)
4418 	    == BUF_BLOCK_FILE_PAGE
4419 	    && buf_block_get_space(block) == space
4420 	    && buf_block_get_page_no(block) == page_no) {
4421 
4422 		if (!buf_LRU_free_page(&block->page, all)
4423 		    && all && block->page.zip.data) {
4424 			/* Attempt to deallocate the uncompressed page
4425 			if the whole block cannot be deallocated. */
4426 
4427 			buf_LRU_free_page(&block->page, false);
4428 		}
4429 	}
4430 
4431 	buf_pool_mutex_exit(buf_pool);
4432 }
4433 
4434 /*******************************************************************//**
4435 Stores the fields in big_rec_vec to the tablespace and puts pointers to
4436 them in rec.  The extern flags in rec will have to be set beforehand.
4437 The fields are stored on pages allocated from leaf node
4438 file segment of the index tree.
4439 @return	DB_SUCCESS or DB_OUT_OF_FILE_SPACE or DB_TOO_BIG_FOR_REDO */
4440 UNIV_INTERN
4441 dberr_t
4442 btr_store_big_rec_extern_fields(
4443 /*============================*/
4444 	dict_index_t*	index,		/*!< in: index of rec; the index tree
4445 					MUST be X-latched */
4446 	buf_block_t*	rec_block,	/*!< in/out: block containing rec */
4447 	rec_t*		rec,		/*!< in/out: record */
4448 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index);
4449 					the "external storage" flags in offsets
4450 					will not correspond to rec when
4451 					this function returns */
4452 	const big_rec_t*big_rec_vec,	/*!< in: vector containing fields
4453 					to be stored externally */
4454 	mtr_t*		btr_mtr,	/*!< in: mtr containing the
4455 					latches to the clustered index */
4456 	enum blob_op	op)		/*!< in: operation code */
4457 {
4458 	ulint		rec_page_no;
4459 	byte*		field_ref;
4460 	ulint		extern_len;
4461 	ulint		store_len;
4462 	ulint		page_no;
4463 	ulint		space_id;
4464 	ulint		zip_size;
4465 	ulint		prev_page_no;
4466 	ulint		hint_page_no;
4467 	ulint		i;
4468 	mtr_t		mtr;
4469 	mtr_t*		alloc_mtr;
4470 	mem_heap_t*	heap = NULL;
4471 	page_zip_des_t*	page_zip;
4472 	z_stream	c_stream;
4473 	buf_block_t**	freed_pages	= NULL;
4474 	ulint		n_freed_pages	= 0;
4475 	dberr_t		error		= DB_SUCCESS;
4476 
4477 	ut_ad(rec_offs_validate(rec, index, offsets));
4478 	ut_ad(rec_offs_any_extern(offsets));
4479 	ut_ad(btr_mtr);
4480 	ut_ad(mtr_memo_contains(btr_mtr, dict_index_get_lock(index),
4481 				MTR_MEMO_X_LOCK));
4482 	ut_ad(mtr_memo_contains(btr_mtr, rec_block, MTR_MEMO_PAGE_X_FIX));
4483 	ut_ad(buf_block_get_frame(rec_block) == page_align(rec));
4484 	ut_a(dict_index_is_clust(index));
4485 
4486 	page_zip = buf_block_get_page_zip(rec_block);
4487 	ut_a(dict_table_zip_size(index->table)
4488 	     == buf_block_get_zip_size(rec_block));
4489 
4490 	space_id = buf_block_get_space(rec_block);
4491 	zip_size = buf_block_get_zip_size(rec_block);
4492 	rec_page_no = buf_block_get_page_no(rec_block);
4493 	ut_a(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX);
4494 
4495 	error = btr_check_blob_limit(big_rec_vec);
4496 
4497 	if (error != DB_SUCCESS) {
4498 		ut_ad(op == BTR_STORE_INSERT);
4499 		return(error);
4500 	}
4501 
4502 	if (page_zip) {
4503 		int	err;
4504 
4505 		/* Zlib deflate needs 128 kilobytes for the default
4506 		window size, plus 512 << memLevel, plus a few
4507 		kilobytes for small objects.  We use reduced memLevel
4508 		to limit the memory consumption, and preallocate the
4509 		heap, hoping to avoid memory fragmentation. */
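		/* With the parameters used in deflateInit2() below
		(windowBits == 15, memLevel == 7) that is roughly
		128KB + (512 << 7) == 128KB + 64KB == 192KB, so the
		250000-byte heap leaves a few tens of kilobytes of
		headroom for small allocations. */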
4510 		heap = mem_heap_create(250000);
4511 		page_zip_set_alloc(&c_stream, heap);
4512 
4513 		err = deflateInit2(&c_stream, page_zip_level,
4514 				   Z_DEFLATED, 15, 7, Z_DEFAULT_STRATEGY);
4515 		ut_a(err == Z_OK);
4516 	}
4517 
4518 	if (btr_blob_op_is_update(op)) {
4519 		/* Avoid reusing pages that have been previously freed
4520 		in btr_mtr. */
4521 		if (btr_mtr->n_freed_pages) {
4522 			if (heap == NULL) {
4523 				heap = mem_heap_create(
4524 					btr_mtr->n_freed_pages
4525 					* sizeof *freed_pages);
4526 			}
4527 
4528 			freed_pages = static_cast<buf_block_t**>(
4529 				mem_heap_alloc(
4530 					heap,
4531 					btr_mtr->n_freed_pages
4532 					* sizeof *freed_pages));
4533 			n_freed_pages = 0;
4534 		}
4535 
4536 		/* Because btr_mtr will be committed after mtr, it is
4537 		possible that the tablespace has been extended when
4538 		the B-tree record was updated or inserted, or it will
4539 		be extended while allocating pages for big_rec.
4540 
4541 		TODO: In mtr (not btr_mtr), write a redo log record
4542 		about extending the tablespace to its current size,
4543 		and remember the current size. Whenever the tablespace
4544 		grows as pages are allocated, write further redo log
4545 		records to mtr. (Currently tablespace extension is not
4546 		covered by the redo log. If it were, the record would
4547 		only be written to btr_mtr, which is committed after
4548 		mtr.) */
4549 		alloc_mtr = btr_mtr;
4550 	} else {
4551 		/* Use the local mtr for allocations. */
4552 		alloc_mtr = &mtr;
4553 	}
4554 
4555 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4556 	/* All pointers to externally stored columns in the record
4557 	must either be zero or they must be pointers to inherited
4558 	columns, owned by this record or an earlier record version. */
4559 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
4560 		if (!rec_offs_nth_extern(offsets, i)) {
4561 			continue;
4562 		}
4563 		field_ref = btr_rec_get_field_ref(rec, offsets, i);
4564 
4565 		ut_a(!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG));
4566 		/* Either this must be an update in place,
4567 		or the BLOB must be inherited, or the BLOB pointer
4568 		must be zero (will be written in this function). */
4569 		ut_a(op == BTR_STORE_UPDATE
4570 		     || (field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_INHERITED_FLAG)
4571 		     || !memcmp(field_ref, field_ref_zero,
4572 				BTR_EXTERN_FIELD_REF_SIZE));
4573 	}
4574 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4575 	/* We have to create a file segment to the tablespace
4576 	for each field and put the pointer to the field in rec */
4577 
4578 	for (i = 0; i < big_rec_vec->n_fields; i++) {
4579 		field_ref = btr_rec_get_field_ref(
4580 			rec, offsets, big_rec_vec->fields[i].field_no);
4581 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4582 		/* A zero BLOB pointer should have been initially inserted. */
4583 		ut_a(!memcmp(field_ref, field_ref_zero,
4584 			     BTR_EXTERN_FIELD_REF_SIZE));
4585 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4586 		extern_len = big_rec_vec->fields[i].len;
4587 		UNIV_MEM_ASSERT_RW(big_rec_vec->fields[i].data,
4588 				   extern_len);
4589 
4590 		ut_a(extern_len > 0);
4591 
4592 		prev_page_no = FIL_NULL;
4593 
4594 		if (page_zip) {
4595 			int	err = deflateReset(&c_stream);
4596 			ut_a(err == Z_OK);
4597 
4598 			c_stream.next_in = (Bytef*)
4599 				big_rec_vec->fields[i].data;
4600 			c_stream.avail_in = static_cast<uInt>(extern_len);
4601 		}
4602 
4603 		for (;;) {
4604 			buf_block_t*	block;
4605 			page_t*		page;
4606 
4607 			mtr_start(&mtr);
4608 
4609 			if (prev_page_no == FIL_NULL) {
4610 				hint_page_no = 1 + rec_page_no;
4611 			} else {
4612 				hint_page_no = prev_page_no + 1;
4613 			}
4614 
4615 alloc_another:
4616 			block = btr_page_alloc(index, hint_page_no,
4617 					       FSP_NO_DIR, 0, alloc_mtr, &mtr);
4618 			if (UNIV_UNLIKELY(block == NULL)) {
4619 				mtr_commit(&mtr);
4620 				error = DB_OUT_OF_FILE_SPACE;
4621 				goto func_exit;
4622 			}
4623 
4624 			if (rw_lock_get_x_lock_count(&block->lock) > 1) {
4625 				/* This page must have been freed in
4626 				btr_mtr previously. Put it aside, and
4627 				allocate another page for the BLOB data. */
4628 				ut_ad(alloc_mtr == btr_mtr);
4629 				ut_ad(btr_blob_op_is_update(op));
4630 				ut_ad(n_freed_pages < btr_mtr->n_freed_pages);
4631 				freed_pages[n_freed_pages++] = block;
4632 				goto alloc_another;
4633 			}
4634 
4635 			page_no = buf_block_get_page_no(block);
4636 			page = buf_block_get_frame(block);
4637 
4638 			if (prev_page_no != FIL_NULL) {
4639 				buf_block_t*	prev_block;
4640 				page_t*		prev_page;
4641 
4642 				prev_block = buf_page_get(space_id, zip_size,
4643 							  prev_page_no,
4644 							  RW_X_LATCH, &mtr);
4645 				buf_block_dbg_add_level(prev_block,
4646 							SYNC_EXTERN_STORAGE);
4647 				prev_page = buf_block_get_frame(prev_block);
4648 
4649 				if (page_zip) {
4650 					mlog_write_ulint(
4651 						prev_page + FIL_PAGE_NEXT,
4652 						page_no, MLOG_4BYTES, &mtr);
4653 					memcpy(buf_block_get_page_zip(
4654 						       prev_block)
4655 					       ->data + FIL_PAGE_NEXT,
4656 					       prev_page + FIL_PAGE_NEXT, 4);
4657 				} else {
4658 					mlog_write_ulint(
4659 						prev_page + FIL_PAGE_DATA
4660 						+ BTR_BLOB_HDR_NEXT_PAGE_NO,
4661 						page_no, MLOG_4BYTES, &mtr);
4662 				}
4663 
4664 			} else if (dict_index_is_online_ddl(index)) {
4665 				row_log_table_blob_alloc(index, page_no);
4666 			}
4667 
4668 			if (page_zip) {
4669 				int		err;
4670 				page_zip_des_t*	blob_page_zip;
4671 
4672 				/* Write FIL_PAGE_TYPE to the redo log
4673 				separately, before logging any other
4674 				changes to the page, so that the debug
4675 				assertions in
4676 				recv_parse_or_apply_log_rec_body() can
4677 				be made simpler.  Before InnoDB Plugin
4678 				1.0.4, the initialization of
4679 				FIL_PAGE_TYPE was logged as part of
4680 				the mlog_log_string() below. */
4681 
4682 				mlog_write_ulint(page + FIL_PAGE_TYPE,
4683 						 prev_page_no == FIL_NULL
4684 						 ? FIL_PAGE_TYPE_ZBLOB
4685 						 : FIL_PAGE_TYPE_ZBLOB2,
4686 						 MLOG_2BYTES, &mtr);
4687 
4688 				c_stream.next_out = page
4689 					+ FIL_PAGE_DATA;
4690 				c_stream.avail_out
4691 					= static_cast<uInt>(page_zip_get_size(page_zip))
4692 					- FIL_PAGE_DATA;
4693 
4694 				err = deflate(&c_stream, Z_FINISH);
4695 				ut_a(err == Z_OK || err == Z_STREAM_END);
4696 				ut_a(err == Z_STREAM_END
4697 				     || c_stream.avail_out == 0);
4698 
4699 				/* Write the "next BLOB page" pointer */
4700 				mlog_write_ulint(page + FIL_PAGE_NEXT,
4701 						 FIL_NULL, MLOG_4BYTES, &mtr);
4702 				/* Initialize the unused "prev page" pointer */
4703 				mlog_write_ulint(page + FIL_PAGE_PREV,
4704 						 FIL_NULL, MLOG_4BYTES, &mtr);
4705 				/* Write a back pointer to the record
4706 				into the otherwise unused area.  This
4707 				information could be useful in
4708 				debugging.  Later, we might want to
4709 				implement the possibility to relocate
4710 				BLOB pages.  Then, we would need to be
4711 				able to adjust the BLOB pointer in the
4712 				record.  We do not store the heap
4713 				number of the record, because it can
4714 				change in page_zip_reorganize() or
4715 				btr_page_reorganize().  However, also
4716 				the page number of the record may
4717 				change when B-tree nodes are split or
4718 				merged. */
4719 				mlog_write_ulint(page
4720 						 + FIL_PAGE_FILE_FLUSH_LSN,
4721 						 space_id,
4722 						 MLOG_4BYTES, &mtr);
4723 				mlog_write_ulint(page
4724 						 + FIL_PAGE_FILE_FLUSH_LSN + 4,
4725 						 rec_page_no,
4726 						 MLOG_4BYTES, &mtr);
4727 
4728 				/* Zero out the unused part of the page. */
4729 				memset(page + page_zip_get_size(page_zip)
4730 				       - c_stream.avail_out,
4731 				       0, c_stream.avail_out);
4732 				mlog_log_string(page + FIL_PAGE_FILE_FLUSH_LSN,
4733 						page_zip_get_size(page_zip)
4734 						- FIL_PAGE_FILE_FLUSH_LSN,
4735 						&mtr);
4736 				/* Copy the page to compressed storage,
4737 				because it will be flushed to disk
4738 				from there. */
4739 				blob_page_zip = buf_block_get_page_zip(block);
4740 				ut_ad(blob_page_zip);
4741 				ut_ad(page_zip_get_size(blob_page_zip)
4742 				      == page_zip_get_size(page_zip));
4743 				memcpy(blob_page_zip->data, page,
4744 				       page_zip_get_size(page_zip));
4745 
4746 				if (err == Z_OK && prev_page_no != FIL_NULL) {
4747 
4748 					goto next_zip_page;
4749 				}
4750 
4751 				if (alloc_mtr == &mtr) {
4752 					rec_block = buf_page_get(
4753 						space_id, zip_size,
4754 						rec_page_no,
4755 						RW_X_LATCH, &mtr);
4756 					buf_block_dbg_add_level(
4757 						rec_block,
4758 						SYNC_NO_ORDER_CHECK);
4759 				}
4760 
4761 				if (err == Z_STREAM_END) {
4762 					mach_write_to_4(field_ref
4763 							+ BTR_EXTERN_LEN, 0);
4764 					mach_write_to_4(field_ref
4765 							+ BTR_EXTERN_LEN + 4,
4766 							c_stream.total_in);
4767 				} else {
4768 					memset(field_ref + BTR_EXTERN_LEN,
4769 					       0, 8);
4770 				}
4771 
4772 				if (prev_page_no == FIL_NULL) {
4773 					btr_blob_dbg_add_blob(
4774 						rec, big_rec_vec->fields[i]
4775 						.field_no, page_no, index,
4776 						"store");
4777 
4778 					mach_write_to_4(field_ref
4779 							+ BTR_EXTERN_SPACE_ID,
4780 							space_id);
4781 
4782 					mach_write_to_4(field_ref
4783 							+ BTR_EXTERN_PAGE_NO,
4784 							page_no);
4785 
4786 					mach_write_to_4(field_ref
4787 							+ BTR_EXTERN_OFFSET,
4788 							FIL_PAGE_NEXT);
4789 				}
4790 
4791 				page_zip_write_blob_ptr(
4792 					page_zip, rec, index, offsets,
4793 					big_rec_vec->fields[i].field_no,
4794 					alloc_mtr);
4795 
4796 next_zip_page:
4797 				prev_page_no = page_no;
4798 
4799 				/* Commit mtr and release the
4800 				uncompressed page frame to save memory. */
4801 				btr_blob_free(block, FALSE, &mtr);
4802 
4803 				if (err == Z_STREAM_END) {
4804 					break;
4805 				}
4806 			} else {
4807 				mlog_write_ulint(page + FIL_PAGE_TYPE,
4808 						 FIL_PAGE_TYPE_BLOB,
4809 						 MLOG_2BYTES, &mtr);
4810 
4811 				if (extern_len > (UNIV_PAGE_SIZE
4812 						  - FIL_PAGE_DATA
4813 						  - BTR_BLOB_HDR_SIZE
4814 						  - FIL_PAGE_DATA_END)) {
4815 					store_len = UNIV_PAGE_SIZE
4816 						- FIL_PAGE_DATA
4817 						- BTR_BLOB_HDR_SIZE
4818 						- FIL_PAGE_DATA_END;
4819 				} else {
4820 					store_len = extern_len;
4821 				}
4822 
4823 				mlog_write_string(page + FIL_PAGE_DATA
4824 						  + BTR_BLOB_HDR_SIZE,
4825 						  (const byte*)
4826 						  big_rec_vec->fields[i].data
4827 						  + big_rec_vec->fields[i].len
4828 						  - extern_len,
4829 						  store_len, &mtr);
4830 				mlog_write_ulint(page + FIL_PAGE_DATA
4831 						 + BTR_BLOB_HDR_PART_LEN,
4832 						 store_len, MLOG_4BYTES, &mtr);
4833 				mlog_write_ulint(page + FIL_PAGE_DATA
4834 						 + BTR_BLOB_HDR_NEXT_PAGE_NO,
4835 						 FIL_NULL, MLOG_4BYTES, &mtr);
4836 
4837 				extern_len -= store_len;
4838 
4839 				if (alloc_mtr == &mtr) {
4840 					rec_block = buf_page_get(
4841 						space_id, zip_size,
4842 						rec_page_no,
4843 						RW_X_LATCH, &mtr);
4844 					buf_block_dbg_add_level(
4845 						rec_block,
4846 						SYNC_NO_ORDER_CHECK);
4847 				}
4848 
4849 				mlog_write_ulint(field_ref + BTR_EXTERN_LEN, 0,
4850 						 MLOG_4BYTES, alloc_mtr);
4851 				mlog_write_ulint(field_ref
4852 						 + BTR_EXTERN_LEN + 4,
4853 						 big_rec_vec->fields[i].len
4854 						 - extern_len,
4855 						 MLOG_4BYTES, alloc_mtr);
4856 
4857 				if (prev_page_no == FIL_NULL) {
4858 					btr_blob_dbg_add_blob(
4859 						rec, big_rec_vec->fields[i]
4860 						.field_no, page_no, index,
4861 						"store");
4862 
4863 					mlog_write_ulint(field_ref
4864 							 + BTR_EXTERN_SPACE_ID,
4865 							 space_id, MLOG_4BYTES,
4866 							 alloc_mtr);
4867 
4868 					mlog_write_ulint(field_ref
4869 							 + BTR_EXTERN_PAGE_NO,
4870 							 page_no, MLOG_4BYTES,
4871 							 alloc_mtr);
4872 
4873 					mlog_write_ulint(field_ref
4874 							 + BTR_EXTERN_OFFSET,
4875 							 FIL_PAGE_DATA,
4876 							 MLOG_4BYTES,
4877 							 alloc_mtr);
4878 				}
4879 
4880 				prev_page_no = page_no;
4881 
4882 				mtr_commit(&mtr);
4883 
4884 				if (extern_len == 0) {
4885 					break;
4886 				}
4887 			}
4888 		}
4889 
4890 		DBUG_EXECUTE_IF("btr_store_big_rec_extern",
4891 				error = DB_OUT_OF_FILE_SPACE;
4892 				goto func_exit;);
4893 	}
4894 
4895 func_exit:
4896 	if (page_zip) {
4897 		deflateEnd(&c_stream);
4898 	}
4899 
4900 	if (n_freed_pages) {
4901 		ulint	i;
4902 
4903 		ut_ad(alloc_mtr == btr_mtr);
4904 		ut_ad(btr_blob_op_is_update(op));
4905 
4906 		for (i = 0; i < n_freed_pages; i++) {
4907 			btr_page_free_low(index, freed_pages[i], 0, alloc_mtr);
4908 		}
4909 	}
4910 
4911 	if (heap != NULL) {
4912 		mem_heap_free(heap);
4913 	}
4914 
4915 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4916 	/* All pointers to externally stored columns in the record
4917 	must be valid. */
4918 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
4919 		if (!rec_offs_nth_extern(offsets, i)) {
4920 			continue;
4921 		}
4922 
4923 		field_ref = btr_rec_get_field_ref(rec, offsets, i);
4924 
4925 		/* The pointer must not be zero if the operation
4926 		succeeded. */
4927 		ut_a(0 != memcmp(field_ref, field_ref_zero,
4928 				 BTR_EXTERN_FIELD_REF_SIZE)
4929 		     || error != DB_SUCCESS);
4930 		/* The column must not be disowned by this record. */
4931 		ut_a(!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG));
4932 	}
4933 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4934 	return(error);
4935 }
4936 
4937 /*******************************************************************//**
4938 Check the FIL_PAGE_TYPE on an uncompressed BLOB page. */
4939 static
4940 void
4941 btr_check_blob_fil_page_type(
4942 /*=========================*/
4943 	ulint		space_id,	/*!< in: space id */
4944 	ulint		page_no,	/*!< in: page number */
4945 	const page_t*	page,		/*!< in: page */
4946 	ibool		read)		/*!< in: TRUE=read, FALSE=purge */
4947 {
4948 	ulint	type = fil_page_get_type(page);
4949 
4950 	ut_a(space_id == page_get_space_id(page));
4951 	ut_a(page_no == page_get_page_no(page));
4952 
4953 	if (UNIV_UNLIKELY(type != FIL_PAGE_TYPE_BLOB)) {
4954 		ulint	flags = fil_space_get_flags(space_id);
4955 
4956 #ifndef UNIV_DEBUG /* Improve debug test coverage */
4957 		if (dict_tf_get_format(flags) == UNIV_FORMAT_A) {
4958 			/* Old versions of InnoDB did not initialize
4959 			FIL_PAGE_TYPE on BLOB pages.  Do not print
4960 			anything about the type mismatch when reading
4961 			a BLOB page that is in Antelope format. */
4962 			return;
4963 		}
4964 #endif /* !UNIV_DEBUG */
4965 
4966 		ut_print_timestamp(stderr);
4967 		fprintf(stderr,
4968 			"  InnoDB: FIL_PAGE_TYPE=%lu"
4969 			" on BLOB %s space %lu page %lu flags %lx\n",
4970 			(ulong) type, read ? "read" : "purge",
4971 			(ulong) space_id, (ulong) page_no, (ulong) flags);
4972 		ut_error;
4973 	}
4974 }
4975 
4976 /*******************************************************************//**
4977 Frees the space in an externally stored field to the file space
4978 management if the field in data is owned by the externally stored field.
4979 In a rollback, we may have the additional condition that the field must
4980 not be inherited. */
4981 UNIV_INTERN
4982 void
4983 btr_free_externally_stored_field(
4984 /*=============================*/
4985 	dict_index_t*	index,		/*!< in: index of the data, the index
4986 					tree MUST be X-latched; if the tree
4987 					height is 1, then also the root page
4988 					must be X-latched! (this is relevant
4989 					in the case this function is called
4990 					from purge where 'data' is located on
4991 					an undo log page, not an index
4992 					page) */
4993 	byte*		field_ref,	/*!< in/out: field reference */
4994 	const rec_t*	rec,		/*!< in: record containing field_ref, for
4995 					page_zip_write_blob_ptr(), or NULL */
4996 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index),
4997 					or NULL */
4998 	page_zip_des_t*	page_zip,	/*!< in: compressed page corresponding
4999 					to rec, or NULL if rec == NULL */
5000 	ulint		i,		/*!< in: field number of field_ref;
5001 					ignored if rec == NULL */
5002 	enum trx_rb_ctx	rb_ctx,		/*!< in: rollback context */
5003 	mtr_t*		local_mtr MY_ATTRIBUTE((unused))) /*!< in: mtr
5004 					containing the latch to data and an
5005 					X-latch to the index tree */
5006 {
5007 	page_t*		page;
5008 	const ulint	space_id	= mach_read_from_4(
5009 		field_ref + BTR_EXTERN_SPACE_ID);
5010 	const ulint	start_page	= mach_read_from_4(
5011 		field_ref + BTR_EXTERN_PAGE_NO);
5012 	ulint		rec_zip_size = dict_table_zip_size(index->table);
5013 	ulint		ext_zip_size;
5014 	ulint		page_no;
5015 	ulint		next_page_no;
5016 	mtr_t		mtr;
5017 
5018 	ut_ad(dict_index_is_clust(index));
5019 	ut_ad(mtr_memo_contains(local_mtr, dict_index_get_lock(index),
5020 				MTR_MEMO_X_LOCK));
5021 	ut_ad(mtr_memo_contains_page(local_mtr, field_ref,
5022 				     MTR_MEMO_PAGE_X_FIX));
5023 	ut_ad(!rec || rec_offs_validate(rec, index, offsets));
5024 	ut_ad(!rec || field_ref == btr_rec_get_field_ref(rec, offsets, i));
5025 
5026 	if (UNIV_UNLIKELY(!memcmp(field_ref, field_ref_zero,
5027 				  BTR_EXTERN_FIELD_REF_SIZE))) {
5028 		/* In the rollback, we may encounter a clustered index
5029 		record with some unwritten off-page columns. There is
5030 		nothing to free then. */
5031 		ut_a(rb_ctx != RB_NONE);
5032 		return;
5033 	}
5034 
5035 	ut_ad(space_id == index->space);
5036 
5037 	if (UNIV_UNLIKELY(space_id != dict_index_get_space(index))) {
5038 		ext_zip_size = fil_space_get_zip_size(space_id);
5039 		/* This must be an undo log record in the system tablespace,
5040 		that is, in row_purge_upd_exist_or_extern().
5041 		Currently, externally stored records are stored in the
5042 		same tablespace as the referring records. */
5043 		ut_ad(!page_get_space_id(page_align(field_ref)));
5044 		ut_ad(!rec);
5045 		ut_ad(!page_zip);
5046 	} else {
5047 		ext_zip_size = rec_zip_size;
5048 	}
5049 
5050 	if (!rec) {
5051 		/* This is a call from row_purge_upd_exist_or_extern(). */
5052 		ut_ad(!page_zip);
5053 		rec_zip_size = 0;
5054 	}
5055 
5056 #ifdef UNIV_BLOB_DEBUG
5057 	if (!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG)
5058 	    && !((field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_INHERITED_FLAG)
5059 		 && (rb_ctx == RB_NORMAL || rb_ctx == RB_RECOVERY))) {
5060 		/* This off-page column will be freed.
5061 		Check that no references remain. */
5062 
5063 		btr_blob_dbg_t	b;
5064 
5065 		b.blob_page_no = start_page;
5066 
5067 		if (rec) {
5068 			/* Remove the reference from the record to the
5069 			BLOB. If the BLOB were not freed, the
5070 			reference would be removed when the record is
5071 			removed. Freeing the BLOB will overwrite the
5072 			BTR_EXTERN_PAGE_NO in the field_ref of the
5073 			record with FIL_NULL, which would make the
5074 			btr_blob_dbg information inconsistent with the
5075 			record. */
5076 			b.ref_page_no = page_get_page_no(page_align(rec));
5077 			b.ref_heap_no = page_rec_get_heap_no(rec);
5078 			b.ref_field_no = i;
5079 			btr_blob_dbg_rbt_delete(index, &b, "free");
5080 		}
5081 
5082 		btr_blob_dbg_assert_empty(index, b.blob_page_no);
5083 	}
5084 #endif /* UNIV_BLOB_DEBUG */
5085 
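	/* Free the BLOB one page at a time, each page in its own
	mini-transaction.  After a page is freed, the field reference is
	advanced to the next page in the chain, so an interrupted free can
	be resumed; once the chain is exhausted (FIL_NULL), the check at
	the top of the loop returns. */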
5086 	for (;;) {
5087 #ifdef UNIV_SYNC_DEBUG
5088 		buf_block_t*	rec_block;
5089 #endif /* UNIV_SYNC_DEBUG */
5090 		buf_block_t*	ext_block;
5091 
5092 		mtr_start(&mtr);
5093 
5094 #ifdef UNIV_SYNC_DEBUG
5095 		rec_block =
5096 #endif /* UNIV_SYNC_DEBUG */
5097 		buf_page_get(page_get_space_id(page_align(field_ref)),
5098 			     rec_zip_size,
5099 			     page_get_page_no(page_align(field_ref)),
5100 			     RW_X_LATCH, &mtr);
5101 		buf_block_dbg_add_level(rec_block, SYNC_NO_ORDER_CHECK);
5102 		page_no = mach_read_from_4(field_ref + BTR_EXTERN_PAGE_NO);
5103 
5104 		if (/* There is no external storage data */
5105 		    page_no == FIL_NULL
5106 		    /* This field does not own the externally stored field */
5107 		    || (mach_read_from_1(field_ref + BTR_EXTERN_LEN)
5108 			& BTR_EXTERN_OWNER_FLAG)
5109 		    /* Rollback and inherited field */
5110 		    || ((rb_ctx == RB_NORMAL || rb_ctx == RB_RECOVERY)
5111 			&& (mach_read_from_1(field_ref + BTR_EXTERN_LEN)
5112 			    & BTR_EXTERN_INHERITED_FLAG))) {
5113 
5114 			/* Do not free */
5115 			mtr_commit(&mtr);
5116 
5117 			return;
5118 		}
5119 
5120 		if (page_no == start_page && dict_index_is_online_ddl(index)) {
5121 			row_log_table_blob_free(index, start_page);
5122 		}
5123 
5124 		ext_block = buf_page_get(space_id, ext_zip_size, page_no,
5125 					 RW_X_LATCH, &mtr);
5126 		buf_block_dbg_add_level(ext_block, SYNC_EXTERN_STORAGE);
5127 		page = buf_block_get_frame(ext_block);
5128 
5129 		if (ext_zip_size) {
5130 			/* Note that page_zip will be NULL
5131 			in row_purge_upd_exist_or_extern(). */
5132 			switch (fil_page_get_type(page)) {
5133 			case FIL_PAGE_TYPE_ZBLOB:
5134 			case FIL_PAGE_TYPE_ZBLOB2:
5135 				break;
5136 			default:
5137 				ut_error;
5138 			}
5139 			next_page_no = mach_read_from_4(page + FIL_PAGE_NEXT);
5140 
5141 			btr_page_free_low(index, ext_block, 0, &mtr);
5142 
5143 			if (page_zip != NULL) {
5144 				mach_write_to_4(field_ref + BTR_EXTERN_PAGE_NO,
5145 						next_page_no);
5146 				mach_write_to_4(field_ref + BTR_EXTERN_LEN + 4,
5147 						0);
5148 				page_zip_write_blob_ptr(page_zip, rec, index,
5149 							offsets, i, &mtr);
5150 			} else {
5151 				mlog_write_ulint(field_ref
5152 						 + BTR_EXTERN_PAGE_NO,
5153 						 next_page_no,
5154 						 MLOG_4BYTES, &mtr);
5155 				mlog_write_ulint(field_ref
5156 						 + BTR_EXTERN_LEN + 4, 0,
5157 						 MLOG_4BYTES, &mtr);
5158 			}
5159 		} else {
5160 			ut_a(!page_zip);
5161 			btr_check_blob_fil_page_type(space_id, page_no, page,
5162 						     FALSE);
5163 
5164 			next_page_no = mach_read_from_4(
5165 				page + FIL_PAGE_DATA
5166 				+ BTR_BLOB_HDR_NEXT_PAGE_NO);
5167 
5168 			/* We must supply the page level (= 0) as an argument
5169 			because we did not store it on the page (we save the
5170 			space overhead of an index page header). */
5171 
5172 			btr_page_free_low(index, ext_block, 0, &mtr);
5173 
5174 			mlog_write_ulint(field_ref + BTR_EXTERN_PAGE_NO,
5175 					 next_page_no,
5176 					 MLOG_4BYTES, &mtr);
5177 			/* Zero out the BLOB length.  If the server
5178 			crashes during the execution of this function,
5179 			trx_rollback_or_clean_all_recovered() could
5180 			dereference the half-deleted BLOB, fetching a
5181 			wrong prefix for the BLOB. */
5182 			mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
5183 					 0,
5184 					 MLOG_4BYTES, &mtr);
5185 		}
5186 
5187 		/* Commit mtr and release the BLOB block to save memory. */
5188 		btr_blob_free(ext_block, TRUE, &mtr);
5189 	}
5190 }
5191 
5192 /***********************************************************//**
5193 Frees the externally stored fields for a record. */
5194 static
5195 void
5196 btr_rec_free_externally_stored_fields(
5197 /*==================================*/
5198 	dict_index_t*	index,	/*!< in: index of the data, the index
5199 				tree MUST be X-latched */
5200 	rec_t*		rec,	/*!< in/out: record */
5201 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
5202 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
5203 				part will be updated, or NULL */
5204 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
5205 	mtr_t*		mtr)	/*!< in: mini-transaction handle which contains
5206 				an X-latch to record page and to the index
5207 				tree */
5208 {
5209 	ulint	n_fields;
5210 	ulint	i;
5211 
5212 	ut_ad(rec_offs_validate(rec, index, offsets));
5213 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
5214 	/* Free possible externally stored fields in the record */
5215 
5216 	ut_ad(dict_table_is_comp(index->table) == !!rec_offs_comp(offsets));
5217 	n_fields = rec_offs_n_fields(offsets);
5218 
5219 	for (i = 0; i < n_fields; i++) {
5220 		if (rec_offs_nth_extern(offsets, i)) {
5221 			btr_free_externally_stored_field(
5222 				index, btr_rec_get_field_ref(rec, offsets, i),
5223 				rec, offsets, page_zip, i, rb_ctx, mtr);
5224 		}
5225 	}
5226 }
5227 
5228 /***********************************************************//**
5229 Frees the externally stored fields for a record, if they are mentioned
5230 in the update vector. */
5231 static
5232 void
5233 btr_rec_free_updated_extern_fields(
5234 /*===============================*/
5235 	dict_index_t*	index,	/*!< in: index of rec; the index tree MUST be
5236 				X-latched */
5237 	rec_t*		rec,	/*!< in/out: record */
5238 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
5239 				part will be updated, or NULL */
5240 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
5241 	const upd_t*	update,	/*!< in: update vector */
5242 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
5243 	mtr_t*		mtr)	/*!< in: mini-transaction handle which contains
5244 				an X-latch to record page and to the tree */
5245 {
5246 	ulint	n_fields;
5247 	ulint	i;
5248 
5249 	ut_ad(rec_offs_validate(rec, index, offsets));
5250 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
5251 
5252 	/* Free possible externally stored fields in the record */
5253 
5254 	n_fields = upd_get_n_fields(update);
5255 
5256 	for (i = 0; i < n_fields; i++) {
5257 		const upd_field_t* ufield = upd_get_nth_field(update, i);
5258 
5259 		if (rec_offs_nth_extern(offsets, ufield->field_no)) {
5260 			ulint	len;
5261 			byte*	data = rec_get_nth_field(
5262 				rec, offsets, ufield->field_no, &len);
5263 			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
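			/* The 20-byte field reference (BLOB pointer) is
			stored at the end of the locally stored part of
			the column. */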
5264 
5265 			btr_free_externally_stored_field(
5266 				index, data + len - BTR_EXTERN_FIELD_REF_SIZE,
5267 				rec, offsets, page_zip,
5268 				ufield->field_no, rb_ctx, mtr);
5269 		}
5270 	}
5271 }
5272 
5273 /*******************************************************************//**
5274 Copies the prefix of an uncompressed BLOB.  The clustered index record
5275 that points to this BLOB must be protected by a lock or a page latch.
5276 @return	number of bytes written to buf */
5277 static
5278 ulint
5279 btr_copy_blob_prefix(
5280 /*=================*/
5281 	byte*		buf,	/*!< out: the externally stored part of
5282 				the field, or a prefix of it */
5283 	ulint		len,	/*!< in: length of buf, in bytes */
5284 	ulint		space_id,/*!< in: space id of the BLOB pages */
5285 	ulint		page_no,/*!< in: page number of the first BLOB page */
5286 	ulint		offset)	/*!< in: offset on the first BLOB page */
5287 {
5288 	ulint	copied_len	= 0;
5289 
5290 	for (;;) {
5291 		mtr_t		mtr;
5292 		buf_block_t*	block;
5293 		const page_t*	page;
5294 		const byte*	blob_header;
5295 		ulint		part_len;
5296 		ulint		copy_len;
5297 
5298 		mtr_start(&mtr);
5299 
5300 		block = buf_page_get(space_id, 0, page_no, RW_S_LATCH, &mtr);
5301 		buf_block_dbg_add_level(block, SYNC_EXTERN_STORAGE);
5302 		page = buf_block_get_frame(block);
5303 
5304 		btr_check_blob_fil_page_type(space_id, page_no, page, TRUE);
5305 
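		/* Each uncompressed BLOB page carries an 8-byte header at
		'offset': the length of the data stored on this page
		(BTR_BLOB_HDR_PART_LEN) followed by the next page number
		(BTR_BLOB_HDR_NEXT_PAGE_NO); the payload starts at
		offset + BTR_BLOB_HDR_SIZE. */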
5306 		blob_header = page + offset;
5307 		part_len = btr_blob_get_part_len(blob_header);
5308 		copy_len = ut_min(part_len, len - copied_len);
5309 
5310 		memcpy(buf + copied_len,
5311 		       blob_header + BTR_BLOB_HDR_SIZE, copy_len);
5312 		copied_len += copy_len;
5313 
5314 		page_no = btr_blob_get_next_page_no(blob_header);
5315 
5316 		mtr_commit(&mtr);
5317 
5318 		if (page_no == FIL_NULL || copy_len != part_len) {
5319 			UNIV_MEM_ASSERT_RW(buf, copied_len);
5320 			return(copied_len);
5321 		}
5322 
5323 		/* On BLOB pages other than the first, the BLOB header
5324 		is always at the start of the page data: */
5325 
5326 		offset = FIL_PAGE_DATA;
5327 
5328 		ut_ad(copied_len <= len);
5329 	}
5330 }
5331 
5332 /*******************************************************************//**
5333 Copies the prefix of a compressed BLOB.  The clustered index record
5334 that points to this BLOB must be protected by a lock or a page latch.
5335 @return	number of bytes written to buf */
5336 static
5337 ulint
5338 btr_copy_zblob_prefix(
5339 /*==================*/
5340 	byte*		buf,	/*!< out: the externally stored part of
5341 				the field, or a prefix of it */
5342 	ulint		len,	/*!< in: length of buf, in bytes */
5343 	ulint		zip_size,/*!< in: compressed BLOB page size */
5344 	ulint		space_id,/*!< in: space id of the BLOB pages */
5345 	ulint		page_no,/*!< in: page number of the first BLOB page */
5346 	ulint		offset)	/*!< in: offset on the first BLOB page */
5347 {
5348 	ulint		page_type = FIL_PAGE_TYPE_ZBLOB;
5349 	mem_heap_t*	heap;
5350 	int		err;
5351 	z_stream	d_stream;
5352 
5353 	d_stream.next_out = buf;
5354 	d_stream.avail_out = static_cast<uInt>(len);
5355 	d_stream.next_in = Z_NULL;
5356 	d_stream.avail_in = 0;
5357 
5358 	/* Zlib inflate needs 32 kilobytes for the default
5359 	window size, plus a few kilobytes for small objects. */
5360 	heap = mem_heap_create(40000);
5361 	page_zip_set_alloc(&d_stream, heap);
5362 
5363 	ut_ad(ut_is_2pow(zip_size));
5364 	ut_ad(zip_size >= UNIV_ZIP_SIZE_MIN);
5365 	ut_ad(zip_size <= UNIV_ZIP_SIZE_MAX);
5366 	ut_ad(space_id);
5367 
5368 	err = inflateInit(&d_stream);
5369 	ut_a(err == Z_OK);
5370 
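	/* Walk the chain of compressed BLOB pages, feeding the compressed
	payload of each page to zlib.  The first page has type
	FIL_PAGE_TYPE_ZBLOB and the following ones FIL_PAGE_TYPE_ZBLOB2;
	the chain is linked through FIL_PAGE_NEXT in the page header. */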
5371 	for (;;) {
5372 		buf_page_t*	bpage;
5373 		ulint		next_page_no;
5374 
5375 		/* There is no latch on bpage directly.  Instead,
5376 		bpage is protected by the B-tree page latch that
5377 		is being held on the clustered index record, or,
5378 		in row_merge_copy_blobs(), by an exclusive table lock. */
5379 		bpage = buf_page_get_zip(space_id, zip_size, page_no);
5380 
5381 		if (UNIV_UNLIKELY(!bpage)) {
5382 			ut_print_timestamp(stderr);
5383 			fprintf(stderr,
5384 				"  InnoDB: Cannot load"
5385 				" compressed BLOB"
5386 				" page %lu space %lu\n",
5387 				(ulong) page_no, (ulong) space_id);
5388 			goto func_exit;
5389 		}
5390 
5391 		if (UNIV_UNLIKELY
5392 		    (fil_page_get_type(bpage->zip.data) != page_type)) {
5393 			ut_print_timestamp(stderr);
5394 			fprintf(stderr,
5395 				"  InnoDB: Unexpected type %lu of"
5396 				" compressed BLOB"
5397 				" page %lu space %lu\n",
5398 				(ulong) fil_page_get_type(bpage->zip.data),
5399 				(ulong) page_no, (ulong) space_id);
5400 			ut_ad(0);
5401 			goto end_of_blob;
5402 		}
5403 
5404 		next_page_no = mach_read_from_4(bpage->zip.data + offset);
5405 
5406 		if (UNIV_LIKELY(offset == FIL_PAGE_NEXT)) {
5407 			/* When the BLOB begins at page header,
5408 			the compressed data payload does not
5409 			immediately follow the next page pointer. */
5410 			offset = FIL_PAGE_DATA;
5411 		} else {
5412 			offset += 4;
5413 		}
5414 
5415 		d_stream.next_in = bpage->zip.data + offset;
5416 		d_stream.avail_in = static_cast<uInt>(zip_size - offset);
5417 
5418 		err = inflate(&d_stream, Z_NO_FLUSH);
5419 		switch (err) {
5420 		case Z_OK:
5421 			if (!d_stream.avail_out) {
5422 				goto end_of_blob;
5423 			}
5424 			break;
5425 		case Z_STREAM_END:
5426 			if (next_page_no == FIL_NULL) {
5427 				goto end_of_blob;
5428 			}
5429 			/* fall through */
5430 		default:
5431 inflate_error:
5432 			ut_print_timestamp(stderr);
5433 			fprintf(stderr,
5434 				"  InnoDB: inflate() of"
5435 				" compressed BLOB"
5436 				" page %lu space %lu returned %d (%s)\n",
5437 				(ulong) page_no, (ulong) space_id,
5438 				err, d_stream.msg);
5439 		case Z_BUF_ERROR:
5440 			goto end_of_blob;
5441 		}
5442 
5443 		if (next_page_no == FIL_NULL) {
5444 			if (!d_stream.avail_in) {
5445 				ut_print_timestamp(stderr);
5446 				fprintf(stderr,
5447 					"  InnoDB: unexpected end of"
5448 					" compressed BLOB"
5449 					" page %lu space %lu\n",
5450 					(ulong) page_no,
5451 					(ulong) space_id);
5452 			} else {
5453 				err = inflate(&d_stream, Z_FINISH);
5454 				switch (err) {
5455 				case Z_STREAM_END:
5456 				case Z_BUF_ERROR:
5457 					break;
5458 				default:
5459 					goto inflate_error;
5460 				}
5461 			}
5462 
5463 end_of_blob:
5464 			buf_page_release_zip(bpage);
5465 			goto func_exit;
5466 		}
5467 
5468 		buf_page_release_zip(bpage);
5469 
5470 		/* On BLOB pages other than the first,
5471 		the BLOB header is always in the page header: */
5472 
5473 		page_no = next_page_no;
5474 		offset = FIL_PAGE_NEXT;
5475 		page_type = FIL_PAGE_TYPE_ZBLOB2;
5476 	}
5477 
5478 func_exit:
5479 	inflateEnd(&d_stream);
5480 	mem_heap_free(heap);
5481 	UNIV_MEM_ASSERT_RW(buf, d_stream.total_out);
5482 	return(d_stream.total_out);
5483 }
5484 
5485 /*******************************************************************//**
5486 Copies the prefix of an externally stored field of a record.  The
5487 clustered index record that points to this BLOB must be protected by a
5488 lock or a page latch.
5489 @return	number of bytes written to buf */
5490 static
5491 ulint
5492 btr_copy_externally_stored_field_prefix_low(
5493 /*========================================*/
5494 	byte*		buf,	/*!< out: the externally stored part of
5495 				the field, or a prefix of it */
5496 	ulint		len,	/*!< in: length of buf, in bytes */
5497 	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
5498 				zero for uncompressed BLOBs */
5499 	ulint		space_id,/*!< in: space id of the first BLOB page */
5500 	ulint		page_no,/*!< in: page number of the first BLOB page */
5501 	ulint		offset)	/*!< in: offset on the first BLOB page */
5502 {
5503 	if (UNIV_UNLIKELY(len == 0)) {
5504 		return(0);
5505 	}
5506 
5507 	if (zip_size) {
5508 		return(btr_copy_zblob_prefix(buf, len, zip_size,
5509 					     space_id, page_no, offset));
5510 	} else {
5511 		return(btr_copy_blob_prefix(buf, len, space_id,
5512 					    page_no, offset));
5513 	}
5514 }
5515 
5516 /*******************************************************************//**
5517 Copies the prefix of an externally stored field of a record.  The
5518 clustered index record must be protected by a lock or a page latch.
5519 @return the length of the copied field, or 0 if the column was being
5520 or has been deleted */
5521 UNIV_INTERN
5522 ulint
5523 btr_copy_externally_stored_field_prefix(
5524 /*====================================*/
5525 	byte*		buf,	/*!< out: the field, or a prefix of it */
5526 	ulint		len,	/*!< in: length of buf, in bytes */
5527 	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
5528 				zero for uncompressed BLOBs */
5529 	const byte*	data,	/*!< in: 'internally' stored part of the
5530 				field containing also the reference to
5531 				the external part; must be protected by
5532 				a lock or a page latch */
5533 	ulint		local_len)/*!< in: length of data, in bytes */
5534 {
5535 	ulint	space_id;
5536 	ulint	page_no;
5537 	ulint	offset;
5538 
5539 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
5540 
5541 	local_len -= BTR_EXTERN_FIELD_REF_SIZE;
5542 
5543 	if (UNIV_UNLIKELY(local_len >= len)) {
5544 		memcpy(buf, data, len);
5545 		return(len);
5546 	}
5547 
5548 	memcpy(buf, data, local_len);
5549 	data += local_len;
5550 
5551 	ut_a(memcmp(data, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));
5552 
5553 	if (!mach_read_from_4(data + BTR_EXTERN_LEN + 4)) {
5554 		/* The externally stored part of the column has been
5555 		(partially) deleted.  Signal the half-deleted BLOB
5556 		to the caller. */
5557 
5558 		return(0);
5559 	}
5560 
5561 	space_id = mach_read_from_4(data + BTR_EXTERN_SPACE_ID);
5562 
5563 	page_no = mach_read_from_4(data + BTR_EXTERN_PAGE_NO);
5564 
5565 	offset = mach_read_from_4(data + BTR_EXTERN_OFFSET);
5566 
5567 	return(local_len
5568 	       + btr_copy_externally_stored_field_prefix_low(buf + local_len,
5569 							     len - local_len,
5570 							     zip_size,
5571 							     space_id, page_no,
5572 							     offset));
5573 }
5574 
5575 /*******************************************************************//**
5576 Copies an externally stored field of a record to mem heap.  The
5577 clustered index record must be protected by a lock or a page latch.
5578 @return	the whole field copied to heap */
5579 UNIV_INTERN
5580 byte*
5581 btr_copy_externally_stored_field(
5582 /*=============================*/
5583 	ulint*		len,	/*!< out: length of the whole field */
5584 	const byte*	data,	/*!< in: 'internally' stored part of the
5585 				field containing also the reference to
5586 				the external part; must be protected by
5587 				a lock or a page latch */
5588 	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
5589 				zero for uncompressed BLOBs */
5590 	ulint		local_len,/*!< in: length of data */
5591 	mem_heap_t*	heap)	/*!< in: mem heap */
5592 {
5593 	ulint	space_id;
5594 	ulint	page_no;
5595 	ulint	offset;
5596 	ulint	extern_len;
5597 	byte*	buf;
5598 
5599 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
5600 
5601 	local_len -= BTR_EXTERN_FIELD_REF_SIZE;
5602 
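	/* The last 20 bytes of the locally stored data form the field
	reference: space id, page number, offset on that page, and the
	8-byte length of the externally stored part, of which only the
	low 4 bytes are currently used (see BTR_EXTERN_* in btr0cur.h). */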
5603 	space_id = mach_read_from_4(data + local_len + BTR_EXTERN_SPACE_ID);
5604 
5605 	page_no = mach_read_from_4(data + local_len + BTR_EXTERN_PAGE_NO);
5606 
5607 	offset = mach_read_from_4(data + local_len + BTR_EXTERN_OFFSET);
5608 
5609 	/* Currently a BLOB cannot be bigger than 4 GB; we
5610 	leave the 4 upper bytes in the length field unused */
5611 
5612 	extern_len = mach_read_from_4(data + local_len + BTR_EXTERN_LEN + 4);
5613 
5614 	buf = (byte*) mem_heap_alloc(heap, local_len + extern_len);
5615 
5616 	memcpy(buf, data, local_len);
5617 	*len = local_len
5618 		+ btr_copy_externally_stored_field_prefix_low(buf + local_len,
5619 							      extern_len,
5620 							      zip_size,
5621 							      space_id,
5622 							      page_no, offset);
5623 
5624 	return(buf);
5625 }
5626 
5627 /*******************************************************************//**
5628 Copies an externally stored field of a record to mem heap.
5629 @return	the field copied to heap, or NULL if the field is incomplete */
5630 UNIV_INTERN
5631 byte*
5632 btr_rec_copy_externally_stored_field(
5633 /*=================================*/
5634 	const rec_t*	rec,	/*!< in: record in a clustered index;
5635 				must be protected by a lock or a page latch */
5636 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
5637 	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
5638 				zero for uncompressed BLOBs */
5639 	ulint		no,	/*!< in: field number */
5640 	ulint*		len,	/*!< out: length of the field */
5641 	mem_heap_t*	heap)	/*!< in: mem heap */
5642 {
5643 	ulint		local_len;
5644 	const byte*	data;
5645 
5646 	ut_a(rec_offs_nth_extern(offsets, no));
5647 
5648 	/* An externally stored field can contain a locally stored
5649 	prefix of the field data; its last 20 bytes hold the space
5650 	id, page number, and offset where the rest of the field data
5651 	is stored, as well as the length of that externally stored
5652 	part. We may need to store some data
5653 	locally to get the local record length above the 128 byte
5654 	limit so that field offsets are stored in two bytes, and
5655 	the extern bit is available in those two bytes. */
5656 
5657 	data = rec_get_nth_field(rec, offsets, no, &local_len);
5658 
5659 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
5660 
5661 	if (UNIV_UNLIKELY
5662 	    (!memcmp(data + local_len - BTR_EXTERN_FIELD_REF_SIZE,
5663 		     field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE))) {
5664 		/* The externally stored field was not written yet.
5665 		This record should only be seen by
5666 		recv_recovery_rollback_active() or any
5667 		TRX_ISO_READ_UNCOMMITTED transactions. */
5668 		return(NULL);
5669 	}
5670 
5671 	return(btr_copy_externally_stored_field(len, data,
5672 						zip_size, local_len, heap));
5673 }
5674 #endif /* !UNIV_HOTBACKUP */
5675