1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2016, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file ibuf/ibuf0ibuf.cc
22 Insert buffer
23 
24 Created 7/19/1997 Heikki Tuuri
25 *******************************************************/
26 
27 #include "ibuf0ibuf.h"
28 #include "sync0sync.h"
29 #include "btr0sea.h"
30 
31 using st_::span;
32 
33 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
34 my_bool	srv_ibuf_disable_background_merge;
35 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
36 
37 /** Number of bits describing a single page */
38 #define IBUF_BITS_PER_PAGE	4
39 /** The start address for an insert buffer bitmap page bitmap */
40 #define IBUF_BITMAP		PAGE_DATA
41 
42 #include "buf0buf.h"
43 #include "buf0rea.h"
44 #include "fsp0fsp.h"
45 #include "trx0sys.h"
46 #include "fil0fil.h"
47 #include "rem0rec.h"
48 #include "btr0cur.h"
49 #include "btr0pcur.h"
50 #include "btr0btr.h"
51 #include "row0upd.h"
52 #include "dict0boot.h"
53 #include "fut0lst.h"
54 #include "lock0lock.h"
55 #include "log0recv.h"
56 #include "que0que.h"
57 #include "srv0start.h" /* srv_shutdown_state */
58 #include "rem0cmp.h"
59 
60 /*	STRUCTURE OF AN INSERT BUFFER RECORD
61 
62 In versions < 4.1.x:
63 
64 1. The first field is the page number.
65 2. The second field is an array which stores type info for each subsequent
66    field. We store the information which affects the ordering of records, and
67    also the physical storage size of an SQL NULL value. E.g., for CHAR(10) it
68    is 10 bytes.
69 3. Next we have the fields of the actual index record.
70 
71 In versions >= 4.1.x:
72 
Note that contrary to what we planned in the 1990's, there will only be one
74 insert buffer tree, and that is in the system tablespace of InnoDB.
75 
76 1. The first field is the space id.
77 2. The second field is a one-byte marker (0) which differentiates records from
78    the < 4.1.x storage format.
79 3. The third field is the page number.
80 4. The fourth field contains the type info, where we have also added 2 bytes to
81    store the charset. In the compressed table format of 5.0.x we must add more
82    information here so that we can build a dummy 'index' struct which 5.0.x
83    can use in the binary search on the index page in the ibuf merge phase.
84 5. The rest of the fields contain the fields of the actual index record.
85 
86 In versions >= 5.0.3:
87 
88 The first byte of the fourth field is an additional marker (0) if the record
89 is in the compact format.  The presence of this marker can be detected by
90 looking at the length of the field modulo DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE.
91 
92 The high-order bit of the character set field in the type info is the
93 "nullable" flag for the field.
94 
95 In versions >= 5.5:
96 
97 The optional marker byte at the start of the fourth field is replaced by
98 mandatory 3 fields, totaling 4 bytes:
99 
100  1. 2 bytes: Counter field, used to sort records within a (space id, page
101     no) in the order they were added. This is needed so that for example the
102     sequence of operations "INSERT x, DEL MARK x, INSERT x" is handled
103     correctly.
104 
105  2. 1 byte: Operation type (see ibuf_op_t).
106 
107  3. 1 byte: Flags. Currently only one flag exists, IBUF_REC_COMPACT.
108 
109 To ensure older records, which do not have counters to enforce correct
110 sorting, are merged before any new records, ibuf_insert checks if we're
111 trying to insert to a position that contains old-style records, and if so,
112 refuses the insert. Thus, ibuf pages are gradually converted to the new
113 format as their corresponding buffer pool pages are read into memory.
114 */
115 
116 
117 /*	PREVENTING DEADLOCKS IN THE INSERT BUFFER SYSTEM
118 
119 If an OS thread performs any operation that brings in disk pages from
120 non-system tablespaces into the buffer pool, or creates such a page there,
121 then the operation may have as a side effect an insert buffer index tree
122 compression. Thus, the tree latch of the insert buffer tree may be acquired
123 in the x-mode, and also the file space latch of the system tablespace may
124 be acquired in the x-mode.
125 
126 Also, an insert to an index in a non-system tablespace can have the same
127 effect. How do we know this cannot lead to a deadlock of OS threads? There
is a problem with the i/o-handler threads: they break the latching order
129 because they own x-latches to pages which are on a lower level than the
130 insert buffer tree latch, its page latches, and the tablespace latch an
131 insert buffer operation can reserve.
132 
133 The solution is the following: Let all the tree and page latches connected
134 with the insert buffer be later in the latching order than the fsp latch and
135 fsp page latches.
136 
137 Insert buffer pages must be such that the insert buffer is never invoked
138 when these pages are accessed as this would result in a recursion violating
139 the latching order. We let a special i/o-handler thread take care of i/o to
140 the insert buffer pages and the ibuf bitmap pages, as well as the fsp bitmap
141 pages and the first inode page, which contains the inode of the ibuf tree: let
142 us call all these ibuf pages. To prevent deadlocks, we do not let a read-ahead
143 access both non-ibuf and ibuf pages.
144 
145 Then an i/o-handler for the insert buffer never needs to access recursively the
146 insert buffer tree and thus obeys the latching order. On the other hand, other
147 i/o-handlers for other tablespaces may require access to the insert buffer,
148 but because all kinds of latches they need to access there are later in the
149 latching order, no violation of the latching order occurs in this case,
150 either.
151 
152 A problem is how to grow and contract an insert buffer tree. As it is later
153 in the latching order than the fsp management, we have to reserve the fsp
154 latch first, before adding or removing pages from the insert buffer tree.
155 We let the insert buffer tree have its own file space management: a free
156 list of pages linked to the tree root. To prevent recursive using of the
157 insert buffer when adding pages to the tree, we must first load these pages
158 to memory, obtaining a latch on them, and only after that add them to the
159 free list of the insert buffer tree. More difficult is removing of pages
160 from the free list. If there is an excess of pages in the free list of the
161 ibuf tree, they might be needed if some thread reserves the fsp latch,
162 intending to allocate more file space. So we do the following: if a thread
163 reserves the fsp latch, we check the writer count field of the latch. If
164 this field has value 1, it means that the thread did not own the latch
165 before entering the fsp system, and the mtr of the thread contains no
166 modifications to the fsp pages. Now we are free to reserve the ibuf latch,
167 and check if there is an excess of pages in the free list. We can then, in a
168 separate mini-transaction, take them out of the free list and free them to
169 the fsp system.
170 
171 To avoid deadlocks in the ibuf system, we divide file pages into three levels:
172 
173 (1) non-ibuf pages,
174 (2) ibuf tree pages and the pages in the ibuf tree free list, and
175 (3) ibuf bitmap pages.
176 
177 No OS thread is allowed to access higher level pages if it has latches to
178 lower level pages; even if the thread owns a B-tree latch it must not access
179 the B-tree non-leaf pages if it has latches on lower level pages. Read-ahead
180 is only allowed for level 1 and 2 pages. Dedicated i/o-handler threads handle
181 exclusively level 1 i/o. A dedicated i/o handler thread handles exclusively
182 level 2 i/o. However, if an OS thread does the i/o handling for itself, i.e.,
183 it uses synchronous aio, it can access any pages, as long as it obeys the
184 access order rules. */
185 
186 /** Operations that can currently be buffered. */
187 ulong	innodb_change_buffering;
188 
189 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
190 /** Dump the change buffer at startup */
191 my_bool	ibuf_dump;
192 /** Flag to control insert buffer debugging. */
193 uint	ibuf_debug;
194 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
195 
196 /** The insert buffer control structure */
197 ibuf_t*	ibuf			= NULL;
198 
199 /** @name Offsets to the per-page bits in the insert buffer bitmap */
200 /* @{ */
201 #define	IBUF_BITMAP_FREE	0	/*!< Bits indicating the
202 					amount of free space */
203 #define IBUF_BITMAP_BUFFERED	2	/*!< TRUE if there are buffered
204 					changes for the page */
205 #define IBUF_BITMAP_IBUF	3	/*!< TRUE if page is a part of
206 					the ibuf tree, excluding the
207 					root page, or is in the free
208 					list of the ibuf */
209 /* @} */
210 
211 #define IBUF_REC_FIELD_SPACE	0	/*!< in the pre-4.1 format,
212 					the page number. later, the space_id */
213 #define IBUF_REC_FIELD_MARKER	1	/*!< starting with 4.1, a marker
214 					consisting of 1 byte that is 0 */
215 #define IBUF_REC_FIELD_PAGE	2	/*!< starting with 4.1, the
216 					page number */
217 #define IBUF_REC_FIELD_METADATA	3	/* the metadata field */
218 #define IBUF_REC_FIELD_USER	4	/* first user field */
219 
220 /* Various constants for checking the type of an ibuf record and extracting
221 data from it. For details, see the description of the record format at the
222 top of this file. */
223 
224 /** @name Format of the IBUF_REC_FIELD_METADATA of an insert buffer record
225 The fourth column in the MySQL 5.5 format contains an operation
226 type, counter, and some flags. */
227 /* @{ */
228 #define IBUF_REC_INFO_SIZE	4	/*!< Combined size of info fields at
229 					the beginning of the fourth field */
230 
231 /* Offsets for the fields at the beginning of the fourth field */
232 #define IBUF_REC_OFFSET_COUNTER	0	/*!< Operation counter */
233 #define IBUF_REC_OFFSET_TYPE	2	/*!< Type of operation */
234 #define IBUF_REC_OFFSET_FLAGS	3	/*!< Additional flags */
235 
236 /* Record flag masks */
237 #define IBUF_REC_COMPACT	0x1	/*!< Set in
238 					IBUF_REC_OFFSET_FLAGS if the
239 					user index is in COMPACT
240 					format or later */
241 
242 
243 /** The mutex used to block pessimistic inserts to ibuf trees */
244 static ib_mutex_t	ibuf_pessimistic_insert_mutex;
245 
246 /** The mutex protecting the insert buffer structs */
247 static ib_mutex_t	ibuf_mutex;
248 
249 /** The mutex protecting the insert buffer bitmaps */
250 static ib_mutex_t	ibuf_bitmap_mutex;
251 
252 /** The area in pages from which contract looks for page numbers for merge */
253 const ulint		IBUF_MERGE_AREA = 8;
254 
/** Inside the merge area, pages which have at most 1/IBUF_MERGE_THRESHOLD
fewer buffered entries than the maximum volume that can be buffered for a
single page are merged along with the page whose buffer became full */
258 const ulint		IBUF_MERGE_THRESHOLD = 4;
259 
260 /** In ibuf_contract at most this number of pages is read to memory in one
261 batch, in order to merge the entries for them in the insert buffer */
262 const ulint		IBUF_MAX_N_PAGES_MERGED = IBUF_MERGE_AREA;
263 
264 /** If the combined size of the ibuf trees exceeds ibuf->max_size by this
265 many pages, we start to contract it in connection to inserts there, using
266 non-synchronous contract */
267 const ulint		IBUF_CONTRACT_ON_INSERT_NON_SYNC = 0;
268 
269 /** If the combined size of the ibuf trees exceeds ibuf->max_size by this
270 many pages, we start to contract it in connection to inserts there, using
271 synchronous contract */
272 const ulint		IBUF_CONTRACT_ON_INSERT_SYNC = 5;
273 
274 /** If the combined size of the ibuf trees exceeds ibuf->max_size by
275 this many pages, we start to contract it synchronous contract, but do
276 not insert */
277 const ulint		IBUF_CONTRACT_DO_NOT_INSERT = 10;
278 
279 /* TODO: how to cope with drop table if there are records in the insert
280 buffer for the indexes of the table? Is there actually any problem,
281 because ibuf merge is done to a page when it is read in, and it is
282 still physically like the index page even if the index would have been
283 dropped! So, there seems to be no problem. */
284 
285 /******************************************************************//**
286 Sets the flag in the current mini-transaction record indicating we're
287 inside an insert buffer routine. */
288 UNIV_INLINE
289 void
ibuf_enter(mtr_t * mtr)290 ibuf_enter(
291 /*=======*/
292 	mtr_t*	mtr)	/*!< in/out: mini-transaction */
293 {
294 	ut_ad(!mtr->is_inside_ibuf());
295 	mtr->enter_ibuf();
296 }
297 
298 /******************************************************************//**
299 Sets the flag in the current mini-transaction record indicating we're
300 exiting an insert buffer routine. */
301 UNIV_INLINE
302 void
ibuf_exit(mtr_t * mtr)303 ibuf_exit(
304 /*======*/
305 	mtr_t*	mtr)	/*!< in/out: mini-transaction */
306 {
307 	ut_ad(mtr->is_inside_ibuf());
308 	mtr->exit_ibuf();
309 }
310 
311 /**************************************************************//**
312 Commits an insert buffer mini-transaction and sets the persistent
313 cursor latch mode to BTR_NO_LATCHES, that is, detaches the cursor. */
314 UNIV_INLINE
315 void
ibuf_btr_pcur_commit_specify_mtr(btr_pcur_t * pcur,mtr_t * mtr)316 ibuf_btr_pcur_commit_specify_mtr(
317 /*=============================*/
318 	btr_pcur_t*	pcur,	/*!< in/out: persistent cursor */
319 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
320 {
321 	ut_d(ibuf_exit(mtr));
322 	btr_pcur_commit_specify_mtr(pcur, mtr);
323 }
324 
325 /******************************************************************//**
326 Gets the ibuf header page and x-latches it.
327 @return insert buffer header page */
328 static
329 page_t*
ibuf_header_page_get(mtr_t * mtr)330 ibuf_header_page_get(
331 /*=================*/
332 	mtr_t*	mtr)	/*!< in/out: mini-transaction */
333 {
334 	buf_block_t*	block;
335 
336 	ut_ad(!ibuf_inside(mtr));
337 	page_t* page = NULL;
338 
339 	block = buf_page_get(
340 		page_id_t(IBUF_SPACE_ID, FSP_IBUF_HEADER_PAGE_NO),
341 		0, RW_X_LATCH, mtr);
342 
343 	if (block) {
344 		buf_block_dbg_add_level(block, SYNC_IBUF_HEADER);
345 		page = buf_block_get_frame(block);
346 	}
347 
348 	return page;
349 }
350 
/******************************************************************//**
Gets the insert buffer tree root page and sx-latches it.
The caller must be inside an ibuf mini-transaction and hold ibuf_mutex.
@return insert buffer tree root page */
static
page_t*
ibuf_tree_root_get(
/*===============*/
	mtr_t*		mtr)	/*!< in: mtr */
{
	buf_block_t*	block;
	page_t*		root;

	ut_ad(ibuf_inside(mtr));
	ut_ad(mutex_own(&ibuf_mutex));

	/* SX-latch the index to protect the root against concurrent
	structural changes. */
	mtr_sx_lock_index(ibuf->index, mtr);

	/* only segment list access is exclusive each other */
	block = buf_page_get(
		page_id_t(IBUF_SPACE_ID, FSP_IBUF_TREE_ROOT_PAGE_NO),
		0, RW_SX_LATCH, mtr);

	/* NOTE(review): unlike ibuf_header_page_get(), the result of
	buf_page_get() is not checked for NULL here; presumably the
	system-tablespace root page is assumed to be always readable --
	confirm against the buf_page_get() contract. */
	buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

	root = buf_block_get_frame(block);

	ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
	ut_ad(page_get_page_no(root) == FSP_IBUF_TREE_ROOT_PAGE_NO);
	/* The cached emptiness flag must agree with the page contents. */
	ut_ad(ibuf->empty == page_is_empty(root));

	return(root);
}
383 
384 /******************************************************************//**
385 Closes insert buffer and frees the data structures. */
386 void
ibuf_close(void)387 ibuf_close(void)
388 /*============*/
389 {
390 	if (ibuf == NULL) {
391 		return;
392 	}
393 
394 	mutex_free(&ibuf_pessimistic_insert_mutex);
395 
396 	mutex_free(&ibuf_mutex);
397 
398 	mutex_free(&ibuf_bitmap_mutex);
399 
400 	dict_table_t*	ibuf_table = ibuf->index->table;
401 	rw_lock_free(&ibuf->index->lock);
402 	dict_mem_index_free(ibuf->index);
403 	dict_mem_table_free(ibuf_table);
404 
405 	ut_free(ibuf);
406 	ibuf = NULL;
407 }
408 
409 /******************************************************************//**
410 Updates the size information of the ibuf, assuming the segment size has not
411 changed. */
412 static
413 void
ibuf_size_update(const page_t * root)414 ibuf_size_update(
415 /*=============*/
416 	const page_t*	root)	/*!< in: ibuf tree root */
417 {
418 	ut_ad(mutex_own(&ibuf_mutex));
419 
420 	ibuf->free_list_len = flst_get_len(root + PAGE_HEADER
421 					   + PAGE_BTR_IBUF_FREE_LIST);
422 
423 	ibuf->height = 1 + btr_page_get_level(root);
424 
425 	/* the '1 +' is the ibuf header page */
426 	ibuf->size = ibuf->seg_size - (1 + ibuf->free_list_len);
427 }
428 
429 /******************************************************************//**
430 Creates the insert buffer data structure at a database startup and initializes
431 the data structures for the insert buffer.
432 @return DB_SUCCESS or failure */
433 dberr_t
ibuf_init_at_db_start(void)434 ibuf_init_at_db_start(void)
435 /*=======================*/
436 {
437 	page_t*		root;
438 	mtr_t		mtr;
439 	ulint		n_used;
440 	page_t*		header_page;
441 	dberr_t		error= DB_SUCCESS;
442 
443 	ibuf = static_cast<ibuf_t*>(ut_zalloc_nokey(sizeof(ibuf_t)));
444 
445 	/* At startup we intialize ibuf to have a maximum of
446 	CHANGE_BUFFER_DEFAULT_SIZE in terms of percentage of the
447 	buffer pool size. Once ibuf struct is initialized this
448 	value is updated with the user supplied size by calling
449 	ibuf_max_size_update(). */
450 	ibuf->max_size = ((buf_pool_get_curr_size() >> srv_page_size_shift)
451 			  * CHANGE_BUFFER_DEFAULT_SIZE) / 100;
452 
453 	mutex_create(LATCH_ID_IBUF, &ibuf_mutex);
454 
455 	mutex_create(LATCH_ID_IBUF_BITMAP, &ibuf_bitmap_mutex);
456 
457 	mutex_create(LATCH_ID_IBUF_PESSIMISTIC_INSERT,
458 		     &ibuf_pessimistic_insert_mutex);
459 
460 	mtr_start(&mtr);
461 
462 	compile_time_assert(IBUF_SPACE_ID == TRX_SYS_SPACE);
463 	compile_time_assert(IBUF_SPACE_ID == 0);
464 	mtr_x_lock_space(fil_system.sys_space, &mtr);
465 
466 	mutex_enter(&ibuf_mutex);
467 
468 	header_page = ibuf_header_page_get(&mtr);
469 
470 	if (!header_page) {
471 		return (DB_DECRYPTION_FAILED);
472 	}
473 
474 	fseg_n_reserved_pages(header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER,
475 			      &n_used, &mtr);
476 
477 	ut_ad(n_used >= 2);
478 
479 	ibuf->seg_size = n_used;
480 
481 	{
482 		buf_block_t*	block;
483 
484 		block = buf_page_get(
485 			page_id_t(IBUF_SPACE_ID, FSP_IBUF_TREE_ROOT_PAGE_NO),
486 			0, RW_X_LATCH, &mtr);
487 
488 		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);
489 
490 		root = buf_block_get_frame(block);
491 	}
492 
493 	ibuf_size_update(root);
494 	mutex_exit(&ibuf_mutex);
495 
496 	ibuf->empty = page_is_empty(root);
497 	mtr.commit();
498 
499 	ibuf->index = dict_mem_index_create(
500 		dict_mem_table_create("innodb_change_buffer",
501 				      fil_system.sys_space, 1, 0, 0, 0),
502 		"CLUST_IND",
503 		DICT_CLUSTERED | DICT_IBUF, 1);
504 	ibuf->index->id = DICT_IBUF_ID_MIN + IBUF_SPACE_ID;
505 	ibuf->index->n_uniq = REC_MAX_N_FIELDS;
506 	rw_lock_create(index_tree_rw_lock_key, &ibuf->index->lock,
507 		       SYNC_IBUF_INDEX_TREE);
508 #ifdef BTR_CUR_ADAPT
509 	ibuf->index->search_info = btr_search_info_create(ibuf->index->heap);
510 #endif /* BTR_CUR_ADAPT */
511 	ibuf->index->page = FSP_IBUF_TREE_ROOT_PAGE_NO;
512 	ut_d(ibuf->index->cached = TRUE);
513 
514 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
515 	if (!ibuf_dump) {
516 		return error;
517 	}
518 	ib::info() << "Dumping the change buffer";
519 	ibuf_mtr_start(&mtr);
520 	btr_pcur_t pcur;
521 	if (DB_SUCCESS == btr_pcur_open_at_index_side(
522 		    true, ibuf->index, BTR_SEARCH_LEAF, &pcur,
523 		    true, 0, &mtr)) {
524 		while (btr_pcur_move_to_next_user_rec(&pcur, &mtr)) {
525 			rec_print_old(stderr, btr_pcur_get_rec(&pcur));
526 		}
527 	}
528 	ibuf_mtr_commit(&mtr);
529 	ib::info() << "Dumped the change buffer";
530 #endif
531 
532 	return (error);
533 }
534 
535 /*********************************************************************//**
536 Updates the max_size value for ibuf. */
537 void
ibuf_max_size_update(ulint new_val)538 ibuf_max_size_update(
539 /*=================*/
540 	ulint	new_val)	/*!< in: new value in terms of
541 				percentage of the buffer pool size */
542 {
543 	ulint	new_size = ((buf_pool_get_curr_size() >> srv_page_size_shift)
544 			    * new_val) / 100;
545 	mutex_enter(&ibuf_mutex);
546 	ibuf->max_size = new_size;
547 	mutex_exit(&ibuf_mutex);
548 }
549 
550 
551 /** Apply MLOG_IBUF_BITMAP_INIT when crash-upgrading */
ibuf_bitmap_init_apply(buf_block_t * block)552 ATTRIBUTE_COLD void ibuf_bitmap_init_apply(buf_block_t* block)
553 {
554 	page_t*	page;
555 	ulint	byte_offset;
556 
557 	page = buf_block_get_frame(block);
558 	fil_page_set_type(page, FIL_PAGE_IBUF_BITMAP);
559 
560 	/* Write all zeros to the bitmap */
561 	compile_time_assert(!(IBUF_BITS_PER_PAGE % 2));
562 
563 	byte_offset = UT_BITS_IN_BYTES(block->physical_size()
564 				       * IBUF_BITS_PER_PAGE);
565 
566 	memset(page + IBUF_BITMAP, 0, byte_offset);
567 }
568 
569 # ifdef UNIV_DEBUG
570 /** Gets the desired bits for a given page from a bitmap page.
571 @param[in]	page		bitmap page
572 @param[in]	page_id		page id whose bits to get
573 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
574 @param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
575 @param[in,out]	mtr		mini-transaction holding an x-latch on the
576 bitmap page
577 @return value of bits */
578 #  define ibuf_bitmap_page_get_bits(page, page_id, zip_size, bit, mtr)	\
579 	ibuf_bitmap_page_get_bits_low(page, page_id, zip_size,		\
580 				      MTR_MEMO_PAGE_X_FIX, mtr, bit)
581 # else /* UNIV_DEBUG */
582 /** Gets the desired bits for a given page from a bitmap page.
583 @param[in]	page		bitmap page
584 @param[in]	page_id		page id whose bits to get
585 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
586 @param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
587 @param[in,out]	mtr		mini-transaction holding an x-latch on the
588 bitmap page
589 @return value of bits */
590 #  define ibuf_bitmap_page_get_bits(page, page_id, zip_size, bit, mtr)	\
591 	ibuf_bitmap_page_get_bits_low(page, page_id, zip_size, bit)
592 # endif /* UNIV_DEBUG */
593 
/** Gets the desired bits for a given page from a bitmap page.
@param[in]	page		bitmap page
@param[in]	page_id		page id whose bits to get
@param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
@param[in]	latch_type	MTR_MEMO_PAGE_X_FIX, MTR_MEMO_BUF_FIX, ...
@param[in,out]	mtr		mini-transaction holding latch_type on the
bitmap page
@param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
@return value of bits (0..3 for IBUF_BITMAP_FREE, else 0 or 1) */
UNIV_INLINE
ulint
ibuf_bitmap_page_get_bits_low(
	const page_t*		page,
	const page_id_t		page_id,
	ulint			zip_size,
#ifdef UNIV_DEBUG
	ulint			latch_type,
	mtr_t*			mtr,
#endif /* UNIV_DEBUG */
	ulint			bit)
{
	ulint	byte_offset;
	ulint	bit_offset;
	ulint	map_byte;
	ulint	value;
	/* One bitmap page covers 'size' consecutive tablespace pages. */
	const ulint size = zip_size ? zip_size : srv_page_size;

	ut_ad(ut_is_2pow(zip_size));
	ut_ad(bit < IBUF_BITS_PER_PAGE);
	compile_time_assert(!(IBUF_BITS_PER_PAGE % 2));
	ut_ad(mtr_memo_contains_page(mtr, page, latch_type));

	/* Bit position of this page's descriptor: (page number within
	the covered group) * bits-per-page, plus the requested bit. */
	bit_offset = (page_id.page_no() & (size - 1))
		* IBUF_BITS_PER_PAGE + bit;

	byte_offset = bit_offset / 8;
	bit_offset = bit_offset % 8;

	ut_ad(byte_offset + IBUF_BITMAP < srv_page_size);

	map_byte = mach_read_from_1(page + IBUF_BITMAP + byte_offset);

	value = ut_bit_get_nth(map_byte, bit_offset);

	if (bit == IBUF_BITMAP_FREE) {
		/* The FREE field occupies two adjacent bits; combine
		them into a 2-bit value (0..3). */
		ut_ad(bit_offset + 1 < 8);

		value = value * 2 + ut_bit_get_nth(map_byte, bit_offset + 1);
	}

	return(value);
}
646 
647 /** Sets the desired bit for a given page in a bitmap page.
648 @param[in,out]	page		bitmap page
649 @param[in]	page_id		page id whose bits to set
650 @param[in]	physical_size	page size
651 @param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
652 @param[in]	val		value to set
653 @param[in,out]	mtr		mtr containing an x-latch to the bitmap page */
654 static
655 void
ibuf_bitmap_page_set_bits(page_t * page,const page_id_t page_id,ulint physical_size,ulint bit,ulint val,mtr_t * mtr)656 ibuf_bitmap_page_set_bits(
657 	page_t*			page,
658 	const page_id_t		page_id,
659 	ulint			physical_size,
660 	ulint			bit,
661 	ulint			val,
662 	mtr_t*			mtr)
663 {
664 	ulint	byte_offset;
665 	ulint	bit_offset;
666 	ulint	map_byte;
667 
668 	ut_ad(bit < IBUF_BITS_PER_PAGE);
669 	compile_time_assert(!(IBUF_BITS_PER_PAGE % 2));
670 	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
671 	ut_ad(mtr->is_named_space(page_id.space()));
672 
673 	bit_offset = (page_id.page_no() % physical_size)
674 		* IBUF_BITS_PER_PAGE + bit;
675 
676 	byte_offset = bit_offset / 8;
677 	bit_offset = bit_offset % 8;
678 
679 	ut_ad(byte_offset + IBUF_BITMAP < srv_page_size);
680 
681 	map_byte = mach_read_from_1(page + IBUF_BITMAP + byte_offset);
682 
683 	if (bit == IBUF_BITMAP_FREE) {
684 		ut_ad(bit_offset + 1 < 8);
685 		ut_ad(val <= 3);
686 
687 		map_byte = ut_bit_set_nth(map_byte, bit_offset, val / 2);
688 		map_byte = ut_bit_set_nth(map_byte, bit_offset + 1, val % 2);
689 	} else {
690 		ut_ad(val <= 1);
691 		map_byte = ut_bit_set_nth(map_byte, bit_offset, val);
692 	}
693 
694 	mlog_write_ulint(page + IBUF_BITMAP + byte_offset, map_byte,
695 			 MLOG_1BYTE, mtr);
696 }
697 
698 /** Calculates the bitmap page number for a given page number.
699 @param[in]	page_id		page id
700 @param[in]	size		page size
701 @return the bitmap page id where the file page is mapped */
ibuf_bitmap_page_no_calc(const page_id_t page_id,ulint size)702 inline page_id_t ibuf_bitmap_page_no_calc(const page_id_t page_id, ulint size)
703 {
704 	if (!size) size = srv_page_size;
705 
706 	return page_id_t(page_id.space(), FSP_IBUF_BITMAP_OFFSET
707 			 + (page_id.page_no() & ~(size - 1)));
708 }
709 
710 /** Gets the ibuf bitmap page where the bits describing a given file page are
711 stored.
712 @param[in]	page_id		page id of the file page
713 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
714 @param[in]	file		file name
715 @param[in]	line		line where called
716 @param[in,out]	mtr		mini-transaction
717 @return bitmap page where the file page is mapped, that is, the bitmap
718 page containing the descriptor bits for the file page; the bitmap page
719 is x-latched */
720 static
721 page_t*
ibuf_bitmap_get_map_page_func(const page_id_t page_id,ulint zip_size,const char * file,unsigned line,mtr_t * mtr)722 ibuf_bitmap_get_map_page_func(
723 	const page_id_t		page_id,
724 	ulint			zip_size,
725 	const char*		file,
726 	unsigned		line,
727 	mtr_t*			mtr)
728 {
729 	buf_block_t*	block = NULL;
730 	dberr_t		err = DB_SUCCESS;
731 
732 	block = buf_page_get_gen(ibuf_bitmap_page_no_calc(page_id, zip_size),
733 				 zip_size, RW_X_LATCH, NULL, BUF_GET,
734 				 file, line, mtr, &err);
735 
736 	if (err != DB_SUCCESS) {
737 		return NULL;
738 	}
739 
740 
741 	buf_block_dbg_add_level(block, SYNC_IBUF_BITMAP);
742 
743 	return(buf_block_get_frame(block));
744 }
745 
746 /** Gets the ibuf bitmap page where the bits describing a given file page are
747 stored.
748 @param[in]	page_id		page id of the file page
749 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
750 @param[in,out]	mtr		mini-transaction
751 @return bitmap page where the file page is mapped, that is, the bitmap
752 page containing the descriptor bits for the file page; the bitmap page
753 is x-latched */
754 #define ibuf_bitmap_get_map_page(page_id, zip_size, mtr)	\
755 	ibuf_bitmap_get_map_page_func(page_id, zip_size, \
756 				      __FILE__, __LINE__, mtr)
757 
758 /************************************************************************//**
759 Sets the free bits of the page in the ibuf bitmap. This is done in a separate
760 mini-transaction, hence this operation does not restrict further work to only
761 ibuf bitmap operations, which would result if the latch to the bitmap page
762 were kept. */
763 UNIV_INLINE
764 void
ibuf_set_free_bits_low(const buf_block_t * block,ulint val,mtr_t * mtr)765 ibuf_set_free_bits_low(
766 /*===================*/
767 	const buf_block_t*	block,	/*!< in: index page; free bits are set if
768 					the index is non-clustered and page
769 					level is 0 */
770 	ulint			val,	/*!< in: value to set: < 4 */
771 	mtr_t*			mtr)	/*!< in/out: mtr */
772 {
773 	page_t*	bitmap_page;
774 	buf_frame_t* frame;
775 
776 	ut_ad(mtr->is_named_space(block->page.id.space()));
777 
778 	if (!block) {
779 		return;
780 	}
781 
782 	frame = buf_block_get_frame(block);
783 
784 	if (!frame || !page_is_leaf(frame)) {
785 		return;
786 	}
787 
788 	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
789 					       block->zip_size(), mtr);
790 
791 #ifdef UNIV_IBUF_DEBUG
792 	ut_a(val <= ibuf_index_page_calc_free(block));
793 #endif /* UNIV_IBUF_DEBUG */
794 
795 	ibuf_bitmap_page_set_bits(
796 		bitmap_page, block->page.id, block->physical_size(),
797 		IBUF_BITMAP_FREE, val, mtr);
798 }
799 
/************************************************************************//**
Sets the free bit of the page in the ibuf bitmap. This is done in a separate
mini-transaction, hence this operation does not restrict further work to only
ibuf bitmap operations, which would result if the latch to the bitmap page
were kept. */
void
ibuf_set_free_bits_func(
/*====================*/
	buf_block_t*	block,	/*!< in: index page of a non-clustered index;
				free bit is reset if page level is 0 */
#ifdef UNIV_IBUF_DEBUG
	ulint		max_val,/*!< in: ULINT_UNDEFINED or a maximum
				value which the bits must have before
				setting; this is for debugging */
#endif /* UNIV_IBUF_DEBUG */
	ulint		val)	/*!< in: value to set: < 4 */
{
	mtr_t	mtr;
	page_t*	page;
	page_t*	bitmap_page;

	page = buf_block_get_frame(block);

	if (!page_is_leaf(page)) {
		/* Free bits are only maintained for leaf pages. */

		return;
	}

	mtr_start(&mtr);
	const fil_space_t* space = mtr.set_named_space_id(
		block->page.id.space());

	/* NOTE(review): bitmap_page is not checked for NULL before it
	is passed to ibuf_bitmap_page_set_bits() below; presumably the
	bitmap page of an existing leaf page is always readable --
	confirm against ibuf_bitmap_get_map_page(). */
	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
					       block->zip_size(), &mtr);

	switch (space->purpose) {
	case FIL_TYPE_LOG:
		ut_ad(0);
		break;
	case FIL_TYPE_TABLESPACE:
		/* Persistent tablespace: keep the default redo-logged
		mtr log mode. (A stale "fall through" comment used to
		follow the break above; there is no fall-through.) */
		break;
	case FIL_TYPE_TEMPORARY:
	case FIL_TYPE_IMPORT:
		/* Changes to temporary/import tablespaces are not
		redo-logged. */
		mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
	}

