1 /*****************************************************************************
2 
3 Copyright (c) 1997, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file ibuf/ibuf0ibuf.cc
29 Insert buffer
30 
31 Created 7/19/1997 Heikki Tuuri
32 *******************************************************/
33 
34 #include "ha_prototypes.h"
35 
36 #include "ibuf0ibuf.h"
37 #include "sync0sync.h"
38 #include "btr0sea.h"
39 
40 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
41 my_bool	srv_ibuf_disable_background_merge;
42 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
43 
44 /** Number of bits describing a single page */
45 #define IBUF_BITS_PER_PAGE	4
46 #if IBUF_BITS_PER_PAGE % 2
47 # error "IBUF_BITS_PER_PAGE must be an even number!"
48 #endif
49 /** The start address for an insert buffer bitmap page bitmap */
50 #define IBUF_BITMAP		PAGE_DATA
51 
52 #ifdef UNIV_NONINL
53 #include "ibuf0ibuf.ic"
54 #endif
55 
56 #ifndef UNIV_HOTBACKUP
57 
58 #include "buf0buf.h"
59 #include "buf0rea.h"
60 #include "fsp0fsp.h"
61 #include "trx0sys.h"
62 #include "fil0fil.h"
63 #include "rem0rec.h"
64 #include "btr0cur.h"
65 #include "btr0pcur.h"
66 #include "btr0btr.h"
67 #include "row0upd.h"
68 #include "dict0boot.h"
69 #include "fut0lst.h"
70 #include "lock0lock.h"
71 #include "log0recv.h"
72 #include "que0que.h"
73 #include "srv0start.h" /* srv_shutdown_state */
74 #include "fsp0sysspace.h"
75 #include "rem0cmp.h"
76 
77 /*	STRUCTURE OF AN INSERT BUFFER RECORD
78 
79 In versions < 4.1.x:
80 
81 1. The first field is the page number.
82 2. The second field is an array which stores type info for each subsequent
83    field. We store the information which affects the ordering of records, and
84    also the physical storage size of an SQL NULL value. E.g., for CHAR(10) it
85    is 10 bytes.
86 3. Next we have the fields of the actual index record.
87 
88 In versions >= 4.1.x:
89 
90 Note that contrary to what we planned in the 1990's, there will only be one
91 insert buffer tree, and that is in the system tablespace of InnoDB.
92 
93 1. The first field is the space id.
94 2. The second field is a one-byte marker (0) which differentiates records from
95    the < 4.1.x storage format.
96 3. The third field is the page number.
97 4. The fourth field contains the type info, where we have also added 2 bytes to
98    store the charset. In the compressed table format of 5.0.x we must add more
99    information here so that we can build a dummy 'index' struct which 5.0.x
100    can use in the binary search on the index page in the ibuf merge phase.
101 5. The rest of the fields contain the fields of the actual index record.
102 
103 In versions >= 5.0.3:
104 
105 The first byte of the fourth field is an additional marker (0) if the record
106 is in the compact format.  The presence of this marker can be detected by
107 looking at the length of the field modulo DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE.
108 
109 The high-order bit of the character set field in the type info is the
110 "nullable" flag for the field.
111 
112 In versions >= 5.5:
113 
114 The optional marker byte at the start of the fourth field is replaced by
115 mandatory 3 fields, totaling 4 bytes:
116 
117  1. 2 bytes: Counter field, used to sort records within a (space id, page
118     no) in the order they were added. This is needed so that for example the
119     sequence of operations "INSERT x, DEL MARK x, INSERT x" is handled
120     correctly.
121 
122  2. 1 byte: Operation type (see ibuf_op_t).
123 
124  3. 1 byte: Flags. Currently only one flag exists, IBUF_REC_COMPACT.
125 
126 To ensure older records, which do not have counters to enforce correct
127 sorting, are merged before any new records, ibuf_insert checks if we're
128 trying to insert to a position that contains old-style records, and if so,
129 refuses the insert. Thus, ibuf pages are gradually converted to the new
130 format as their corresponding buffer pool pages are read into memory.
131 */
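/* Illustration added for clarity (not part of the original source; a
sketch based on the description above and on the IBUF_REC_OFFSET_*
constants defined later in this file): in the >= 5.5 format the fourth
field begins with IBUF_REC_INFO_SIZE (4) info bytes laid out as

	byte 0..1  counter (big-endian, read with mach_read_from_2())
	byte 2     operation type (an ibuf_op_t value)
	byte 3     flags (0x1 = IBUF_REC_COMPACT)

followed by one DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE entry per user field,
as decoded by ibuf_rec_get_info_func() below. */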
132 
133 
134 /*	PREVENTING DEADLOCKS IN THE INSERT BUFFER SYSTEM
135 
136 If an OS thread performs any operation that brings in disk pages from
137 non-system tablespaces into the buffer pool, or creates such a page there,
138 then the operation may have as a side effect an insert buffer index tree
139 compression. Thus, the tree latch of the insert buffer tree may be acquired
140 in the x-mode, and also the file space latch of the system tablespace may
141 be acquired in the x-mode.
142 
143 Also, an insert to an index in a non-system tablespace can have the same
144 effect. How do we know this cannot lead to a deadlock of OS threads? There
145 is a problem with the i/o-handler threads: they break the latching order
146 because they own x-latches to pages which are on a lower level than the
147 insert buffer tree latch, its page latches, and the tablespace latch an
148 insert buffer operation can reserve.
149 
150 The solution is the following: Let all the tree and page latches connected
151 with the insert buffer be later in the latching order than the fsp latch and
152 fsp page latches.
153 
154 Insert buffer pages must be such that the insert buffer is never invoked
155 when these pages are accessed as this would result in a recursion violating
156 the latching order. We let a special i/o-handler thread take care of i/o to
157 the insert buffer pages and the ibuf bitmap pages, as well as the fsp bitmap
158 pages and the first inode page, which contains the inode of the ibuf tree: let
159 us call all these ibuf pages. To prevent deadlocks, we do not let a read-ahead
160 access both non-ibuf and ibuf pages.
161 
162 Then an i/o-handler for the insert buffer never needs to access recursively the
163 insert buffer tree and thus obeys the latching order. On the other hand, other
164 i/o-handlers for other tablespaces may require access to the insert buffer,
165 but because all kinds of latches they need to access there are later in the
166 latching order, no violation of the latching order occurs in this case,
167 either.
168 
169 A problem is how to grow and contract an insert buffer tree. As it is later
170 in the latching order than the fsp management, we have to reserve the fsp
171 latch first, before adding or removing pages from the insert buffer tree.
172 We let the insert buffer tree have its own file space management: a free
173 list of pages linked to the tree root. To prevent recursive use of the
174 insert buffer when adding pages to the tree, we must first load these pages
175 into memory, obtaining a latch on them, and only after that add them to the
176 free list of the insert buffer tree. Removing pages from the free list is
177 more difficult. If there is an excess of pages in the free list of the
178 ibuf tree, they might be needed if some thread reserves the fsp latch,
179 intending to allocate more file space. So we do the following: if a thread
180 reserves the fsp latch, we check the writer count field of the latch. If
181 this field has value 1, it means that the thread did not own the latch
182 before entering the fsp system, and the mtr of the thread contains no
183 modifications to the fsp pages. Now we are free to reserve the ibuf latch,
184 and check if there is an excess of pages in the free list. We can then, in a
185 separate mini-transaction, take them out of the free list and free them to
186 the fsp system.
187 
188 To avoid deadlocks in the ibuf system, we divide file pages into three levels:
189 
190 (1) non-ibuf pages,
191 (2) ibuf tree pages and the pages in the ibuf tree free list, and
192 (3) ibuf bitmap pages.
193 
194 No OS thread is allowed to access higher level pages if it has latches to
195 lower level pages; even if the thread owns a B-tree latch it must not access
196 the B-tree non-leaf pages if it has latches on lower level pages. Read-ahead
197 is only allowed for level 1 and 2 pages. Dedicated i/o-handler threads handle
198 exclusively level 1 i/o. A dedicated i/o handler thread handles exclusively
199 level 2 i/o. However, if an OS thread does the i/o handling for itself, i.e.,
200 it uses synchronous aio, it can access any pages, as long as it obeys the
201 access order rules. */
202 
203 /** Operations that can currently be buffered. */
204 ibuf_use_t	ibuf_use		= IBUF_USE_ALL;
205 
206 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
207 /** Flag to control insert buffer debugging. */
208 uint	ibuf_debug;
209 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
210 
211 /** The insert buffer control structure */
212 ibuf_t*	ibuf			= NULL;
213 
214 #ifdef UNIV_IBUF_COUNT_DEBUG
215 /** Number of tablespaces in the ibuf_counts array */
216 #define IBUF_COUNT_N_SPACES	4
217 /** Number of pages within each tablespace in the ibuf_counts array */
218 #define IBUF_COUNT_N_PAGES	130000
219 
220 /** Buffered entry counts for file pages, used in debugging */
221 static ulint	ibuf_counts[IBUF_COUNT_N_SPACES][IBUF_COUNT_N_PAGES];
222 
223 /** Checks that the indexes to ibuf_counts[][] are within limits.
224 @param[in]	page_id	page id */
225 UNIV_INLINE
226 void
227 ibuf_count_check(
228 	const page_id_t&	page_id)
229 {
230 	if (page_id.space() < IBUF_COUNT_N_SPACES
231 	    && page_id.page_no() < IBUF_COUNT_N_PAGES) {
232 		return;
233 	}
234 
235 	ib::fatal() << "UNIV_IBUF_COUNT_DEBUG limits space_id and page_no"
236 		" and breaks crash recovery. space_id=" << page_id.space()
237 		<< ", should be 0<=space_id<" << IBUF_COUNT_N_SPACES
238 		<< ". page_no=" << page_id.page_no()
239 		<< ", should be 0<=page_no<" << IBUF_COUNT_N_PAGES;
240 }
241 #endif
242 
243 /** @name Offsets to the per-page bits in the insert buffer bitmap */
244 /* @{ */
245 #define	IBUF_BITMAP_FREE	0	/*!< Bits indicating the
246 					amount of free space */
247 #define IBUF_BITMAP_BUFFERED	2	/*!< TRUE if there are buffered
248 					changes for the page */
249 #define IBUF_BITMAP_IBUF	3	/*!< TRUE if page is a part of
250 					the ibuf tree, excluding the
251 					root page, or is in the free
252 					list of the ibuf */
253 /* @} */
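/* Worked example added for clarity (a sketch of how these offsets are
used by ibuf_bitmap_page_get_bits_low() and ibuf_bitmap_page_set_bits()
below): with IBUF_BITS_PER_PAGE == 4, the bits describing page number P
on a bitmap page covering pages of physical size S start at

	bit_offset  = (P % S) * IBUF_BITS_PER_PAGE + bit;
	byte_offset = bit_offset / 8;

counted from the IBUF_BITMAP offset within the bitmap page.  The
IBUF_BITMAP_FREE value occupies two adjacent bits, so it ranges
from 0 to 3. */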
254 
255 #define IBUF_REC_FIELD_SPACE	0	/*!< in the pre-4.1 format,
256 					the page number. later, the space_id */
257 #define IBUF_REC_FIELD_MARKER	1	/*!< starting with 4.1, a marker
258 					consisting of 1 byte that is 0 */
259 #define IBUF_REC_FIELD_PAGE	2	/*!< starting with 4.1, the
260 					page number */
261 #define IBUF_REC_FIELD_METADATA	3	/* the metadata field */
262 #define IBUF_REC_FIELD_USER	4	/* first user field */
263 
264 /* Various constants for checking the type of an ibuf record and extracting
265 data from it. For details, see the description of the record format at the
266 top of this file. */
267 
268 /** @name Format of the IBUF_REC_FIELD_METADATA of an insert buffer record
269 The fourth column in the MySQL 5.5 format contains an operation
270 type, counter, and some flags. */
271 /* @{ */
272 #define IBUF_REC_INFO_SIZE	4	/*!< Combined size of info fields at
273 					the beginning of the fourth field */
274 #if IBUF_REC_INFO_SIZE >= DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE
275 # error "IBUF_REC_INFO_SIZE >= DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE"
276 #endif
277 
278 /* Offsets for the fields at the beginning of the fourth field */
279 #define IBUF_REC_OFFSET_COUNTER	0	/*!< Operation counter */
280 #define IBUF_REC_OFFSET_TYPE	2	/*!< Type of operation */
281 #define IBUF_REC_OFFSET_FLAGS	3	/*!< Additional flags */
282 
283 /* Record flag masks */
284 #define IBUF_REC_COMPACT	0x1	/*!< Set in
285 					IBUF_REC_OFFSET_FLAGS if the
286 					user index is in COMPACT
287 					format or later */
288 
289 
290 /** The mutex used to block pessimistic inserts to ibuf trees */
291 static ib_mutex_t	ibuf_pessimistic_insert_mutex;
292 
293 /** The mutex protecting the insert buffer structs */
294 static ib_mutex_t	ibuf_mutex;
295 
296 /** The mutex protecting the insert buffer bitmaps */
297 static ib_mutex_t	ibuf_bitmap_mutex;
298 
299 /** The area in pages from which contract looks for page numbers for merge */
300 const ulint		IBUF_MERGE_AREA = 8;
301 
302 /** Inside the merge area, pages whose buffered entries fall short of the
303 maximum volume that can be buffered for a single page by at most 1/this
304 number are merged along with the page whose buffer became full */
305 const ulint		IBUF_MERGE_THRESHOLD = 4;
306 
307 /** In ibuf_contract at most this number of pages is read to memory in one
308 batch, in order to merge the entries for them in the insert buffer */
309 const ulint		IBUF_MAX_N_PAGES_MERGED = IBUF_MERGE_AREA;
310 
311 /** If the combined size of the ibuf trees exceeds ibuf->max_size by this
312 many pages, we start to contract it in connection with inserts there, using
313 a non-synchronous contraction */
314 const ulint		IBUF_CONTRACT_ON_INSERT_NON_SYNC = 0;
315 
316 /** If the combined size of the ibuf trees exceeds ibuf->max_size by this
317 many pages, we start to contract it in connection with inserts there, using
318 a synchronous contraction */
319 const ulint		IBUF_CONTRACT_ON_INSERT_SYNC = 5;
320 
321 /** If the combined size of the ibuf trees exceeds ibuf->max_size by
322 this many pages, we start to contract it using a synchronous contraction, but do
323 not insert */
324 const ulint		IBUF_CONTRACT_DO_NOT_INSERT = 10;
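/* Summary of the three thresholds above (added comment, a sketch only):
if the combined tree size exceeds ibuf->max_size by more than
IBUF_CONTRACT_ON_INSERT_NON_SYNC pages, a non-synchronous contraction
accompanies the insert; by more than IBUF_CONTRACT_ON_INSERT_SYNC pages,
the contraction is synchronous; and by more than
IBUF_CONTRACT_DO_NOT_INSERT pages, the contraction is synchronous and
the entry is not buffered at all. */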
325 
326 /* TODO: how to cope with drop table if there are records in the insert
327 buffer for the indexes of the table? Is there actually any problem,
328 because ibuf merge is done to a page when it is read in, and it is
329 still physically like the index page even if the index would have been
330 dropped! So, there seems to be no problem. */
331 
332 /******************************************************************//**
333 Sets the flag in the current mini-transaction record indicating we're
334 inside an insert buffer routine. */
335 UNIV_INLINE
336 void
337 ibuf_enter(
338 /*=======*/
339 	mtr_t*	mtr)	/*!< in/out: mini-transaction */
340 {
341 	ut_ad(!mtr->is_inside_ibuf());
342 	mtr->enter_ibuf();
343 }
344 
345 /******************************************************************//**
346 Sets the flag in the current mini-transaction record indicating we're
347 exiting an insert buffer routine. */
348 UNIV_INLINE
349 void
350 ibuf_exit(
351 /*======*/
352 	mtr_t*	mtr)	/*!< in/out: mini-transaction */
353 {
354 	ut_ad(mtr->is_inside_ibuf());
355 	mtr->exit_ibuf();
356 }
357 
358 /**************************************************************//**
359 Commits an insert buffer mini-transaction and sets the persistent
360 cursor latch mode to BTR_NO_LATCHES, that is, detaches the cursor. */
361 UNIV_INLINE
362 void
363 ibuf_btr_pcur_commit_specify_mtr(
364 /*=============================*/
365 	btr_pcur_t*	pcur,	/*!< in/out: persistent cursor */
366 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
367 {
368 	ut_d(ibuf_exit(mtr));
369 	btr_pcur_commit_specify_mtr(pcur, mtr);
370 }
371 
372 /******************************************************************//**
373 Gets the ibuf header page and x-latches it.
374 @return insert buffer header page */
375 static
376 page_t*
377 ibuf_header_page_get(
378 /*=================*/
379 	mtr_t*	mtr)	/*!< in/out: mini-transaction */
380 {
381 	buf_block_t*	block;
382 
383 	ut_ad(!ibuf_inside(mtr));
384 
385 	block = buf_page_get(
386 		page_id_t(IBUF_SPACE_ID, FSP_IBUF_HEADER_PAGE_NO),
387 		univ_page_size, RW_X_LATCH, mtr);
388 
389 	buf_block_dbg_add_level(block, SYNC_IBUF_HEADER);
390 
391 	return(buf_block_get_frame(block));
392 }
393 
394 /******************************************************************//**
395 Gets the root page and sx-latches it.
396 @return insert buffer tree root page */
397 static
398 page_t*
399 ibuf_tree_root_get(
400 /*===============*/
401 	mtr_t*		mtr)	/*!< in: mtr */
402 {
403 	buf_block_t*	block;
404 	page_t*		root;
405 
406 	ut_ad(ibuf_inside(mtr));
407 	ut_ad(mutex_own(&ibuf_mutex));
408 
409 	mtr_sx_lock(dict_index_get_lock(ibuf->index), mtr);
410 
411 	/* only the segment list accesses need to be exclusive of each other */
412 	block = buf_page_get(
413 		page_id_t(IBUF_SPACE_ID, FSP_IBUF_TREE_ROOT_PAGE_NO),
414 		univ_page_size, RW_SX_LATCH, mtr);
415 
416 	buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);
417 
418 	root = buf_block_get_frame(block);
419 
420 	ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
421 	ut_ad(page_get_page_no(root) == FSP_IBUF_TREE_ROOT_PAGE_NO);
422 	ut_ad(ibuf->empty == page_is_empty(root));
423 
424 	return(root);
425 }
426 
427 #ifdef UNIV_IBUF_COUNT_DEBUG
428 
429 /** Gets the ibuf count for a given page.
430 @param[in]	page_id	page id
431 @return number of entries in the insert buffer currently buffered for
432 this page */
433 ulint
434 ibuf_count_get(
435 	const page_id_t&	page_id)
436 {
437 	ibuf_count_check(page_id);
438 
439 	return(ibuf_counts[page_id.space()][page_id.page_no()]);
440 }
441 
442 /** Sets the ibuf count for a given page.
443 @param[in]	page_id	page id
444 @param[in]	val	value to set */
445 static
446 void
447 ibuf_count_set(
448 	const page_id_t&	page_id,
449 	ulint			val)
450 {
451 	ibuf_count_check(page_id);
452 	ut_a(val < UNIV_PAGE_SIZE);
453 
454 	ibuf_counts[page_id.space()][page_id.page_no()] = val;
455 }
456 #endif
457 
458 /******************************************************************//**
459 Closes insert buffer and frees the data structures. */
460 void
461 ibuf_close(void)
462 /*============*/
463 {
464 	mutex_free(&ibuf_pessimistic_insert_mutex);
465 
466 	mutex_free(&ibuf_mutex);
467 
468 	mutex_free(&ibuf_bitmap_mutex);
469 
470 	dict_table_t*	ibuf_table = ibuf->index->table;
471 	rw_lock_free(&ibuf->index->lock);
472 	dict_mem_index_free(ibuf->index);
473 	dict_mem_table_free(ibuf_table);
474 
475 	ut_free(ibuf);
476 	ibuf = NULL;
477 }
478 
479 /******************************************************************//**
480 Function to pass ibuf status variables */
481 
482 void
483 ibuf_export_ibuf_status(
484 /*====================*/
485 	ulint*	free_list,
486 	ulint*	segment_size)
487 {
488 	*free_list = ibuf->free_list_len;
489 	*segment_size = ibuf->seg_size;
490 }
491 
492 /******************************************************************//**
493 Updates the size information of the ibuf, assuming the segment size has not
494 changed. */
495 static
496 void
497 ibuf_size_update(
498 /*=============*/
499 	const page_t*	root)	/*!< in: ibuf tree root */
500 {
501 	ut_ad(mutex_own(&ibuf_mutex));
502 
503 	ibuf->free_list_len = flst_get_len(root + PAGE_HEADER
504 					   + PAGE_BTR_IBUF_FREE_LIST);
505 
506 	ibuf->height = 1 + btr_page_get_level_low(root);
507 
508 	/* the '1 +' is the ibuf header page */
509 	ibuf->size = ibuf->seg_size - (1 + ibuf->free_list_len);
510 }
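/* Example of the accounting above (added comment, assuming the segment
size stays constant): with ibuf->seg_size == 12 and a free list of 3
pages, ibuf->size = 12 - (1 + 3) = 8, i.e. 8 pages are in use by the
tree, not counting the ibuf header page and the free-list pages. */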
511 
512 /******************************************************************//**
513 Creates the insert buffer data structure at a database startup and initializes
514 the data structures for the insert buffer. */
515 dberr_t
516 ibuf_init_at_db_start(void)
517 /*=======================*/
518 {
519 	page_t*		root;
520 	mtr_t		mtr;
521 	ulint		n_used;
522 	page_t*		header_page;
523 	dberr_t		error= DB_SUCCESS;
524 
525 	ibuf = static_cast<ibuf_t*>(ut_zalloc_nokey(sizeof(ibuf_t)));
526 
527 	/* At startup we initialize ibuf to have a maximum of
528 	CHANGE_BUFFER_DEFAULT_SIZE in terms of percentage of the
529 	buffer pool size. Once ibuf struct is initialized this
530 	value is updated with the user supplied size by calling
531 	ibuf_max_size_update(). */
532 	ibuf->max_size = ((buf_pool_get_curr_size() / UNIV_PAGE_SIZE)
533 			  * CHANGE_BUFFER_DEFAULT_SIZE) / 100;
534 
535 	mutex_create(LATCH_ID_IBUF, &ibuf_mutex);
536 
537 	mutex_create(LATCH_ID_IBUF_BITMAP, &ibuf_bitmap_mutex);
538 
539 	mutex_create(LATCH_ID_IBUF_PESSIMISTIC_INSERT,
540 		     &ibuf_pessimistic_insert_mutex);
541 
542 	mtr_start(&mtr);
543 
544 	mtr_x_lock_space(IBUF_SPACE_ID, &mtr);
545 
546 	mutex_enter(&ibuf_mutex);
547 
548 	header_page = ibuf_header_page_get(&mtr);
549 
550 	if (!header_page) {
551 		return (DB_DECRYPTION_FAILED);
552 	}
553 
554 	fseg_n_reserved_pages(header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER,
555 			      &n_used, &mtr);
556 	ibuf_enter(&mtr);
557 
558 	ut_ad(n_used >= 2);
559 
560 	ibuf->seg_size = n_used;
561 
562 	{
563 		buf_block_t*	block;
564 
565 		block = buf_page_get(
566 			page_id_t(IBUF_SPACE_ID, FSP_IBUF_TREE_ROOT_PAGE_NO),
567 			univ_page_size, RW_X_LATCH, &mtr);
568 
569 		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);
570 
571 		root = buf_block_get_frame(block);
572 	}
573 
574 	ibuf_size_update(root);
575 	mutex_exit(&ibuf_mutex);
576 
577 	ibuf->empty = page_is_empty(root);
578 	ibuf_mtr_commit(&mtr);
579 
580 	ibuf->index = dict_mem_index_create(
581 		"innodb_change_buffer", "CLUST_IND",
582 		IBUF_SPACE_ID, DICT_CLUSTERED | DICT_IBUF, 1);
583 	ibuf->index->id = DICT_IBUF_ID_MIN + IBUF_SPACE_ID;
584 	ibuf->index->table = dict_mem_table_create(
585 		"innodb_change_buffer", IBUF_SPACE_ID, 1, 0, 0, 0);
586 	ibuf->index->n_uniq = REC_MAX_N_FIELDS;
587 	rw_lock_create(index_tree_rw_lock_key, &ibuf->index->lock,
588 		       SYNC_IBUF_INDEX_TREE);
589 	ibuf->index->search_info = btr_search_info_create(ibuf->index->heap);
590 	ibuf->index->page = FSP_IBUF_TREE_ROOT_PAGE_NO;
591 	ut_d(ibuf->index->cached = TRUE);
592 	return (error);
593 }
594 
595 /*********************************************************************//**
596 Updates the max_size value for ibuf. */
597 void
598 ibuf_max_size_update(
599 /*=================*/
600 	ulint	new_val)	/*!< in: new value in terms of
601 				percentage of the buffer pool size */
602 {
603 	ulint	new_size = ((buf_pool_get_curr_size() / UNIV_PAGE_SIZE)
604 			    * new_val) / 100;
605 	mutex_enter(&ibuf_mutex);
606 	ibuf->max_size = new_size;
607 	mutex_exit(&ibuf_mutex);
608 }
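/* Worked example (added comment): with a 1 GiB buffer pool,
UNIV_PAGE_SIZE == 16 KiB and new_val == 25, the computation above gives
(65536 * 25) / 100 = 16384 pages as the new ibuf->max_size. */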
609 
610 
611 #endif /* !UNIV_HOTBACKUP */
612 /*********************************************************************//**
613 Initializes an ibuf bitmap page. */
614 void
615 ibuf_bitmap_page_init(
616 /*==================*/
617 	buf_block_t*	block,	/*!< in: bitmap page */
618 	mtr_t*		mtr)	/*!< in: mtr */
619 {
620 	page_t*	page;
621 	ulint	byte_offset;
622 
623 	page = buf_block_get_frame(block);
624 	fil_page_set_type(page, FIL_PAGE_IBUF_BITMAP);
625 
626 	/* Write all zeros to the bitmap */
627 
628 	byte_offset = UT_BITS_IN_BYTES(block->page.size.physical()
629 				       * IBUF_BITS_PER_PAGE);
630 
631 	memset(page + IBUF_BITMAP, 0, byte_offset);
632 
633 	/* The remaining area (up to the page trailer) is uninitialized. */
634 
635 #ifndef UNIV_HOTBACKUP
636 	mlog_write_initial_log_record(page, MLOG_IBUF_BITMAP_INIT, mtr);
637 #endif /* !UNIV_HOTBACKUP */
638 }
639 
640 /*********************************************************************//**
641 Parses a redo log record of an ibuf bitmap page init.
642 @return end of log record or NULL */
643 byte*
644 ibuf_parse_bitmap_init(
645 /*===================*/
646 	byte*		ptr,	/*!< in: buffer */
647 	byte*		end_ptr MY_ATTRIBUTE((unused)), /*!< in: buffer end */
648 	buf_block_t*	block,	/*!< in: block or NULL */
649 	mtr_t*		mtr)	/*!< in: mtr or NULL */
650 {
651 	ut_ad(ptr != NULL);
652 	ut_ad(end_ptr != NULL);
653 
654 	if (block) {
655 		ibuf_bitmap_page_init(block, mtr);
656 	}
657 
658 	return(ptr);
659 }
660 #ifndef UNIV_HOTBACKUP
661 # ifdef UNIV_DEBUG
662 /** Gets the desired bits for a given page from a bitmap page.
663 @param[in]	page		bitmap page
664 @param[in]	page_id		page id whose bits to get
665 @param[in]	page_size	page size
666 @param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
667 @param[in,out]	mtr		mini-transaction holding an x-latch on the
668 bitmap page
669 @return value of bits */
670 #  define ibuf_bitmap_page_get_bits(page, page_id, page_size, bit, mtr)	\
671 	ibuf_bitmap_page_get_bits_low(page, page_id, page_size,		\
672 				      MTR_MEMO_PAGE_X_FIX, mtr, bit)
673 # else /* UNIV_DEBUG */
674 /** Gets the desired bits for a given page from a bitmap page.
675 @param[in]	page		bitmap page
676 @param[in]	page_id		page id whose bits to get
677 @param[in]	page_size	page size
678 @param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
679 @param[in,out]	mtr		mini-transaction holding an x-latch on the
680 bitmap page
681 @return value of bits */
682 #  define ibuf_bitmap_page_get_bits(page, page_id, page_size, bit, mtr)	\
683 	ibuf_bitmap_page_get_bits_low(page, page_id, page_size, bit)
684 # endif /* UNIV_DEBUG */
685 
686 /** Gets the desired bits for a given page from a bitmap page.
687 @param[in]	page		bitmap page
688 @param[in]	page_id		page id whose bits to get
689 @param[in]	page_size	page size
690 @param[in]	latch_type	MTR_MEMO_PAGE_X_FIX, MTR_MEMO_BUF_FIX, ...
691 @param[in,out]	mtr		mini-transaction holding latch_type on the
692 bitmap page
693 @param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
694 @return value of bits */
695 UNIV_INLINE
696 ulint
697 ibuf_bitmap_page_get_bits_low(
698 	const page_t*		page,
699 	const page_id_t&	page_id,
700 	const page_size_t&	page_size,
701 #ifdef UNIV_DEBUG
702 	ulint			latch_type,
703 	mtr_t*			mtr,
704 #endif /* UNIV_DEBUG */
705 	ulint			bit)
706 {
707 	ulint	byte_offset;
708 	ulint	bit_offset;
709 	ulint	map_byte;
710 	ulint	value;
711 
712 	ut_ad(bit < IBUF_BITS_PER_PAGE);
713 #if IBUF_BITS_PER_PAGE % 2
714 # error "IBUF_BITS_PER_PAGE % 2 != 0"
715 #endif
716 	ut_ad(mtr_memo_contains_page(mtr, page, latch_type));
717 
718 	bit_offset = (page_id.page_no() % page_size.physical())
719 		* IBUF_BITS_PER_PAGE + bit;
720 
721 	byte_offset = bit_offset / 8;
722 	bit_offset = bit_offset % 8;
723 
724 	ut_ad(byte_offset + IBUF_BITMAP < UNIV_PAGE_SIZE);
725 
726 	map_byte = mach_read_from_1(page + IBUF_BITMAP + byte_offset);
727 
728 	value = ut_bit_get_nth(map_byte, bit_offset);
729 
730 	if (bit == IBUF_BITMAP_FREE) {
731 		ut_ad(bit_offset + 1 < 8);
732 
733 		value = value * 2 + ut_bit_get_nth(map_byte, bit_offset + 1);
734 	}
735 
736 	return(value);
737 }
738 
739 /** Sets the desired bit for a given page in a bitmap page.
740 @param[in,out]	page		bitmap page
741 @param[in]	page_id		page id whose bits to set
742 @param[in]	page_size	page size
743 @param[in]	bit		IBUF_BITMAP_FREE, IBUF_BITMAP_BUFFERED, ...
744 @param[in]	val		value to set
745 @param[in,out]	mtr		mtr containing an x-latch to the bitmap page */
746 static
747 void
748 ibuf_bitmap_page_set_bits(
749 	page_t*			page,
750 	const page_id_t&	page_id,
751 	const page_size_t&	page_size,
752 	ulint			bit,
753 	ulint			val,
754 	mtr_t*			mtr)
755 {
756 	ulint	byte_offset;
757 	ulint	bit_offset;
758 	ulint	map_byte;
759 
760 	ut_ad(bit < IBUF_BITS_PER_PAGE);
761 #if IBUF_BITS_PER_PAGE % 2
762 # error "IBUF_BITS_PER_PAGE % 2 != 0"
763 #endif
764 	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
765 	ut_ad(mtr->is_named_space(page_id.space()));
766 #ifdef UNIV_IBUF_COUNT_DEBUG
767 	ut_a((bit != IBUF_BITMAP_BUFFERED) || (val != FALSE)
768 	     || (0 == ibuf_count_get(page_id)));
769 #endif
770 
771 	bit_offset = (page_id.page_no() % page_size.physical())
772 		* IBUF_BITS_PER_PAGE + bit;
773 
774 	byte_offset = bit_offset / 8;
775 	bit_offset = bit_offset % 8;
776 
777 	ut_ad(byte_offset + IBUF_BITMAP < UNIV_PAGE_SIZE);
778 
779 	map_byte = mach_read_from_1(page + IBUF_BITMAP + byte_offset);
780 
781 	if (bit == IBUF_BITMAP_FREE) {
782 		ut_ad(bit_offset + 1 < 8);
783 		ut_ad(val <= 3);
784 
785 		map_byte = ut_bit_set_nth(map_byte, bit_offset, val / 2);
786 		map_byte = ut_bit_set_nth(map_byte, bit_offset + 1, val % 2);
787 	} else {
788 		ut_ad(val <= 1);
789 		map_byte = ut_bit_set_nth(map_byte, bit_offset, val);
790 	}
791 
792 	mlog_write_ulint(page + IBUF_BITMAP + byte_offset, map_byte,
793 			 MLOG_1BYTE, mtr);
794 }
795 
796 /** Calculates the bitmap page number for a given page number.
797 @param[in]	page_id		page id
798 @param[in]	page_size	page size
799 @return the bitmap page id where the file page is mapped */
800 UNIV_INLINE
801 const page_id_t
802 ibuf_bitmap_page_no_calc(
803 	const page_id_t&	page_id,
804 	const page_size_t&	page_size)
805 {
806 	ulint	bitmap_page_no;
807 
808 	bitmap_page_no = FSP_IBUF_BITMAP_OFFSET
809 		+ (page_id.page_no() & ~(page_size.physical() - 1));
810 
811 	return(page_id_t(page_id.space(), bitmap_page_no));
812 }
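/* Worked example (added comment): with a physical page size of 16384,
page_id.page_no() & ~(16384 - 1) rounds the page number down to the
start of its 16384-page group, so e.g. page 40000 in some tablespace
maps to the bitmap page FSP_IBUF_BITMAP_OFFSET + 32768 in the same
tablespace. */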
813 
814 /** Gets the ibuf bitmap page where the bits describing a given file page are
815 stored.
816 @param[in]	page_id		page id of the file page
817 @param[in]	page_size	page size of the file page
818 @param[in]	file		file name
819 @param[in]	line		line where called
820 @param[in,out]	mtr		mini-transaction
821 @return bitmap page where the file page is mapped, that is, the bitmap
822 page containing the descriptor bits for the file page; the bitmap page
823 is x-latched */
824 static
825 page_t*
826 ibuf_bitmap_get_map_page_func(
827 	const page_id_t&	page_id,
828 	const page_size_t&	page_size,
829 	const char*		file,
830 	ulint			line,
831 	mtr_t*			mtr,
832         dberr_t                 *err = NULL)
833 {
834 	buf_block_t*	block;
835 	dberr_t error = DB_SUCCESS;
836 
837 	block = buf_page_get_gen(ibuf_bitmap_page_no_calc(page_id, page_size),
838 				 page_size, RW_X_LATCH, NULL, BUF_GET,
839 				 file, line, mtr, false, &error);
840 	if (err != NULL)
841 		*err = error;
842 
843 	if (error != DB_SUCCESS) {
844 		return NULL;
845 	}
846 
847 	buf_block_dbg_add_level(block, SYNC_IBUF_BITMAP);
848 
849 	return(buf_block_get_frame(block));
850 }
851 
852 /** Gets the ibuf bitmap page where the bits describing a given file page are
853 stored.
854 @param[in]	page_id		page id of the file page
855 @param[in]	page_size	page size of the file page
856 @param[in,out]	mtr		mini-transaction
857 @return bitmap page where the file page is mapped, that is, the bitmap
858 page containing the descriptor bits for the file page; the bitmap page
859 is x-latched */
860 #define ibuf_bitmap_get_map_page(page_id, page_size, mtr)	\
861 	ibuf_bitmap_get_map_page_func(page_id, page_size, \
862 				      __FILE__, __LINE__, mtr)
863 
864 /************************************************************************//**
865 Sets the free bits of the page in the ibuf bitmap. This is done in a separate
866 mini-transaction, hence this operation does not restrict further work to only
867 ibuf bitmap operations, which would result if the latch to the bitmap page
868 were kept. */
869 UNIV_INLINE
870 void
871 ibuf_set_free_bits_low(
872 /*===================*/
873 	const buf_block_t*	block,	/*!< in: index page; free bits are set if
874 					the index is non-clustered and page
875 					level is 0 */
876 	ulint			val,	/*!< in: value to set: < 4 */
877 	mtr_t*			mtr)	/*!< in/out: mtr */
878 {
879 	page_t*	bitmap_page;
880 
881 	ut_ad(mtr->is_named_space(block->page.id.space()));
882 
883 	if (!page_is_leaf(buf_block_get_frame(block))) {
884 
885 		return;
886 	}
887 
888 	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
889 					       block->page.size, mtr);
890 
891 #ifdef UNIV_IBUF_DEBUG
892 	ut_a(val <= ibuf_index_page_calc_free(block));
893 #endif /* UNIV_IBUF_DEBUG */
894 
895 	ibuf_bitmap_page_set_bits(
896 		bitmap_page, block->page.id, block->page.size,
897 		IBUF_BITMAP_FREE, val, mtr);
898 }
899 
900 /************************************************************************//**
901 Sets the free bit of the page in the ibuf bitmap. This is done in a separate
902 mini-transaction, hence this operation does not restrict further work to only
903 ibuf bitmap operations, which would result if the latch to the bitmap page
904 were kept. */
905 void
906 ibuf_set_free_bits_func(
907 /*====================*/
908 	buf_block_t*	block,	/*!< in: index page of a non-clustered index;
909 				free bit is reset if page level is 0 */
910 #ifdef UNIV_IBUF_DEBUG
911 	ulint		max_val,/*!< in: ULINT_UNDEFINED or a maximum
912 				value which the bits must have before
913 				setting; this is for debugging */
914 #endif /* UNIV_IBUF_DEBUG */
915 	ulint		val)	/*!< in: value to set: < 4 */
916 {
917 	mtr_t	mtr;
918 	page_t*	page;
919 	page_t*	bitmap_page;
920 
921 	page = buf_block_get_frame(block);
922 
923 	if (!page_is_leaf(page)) {
924 
925 		return;
926 	}
927 
928 	mtr_start(&mtr);
929 	const fil_space_t* space = mtr.set_named_space(block->page.id.space());
930 
931 	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
932 					       block->page.size, &mtr);
933 
934 	switch (space->purpose) {
935 	case FIL_TYPE_LOG:
936 		ut_ad(0);
937 		break;
938 	case FIL_TYPE_TABLESPACE:
939 		/* Avoid logging while fixing up truncate of table. */
940 		if (!srv_is_tablespace_truncated(block->page.id.space())) {
941 			break;
942 		}
943 		/* fall through */
944 	case FIL_TYPE_TEMPORARY:
945 	case FIL_TYPE_IMPORT:
946 		mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
947 	}
948 
949 #ifdef UNIV_IBUF_DEBUG
950 	if (max_val != ULINT_UNDEFINED) {
951 		ulint	old_val;
952 
953 		old_val = ibuf_bitmap_page_get_bits(
954 			bitmap_page, block->page.id,
955 			IBUF_BITMAP_FREE, &mtr);
956 # if 0
957 		if (old_val != max_val) {
958 			fprintf(stderr,
959 				"Ibuf: page %lu old val %lu max val %lu\n",
960 				page_get_page_no(page),
961 				old_val, max_val);
962 		}
963 # endif
964 
965 		ut_a(old_val <= max_val);
966 	}
967 # if 0
968 	fprintf(stderr, "Setting page no %lu free bits to %lu should be %lu\n",
969 		page_get_page_no(page), val,
970 		ibuf_index_page_calc_free(block));
971 # endif
972 
973 	ut_a(val <= ibuf_index_page_calc_free(block));
974 #endif /* UNIV_IBUF_DEBUG */
975 
976 	ibuf_bitmap_page_set_bits(
977 		bitmap_page, block->page.id, block->page.size,
978 		IBUF_BITMAP_FREE, val, &mtr);
979 
980 	mtr_commit(&mtr);
981 }
982 
983 /************************************************************************//**
984 Resets the free bits of the page in the ibuf bitmap. This is done in a
985 separate mini-transaction, hence this operation does not restrict
986 further work to only ibuf bitmap operations, which would result if the
987 latch to the bitmap page were kept.  NOTE: The free bits in the insert
988 buffer bitmap must never exceed the free space on a page.  It is safe
989 to decrement or reset the bits in the bitmap in a mini-transaction
990 that is committed before the mini-transaction that affects the free
991 space. */
992 void
993 ibuf_reset_free_bits(
994 /*=================*/
995 	buf_block_t*	block)	/*!< in: index page; free bits are set to 0
996 				if the index is a non-clustered
997 				non-unique, and page level is 0 */
998 {
999 	ibuf_set_free_bits(block, 0, ULINT_UNDEFINED);
1000 }
1001 
1002 /**********************************************************************//**
1003 Updates the free bits for an uncompressed page to reflect the present
1004 state.  Does this in the mtr given, which means that the latching
1005 order rules virtually prevent any further operations for this OS
1006 thread until mtr is committed.  NOTE: The free bits in the insert
1007 buffer bitmap must never exceed the free space on a page.  It is safe
1008 to set the free bits in the same mini-transaction that updated the
1009 page. */
1010 void
1011 ibuf_update_free_bits_low(
1012 /*======================*/
1013 	const buf_block_t*	block,		/*!< in: index page */
1014 	ulint			max_ins_size,	/*!< in: value of
1015 						maximum insert size
1016 						with reorganize before
1017 						the latest operation
1018 						performed to the page */
1019 	mtr_t*			mtr)		/*!< in/out: mtr */
1020 {
1021 	ulint	before;
1022 	ulint	after;
1023 
1024 	ut_a(!buf_block_get_page_zip(block));
1025 	ut_ad(mtr->is_named_space(block->page.id.space()));
1026 
1027 	before = ibuf_index_page_calc_free_bits(block->page.size.logical(),
1028 						max_ins_size);
1029 
1030 	after = ibuf_index_page_calc_free(block);
1031 
1032 	/* This approach cannot be used on compressed pages, since the
1033 	computed value of "before" often does not match the current
1034 	state of the bitmap.  This is because the free space may
1035 	increase or decrease when a compressed page is reorganized. */
1036 	if (before != after) {
1037 		ibuf_set_free_bits_low(block, after, mtr);
1038 	}
1039 }
1040 
1041 /**********************************************************************//**
1042 Updates the free bits for a compressed page to reflect the present
1043 state.  Does this in the mtr given, which means that the latching
1044 order rules virtually prevent any further operations for this OS
1045 thread until mtr is committed.  NOTE: The free bits in the insert
1046 buffer bitmap must never exceed the free space on a page.  It is safe
1047 to set the free bits in the same mini-transaction that updated the
1048 page. */
1049 void
1050 ibuf_update_free_bits_zip(
1051 /*======================*/
1052 	buf_block_t*	block,	/*!< in/out: index page */
1053 	mtr_t*		mtr)	/*!< in/out: mtr */
1054 {
1055 	page_t*	bitmap_page;
1056 	ulint	after;
1057 
1058 	ut_a(page_is_leaf(buf_block_get_frame(block)));
1059 	ut_a(block->page.size.is_compressed());
1060 
1061 	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
1062 					       block->page.size, mtr);
1063 
1064 	after = ibuf_index_page_calc_free_zip(block);
1065 
1066 	if (after == 0) {
1067 		/* We move the page to the front of the buffer pool LRU list:
1068 		the purpose of this is to prevent those pages to which we
1069 		cannot make inserts using the insert buffer from slipping
1070 		out of the buffer pool */
1071 
1072 		buf_page_make_young(&block->page);
1073 	}
1074 
1075 	ibuf_bitmap_page_set_bits(
1076 		bitmap_page, block->page.id, block->page.size,
1077 		IBUF_BITMAP_FREE, after, mtr);
1078 }
1079 
1080 /**********************************************************************//**
1081 Updates the free bits for the two pages to reflect the present state.
1082 Does this in the mtr given, which means that the latching order rules
1083 virtually prevent any further operations until mtr is committed.
1084 NOTE: The free bits in the insert buffer bitmap must never exceed the
1085 free space on a page.  It is safe to set the free bits in the same
1086 mini-transaction that updated the pages. */
1087 void
1088 ibuf_update_free_bits_for_two_pages_low(
1089 /*====================================*/
1090 	buf_block_t*	block1,	/*!< in: index page */
1091 	buf_block_t*	block2,	/*!< in: index page */
1092 	mtr_t*		mtr)	/*!< in: mtr */
1093 {
1094 	ulint	state;
1095 
1096 	ut_ad(mtr->is_named_space(block1->page.id.space()));
1097 	ut_ad(block1->page.id.space() == block2->page.id.space());
1098 
1099 	/* As we have to x-latch two random bitmap pages, we have to acquire
1100 	the bitmap mutex to prevent a deadlock with a similar operation
1101 	performed by another OS thread. */
1102 
1103 	mutex_enter(&ibuf_bitmap_mutex);
1104 
1105 	state = ibuf_index_page_calc_free(block1);
1106 
1107 	ibuf_set_free_bits_low(block1, state, mtr);
1108 
1109 	state = ibuf_index_page_calc_free(block2);
1110 
1111 	ibuf_set_free_bits_low(block2, state, mtr);
1112 
1113 	mutex_exit(&ibuf_bitmap_mutex);
1114 }
1115 
1116 /** Returns TRUE if the page is one of the fixed address ibuf pages.
1117 @param[in]	page_id		page id
1118 @param[in]	page_size	page size
1119 @return TRUE if a fixed address ibuf i/o page */
1120 UNIV_INLINE
1121 ibool
1122 ibuf_fixed_addr_page(
1123 	const page_id_t&	page_id,
1124 	const page_size_t&	page_size)
1125 {
1126 	return((page_id.space() == IBUF_SPACE_ID
1127 		&& page_id.page_no() == IBUF_TREE_ROOT_PAGE_NO)
1128 	       || ibuf_bitmap_page(page_id, page_size));
1129 }
1130 
1131 /** Checks if a page is a level 2 or 3 page in the ibuf hierarchy of pages.
1132 Must not be called when recv_no_ibuf_operations==true.
1133 @param[in]	page_id		page id
1134 @param[in]	page_size	page size
1135 @param[in]	x_latch		FALSE if relaxed check (avoid latching the
1136 bitmap page)
1137 @param[in]	file		file name
1138 @param[in]	line		line where called
1139 @param[in,out]	mtr		mtr which will contain an x-latch to the
1140 bitmap page if the page is not one of the fixed address ibuf pages, or NULL,
1141 in which case a new mini-transaction is created.
1142 @return TRUE if level 2 or level 3 page */
1143 ibool
1144 ibuf_page_low(
1145 	const page_id_t&	page_id,
1146 	const page_size_t&	page_size,
1147 #ifdef UNIV_DEBUG
1148 	ibool			x_latch,
1149 #endif /* UNIV_DEBUG */
1150 	const char*		file,
1151 	ulint			line,
1152 	mtr_t*			mtr)
1153 {
1154 	ibool	ret;
1155 	mtr_t	local_mtr;
1156 	page_t*	bitmap_page;
1157 
1158 	ut_ad(!recv_no_ibuf_operations);
1159 	ut_ad(x_latch || mtr == NULL);
1160 
1161 	if (ibuf_fixed_addr_page(page_id, page_size)) {
1162 
1163 		return(TRUE);
1164 	} else if (page_id.space() != IBUF_SPACE_ID) {
1165 
1166 		return(FALSE);
1167 	}
1168 
1169 	ut_ad(fil_space_get_type(IBUF_SPACE_ID) == FIL_TYPE_TABLESPACE);
1170 
1171 #ifdef UNIV_DEBUG
1172 	if (!x_latch) {
1173 		mtr_start(&local_mtr);
1174 
1175 		/* Get the bitmap page without a page latch, so that
1176 		we will not be violating the latching order when
1177 		another bitmap page has already been latched by this
1178 		thread. The page will be buffer-fixed, and thus it
1179 		cannot be removed or relocated while we are looking at
1180 		it. The contents of the page could change, but the
1181 		IBUF_BITMAP_IBUF bit that we are interested in should
1182 		not be modified by any other thread. Nobody should be
1183 		calling ibuf_add_free_page() or ibuf_remove_free_page()
1184 		while the page is linked to the insert buffer b-tree. */
1185 
1186 		bitmap_page = buf_block_get_frame(
1187 			buf_page_get_gen(
1188 				ibuf_bitmap_page_no_calc(page_id, page_size),
1189 				page_size, RW_NO_LATCH, NULL, BUF_GET_NO_LATCH,
1190 				file, line, &local_mtr));
1191 
1192 		ret = ibuf_bitmap_page_get_bits_low(
1193 			bitmap_page, page_id, page_size,
1194 			MTR_MEMO_BUF_FIX, &local_mtr, IBUF_BITMAP_IBUF);
1195 
1196 		mtr_commit(&local_mtr);
1197 		return(ret);
1198 	}
1199 #endif /* UNIV_DEBUG */
1200 
1201 	if (mtr == NULL) {
1202 		mtr = &local_mtr;
1203 		mtr_start(mtr);
1204 	}
1205 
1206 	bitmap_page = ibuf_bitmap_get_map_page_func(page_id, page_size,
1207 						    file, line, mtr);
1208 
1209 	ret = ibuf_bitmap_page_get_bits(bitmap_page, page_id, page_size,
1210 					IBUF_BITMAP_IBUF, mtr);
1211 
1212 	if (mtr == &local_mtr) {
1213 		mtr_commit(mtr);
1214 	}
1215 
1216 	return(ret);
1217 }
1218 
1219 #ifdef UNIV_DEBUG
1220 # define ibuf_rec_get_page_no(mtr,rec) ibuf_rec_get_page_no_func(mtr,rec)
1221 #else /* UNIV_DEBUG */
1222 # define ibuf_rec_get_page_no(mtr,rec) ibuf_rec_get_page_no_func(rec)
1223 #endif /* UNIV_DEBUG */
1224 
1225 /********************************************************************//**
1226 Returns the page number field of an ibuf record.
1227 @return page number */
1228 static
1229 ulint
1230 ibuf_rec_get_page_no_func(
1231 /*======================*/
1232 #ifdef UNIV_DEBUG
1233 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1234 #endif /* UNIV_DEBUG */
1235 	const rec_t*	rec)	/*!< in: ibuf record */
1236 {
1237 	const byte*	field;
1238 	ulint		len;
1239 
1240 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX)
1241 	      || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
1242 	ut_ad(ibuf_inside(mtr));
1243 	ut_ad(rec_get_n_fields_old(rec) > 2);
1244 
1245 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
1246 
1247 	ut_a(len == 1);
1248 
1249 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_PAGE, &len);
1250 
1251 	ut_a(len == 4);
1252 
1253 	return(mach_read_from_4(field));
1254 }
1255 
1256 #ifdef UNIV_DEBUG
1257 # define ibuf_rec_get_space(mtr,rec) ibuf_rec_get_space_func(mtr,rec)
1258 #else /* UNIV_DEBUG */
1259 # define ibuf_rec_get_space(mtr,rec) ibuf_rec_get_space_func(rec)
1260 #endif /* UNIV_DEBUG */
1261 
1262 /********************************************************************//**
1263 Returns the space id field of an ibuf record. For < 4.1.x format records
1264 returns 0.
1265 @return space id */
1266 static
1267 ulint
1268 ibuf_rec_get_space_func(
1269 /*====================*/
1270 #ifdef UNIV_DEBUG
1271 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1272 #endif /* UNIV_DEBUG */
1273 	const rec_t*	rec)	/*!< in: ibuf record */
1274 {
1275 	const byte*	field;
1276 	ulint		len;
1277 
1278 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX)
1279 	      || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
1280 	ut_ad(ibuf_inside(mtr));
1281 	ut_ad(rec_get_n_fields_old(rec) > 2);
1282 
1283 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
1284 
1285 	ut_a(len == 1);
1286 
1287 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
1288 
1289 	ut_a(len == 4);
1290 
1291 	return(mach_read_from_4(field));
1292 }
1293 
1294 #ifdef UNIV_DEBUG
1295 # define ibuf_rec_get_info(mtr,rec,op,comp,info_len,counter)	\
1296 	ibuf_rec_get_info_func(mtr,rec,op,comp,info_len,counter)
1297 #else /* UNIV_DEBUG */
1298 # define ibuf_rec_get_info(mtr,rec,op,comp,info_len,counter)	\
1299 	ibuf_rec_get_info_func(rec,op,comp,info_len,counter)
1300 #endif
1301 /****************************************************************//**
1302 Get various information about an ibuf record in >= 4.1.x format. */
1303 static
1304 void
1305 ibuf_rec_get_info_func(
1306 /*===================*/
1307 #ifdef UNIV_DEBUG
1308 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1309 #endif /* UNIV_DEBUG */
1310 	const rec_t*	rec,		/*!< in: ibuf record */
1311 	ibuf_op_t*	op,		/*!< out: operation type, or NULL */
1312 	ibool*		comp,		/*!< out: compact flag, or NULL */
1313 	ulint*		info_len,	/*!< out: length of info fields at the
1314 					start of the fourth field, or
1315 					NULL */
1316 	ulint*		counter)	/*!< out: counter value, or NULL */
1317 {
1318 	const byte*	types;
1319 	ulint		fields;
1320 	ulint		len;
1321 
1322 	/* Local variables to shadow arguments. */
1323 	ibuf_op_t	op_local;
1324 	ibool		comp_local;
1325 	ulint		info_len_local;
1326 	ulint		counter_local;
1327 
1328 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX)
1329 	      || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
1330 	ut_ad(ibuf_inside(mtr));
1331 	fields = rec_get_n_fields_old(rec);
1332 	ut_a(fields > IBUF_REC_FIELD_USER);
1333 
1334 	types = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
1335 
1336 	info_len_local = len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
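	/* Explanatory note (added comment, a sketch): in the >= 5.5 format
	the metadata field is IBUF_REC_INFO_SIZE info bytes plus one
	DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE entry per user field, so the
	remainder computed above recovers the length of the info prefix.
	Records without the info bytes yield 0 (pre-5.0.3) or 1 (the 5.0.3
	one-byte compact marker), matching the cases handled below. */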
1337 
1338 	switch (info_len_local) {
1339 	case 0:
1340 	case 1:
1341 		op_local = IBUF_OP_INSERT;
1342 		comp_local = info_len_local;
1343 		ut_ad(!counter);
1344 		counter_local = ULINT_UNDEFINED;
1345 		break;
1346 
1347 	case IBUF_REC_INFO_SIZE:
1348 		op_local = (ibuf_op_t) types[IBUF_REC_OFFSET_TYPE];
1349 		comp_local = types[IBUF_REC_OFFSET_FLAGS] & IBUF_REC_COMPACT;
1350 		counter_local = mach_read_from_2(
1351 			types + IBUF_REC_OFFSET_COUNTER);
1352 		break;
1353 
1354 	default:
1355 		ut_error;
1356 	}
1357 
1358 	ut_a(op_local < IBUF_OP_COUNT);
1359 	ut_a((len - info_len_local) ==
1360 	     (fields - IBUF_REC_FIELD_USER)
1361 	     * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);
1362 
1363 	if (op) {
1364 		*op = op_local;
1365 	}
1366 
1367 	if (comp) {
1368 		*comp = comp_local;
1369 	}
1370 
1371 	if (info_len) {
1372 		*info_len = info_len_local;
1373 	}
1374 
1375 	if (counter) {
1376 		*counter = counter_local;
1377 	}
1378 }
1379 
1380 #ifdef UNIV_DEBUG
1381 # define ibuf_rec_get_op_type(mtr,rec) ibuf_rec_get_op_type_func(mtr,rec)
1382 #else /* UNIV_DEBUG */
1383 # define ibuf_rec_get_op_type(mtr,rec) ibuf_rec_get_op_type_func(rec)
1384 #endif
1385 
1386 /****************************************************************//**
1387 Returns the operation type field of an ibuf record.
1388 @return operation type */
1389 static
1390 ibuf_op_t
1391 ibuf_rec_get_op_type_func(
1392 /*======================*/
1393 #ifdef UNIV_DEBUG
1394 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1395 #endif /* UNIV_DEBUG */
1396 	const rec_t*	rec)	/*!< in: ibuf record */
1397 {
1398 	ulint		len;
1399 
1400 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX)
1401 	      || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
1402 	ut_ad(ibuf_inside(mtr));
1403 	ut_ad(rec_get_n_fields_old(rec) > 2);
1404 
1405 	(void) rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
1406 
1407 	if (len > 1) {
1408 		/* This is a < 4.1.x format record */
1409 
1410 		return(IBUF_OP_INSERT);
1411 	} else {
1412 		ibuf_op_t	op;
1413 
1414 		ibuf_rec_get_info(mtr, rec, &op, NULL, NULL, NULL);
1415 
1416 		return(op);
1417 	}
1418 }
1419 
1420 /****************************************************************//**
1421 Read the first two bytes from a record's fourth field (counter field in new
1422 records; something else in older records).
1423 @return "counter" field, or ULINT_UNDEFINED if for some reason it
1424 can't be read */
1425 ulint
1426 ibuf_rec_get_counter(
1427 /*=================*/
1428 	const rec_t*	rec)	/*!< in: ibuf record */
1429 {
1430 	const byte*	ptr;
1431 	ulint		len;
1432 
1433 	if (rec_get_n_fields_old(rec) <= IBUF_REC_FIELD_METADATA) {
1434 
1435 		return(ULINT_UNDEFINED);
1436 	}
1437 
1438 	ptr = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
1439 
1440 	if (len >= 2) {
1441 
1442 		return(mach_read_from_2(ptr));
1443 	} else {
1444 
1445 		return(ULINT_UNDEFINED);
1446 	}
1447 }
1448 
1449 /****************************************************************//**
1450 Add accumulated operation counts to a permanent array. Both arrays must be
1451 of size IBUF_OP_COUNT. */
1452 static
1453 void
1454 ibuf_add_ops(
1455 /*=========*/
1456 	ulint*		arr,	/*!< in/out: array to modify */
1457 	const ulint*	ops)	/*!< in: operation counts */
1458 
1459 {
1460 	ulint	i;
1461 
1462 	for (i = 0; i < IBUF_OP_COUNT; i++) {
1463 		os_atomic_increment_ulint(&arr[i], ops[i]);
1464 	}
1465 }
1466 
1467 /****************************************************************//**
1468 Print operation counts. The array must be of size IBUF_OP_COUNT. */
1469 static
1470 void
1471 ibuf_print_ops(
1472 /*===========*/
1473 	const ulint*	ops,	/*!< in: operation counts */
1474 	FILE*		file)	/*!< in: file where to print */
1475 {
1476 	static const char* op_names[] = {
1477 		"insert",
1478 		"delete mark",
1479 		"delete"
1480 	};
1481 	ulint	i;
1482 
1483 	ut_a(UT_ARR_SIZE(op_names) == IBUF_OP_COUNT);
1484 
1485 	for (i = 0; i < IBUF_OP_COUNT; i++) {
1486 		fprintf(file, "%s %lu%s", op_names[i],
1487 			(ulong) ops[i], (i < (IBUF_OP_COUNT - 1)) ? ", " : "");
1488 	}
1489 
1490 	putc('\n', file);
1491 }
1492 
1493 /********************************************************************//**
1494 Creates a dummy index for inserting a record to a non-clustered index.
1495 @return dummy index */
1496 static
1497 dict_index_t*
1498 ibuf_dummy_index_create(
1499 /*====================*/
1500 	ulint		n,	/*!< in: number of fields */
1501 	ibool		comp)	/*!< in: TRUE=use compact record format */
1502 {
1503 	dict_table_t*	table;
1504 	dict_index_t*	index;
1505 
1506 	table = dict_mem_table_create("IBUF_DUMMY",
1507 				      DICT_HDR_SPACE, n, 0,
1508 				      comp ? DICT_TF_COMPACT : 0, 0);
1509 
1510 	index = dict_mem_index_create("IBUF_DUMMY", "IBUF_DUMMY",
1511 				      DICT_HDR_SPACE, 0, n);
1512 
1513 	index->table = table;
1514 
1515 	/* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
1516 	index->cached = TRUE;
1517 
1518 	return(index);
1519 }
1520 /********************************************************************//**
1521 Add a column to the dummy index */
1522 static
1523 void
1524 ibuf_dummy_index_add_col(
1525 /*=====================*/
1526 	dict_index_t*	index,	/*!< in: dummy index */
1527 	const dtype_t*	type,	/*!< in: the data type of the column */
1528 	ulint		len)	/*!< in: length of the column */
1529 {
1530 	ulint	i	= index->table->n_def;
1531 	dict_mem_table_add_col(index->table, NULL, NULL,
1532 			       dtype_get_mtype(type),
1533 			       dtype_get_prtype(type),
1534 			       dtype_get_len(type));
1535 	dict_index_add_col(index, index->table,
1536 			   dict_table_get_nth_col(index->table, i), len);
1537 }
1538 /********************************************************************//**
1539 Deallocates a dummy index for inserting a record to a non-clustered index. */
1540 static
1541 void
1542 ibuf_dummy_index_free(
1543 /*==================*/
1544 	dict_index_t*	index)	/*!< in, own: dummy index */
1545 {
1546 	dict_table_t*	table = index->table;
1547 
1548 	dict_mem_index_free(index);
1549 	dict_mem_table_free(table);
1550 }
1551 
1552 #ifdef UNIV_DEBUG
1553 # define ibuf_build_entry_from_ibuf_rec(mtr,ibuf_rec,heap,pindex)	\
1554 	ibuf_build_entry_from_ibuf_rec_func(mtr,ibuf_rec,heap,pindex)
1555 #else /* UNIV_DEBUG */
1556 # define ibuf_build_entry_from_ibuf_rec(mtr,ibuf_rec,heap,pindex)	\
1557 	ibuf_build_entry_from_ibuf_rec_func(ibuf_rec,heap,pindex)
1558 #endif
1559 
1560 /*********************************************************************//**
1561 Builds the entry used to
1562 
1563 1) IBUF_OP_INSERT: insert into a non-clustered index
1564 
1565 2) IBUF_OP_DELETE_MARK: find the record whose delete-mark flag we need to
1566    activate
1567 
1568 3) IBUF_OP_DELETE: find the record we need to delete
1569 
1570 when we have the corresponding record in an ibuf index.
1571 
1572 NOTE that as we copy pointers to fields in ibuf_rec, the caller must
1573 hold a latch to the ibuf_rec page as long as the entry is used!
1574 
1575 @return own: entry to insert to a non-clustered index */
1576 static
1577 dtuple_t*
1578 ibuf_build_entry_from_ibuf_rec_func(
1579 /*================================*/
1580 #ifdef UNIV_DEBUG
1581 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1582 #endif /* UNIV_DEBUG */
1583 	const rec_t*	ibuf_rec,	/*!< in: record in an insert buffer */
1584 	mem_heap_t*	heap,		/*!< in: heap where built */
1585 	dict_index_t**	pindex)		/*!< out, own: dummy index that
1586 					describes the entry */
1587 {
1588 	dtuple_t*	tuple;
1589 	dfield_t*	field;
1590 	ulint		n_fields;
1591 	const byte*	types;
1592 	const byte*	data;
1593 	ulint		len;
1594 	ulint		info_len;
1595 	ulint		i;
1596 	ulint		comp;
1597 	dict_index_t*	index;
1598 
1599 	ut_ad(mtr_memo_contains_page(mtr, ibuf_rec, MTR_MEMO_PAGE_X_FIX)
1600 	      || mtr_memo_contains_page(mtr, ibuf_rec, MTR_MEMO_PAGE_S_FIX));
1601 	ut_ad(ibuf_inside(mtr));
1602 
1603 	data = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_MARKER, &len);
1604 
1605 	ut_a(len == 1);
1606 	ut_a(*data == 0);
1607 	ut_a(rec_get_n_fields_old(ibuf_rec) > IBUF_REC_FIELD_USER);
1608 
1609 	n_fields = rec_get_n_fields_old(ibuf_rec) - IBUF_REC_FIELD_USER;
1610 
1611 	tuple = dtuple_create(heap, n_fields);
1612 
1613 	types = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_METADATA, &len);
1614 
1615 	ibuf_rec_get_info(mtr, ibuf_rec, NULL, &comp, &info_len, NULL);
1616 
1617 	index = ibuf_dummy_index_create(n_fields, comp);
1618 
1619 	len -= info_len;
1620 	types += info_len;
1621 
1622 	ut_a(len == n_fields * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);
1623 
1624 	for (i = 0; i < n_fields; i++) {
1625 		field = dtuple_get_nth_field(tuple, i);
1626 
1627 		data = rec_get_nth_field_old(
1628 			ibuf_rec, i + IBUF_REC_FIELD_USER, &len);
1629 
1630 		dfield_set_data(field, data, len);
1631 
1632 		dtype_new_read_for_order_and_null_size(
1633 			dfield_get_type(field),
1634 			types + i * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);
1635 
1636 		ibuf_dummy_index_add_col(index, dfield_get_type(field), len);
1637 	}
1638 
1639 	/* Prevent an ut_ad() failure in page_zip_write_rec() by
1640 	adding system columns to the dummy table pointed to by the
1641 	dummy secondary index.  The insert buffer is only used for
1642 	secondary indexes, whose records never contain any system
1643 	columns, such as DB_TRX_ID. */
1644 	ut_d(dict_table_add_system_columns(index->table, index->table->heap));
1645 
1646 	*pindex = index;
1647 
1648 	return(tuple);
1649 }
1650 
1651 /******************************************************************//**
1652 Get the data size.
1653 @return size of fields */
1654 UNIV_INLINE
1655 ulint
1656 ibuf_rec_get_size(
1657 /*==============*/
1658 	const rec_t*	rec,			/*!< in: ibuf record */
1659 	const byte*	types,			/*!< in: fields */
1660 	ulint		n_fields,		/*!< in: number of fields */
1661 	ulint		comp)			/*!< in: 0=ROW_FORMAT=REDUNDANT,
1662 						nonzero=ROW_FORMAT=COMPACT */
1663 {
1664 	ulint	i;
1665 	ulint	field_offset;
1666 	ulint	types_offset;
1667 	ulint	size = 0;
1668 
1669 	field_offset = IBUF_REC_FIELD_USER;
1670 	types_offset = DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
1671 
1672 	for (i = 0; i < n_fields; i++) {
1673 		ulint		len;
1674 		dtype_t		dtype;
1675 
1676 		rec_get_nth_field_offs_old(rec, i + field_offset, &len);
1677 
1678 		if (len != UNIV_SQL_NULL) {
1679 			size += len;
1680 		} else {
1681 			dtype_new_read_for_order_and_null_size(&dtype, types);
1682 
1683 			size += dtype_get_sql_null_size(&dtype, comp);
1684 		}
1685 
1686 		types += types_offset;
1687 	}
1688 
1689 	return(size);
1690 }
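/* A minimal, self-contained sketch of the size accumulation above, using
made-up field lengths: stored fields contribute their actual length, SQL
NULL fields contribute the fixed NULL storage size of their type. Here
~0UL stands in for UNIV_SQL_NULL and the NULL sizes are invented for the
example; the sketch is guarded out of the build. */
#if 0
#include <stdio.h>

int main(void)
{
	unsigned long	lens[3] = {4, ~0UL, 11};	/* ~0UL = "SQL NULL" */
	unsigned long	null_sizes[3] = {0, 10, 0};	/* e.g. CHAR(10) */
	unsigned long	size = 0;

	for (int i = 0; i < 3; i++) {
		size += (lens[i] != ~0UL) ? lens[i] : null_sizes[i];
	}

	printf("data size %lu\n", size);	/* prints "data size 25" */
	return(0);
}
#endif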
1691 
1692 #ifdef UNIV_DEBUG
1693 # define ibuf_rec_get_volume(mtr,rec) ibuf_rec_get_volume_func(mtr,rec)
1694 #else /* UNIV_DEBUG */
1695 # define ibuf_rec_get_volume(mtr,rec) ibuf_rec_get_volume_func(rec)
1696 #endif
1697 
1698 /********************************************************************//**
1699 Returns the space taken by a stored non-clustered index entry if converted to
1700 an index record.
1701 @return size of index record in bytes + an upper limit of the space
1702 taken in the page directory */
1703 static
1704 ulint
1705 ibuf_rec_get_volume_func(
1706 /*=====================*/
1707 #ifdef UNIV_DEBUG
1708 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
1709 #endif /* UNIV_DEBUG */
1710 	const rec_t*	ibuf_rec)/*!< in: ibuf record */
1711 {
1712 	ulint		len;
1713 	const byte*	data;
1714 	const byte*	types;
1715 	ulint		n_fields;
1716 	ulint		data_size;
1717 	ulint		comp;
1718 	ibuf_op_t	op;
1719 	ulint		info_len;
1720 
1721 	ut_ad(mtr_memo_contains_page(mtr, ibuf_rec, MTR_MEMO_PAGE_X_FIX)
1722 	      || mtr_memo_contains_page(mtr, ibuf_rec, MTR_MEMO_PAGE_S_FIX));
1723 	ut_ad(ibuf_inside(mtr));
1724 	ut_ad(rec_get_n_fields_old(ibuf_rec) > 2);
1725 
1726 	data = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_MARKER, &len);
1727 	ut_a(len == 1);
1728 	ut_a(*data == 0);
1729 
1730 	types = rec_get_nth_field_old(
1731 		ibuf_rec, IBUF_REC_FIELD_METADATA, &len);
1732 
1733 	ibuf_rec_get_info(mtr, ibuf_rec, &op, &comp, &info_len, NULL);
1734 
1735 	if (op == IBUF_OP_DELETE_MARK || op == IBUF_OP_DELETE) {
1736 		/* Delete-marking a record doesn't take any
1737 		additional space, and while deleting a record
1738 		actually frees up space, we have to play it safe and
1739 		pretend it takes no additional space (the record
1740 		might not exist, etc.).  */
1741 
1742 		return(0);
1743 	} else if (comp) {
1744 		dtuple_t*	entry;
1745 		ulint		volume;
1746 		dict_index_t*	dummy_index;
1747 		mem_heap_t*	heap = mem_heap_create(500);
1748 
1749 		entry = ibuf_build_entry_from_ibuf_rec(mtr, ibuf_rec,
1750 			heap, &dummy_index);
1751 
1752 		volume = rec_get_converted_size(dummy_index, entry, 0);
1753 
1754 		ibuf_dummy_index_free(dummy_index);
1755 		mem_heap_free(heap);
1756 
1757 		return(volume + page_dir_calc_reserved_space(1));
1758 	}
1759 
1760 	types += info_len;
1761 	n_fields = rec_get_n_fields_old(ibuf_rec)
1762 		- IBUF_REC_FIELD_USER;
1763 
1764 	data_size = ibuf_rec_get_size(ibuf_rec, types, n_fields, comp);
1765 
1766 	return(data_size + rec_get_converted_extra_size(data_size, n_fields, 0)
1767 	       + page_dir_calc_reserved_space(1));
1768 }
1769 
1770 /*********************************************************************//**
1771 Builds the tuple to insert to an ibuf tree when we have an entry for a
1772 non-clustered index.
1773 
1774 NOTE that the original entry must be kept because we copy pointers to
1775 its fields.
1776 
1777 @return own: entry to insert into an ibuf index tree */
1778 static
1779 dtuple_t*
1780 ibuf_entry_build(
1781 /*=============*/
1782 	ibuf_op_t	op,	/*!< in: operation type */
1783 	dict_index_t*	index,	/*!< in: non-clustered index */
1784 	const dtuple_t*	entry,	/*!< in: entry for a non-clustered index */
1785 	ulint		space,	/*!< in: space id */
1786 	ulint		page_no,/*!< in: index page number where entry should
1787 				be inserted */
1788 	ulint		counter,/*!< in: counter value;
1789 				ULINT_UNDEFINED=not used */
1790 	mem_heap_t*	heap)	/*!< in: heap into which to build */
1791 {
1792 	dtuple_t*	tuple;
1793 	dfield_t*	field;
1794 	const dfield_t*	entry_field;
1795 	ulint		n_fields;
1796 	byte*		buf;
1797 	byte*		ti;
1798 	byte*		type_info;
1799 	ulint		i;
1800 
1801 	ut_ad(counter != ULINT_UNDEFINED || op == IBUF_OP_INSERT);
1802 	ut_ad(counter == ULINT_UNDEFINED || counter <= 0xFFFF);
1803 	ut_ad(op < IBUF_OP_COUNT);
1804 
1805 	/* We have to build a tuple with the following fields:
1806 
1807 	1-4) These are described at the top of this file.
1808 
1809 	5) The rest of the fields are copied from the entry.
1810 
1811 	All fields in the tuple are ordered like the type binary in our
1812 	insert buffer tree. */
1813 
1814 	n_fields = dtuple_get_n_fields(entry);
1815 
1816 	tuple = dtuple_create(heap, n_fields + IBUF_REC_FIELD_USER);
1817 
1818 	/* 1) Space Id */
1819 
1820 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_SPACE);
1821 
1822 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1823 
1824 	mach_write_to_4(buf, space);
1825 
1826 	dfield_set_data(field, buf, 4);
1827 
1828 	/* 2) Marker byte */
1829 
1830 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_MARKER);
1831 
1832 	buf = static_cast<byte*>(mem_heap_alloc(heap, 1));
1833 
1834 	/* We set the marker byte zero */
1835 
1836 	mach_write_to_1(buf, 0);
1837 
1838 	dfield_set_data(field, buf, 1);
1839 
1840 	/* 3) Page number */
1841 
1842 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_PAGE);
1843 
1844 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1845 
1846 	mach_write_to_4(buf, page_no);
1847 
1848 	dfield_set_data(field, buf, 4);
1849 
1850 	/* 4) Type info, part #1 */
1851 
1852 	if (counter == ULINT_UNDEFINED) {
1853 		i = dict_table_is_comp(index->table) ? 1 : 0;
1854 	} else {
1855 		ut_ad(counter <= 0xFFFF);
1856 		i = IBUF_REC_INFO_SIZE;
1857 	}
1858 
1859 	ti = type_info = static_cast<byte*>(
1860 		mem_heap_alloc(
1861 			heap,
1862 			i + n_fields * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE));
1863 
1864 	switch (i) {
1865 	default:
1866 		ut_error;
1867 		break;
1868 	case 1:
1869 		/* set the flag for ROW_FORMAT=COMPACT */
1870 		*ti++ = 0;
1871 		/* fall through */
1872 	case 0:
1873 		/* the old format does not allow delete buffering */
1874 		ut_ad(op == IBUF_OP_INSERT);
1875 		break;
1876 	case IBUF_REC_INFO_SIZE:
1877 		mach_write_to_2(ti + IBUF_REC_OFFSET_COUNTER, counter);
1878 
1879 		ti[IBUF_REC_OFFSET_TYPE] = (byte) op;
1880 		ti[IBUF_REC_OFFSET_FLAGS] = dict_table_is_comp(index->table)
1881 			? IBUF_REC_COMPACT : 0;
1882 		ti += IBUF_REC_INFO_SIZE;
1883 		break;
1884 	}
1885 
1886 	/* 5+) Fields from the entry */
1887 
1888 	for (i = 0; i < n_fields; i++) {
1889 		ulint			fixed_len;
1890 		const dict_field_t*	ifield;
1891 
1892 		field = dtuple_get_nth_field(tuple, i + IBUF_REC_FIELD_USER);
1893 		entry_field = dtuple_get_nth_field(entry, i);
1894 		dfield_copy(field, entry_field);
1895 
1896 		ifield = dict_index_get_nth_field(index, i);
1897 		/* Prefix index columns of fixed-length columns are of
1898 		fixed length.  However, in the function call below,
1899 		dfield_get_type(entry_field) contains the fixed length
1900 		of the column in the clustered index.  Replace it with
1901 		the fixed length of the secondary index column. */
1902 		fixed_len = ifield->fixed_len;
1903 
1904 #ifdef UNIV_DEBUG
1905 		if (fixed_len) {
1906 			/* dict_index_add_col() should guarantee these */
1907 			ut_ad(fixed_len <= (ulint)
1908 			      dfield_get_type(entry_field)->len);
1909 			if (ifield->prefix_len) {
1910 				ut_ad(ifield->prefix_len == fixed_len);
1911 			} else {
1912 				ut_ad(fixed_len == (ulint)
1913 				      dfield_get_type(entry_field)->len);
1914 			}
1915 		}
1916 #endif /* UNIV_DEBUG */
1917 
1918 		dtype_new_store_for_order_and_null_size(
1919 			ti, dfield_get_type(entry_field), fixed_len);
1920 		ti += DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
1921 	}
1922 
1923 	/* 4) Type info, part #2 */
1924 
1925 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_METADATA);
1926 
1927 	dfield_set_data(field, type_info, ti - type_info);
1928 
1929 	/* Set all the types in the new tuple binary */
1930 
1931 	dtuple_set_types_binary(tuple, n_fields + IBUF_REC_FIELD_USER);
1932 
1933 	return(tuple);
1934 }
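/* For reference, a sketch of the tuple produced above (field order as in
steps 1-5 of ibuf_entry_build; the exact constants are defined elsewhere
in this file):

	space id      4 bytes
	marker        1 byte, always 0 (marks the >= 4.1.x format)
	page number   4 bytes
	type info     optional counter/operation/flags header, followed by
	              one order-and-null-size entry per user field
	user fields   copied from the secondary index entry
*/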
1935 
1936 /*********************************************************************//**
1937 Builds a search tuple used to search buffered inserts for an index page.
1938 This is for >= 4.1.x format records.
1939 @return own: search tuple */
1940 static
1941 dtuple_t*
1942 ibuf_search_tuple_build(
1943 /*====================*/
1944 	ulint		space,	/*!< in: space id */
1945 	ulint		page_no,/*!< in: index page number */
1946 	mem_heap_t*	heap)	/*!< in: heap into which to build */
1947 {
1948 	dtuple_t*	tuple;
1949 	dfield_t*	field;
1950 	byte*		buf;
1951 
1952 	tuple = dtuple_create(heap, IBUF_REC_FIELD_METADATA);
1953 
1954 	/* Store the space id in tuple */
1955 
1956 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_SPACE);
1957 
1958 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1959 
1960 	mach_write_to_4(buf, space);
1961 
1962 	dfield_set_data(field, buf, 4);
1963 
1964 	/* Store the new format record marker byte */
1965 
1966 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_MARKER);
1967 
1968 	buf = static_cast<byte*>(mem_heap_alloc(heap, 1));
1969 
1970 	mach_write_to_1(buf, 0);
1971 
1972 	dfield_set_data(field, buf, 1);
1973 
1974 	/* Store the page number in tuple */
1975 
1976 	field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_PAGE);
1977 
1978 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
1979 
1980 	mach_write_to_4(buf, page_no);
1981 
1982 	dfield_set_data(field, buf, 4);
1983 
1984 	dtuple_set_types_binary(tuple, IBUF_REC_FIELD_METADATA);
1985 
1986 	return(tuple);
1987 }
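/* Note: the search tuple above carries only the space id, the marker byte
and the page number, so it compares as a prefix of every full ibuf record
for that (space, page) combination and can be used to position a cursor at
the start of the buffered entries for the page. */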
1988 
1989 /*********************************************************************//**
1990 Checks if there are enough pages in the free list of the ibuf tree that we
1991 dare to start a pessimistic insert to the insert buffer.
1992 @return TRUE if enough free pages in list */
1993 UNIV_INLINE
1994 ibool
1995 ibuf_data_enough_free_for_insert(void)
1996 /*==================================*/
1997 {
1998 	ut_ad(mutex_own(&ibuf_mutex));
1999 
2000 	/* We want a big margin of free pages, because a B-tree can sometimes
2001 	grow in size even when records are deleted from it, as the node pointers
2002 	can change, and we must make sure that we are able to delete the
2003 	inserts buffered for pages that we read to the buffer pool, without
2004 	any risk of running out of free space in the insert buffer. */
2005 
2006 	return(ibuf->free_list_len >= (ibuf->size / 2) + 3 * ibuf->height);
2007 }
2008 
2009 /*********************************************************************//**
2010 Checks if there are so many pages in the free list of the ibuf tree that
2011 some of them should be removed and freed to the file space management.
2012 @return TRUE if enough free pages in list */
2013 UNIV_INLINE
2014 ibool
2015 ibuf_data_too_much_free(void)
2016 /*=========================*/
2017 {
2018 	ut_ad(mutex_own(&ibuf_mutex));
2019 
2020 	return(ibuf->free_list_len >= 3 + (ibuf->size / 2) + 3 * ibuf->height);
2021 }
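/* A hedged, self-contained sketch of the two free-list thresholds above,
with made-up tree sizes; it is guarded out of the build. */
#if 0
#include <stdio.h>

static int
enough_free_for_insert(unsigned long free_list_len,
		       unsigned long size, unsigned long height)
{
	return(free_list_len >= (size / 2) + 3 * height);
}

static int
too_much_free(unsigned long free_list_len,
	      unsigned long size, unsigned long height)
{
	return(free_list_len >= 3 + (size / 2) + 3 * height);
}

int main(void)
{
	/* e.g. a 20-page tree of height 3: pessimistic inserts are
	allowed from 19 free pages on, and pages are returned to the
	file space management from 22 free pages on */
	printf("%d %d\n", enough_free_for_insert(19, 20, 3),
	       too_much_free(19, 20, 3));	/* prints "1 0" */
	printf("%d %d\n", enough_free_for_insert(22, 20, 3),
	       too_much_free(22, 20, 3));	/* prints "1 1" */
	return(0);
}
#endif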
2022 
2023 /*********************************************************************//**
2024 Allocates a new page from the ibuf file segment and adds it to the free
2025 list.
2026 @return TRUE on success, FALSE if no space left */
2027 static
2028 ibool
2029 ibuf_add_free_page(void)
2030 /*====================*/
2031 {
2032 	mtr_t		mtr;
2033 	page_t*		header_page;
2034 	buf_block_t*	block;
2035 	page_t*		page;
2036 	page_t*		root;
2037 	page_t*		bitmap_page;
2038 
2039 	mtr_start(&mtr);
2040 	fil_space_t* space = mtr.set_sys_modified();
2041 
2042 	/* Acquire the fsp latch before the ibuf header, obeying the latching
2043 	order */
2044 	mtr_x_lock(&space->latch, &mtr);
2045 	header_page = ibuf_header_page_get(&mtr);
2046 
2047 	/* Allocate a new page: NOTE that if the page has been a part of a
2048 	non-clustered index which has subsequently been dropped, then the
2049 	page may have buffered inserts in the insert buffer, and these
2050 	should be deleted from there. These get deleted when the page
2051 	allocation creates the page in buffer. Thus the call below may end
2052 	up calling the insert buffer routines and, as we do not yet hold any
2053 	latches on insert buffer tree pages, these routines can run without a risk
2054 	of a deadlock. This is the reason why we created a special ibuf
2055 	header page apart from the ibuf tree. */
2056 
2057 	block = fseg_alloc_free_page(
2058 		header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER, 0, FSP_UP,
2059 		&mtr);
2060 
2061 	if (block == NULL) {
2062 		mtr_commit(&mtr);
2063 
2064 		return(FALSE);
2065 	}
2066 
2067 	ut_ad(rw_lock_get_x_lock_count(&block->lock) == 1);
2068 	ibuf_enter(&mtr);
2069 	mutex_enter(&ibuf_mutex);
2070 	root = ibuf_tree_root_get(&mtr);
2071 
2072 	buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);
2073 	page = buf_block_get_frame(block);
2074 
2075 	/* Add the page to the free list and update the ibuf size data */
2076 
2077 	flst_add_last(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
2078 		      page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, &mtr);
2079 
2080 	mlog_write_ulint(page + FIL_PAGE_TYPE, FIL_PAGE_IBUF_FREE_LIST,
2081 			 MLOG_2BYTES, &mtr);
2082 
2083 	ibuf->seg_size++;
2084 	ibuf->free_list_len++;
2085 
2086 	/* Set the bit indicating that this page is now an ibuf tree page
2087 	(level 2 page) */
2088 
2089 	const page_id_t		page_id(IBUF_SPACE_ID, block->page.id.page_no());
2090 	const page_size_t	page_size(space->flags);
2091 
2092 	bitmap_page = ibuf_bitmap_get_map_page(page_id, page_size, &mtr);
2093 
2094 	mutex_exit(&ibuf_mutex);
2095 
2096 	ibuf_bitmap_page_set_bits(bitmap_page, page_id, page_size,
2097 				  IBUF_BITMAP_IBUF, TRUE, &mtr);
2098 
2099 	ibuf_mtr_commit(&mtr);
2100 
2101 	return(TRUE);
2102 }
2103 
2104 /*********************************************************************//**
2105 Removes a page from the free list and frees it to the fsp system. */
2106 static
2107 void
2108 ibuf_remove_free_page(void)
2109 /*=======================*/
2110 {
2111 	mtr_t	mtr;
2112 	mtr_t	mtr2;
2113 	page_t*	header_page;
2114 	ulint	page_no;
2115 	page_t*	page;
2116 	page_t*	root;
2117 	page_t*	bitmap_page;
2118 
2119 	mtr_start(&mtr);
2120 	fil_space_t*		space = mtr.set_sys_modified();
2121 	const page_size_t	page_size(space->flags);
2122 
2123 	/* Acquire the fsp latch before the ibuf header, obeying the latching
2124 	order */
2125 
2126 	mtr_x_lock(&space->latch, &mtr);
2127 	header_page = ibuf_header_page_get(&mtr);
2128 
2129 	/* Prevent pessimistic inserts to insert buffer trees for a while */
2130 	ibuf_enter(&mtr);
2131 	mutex_enter(&ibuf_pessimistic_insert_mutex);
2132 	mutex_enter(&ibuf_mutex);
2133 
2134 	if (!ibuf_data_too_much_free()) {
2135 
2136 		mutex_exit(&ibuf_mutex);
2137 		mutex_exit(&ibuf_pessimistic_insert_mutex);
2138 
2139 		ibuf_mtr_commit(&mtr);
2140 
2141 		return;
2142 	}
2143 
2144 	ibuf_mtr_start(&mtr2);
2145 
2146 	root = ibuf_tree_root_get(&mtr2);
2147 
2148 	mutex_exit(&ibuf_mutex);
2149 
2150 	page_no = flst_get_last(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
2151 				&mtr2).page;
2152 
2153 	/* NOTE that we must release the latch on the ibuf tree root
2154 	because in fseg_free_page we access level 1 pages, and the root
2155 	is a level 2 page. */
2156 
2157 	ibuf_mtr_commit(&mtr2);
2158 	ibuf_exit(&mtr);
2159 
2160 	/* Since pessimistic inserts were prevented, we know that the
2161 	page is still in the free list. NOTE that deletes may also take
2162 	pages from the free list, but they take them from the start, and
2163 	the free list was so long that they cannot have taken the last
2164 	page from it. */
2165 
2166 	fseg_free_page(header_page + IBUF_HEADER + IBUF_TREE_SEG_HEADER,
2167 		       IBUF_SPACE_ID, page_no, false, &mtr);
2168 
2169 	const page_id_t	page_id(IBUF_SPACE_ID, page_no);
2170 
2171 	ut_d(buf_page_reset_file_page_was_freed(page_id));
2172 
2173 	ibuf_enter(&mtr);
2174 
2175 	mutex_enter(&ibuf_mutex);
2176 
2177 	root = ibuf_tree_root_get(&mtr);
2178 
2179 	ut_ad(page_no == flst_get_last(root + PAGE_HEADER
2180 				       + PAGE_BTR_IBUF_FREE_LIST, &mtr).page);
2181 
2182 	{
2183 		buf_block_t*	block;
2184 
2185 		block = buf_page_get(page_id, univ_page_size, RW_X_LATCH, &mtr);
2186 
2187 		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);
2188 
2189 		page = buf_block_get_frame(block);
2190 	}
2191 
2192 	/* Remove the page from the free list and update the ibuf size data */
2193 
2194 	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
2195 		    page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, &mtr);
2196 
2197 	mutex_exit(&ibuf_pessimistic_insert_mutex);
2198 
2199 	ibuf->seg_size--;
2200 	ibuf->free_list_len--;
2201 
2202 	/* Set the bit indicating that this page is no longer an ibuf tree page
2203 	(level 2 page) */
2204 
2205 	bitmap_page = ibuf_bitmap_get_map_page(page_id, page_size, &mtr);
2206 
2207 	mutex_exit(&ibuf_mutex);
2208 
2209 	ibuf_bitmap_page_set_bits(
2210 		bitmap_page, page_id, page_size, IBUF_BITMAP_IBUF, FALSE,
2211 		&mtr);
2212 
2213 	ut_d(buf_page_set_file_page_was_freed(page_id));
2214 
2215 	ibuf_mtr_commit(&mtr);
2216 }
2217 
2218 /***********************************************************************//**
2219 Frees excess pages from the ibuf free list. This function is called when an OS
2220 thread calls fsp services to allocate a new file segment, or a new page to a
2221 file segment, and the thread did not own the fsp latch before this call. */
2222 void
2223 ibuf_free_excess_pages(void)
2224 /*========================*/
2225 {
2226 	ut_ad(rw_lock_own(fil_space_get_latch(IBUF_SPACE_ID, NULL), RW_LOCK_X));
2227 
2228 	ut_ad(rw_lock_get_x_lock_count(
2229 		fil_space_get_latch(IBUF_SPACE_ID, NULL)) == 1);
2230 
2231 	/* NOTE: We require that the thread did not own the latch before,
2232 	because then we know that we can obey the correct latching order
2233 	for ibuf latches */
2234 
2235 	if (!ibuf) {
2236 		/* Not yet initialized; not sure if this is possible, but
2237 		does no harm to check for it. */
2238 
2239 		return;
2240 	}
2241 
2242 	/* Free at most a few pages at a time, so that we do not delay the
2243 	requested service too much */
2244 
2245 	for (ulint i = 0; i < 4; i++) {
2246 
2247 		ibool	too_much_free;
2248 
2249 		mutex_enter(&ibuf_mutex);
2250 		too_much_free = ibuf_data_too_much_free();
2251 		mutex_exit(&ibuf_mutex);
2252 
2253 		if (!too_much_free) {
2254 			return;
2255 		}
2256 
2257 		ibuf_remove_free_page();
2258 	}
2259 }
2260 
2261 #ifdef UNIV_DEBUG
2262 # define ibuf_get_merge_page_nos(contract,rec,mtr,ids,pages,n_stored) \
2263 	ibuf_get_merge_page_nos_func(contract,rec,mtr,ids,pages,n_stored)
2264 #else /* UNIV_DEBUG */
2265 # define ibuf_get_merge_page_nos(contract,rec,mtr,ids,pages,n_stored) \
2266 	ibuf_get_merge_page_nos_func(contract,rec,ids,pages,n_stored)
2267 #endif /* UNIV_DEBUG */
2268 
2269 /*********************************************************************//**
2270 Reads page numbers from a leaf in an ibuf tree.
2271 @return a lower limit for the combined volume of records which will be
2272 merged */
2273 static
2274 ulint
2275 ibuf_get_merge_page_nos_func(
2276 /*=========================*/
2277 	ibool		contract,/*!< in: TRUE if this function is called to
2278 				contract the tree, FALSE if this is called
2279 				when a single page becomes full and we look
2280 				if it pays to read also nearby pages */
2281 	const rec_t*	rec,	/*!< in: insert buffer record */
2282 #ifdef UNIV_DEBUG
2283 	mtr_t*		mtr,	/*!< in: mini-transaction holding rec */
2284 #endif /* UNIV_DEBUG */
2285 	ulint*		space_ids,/*!< in/out: space id's of the pages */
2286 	ulint*		page_nos,/*!< in/out: buffer for at least
2287 				IBUF_MAX_N_PAGES_MERGED many page numbers;
2288 				the page numbers are in an ascending order */
2289 	ulint*		n_stored)/*!< out: number of page numbers stored to
2290 				page_nos in this function */
2291 {
2292 	ulint	prev_page_no;
2293 	ulint	prev_space_id;
2294 	ulint	first_page_no;
2295 	ulint	first_space_id;
2296 	ulint	rec_page_no;
2297 	ulint	rec_space_id;
2298 	ulint	sum_volumes;
2299 	ulint	volume_for_page;
2300 	ulint	rec_volume;
2301 	ulint	limit;
2302 	ulint	n_pages;
2303 
2304 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX)
2305 	      || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
2306 	ut_ad(ibuf_inside(mtr));
2307 
2308 	*n_stored = 0;
2309 
2310 	limit = ut_min(IBUF_MAX_N_PAGES_MERGED,
2311 		       buf_pool_get_curr_size() / 4);
2312 
2313 	if (page_rec_is_supremum(rec)) {
2314 
2315 		rec = page_rec_get_prev_const(rec);
2316 	}
2317 
2318 	if (page_rec_is_infimum(rec)) {
2319 
2320 		rec = page_rec_get_next_const(rec);
2321 	}
2322 
2323 	if (page_rec_is_supremum(rec)) {
2324 
2325 		return(0);
2326 	}
2327 
2328 	first_page_no = ibuf_rec_get_page_no(mtr, rec);
2329 	first_space_id = ibuf_rec_get_space(mtr, rec);
2330 	n_pages = 0;
2331 	prev_page_no = 0;
2332 	prev_space_id = 0;
2333 
2334 	/* Go backwards from the first rec until we reach the border of the
2335 	'merge area', or the page start, or the limit of storable pages is
2336 	reached */
2337 
2338 	while (!page_rec_is_infimum(rec) && UNIV_LIKELY(n_pages < limit)) {
2339 
2340 		rec_page_no = ibuf_rec_get_page_no(mtr, rec);
2341 		rec_space_id = ibuf_rec_get_space(mtr, rec);
2342 
2343 		if (rec_space_id != first_space_id
2344 		    || (rec_page_no / IBUF_MERGE_AREA)
2345 		    != (first_page_no / IBUF_MERGE_AREA)) {
2346 
2347 			break;
2348 		}
2349 
2350 		if (rec_page_no != prev_page_no
2351 		    || rec_space_id != prev_space_id) {
2352 			n_pages++;
2353 		}
2354 
2355 		prev_page_no = rec_page_no;
2356 		prev_space_id = rec_space_id;
2357 
2358 		rec = page_rec_get_prev_const(rec);
2359 	}
2360 
2361 	rec = page_rec_get_next_const(rec);
2362 
2363 	/* At the loop start there is no prev page; we mark this with a pair
2364 	of space id, page no (0, 0) for which there can never be entries in
2365 	the insert buffer */
2366 
2367 	prev_page_no = 0;
2368 	prev_space_id = 0;
2369 	sum_volumes = 0;
2370 	volume_for_page = 0;
2371 
2372 	while (*n_stored < limit) {
2373 		if (page_rec_is_supremum(rec)) {
2374 			/* When no more records are available, mark this with
2375 			another 'impossible' pair of space id, page no */
2376 			rec_page_no = 1;
2377 			rec_space_id = 0;
2378 		} else {
2379 			rec_page_no = ibuf_rec_get_page_no(mtr, rec);
2380 			rec_space_id = ibuf_rec_get_space(mtr, rec);
2381 			/* In the system tablespace the smallest
2382 			possible secondary index leaf page number is
2383 			bigger than FSP_DICT_HDR_PAGE_NO (7).
2384 			In all tablespaces, pages 0 and 1 are reserved
2385 			for the allocation bitmap and the change
2386 			buffer bitmap. In file-per-table tablespaces,
2387 			a file segment inode page will be created at
2388 			page 2 and the clustered index tree is created
2389 			at page 3.  So for file-per-table tablespaces,
2390 			page 4 is the smallest possible secondary
2391 			index leaf page. CREATE TABLESPACE also initially
2392 			uses pages 2 and 3 for the first created table,
2393 			but that table may be dropped, allowing page 2
2394 			to be reused for a secondary index leaf page.
2395 			To keep this assertion simple, just
2396 			make sure the page is >= 2. */
2397 			ut_ad(rec_page_no >= FSP_FIRST_INODE_PAGE_NO);
2398 		}
2399 
2400 #ifdef UNIV_IBUF_DEBUG
2401 		ut_a(*n_stored < IBUF_MAX_N_PAGES_MERGED);
2402 #endif
2403 		if ((rec_space_id != prev_space_id
2404 		     || rec_page_no != prev_page_no)
2405 		    && (prev_space_id != 0 || prev_page_no != 0)) {
2406 
2407 			if (contract
2408 			    || (prev_page_no == first_page_no
2409 				&& prev_space_id == first_space_id)
2410 			    || (volume_for_page
2411 				> ((IBUF_MERGE_THRESHOLD - 1)
2412 				   * 4 * UNIV_PAGE_SIZE
2413 				   / IBUF_PAGE_SIZE_PER_FREE_SPACE)
2414 				/ IBUF_MERGE_THRESHOLD)) {
2415 
2416 				space_ids[*n_stored] = prev_space_id;
2417 				page_nos[*n_stored] = prev_page_no;
2418 
2419 				(*n_stored)++;
2420 
2421 				sum_volumes += volume_for_page;
2422 			}
2423 
2424 			if (rec_space_id != first_space_id
2425 			    || rec_page_no / IBUF_MERGE_AREA
2426 			    != first_page_no / IBUF_MERGE_AREA) {
2427 
2428 				break;
2429 			}
2430 
2431 			volume_for_page = 0;
2432 		}
2433 
2434 		if (rec_page_no == 1 && rec_space_id == 0) {
2435 			/* Supremum record */
2436 
2437 			break;
2438 		}
2439 
2440 		rec_volume = ibuf_rec_get_volume(mtr, rec);
2441 
2442 		volume_for_page += rec_volume;
2443 
2444 		prev_page_no = rec_page_no;
2445 		prev_space_id = rec_space_id;
2446 
2447 		rec = page_rec_get_next_const(rec);
2448 	}
2449 
2450 #ifdef UNIV_IBUF_DEBUG
2451 	ut_a(*n_stored <= IBUF_MAX_N_PAGES_MERGED);
2452 #endif
2453 #if 0
2454 	fprintf(stderr, "Ibuf merge batch %lu pages %lu volume\n",
2455 		*n_stored, sum_volumes);
2456 #endif
2457 	return(sum_volumes);
2458 }
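/* A hedged sketch of the 'merge area' grouping used above: records stay
in the same batch while their space id matches and page_no / IBUF_MERGE_AREA
stays the same. The value 8 used here is only an assumption for the
illustration; the real constant is defined elsewhere in this file. The
sketch is guarded out of the build. */
#if 0
#include <stdio.h>

int main(void)
{
	unsigned long	merge_area = 8;		/* assumed value */
	unsigned long	first_page_no = 11;
	unsigned long	page_nos[4] = {9, 11, 14, 17};

	for (int i = 0; i < 4; i++) {
		int	same = (page_nos[i] / merge_area)
			== (first_page_no / merge_area);

		printf("page %lu: %s\n", page_nos[i],
		       same ? "same merge area" : "outside merge area");
	}
	/* pages 9, 11 and 14 share area 8..15; page 17 does not */
	return(0);
}
#endif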
2459 
2460 /*******************************************************************//**
2461 Get the matching records for space id.
2462 @return current rec or NULL */
2463 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2464 const rec_t*
2465 ibuf_get_user_rec(
2466 /*===============*/
2467 	btr_pcur_t*	pcur,		/*!< in: the current cursor */
2468 	mtr_t*		mtr)		/*!< in: mini transaction */
2469 {
2470 	do {
2471 		const rec_t* rec = btr_pcur_get_rec(pcur);
2472 
2473 		if (page_rec_is_user_rec(rec)) {
2474 			return(rec);
2475 		}
2476 	} while (btr_pcur_move_to_next(pcur, mtr));
2477 
2478 	return(NULL);
2479 }
2480 
2481 /*********************************************************************//**
2482 Reads page numbers for a space id from an ibuf tree.
2483 @return a lower limit for the combined volume of records which will be
2484 merged */
2485 static	MY_ATTRIBUTE((nonnull, warn_unused_result))
2486 ulint
2487 ibuf_get_merge_pages(
2488 /*=================*/
2489 	btr_pcur_t*	pcur,	/*!< in/out: cursor */
2490 	ulint		space,	/*!< in: space for which to merge */
2491 	ulint		limit,	/*!< in: max page numbers to read */
2492 	ulint*		pages,	/*!< out: pages read */
2493 	ulint*		spaces,	/*!< out: spaces read */
2494 	ulint*		n_pages,/*!< out: number of pages read */
2495 	mtr_t*		mtr)	/*!< in: mini transaction */
2496 {
2497 	const rec_t*	rec;
2498 	ulint		volume = 0;
2499 
2500 	ut_a(space != ULINT_UNDEFINED);
2501 
2502 	*n_pages = 0;
2503 
2504 	while ((rec = ibuf_get_user_rec(pcur, mtr)) != 0
2505 	       && ibuf_rec_get_space(mtr, rec) == space
2506 	       && *n_pages < limit) {
2507 
2508 		ulint	page_no = ibuf_rec_get_page_no(mtr, rec);
2509 
2510 		if (*n_pages == 0 || pages[*n_pages - 1] != page_no) {
2511 			spaces[*n_pages] = space;
2512 			pages[*n_pages] = page_no;
2513 			++*n_pages;
2514 		}
2515 
2516 		volume += ibuf_rec_get_volume(mtr, rec);
2517 
2518 		btr_pcur_move_to_next(pcur, mtr);
2519 	}
2520 
2521 	return(volume);
2522 }
2523 
2524 /*********************************************************************//**
2525 Contracts insert buffer trees by reading pages to the buffer pool.
2526 @return a lower limit for the combined size in bytes of entries which
2527 will be merged from ibuf trees to the pages read, 0 if ibuf is
2528 empty */
2529 static
2530 ulint
2531 ibuf_merge_pages(
2532 /*=============*/
2533 	ulint*	n_pages,	/*!< out: number of pages to which merged */
2534 	bool	sync)		/*!< in: true if the caller wants to wait for
2535 				the issued read with the highest tablespace
2536 				address to complete */
2537 {
2538 	mtr_t		mtr;
2539 	btr_pcur_t	pcur;
2540 	ulint		sum_sizes;
2541 	ulint		page_nos[IBUF_MAX_N_PAGES_MERGED];
2542 	ulint		space_ids[IBUF_MAX_N_PAGES_MERGED];
2543 
2544 	*n_pages = 0;
2545 
2546 	ibuf_mtr_start(&mtr);
2547 
2548 	/* Open a cursor to a randomly chosen leaf of the tree, at a random
2549 	position within the leaf */
2550 	bool available;
2551 
2552 	available = btr_pcur_open_at_rnd_pos(ibuf->index, BTR_SEARCH_LEAF,
2553 					     &pcur, &mtr);
2554 	/* No one should make this index unavailable while the server is running */
2555 	ut_a(available);
2556 
2557 	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));
2558 
2559 	if (page_is_empty(btr_pcur_get_page(&pcur))) {
2560 		/* If a B-tree page is empty, it must be the root page
2561 		and the whole B-tree must be empty. InnoDB does not
2562 		allow empty B-tree pages other than the root. */
2563 		ut_ad(ibuf->empty);
2564 		ut_ad(page_get_space_id(btr_pcur_get_page(&pcur))
2565 		      == IBUF_SPACE_ID);
2566 		ut_ad(page_get_page_no(btr_pcur_get_page(&pcur))
2567 		      == FSP_IBUF_TREE_ROOT_PAGE_NO);
2568 
2569 		ibuf_mtr_commit(&mtr);
2570 		btr_pcur_close(&pcur);
2571 
2572 		return(0);
2573 	}
2574 
2575 	sum_sizes = ibuf_get_merge_page_nos(TRUE,
2576 					    btr_pcur_get_rec(&pcur), &mtr,
2577 					    space_ids,
2578 					    page_nos, n_pages);
2579 #if 0 /* defined UNIV_IBUF_DEBUG */
2580 	fprintf(stderr, "Ibuf contract sync %lu pages %lu volume %lu\n",
2581 		sync, *n_pages, sum_sizes);
2582 #endif
2583 	ibuf_mtr_commit(&mtr);
2584 	btr_pcur_close(&pcur);
2585 
2586 	buf_read_ibuf_merge_pages(
2587 		sync, space_ids, page_nos, *n_pages);
2588 
2589 	return(sum_sizes + 1);
2590 }
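/* Note on the return value above: because sum_sizes + 1 is returned, a
pass that found work always reports a nonzero value; a return of 0 is
reserved for the empty-tree case, which ibuf_merge_in_background() below
uses as its stop condition. */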
2591 
2592 /*********************************************************************//**
2593 Contracts insert buffer trees by reading pages referring to space_id
2594 to the buffer pool.
2595 @returns number of pages merged.*/
2596 ulint
2597 ibuf_merge_space(
2598 /*=============*/
2599 	ulint		space)	/*!< in: tablespace id to merge */
2600 {
2601 	mtr_t		mtr;
2602 	btr_pcur_t	pcur;
2603 	mem_heap_t*	heap = mem_heap_create(512);
2604 	dtuple_t*	tuple = ibuf_search_tuple_build(space, 0, heap);
2605 	ulint		n_pages = 0;
2606 
2607 	ut_ad(space < SRV_LOG_SPACE_FIRST_ID);
2608 
2609 	ibuf_mtr_start(&mtr);
2610 
2611 	/* Position the cursor on the first matching record. */
2612 
2613 	btr_pcur_open(
2614 		ibuf->index, tuple, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur,
2615 		&mtr);
2616 
2617 	mem_heap_free(heap);
2618 
2619 	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));
2620 
2621 	ulint		sum_sizes = 0;
2622 	ulint		pages[IBUF_MAX_N_PAGES_MERGED];
2623 	ulint		spaces[IBUF_MAX_N_PAGES_MERGED];
2624 
2625 	if (page_is_empty(btr_pcur_get_page(&pcur))) {
2626 		/* If a B-tree page is empty, it must be the root page
2627 		and the whole B-tree must be empty. InnoDB does not
2628 		allow empty B-tree pages other than the root. */
2629 		ut_ad(ibuf->empty);
2630 		ut_ad(page_get_space_id(btr_pcur_get_page(&pcur))
2631 		      == IBUF_SPACE_ID);
2632 		ut_ad(page_get_page_no(btr_pcur_get_page(&pcur))
2633 		      == FSP_IBUF_TREE_ROOT_PAGE_NO);
2634 
2635 	} else {
2636 
2637 		sum_sizes = ibuf_get_merge_pages(
2638 			&pcur, space, IBUF_MAX_N_PAGES_MERGED,
2639 			&pages[0], &spaces[0], &n_pages,
2640 			&mtr);
2641 		ib::info() << "Size of pages merged " << sum_sizes;
2642 	}
2643 
2644 	ibuf_mtr_commit(&mtr);
2645 
2646 	btr_pcur_close(&pcur);
2647 
2648 	if (n_pages > 0) {
2649 		ut_ad(n_pages <= UT_ARR_SIZE(pages));
2650 
2651 #ifdef UNIV_DEBUG
2652 		for (ulint i = 0; i < n_pages; ++i) {
2653 			ut_ad(spaces[i] == space);
2654 		}
2655 #endif /* UNIV_DEBUG */
2656 
2657 		buf_read_ibuf_merge_pages(
2658 			true, spaces, pages, n_pages);
2659 	}
2660 
2661 	return(n_pages);
2662 }
2663 
2664 /** Contract the change buffer by reading pages to the buffer pool.
2665 @param[out]	n_pages		number of pages merged
2666 @param[in]	sync		whether the caller waits for
2667 the issued reads to complete
2668 @return a lower limit for the combined size in bytes of entries which
2669 will be merged from ibuf trees to the pages read, 0 if ibuf is
2670 empty */
2671 static MY_ATTRIBUTE((warn_unused_result))
2672 ulint
2673 ibuf_merge(
2674 	ulint*		n_pages,
2675 	bool		sync)
2676 {
2677 	*n_pages = 0;
2678 
2679 	/* We perform a dirty read of ibuf->empty, without latching
2680 	the insert buffer root page. We trust this dirty read except
2681 	when a slow shutdown is being executed. During a slow
2682 	shutdown, the insert buffer merge must be completed. */
2683 
2684 	if (ibuf->empty && !srv_shutdown_state) {
2685 		return(0);
2686 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
2687 	} else if (ibuf_debug) {
2688 		return(0);
2689 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
2690 	} else {
2691 		return(ibuf_merge_pages(n_pages, sync));
2692 	}
2693 }
2694 
2695 /** Contract the change buffer by reading pages to the buffer pool.
2696 @param[in]	sync	whether the caller waits for
2697 the issued reads to complete
2698 @return a lower limit for the combined size in bytes of entries which
2699 will be merged from ibuf trees to the pages read, 0 if ibuf is empty */
2700 static
2701 ulint
2702 ibuf_contract(
2703 	bool	sync)
2704 {
2705 	ulint	n_pages;
2706 
2707 	return(ibuf_merge_pages(&n_pages, sync));
2708 }
2709 
2710 /** Contract the change buffer by reading pages to the buffer pool.
2711 @param[in]	full		If true, do a full contraction based
2712 on PCT_IO(100). If false, the size of contract batch is determined
2713 based on the current size of the change buffer.
2714 @return a lower limit for the combined size in bytes of entries which
2715 will be merged from ibuf trees to the pages read, 0 if ibuf is
2716 empty */
2717 ulint
2718 ibuf_merge_in_background(
2719 	bool	full)
2720 {
2721 	ulint	sum_bytes	= 0;
2722 	ulint	sum_pages	= 0;
2723 	ulint	n_pag2;
2724 	ulint	n_pages;
2725 
2726 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
2727 	if (srv_ibuf_disable_background_merge) {
2728 		return(0);
2729 	}
2730 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
2731 
2732 	if (full) {
2733 		/* Caller has requested a full batch */
2734 		n_pages = PCT_IO(100);
2735 	} else {
2736 		/* By default we do a batch of 5% of the io_capacity */
2737 		n_pages = PCT_IO(5);
2738 
2739 		mutex_enter(&ibuf_mutex);
2740 
2741 		/* If ibuf->size is more than half of max_size,
2742 		then make the contraction more aggressive.
2743 		The +1 avoids division by zero. */
2744 		if (ibuf->size > ibuf->max_size / 2) {
2745 			ulint diff = ibuf->size - ibuf->max_size / 2;
2746 			n_pages += PCT_IO((diff * 100)
2747 					   / (ibuf->max_size + 1));
2748 		}
2749 
2750 		mutex_exit(&ibuf_mutex);
2751 	}
2752 
2753 	while (sum_pages < n_pages) {
2754 		ulint	n_bytes;
2755 
2756 		n_bytes = ibuf_merge(&n_pag2, false);
2757 
2758 		if (n_bytes == 0) {
2759 			return(sum_bytes);
2760 		}
2761 
2762 		sum_bytes += n_bytes;
2763 		sum_pages += n_pag2;
2764 
2765 		srv_inc_activity_count(true);
2766 	}
2767 
2768 	return(sum_bytes);
2769 }
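/* A hedged, self-contained sketch of the batch sizing in
ibuf_merge_in_background() above. EXAMPLE_PCT_IO() is only a stand-in for
the real PCT_IO() macro and assumes an I/O capacity of 200 pages; all
other numbers are made up. The sketch is guarded out of the build. */
#if 0
#include <stdio.h>

#define EXAMPLE_PCT_IO(p)	((200 * (p)) / 100)

int main(void)
{
	unsigned long	max_size = 100;	/* change buffer max size, in pages */
	unsigned long	size = 80;	/* current change buffer size */
	unsigned long	n_pages = EXAMPLE_PCT_IO(5);	/* base batch: 10 */

	if (size > max_size / 2) {
		unsigned long	diff = size - max_size / 2;	/* 30 */

		n_pages += EXAMPLE_PCT_IO((diff * 100) / (max_size + 1));
	}

	printf("batch of %lu pages\n", n_pages);	/* 10 + 58 = 68 */
	return(0);
}
#endif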
2770 
2771 /*********************************************************************//**
2772 Contract insert buffer trees after insert if they are too big. */
2773 UNIV_INLINE
2774 void
2775 ibuf_contract_after_insert(
2776 /*=======================*/
2777 	ulint	entry_size)	/*!< in: size of a record which was inserted
2778 				into an ibuf tree */
2779 {
2780 	ibool	sync;
2781 	ulint	sum_sizes;
2782 	ulint	size;
2783 	ulint	max_size;
2784 
2785 	/* Perform dirty reads of ibuf->size and ibuf->max_size, to
2786 	reduce ibuf_mutex contention. ibuf->max_size remains constant
2787 	after ibuf_init_at_db_start(), but ibuf->size should be
2788 	protected by ibuf_mutex. Given that ibuf->size fits in a
2789 	machine word, this should be OK; at worst we are doing some
2790 	excessive ibuf_contract() or occasionally skipping a
2791 	ibuf_contract(). */
2792 	size = ibuf->size;
2793 	max_size = ibuf->max_size;
2794 
2795 	if (size < max_size + IBUF_CONTRACT_ON_INSERT_NON_SYNC) {
2796 		return;
2797 	}
2798 
2799 	sync = (size >= max_size + IBUF_CONTRACT_ON_INSERT_SYNC);
2800 
2801 	/* Contract at least entry_size bytes */
2802 	sum_sizes = 0;
2803 	size = 1;
2804 
2805 	do {
2806 
2807 		size = ibuf_contract(sync);
2808 		sum_sizes += size;
2809 	} while (size > 0 && sum_sizes < entry_size);
2810 }
2811 
2812 /*********************************************************************//**
2813 Determine if an insert buffer record has been encountered already.
2814 @return TRUE if a new record, FALSE if possible duplicate */
2815 static
2816 ibool
2817 ibuf_get_volume_buffered_hash(
2818 /*==========================*/
2819 	const rec_t*	rec,	/*!< in: ibuf record in post-4.1 format */
2820 	const byte*	types,	/*!< in: fields */
2821 	const byte*	data,	/*!< in: start of user record data */
2822 	ulint		comp,	/*!< in: 0=ROW_FORMAT=REDUNDANT,
2823 				nonzero=ROW_FORMAT=COMPACT */
2824 	ulint*		hash,	/*!< in/out: hash array */
2825 	ulint		size)	/*!< in: number of elements in hash array */
2826 {
2827 	ulint		len;
2828 	ulint		fold;
2829 	ulint		bitmask;
2830 
2831 	len = ibuf_rec_get_size(
2832 		rec, types,
2833 		rec_get_n_fields_old(rec) - IBUF_REC_FIELD_USER, comp);
2834 	fold = ut_fold_binary(data, len);
2835 
2836 	hash += (fold / (CHAR_BIT * sizeof *hash)) % size;
2837 	bitmask = static_cast<ulint>(1)
2838 		<< (fold % (CHAR_BIT * sizeof(*hash)));
2839 
2840 	if (*hash & bitmask) {
2841 
2842 		return(FALSE);
2843 	}
2844 
2845 	/* We have not seen this record yet.  Insert it. */
2846 	*hash |= bitmask;
2847 
2848 	return(TRUE);
2849 }
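/* A hedged, self-contained sketch of the bitmap bookkeeping above: a fold
value selects one word and one bit of a small bitmap, and a record counts
as new only if that bit was still clear. The fold value is arbitrary and
the sketch is guarded out of the build. */
#if 0
#include <limits.h>
#include <stdio.h>

int main(void)
{
	unsigned long	bitmap[128 / sizeof(unsigned long)] = {0};
	unsigned long	size = sizeof(bitmap) / sizeof(bitmap[0]);
	unsigned long	fold = 123456789;

	unsigned long*	word = bitmap
		+ (fold / (CHAR_BIT * sizeof(unsigned long))) % size;
	unsigned long	bitmask = 1UL
		<< (fold % (CHAR_BIT * sizeof(unsigned long)));

	printf("new record: %d\n", !(*word & bitmask));	/* prints 1 */
	*word |= bitmask;
	printf("new record: %d\n", !(*word & bitmask));	/* prints 0 */
	return(0);
}
#endif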
2850 
2851 #ifdef UNIV_DEBUG
2852 # define ibuf_get_volume_buffered_count(mtr,rec,hash,size,n_recs)	\
2853 	ibuf_get_volume_buffered_count_func(mtr,rec,hash,size,n_recs)
2854 #else /* UNIV_DEBUG */
2855 # define ibuf_get_volume_buffered_count(mtr,rec,hash,size,n_recs)	\
2856 	ibuf_get_volume_buffered_count_func(rec,hash,size,n_recs)
2857 #endif /* UNIV_DEBUG */
2858 
2859 /*********************************************************************//**
2860 Update the estimate of the number of records on a page, and
2861 get the space taken by merging the buffered record to the index page.
2862 @return size of index record in bytes + an upper limit of the space
2863 taken in the page directory */
2864 static
2865 ulint
2866 ibuf_get_volume_buffered_count_func(
2867 /*================================*/
2868 #ifdef UNIV_DEBUG
2869 	mtr_t*		mtr,	/*!< in: mini-transaction owning rec */
2870 #endif /* UNIV_DEBUG */
2871 	const rec_t*	rec,	/*!< in: insert buffer record */
2872 	ulint*		hash,	/*!< in/out: hash array */
2873 	ulint		size,	/*!< in: number of elements in hash array */
2874 	lint*		n_recs)	/*!< in/out: estimated number of records
2875 				on the page that rec points to */
2876 {
2877 	ulint		len;
2878 	ibuf_op_t	ibuf_op;
2879 	const byte*	types;
2880 	ulint		n_fields;
2881 
2882 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX)
2883 	      || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
2884 	ut_ad(ibuf_inside(mtr));
2885 
2886 	n_fields = rec_get_n_fields_old(rec);
2887 	ut_ad(n_fields > IBUF_REC_FIELD_USER);
2888 	n_fields -= IBUF_REC_FIELD_USER;
2889 
2890 	rec_get_nth_field_offs_old(rec, 1, &len);
2891 	/* This function is only invoked when buffering new
2892 	operations.  All pre-4.1 records should have been merged
2893 	when the database was started up. */
2894 	ut_a(len == 1);
2895 
2896 	if (rec_get_deleted_flag(rec, 0)) {
2897 		/* This record has been merged already,
2898 		but apparently the system crashed before
2899 		the change was discarded from the buffer.
2900 		Pretend that the record does not exist. */
2901 		return(0);
2902 	}
2903 
2904 	types = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
2905 
2906 	switch (UNIV_EXPECT(len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE,
2907 			    IBUF_REC_INFO_SIZE)) {
2908 	default:
2909 		ut_error;
2910 	case 0:
2911 		/* This ROW_FORMAT=REDUNDANT record does not include an
2912 		operation counter.  Exclude it from the *n_recs,
2913 		because deletes cannot be buffered if there are
2914 		old-style inserts buffered for the page. */
2915 
2916 		len = ibuf_rec_get_size(rec, types, n_fields, 0);
2917 
2918 		return(len
2919 		       + rec_get_converted_extra_size(len, n_fields, 0)
2920 		       + page_dir_calc_reserved_space(1));
2921 	case 1:
2922 		/* This ROW_FORMAT=COMPACT record does not include an
2923 		operation counter.  Exclude it from the *n_recs,
2924 		because deletes cannot be buffered if there are
2925 		old-style inserts buffered for the page. */
2926 		goto get_volume_comp;
2927 
2928 	case IBUF_REC_INFO_SIZE:
2929 		ibuf_op = (ibuf_op_t) types[IBUF_REC_OFFSET_TYPE];
2930 		break;
2931 	}
2932 
2933 	switch (ibuf_op) {
2934 	case IBUF_OP_INSERT:
2935 		/* Inserts can be done by updating a delete-marked record.
2936 		Because delete-mark and insert operations can be pointing to
2937 		the same records, we must not count duplicates. */
2938 	case IBUF_OP_DELETE_MARK:
2939 		/* There must be a record to delete-mark.
2940 		See if this record has been already buffered. */
2941 		if (n_recs && ibuf_get_volume_buffered_hash(
2942 			    rec, types + IBUF_REC_INFO_SIZE,
2943 			    types + len,
2944 			    types[IBUF_REC_OFFSET_FLAGS] & IBUF_REC_COMPACT,
2945 			    hash, size)) {
2946 			(*n_recs)++;
2947 		}
2948 
2949 		if (ibuf_op == IBUF_OP_DELETE_MARK) {
2950 			/* Setting the delete-mark flag does not
2951 			affect the available space on the page. */
2952 			return(0);
2953 		}
2954 		break;
2955 	case IBUF_OP_DELETE:
2956 		/* A record will be removed from the page. */
2957 		if (n_recs) {
2958 			(*n_recs)--;
2959 		}
2960 		/* While deleting a record actually frees up space,
2961 		we have to play it safe and pretend that it takes no
2962 		additional space (the record might not exist, etc.). */
2963 		return(0);
2964 	default:
2965 		ut_error;
2966 	}
2967 
2968 	ut_ad(ibuf_op == IBUF_OP_INSERT);
2969 
2970 get_volume_comp:
2971 	{
2972 		dtuple_t*	entry;
2973 		ulint		volume;
2974 		dict_index_t*	dummy_index;
2975 		mem_heap_t*	heap = mem_heap_create(500);
2976 
2977 		entry = ibuf_build_entry_from_ibuf_rec(
2978 			mtr, rec, heap, &dummy_index);
2979 
2980 		volume = rec_get_converted_size(dummy_index, entry, 0);
2981 
2982 		ibuf_dummy_index_free(dummy_index);
2983 		mem_heap_free(heap);
2984 
2985 		return(volume + page_dir_calc_reserved_space(1));
2986 	}
2987 }
2988 
2989 /*********************************************************************//**
2990 Gets an upper limit for the combined size of entries buffered in the insert
2991 buffer for a given page.
2992 @return upper limit for the volume of buffered inserts for the index
2993 page, in bytes; UNIV_PAGE_SIZE, if the entries for the index page span
2994 several pages in the insert buffer */
2995 static
2996 ulint
2997 ibuf_get_volume_buffered(
2998 /*=====================*/
2999 	const btr_pcur_t*pcur,	/*!< in: pcur positioned at a place in an
3000 				insert buffer tree where we would insert an
3001 				entry for the index page whose number is
3002 				page_no, latch mode has to be BTR_MODIFY_PREV
3003 				or BTR_MODIFY_TREE */
3004 	ulint		space,	/*!< in: space id */
3005 	ulint		page_no,/*!< in: page number of an index page */
3006 	lint*		n_recs,	/*!< in/out: minimum number of records on the
3007 				page after the buffered changes have been
3008 				applied, or NULL to disable the counting */
3009 	mtr_t*		mtr)	/*!< in: mini-transaction of pcur */
3010 {
3011 	ulint		volume;
3012 	const rec_t*	rec;
3013 	const page_t*	page;
3014 	ulint		prev_page_no;
3015 	const page_t*	prev_page;
3016 	ulint		next_page_no;
3017 	const page_t*	next_page;
3018 	/* bitmap of buffered recs */
3019 	ulint		hash_bitmap[128 / sizeof(ulint)];
3020 
3021 	ut_ad((pcur->latch_mode == BTR_MODIFY_PREV)
3022 	      || (pcur->latch_mode == BTR_MODIFY_TREE));
3023 
3024 	/* Count the volume of inserts earlier in the alphabetical order than
3025 	pcur */
3026 
3027 	volume = 0;
3028 
3029 	if (n_recs) {
3030 		memset(hash_bitmap, 0, sizeof hash_bitmap);
3031 	}
3032 
3033 	rec = btr_pcur_get_rec(pcur);
3034 	page = page_align(rec);
3035 	ut_ad(page_validate(page, ibuf->index));
3036 
3037 	if (page_rec_is_supremum(rec)) {
3038 		rec = page_rec_get_prev_const(rec);
3039 	}
3040 
3041 	for (; !page_rec_is_infimum(rec);
3042 	     rec = page_rec_get_prev_const(rec)) {
3043 		ut_ad(page_align(rec) == page);
3044 
3045 		if (page_no != ibuf_rec_get_page_no(mtr, rec)
3046 		    || space != ibuf_rec_get_space(mtr, rec)) {
3047 
3048 			goto count_later;
3049 		}
3050 
3051 		volume += ibuf_get_volume_buffered_count(
3052 			mtr, rec,
3053 			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
3054 	}
3055 
3056 	/* Look at the previous page */
3057 
3058 	prev_page_no = btr_page_get_prev(page, mtr);
3059 
3060 	if (prev_page_no == FIL_NULL) {
3061 
3062 		goto count_later;
3063 	}
3064 
3065 	{
3066 		buf_block_t*	block;
3067 
3068 		block = buf_page_get(
3069 			page_id_t(IBUF_SPACE_ID, prev_page_no),
3070 			univ_page_size, RW_X_LATCH, mtr);
3071 
3072 		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);
3073 
3074 		prev_page = buf_block_get_frame(block);
3075 		ut_ad(page_validate(prev_page, ibuf->index));
3076 	}
3077 
3078 #ifdef UNIV_BTR_DEBUG
3079 	ut_a(btr_page_get_next(prev_page, mtr) == page_get_page_no(page));
3080 #endif /* UNIV_BTR_DEBUG */
3081 
3082 	rec = page_get_supremum_rec(prev_page);
3083 	rec = page_rec_get_prev_const(rec);
3084 
3085 	for (;; rec = page_rec_get_prev_const(rec)) {
3086 		ut_ad(page_align(rec) == prev_page);
3087 
3088 		if (page_rec_is_infimum(rec)) {
3089 
3090 			/* We cannot go back to yet another previous page,
3091 			because we do not have the x-latch on it, and cannot
3092 			acquire one because of the latching order: we have to give up */
3093 
3094 			return(UNIV_PAGE_SIZE);
3095 		}
3096 
3097 		if (page_no != ibuf_rec_get_page_no(mtr, rec)
3098 		    || space != ibuf_rec_get_space(mtr, rec)) {
3099 
3100 			goto count_later;
3101 		}
3102 
3103 		volume += ibuf_get_volume_buffered_count(
3104 			mtr, rec,
3105 			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
3106 	}
3107 
3108 count_later:
3109 	rec = btr_pcur_get_rec(pcur);
3110 
3111 	if (!page_rec_is_supremum(rec)) {
3112 		rec = page_rec_get_next_const(rec);
3113 	}
3114 
3115 	for (; !page_rec_is_supremum(rec);
3116 	     rec = page_rec_get_next_const(rec)) {
3117 		if (page_no != ibuf_rec_get_page_no(mtr, rec)
3118 		    || space != ibuf_rec_get_space(mtr, rec)) {
3119 
3120 			return(volume);
3121 		}
3122 
3123 		volume += ibuf_get_volume_buffered_count(
3124 			mtr, rec,
3125 			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
3126 	}
3127 
3128 	/* Look at the next page */
3129 
3130 	next_page_no = btr_page_get_next(page, mtr);
3131 
3132 	if (next_page_no == FIL_NULL) {
3133 
3134 		return(volume);
3135 	}
3136 
3137 	{
3138 		buf_block_t*	block;
3139 
3140 		block = buf_page_get(
3141 			page_id_t(IBUF_SPACE_ID, next_page_no),
3142 			univ_page_size, RW_X_LATCH, mtr);
3143 
3144 		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);
3145 
3146 		next_page = buf_block_get_frame(block);
3147 		ut_ad(page_validate(next_page, ibuf->index));
3148 	}
3149 
3150 #ifdef UNIV_BTR_DEBUG
3151 	ut_a(btr_page_get_prev(next_page, mtr) == page_get_page_no(page));
3152 #endif /* UNIV_BTR_DEBUG */
3153 
3154 	rec = page_get_infimum_rec(next_page);
3155 	rec = page_rec_get_next_const(rec);
3156 
3157 	for (;; rec = page_rec_get_next_const(rec)) {
3158 		ut_ad(page_align(rec) == next_page);
3159 
3160 		if (page_rec_is_supremum(rec)) {
3161 
3162 			/* We give up */
3163 
3164 			return(UNIV_PAGE_SIZE);
3165 		}
3166 
3167 		if (page_no != ibuf_rec_get_page_no(mtr, rec)
3168 		    || space != ibuf_rec_get_space(mtr, rec)) {
3169 
3170 			return(volume);
3171 		}
3172 
3173 		volume += ibuf_get_volume_buffered_count(
3174 			mtr, rec,
3175 			hash_bitmap, UT_ARR_SIZE(hash_bitmap), n_recs);
3176 	}
3177 }
3178 
3179 /*********************************************************************//**
3180 Reads the biggest tablespace id from the high end of the insert buffer
3181 tree and updates the counter in fil_system. */
3182 void
3183 ibuf_update_max_tablespace_id(void)
3184 /*===============================*/
3185 {
3186 	ulint		max_space_id;
3187 	const rec_t*	rec;
3188 	const byte*	field;
3189 	ulint		len;
3190 	btr_pcur_t	pcur;
3191 	mtr_t		mtr;
3192 
3193 	ut_a(!dict_table_is_comp(ibuf->index->table));
3194 
3195 	ibuf_mtr_start(&mtr);
3196 
3197 	btr_pcur_open_at_index_side(
3198 		false, ibuf->index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
3199 
3200 	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));
3201 
3202 	btr_pcur_move_to_prev(&pcur, &mtr);
3203 
3204 	if (btr_pcur_is_before_first_on_page(&pcur)) {
3205 		/* The tree is empty */
3206 
3207 		max_space_id = 0;
3208 	} else {
3209 		rec = btr_pcur_get_rec(&pcur);
3210 
3211 		field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
3212 
3213 		ut_a(len == 4);
3214 
3215 		max_space_id = mach_read_from_4(field);
3216 	}
3217 
3218 	ibuf_mtr_commit(&mtr);
3219 
3220 	/* printf("Maximum space id in insert buffer %lu\n", max_space_id); */
3221 
3222 	fil_set_max_space_id_if_bigger(max_space_id);
3223 }
3224 
3225 #ifdef UNIV_DEBUG
3226 # define ibuf_get_entry_counter_low(mtr,rec,space,page_no)	\
3227 	ibuf_get_entry_counter_low_func(mtr,rec,space,page_no)
3228 #else /* UNIV_DEBUG */
3229 # define ibuf_get_entry_counter_low(mtr,rec,space,page_no)	\
3230 	ibuf_get_entry_counter_low_func(rec,space,page_no)
3231 #endif
3232 /****************************************************************//**
3233 Helper function for ibuf_get_entry_counter_func. Checks if rec is for
3234 (space, page_no), and if so, reads counter value from it and returns
3235 that + 1.
3236 @retval ULINT_UNDEFINED if the record does not contain any counter
3237 @retval 0 if the record is not for (space, page_no)
3238 @retval 1 + previous counter value, otherwise */
3239 static
3240 ulint
3241 ibuf_get_entry_counter_low_func(
3242 /*============================*/
3243 #ifdef UNIV_DEBUG
3244 	mtr_t*		mtr,		/*!< in: mini-transaction of rec */
3245 #endif /* UNIV_DEBUG */
3246 	const rec_t*	rec,		/*!< in: insert buffer record */
3247 	ulint		space,		/*!< in: space id */
3248 	ulint		page_no)	/*!< in: page number */
3249 {
3250 	ulint		counter;
3251 	const byte*	field;
3252 	ulint		len;
3253 
3254 	ut_ad(ibuf_inside(mtr));
3255 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX)
3256 	      || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
3257 	ut_ad(rec_get_n_fields_old(rec) > 2);
3258 
3259 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
3260 
3261 	ut_a(len == 1);
3262 
3263 	/* Check the tablespace identifier. */
3264 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
3265 
3266 	ut_a(len == 4);
3267 
3268 	if (mach_read_from_4(field) != space) {
3269 
3270 		return(0);
3271 	}
3272 
3273 	/* Check the page offset. */
3274 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_PAGE, &len);
3275 	ut_a(len == 4);
3276 
3277 	if (mach_read_from_4(field) != page_no) {
3278 
3279 		return(0);
3280 	}
3281 
3282 	/* Check if the record contains a counter field. */
3283 	field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
3284 
3285 	switch (len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE) {
3286 	default:
3287 		ut_error;
3288 	case 0: /* ROW_FORMAT=REDUNDANT */
3289 	case 1: /* ROW_FORMAT=COMPACT */
3290 		return(ULINT_UNDEFINED);
3291 
3292 	case IBUF_REC_INFO_SIZE:
3293 		counter = mach_read_from_2(field + IBUF_REC_OFFSET_COUNTER);
3294 		ut_a(counter < 0xFFFF);
3295 		return(counter + 1);
3296 	}
3297 }
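/* A hedged sketch of the format dispatch above: the length of the
metadata field, taken modulo the per-field type-info size, tells the
record formats apart. The sizes 6 and 4 used here are assumptions standing
in for DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE and IBUF_REC_INFO_SIZE; the
sketch is guarded out of the build. */
#if 0
#include <stdio.h>

int main(void)
{
	unsigned long	type_buf_size = 6;	/* assumed */
	unsigned long	info_size = 4;		/* assumed */
	unsigned long	n_user_fields = 3;

	/* old ROW_FORMAT=REDUNDANT: no flags, no counter */
	printf("%lu\n", (n_user_fields * type_buf_size) % type_buf_size);
	/* old ROW_FORMAT=COMPACT: one flag byte, no counter */
	printf("%lu\n", (1 + n_user_fields * type_buf_size) % type_buf_size);
	/* new format: counter + operation + flags header present */
	printf("%lu\n",
	       (info_size + n_user_fields * type_buf_size) % type_buf_size);
	/* prints 0, 1 and 4, matching the three switch cases above */
	return(0);
}
#endif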
3298 
3299 #ifdef UNIV_DEBUG
3300 # define ibuf_get_entry_counter(space,page_no,rec,mtr,exact_leaf) \
3301 	ibuf_get_entry_counter_func(space,page_no,rec,mtr,exact_leaf)
3302 #else /* UNIV_DEBUG */
3303 # define ibuf_get_entry_counter(space,page_no,rec,mtr,exact_leaf) \
3304 	ibuf_get_entry_counter_func(space,page_no,rec,exact_leaf)
3305 #endif /* UNIV_DEBUG */
3306 
3307 /****************************************************************//**
3308 Calculate the counter field for an entry based on the current
3309 last record in ibuf for (space, page_no).
3310 @return the counter field, or ULINT_UNDEFINED
3311 if we should abort this insertion to ibuf */
3312 static
3313 ulint
3314 ibuf_get_entry_counter_func(
3315 /*========================*/
3316 	ulint		space,		/*!< in: space id of entry */
3317 	ulint		page_no,	/*!< in: page number of entry */
3318 	const rec_t*	rec,		/*!< in: the record preceding the
3319 					insertion point */
3320 #ifdef UNIV_DEBUG
3321 	mtr_t*		mtr,		/*!< in: mini-transaction */
3322 #endif /* UNIV_DEBUG */
3323 	ibool		only_leaf)	/*!< in: TRUE if this is the only
3324 					leaf page that can contain entries
3325 					for (space,page_no), that is, there
3326 					was no exact match for (space,page_no)
3327 					in the node pointer */
3328 {
3329 	ut_ad(ibuf_inside(mtr));
3330 	ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
3331 	ut_ad(page_validate(page_align(rec), ibuf->index));
3332 
3333 	if (page_rec_is_supremum(rec)) {
3334 		/* This is just for safety. The record should be a
3335 		page infimum or a user record. */
3336 		ut_ad(0);
3337 		return(ULINT_UNDEFINED);
3338 	} else if (!page_rec_is_infimum(rec)) {
3339 		return(ibuf_get_entry_counter_low(mtr, rec, space, page_no));
3340 	} else if (only_leaf
3341 		   || fil_page_get_prev(page_align(rec)) == FIL_NULL) {
3342 		/* The parent node pointer did not contain the
3343 		searched for (space, page_no), which means that the
3344 		search ended on the correct page regardless of the
3345 		counter value, and since we're at the infimum record,
3346 		there are no existing records. */
3347 		return(0);
3348 	} else {
3349 		/* We used to read the previous page here. It would
3350 		break the latching order, because the caller has
3351 		buffer-fixed an insert buffer bitmap page. */
3352 		return(ULINT_UNDEFINED);
3353 	}
3354 }
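
/* Illustrative sketch, not part of the InnoDB source: the decision made by
ibuf_get_entry_counter_func() and its helper above, restated as one pure
function so that the cases are easier to see.  The name and all parameters
are hypothetical, nothing calls this, and it deliberately ignores the
latching details that the real functions assert on. */
static MY_ATTRIBUTE((unused))
ulint
ibuf_example_next_counter(
	bool	rec_is_supremum,	/* preceding record is the supremum */
	bool	rec_is_infimum,		/* preceding record is the infimum */
	bool	only_leaf,		/* no exact (space,page_no) match in
					the parent node pointer */
	bool	page_has_prev,		/* the leaf has a left sibling */
	bool	rec_is_for_page,	/* preceding record is for the same
					(space,page_no) */
	bool	rec_has_counter,	/* record is in a format that stores
					a counter */
	ulint	rec_counter)		/* counter stored in the record */
{
	if (rec_is_supremum) {
		/* Should not happen; give up, as the real code does. */
		return(ULINT_UNDEFINED);
	}

	if (!rec_is_infimum) {
		if (!rec_is_for_page) {
			/* First buffered operation for the page. */
			return(0);
		}

		/* Old-format records carry no counter: abort the insert. */
		return(rec_has_counter
		       ? rec_counter + 1 : ULINT_UNDEFINED);
	}

	if (only_leaf || !page_has_prev) {
		/* No earlier entries for (space,page_no) can exist. */
		return(0);
	}

	/* Deciding would require reading the previous leaf page, which
	would break the latching order: abort the buffering instead. */
	return(ULINT_UNDEFINED);
}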
3355 
3356 /** Buffer an operation in the insert/delete buffer, instead of doing it
3357 directly to the disk page, if this is possible.
3358 @param[in]	mode		BTR_MODIFY_PREV or BTR_MODIFY_TREE
3359 @param[in]	op		operation type
3360 @param[in]	no_counter	TRUE=use 5.0.3 format; FALSE=allow delete
3361 buffering
3362 @param[in]	entry		index entry to insert
3363 @param[in]	entry_size	rec_get_converted_size(index, entry)
3364 @param[in,out]	index		index where to insert; must not be unique
3365 or clustered
3366 @param[in]	page_id		page id where to insert
3367 @param[in]	page_size	page size
3368 @param[in,out]	thr		query thread
3369 @return DB_SUCCESS, DB_STRONG_FAIL or other error */
3370 static MY_ATTRIBUTE((warn_unused_result))
3371 dberr_t
3372 ibuf_insert_low(
3373 	ulint			mode,
3374 	ibuf_op_t		op,
3375 	ibool			no_counter,
3376 	const dtuple_t*		entry,
3377 	ulint			entry_size,
3378 	dict_index_t*		index,
3379 	const page_id_t&	page_id,
3380 	const page_size_t&	page_size,
3381 	que_thr_t*		thr)
3382 {
3383 	big_rec_t*	dummy_big_rec;
3384 	btr_pcur_t	pcur;
3385 	btr_cur_t*	cursor;
3386 	dtuple_t*	ibuf_entry;
3387 	mem_heap_t*	offsets_heap	= NULL;
3388 	mem_heap_t*	heap;
3389 	ulint*		offsets		= NULL;
3390 	ulint		buffered;
3391 	lint		min_n_recs;
3392 	rec_t*		ins_rec;
3393 	ibool		old_bit_value;
3394 	page_t*		bitmap_page;
3395 	buf_block_t*	block;
3396 	page_t*		root;
3397 	dberr_t		err;
3398 	ibool		do_merge;
3399 	ulint		space_ids[IBUF_MAX_N_PAGES_MERGED];
3400 	ulint		page_nos[IBUF_MAX_N_PAGES_MERGED];
3401 	ulint		n_stored;
3402 	mtr_t		mtr;
3403 	mtr_t		bitmap_mtr;
3404 
3405 	ut_a(!dict_index_is_clust(index));
3406 	ut_ad(!dict_index_is_spatial(index));
3407 	ut_ad(dtuple_check_typed(entry));
3408 	ut_ad(!no_counter || op == IBUF_OP_INSERT);
3409 	ut_a(op < IBUF_OP_COUNT);
3410 
3411 	do_merge = FALSE;
3412 
3413 	/* Perform dirty reads of ibuf->size and ibuf->max_size, to
3414 	reduce ibuf_mutex contention. Given that ibuf->max_size and
3415 	ibuf->size fit in a machine word, this should be OK; at worst
3416 	we are doing some excessive ibuf_contract() or occasionally
3417 	skipping an ibuf_contract(). */
3418 	if (ibuf->max_size == 0) {
3419 		return(DB_STRONG_FAIL);
3420 	}
3421 
3422 	if (ibuf->size >= ibuf->max_size + IBUF_CONTRACT_DO_NOT_INSERT) {
3423 		/* Insert buffer is now too big, contract it but do not try
3424 		to insert */
3425 
3426 
3427 #ifdef UNIV_IBUF_DEBUG
3428 		fputs("Ibuf too big\n", stderr);
3429 #endif
3430 		ibuf_contract(true);
3431 
3432 		return(DB_STRONG_FAIL);
3433 	}
3434 
3435 	heap = mem_heap_create(1024);
3436 
3437 	/* Build the entry which contains the space id and the page number
3438 	as the first fields and the type information for other fields, and
3439 	which will be inserted to the insert buffer. Using a counter value
3440 	of 0xFFFF we find the last record for (space, page_no), from which
3441 	we can then read the counter value N and use N + 1 in the record we
3442 	insert. (We patch the ibuf_entry's counter field to the correct
3443 	value just before actually inserting the entry.) */
3444 
3445 	ibuf_entry = ibuf_entry_build(
3446 		op, index, entry, page_id.space(), page_id.page_no(),
3447 		no_counter ? ULINT_UNDEFINED : 0xFFFF, heap);
3448 
3449 	/* Open a cursor to the insert buffer tree to calculate if we can add
3450 	the new entry to it without exceeding the free space limit for the
3451 	page. */
3452 
3453 	if (BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
3454 		for (;;) {
3455 			mutex_enter(&ibuf_pessimistic_insert_mutex);
3456 			mutex_enter(&ibuf_mutex);
3457 
3458 			if (UNIV_LIKELY(ibuf_data_enough_free_for_insert())) {
3459 
3460 				break;
3461 			}
3462 
3463 			mutex_exit(&ibuf_mutex);
3464 			mutex_exit(&ibuf_pessimistic_insert_mutex);
3465 
3466 			if (!ibuf_add_free_page()) {
3467 
3468 				mem_heap_free(heap);
3469 				return(DB_STRONG_FAIL);
3470 			}
3471 		}
3472 	}
3473 
3474 	ibuf_mtr_start(&mtr);
3475 
3476 	btr_pcur_open(ibuf->index, ibuf_entry, PAGE_CUR_LE, mode, &pcur, &mtr);
3477 	ut_ad(page_validate(btr_pcur_get_page(&pcur), ibuf->index));
3478 
3479 	/* Find out the volume of already buffered inserts for the same index
3480 	page */
3481 	min_n_recs = 0;
3482 	buffered = ibuf_get_volume_buffered(&pcur,
3483 					    page_id.space(),
3484 					    page_id.page_no(),
3485 					    op == IBUF_OP_DELETE
3486 					    ? &min_n_recs
3487 					    : NULL, &mtr);
3488 
3489 	if (op == IBUF_OP_DELETE
3490 	    && (min_n_recs < 2 || buf_pool_watch_occurred(page_id))) {
3491 		/* The page could become empty after the record is
3492 		deleted, or the page has been read in to the buffer
3493 		pool.  Refuse to buffer the operation. */
3494 
3495 		/* The buffer pool watch is needed for IBUF_OP_DELETE
3496 		because of latching order considerations.  We can
3497 		check buf_pool_watch_occurred() only after latching
3498 		the insert buffer B-tree pages that contain buffered
3499 		changes for the page.  We never buffer IBUF_OP_DELETE,
3500 		unless some IBUF_OP_INSERT or IBUF_OP_DELETE_MARK have
3501 		been previously buffered for the page.  Because there
3502 		are buffered operations for the page, the insert
3503 		buffer B-tree page latches held by mtr will guarantee
3504 		that no changes for the user page will be merged
3505 		before mtr_commit(&mtr).  We must not mtr_commit(&mtr)
3506 		until after the IBUF_OP_DELETE has been buffered. */
3507 
3508 fail_exit:
3509 		if (BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
3510 			mutex_exit(&ibuf_mutex);
3511 			mutex_exit(&ibuf_pessimistic_insert_mutex);
3512 		}
3513 
3514 		err = DB_STRONG_FAIL;
3515 		goto func_exit;
3516 	}
3517 
3518 	/* After this point, the page could still be loaded to the
3519 	buffer pool, but we do not have to care about it, since we are
3520 	holding a latch on the insert buffer leaf page that contains
3521 	buffered changes for (space, page_no).  If the page enters the
3522 	buffer pool, buf_page_io_complete() for (space, page_no) will
3523 	have to acquire a latch on the same insert buffer leaf page,
3524 	which it cannot do until we have buffered the IBUF_OP_DELETE
3525 	and done mtr_commit(&mtr) to release the latch. */
3526 
3527 #ifdef UNIV_IBUF_COUNT_DEBUG
3528 	ut_a((buffered == 0) || ibuf_count_get(page_id));
3529 #endif
3530 	ibuf_mtr_start(&bitmap_mtr);
3531 	bitmap_mtr.set_named_space(page_id.space());
3532 
3533 	bitmap_page = ibuf_bitmap_get_map_page(page_id, page_size,
3534 					       &bitmap_mtr);
3535 
3536 	/* We check if the index page is suitable for buffered entries */
3537 
3538 	if (buf_page_peek(page_id)
3539 	    || lock_rec_expl_exist_on_page(page_id.space(),
3540 					   page_id.page_no())) {
3541 
3542 		ibuf_mtr_commit(&bitmap_mtr);
3543 		goto fail_exit;
3544 	}
3545 
3546 	if (op == IBUF_OP_INSERT) {
3547 		ulint	bits = ibuf_bitmap_page_get_bits(
3548 			bitmap_page, page_id, page_size, IBUF_BITMAP_FREE,
3549 			&bitmap_mtr);
3550 
3551 		if (buffered + entry_size + page_dir_calc_reserved_space(1)
3552 		    > ibuf_index_page_calc_free_from_bits(page_size, bits)) {
3553 			/* Release the bitmap page latch early. */
3554 			ibuf_mtr_commit(&bitmap_mtr);
3555 
3556 			/* It may not fit */
3557 			do_merge = TRUE;
3558 
3559 			ibuf_get_merge_page_nos(FALSE,
3560 						btr_pcur_get_rec(&pcur), &mtr,
3561 						space_ids,
3562 						page_nos, &n_stored);
3563 
3564 			goto fail_exit;
3565 		}
3566 	}
3567 
3568 	if (!no_counter) {
3569 		/* Patch correct counter value to the entry to
3570 		insert. This can change the insert position, which can
3571 		result in the need to abort in some cases. */
3572 		ulint		counter = ibuf_get_entry_counter(
3573 			page_id.space(), page_id.page_no(),
3574 			btr_pcur_get_rec(&pcur), &mtr,
3575 			btr_pcur_get_btr_cur(&pcur)->low_match
3576 			< IBUF_REC_FIELD_METADATA);
3577 		dfield_t*	field;
3578 
3579 		if (counter == ULINT_UNDEFINED) {
3580 			ibuf_mtr_commit(&bitmap_mtr);
3581 			goto fail_exit;
3582 		}
3583 
3584 		field = dtuple_get_nth_field(
3585 			ibuf_entry, IBUF_REC_FIELD_METADATA);
3586 		mach_write_to_2(
3587 			(byte*) dfield_get_data(field)
3588 			+ IBUF_REC_OFFSET_COUNTER, counter);
3589 	}
3590 
3591 	/* Set the bitmap bit denoting that the insert buffer contains
3592 	buffered entries for this index page, if the bit is not set yet */
3593 
3594 	old_bit_value = ibuf_bitmap_page_get_bits(
3595 		bitmap_page, page_id, page_size,
3596 		IBUF_BITMAP_BUFFERED, &bitmap_mtr);
3597 
3598 	if (!old_bit_value) {
3599 		ibuf_bitmap_page_set_bits(bitmap_page, page_id, page_size,
3600 					  IBUF_BITMAP_BUFFERED, TRUE,
3601 					  &bitmap_mtr);
3602 	}
3603 
3604 	ibuf_mtr_commit(&bitmap_mtr);
3605 
3606 	cursor = btr_pcur_get_btr_cur(&pcur);
3607 
3608 	if (mode == BTR_MODIFY_PREV) {
3609 		err = btr_cur_optimistic_insert(
3610 			BTR_NO_LOCKING_FLAG,
3611 			cursor, &offsets, &offsets_heap,
3612 			ibuf_entry, &ins_rec,
3613 			&dummy_big_rec, 0, thr, &mtr);
3614 		block = btr_cur_get_block(cursor);
3615 		ut_ad(block->page.id.space() == IBUF_SPACE_ID);
3616 
3617 		/* If this is the root page, update ibuf->empty. */
3618 		if (block->page.id.page_no() == FSP_IBUF_TREE_ROOT_PAGE_NO) {
3619 			const page_t*	root = buf_block_get_frame(block);
3620 
3621 			ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
3622 			ut_ad(page_get_page_no(root)
3623 			      == FSP_IBUF_TREE_ROOT_PAGE_NO);
3624 
3625 			ibuf->empty = page_is_empty(root);
3626 		}
3627 	} else {
3628 		ut_ad(BTR_LATCH_MODE_WITHOUT_INTENTION(mode)
3629 		      == BTR_MODIFY_TREE);
3630 
3631 		/* We acquire an sx-latch to the root page before the insert,
3632 		because a pessimistic insert releases the tree x-latch,
3633 		which would cause the sx-latching of the root after that to
3634 		break the latching order. */
3635 
3636 		root = ibuf_tree_root_get(&mtr);
3637 
3638 		err = btr_cur_optimistic_insert(
3639 			BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG,
3640 			cursor, &offsets, &offsets_heap,
3641 			ibuf_entry, &ins_rec,
3642 			&dummy_big_rec, 0, thr, &mtr);
3643 
3644 		if (err == DB_FAIL) {
3645 			err = btr_cur_pessimistic_insert(
3646 				BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG,
3647 				cursor, &offsets, &offsets_heap,
3648 				ibuf_entry, &ins_rec,
3649 				&dummy_big_rec, 0, thr, &mtr);
3650 		}
3651 
3652 		mutex_exit(&ibuf_pessimistic_insert_mutex);
3653 		ibuf_size_update(root);
3654 		mutex_exit(&ibuf_mutex);
3655 		ibuf->empty = page_is_empty(root);
3656 
3657 		block = btr_cur_get_block(cursor);
3658 		ut_ad(block->page.id.space() == IBUF_SPACE_ID);
3659 	}
3660 
3661 	if (offsets_heap) {
3662 		mem_heap_free(offsets_heap);
3663 	}
3664 
3665 	if (err == DB_SUCCESS && op != IBUF_OP_DELETE) {
3666 		/* Update the page max trx id field */
3667 		page_update_max_trx_id(block, NULL,
3668 				       thr_get_trx(thr)->id, &mtr);
3669 	}
3670 
3671 func_exit:
3672 #ifdef UNIV_IBUF_COUNT_DEBUG
3673 	if (err == DB_SUCCESS) {
3674 
3675 		ib::info() << "Incrementing ibuf count of page " << page_id
3676 			<< " from " << ibuf_count_get(page_id)
3677 			<< " by 1";
3678 
3679 		ibuf_count_set(page_id, ibuf_count_get(page_id) + 1);
3680 	}
3681 #endif
3682 
3683 	ibuf_mtr_commit(&mtr);
3684 	btr_pcur_close(&pcur);
3685 
3686 	mem_heap_free(heap);
3687 
3688 	if (err == DB_SUCCESS
3689 	    && BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE) {
3690 		ibuf_contract_after_insert(entry_size);
3691 	}
3692 
3693 	if (do_merge) {
3694 #ifdef UNIV_IBUF_DEBUG
3695 		ut_a(n_stored <= IBUF_MAX_N_PAGES_MERGED);
3696 #endif
3697 		buf_read_ibuf_merge_pages(false, space_ids,
3698 					  page_nos, n_stored);
3699 	}
3700 
3701 	return(err);
3702 }
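
/* Illustrative sketch, not part of the InnoDB source: the checks that make
ibuf_insert_low() refuse to buffer an entry, collected into one pure
function.  The enum and function names are hypothetical and nothing calls
this; the free-space estimate is passed in as a plain number because its
derivation from the IBUF_BITMAP_FREE bits depends on the page size, and in
the real code that check only applies to IBUF_OP_INSERT. */
enum ibuf_example_admission_t {
	IBUF_EXAMPLE_ADMIT,		/* go ahead and buffer the entry */
	IBUF_EXAMPLE_REFUSE,		/* change buffering disabled or the
					ibuf tree is over-full */
	IBUF_EXAMPLE_REFUSE_AND_MERGE	/* the entry might not fit the target
					page: refuse, and merge some buffered
					pages to make progress */
};

static MY_ATTRIBUTE((unused))
ibuf_example_admission_t
ibuf_example_admit(
	ulint	ibuf_size,		/* current ibuf->size, in pages */
	ulint	ibuf_max_size,		/* ibuf->max_size, in pages */
	ulint	already_buffered,	/* bytes already buffered for the
					target page */
	ulint	entry_size,		/* converted size of the new entry */
	ulint	slot_reserve,		/* page_dir_calc_reserved_space(1) */
	ulint	page_free_estimate)	/* free space promised by the free
					bits in the ibuf bitmap */
{
	if (ibuf_max_size == 0
	    || ibuf_size >= ibuf_max_size + IBUF_CONTRACT_DO_NOT_INSERT) {
		return(IBUF_EXAMPLE_REFUSE);
	}

	if (already_buffered + entry_size + slot_reserve
	    > page_free_estimate) {
		return(IBUF_EXAMPLE_REFUSE_AND_MERGE);
	}

	return(IBUF_EXAMPLE_ADMIT);
}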
3703 
3704 /** Buffer an operation in the insert/delete buffer, instead of doing it
3705 directly to the disk page, if this is possible. Does not do it if the index
3706 is clustered or unique.
3707 @param[in]	op		operation type
3708 @param[in]	entry		index entry to insert
3709 @param[in,out]	index		index where to insert
3710 @param[in]	page_id		page id where to insert
3711 @param[in]	page_size	page size
3712 @param[in,out]	thr		query thread
3713 @return TRUE if success */
3714 ibool
3715 ibuf_insert(
3716 	ibuf_op_t		op,
3717 	const dtuple_t*		entry,
3718 	dict_index_t*		index,
3719 	const page_id_t&	page_id,
3720 	const page_size_t&	page_size,
3721 	que_thr_t*		thr)
3722 {
3723 	dberr_t		err;
3724 	ulint		entry_size;
3725 	ibool		no_counter;
3726 	/* Read the settable global variable ibuf_use only once in
3727 	this function, so that we will have a consistent view of it. */
3728 	ibuf_use_t	use		= ibuf_use;
3729 	DBUG_ENTER("ibuf_insert");
3730 
3731 	DBUG_PRINT("ibuf", ("op: %d, space: " UINT32PF ", page_no: " UINT32PF,
3732 			    op, page_id.space(), page_id.page_no()));
3733 
3734 	ut_ad(dtuple_check_typed(entry));
3735 	ut_ad(page_id.space() != srv_tmp_space.space_id());
3736 
3737 	ut_a(!dict_index_is_clust(index));
3738 
3739 	no_counter = use <= IBUF_USE_INSERT;
3740 
3741 	switch (op) {
3742 	case IBUF_OP_INSERT:
3743 		switch (use) {
3744 		case IBUF_USE_NONE:
3745 		case IBUF_USE_DELETE:
3746 		case IBUF_USE_DELETE_MARK:
3747 			DBUG_RETURN(FALSE);
3748 		case IBUF_USE_INSERT:
3749 		case IBUF_USE_INSERT_DELETE_MARK:
3750 		case IBUF_USE_ALL:
3751 			goto check_watch;
3752 		case IBUF_USE_COUNT:
3753 			break;
3754 		}
3755 		break;
3756 	case IBUF_OP_DELETE_MARK:
3757 		switch (use) {
3758 		case IBUF_USE_NONE:
3759 		case IBUF_USE_INSERT:
3760 			DBUG_RETURN(FALSE);
3761 		case IBUF_USE_DELETE_MARK:
3762 		case IBUF_USE_DELETE:
3763 		case IBUF_USE_INSERT_DELETE_MARK:
3764 		case IBUF_USE_ALL:
3765 			ut_ad(!no_counter);
3766 			goto check_watch;
3767 		case IBUF_USE_COUNT:
3768 			break;
3769 		}
3770 		break;
3771 	case IBUF_OP_DELETE:
3772 		switch (use) {
3773 		case IBUF_USE_NONE:
3774 		case IBUF_USE_INSERT:
3775 		case IBUF_USE_INSERT_DELETE_MARK:
3776 			DBUG_RETURN(FALSE);
3777 		case IBUF_USE_DELETE_MARK:
3778 		case IBUF_USE_DELETE:
3779 		case IBUF_USE_ALL:
3780 			ut_ad(!no_counter);
3781 			goto skip_watch;
3782 		case IBUF_USE_COUNT:
3783 			break;
3784 		}
3785 		break;
3786 	case IBUF_OP_COUNT:
3787 		break;
3788 	}
3789 
3790 	/* unknown op or use */
3791 	ut_error;
3792 
3793 check_watch:
3794 	/* If a thread attempts to buffer an insert on a page while a
3795 	purge is in progress on the same page, the purge must not be
3796 	buffered, because it could remove a record that was
3797 	re-inserted later.  For simplicity, we block the buffering of
3798 	all operations on a page that has a purge pending.
3799 
3800 	We do not check this in the IBUF_OP_DELETE case, because that
3801 	would always trigger the buffer pool watch during purge and
3802 	thus prevent the buffering of delete operations.  We assume
3803 	that the issuer of IBUF_OP_DELETE has called
3804 	buf_pool_watch_set(space, page_no). */
3805 
3806 	{
3807 		buf_pool_t*	buf_pool = buf_pool_get(page_id);
3808 		buf_page_t*	bpage
3809 			= buf_page_get_also_watch(buf_pool, page_id);
3810 
3811 		if (bpage != NULL) {
3812 			/* A buffer pool watch has been set or the
3813 			page has been read into the buffer pool.
3814 			Do not buffer the request.  If a purge operation
3815 			is being buffered, have this request executed
3816 			directly on the page in the buffer pool after the
3817 			buffered entries for this page have been merged. */
3818 			DBUG_RETURN(FALSE);
3819 		}
3820 	}
3821 
3822 skip_watch:
3823 	entry_size = rec_get_converted_size(index, entry, 0);
3824 
3825 	if (entry_size
3826 	    >= page_get_free_space_of_empty(dict_table_is_comp(index->table))
3827 	    / 2) {
3828 
3829 		DBUG_RETURN(FALSE);
3830 	}
3831 
3832 	err = ibuf_insert_low(BTR_MODIFY_PREV, op, no_counter,
3833 			      entry, entry_size,
3834 			      index, page_id, page_size, thr);
3835 	if (err == DB_FAIL) {
3836 		err = ibuf_insert_low(BTR_MODIFY_TREE | BTR_LATCH_FOR_INSERT,
3837 				      op, no_counter, entry, entry_size,
3838 				      index, page_id, page_size, thr);
3839 	}
3840 
3841 	if (err == DB_SUCCESS) {
3842 #ifdef UNIV_IBUF_DEBUG
3843 		/* fprintf(stderr, "Ibuf insert for page no %lu of index %s\n",
3844 		page_no, index->name); */
3845 #endif
3846 		DBUG_RETURN(TRUE);
3847 
3848 	} else {
3849 		ut_a(err == DB_STRONG_FAIL || err == DB_TOO_BIG_RECORD);
3850 
3851 		DBUG_RETURN(FALSE);
3852 	}
3853 }
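
/* Illustrative sketch, not part of the InnoDB source: the two-step call
pattern of ibuf_insert() above.  The insert is first attempted with
BTR_MODIFY_PREV (touching a single ibuf leaf page) and is escalated to a
tree-modifying insert only when that attempt returns DB_FAIL.  The template
parameter is a hypothetical stand-in for ibuf_insert_low(); its expected
signature is dberr_t(ulint latch_mode). */
template<typename InsertLow>
static dberr_t
ibuf_example_two_step_insert(InsertLow insert_low)
{
	dberr_t	err = insert_low(ulint(BTR_MODIFY_PREV));

	if (err == DB_FAIL) {
		/* The ibuf leaf page could not take the entry without a
		tree reorganization; retry while latching the tree. */
		err = insert_low(ulint(BTR_MODIFY_TREE
				       | BTR_LATCH_FOR_INSERT));
	}

	return(err);
}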
3854 
3855 /********************************************************************//**
3856 During merge, inserts to an index page a secondary index entry extracted
3857 from the insert buffer.
3858 @return	newly inserted record */
3859 static MY_ATTRIBUTE((nonnull))
3860 rec_t*
3861 ibuf_insert_to_index_page_low(
3862 /*==========================*/
3863 	const dtuple_t*	entry,	/*!< in: buffered entry to insert */
3864 	buf_block_t*	block,	/*!< in/out: index page where the buffered
3865 				entry should be placed */
3866 	dict_index_t*	index,	/*!< in: record descriptor */
3867 	ulint**		offsets,/*!< out: offsets on *rec */
3868 	mem_heap_t*	heap,	/*!< in/out: memory heap */
3869 	mtr_t*		mtr,	/*!< in/out: mtr */
3870 	page_cur_t*	page_cur)/*!< in/out: cursor positioned on the record
3871 				after which to insert the buffered entry */
3872 {
3873 	const page_t*	page;
3874 	const page_t*	bitmap_page;
3875 	ulint		old_bits;
3876 	rec_t*		rec;
3877 	DBUG_ENTER("ibuf_insert_to_index_page_low");
3878 
3879 	rec = page_cur_tuple_insert(page_cur, entry, index,
3880 				    offsets, &heap, 0, mtr);
3881 	if (rec != NULL) {
3882 		DBUG_RETURN(rec);
3883 	}
3884 
3885 	/* Page reorganization or recompression should already have
3886 	been attempted by page_cur_tuple_insert(). Besides, per
3887 	ibuf_index_page_calc_free_zip() the page should not have been
3888 	recompressed or reorganized. */
3889 	ut_ad(!buf_block_get_page_zip(block));
3890 
3891 	/* If the record did not fit, reorganize */
3892 
3893 	btr_page_reorganize(page_cur, index, mtr);
3894 
3895 	/* This time the record must fit */
3896 
3897 	rec = page_cur_tuple_insert(page_cur, entry, index,
3898 				    offsets, &heap, 0, mtr);
3899 	if (rec != NULL) {
3900 		DBUG_RETURN(rec);
3901 	}
3902 
3903 	page = buf_block_get_frame(block);
3904 
3905 	ib::error() << "Insert buffer insert fails; page free "
3906 		<< page_get_max_insert_size(page, 1) << ", dtuple size "
3907 		<< rec_get_converted_size(index, entry, 0);
3908 
3909 	fputs("InnoDB: Cannot insert index record ", stderr);
3910 	dtuple_print(stderr, entry);
3911 	fputs("\nInnoDB: The table where this index record belongs\n"
3912 	      "InnoDB: is now probably corrupt. Please run CHECK TABLE on\n"
3913 	      "InnoDB: that table.\n", stderr);
3914 
3915 	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
3916 					       block->page.size, mtr);
3917 	old_bits = ibuf_bitmap_page_get_bits(
3918 		bitmap_page, block->page.id, block->page.size,
3919 		IBUF_BITMAP_FREE, mtr);
3920 
3921 	ib::error() << "page " << block->page.id << ", size "
3922 		<< block->page.size.physical() << ", bitmap bits " << old_bits;
3923 
3924 	ib::error() << BUG_REPORT_MSG;
3925 
3926 	ut_ad(0);
3927 	DBUG_RETURN(NULL);
3928 }
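
/* Illustrative sketch, not part of the InnoDB source: the control flow of
ibuf_insert_to_index_page_low() above, reduced to its essentials.  The
callables are hypothetical stand-ins for page_cur_tuple_insert() and
btr_page_reorganize(); the real code additionally dumps diagnostics and
asserts when even the second attempt fails. */
template<typename TryInsert, typename Reorganize>
static bool
ibuf_example_insert_or_reorganize(
	TryInsert	try_insert,	/* returns true when the record fit */
	Reorganize	reorganize)	/* compacts the page */
{
	if (try_insert()) {
		return(true);
	}

	/* The record did not fit into the available free space fragments;
	reorganize the page so that the free space becomes contiguous and
	try once more. */
	reorganize();

	return(try_insert());
}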
3929 
3930 /************************************************************************
3931 During merge, inserts to an index page a secondary index entry extracted
3932 from the insert buffer. */
3933 static
3934 void
3935 ibuf_insert_to_index_page(
3936 /*======================*/
3937 	const dtuple_t*	entry,	/*!< in: buffered entry to insert */
3938 	buf_block_t*	block,	/*!< in/out: index page where the buffered entry
3939 				should be placed */
3940 	dict_index_t*	index,	/*!< in: record descriptor */
3941 	mtr_t*		mtr)	/*!< in: mtr */
3942 {
3943 	page_cur_t	page_cur;
3944 	ulint		low_match;
3945 	page_t*		page		= buf_block_get_frame(block);
3946 	rec_t*		rec;
3947 	ulint*		offsets;
3948 	mem_heap_t*	heap;
3949 
3950 	DBUG_ENTER("ibuf_insert_to_index_page");
3951 
3952 	DBUG_PRINT("ibuf", ("page " UINT32PF ":" UINT32PF,
3953 			    block->page.id.space(),
3954 			    block->page.id.page_no()));
3955 
3956 	ut_ad(!dict_index_is_online_ddl(index)); // this is an ibuf_dummy index
3957 	ut_ad(ibuf_inside(mtr));
3958 	ut_ad(dtuple_check_typed(entry));
3959 	/* A change buffer merge must occur before users are granted
3960 	any access to the page. No adaptive hash index entries may
3961 	point to a freshly read page. */
3962 	ut_ad(!block->index);
3963 	assert_block_ahi_empty(block);
3964 	ut_ad(mtr->is_named_space(block->page.id.space()));
3965 
3966 	if (UNIV_UNLIKELY(dict_table_is_comp(index->table)
3967 			  != (ibool)!!page_is_comp(page))) {
3968 		ib::warn() << "Trying to insert a record from the insert"
3969 			" buffer to an index page but the 'compact' flag does"
3970 			" not match!";
3971 		goto dump;
3972 	}
3973 
3974 	rec = page_rec_get_next(page_get_infimum_rec(page));
3975 
3976 	if (page_rec_is_supremum(rec)) {
3977 		ib::warn() << "Trying to insert a record from the insert"
3978 			" buffer to an index page but the index page"
3979 			" is empty!";
3980 		goto dump;
3981 	}
3982 
3983 	if (!rec_n_fields_is_sane(index, rec, entry)) {
3984 		ib::warn() << "Trying to insert a record from the insert"
3985 			" buffer to an index page but the number of fields"
3986 			" does not match!";
3987 		rec_print(stderr, rec, index);
3988 dump:
3989 		dtuple_print(stderr, entry);
3990 		ut_ad(0);
3991 
3992 		ib::warn() << "The table where this index record belongs"
3993 			" is now probably corrupt. Please run CHECK TABLE on"
3994 			" your tables. " << BUG_REPORT_MSG;
3995 
3996 		DBUG_VOID_RETURN;
3997 	}
3998 
3999 	low_match = page_cur_search(block, index, entry, &page_cur);
4000 
4001 	heap = mem_heap_create(
4002 		sizeof(upd_t)
4003 		+ REC_OFFS_HEADER_SIZE * sizeof(*offsets)
4004 		+ dtuple_get_n_fields(entry)
4005 		* (sizeof(upd_field_t) + sizeof *offsets));
4006 
4007 	if (UNIV_UNLIKELY(low_match == dtuple_get_n_fields(entry))) {
4008 		upd_t*		update;
4009 		page_zip_des_t*	page_zip;
4010 
4011 		rec = page_cur_get_rec(&page_cur);
4012 
4013 		/* This is based on
4014 		row_ins_sec_index_entry_by_modify(BTR_MODIFY_LEAF). */
4015 		ut_ad(rec_get_deleted_flag(rec, page_is_comp(page)));
4016 
4017 		offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED,
4018 					  &heap);
4019 		update = row_upd_build_sec_rec_difference_binary(
4020 			rec, index, offsets, entry, heap);
4021 
4022 		page_zip = buf_block_get_page_zip(block);
4023 
4024 		if (update->n_fields == 0) {
4025 			/* The records only differ in the delete-mark.
4026 			Clear the delete-mark, like we did before
4027 			Bug #56680 was fixed. */
4028 			btr_cur_set_deleted_flag_for_ibuf(
4029 				rec, page_zip, FALSE, mtr);
4030 			goto updated_in_place;
4031 		}
4032 
4033 		/* Copy the info bits. Clear the delete-mark. */
4034 		update->info_bits = rec_get_info_bits(rec, page_is_comp(page));
4035 		update->info_bits &= ~REC_INFO_DELETED_FLAG;
4036 
4037 		/* We cannot invoke btr_cur_optimistic_update() here,
4038 		because we do not have a btr_cur_t or que_thr_t,
4039 		as the insert buffer merge occurs at a very low level. */
4040 		if (!row_upd_changes_field_size_or_external(index, offsets,
4041 							    update)
4042 		    && (!page_zip || btr_cur_update_alloc_zip(
4043 				page_zip, &page_cur, index, offsets,
4044 				rec_offs_size(offsets), false, mtr))) {
4045 			/* This is the easy case. Do something similar
4046 			to btr_cur_update_in_place(). */
4047 			rec = page_cur_get_rec(&page_cur);
4048 			row_upd_rec_in_place(rec, index, offsets,
4049 					     update, page_zip);
4050 
4051 			/* Log the update in place operation. During recovery
4052 			MLOG_COMP_REC_UPDATE_IN_PLACE/MLOG_REC_UPDATE_IN_PLACE
4053 			expects trx_id, roll_ptr for secondary indexes. So we
4054 			just write dummy trx_id(0), roll_ptr(0) */
4055 			btr_cur_update_in_place_log(BTR_KEEP_SYS_FLAG, rec,
4056 						    index, update, 0, 0, mtr);
4057 
4058 			DBUG_EXECUTE_IF(
4059 				"crash_after_log_ibuf_upd_inplace",
4060 				log_buffer_flush_to_disk();
4061 				ib::info() << "Wrote log record for ibuf"
4062 					" update in place operation";
4063 				DBUG_SUICIDE();
4064 			);
4065 
4066 			goto updated_in_place;
4067 		}
4068 
4069 		/* btr_cur_update_alloc_zip() may have changed this */
4070 		rec = page_cur_get_rec(&page_cur);
4071 
4072 		/* A collation may identify values that differ in
4073 		storage length.
4074 		Some examples (1 or 2 bytes):
4075 		utf8_turkish_ci: I = U+0131 LATIN SMALL LETTER DOTLESS I
4076 		utf8_general_ci: S = U+00DF LATIN SMALL LETTER SHARP S
4077 		utf8_general_ci: A = U+00E4 LATIN SMALL LETTER A WITH DIAERESIS
4078 
4079 		latin1_german2_ci: SS = U+00DF LATIN SMALL LETTER SHARP S
4080 
4081 		Examples of a character (3-byte UTF-8 sequence)
4082 		identified with 2 or 4 characters (1-byte UTF-8 sequences):
4083 
4084 		utf8_unicode_ci: 'II' = U+2171 SMALL ROMAN NUMERAL TWO
4085 		utf8_unicode_ci: '(10)' = U+247D PARENTHESIZED NUMBER TEN
4086 		*/
4087 
4088 		/* Delete the different-length record, and insert the
4089 		buffered one. */
4090 
4091 		lock_rec_store_on_page_infimum(block, rec);
4092 		page_cur_delete_rec(&page_cur, index, offsets, mtr);
4093 		page_cur_move_to_prev(&page_cur);
4094 		rec = ibuf_insert_to_index_page_low(entry, block, index,
4095 						    &offsets, heap, mtr,
4096 						    &page_cur);
4097 
4098 		ut_ad(!cmp_dtuple_rec(entry, rec, offsets));
4099 		lock_rec_restore_from_page_infimum(block, rec, block);
4100 	} else {
4101 		offsets = NULL;
4102 		ibuf_insert_to_index_page_low(entry, block, index,
4103 					      &offsets, heap, mtr,
4104 					      &page_cur);
4105 	}
4106 updated_in_place:
4107 	mem_heap_free(heap);
4108 
4109 	DBUG_VOID_RETURN;
4110 }
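
/* Illustrative sketch, not part of the InnoDB source: the policy applied by
ibuf_insert_to_index_page() above when the page already contains a record
with the same key (which must be delete-marked).  The enum and function
names are hypothetical; the real code also falls back to delete-and-insert
when space on a compressed page cannot be allocated for the update. */
enum ibuf_example_merge_action_t {
	IBUF_EXAMPLE_UNDELETE,		/* records identical apart from the
					delete-mark: just clear the mark */
	IBUF_EXAMPLE_UPDATE_IN_PLACE,	/* stored sizes unchanged: overwrite
					the differing fields in place */
	IBUF_EXAMPLE_DELETE_AND_INSERT	/* stored sizes differ (for example
					collation-equal values of different
					length): delete the old record and
					insert the buffered one */
};

static MY_ATTRIBUTE((unused))
ibuf_example_merge_action_t
ibuf_example_choose_merge_action(
	ulint	n_differing_fields,	/* fields that differ, ignoring the
					delete-mark */
	bool	update_changes_size)	/* update would change a stored field
					size or touch external storage */
{
	if (n_differing_fields == 0) {
		return(IBUF_EXAMPLE_UNDELETE);
	}

	return(update_changes_size
	       ? IBUF_EXAMPLE_DELETE_AND_INSERT
	       : IBUF_EXAMPLE_UPDATE_IN_PLACE);
}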
4111 
4112 /****************************************************************//**
4113 During merge, sets the delete mark on a record for a secondary index
4114 entry. */
4115 static
4116 void
4117 ibuf_set_del_mark(
4118 /*==============*/
4119 	const dtuple_t*		entry,	/*!< in: entry */
4120 	buf_block_t*		block,	/*!< in/out: block */
4121 	const dict_index_t*	index,	/*!< in: record descriptor */
4122 	mtr_t*			mtr)	/*!< in: mtr */
4123 {
4124 	page_cur_t	page_cur;
4125 	ulint		low_match;
4126 
4127 	ut_ad(ibuf_inside(mtr));
4128 	ut_ad(dtuple_check_typed(entry));
4129 
4130 	low_match = page_cur_search(block, index, entry, &page_cur);
4131 
4132 	if (low_match == dtuple_get_n_fields(entry)) {
4133 		rec_t*		rec;
4134 		page_zip_des_t*	page_zip;
4135 
4136 		rec = page_cur_get_rec(&page_cur);
4137 		page_zip = page_cur_get_page_zip(&page_cur);
4138 
4139 		/* Delete mark the old index record. According to a
4140 		comment in row_upd_sec_index_entry(), it can already
4141 		have been delete marked if a lock wait occurred in
4142 		row_ins_sec_index_entry() in a previous invocation of
4143 		row_upd_sec_index_entry(). */
4144 
4145 		if (UNIV_LIKELY
4146 		    (!rec_get_deleted_flag(
4147 			    rec, dict_table_is_comp(index->table)))) {
4148 			btr_cur_set_deleted_flag_for_ibuf(rec, page_zip,
4149 							  TRUE, mtr);
4150 		}
4151 	} else {
4152 		const page_t*		page
4153 			= page_cur_get_page(&page_cur);
4154 		const buf_block_t*	block
4155 			= page_cur_get_block(&page_cur);
4156 
4157 		ib::error() << "Unable to find a record to delete-mark";
4158 		fputs("InnoDB: tuple ", stderr);
4159 		dtuple_print(stderr, entry);
4160 		fputs("\n"
4161 		      "InnoDB: record ", stderr);
4162 		rec_print(stderr, page_cur_get_rec(&page_cur), index);
4163 
4164 		ib::error() << "page " << block->page.id << " ("
4165 			<< page_get_n_recs(page) << " records, index id "
4166 			<< btr_page_get_index_id(page) << ").";
4167 
4168 		ib::error() << BUG_REPORT_MSG;
4169 		ut_ad(0);
4170 	}
4171 }
4172 
4173 /****************************************************************//**
4174 During merge, delete a record for a secondary index entry. */
4175 static
4176 void
4177 ibuf_delete(
4178 /*========*/
4179 	const dtuple_t*	entry,	/*!< in: entry */
4180 	buf_block_t*	block,	/*!< in/out: block */
4181 	dict_index_t*	index,	/*!< in: record descriptor */
4182 	mtr_t*		mtr)	/*!< in/out: mtr; must be committed
4183 				before latching any further pages */
4184 {
4185 	page_cur_t	page_cur;
4186 	ulint		low_match;
4187 
4188 	ut_ad(ibuf_inside(mtr));
4189 	ut_ad(dtuple_check_typed(entry));
4190 	ut_ad(!dict_index_is_spatial(index));
4191 
4192 	low_match = page_cur_search(block, index, entry, &page_cur);
4193 
4194 	if (low_match == dtuple_get_n_fields(entry)) {
4195 		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
4196 		page_t*		page	= buf_block_get_frame(block);
4197 		rec_t*		rec	= page_cur_get_rec(&page_cur);
4198 
4199 		/* TODO: the below should probably be a separate function,
4200 		it's a bastardized version of btr_cur_optimistic_delete. */
4201 
4202 		ulint		offsets_[REC_OFFS_NORMAL_SIZE];
4203 		ulint*		offsets	= offsets_;
4204 		mem_heap_t*	heap = NULL;
4205 		ulint		max_ins_size = 0;
4206 
4207 		rec_offs_init(offsets_);
4208 
4209 		offsets = rec_get_offsets(
4210 			rec, index, offsets, ULINT_UNDEFINED, &heap);
4211 
4212 		if (page_get_n_recs(page) <= 1
4213 		    || !(REC_INFO_DELETED_FLAG
4214 			 & rec_get_info_bits(rec, page_is_comp(page)))) {
4215 			/* Refuse to purge the last record or a
4216 			record that has not been marked for deletion. */
4217 			ib::error() << "Unable to purge a record";
4218 			fputs("InnoDB: tuple ", stderr);
4219 			dtuple_print(stderr, entry);
4220 			fputs("\n"
4221 			      "InnoDB: record ", stderr);
4222 			rec_print_new(stderr, rec, offsets);
4223 			fprintf(stderr, "\nspace " UINT32PF " offset " UINT32PF
4224 				" (%u records, index id %llu)\n"
4225 				"InnoDB: Submit a detailed bug report"
4226 				" to http://bugs.mysql.com\n",
4227 				block->page.id.space(),
4228 				block->page.id.page_no(),
4229 				(unsigned) page_get_n_recs(page),
4230 				(ulonglong) btr_page_get_index_id(page));
4231 
4232 			ut_ad(0);
4233 			return;
4234 		}
4235 
4236 		lock_update_delete(block, rec);
4237 
4238 		if (!page_zip) {
4239 			max_ins_size
4240 				= page_get_max_insert_size_after_reorganize(
4241 					page, 1);
4242 		}
4243 #ifdef UNIV_ZIP_DEBUG
4244 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4245 #endif /* UNIV_ZIP_DEBUG */
4246 		page_cur_delete_rec(&page_cur, index, offsets, mtr);
4247 #ifdef UNIV_ZIP_DEBUG
4248 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4249 #endif /* UNIV_ZIP_DEBUG */
4250 
4251 		if (page_zip) {
4252 			ibuf_update_free_bits_zip(block, mtr);
4253 		} else {
4254 			ibuf_update_free_bits_low(block, max_ins_size, mtr);
4255 		}
4256 
4257 		if (UNIV_LIKELY_NULL(heap)) {
4258 			mem_heap_free(heap);
4259 		}
4260 	} else {
4261 		/* The record must have been purged already. */
4262 	}
4263 }
4264 
4265 /*********************************************************************//**
4266 Restores insert buffer tree cursor position
4267 @return TRUE if the position was restored; FALSE if not */
4268 static MY_ATTRIBUTE((nonnull))
4269 ibool
4270 ibuf_restore_pos(
4271 /*=============*/
4272 	ulint		space,	/*!< in: space id */
4273 	ulint		page_no,/*!< in: index page number where the record
4274 				should belong */
4275 	const dtuple_t*	search_tuple,
4276 				/*!< in: search tuple for entries of page_no */
4277 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE */
4278 	btr_pcur_t*	pcur,	/*!< in/out: persistent cursor whose
4279 				position is to be restored */
4280 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
4281 {
4282 	ut_ad(mode == BTR_MODIFY_LEAF
4283 	      || BTR_LATCH_MODE_WITHOUT_INTENTION(mode) == BTR_MODIFY_TREE);
4284 
4285 	if (btr_pcur_restore_position(mode, pcur, mtr)) {
4286 
4287 		return(TRUE);
4288 	}
4289 
4290 	if (fil_space_get_flags(space) == ULINT_UNDEFINED
4291 	    || fil_space_is_being_truncated(space)) {
4292 		/* The tablespace has been dropped. Or the tablespace is being
4293 		truncated. It is possible that another thread has deleted
4294 		the insert buffer entry.  Do not complain. */
4295 		ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);
4296 	} else {
4297 		ib::error() << "ibuf cursor restoration fails!"
4298 			" ibuf record inserted to page "
4299 			<< space << ":" << page_no;
4300 
4301 		ib::error() << BUG_REPORT_MSG;
4302 
4303 		rec_print_old(stderr, btr_pcur_get_rec(pcur));
4304 		rec_print_old(stderr, pcur->old_rec);
4305 		dtuple_print(stderr, search_tuple);
4306 
4307 		rec_print_old(stderr,
4308 			      page_rec_get_next(btr_pcur_get_rec(pcur)));
4309 
4310 		ib::fatal() << "Failed to restore ibuf position.";
4311 	}
4312 
4313 	return(FALSE);
4314 }
4315 
4316 /*********************************************************************//**
4317 Deletes from ibuf the record on which pcur is positioned. If we have to
4318 resort to a pessimistic delete, this function commits mtr and closes
4319 the cursor.
4320 @return TRUE if mtr was committed and pcur closed in this operation */
4321 static MY_ATTRIBUTE((warn_unused_result))
4322 ibool
4323 ibuf_delete_rec(
4324 /*============*/
4325 	ulint		space,	/*!< in: space id */
4326 	ulint		page_no,/*!< in: index page number that the record
4327 				should belong to */
4328 	btr_pcur_t*	pcur,	/*!< in: pcur positioned on the record to
4329 				delete, having latch mode BTR_MODIFY_LEAF */
4330 	const dtuple_t*	search_tuple,
4331 				/*!< in: search tuple for entries of page_no */
4332 	mtr_t*		mtr)	/*!< in: mtr */
4333 {
4334 	ibool		success;
4335 	page_t*		root;
4336 	dberr_t		err;
4337 
4338 	ut_ad(ibuf_inside(mtr));
4339 	ut_ad(page_rec_is_user_rec(btr_pcur_get_rec(pcur)));
4340 	ut_ad(ibuf_rec_get_page_no(mtr, btr_pcur_get_rec(pcur)) == page_no);
4341 	ut_ad(ibuf_rec_get_space(mtr, btr_pcur_get_rec(pcur)) == space);
4342 
4343 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
4344 	if (ibuf_debug == 2) {
4345 		/* Inject a fault (crash). We do this before trying
4346 		optimistic delete, because a pessimistic delete in the
4347 		change buffer would require a larger test case. */
4348 
4349 		/* Flag the buffered record as processed, to avoid
4350 		an assertion failure after crash recovery. */
4351 		btr_cur_set_deleted_flag_for_ibuf(
4352 			btr_pcur_get_rec(pcur), NULL, TRUE, mtr);
4353 
4354 		ibuf_mtr_commit(mtr);
4355 		log_write_up_to(LSN_MAX, true);
4356 		DBUG_SUICIDE();
4357 	}
4358 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
4359 
4360 	success = btr_cur_optimistic_delete(btr_pcur_get_btr_cur(pcur),
4361 					    0, mtr);
4362 
4363 	const page_id_t	page_id(space, page_no);
4364 
4365 	if (success) {
4366 		if (page_is_empty(btr_pcur_get_page(pcur))) {
4367 			/* If a B-tree page is empty, it must be the root page
4368 			and the whole B-tree must be empty. InnoDB does not
4369 			allow empty B-tree pages other than the root. */
4370 			root = btr_pcur_get_page(pcur);
4371 
4372 			ut_ad(page_get_space_id(root) == IBUF_SPACE_ID);
4373 			ut_ad(page_get_page_no(root)
4374 			      == FSP_IBUF_TREE_ROOT_PAGE_NO);
4375 
4376 			/* ibuf->empty is protected by the root page latch.
4377 			Before the deletion, it had to be FALSE. */
4378 			ut_ad(!ibuf->empty);
4379 			ibuf->empty = true;
4380 		}
4381 
4382 #ifdef UNIV_IBUF_COUNT_DEBUG
4383 		ib::info() << "Decrementing ibuf count of space " << space
4384 			<< " page " << page_no << " from "
4385 			<< ibuf_count_get(page_id) << " by 1";
4386 
4387 		ibuf_count_set(page_id, ibuf_count_get(page_id) - 1);
4388 #endif /* UNIV_IBUF_COUNT_DEBUG */
4389 
4390 		return(FALSE);
4391 	}
4392 
4393 	ut_ad(page_rec_is_user_rec(btr_pcur_get_rec(pcur)));
4394 	ut_ad(ibuf_rec_get_page_no(mtr, btr_pcur_get_rec(pcur)) == page_no);
4395 	ut_ad(ibuf_rec_get_space(mtr, btr_pcur_get_rec(pcur)) == space);
4396 
4397 	/* We have to resort to a pessimistic delete from ibuf.
4398 	Delete-mark the record so that it will not be applied again,
4399 	in case the server crashes before the pessimistic delete is
4400 	made persistent. */
4401 	btr_cur_set_deleted_flag_for_ibuf(
4402 		btr_pcur_get_rec(pcur), NULL, TRUE, mtr);
4403 
4404 	btr_pcur_store_position(pcur, mtr);
4405 	ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);
4406 
4407 	ibuf_mtr_start(mtr);
4408 	mutex_enter(&ibuf_mutex);
4409 
4410 	if (!ibuf_restore_pos(space, page_no, search_tuple,
4411 			      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
4412 			      pcur, mtr)) {
4413 
4414 		mutex_exit(&ibuf_mutex);
4415 		ut_ad(mtr->has_committed());
4416 		goto func_exit;
4417 	}
4418 
4419 	root = ibuf_tree_root_get(mtr);
4420 
4421 	btr_cur_pessimistic_delete(&err, TRUE, btr_pcur_get_btr_cur(pcur), 0,
4422 				   false, mtr);
4423 	ut_a(err == DB_SUCCESS);
4424 
4425 #ifdef UNIV_IBUF_COUNT_DEBUG
4426 	ibuf_count_set(page_id, ibuf_count_get(page_id) - 1);
4427 #endif /* UNIV_IBUF_COUNT_DEBUG */
4428 
4429 	ibuf_size_update(root);
4430 	mutex_exit(&ibuf_mutex);
4431 
4432 	ibuf->empty = page_is_empty(root);
4433 	ibuf_btr_pcur_commit_specify_mtr(pcur, mtr);
4434 
4435 func_exit:
4436 	ut_ad(mtr->has_committed());
4437 	btr_pcur_close(pcur);
4438 
4439 	return(TRUE);
4440 }
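
/* Illustrative sketch, not part of the InnoDB source: the shape of
ibuf_delete_rec() above.  An optimistic delete is tried first under the
leaf latch; if it fails, the record is delete-marked (so that a crash
cannot replay it), the cursor position is saved, the mini-transaction is
committed, and the delete is redone pessimistically under tree latches.
The callables are hypothetical stand-ins for the corresponding steps. */
template<typename OptimisticDelete, typename MarkProcessed,
	 typename SavePosAndCommit, typename PessimisticDelete>
static bool
ibuf_example_delete_rec(
	OptimisticDelete	optimistic_delete,
	MarkProcessed		mark_processed,
	SavePosAndCommit	save_pos_and_commit,
	PessimisticDelete	pessimistic_delete)
{
	if (optimistic_delete()) {
		/* The mini-transaction stays open; the caller keeps
		scanning from the current cursor position. */
		return(false);
	}

	mark_processed();	/* delete-mark: idempotent across a crash */
	save_pos_and_commit();	/* release leaf latches before taking the
				tree latches */
	pessimistic_delete();	/* restore the position, delete, and update
				the ibuf size information */

	return(true);		/* the mtr was committed: restart the scan */
}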
4441 
4442 /** When an index page is read from a disk to the buffer pool, this function
4443 applies any buffered operations to the page and deletes the entries from the
4444 insert buffer. If the page is not read, but created in the buffer pool, this
4445 function deletes its buffered entries from the insert buffer; there can
4446 exist entries for such a page if the page belonged to an index which
4447 subsequently was dropped.
4448 @param[in,out]	block			if page has been read from disk,
4449 pointer to the page x-latched, else NULL
4450 @param[in]	page_id			page id of the index page
4451 @param[in]	update_ibuf_bitmap	normally this is set to TRUE, but
4452 if we have deleted or are deleting the tablespace, then we naturally do not
4453 want to update a non-existent bitmap page */
4454 void
4455 ibuf_merge_or_delete_for_page(
4456 	buf_block_t*		block,
4457 	const page_id_t&	page_id,
4458 	const page_size_t*	page_size,
4459 	ibool			update_ibuf_bitmap)
4460 {
4461 	mem_heap_t*	heap;
4462 	btr_pcur_t	pcur;
4463 	dtuple_t*	search_tuple;
4464 #ifdef UNIV_IBUF_DEBUG
4465 	ulint		volume			= 0;
4466 #endif /* UNIV_IBUF_DEBUG */
4467 	page_zip_des_t*	page_zip		= NULL;
4468 	fil_space_t*	space			= NULL;
4469 	bool		corruption_noticed	= false;
4470 	mtr_t		mtr;
4471 
4472 	/* Counts for merged & discarded operations. */
4473 	ulint		mops[IBUF_OP_COUNT];
4474 	ulint		dops[IBUF_OP_COUNT];
4475 
4476 	ut_ad(block == NULL || page_id.equals_to(block->page.id));
4477 	ut_ad(block == NULL
4478 	      || buf_block_get_io_fix_unlocked(block) == BUF_IO_READ);
4479 
4480 	if (srv_force_recovery >= SRV_FORCE_NO_IBUF_MERGE
4481 	    || trx_sys_hdr_page(page_id)
4482 	    || fsp_is_system_temporary(page_id.space())) {
4483 		return;
4484 	}
4485 
4486 	/* We cannot refer to page_size in the following, because it is passed
4487 	as NULL (it is unknown) when buf_read_ibuf_merge_pages() is merging
4488 	(discarding) changes for a dropped tablespace. When block != NULL or
4489 	update_ibuf_bitmap is specified, then page_size must be known.
4490 	That is why we will repeat the check below, with page_size in
4491 	place of univ_page_size. Passing univ_page_size assumes that the
4492 	uncompressed page size always is a power-of-2 multiple of the
4493 	compressed page size. */
4494 
4495 	if (ibuf_fixed_addr_page(page_id, univ_page_size)
4496 	    || fsp_descr_page(page_id, univ_page_size)) {
4497 		return;
4498 	}
4499 
4500 	if (update_ibuf_bitmap) {
4501 
4502 		ut_ad(page_size != NULL);
4503 
4504 		if (ibuf_fixed_addr_page(page_id, *page_size)
4505 		    || fsp_descr_page(page_id, *page_size)) {
4506 			return;
4507 		}
4508 
4509 		space = fil_space_acquire(page_id.space());
4510 
4511 		if (space == NULL) {
4512 			/* Do not try to read the bitmap page from space;
4513 			just delete the ibuf records for the page */
4514 
4515 			block = NULL;
4516 			update_ibuf_bitmap = FALSE;
4517 		} else {
4518 			page_t*	bitmap_page;
4519 			ulint	bitmap_bits;
4520 
4521 			ibuf_mtr_start(&mtr);
4522 
4523 			bitmap_page = ibuf_bitmap_get_map_page(
4524 				page_id, *page_size, &mtr);
4525 
4526 			bitmap_bits = ibuf_bitmap_page_get_bits(
4527 				bitmap_page, page_id, *page_size,
4528 				IBUF_BITMAP_BUFFERED, &mtr);
4529 
4530 			ibuf_mtr_commit(&mtr);
4531 
4532 			if (!bitmap_bits) {
4533 				/* No inserts buffered for this page */
4534 
4535 				fil_space_release(space);
4536 				return;
4537 			}
4538 		}
4539 	} else if (block != NULL
4540 		   && (ibuf_fixed_addr_page(page_id, *page_size)
4541 		       || fsp_descr_page(page_id, *page_size))) {
4542 
4543 		return;
4544 	}
4545 
4546 	heap = mem_heap_create(512);
4547 
4548 	search_tuple = ibuf_search_tuple_build(
4549 		page_id.space(), page_id.page_no(), heap);
4550 
4551 	if (block != NULL) {
4552 		/* Move the ownership of the x-latch on the page to this OS
4553 		thread, so that we can acquire a second x-latch on it. This
4554 		is needed for the insert operations to the index page to pass
4555 		the debug checks. */
4556 
4557 		rw_lock_x_lock_move_ownership(&(block->lock));
4558 		page_zip = buf_block_get_page_zip(block);
4559 
4560 		if (!fil_page_index_page_check(block->frame)
4561 		    || !page_is_leaf(block->frame)) {
4562 
4563 			corruption_noticed = true;
4564 
4565 			ib::error() << "Corruption in the tablespace. Bitmap"
4566 				" shows insert buffer records to page "
4567 				<< page_id << " though the page type is "
4568 				<< fil_page_get_type(block->frame)
4569 				<< ", which is not an index leaf page. We try"
4570 				" to resolve the problem by skipping the"
4571 				" insert buffer merge for this page. Please"
4572 				" run CHECK TABLE on your tables to determine"
4573 				" if they are corrupt after this.";
4574 
4575 			ib::error() << "Please submit a detailed bug"
4576 				" report to http://bugs.mysql.com";
4577 			ut_ad(0);
4578 		}
4579 	}
4580 
4581 	memset(mops, 0, sizeof(mops));
4582 	memset(dops, 0, sizeof(dops));
4583 
4584 loop:
4585 	ibuf_mtr_start(&mtr);
4586 
4587 	/* Position pcur in the insert buffer at the first entry for this
4588 	index page */
4589 	btr_pcur_open_on_user_rec(
4590 		ibuf->index, search_tuple, PAGE_CUR_GE, BTR_MODIFY_LEAF,
4591 		&pcur, &mtr);
4592 
4593 	if (block != NULL) {
4594 		ibool success;
4595 
4596 		mtr.set_named_space(page_id.space());
4597 
4598 		success = buf_page_get_known_nowait(
4599 			RW_X_LATCH, block,
4600 			BUF_KEEP_OLD, __FILE__, __LINE__, &mtr);
4601 
4602 		ut_a(success);
4603 
4604 		/* This is a user page (secondary index leaf page),
4605 		but we pretend that it is a change buffer page in
4606 		order to obey the latching order. This should be OK,
4607 		because buffered changes are applied immediately while
4608 		the block is io-fixed. Other threads must not try to
4609 		latch an io-fixed block. */
4610 		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE);
4611 	} else if (update_ibuf_bitmap) {
4612 		mtr.set_named_space(page_id.space());
4613 	}
4614 
4615 	if (!btr_pcur_is_on_user_rec(&pcur)) {
4616 		ut_ad(btr_pcur_is_after_last_in_tree(&pcur, &mtr));
4617 
4618 		goto reset_bit;
4619 	}
4620 
4621 	for (;;) {
4622 		rec_t*	rec;
4623 
4624 		ut_ad(btr_pcur_is_on_user_rec(&pcur));
4625 
4626 		rec = btr_pcur_get_rec(&pcur);
4627 
4628 		/* Check if the entry is for this index page */
4629 		if (ibuf_rec_get_page_no(&mtr, rec) != page_id.page_no()
4630 		    || ibuf_rec_get_space(&mtr, rec) != page_id.space()) {
4631 
4632 			if (block != NULL) {
4633 				page_header_reset_last_insert(
4634 					block->frame, page_zip, &mtr);
4635 			}
4636 
4637 			goto reset_bit;
4638 		}
4639 
4640 		if (corruption_noticed) {
4641 			fputs("InnoDB: Discarding record\n ", stderr);
4642 			rec_print_old(stderr, rec);
4643 			fputs("\nInnoDB: from the insert buffer!\n\n", stderr);
4644 		} else if (block != NULL && !rec_get_deleted_flag(rec, 0)) {
4645 			/* Now we have at pcur a record which should be
4646 			applied on the index page; NOTE that the call below
4647 			copies pointers to fields in rec, and we must
4648 			keep the latch to the rec page until the
4649 			insertion is finished! */
4650 			dtuple_t*	entry;
4651 			trx_id_t	max_trx_id;
4652 			dict_index_t*	dummy_index;
4653 			ibuf_op_t	op = ibuf_rec_get_op_type(&mtr, rec);
4654 
4655 			max_trx_id = page_get_max_trx_id(page_align(rec));
4656 			page_update_max_trx_id(block, page_zip, max_trx_id,
4657 					       &mtr);
4658 
4659 			ut_ad(page_validate(page_align(rec), ibuf->index));
4660 
4661 			entry = ibuf_build_entry_from_ibuf_rec(
4662 				&mtr, rec, heap, &dummy_index);
4663 
4664 			ut_ad(page_validate(block->frame, dummy_index));
4665 
4666 			switch (op) {
4667 				ibool	success;
4668 			case IBUF_OP_INSERT:
4669 #ifdef UNIV_IBUF_DEBUG
4670 				volume += rec_get_converted_size(
4671 					dummy_index, entry, 0);
4672 
4673 				volume += page_dir_calc_reserved_space(1);
4674 
4675 				ut_a(volume <= 4 * UNIV_PAGE_SIZE
4676 					/ IBUF_PAGE_SIZE_PER_FREE_SPACE);
4677 #endif
4678 				ibuf_insert_to_index_page(
4679 					entry, block, dummy_index, &mtr);
4680 				break;
4681 
4682 			case IBUF_OP_DELETE_MARK:
4683 				ibuf_set_del_mark(
4684 					entry, block, dummy_index, &mtr);
4685 				break;
4686 
4687 			case IBUF_OP_DELETE:
4688 				ibuf_delete(entry, block, dummy_index, &mtr);
4689 				/* Because ibuf_delete() will latch an
4690 				insert buffer bitmap page, commit mtr
4691 				before latching any further pages.
4692 				Store and restore the cursor position. */
4693 				ut_ad(rec == btr_pcur_get_rec(&pcur));
4694 				ut_ad(page_rec_is_user_rec(rec));
4695 				ut_ad(ibuf_rec_get_page_no(&mtr, rec)
4696 				      == page_id.page_no());
4697 				ut_ad(ibuf_rec_get_space(&mtr, rec)
4698 				      == page_id.space());
4699 
4700 				/* Mark the change buffer record processed,
4701 				so that it will not be merged again in case
4702 				the server crashes between the following
4703 				mtr_commit() and the subsequent mtr_commit()
4704 				of deleting the change buffer record. */
4705 
4706 				btr_cur_set_deleted_flag_for_ibuf(
4707 					btr_pcur_get_rec(&pcur), NULL,
4708 					TRUE, &mtr);
4709 
4710 				btr_pcur_store_position(&pcur, &mtr);
4711 				ibuf_btr_pcur_commit_specify_mtr(&pcur, &mtr);
4712 
4713 				ibuf_mtr_start(&mtr);
4714 				mtr.set_named_space(page_id.space());
4715 
4716 				success = buf_page_get_known_nowait(
4717 					RW_X_LATCH, block,
4718 					BUF_KEEP_OLD,
4719 					__FILE__, __LINE__, &mtr);
4720 				ut_a(success);
4721 
4722 				/* This is a user page (secondary
4723 				index leaf page), but it should be OK
4724 				to use too low latching order for it,
4725 				as the block is io-fixed. */
4726 				buf_block_dbg_add_level(
4727 					block, SYNC_IBUF_TREE_NODE);
4728 
4729 				if (!ibuf_restore_pos(page_id.space(),
4730 						      page_id.page_no(),
4731 						      search_tuple,
4732 						      BTR_MODIFY_LEAF,
4733 						      &pcur, &mtr)) {
4734 
4735 					ut_ad(mtr.has_committed());
4736 					mops[op]++;
4737 					ibuf_dummy_index_free(dummy_index);
4738 					goto loop;
4739 				}
4740 
4741 				break;
4742 			default:
4743 				ut_error;
4744 			}
4745 
4746 			mops[op]++;
4747 
4748 			ibuf_dummy_index_free(dummy_index);
4749 		} else {
4750 			dops[ibuf_rec_get_op_type(&mtr, rec)]++;
4751 		}
4752 
4753 		/* Delete the record from ibuf */
4754 		if (ibuf_delete_rec(page_id.space(), page_id.page_no(),
4755 				    &pcur, search_tuple, &mtr)) {
4756 			/* Deletion was pessimistic and mtr was committed:
4757 			we start from the beginning again */
4758 
4759 			ut_ad(mtr.has_committed());
4760 			goto loop;
4761 		} else if (btr_pcur_is_after_last_on_page(&pcur)) {
4762 			ibuf_mtr_commit(&mtr);
4763 			btr_pcur_close(&pcur);
4764 
4765 			goto loop;
4766 		}
4767 	}
4768 
4769 reset_bit:
4770 	if (update_ibuf_bitmap) {
4771 		page_t*	bitmap_page;
4772 
4773 		bitmap_page = ibuf_bitmap_get_map_page(page_id, *page_size,
4774 						       &mtr);
4775 
4776 		ibuf_bitmap_page_set_bits(
4777 			bitmap_page, page_id, *page_size,
4778 			IBUF_BITMAP_BUFFERED, FALSE, &mtr);
4779 
4780 		if (block != NULL) {
4781 			ulint old_bits = ibuf_bitmap_page_get_bits(
4782 				bitmap_page, page_id, *page_size,
4783 				IBUF_BITMAP_FREE, &mtr);
4784 
4785 			ulint new_bits = ibuf_index_page_calc_free(block);
4786 
4787 			if (old_bits != new_bits) {
4788 				ibuf_bitmap_page_set_bits(
4789 					bitmap_page, page_id, *page_size,
4790 					IBUF_BITMAP_FREE, new_bits, &mtr);
4791 			}
4792 		}
4793 	}
4794 
4795 	ibuf_mtr_commit(&mtr);
4796 	btr_pcur_close(&pcur);
4797 	mem_heap_free(heap);
4798 
4799 	os_atomic_increment_ulint(&ibuf->n_merges, 1);
4800 	ibuf_add_ops(ibuf->n_merged_ops, mops);
4801 	ibuf_add_ops(ibuf->n_discarded_ops, dops);
4802 
4803 	if (space != NULL) {
4804 		fil_space_release(space);
4805 	}
4806 
4807 #ifdef UNIV_IBUF_COUNT_DEBUG
4808 	ut_a(ibuf_count_get(page_id) == 0);
4809 #endif
4810 }
4811 
4812 /*********************************************************************//**
4813 Deletes all entries in the insert buffer for a given space id. This is used
4814 in DISCARD TABLESPACE, IMPORT TABLESPACE and TRUNCATE TABLESPACE.
4815 NOTE: this does not update the page free bitmaps in the space. The space will
4816 become CORRUPT when you call this function! */
4817 void
4818 ibuf_delete_for_discarded_space(
4819 /*============================*/
4820 	ulint	space)	/*!< in: space id */
4821 {
4822 	mem_heap_t*	heap;
4823 	btr_pcur_t	pcur;
4824 	dtuple_t*	search_tuple;
4825 	const rec_t*	ibuf_rec;
4826 	ulint		page_no;
4827 	mtr_t		mtr;
4828 
4829 	/* Counts for discarded operations. */
4830 	ulint		dops[IBUF_OP_COUNT];
4831 
4832 	heap = mem_heap_create(512);
4833 
4834 	/* Use page number 0 to build the search tuple so that we get the
4835 	cursor positioned at the first entry for this space id */
4836 
4837 	search_tuple = ibuf_search_tuple_build(space, 0, heap);
4838 
4839 	memset(dops, 0, sizeof(dops));
4840 loop:
4841 	ibuf_mtr_start(&mtr);
4842 
4843 	/* Position pcur in the insert buffer at the first entry for the
4844 	space */
4845 	btr_pcur_open_on_user_rec(
4846 		ibuf->index, search_tuple, PAGE_CUR_GE, BTR_MODIFY_LEAF,
4847 		&pcur, &mtr);
4848 
4849 	if (!btr_pcur_is_on_user_rec(&pcur)) {
4850 		ut_ad(btr_pcur_is_after_last_in_tree(&pcur, &mtr));
4851 
4852 		goto leave_loop;
4853 	}
4854 
4855 	for (;;) {
4856 		ut_ad(btr_pcur_is_on_user_rec(&pcur));
4857 
4858 		ibuf_rec = btr_pcur_get_rec(&pcur);
4859 
4860 		/* Check if the entry is for this space */
4861 		if (ibuf_rec_get_space(&mtr, ibuf_rec) != space) {
4862 
4863 			goto leave_loop;
4864 		}
4865 
4866 		page_no = ibuf_rec_get_page_no(&mtr, ibuf_rec);
4867 
4868 		dops[ibuf_rec_get_op_type(&mtr, ibuf_rec)]++;
4869 
4870 		/* Delete the record from ibuf */
4871 		if (ibuf_delete_rec(space, page_no, &pcur, search_tuple,
4872 				    &mtr)) {
4873 			/* Deletion was pessimistic and mtr was committed:
4874 			we start from the beginning again */
4875 
4876 			ut_ad(mtr.has_committed());
4877 			goto loop;
4878 		}
4879 
4880 		if (btr_pcur_is_after_last_on_page(&pcur)) {
4881 			ibuf_mtr_commit(&mtr);
4882 			btr_pcur_close(&pcur);
4883 
4884 			goto loop;
4885 		}
4886 	}
4887 
4888 leave_loop:
4889 	ibuf_mtr_commit(&mtr);
4890 	btr_pcur_close(&pcur);
4891 
4892 	ibuf_add_ops(ibuf->n_discarded_ops, dops);
4893 
4894 	mem_heap_free(heap);
4895 }
4896 
4897 /******************************************************************//**
4898 Checks whether the insert buffer is empty.
4899 @return true if empty */
4900 bool
4901 ibuf_is_empty(void)
4902 /*===============*/
4903 {
4904 	bool		is_empty;
4905 	const page_t*	root;
4906 	mtr_t		mtr;
4907 
4908 	ibuf_mtr_start(&mtr);
4909 
4910 	mutex_enter(&ibuf_mutex);
4911 	root = ibuf_tree_root_get(&mtr);
4912 	mutex_exit(&ibuf_mutex);
4913 
4914 	is_empty = page_is_empty(root);
4915 	ut_a(is_empty == ibuf->empty);
4916 	ibuf_mtr_commit(&mtr);
4917 
4918 	return(is_empty);
4919 }
4920 
4921 /******************************************************************//**
4922 Prints info of ibuf. */
4923 void
4924 ibuf_print(
4925 /*=======*/
4926 	FILE*	file)	/*!< in: file where to print */
4927 {
4928 #ifdef UNIV_IBUF_COUNT_DEBUG
4929 	ulint		i;
4930 	ulint		j;
4931 #endif
4932 
4933 	mutex_enter(&ibuf_mutex);
4934 
4935 	fprintf(file,
4936 		"Ibuf: size %lu, free list len %lu,"
4937 		" seg size %lu, %lu merges\n",
4938 		(ulong) ibuf->size,
4939 		(ulong) ibuf->free_list_len,
4940 		(ulong) ibuf->seg_size,
4941 		(ulong) ibuf->n_merges);
4942 
4943 	fputs("merged operations:\n ", file);
4944 	ibuf_print_ops(ibuf->n_merged_ops, file);
4945 
4946 	fputs("discarded operations:\n ", file);
4947 	ibuf_print_ops(ibuf->n_discarded_ops, file);
4948 
4949 #ifdef UNIV_IBUF_COUNT_DEBUG
4950 	for (i = 0; i < IBUF_COUNT_N_SPACES; i++) {
4951 		for (j = 0; j < IBUF_COUNT_N_PAGES; j++) {
4952 			ulint	count = ibuf_count_get(page_id_t(i, j, 0));
4953 
4954 			if (count > 0) {
4955 				fprintf(stderr,
4956 					"Ibuf count for space/page %lu/%lu"
4957 					" is %lu\n",
4958 					(ulong) i, (ulong) j, (ulong) count);
4959 			}
4960 		}
4961 	}
4962 #endif /* UNIV_IBUF_COUNT_DEBUG */
4963 
4964 	mutex_exit(&ibuf_mutex);
4965 }
4966 
4967 /******************************************************************//**
4968 Checks the insert buffer bitmaps on IMPORT TABLESPACE.
4969 @return DB_SUCCESS or error code */
4970 dberr_t
4971 ibuf_check_bitmap_on_import(
4972 /*========================*/
4973 	const trx_t*	trx,		/*!< in: transaction */
4974 	ulint		space_id)	/*!< in: tablespace identifier */
4975 {
4976 	ulint	size;
4977 	ulint	page_no;
4978 
4979 	ut_ad(space_id);
4980 	ut_ad(trx->mysql_thd);
4981 
4982 	bool			found;
4983 	const page_size_t&	page_size
4984 		= fil_space_get_page_size(space_id, &found);
4985 
4986 	if (!found) {
4987 		return(DB_TABLE_NOT_FOUND);
4988 	}
4989 
4990 	size = fil_space_get_size(space_id);
4991 
4992 	if (size == 0) {
4993 		return(DB_TABLE_NOT_FOUND);
4994 	}
4995 
4996 	mutex_enter(&ibuf_mutex);
4997 
4998 	/* The two bitmap pages (allocation bitmap and ibuf bitmap) repeat
4999 	every page_size pages. For example if page_size is 16 KiB, then the
5000 	two bitmap pages repeat every 16 KiB * 16384 = 256 MiB. In the loop
5001 	below page_no is measured in number of pages since the beginning of
5002 	the space, as usual. */
5003 
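	/* Illustrative arithmetic: for an arbitrary page N of this space,
	the ibuf bitmap page that describes it is page

		N - (N % page_size.physical()) + FSP_IBUF_BITMAP_OFFSET

	so the loop below simply visits the first page of every such
	chunk and lets ibuf_bitmap_get_map_page_func() resolve the
	corresponding bitmap page. */
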
	for (page_no = 0; page_no < size; page_no += page_size.physical()) {
		mtr_t	mtr;
		page_t*	bitmap_page;
		ulint	i;

		if (trx_is_interrupted(trx)) {
			mutex_exit(&ibuf_mutex);
			return(DB_INTERRUPTED);
		}

		mtr_start(&mtr);

		mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

		ibuf_enter(&mtr);

		dberr_t err = DB_SUCCESS;

		bitmap_page = ibuf_bitmap_get_map_page_func(
			page_id_t(space_id, page_no), page_size,
			__FILE__, __LINE__, &mtr, &err);

		if (err != DB_SUCCESS) {
			/* Release everything that was acquired above
			before reporting the error, as the other error
			paths in this loop do. */
			mutex_exit(&ibuf_mutex);
			ibuf_exit(&mtr);
			mtr_commit(&mtr);
			return(err);
		}

		if (buf_page_is_zeroes(bitmap_page, page_size)) {
			/* This means that we got an all-zero page
			instead of an ibuf bitmap page. The subsequent
			pages should be all-zero as well. */
#ifdef UNIV_DEBUG
			for (ulint curr_page = page_no + 1;
			     curr_page < page_size.physical(); curr_page++) {

				buf_block_t* block = buf_page_get(
						page_id_t(space_id, curr_page),
						page_size,
						RW_S_LATCH, &mtr);
				page_t*	page = buf_block_get_frame(block);
				ut_ad(buf_page_is_zeroes(page, page_size));
			}
#endif /* UNIV_DEBUG */
			ibuf_exit(&mtr);
			mtr_commit(&mtr);
			continue;
		}

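		/* Check every page that this bitmap page describes:
		no page of a tablespace being imported may be flagged
		as an ibuf tree page, and any IBUF_BITMAP_BUFFERED bit
		that is still set refers to buffered changes that can
		no longer be merged, so it is cleared below. */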
		for (i = FSP_IBUF_BITMAP_OFFSET + 1;
		     i < page_size.physical();
		     i++) {

			const ulint	offset = page_no + i;

			const page_id_t	cur_page_id(space_id, offset);

			if (ibuf_bitmap_page_get_bits(
					bitmap_page, cur_page_id, page_size,
					IBUF_BITMAP_IBUF, &mtr)) {

				mutex_exit(&ibuf_mutex);
				ibuf_exit(&mtr);
				mtr_commit(&mtr);

				ib_errf(trx->mysql_thd,
					IB_LOG_LEVEL_ERROR,
					ER_INNODB_INDEX_CORRUPT,
					"Space %u page %u"
					" is wrongly flagged to belong to the"
					" insert buffer",
					(unsigned) space_id,
					(unsigned) offset);

				return(DB_CORRUPTION);
			}

			if (ibuf_bitmap_page_get_bits(
				    bitmap_page, cur_page_id, page_size,
				    IBUF_BITMAP_BUFFERED, &mtr)) {

				ib_errf(trx->mysql_thd,
					IB_LOG_LEVEL_WARN,
					ER_INNODB_INDEX_CORRUPT,
					"Buffered changes"
					" for space %u page %u are lost",
					(unsigned) space_id,
					(unsigned) offset);

				/* Tolerate this error, so that
				slightly corrupted tables can be
				imported and dumped.  Clear the bit. */
				ibuf_bitmap_page_set_bits(
					bitmap_page, cur_page_id, page_size,
					IBUF_BITMAP_BUFFERED, FALSE, &mtr);
			}
		}

		ibuf_exit(&mtr);
		mtr_commit(&mtr);
	}

	mutex_exit(&ibuf_mutex);
	return(DB_SUCCESS);
}

/** Updates the free bits and the buffered bit for a bulk-loaded page.
@param[in]	block	index page
@param[in]	reset	if true, reset the free bits to zero */
void
ibuf_set_bitmap_for_bulk_load(
	buf_block_t*	block,
	bool		reset)
{
	page_t*	bitmap_page;
	mtr_t	mtr;
	ulint	free_val;

	ut_a(page_is_leaf(buf_block_get_frame(block)));

	mtr_start(&mtr);
	mtr.set_named_space(block->page.id.space());

	bitmap_page = ibuf_bitmap_get_map_page(block->page.id,
					       block->page.size, &mtr);

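	/* reset == true forces the free bits to zero so that no further
	changes will be buffered for this page; otherwise the bits are
	set according to the actual free space on the page. */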
	free_val = reset ? 0 : ibuf_index_page_calc_free(block);
	ibuf_bitmap_page_set_bits(
		bitmap_page, block->page.id, block->page.size,
		IBUF_BITMAP_FREE, free_val, &mtr);

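	/* The page has just been built by the bulk loader, so there are
	no buffered changes that could apply to it; clear the bit. */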
	ibuf_bitmap_page_set_bits(
		bitmap_page, block->page.id, block->page.size,
		IBUF_BITMAP_BUFFERED, FALSE, &mtr);

	mtr_commit(&mtr);
}
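
/* A minimal usage sketch (hypothetical caller, for illustration only):
once a bulk loader has finished building a leaf page it could refresh
the change-buffer bitmap for that page, passing reset=true when nothing
more should be buffered for it:

	ibuf_set_bitmap_for_bulk_load(block, true);
*/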

#endif /* !UNIV_HOTBACKUP */
