/*****************************************************************************

Copyright (c) 2014, 2021, Oracle and/or its affiliates.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.

This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation.  The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License, version 2.0, for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0bulk.cc
The B-tree bulk load

Created 03/11/2014 Shaohua Wang
*******************************************************/

#include "btr0bulk.h"
#include "btr0btr.h"
#include "btr0cur.h"
#include "btr0pcur.h"
#include "ibuf0ibuf.h"

/** Innodb B-tree index fill factor for bulk load. */
long	innobase_fill_factor;

/** Initialize members, allocate page if needed and start mtr.
Note: we commit all mtrs on failure.
@return error code. */
dberr_t
PageBulk::init()
{
	mtr_t*		mtr;
	buf_block_t*	new_block;
	page_t*		new_page;
	page_zip_des_t*	new_page_zip;
	ulint		new_page_no;

	ut_ad(m_heap == NULL);
	m_heap = mem_heap_create(1000);

	mtr = static_cast<mtr_t*>(
		mem_heap_alloc(m_heap, sizeof(mtr_t)));
	mtr_start(mtr);

	if (!dict_index_is_online_ddl(m_index)) {
		mtr_x_lock(dict_index_get_lock(m_index), mtr);
	}

	mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);
	mtr_set_flush_observer(mtr, m_flush_observer);

	if (m_page_no == FIL_NULL) {
		mtr_t	alloc_mtr;

		/* We commit redo log for allocation by a separate mtr,
		because we don't guarantee pages are committed following
		the allocation order, and we will always generate redo log
		for page allocation, even when creating a new tablespace. */
		mtr_start(&alloc_mtr);
		alloc_mtr.set_named_space(dict_index_get_space(m_index));

		ulint	n_reserved;
		bool	success;
		success = fsp_reserve_free_extents(&n_reserved, m_index->space,
						   1, FSP_NORMAL, &alloc_mtr);
		if (!success) {
			mtr_commit(&alloc_mtr);
			mtr_commit(mtr);
			return(DB_OUT_OF_FILE_SPACE);
		}

		/* Allocate a new page. */
		new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level,
					   &alloc_mtr, mtr);

		if (n_reserved > 0) {
			fil_space_release_free_extents(m_index->space,
						       n_reserved);
		}

		mtr_commit(&alloc_mtr);

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);

		if (new_page_zip) {
			page_create_zip(new_block, m_index, m_level, 0,
					NULL, mtr);
		} else {
			ut_ad(!dict_index_is_spatial(m_index));
			page_create(new_block, mtr,
				    dict_table_is_comp(m_index->table),
				    false);
			btr_page_set_level(new_page, NULL, m_level, mtr);
		}

		btr_page_set_next(new_page, NULL, FIL_NULL, mtr);
		btr_page_set_prev(new_page, NULL, FIL_NULL, mtr);

		btr_page_set_index_id(new_page, NULL, m_index->id, mtr);
	} else {
		page_id_t	page_id(dict_index_get_space(m_index), m_page_no);
		page_size_t	page_size(dict_table_page_size(m_index->table));

		new_block = btr_block_get(page_id, page_size,
					  RW_X_LATCH, m_index, mtr);

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);
		ut_ad(m_page_no == new_page_no);

		ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);

		btr_page_set_level(new_page, NULL, m_level, mtr);
	}

	if (dict_index_is_sec_or_ibuf(m_index)
	    && !dict_table_is_temporary(m_index->table)
	    && page_is_leaf(new_page)) {
		page_update_max_trx_id(new_block, NULL, m_trx_id, mtr);
	}

	m_mtr = mtr;
	m_block = new_block;
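	/* The page is in an intermediate, inconsistent state while records
	are appended (the directory and header are written back only in
	finish()), so tell the buffer pool to skip its flush-time checks
	until then. */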
	m_block->skip_flush_check = true;
	m_page = new_page;
	m_page_zip = new_page_zip;
	m_page_no = new_page_no;
	m_cur_rec = page_get_infimum_rec(new_page);
	ut_ad(m_is_comp == !!page_is_comp(new_page));
	m_free_space = page_get_free_space_of_empty(m_is_comp);

	if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) {
		/* Keep default behavior compatible with 5.6 */
		m_reserved_space = dict_index_get_space_reserve();
	} else {
		m_reserved_space =
			UNIV_PAGE_SIZE * (100 - innobase_fill_factor) / 100;
	}

	m_padding_space =
		UNIV_PAGE_SIZE - dict_index_zip_pad_optimal_page_size(m_index);
	m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
	m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);

	ut_d(m_total_data = 0);
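	/* Set PAGE_HEAP_TOP temporarily to the page end: records are
	written directly at m_heap_top without updating the header for
	every insert, and finish() stores the real heap top back. */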
	page_header_set_field(m_page, NULL, PAGE_HEAP_TOP, UNIV_PAGE_SIZE - 1);

	return(DB_SUCCESS);
}

/** Insert a record in the page.
@param[in]	rec		record
@param[in]	offsets		record offsets */
void
PageBulk::insert(
	const rec_t*		rec,
	ulint*			offsets)
{
	ulint		rec_size;

	ut_ad(m_heap != NULL);

	rec_size = rec_offs_size(offsets);

#ifdef UNIV_DEBUG
	/* Check whether records are in order. */
	if (!page_rec_is_infimum(m_cur_rec)) {
		rec_t*	old_rec = m_cur_rec;
		ulint*	old_offsets = rec_get_offsets(
			old_rec, m_index, NULL, ULINT_UNDEFINED, &m_heap);

		ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index,
				  page_is_spatial_non_leaf(old_rec, m_index))
		      > 0);
	}

	m_total_data += rec_size;
#endif /* UNIV_DEBUG */

	/* 1. Copy the record to page. */
	rec_t*	insert_rec = rec_copy(m_heap_top, rec, offsets);
	rec_offs_make_valid(insert_rec, m_index, offsets);

	/* 2. Insert the record in the linked list. */
	rec_t*	next_rec = page_rec_get_next(m_cur_rec);

	page_rec_set_next(insert_rec, next_rec);
	page_rec_set_next(m_cur_rec, insert_rec);

	/* 3. Set the n_owned field in the inserted record to zero,
	and set the heap_no field. */
	if (m_is_comp) {
		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	} else {
		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	}

	/* 4. Set member variables. */
	ulint		slot_size;
	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);

	ut_ad(m_free_space >= rec_size + slot_size);
	ut_ad(m_heap_top + rec_size < m_page + UNIV_PAGE_SIZE);

	m_free_space -= rec_size + slot_size;
	m_heap_top += rec_size;
	m_rec_no += 1;
	m_cur_rec = insert_rec;
}

/** Mark end of insertion to the page. Scan all records to set page dirs,
and set page header members.
Note: we refer to page_copy_rec_list_end_to_created_page. */
void
PageBulk::finish()
{
	ut_ad(m_rec_no > 0);

#ifdef UNIV_DEBUG
	ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no)
	      <= page_get_free_space_of_empty(m_is_comp));

	/* To pass the debug tests we have to set these dummy values
	in the debug version */
	page_dir_set_n_slots(m_page, NULL, UNIV_PAGE_SIZE / 2);
#endif

	ulint	count = 0;
	ulint	n_recs = 0;
	ulint	slot_index = 0;
	rec_t*	insert_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	page_dir_slot_t* slot = NULL;

	/* Set owner & dir. Give each group of
	(PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 consecutive records a directory
	slot, owned by the last record of the group. */
	do {

		count++;
		n_recs++;

		if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) {

			slot_index++;

			slot = page_dir_get_nth_slot(m_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}

		insert_rec = page_rec_get_next(insert_rec);
	} while (!page_rec_is_supremum(insert_rec));

	if (slot_index > 0
	    && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
		<= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the two last dir slots. This operation is
		here to make this function imitate exactly the equivalent
		task made using page_cur_insert_rec, which we use in database
		recovery to reproduce the task performed by this function.
		To be able to check the correctness of recovery, it is good
		that it imitates exactly. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}

	slot = page_dir_get_nth_slot(m_page, 1 + slot_index);
	page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	ut_ad(!dict_index_is_spatial(m_index));
	page_dir_set_n_slots(m_page, NULL, 2 + slot_index);
	page_header_set_ptr(m_page, NULL, PAGE_HEAP_TOP, m_heap_top);
	page_dir_set_n_heap(m_page, NULL, PAGE_HEAP_NO_USER_LOW + m_rec_no);
	page_header_set_field(m_page, NULL, PAGE_N_RECS, m_rec_no);

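	/* All records were appended in ascending key order, so report a
	rightward insert direction; this feeds the split-point heuristics
	of any future inserts into this page. */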
	page_header_set_ptr(m_page, NULL, PAGE_LAST_INSERT, m_cur_rec);
	page_header_set_field(m_page, NULL, PAGE_DIRECTION, PAGE_RIGHT);
	page_header_set_field(m_page, NULL, PAGE_N_DIRECTION, 0);

	m_block->skip_flush_check = false;
}

/** Commit inserts done to the page
@param[in]	success		Flag whether all inserts succeeded. */
void
PageBulk::commit(
	bool	success)
{
	if (success) {
		ut_ad(page_validate(m_page, m_index));

		/* Set no free space left and no buffered changes in ibuf. */
		if (!dict_index_is_clust(m_index)
		    && !dict_table_is_temporary(m_index->table)
		    && page_is_leaf(m_page)) {
			ibuf_set_bitmap_for_bulk_load(
				m_block, innobase_fill_factor == 100);
		}
	}

	mtr_commit(m_mtr);
}

/** Compress a page of a compressed table
@return	true	compressed successfully, or no need to compress
@return	false	compression failed. */
bool
PageBulk::compress()
{
	ut_ad(m_page_zip != NULL);

	return(page_zip_compress(m_page_zip, m_page, m_index,
				 page_zip_level, NULL, m_mtr));
}

/** Get node pointer
@return node pointer */
dtuple_t*
PageBulk::getNodePtr()
{
	rec_t*		first_rec;
	dtuple_t*	node_ptr;

	/* Create node pointer */
	first_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	ut_a(page_rec_is_user_rec(first_rec));
	node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no,
					     m_heap, m_level);

	return(node_ptr);
}

/** Get the split rec in the left page. We split a page in half when
compression fails, and the split rec will be copied to the right page.
@return split rec */
rec_t*
PageBulk::getSplitRec()
{
	rec_t*		rec;
	ulint*		offsets;
	ulint		total_used_size;
	ulint		total_recs_size;
	ulint		n_recs;

	ut_ad(m_page_zip != NULL);
	ut_ad(m_rec_no >= 2);

	ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space);
	total_used_size = page_get_free_space_of_empty(m_is_comp)
		- m_free_space;

	total_recs_size = 0;
	n_recs = 0;
	offsets = NULL;
	rec = page_get_infimum_rec(m_page);

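	/* Walk the records from the beginning, accumulating record sizes
	plus the directory space they need, until roughly half of the used
	space is covered; the record we stop at becomes the split rec. */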
	do {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));

		offsets = rec_get_offsets(rec, m_index,
					  offsets, ULINT_UNDEFINED,
					  &(m_heap));
		total_recs_size += rec_offs_size(offsets);
		n_recs++;
	} while (total_recs_size + page_dir_calc_reserved_space(n_recs)
		 < total_used_size / 2);

	/* Keep at least one record on left page */
	if (page_rec_is_infimum(page_rec_get_prev(rec))) {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));
	}

	return(rec);
}

/** Copy all records after the split rec, including the split rec itself.
@param[in]	split_rec	split rec */
void
PageBulk::copyIn(
	rec_t*		split_rec)
{

	rec_t*		rec = split_rec;
	ulint*		offsets = NULL;

	ut_ad(m_rec_no == 0);
	ut_ad(page_rec_is_user_rec(rec));

	do {
		offsets = rec_get_offsets(rec, m_index, offsets,
					  ULINT_UNDEFINED, &(m_heap));

		insert(rec, offsets);

		rec = page_rec_get_next(rec);
	} while (!page_rec_is_supremum(rec));

	ut_ad(m_rec_no > 0);
}

/** Remove all records after the split rec, including the split rec itself.
@param[in]	split_rec	split rec	*/
void
PageBulk::copyOut(
	rec_t*		split_rec)
{
	rec_t*		rec;
	rec_t*		last_rec;
	ulint		n;

	/* Suppose before copyOut, we have 5 records on the page:
	infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec.

	After copyOut, we have 2 records on the page:
	infimum->r1->r2->supremum. Slot adjustment is not done. */

	rec = page_rec_get_next(page_get_infimum_rec(m_page));
	last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
	n = 0;

	while (rec != split_rec) {
		rec = page_rec_get_next(rec);
		n++;
	}

	ut_ad(n > 0);

	/* Set last record's next in page */
	ulint*		offsets = NULL;
	rec = page_rec_get_prev(split_rec);
	offsets = rec_get_offsets(rec, m_index,
				  offsets, ULINT_UNDEFINED,
				  &(m_heap));
	page_rec_set_next(rec, page_get_supremum_rec(m_page));

	/* Set related members */
	m_cur_rec = rec;
	m_heap_top = rec_get_end(rec, offsets);

	offsets = rec_get_offsets(last_rec, m_index,
				  offsets, ULINT_UNDEFINED,
				  &(m_heap));

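	/* Reclaim the heap space of the removed records (from the new
	heap top to the end of the old last record) plus the directory
	space of the records that left the page. */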
	m_free_space += rec_get_end(last_rec, offsets)
		- m_heap_top
		+ page_dir_calc_reserved_space(m_rec_no)
		- page_dir_calc_reserved_space(n);
	ut_ad(m_free_space > 0);
	m_rec_no = n;

#ifdef UNIV_DEBUG
	m_total_data -= rec_get_end(last_rec, offsets) - m_heap_top;
#endif /* UNIV_DEBUG */
}

/** Set next page
@param[in]	next_page_no	next page no */
void
PageBulk::setNext(
	ulint		next_page_no)
{
	btr_page_set_next(m_page, NULL, next_page_no, m_mtr);
}

/** Set previous page
@param[in]	prev_page_no	previous page no */
void
PageBulk::setPrev(
	ulint		prev_page_no)
{
	btr_page_set_prev(m_page, NULL, prev_page_no, m_mtr);
}

/** Check if required space is available in the page for the rec to be inserted.
We check fill factor & padding here.
@param[in]	rec_size	required length
@return true	if space is available */
bool
PageBulk::isSpaceAvailable(
	ulint		rec_size)
{
	ulint	slot_size;
	ulint	required_space;

	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);

	required_space = rec_size + slot_size;

	if (required_space > m_free_space) {
		ut_ad(m_rec_no > 0);
		return false;
	}

	/* Fill factor & padding apply to both leaf and non-leaf pages:
	uncompressed pages keep m_reserved_space free (fill factor),
	compressed pages keep m_padding_space free (compression padding).
	Note: we keep at least 2 records in a page to avoid the B-tree
	level growing too high. */
	if (m_rec_no >= 2
	    && ((m_page_zip == NULL && m_free_space - required_space
		 < m_reserved_space)
		|| (m_page_zip != NULL && m_free_space - required_space
		    < m_padding_space))) {
		return(false);
	}

	return(true);
}

/** Check whether the record needs to be stored externally.
@return false if the entire record can be stored locally on the page  */
bool
PageBulk::needExt(
	const dtuple_t*		tuple,
	ulint			rec_size)
{
	return(page_zip_rec_needs_ext(rec_size, m_is_comp,
		dtuple_get_n_fields(tuple), m_block->page.size));
}

/** Store an external record.
Since the record is not logged yet, we don't log updates to the record:
the BLOB data is logged first, then the record is logged in bulk mode.
@param[in]	big_rec		external record
@param[in]	offsets		record offsets
@return	error code */
dberr_t
PageBulk::storeExt(
	const big_rec_t*	big_rec,
	ulint*			offsets)
{
	/* Note: not all fields are initialized in btr_pcur; we set up just
	enough state for btr_store_big_rec_extern_fields(). */
	btr_pcur_t	btr_pcur;
	btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
	btr_pcur.latch_mode = BTR_MODIFY_LEAF;
	btr_pcur.btr_cur.index = m_index;

	page_cur_t*	page_cur = &btr_pcur.btr_cur.page_cur;
	page_cur->index = m_index;
	page_cur->rec = m_cur_rec;
	page_cur->offsets = offsets;
	page_cur->block = m_block;

	dberr_t	err = btr_store_big_rec_extern_fields(
		&btr_pcur, NULL, offsets, big_rec, m_mtr,
		BTR_STORE_INSERT_BULK);

	ut_ad(page_offset(m_cur_rec) == page_offset(page_cur->rec));

	/* Reset m_block and m_cur_rec from the page cursor, because the
	block may be changed during the blob insert. */
	m_block = page_cur->block;
	m_cur_rec = page_cur->rec;
	m_page = buf_block_get_frame(m_block);

	return(err);
}

/** Release the block by committing the mtr.
Note: log_free_check requires holding no lock/latch in current thread. */
void
PageBulk::release()
{
	ut_ad(!dict_index_is_spatial(m_index));
	ut_ad(m_block->page.buf_fix_count > 0);

	/* We fix the block because we will re-pin it soon. */
	buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);

	/* No other threads can modify this block. */
	m_modify_clock = buf_block_get_modify_clock(m_block);

	mtr_commit(m_mtr);
}

/** Start mtr and latch the block */
void
PageBulk::latch()
{
	ibool	ret;

	mtr_start(m_mtr);

	if (!dict_index_is_online_ddl(m_index)) {
		mtr_x_lock(dict_index_get_lock(m_index), m_mtr);
	}

	mtr_set_log_mode(m_mtr, MTR_LOG_NO_REDO);
	mtr_set_flush_observer(m_mtr, m_flush_observer);

	ut_ad(m_block->page.buf_fix_count > 0);

	/* TODO: need a simple and wait version of buf_page_optimistic_get. */
	ret = buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
				      __FILE__, __LINE__, m_mtr);
	/* In case the block is S-latched by page_cleaner. */
	if (!ret) {
		page_id_t	page_id(dict_index_get_space(m_index), m_page_no);
		page_size_t	page_size(dict_table_page_size(m_index->table));

		m_block = buf_page_get_gen(page_id, page_size, RW_X_LATCH,
					   m_block, BUF_GET_IF_IN_POOL,
					   __FILE__, __LINE__, m_mtr);
		ut_ad(m_block != NULL);
	}

	buf_block_buf_fix_dec(m_block);
	/*
	The caller is going to use the m_block, so it needs to be buffer-fixed even
	after the decrement above. This works like this:
	release(){ //initially buf_fix_count == N > 0
		buf_fix_count++ // N+1
		mtr_commit(){
			buf_fix_count-- // N
		}
	}//at the end buf_fix_count == N > 0
	latch(){//initially buf_fix_count == M > 0
		buf_page_get_gen/buf_page_optimistic_get internally(){
			buf_fix_count++ // M+1
		}
		buf_fix_count-- // M
	}//at the end buf_fix_count == M > 0
	*/
	ut_ad(m_block->page.buf_fix_count > 0);

	ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top);
}


#ifdef UNIV_DEBUG
/* Check whether this mtr holds the index lock in X or SX mode during
online DDL. */
bool PageBulk::isIndexXLocked() {
	return (dict_index_is_online_ddl(m_index) &&
		mtr_memo_contains_flagged(m_mtr, dict_index_get_lock(m_index),
			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
}
#endif // UNIV_DEBUG

/** Split a page
@param[in]	page_bulk	page to split
@param[in]	next_page_bulk	next page
@return	error code */
dberr_t
BtrBulk::pageSplit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk)
{
	ut_ad(page_bulk->getPageZip() != NULL);

	/* 1. Check if we have only one user record on the page. */
	if (page_bulk->getRecNo() <= 1) {
		return(DB_TOO_BIG_RECORD);
	}

	/* 2. Create a new page. */
	PageBulk new_page_bulk(m_index, m_trx_id, FIL_NULL,
			       page_bulk->getLevel(), m_flush_observer);
	dberr_t	err = new_page_bulk.init();
	if (err != DB_SUCCESS) {
		return(err);
	}

	/* 3. Copy the upper half to the new page. */
	rec_t*	split_rec = page_bulk->getSplitRec();
	new_page_bulk.copyIn(split_rec);
	page_bulk->copyOut(split_rec);

	/* 4. Commit the split page. */
	err = pageCommit(page_bulk, &new_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	/* 5. Commit the new page. */
	err = pageCommit(&new_page_bulk, next_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	return(err);
}

/** Commit (finish) a page. We set the next/prev page no, compress a page of
a compressed table and split the page if compression fails, insert a node
pointer to the father page if needed, and commit the mini-transaction.
@param[in]	page_bulk	page to commit
@param[in]	next_page_bulk	next page
@param[in]	insert_father	false when page_bulk is a root page and
				true when it's a non-root page
@return	error code */
dberr_t
BtrBulk::pageCommit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk,
	bool		insert_father)
{
	page_bulk->finish();

	/* Set page links */
	if (next_page_bulk != NULL) {
		ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel());

		page_bulk->setNext(next_page_bulk->getPageNo());
		next_page_bulk->setPrev(page_bulk->getPageNo());
	} else {
		/* If a page is released and latched again, we need to
		mark it modified in the mini-transaction. */
		page_bulk->setNext(FIL_NULL);
	}

	/* Assert that no locks are held during the bulk load operation
	in case of an online DDL operation. The insert thread acquires
	index->lock to check the online status of the index. During a bulk
	load there are no concurrent inserts or reads and hence there is no
	need to acquire the lock in that case. */
	ut_ad(!page_bulk->isIndexXLocked());

	DBUG_EXECUTE_IF("innodb_bulk_load_sleep",
			os_thread_sleep(1000000););

	/* Compress the page if it belongs to a compressed table. */
	if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) {
		return(pageSplit(page_bulk, next_page_bulk));
	}

	/* Insert the node pointer into the father page; this recurses
	into BtrBulk::insert() at level + 1. */
	if (insert_father) {
		dtuple_t*	node_ptr = page_bulk->getNodePtr();
		dberr_t		err = insert(node_ptr, page_bulk->getLevel()+1);

		if (err != DB_SUCCESS) {
			return(err);
		}
	}

	/* Commit mtr. */
	page_bulk->commit(true);

	return(DB_SUCCESS);
}


/** Log free check */
void
BtrBulk::logFreeCheck()
{
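	/* log_free_check() may wait for a checkpoint and must not be
	called while holding any latch; release all page latches first
	(see PageBulk::release()) and re-acquire them afterwards. */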
	if (log_sys->check_flush_or_checkpoint) {
		release();

		log_free_check();

		latch();
	}
}

/** Release all latches */
void
BtrBulk::release()
{
	ut_ad(m_root_level + 1 == m_page_bulks->size());

	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*    page_bulk = m_page_bulks->at(level);

		page_bulk->release();
	}
}

/** Re-latch all latches */
void
BtrBulk::latch()
{
	ut_ad(m_root_level + 1 == m_page_bulks->size());

	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*    page_bulk = m_page_bulks->at(level);
		page_bulk->latch();
	}
}

/** Insert a tuple to a page at a given level
@param[in]	tuple	tuple to insert
@param[in]	level	B-tree level
@return error code */
dberr_t
BtrBulk::insert(
	dtuple_t*	tuple,
	ulint		level)
{
	bool		is_left_most = false;
	dberr_t		err = DB_SUCCESS;

	ut_ad(m_heap != NULL);

	/* Check if we need to create a PageBulk for the level. */
	if (level + 1 > m_page_bulks->size()) {
		PageBulk*	new_page_bulk
			= UT_NEW_NOKEY(PageBulk(m_index, m_trx_id, FIL_NULL,
						level, m_flush_observer));
		err = new_page_bulk->init();
		if (err != DB_SUCCESS) {
			return(err);
		}

		DEBUG_SYNC_C("bulk_load_insert");
		m_page_bulks->push_back(new_page_bulk);
		ut_ad(level + 1 == m_page_bulks->size());
		m_root_level = level;

		is_left_most = true;
	}

	ut_ad(m_page_bulks->size() > level);

	PageBulk*	page_bulk = m_page_bulks->at(level);

	if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) {
		/* The node pointer must be marked as the predefined minimum
		record, as there is no lower alphabetical limit to records in
		the leftmost node of a level: */
		dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple)
					    | REC_INFO_MIN_REC_FLAG);
	}

	ulint		n_ext = 0;
	ulint		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	big_rec_t*	big_rec = NULL;
	rec_t*		rec = NULL;
	ulint*		offsets = NULL;

	if (page_bulk->needExt(tuple, rec_size)) {
		/* The record is so big that we have to store some fields
		externally on separate database pages */
		big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext);

		if (big_rec == NULL) {
			return(DB_TOO_BIG_RECORD);
		}

		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	}

	if (page_bulk->getPageZip() != NULL
	    && page_zip_is_too_big(m_index, tuple)) {
		err = DB_TOO_BIG_RECORD;
		goto func_exit;
	}

	if (!page_bulk->isSpaceAvailable(rec_size)) {
		/* Create a sibling page_bulk. */
		PageBulk*	sibling_page_bulk;
		sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx_id,
							  FIL_NULL, level,
							  m_flush_observer));
		err = sibling_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Commit page bulk. */
		err = pageCommit(page_bulk, sibling_page_bulk, true);
		if (err != DB_SUCCESS) {
			pageAbort(sibling_page_bulk);
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Set the new page bulk to page_bulks. */
		ut_ad(sibling_page_bulk->getLevel() <= m_root_level);
		m_page_bulks->at(level) = sibling_page_bulk;

		UT_DELETE(page_bulk);
		page_bulk = sibling_page_bulk;

		/* Important: log_free_check whether we need a checkpoint. */
		if (page_is_leaf(sibling_page_bulk->getPage())) {
			/* Check whether trx is interrupted */
			if (m_flush_observer->check_interrupted()) {
				err = DB_INTERRUPTED;
				goto func_exit;
			}

			/* Wake up page cleaner to flush dirty pages. */
			srv_inc_activity_count();
			os_event_set(buf_flush_event);

			logFreeCheck();
		}
	}

	/* Convert the tuple to a physical record. */
	rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc(
		page_bulk->m_heap, rec_size)), m_index, tuple, n_ext);
	offsets = rec_get_offsets(rec, m_index, offsets, ULINT_UNDEFINED,
		&(page_bulk->m_heap));

	page_bulk->insert(rec, offsets);

	if (big_rec != NULL) {
		ut_ad(dict_index_is_clust(m_index));
		ut_ad(page_bulk->getLevel() == 0);
		ut_ad(page_bulk == m_page_bulks->at(0));

		/* Release all latched pages except the leaf node. */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*    page_bulk = m_page_bulks->at(level);

			page_bulk->release();
		}

		err = page_bulk->storeExt(big_rec, offsets);

		/* Latch */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*    page_bulk = m_page_bulks->at(level);
			page_bulk->latch();
		}
	}

func_exit:
	if (big_rec != NULL) {
		dtuple_convert_back_big_rec(m_index, tuple, big_rec);
	}

	return(err);
}

/** Btree bulk load finish. We commit the last page in each level
and copy the last page in the top level to the root page of the index
if no error occurs.
@param[in]	err	whether bulk load was successful until now
@return error code  */
dberr_t
BtrBulk::finish(dberr_t	err)
{
	ulint		last_page_no = FIL_NULL;

	ut_ad(!dict_table_is_temporary(m_index->table));

#ifdef UNIV_DEBUG
	/* Assert that the index online status has not changed */
	ut_ad(m_index->online_status == m_index_online);
#endif // UNIV_DEBUG

	if (m_page_bulks->size() == 0) {
		/* The table is empty. The root page of the index tree
		is already in a consistent state. No need to flush. */
		return(err);
	}

	ut_ad(m_root_level + 1 == m_page_bulks->size());

	/* Finish all page bulks */
	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*	page_bulk = m_page_bulks->at(level);

		last_page_no = page_bulk->getPageNo();

		if (err == DB_SUCCESS) {
			err = pageCommit(page_bulk, NULL,
					 level != m_root_level);
		}

		if (err != DB_SUCCESS) {
			pageAbort(page_bulk);
		}

		UT_DELETE(page_bulk);
	}

	if (err == DB_SUCCESS) {
		rec_t*		first_rec;
		mtr_t		mtr;
		buf_block_t*	last_block;
		page_t*		last_page;
		page_id_t	page_id(dict_index_get_space(m_index),
					last_page_no);
		page_size_t	page_size(dict_table_page_size(m_index->table));
		ulint		root_page_no = dict_index_get_page(m_index);
		PageBulk	root_page_bulk(m_index, m_trx_id,
					       root_page_no, m_root_level,
					       m_flush_observer);

		mtr_start(&mtr);
		mtr.set_named_space(dict_index_get_space(m_index));
		mtr_x_lock(dict_index_get_lock(m_index), &mtr);

		ut_ad(last_page_no != FIL_NULL);
		last_block = btr_block_get(page_id, page_size,
					   RW_X_LATCH, m_index, &mtr);
		last_page = buf_block_get_frame(last_block);
		first_rec = page_rec_get_next(page_get_infimum_rec(last_page));
		ut_ad(page_rec_is_user_rec(first_rec));

		/* Copy the last page to the root page: the root page number
		is fixed in the data dictionary, while the tree was built
		bottom-up on newly allocated pages. */
		err = root_page_bulk.init();
		if (err != DB_SUCCESS) {
			mtr_commit(&mtr);
			return(err);
		}
		root_page_bulk.copyIn(first_rec);

		/* Remove the last page. */
		btr_page_free_low(m_index, last_block, m_root_level, &mtr);

		/* Do not flush the last page. */
		last_block->page.flush_observer = NULL;

		mtr_commit(&mtr);

		err = pageCommit(&root_page_bulk, NULL, false);
		ut_ad(err == DB_SUCCESS);
	}

#ifdef UNIV_DEBUG
	dict_sync_check check(true);

	ut_ad(!sync_check_iterate(check));
#endif /* UNIV_DEBUG */

	ut_ad(err != DB_SUCCESS || btr_validate_index(m_index, NULL, false));
	return(err);
}