#ifdef UNIV_IBUF_DEBUG
	if (max_val != ULINT_UNDEFINED) {
		ulint	old_val;

		/* NOTE(review): this call passes 4 arguments while the
		ibuf_bitmap_page_get_bits macro takes 5 (zip_size seems
		to be missing); the code only compiles under
		UNIV_IBUF_DEBUG -- verify. */
		old_val = ibuf_bitmap_page_get_bits(
			bitmap_page, block->page.id,
			IBUF_BITMAP_FREE, &mtr);
# if 0
		if (old_val != max_val) {
			fprintf(stderr,
				"Ibuf: page %lu old val %lu max val %lu\n",
				page_get_page_no(page),
				old_val, max_val);
		}
# endif

		ut_a(old_val <= max_val);
	}
# if 0
	fprintf(stderr, "Setting page no %lu free bits to %lu should be %lu\n",
		page_get_page_no(page), val,
		ibuf_index_page_calc_free(block));
# endif

	ut_a(val <= ibuf_index_page_calc_free(block));
#endif /* UNIV_IBUF_DEBUG */

	ibuf_bitmap_page_set_bits(
		bitmap_page, block->page.id, block->physical_size(),
		IBUF_BITMAP_FREE, val, &mtr);

	mtr_commit(&mtr);
}
880 
881 /************************************************************************//**
882 Resets the free bits of the page in the ibuf bitmap. This is done in a
883 separate mini-transaction, hence this operation does not restrict
884 further work to only ibuf bitmap operations, which would result if the
885 latch to the bitmap page were kept.  NOTE: The free bits in the insert
886 buffer bitmap must never exceed the free space on a page.  It is safe
887 to decrement or reset the bits in the bitmap in a mini-transaction
888 that is committed before the mini-transaction that affects the free
889 space. */
890 void
ibuf_reset_free_bits(buf_block_t * block)891 ibuf_reset_free_bits(
892 /*=================*/
893 	buf_block_t*	block)	/*!< in: index page; free bits are set to 0
894 				if the index is a non-clustered
895 				non-unique, and page level is 0 */
896 {
897 	ibuf_set_free_bits(block, 0, ULINT_UNDEFINED);
898 }
899 
900 /**********************************************************************//**
901 Updates the free bits for an uncompressed page to reflect the present
902 state.  Does this in the mtr given, which means that the latching
903 order rules virtually prevent any further operations for this OS
904 thread until mtr is committed.  NOTE: The free bits in the insert
905 buffer bitmap must never exceed the free space on a page.  It is safe
906 to set the free bits in the same mini-transaction that updated the
907 page. */
908 void
ibuf_update_free_bits_low(const buf_block_t * block,ulint max_ins_size,mtr_t * mtr)909 ibuf_update_free_bits_low(
910 /*======================*/
911 	const buf_block_t*	block,		/*!< in: index page */
912 	ulint			max_ins_size,	/*!< in: value of
913 						maximum insert size
914 						with reorganize before
915 						the latest operation
916 						performed to the page */
917 	mtr_t*			mtr)		/*!< in/out: mtr */
918 {
919 	ulint	before;
920 	ulint	after;
921 
922 	ut_a(!buf_block_get_page_zip(block));
923 	ut_ad(mtr->is_named_space(block->page.id.space()));
924 
925 	before = ibuf_index_page_calc_free_bits(srv_page_size,
926 						max_ins_size);
927 
928 	after = ibuf_index_page_calc_free(block);
929 
930 	/* This approach cannot be used on compressed pages, since the
931 	computed value of "before" often does not match the current
932 	state of the bitmap.  This is because the free space may
933 	increase or decrease when a compressed page is reorganized. */
934 	if (before != after) {
935 		ibuf_set_free_bits_low(block, after, mtr);
936 	}
937 }
938 
939 /**********************************************************************//**
940 Updates the free bits for a compressed page to reflect the present
941 state.  Does this in the mtr given, which means that the latching
942 order rules virtually prevent any further operations for this OS
943 thread until mtr is committed.  NOTE: The free bits in the insert
944 buffer bitmap must never exceed the free space on a page.  It is safe
945 to set the free bits in the same mini-transaction that updated the
946 page. */
947 void
ibuf_update_free_bits_zip(buf_block_t * block,mtr_t * mtr)948 ibuf_update_free_bits_zip(
949 /*======================*/
950 	buf_block_t*	block,	/*!< in/out: index page */
951 	mtr_t*		mtr)	/*!< in/out: mtr */
952 {
953 	page_t*	bitmap_page;
954 	ulint	after;
955 
956 	ut_a(block);
957 	buf_frame_t* frame = buf_block_get_frame(block);
958 	ut_a(frame);
959 	ut_a(page_is_leaf(frame));
960 	ut_a(block->zip_size());
961 
962 	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
963 					       block->zip_size(), mtr);
964 
965 	after = ibuf_index_page_calc_free_zip(block);
966 
967 	if (after == 0) {
968 		/* We move the page to the front of the buffer pool LRU list:
969 		the purpose of this is to prevent those pages to which we
970 		cannot make inserts using the insert buffer from slipping
971 		out of the buffer pool */
972 
973 		buf_page_make_young(&block->page);
974 	}
975 
976 	ibuf_bitmap_page_set_bits(
977 		bitmap_page, block->page.id, block->physical_size(),
978 		IBUF_BITMAP_FREE, after, mtr);
979 }
980 
981 /**********************************************************************//**
982 Updates the free bits for the two pages to reflect the present state.
983 Does this in the mtr given, which means that the latching order rules
984 virtually prevent any further operations until mtr is committed.
985 NOTE: The free bits in the insert buffer bitmap must never exceed the
986 free space on a page.  It is safe to set the free bits in the same
987 mini-transaction that updated the pages. */
988 void
ibuf_update_free_bits_for_two_pages_low(buf_block_t * block1,buf_block_t * block2,mtr_t * mtr)989 ibuf_update_free_bits_for_two_pages_low(
990 /*====================================*/
991 	buf_block_t*	block1,	/*!< in: index page */
992 	buf_block_t*	block2,	/*!< in: index page */
993 	mtr_t*		mtr)	/*!< in: mtr */
994 {
995 	ulint	state;
996 
997 	ut_ad(mtr->is_named_space(block1->page.id.space()));
998 	ut_ad(block1->page.id.space() == block2->page.id.space());
999 
1000 	/* As we have to x-latch two random bitmap pages, we have to acquire
1001 	the bitmap mutex to prevent a deadlock with a similar operation
1002 	performed by another OS thread. */
1003 
1004 	mutex_enter(&ibuf_bitmap_mutex);
1005 
1006 	state = ibuf_index_page_calc_free(block1);
1007 
1008 	ibuf_set_free_bits_low(block1, state, mtr);
1009 
1010 	state = ibuf_index_page_calc_free(block2);
1011 
1012 	ibuf_set_free_bits_low(block2, state, mtr);
1013 
1014 	mutex_exit(&ibuf_bitmap_mutex);
1015 }
1016 
1017 /** Returns TRUE if the page is one of the fixed address ibuf pages.
1018 @param[in]	page_id		page id
1019 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
1020 @return TRUE if a fixed address ibuf i/o page */
ibuf_fixed_addr_page(const page_id_t page_id,ulint zip_size)1021 inline bool ibuf_fixed_addr_page(const page_id_t page_id, ulint zip_size)
1022 {
1023 	return((page_id.space() == IBUF_SPACE_ID
1024 		&& page_id.page_no() == IBUF_TREE_ROOT_PAGE_NO)
1025 	       || ibuf_bitmap_page(page_id, zip_size));
1026 }
1027 
/** Checks if a page is a level 2 or 3 page in the ibuf hierarchy of pages.
Must not be called when recv_no_ibuf_operations==true.
@param[in]	page_id		page id
@param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
@param[in]	x_latch		FALSE if relaxed check (avoid latching the
bitmap page)
@param[in]	file		file name
@param[in]	line		line where called
@param[in,out]	mtr		mtr which will contain an x-latch to the
bitmap page if the page is not one of the fixed address ibuf pages, or NULL,
in which case a new transaction is created.
@return TRUE if level 2 or level 3 page */
bool
ibuf_page_low(
	const page_id_t		page_id,
	ulint			zip_size,
#ifdef UNIV_DEBUG
	bool			x_latch,
#endif /* UNIV_DEBUG */
	const char*		file,
	unsigned		line,
	mtr_t*			mtr)
{
	ibool	ret;
	mtr_t	local_mtr;
	page_t*	bitmap_page;

	ut_ad(!recv_no_ibuf_operations);
	ut_ad(x_latch || mtr == NULL);

	/* Fixed-address ibuf pages (tree root, bitmap pages) can be
	recognized without consulting the bitmap at all. */
	if (ibuf_fixed_addr_page(page_id, zip_size)) {
		return(true);
	} else if (page_id.space() != IBUF_SPACE_ID) {
		/* The change buffer only lives in the system tablespace. */
		return(false);
	}

	compile_time_assert(IBUF_SPACE_ID == 0);
	ut_ad(fil_system.sys_space->purpose == FIL_TYPE_TABLESPACE);

#ifdef UNIV_DEBUG
	if (!x_latch) {
		mtr_start(&local_mtr);

		/* Get the bitmap page without a page latch, so that
		we will not be violating the latching order when
		another bitmap page has already been latched by this
		thread. The page will be buffer-fixed, and thus it
		cannot be removed or relocated while we are looking at
		it. The contents of the page could change, but the
		IBUF_BITMAP_IBUF bit that we are interested in should
		not be modified by any other thread. Nobody should be
		calling ibuf_add_free_page() or ibuf_remove_free_page()
		while the page is linked to the insert buffer b-tree. */
		dberr_t err = DB_SUCCESS;

		buf_block_t* block = buf_page_get_gen(
			ibuf_bitmap_page_no_calc(page_id, zip_size),
			zip_size, RW_NO_LATCH, NULL, BUF_GET_NO_LATCH,
			file, line, &local_mtr, &err);

		bitmap_page = buf_block_get_frame(block);

		ret = ibuf_bitmap_page_get_bits_low(
			bitmap_page, page_id, zip_size,
			MTR_MEMO_BUF_FIX, &local_mtr, IBUF_BITMAP_IBUF);

		mtr_commit(&local_mtr);
		return(ret);
	}
#endif /* UNIV_DEBUG */

	/* No caller-supplied mtr: use a local one for the bitmap lookup. */
	if (mtr == NULL) {
		mtr = &local_mtr;
		mtr_start(mtr);
	}

	bitmap_page = ibuf_bitmap_get_map_page_func(page_id, zip_size,
						    file, line, mtr);

	ret = ibuf_bitmap_page_get_bits(bitmap_page, page_id, zip_size,
					IBUF_BITMAP_IBUF, mtr);

	/* Only commit the mtr we created ourselves; a caller-supplied
	mtr keeps the bitmap page x-latched as documented above. */
	if (mtr == &local_mtr) {
		mtr_commit(mtr);
	}

	return(ret);
}
1116 
1117 #ifdef UNIV_DEBUG
1118 # define ibuf_rec_get_page_no(mtr,rec) ibuf_rec_get_page_no_func(mtr,rec)
1119 #else /* UNIV_DEBUG */
1120 # define ibuf_rec_get_page_no(mtr,rec) ibuf_rec_get_page_no_func(rec)
1121 #endif /* UNIV_DEBUG */
1122 
1123 /********************************************************************//**
1124 Returns the page number field of an ibuf record.
1125 @return page number */
1126 static
1127 ulint
ibuf_rec_get_page_no_func(mtr_t * mtr,const rec_t * rec)1128 ibuf_rec_get_page_no_func(
1129 /*======================*/
1130 #ifdef UNIV_DEBUG
1131 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1132 #endif /* UNIV_DEBUG */
1133 	const rec_t*	rec)	/*!< in: ibuf record */
1134 {
1135 	const byte*	field;
1136 	ulint		len;
1137 
1138 	ut_ad(mtr_memo_contains_page_flagged(mtr, rec,
1139 					     MTR_MEMO_PAGE_X_FIX
1140 					     | MTR_MEMO_PAGE_S_FIX));
1141 	ut_ad(ibuf_inside(mtr));
1142 	ut_ad(rec_get_n_fields_old(rec) > 2);
1143 
1144 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
1145 
1146 	ut_a(len == 1);
1147 
1148 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_PAGE, &len);
1149 
1150 	ut_a(len == 4);
1151 
1152 	return(mach_read_from_4(field));
1153 }
1154 
1155 #ifdef UNIV_DEBUG
1156 # define ibuf_rec_get_space(mtr,rec) ibuf_rec_get_space_func(mtr,rec)
1157 #else /* UNIV_DEBUG */
1158 # define ibuf_rec_get_space(mtr,rec) ibuf_rec_get_space_func(rec)
1159 #endif /* UNIV_DEBUG */
1160 
1161 /********************************************************************//**
1162 Returns the space id field of an ibuf record. For < 4.1.x format records
1163 returns 0.
1164 @return space id */
1165 static
1166 ulint
ibuf_rec_get_space_func(mtr_t * mtr,const rec_t * rec)1167 ibuf_rec_get_space_func(
1168 /*====================*/
1169 #ifdef UNIV_DEBUG
1170 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1171 #endif /* UNIV_DEBUG */
1172 	const rec_t*	rec)	/*!< in: ibuf record */
1173 {
1174 	const byte*	field;
1175 	ulint		len;
1176 
1177 	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
1178 					     | MTR_MEMO_PAGE_S_FIX));
1179 	ut_ad(ibuf_inside(mtr));
1180 	ut_ad(rec_get_n_fields_old(rec) > 2);
1181 
1182 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
1183 
1184 	ut_a(len == 1);
1185 
1186 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
1187 
1188 	ut_a(len == 4);
1189 
1190 	return(mach_read_from_4(field));
1191 }
1192 
1193 #ifdef UNIV_DEBUG
1194 # define ibuf_rec_get_info(mtr,rec,op,comp,info_len,counter)	\
1195 	ibuf_rec_get_info_func(mtr,rec,op,comp,info_len,counter)
1196 #else /* UNIV_DEBUG */
1197 # define ibuf_rec_get_info(mtr,rec,op,comp,info_len,counter)	\
1198 	ibuf_rec_get_info_func(rec,op,comp,info_len,counter)
1199 #endif
/****************************************************************//**
Get various information about an ibuf record in >= 4.1.x format.
The record format is decoded from the length of the metadata field:
length modulo DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE gives the size of the
info bytes that precede the per-column type information. */
static
void
ibuf_rec_get_info_func(
/*===================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	rec,		/*!< in: ibuf record */
	ibuf_op_t*	op,		/*!< out: operation type, or NULL */
	ibool*		comp,		/*!< out: compact flag, or NULL */
	ulint*		info_len,	/*!< out: length of info fields at the
					start of the fourth field, or
					NULL */
	ulint*		counter)	/*!< out: counter value, or NULL */
{
	const byte*	types;
	ulint		fields;
	ulint		len;

	/* Local variables to shadow arguments. */
	ibuf_op_t	op_local;
	ibool		comp_local;
	ulint		info_len_local;
	ulint		counter_local;

	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(ibuf_inside(mtr));
	fields = rec_get_n_fields_old(rec);
	ut_a(fields > IBUF_REC_FIELD_USER);

	types = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);

	/* The remainder is the number of leading info bytes; the rest of
	the field is per-column type data in fixed-size slots. */
	info_len_local = len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
	compile_time_assert(IBUF_REC_INFO_SIZE
			    < DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);

	switch (info_len_local) {
	case 0:
	case 1:
		/* Old-style buffered entry: only inserts were buffered,
		and an info length of 1 doubles as the COMPACT flag. */
		op_local = IBUF_OP_INSERT;
		comp_local = info_len_local;
		/* Old-style entries carry no counter field. */
		ut_ad(!counter);
		counter_local = ULINT_UNDEFINED;
		break;

	case IBUF_REC_INFO_SIZE:
		/* New-style entry: explicit type, flags, and counter. */
		op_local = (ibuf_op_t) types[IBUF_REC_OFFSET_TYPE];
		comp_local = types[IBUF_REC_OFFSET_FLAGS] & IBUF_REC_COMPACT;
		counter_local = mach_read_from_2(
			types + IBUF_REC_OFFSET_COUNTER);
		break;

	default:
		ut_error;
	}

	ut_a(op_local < IBUF_OP_COUNT);
	/* The type data must cover exactly the user columns. */
	ut_a((len - info_len_local) ==
	     (fields - IBUF_REC_FIELD_USER)
	     * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);

	if (op) {
		*op = op_local;
	}

	if (comp) {
		*comp = comp_local;
	}

	if (info_len) {
		*info_len = info_len_local;
	}

	if (counter) {
		*counter = counter_local;
	}
}
1280 
1281 #ifdef UNIV_DEBUG
1282 # define ibuf_rec_get_op_type(mtr,rec) ibuf_rec_get_op_type_func(mtr,rec)
1283 #else /* UNIV_DEBUG */
1284 # define ibuf_rec_get_op_type(mtr,rec) ibuf_rec_get_op_type_func(rec)
1285 #endif
1286 
1287 /****************************************************************//**
1288 Returns the operation type field of an ibuf record.
1289 @return operation type */
1290 static
1291 ibuf_op_t
ibuf_rec_get_op_type_func(mtr_t * mtr,const rec_t * rec)1292 ibuf_rec_get_op_type_func(
1293 /*======================*/
1294 #ifdef UNIV_DEBUG
1295 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1296 #endif /* UNIV_DEBUG */
1297 	const rec_t*	rec)	/*!< in: ibuf record */
1298 {
1299 	ulint		len;
1300 
1301 	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
1302 					     | MTR_MEMO_PAGE_S_FIX));
1303 	ut_ad(ibuf_inside(mtr));
1304 	ut_ad(rec_get_n_fields_old(rec) > 2);
1305 
1306 	(void) rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
1307 
1308 	if (len > 1) {
1309 		/* This is a < 4.1.x format record */
1310 
1311 		return(IBUF_OP_INSERT);
1312 	} else {
1313 		ibuf_op_t	op;
1314 
1315 		ibuf_rec_get_info(mtr, rec, &op, NULL, NULL, NULL);
1316 
1317 		return(op);
1318 	}
1319 }
1320 
1321 /****************************************************************//**
1322 Read the first two bytes from a record's fourth field (counter field in new
1323 records; something else in older records).
1324 @return "counter" field, or ULINT_UNDEFINED if for some reason it
1325 can't be read */
1326 ulint
ibuf_rec_get_counter(const rec_t * rec)1327 ibuf_rec_get_counter(
1328 /*=================*/
1329 	const rec_t*	rec)	/*!< in: ibuf record */
1330 {
1331 	const byte*	ptr;
1332 	ulint		len;
1333 
1334 	if (rec_get_n_fields_old(rec) <= IBUF_REC_FIELD_METADATA) {
1335 
1336 		return(ULINT_UNDEFINED);
1337 	}
1338 
1339 	ptr = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
1340 
1341 	if (len >= 2) {
1342 
1343 		return(mach_read_from_2(ptr));
1344 	} else {
1345 
1346 		return(ULINT_UNDEFINED);
1347 	}
1348 }
1349 
1350 
1351 /**
1352   Add accumulated operation counts to a permanent array.
1353   Both arrays must be of size IBUF_OP_COUNT.
1354 */
ibuf_add_ops(Atomic_counter<ulint> * out,const ulint * in)1355 static void ibuf_add_ops(Atomic_counter<ulint> *out, const ulint *in)
1356 {
1357   for (auto i = 0; i < IBUF_OP_COUNT; i++)
1358     out[i]+= in[i];
1359 }
1360 
1361 
1362 /****************************************************************//**
1363 Print operation counts. The array must be of size IBUF_OP_COUNT. */
1364 static
1365 void
ibuf_print_ops(const Atomic_counter<ulint> * ops,FILE * file)1366 ibuf_print_ops(
1367 /*===========*/
1368 	const Atomic_counter<ulint>*	ops,	/*!< in: operation counts */
1369 	FILE*				file)	/*!< in: file where to print */
1370 {
1371 	static const char* op_names[] = {
1372 		"insert",
1373 		"delete mark",
1374 		"delete"
1375 	};
1376 	ulint	i;
1377 
1378 	ut_a(UT_ARR_SIZE(op_names) == IBUF_OP_COUNT);
1379 
1380 	for (i = 0; i < IBUF_OP_COUNT; i++) {
1381 		fprintf(file, "%s " ULINTPF "%s", op_names[i],
1382 			ulint{ops[i]}, (i < (IBUF_OP_COUNT - 1)) ? ", " : "");
1383 	}
1384 
1385 	putc('\n', file);
1386 }
1387 
1388 /********************************************************************//**
1389 Creates a dummy index for inserting a record to a non-clustered index.
1390 @return dummy index */
1391 static
1392 dict_index_t*
ibuf_dummy_index_create(ulint n,ibool comp)1393 ibuf_dummy_index_create(
1394 /*====================*/
1395 	ulint		n,	/*!< in: number of fields */
1396 	ibool		comp)	/*!< in: TRUE=use compact record format */
1397 {
1398 	dict_table_t*	table;
1399 	dict_index_t*	index;
1400 
1401 	table = dict_mem_table_create("IBUF_DUMMY", NULL, n, 0,
1402 				      comp ? DICT_TF_COMPACT : 0, 0);
1403 
1404 	index = dict_mem_index_create(table, "IBUF_DUMMY", 0, n);
1405 
1406 	/* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
1407 	index->cached = TRUE;
1408 	ut_d(index->is_dummy = true);
1409 
1410 	return(index);
1411 }
1412 /********************************************************************//**
1413 Add a column to the dummy index */
1414 static
1415 void
ibuf_dummy_index_add_col(dict_index_t * index,const dtype_t * type,ulint len)1416 ibuf_dummy_index_add_col(
1417 /*=====================*/
1418 	dict_index_t*	index,	/*!< in: dummy index */
1419 	const dtype_t*	type,	/*!< in: the data type of the column */
1420 	ulint		len)	/*!< in: length of the column */
1421 {
1422 	ulint	i	= index->table->n_def;
1423 	dict_mem_table_add_col(index->table, NULL, NULL,
1424 			       dtype_get_mtype(type),
1425 			       dtype_get_prtype(type),
1426 			       dtype_get_len(type));
1427 	dict_index_add_col(index, index->table,
1428 			   dict_table_get_nth_col(index->table, i), len);
1429 }
1430 /********************************************************************//**
1431 Deallocates a dummy index for inserting a record to a non-clustered index. */
1432 static
1433 void
ibuf_dummy_index_free(dict_index_t * index)1434 ibuf_dummy_index_free(
1435 /*==================*/
1436 	dict_index_t*	index)	/*!< in, own: dummy index */
1437 {
1438 	dict_table_t*	table = index->table;
1439 
1440 	dict_mem_index_free(index);
1441 	dict_mem_table_free(table);
1442 }
1443 
1444 #ifdef UNIV_DEBUG
1445 # define ibuf_build_entry_from_ibuf_rec(mtr,ibuf_rec,heap,pindex)	\
1446 	ibuf_build_entry_from_ibuf_rec_func(mtr,ibuf_rec,heap,pindex)
1447 #else /* UNIV_DEBUG */
1448 # define ibuf_build_entry_from_ibuf_rec(mtr,ibuf_rec,heap,pindex)	\
1449 	ibuf_build_entry_from_ibuf_rec_func(ibuf_rec,heap,pindex)
1450 #endif
1451 
/*********************************************************************//**
Builds the entry used to

1) IBUF_OP_INSERT: insert into a non-clustered index

2) IBUF_OP_DELETE_MARK: find the record whose delete-mark flag we need to
   activate

3) IBUF_OP_DELETE: find the record we need to delete

when we have the corresponding record in an ibuf index.

NOTE that as we copy pointers to fields in ibuf_rec, the caller must
hold a latch to the ibuf_rec page as long as the entry is used!

@return own: entry to insert to a non-clustered index */
static
dtuple_t*
ibuf_build_entry_from_ibuf_rec_func(
/*================================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	ibuf_rec,	/*!< in: record in an insert buffer */
	mem_heap_t*	heap,		/*!< in: heap where built */
	dict_index_t**	pindex)		/*!< out, own: dummy index that
					describes the entry */
{
	dtuple_t*	tuple;
	dfield_t*	field;
	ulint		n_fields;
	const byte*	types;
	const byte*	data;
	ulint		len;
	ulint		info_len;
	ulint		i;
	ulint		comp;
	dict_index_t*	index;

	ut_ad(mtr_memo_contains_page_flagged(mtr, ibuf_rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(ibuf_inside(mtr));

	/* The zero marker byte identifies a >= 4.1.x format record. */
	data = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_MARKER, &len);

	ut_a(len == 1);
	ut_a(*data == 0);
	ut_a(rec_get_n_fields_old(ibuf_rec) > IBUF_REC_FIELD_USER);

	/* Only the user columns go into the entry; the ibuf bookkeeping
	fields (space, marker, page, metadata) are stripped. */
	n_fields = rec_get_n_fields_old(ibuf_rec) - IBUF_REC_FIELD_USER;

	tuple = dtuple_create(heap, n_fields);

	types = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_METADATA, &len);

	ibuf_rec_get_info(mtr, ibuf_rec, NULL, &comp, &info_len, NULL);

	index = ibuf_dummy_index_create(n_fields, comp);

	/* Skip the info bytes; what follows is one fixed-size type
	slot per user column. */
	len -= info_len;
	types += info_len;

	ut_a(len == n_fields * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);

	for (i = 0; i < n_fields; i++) {
		field = dtuple_get_nth_field(tuple, i);

		/* NOTE: this stores a pointer into the ibuf_rec page,
		not a copy (see the latching note above). */
		data = rec_get_nth_field_old(
			ibuf_rec, i + IBUF_REC_FIELD_USER, &len);

		dfield_set_data(field, data, len);

		dtype_new_read_for_order_and_null_size(
			dfield_get_type(field),
			types + i * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);

		ibuf_dummy_index_add_col(index, dfield_get_type(field), len);
	}

	index->n_core_null_bytes
		= UT_BITS_IN_BYTES(unsigned(index->n_nullable));

	/* Prevent an ut_ad() failure in page_zip_write_rec() by
	adding system columns to the dummy table pointed to by the
	dummy secondary index.  The insert buffer is only used for
	secondary indexes, whose records never contain any system
	columns, such as DB_TRX_ID. */
	ut_d(dict_table_add_system_columns(index->table, index->table->heap));

	*pindex = index;

	return(tuple);
}
1545 
1546 /******************************************************************//**
1547 Get the data size.
1548 @return size of fields */
1549 UNIV_INLINE
1550 ulint
ibuf_rec_get_size(const rec_t * rec,const byte * types,ulint n_fields,ulint comp)1551 ibuf_rec_get_size(
1552 /*==============*/
1553 	const rec_t*	rec,			/*!< in: ibuf record */
1554 	const byte*	types,			/*!< in: fields */
1555 	ulint		n_fields,		/*!< in: number of fields */
1556 	ulint		comp)			/*!< in: 0=ROW_FORMAT=REDUNDANT,
1557 						nonzero=ROW_FORMAT=COMPACT */
1558 {
1559 	ulint	i;
1560 	ulint	field_offset;
1561 	ulint	types_offset;
1562 	ulint	size = 0;
1563 
1564 	field_offset = IBUF_REC_FIELD_USER;
1565 	types_offset = DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
1566 
1567 	for (i = 0; i < n_fields; i++) {
1568 		ulint		len;
1569 		dtype_t		dtype;
1570 
1571 		rec_get_nth_field_offs_old(rec, i + field_offset, &len);
1572 
1573 		if (len != UNIV_SQL_NULL) {
1574 			size += len;
1575 		} else {
1576 			dtype_new_read_for_order_and_null_size(&dtype, types);
1577 
1578 			size += dtype_get_sql_null_size(&dtype, comp);
1579 		}
1580 
1581 		types += types_offset;
1582 	}
1583 
1584 	return(size);
1585 }
1586 
1587 #ifdef UNIV_DEBUG
1588 # define ibuf_rec_get_volume(mtr,rec) ibuf_rec_get_volume_func(mtr,rec)
1589 #else /* UNIV_DEBUG */
1590 # define ibuf_rec_get_volume(mtr,rec) ibuf_rec_get_volume_func(rec)
1591 #endif
1592 
/********************************************************************//**
Returns the space taken by a stored non-clustered index entry if converted to
an index record.
@return size of index record in bytes + an upper limit of the space
taken in the page directory */
static
ulint
ibuf_rec_get_volume_func(
/*=====================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	ibuf_rec)/*!< in: ibuf record */
{
	ulint		len;
	const byte*	data;
	const byte*	types;
	ulint		n_fields;
	ulint		data_size;
	ulint		comp;
	ibuf_op_t	op;
	ulint		info_len;

	ut_ad(mtr_memo_contains_page_flagged(mtr, ibuf_rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(ibuf_inside(mtr));
	ut_ad(rec_get_n_fields_old(ibuf_rec) > 2);

	/* The zero marker byte identifies a >= 4.1.x format record. */
	data = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_MARKER, &len);
	ut_a(len == 1);
	ut_a(*data == 0);

	types = rec_get_nth_field_old(
		ibuf_rec, IBUF_REC_FIELD_METADATA, &len);

	ibuf_rec_get_info(mtr, ibuf_rec, &op, &comp, &info_len, NULL);

	if (op == IBUF_OP_DELETE_MARK || op == IBUF_OP_DELETE) {
		/* Delete-marking a record doesn't take any
		additional space, and while deleting a record
		actually frees up space, we have to play it safe and
		pretend it takes no additional space (the record
		might not exist, etc.).  */

		return(0);
	} else if (comp) {
		/* For ROW_FORMAT=COMPACT we must materialize the entry
		to compute its converted size exactly. */
		dtuple_t*	entry;
		ulint		volume;
		dict_index_t*	dummy_index;
		mem_heap_t*	heap = mem_heap_create(500);

		entry = ibuf_build_entry_from_ibuf_rec(mtr, ibuf_rec,
			heap, &dummy_index);

		volume = rec_get_converted_size(dummy_index, entry, 0);

		ibuf_dummy_index_free(dummy_index);
		mem_heap_free(heap);

		return(volume + page_dir_calc_reserved_space(1));
	}

	/* ROW_FORMAT=REDUNDANT: sum the field sizes directly, skipping
	the leading info bytes of the metadata field. */
	types += info_len;
	n_fields = rec_get_n_fields_old(ibuf_rec)
		- IBUF_REC_FIELD_USER;

	data_size = ibuf_rec_get_size(ibuf_rec, types, n_fields, comp);

	return(data_size + rec_get_converted_extra_size(data_size, n_fields, 0)
	       + page_dir_calc_reserved_space(1));
}
1664 
/*********************************************************************//**
Builds the tuple to insert to an ibuf tree when we have an entry for a
non-clustered index.

NOTE that the original entry must be kept because we copy pointers to
its fields.

@return own: entry to insert into an ibuf index tree */
static
dtuple_t*
ibuf_entry_build(
/*=============*/
	ibuf_op_t	op,	/*!< in: operation type */
	dict_index_t*	index,	/*!< in: non-clustered index */
	const dtuple_t*	entry,	/*!< in: entry for a non-clustered index */
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number where entry should
				be inserted */
	ulint		counter,/*!< in: counter value;
				ULINT_UNDEFINED=not used */
	mem_heap_t*	heap)	/*!< in: heap into which to build */
{
	dtuple_t*	tuple;
	dfield_t*	field;
	const dfield_t*	entry_field;
	ulint		n_fields;
	byte*		buf;
	byte*		ti;
	byte*		type_info;
	ulint		i;

	/* A counter is mandatory for delete-mark/delete buffering, and
	the counter, when present, must fit in two bytes. */
	ut_ad(counter != ULINT_UNDEFINED || op == IBUF_OP_INSERT);
	ut_ad(counter == ULINT_UNDEFINED || counter <= 0xFFFF);
	ut_ad(op < IBUF_OP_COUNT);

	/* We have to build a tuple with the following fields:

	1-4) These are described at the top of this file.

	5) The rest of the fields are copied from the entry.

	All fields in the tuple are ordered like the type binary in our
	insert buffer tree. */

	n_fields = dtuple_get_n_fields(entry);

	tuple = dtuple_create(heap, n_fields + IBUF_REC_FIELD_USER);

	/* 1) Space Id */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_SPACE);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));

	mach_write_to_4(buf, space);

	dfield_set_data(field, buf, 4);

	/* 2) Marker byte */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_MARKER);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 1));

	/* We set the marker byte zero */

	mach_write_to_1(buf, 0);

	dfield_set_data(field, buf, 1);

	/* 3) Page number */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_PAGE);

	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));

	mach_write_to_4(buf, page_no);

	dfield_set_data(field, buf, 4);

	/* 4) Type info, part #1 */

	/* i becomes the size of the info bytes preceding the per-column
	type data; its value also encodes the record flavor (see the
	decoding switch in ibuf_rec_get_info_func()). */
	if (counter == ULINT_UNDEFINED) {
		i = dict_table_is_comp(index->table) ? 1 : 0;
	} else {
		ut_ad(counter <= 0xFFFF);
		i = IBUF_REC_INFO_SIZE;
	}

	ti = type_info = static_cast<byte*>(
		mem_heap_alloc(
			heap,
			i + n_fields * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE));

	switch (i) {
	default:
		ut_error;
		break;
	case 1:
		/* set the flag for ROW_FORMAT=COMPACT */
		*ti++ = 0;
		/* fall through */
	case 0:
		/* the old format does not allow delete buffering */
		ut_ad(op == IBUF_OP_INSERT);
		break;
	case IBUF_REC_INFO_SIZE:
		/* New format: store counter, operation type and flags. */
		mach_write_to_2(ti + IBUF_REC_OFFSET_COUNTER, counter);

		ti[IBUF_REC_OFFSET_TYPE] = (byte) op;
		ti[IBUF_REC_OFFSET_FLAGS] = dict_table_is_comp(index->table)
			? IBUF_REC_COMPACT : 0;
		ti += IBUF_REC_INFO_SIZE;
		break;
	}

	/* 5+) Fields from the entry */

	for (i = 0; i < n_fields; i++) {
		ulint			fixed_len;
		const dict_field_t*	ifield;

		field = dtuple_get_nth_field(tuple, i + IBUF_REC_FIELD_USER);
		entry_field = dtuple_get_nth_field(entry, i);
		/* NOTE: copies the data pointer, not the data itself;
		the caller must keep the original entry alive. */
		dfield_copy(field, entry_field);

		ifield = dict_index_get_nth_field(index, i);
		/* Prefix index columns of fixed-length columns are of
		fixed length.  However, in the function call below,
		dfield_get_type(entry_field) contains the fixed length
		of the column in the clustered index.  Replace it with
		the fixed length of the secondary index column. */
		fixed_len = ifield->fixed_len;

#ifdef UNIV_DEBUG
		if (fixed_len) {
			/* dict_index_add_col() should guarantee these */
			ut_ad(fixed_len <= (ulint)
			      dfield_get_type(entry_field)->len);
			if (ifield->prefix_len) {
				ut_ad(ifield->prefix_len == fixed_len);
			} else {
				ut_ad(fixed_len == (ulint)
				      dfield_get_type(entry_field)->len);
			}
		}
#endif /* UNIV_DEBUG */

		dtype_new_store_for_order_and_null_size(
			ti, dfield_get_type(entry_field), fixed_len);
		ti += DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
	}

	/* 4) Type info, part #2 */

	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_METADATA);

	dfield_set_data(field, type_info, ulint(ti - type_info));

	/* Set all the types in the new tuple binary */

	dtuple_set_types_binary(tuple, n_fields + IBUF_REC_FIELD_USER);

	return(tuple);
}
1830 
1831 /*********************************************************************//**
1832 Builds a search tuple used to search buffered inserts for an index page.
1833 This is for >= 4.1.x format records.
1834 @return own: search tuple */
1835 static
1836 dtuple_t*
ibuf_search_tuple_build(ulint space,ulint page_no,mem_heap_t * heap)1837 ibuf_search_tuple_build(
1838 /*====================*/
1839 	ulint		space,	/*!< in: space id */
1840 	ulint		page_no,/*!< in: index page number */
1841 	mem_heap_t*	heap)	/*!< in: heap into which to build */
1842 {
1843 	dtuple_t*	tuple;
1844 	dfield_t*	field;
1845 	byte*		buf;
1846 
1847 	tuple = dtuple_create(heap, IBUF_REC_FIELD_METADATA);
1848 
1849 	/* Store the space id in tuple */
1850 
1851 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_SPACE);
1852 
1853 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1854 
1855 	mach_write_to_4(buf, space);
1856 
1857 	dfield_set_data(field, buf, 4);
1858 
1859 	/* Store the new format record marker byte */
1860 
1861 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_MARKER);
1862 
1863 	buf = static_cast<byte*>(mem_heap_alloc(heap, 1));
1864 
1865 	mach_write_to_1(buf, 0);
1866 
1867 	dfield_set_data(field, buf, 1);
1868 
1869 	/* Store the page number in tuple */
1870 
1871 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_PAGE);
1872 
1873 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1874 
1875 	mach_write_to_4(buf, page_no);
1876 
1877 	dfield_set_data(field, buf, 4);
1878 
1879 	dtuple_set_types_binary(tuple, IBUF_REC_FIELD_METADATA);
1880 
1881 	return(tuple);
1882 }
1883 
1884 /*********************************************************************//**
1885 Checks if there are enough pages in the free list of the ibuf tree that we
1886 dare to start a pessimistic insert to the insert buffer.
1887 @return whether enough free pages in list */
ibuf_data_enough_free_for_insert()1888 static inline bool ibuf_data_enough_free_for_insert()
1889 {
1890 	ut_ad(mutex_own(&ibuf_mutex));
1891 
1892 	/* We want a big margin of free pages, because a B-tree can sometimes
1893 	grow in size also if records are deleted from it, as the node pointers
1894 	can change, and we must make sure that we are able to delete the
1895 	inserts buffered for pages that we read to the buffer pool, without
1896 	any risk of running out of free space in the insert buffer. */
1897 
1898 	return(ibuf->free_list_len >= (ibuf->size / 2) + 3 * ibuf->height);
1899 }
1900 
1901 /*********************************************************************//**
1902 Checks if there are enough pages in the free list of the ibuf tree that we
1903 should remove them and free to the file space management.
1904 @return TRUE if enough free pages in list */
1905 UNIV_INLINE
1906 ibool
ibuf_data_too_much_free(void)1907 ibuf_data_too_much_free(void)
1908 /*=========================*/
1909 {
1910 	ut_ad(mutex_own(&ibuf_mutex));
1911 
1912 	return(ibuf->free_list_len >= 3 + (ibuf->size / 2) + 3 * ibuf->height);
1913 }
1914 
/*********************************************************************//**
Allocates a new page from the ibuf file segment and adds it to the free
list.
@return TRUE on success, FALSE if no space left */
static
ibool
ibuf_add_free_page(void)
/*====================*/
{
	mtr_t		mtr;
	page_t*		header_page;
	buf_block_t*	block;
	page_t*		page;
	page_t*		root;
	page_t*		bitmap_page;

	mtr_start(&mtr);
	/* Acquire the fsp latch before the ibuf header, obeying the latching
	order */
	mtr_x_lock_space(fil_system.sys_space, &mtr);
	header_page = ibuf_header_page_get(&mtr);

	/* Allocate a new page: NOTE that if the page has been a part of a
	non-clustered index which has subsequently been dropped, then the
	page may have buffered inserts in the insert buffer, and these
	should be deleted from there. These get deleted when the page
	allocation creates the page in buffer. Thus the call below may end
	up calling the insert buffer routines and, as we yet have no latches
	to insert buffer tree pages, these routines can run without a risk
	of a deadlock. This is the reason why we created a special ibuf
	header page apart from the ibuf tree. */

	block = fseg_alloc_free_page(
		header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER, 0, FSP_UP,
		&mtr);

	if (block == NULL) {
		/* The tablespace is full: give up without touching the
		free list or the size bookkeeping. */
		mtr_commit(&mtr);

		return(FALSE);
	}

	ut_ad(rw_lock_get_x_lock_count(&block->lock) == 1);
	ibuf_enter(&mtr);
	mutex_enter(&ibuf_mutex);
	root = ibuf_tree_root_get(&mtr);

	buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);
	page = buf_block_get_frame(block);

	/* Mark the page type so that recovery and validation can tell
	this is an ibuf free list page. */
	mlog_write_ulint(page + FIL_PAGE_TYPE, FIL_PAGE_IBUF_FREE_LIST,
			 MLOG_2BYTES, &mtr);

	/* Add the page to the free list and update the ibuf size data */

	flst_add_last(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		      page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, &mtr);

	ibuf->seg_size++;
	ibuf->free_list_len++;

	/* Set the bit indicating that this page is now an ibuf tree page
	(level 2 page) */

	const page_id_t		page_id(IBUF_SPACE_ID, block->page.id.page_no());
	bitmap_page = ibuf_bitmap_get_map_page(page_id, 0, &mtr);

	mutex_exit(&ibuf_mutex);

	ibuf_bitmap_page_set_bits(bitmap_page, page_id, srv_page_size,
				  IBUF_BITMAP_IBUF, TRUE, &mtr);

	ibuf_mtr_commit(&mtr);

	return(TRUE);
}
1991 
/*********************************************************************//**
Removes a page from the free list and frees it to the fsp system. */
static
void
ibuf_remove_free_page(void)
/*=======================*/
{
	mtr_t	mtr;
	mtr_t	mtr2;
	page_t*	header_page;
	ulint	page_no;
	page_t*	page;
	page_t*	root;
	page_t*	bitmap_page;

	log_free_check();

	mtr_start(&mtr);
	/* Acquire the fsp latch before the ibuf header, obeying the latching
	order */

	mtr_x_lock_space(fil_system.sys_space, &mtr);
	header_page = ibuf_header_page_get(&mtr);

	/* Prevent pessimistic inserts to insert buffer trees for a while */
	ibuf_enter(&mtr);
	mutex_enter(&ibuf_pessimistic_insert_mutex);
	mutex_enter(&ibuf_mutex);

	if (!ibuf_data_too_much_free()) {
		/* Another thread may already have shrunk the free list;
		nothing to do. */
		mutex_exit(&ibuf_mutex);
		mutex_exit(&ibuf_pessimistic_insert_mutex);

		ibuf_mtr_commit(&mtr);

		return;
	}

	/* Use a separate mini-transaction for looking up the last page of
	the free list, so that its tree-root latch can be released before
	the fseg_free_page() call below. */
	ibuf_mtr_start(&mtr2);

	root = ibuf_tree_root_get(&mtr2);

	mutex_exit(&ibuf_mutex);

	/* We will free the last page of the free list. */
	page_no = flst_get_last(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
				&mtr2).page;

	/* NOTE that we must release the latch on the ibuf tree root
	because in fseg_free_page we access level 1 pages, and the root
	is a level 2 page. */

	ibuf_mtr_commit(&mtr2);
	ibuf_exit(&mtr);

	/* Since pessimistic inserts were prevented, we know that the
	page is still in the free list. NOTE that also deletes may take
	pages from the free list, but they take them from the start, and
	the free list was so long that they cannot have taken the last
	page from it. */

	compile_time_assert(IBUF_SPACE_ID == 0);
	fseg_free_page(header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER,
		       fil_system.sys_space, page_no, true, &mtr);

	const page_id_t	page_id(IBUF_SPACE_ID, page_no);

	ut_d(buf_page_reset_file_page_was_freed(page_id));

	ibuf_enter(&mtr);

	mutex_enter(&ibuf_mutex);

	root = ibuf_tree_root_get(&mtr);

	/* The page just freed must still be the last one on the free list;
	see the note above about blocked pessimistic inserts. */
	ut_ad(page_no == flst_get_last(root + PAGE_HEADER
				       + PAGE_BTR_IBUF_FREE_LIST, &mtr).page);

	{
		buf_block_t*	block;

		block = buf_page_get(page_id, 0, RW_X_LATCH, &mtr);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);

		page = buf_block_get_frame(block);
	}

	/* Remove the page from the free list and update the ibuf size data */

	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		    page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, &mtr);

	mutex_exit(&ibuf_pessimistic_insert_mutex);

	ibuf->seg_size--;
	ibuf->free_list_len--;

	/* Set the bit indicating that this page is no more an ibuf tree page
	(level 2 page) */

	bitmap_page = ibuf_bitmap_get_map_page(page_id, 0, &mtr);

	mutex_exit(&ibuf_mutex);

	ibuf_bitmap_page_set_bits(
		bitmap_page, page_id, srv_page_size,
		IBUF_BITMAP_IBUF, FALSE, &mtr);

	ut_d(buf_page_set_file_page_was_freed(page_id));

	ibuf_mtr_commit(&mtr);
}
2105 
2106 /***********************************************************************//**
2107 Frees excess pages from the ibuf free list. This function is called when an OS
2108 thread calls fsp services to allocate a new file segment, or a new page to a
2109 file segment, and the thread did not own the fsp latch before this call. */
2110 void
ibuf_free_excess_pages(void)2111 ibuf_free_excess_pages(void)
2112 /*========================*/
2113 {
2114 	if (srv_force_recovery >= SRV_FORCE_NO_IBUF_MERGE) {
2115 		return;
2116 	}
2117 
2118 	/* Free at most a few pages at a time, so that we do not delay the
2119 	requested service too much */
2120 
2121 	for (ulint i = 0; i < 4; i++) {
2122 
2123 		ibool	too_much_free;
2124 
2125 		mutex_enter(&ibuf_mutex);
2126 		too_much_free = ibuf_data_too_much_free();
2127 		mutex_exit(&ibuf_mutex);
2128 
2129 		if (!too_much_free) {
2130 			return;
2131 		}
2132 
2133 		ibuf_remove_free_page();
2134 	}
2135 }
2136 
2137 #ifdef UNIV_DEBUG
2138 # define ibuf_get_merge_page_nos(contract,rec,mtr,ids,pages,n_stored) \
2139 	ibuf_get_merge_page_nos_func(contract,rec,mtr,ids,pages,n_stored)
2140 #else /* UNIV_DEBUG */
2141 # define ibuf_get_merge_page_nos(contract,rec,mtr,ids,pages,n_stored) \
2142 	ibuf_get_merge_page_nos_func(contract,rec,ids,pages,n_stored)
2143 #endif /* UNIV_DEBUG */
2144 
/*********************************************************************//**
Reads page numbers from a leaf in an ibuf tree.
@return a lower limit for the combined volume of records which will be
merged */
static
ulint
ibuf_get_merge_page_nos_func(
/*=========================*/
	ibool		contract,/*!< in: TRUE if this function is called to
				contract the tree, FALSE if this is called
				when a single page becomes full and we look
				if it pays to read also nearby pages */
	const rec_t*	rec,	/*!< in: insert buffer record */
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction holding rec */
#endif /* UNIV_DEBUG */
	ulint*		space_ids,/*!< in/out: space id's of the pages */
	ulint*		page_nos,/*!< in/out: buffer for at least
				IBUF_MAX_N_PAGES_MERGED many page numbers;
				the page numbers are in an ascending order */
	ulint*		n_stored)/*!< out: number of page numbers stored to
				page_nos in this function */
{
	ulint	prev_page_no;
	ulint	prev_space_id;
	ulint	first_page_no;
	ulint	first_space_id;
	ulint	rec_page_no;
	ulint	rec_space_id;
	ulint	sum_volumes;
	ulint	volume_for_page;
	ulint	rec_volume;
	ulint	limit;
	ulint	n_pages;

	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(ibuf_inside(mtr));

	*n_stored = 0;

	/* Never merge more pages than fit in a quarter of the buffer pool,
	nor more than IBUF_MAX_N_PAGES_MERGED. */
	limit = ut_min(IBUF_MAX_N_PAGES_MERGED,
		       buf_pool_get_curr_size() / 4);

	/* Move off the infimum/supremum onto a user record, if one exists. */
	if (page_rec_is_supremum(rec)) {

		rec = page_rec_get_prev_const(rec);
	}

	if (page_rec_is_infimum(rec)) {

		rec = page_rec_get_next_const(rec);
	}

	if (page_rec_is_supremum(rec)) {
		/* The leaf page contains no user records: nothing to merge. */
		return(0);
	}

	first_page_no = ibuf_rec_get_page_no(mtr, rec);
	first_space_id = ibuf_rec_get_space(mtr, rec);
	n_pages = 0;
	prev_page_no = 0;
	prev_space_id = 0;

	/* Go backwards from the first rec until we reach the border of the
	'merge area', or the page start or the limit of storeable pages is
	reached */

	while (!page_rec_is_infimum(rec) && UNIV_LIKELY(n_pages < limit)) {

		rec_page_no = ibuf_rec_get_page_no(mtr, rec);
		rec_space_id = ibuf_rec_get_space(mtr, rec);

		if (rec_space_id != first_space_id
		    || (rec_page_no / IBUF_MERGE_AREA)
		    != (first_page_no / IBUF_MERGE_AREA)) {

			break;
		}

		if (rec_page_no != prev_page_no
		    || rec_space_id != prev_space_id) {
			/* Count each distinct (space id, page no) once. */
			n_pages++;
		}

		prev_page_no = rec_page_no;
		prev_space_id = rec_space_id;

		rec = page_rec_get_prev_const(rec);
	}

	/* Step forwards to the first record of the merge area found above. */
	rec = page_rec_get_next_const(rec);

	/* At the loop start there is no prev page; we mark this with a pair
	of space id, page no (0, 0) for which there can never be entries in
	the insert buffer */

	prev_page_no = 0;
	prev_space_id = 0;
	sum_volumes = 0;
	volume_for_page = 0;

	/* Walk forwards, accumulating the buffered volume per page and
	storing each page whose merge pays off, until the border of the
	merge area, the supremum, or the limit is reached. */
	while (*n_stored < limit) {
		if (page_rec_is_supremum(rec)) {
			/* When no more records available, mark this with
			another 'impossible' pair of space id, page no */
			rec_page_no = 1;
			rec_space_id = 0;
		} else {
			rec_page_no = ibuf_rec_get_page_no(mtr, rec);
			rec_space_id = ibuf_rec_get_space(mtr, rec);
			/* In the system tablespace the smallest
			possible secondary index leaf page number is
			bigger than FSP_DICT_HDR_PAGE_NO (7).
			In all tablespaces, pages 0 and 1 are reserved
			for the allocation bitmap and the change
			buffer bitmap. In file-per-table tablespaces,
			a file segment inode page will be created at
			page 2 and the clustered index tree is created
			at page 3.  So for file-per-table tablespaces,
			page 4 is the smallest possible secondary
			index leaf page. CREATE TABLESPACE also initially
			uses pages 2 and 3 for the first created table,
			but that table may be dropped, allowing page 2
			to be reused for a secondary index leaf page.
			To keep this assertion simple, just
			make sure the page is >= 2. */
			ut_ad(rec_page_no >= FSP_FIRST_INODE_PAGE_NO);
		}

#ifdef UNIV_IBUF_DEBUG
		ut_a(*n_stored < IBUF_MAX_N_PAGES_MERGED);
#endif
		/* We have crossed a page boundary (and there was a previous
		page): decide whether the previous page should be merged. */
		if ((rec_space_id != prev_space_id
		     || rec_page_no != prev_page_no)
		    && (prev_space_id != 0 || prev_page_no != 0)) {

			/* Merge when contracting the tree, when the page is
			the one that triggered this call, or when enough
			volume has been buffered for the page that a merge
			is estimated to pay off. */
			if (contract
			    || (prev_page_no == first_page_no
				&& prev_space_id == first_space_id)
			    || (volume_for_page
				> ((IBUF_MERGE_THRESHOLD - 1)
				   * 4U << srv_page_size_shift
				   / IBUF_PAGE_SIZE_PER_FREE_SPACE)
				/ IBUF_MERGE_THRESHOLD)) {

				space_ids[*n_stored] = prev_space_id;
				page_nos[*n_stored] = prev_page_no;

				(*n_stored)++;

				sum_volumes += volume_for_page;
			}

			if (rec_space_id != first_space_id
			    || rec_page_no / IBUF_MERGE_AREA
			    != first_page_no / IBUF_MERGE_AREA) {

				break;
			}

			volume_for_page = 0;
		}

		if (rec_page_no == 1 && rec_space_id == 0) {
			/* Supremum record */

			break;
		}

		rec_volume = ibuf_rec_get_volume(mtr, rec);

		volume_for_page += rec_volume;

		prev_page_no = rec_page_no;
		prev_space_id = rec_space_id;

		rec = page_rec_get_next_const(rec);
	}

#ifdef UNIV_IBUF_DEBUG
	ut_a(*n_stored <= IBUF_MAX_N_PAGES_MERGED);
#endif
#if 0
	fprintf(stderr, "Ibuf merge batch %lu pages %lu volume\n",
		*n_stored, sum_volumes);
#endif
	return(sum_volumes);
}
2335 
2336 /*******************************************************************//**
2337 Get the matching records for space id.
2338 @return current rec or NULL */
2339 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2340 const rec_t*
ibuf_get_user_rec(btr_pcur_t * pcur,mtr_t * mtr)2341 ibuf_get_user_rec(
2342 /*===============*/
2343 	btr_pcur_t*	pcur,		/*!< in: the current cursor */
2344 	mtr_t*		mtr)		/*!< in: mini transaction */
2345 {
2346 	do {
2347 		const rec_t* rec = btr_pcur_get_rec(pcur);
2348 
2349 		if (page_rec_is_user_rec(rec)) {
2350 			return(rec);
2351 		}
2352 	} while (btr_pcur_move_to_next(pcur, mtr));
2353 
2354 	return(NULL);
2355 }
2356 
2357 /*********************************************************************//**
2358 Reads page numbers for a space id from an ibuf tree.
2359 @return a lower limit for the combined volume of records which will be
2360 merged */
2361 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2362 ulint
ibuf_get_merge_pages(btr_pcur_t * pcur,ulint space,ulint limit,ulint * pages,ulint * spaces,ulint * n_pages,mtr_t * mtr)2363 ibuf_get_merge_pages(
2364 /*=================*/
2365 	btr_pcur_t*	pcur,	/*!< in/out: cursor */
2366 	ulint		space,	/*!< in: space for which to merge */
2367 	ulint		limit,	/*!< in: max page numbers to read */
2368 	ulint*		pages,	/*!< out: pages read */
2369 	ulint*		spaces,	/*!< out: spaces read */
2370 	ulint*		n_pages,/*!< out: number of pages read */
2371 	mtr_t*		mtr)	/*!< in: mini transaction */
2372 {
2373 	const rec_t*	rec;
2374 	ulint		volume = 0;
2375 
2376 	ut_a(space != ULINT_UNDEFINED);
2377 
2378 	*n_pages = 0;
2379 
2380 	while ((rec = ibuf_get_user_rec(pcur, mtr)) != 0
2381 	       && ibuf_rec_get_space(mtr, rec) == space
2382 	       && *n_pages < limit) {
2383 
2384 		ulint	page_no = ibuf_rec_get_page_no(mtr, rec);
2385 
2386 		if (*n_pages == 0 || pages[*n_pages - 1] != page_no) {
2387 			spaces[*n_pages] = space;
2388 			pages[*n_pages] = page_no;
2389 			++*n_pages;
2390 		}
2391 
2392 		volume += ibuf_rec_get_volume(mtr, rec);
2393 
2394 		btr_pcur_move_to_next(pcur, mtr);
2395 	}
2396 
2397 	return(volume);
2398 }
2399 
/*********************************************************************//**
Contracts insert buffer trees by reading pages to the buffer pool.
@return a lower limit for the combined size in bytes of entries which
will be merged from ibuf trees to the pages read, 0 if ibuf is
empty */
static
ulint
ibuf_merge_pages(
/*=============*/
	ulint*	n_pages,	/*!< out: number of pages to which merged */
	bool	sync)		/*!< in: true if the caller wants to wait for
				the issued read with the highest tablespace
				address to complete */
{
	mtr_t		mtr;
	btr_pcur_t	pcur;
	ulint		sum_sizes;
	ulint		page_nos[IBUF_MAX_N_PAGES_MERGED];
	ulint		space_ids[IBUF_MAX_N_PAGES_MERGED];

	*n_pages = 0;

	ibuf_mtr_start(&mtr);

	/* Open a cursor to a randomly chosen leaf of the tree, at a random
	position within the leaf */
	bool available;

	available = btr_pcur_open_at_rnd_pos(ibuf->index, BTR_SEARCH_LEAF,
					     &pcur, &mtr);
	/* No one should make this index unavailable when server is running */
	ut_a(available);

	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));

	if (page_is_empty(btr_pcur_get_page(&pcur))) {
		/* If a B-tree page is empty, it must be the root page
		and the whole B-tree must be empty. InnoDB does not
		allow empty B-tree pages other than the root. */
		ut_ad(ibuf->empty);
		ut_ad(page_get_space_id(btr_pcur_get_page(&pcur))
		      == IBUF_SPACE_ID);
		ut_ad(page_get_page_no(btr_pcur_get_page(&pcur))
		      == FSP_IBUF_TREE_ROOT_PAGE_NO);

		ibuf_mtr_commit(&mtr);
		btr_pcur_close(&pcur);

		return(0);
	}

	/* Collect the pages around the cursor position whose buffered
	changes will be merged; contract==TRUE forces every page in the
	merge area to be included. */
	sum_sizes = ibuf_get_merge_page_nos(TRUE,
					    btr_pcur_get_rec(&pcur), &mtr,
					    space_ids,
					    page_nos, n_pages);
