1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2015, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2008, Google Inc.
5 
6 Portions of this file contain modifications contributed and copyrighted by
7 Google, Inc. Those modifications are gratefully acknowledged and are described
8 briefly in the InnoDB documentation. The contributions by Google are
9 incorporated with their permission, and subject to the conditions contained in
10 the file COPYING.Google.
11 
12 This program is free software; you can redistribute it and/or modify it under
13 the terms of the GNU General Public License as published by the Free Software
14 Foundation; version 2 of the License.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 
20 You should have received a copy of the GNU General Public License along with
21 this program; if not, write to the Free Software Foundation, Inc.,
22 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
23 
24 *****************************************************************************/
25 
26 /**************************************************//**
27 @file btr/btr0cur.c
28 The index tree cursor
29 
30 All changes that row operations make to a B-tree or the records
31 there must go through this module! Undo log records are written here
32 for every modify or insert of a clustered index record.
33 
34 			NOTE!!!
35 To make sure we do not run out of disk space during a pessimistic
36 insert or update, we have to reserve as many pages in the tablespace
37 as 2 x the height of the index tree before we start the operation, because
38 if leaf splitting has been started, it is difficult to undo, except
39 by crashing the database and doing a roll-forward.
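(For example, if the height of the index tree is 3, the caller should make
sure that roughly 2 x 3 = 6 pages can still be allocated in the tablespace
before starting a pessimistic insert or update.)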
40 
41 Created 10/16/1994 Heikki Tuuri
42 *******************************************************/
43 
44 #include "btr0cur.h"
45 
46 #ifdef UNIV_NONINL
47 #include "btr0cur.ic"
48 #endif
49 
50 #include "row0upd.h"
51 #ifndef UNIV_HOTBACKUP
52 #include "mtr0log.h"
53 #include "page0page.h"
54 #include "page0zip.h"
55 #include "rem0rec.h"
56 #include "rem0cmp.h"
57 #include "buf0lru.h"
58 #include "btr0btr.h"
59 #include "btr0sea.h"
60 #include "row0purge.h"
61 #include "row0upd.h"
62 #include "trx0rec.h"
63 #include "trx0roll.h" /* trx_is_recv() */
64 #include "trx0undo.h"
65 #include "que0que.h"
66 #include "row0row.h"
67 #include "srv0srv.h"
68 #include "ibuf0ibuf.h"
69 #include "lock0lock.h"
70 #include "zlib.h"
71 
72 /** Buffered B-tree operation types, introduced as part of delete buffering. */
73 typedef enum btr_op_enum {
74 	BTR_NO_OP = 0,			/*!< Not buffered */
75 	BTR_INSERT_OP,			/*!< Insert, do not ignore UNIQUE */
76 	BTR_INSERT_IGNORE_UNIQUE_OP,	/*!< Insert, ignoring UNIQUE */
77 	BTR_DELETE_OP,			/*!< Purge a delete-marked record */
78 	BTR_DELMARK_OP			/*!< Mark a record for deletion */
79 } btr_op_t;
80 
81 #ifdef UNIV_DEBUG
82 /** If the following is set to TRUE, this module prints a lot of
83 trace information about individual record operations */
84 UNIV_INTERN ibool	btr_cur_print_record_ops = FALSE;
85 #endif /* UNIV_DEBUG */
86 
87 /** Number of searches down the B-tree in btr_cur_search_to_nth_level(). */
88 UNIV_INTERN ulint	btr_cur_n_non_sea	= 0;
89 /** Number of successful adaptive hash index lookups in
90 btr_cur_search_to_nth_level(). */
91 UNIV_INTERN ulint	btr_cur_n_sea		= 0;
92 /** Old value of btr_cur_n_non_sea.  Copied by
93 srv_refresh_innodb_monitor_stats().  Referenced by
94 srv_printf_innodb_monitor(). */
95 UNIV_INTERN ulint	btr_cur_n_non_sea_old	= 0;
96 /** Old value of btr_cur_n_sea.  Copied by
97 srv_refresh_innodb_monitor_stats().  Referenced by
98 srv_printf_innodb_monitor(). */
99 UNIV_INTERN ulint	btr_cur_n_sea_old	= 0;
100 
101 #ifdef UNIV_DEBUG
102 /* Debug flag to limit the number of records per page for optimistic inserts */
103 UNIV_INTERN uint	btr_cur_limit_optimistic_insert_debug = 0;
104 #endif /* UNIV_DEBUG */
105 
106 /** In the optimistic insert, if the insert does not fit, but this much space
107 can be released by reorganizing the page, then the page is reorganized */
108 #define BTR_CUR_PAGE_REORGANIZE_LIMIT	(UNIV_PAGE_SIZE / 32)
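/* Note: with the usual compile-time page size of 16 kB (UNIV_PAGE_SIZE =
16384), this reorganize limit works out to 512 bytes. */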
109 
110 /** The structure of a BLOB part header */
111 /* @{ */
112 /*--------------------------------------*/
113 #define BTR_BLOB_HDR_PART_LEN		0	/*!< BLOB part len on this
114 						page */
115 #define BTR_BLOB_HDR_NEXT_PAGE_NO	4	/*!< next BLOB part page no,
116 						FIL_NULL if none */
117 /*--------------------------------------*/
118 #define BTR_BLOB_HDR_SIZE		8	/*!< Size of a BLOB
119 						part header, in bytes */
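/* Taken together, the constants above describe an 8-byte header at the
start of every BLOB part page: bytes 0..3 hold the length of the BLOB data
stored on this page, and bytes 4..7 hold the page number of the next part
page (FIL_NULL if this is the last part). */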
120 
121 /** Estimates table-level stats from a sampled value.
122 @param value		sampled stats
123 @param index		index being sampled
124 @param sample		number of sampled rows
125 @param ext_size		external stored data size
126 @param not_empty	table not empty
127 @return estimated table-wide stats from the sampled value */
128 #define BTR_TABLE_STATS_FROM_SAMPLE(value, index, sample, ext_size, not_empty)\
129 	(((value) * (ib_int64_t) index->stat_n_leaf_pages		\
130 	  + (sample) - 1 + (ext_size) + (not_empty)) / ((sample) + (ext_size)))
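/* A worked example of the estimate above, with made-up numbers: for a
sampled value of 10, sample = 20, index->stat_n_leaf_pages = 100,
ext_size = 0 and not_empty = 1, the macro yields
(10 * 100 + 19 + 0 + 1) / (20 + 0) = 51, i.e. the sampled value scaled up
by the leaf page ratio, rounded up and bumped for a non-empty table. */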
131 
132 /* @} */
133 #endif /* !UNIV_HOTBACKUP */
134 
135 /** A BLOB field reference full of zeros, for use in assertions and tests.
136 Initially, BLOB field references are set to zero, in
137 dtuple_convert_big_rec(). */
138 UNIV_INTERN const byte field_ref_zero[BTR_EXTERN_FIELD_REF_SIZE];
139 
140 #ifndef UNIV_HOTBACKUP
141 /*******************************************************************//**
142 Marks all extern fields in a record as owned by the record. This function
143 should be called if the delete mark of a record is removed: a record
144 that is not delete-marked always owns all its extern fields. */
145 static
146 void
147 btr_cur_unmark_extern_fields(
148 /*=========================*/
149 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
150 				part will be updated, or NULL */
151 	rec_t*		rec,	/*!< in/out: record in a clustered index */
152 	dict_index_t*	index,	/*!< in: index of the page */
153 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
154 	mtr_t*		mtr);	/*!< in: mtr, or NULL if not logged */
155 /*******************************************************************//**
156 Adds path information to the cursor for the current page, for which
157 the binary search has been performed. */
158 static
159 void
160 btr_cur_add_path_info(
161 /*==================*/
162 	btr_cur_t*	cursor,		/*!< in: cursor positioned on a page */
163 	ulint		height,		/*!< in: height of the page in tree;
164 					0 means leaf node */
165 	ulint		root_height);	/*!< in: root node height in tree */
166 /***********************************************************//**
167 Frees the externally stored fields for a record, if the field is mentioned
168 in the update vector. */
169 static
170 void
171 btr_rec_free_updated_extern_fields(
172 /*===============================*/
173 	dict_index_t*	index,	/*!< in: index of rec; the index tree MUST be
174 				X-latched */
175 	rec_t*		rec,	/*!< in: record */
176 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
177 				part will be updated, or NULL */
178 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
179 	const upd_t*	update,	/*!< in: update vector */
180 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
181 	mtr_t*		mtr);	/*!< in: mini-transaction handle which contains
182 				an X-latch to record page and to the tree */
183 /***********************************************************//**
184 Frees the externally stored fields for a record. */
185 static
186 void
187 btr_rec_free_externally_stored_fields(
188 /*==================================*/
189 	dict_index_t*	index,	/*!< in: index of the data, the index
190 				tree MUST be X-latched */
191 	rec_t*		rec,	/*!< in: record */
192 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
193 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
194 				part will be updated, or NULL */
195 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
196 	mtr_t*		mtr);	/*!< in: mini-transaction handle which contains
197 				an X-latch to record page and to the index
198 				tree */
199 /***********************************************************//**
200 Gets the externally stored size of a record, in units of a database page.
201 @return	externally stored part, in units of a database page */
202 static
203 ulint
204 btr_rec_get_externally_stored_len(
205 /*==============================*/
206 	const rec_t*	rec,	/*!< in: record */
207 	const ulint*	offsets);/*!< in: array returned by rec_get_offsets() */
208 #endif /* !UNIV_HOTBACKUP */
209 
210 /******************************************************//**
211 The following function is used to set the deleted bit of a record. */
212 UNIV_INLINE
213 void
214 btr_rec_set_deleted_flag(
215 /*=====================*/
216 	rec_t*		rec,	/*!< in/out: physical record */
217 	page_zip_des_t*	page_zip,/*!< in/out: compressed page (or NULL) */
218 	ulint		flag)	/*!< in: nonzero if delete marked */
219 {
220 	if (page_rec_is_comp(rec)) {
221 		rec_set_deleted_flag_new(rec, page_zip, flag);
222 	} else {
223 		ut_ad(!page_zip);
224 		rec_set_deleted_flag_old(rec, flag);
225 	}
226 }
227 
228 #ifndef UNIV_HOTBACKUP
229 /*==================== B-TREE SEARCH =========================*/
230 
231 /********************************************************************//**
232 Latches the leaf page or pages requested. */
233 static
234 void
235 btr_cur_latch_leaves(
236 /*=================*/
237 	page_t*		page,		/*!< in: leaf page where the search
238 					converged */
239 	ulint		space,		/*!< in: space id */
240 	ulint		zip_size,	/*!< in: compressed page size in bytes
241 					or 0 for uncompressed pages */
242 	ulint		page_no,	/*!< in: page number of the leaf */
243 	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
244 	btr_cur_t*	cursor,		/*!< in: cursor */
245 	mtr_t*		mtr)		/*!< in: mtr */
246 {
247 	ulint		mode;
248 	ulint		left_page_no;
249 	ulint		right_page_no;
250 	buf_block_t*	get_block;
251 
252 	ut_ad(page && mtr);
253 
254 	switch (latch_mode) {
255 	case BTR_SEARCH_LEAF:
256 	case BTR_MODIFY_LEAF:
257 		mode = latch_mode == BTR_SEARCH_LEAF ? RW_S_LATCH : RW_X_LATCH;
258 		get_block = btr_block_get(
259 			space, zip_size, page_no, mode, cursor->index, mtr);
260 #ifdef UNIV_BTR_DEBUG
261 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
262 #endif /* UNIV_BTR_DEBUG */
263 		get_block->check_index_page_at_flush = TRUE;
264 		return;
265 	case BTR_MODIFY_TREE:
266 		/* x-latch also brothers from left to right */
267 		left_page_no = btr_page_get_prev(page, mtr);
268 
269 		if (left_page_no != FIL_NULL) {
270 			get_block = btr_block_get(
271 				space, zip_size, left_page_no,
272 				RW_X_LATCH, cursor->index, mtr);
273 #ifdef UNIV_BTR_DEBUG
274 			ut_a(page_is_comp(get_block->frame)
275 			     == page_is_comp(page));
276 			ut_a(btr_page_get_next(get_block->frame, mtr)
277 			     == page_get_page_no(page));
278 #endif /* UNIV_BTR_DEBUG */
279 			get_block->check_index_page_at_flush = TRUE;
280 		}
281 
282 		get_block = btr_block_get(
283 			space, zip_size, page_no,
284 			RW_X_LATCH, cursor->index, mtr);
285 #ifdef UNIV_BTR_DEBUG
286 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
287 #endif /* UNIV_BTR_DEBUG */
288 		get_block->check_index_page_at_flush = TRUE;
289 
290 		right_page_no = btr_page_get_next(page, mtr);
291 
292 		if (right_page_no != FIL_NULL) {
293 			get_block = btr_block_get(
294 				space, zip_size, right_page_no,
295 				RW_X_LATCH, cursor->index, mtr);
296 #ifdef UNIV_BTR_DEBUG
297 			ut_a(page_is_comp(get_block->frame)
298 			     == page_is_comp(page));
299 			ut_a(btr_page_get_prev(get_block->frame, mtr)
300 			     == page_get_page_no(page));
301 #endif /* UNIV_BTR_DEBUG */
302 			get_block->check_index_page_at_flush = TRUE;
303 		}
304 
305 		return;
306 
307 	case BTR_SEARCH_PREV:
308 	case BTR_MODIFY_PREV:
309 		mode = latch_mode == BTR_SEARCH_PREV ? RW_S_LATCH : RW_X_LATCH;
310 		/* latch also left brother */
311 		left_page_no = btr_page_get_prev(page, mtr);
312 
313 		if (left_page_no != FIL_NULL) {
314 			get_block = btr_block_get(
315 				space, zip_size,
316 				left_page_no, mode, cursor->index, mtr);
317 			cursor->left_block = get_block;
318 #ifdef UNIV_BTR_DEBUG
319 			ut_a(page_is_comp(get_block->frame)
320 			     == page_is_comp(page));
321 			ut_a(btr_page_get_next(get_block->frame, mtr)
322 			     == page_get_page_no(page));
323 #endif /* UNIV_BTR_DEBUG */
324 			get_block->check_index_page_at_flush = TRUE;
325 		}
326 
327 		get_block = btr_block_get(
328 			space, zip_size, page_no, mode, cursor->index, mtr);
329 #ifdef UNIV_BTR_DEBUG
330 		ut_a(page_is_comp(get_block->frame) == page_is_comp(page));
331 #endif /* UNIV_BTR_DEBUG */
332 		get_block->check_index_page_at_flush = TRUE;
333 		return;
334 	}
335 
336 	ut_error;
337 }
338 
339 /********************************************************************//**
340 Searches an index tree and positions a tree cursor on a given level.
341 NOTE: n_fields_cmp in tuple must be set so that it cannot be compared
342 to node pointer page number fields on the upper levels of the tree!
343 Note that if mode is PAGE_CUR_LE, which is used in inserts, then
344 cursor->up_match and cursor->low_match both will have sensible values.
345 If mode is PAGE_CUR_GE, then up_match will have a sensible value.
346 
347 If mode is PAGE_CUR_LE, cursor is left at the place where an insert of the
348 search tuple should be performed in the B-tree. InnoDB does an insert
349 immediately after the cursor. Thus, the cursor may end up on a user record,
350 or on a page infimum record. */
351 UNIV_INTERN
352 void
353 btr_cur_search_to_nth_level(
354 /*========================*/
355 	dict_index_t*	index,	/*!< in: index */
356 	ulint		level,	/*!< in: the tree level of search */
357 	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
358 				tuple must be set so that it cannot get
359 				compared to the node ptr page number field! */
360 	ulint		mode,	/*!< in: PAGE_CUR_L, ...;
361 				Inserts should always be made using
362 				PAGE_CUR_LE to search the position! */
363 	ulint		latch_mode, /*!< in: BTR_SEARCH_LEAF, ..., ORed with
364 				at most one of BTR_INSERT, BTR_DELETE_MARK,
365 				BTR_DELETE, or BTR_ESTIMATE;
366 				cursor->left_block is used to store a pointer
367 				to the left neighbor page, in the cases
368 				BTR_SEARCH_PREV and BTR_MODIFY_PREV;
369 				NOTE that if has_search_latch
370 				is != 0, we may not have a latch set
371 				on the cursor page, and we assume
372 				the caller uses the search latch
373 				to protect the record! */
374 	btr_cur_t*	cursor, /*!< in/out: tree cursor; the cursor page is
375 				s- or x-latched, but see also above! */
376 	ulint		has_search_latch,/*!< in: info on the latch mode the
377 				caller currently has on btr_search_latch:
378 				RW_S_LATCH, or 0 */
379 	const char*	file,	/*!< in: file name */
380 	ulint		line,	/*!< in: line where called */
381 	mtr_t*		mtr)	/*!< in: mtr */
382 {
383 	page_t*		page;
384 	buf_block_t*	block;
385 	ulint		space;
386 	buf_block_t*	guess;
387 	ulint		height;
388 	ulint		page_no;
389 	ulint		up_match;
390 	ulint		up_bytes;
391 	ulint		low_match;
392 	ulint		low_bytes;
393 	ulint		savepoint;
394 	ulint		rw_latch;
395 	ulint		page_mode;
396 	ulint		buf_mode;
397 	ulint		estimate;
398 	ulint		zip_size;
399 	page_cur_t*	page_cursor;
400 	btr_op_t	btr_op;
401 	ulint		root_height = 0; /* remove warning */
402 
403 #ifdef BTR_CUR_ADAPT
404 	btr_search_t*	info;
405 #endif
406 	mem_heap_t*	heap		= NULL;
407 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
408 	ulint*		offsets		= offsets_;
409 	rec_offs_init(offsets_);
410 	/* Currently, PAGE_CUR_LE is the only search mode used for searches
411 	ending on the upper levels */
412 
413 	ut_ad(level == 0 || mode == PAGE_CUR_LE);
414 	ut_ad(dict_index_check_search_tuple(index, tuple));
415 	ut_ad(!dict_index_is_ibuf(index) || ibuf_inside(mtr));
416 	ut_ad(dtuple_check_typed(tuple));
417 	ut_ad(index->page != FIL_NULL);
418 
419 	UNIV_MEM_INVALID(&cursor->up_match, sizeof cursor->up_match);
420 	UNIV_MEM_INVALID(&cursor->up_bytes, sizeof cursor->up_bytes);
421 	UNIV_MEM_INVALID(&cursor->low_match, sizeof cursor->low_match);
422 	UNIV_MEM_INVALID(&cursor->low_bytes, sizeof cursor->low_bytes);
423 #ifdef UNIV_DEBUG
424 	cursor->up_match = ULINT_UNDEFINED;
425 	cursor->low_match = ULINT_UNDEFINED;
426 #endif
427 
428 	/* These flags are mutually exclusive; they are lumped together
429 	with the latch mode for historical reasons. It's possible for
430 	none of the flags to be set. */
431 	switch (UNIV_EXPECT(latch_mode
432 			    & (BTR_INSERT | BTR_DELETE | BTR_DELETE_MARK),
433 			    0)) {
434 	case 0:
435 		btr_op = BTR_NO_OP;
436 		break;
437 	case BTR_INSERT:
438 		btr_op = (latch_mode & BTR_IGNORE_SEC_UNIQUE)
439 			? BTR_INSERT_IGNORE_UNIQUE_OP
440 			: BTR_INSERT_OP;
441 		break;
442 	case BTR_DELETE:
443 		btr_op = BTR_DELETE_OP;
444 		ut_a(cursor->purge_node);
445 		break;
446 	case BTR_DELETE_MARK:
447 		btr_op = BTR_DELMARK_OP;
448 		break;
449 	default:
450 		/* only one of BTR_INSERT, BTR_DELETE, BTR_DELETE_MARK
451 		should be specified at a time */
452 		ut_error;
453 	}
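	/* For example, a caller that wants a secondary index insert to be
	buffered if possible may pass
	latch_mode = BTR_MODIFY_LEAF | BTR_INSERT | BTR_IGNORE_SEC_UNIQUE,
	which decodes here to btr_op = BTR_INSERT_IGNORE_UNIQUE_OP; the
	extra flag bits are stripped from latch_mode just below. */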
454 
455 	/* Operations on the insert buffer tree cannot be buffered. */
456 	ut_ad(btr_op == BTR_NO_OP || !dict_index_is_ibuf(index));
457 	/* Operations on the clustered index cannot be buffered. */
458 	ut_ad(btr_op == BTR_NO_OP || !dict_index_is_clust(index));
459 
460 	estimate = latch_mode & BTR_ESTIMATE;
461 
462 	/* Turn the flags unrelated to the latch mode off. */
463 	latch_mode &= ~(BTR_INSERT
464 			| BTR_DELETE_MARK
465 			| BTR_DELETE
466 			| BTR_ESTIMATE
467 			| BTR_IGNORE_SEC_UNIQUE);
468 
469 	cursor->flag = BTR_CUR_BINARY;
470 	cursor->index = index;
471 
472 #ifndef BTR_CUR_ADAPT
473 	guess = NULL;
474 #else
475 	info = btr_search_get_info(index);
476 
477 	guess = info->root_guess;
478 
479 #ifdef BTR_CUR_HASH_ADAPT
480 
481 #ifdef UNIV_SEARCH_PERF_STAT
482 	info->n_searches++;
483 #endif
484 	if (rw_lock_get_writer(&btr_search_latch) == RW_LOCK_NOT_LOCKED
485 	    && latch_mode <= BTR_MODIFY_LEAF
486 	    && info->last_hash_succ
487 	    && !estimate
488 #ifdef PAGE_CUR_LE_OR_EXTENDS
489 	    && mode != PAGE_CUR_LE_OR_EXTENDS
490 #endif /* PAGE_CUR_LE_OR_EXTENDS */
491 	    /* If !has_search_latch, we do a dirty read of
492 	    btr_search_enabled below, and btr_search_guess_on_hash()
493 	    will have to check it again. */
494 	    && UNIV_LIKELY(btr_search_enabled)
495 	    && btr_search_guess_on_hash(index, info, tuple, mode,
496 					latch_mode, cursor,
497 					has_search_latch, mtr)) {
498 
499 		/* Search using the hash index succeeded */
500 
501 		ut_ad(cursor->up_match != ULINT_UNDEFINED
502 		      || mode != PAGE_CUR_GE);
503 		ut_ad(cursor->up_match != ULINT_UNDEFINED
504 		      || mode != PAGE_CUR_LE);
505 		ut_ad(cursor->low_match != ULINT_UNDEFINED
506 		      || mode != PAGE_CUR_LE);
507 		btr_cur_n_sea++;
508 
509 		return;
510 	}
511 #endif /* BTR_CUR_HASH_ADAPT */
512 #endif /* BTR_CUR_ADAPT */
513 	btr_cur_n_non_sea++;
514 
515 	/* If the hash search did not succeed, do binary search down the
516 	tree */
517 
518 	if (has_search_latch) {
519 		/* Release possible search latch to obey latching order */
520 		rw_lock_s_unlock(&btr_search_latch);
521 	}
522 
523 	/* Store the position of the tree latch we push to mtr so that we
524 	know how to release it when we have latched leaf node(s) */
525 
526 	savepoint = mtr_set_savepoint(mtr);
527 
528 	if (latch_mode == BTR_MODIFY_TREE) {
529 		mtr_x_lock(dict_index_get_lock(index), mtr);
530 
531 	} else if (latch_mode == BTR_CONT_MODIFY_TREE) {
532 		/* Do nothing */
533 		ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
534 					MTR_MEMO_X_LOCK));
535 	} else {
536 		mtr_s_lock(dict_index_get_lock(index), mtr);
537 	}
538 
539 	page_cursor = btr_cur_get_page_cur(cursor);
540 
541 	space = dict_index_get_space(index);
542 	page_no = dict_index_get_page(index);
543 
544 	up_match = 0;
545 	up_bytes = 0;
546 	low_match = 0;
547 	low_bytes = 0;
548 
549 	height = ULINT_UNDEFINED;
550 
551 	/* We use these modified search modes on non-leaf levels of the
552 	B-tree. These let us end up in the right B-tree leaf. In that leaf
553 	we use the original search mode. */
554 
555 	switch (mode) {
556 	case PAGE_CUR_GE:
557 		page_mode = PAGE_CUR_L;
558 		break;
559 	case PAGE_CUR_G:
560 		page_mode = PAGE_CUR_LE;
561 		break;
562 	default:
563 #ifdef PAGE_CUR_LE_OR_EXTENDS
564 		ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
565 		      || mode == PAGE_CUR_LE_OR_EXTENDS);
566 #else /* PAGE_CUR_LE_OR_EXTENDS */
567 		ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE);
568 #endif /* PAGE_CUR_LE_OR_EXTENDS */
569 		page_mode = mode;
570 		break;
571 	}
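	/* The intuition behind the mapping above: a node pointer on a
	non-leaf level carries the key of the first record of its child
	page.  When looking for the first record >= tuple (PAGE_CUR_GE),
	records equal to the tuple may already start on the page to the
	left of the child whose node pointer equals the tuple, so we must
	follow the last node pointer that is strictly less than the tuple
	(PAGE_CUR_L).  For PAGE_CUR_G, records equal to the tuple are to
	be skipped anyway, so following the last node pointer <= the tuple
	(PAGE_CUR_LE) is enough. */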
572 
573 	/* Loop and search until we arrive at the desired level */
574 
575 search_loop:
576 	buf_mode = BUF_GET;
577 	rw_latch = RW_NO_LATCH;
578 
579 	if (height != 0) {
580 		/* We are about to fetch the root or a non-leaf page. */
581 	} else if (latch_mode <= BTR_MODIFY_LEAF) {
582 		rw_latch = latch_mode;
583 
584 		if (btr_op != BTR_NO_OP
585 		    && ibuf_should_try(index, btr_op != BTR_INSERT_OP)) {
586 
587 			/* Try to buffer the operation if the leaf
588 			page is not in the buffer pool. */
589 
590 			buf_mode = btr_op == BTR_DELETE_OP
591 				? BUF_GET_IF_IN_POOL_OR_WATCH
592 				: BUF_GET_IF_IN_POOL;
593 		}
594 	}
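	/* Note: for a buffered purge (BTR_DELETE_OP) the fetch below uses
	BUF_GET_IF_IN_POOL_OR_WATCH, which additionally arms a buffer pool
	"watch" on the page when it is not resident, so that a concurrent
	read of the page can be noticed before the purge is buffered; the
	watch is released with buf_pool_watch_unset() once the buffering
	decision has been made. */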
595 
596 	zip_size = dict_table_zip_size(index->table);
597 
598 retry_page_get:
599 	block = buf_page_get_gen(
600 		space, zip_size, page_no, rw_latch, guess, buf_mode,
601 		file, line, mtr);
602 
603 	if (block == NULL) {
604 		/* This must be a search to perform an insert, delete
605 		mark, or delete; try using the insert/delete buffer */
606 
607 		ut_ad(height == 0);
608 		ut_ad(cursor->thr);
609 
610 		switch (btr_op) {
611 		case BTR_INSERT_OP:
612 		case BTR_INSERT_IGNORE_UNIQUE_OP:
613 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL);
614 
615 			if (ibuf_insert(IBUF_OP_INSERT, tuple, index,
616 					space, zip_size, page_no,
617 					cursor->thr)) {
618 
619 				cursor->flag = BTR_CUR_INSERT_TO_IBUF;
620 
621 				goto func_exit;
622 			}
623 			break;
624 
625 		case BTR_DELMARK_OP:
626 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL);
627 
628 			if (ibuf_insert(IBUF_OP_DELETE_MARK, tuple,
629 					index, space, zip_size,
630 					page_no, cursor->thr)) {
631 
632 				cursor->flag = BTR_CUR_DEL_MARK_IBUF;
633 
634 				goto func_exit;
635 			}
636 
637 			break;
638 
639 		case BTR_DELETE_OP:
640 			ut_ad(buf_mode == BUF_GET_IF_IN_POOL_OR_WATCH);
641 
642 			if (!row_purge_poss_sec(cursor->purge_node,
643 						index, tuple)) {
644 
645 				/* The record cannot be purged yet. */
646 				cursor->flag = BTR_CUR_DELETE_REF;
647 			} else if (ibuf_insert(IBUF_OP_DELETE, tuple,
648 					       index, space, zip_size,
649 					       page_no,
650 					       cursor->thr)) {
651 
652 				/* The purge was buffered. */
653 				cursor->flag = BTR_CUR_DELETE_IBUF;
654 			} else {
655 				/* The purge could not be buffered. */
656 				buf_pool_watch_unset(space, page_no);
657 				break;
658 			}
659 
660 			buf_pool_watch_unset(space, page_no);
661 			goto func_exit;
662 
663 		default:
664 			ut_error;
665 		}
666 
667 		/* Insert to the insert/delete buffer did not succeed; we
668 		must read the page from disk. */
669 
670 		buf_mode = BUF_GET;
671 
672 		goto retry_page_get;
673 	}
674 
675 	block->check_index_page_at_flush = TRUE;
676 	page = buf_block_get_frame(block);
677 
678 	if (rw_latch != RW_NO_LATCH) {
679 #ifdef UNIV_ZIP_DEBUG
680 		const page_zip_des_t*	page_zip
681 			= buf_block_get_page_zip(block);
682 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
683 #endif /* UNIV_ZIP_DEBUG */
684 
685 		buf_block_dbg_add_level(
686 			block, dict_index_is_ibuf(index)
687 			? SYNC_IBUF_TREE_NODE : SYNC_TREE_NODE);
688 	}
689 
690 	ut_ad(index->id == btr_page_get_index_id(page));
691 
692 	if (UNIV_UNLIKELY(height == ULINT_UNDEFINED)) {
693 		/* We are in the root node */
694 
695 		height = btr_page_get_level(page, mtr);
696 		root_height = height;
697 		cursor->tree_height = root_height + 1;
698 
699 #ifdef BTR_CUR_ADAPT
700 		if (block != guess) {
701 			info->root_guess = block;
702 		}
703 #endif
704 	}
705 
706 	if (height == 0) {
707 		if (rw_latch == RW_NO_LATCH) {
708 
709 			btr_cur_latch_leaves(
710 				page, space, zip_size, page_no, latch_mode,
711 				cursor, mtr);
712 		}
713 
714 		if (latch_mode != BTR_MODIFY_TREE
715 		    && latch_mode != BTR_CONT_MODIFY_TREE) {
716 
717 			/* Release the tree s-latch */
718 
719 			mtr_release_s_latch_at_savepoint(
720 				mtr, savepoint, dict_index_get_lock(index));
721 		}
722 
723 		page_mode = mode;
724 	}
725 
726 	page_cur_search_with_match(
727 		block, index, tuple, page_mode, &up_match, &up_bytes,
728 		&low_match, &low_bytes, page_cursor);
729 
730 	if (estimate) {
731 		btr_cur_add_path_info(cursor, height, root_height);
732 	}
733 
734 	/* If this is the desired level, leave the loop */
735 
736 	ut_ad(height == btr_page_get_level(page_cur_get_page(page_cursor),
737 					   mtr));
738 
739 	if (level != height) {
740 
741 		const rec_t*	node_ptr;
742 		ut_ad(height > 0);
743 
744 		height--;
745 		guess = NULL;
746 
747 		node_ptr = page_cur_get_rec(page_cursor);
748 
749 		offsets = rec_get_offsets(
750 			node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
751 
752 		/* Go to the child node */
753 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
754 
755 		if (UNIV_UNLIKELY(height == 0 && dict_index_is_ibuf(index))) {
756 			/* We're doing a search on an ibuf tree and we're one
757 			level above the leaf page. */
758 
759 			ut_ad(level == 0);
760 
761 			buf_mode = BUF_GET;
762 			rw_latch = RW_NO_LATCH;
763 			goto retry_page_get;
764 		}
765 
766 		goto search_loop;
767 	}
768 
769 	if (level != 0) {
770 		/* x-latch the page */
771 		buf_block_t*	child_block = btr_block_get(
772 			space, zip_size, page_no, RW_X_LATCH, index, mtr);
773 
774 		page = buf_block_get_frame(child_block);
775 		btr_assert_not_corrupted(child_block, index);
776 	} else {
777 		cursor->low_match = low_match;
778 		cursor->low_bytes = low_bytes;
779 		cursor->up_match = up_match;
780 		cursor->up_bytes = up_bytes;
781 
782 #ifdef BTR_CUR_ADAPT
783 		/* We do a dirty read of btr_search_enabled here.  We
784 		will properly check btr_search_enabled again in
785 		btr_search_build_page_hash_index() before building a
786 		page hash index, while holding btr_search_latch. */
787 		if (UNIV_LIKELY(btr_search_enabled)) {
788 
789 			btr_search_info_update(index, cursor);
790 		}
791 #endif
792 		ut_ad(cursor->up_match != ULINT_UNDEFINED
793 		      || mode != PAGE_CUR_GE);
794 		ut_ad(cursor->up_match != ULINT_UNDEFINED
795 		      || mode != PAGE_CUR_LE);
796 		ut_ad(cursor->low_match != ULINT_UNDEFINED
797 		      || mode != PAGE_CUR_LE);
798 	}
799 
800 func_exit:
801 
802 	if (UNIV_LIKELY_NULL(heap)) {
803 		mem_heap_free(heap);
804 	}
805 
806 	if (has_search_latch) {
807 
808 		rw_lock_s_lock(&btr_search_latch);
809 	}
810 }
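/* A minimal caller sketch for the function above (illustrative only:
"index" and the search tuple "entry" are assumed to exist, and all error
handling is omitted):

	btr_cur_t	cursor;
	mtr_t		mtr;

	mtr_start(&mtr);
	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    BTR_MODIFY_LEAF, &cursor, 0,
				    __FILE__, __LINE__, &mtr);
	... operate on btr_cur_get_rec(&cursor), e.g. insert after it ...
	mtr_commit(&mtr);
*/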
811 
812 /*****************************************************************//**
813 Opens a cursor at either end of an index. */
814 UNIV_INTERN
815 void
816 btr_cur_open_at_index_side_func(
817 /*============================*/
818 	ibool		from_left,	/*!< in: TRUE if open to the low end,
819 					FALSE if to the high end */
820 	dict_index_t*	index,		/*!< in: index */
821 	ulint		latch_mode,	/*!< in: latch mode */
822 	btr_cur_t*	cursor,		/*!< in: cursor */
823 	const char*	file,		/*!< in: file name */
824 	ulint		line,		/*!< in: line where called */
825 	mtr_t*		mtr)		/*!< in: mtr */
826 {
827 	page_cur_t*	page_cursor;
828 	ulint		page_no;
829 	ulint		space;
830 	ulint		zip_size;
831 	ulint		height;
832 	ulint		root_height = 0; /* remove warning */
833 	rec_t*		node_ptr;
834 	ulint		estimate;
835 	ulint		savepoint;
836 	mem_heap_t*	heap		= NULL;
837 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
838 	ulint*		offsets		= offsets_;
839 	rec_offs_init(offsets_);
840 
841 	estimate = latch_mode & BTR_ESTIMATE;
842 	latch_mode = latch_mode & ~BTR_ESTIMATE;
843 
844 	/* Store the position of the tree latch we push to mtr so that we
845 	know how to release it when we have latched the leaf node */
846 
847 	savepoint = mtr_set_savepoint(mtr);
848 
849 	if (latch_mode == BTR_MODIFY_TREE) {
850 		mtr_x_lock(dict_index_get_lock(index), mtr);
851 	} else {
852 		mtr_s_lock(dict_index_get_lock(index), mtr);
853 	}
854 
855 	page_cursor = btr_cur_get_page_cur(cursor);
856 	cursor->index = index;
857 
858 	space = dict_index_get_space(index);
859 	zip_size = dict_table_zip_size(index->table);
860 	page_no = dict_index_get_page(index);
861 
862 	height = ULINT_UNDEFINED;
863 
864 	for (;;) {
865 		buf_block_t*	block;
866 		page_t*		page;
867 		block = buf_page_get_gen(space, zip_size, page_no,
868 					 RW_NO_LATCH, NULL, BUF_GET,
869 					 file, line, mtr);
870 		page = buf_block_get_frame(block);
871 		ut_ad(index->id == btr_page_get_index_id(page));
872 
873 		block->check_index_page_at_flush = TRUE;
874 
875 		if (height == ULINT_UNDEFINED) {
876 			/* We are in the root node */
877 
878 			height = btr_page_get_level(page, mtr);
879 			root_height = height;
880 		}
881 
882 		if (height == 0) {
883 			btr_cur_latch_leaves(page, space, zip_size, page_no,
884 					     latch_mode, cursor, mtr);
885 
886 			/* In versions <= 3.23.52 we had forgotten to
887 			release the tree latch here. If in an index scan
888 			we had to scan far to find a record visible to the
889 			current transaction, that could starve others
890 			waiting for the tree latch. */
891 
892 			if ((latch_mode != BTR_MODIFY_TREE)
893 			    && (latch_mode != BTR_CONT_MODIFY_TREE)) {
894 
895 				/* Release the tree s-latch */
896 
897 				mtr_release_s_latch_at_savepoint(
898 					mtr, savepoint,
899 					dict_index_get_lock(index));
900 			}
901 		}
902 
903 		if (from_left) {
904 			page_cur_set_before_first(block, page_cursor);
905 		} else {
906 			page_cur_set_after_last(block, page_cursor);
907 		}
908 
909 		if (height == 0) {
910 			if (estimate) {
911 				btr_cur_add_path_info(cursor, height,
912 						      root_height);
913 			}
914 
915 			break;
916 		}
917 
918 		ut_ad(height > 0);
919 
920 		if (from_left) {
921 			page_cur_move_to_next(page_cursor);
922 		} else {
923 			page_cur_move_to_prev(page_cursor);
924 		}
925 
926 		if (estimate) {
927 			btr_cur_add_path_info(cursor, height, root_height);
928 		}
929 
930 		height--;
931 
932 		node_ptr = page_cur_get_rec(page_cursor);
933 		offsets = rec_get_offsets(node_ptr, cursor->index, offsets,
934 					  ULINT_UNDEFINED, &heap);
935 		/* Go to the child node */
936 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
937 	}
938 
939 	if (UNIV_LIKELY_NULL(heap)) {
940 		mem_heap_free(heap);
941 	}
942 }
943 
944 /**********************************************************************//**
945 Positions a cursor at a randomly chosen position within a B-tree. */
946 UNIV_INTERN
947 void
948 btr_cur_open_at_rnd_pos_func(
949 /*=========================*/
950 	dict_index_t*	index,		/*!< in: index */
951 	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
952 	btr_cur_t*	cursor,		/*!< in/out: B-tree cursor */
953 	const char*	file,		/*!< in: file name */
954 	ulint		line,		/*!< in: line where called */
955 	mtr_t*		mtr)		/*!< in: mtr */
956 {
957 	page_cur_t*	page_cursor;
958 	ulint		page_no;
959 	ulint		space;
960 	ulint		zip_size;
961 	ulint		height;
962 	rec_t*		node_ptr;
963 	mem_heap_t*	heap		= NULL;
964 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
965 	ulint*		offsets		= offsets_;
966 	rec_offs_init(offsets_);
967 
968 	if (latch_mode == BTR_MODIFY_TREE) {
969 		mtr_x_lock(dict_index_get_lock(index), mtr);
970 	} else {
971 		mtr_s_lock(dict_index_get_lock(index), mtr);
972 	}
973 
974 	page_cursor = btr_cur_get_page_cur(cursor);
975 	cursor->index = index;
976 
977 	space = dict_index_get_space(index);
978 	zip_size = dict_table_zip_size(index->table);
979 	page_no = dict_index_get_page(index);
980 
981 	height = ULINT_UNDEFINED;
982 
983 	for (;;) {
984 		buf_block_t*	block;
985 		page_t*		page;
986 
987 		block = buf_page_get_gen(space, zip_size, page_no,
988 					 RW_NO_LATCH, NULL, BUF_GET,
989 					 file, line, mtr);
990 		page = buf_block_get_frame(block);
991 		ut_ad(index->id == btr_page_get_index_id(page));
992 
993 		if (height == ULINT_UNDEFINED) {
994 			/* We are in the root node */
995 
996 			height = btr_page_get_level(page, mtr);
997 		}
998 
999 		if (height == 0) {
1000 			btr_cur_latch_leaves(page, space, zip_size, page_no,
1001 					     latch_mode, cursor, mtr);
1002 		}
1003 
1004 		page_cur_open_on_rnd_user_rec(block, page_cursor);
1005 
1006 		if (height == 0) {
1007 
1008 			break;
1009 		}
1010 
1011 		ut_ad(height > 0);
1012 
1013 		height--;
1014 
1015 		node_ptr = page_cur_get_rec(page_cursor);
1016 		offsets = rec_get_offsets(node_ptr, cursor->index, offsets,
1017 					  ULINT_UNDEFINED, &heap);
1018 		/* Go to the child node */
1019 		page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
1020 	}
1021 
1022 	if (UNIV_LIKELY_NULL(heap)) {
1023 		mem_heap_free(heap);
1024 	}
1025 }
1026 
1027 /*==================== B-TREE INSERT =========================*/
1028 
1029 /*************************************************************//**
1030 Inserts a record if there is enough space, or if enough space can
1031 be freed by reorganizing. Differs from btr_cur_optimistic_insert because
1032 no heuristic is applied as to whether it pays to use CPU time for
1033 reorganizing the page.
1034 @return	pointer to inserted record if succeed, else NULL */
1035 static
1036 rec_t*
1037 btr_cur_insert_if_possible(
1038 /*=======================*/
1039 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert;
1040 				cursor stays valid */
1041 	const dtuple_t*	tuple,	/*!< in: tuple to insert; the size info need not
1042 				have been stored to tuple */
1043 	ulint		n_ext,	/*!< in: number of externally stored columns */
1044 	mtr_t*		mtr)	/*!< in: mtr */
1045 {
1046 	page_cur_t*	page_cursor;
1047 	buf_block_t*	block;
1048 	rec_t*		rec;
1049 
1050 	ut_ad(dtuple_check_typed(tuple));
1051 
1052 	block = btr_cur_get_block(cursor);
1053 
1054 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1055 	page_cursor = btr_cur_get_page_cur(cursor);
1056 
1057 	/* Now, try the insert */
1058 	rec = page_cur_tuple_insert(page_cursor, tuple,
1059 				    cursor->index, n_ext, mtr);
1060 
1061 	if (UNIV_UNLIKELY(!rec)) {
1062 		/* If record did not fit, reorganize */
1063 
1064 		if (btr_page_reorganize(block, cursor->index, mtr)) {
1065 
1066 			page_cur_search(block, cursor->index, tuple,
1067 					PAGE_CUR_LE, page_cursor);
1068 
1069 			rec = page_cur_tuple_insert(page_cursor, tuple,
1070 						    cursor->index, n_ext, mtr);
1071 		}
1072 	}
1073 
1074 	return(rec);
1075 }
1076 
1077 /*************************************************************//**
1078 For an insert, checks the locks and does the undo logging if desired.
1079 @return	DB_SUCCESS, DB_WAIT_LOCK, DB_FAIL, or error number */
1080 UNIV_INLINE
1081 ulint
1082 btr_cur_ins_lock_and_undo(
1083 /*======================*/
1084 	ulint		flags,	/*!< in: undo logging and locking flags: if
1085 				not zero, the parameters index and thr
1086 				should be specified */
1087 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert */
1088 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1089 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1090 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
1091 	ibool*		inherit)/*!< out: TRUE if the newly inserted record may
1092 				need to inherit LOCK_GAP type locks from the
1093 				successor record */
1094 {
1095 	dict_index_t*	index;
1096 	ulint		err;
1097 	rec_t*		rec;
1098 	roll_ptr_t	roll_ptr;
1099 
1100 	/* Check if we have to wait for a lock: enqueue an explicit lock
1101 	request if yes */
1102 
1103 	rec = btr_cur_get_rec(cursor);
1104 	index = cursor->index;
1105 
1106 	err = lock_rec_insert_check_and_lock(flags, rec,
1107 					     btr_cur_get_block(cursor),
1108 					     index, thr, mtr, inherit);
1109 
1110 	if (err != DB_SUCCESS) {
1111 
1112 		return(err);
1113 	}
1114 
1115 	if (dict_index_is_clust(index) && !dict_index_is_ibuf(index)) {
1116 
1117 		err = trx_undo_report_row_operation(flags, TRX_UNDO_INSERT_OP,
1118 						    thr, index, entry,
1119 						    NULL, 0, NULL,
1120 						    &roll_ptr);
1121 		if (err != DB_SUCCESS) {
1122 
1123 			return(err);
1124 		}
1125 
1126 		/* Now we can fill in the roll ptr field in entry */
1127 
1128 		if (!(flags & BTR_KEEP_SYS_FLAG)) {
1129 
1130 			row_upd_index_entry_sys_field(entry, index,
1131 						      DATA_ROLL_PTR, roll_ptr);
1132 		}
1133 	}
1134 
1135 	return(DB_SUCCESS);
1136 }
1137 
1138 #ifdef UNIV_DEBUG
1139 /*************************************************************//**
1140 Report information about a transaction. */
1141 static
1142 void
1143 btr_cur_trx_report(
1144 /*===============*/
1145 	trx_t*			trx,	/*!< in: transaction */
1146 	const dict_index_t*	index,	/*!< in: index */
1147 	const char*		op)	/*!< in: operation */
1148 {
1149 	fprintf(stderr, "Trx with id " TRX_ID_FMT " going to ",
1150 		(ullint) trx->id);
1151 	fputs(op, stderr);
1152 	dict_index_name_print(stderr, trx, index);
1153 	putc('\n', stderr);
1154 }
1155 #endif /* UNIV_DEBUG */
1156 
1157 /*************************************************************//**
1158 Tries to perform an insert to a page in an index tree, next to cursor.
1159 It is assumed that mtr holds an x-latch on the page. The operation does
1160 not succeed if there is too little space on the page. If there is just
1161 one record on the page, the insert will always succeed; this is to
1162 prevent trying to split a page with just one record.
1163 @return	DB_SUCCESS, DB_WAIT_LOCK, DB_FAIL, or error number */
1164 UNIV_INTERN
1165 ulint
1166 btr_cur_optimistic_insert(
1167 /*======================*/
1168 	ulint		flags,	/*!< in: undo logging and locking flags: if not
1169 				zero, the parameters index and thr should be
1170 				specified */
1171 	btr_cur_t*	cursor,	/*!< in: cursor on page after which to insert;
1172 				cursor stays valid */
1173 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1174 	rec_t**		rec,	/*!< out: pointer to inserted record if
1175 				succeed */
1176 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
1177 				be stored externally by the caller, or
1178 				NULL */
1179 	ulint		n_ext,	/*!< in: number of externally stored columns */
1180 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1181 	mtr_t*		mtr)	/*!< in: mtr; if this function returns
1182 				DB_SUCCESS on a leaf page of a secondary
1183 				index in a compressed tablespace, the
1184 				mtr must be committed before latching
1185 				any further pages */
1186 {
1187 	big_rec_t*	big_rec_vec	= NULL;
1188 	dict_index_t*	index;
1189 	page_cur_t*	page_cursor;
1190 	buf_block_t*	block;
1191 	page_t*		page;
1192 	ulint		max_size;
1193 	rec_t*		dummy_rec;
1194 	ibool		leaf;
1195 	ibool		reorg;
1196 	ibool		inherit;
1197 	ulint		zip_size;
1198 	ulint		rec_size;
1199 	ulint		err;
1200 
1201 	*big_rec = NULL;
1202 
1203 	block = btr_cur_get_block(cursor);
1204 	page = buf_block_get_frame(block);
1205 	index = cursor->index;
1206 	zip_size = buf_block_get_zip_size(block);
1207 #ifdef UNIV_DEBUG_VALGRIND
1208 	if (zip_size) {
1209 		UNIV_MEM_ASSERT_RW(page, UNIV_PAGE_SIZE);
1210 		UNIV_MEM_ASSERT_RW(block->page.zip.data, zip_size);
1211 	}
1212 #endif /* UNIV_DEBUG_VALGRIND */
1213 
1214 	if (!dtuple_check_typed_no_assert(entry)) {
1215 		fputs("InnoDB: Error in a tuple to insert into ", stderr);
1216 		dict_index_name_print(stderr, thr_get_trx(thr), index);
1217 	}
1218 #ifdef UNIV_DEBUG
1219 	if (btr_cur_print_record_ops && thr) {
1220 		btr_cur_trx_report(thr_get_trx(thr), index, "insert into ");
1221 		dtuple_print(stderr, entry);
1222 	}
1223 #endif /* UNIV_DEBUG */
1224 
1225 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1226 	max_size = page_get_max_insert_size_after_reorganize(page, 1);
1227 	leaf = page_is_leaf(page);
1228 
1229 	/* Calculate the record size when entry is converted to a record */
1230 	rec_size = rec_get_converted_size(index, entry, n_ext);
1231 
1232 	if (page_zip_rec_needs_ext(rec_size, page_is_comp(page),
1233 				   dtuple_get_n_fields(entry), zip_size)) {
1234 
1235 		/* The record is so big that we have to store some fields
1236 		externally on separate database pages */
1237 		big_rec_vec = dtuple_convert_big_rec(index, entry, &n_ext);
1238 
1239 		if (UNIV_UNLIKELY(big_rec_vec == NULL)) {
1240 
1241 			return(DB_TOO_BIG_RECORD);
1242 		}
1243 
1244 		rec_size = rec_get_converted_size(index, entry, n_ext);
1245 	}
1246 
1247 	if (UNIV_UNLIKELY(zip_size)) {
1248 		/* Estimate the free space of an empty compressed page.
1249 		Subtract one byte for the encoded heap_no in the
1250 		modification log. */
1251 		ulint	free_space_zip = page_zip_empty_size(
1252 			cursor->index->n_fields, zip_size);
1253 		ulint	n_uniq = dict_index_get_n_unique_in_tree(index);
1254 
1255 		ut_ad(dict_table_is_comp(index->table));
1256 
1257 		if (free_space_zip == 0) {
1258 too_big:
1259 			if (big_rec_vec) {
1260 				dtuple_convert_back_big_rec(
1261 					index, entry, big_rec_vec);
1262 			}
1263 
1264 			return(DB_TOO_BIG_RECORD);
1265 		}
1266 
1267 		/* Subtract one byte for the encoded heap_no in the
1268 		modification log. */
1269 		free_space_zip--;
1270 
1271 		/* There should be enough room for two node pointer
1272 		records on an empty non-leaf page.  This prevents
1273 		infinite page splits. */
1274 
1275 		if (entry->n_fields >= n_uniq
1276 		    && (REC_NODE_PTR_SIZE
1277 			+ rec_get_converted_size_comp_prefix(
1278 				index, entry->fields, n_uniq, NULL)
1279 			/* On a compressed page, there is
1280 			a two-byte entry in the dense
1281 			page directory for every record.
1282 			But there is no record header. */
1283 			- (REC_N_NEW_EXTRA_BYTES - 2)
1284 			> free_space_zip / 2)) {
1285 			goto too_big;
1286 		}
1287 	}
1288 
1289 	LIMIT_OPTIMISTIC_INSERT_DEBUG(page_get_n_recs(page),
1290 				      goto fail);
1291 
1292 	/* If there have been many consecutive inserts, and we are on the leaf
1293 	level, check if we have to split the page to reserve enough free space
1294 	for future updates of records. */
1295 
1296 	if (dict_index_is_clust(index)
1297 	    && (page_get_n_recs(page) >= 2)
1298 	    && UNIV_LIKELY(leaf)
1299 	    && (dict_index_get_space_reserve() + rec_size > max_size)
1300 	    && (btr_page_get_split_rec_to_right(cursor, &dummy_rec)
1301 		|| btr_page_get_split_rec_to_left(cursor, &dummy_rec))) {
1302 fail:
1303 		err = DB_FAIL;
1304 fail_err:
1305 
1306 		if (big_rec_vec) {
1307 			dtuple_convert_back_big_rec(index, entry, big_rec_vec);
1308 		}
1309 
1310 		return(err);
1311 	}
1312 
1313 	if (UNIV_UNLIKELY(max_size < BTR_CUR_PAGE_REORGANIZE_LIMIT
1314 			  || max_size < rec_size)
1315 	    && UNIV_LIKELY(page_get_n_recs(page) > 1)
1316 	    && page_get_max_insert_size(page, 1) < rec_size) {
1317 
1318 		goto fail;
1319 	}
1320 
1321 	/* Check locks and write to the undo log, if specified */
1322 	err = btr_cur_ins_lock_and_undo(flags, cursor, entry,
1323 					thr, mtr, &inherit);
1324 
1325 	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
1326 
1327 		goto fail_err;
1328 	}
1329 
1330 	page_cursor = btr_cur_get_page_cur(cursor);
1331 
1332 	/* Now, try the insert */
1333 
1334 	{
1335 		const rec_t* page_cursor_rec = page_cur_get_rec(page_cursor);
1336 		*rec = page_cur_tuple_insert(page_cursor, entry, index,
1337 					     n_ext, mtr);
1338 		reorg = page_cursor_rec != page_cur_get_rec(page_cursor);
1339 
1340 		if (UNIV_UNLIKELY(reorg)) {
1341 			ut_a(zip_size);
1342 			/* It's possible for *rec to be NULL if the
1343 			page is compressed.  This is because a
1344 			reorganized page may become incompressible. */
1345 			if (!*rec) {
1346 				goto fail;
1347 			}
1348 		}
1349 	}
1350 
1351 	if (UNIV_UNLIKELY(!*rec) && UNIV_LIKELY(!reorg)) {
1352 		/* If the record did not fit, reorganize */
1353 		if (UNIV_UNLIKELY(!btr_page_reorganize(block, index, mtr))) {
1354 			ut_a(zip_size);
1355 
1356 			goto fail;
1357 		}
1358 
1359 		ut_ad(zip_size
1360 		      || page_get_max_insert_size(page, 1) == max_size);
1361 
1362 		reorg = TRUE;
1363 
1364 		page_cur_search(block, index, entry, PAGE_CUR_LE, page_cursor);
1365 
1366 		*rec = page_cur_tuple_insert(page_cursor, entry, index,
1367 					     n_ext, mtr);
1368 
1369 		if (UNIV_UNLIKELY(!*rec)) {
1370 			if (UNIV_LIKELY(zip_size != 0)) {
1371 
1372 				goto fail;
1373 			}
1374 
1375 			fputs("InnoDB: Error: cannot insert tuple ", stderr);
1376 			dtuple_print(stderr, entry);
1377 			fputs(" into ", stderr);
1378 			dict_index_name_print(stderr, thr_get_trx(thr), index);
1379 			fprintf(stderr, "\nInnoDB: max insert size %lu\n",
1380 				(ulong) max_size);
1381 			ut_error;
1382 		}
1383 	}
1384 
1385 #ifdef BTR_CUR_HASH_ADAPT
1386 	if (!reorg && leaf && (cursor->flag == BTR_CUR_HASH)) {
1387 		btr_search_update_hash_node_on_insert(cursor);
1388 	} else {
1389 		btr_search_update_hash_on_insert(cursor);
1390 	}
1391 #endif
1392 
1393 	if (!(flags & BTR_NO_LOCKING_FLAG) && inherit) {
1394 
1395 		lock_update_insert(block, *rec);
1396 	}
1397 
1398 #if 0
1399 	fprintf(stderr, "Insert into page %lu, max ins size %lu,"
1400 		" rec %lu ind type %lu\n",
1401 		buf_block_get_page_no(block), max_size,
1402 		rec_size + PAGE_DIR_SLOT_SIZE, index->type);
1403 #endif
1404 	if (leaf && !dict_index_is_clust(index)) {
1405 		/* Update the free bits of the B-tree page in the
1406 		insert buffer bitmap. */
1407 
1408 		/* The free bits in the insert buffer bitmap must
1409 		never exceed the free space on a page.  It is safe to
1410 		decrement or reset the bits in the bitmap in a
1411 		mini-transaction that is committed before the
1412 		mini-transaction that affects the free space. */
1413 
1414 		/* It is unsafe to increment the bits in a separately
1415 		committed mini-transaction, because in crash recovery,
1416 		the free bits could momentarily be set too high. */
1417 
1418 		if (zip_size) {
1419 			/* Update the bits in the same mini-transaction. */
1420 			ibuf_update_free_bits_zip(block, mtr);
1421 		} else {
1422 			/* Decrement the bits in a separate
1423 			mini-transaction. */
1424 			ibuf_update_free_bits_if_full(
1425 				block, max_size,
1426 				rec_size + PAGE_DIR_SLOT_SIZE);
1427 		}
1428 	}
1429 
1430 	*big_rec = big_rec_vec;
1431 
1432 	return(DB_SUCCESS);
1433 }
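/* Note for callers: if the function above returns DB_SUCCESS with a
non-NULL *big_rec, the externally stored columns still have to be written
by the caller (typically with btr_store_big_rec_extern_fields(), defined
later in this file) and the big_rec vector freed afterwards. */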
1434 
1435 /*************************************************************//**
1436 Performs an insert on a page of an index tree. It is assumed that mtr
1437 holds an x-latch on the tree and on the cursor page. If the insert is
1438 made on the leaf level, to avoid deadlocks, mtr must also own x-latches
1439 to brothers of page, if those brothers exist.
1440 @return	DB_SUCCESS or error number */
1441 UNIV_INTERN
1442 ulint
1443 btr_cur_pessimistic_insert(
1444 /*=======================*/
1445 	ulint		flags,	/*!< in: undo logging and locking flags: if not
1446 				zero, the parameter thr should be
1447 				specified; if no undo logging is specified,
1448 				then the caller must have reserved enough
1449 				free extents in the file space so that the
1450 				insertion will certainly succeed */
1451 	btr_cur_t*	cursor,	/*!< in: cursor after which to insert;
1452 				cursor stays valid */
1453 	dtuple_t*	entry,	/*!< in/out: entry to insert */
1454 	rec_t**		rec,	/*!< out: pointer to inserted record if
1455 				succeed */
1456 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
1457 				be stored externally by the caller, or
1458 				NULL */
1459 	ulint		n_ext,	/*!< in: number of externally stored columns */
1460 	que_thr_t*	thr,	/*!< in: query thread or NULL */
1461 	mtr_t*		mtr)	/*!< in: mtr */
1462 {
1463 	dict_index_t*	index		= cursor->index;
1464 	ulint		zip_size	= dict_table_zip_size(index->table);
1465 	big_rec_t*	big_rec_vec	= NULL;
1466 	mem_heap_t*	heap		= NULL;
1467 	ulint		err;
1468 	ibool		dummy_inh;
1469 	ibool		success;
1470 	ulint		n_extents	= 0;
1471 	ulint		n_reserved;
1472 
1473 	ut_ad(dtuple_check_typed(entry));
1474 
1475 	*big_rec = NULL;
1476 
1477 	ut_ad(mtr_memo_contains(mtr,
1478 				dict_index_get_lock(btr_cur_get_index(cursor)),
1479 				MTR_MEMO_X_LOCK));
1480 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
1481 				MTR_MEMO_PAGE_X_FIX));
1482 
1483 	cursor->flag = BTR_CUR_BINARY;
1484 
1485 	/* Check locks and write to undo log, if specified */
1486 
1487 	err = btr_cur_ins_lock_and_undo(flags, cursor, entry,
1488 					thr, mtr, &dummy_inh);
1489 
1490 	if (err != DB_SUCCESS) {
1491 
1492 		return(err);
1493 	}
1494 
1495 	if (!(flags & BTR_NO_UNDO_LOG_FLAG)) {
1496 		/* First reserve enough free space for the file segments
1497 		of the index tree, so that the insert will not fail because
1498 		of lack of space */
1499 
1500 		n_extents = cursor->tree_height / 16 + 3;
1501 
1502 		success = fsp_reserve_free_extents(&n_reserved, index->space,
1503 						   n_extents, FSP_NORMAL, mtr);
1504 		if (!success) {
1505 			return(DB_OUT_OF_FILE_SPACE);
1506 		}
1507 	}
1508 
1509 	if (page_zip_rec_needs_ext(rec_get_converted_size(index, entry, n_ext),
1510 				   dict_table_is_comp(index->table),
1511 				   dict_index_get_n_fields(index),
1512 				   zip_size)) {
1513 		/* The record is so big that we have to store some fields
1514 		externally on separate database pages */
1515 
1516 		if (UNIV_LIKELY_NULL(big_rec_vec)) {
1517 			/* This should never happen, but we handle
1518 			the situation in a robust manner. */
1519 			ut_ad(0);
1520 			dtuple_convert_back_big_rec(index, entry, big_rec_vec);
1521 		}
1522 
1523 		big_rec_vec = dtuple_convert_big_rec(index, entry, &n_ext);
1524 
1525 		if (big_rec_vec == NULL) {
1526 
1527 			if (n_extents > 0) {
1528 				fil_space_release_free_extents(index->space,
1529 							       n_reserved);
1530 			}
1531 			return(DB_TOO_BIG_RECORD);
1532 		}
1533 	}
1534 
1535 	if (dict_index_get_page(index)
1536 	    == buf_block_get_page_no(btr_cur_get_block(cursor))) {
1537 
1538 		/* The page is the root page */
1539 		*rec = btr_root_raise_and_insert(cursor, entry, n_ext, mtr);
1540 	} else {
1541 		*rec = btr_page_split_and_insert(cursor, entry, n_ext, mtr);
1542 	}
1543 
1544 	if (UNIV_LIKELY_NULL(heap)) {
1545 		mem_heap_free(heap);
1546 	}
1547 
1548 	ut_ad(page_rec_get_next(btr_cur_get_rec(cursor)) == *rec);
1549 
1550 #ifdef BTR_CUR_ADAPT
1551 	btr_search_update_hash_on_insert(cursor);
1552 #endif
1553 	if (!(flags & BTR_NO_LOCKING_FLAG)) {
1554 
1555 		lock_update_insert(btr_cur_get_block(cursor), *rec);
1556 	}
1557 
1558 	if (n_extents > 0) {
1559 		fil_space_release_free_extents(index->space, n_reserved);
1560 	}
1561 
1562 	*big_rec = big_rec_vec;
1563 
1564 	return(DB_SUCCESS);
1565 }
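/* Typical use of the two insert routines above (a sketch, not copied from
any particular caller): the insert is first attempted with
btr_cur_optimistic_insert() under BTR_MODIFY_LEAF latching; only if that
returns DB_FAIL does the caller commit the mini-transaction, re-position
the cursor with BTR_MODIFY_TREE and call btr_cur_pessimistic_insert(),
which reserves free extents and may split pages or raise the tree. */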
1566 
1567 /*==================== B-TREE UPDATE =========================*/
1568 
1569 /*************************************************************//**
1570 For an update, checks the locks and does the undo logging.
1571 @return	DB_SUCCESS, DB_WAIT_LOCK, or error number */
1572 UNIV_INLINE
1573 ulint
1574 btr_cur_upd_lock_and_undo(
1575 /*======================*/
1576 	ulint		flags,	/*!< in: undo logging and locking flags */
1577 	btr_cur_t*	cursor,	/*!< in: cursor on record to update */
1578 	const upd_t*	update,	/*!< in: update vector */
1579 	ulint		cmpl_info,/*!< in: compiler info on secondary index
1580 				updates */
1581 	que_thr_t*	thr,	/*!< in: query thread */
1582 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
1583 	roll_ptr_t*	roll_ptr)/*!< out: roll pointer */
1584 {
1585 	dict_index_t*	index;
1586 	rec_t*		rec;
1587 	ulint		err;
1588 
1589 	ut_ad(cursor && update && thr && roll_ptr);
1590 
1591 	rec = btr_cur_get_rec(cursor);
1592 	index = cursor->index;
1593 
1594 	if (!dict_index_is_clust(index)) {
1595 		/* We do undo logging only when we update a clustered index
1596 		record */
1597 		return(lock_sec_rec_modify_check_and_lock(
1598 			       flags, btr_cur_get_block(cursor), rec,
1599 			       index, thr, mtr));
1600 	}
1601 
1602 	/* Check if we have to wait for a lock: enqueue an explicit lock
1603 	request if yes */
1604 
1605 	err = DB_SUCCESS;
1606 
1607 	if (!(flags & BTR_NO_LOCKING_FLAG)) {
1608 		mem_heap_t*	heap		= NULL;
1609 		ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1610 		rec_offs_init(offsets_);
1611 
1612 		err = lock_clust_rec_modify_check_and_lock(
1613 			flags, btr_cur_get_block(cursor), rec, index,
1614 			rec_get_offsets(rec, index, offsets_,
1615 					ULINT_UNDEFINED, &heap), thr);
1616 		if (UNIV_LIKELY_NULL(heap)) {
1617 			mem_heap_free(heap);
1618 		}
1619 		if (err != DB_SUCCESS) {
1620 
1621 			return(err);
1622 		}
1623 	}
1624 
1625 	/* Append the info about the update in the undo log */
1626 
1627 	err = trx_undo_report_row_operation(flags, TRX_UNDO_MODIFY_OP, thr,
1628 					    index, NULL, update,
1629 					    cmpl_info, rec, roll_ptr);
1630 	return(err);
1631 }
1632 
1633 /***********************************************************//**
1634 Writes a redo log record of updating a record in-place. */
1635 UNIV_INTERN
1636 void
1637 btr_cur_update_in_place_log(
1638 /*========================*/
1639 	ulint		flags,		/*!< in: flags */
1640 	rec_t*		rec,		/*!< in: record */
1641 	dict_index_t*	index,		/*!< in: index where cursor positioned */
1642 	const upd_t*	update,		/*!< in: update vector */
1643 	trx_t*		trx,		/*!< in: transaction */
1644 	roll_ptr_t	roll_ptr,	/*!< in: roll ptr */
1645 	mtr_t*		mtr)		/*!< in: mtr */
1646 {
1647 	byte*	log_ptr;
1648 	page_t*	page	= page_align(rec);
1649 	ut_ad(flags < 256);
1650 	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1651 
1652 	log_ptr = mlog_open_and_write_index(mtr, rec, index, page_is_comp(page)
1653 					    ? MLOG_COMP_REC_UPDATE_IN_PLACE
1654 					    : MLOG_REC_UPDATE_IN_PLACE,
1655 					    1 + DATA_ROLL_PTR_LEN + 14 + 2
1656 					    + MLOG_BUF_MARGIN);
1657 
1658 	if (!log_ptr) {
1659 		/* Logging in mtr is switched off during crash recovery */
1660 		return;
1661 	}
1662 
1663 	/* For secondary indexes, we could skip writing the dummy system fields
1664 	to the redo log, but that would require changing the redo log parsing of
1665 	MLOG_REC_UPDATE_IN_PLACE/MLOG_COMP_REC_UPDATE_IN_PLACE or adding a
1666 	new redo log record type. For now, just write dummy sys fields to the
1667 	redo log if we are updating a secondary index record.
1668 	*/
1669 
1670 	mach_write_to_1(log_ptr, flags);
1671 	log_ptr++;
1672 
1673 	if (dict_index_is_clust(index)) {
1674 		log_ptr = row_upd_write_sys_vals_to_log(
1675 				index, trx, roll_ptr, log_ptr, mtr);
1676 	} else {
1677 		/* Dummy system fields for a secondary index */
1678 		/* TRX_ID Position */
1679 		log_ptr += mach_write_compressed(log_ptr, 0);
1680 		/* ROLL_PTR */
1681 		trx_write_roll_ptr(log_ptr, 0);
1682 		log_ptr += DATA_ROLL_PTR_LEN;
1683 		/* TRX_ID */
1684 		log_ptr += mach_ull_write_compressed(log_ptr, 0);
1685 	}
1686 
1687 	mach_write_to_2(log_ptr, page_offset(rec));
1688 	log_ptr += 2;
1689 
1690 	row_upd_index_write_log(update, log_ptr, mtr);
1691 }
1692 #endif /* UNIV_HOTBACKUP */
1693 
1694 /***********************************************************//**
1695 Parses a redo log record of updating a record in-place.
1696 @return	end of log record or NULL */
1697 UNIV_INTERN
1698 byte*
1699 btr_cur_parse_update_in_place(
1700 /*==========================*/
1701 	byte*		ptr,	/*!< in: buffer */
1702 	byte*		end_ptr,/*!< in: buffer end */
1703 	page_t*		page,	/*!< in/out: page or NULL */
1704 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
1705 	dict_index_t*	index)	/*!< in: index corresponding to page */
1706 {
1707 	ulint		flags;
1708 	rec_t*		rec;
1709 	upd_t*		update;
1710 	ulint		pos;
1711 	trx_id_t	trx_id;
1712 	roll_ptr_t	roll_ptr;
1713 	ulint		rec_offset;
1714 	mem_heap_t*	heap;
1715 	ulint*		offsets;
1716 
1717 	if (end_ptr < ptr + 1) {
1718 
1719 		return(NULL);
1720 	}
1721 
1722 	flags = mach_read_from_1(ptr);
1723 	ptr++;
1724 
1725 	ptr = row_upd_parse_sys_vals(ptr, end_ptr, &pos, &trx_id, &roll_ptr);
1726 
1727 	if (ptr == NULL) {
1728 
1729 		return(NULL);
1730 	}
1731 
1732 	if (end_ptr < ptr + 2) {
1733 
1734 		return(NULL);
1735 	}
1736 
1737 	rec_offset = mach_read_from_2(ptr);
1738 	ptr += 2;
1739 
1740 	ut_a(rec_offset <= UNIV_PAGE_SIZE);
1741 
1742 	heap = mem_heap_create(256);
1743 
1744 	ptr = row_upd_index_parse(ptr, end_ptr, heap, &update);
1745 
1746 	if (!ptr || !page) {
1747 
1748 		goto func_exit;
1749 	}
1750 
1751 	ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
1752 	rec = page + rec_offset;
1753 
1754 	/* We do not need to reserve btr_search_latch, as the page is only
1755 	being recovered, and there cannot be a hash index to it. */
1756 
1757 	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
1758 
1759 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
1760 		row_upd_rec_sys_fields_in_recovery(rec, page_zip, offsets,
1761 						   pos, trx_id, roll_ptr);
1762 	}
1763 
1764 	row_upd_rec_in_place(rec, index, offsets, update, page_zip);
1765 
1766 func_exit:
1767 	mem_heap_free(heap);
1768 
1769 	return(ptr);
1770 }
1771 
1772 #ifndef UNIV_HOTBACKUP
1773 /*************************************************************//**
1774 See if there is enough space in the page modification log to log
1775 an update-in-place.
1776 @return	TRUE if enough space */
1777 UNIV_INTERN
1778 ibool
1779 btr_cur_update_alloc_zip(
1780 /*=====================*/
1781 	page_zip_des_t*	page_zip,/*!< in/out: compressed page */
1782 	buf_block_t*	block,	/*!< in/out: buffer page */
1783 	dict_index_t*	index,	/*!< in: the index corresponding to the block */
1784 	ulint		length,	/*!< in: size needed */
1785 	ibool		create,	/*!< in: TRUE=delete-and-insert,
1786 				FALSE=update-in-place */
1787 	mtr_t*		mtr)	/*!< in: mini-transaction */
1788 {
1789 	ut_a(page_zip == buf_block_get_page_zip(block));
1790 	ut_ad(page_zip);
1791 	ut_ad(!dict_index_is_ibuf(index));
1792 
1793 	if (page_zip_available(page_zip, dict_index_is_clust(index),
1794 			       length, create)) {
1795 		return(TRUE);
1796 	}
1797 
1798 	if (!page_zip->m_nonempty) {
1799 		/* The page has been freshly compressed, so
1800 		recompressing it will not help. */
1801 		return(FALSE);
1802 	}
1803 
1804 	if (!page_zip_compress(page_zip, buf_block_get_frame(block),
1805 			       index, mtr)) {
1806 		/* Unable to compress the page */
1807 		return(FALSE);
1808 	}
1809 
1810 	/* After recompressing a page, we must make sure that the free
1811 	bits in the insert buffer bitmap will not exceed the free
1812 	space on the page.  Because this function will not attempt
1813 	recompression unless page_zip_available() fails above, it is
1814 	safe to reset the free bits if page_zip_available() fails
1815 	again, below.  The free bits can safely be reset in a separate
1816 	mini-transaction.  If page_zip_available() succeeds below, we
1817 	can be sure that the page_zip_compress() above did not reduce
1818 	the free space available on the page. */
1819 
1820 	if (!page_zip_available(page_zip, dict_index_is_clust(index),
1821 				length, create)) {
1822 		/* Out of space: reset the free bits. */
1823 		if (!dict_index_is_clust(index)
1824 		    && page_is_leaf(buf_block_get_frame(block))) {
1825 			ibuf_reset_free_bits(block);
1826 		}
1827 		return(FALSE);
1828 	}
1829 
1830 	return(TRUE);
1831 }
1832 
1833 /*************************************************************//**
1834 Updates a record when the update causes no size changes in its fields.
1835 We assume here that the ordering fields of the record do not change.
1836 @return	DB_SUCCESS or error number */
1837 UNIV_INTERN
1838 ulint
1839 btr_cur_update_in_place(
1840 /*====================*/
1841 	ulint		flags,	/*!< in: undo logging and locking flags */
1842 	btr_cur_t*	cursor,	/*!< in: cursor on the record to update;
1843 				cursor stays valid and positioned on the
1844 				same record */
1845 	const upd_t*	update,	/*!< in: update vector */
1846 	ulint		cmpl_info,/*!< in: compiler info on secondary index
1847 				updates */
1848 	que_thr_t*	thr,	/*!< in: query thread */
1849 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
1850 				latching any further pages */
1851 {
1852 	dict_index_t*	index;
1853 	buf_block_t*	block;
1854 	page_zip_des_t*	page_zip;
1855 	ulint		err;
1856 	rec_t*		rec;
1857 	roll_ptr_t	roll_ptr	= 0;
1858 	trx_t*		trx;
1859 	ulint		was_delete_marked;
1860 	ibool		is_hashed;
1861 	mem_heap_t*	heap		= NULL;
1862 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1863 	ulint*		offsets		= offsets_;
1864 	rec_offs_init(offsets_);
1865 
1866 	rec = btr_cur_get_rec(cursor);
1867 	index = cursor->index;
1868 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
1869 	/* The insert buffer tree should never be updated in place. */
1870 	ut_ad(!dict_index_is_ibuf(index));
1871 
1872 	trx = thr_get_trx(thr);
1873 	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
1874 #ifdef UNIV_DEBUG
1875 	if (btr_cur_print_record_ops && thr) {
1876 		btr_cur_trx_report(trx, index, "update ");
1877 		rec_print_new(stderr, rec, offsets);
1878 	}
1879 #endif /* UNIV_DEBUG */
1880 
1881 	block = btr_cur_get_block(cursor);
1882 	page_zip = buf_block_get_page_zip(block);
1883 
1884 	/* Check that enough space is available on the compressed page. */
1885 	if (page_zip
1886 	    && !btr_cur_update_alloc_zip(page_zip, block, index,
1887 					 rec_offs_size(offsets), FALSE, mtr)) {
1888 		return(DB_ZIP_OVERFLOW);
1889 	}
1890 
1891 	/* Do lock checking and undo logging */
1892 	err = btr_cur_upd_lock_and_undo(flags, cursor, update, cmpl_info,
1893 					thr, mtr, &roll_ptr);
1894 	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
1895 
1896 		if (UNIV_LIKELY_NULL(heap)) {
1897 			mem_heap_free(heap);
1898 		}
1899 		return(err);
1900 	}
1901 
1902 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
1903 		row_upd_rec_sys_fields(rec, NULL,
1904 				       index, offsets, trx, roll_ptr);
1905 	}
1906 
1907 	was_delete_marked = rec_get_deleted_flag(
1908 		rec, page_is_comp(buf_block_get_frame(block)));
1909 
1910 	is_hashed = (block->index != NULL);
1911 
1912 	if (is_hashed) {
1913 		/* TO DO: Can we skip this if none of the first
1914 		index->search_info->curr_n_fields
1915 		fields are being updated? */
1916 
1917 		/* The function row_upd_changes_ord_field_binary works only
1918 		if the update vector was built for a clustered index; we must
1919 		NOT call it if the index is secondary */
1920 
1921 		if (!dict_index_is_clust(index)
1922 		    || row_upd_changes_ord_field_binary(index, update, thr,
1923 							NULL, NULL)) {
1924 
1925 			/* Remove possible hash index pointer to this record */
1926 			btr_search_update_hash_on_delete(cursor);
1927 		}
1928 
1929 		rw_lock_x_lock(&btr_search_latch);
1930 	}
1931 
1932 	row_upd_rec_in_place(rec, index, offsets, update, page_zip);
1933 
1934 	if (is_hashed) {
1935 		rw_lock_x_unlock(&btr_search_latch);
1936 	}
1937 
1938 	if (page_zip && !dict_index_is_clust(index)
1939 	    && page_is_leaf(buf_block_get_frame(block))) {
1940 		/* Update the free bits in the insert buffer. */
1941 		ibuf_update_free_bits_zip(block, mtr);
1942 	}
1943 
1944 	btr_cur_update_in_place_log(flags, rec, index, update,
1945 				    trx, roll_ptr, mtr);
1946 
1947 	if (was_delete_marked
1948 	    && !rec_get_deleted_flag(rec, page_is_comp(
1949 					     buf_block_get_frame(block)))) {
1950 		/* The new updated record owns its possible externally
1951 		stored fields */
1952 
1953 		btr_cur_unmark_extern_fields(page_zip,
1954 					     rec, index, offsets, mtr);
1955 	}
1956 
1957 	if (UNIV_LIKELY_NULL(heap)) {
1958 		mem_heap_free(heap);
1959 	}
1960 	return(DB_SUCCESS);
1961 }
1962 
1963 /*************************************************************//**
1964 Tries to update a record on a page in an index tree. It is assumed that mtr
1965 holds an x-latch on the page. The operation does not succeed if there is too
1966 little space on the page or if the update would result in too empty a page,
1967 so that tree compression is recommended. We assume here that the ordering
1968 fields of the record do not change.
1969 @return DB_SUCCESS, or DB_OVERFLOW if the updated record does not fit,
1970 DB_UNDERFLOW if the page would become too empty, or DB_ZIP_OVERFLOW if
1971 there is not enough space left on the compressed page */
1972 UNIV_INTERN
1973 ulint
1974 btr_cur_optimistic_update(
1975 /*======================*/
1976 	ulint		flags,	/*!< in: undo logging and locking flags */
1977 	btr_cur_t*	cursor,	/*!< in: cursor on the record to update;
1978 				cursor stays valid and positioned on the
1979 				same record */
1980 	const upd_t*	update,	/*!< in: update vector; this must also
1981 				contain trx id and roll ptr fields */
1982 	ulint		cmpl_info,/*!< in: compiler info on secondary index
1983 				updates */
1984 	que_thr_t*	thr,	/*!< in: query thread */
1985 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
1986 				latching any further pages */
1987 {
1988 	dict_index_t*	index;
1989 	page_cur_t*	page_cursor;
1990 	ulint		err;
1991 	buf_block_t*	block;
1992 	page_t*		page;
1993 	page_zip_des_t*	page_zip;
1994 	rec_t*		rec;
1995 	ulint		max_size;
1996 	ulint		new_rec_size;
1997 	ulint		old_rec_size;
1998 	ulint		max_ins_size = 0;
1999 	dtuple_t*	new_entry;
2000 	roll_ptr_t	roll_ptr;
2001 	trx_t*		trx;
2002 	mem_heap_t*	heap;
2003 	ulint		i;
2004 	ulint		n_ext;
2005 	ulint*		offsets;
2006 
2007 	block = btr_cur_get_block(cursor);
2008 	page = buf_block_get_frame(block);
2009 	rec = btr_cur_get_rec(cursor);
2010 	index = cursor->index;
2011 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2012 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2013 	/* The insert buffer tree should never be updated in place. */
2014 	ut_ad(!dict_index_is_ibuf(index));
2015 
2016 	heap = mem_heap_create(1024);
2017 	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
2018 #ifdef UNIV_BLOB_NULL_DEBUG
2019 	ut_a(!rec_offs_any_null_extern(rec, offsets));
2020 #endif /* UNIV_BLOB_NULL_DEBUG */
2021 
2022 #ifdef UNIV_DEBUG
2023 	if (btr_cur_print_record_ops && thr) {
2024 		btr_cur_trx_report(thr_get_trx(thr), index, "update ");
2025 		rec_print_new(stderr, rec, offsets);
2026 	}
2027 #endif /* UNIV_DEBUG */
2028 
2029 	if (!row_upd_changes_field_size_or_external(index, offsets, update)) {
2030 
2031 		/* The simplest and the most common case: the update does not
2032 		change the size of any field and none of the updated fields is
2033 		externally stored in rec or update, and there is enough space
2034 		on the compressed page to log the update. */
2035 
2036 		mem_heap_free(heap);
2037 		return(btr_cur_update_in_place(flags, cursor, update,
2038 					       cmpl_info, thr, mtr));
2039 	}
2040 
2041 	if (rec_offs_any_extern(offsets)) {
2042 any_extern:
2043 		/* Externally stored fields are treated in pessimistic
2044 		update */
2045 
2046 		mem_heap_free(heap);
2047 		return(DB_OVERFLOW);
2048 	}
2049 
2050 	for (i = 0; i < upd_get_n_fields(update); i++) {
2051 		if (dfield_is_ext(&upd_get_nth_field(update, i)->new_val)) {
2052 
2053 			goto any_extern;
2054 		}
2055 	}
2056 
2057 	page_cursor = btr_cur_get_page_cur(cursor);
2058 
2059 	new_entry = row_rec_to_index_entry(ROW_COPY_DATA, rec, index, offsets,
2060 					   &n_ext, heap);
2061 	/* We checked above that there are no externally stored fields. */
2062 	ut_a(!n_ext);
2063 
2064 	/* The page containing the clustered index record
2065 	corresponding to new_entry is latched in mtr.
2066 	Thus the following call is safe. */
2067 	row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
2068 						     FALSE, heap);
2069 	old_rec_size = rec_offs_size(offsets);
2070 	new_rec_size = rec_get_converted_size(index, new_entry, 0);
2071 
2072 	page_zip = buf_block_get_page_zip(block);
2073 #ifdef UNIV_ZIP_DEBUG
2074 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2075 #endif /* UNIV_ZIP_DEBUG */
2076 
2077 	if (page_zip
2078 	    && !btr_cur_update_alloc_zip(page_zip, block, index,
2079 					 new_rec_size, TRUE, mtr)) {
2080 		err = DB_ZIP_OVERFLOW;
2081 		goto err_exit;
2082 	}
2083 
2084 	if (UNIV_UNLIKELY(new_rec_size
2085 			  >= (page_get_free_space_of_empty(page_is_comp(page))
2086 			      / 2))) {
2087 
2088 		err = DB_OVERFLOW;
2089 		goto err_exit;
2090 	}
2091 
2092 	if (UNIV_UNLIKELY(page_get_data_size(page)
2093 			  - old_rec_size + new_rec_size
2094 			  < BTR_CUR_PAGE_COMPRESS_LIMIT)) {
2095 
2096 		/* The page would become too empty */
2097 
2098 		err = DB_UNDERFLOW;
2099 		goto err_exit;
2100 	}
2101 
2102 	/* We do not attempt to reorganize if the page is compressed.
2103 	This is because the page may fail to compress after reorganization. */
2104 	max_size = page_zip
2105 		? page_get_max_insert_size(page, 1)
2106 		: (old_rec_size
2107 		   + page_get_max_insert_size_after_reorganize(page, 1));
2108 
2109 	if (!page_zip) {
2110 		max_ins_size = page_get_max_insert_size_after_reorganize(
2111 					page, 1);
2112 	}
2113 
2114 	if (!(((max_size >= BTR_CUR_PAGE_REORGANIZE_LIMIT)
2115 	       && (max_size >= new_rec_size))
2116 	      || (page_get_n_recs(page) <= 1))) {
2117 
2118 		/* There was not enough space, or it did not pay to
2119 		reorganize: for simplicity, we decide what to do assuming a
2120 		reorganization is needed, though it might not be necessary */
2121 
2122 		err = DB_OVERFLOW;
2123 		goto err_exit;
2124 	}
2125 
2126 	/* Do lock checking and undo logging */
2127 	err = btr_cur_upd_lock_and_undo(flags, cursor, update, cmpl_info,
2128 					thr, mtr, &roll_ptr);
2129 	if (err != DB_SUCCESS) {
2130 
2131 		goto err_exit;
2132 	}
2133 
2134 	/* Ok, we may do the replacement. Store on the page infimum the
2135 	explicit locks on rec, before deleting rec (see the comment in
2136 	btr_cur_pessimistic_update). */
2137 
2138 	lock_rec_store_on_page_infimum(block, rec);
2139 
2140 	btr_search_update_hash_on_delete(cursor);
2141 
2142 	/* The call to row_rec_to_index_entry(ROW_COPY_DATA, ...) above
2143 	invokes rec_offs_make_valid() to point to the copied record that
2144 	the fields of new_entry point to.  We have to undo it here. */
2145 	ut_ad(rec_offs_validate(NULL, index, offsets));
2146 	rec_offs_make_valid(page_cur_get_rec(page_cursor), index, offsets);
2147 
2148 	page_cur_delete_rec(page_cursor, index, offsets, mtr);
2149 
2150 	page_cur_move_to_prev(page_cursor);
2151 
2152 	trx = thr_get_trx(thr);
2153 
2154 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
2155 		row_upd_index_entry_sys_field(new_entry, index, DATA_ROLL_PTR,
2156 					      roll_ptr);
2157 		row_upd_index_entry_sys_field(new_entry, index, DATA_TRX_ID,
2158 					      trx->id);
2159 	}
2160 
2161 	/* There are no externally stored columns in new_entry */
2162 	rec = btr_cur_insert_if_possible(cursor, new_entry, 0/*n_ext*/, mtr);
2163 	ut_a(rec); /* <- We calculated above the insert would fit */
2164 
2165 	if (!dict_index_is_clust(index)
2166 	    && page_is_leaf(page)) {
2167 		/* Update the free bits in the insert buffer. */
2168 		if (page_zip) {
2169 			ibuf_update_free_bits_zip(block, mtr);
2170 		} else {
2171 			ibuf_update_free_bits_low(block, max_ins_size, mtr);
2172 		}
2173 	}
2174 
2175 	/* Restore the old explicit lock state on the record */
2176 
2177 	lock_rec_restore_from_page_infimum(block, rec, block);
2178 
2179 	page_cur_move_to_next(page_cursor);
2180 
2181 	err = DB_SUCCESS;
2182 err_exit:
2183 	mem_heap_free(heap);
2184 	return(err);
2185 }
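/* Illustrative sketch only, not called anywhere: the way callers such as
row_upd_clust_rec() typically combine the update routines in this file.
The optimistic path is tried first under a leaf page x-latch; if it reports
that the record no longer fits, or that the page would become too empty,
the caller commits its mini-transaction, repositions the cursor with
BTR_MODIFY_TREE so that the tree is x-latched, and retries pessimistically.
Error handling and cursor repositioning are deliberately simplified here.

	err = btr_cur_optimistic_update(flags, btr_cur, update,
					cmpl_info, thr, mtr);

	if (err == DB_OVERFLOW || err == DB_UNDERFLOW
	    || err == DB_ZIP_OVERFLOW) {
		... mtr_commit(), reposition the cursor with
		BTR_MODIFY_TREE ...
		err = btr_cur_pessimistic_update(
			flags | BTR_KEEP_POS_FLAG, btr_cur, &heap,
			&big_rec, update, cmpl_info, thr, mtr);
	}
*/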
2186 
2187 /*************************************************************//**
2188 If, in a split, a new supremum record was created as the predecessor of the
2189 updated record, the supremum record must inherit exactly the locks on the
2190 updated record. In the split it may have inherited locks from the successor
2191 of the updated record, which is not correct. This function restores the
2192 right locks for the new supremum. */
2193 static
2194 void
2195 btr_cur_pess_upd_restore_supremum(
2196 /*==============================*/
2197 	buf_block_t*	block,	/*!< in: buffer block of rec */
2198 	const rec_t*	rec,	/*!< in: updated record */
2199 	mtr_t*		mtr)	/*!< in: mtr */
2200 {
2201 	page_t*		page;
2202 	buf_block_t*	prev_block;
2203 	ulint		space;
2204 	ulint		zip_size;
2205 	ulint		prev_page_no;
2206 
2207 	page = buf_block_get_frame(block);
2208 
2209 	if (page_rec_get_next(page_get_infimum_rec(page)) != rec) {
2210 		/* Updated record is not the first user record on its page */
2211 
2212 		return;
2213 	}
2214 
2215 	space = buf_block_get_space(block);
2216 	zip_size = buf_block_get_zip_size(block);
2217 	prev_page_no = btr_page_get_prev(page, mtr);
2218 
2219 	ut_ad(prev_page_no != FIL_NULL);
2220 	prev_block = buf_page_get_with_no_latch(space, zip_size,
2221 						prev_page_no, mtr);
2222 #ifdef UNIV_BTR_DEBUG
2223 	ut_a(btr_page_get_next(prev_block->frame, mtr)
2224 	     == page_get_page_no(page));
2225 #endif /* UNIV_BTR_DEBUG */
2226 
2227 	/* We must already have an x-latch on prev_block! */
2228 	ut_ad(mtr_memo_contains(mtr, prev_block, MTR_MEMO_PAGE_X_FIX));
2229 
2230 	lock_rec_reset_and_inherit_gap_locks(prev_block, block,
2231 					     PAGE_HEAP_NO_SUPREMUM,
2232 					     page_rec_get_heap_no(rec));
2233 }
2234 
2235 /*************************************************************//**
2236 Performs an update of a record on a page of a tree. It is assumed
2237 that mtr holds an x-latch on the tree and on the cursor page. If the
2238 update is made on the leaf level, to avoid deadlocks, mtr must also
2239 own x-latches to brothers of page, if those brothers exist. We assume
2240 here that the ordering fields of the record do not change.
2241 @return	DB_SUCCESS or error code */
2242 UNIV_INTERN
2243 ulint
2244 btr_cur_pessimistic_update(
2245 /*=======================*/
2246 	ulint		flags,	/*!< in: undo logging, locking, and rollback
2247 				flags */
2248 	btr_cur_t*	cursor,	/*!< in/out: cursor on the record to update;
2249 				cursor may become invalid if *big_rec == NULL
2250 				|| !(flags & BTR_KEEP_POS_FLAG) */
2251 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
2252 	big_rec_t**	big_rec,/*!< out: big rec vector whose fields have to
2253 				be stored externally by the caller, or NULL */
2254 	const upd_t*	update,	/*!< in: update vector; this is allowed to
2255 				also contain trx id and roll ptr fields, but
2256 				the values in update vector have no effect */
2257 	ulint		cmpl_info,/*!< in: compiler info on secondary index
2258 				updates */
2259 	que_thr_t*	thr,	/*!< in: query thread */
2260 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
2261 				latching any further pages */
2262 {
2263 	big_rec_t*	big_rec_vec	= NULL;
2264 	big_rec_t*	dummy_big_rec;
2265 	dict_index_t*	index;
2266 	buf_block_t*	block;
2267 	page_t*		page;
2268 	page_zip_des_t*	page_zip;
2269 	rec_t*		rec;
2270 	page_cur_t*	page_cursor;
2271 	dtuple_t*	new_entry;
2272 	ulint		err;
2273 	ulint		optim_err;
2274 	roll_ptr_t	roll_ptr;
2275 	trx_t*		trx;
2276 	ibool		was_first;
2277 	ulint		n_extents	= 0;
2278 	ulint		n_reserved;
2279 	ulint		n_ext;
2280 	ulint*		offsets		= NULL;
2281 	ulint		max_ins_size	= 0;
2282 
2283 	*big_rec = NULL;
2284 
2285 	block = btr_cur_get_block(cursor);
2286 	page = buf_block_get_frame(block);
2287 	page_zip = buf_block_get_page_zip(block);
2288 	rec = btr_cur_get_rec(cursor);
2289 	index = cursor->index;
2290 
2291 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
2292 				MTR_MEMO_X_LOCK));
2293 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2294 #ifdef UNIV_ZIP_DEBUG
2295 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2296 #endif /* UNIV_ZIP_DEBUG */
2297 	/* The insert buffer tree should never be updated in place. */
2298 	ut_ad(!dict_index_is_ibuf(index));
2299 
2300 	optim_err = btr_cur_optimistic_update(flags, cursor, update,
2301 					      cmpl_info, thr, mtr);
2302 
2303 	switch (optim_err) {
2304 	case DB_UNDERFLOW:
2305 	case DB_OVERFLOW:
2306 	case DB_ZIP_OVERFLOW:
2307 		break;
2308 	default:
2309 		return(optim_err);
2310 	}
2311 
2312 	/* Do lock checking and undo logging */
2313 	err = btr_cur_upd_lock_and_undo(flags, cursor, update, cmpl_info,
2314 					thr, mtr, &roll_ptr);
2315 	if (err != DB_SUCCESS) {
2316 
2317 		return(err);
2318 	}
2319 
2320 	if (optim_err == DB_OVERFLOW) {
2321 		ulint	reserve_flag;
2322 
2323 		/* First reserve enough free space for the file segments
2324 		of the index tree, so that the update will not fail because
2325 		of lack of space */
2326 
2327 		n_extents = cursor->tree_height / 16 + 3;
2328 
2329 		if (flags & BTR_NO_UNDO_LOG_FLAG) {
2330 			reserve_flag = FSP_CLEANING;
2331 		} else {
2332 			reserve_flag = FSP_NORMAL;
2333 		}
2334 
2335 		if (!fsp_reserve_free_extents(&n_reserved, index->space,
2336 					      n_extents, reserve_flag, mtr)) {
2337 			return(DB_OUT_OF_FILE_SPACE);
2338 		}
2339 	}
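	/* Informal example of the reservation arithmetic above: with
	cursor->tree_height == 3 we reserve 3 / 16 + 3 = 3 extents, and with
	cursor->tree_height == 48 we reserve 48 / 16 + 3 = 6 extents.
	fsp_reserve_free_extents() stores in n_reserved how many extents it
	actually reserved; that many are released again at
	return_after_reservations below. */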
2340 
2341 	if (!*heap) {
2342 		*heap = mem_heap_create(1024);
2343 	}
2344 	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, heap);
2345 
2346 	trx = thr_get_trx(thr);
2347 
2348 	new_entry = row_rec_to_index_entry(ROW_COPY_DATA, rec, index, offsets,
2349 					   &n_ext, *heap);
2350 	/* The call to row_rec_to_index_entry(ROW_COPY_DATA, ...) above
2351 	invokes rec_offs_make_valid() to point to the copied record that
2352 	the fields of new_entry point to.  We have to undo it here. */
2353 	ut_ad(rec_offs_validate(NULL, index, offsets));
2354 	rec_offs_make_valid(rec, index, offsets);
2355 
2356 	/* The page containing the clustered index record
2357 	corresponding to new_entry is latched in mtr.  If the
2358 	clustered index record is delete-marked, then its externally
2359 	stored fields cannot have been purged yet, because then the
2360 	purge would also have removed the clustered index record
2361 	itself.  Thus the following call is safe. */
2362 	row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
2363 						     FALSE, *heap);
2364 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
2365 		row_upd_index_entry_sys_field(new_entry, index, DATA_ROLL_PTR,
2366 					      roll_ptr);
2367 		row_upd_index_entry_sys_field(new_entry, index, DATA_TRX_ID,
2368 					      trx->id);
2369 	}
2370 
2371 	if ((flags & BTR_NO_UNDO_LOG_FLAG) && rec_offs_any_extern(offsets)) {
2372 		/* We are in a transaction rollback undoing a row
2373 		update: we must free possible externally stored fields
2374 		which got new values in the update, if they are not
2375 		inherited values. They can be inherited if we have
2376 		updated the primary key to another value, and then
2377 		update it back again. */
2378 
2379 		ut_ad(big_rec_vec == NULL);
2380 
2381 		btr_rec_free_updated_extern_fields(
2382 			index, rec, page_zip, offsets, update,
2383 			trx_is_recv(trx) ? RB_RECOVERY : RB_NORMAL, mtr);
2384 	}
2385 
2386 	/* We have to set appropriate extern storage bits in the new
2387 	record to be inserted: we have to remember which fields were such */
2388 
2389 	ut_ad(!page_is_comp(page) || !rec_get_node_ptr_flag(rec));
2390 	ut_ad(rec_offs_validate(rec, index, offsets));
2391 	n_ext += btr_push_update_extern_fields(new_entry, update, *heap);
2392 
2393 	if (page_zip) {
2394 		ut_ad(page_is_comp(page));
2395 		if (page_zip_rec_needs_ext(
2396 			    rec_get_converted_size(index, new_entry, n_ext),
2397 			    TRUE,
2398 			    dict_index_get_n_fields(index),
2399 			    page_zip_get_size(page_zip))) {
2400 
2401 			goto make_external;
2402 		}
2403 	} else if (page_zip_rec_needs_ext(
2404 			   rec_get_converted_size(index, new_entry, n_ext),
2405 			   page_is_comp(page), 0, 0)) {
2406 make_external:
2407 		big_rec_vec = dtuple_convert_big_rec(index, new_entry, &n_ext);
2408 		if (UNIV_UNLIKELY(big_rec_vec == NULL)) {
2409 
2410 			err = DB_TOO_BIG_RECORD;
2411 			goto return_after_reservations;
2412 		}
2413 
2414 		ut_ad(page_is_leaf(page));
2415 		ut_ad(dict_index_is_clust(index));
2416 		ut_ad(flags & BTR_KEEP_POS_FLAG);
2417 	}
2418 
2419 	if (!page_zip) {
2420 		max_ins_size = page_get_max_insert_size_after_reorganize(
2421 					page, 1);
2422 	}
2423 
2424 	/* Store state of explicit locks on rec on the page infimum record,
2425 	before deleting rec. The page infimum acts as a dummy carrier of the
2426 	locks, taking care also of lock releases, before we can move the locks
2427 	back on the actual record. There is a special case: if we are
2428 	inserting on the root page and the insert causes a call of
2429 	btr_root_raise_and_insert. Therefore we cannot in the lock system
2430 	delete the lock structs set on the root page even if the root
2431 	page carries just node pointers. */
2432 
2433 	lock_rec_store_on_page_infimum(block, rec);
2434 
2435 	btr_search_update_hash_on_delete(cursor);
2436 
2437 #ifdef UNIV_ZIP_DEBUG
2438 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2439 #endif /* UNIV_ZIP_DEBUG */
2440 	page_cursor = btr_cur_get_page_cur(cursor);
2441 
2442 	page_cur_delete_rec(page_cursor, index, offsets, mtr);
2443 
2444 	page_cur_move_to_prev(page_cursor);
2445 
2446 	rec = btr_cur_insert_if_possible(cursor, new_entry, n_ext, mtr);
2447 
2448 	if (rec) {
2449 		page_cursor->rec = rec;
2450 
2451 		lock_rec_restore_from_page_infimum(btr_cur_get_block(cursor),
2452 						   rec, block);
2453 
2454 		offsets = rec_get_offsets(rec, index, offsets,
2455 					  ULINT_UNDEFINED, heap);
2456 
2457 		if (!rec_get_deleted_flag(rec, rec_offs_comp(offsets))) {
2458 			/* The new inserted record owns its possible externally
2459 			stored fields */
2460 			btr_cur_unmark_extern_fields(page_zip,
2461 						     rec, index, offsets, mtr);
2462 		}
2463 
2464 		btr_cur_compress_if_useful(
2465 			cursor,
2466 			big_rec_vec != NULL && (flags & BTR_KEEP_POS_FLAG),
2467 			mtr);
2468 
2469 		if (!dict_index_is_clust(index)
2470 		    && page_is_leaf(page)) {
2471 			/* Update the free bits in the insert buffer. */
2472 			if (page_zip) {
2473 				ibuf_update_free_bits_zip(block, mtr);
2474 			} else {
2475 				ibuf_update_free_bits_low(block, max_ins_size,
2476 							  mtr);
2477 			}
2478 		}
2479 
2480 		err = DB_SUCCESS;
2481 		goto return_after_reservations;
2482 	} else {
2483 		/* If the page is compressed and it initially
2484 		compresses very well, and there is a subsequent insert
2485 		of a badly-compressing record, it is possible for
2486 		btr_cur_optimistic_update() to return DB_UNDERFLOW and
2487 		btr_cur_insert_if_possible() to return NULL. */
2488 		ut_a(page_zip || optim_err != DB_UNDERFLOW);
2489 
2490 		/* Out of space: reset the free bits. */
2491 		if (!dict_index_is_clust(index)
2492 		    && page_is_leaf(page)) {
2493 			ibuf_reset_free_bits(block);
2494 		}
2495 	}
2496 
2497 	if (big_rec_vec) {
2498 		ut_ad(page_is_leaf(page));
2499 		ut_ad(dict_index_is_clust(index));
2500 		ut_ad(flags & BTR_KEEP_POS_FLAG);
2501 
2502 		/* btr_page_split_and_insert() in
2503 		btr_cur_pessimistic_insert() invokes
2504 		mtr_memo_release(mtr, index->lock, MTR_MEMO_X_LOCK).
2505 		We must keep the index->lock when we created a
2506 		big_rec, so that row_upd_clust_rec() can store the
2507 		big_rec in the same mini-transaction. */
2508 
2509 		mtr_x_lock(dict_index_get_lock(index), mtr);
2510 	}
2511 
2512 	/* Was the record to be updated positioned as the first user
2513 	record on its page? */
2514 	was_first = page_cur_is_before_first(page_cursor);
2515 
2516 	/* Lock checks and undo logging were already performed by
2517 	btr_cur_upd_lock_and_undo(). We do not try
2518 	btr_cur_optimistic_insert() because
2519 	btr_cur_insert_if_possible() already failed above. */
2520 
2521 	err = btr_cur_pessimistic_insert(BTR_NO_UNDO_LOG_FLAG
2522 					 | BTR_NO_LOCKING_FLAG
2523 					 | BTR_KEEP_SYS_FLAG,
2524 					 cursor, new_entry, &rec,
2525 					 &dummy_big_rec, n_ext, NULL, mtr);
2526 	ut_a(rec);
2527 	ut_a(err == DB_SUCCESS);
2528 	ut_a(dummy_big_rec == NULL);
2529 	page_cursor->rec = rec;
2530 
2531 	if (dict_index_is_sec_or_ibuf(index)) {
2532 		/* Update PAGE_MAX_TRX_ID in the index page header.
2533 		It was not updated by btr_cur_pessimistic_insert()
2534 		because of BTR_NO_LOCKING_FLAG. */
2535 		buf_block_t*	rec_block;
2536 
2537 		rec_block = btr_cur_get_block(cursor);
2538 
2539 		page_update_max_trx_id(rec_block,
2540 				       buf_block_get_page_zip(rec_block),
2541 				       trx->id, mtr);
2542 	}
2543 
2544 	if (!rec_get_deleted_flag(rec, rec_offs_comp(offsets))) {
2545 		/* The new inserted record owns its possible externally
2546 		stored fields */
2547 		buf_block_t*	rec_block = btr_cur_get_block(cursor);
2548 
2549 #ifdef UNIV_ZIP_DEBUG
2550 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2551 		page = buf_block_get_frame(rec_block);
2552 #endif /* UNIV_ZIP_DEBUG */
2553 		page_zip = buf_block_get_page_zip(rec_block);
2554 
2555 		offsets = rec_get_offsets(rec, index, offsets,
2556 					  ULINT_UNDEFINED, heap);
2557 		btr_cur_unmark_extern_fields(page_zip,
2558 					     rec, index, offsets, mtr);
2559 	}
2560 
2561 	lock_rec_restore_from_page_infimum(btr_cur_get_block(cursor),
2562 					   rec, block);
2563 
2564 	/* If necessary, restore also the correct lock state for a new,
2565 	preceding supremum record created in a page split. While the old
2566 	record was nonexistent, the supremum might have inherited its locks
2567 	from a wrong record. */
2568 
2569 	if (!was_first) {
2570 		btr_cur_pess_upd_restore_supremum(btr_cur_get_block(cursor),
2571 						  rec, mtr);
2572 	}
2573 
2574 return_after_reservations:
2575 #ifdef UNIV_ZIP_DEBUG
2576 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2577 #endif /* UNIV_ZIP_DEBUG */
2578 
2579 	if (n_extents > 0) {
2580 		fil_space_release_free_extents(index->space, n_reserved);
2581 	}
2582 
2583 	*big_rec = big_rec_vec;
2584 
2585 	return(err);
2586 }
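/* Informal note for callers (a sketch, not a normative interface
description): when the function above returns DB_SUCCESS with
*big_rec != NULL, the externally stored columns have NOT been written yet.
The index tree lock is kept in the mini-transaction so that the caller,
for example row_upd_clust_rec() in row0upd.c, can store them in the same
mini-transaction, roughly:

	if (big_rec) {
		... write the long columns to BLOB pages with
		btr_store_big_rec_extern_fields(...) on the updated
		record, then free the vector with dtuple_big_rec_free() ...
	}

The exact argument lists are omitted here because they have changed
between versions. */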
2587 
2588 /*==================== B-TREE DELETE MARK AND UNMARK ===============*/
2589 
2590 /****************************************************************//**
2591 Writes the redo log record for delete marking or unmarking of an index
2592 record. */
2593 UNIV_INLINE
2594 void
2595 btr_cur_del_mark_set_clust_rec_log(
2596 /*===============================*/
2597 	ulint		flags,	/*!< in: flags */
2598 	rec_t*		rec,	/*!< in: record */
2599 	dict_index_t*	index,	/*!< in: index of the record */
2600 	ibool		val,	/*!< in: value to set */
2601 	trx_t*		trx,	/*!< in: deleting transaction */
2602 	roll_ptr_t	roll_ptr,/*!< in: roll ptr to the undo log record */
2603 	mtr_t*		mtr)	/*!< in: mtr */
2604 {
2605 	byte*	log_ptr;
2606 	ut_ad(flags < 256);
2607 	ut_ad(val <= 1);
2608 
2609 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2610 
2611 	log_ptr = mlog_open_and_write_index(mtr, rec, index,
2612 					    page_rec_is_comp(rec)
2613 					    ? MLOG_COMP_REC_CLUST_DELETE_MARK
2614 					    : MLOG_REC_CLUST_DELETE_MARK,
2615 					    1 + 1 + DATA_ROLL_PTR_LEN
2616 					    + 14 + 2);
2617 
2618 	if (!log_ptr) {
2619 		/* Logging in mtr is switched off during crash recovery */
2620 		return;
2621 	}
2622 
2623 	mach_write_to_1(log_ptr, flags);
2624 	log_ptr++;
2625 	mach_write_to_1(log_ptr, val);
2626 	log_ptr++;
2627 
2628 	log_ptr = row_upd_write_sys_vals_to_log(index, trx, roll_ptr, log_ptr,
2629 						mtr);
2630 	mach_write_to_2(log_ptr, page_offset(rec));
2631 	log_ptr += 2;
2632 
2633 	mlog_close(mtr, log_ptr);
2634 }
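/* For reference, an informal sketch (not authoritative) of the record body
written above; btr_cur_parse_del_mark_set_clust_rec() below reads back the
same sequence:

	1 byte				flags
	1 byte				delete-mark value (0 or 1)
	compressed + DATA_ROLL_PTR_LEN
	+ compressed bytes		DATA_TRX_ID position, roll pointer
					and trx id, as written by
					row_upd_write_sys_vals_to_log()
	2 bytes				offset of rec within the page */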
2635 #endif /* !UNIV_HOTBACKUP */
2636 
2637 /****************************************************************//**
2638 Parses the redo log record for delete marking or unmarking of a clustered
2639 index record.
2640 @return	end of log record or NULL */
2641 UNIV_INTERN
2642 byte*
2643 btr_cur_parse_del_mark_set_clust_rec(
2644 /*=================================*/
2645 	byte*		ptr,	/*!< in: buffer */
2646 	byte*		end_ptr,/*!< in: buffer end */
2647 	page_t*		page,	/*!< in/out: page or NULL */
2648 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
2649 	dict_index_t*	index)	/*!< in: index corresponding to page */
2650 {
2651 	ulint		flags;
2652 	ulint		val;
2653 	ulint		pos;
2654 	trx_id_t	trx_id;
2655 	roll_ptr_t	roll_ptr;
2656 	ulint		offset;
2657 	rec_t*		rec;
2658 
2659 	ut_ad(!page
2660 	      || !!page_is_comp(page) == dict_table_is_comp(index->table));
2661 
2662 	if (end_ptr < ptr + 2) {
2663 
2664 		return(NULL);
2665 	}
2666 
2667 	flags = mach_read_from_1(ptr);
2668 	ptr++;
2669 	val = mach_read_from_1(ptr);
2670 	ptr++;
2671 
2672 	ptr = row_upd_parse_sys_vals(ptr, end_ptr, &pos, &trx_id, &roll_ptr);
2673 
2674 	if (ptr == NULL) {
2675 
2676 		return(NULL);
2677 	}
2678 
2679 	if (end_ptr < ptr + 2) {
2680 
2681 		return(NULL);
2682 	}
2683 
2684 	offset = mach_read_from_2(ptr);
2685 	ptr += 2;
2686 
2687 	ut_a(offset <= UNIV_PAGE_SIZE);
2688 
2689 	if (page) {
2690 		rec = page + offset;
2691 
2692 		/* We do not need to reserve btr_search_latch, as the page
2693 		is only being recovered, and there cannot be a hash index to
2694 		it. Besides, these fields are being updated in place
2695 		and the adaptive hash index does not depend on them. */
2696 
2697 		btr_rec_set_deleted_flag(rec, page_zip, val);
2698 
2699 		if (!(flags & BTR_KEEP_SYS_FLAG)) {
2700 			mem_heap_t*	heap		= NULL;
2701 			ulint		offsets_[REC_OFFS_NORMAL_SIZE];
2702 			rec_offs_init(offsets_);
2703 
2704 			row_upd_rec_sys_fields_in_recovery(
2705 				rec, page_zip,
2706 				rec_get_offsets(rec, index, offsets_,
2707 						ULINT_UNDEFINED, &heap),
2708 				pos, trx_id, roll_ptr);
2709 			if (UNIV_LIKELY_NULL(heap)) {
2710 				mem_heap_free(heap);
2711 			}
2712 		}
2713 	}
2714 
2715 	return(ptr);
2716 }
2717 
2718 #ifndef UNIV_HOTBACKUP
2719 /***********************************************************//**
2720 Marks a clustered index record deleted. Writes an undo log record to
2721 undo log on this delete marking. Writes in the trx id field the id
2722 of the deleting transaction, and in the roll ptr field a pointer to the
2723 undo log record created.
2724 @return	DB_SUCCESS, DB_LOCK_WAIT, or error number */
2725 UNIV_INTERN
2726 ulint
2727 btr_cur_del_mark_set_clust_rec(
2728 /*===========================*/
2729 	ulint		flags,	/*!< in: undo logging and locking flags */
2730 	buf_block_t*	block,	/*!< in/out: buffer block of the record */
2731 	rec_t*		rec,	/*!< in/out: record */
2732 	dict_index_t*	index,	/*!< in: clustered index of the record */
2733 	const ulint*	offsets,/*!< in: rec_get_offsets(rec) */
2734 	ibool		val,	/*!< in: value to set */
2735 	que_thr_t*	thr,	/*!< in: query thread */
2736 	mtr_t*		mtr)	/*!< in: mtr */
2737 {
2738 	roll_ptr_t	roll_ptr;
2739 	ulint		err;
2740 	page_zip_des_t*	page_zip;
2741 	trx_t*		trx;
2742 
2743 	ut_ad(dict_index_is_clust(index));
2744 	ut_ad(rec_offs_validate(rec, index, offsets));
2745 	ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2746 	ut_ad(buf_block_get_frame(block) == page_align(rec));
2747 	ut_ad(page_is_leaf(page_align(rec)));
2748 
2749 #ifdef UNIV_DEBUG
2750 	if (btr_cur_print_record_ops && thr) {
2751 		btr_cur_trx_report(thr_get_trx(thr), index, "del mark ");
2752 		rec_print_new(stderr, rec, offsets);
2753 	}
2754 #endif /* UNIV_DEBUG */
2755 
2756 	ut_ad(dict_index_is_clust(index));
2757 	ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2758 
2759 	err = lock_clust_rec_modify_check_and_lock(flags, block,
2760 						   rec, index, offsets, thr);
2761 
2762 	if (err != DB_SUCCESS) {
2763 
2764 		return(err);
2765 	}
2766 
2767 	err = trx_undo_report_row_operation(flags, TRX_UNDO_MODIFY_OP, thr,
2768 					    index, NULL, NULL, 0, rec,
2769 					    &roll_ptr);
2770 	if (err != DB_SUCCESS) {
2771 
2772 		return(err);
2773 	}
2774 
2775 	/* The btr_search_latch is not needed here, because
2776 	the adaptive hash index does not depend on the delete-mark
2777 	and the delete-mark is being updated in place. */
2778 
2779 	page_zip = buf_block_get_page_zip(block);
2780 
2781 	btr_blob_dbg_set_deleted_flag(rec, index, offsets, val);
2782 	btr_rec_set_deleted_flag(rec, page_zip, val);
2783 
2784 	trx = thr_get_trx(thr);
2785 
2786 	if (!(flags & BTR_KEEP_SYS_FLAG)) {
2787 		row_upd_rec_sys_fields(rec, page_zip,
2788 				       index, offsets, trx, roll_ptr);
2789 	}
2790 
2791 	btr_cur_del_mark_set_clust_rec_log(flags, rec, index, val, trx,
2792 					   roll_ptr, mtr);
2793 
2794 	return(err);
2795 }
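/* Informal note (a simplified sketch of the surrounding machinery, not a
specification): a user-visible DELETE does not remove records immediately.
Roughly, the row update module delete-marks the clustered index record with
the function above and the secondary index records with
btr_cur_del_mark_set_sec_rec(); only later, when the delete is no longer
visible to any active read view, does the purge subsystem physically remove
the records using btr_cur_optimistic_delete() or
btr_cur_pessimistic_delete(). */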
2796 
2797 /****************************************************************//**
2798 Writes the redo log record for a delete mark setting of a secondary
2799 index record. */
2800 UNIV_INLINE
2801 void
2802 btr_cur_del_mark_set_sec_rec_log(
2803 /*=============================*/
2804 	rec_t*		rec,	/*!< in: record */
2805 	ibool		val,	/*!< in: value to set */
2806 	mtr_t*		mtr)	/*!< in: mtr */
2807 {
2808 	byte*	log_ptr;
2809 	ut_ad(val <= 1);
2810 
2811 	log_ptr = mlog_open(mtr, 11 + 1 + 2);
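	/* The reservation above is meant to cover the worst case: up to
	11 bytes for the initial log record written by
	mlog_write_initial_log_record_fast() (type byte plus compressed
	space id and page number), 1 byte for the delete-mark value, and
	2 bytes for the record offset within the page. */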
2812 
2813 	if (!log_ptr) {
2814 		/* Logging in mtr is switched off during crash recovery:
2815 		in that case mlog_open returns NULL */
2816 		return;
2817 	}
2818 
2819 	log_ptr = mlog_write_initial_log_record_fast(
2820 		rec, MLOG_REC_SEC_DELETE_MARK, log_ptr, mtr);
2821 	mach_write_to_1(log_ptr, val);
2822 	log_ptr++;
2823 
2824 	mach_write_to_2(log_ptr, page_offset(rec));
2825 	log_ptr += 2;
2826 
2827 	mlog_close(mtr, log_ptr);
2828 }
2829 #endif /* !UNIV_HOTBACKUP */
2830 
2831 /****************************************************************//**
2832 Parses the redo log record for delete marking or unmarking of a secondary
2833 index record.
2834 @return	end of log record or NULL */
2835 UNIV_INTERN
2836 byte*
2837 btr_cur_parse_del_mark_set_sec_rec(
2838 /*===============================*/
2839 	byte*		ptr,	/*!< in: buffer */
2840 	byte*		end_ptr,/*!< in: buffer end */
2841 	page_t*		page,	/*!< in/out: page or NULL */
2842 	page_zip_des_t*	page_zip)/*!< in/out: compressed page, or NULL */
2843 {
2844 	ulint	val;
2845 	ulint	offset;
2846 	rec_t*	rec;
2847 
2848 	if (end_ptr < ptr + 3) {
2849 
2850 		return(NULL);
2851 	}
2852 
2853 	val = mach_read_from_1(ptr);
2854 	ptr++;
2855 
2856 	offset = mach_read_from_2(ptr);
2857 	ptr += 2;
2858 
2859 	ut_a(offset <= UNIV_PAGE_SIZE);
2860 
2861 	if (page) {
2862 		rec = page + offset;
2863 
2864 		/* We do not need to reserve btr_search_latch, as the page
2865 		is only being recovered, and there cannot be a hash index to
2866 		it. Besides, the delete-mark flag is being updated in place
2867 		and the adaptive hash index does not depend on it. */
2868 
2869 		btr_rec_set_deleted_flag(rec, page_zip, val);
2870 	}
2871 
2872 	return(ptr);
2873 }
2874 
2875 #ifndef UNIV_HOTBACKUP
2876 /***********************************************************//**
2877 Sets a secondary index record delete mark to TRUE or FALSE.
2878 @return	DB_SUCCESS, DB_LOCK_WAIT, or error number */
2879 UNIV_INTERN
2880 ulint
2881 btr_cur_del_mark_set_sec_rec(
2882 /*=========================*/
2883 	ulint		flags,	/*!< in: locking flag */
2884 	btr_cur_t*	cursor,	/*!< in: cursor */
2885 	ibool		val,	/*!< in: value to set */
2886 	que_thr_t*	thr,	/*!< in: query thread */
2887 	mtr_t*		mtr)	/*!< in: mtr */
2888 {
2889 	buf_block_t*	block;
2890 	rec_t*		rec;
2891 	ulint		err;
2892 
2893 	block = btr_cur_get_block(cursor);
2894 	rec = btr_cur_get_rec(cursor);
2895 
2896 #ifdef UNIV_DEBUG
2897 	if (btr_cur_print_record_ops && thr) {
2898 		btr_cur_trx_report(thr_get_trx(thr), cursor->index,
2899 				   "del mark ");
2900 		rec_print(stderr, rec, cursor->index);
2901 	}
2902 #endif /* UNIV_DEBUG */
2903 
2904 	err = lock_sec_rec_modify_check_and_lock(flags,
2905 						 btr_cur_get_block(cursor),
2906 						 rec, cursor->index, thr, mtr);
2907 	if (err != DB_SUCCESS) {
2908 
2909 		return(err);
2910 	}
2911 
2912 	ut_ad(!!page_rec_is_comp(rec)
2913 	      == dict_table_is_comp(cursor->index->table));
2914 
2915 	/* We do not need to reserve btr_search_latch, as the
2916 	delete-mark flag is being updated in place and the adaptive
2917 	hash index does not depend on it. */
2918 	btr_rec_set_deleted_flag(rec, buf_block_get_page_zip(block), val);
2919 
2920 	btr_cur_del_mark_set_sec_rec_log(rec, val, mtr);
2921 
2922 	return(DB_SUCCESS);
2923 }
2924 
2925 /***********************************************************//**
2926 Sets a secondary index record's delete mark to the given value. This
2927 function is only used by the insert buffer merge mechanism. */
2928 UNIV_INTERN
2929 void
2930 btr_cur_set_deleted_flag_for_ibuf(
2931 /*==============================*/
2932 	rec_t*		rec,		/*!< in/out: record */
2933 	page_zip_des_t*	page_zip,	/*!< in/out: compressed page
2934 					corresponding to rec, or NULL
2935 					when the tablespace is
2936 					uncompressed */
2937 	ibool		val,		/*!< in: value to set */
2938 	mtr_t*		mtr)		/*!< in/out: mini-transaction */
2939 {
2940 	/* We do not need to reserve btr_search_latch, as the page
2941 	has just been read to the buffer pool and there cannot be
2942 	a hash index to it.  Besides, the delete-mark flag is being
2943 	updated in place and the adaptive hash index does not depend
2944 	on it. */
2945 
2946 	btr_rec_set_deleted_flag(rec, page_zip, val);
2947 
2948 	btr_cur_del_mark_set_sec_rec_log(rec, val, mtr);
2949 }
2950 
2951 /*==================== B-TREE RECORD REMOVE =========================*/
2952 
2953 /*************************************************************//**
2954 Tries to compress a page of the tree if it seems useful. It is assumed
2955 that mtr holds an x-latch on the tree and on the cursor page. To avoid
2956 deadlocks, mtr must also own x-latches to brothers of page, if those
2957 brothers exist. NOTE: it is assumed that the caller has reserved enough
2958 free extents so that the compression will always succeed if done!
2959 @return	TRUE if compression occurred */
2960 UNIV_INTERN
2961 ibool
2962 btr_cur_compress_if_useful(
2963 /*=======================*/
2964 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to compress;
2965 				cursor does not stay valid if !adjust and
2966 				compression occurs */
2967 	ibool		adjust,	/*!< in: TRUE if should adjust the
2968 				cursor position even if compression occurs */
2969 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
2970 {
2971 	ut_ad(mtr_memo_contains(mtr,
2972 				dict_index_get_lock(btr_cur_get_index(cursor)),
2973 				MTR_MEMO_X_LOCK));
2974 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
2975 				MTR_MEMO_PAGE_X_FIX));
2976 
2977 	return(btr_cur_compress_recommendation(cursor, mtr)
2978 	       && btr_compress(cursor, adjust, mtr));
2979 }
2980 
2981 /*******************************************************//**
2982 Removes the record on which the tree cursor is positioned on a leaf page.
2983 It is assumed that the mtr has an x-latch on the page where the cursor is
2984 positioned, but no latch on the whole tree.
2985 @return	TRUE if success, i.e., the page did not become too empty */
2986 UNIV_INTERN
2987 ibool
2988 btr_cur_optimistic_delete(
2989 /*======================*/
2990 	btr_cur_t*	cursor,	/*!< in: cursor on leaf page, on the record to
2991 				delete; cursor stays valid: if deletion
2992 				succeeds, on function exit it points to the
2993 				successor of the deleted record */
2994 	mtr_t*		mtr)	/*!< in: mtr; if this function returns
2995 				TRUE on a leaf page of a secondary
2996 				index, the mtr must be committed
2997 				before latching any further pages */
2998 {
2999 	buf_block_t*	block;
3000 	rec_t*		rec;
3001 	mem_heap_t*	heap		= NULL;
3002 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
3003 	ulint*		offsets		= offsets_;
3004 	ibool		no_compress_needed;
3005 	rec_offs_init(offsets_);
3006 
3007 	ut_ad(mtr_memo_contains(mtr, btr_cur_get_block(cursor),
3008 				MTR_MEMO_PAGE_X_FIX));
3009 	/* This is intended only for leaf page deletions */
3010 
3011 	block = btr_cur_get_block(cursor);
3012 
3013 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
3014 
3015 	rec = btr_cur_get_rec(cursor);
3016 	offsets = rec_get_offsets(rec, cursor->index, offsets,
3017 				  ULINT_UNDEFINED, &heap);
3018 
3019 	no_compress_needed = !rec_offs_any_extern(offsets)
3020 		&& btr_cur_can_delete_without_compress(
3021 			cursor, rec_offs_size(offsets), mtr);
3022 
3023 	if (no_compress_needed) {
3024 
3025 		page_t*		page	= buf_block_get_frame(block);
3026 		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
3027 		ulint		max_ins	= 0;
3028 
3029 		lock_update_delete(block, rec);
3030 
3031 		btr_search_update_hash_on_delete(cursor);
3032 
3033 		if (!page_zip) {
3034 			max_ins = page_get_max_insert_size_after_reorganize(
3035 				page, 1);
3036 		}
3037 #ifdef UNIV_ZIP_DEBUG
3038 		ut_a(!page_zip
3039 		     || page_zip_validate(page_zip, page, cursor->index));
3040 #endif /* UNIV_ZIP_DEBUG */
3041 		page_cur_delete_rec(btr_cur_get_page_cur(cursor),
3042 				    cursor->index, offsets, mtr);
3043 #ifdef UNIV_ZIP_DEBUG
3044 		ut_a(!page_zip
3045 		     || page_zip_validate(page_zip, page, cursor->index));
3046 #endif /* UNIV_ZIP_DEBUG */
3047 
3048 		if (dict_index_is_clust(cursor->index)
3049 		    || dict_index_is_ibuf(cursor->index)
3050 		    || !page_is_leaf(page)) {
3051 			/* The insert buffer does not handle
3052 			inserts to clustered indexes, to
3053 			non-leaf pages of secondary index B-trees,
3054 			or to the insert buffer. */
3055 		} else if (page_zip) {
3056 			ibuf_update_free_bits_zip(block, mtr);
3057 		} else {
3058 			ibuf_update_free_bits_low(block, max_ins, mtr);
3059 		}
3060 	}
3061 
3062 	if (UNIV_LIKELY_NULL(heap)) {
3063 		mem_heap_free(heap);
3064 	}
3065 
3066 	return(no_compress_needed);
3067 }
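/* Illustrative sketch only, not called anywhere: purge and rollback code
typically tries the optimistic delete first while holding only the leaf
page x-latch, and falls back to the pessimistic variant with the tree
x-latched when the page would otherwise become too empty:

	if (!btr_cur_optimistic_delete(btr_cur, mtr)) {
		... mtr_commit(), restore the cursor position with
		BTR_MODIFY_TREE, then: ...
		btr_cur_pessimistic_delete(&err, FALSE, btr_cur,
					   RB_NONE, mtr);
	}

The rollback context argument (here RB_NONE) depends on the caller. */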
3068 
3069 /*************************************************************//**
3070 Removes the record on which the tree cursor is positioned. Tries
3071 to compress the page if its fillfactor drops below a threshold
3072 or if it is the only page on the level. It is assumed that mtr holds
3073 an x-latch on the tree and on the cursor page. To avoid deadlocks,
3074 mtr must also own x-latches to brothers of page, if those brothers
3075 exist.
3076 @return	TRUE if compression occurred */
3077 UNIV_INTERN
3078 ibool
3079 btr_cur_pessimistic_delete(
3080 /*=======================*/
3081 	ulint*		err,	/*!< out: DB_SUCCESS or DB_OUT_OF_FILE_SPACE;
3082 				the latter may occur because we may have
3083 				to update node pointers on upper levels,
3084 				and in the case of variable length keys
3085 				these may actually grow in size */
3086 	ibool		has_reserved_extents, /*!< in: TRUE if the
3087 				caller has already reserved enough free
3088 				extents so that he knows that the operation
3089 				will succeed */
3090 	btr_cur_t*	cursor,	/*!< in: cursor on the record to delete;
3091 				if compression does not occur, the cursor
3092 				stays valid: it points to successor of
3093 				deleted record on function exit */
3094 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
3095 	mtr_t*		mtr)	/*!< in: mtr */
3096 {
3097 	buf_block_t*	block;
3098 	page_t*		page;
3099 	page_zip_des_t*	page_zip;
3100 	dict_index_t*	index;
3101 	rec_t*		rec;
3102 	dtuple_t*	node_ptr;
3103 	ulint		n_extents	= 0;
3104 	ulint		n_reserved;
3105 	ibool		success;
3106 	ibool		ret		= FALSE;
3107 	ulint		level;
3108 	mem_heap_t*	heap;
3109 	ulint*		offsets;
3110 
3111 	block = btr_cur_get_block(cursor);
3112 	page = buf_block_get_frame(block);
3113 	index = btr_cur_get_index(cursor);
3114 
3115 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
3116 				MTR_MEMO_X_LOCK));
3117 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3118 	if (!has_reserved_extents) {
3119 		/* First reserve enough free space for the file segments
3120 		of the index tree, so that the node pointer updates will
3121 		not fail because of lack of space */
3122 
3123 		n_extents = cursor->tree_height / 32 + 1;
3124 
3125 		success = fsp_reserve_free_extents(&n_reserved,
3126 						   index->space,
3127 						   n_extents,
3128 						   FSP_CLEANING, mtr);
3129 		if (!success) {
3130 			*err = DB_OUT_OF_FILE_SPACE;
3131 
3132 			return(FALSE);
3133 		}
3134 	}
3135 
3136 	heap = mem_heap_create(1024);
3137 	rec = btr_cur_get_rec(cursor);
3138 	page_zip = buf_block_get_page_zip(block);
3139 #ifdef UNIV_ZIP_DEBUG
3140 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3141 #endif /* UNIV_ZIP_DEBUG */
3142 
3143 	offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
3144 
3145 	if (rec_offs_any_extern(offsets)) {
3146 		btr_rec_free_externally_stored_fields(index,
3147 						      rec, offsets, page_zip,
3148 						      rb_ctx, mtr);
3149 #ifdef UNIV_ZIP_DEBUG
3150 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3151 #endif /* UNIV_ZIP_DEBUG */
3152 	}
3153 
3154 	if (UNIV_UNLIKELY(page_get_n_recs(page) < 2)
3155 	    && UNIV_UNLIKELY(dict_index_get_page(index)
3156 			     != buf_block_get_page_no(block))) {
3157 
3158 		/* If there is only one record, drop the whole page in
3159 		btr_discard_page, if this is not the root page */
3160 
3161 		btr_discard_page(cursor, mtr);
3162 
3163 		*err = DB_SUCCESS;
3164 		ret = TRUE;
3165 
3166 		goto return_after_reservations;
3167 	}
3168 
3169 	lock_update_delete(block, rec);
3170 	level = btr_page_get_level(page, mtr);
3171 
3172 	if (level > 0
3173 	    && UNIV_UNLIKELY(rec == page_rec_get_next(
3174 				     page_get_infimum_rec(page)))) {
3175 
3176 		rec_t*	next_rec = page_rec_get_next(rec);
3177 
3178 		if (btr_page_get_prev(page, mtr) == FIL_NULL) {
3179 
3180 			/* If we delete the leftmost node pointer on a
3181 			non-leaf level, we must mark the new leftmost node
3182 			pointer as the predefined minimum record */
3183 
3184 			/* This will make page_zip_validate() fail until
3185 			page_cur_delete_rec() completes.  This is harmless,
3186 			because everything will take place within a single
3187 			mini-transaction and because writing to the redo log
3188 			is an atomic operation (performed by mtr_commit()). */
3189 			btr_set_min_rec_mark(next_rec, mtr);
3190 		} else {
3191 			/* Otherwise, if we delete the leftmost node pointer
3192 			on a page, we have to change the father node pointer
3193 			so that it is equal to the new leftmost node pointer
3194 			on the page */
3195 
3196 			btr_node_ptr_delete(index, block, mtr);
3197 
3198 			node_ptr = dict_index_build_node_ptr(
3199 				index, next_rec, buf_block_get_page_no(block),
3200 				heap, level);
3201 
3202 			btr_insert_on_non_leaf_level(index,
3203 						     level + 1, node_ptr, mtr);
3204 		}
3205 	}
3206 
3207 	btr_search_update_hash_on_delete(cursor);
3208 
3209 	page_cur_delete_rec(btr_cur_get_page_cur(cursor), index, offsets, mtr);
3210 #ifdef UNIV_ZIP_DEBUG
3211 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3212 #endif /* UNIV_ZIP_DEBUG */
3213 
3214 	ut_ad(btr_check_node_ptr(index, block, mtr));
3215 
3216 	*err = DB_SUCCESS;
3217 
3218 return_after_reservations:
3219 	mem_heap_free(heap);
3220 
3221 	if (ret == FALSE) {
3222 		ret = btr_cur_compress_if_useful(cursor, FALSE, mtr);
3223 	}
3224 
3225 	if (n_extents > 0) {
3226 		fil_space_release_free_extents(index->space, n_reserved);
3227 	}
3228 
3229 	return(ret);
3230 }
3231 
3232 /*******************************************************************//**
3233 Adds path information to the cursor for the current page, for which
3234 the binary search has been performed. */
3235 static
3236 void
3237 btr_cur_add_path_info(
3238 /*==================*/
3239 	btr_cur_t*	cursor,		/*!< in: cursor positioned on a page */
3240 	ulint		height,		/*!< in: height of the page in tree;
3241 					0 means leaf node */
3242 	ulint		root_height)	/*!< in: root node height in tree */
3243 {
3244 	btr_path_t*	slot;
3245 	rec_t*		rec;
3246 	page_t*		page;
3247 
3248 	ut_a(cursor->path_arr);
3249 
3250 	if (root_height >= BTR_PATH_ARRAY_N_SLOTS - 1) {
3251 		/* Do nothing; return empty path */
3252 
3253 		slot = cursor->path_arr;
3254 		slot->nth_rec = ULINT_UNDEFINED;
3255 
3256 		return;
3257 	}
3258 
3259 	if (height == 0) {
3260 		/* Mark end of slots for path */
3261 		slot = cursor->path_arr + root_height + 1;
3262 		slot->nth_rec = ULINT_UNDEFINED;
3263 	}
3264 
3265 	rec = btr_cur_get_rec(cursor);
3266 
3267 	slot = cursor->path_arr + (root_height - height);
3268 
3269 	page = page_align(rec);
3270 
3271 	slot->nth_rec = page_rec_get_n_recs_before(rec);
3272 	slot->n_recs = page_get_n_recs(page);
3273 	slot->page_no = page_get_page_no(page);
3274 	slot->page_level = btr_page_get_level_low(page);
3275 }
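/* Informal example (made-up numbers) of the path that the function above
accumulates during a search: after descending a tree of root_height == 2,
cursor->path_arr could look like

	slot	page_level	nth_rec	n_recs
	[0]	2 (root)	 5	 40
	[1]	1		12	250
	[2]	0 (leaf)	37	120
	[3]	nth_rec == ULINT_UNDEFINED terminates the path

i.e. slot k describes the page visited at height root_height - k, and the
slot after the leaf slot is marked as the end of the path. */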
3276 
3277 /*******************************************************************//**
3278 Estimate the number of rows between slot1 and slot2 for any level on a
3279 B-tree. This function starts from slot1->page and reads a few pages to
3280 the right, counting their records. If we reach slot2->page quickly then
3281 we know exactly how many records there are between slot1 and slot2 and
3282 we set is_n_rows_exact to TRUE. If we cannot reach slot2->page quickly
3283 then we calculate the average number of records in the pages scanned
3284 so far and assume that all pages that we did not scan up to slot2->page
3285 contain the same number of records, then we multiply that average by
3286 the number of pages between slot1->page and slot2->page (which is
3287 n_rows_on_prev_level). In this case we set is_n_rows_exact to FALSE.
3288 @return	number of rows (exact or estimated) */
3289 static
3290 ib_int64_t
3291 btr_estimate_n_rows_in_range_on_level(
3292 /*==================================*/
3293 	dict_index_t*	index,			/*!< in: index */
3294 	btr_path_t*	slot1,			/*!< in: left border */
3295 	btr_path_t*	slot2,			/*!< in: right border */
3296 	ib_int64_t	n_rows_on_prev_level,	/*!< in: number of rows
3297 						on the previous level for the
3298 						same descent paths; used to
3299 						determine the number of pages
3300 						on this level */
3301 	ibool*		is_n_rows_exact)	/*!< out: TRUE if the returned
3302 						value is exact i.e. not an
3303 						estimation */
3304 {
3305 	ulint		space;
3306 	ib_int64_t	n_rows;
3307 	ulint		n_pages_read;
3308 	ulint		page_no;
3309 	ulint		zip_size;
3310 	ulint		level;
3311 
3312 	space = dict_index_get_space(index);
3313 
3314 	n_rows = 0;
3315 	n_pages_read = 0;
3316 
3317 	/* Assume by default that we will scan all pages between
3318 	slot1->page_no and slot2->page_no */
3319 	*is_n_rows_exact = TRUE;
3320 
3321 	/* add records from slot1->page_no which are to the right of
3322 	the record which serves as a left border of the range, if any */
3323 	if (slot1->nth_rec < slot1->n_recs) {
3324 		n_rows += slot1->n_recs - slot1->nth_rec;
3325 	}
3326 
3327 	/* add records from slot2->page_no which are to the left of
3328 	the record which serves as a right border of the range, if any */
3329 	if (slot2->nth_rec > 1) {
3330 		n_rows += slot2->nth_rec - 1;
3331 	}
3332 
3333 	/* count the records in the pages between slot1->page_no and
3334 	slot2->page_no (non-inclusive), if any */
3335 
3336 	zip_size = fil_space_get_zip_size(space);
3337 
3338 	/* Do not read more than this number of pages, so that this code,
3339 	which only produces an estimate, does not hurt performance. If we read
3340 	this many pages before reaching slot2->page_no then we estimate the
3341 	average from the pages scanned so far */
3342 #	define N_PAGES_READ_LIMIT	10
3343 
3344 	page_no = slot1->page_no;
3345 	level = slot1->page_level;
3346 
3347 	do {
3348 		mtr_t		mtr;
3349 		page_t*		page;
3350 		buf_block_t*	block;
3351 
3352 		mtr_start(&mtr);
3353 
3354 		/* Fetch the page. Because we are not holding the
3355 		index->lock, the tree may have changed and we may be
3356 		attempting to read a page that is no longer part of
3357 		the B-tree. We pass BUF_GET_POSSIBLY_FREED in order to
3358 		silence a debug assertion about this. */
3359 		block = buf_page_get_gen(space, zip_size, page_no, RW_S_LATCH,
3360 					 NULL, BUF_GET_POSSIBLY_FREED,
3361 					 __FILE__, __LINE__, &mtr);
3362 
3363 		page = buf_block_get_frame(block);
3364 
3365 		/* It is possible that the tree has been reorganized in the
3366 		meantime and this is a different page. If this happens the
3367 		calculated estimate will be bogus, which is not fatal as
3368 		this is only an estimate. We are sure that a page with
3369 		page_no exists because InnoDB never frees pages, only
3370 		reuses them. */
3371 		if (fil_page_get_type(page) != FIL_PAGE_INDEX
3372 		    || btr_page_get_index_id(page) != index->id
3373 		    || btr_page_get_level_low(page) != level) {
3374 
3375 			/* The page got reused for something else */
3376 			mtr_commit(&mtr);
3377 			goto inexact;
3378 		}
3379 
3380 		/* It is possible but highly unlikely that the page was
3381 		originally written by an old version of InnoDB that did
3382 		not initialize FIL_PAGE_TYPE on other than B-tree pages.
3383 		For example, this could be an almost-empty BLOB page
3384 		that happens to contain the magic values in the fields
3385 		that we checked above. */
3386 
3387 		n_pages_read++;
3388 
3389 		if (page_no != slot1->page_no) {
3390 			/* Do not count the records on slot1->page_no,
3391 			we already counted them before this loop. */
3392 			n_rows += page_get_n_recs(page);
3393 		}
3394 
3395 		page_no = btr_page_get_next(page, &mtr);
3396 
3397 		mtr_commit(&mtr);
3398 
3399 		if (n_pages_read == N_PAGES_READ_LIMIT
3400 		    || page_no == FIL_NULL) {
3401 			/* Either we read too many pages or
3402 			we reached the end of the level without passing
3403 			through slot2->page_no, the tree must have changed
3404 			in the meantime */
3405 			goto inexact;
3406 		}
3407 
3408 	} while (page_no != slot2->page_no);
3409 
3410 	return(n_rows);
3411 
3412 inexact:
3413 
3414 	*is_n_rows_exact = FALSE;
3415 
3416 	/* We interrupted the scan before reaching slot2->page */
3417 
3418 	if (n_pages_read > 0) {
3419 		/* The number of pages on this level is
3420 		n_rows_on_prev_level, multiply it by the
3421 		average number of recs per page so far */
3422 		n_rows = n_rows_on_prev_level
3423 			* n_rows / n_pages_read;
3424 	} else {
3425 		/* The tree changed before we could even
3426 		start with slot1->page_no */
3427 		n_rows = 10;
3428 	}
3429 
3430 	return(n_rows);
3431 }
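
/* A worked example of the inexact extrapolation above: suppose the loop
read N_PAGES_READ_LIMIT = 10 pages containing 900 records in total before
giving up, and n_rows_on_prev_level (roughly the number of pages expected
on this level) is 50. The average is 900 / 10 = 90 records per page, so the
function returns 50 * 900 / 10 = 4500 with *is_n_rows_exact set to FALSE.
If not even slot1->page_no could be read, the fixed fallback of 10 rows is
returned instead. */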
3432 
3433 /*******************************************************************//**
3434 Estimates the number of rows in a given index range.
3435 @return	estimated number of rows */
3436 UNIV_INTERN
3437 ib_int64_t
3438 btr_estimate_n_rows_in_range(
3439 /*=========================*/
3440 	dict_index_t*	index,	/*!< in: index */
3441 	const dtuple_t*	tuple1,	/*!< in: range start, may also be empty tuple */
3442 	ulint		mode1,	/*!< in: search mode for range start */
3443 	const dtuple_t*	tuple2,	/*!< in: range end, may also be empty tuple */
3444 	ulint		mode2)	/*!< in: search mode for range end */
3445 {
3446 	btr_path_t	path1[BTR_PATH_ARRAY_N_SLOTS];
3447 	btr_path_t	path2[BTR_PATH_ARRAY_N_SLOTS];
3448 	btr_cur_t	cursor;
3449 	btr_path_t*	slot1;
3450 	btr_path_t*	slot2;
3451 	ibool		diverged;
3452 	ibool		diverged_lot;
3453 	ulint		divergence_level;
3454 	ib_int64_t	n_rows;
3455 	ibool		is_n_rows_exact;
3456 	ulint		i;
3457 	mtr_t		mtr;
3458 
3459 	mtr_start(&mtr);
3460 
3461 	cursor.path_arr = path1;
3462 
3463 	if (dtuple_get_n_fields(tuple1) > 0) {
3464 
3465 		btr_cur_search_to_nth_level(index, 0, tuple1, mode1,
3466 					    BTR_SEARCH_LEAF | BTR_ESTIMATE,
3467 					    &cursor, 0,
3468 					    __FILE__, __LINE__, &mtr);
3469 	} else {
3470 		btr_cur_open_at_index_side(TRUE, index,
3471 					   BTR_SEARCH_LEAF | BTR_ESTIMATE,
3472 					   &cursor, &mtr);
3473 	}
3474 
3475 	mtr_commit(&mtr);
3476 
3477 	mtr_start(&mtr);
3478 
3479 	cursor.path_arr = path2;
3480 
3481 	if (dtuple_get_n_fields(tuple2) > 0) {
3482 
3483 		btr_cur_search_to_nth_level(index, 0, tuple2, mode2,
3484 					    BTR_SEARCH_LEAF | BTR_ESTIMATE,
3485 					    &cursor, 0,
3486 					    __FILE__, __LINE__, &mtr);
3487 	} else {
3488 		btr_cur_open_at_index_side(FALSE, index,
3489 					   BTR_SEARCH_LEAF | BTR_ESTIMATE,
3490 					   &cursor, &mtr);
3491 	}
3492 
3493 	mtr_commit(&mtr);
3494 
3495 	/* We have the path information for the range in path1 and path2 */
3496 
3497 	n_rows = 1;
3498 	is_n_rows_exact = TRUE;
3499 	diverged = FALSE;	    /* This becomes true when the path is not
3500 				    the same any more */
3501 	diverged_lot = FALSE;	    /* This becomes true when the paths are
3502 				    not the same or adjacent any more */
3503 	divergence_level = 1000000; /* This is the level where paths diverged
3504 				    a lot */
3505 	for (i = 0; ; i++) {
3506 		ut_ad(i < BTR_PATH_ARRAY_N_SLOTS);
3507 
3508 		slot1 = path1 + i;
3509 		slot2 = path2 + i;
3510 
3511 		if (slot1->nth_rec == ULINT_UNDEFINED
3512 		    || slot2->nth_rec == ULINT_UNDEFINED) {
3513 
3514 			if (i > divergence_level + 1 && !is_n_rows_exact) {
3515 				/* In trees whose height is > 1 our algorithm
3516 				tends to underestimate: multiply the estimate
3517 				by 2: */
3518 
3519 				n_rows = n_rows * 2;
3520 			}
3521 
3522 			DBUG_EXECUTE_IF("bug14007649", return(n_rows););
3523 
3524 			/* Do not estimate the number of rows in the range
3525 			to over 1 / 2 of the estimated rows in the whole
3526 			table */
3527 
3528 			if (n_rows > index->table->stat_n_rows / 2
3529 			    && !is_n_rows_exact) {
3530 
3531 				n_rows = index->table->stat_n_rows / 2;
3532 
3533 				/* If there are just 0 or 1 rows in the table,
3534 				then we estimate all rows are in the range */
3535 
3536 				if (n_rows == 0) {
3537 					n_rows = index->table->stat_n_rows;
3538 				}
3539 			}
3540 
3541 			return(n_rows);
3542 		}
3543 
3544 		if (!diverged && slot1->nth_rec != slot2->nth_rec) {
3545 
3546 			diverged = TRUE;
3547 
3548 			if (slot1->nth_rec < slot2->nth_rec) {
3549 				n_rows = slot2->nth_rec - slot1->nth_rec;
3550 
3551 				if (n_rows > 1) {
3552 					diverged_lot = TRUE;
3553 					divergence_level = i;
3554 				}
3555 			} else {
3556 				/* It is possible that
3557 				slot1->nth_rec >= slot2->nth_rec
3558 				if, for example, we have a single page
3559 				tree which contains (inf, 5, 6, supr)
3560 				and we select where x > 20 and x < 30;
3561 				in this case slot1->nth_rec will point
3562 				to the supr record and slot2->nth_rec
3563 				will point to 6 */
3564 				n_rows = 0;
3565 			}
3566 
3567 		} else if (diverged && !diverged_lot) {
3568 
3569 			if (slot1->nth_rec < slot1->n_recs
3570 			    || slot2->nth_rec > 1) {
3571 
3572 				diverged_lot = TRUE;
3573 				divergence_level = i;
3574 
3575 				n_rows = 0;
3576 
3577 				if (slot1->nth_rec < slot1->n_recs) {
3578 					n_rows += slot1->n_recs
3579 						- slot1->nth_rec;
3580 				}
3581 
3582 				if (slot2->nth_rec > 1) {
3583 					n_rows += slot2->nth_rec - 1;
3584 				}
3585 			}
3586 		} else if (diverged_lot) {
3587 
3588 			n_rows = btr_estimate_n_rows_in_range_on_level(
3589 				index, slot1, slot2, n_rows,
3590 				&is_n_rows_exact);
3591 		}
3592 	}
3593 }
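
/* Example of how the loop above combines the two descent paths: assume
path1 and path2 go through the same records in the root and on the next
level, and first differ at slot index i = 2 with slot1->nth_rec = 3 and
slot2->nth_rec = 7. Then diverged and diverged_lot become TRUE,
divergence_level = 2 and n_rows = 7 - 3 = 4, i.e. the node pointers
delimiting the range are about 4 apart on that level. On the next level
down this 4 is passed to btr_estimate_n_rows_in_range_on_level() as
n_rows_on_prev_level, which converts "about 4 pages" into a row count. */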
3594 
3595 /*******************************************************************//**
3596 Record the number of non_null key values in a given index for
3597 each n-column prefix of the index where n < dict_index_get_n_unique(index).
3598 The estimates are eventually stored in the array:
3599 index->stat_n_non_null_key_vals. */
3600 static
3601 void
3602 btr_record_not_null_field_in_rec(
3603 /*=============================*/
3604 	ulint		n_unique,	/*!< in: dict_index_get_n_unique(index),
3605 					number of columns that uniquely
3606 					determine an index entry */
3607 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index),
3608 					its size could be for all fields or
3609 					that of "n_unique" */
3610 	ib_int64_t*	n_not_null)	/*!< in/out: array to record number of
3611 					not null rows for n-column prefix */
3612 {
3613 	ulint	i;
3614 
3615 	ut_ad(rec_offs_n_fields(offsets) >= n_unique);
3616 
3617 	if (n_not_null == NULL) {
3618 		return;
3619 	}
3620 
3621 	for (i = 0; i < n_unique; i++) {
3622 		if (rec_offs_nth_sql_null(offsets, i)) {
3623 			break;
3624 		}
3625 
3626 		n_not_null[i]++;
3627 	}
3628 }
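
/* For example, with n_unique = 3 and a record whose first three fields are
(5, NULL, 7), the loop above increments n_not_null[0] only: the NULL in the
second field stops the scan, so neither the 2-column nor the 3-column
prefix is counted as fully non-NULL. */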
3629 
3630 /*******************************************************************//**
3631 Estimates the number of different key values in a given index, for
3632 each n-column prefix of the index where n <= dict_index_get_n_unique(index).
3633 The estimates are stored in the array index->stat_n_diff_key_vals.
3634 If innodb_stats_method is "nulls_ignored", we also record the number of
3635 non-null values for each prefix and store the estimates in
3636 array index->stat_n_non_null_key_vals. */
3637 UNIV_INTERN
3638 void
3639 btr_estimate_number_of_different_key_vals(
3640 /*======================================*/
3641 	dict_index_t*	index)	/*!< in: index */
3642 {
3643 	btr_cur_t	cursor;
3644 	page_t*		page;
3645 	rec_t*		rec;
3646 	ulint		n_cols;
3647 	ulint		matched_fields;
3648 	ulint		matched_bytes;
3649 	ib_int64_t*	n_diff;
3650 	ib_int64_t*	n_not_null;
3651 	ibool		stats_null_not_equal;
3652 	ullint		n_sample_pages; /* number of pages to sample */
3653 	ulint		not_empty_flag	= 0;
3654 	ulint		total_external_size = 0;
3655 	ulint		i;
3656 	ulint		j;
3657 	ullint		add_on;
3658 	mtr_t		mtr;
3659 	mem_heap_t*	heap		= NULL;
3660 	ulint*		offsets_rec	= NULL;
3661 	ulint*		offsets_next_rec = NULL;
3662 
3663 	n_cols = dict_index_get_n_unique(index);
3664 
3665 	heap = mem_heap_create((sizeof *n_diff + sizeof *n_not_null)
3666 			       * (n_cols + 1)
3667 			       + dict_index_get_n_fields(index)
3668 			       * (sizeof *offsets_rec
3669 				  + sizeof *offsets_next_rec));
3670 
3671 	n_diff = mem_heap_zalloc(heap, (n_cols + 1) * sizeof(ib_int64_t));
3672 
3673 	n_not_null = NULL;
3674 
3675 	/* Check srv_innodb_stats_method setting, and decide whether we
3676 	need to record non-null value and also decide if NULL is
3677 	considered equal (by setting stats_null_not_equal value) */
3678 	switch (srv_innodb_stats_method) {
3679 	case SRV_STATS_NULLS_IGNORED:
3680 		n_not_null = mem_heap_zalloc(heap, (n_cols + 1)
3681 					     * sizeof *n_not_null);
3682 		/* fall through */
3683 
3684 	case SRV_STATS_NULLS_UNEQUAL:
3685 		/* for both SRV_STATS_NULLS_IGNORED and SRV_STATS_NULLS_UNEQUAL
3686 		case, we will treat NULLs as unequal value */
3687 		stats_null_not_equal = TRUE;
3688 		break;
3689 
3690 	case SRV_STATS_NULLS_EQUAL:
3691 		stats_null_not_equal = FALSE;
3692 		break;
3693 
3694 	default:
3695 		ut_error;
3696 	}
3697 
3698 	/* It makes no sense to test more pages than are contained
3699 	in the index, thus we lower the number if it is too high */
3700 	if (srv_stats_sample_pages > index->stat_index_size) {
3701 		if (index->stat_index_size > 0) {
3702 			n_sample_pages = index->stat_index_size;
3703 		} else {
3704 			n_sample_pages = 1;
3705 		}
3706 	} else {
3707 		n_sample_pages = srv_stats_sample_pages;
3708 	}
3709 
3710 	/* We sample some pages in the index to get an estimate */
3711 
3712 	for (i = 0; i < n_sample_pages; i++) {
3713 		mtr_start(&mtr);
3714 
3715 		btr_cur_open_at_rnd_pos(index, BTR_SEARCH_LEAF, &cursor, &mtr);
3716 
3717 		/* Count the number of different key values for each prefix of
3718 		the key on this index page. If the prefix does not determine
3719 		the index record uniquely in the B-tree, then we subtract one
3720 		because otherwise our algorithm would give a wrong estimate
3721 		for an index where there is just one key value. */
3722 
3723 		page = btr_cur_get_page(&cursor);
3724 
3725 		rec = page_rec_get_next(page_get_infimum_rec(page));
3726 
3727 		if (!page_rec_is_supremum(rec)) {
3728 			not_empty_flag = 1;
3729 			offsets_rec = rec_get_offsets(rec, index, offsets_rec,
3730 						      ULINT_UNDEFINED, &heap);
3731 
3732 			if (n_not_null) {
3733 				btr_record_not_null_field_in_rec(
3734 					n_cols, offsets_rec, n_not_null);
3735 			}
3736 		}
3737 
3738 		while (!page_rec_is_supremum(rec)) {
3739 			rec_t*	next_rec = page_rec_get_next(rec);
3740 			if (page_rec_is_supremum(next_rec)) {
3741 				total_external_size +=
3742 					btr_rec_get_externally_stored_len(
3743 						rec, offsets_rec);
3744 				break;
3745 			}
3746 
3747 			matched_fields = 0;
3748 			matched_bytes = 0;
3749 			offsets_next_rec = rec_get_offsets(next_rec, index,
3750 							   offsets_next_rec,
3751 							   ULINT_UNDEFINED,
3752 							   &heap);
3753 
3754 			cmp_rec_rec_with_match(rec, next_rec,
3755 					       offsets_rec, offsets_next_rec,
3756 					       index, stats_null_not_equal,
3757 					       &matched_fields,
3758 					       &matched_bytes);
3759 
3760 			for (j = matched_fields + 1; j <= n_cols; j++) {
3761 				/* We add one if this index record has
3762 				a different prefix from the previous */
3763 
3764 				n_diff[j]++;
3765 			}
3766 
3767 			if (n_not_null) {
3768 				btr_record_not_null_field_in_rec(
3769 					n_cols, offsets_next_rec, n_not_null);
3770 			}
3771 
3772 			total_external_size
3773 				+= btr_rec_get_externally_stored_len(
3774 					rec, offsets_rec);
3775 
3776 			rec = next_rec;
3777 			/* Initialize offsets_rec for the next round
3778 			and assign the old offsets_rec buffer to
3779 			offsets_next_rec. */
3780 			{
3781 				ulint*	offsets_tmp = offsets_rec;
3782 				offsets_rec = offsets_next_rec;
3783 				offsets_next_rec = offsets_tmp;
3784 			}
3785 		}
3786 
3787 
3788 		if (n_cols == dict_index_get_n_unique_in_tree(index)) {
3789 
3790 			/* If there is more than one leaf page in the tree,
3791 			we add one because we know that the first record
3792 			on the page certainly had a different prefix than the
3793 			last record on the previous index page in the
3794 			alphabetical order. Before this fix, if there was
3795 			just one big record on each clustered index page, the
3796 			algorithm grossly underestimated the number of rows
3797 			in the table. */
3798 
3799 			if (btr_page_get_prev(page, &mtr) != FIL_NULL
3800 			    || btr_page_get_next(page, &mtr) != FIL_NULL) {
3801 
3802 				n_diff[n_cols]++;
3803 			}
3804 		}
3805 
3806 		mtr_commit(&mtr);
3807 	}
3808 
3809 	/* If we saw k borders between different key values on
3810 	n_sample_pages leaf pages, we can estimate how many
3811 	there will be in index->stat_n_leaf_pages */
3812 
3813 	/* We must take into account that our sample actually represents
3814 	also the pages used for external storage of fields (those pages are
3815 	included in index->stat_n_leaf_pages) */
3816 
3817 	for (j = 0; j <= n_cols; j++) {
3818 		index->stat_n_diff_key_vals[j]
3819 			= BTR_TABLE_STATS_FROM_SAMPLE(
3820 				n_diff[j], index, n_sample_pages,
3821 				total_external_size, not_empty_flag);
3822 
3823 		/* If the tree is small, smaller than
3824 		10 * n_sample_pages + total_external_size, then
3825 		the above estimate is ok. For bigger trees it is common that we
3826 		do not see any borders between key values in the few pages
3827 		we pick. But still there may be n_sample_pages
3828 		different key values, or even more. Let us try to approximate
3829 		that: */
3830 
3831 		add_on = index->stat_n_leaf_pages
3832 			/ (10 * (n_sample_pages
3833 				 + total_external_size));
3834 
3835 		if (add_on > n_sample_pages) {
3836 			add_on = n_sample_pages;
3837 		}
3838 
3839 		index->stat_n_diff_key_vals[j] += add_on;
3840 
3841 		/* Update the stat_n_non_null_key_vals[] with our
3842 		sampled result. stat_n_non_null_key_vals[] is created
3843 		and initialized to zero in dict_index_add_to_cache(),
3844 		along with stat_n_diff_key_vals[] array */
3845 		if (n_not_null != NULL && (j < n_cols)) {
3846 			index->stat_n_non_null_key_vals[j] =
3847 				 BTR_TABLE_STATS_FROM_SAMPLE(
3848 					n_not_null[j], index, n_sample_pages,
3849 					total_external_size, not_empty_flag);
3850 		}
3851 	}
3852 
3853 	mem_heap_free(heap);
3854 }
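
/* A rough example of the add_on correction above: assume n_sample_pages = 8,
total_external_size = 0 and index->stat_n_leaf_pages = 4000. Then
add_on = 4000 / (10 * 8) = 50, which exceeds n_sample_pages and is capped
to 8, so every stat_n_diff_key_vals[j] is increased by 8. The cap keeps
this heuristic from dominating the sampled estimate when the tree is much
larger than the sample. */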
3855 
3856 /*================== EXTERNAL STORAGE OF BIG FIELDS ===================*/
3857 
3858 /***********************************************************//**
3859 Gets the offset of the pointer to the externally stored part of a field.
3860 @return	offset of the pointer to the externally stored part */
3861 static
3862 ulint
3863 btr_rec_get_field_ref_offs(
3864 /*=======================*/
3865 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
3866 	ulint		n)	/*!< in: index of the external field */
3867 {
3868 	ulint	field_ref_offs;
3869 	ulint	local_len;
3870 
3871 	ut_a(rec_offs_nth_extern(offsets, n));
3872 	field_ref_offs = rec_get_nth_field_offs(offsets, n, &local_len);
3873 	ut_a(local_len != UNIV_SQL_NULL);
3874 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
3875 
3876 	return(field_ref_offs + local_len - BTR_EXTERN_FIELD_REF_SIZE);
3877 }
3878 
3879 /** Gets a pointer to the externally stored part of a field.
3880 @param rec	record
3881 @param offsets	rec_get_offsets(rec)
3882 @param n	index of the externally stored field
3883 @return pointer to the externally stored part */
3884 #define btr_rec_get_field_ref(rec, offsets, n)			\
3885 	((rec) + btr_rec_get_field_ref_offs(offsets, n))
3886 
3887 /***********************************************************//**
3888 Gets the externally stored size of a record, in units of a database page.
3889 @return	externally stored part, in units of a database page */
3890 static
3891 ulint
3892 btr_rec_get_externally_stored_len(
3893 /*==============================*/
3894 	const rec_t*	rec,	/*!< in: record */
3895 	const ulint*	offsets)/*!< in: array returned by rec_get_offsets() */
3896 {
3897 	ulint	n_fields;
3898 	ulint	total_extern_len = 0;
3899 	ulint	i;
3900 
3901 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
3902 
3903 	if (!rec_offs_any_extern(offsets)) {
3904 		return(0);
3905 	}
3906 
3907 	n_fields = rec_offs_n_fields(offsets);
3908 
3909 	for (i = 0; i < n_fields; i++) {
3910 		if (rec_offs_nth_extern(offsets, i)) {
3911 
3912 			ulint	extern_len = mach_read_from_4(
3913 				btr_rec_get_field_ref(rec, offsets, i)
3914 				+ BTR_EXTERN_LEN + 4);
3915 
3916 			total_extern_len += ut_calc_align(extern_len,
3917 							  UNIV_PAGE_SIZE);
3918 		}
3919 	}
3920 
3921 	return(total_extern_len / UNIV_PAGE_SIZE);
3922 }
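
/* For example, assuming the default UNIV_PAGE_SIZE of 16384 bytes: a
record with one externally stored column whose stored length (read from
BTR_EXTERN_LEN + 4) is 70000 bytes contributes
ut_calc_align(70000, 16384) = 81920 to total_extern_len, so the function
reports 81920 / 16384 = 5 pages for that record. */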
3923 
3924 /*******************************************************************//**
3925 Sets the ownership bit of an externally stored field in a record. */
3926 static
3927 void
3928 btr_cur_set_ownership_of_extern_field(
3929 /*==================================*/
3930 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
3931 				part will be updated, or NULL */
3932 	rec_t*		rec,	/*!< in/out: clustered index record */
3933 	dict_index_t*	index,	/*!< in: index of the page */
3934 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
3935 	ulint		i,	/*!< in: field number */
3936 	ibool		val,	/*!< in: value to set */
3937 	mtr_t*		mtr)	/*!< in: mtr, or NULL if not logged */
3938 {
3939 	byte*	data;
3940 	ulint	local_len;
3941 	ulint	byte_val;
3942 
3943 	data = rec_get_nth_field(rec, offsets, i, &local_len);
3944 	ut_ad(rec_offs_nth_extern(offsets, i));
3945 	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
3946 
3947 	local_len -= BTR_EXTERN_FIELD_REF_SIZE;
3948 
3949 	byte_val = mach_read_from_1(data + local_len + BTR_EXTERN_LEN);
3950 
3951 	if (val) {
3952 		byte_val = byte_val & (~BTR_EXTERN_OWNER_FLAG);
3953 	} else {
3954 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
3955 		ut_a(!(byte_val & BTR_EXTERN_OWNER_FLAG));
3956 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
3957 		byte_val = byte_val | BTR_EXTERN_OWNER_FLAG;
3958 	}
3959 
3960 	if (page_zip) {
3961 		mach_write_to_1(data + local_len + BTR_EXTERN_LEN, byte_val);
3962 		page_zip_write_blob_ptr(page_zip, rec, index, offsets, i, mtr);
3963 	} else if (mtr != NULL) {
3964 
3965 		mlog_write_ulint(data + local_len + BTR_EXTERN_LEN, byte_val,
3966 				 MLOG_1BYTE, mtr);
3967 	} else {
3968 		mach_write_to_1(data + local_len + BTR_EXTERN_LEN, byte_val);
3969 	}
3970 
3971 	btr_blob_dbg_owner(rec, index, offsets, i, val);
3972 }
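
/* The ownership bit lives in the first byte of the length word of the
BLOB pointer stored at the end of the locally stored prefix. Note the
inverted meaning: a set BTR_EXTERN_OWNER_FLAG means the record does NOT
own the column. For example, if only BTR_EXTERN_INHERITED_FLAG is
currently set and val == FALSE, the code above ORs in
BTR_EXTERN_OWNER_FLAG, marking the column as inherited and disowned by
this record. */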
3973 
3974 /*******************************************************************//**
3975 Marks non-updated off-page fields as disowned by this record. The ownership
3976 must be transferred to the updated record which is inserted elsewhere in the
3977 index tree. In purge, only the owner of an externally stored field is allowed
3978 to free the field. */
3979 UNIV_INTERN
3980 void
3981 btr_cur_disown_inherited_fields(
3982 /*============================*/
3983 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
3984 				part will be updated, or NULL */
3985 	rec_t*		rec,	/*!< in/out: record in a clustered index */
3986 	dict_index_t*	index,	/*!< in: index of the page */
3987 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
3988 	const upd_t*	update,	/*!< in: update vector */
3989 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3990 {
3991 	ulint	i;
3992 
3993 	ut_ad(rec_offs_validate(rec, index, offsets));
3994 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
3995 	ut_ad(rec_offs_any_extern(offsets));
3996 	ut_ad(mtr);
3997 
3998 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
3999 		if (rec_offs_nth_extern(offsets, i)
4000 		    && !upd_get_field_by_field_no(update, i)) {
4001 			btr_cur_set_ownership_of_extern_field(
4002 				page_zip, rec, index, offsets, i, FALSE, mtr);
4003 		}
4004 	}
4005 }
4006 
4007 /*******************************************************************//**
4008 Marks all extern fields in a record as owned by the record. This function
4009 should be called if the delete mark of a record is removed: a record that
4010 is not delete-marked always owns all its extern fields. */
4011 static
4012 void
4013 btr_cur_unmark_extern_fields(
4014 /*=========================*/
4015 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
4016 				part will be updated, or NULL */
4017 	rec_t*		rec,	/*!< in/out: record in a clustered index */
4018 	dict_index_t*	index,	/*!< in: index of the page */
4019 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
4020 	mtr_t*		mtr)	/*!< in: mtr, or NULL if not logged */
4021 {
4022 	ulint	n;
4023 	ulint	i;
4024 
4025 	ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
4026 	n = rec_offs_n_fields(offsets);
4027 
4028 	if (!rec_offs_any_extern(offsets)) {
4029 
4030 		return;
4031 	}
4032 
4033 	for (i = 0; i < n; i++) {
4034 		if (rec_offs_nth_extern(offsets, i)) {
4035 
4036 			btr_cur_set_ownership_of_extern_field(
4037 				page_zip, rec, index, offsets, i, TRUE, mtr);
4038 		}
4039 	}
4040 }
4041 
4042 /*******************************************************************//**
4043 Flags the data tuple fields that are marked as extern storage in the
4044 update vector.  We use this function to remember which fields we must
4045 mark as extern storage in a record inserted for an update.
4046 @return	number of flagged external columns */
4047 UNIV_INTERN
4048 ulint
4049 btr_push_update_extern_fields(
4050 /*==========================*/
4051 	dtuple_t*	tuple,	/*!< in/out: data tuple */
4052 	const upd_t*	update,	/*!< in: update vector */
4053 	mem_heap_t*	heap)	/*!< in: memory heap */
4054 {
4055 	ulint			n_pushed	= 0;
4056 	ulint			n;
4057 	const upd_field_t*	uf;
4058 
4059 	ut_ad(tuple);
4060 	ut_ad(update);
4061 
4062 	uf = update->fields;
4063 	n = upd_get_n_fields(update);
4064 
4065 	for (; n--; uf++) {
4066 		if (dfield_is_ext(&uf->new_val)) {
4067 			dfield_t*	field
4068 				= dtuple_get_nth_field(tuple, uf->field_no);
4069 
4070 			if (!dfield_is_ext(field)) {
4071 				dfield_set_ext(field);
4072 				n_pushed++;
4073 			}
4074 
4075 			switch (uf->orig_len) {
4076 				byte*	data;
4077 				ulint	len;
4078 				byte*	buf;
4079 			case 0:
4080 				break;
4081 			case BTR_EXTERN_FIELD_REF_SIZE:
4082 				/* Restore the original locally stored
4083 				part of the column.  In the undo log,
4084 				InnoDB writes a longer prefix of externally
4085 				stored columns, so that column prefixes
4086 				in secondary indexes can be reconstructed. */
4087 				dfield_set_data(field, (byte*) dfield_get_data(field)
4088 						+ dfield_get_len(field)
4089 						- BTR_EXTERN_FIELD_REF_SIZE,
4090 						BTR_EXTERN_FIELD_REF_SIZE);
4091 				dfield_set_ext(field);
4092 				break;
4093 			default:
4094 				/* Reconstruct the original locally
4095 				stored part of the column.  The data
4096 				will have to be copied. */
4097 				ut_a(uf->orig_len > BTR_EXTERN_FIELD_REF_SIZE);
4098 
4099 				data = dfield_get_data(field);
4100 				len = dfield_get_len(field);
4101 
4102 				buf = mem_heap_alloc(heap, uf->orig_len);
4103 				/* Copy the locally stored prefix. */
4104 				memcpy(buf, data,
4105 				       uf->orig_len
4106 				       - BTR_EXTERN_FIELD_REF_SIZE);
4107 				/* Copy the BLOB pointer. */
4108 				memcpy(buf + uf->orig_len
4109 				       - BTR_EXTERN_FIELD_REF_SIZE,
4110 				       data + len - BTR_EXTERN_FIELD_REF_SIZE,
4111 				       BTR_EXTERN_FIELD_REF_SIZE);
4112 
4113 				dfield_set_data(field, buf, uf->orig_len);
4114 				dfield_set_ext(field);
4115 			}
4116 		}
4117 	}
4118 
4119 	return(n_pushed);
4120 }
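
/* Example of the orig_len handling above: suppose an updated column value
is len = 300 bytes long, of which uf->orig_len = 120 bytes (including the
BTR_EXTERN_FIELD_REF_SIZE-byte BLOB pointer at the end) were stored locally
in the old record version. The default branch allocates a 120-byte buffer,
copies the first 120 - BTR_EXTERN_FIELD_REF_SIZE bytes as the local prefix,
copies the BLOB pointer from the last BTR_EXTERN_FIELD_REF_SIZE bytes of
the 300-byte value, and points the field at that buffer. orig_len == 0
leaves the stored value as is, and orig_len == BTR_EXTERN_FIELD_REF_SIZE
keeps only the BLOB pointer. */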
4121 
4122 /*******************************************************************//**
4123 Returns the length of a BLOB part stored on the header page.
4124 @return	part length */
4125 static
4126 ulint
4127 btr_blob_get_part_len(
4128 /*==================*/
4129 	const byte*	blob_header)	/*!< in: blob header */
4130 {
4131 	return(mach_read_from_4(blob_header + BTR_BLOB_HDR_PART_LEN));
4132 }
4133 
4134 /*******************************************************************//**
4135 Returns the page number where the next BLOB part is stored.
4136 @return	page number or FIL_NULL if no more pages */
4137 static
4138 ulint
4139 btr_blob_get_next_page_no(
4140 /*======================*/
4141 	const byte*	blob_header)	/*!< in: blob header */
4142 {
4143 	return(mach_read_from_4(blob_header + BTR_BLOB_HDR_NEXT_PAGE_NO));
4144 }
4145 
4146 /*******************************************************************//**
4147 Deallocate a buffer block that was reserved for a BLOB part. */
4148 static
4149 void
4150 btr_blob_free(
4151 /*==========*/
4152 	buf_block_t*	block,	/*!< in: buffer block */
4153 	ibool		all,	/*!< in: TRUE=remove also the compressed page
4154 				if there is one */
4155 	mtr_t*		mtr)	/*!< in: mini-transaction to commit */
4156 {
4157 	buf_pool_t*	buf_pool = buf_pool_from_block(block);
4158 	ulint		space	= buf_block_get_space(block);
4159 	ulint		page_no	= buf_block_get_page_no(block);
4160 
4161 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4162 
4163 	mtr_commit(mtr);
4164 
4165 	buf_pool_mutex_enter(buf_pool);
4166 	mutex_enter(&block->mutex);
4167 
4168 	/* Only free the block if it is still allocated to
4169 	the same file page. */
4170 
4171 	if (buf_block_get_state(block)
4172 	    == BUF_BLOCK_FILE_PAGE
4173 	    && buf_block_get_space(block) == space
4174 	    && buf_block_get_page_no(block) == page_no) {
4175 
4176 		if (!buf_LRU_free_block(&block->page, all)
4177 		    && all && block->page.zip.data) {
4178 			/* Attempt to deallocate the uncompressed page
4179 			if the whole block cannot be deallocated. */
4180 
4181 			buf_LRU_free_block(&block->page, FALSE);
4182 		}
4183 	}
4184 
4185 	buf_pool_mutex_exit(buf_pool);
4186 	mutex_exit(&block->mutex);
4187 }
4188 
4189 /*******************************************************************//**
4190 Stores the fields in big_rec_vec to the tablespace and puts pointers to
4191 them in rec.  The extern flags in rec will have to be set beforehand.
4192 The fields are stored on pages allocated from the leaf node
4193 file segment of the index tree.
4194 @return	DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
4195 UNIV_INTERN
4196 enum db_err
4197 btr_store_big_rec_extern_fields(
4198 /*============================*/
4199 	dict_index_t*	index,		/*!< in: index of rec; the index tree
4200 					MUST be X-latched */
4201 	buf_block_t*	rec_block,	/*!< in/out: block containing rec */
4202 	rec_t*		rec,		/*!< in/out: record */
4203 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index);
4204 					the "external storage" flags in offsets
4205 					will not correspond to rec when
4206 					this function returns */
4207 	const big_rec_t*big_rec_vec,	/*!< in: vector containing fields
4208 					to be stored externally */
4209 	mtr_t*		btr_mtr,	/*!< in: mtr containing the
4210 					latches to the clustered index */
4211 	enum blob_op	op)		/*!< in: operation code */
4212 {
4213 	ulint		rec_page_no;
4214 	byte*		field_ref;
4215 	ulint		extern_len;
4216 	ulint		store_len;
4217 	ulint		page_no;
4218 	ulint		space_id;
4219 	ulint		zip_size;
4220 	ulint		prev_page_no;
4221 	ulint		hint_page_no;
4222 	ulint		i;
4223 	mtr_t		mtr;
4224 	mtr_t*		alloc_mtr;
4225 	mem_heap_t*	heap = NULL;
4226 	page_zip_des_t*	page_zip;
4227 	z_stream	c_stream;
4228 	buf_block_t**	freed_pages	= NULL;
4229 	ulint		n_freed_pages	= 0;
4230 	enum db_err	error		= DB_SUCCESS;
4231 
4232 	ut_ad(rec_offs_validate(rec, index, offsets));
4233 	ut_ad(rec_offs_any_extern(offsets));
4234 	ut_ad(btr_mtr);
4235 	ut_ad(mtr_memo_contains(btr_mtr, dict_index_get_lock(index),
4236 				MTR_MEMO_X_LOCK));
4237 	ut_ad(mtr_memo_contains(btr_mtr, rec_block, MTR_MEMO_PAGE_X_FIX));
4238 	ut_ad(buf_block_get_frame(rec_block) == page_align(rec));
4239 	ut_a(dict_index_is_clust(index));
4240 
4241 	page_zip = buf_block_get_page_zip(rec_block);
4242 	ut_a(dict_table_zip_size(index->table)
4243 	     == buf_block_get_zip_size(rec_block));
4244 
4245 	space_id = buf_block_get_space(rec_block);
4246 	zip_size = buf_block_get_zip_size(rec_block);
4247 	rec_page_no = buf_block_get_page_no(rec_block);
4248 	ut_a(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX);
4249 
4250 	if (page_zip) {
4251 		int	err;
4252 
4253 		/* Zlib deflate needs 128 kilobytes for the default
4254 		window size, plus 512 << memLevel, plus a few
4255 		kilobytes for small objects.  We use reduced memLevel
4256 		to limit the memory consumption, and preallocate the
4257 		heap, hoping to avoid memory fragmentation. */
4258 		heap = mem_heap_create(250000);
4259 		page_zip_set_alloc(&c_stream, heap);
4260 
4261 		err = deflateInit2(&c_stream, Z_DEFAULT_COMPRESSION,
4262 				   Z_DEFLATED, 15, 7, Z_DEFAULT_STRATEGY);
4263 		ut_a(err == Z_OK);
4264 	}
4265 
4266 	if (btr_blob_op_is_update(op)) {
4267 		/* Avoid reusing pages that have been previously freed
4268 		in btr_mtr. */
4269 		if (btr_mtr->n_freed_pages) {
4270 			if (heap == NULL) {
4271 				heap = mem_heap_create(
4272 					btr_mtr->n_freed_pages
4273 					* sizeof *freed_pages);
4274 			}
4275 
4276 			freed_pages = mem_heap_alloc(
4277 				heap,
4278 				btr_mtr->n_freed_pages
4279 				* sizeof *freed_pages);
4280 			n_freed_pages = 0;
4281 		}
4282 
4283 		/* Because btr_mtr will be committed after mtr, it is
4284 		possible that the tablespace has been extended when
4285 		the B-tree record was updated or inserted, or it will
4286 		be extended while allocating pages for big_rec.
4287 
4288 		TODO: In mtr (not btr_mtr), write a redo log record
4289 		about extending the tablespace to its current size,
4290 		and remember the current size. Whenever the tablespace
4291 		grows as pages are allocated, write further redo log
4292 		records to mtr. (Currently tablespace extension is not
4293 		covered by the redo log. If it were, the record would
4294 		only be written to btr_mtr, which is committed after
4295 		mtr.) */
4296 		alloc_mtr = btr_mtr;
4297 	} else {
4298 		/* Use the local mtr for allocations. */
4299 		alloc_mtr = &mtr;
4300 	}
4301 
4302 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4303 	/* All pointers to externally stored columns in the record
4304 	must either be zero or they must be pointers to inherited
4305 	columns, owned by this record or an earlier record version. */
4306 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
4307 		if (!rec_offs_nth_extern(offsets, i)) {
4308 			continue;
4309 		}
4310 		field_ref = btr_rec_get_field_ref(rec, offsets, i);
4311 
4312 		ut_a(!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG));
4313 		/* Either this must be an update in place,
4314 		or the BLOB must be inherited, or the BLOB pointer
4315 		must be zero (will be written in this function). */
4316 		ut_a(op == BTR_STORE_UPDATE
4317 		     || (field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_INHERITED_FLAG)
4318 		     || !memcmp(field_ref, field_ref_zero,
4319 				BTR_EXTERN_FIELD_REF_SIZE));
4320 	}
4321 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4322 	/* We have to create a file segment to the tablespace
4323 	for each field and put the pointer to the field in rec */
4324 
4325 	for (i = 0; i < big_rec_vec->n_fields; i++) {
4326 		field_ref = btr_rec_get_field_ref(
4327 			rec, offsets, big_rec_vec->fields[i].field_no);
4328 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4329 		/* A zero BLOB pointer should have been initially inserted. */
4330 		ut_a(!memcmp(field_ref, field_ref_zero,
4331 			     BTR_EXTERN_FIELD_REF_SIZE));
4332 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4333 		extern_len = big_rec_vec->fields[i].len;
4334 		UNIV_MEM_ASSERT_RW(big_rec_vec->fields[i].data,
4335 				   extern_len);
4336 
4337 		ut_a(extern_len > 0);
4338 
4339 		prev_page_no = FIL_NULL;
4340 
4341 		if (page_zip) {
4342 			int	err = deflateReset(&c_stream);
4343 			ut_a(err == Z_OK);
4344 
4345 			c_stream.next_in = (void*) big_rec_vec->fields[i].data;
4346 			c_stream.avail_in = extern_len;
4347 		}
4348 
4349 		for (;;) {
4350 			buf_block_t*	block;
4351 			page_t*		page;
4352 
4353 			mtr_start(&mtr);
4354 
4355 			if (prev_page_no == FIL_NULL) {
4356 				hint_page_no = 1 + rec_page_no;
4357 			} else {
4358 				hint_page_no = prev_page_no + 1;
4359 			}
4360 
4361 alloc_another:
4362 			block = btr_page_alloc(index, hint_page_no,
4363 					       FSP_NO_DIR, 0, alloc_mtr, &mtr);
4364 			if (UNIV_UNLIKELY(block == NULL)) {
4365 				mtr_commit(&mtr);
4366 				error = DB_OUT_OF_FILE_SPACE;
4367 				goto func_exit;
4368 			}
4369 
4370 			if (rw_lock_get_x_lock_count(&block->lock) > 1) {
4371 				/* This page must have been freed in
4372 				btr_mtr previously. Put it aside, and
4373 				allocate another page for the BLOB data. */
4374 				ut_ad(alloc_mtr == btr_mtr);
4375 				ut_ad(btr_blob_op_is_update(op));
4376 				ut_ad(n_freed_pages < btr_mtr->n_freed_pages);
4377 				freed_pages[n_freed_pages++] = block;
4378 				goto alloc_another;
4379 			}
4380 
4381 			page_no = buf_block_get_page_no(block);
4382 			page = buf_block_get_frame(block);
4383 
4384 			if (prev_page_no != FIL_NULL) {
4385 				buf_block_t*	prev_block;
4386 				page_t*		prev_page;
4387 
4388 				prev_block = buf_page_get(space_id, zip_size,
4389 							  prev_page_no,
4390 							  RW_X_LATCH, &mtr);
4391 				buf_block_dbg_add_level(prev_block,
4392 							SYNC_EXTERN_STORAGE);
4393 				prev_page = buf_block_get_frame(prev_block);
4394 
4395 				if (page_zip) {
4396 					mlog_write_ulint(
4397 						prev_page + FIL_PAGE_NEXT,
4398 						page_no, MLOG_4BYTES, &mtr);
4399 					memcpy(buf_block_get_page_zip(
4400 						       prev_block)
4401 					       ->data + FIL_PAGE_NEXT,
4402 					       prev_page + FIL_PAGE_NEXT, 4);
4403 				} else {
4404 					mlog_write_ulint(
4405 						prev_page + FIL_PAGE_DATA
4406 						+ BTR_BLOB_HDR_NEXT_PAGE_NO,
4407 						page_no, MLOG_4BYTES, &mtr);
4408 				}
4409 
4410 			}
4411 
4412 			if (page_zip) {
4413 				int		err;
4414 				page_zip_des_t*	blob_page_zip;
4415 
4416 				/* Write FIL_PAGE_TYPE to the redo log
4417 				separately, before logging any other
4418 				changes to the page, so that the debug
4419 				assertions in
4420 				recv_parse_or_apply_log_rec_body() can
4421 				be made simpler.  Before InnoDB Plugin
4422 				1.0.4, the initialization of
4423 				FIL_PAGE_TYPE was logged as part of
4424 				the mlog_log_string() below. */
4425 
4426 				mlog_write_ulint(page + FIL_PAGE_TYPE,
4427 						 prev_page_no == FIL_NULL
4428 						 ? FIL_PAGE_TYPE_ZBLOB
4429 						 : FIL_PAGE_TYPE_ZBLOB2,
4430 						 MLOG_2BYTES, &mtr);
4431 
4432 				c_stream.next_out = page
4433 					+ FIL_PAGE_DATA;
4434 				c_stream.avail_out
4435 					= page_zip_get_size(page_zip)
4436 					- FIL_PAGE_DATA;
4437 
4438 				err = deflate(&c_stream, Z_FINISH);
4439 				ut_a(err == Z_OK || err == Z_STREAM_END);
4440 				ut_a(err == Z_STREAM_END
4441 				     || c_stream.avail_out == 0);
4442 
4443 				/* Write the "next BLOB page" pointer */
4444 				mlog_write_ulint(page + FIL_PAGE_NEXT,
4445 						 FIL_NULL, MLOG_4BYTES, &mtr);
4446 				/* Initialize the unused "prev page" pointer */
4447 				mlog_write_ulint(page + FIL_PAGE_PREV,
4448 						 FIL_NULL, MLOG_4BYTES, &mtr);
4449 				/* Write a back pointer to the record
4450 				into the otherwise unused area.  This
4451 				information could be useful in
4452 				debugging.  Later, we might want to
4453 				implement the possibility to relocate
4454 				BLOB pages.  Then, we would need to be
4455 				able to adjust the BLOB pointer in the
4456 				record.  We do not store the heap
4457 				number of the record, because it can
4458 				change in page_zip_reorganize() or
4459 				btr_page_reorganize().  However, also
4460 				the page number of the record may
4461 				change when B-tree nodes are split or
4462 				merged. */
4463 				mlog_write_ulint(page
4464 						 + FIL_PAGE_FILE_FLUSH_LSN,
4465 						 space_id,
4466 						 MLOG_4BYTES, &mtr);
4467 				mlog_write_ulint(page
4468 						 + FIL_PAGE_FILE_FLUSH_LSN + 4,
4469 						 rec_page_no,
4470 						 MLOG_4BYTES, &mtr);
4471 
4472 				/* Zero out the unused part of the page. */
4473 				memset(page + page_zip_get_size(page_zip)
4474 				       - c_stream.avail_out,
4475 				       0, c_stream.avail_out);
4476 				mlog_log_string(page + FIL_PAGE_FILE_FLUSH_LSN,
4477 						page_zip_get_size(page_zip)
4478 						- FIL_PAGE_FILE_FLUSH_LSN,
4479 						&mtr);
4480 				/* Copy the page to compressed storage,
4481 				because it will be flushed to disk
4482 				from there. */
4483 				blob_page_zip = buf_block_get_page_zip(block);
4484 				ut_ad(blob_page_zip);
4485 				ut_ad(page_zip_get_size(blob_page_zip)
4486 				      == page_zip_get_size(page_zip));
4487 				memcpy(blob_page_zip->data, page,
4488 				       page_zip_get_size(page_zip));
4489 
4490 				if (err == Z_OK && prev_page_no != FIL_NULL) {
4491 
4492 					goto next_zip_page;
4493 				}
4494 
4495 				if (alloc_mtr == &mtr) {
4496 					rec_block = buf_page_get(
4497 						space_id, zip_size,
4498 						rec_page_no,
4499 						RW_X_LATCH, &mtr);
4500 					buf_block_dbg_add_level(
4501 						rec_block,
4502 						SYNC_NO_ORDER_CHECK);
4503 				}
4504 
4505 				if (err == Z_STREAM_END) {
4506 					mach_write_to_4(field_ref
4507 							+ BTR_EXTERN_LEN, 0);
4508 					mach_write_to_4(field_ref
4509 							+ BTR_EXTERN_LEN + 4,
4510 							c_stream.total_in);
4511 				} else {
4512 					memset(field_ref + BTR_EXTERN_LEN,
4513 					       0, 8);
4514 				}
4515 
4516 				if (prev_page_no == FIL_NULL) {
4517 					btr_blob_dbg_add_blob(
4518 						rec, big_rec_vec->fields[i]
4519 						.field_no, page_no, index,
4520 						"store");
4521 
4522 					mach_write_to_4(field_ref
4523 							+ BTR_EXTERN_SPACE_ID,
4524 							space_id);
4525 
4526 					mach_write_to_4(field_ref
4527 							+ BTR_EXTERN_PAGE_NO,
4528 							page_no);
4529 
4530 					mach_write_to_4(field_ref
4531 							+ BTR_EXTERN_OFFSET,
4532 							FIL_PAGE_NEXT);
4533 				}
4534 
4535 				page_zip_write_blob_ptr(
4536 					page_zip, rec, index, offsets,
4537 					big_rec_vec->fields[i].field_no,
4538 					alloc_mtr);
4539 
4540 next_zip_page:
4541 				prev_page_no = page_no;
4542 
4543 				/* Commit mtr and release the
4544 				uncompressed page frame to save memory. */
4545 				btr_blob_free(block, FALSE, &mtr);
4546 
4547 				if (err == Z_STREAM_END) {
4548 					break;
4549 				}
4550 			} else {
4551 				mlog_write_ulint(page + FIL_PAGE_TYPE,
4552 						 FIL_PAGE_TYPE_BLOB,
4553 						 MLOG_2BYTES, &mtr);
4554 
4555 				if (extern_len > (UNIV_PAGE_SIZE
4556 						  - FIL_PAGE_DATA
4557 						  - BTR_BLOB_HDR_SIZE
4558 						  - FIL_PAGE_DATA_END)) {
4559 					store_len = UNIV_PAGE_SIZE
4560 						- FIL_PAGE_DATA
4561 						- BTR_BLOB_HDR_SIZE
4562 						- FIL_PAGE_DATA_END;
4563 				} else {
4564 					store_len = extern_len;
4565 				}
4566 
4567 				mlog_write_string(page + FIL_PAGE_DATA
4568 						  + BTR_BLOB_HDR_SIZE,
4569 						  (const byte*)
4570 						  big_rec_vec->fields[i].data
4571 						  + big_rec_vec->fields[i].len
4572 						  - extern_len,
4573 						  store_len, &mtr);
4574 				mlog_write_ulint(page + FIL_PAGE_DATA
4575 						 + BTR_BLOB_HDR_PART_LEN,
4576 						 store_len, MLOG_4BYTES, &mtr);
4577 				mlog_write_ulint(page + FIL_PAGE_DATA
4578 						 + BTR_BLOB_HDR_NEXT_PAGE_NO,
4579 						 FIL_NULL, MLOG_4BYTES, &mtr);
4580 
4581 				extern_len -= store_len;
4582 
4583 				if (alloc_mtr == &mtr) {
4584 					rec_block = buf_page_get(
4585 						space_id, zip_size,
4586 						rec_page_no,
4587 						RW_X_LATCH, &mtr);
4588 					buf_block_dbg_add_level(
4589 						rec_block,
4590 						SYNC_NO_ORDER_CHECK);
4591 				}
4592 
4593 				mlog_write_ulint(field_ref + BTR_EXTERN_LEN, 0,
4594 						 MLOG_4BYTES, alloc_mtr);
4595 				mlog_write_ulint(field_ref
4596 						 + BTR_EXTERN_LEN + 4,
4597 						 big_rec_vec->fields[i].len
4598 						 - extern_len,
4599 						 MLOG_4BYTES, alloc_mtr);
4600 
4601 				if (prev_page_no == FIL_NULL) {
4602 					btr_blob_dbg_add_blob(
4603 						rec, big_rec_vec->fields[i]
4604 						.field_no, page_no, index,
4605 						"store");
4606 
4607 					mlog_write_ulint(field_ref
4608 							 + BTR_EXTERN_SPACE_ID,
4609 							 space_id, MLOG_4BYTES,
4610 							 alloc_mtr);
4611 
4612 					mlog_write_ulint(field_ref
4613 							 + BTR_EXTERN_PAGE_NO,
4614 							 page_no, MLOG_4BYTES,
4615 							 alloc_mtr);
4616 
4617 					mlog_write_ulint(field_ref
4618 							 + BTR_EXTERN_OFFSET,
4619 							 FIL_PAGE_DATA,
4620 							 MLOG_4BYTES,
4621 							 alloc_mtr);
4622 				}
4623 
4624 				prev_page_no = page_no;
4625 
4626 				mtr_commit(&mtr);
4627 
4628 				if (extern_len == 0) {
4629 					break;
4630 				}
4631 			}
4632 		}
4633 
4634 		DBUG_EXECUTE_IF("btr_store_big_rec_extern",
4635 				error = DB_OUT_OF_FILE_SPACE;
4636 				goto func_exit;);
4637 	}
4638 
4639 func_exit:
4640 	if (page_zip) {
4641 		deflateEnd(&c_stream);
4642 	}
4643 
4644 	if (n_freed_pages) {
4645 		ulint	i;
4646 
4647 		ut_ad(alloc_mtr == btr_mtr);
4648 		ut_ad(btr_blob_op_is_update(op));
4649 
4650 		for (i = 0; i < n_freed_pages; i++) {
4651 			btr_page_free_low(index, freed_pages[i], 0, alloc_mtr);
4652 		}
4653 	}
4654 
4655 	if (heap != NULL) {
4656 		mem_heap_free(heap);
4657 	}
4658 
4659 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
4660 	/* All pointers to externally stored columns in the record
4661 	must be valid. */
4662 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
4663 		if (!rec_offs_nth_extern(offsets, i)) {
4664 			continue;
4665 		}
4666 
4667 		field_ref = btr_rec_get_field_ref(rec, offsets, i);
4668 
4669 		/* The pointer must not be zero if the operation
4670 		succeeded. */
4671 		ut_a(0 != memcmp(field_ref, field_ref_zero,
4672 				 BTR_EXTERN_FIELD_REF_SIZE)
4673 		     || error != DB_SUCCESS);
4674 		/* The column must not be disowned by this record. */
4675 		ut_a(!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG));
4676 	}
4677 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
4678 	return(error);
4679 }
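
/* Layout sketch for the uncompressed BLOB pages written above, assuming
the default UNIV_PAGE_SIZE of 16384 bytes: each page holds the usual
FIL_PAGE_DATA header, then the BLOB header consisting of
BTR_BLOB_HDR_PART_LEN (length of the part stored on this page) and
BTR_BLOB_HDR_NEXT_PAGE_NO (FIL_NULL on the last page), then up to
UNIV_PAGE_SIZE - FIL_PAGE_DATA - BTR_BLOB_HDR_SIZE - FIL_PAGE_DATA_END
bytes of column data. A column of roughly 40000 bytes therefore needs
three BLOB pages: two full ones and a final page with the remainder. The
space id, page number and offset in the field reference are written only
for the first page (prev_page_no == FIL_NULL); the accumulated length in
the reference is updated after every page. */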
4680 
4681 /*******************************************************************//**
4682 Check the FIL_PAGE_TYPE on an uncompressed BLOB page. */
4683 static
4684 void
4685 btr_check_blob_fil_page_type(
4686 /*=========================*/
4687 	ulint		space_id,	/*!< in: space id */
4688 	ulint		page_no,	/*!< in: page number */
4689 	const page_t*	page,		/*!< in: page */
4690 	ibool		read)		/*!< in: TRUE=read, FALSE=purge */
4691 {
4692 	ulint	type = fil_page_get_type(page);
4693 
4694 	ut_a(space_id == page_get_space_id(page));
4695 	ut_a(page_no == page_get_page_no(page));
4696 
4697 	if (UNIV_UNLIKELY(type != FIL_PAGE_TYPE_BLOB)) {
4698 		ulint	flags = fil_space_get_flags(space_id);
4699 
4700 #ifndef UNIV_DEBUG /* Improve debug test coverage */
4701 		if (UNIV_LIKELY
4702 		    ((flags & DICT_TF_FORMAT_MASK) == DICT_TF_FORMAT_51)) {
4703 			/* Old versions of InnoDB did not initialize
4704 			FIL_PAGE_TYPE on BLOB pages.  Do not print
4705 			anything about the type mismatch when reading
4706 			a BLOB page that is in Antelope format.*/
4707 			return;
4708 		}
4709 #endif /* !UNIV_DEBUG */
4710 
4711 		ut_print_timestamp(stderr);
4712 		fprintf(stderr,
4713 			"  InnoDB: FIL_PAGE_TYPE=%lu"
4714 			" on BLOB %s space %lu page %lu flags %lx\n",
4715 			(ulong) type, read ? "read" : "purge",
4716 			(ulong) space_id, (ulong) page_no, (ulong) flags);
4717 		ut_error;
4718 	}
4719 }
4720 
4721 /*******************************************************************//**
4722 Frees the space in an externally stored field to the file space
4723 management if the field in data is owned by the externally stored field.
4724 In a rollback we may have the additional condition that the field must
4725 not be inherited. */
4726 UNIV_INTERN
4727 void
4728 btr_free_externally_stored_field(
4729 /*=============================*/
4730 	dict_index_t*	index,		/*!< in: index of the data, the index
4731 					tree MUST be X-latched; if the tree
4732 					height is 1, then also the root page
4733 					must be X-latched! (this is relevant
4734 					in the case this function is called
4735 					from purge where 'data' is located on
4736 					an undo log page, not an index
4737 					page) */
4738 	byte*		field_ref,	/*!< in/out: field reference */
4739 	const rec_t*	rec,		/*!< in: record containing field_ref, for
4740 					page_zip_write_blob_ptr(), or NULL */
4741 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index),
4742 					or NULL */
4743 	page_zip_des_t*	page_zip,	/*!< in: compressed page corresponding
4744 					to rec, or NULL if rec == NULL */
4745 	ulint		i,		/*!< in: field number of field_ref;
4746 					ignored if rec == NULL */
4747 	enum trx_rb_ctx	rb_ctx,		/*!< in: rollback context */
4748 	mtr_t*		local_mtr __attribute__((unused))) /*!< in: mtr
4749 					containing the latch to data and an
4750 					X-latch to the index tree */
4751 {
4752 	page_t*		page;
4753 	ulint		space_id;
4754 	ulint		rec_zip_size = dict_table_zip_size(index->table);
4755 	ulint		ext_zip_size;
4756 	ulint		page_no;
4757 	ulint		next_page_no;
4758 	mtr_t		mtr;
4759 
4760 	ut_ad(mtr_memo_contains(local_mtr, dict_index_get_lock(index),
4761 				MTR_MEMO_X_LOCK));
4762 	ut_ad(mtr_memo_contains_page(local_mtr, field_ref,
4763 				     MTR_MEMO_PAGE_X_FIX));
4764 	ut_ad(!rec || rec_offs_validate(rec, index, offsets));
4765 	ut_ad(!rec || field_ref == btr_rec_get_field_ref(rec, offsets, i));
4766 
4767 	if (UNIV_UNLIKELY(!memcmp(field_ref, field_ref_zero,
4768 				  BTR_EXTERN_FIELD_REF_SIZE))) {
4769 		/* In the rollback, we may encounter a clustered index
4770 		record with some unwritten off-page columns. There is
4771 		nothing to free then. */
4772 		ut_a(rb_ctx != RB_NONE);
4773 		return;
4774 	}
4775 
4776 	space_id = mach_read_from_4(field_ref + BTR_EXTERN_SPACE_ID);
4777 
4778 	if (UNIV_UNLIKELY(space_id != dict_index_get_space(index))) {
4779 		ext_zip_size = fil_space_get_zip_size(space_id);
4780 		/* This must be an undo log record in the system tablespace,
4781 		that is, in row_purge_upd_exist_or_extern().
4782 		Currently, externally stored records are stored in the
4783 		same tablespace as the referring records. */
4784 		ut_ad(!page_get_space_id(page_align(field_ref)));
4785 		ut_ad(!rec);
4786 		ut_ad(!page_zip);
4787 	} else {
4788 		ext_zip_size = rec_zip_size;
4789 	}
4790 
4791 	if (!rec) {
4792 		/* This is a call from row_purge_upd_exist_or_extern(). */
4793 		ut_ad(!page_zip);
4794 		rec_zip_size = 0;
4795 	}
4796 
4797 #ifdef UNIV_BLOB_DEBUG
4798 	if (!(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG)
4799 	    && !((field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_INHERITED_FLAG)
4800 		 && (rb_ctx == RB_NORMAL || rb_ctx == RB_RECOVERY))) {
4801 		/* This off-page column will be freed.
4802 		Check that no references remain. */
4803 
4804 		btr_blob_dbg_t	b;
4805 
4806 		b.blob_page_no = mach_read_from_4(
4807 			field_ref + BTR_EXTERN_PAGE_NO);
4808 
4809 		if (rec) {
4810 			/* Remove the reference from the record to the
4811 			BLOB. If the BLOB were not freed, the
4812 			reference would be removed when the record is
4813 			removed. Freeing the BLOB will overwrite the
4814 			BTR_EXTERN_PAGE_NO in the field_ref of the
4815 			record with FIL_NULL, which would make the
4816 			btr_blob_dbg information inconsistent with the
4817 			record. */
4818 			b.ref_page_no = page_get_page_no(page_align(rec));
4819 			b.ref_heap_no = page_rec_get_heap_no(rec);
4820 			b.ref_field_no = i;
4821 			btr_blob_dbg_rbt_delete(index, &b, "free");
4822 		}
4823 
4824 		btr_blob_dbg_assert_empty(index, b.blob_page_no);
4825 	}
4826 #endif /* UNIV_BLOB_DEBUG */
4827 
4828 	for (;;) {
4829 #ifdef UNIV_SYNC_DEBUG
4830 		buf_block_t*	rec_block;
4831 #endif /* UNIV_SYNC_DEBUG */
4832 		buf_block_t*	ext_block;
4833 
4834 		mtr_start(&mtr);
4835 
4836 #ifdef UNIV_SYNC_DEBUG
4837 		rec_block =
4838 #endif /* UNIV_SYNC_DEBUG */
4839 		buf_page_get(page_get_space_id(page_align(field_ref)),
4840 			     rec_zip_size,
4841 			     page_get_page_no(page_align(field_ref)),
4842 			     RW_X_LATCH, &mtr);
4843 		buf_block_dbg_add_level(rec_block, SYNC_NO_ORDER_CHECK);
4844 		page_no = mach_read_from_4(field_ref + BTR_EXTERN_PAGE_NO);
4845 
4846 		if (/* There is no external storage data */
4847 		    page_no == FIL_NULL
4848 		    /* This field does not own the externally stored field */
4849 		    || (mach_read_from_1(field_ref + BTR_EXTERN_LEN)
4850 			& BTR_EXTERN_OWNER_FLAG)
4851 		    /* Rollback and inherited field */
4852 		    || ((rb_ctx == RB_NORMAL || rb_ctx == RB_RECOVERY)
4853 			&& (mach_read_from_1(field_ref + BTR_EXTERN_LEN)
4854 			    & BTR_EXTERN_INHERITED_FLAG))) {
4855 
4856 			/* Do not free */
4857 			mtr_commit(&mtr);
4858 
4859 			return;
4860 		}
4861 
4862 		ext_block = buf_page_get(space_id, ext_zip_size, page_no,
4863 					 RW_X_LATCH, &mtr);
4864 		buf_block_dbg_add_level(ext_block, SYNC_EXTERN_STORAGE);
4865 		page = buf_block_get_frame(ext_block);
4866 
4867 		if (ext_zip_size) {
4868 			/* Note that page_zip will be NULL
4869 			in row_purge_upd_exist_or_extern(). */
4870 			switch (fil_page_get_type(page)) {
4871 			case FIL_PAGE_TYPE_ZBLOB:
4872 			case FIL_PAGE_TYPE_ZBLOB2:
4873 				break;
4874 			default:
4875 				ut_error;
4876 			}
4877 			next_page_no = mach_read_from_4(page + FIL_PAGE_NEXT);
4878 
4879 			btr_page_free_low(index, ext_block, 0, &mtr);
4880 
4881 			if (page_zip) {
4882 				mach_write_to_4(field_ref + BTR_EXTERN_PAGE_NO,
4883 						next_page_no);
4884 				mach_write_to_4(field_ref + BTR_EXTERN_LEN + 4,
4885 						0);
4886 				page_zip_write_blob_ptr(page_zip, rec, index,
4887 							offsets, i, &mtr);
4888 			} else {
4889 				mlog_write_ulint(field_ref
4890 						 + BTR_EXTERN_PAGE_NO,
4891 						 next_page_no,
4892 						 MLOG_4BYTES, &mtr);
4893 				mlog_write_ulint(field_ref
4894 						 + BTR_EXTERN_LEN + 4, 0,
4895 						 MLOG_4BYTES, &mtr);
4896 			}
4897 		} else {
4898 			ut_a(!page_zip);
4899 			btr_check_blob_fil_page_type(space_id, page_no, page,
4900 						     FALSE);
4901 
4902 			next_page_no = mach_read_from_4(
4903 				page + FIL_PAGE_DATA
4904 				+ BTR_BLOB_HDR_NEXT_PAGE_NO);
4905 
4906 			/* We must supply the page level (= 0) as an argument
4907 			because we did not store it on the page (we save the
4908 			space overhead of an index page header). */
4909 
4910 			btr_page_free_low(index, ext_block, 0, &mtr);
4911 
4912 			mlog_write_ulint(field_ref + BTR_EXTERN_PAGE_NO,
4913 					 next_page_no,
4914 					 MLOG_4BYTES, &mtr);
4915 			/* Zero out the BLOB length.  If the server
4916 			crashes during the execution of this function,
4917 			trx_rollback_or_clean_all_recovered() could
4918 			dereference the half-deleted BLOB, fetching a
4919 			wrong prefix for the BLOB. */
4920 			mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
4921 					 0,
4922 					 MLOG_4BYTES, &mtr);
4923 		}
4924 
4925 		/* Commit mtr and release the BLOB block to save memory. */
4926 		btr_blob_free(ext_block, TRUE, &mtr);
4927 	}
4928 }
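
/* The loop above frees the chain one page per mini-transaction: read
BTR_EXTERN_PAGE_NO from the field reference, free that page, store the
page's next-page pointer (FIL_PAGE_NEXT for compressed BLOB pages,
BTR_BLOB_HDR_NEXT_PAGE_NO otherwise) back into BTR_EXTERN_PAGE_NO, and
repeat until the reference reads FIL_NULL. Committing the mtr after every
page keeps the number of latched pages bounded even for long BLOB chains,
and the zeroed length keeps a crash in the middle from exposing a wrong
prefix of the half-freed BLOB. */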
4929 
4930 /***********************************************************//**
4931 Frees the externally stored fields for a record. */
4932 static
4933 void
4934 btr_rec_free_externally_stored_fields(
4935 /*==================================*/
4936 	dict_index_t*	index,	/*!< in: index of the data, the index
4937 				tree MUST be X-latched */
4938 	rec_t*		rec,	/*!< in/out: record */
4939 	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
4940 	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
4941 				part will be updated, or NULL */
4942 	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
4943 	mtr_t*		mtr)	/*!< in: mini-transaction handle which contains
4944 				an X-latch to record page and to the index
4945 				tree */
4946 {
4947 	ulint	n_fields;
4948 	ulint	i;
4949 
4950 	ut_ad(rec_offs_validate(rec, index, offsets));
4951 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
4952 	/* Free possible externally stored fields in the record */
4953 
4954 	ut_ad(dict_table_is_comp(index->table) == !!rec_offs_comp(offsets));
4955 	n_fields = rec_offs_n_fields(offsets);
4956 
4957 	for (i = 0; i < n_fields; i++) {
4958 		if (rec_offs_nth_extern(offsets, i)) {
4959 			btr_free_externally_stored_field(
4960 				index, btr_rec_get_field_ref(rec, offsets, i),
4961 				rec, offsets, page_zip, i, rb_ctx, mtr);
4962 		}
4963 	}
4964 }
4965 
/***********************************************************//**
Frees the externally stored fields for a record, if the field is mentioned
in the update vector. */
static
void
btr_rec_free_updated_extern_fields(
/*===============================*/
	dict_index_t*	index,	/*!< in: index of rec; the index tree MUST be
				X-latched */
	rec_t*		rec,	/*!< in/out: record */
	page_zip_des_t*	page_zip,/*!< in: compressed page whose uncompressed
				part will be updated, or NULL */
	const ulint*	offsets,/*!< in: rec_get_offsets(rec, index) */
	const upd_t*	update,	/*!< in: update vector */
	enum trx_rb_ctx	rb_ctx,	/*!< in: rollback context */
	mtr_t*		mtr)	/*!< in: mini-transaction handle which contains
				an X-latch to record page and to the tree */
{
	ulint	n_fields;
	ulint	i;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));

	/* Free possible externally stored fields in the record */

	n_fields = upd_get_n_fields(update);

	for (i = 0; i < n_fields; i++) {
		const upd_field_t* ufield = upd_get_nth_field(update, i);

		if (rec_offs_nth_extern(offsets, ufield->field_no)) {
			ulint	len;
			byte*	data = rec_get_nth_field(
				rec, offsets, ufield->field_no, &len);
			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);

			btr_free_externally_stored_field(
				index, data + len - BTR_EXTERN_FIELD_REF_SIZE,
				rec, offsets, page_zip,
				ufield->field_no, rb_ctx, mtr);
		}
	}
}

/*******************************************************************//**
Copies the prefix of an uncompressed BLOB.  The clustered index record
that points to this BLOB must be protected by a lock or a page latch.
@return	number of bytes written to buf */
static
ulint
btr_copy_blob_prefix(
/*=================*/
	byte*		buf,	/*!< out: the externally stored part of
				the field, or a prefix of it */
	ulint		len,	/*!< in: length of buf, in bytes */
	ulint		space_id,/*!< in: space id of the BLOB pages */
	ulint		page_no,/*!< in: page number of the first BLOB page */
	ulint		offset)	/*!< in: offset on the first BLOB page */
{
	ulint	copied_len	= 0;

	for (;;) {
		mtr_t		mtr;
		buf_block_t*	block;
		const page_t*	page;
		const byte*	blob_header;
		ulint		part_len;
		ulint		copy_len;

		mtr_start(&mtr);

		block = buf_page_get(space_id, 0, page_no, RW_S_LATCH, &mtr);
		buf_block_dbg_add_level(block, SYNC_EXTERN_STORAGE);
		page = buf_block_get_frame(block);

		btr_check_blob_fil_page_type(space_id, page_no, page, TRUE);

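		/* Each BLOB page carries a small header at "offset": the
		number of data bytes stored on this page and the page number
		of the next BLOB page (FIL_NULL on the last page).  The data
		itself starts BTR_BLOB_HDR_SIZE bytes after the header. */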
		blob_header = page + offset;
		part_len = btr_blob_get_part_len(blob_header);
		copy_len = ut_min(part_len, len - copied_len);

		memcpy(buf + copied_len,
		       blob_header + BTR_BLOB_HDR_SIZE, copy_len);
		copied_len += copy_len;

		page_no = btr_blob_get_next_page_no(blob_header);

		mtr_commit(&mtr);

		if (page_no == FIL_NULL || copy_len != part_len) {
			UNIV_MEM_ASSERT_RW(buf, copied_len);
			return(copied_len);
		}

		/* On pages other than the first BLOB page, the BLOB header
		is always at the start of the page data: */

		offset = FIL_PAGE_DATA;

		ut_ad(copied_len <= len);
	}
}

/*******************************************************************//**
Copies the prefix of a compressed BLOB.  The clustered index record
that points to this BLOB must be protected by a lock or a page latch.
@return	number of bytes written to buf */
static
ulint
btr_copy_zblob_prefix(
/*==================*/
	byte*		buf,	/*!< out: the externally stored part of
				the field, or a prefix of it */
	ulint		len,	/*!< in: length of buf, in bytes */
	ulint		zip_size,/*!< in: compressed BLOB page size */
	ulint		space_id,/*!< in: space id of the BLOB pages */
	ulint		page_no,/*!< in: page number of the first BLOB page */
	ulint		offset)	/*!< in: offset on the first BLOB page */
{
	ulint		page_type = FIL_PAGE_TYPE_ZBLOB;
	mem_heap_t*	heap;
	int		err;
	z_stream	d_stream;

	d_stream.next_out = buf;
	d_stream.avail_out = len;
	d_stream.next_in = Z_NULL;
	d_stream.avail_in = 0;

	/* Zlib inflate needs 32 kilobytes for the default
	window size, plus a few kilobytes for small objects. */
	heap = mem_heap_create(40000);
	page_zip_set_alloc(&d_stream, heap);

	ut_ad(ut_is_2pow(zip_size));
	ut_ad(zip_size >= PAGE_ZIP_MIN_SIZE);
	ut_ad(zip_size <= UNIV_PAGE_SIZE);
	ut_ad(space_id);

	err = inflateInit(&d_stream);
	ut_a(err == Z_OK);

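	/* The BLOB is stored as a single zlib stream whose compressed bytes
	are spread over a singly-linked chain of compressed BLOB pages.
	Walk the chain, feeding each page to inflate() until the output
	buffer is full or the stream ends. */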
	for (;;) {
		buf_page_t*	bpage;
		ulint		next_page_no;

		/* There is no latch on bpage directly.  Instead,
		bpage is protected by the B-tree page latch that
		is being held on the clustered index record, or,
		in row_merge_copy_blobs(), by an exclusive table lock. */
		bpage = buf_page_get_zip(space_id, zip_size, page_no);

		if (UNIV_UNLIKELY(!bpage)) {
			ut_print_timestamp(stderr);
			fprintf(stderr,
				"  InnoDB: Cannot load"
				" compressed BLOB"
				" page %lu space %lu\n",
				(ulong) page_no, (ulong) space_id);
			goto func_exit;
		}

		if (UNIV_UNLIKELY
		    (fil_page_get_type(bpage->zip.data) != page_type)) {
			ut_print_timestamp(stderr);
			fprintf(stderr,
				"  InnoDB: Unexpected type %lu of"
				" compressed BLOB"
				" page %lu space %lu\n",
				(ulong) fil_page_get_type(bpage->zip.data),
				(ulong) page_no, (ulong) space_id);
			goto end_of_blob;
		}

		next_page_no = mach_read_from_4(bpage->zip.data + offset);

		if (UNIV_LIKELY(offset == FIL_PAGE_NEXT)) {
			/* When the BLOB begins at page header,
			the compressed data payload does not
			immediately follow the next page pointer. */
			offset = FIL_PAGE_DATA;
		} else {
			offset += 4;
		}

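		/* The compressed payload of this page runs from "offset" to
		the end of the compressed page frame; feed it to zlib. */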
		d_stream.next_in = bpage->zip.data + offset;
		d_stream.avail_in = zip_size - offset;

		err = inflate(&d_stream, Z_NO_FLUSH);
		switch (err) {
		case Z_OK:
			if (!d_stream.avail_out) {
				goto end_of_blob;
			}
			break;
		case Z_STREAM_END:
			if (next_page_no == FIL_NULL) {
				goto end_of_blob;
			}
			/* fall through */
		default:
inflate_error:
			ut_print_timestamp(stderr);
			fprintf(stderr,
				"  InnoDB: inflate() of"
				" compressed BLOB"
				" page %lu space %lu returned %d (%s)\n",
				(ulong) page_no, (ulong) space_id,
				err, d_stream.msg);
		case Z_BUF_ERROR:
			goto end_of_blob;
		}

		if (next_page_no == FIL_NULL) {
			if (!d_stream.avail_in) {
				ut_print_timestamp(stderr);
				fprintf(stderr,
					"  InnoDB: unexpected end of"
					" compressed BLOB"
					" page %lu space %lu\n",
					(ulong) page_no,
					(ulong) space_id);
			} else {
				err = inflate(&d_stream, Z_FINISH);
				switch (err) {
				case Z_STREAM_END:
				case Z_BUF_ERROR:
					break;
				default:
					goto inflate_error;
				}
			}

end_of_blob:
			buf_page_release_zip(bpage);
			goto func_exit;
		}

		buf_page_release_zip(bpage);

		/* On pages other than the first BLOB page, the BLOB header
		is always in the page header: */

		page_no = next_page_no;
		offset = FIL_PAGE_NEXT;
		page_type = FIL_PAGE_TYPE_ZBLOB2;
	}

func_exit:
	inflateEnd(&d_stream);
	mem_heap_free(heap);
	UNIV_MEM_ASSERT_RW(buf, d_stream.total_out);
	return(d_stream.total_out);
}

/*******************************************************************//**
Copies the prefix of an externally stored field of a record.  The
clustered index record that points to this BLOB must be protected by a
lock or a page latch.
@return	number of bytes written to buf */
static
ulint
btr_copy_externally_stored_field_prefix_low(
/*========================================*/
	byte*		buf,	/*!< out: the externally stored part of
				the field, or a prefix of it */
	ulint		len,	/*!< in: length of buf, in bytes */
	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
				zero for uncompressed BLOBs */
	ulint		space_id,/*!< in: space id of the first BLOB page */
	ulint		page_no,/*!< in: page number of the first BLOB page */
	ulint		offset)	/*!< in: offset on the first BLOB page */
{
	if (UNIV_UNLIKELY(len == 0)) {
		return(0);
	}

	if (UNIV_UNLIKELY(zip_size)) {
		return(btr_copy_zblob_prefix(buf, len, zip_size,
					     space_id, page_no, offset));
	} else {
		return(btr_copy_blob_prefix(buf, len, space_id,
					    page_no, offset));
	}
}

/*******************************************************************//**
Copies the prefix of an externally stored field of a record.  The
clustered index record must be protected by a lock or a page latch.
@return the length of the copied field, or 0 if the column was being
or has been deleted */
UNIV_INTERN
ulint
btr_copy_externally_stored_field_prefix(
/*====================================*/
	byte*		buf,	/*!< out: the field, or a prefix of it */
	ulint		len,	/*!< in: length of buf, in bytes */
	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
				zero for uncompressed BLOBs */
	const byte*	data,	/*!< in: 'internally' stored part of the
				field containing also the reference to
				the external part; must be protected by
				a lock or a page latch */
	ulint		local_len)/*!< in: length of data, in bytes */
{
	ulint	space_id;
	ulint	page_no;
	ulint	offset;

	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);

	local_len -= BTR_EXTERN_FIELD_REF_SIZE;

	if (UNIV_UNLIKELY(local_len >= len)) {
		memcpy(buf, data, len);
		return(len);
	}

	memcpy(buf, data, local_len);
	data += local_len;

	ut_a(memcmp(data, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));

	if (!mach_read_from_4(data + BTR_EXTERN_LEN + 4)) {
		/* The externally stored part of the column has been
		(partially) deleted.  Signal the half-deleted BLOB
		to the caller. */

		return(0);
	}

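	/* "data" now points at the BTR_EXTERN_FIELD_REF_SIZE-byte external
	field reference that follows the locally stored prefix: the space id,
	page number and offset of the first BLOB page, and the length of the
	externally stored part (the 4-byte length read above is stored at
	BTR_EXTERN_LEN + 4, while the first byte of BTR_EXTERN_LEN carries
	the ownership and inheritance flag bits used when freeing BLOBs). */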
	space_id = mach_read_from_4(data + BTR_EXTERN_SPACE_ID);

	page_no = mach_read_from_4(data + BTR_EXTERN_PAGE_NO);

	offset = mach_read_from_4(data + BTR_EXTERN_OFFSET);

	return(local_len
	       + btr_copy_externally_stored_field_prefix_low(buf + local_len,
							     len - local_len,
							     zip_size,
							     space_id, page_no,
							     offset));
}
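
/* Example of a hypothetical caller (illustrative only): to read at most 100
bytes of a column stored in an uncompressed tablespace, given the locally
stored part "data" of length "local_len" (which includes the field
reference):

	byte	prefix[100];
	ulint	n = btr_copy_externally_stored_field_prefix(
		prefix, sizeof prefix, 0, data, local_len);

A return value of 0 means that the column was being or has been deleted;
otherwise n is the number of bytes written to the buffer. */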

/*******************************************************************//**
Copies an externally stored field of a record to mem heap.  The
clustered index record must be protected by a lock or a page latch.
@return	the whole field copied to heap */
static
byte*
btr_copy_externally_stored_field(
/*=============================*/
	ulint*		len,	/*!< out: length of the whole field */
	const byte*	data,	/*!< in: 'internally' stored part of the
				field containing also the reference to
				the external part; must be protected by
				a lock or a page latch */
	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
				zero for uncompressed BLOBs */
	ulint		local_len,/*!< in: length of data */
	mem_heap_t*	heap)	/*!< in: mem heap */
{
	ulint	space_id;
	ulint	page_no;
	ulint	offset;
	ulint	extern_len;
	byte*	buf;

	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);

	local_len -= BTR_EXTERN_FIELD_REF_SIZE;

	space_id = mach_read_from_4(data + local_len + BTR_EXTERN_SPACE_ID);

	page_no = mach_read_from_4(data + local_len + BTR_EXTERN_PAGE_NO);

	offset = mach_read_from_4(data + local_len + BTR_EXTERN_OFFSET);

	/* Currently a BLOB cannot be bigger than 4 GB; we
	leave the 4 upper bytes in the length field unused */

	extern_len = mach_read_from_4(data + local_len + BTR_EXTERN_LEN + 4);

	buf = mem_heap_alloc(heap, local_len + extern_len);

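	/* Copy the locally stored prefix into the heap buffer and append
	the externally stored part after it, so that the whole column value
	is returned in one contiguous buffer. */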
	memcpy(buf, data, local_len);
	*len = local_len
		+ btr_copy_externally_stored_field_prefix_low(buf + local_len,
							      extern_len,
							      zip_size,
							      space_id,
							      page_no, offset);

	return(buf);
}

/*******************************************************************//**
Copies an externally stored field of a record to mem heap.
@return	the field copied to heap, or NULL if the field is incomplete */
UNIV_INTERN
byte*
btr_rec_copy_externally_stored_field(
/*=================================*/
	const rec_t*	rec,	/*!< in: record in a clustered index;
				must be protected by a lock or a page latch */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		zip_size,/*!< in: nonzero=compressed BLOB page size,
				zero for uncompressed BLOBs */
	ulint		no,	/*!< in: field number */
	ulint*		len,	/*!< out: length of the field */
	mem_heap_t*	heap)	/*!< in: mem heap */
{
	ulint		local_len;
	const byte*	data;

	ut_a(rec_offs_nth_extern(offsets, no));

	/* An externally stored field can contain some initial
	data from the field, and in the last 20 bytes it has the
	space id, page number, and offset where the rest of the
	field data is stored, and the data length in addition to
	the data stored locally. We may need to store some data
	locally to get the local record length above the 128 byte
	limit so that field offsets are stored in two bytes, and
	the extern bit is available in those two bytes. */

	data = rec_get_nth_field(rec, offsets, no, &local_len);

	ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);

	if (UNIV_UNLIKELY
	    (!memcmp(data + local_len - BTR_EXTERN_FIELD_REF_SIZE,
		     field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE))) {
		/* The externally stored field was not written yet.
		This record should only be seen by
		recv_recovery_rollback_active() or any
		TRX_ISO_READ_UNCOMMITTED transactions. */
		return(NULL);
	}

	return(btr_copy_externally_stored_field(len, data,
						zip_size, local_len, heap));
}
#endif /* !UNIV_HOTBACKUP */