#if 0 /* defined UNIV_IBUF_DEBUG */
	fprintf(stderr, "Ibuf contract sync %lu pages %lu volume %lu\n",
		sync, *n_pages, sum_sizes);
#endif
	/* Release all latches before issuing the reads; the merge itself
	happens as a side effect when the pages are read into the buffer
	pool. */
	ibuf_mtr_commit(&mtr);
	btr_pcur_close(&pcur);

	buf_read_ibuf_merge_pages(
		sync, space_ids, page_nos, *n_pages);

	return(sum_sizes + 1);
}
2467 
/*********************************************************************//**
Contracts insert buffer trees by reading pages referring to space_id
to the buffer pool.
@returns number of pages merged.*/
ulint
ibuf_merge_space(
/*=============*/
	ulint		space)	/*!< in: tablespace id to merge */
{
	mtr_t		mtr;
	btr_pcur_t	pcur;
	mem_heap_t*	heap = mem_heap_create(512);
	/* Search tuple (space, page 0): positions the cursor at the first
	buffered record for this tablespace. */
	dtuple_t*	tuple = ibuf_search_tuple_build(space, 0, heap);
	ulint		n_pages = 0;

	ut_ad(space < SRV_LOG_SPACE_FIRST_ID);

	ibuf_mtr_start(&mtr);

	/* Position the cursor on the first matching record. */

	btr_pcur_open(
		ibuf->index, tuple, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur,
		&mtr);

	/* The tuple has been copied into the cursor; the heap can go. */
	mem_heap_free(heap);

	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));

	ulint		sum_sizes = 0;
	ulint		pages[IBUF_MAX_N_PAGES_MERGED];
	ulint		spaces[IBUF_MAX_N_PAGES_MERGED];

	if (page_is_empty(btr_pcur_get_page(&pcur))) {
		/* If a B-tree page is empty, it must be the root page
		and the whole B-tree must be empty. InnoDB does not
		allow empty B-tree pages other than the root. */
		ut_ad(ibuf->empty);
		ut_ad(page_get_space_id(btr_pcur_get_page(&pcur))
		      == IBUF_SPACE_ID);
		ut_ad(page_get_page_no(btr_pcur_get_page(&pcur))
		      == FSP_IBUF_TREE_ROOT_PAGE_NO);

	} else {

		sum_sizes = ibuf_get_merge_pages(
			&pcur, space, IBUF_MAX_N_PAGES_MERGED,
			&pages[0], &spaces[0], &n_pages,
			&mtr);
		ib::info() << "Size of pages merged " << sum_sizes;
	}

	/* Release the tree latches before issuing the page reads. */
	ibuf_mtr_commit(&mtr);

	btr_pcur_close(&pcur);

	if (n_pages > 0) {
		ut_ad(n_pages <= UT_ARR_SIZE(pages));

#ifdef UNIV_DEBUG
		for (ulint i = 0; i < n_pages; ++i) {
			ut_ad(spaces[i] == space);
		}
#endif /* UNIV_DEBUG */

		/* Reading the pages into the buffer pool applies the
		buffered changes as a side effect. */
		buf_read_ibuf_merge_pages(
			true, spaces, pages, n_pages);
	}

	return(n_pages);
}
2539 
2540 /** Contract the change buffer by reading pages to the buffer pool.
2541 @param[out]	n_pages		number of pages merged
2542 @param[in]	sync		whether the caller waits for
2543 the issued reads to complete
2544 @return a lower limit for the combined size in bytes of entries which
2545 will be merged from ibuf trees to the pages read, 0 if ibuf is
2546 empty */
2547 static MY_ATTRIBUTE((warn_unused_result))
2548 ulint
ibuf_merge(ulint * n_pages,bool sync)2549 ibuf_merge(
2550 	ulint*		n_pages,
2551 	bool		sync)
2552 {
2553 	*n_pages = 0;
2554 
2555 	/* We perform a dirty read of ibuf->empty, without latching
2556 	the insert buffer root page. We trust this dirty read except
2557 	when a slow shutdown is being executed. During a slow
2558 	shutdown, the insert buffer merge must be completed. */
2559 
2560 	if (ibuf->empty && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) {
2561 		return(0);
2562 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
2563 	} else if (ibuf_debug) {
2564 		return(0);
2565 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
2566 	} else {
2567 		return(ibuf_merge_pages(n_pages, sync));
2568 	}
2569 }
2570 
2571 /** Contract the change buffer by reading pages to the buffer pool.
2572 @param[in]	sync	whether the caller waits for
2573 the issued reads to complete
2574 @return a lower limit for the combined size in bytes of entries which
2575 will be merged from ibuf trees to the pages read, 0 if ibuf is empty */
2576 static
2577 ulint
ibuf_contract(bool sync)2578 ibuf_contract(
2579 	bool	sync)
2580 {
2581 	ulint	n_pages;
2582 
2583 	return(ibuf_merge_pages(&n_pages, sync));
2584 }
2585 
2586 /** Contract the change buffer by reading pages to the buffer pool.
2587 @param[in]	full		If true, do a full contraction based
2588 on PCT_IO(100). If false, the size of contract batch is determined
2589 based on the current size of the change buffer.
2590 @return a lower limit for the combined size in bytes of entries which
2591 will be merged from ibuf trees to the pages read, 0 if ibuf is
2592 empty */
2593 ulint
ibuf_merge_in_background(bool full)2594 ibuf_merge_in_background(
2595 	bool	full)
2596 {
2597 	ulint	sum_bytes	= 0;
2598 	ulint	sum_pages	= 0;
2599 	ulint	n_pag2;
2600 	ulint	n_pages;
2601 
2602 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
2603 	if (srv_ibuf_disable_background_merge) {
2604 		return(0);
2605 	}
2606 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
2607 
2608 	if (full) {
2609 		/* Caller has requested a full batch */
2610 		n_pages = PCT_IO(100);
2611 	} else {
2612 		/* By default we do a batch of 5% of the io_capacity */
2613 		n_pages = PCT_IO(5);
2614 
2615 		mutex_enter(&ibuf_mutex);
2616 
2617 		/* If the ibuf->size is more than half the max_size
2618 		then we make more agreesive contraction.
2619 		+1 is to avoid division by zero. */
2620 		if (ibuf->size > ibuf->max_size / 2) {
2621 			ulint diff = ibuf->size - ibuf->max_size / 2;
2622 			n_pages += PCT_IO((diff * 100)
2623 					   / (ibuf->max_size + 1));
2624 		}
2625 
2626 		mutex_exit(&ibuf_mutex);
2627 	}
2628 
2629 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
2630 	if (ibuf_debug) {
2631 		return(0);
2632 	}
2633 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
2634 
2635 	while (sum_pages < n_pages) {
2636 		ulint	n_bytes;
2637 
2638 		n_bytes = ibuf_merge(&n_pag2, false);
2639 
2640 		if (n_bytes == 0) {
2641 			return(sum_bytes);
2642 		}
2643 
2644 		sum_bytes += n_bytes;
2645 		sum_pages += n_pag2;
2646 	}
2647 
2648 	return(sum_bytes);
2649 }
2650 
2651 /*********************************************************************//**
2652 Contract insert buffer trees after insert if they are too big. */
2653 UNIV_INLINE
2654 void
ibuf_contract_after_insert(ulint entry_size)2655 ibuf_contract_after_insert(
2656 /*=======================*/
2657 	ulint	entry_size)	/*!< in: size of a record which was inserted
2658 				into an ibuf tree */
2659 {
2660 	ibool	sync;
2661 	ulint	sum_sizes;
2662 	ulint	size;
2663 	ulint	max_size;
2664 
2665 	/* Perform dirty reads of ibuf->size and ibuf->max_size, to
2666 	reduce ibuf_mutex contention. ibuf->max_size remains constant
2667 	after ibuf_init_at_db_start(), but ibuf->size should be
2668 	protected by ibuf_mutex. Given that ibuf->size fits in a
2669 	machine word, this should be OK; at worst we are doing some
2670 	excessive ibuf_contract() or occasionally skipping a
2671 	ibuf_contract(). */
2672 	size = ibuf->size;
2673 	max_size = ibuf->max_size;
2674 
2675 	if (size < max_size + IBUF_CONTRACT_ON_INSERT_NON_SYNC) {
2676 		return;
2677 	}
2678 
2679 	sync = (size >= max_size + IBUF_CONTRACT_ON_INSERT_SYNC);
2680 
2681 	/* Contract at least entry_size many bytes */
2682 	sum_sizes = 0;
2683 	size = 1;
2684 
2685 	do {
2686 
2687 		size = ibuf_contract(sync);
2688 		sum_sizes += size;
2689 	} while (size > 0 && sum_sizes < entry_size);
2690 }
2691 
2692 /** Determine if a change buffer record has been encountered already.
2693 @param rec   change buffer record in the MySQL 5.5 format
2694 @param hash  hash table of encountered records
2695 @param size  number of elements in hash
2696 @retval true if a distinct record
2697 @retval false if this may be duplicating an earlier record */
ibuf_get_volume_buffered_hash(const rec_t * rec,ulint * hash,ulint size)2698 static bool ibuf_get_volume_buffered_hash(const rec_t *rec, ulint *hash,
2699                                           ulint size)
2700 {
2701   ut_ad(rec_get_n_fields_old(rec) > IBUF_REC_FIELD_USER);
2702   const ulint start= rec_get_field_start_offs(rec, IBUF_REC_FIELD_USER);
2703   const ulint len= rec_get_data_size_old(rec) - start;
2704   const uint32_t fold= ut_crc32(rec + start, len);
2705   hash+= (fold / (CHAR_BIT * sizeof *hash)) % size;
2706   ulint bitmask= static_cast<ulint>(1) << (fold % (CHAR_BIT * sizeof(*hash)));
2707 
2708   if (*hash & bitmask)
2709     return false;
2710 
2711   /* We have not seen this record yet. Remember it. */
2712   *hash|= bitmask;
2713   return true;
2714 }
2715 
2716 #ifdef UNIV_DEBUG
2717 # define ibuf_get_volume_buffered_count(mtr,rec,hash,size,n_recs)	\
2718 	ibuf_get_volume_buffered_count_func(mtr,rec,hash,size,n_recs)
2719 #else /* UNIV_DEBUG */
2720 # define ibuf_get_volume_buffered_count(mtr,rec,hash,size,n_recs)	\
2721 	ibuf_get_volume_buffered_count_func(rec,hash,size,n_recs)
2722 #endif /* UNIV_DEBUG */
2723 
/*********************************************************************//**
Update the estimate of the number of records on a page, and
get the space taken by merging the buffered record to the index page.
@return size of index record in bytes + an upper limit of the space
taken in the page directory */
static
ulint
ibuf_get_volume_buffered_count_func(
/*================================*/
#ifdef UNIV_DEBUG
	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
#endif /* UNIV_DEBUG */
	const rec_t*	rec,	/*!< in: insert buffer record */
	ulint*		hash,	/*!< in/out: hash array; used to detect
				records that were already counted, so that
				IBUF_OP_INSERT/IBUF_OP_DELETE_MARK pairs
				for the same user record are not counted
				twice in *n_recs */
	ulint		size,	/*!< in: number of elements in hash array */
	lint*		n_recs)	/*!< in/out: estimated number of records
				on the page that rec points to */
{
	ulint		len;
	ibuf_op_t	ibuf_op;
	const byte*	types;
	ulint		n_fields;

	/* The caller must hold a latch on the ibuf B-tree page that
	contains rec, and must be marked as operating inside the ibuf. */
	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
					     | MTR_MEMO_PAGE_S_FIX));
	ut_ad(ibuf_inside(mtr));

	n_fields = rec_get_n_fields_old(rec);
	ut_ad(n_fields > IBUF_REC_FIELD_USER);
	/* Keep only the fields of the buffered index entry, excluding
	the ibuf header fields (space, marker, page_no, metadata). */
	n_fields -= IBUF_REC_FIELD_USER;

	rec_get_nth_field_offs_old(rec, 1, &len);
	/* This function is only invoked when buffering new
	operations.  All pre-4.1 records should have been merged
	when the database was started up. */
	ut_a(len == 1);

	if (rec_get_deleted_flag(rec, 0)) {
		/* This record has been merged already,
		but apparently the system crashed before
		the change was discarded from the buffer.
		Pretend that the record does not exist. */
		return(0);
	}

	types = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);

	/* The length of the metadata field modulo the per-field
	type-info size identifies the record format (and whether an
	operation counter is present). */
	switch (UNIV_EXPECT(int(len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE),
			    IBUF_REC_INFO_SIZE)) {
	default:
		ut_error;
	case 0:
		/* This ROW_TYPE=REDUNDANT record does not include an
		operation counter.  Exclude it from the *n_recs,
		because deletes cannot be buffered if there are
		old-style inserts buffered for the page. */

		len = ibuf_rec_get_size(rec, types, n_fields, 0);

		return(len
		       + rec_get_converted_extra_size(len, n_fields, 0)
		       + page_dir_calc_reserved_space(1));
	case 1:
		/* This ROW_TYPE=COMPACT record does not include an
		operation counter.  Exclude it from the *n_recs,
		because deletes cannot be buffered if there are
		old-style inserts buffered for the page. */
		goto get_volume_comp;

	case IBUF_REC_INFO_SIZE:
		/* New-style record: the operation type is stored in
		the metadata field. */
		ibuf_op = (ibuf_op_t) types[IBUF_REC_OFFSET_TYPE];
		break;
	}

	switch (ibuf_op) {
	case IBUF_OP_INSERT:
		/* Inserts can be done by updating a delete-marked record.
		Because delete-mark and insert operations can be pointing to
		the same records, we must not count duplicates. */
	case IBUF_OP_DELETE_MARK:
		/* There must be a record to delete-mark.
		See if this record has been already buffered. */
		if (n_recs && ibuf_get_volume_buffered_hash(rec, hash, size)) {
			(*n_recs)++;
		}

		if (ibuf_op == IBUF_OP_DELETE_MARK) {
			/* Setting the delete-mark flag does not
			affect the available space on the page. */
			return(0);
		}
		break;
	case IBUF_OP_DELETE:
		/* A record will be removed from the page. */
		if (n_recs) {
			(*n_recs)--;
		}
		/* While deleting a record actually frees up space,
		we have to play it safe and pretend that it takes no
		additional space (the record might not exist, etc.). */
		return(0);
	default:
		ut_error;
	}

	ut_ad(ibuf_op == IBUF_OP_INSERT);

get_volume_comp:
	{
		/* Compute the converted size of the entry by rebuilding
		it with a dummy index; this covers the COMPACT case and
		new-style inserts. */
		dtuple_t*	entry;
		ulint		volume;
		dict_index_t*	dummy_index;
		mem_heap_t*	heap = mem_heap_create(500);

		entry = ibuf_build_entry_from_ibuf_rec(
			mtr, rec, heap, &dummy_index);

		volume = rec_get_converted_size(dummy_index, entry, 0);

		ibuf_dummy_index_free(dummy_index);
		mem_heap_free(heap);

		return(volume + page_dir_calc_reserved_space(1));
	}
}
2849 
/*********************************************************************//**
Gets an upper limit for the combined size of entries buffered in the insert
buffer for a given page.  Scans backwards from pcur, then the previous ibuf
tree page, then forwards from pcur, then the next ibuf tree page.
@return upper limit for the volume of buffered inserts for the index
page, in bytes; srv_page_size, if the entries for the index page span
several pages in the insert buffer */
static
ulint
ibuf_get_volume_buffered(
/*=====================*/
	const btr_pcur_t*pcur,	/*!< in: pcur positioned at a place in an
				insert buffer tree where we would insert an
				entry for the index page whose number is
				page_no, latch mode has to be BTR_MODIFY_PREV
				or BTR_MODIFY_TREE */
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: page number of an index page */
	lint*		n_recs,	/*!< in/out: minimum number of records on the
				page after the buffered changes have been
				applied, or NULL to disable the counting */
	mtr_t*		mtr)	/*!< in: mini-transaction of pcur */
{
	ulint		volume;
	const rec_t*	rec;
	const page_t*	page;
	ulint		prev_page_no;
	const page_t*	prev_page;
	ulint		next_page_no;
	const page_t*	next_page;
	/* bitmap of buffered recs; passed to
	ibuf_get_volume_buffered_count() to avoid double-counting
	insert/delete-mark pairs for the same user record */
	ulint		hash_bitmap[128 / sizeof(ulint)];

	ut_ad((pcur->latch_mode == BTR_MODIFY_PREV)
	      || (pcur->latch_mode == BTR_MODIFY_TREE));

	/* Count the volume of inserts earlier in the alphabetical order than
	pcur */

	volume = 0;

	if (n_recs) {
		memset(hash_bitmap, 0, sizeof hash_bitmap);
	}

	rec = btr_pcur_get_rec(pcur);
	page = page_align(rec);
	ut_ad(page_validate(page, ibuf->index));

	if (page_rec_is_supremum(rec)) {
		rec = page_rec_get_prev_const(rec);
	}

	/* Walk backwards over records on this page that belong to
	(space, page_no). */
	for (; !page_rec_is_infimum(rec);
	     rec = page_rec_get_prev_const(rec)) {
		ut_ad(page_align(rec) == page);

		if (page_no != ibuf_rec_get_page_no(mtr, rec)
		    || space != ibuf_rec_get_space(mtr, rec)) {

			goto count_later;
		}

		volume += ibuf_get_volume_buffered_count(
			mtr, rec,
			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
	}

	/* Look at the previous page */

	prev_page_no = btr_page_get_prev(page);

	if (prev_page_no == FIL_NULL) {

		goto count_later;
	}

	{
		buf_block_t*	block;

		block = buf_page_get(
			page_id_t(IBUF_SPACE_ID, prev_page_no),
			0, RW_X_LATCH, mtr);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);

		prev_page = buf_block_get_frame(block);
		ut_ad(page_validate(prev_page, ibuf->index));
	}

#ifdef UNIV_BTR_DEBUG
	/* Sanity check: the previous page must link forward to page. */
	ut_a(!memcmp(prev_page + FIL_PAGE_NEXT, page + FIL_PAGE_OFFSET, 4));
#endif /* UNIV_BTR_DEBUG */

	rec = page_get_supremum_rec(prev_page);
	rec = page_rec_get_prev_const(rec);

	for (;; rec = page_rec_get_prev_const(rec)) {
		ut_ad(page_align(rec) == prev_page);

		if (page_rec_is_infimum(rec)) {

			/* We cannot go to yet a previous page, because we
			do not have the x-latch on it, and cannot acquire one
			because of the latching order: we have to give up */

			return(srv_page_size);
		}

		if (page_no != ibuf_rec_get_page_no(mtr, rec)
		    || space != ibuf_rec_get_space(mtr, rec)) {

			goto count_later;
		}

		volume += ibuf_get_volume_buffered_count(
			mtr, rec,
			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
	}

count_later:
	/* Count the volume of inserts later in the alphabetical order
	than pcur, starting on the cursor's own page. */
	rec = btr_pcur_get_rec(pcur);

	if (!page_rec_is_supremum(rec)) {
		rec = page_rec_get_next_const(rec);
	}

	for (; !page_rec_is_supremum(rec);
	     rec = page_rec_get_next_const(rec)) {
		if (page_no != ibuf_rec_get_page_no(mtr, rec)
		    || space != ibuf_rec_get_space(mtr, rec)) {

			return(volume);
		}

		volume += ibuf_get_volume_buffered_count(
			mtr, rec,
			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
	}

	/* Look at the next page */

	next_page_no = btr_page_get_next(page);

	if (next_page_no == FIL_NULL) {

		return(volume);
	}

	{
		buf_block_t*	block;

		block = buf_page_get(
			page_id_t(IBUF_SPACE_ID, next_page_no),
			0, RW_X_LATCH, mtr);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);

		next_page = buf_block_get_frame(block);
		ut_ad(page_validate(next_page, ibuf->index));
	}

#ifdef UNIV_BTR_DEBUG
	/* Sanity check: the next page must link back to page. */
	ut_a(!memcmp(next_page + FIL_PAGE_PREV, page + FIL_PAGE_OFFSET, 4));
#endif /* UNIV_BTR_DEBUG */

	rec = page_get_infimum_rec(next_page);
	rec = page_rec_get_next_const(rec);

	for (;; rec = page_rec_get_next_const(rec)) {
		ut_ad(page_align(rec) == next_page);

		if (page_rec_is_supremum(rec)) {

			/* We give up */

			return(srv_page_size);
		}

		if (page_no != ibuf_rec_get_page_no(mtr, rec)
		    || space != ibuf_rec_get_space(mtr, rec)) {

			return(volume);
		}

		volume += ibuf_get_volume_buffered_count(
			mtr, rec,
			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
	}
}
3039 
3040 /*********************************************************************//**
3041 Reads the biggest tablespace id from the high end of the insert buffer
3042 tree and updates the counter in fil_system. */
3043 void
ibuf_update_max_tablespace_id(void)3044 ibuf_update_max_tablespace_id(void)
3045 /*===============================*/
3046 {
3047 	ulint		max_space_id;
3048 	const rec_t*	rec;
3049 	const byte*	field;
3050 	ulint		len;
3051 	btr_pcur_t	pcur;
3052 	mtr_t		mtr;
3053 
3054 	ut_a(!dict_table_is_comp(ibuf->index->table));
3055 
3056 	ibuf_mtr_start(&mtr);
3057 
3058 	btr_pcur_open_at_index_side(
3059 		false, ibuf->index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
3060 
3061 	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));
3062 
3063 	btr_pcur_move_to_prev(&pcur, &mtr);
3064 
3065 	if (btr_pcur_is_before_first_on_page(&pcur)) {
3066 		/* The tree is empty */
3067 
3068 		max_space_id = 0;
3069 	} else {
3070 		rec = btr_pcur_get_rec(&pcur);
3071 
3072 		field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
3073 
3074 		ut_a(len == 4);
3075 
3076 		max_space_id = mach_read_from_4(field);
3077 	}
3078 
3079 	ibuf_mtr_commit(&mtr);
3080 
3081 	/* printf("Maximum space id in insert buffer %lu\n", max_space_id); */
3082 
3083 	fil_set_max_space_id_if_bigger(max_space_id);
3084 }
3085 
3086 #ifdef UNIV_DEBUG
3087 # define ibuf_get_entry_counter_low(mtr,rec,space,page_no)	\
3088 	ibuf_get_entry_counter_low_func(mtr,rec,space,page_no)
3089 #else /* UNIV_DEBUG */
3090 # define ibuf_get_entry_counter_low(mtr,rec,space,page_no)	\
3091 	ibuf_get_entry_counter_low_func(rec,space,page_no)
3092 #endif
3093 /****************************************************************//**
3094 Helper function for ibuf_get_entry_counter_func. Checks if rec is for
3095 (space, page_no), and if so, reads counter value from it and returns
3096 that + 1.
3097 @retval ULINT_UNDEFINED if the record does not contain any counter
3098 @retval 0 if the record is not for (space, page_no)
3099 @retval 1 + previous counter value, otherwise */
3100 static
3101 ulint
ibuf_get_entry_counter_low_func(mtr_t * mtr,const rec_t * rec,ulint space,ulint page_no)3102 ibuf_get_entry_counter_low_func(
3103 /*============================*/
3104 #ifdef UNIV_DEBUG
3105 	mtr_t*		mtr,		/*!< in: mini-transaction of rec */
3106 #endif /* UNIV_DEBUG */
3107 	const rec_t*	rec,		/*!< in: insert buffer record */
3108 	ulint		space,		/*!< in: space id */
3109 	ulint		page_no)	/*!< in: page number */
3110 {
3111 	ulint		counter;
3112 	const byte*	field;
3113 	ulint		len;
3114 
3115 	ut_ad(ibuf_inside(mtr));
3116 	ut_ad(mtr_memo_contains_page_flagged(mtr, rec, MTR_MEMO_PAGE_X_FIX
3117 					     | MTR_MEMO_PAGE_S_FIX));
3118 	ut_ad(rec_get_n_fields_old(rec) > 2);
3119 
3120 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
3121 
3122 	ut_a(len == 1);
3123 
3124 	/* Check the tablespace identifier. */
3125 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
3126 
3127 	ut_a(len == 4);
3128 
3129 	if (mach_read_from_4(field) != space) {
3130 
3131 		return(0);
3132 	}
3133 
3134 	/* Check the page offset. */
3135 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_PAGE, &len);
3136 	ut_a(len == 4);
3137 
3138 	if (mach_read_from_4(field) != page_no) {
3139 
3140 		return(0);
3141 	}
3142 
3143 	/* Check if the record contains a counter field. */
3144 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
3145 
3146 	switch (len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE) {
3147 	default:
3148 		ut_error;
3149 	case 0: /* ROW_FORMAT=REDUNDANT */
3150 	case 1: /* ROW_FORMAT=COMPACT */
3151 		return(ULINT_UNDEFINED);
3152 
3153 	case IBUF_REC_INFO_SIZE:
3154 		counter = mach_read_from_2(field + IBUF_REC_OFFSET_COUNTER);
3155 		ut_a(counter < 0xFFFF);
3156 		return(counter + 1);
3157 	}
3158 }
3159 
3160 #ifdef UNIV_DEBUG
3161 # define ibuf_get_entry_counter(space,page_no,rec,mtr,exact_leaf) \
3162 	ibuf_get_entry_counter_func(space,page_no,rec,mtr,exact_leaf)
3163 #else /* UNIV_DEBUG */
3164 # define ibuf_get_entry_counter(space,page_no,rec,mtr,exact_leaf) \
3165 	ibuf_get_entry_counter_func(space,page_no,rec,exact_leaf)
3166 #endif /* UNIV_DEBUG */
3167 
3168 /****************************************************************//**
3169 Calculate the counter field for an entry based on the current
3170 last record in ibuf for (space, page_no).
3171 @return the counter field, or ULINT_UNDEFINED
3172 if we should abort this insertion to ibuf */
3173 static
3174 ulint
ibuf_get_entry_counter_func(ulint space,ulint page_no,const rec_t * rec,mtr_t * mtr,ibool only_leaf)3175 ibuf_get_entry_counter_func(
3176 /*========================*/
3177 	ulint		space,		/*!< in: space id of entry */
3178 	ulint		page_no,	/*!< in: page number of entry */
3179 	const rec_t*	rec,		/*!< in: the record preceding the
3180 					insertion point */
3181 #ifdef UNIV_DEBUG
3182 	mtr_t*		mtr,		/*!< in: mini-transaction */
3183 #endif /* UNIV_DEBUG */
3184 	ibool		only_leaf)	/*!< in: TRUE if this is the only
3185 					leaf page that can contain entries
3186 					for (space,page_no), that is, there
3187 					was no exact match for (space,page_no)
3188 					in the node pointer */
3189 {
3190 	ut_ad(ibuf_inside(mtr));
3191 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
3192 	ut_ad(page_validate(page_align(rec), ibuf->index));
3193 
3194 	if (page_rec_is_supremum(rec)) {
3195 		/* This is just for safety. The record should be a
3196 		page infimum or a user record. */
3197 		ut_ad(0);
3198 		return(ULINT_UNDEFINED);
3199 	} else if (!page_rec_is_infimum(rec)) {
3200 		return(ibuf_get_entry_counter_low(mtr, rec, space, page_no));
3201 	} else if (only_leaf || !page_has_prev(page_align(rec))) {
3202 		/* The parent node pointer did not contain the
3203 		searched for (space, page_no), which means that the
3204 		search ended on the correct page regardless of the
3205 		counter value, and since we're at the infimum record,
3206 		there are no existing records. */
3207 		return(0);
3208 	} else {
3209 		/* We used to read the previous page here. It would
3210 		break the latching order, because the caller has
3211 		buffer-fixed an insert buffer bitmap page. */
3212 		return(ULINT_UNDEFINED);
3213 	}
3214 }
3215 
3216 
3217 /** Translates the ibuf free bits to the free space on a page in bytes.
3218 @param[in]	physical_size	page_size
3219 @param[in]	bits		value for ibuf bitmap bits
3220 @return maximum insert size after reorganize for the page */
3221 inline ulint
ibuf_index_page_calc_free_from_bits(ulint physical_size,ulint bits)3222 ibuf_index_page_calc_free_from_bits(ulint physical_size, ulint bits)
3223 {
3224 	ut_ad(bits < 4);
3225 	ut_ad(physical_size > IBUF_PAGE_SIZE_PER_FREE_SPACE);
3226 
3227 	if (bits == 3) {
3228 		bits = 4;
3229 	}
3230 
3231 	return bits * physical_size / IBUF_PAGE_SIZE_PER_FREE_SPACE;
3232 }
3233 
/** Buffer an operation in the insert/delete buffer, instead of doing it
directly to the disk page, if this is possible.
@param[in]	mode		BTR_MODIFY_PREV or BTR_MODIFY_TREE
@param[in]	op		operation type
@param[in]	no_counter	TRUE=use 5.0.3 format; FALSE=allow delete
buffering
@param[in]	entry		index entry to insert
@param[in]	entry_size	rec_get_converted_size(index, entry)
@param[in,out]	index		index where to insert; must not be unique
or clustered
@param[in]	page_id		page id where to insert
@param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
@param[in,out]	thr		query thread
@return DB_SUCCESS, DB_STRONG_FAIL or other error */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
ibuf_insert_low(
	ulint			mode,
	ibuf_op_t		op,
	ibool			no_counter,
	const dtuple_t*		entry,
	ulint			entry_size,
	dict_index_t*		index,
	const page_id_t		page_id,
	ulint			zip_size,
	que_thr_t*		thr)
{
	big_rec_t*	dummy_big_rec;
	btr_pcur_t	pcur;
	btr_cur_t*	cursor;
	dtuple_t*	ibuf_entry;
	mem_heap_t*	offsets_heap	= NULL;
	mem_heap_t*	heap;
	rec_offs*	offsets		= NULL;
	ulint		buffered;
	lint		min_n_recs;
	rec_t*		ins_rec;
	ibool		old_bit_value;
	page_t*		bitmap_page;
	buf_block_t*	block;
	page_t*		root;
	dberr_t		err;
	ibool		do_merge;
	ulint		space_ids[IBUF_MAX_N_PAGES_MERGED];
	ulint		page_nos[IBUF_MAX_N_PAGES_MERGED];
	ulint		n_stored;
	mtr_t		mtr;
	mtr_t		bitmap_mtr;

	ut_a(!dict_index_is_clust(index));
	ut_ad(!dict_index_is_spatial(index));
	ut_ad(dtuple_check_typed(entry));
	ut_ad(!no_counter || op == IBUF_OP_INSERT);
	ut_ad(page_id.space() == index->table->space_id);
	ut_a(op < IBUF_OP_COUNT);

	do_merge = FALSE;

	/* Perform dirty reads of ibuf->size and ibuf->max_size, to
	reduce ibuf_mutex contention. Given that ibuf->max_size and
	ibuf->size fit in a machine word, this should be OK; at worst
	we are doing some excessive ibuf_contract() or occasionally
	skipping an ibuf_contract(). */
	if (ibuf->max_size == 0) {
		return(DB_STRONG_FAIL);
	}

	if (ibuf->size >= ibuf->max_size + IBUF_CONTRACT_DO_NOT_INSERT) {
		/* Insert buffer is now too big, contract it but do not try
		to insert */


#ifdef UNIV_IBUF_DEBUG
		fputs("Ibuf too big\n", stderr);
#endif
		ibuf_contract(true);

		return(DB_STRONG_FAIL);
	}

	heap = mem_heap_create(1024);

	/* Build the entry which contains the space id and the page number
	as the first fields and the type information for other fields, and
	which will be inserted to the insert buffer. Using a counter value
	of 0xFFFF we find the last record for (space, page_no), from which
	we can then read the counter value N and use N + 1 in the record we
	insert. (We patch the ibuf_entry's counter field to the correct
	value just before actually inserting the entry.) */

	ibuf_entry = ibuf_entry_build(
		op, index, entry, page_id.space(), page_id.page_no(),
		no_counter ? ULINT_UNDEFINED : 0xFFFF, heap);

	/* Open a cursor to the insert buffer tree to calculate if we can add
	the new entry to it without exceeding the free space limit for the
	page. */

	if (BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
		/* Pessimistic path: make sure the ibuf tree has enough
		free pages before latching the tree.  Note the mutex
		acquisition order: pessimistic_insert_mutex first, then
		ibuf_mutex; both are held until after the insert below. */
		for (;;) {
			mutex_enter(&ibuf_pessimistic_insert_mutex);
			mutex_enter(&ibuf_mutex);

			if (UNIV_LIKELY(ibuf_data_enough_free_for_insert())) {

				break;
			}

			mutex_exit(&ibuf_mutex);
			mutex_exit(&ibuf_pessimistic_insert_mutex);

			if (!ibuf_add_free_page()) {

				mem_heap_free(heap);
				return(DB_STRONG_FAIL);
			}
		}
	}

	ibuf_mtr_start(&mtr);

	btr_pcur_open(ibuf->index, ibuf_entry, PAGE_CUR_LE, mode, &pcur, &mtr);
	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));

	/* Find out the volume of already buffered inserts for the same index
	page */
	min_n_recs = 0;
	buffered = ibuf_get_volume_buffered(&pcur,
					    page_id.space(),
					    page_id.page_no(),
					    op == IBUF_OP_DELETE
					    ? &min_n_recs
					    : NULL, &mtr);

	const ulint physical_size = zip_size ? zip_size : srv_page_size;

	if (op == IBUF_OP_DELETE
	    && (min_n_recs < 2 || buf_pool_watch_occurred(page_id))) {
		/* The page could become empty after the record is
		deleted, or the page has been read in to the buffer
		pool.  Refuse to buffer the operation. */

		/* The buffer pool watch is needed for IBUF_OP_DELETE
		because of latching order considerations.  We can
		check buf_pool_watch_occurred() only after latching
		the insert buffer B-tree pages that contain buffered
		changes for the page.  We never buffer IBUF_OP_DELETE,
		unless some IBUF_OP_INSERT or IBUF_OP_DELETE_MARK have
		been previously buffered for the page.  Because there
		are buffered operations for the page, the insert
		buffer B-tree page latches held by mtr will guarantee
		that no changes for the user page will be merged
		before mtr_commit(&mtr).  We must not mtr_commit(&mtr)
		until after the IBUF_OP_DELETE has been buffered. */

fail_exit:
		/* Common bail-out: release the pessimistic-insert
		mutexes (if held) and report DB_STRONG_FAIL. */
		if (BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
			mutex_exit(&ibuf_mutex);
			mutex_exit(&ibuf_pessimistic_insert_mutex);
		}

		err = DB_STRONG_FAIL;
		goto func_exit;
	}

	/* After this point, the page could still be loaded to the
	buffer pool, but we do not have to care about it, since we are
	holding a latch on the insert buffer leaf page that contains
	buffered changes for (space, page_no).  If the page enters the
	buffer pool, buf_page_io_complete() for (space, page_no) will
	have to acquire a latch on the same insert buffer leaf page,
	which it cannot do until we have buffered the IBUF_OP_DELETE
	and done mtr_commit(&mtr) to release the latch. */

	ibuf_mtr_start(&bitmap_mtr);
	index->set_modified(bitmap_mtr);

	bitmap_page = ibuf_bitmap_get_map_page(page_id, zip_size, &bitmap_mtr);

	/* We check if the index page is suitable for buffered entries */

	if (buf_page_peek(page_id)
	    || lock_rec_expl_exist_on_page(page_id.space(),
					   page_id.page_no())) {
		/* The page is already in the buffer pool, or explicit
		record locks exist on it: do not buffer. */
		ibuf_mtr_commit(&bitmap_mtr);
		goto fail_exit;
	}

	if (op == IBUF_OP_INSERT) {
		ulint	bits = ibuf_bitmap_page_get_bits(
			bitmap_page, page_id, physical_size, IBUF_BITMAP_FREE,
			&bitmap_mtr);

		if (buffered + entry_size + page_dir_calc_reserved_space(1)
		    > ibuf_index_page_calc_free_from_bits(physical_size,
							  bits)) {
			/* Release the bitmap page latch early. */
			ibuf_mtr_commit(&bitmap_mtr);

			/* It may not fit */
			do_merge = TRUE;

			ibuf_get_merge_page_nos(FALSE,
						btr_pcur_get_rec(&pcur), &mtr,
						space_ids,
						page_nos, &n_stored);

			goto fail_exit;
		}
	}

	if (!no_counter) {
		/* Patch correct counter value to the entry to
		insert. This can change the insert position, which can
		result in the need to abort in some cases. */
		ulint		counter = ibuf_get_entry_counter(
			page_id.space(), page_id.page_no(),
			btr_pcur_get_rec(&pcur), &mtr,
			btr_pcur_get_btr_cur(&pcur)->low_match
			< IBUF_REC_FIELD_METADATA);
		dfield_t*	field;

		if (counter == ULINT_UNDEFINED) {
			ibuf_mtr_commit(&bitmap_mtr);
			goto fail_exit;
		}

		field = dtuple_get_nth_field(
			ibuf_entry, IBUF_REC_FIELD_METADATA);
		mach_write_to_2(
			(byte*) dfield_get_data(field)
			+ IBUF_REC_OFFSET_COUNTER, counter);
	}

	/* Set the bitmap bit denoting that the insert buffer contains
	buffered entries for this index page, if the bit is not set yet */

	old_bit_value = ibuf_bitmap_page_get_bits(
		bitmap_page, page_id, physical_size,
		IBUF_BITMAP_BUFFERED, &bitmap_mtr);

	if (!old_bit_value) {
		ibuf_bitmap_page_set_bits(bitmap_page, page_id, physical_size,
					  IBUF_BITMAP_BUFFERED, TRUE,
					  &bitmap_mtr);
	}

	ibuf_mtr_commit(&bitmap_mtr);

	cursor = btr_pcur_get_btr_cur(&pcur);

	if (mode == BTR_MODIFY_PREV) {
		/* Optimistic path: insert on the leaf page only. */
		err = btr_cur_optimistic_insert(
			BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG,
			cursor, &offsets, &offsets_heap,
			ibuf_entry, &ins_rec,
			&dummy_big_rec, 0, thr, &mtr);
		block = btr_cur_get_block(cursor);
		ut_ad(block->page.id.space() == IBUF_SPACE_ID);

		/* If this is the root page, update ibuf->empty. */
		if (block->page.id.page_no() == FSP_IBUF_TREE_ROOT_PAGE_NO) {
			const page_t*	root = buf_block_get_frame(block);

			ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
			ut_ad(page_get_page_no(root)
			      == FSP_IBUF_TREE_ROOT_PAGE_NO);

			ibuf->empty = page_is_empty(root);
		}
	} else {
		ut_ad(BTR_LATCH_MODE_WITHOUT_INTENTION(mode)
		      == BTR_MODIFY_TREE);

		/* We acquire an sx-latch to the root page before the insert,
		because a pessimistic insert releases the tree x-latch,
		which would cause the sx-latching of the root after that to
		break the latching order. */

		root = ibuf_tree_root_get(&mtr);

		err = btr_cur_optimistic_insert(
			BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG,
			cursor, &offsets, &offsets_heap,
			ibuf_entry, &ins_rec,
			&dummy_big_rec, 0, thr, &mtr);

		if (err == DB_FAIL) {
			/* The leaf page was full: retry with a
			pessimistic (tree-modifying) insert. */
			err = btr_cur_pessimistic_insert(
				BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG,
				cursor, &offsets, &offsets_heap,
				ibuf_entry, &ins_rec,
				&dummy_big_rec, 0, thr, &mtr);
		}

		mutex_exit(&ibuf_pessimistic_insert_mutex);
		ibuf_size_update(root);
		mutex_exit(&ibuf_mutex);
		ibuf->empty = page_is_empty(root);

		block = btr_cur_get_block(cursor);
		ut_ad(block->page.id.space() == IBUF_SPACE_ID);
	}

	if (offsets_heap) {
		mem_heap_free(offsets_heap);
	}

	if (err == DB_SUCCESS && op != IBUF_OP_DELETE) {
		/* Update the page max trx id field */
		page_update_max_trx_id(block, NULL,
				       thr_get_trx(thr)->id, &mtr);
	}

func_exit:
	ibuf_mtr_commit(&mtr);
	btr_pcur_close(&pcur);

	mem_heap_free(heap);

	if (err == DB_SUCCESS
	    && BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
		ibuf_contract_after_insert(entry_size);
	}

	if (do_merge) {
		/* The entry did not fit: trigger a background merge of
		the pages collected by ibuf_get_merge_page_nos(). */
#ifdef UNIV_IBUF_DEBUG
		ut_a(n_stored <= IBUF_MAX_N_PAGES_MERGED);
#endif
		buf_read_ibuf_merge_pages(false, space_ids,
					  page_nos, n_stored);
	}

	return(err);
}
3570 
/** Buffer an operation in the change buffer, instead of applying it
directly to the file page, if this is possible. Does not do it if the index
is clustered or unique.
@param[in]	op		operation type
@param[in]	entry		index entry to insert
@param[in,out]	index		index where to insert
@param[in]	page_id		page id where to insert
@param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
@param[in,out]	thr		query thread
@return true if success */
bool
ibuf_insert(
	ibuf_op_t		op,
	const dtuple_t*		entry,
	dict_index_t*		index,
	const page_id_t		page_id,
	ulint			zip_size,
	que_thr_t*		thr)
{
	dberr_t		err;
	ulint		entry_size;
	ibool		no_counter;
	/* Read the settable global variable only once in
	this function, so that we will have a consistent view of it. */
	ibuf_use_t	use		= ibuf_use_t(innodb_change_buffering);
	DBUG_ENTER("ibuf_insert");

	DBUG_PRINT("ibuf", ("op: %d, space: " UINT32PF ", page_no: " UINT32PF,
			    op, page_id.space(), page_id.page_no()));

	ut_ad(dtuple_check_typed(entry));
	ut_ad(page_id.space() != SRV_TMP_SPACE_ID);

	ut_a(!dict_index_is_clust(index));
	ut_ad(!index->table->is_temporary());

	/* The old 5.0.3 record format (no operation counter) is used
	when only inserts may be buffered. */
	no_counter = use <= IBUF_USE_INSERT;

	/* Decide, based on the operation and the configured
	change-buffering mode, whether to buffer at all, and whether
	the buffer-pool watch check below can be skipped. */
	switch (op) {
	case IBUF_OP_INSERT:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_DELETE:
		case IBUF_USE_DELETE_MARK:
			DBUG_RETURN(false);
		case IBUF_USE_INSERT:
		case IBUF_USE_INSERT_DELETE_MARK:
		case IBUF_USE_ALL:
			goto check_watch;
		}
		break;
	case IBUF_OP_DELETE_MARK:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_INSERT:
			DBUG_RETURN(false);
		case IBUF_USE_DELETE_MARK:
		case IBUF_USE_DELETE:
		case IBUF_USE_INSERT_DELETE_MARK:
		case IBUF_USE_ALL:
			ut_ad(!no_counter);
			goto check_watch;
		}
		break;
	case IBUF_OP_DELETE:
		switch (use) {
		case IBUF_USE_NONE:
		case IBUF_USE_INSERT:
		case IBUF_USE_INSERT_DELETE_MARK:
			DBUG_RETURN(false);
		case IBUF_USE_DELETE_MARK:
		case IBUF_USE_DELETE:
		case IBUF_USE_ALL:
			ut_ad(!no_counter);
			goto skip_watch;
		}
		break;
	case IBUF_OP_COUNT:
		break;
	}

	/* unknown op or use */
	ut_error;

check_watch:
	/* If a thread attempts to buffer an insert on a page while a
	purge is in progress on the same page, the purge must not be
	buffered, because it could remove a record that was
	re-inserted later.  For simplicity, we block the buffering of
	all operations on a page that has a purge pending.

	We do not check this in the IBUF_OP_DELETE case, because that
	would always trigger the buffer pool watch during purge and
	thus prevent the buffering of delete operations.  We assume
	that the issuer of IBUF_OP_DELETE has called
	buf_pool_watch_set(space, page_no). */

	{
		buf_pool_t*	buf_pool = buf_pool_get(page_id);
		buf_page_t*	bpage
			= buf_page_get_also_watch(buf_pool, page_id);

		if (bpage != NULL) {
			/* A buffer pool watch has been set or the
			page has been read into the buffer pool.
			Do not buffer the request.  If a purge operation
			is being buffered, have this request executed
			directly on the page in the buffer pool after the
			buffered entries for this page have been merged. */
			DBUG_RETURN(false);
		}
	}

skip_watch:
	entry_size = rec_get_converted_size(index, entry, 0);

	/* Refuse to buffer very large entries: they might not fit on
	the index page together with the already buffered changes. */
	if (entry_size
	    >= page_get_free_space_of_empty(dict_table_is_comp(index->table))
	    / 2) {

		DBUG_RETURN(false);
	}

	/* First try an optimistic (leaf-only) ibuf insert; fall back
	to a pessimistic (tree-modifying) one if the leaf page is full. */
	err = ibuf_insert_low(BTR_MODIFY_PREV, op, no_counter,
			      entry, entry_size,
			      index, page_id, zip_size, thr);
	if (err == DB_FAIL) {
		err = ibuf_insert_low(BTR_MODIFY_TREE | BTR_LATCH_FOR_INSERT,
				      op, no_counter, entry, entry_size,
				      index, page_id, zip_size, thr);
	}

	ut_a(err == DB_SUCCESS || err == DB_STRONG_FAIL
	     || err == DB_TOO_BIG_RECORD);

	DBUG_RETURN(err == DB_SUCCESS);
}
3708 
/********************************************************************//**
During merge, inserts to an index page a secondary index entry extracted
from the insert buffer.  The caller must hold an x-latch on block and be
inside an insert buffer mini-transaction.  If the entry does not fit even
after reorganizing the page, the page is most likely corrupt: diagnostics
are printed and NULL is returned.
@return	newly inserted record; NULL on failure (likely corruption) */
static MY_ATTRIBUTE((nonnull))
rec_t*
ibuf_insert_to_index_page_low(
/*==========================*/
	const dtuple_t*	entry,	/*!< in: buffered entry to insert */
	buf_block_t*	block,	/*!< in/out: index page where the buffered
				entry should be placed */
	dict_index_t*	index,	/*!< in: record descriptor */
	rec_offs**	offsets,/*!< out: offsets on *rec */
	mem_heap_t*	heap,	/*!< in/out: memory heap */
	mtr_t*		mtr,	/*!< in/out: mtr */
	page_cur_t*	page_cur)/*!< in/out: cursor positioned on the record
				after which to insert the buffered entry */
{
	const page_t*	page;
	const page_t*	bitmap_page;
	ulint		old_bits;
	rec_t*		rec;
	DBUG_ENTER("ibuf_insert_to_index_page_low");

	/* First attempt: plain insert at the cursor position. */
	rec = page_cur_tuple_insert(page_cur, entry, index,
				    offsets, &heap, 0, mtr);
	if (rec != NULL) {
		DBUG_RETURN(rec);
	}

	/* Page reorganization or recompression should already have
	been attempted by page_cur_tuple_insert(). Besides, per
	ibuf_index_page_calc_free_zip() the page should not have been
	recompressed or reorganized. */
	ut_ad(!buf_block_get_page_zip(block));

	/* If the record did not fit, reorganize */

	btr_page_reorganize(page_cur, index, mtr);

	/* This time the record must fit */

	rec = page_cur_tuple_insert(page_cur, entry, index,
				    offsets, &heap, 0, mtr);
	if (rec != NULL) {
		DBUG_RETURN(rec);
	}

	/* The insert failed even after reorganization, although the
	free-bits bookkeeping claimed that the entry would fit.  Dump
	the entry and the insert buffer bitmap bits for the page. */
	page = buf_block_get_frame(block);

	ib::error() << "Insert buffer insert fails; page free "
		<< page_get_max_insert_size(page, 1) << ", dtuple size "
		<< rec_get_converted_size(index, entry, 0);

	fputs("InnoDB: Cannot insert index record ", stderr);
	dtuple_print(stderr, entry);
	fputs("\nInnoDB: The table where this index record belongs\n"
	      "InnoDB: is now probably corrupt. Please run CHECK TABLE on\n"
	      "InnoDB: that table.\n", stderr);

	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
					       block->zip_size(), mtr);
	old_bits = ibuf_bitmap_page_get_bits(
		bitmap_page, block->page.id, block->zip_size(),
		IBUF_BITMAP_FREE, mtr);

	ib::error() << "page " << block->page.id << ", size "
		<< block->physical_size() << ", bitmap bits " << old_bits;

	ib::error() << BUG_REPORT_MSG;

	ut_ad(0);
	DBUG_RETURN(NULL);
}
3783 
/************************************************************************
During merge, inserts to an index page a secondary index entry extracted
from the insert buffer.  Depending on what already exists on the page,
this performs either an update-in-place of a delete-marked record that
matches the entry, or a fresh insert (possibly after deleting a matching
record whose stored length differs because of collation equivalences). */
static
void
ibuf_insert_to_index_page(
/*======================*/
	const dtuple_t*	entry,	/*!< in: buffered entry to insert */
	buf_block_t*	block,	/*!< in/out: index page where the buffered entry
				should be placed */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_cur_t	page_cur;
	ulint		low_match;
	page_t*		page		= buf_block_get_frame(block);
	rec_t*		rec;
	rec_offs*	offsets;
	mem_heap_t*	heap;

	DBUG_ENTER("ibuf_insert_to_index_page");

	DBUG_PRINT("ibuf", ("page " UINT32PF ":" UINT32PF,
			    block->page.id.space(),
			    block->page.id.page_no()));

	ut_ad(!dict_index_is_online_ddl(index));// this is an ibuf_dummy index
	ut_ad(ibuf_inside(mtr));
	ut_ad(dtuple_check_typed(entry));
#ifdef BTR_CUR_HASH_ADAPT
	/* A change buffer merge must occur before users are granted
	any access to the page. No adaptive hash index entries may
	point to a freshly read page. */
	ut_ad(!block->index);
	assert_block_ahi_empty(block);
#endif /* BTR_CUR_HASH_ADAPT */
	ut_ad(mtr->is_named_space(block->page.id.space()));

	/* Sanity checks: the page must use the same row format as the
	(dummy) index, must not be empty, and the record field counts
	must agree.  Any mismatch indicates corruption; then we dump
	diagnostics and skip the merge for this entry. */
	if (UNIV_UNLIKELY(dict_table_is_comp(index->table)
			  != (ibool)!!page_is_comp(page))) {
		ib::warn() << "Trying to insert a record from the insert"
			" buffer to an index page but the 'compact' flag does"
			" not match!";
		goto dump;
	}

	rec = page_rec_get_next(page_get_infimum_rec(page));

	if (page_rec_is_supremum(rec)) {
		ib::warn() << "Trying to insert a record from the insert"
			" buffer to an index page but the index page"
			" is empty!";
		goto dump;
	}

	if (!rec_n_fields_is_sane(index, rec, entry)) {
		ib::warn() << "Trying to insert a record from the insert"
			" buffer to an index page but the number of fields"
			" does not match!";
		rec_print(stderr, rec, index);
dump:
		dtuple_print(stderr, entry);
		ut_ad(0);

		ib::warn() << "The table where this index record belongs"
			" is now probably corrupt. Please run CHECK TABLE on"
			" your tables. " << BUG_REPORT_MSG;

		DBUG_VOID_RETURN;
	}

	low_match = page_cur_search(block, index, entry, &page_cur);

	/* Reserve enough heap for the offsets array plus a worst-case
	update vector (one upd_field_t per entry field). */
	heap = mem_heap_create(
		sizeof(upd_t)
		+ REC_OFFS_HEADER_SIZE * sizeof(*offsets)
		+ dtuple_get_n_fields(entry)
		* (sizeof(upd_field_t) + sizeof *offsets));

	if (UNIV_UNLIKELY(low_match == dtuple_get_n_fields(entry))) {
		/* An identical-key record already exists on the page;
		it must be delete-marked.  Resurrect it by updating it
		in place if possible. */
		upd_t*		update;
		page_zip_des_t*	page_zip;

		rec = page_cur_get_rec(&page_cur);

		/* This is based on
		row_ins_sec_index_entry_by_modify(BTR_MODIFY_LEAF). */
		ut_ad(rec_get_deleted_flag(rec, page_is_comp(page)));

		offsets = rec_get_offsets(rec, index, NULL, index->n_fields,
					  ULINT_UNDEFINED, &heap);
		update = row_upd_build_sec_rec_difference_binary(
			rec, index, offsets, entry, heap);

		page_zip = buf_block_get_page_zip(block);

		if (update->n_fields == 0) {
			/* The records only differ in the delete-mark.
			Clear the delete-mark, like we did before
			Bug #56680 was fixed. */
			btr_cur_set_deleted_flag_for_ibuf(
				rec, page_zip, FALSE, mtr);
			goto updated_in_place;
		}

		/* Copy the info bits. Clear the delete-mark. */
		update->info_bits = rec_get_info_bits(rec, page_is_comp(page));
		update->info_bits &= ~REC_INFO_DELETED_FLAG;

		/* We cannot invoke btr_cur_optimistic_update() here,
		because we do not have a btr_cur_t or que_thr_t,
		as the insert buffer merge occurs at a very low level. */
		if (!row_upd_changes_field_size_or_external(index, offsets,
							    update)
		    && (!page_zip || btr_cur_update_alloc_zip(
				page_zip, &page_cur, index, offsets,
				rec_offs_size(offsets), false, mtr))) {
			/* This is the easy case. Do something similar
			to btr_cur_update_in_place(). */
			rec = page_cur_get_rec(&page_cur);
			row_upd_rec_in_place(rec, index, offsets,
					     update, page_zip);

			/* Log the update in place operation. During recovery
			MLOG_COMP_REC_UPDATE_IN_PLACE/MLOG_REC_UPDATE_IN_PLACE
			expects trx_id, roll_ptr for secondary indexes. So we
			just write dummy trx_id(0), roll_ptr(0) */
			btr_cur_update_in_place_log(BTR_KEEP_SYS_FLAG, rec,
						    index, update, 0, 0, mtr);

			DBUG_EXECUTE_IF(
				"crash_after_log_ibuf_upd_inplace",
				log_buffer_flush_to_disk();
				ib::info() << "Wrote log record for ibuf"
					" update in place operation";
				DBUG_SUICIDE();
			);

			goto updated_in_place;
		}

		/* btr_cur_update_alloc_zip() may have changed this */
		rec = page_cur_get_rec(&page_cur);

		/* A collation may identify values that differ in
		storage length.
		Some examples (1 or 2 bytes):
		utf8_turkish_ci: I = U+0131 LATIN SMALL LETTER DOTLESS I
		utf8_general_ci: S = U+00DF LATIN SMALL LETTER SHARP S
		utf8_general_ci: A = U+00E4 LATIN SMALL LETTER A WITH DIAERESIS

		latin1_german2_ci: SS = U+00DF LATIN SMALL LETTER SHARP S

		Examples of a character (3-byte UTF-8 sequence)
		identified with 2 or 4 characters (1-byte UTF-8 sequences):

		utf8_unicode_ci: 'II' = U+2171 SMALL ROMAN NUMERAL TWO
		utf8_unicode_ci: '(10)' = U+247D PARENTHESIZED NUMBER TEN
		*/

		/* Delete the different-length record, and insert the
		buffered one. */

		/* Park any explicit locks on the page infimum so they
		survive the delete + reinsert, then move them back. */
		lock_rec_store_on_page_infimum(block, rec);
		page_cur_delete_rec(&page_cur, index, offsets, mtr);
		page_cur_move_to_prev(&page_cur);
		rec = ibuf_insert_to_index_page_low(entry, block, index,
				      		    &offsets, heap, mtr,
						    &page_cur);

		ut_ad(!cmp_dtuple_rec(entry, rec, offsets));
		lock_rec_restore_from_page_infimum(block, rec, block);
	} else {
		/* No matching record on the page: a plain insert. */
		offsets = NULL;
		ibuf_insert_to_index_page_low(entry, block, index,
					      &offsets, heap, mtr,
					      &page_cur);
	}
updated_in_place:
	mem_heap_free(heap);

	DBUG_VOID_RETURN;
}
3967 
3968 /****************************************************************//**
3969 During merge, sets the delete mark on a record for a secondary index
3970 entry. */
3971 static
3972 void
ibuf_set_del_mark(const dtuple_t * entry,buf_block_t * block,const dict_index_t * index,mtr_t * mtr)3973 ibuf_set_del_mark(
3974 /*==============*/
3975 	const dtuple_t*		entry,	/*!< in: entry */
3976 	buf_block_t*		block,	/*!< in/out: block */
3977 	const dict_index_t*	index,	/*!< in: record descriptor */
3978 	mtr_t*			mtr)	/*!< in: mtr */
3979 {
3980 	page_cur_t	page_cur;
3981 	ulint		low_match;
3982 
3983 	ut_ad(ibuf_inside(mtr));
3984 	ut_ad(dtuple_check_typed(entry));
3985 
3986 	low_match = page_cur_search(block, index, entry, &page_cur);
3987 
3988 	if (low_match == dtuple_get_n_fields(entry)) {
3989 		rec_t*		rec;
3990 		page_zip_des_t*	page_zip;
3991 
3992 		rec = page_cur_get_rec(&page_cur);
3993 		page_zip = page_cur_get_page_zip(&page_cur);
3994 
3995 		/* Delete mark the old index record. According to a
3996 		comment in row_upd_sec_index_entry(), it can already
3997 		have been delete marked if a lock wait occurred in
3998 		row_ins_sec_index_entry() in a previous invocation of
3999 		row_upd_sec_index_entry(). */
4000 
4001 		if (UNIV_LIKELY
4002 		    (!rec_get_deleted_flag(
4003 			    rec, dict_table_is_comp(index->table)))) {
4004 			btr_cur_set_deleted_flag_for_ibuf(rec, page_zip,
4005 							  TRUE, mtr);
4006 		}
4007 	} else {
4008 		const page_t*		page
4009 			= page_cur_get_page(&page_cur);
4010 		const buf_block_t*	block
4011 			= page_cur_get_block(&page_cur);
4012 
4013 		ib::error() << "Unable to find a record to delete-mark";
4014 		fputs("InnoDB: tuple ", stderr);
4015 		dtuple_print(stderr, entry);
4016 		fputs("\n"
4017 		      "InnoDB: record ", stderr);
4018 		rec_print(stderr, page_cur_get_rec(&page_cur), index);
4019 
4020 		ib::error() << "page " << block->page.id << " ("
4021 			<< page_get_n_recs(page) << " records, index id "
4022 			<< btr_page_get_index_id(page) << ").";
4023 
4024 		ib::error() << BUG_REPORT_MSG;
4025 		ut_ad(0);
4026 	}
4027 }
4028 
/****************************************************************//**
During merge, delete a record for a secondary index entry.  The record
is only purged if it exists, has been delete-marked, and is not the last
record on the page; otherwise the operation is refused (with diagnostics)
or silently skipped (record already purged). */
static
void
ibuf_delete(
/*========*/
	const dtuple_t*	entry,	/*!< in: entry */
	buf_block_t*	block,	/*!< in/out: block */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in/out: mtr; must be committed
				before latching any further pages */
{
	page_cur_t	page_cur;
	ulint		low_match;

	ut_ad(ibuf_inside(mtr));
	ut_ad(dtuple_check_typed(entry));
	ut_ad(!index->is_spatial());
	ut_ad(!index->is_clust());

	low_match = page_cur_search(block, index, entry, &page_cur);

	if (low_match == dtuple_get_n_fields(entry)) {
		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
		page_t*		page	= buf_block_get_frame(block);
		rec_t*		rec	= page_cur_get_rec(&page_cur);

		/* TODO: the below should probably be a separate function,
		it's a bastardized version of btr_cur_optimistic_delete. */

		rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
		rec_offs*	offsets	= offsets_;
		mem_heap_t*	heap = NULL;
		ulint		max_ins_size = 0;

		rec_offs_init(offsets_);

		offsets = rec_get_offsets(rec, index, offsets, index->n_fields,
					  ULINT_UNDEFINED, &heap);

		if (page_get_n_recs(page) <= 1
		    || !(REC_INFO_DELETED_FLAG
			 & rec_get_info_bits(rec, page_is_comp(page)))) {
			/* Refuse to purge the last record or a
			record that has not been marked for deletion. */
			ib::error() << "Unable to purge a record";
			fputs("InnoDB: tuple ", stderr);
			dtuple_print(stderr, entry);
			fputs("\n"
			      "InnoDB: record ", stderr);
			rec_print_new(stderr, rec, offsets);
			fprintf(stderr, "\nspace " UINT32PF " offset " UINT32PF
				" (%u records, index id %llu)\n"
				"InnoDB: Submit a detailed bug report"
				" to https://jira.mariadb.org/\n",
				block->page.id.space(),
				block->page.id.page_no(),
				(unsigned) page_get_n_recs(page),
				(ulonglong) btr_page_get_index_id(page));

			ut_ad(0);
			return;
		}

		/* Update any locks waiting on or granted for the record
		before it disappears from the page. */
		lock_update_delete(block, rec);

		if (!page_zip) {
			/* Remember the reorganized free space so that the
			insert buffer free bits can be updated below. */
			max_ins_size
				= page_get_max_insert_size_after_reorganize(
					page, 1);
		}
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
		page_cur_delete_rec(&page_cur, index, offsets, mtr);
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

		/* Keep the insert buffer bitmap free bits in sync with
		the new amount of free space on the page. */
		if (page_zip) {
			ibuf_update_free_bits_zip(block, mtr);
		} else {
			ibuf_update_free_bits_low(block, max_ins_size, mtr);
		}

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	} else {
		/* The record must have been purged already. */
	}
}
4121 
4122 /*********************************************************************//**
4123 Restores insert buffer tree cursor position
4124 @return TRUE if the position was restored; FALSE if not */
4125 static MY_ATTRIBUTE((nonnull))
4126 ibool
ibuf_restore_pos(ulint space,ulint page_no,const dtuple_t * search_tuple,ulint mode,btr_pcur_t * pcur,mtr_t * mtr)4127 ibuf_restore_pos(
4128 /*=============*/
4129 	ulint		space,	/*!< in: space id */
4130 	ulint		page_no,/*!< in: index page number where the record
4131 				should belong */
4132 	const dtuple_t*	search_tuple,
4133 				/*!< in: search tuple for entries of page_no */
4134 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE */
4135 	btr_pcur_t*	pcur,	/*!< in/out: persistent cursor whose
4136 				position is to be restored */
4137 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
4138 {
4139 	ut_ad(mode == BTR_MODIFY_LEAF
4140 	      || BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE);
4141 
4142 	if (btr_pcur_restore_position(mode, pcur, mtr)) {
4143 
4144 		return(TRUE);
4145 	}
4146 
4147 	if (fil_space_t* s = fil_space_acquire_silent(space)) {
4148 		ib::error() << "ibuf cursor restoration fails!"
4149 			" ibuf record inserted to page "
4150 			<< space << ":" << page_no
4151 			<< " in file " << s->chain.start->name;
4152 		s->release();
4153 
4154 		ib::error() << BUG_REPORT_MSG;
4155 
4156 		rec_print_old(stderr, btr_pcur_get_rec(pcur));
4157 		rec_print_old(stderr, pcur->old_rec);
4158 		dtuple_print(stderr, search_tuple);
4159 
4160 		rec_print_old(stderr,
4161 			      page_rec_get_next(btr_pcur_get_rec(pcur)));
4162 	}
4163 
4164 	ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);
4165 	return(FALSE);
4166 }
4167 
/*********************************************************************//**
Deletes from ibuf the record on which pcur is positioned. If we have to
resort to a pessimistic delete, this function commits mtr and closes
the cursor.
@return TRUE if mtr was committed and pcur closed in this operation */
static MY_ATTRIBUTE((warn_unused_result))
ibool
ibuf_delete_rec(
/*============*/
	ulint		space,	/*!< in: space id */
	ulint		page_no,/*!< in: index page number that the record
				should belong to */
	btr_pcur_t*	pcur,	/*!< in: pcur positioned on the record to
				delete, having latch mode BTR_MODIFY_LEAF */
	const dtuple_t*	search_tuple,
				/*!< in: search tuple for entries of page_no */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ibool		success;
	page_t*		root;
	dberr_t		err;

	ut_ad(ibuf_inside(mtr));
	ut_ad(page_rec_is_user_rec(btr_pcur_get_rec(pcur)));
	ut_ad(ibuf_rec_get_page_no(mtr, btr_pcur_get_rec(pcur)) == page_no);
	ut_ad(ibuf_rec_get_space(mtr, btr_pcur_get_rec(pcur)) == space);

	/* First try the cheap path: delete without modifying the tree
	structure. */
	success = btr_cur_optimistic_delete(btr_pcur_get_btr_cur(pcur),
					    0, mtr);

	const page_id_t	page_id(space, page_no);

	if (success) {
		if (page_is_empty(btr_pcur_get_page(pcur))) {
			/* If a B-tree page is empty, it must be the root page
			and the whole B-tree must be empty. InnoDB does not
			allow empty B-tree pages other than the root. */
			root = btr_pcur_get_page(pcur);

			ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
			ut_ad(page_get_page_no(root)
			      == FSP_IBUF_TREE_ROOT_PAGE_NO);

			/* ibuf->empty is protected by the root page latch.
			Before the deletion, it had to be FALSE. */
			ut_ad(!ibuf->empty);
			ibuf->empty = true;
		}

		/* Optimistic delete succeeded: mtr stays open, pcur
		stays positioned. */
		return(FALSE);
	}

	ut_ad(page_rec_is_user_rec(btr_pcur_get_rec(pcur)));
	ut_ad(ibuf_rec_get_page_no(mtr, btr_pcur_get_rec(pcur)) == page_no);
	ut_ad(ibuf_rec_get_space(mtr, btr_pcur_get_rec(pcur)) == space);

	/* We have to resort to a pessimistic delete from ibuf.
	Delete-mark the record so that it will not be applied again,
	in case the server crashes before the pessimistic delete is
	made persistent. */
	btr_cur_set_deleted_flag_for_ibuf(
		btr_pcur_get_rec(pcur), NULL, TRUE, mtr);

	/* Save the cursor position, commit the mini-transaction, and
	re-latch the tree in BTR_MODIFY_TREE mode for the pessimistic
	delete. */
	btr_pcur_store_position(pcur, mtr);
	ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);

	ibuf_mtr_start(mtr);
	mutex_enter(&ibuf_mutex);

	if (!ibuf_restore_pos(space, page_no, search_tuple,
			      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
			      pcur, mtr)) {

		/* ibuf_restore_pos() already committed the mtr. */
		mutex_exit(&ibuf_mutex);
		ut_ad(mtr->has_committed());
		goto func_exit;
	}

	root = ibuf_tree_root_get(mtr);

	btr_cur_pessimistic_delete(&err, TRUE, btr_pcur_get_btr_cur(pcur), 0,
				   false, mtr);
	ut_a(err == DB_SUCCESS);

	/* The pessimistic delete may have freed pages; refresh the
	cached change buffer size statistics. */
	ibuf_size_update(root);
	mutex_exit(&ibuf_mutex);

	ibuf->empty = page_is_empty(root);
	ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);

func_exit:
	ut_ad(mtr->has_committed());
	btr_pcur_close(pcur);

	return(TRUE);
}
4264 
/**
Delete any buffered entries for a page.
This prevents an infinite loop on slow shutdown
in the case where the change buffer bitmap claims that no buffered
changes exist, while entries exist in the change buffer tree.
@param page_id  page number for which there should be no unbuffered changes */
ATTRIBUTE_COLD void ibuf_delete_recs(const page_id_t page_id)
{
	/* Counts of discarded operations, indexed by ibuf_op_t;
	merged into ibuf->n_discarded_ops at the end. */
	ulint dops[IBUF_OP_COUNT];
	mtr_t mtr;
	btr_pcur_t pcur;
	mem_heap_t* heap = mem_heap_create(512);
	const dtuple_t* tuple = ibuf_search_tuple_build(
		page_id.space(), page_id.page_no(), heap);
	memset(dops, 0, sizeof(dops));

loop:
	/* Reposition at the first change buffer entry for the page.
	We return here whenever a pessimistic delete or a page boundary
	forced the mini-transaction to be committed. */
	ibuf_mtr_start(&mtr);
	btr_pcur_open(ibuf->index, tuple, PAGE_CUR_GE, BTR_MODIFY_LEAF,
		      &pcur, &mtr);

	if (!btr_pcur_is_on_user_rec(&pcur)) {
		ut_ad(btr_pcur_is_after_last_in_tree(&pcur));
		goto func_exit;
	}

	for (;;) {
		ut_ad(btr_pcur_is_on_user_rec(&pcur));

		const rec_t* ibuf_rec = btr_pcur_get_rec(&pcur);

		/* Stop as soon as the cursor leaves the entries of this
		page; the entries are ordered by (space, page_no). */
		if (ibuf_rec_get_space(&mtr, ibuf_rec)
		    != page_id.space()
		    || ibuf_rec_get_page_no(&mtr, ibuf_rec)
		    != page_id.page_no()) {
			break;
		}

		dops[ibuf_rec_get_op_type(&mtr, ibuf_rec)]++;

		/* Delete the record from ibuf */
		if (ibuf_delete_rec(page_id.space(), page_id.page_no(),
				    &pcur, tuple, &mtr)) {
			/* Deletion was pessimistic and mtr was committed:
			we start from the beginning again */
			ut_ad(mtr.has_committed());
			goto loop;
		}

		if (btr_pcur_is_after_last_on_page(&pcur)) {
			/* Crossing a leaf page boundary: commit and
			reposition from the top. */
			ibuf_mtr_commit(&mtr);
			btr_pcur_close(&pcur);
			goto loop;
		}
	}

func_exit:
	ibuf_mtr_commit(&mtr);
	btr_pcur_close(&pcur);

	ibuf_add_ops(ibuf->n_discarded_ops, dops);

	mem_heap_free(heap);
}
4329 
/** When an index page is read from a disk to the buffer pool, this function
applies any buffered operations to the page and deletes the entries from the
insert buffer. If the page is not read, but created in the buffer pool, this
function deletes its buffered entries from the insert buffer; there can
exist entries for such a page if the page belonged to an index which
subsequently was dropped.
@param block    X-latched page to try to apply changes to, or NULL to discard
@param page_id  page identifier
@param zip_size ROW_FORMAT=COMPRESSED page size, or 0 */
void ibuf_merge_or_delete_for_page(buf_block_t *block, const page_id_t page_id,
                                   ulint zip_size)
{
	btr_pcur_t	pcur;
#ifdef UNIV_IBUF_DEBUG
	ulint		volume			= 0;
#endif /* UNIV_IBUF_DEBUG */
	page_zip_des_t*	page_zip		= NULL;
	bool		corruption_noticed	= false;
	mtr_t		mtr;

	/* Counts for merged & discarded operations. */
	ulint		mops[IBUF_OP_COUNT];
	ulint		dops[IBUF_OP_COUNT];

	ut_ad(block == NULL || page_id == block->page.id);
	ut_ad(block == NULL || buf_block_get_io_fix(block) == BUF_IO_READ
	      || recv_recovery_is_on());

	/* Pages that can never have buffered changes: system header,
	temporary tablespace pages, or when change buffer merging is
	disabled by innodb_force_recovery. */
	if (srv_force_recovery >= SRV_FORCE_NO_IBUF_MERGE
	    || trx_sys_hdr_page(page_id)
	    || fsp_is_system_temporary(page_id.space())) {
		return;
	}

	const ulint physical_size = zip_size ? zip_size : srv_page_size;

	/* Fixed-address pages (including change buffer pages themselves)
	and tablespace bookkeeping pages are never buffered. */
	if (ibuf_fixed_addr_page(page_id, physical_size)
	    || fsp_descr_page(page_id, physical_size)) {
		return;
	}

	fil_space_t* space = fil_space_acquire_silent(page_id.space());

	if (UNIV_UNLIKELY(!space)) {
		/* The tablespace is missing: discard (not apply) any
		buffered entries below. */
		block = NULL;
	} else {
		/* Consult the change buffer bitmap: if the BUFFERED bit
		is clear, there is nothing to merge for this page. */
		ulint	bitmap_bits = 0;

		ibuf_mtr_start(&mtr);

		page_t* bitmap_page = ibuf_bitmap_get_map_page(
			page_id, zip_size, &mtr);

		if (bitmap_page &&
		    fil_page_get_type(bitmap_page) != FIL_PAGE_TYPE_ALLOCATED) {
			bitmap_bits = ibuf_bitmap_page_get_bits(
				bitmap_page, page_id, zip_size,
				IBUF_BITMAP_BUFFERED, &mtr);
		}

		ibuf_mtr_commit(&mtr);

		if (!bitmap_bits) {
			/* No changes are buffered for this page. */
			space->release();
			if (UNIV_UNLIKELY(srv_shutdown_state)
			    && !srv_fast_shutdown
			    && (!block
				|| btr_page_get_index_id(block->frame)
				!= DICT_IBUF_ID_MIN + IBUF_SPACE_ID)) {
				/* Prevent an infinite loop on slow
				shutdown, in case the bitmap bits are
				wrongly clear even though buffered
				changes exist. */
				ibuf_delete_recs(page_id);
			}
			return;
		}
	}

	mem_heap_t* heap = mem_heap_create(512);

	const dtuple_t* search_tuple = ibuf_search_tuple_build(
		page_id.space(), page_id.page_no(), heap);

	if (block != NULL) {
		/* Move the ownership of the x-latch on the page to this OS
		thread, so that we can acquire a second x-latch on it. This
		is needed for the insert operations to the index page to pass
		the debug checks. */

		rw_lock_x_lock_move_ownership(&(block->lock));
		page_zip = buf_block_get_page_zip(block);

		if (!fil_page_index_page_check(block->frame)
		    || !page_is_leaf(block->frame)) {

			corruption_noticed = true;

			ib::error() << "Corruption in the tablespace. Bitmap"
				" shows insert buffer records to page "
				<< page_id << " though the page type is "
				<< fil_page_get_type(block->frame)
				<< ", which is not an index leaf page. We try"
				" to resolve the problem by skipping the"
				" insert buffer merge for this page. Please"
				" run CHECK TABLE on your tables to determine"
				" if they are corrupt after this.";
			ut_ad(0);
		}
	}

	memset(mops, 0, sizeof(mops));
	memset(dops, 0, sizeof(dops));

loop:
	/* Reposition at the first change buffer entry for the page.
	We come back here whenever the mini-transaction had to be
	committed (pessimistic delete, page boundary, lost cursor). */
	ibuf_mtr_start(&mtr);

	/* Position pcur in the insert buffer at the first entry for this
	index page */
	btr_pcur_open_on_user_rec(
		ibuf->index, search_tuple, PAGE_CUR_GE, BTR_MODIFY_LEAF,
		&pcur, &mtr);

	if (block) {
		ibool success = buf_page_get_known_nowait(
			RW_X_LATCH, block,
			BUF_KEEP_OLD, __FILE__, __LINE__, &mtr);

		ut_a(success);

		/* This is a user page (secondary index leaf page),
		but we pretend that it is a change buffer page in
		order to obey the latching order. This should be OK,
		because buffered changes are applied immediately while
		the block is io-fixed. Other threads must not try to
		latch an io-fixed block. */
		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);
	}

	if (space) {
		mtr.set_named_space(space);
	}

	if (!btr_pcur_is_on_user_rec(&pcur)) {
		ut_ad(btr_pcur_is_after_last_on_page(&pcur));
		goto reset_bit;
	}

	for (;;) {
		rec_t*	rec;

		ut_ad(btr_pcur_is_on_user_rec(&pcur));

		rec = btr_pcur_get_rec(&pcur);

		/* Check if the entry is for this index page */
		if (ibuf_rec_get_page_no(&mtr, rec) != page_id.page_no()
		    || ibuf_rec_get_space(&mtr, rec) != page_id.space()) {

			if (block != NULL) {
				page_header_reset_last_insert(
					block->frame, page_zip, &mtr);
			}

			goto reset_bit;
		}

		if (corruption_noticed) {
			/* Do not apply to a non-leaf/non-index page;
			just discard the entry. */
			fputs("InnoDB: Discarding record\n ", stderr);
			rec_print_old(stderr, rec);
			fputs("\nInnoDB: from the insert buffer!\n\n", stderr);
		} else if (block != NULL && !rec_get_deleted_flag(rec, 0)) {
			/* Now we have at pcur a record which should be
			applied on the index page; NOTE that the call below
			copies pointers to fields in rec, and we must
			keep the latch to the rec page until the
			insertion is finished! */
			dtuple_t*	entry;
			trx_id_t	max_trx_id;
			dict_index_t*	dummy_index;
			ibuf_op_t	op = ibuf_rec_get_op_type(&mtr, rec);

			/* Propagate the max trx id from the change
			buffer page to the secondary index page. */
			max_trx_id = page_get_max_trx_id(page_align(rec));
			page_update_max_trx_id(block, page_zip, max_trx_id,
					       &mtr);

			ut_ad(page_validate(page_align(rec), ibuf->index));

			entry = ibuf_build_entry_from_ibuf_rec(
				&mtr, rec, heap, &dummy_index);
			ut_ad(!dummy_index->table->space);
			dummy_index->table->space = space;
			dummy_index->table->space_id = space->id;

			ut_ad(page_validate(block->frame, dummy_index));

			switch (op) {
				ibool	success;
			case IBUF_OP_INSERT:
#ifdef UNIV_IBUF_DEBUG
				volume += rec_get_converted_size(
					dummy_index, entry, 0);

				volume += page_dir_calc_reserved_space(1);

				ut_a(volume <= (4U << srv_page_size_shift)
				     / IBUF_PAGE_SIZE_PER_FREE_SPACE);
#endif
				ibuf_insert_to_index_page(
					entry, block, dummy_index, &mtr);
				break;

			case IBUF_OP_DELETE_MARK:
				ibuf_set_del_mark(
					entry, block, dummy_index, &mtr);
				break;

			case IBUF_OP_DELETE:
				ibuf_delete(entry, block, dummy_index, &mtr);
				/* Because ibuf_delete() will latch an
				insert buffer bitmap page, commit mtr
				before latching any further pages.
				Store and restore the cursor position. */
				ut_ad(rec == btr_pcur_get_rec(&pcur));
				ut_ad(page_rec_is_user_rec(rec));
				ut_ad(ibuf_rec_get_page_no(&mtr, rec)
				      == page_id.page_no());
				ut_ad(ibuf_rec_get_space(&mtr, rec)
				      == page_id.space());

				/* Mark the change buffer record processed,
				so that it will not be merged again in case
				the server crashes between the following
				mtr_commit() and the subsequent mtr_commit()
				of deleting the change buffer record. */

				btr_cur_set_deleted_flag_for_ibuf(
					btr_pcur_get_rec(&pcur), NULL,
					TRUE, &mtr);

				btr_pcur_store_position(&pcur, &mtr);
				ibuf_btr_pcur_commit_specify_mtr(&pcur, &mtr);

				ibuf_mtr_start(&mtr);
				mtr.set_named_space(space);

				success = buf_page_get_known_nowait(
					RW_X_LATCH, block,
					BUF_KEEP_OLD,
					__FILE__, __LINE__, &mtr);
				ut_a(success);

				/* This is a user page (secondary
				index leaf page), but it should be OK
				to use too low latching order for it,
				as the block is io-fixed. */
				buf_block_dbg_add_level(
					block, SYNC_IBUF_TREE_NODE);

				if (!ibuf_restore_pos(page_id.space(),
						      page_id.page_no(),
						      search_tuple,
						      BTR_MODIFY_LEAF,
						      &pcur, &mtr)) {

					/* The cursor was lost: count the
					operation and restart from the top. */
					ut_ad(mtr.has_committed());
					mops[op]++;
					ibuf_dummy_index_free(dummy_index);
					goto loop;
				}

				break;
			default:
				ut_error;
			}

			mops[op]++;

			ibuf_dummy_index_free(dummy_index);
		} else {
			/* block == NULL (discard) or the entry was
			already marked processed: count it as discarded. */
			dops[ibuf_rec_get_op_type(&mtr, rec)]++;
		}

		/* Delete the record from ibuf */
		if (ibuf_delete_rec(page_id.space(), page_id.page_no(),
				    &pcur, search_tuple, &mtr)) {
			/* Deletion was pessimistic and mtr was committed:
			we start from the beginning again */

			ut_ad(mtr.has_committed());
			goto loop;
		} else if (btr_pcur_is_after_last_on_page(&pcur)) {
			ibuf_mtr_commit(&mtr);
			btr_pcur_close(&pcur);

			goto loop;
		}
	}

reset_bit:
	/* All buffered entries for the page have been processed:
	clear the BUFFERED bit and refresh the FREE bits. */
	if (space) {
		page_t*	bitmap_page;

		bitmap_page = ibuf_bitmap_get_map_page(page_id, zip_size,
						       &mtr);

		ibuf_bitmap_page_set_bits(
			bitmap_page, page_id, physical_size,
			IBUF_BITMAP_BUFFERED, FALSE, &mtr);

		if (block != NULL) {
			ulint old_bits = ibuf_bitmap_page_get_bits(
				bitmap_page, page_id, zip_size,
				IBUF_BITMAP_FREE, &mtr);

			ulint new_bits = ibuf_index_page_calc_free(block);

			if (old_bits != new_bits) {
				ibuf_bitmap_page_set_bits(
					bitmap_page, page_id, physical_size,
					IBUF_BITMAP_FREE, new_bits, &mtr);
			}
		}
	}

	ibuf_mtr_commit(&mtr);

	if (space) {
		space->release();
	}

	btr_pcur_close(&pcur);
	mem_heap_free(heap);

	/* Publish merge/discard statistics. */
	ibuf->n_merges++;
	ibuf_add_ops(ibuf->n_merged_ops, mops);
	ibuf_add_ops(ibuf->n_discarded_ops, dops);
}
4669 
/** Delete all change buffer entries for a tablespace,
in DISCARD TABLESPACE, IMPORT TABLESPACE, or crash recovery.
@param[in]	space		missing or to-be-discarded tablespace */
void ibuf_delete_for_discarded_space(ulint space)
{
	mem_heap_t*	heap;
	btr_pcur_t	pcur;
	dtuple_t*	search_tuple;
	const rec_t*	ibuf_rec;
	ulint		page_no;
	mtr_t		mtr;

	/* Counts for discarded operations. */
	ulint		dops[IBUF_OP_COUNT];

	heap = mem_heap_create(512);

	/* Use page number 0 to build the search tuple so that we get the
	cursor positioned at the first entry for this space id */

	search_tuple = ibuf_search_tuple_build(space, 0, heap);

	memset(dops, 0, sizeof(dops));
loop:
	/* Restart point: we come back here whenever the mini-transaction
	had to be committed (pessimistic delete, or the cursor moved past
	the last record on a page) and the cursor must be repositioned
	from the search tuple. */
	ibuf_mtr_start(&mtr);

	/* Position pcur in the insert buffer at the first entry for the
	space */
	btr_pcur_open_on_user_rec(
		ibuf->index, search_tuple, PAGE_CUR_GE, BTR_MODIFY_LEAF,
		&pcur, &mtr);

	if (!btr_pcur_is_on_user_rec(&pcur)) {
		/* No user records at or after the search position:
		nothing (more) to delete for this space. */
		ut_ad(btr_pcur_is_after_last_on_page(&pcur));
		goto leave_loop;
	}

	for (;;) {
		ut_ad(btr_pcur_is_on_user_rec(&pcur));

		ibuf_rec = btr_pcur_get_rec(&pcur);

		/* Check if the entry is for this space */
		if (ibuf_rec_get_space(&mtr, ibuf_rec) != space) {

			goto leave_loop;
		}

		page_no = ibuf_rec_get_page_no(&mtr, ibuf_rec);

		/* Tally the discarded operation by type for statistics. */
		dops[ibuf_rec_get_op_type(&mtr, ibuf_rec)]++;

		/* Delete the record from ibuf */
		if (ibuf_delete_rec(space, page_no, &pcur, search_tuple,
				    &mtr)) {
			/* Deletion was pessimistic and mtr was committed:
			we start from the beginning again */

			ut_ad(mtr.has_committed());
			goto loop;
		}

		if (btr_pcur_is_after_last_on_page(&pcur)) {
			/* Release latches before moving to the next leaf
			page; the cursor is repositioned at "loop". */
			ibuf_mtr_commit(&mtr);
			btr_pcur_close(&pcur);

			goto loop;
		}
	}

leave_loop:
	ibuf_mtr_commit(&mtr);
	btr_pcur_close(&pcur);

	/* Publish the per-operation discard counts. */
	ibuf_add_ops(ibuf->n_discarded_ops, dops);

	mem_heap_free(heap);
}
4748 
4749 /******************************************************************//**
4750 Looks if the insert buffer is empty.
4751 @return true if empty */
4752 bool
ibuf_is_empty(void)4753 ibuf_is_empty(void)
4754 /*===============*/
4755 {
4756 	bool		is_empty;
4757 	const page_t*	root;
4758 	mtr_t		mtr;
4759 
4760 	ibuf_mtr_start(&mtr);
4761 
4762 	mutex_enter(&ibuf_mutex);
4763 	root = ibuf_tree_root_get(&mtr);
4764 	mutex_exit(&ibuf_mutex);
4765 
4766 	is_empty = page_is_empty(root);
4767 	ut_a(is_empty == ibuf->empty);
4768 	ibuf_mtr_commit(&mtr);
4769 
4770 	return(is_empty);
4771 }
4772 
4773 /******************************************************************//**
4774 Prints info of ibuf. */
4775 void
ibuf_print(FILE * file)4776 ibuf_print(
4777 /*=======*/
4778 	FILE*	file)	/*!< in: file where to print */
4779 {
4780 	mutex_enter(&ibuf_mutex);
4781 
4782 	fprintf(file,
4783 		"Ibuf: size " ULINTPF ", free list len " ULINTPF ","
4784 		" seg size " ULINTPF ", " ULINTPF " merges\n",
4785 		ibuf->size,
4786 		ibuf->free_list_len,
4787 		ibuf->seg_size,
4788 		ulint{ibuf->n_merges});
4789 
4790 	fputs("merged operations:\n ", file);
4791 	ibuf_print_ops(ibuf->n_merged_ops, file);
4792 
4793 	fputs("discarded operations:\n ", file);
4794 	ibuf_print_ops(ibuf->n_discarded_ops, file);
4795 
4796 	mutex_exit(&ibuf_mutex);
4797 }
4798 
/** Check the insert buffer bitmaps on IMPORT TABLESPACE.
@param[in]	trx	transaction
@param[in,out]	space	tablespace being imported
@return DB_SUCCESS or error code */
dberr_t ibuf_check_bitmap_on_import(const trx_t* trx, fil_space_t* space)
{
	ulint	page_no;
	ut_ad(trx->mysql_thd);
	ut_ad(space->purpose == FIL_TYPE_IMPORT);

	const ulint zip_size = space->zip_size();
	const ulint physical_size = space->physical_size();
	/* fil_space_t::size and fil_space_t::free_limit would still be 0
	at this point. So, we will have to read page 0. */
	ut_ad(!space->free_limit);
	ut_ad(!space->size);

	mtr_t	mtr;
	ulint	size;
	mtr.start();
	/* Read the scan bound from the FSP header on page 0: the smaller
	of FSP_FREE_LIMIT and FSP_SIZE, in pages. */
	if (buf_block_t* sp = buf_page_get(page_id_t(space->id, 0),
					   zip_size,
					   RW_S_LATCH, &mtr)) {
		size = std::min(
			mach_read_from_4(FSP_HEADER_OFFSET + FSP_FREE_LIMIT
					 + sp->frame),
			mach_read_from_4(FSP_HEADER_OFFSET + FSP_SIZE
					 + sp->frame));
	} else {
		size = 0;
	}
	mtr.commit();

	if (size == 0) {
		/* Page 0 could not be read, or the header says the
		space contains no pages. */
		return(DB_TABLE_NOT_FOUND);
	}

	mutex_enter(&ibuf_mutex);

	/* The two bitmap pages (allocation bitmap and ibuf bitmap) repeat
	every page_size pages. For example if page_size is 16 KiB, then the
	two bitmap pages repeat every 16 KiB * 16384 = 256 MiB. In the loop
	below page_no is measured in number of pages since the beginning of
	the space, as usual. */

	for (page_no = 0; page_no < size; page_no += physical_size) {
		page_t*	bitmap_page;
		ulint	i;

		if (trx_is_interrupted(trx)) {
			mutex_exit(&ibuf_mutex);
			return(DB_INTERRUPTED);
		}

		mtr_start(&mtr);

		/* No redo logging: we are only inspecting (and possibly
		clearing) bitmap bits in the not-yet-imported tablespace. */
		mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

		ibuf_enter(&mtr);

		bitmap_page = ibuf_bitmap_get_map_page(
			page_id_t(space->id, page_no), zip_size, &mtr);

		if (!bitmap_page) {
			/* The bitmap page could not be read: corrupt file. */
			mutex_exit(&ibuf_mutex);
			ibuf_exit(&mtr);
			mtr_commit(&mtr);
			return DB_CORRUPTION;
		}

		if (buf_is_zeroes(span<const byte>(bitmap_page,
						   physical_size))) {
			/* This means we got all-zero page instead of
			ibuf bitmap page. The subsequent page should be
			all-zero pages. */
#ifdef UNIV_DEBUG
			/* NOTE(review): the loop bound compares a page
			number (curr_page) against physical_size, which is
			a size in bytes; presumably the intent was to check
			the remaining pages covered by this bitmap page —
			confirm against upstream before relying on this. */
			for (ulint curr_page = page_no + 1;
			     curr_page < physical_size; curr_page++) {

				buf_block_t* block = buf_page_get(
					page_id_t(space->id, curr_page),
					zip_size, RW_S_LATCH, &mtr);
	                        page_t*	page = buf_block_get_frame(block);
				ut_ad(buf_is_zeroes(span<const byte>(
							    page,
							    physical_size)));
			}
#endif /* UNIV_DEBUG */
			ibuf_exit(&mtr);
			mtr_commit(&mtr);
			continue;
		}

		for (i = FSP_IBUF_BITMAP_OFFSET + 1; i < physical_size; i++) {
			const ulint	offset = page_no + i;
			const page_id_t	cur_page_id(space->id, offset);

			/* A page wrongly flagged as belonging to the
			insert buffer is a hard error: refuse the import. */
			if (ibuf_bitmap_page_get_bits(
				    bitmap_page, cur_page_id, zip_size,
				    IBUF_BITMAP_IBUF, &mtr)) {

				mutex_exit(&ibuf_mutex);
				ibuf_exit(&mtr);
				mtr_commit(&mtr);

				ib_errf(trx->mysql_thd,
					IB_LOG_LEVEL_ERROR,
					 ER_INNODB_INDEX_CORRUPT,
					 "File %s page " ULINTPF
					 " is wrongly flagged to belong to the"
					 " insert buffer",
					space->chain.start->name, offset);
				return(DB_CORRUPTION);
			}

			if (ibuf_bitmap_page_get_bits(
				    bitmap_page, cur_page_id, zip_size,
				    IBUF_BITMAP_BUFFERED, &mtr)) {

				ib_errf(trx->mysql_thd,
					IB_LOG_LEVEL_WARN,
					ER_INNODB_INDEX_CORRUPT,
					"Buffered changes"
					" for file %s page " ULINTPF
					" are lost",
					space->chain.start->name, offset);

				/* Tolerate this error, so that
				slightly corrupted tables can be
				imported and dumped.  Clear the bit. */
				ibuf_bitmap_page_set_bits(
					bitmap_page, cur_page_id,
					physical_size,
					IBUF_BITMAP_BUFFERED, FALSE, &mtr);
			}
		}

		ibuf_exit(&mtr);
		mtr_commit(&mtr);
	}

	mutex_exit(&ibuf_mutex);
	return(DB_SUCCESS);
}
4943 
4944 /** Updates free bits and buffered bits for bulk loaded page.
4945 @param[in]	block	index page
4946 @param[in]	reset	flag if reset free val */
4947 void
ibuf_set_bitmap_for_bulk_load(buf_block_t * block,bool reset)4948 ibuf_set_bitmap_for_bulk_load(
4949 	buf_block_t*	block,
4950 	bool		reset)
4951 {
4952 	page_t*	bitmap_page;
4953 	mtr_t	mtr;
4954 	ulint	free_val;
4955 
4956 	ut_a(page_is_leaf(buf_block_get_frame(block)));
4957 
4958 	free_val = ibuf_index_page_calc_free(block);
4959 
4960 	mtr_start(&mtr);
4961 	fil_space_t* space = mtr.set_named_space_id(block->page.id.space());
4962 
4963 	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
4964                                                space->zip_size(), &mtr);
4965 
4966 	free_val = reset ? 0 : ibuf_index_page_calc_free(block);
4967 	ibuf_bitmap_page_set_bits(
4968 		bitmap_page, block->page.id, block->physical_size(),
4969 		IBUF_BITMAP_FREE, free_val, &mtr);
4970 
4971 	ibuf_bitmap_page_set_bits(
4972 		bitmap_page, block->page.id, block->physical_size(),
4973 		IBUF_BITMAP_BUFFERED, FALSE, &mtr);
4974 
4975 	mtr_commit(&mtr);
4976 }
4977