/*****************************************************************************

Copyright (c) 2014, 2019, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0bulk.cc
The B-tree bulk load

Created 03/11/2014 Shaohua Wang
*******************************************************/
#include "btr0bulk.h"
#include "btr0btr.h"
#include "btr0cur.h"
#include "btr0pcur.h"
#include "ibuf0ibuf.h"
#include "page0page.h"
#include "trx0trx.h"

/** InnoDB B-tree index fill factor for bulk load. */
uint	innobase_fill_factor;
/** Whether to reduce redo logging during ALTER TABLE */
my_bool	innodb_log_optimize_ddl;
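
/* A sketch of how this module is driven, for orientation only: the
actual caller is the index build path in row0merge.cc, and the exact
constructor and insert() signatures live in btr0bulk.h.

	BtrBulk	btr_bulk(index, trx, flush_observer);
	for (each tuple in key order) {
		err = btr_bulk.insert(tuple);	// fills pages left to right
		if (err != DB_SUCCESS) break;
	}
	err = btr_bulk.finish(err);	// commits pages, installs the root
*/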

/** Initialize members, allocate page if needed and start mtr.
Note: we commit all mtrs on failure.
@return error code. */
dberr_t
PageBulk::init()
{
	buf_block_t*	new_block;
	page_t*		new_page;
	page_zip_des_t*	new_page_zip;
	ulint		new_page_no;

	ut_ad(m_heap == NULL);
	m_heap = mem_heap_create(1000);

	m_mtr.start();

	if (m_flush_observer) {
		m_mtr.set_log_mode(MTR_LOG_NO_REDO);
		m_mtr.set_flush_observer(m_flush_observer);
	} else {
		m_index->set_modified(m_mtr);
	}
	if (m_page_no == FIL_NULL) {
		mtr_t	alloc_mtr;

		/* We commit the redo log for the allocation in a separate
		mini-transaction, because we do not guarantee that the page
		mtrs are committed in allocation order, and we must always
		generate redo log for page allocation, even when creating
		a new tablespace. */
		alloc_mtr.start();
		m_index->set_modified(alloc_mtr);

		ulint	n_reserved;
		bool	success;
		success = fsp_reserve_free_extents(&n_reserved,
						   m_index->table->space,
						   1, FSP_NORMAL, &alloc_mtr);
		if (!success) {
			alloc_mtr.commit();
			m_mtr.commit();
			return(DB_OUT_OF_FILE_SPACE);
		}

		/* Allocate a new page. */
		new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level,
					   &alloc_mtr, &m_mtr);

		m_index->table->space->release_free_extents(n_reserved);

		alloc_mtr.commit();

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);

		if (new_page_zip) {
			page_create_zip(new_block, m_index, m_level, 0,
					NULL, &m_mtr);
			memset(FIL_PAGE_PREV + new_page, 0xff, 8);
			page_zip_write_header(new_page_zip,
					      FIL_PAGE_PREV + new_page,
					      8, &m_mtr);
			mach_write_to_8(PAGE_HEADER + PAGE_INDEX_ID + new_page,
					m_index->id);
			page_zip_write_header(new_page_zip,
					      PAGE_HEADER + PAGE_INDEX_ID
					      + new_page, 8, &m_mtr);
		} else {
			ut_ad(!dict_index_is_spatial(m_index));
			page_create(new_block, &m_mtr,
				    dict_table_is_comp(m_index->table),
				    false);
			mlog_write_ulint(FIL_PAGE_PREV + new_page, FIL_NULL,
					 MLOG_4BYTES, &m_mtr);
			mlog_write_ulint(FIL_PAGE_NEXT + new_page, FIL_NULL,
					 MLOG_4BYTES, &m_mtr);
			mlog_write_ulint(PAGE_HEADER + PAGE_LEVEL + new_page,
					 m_level, MLOG_2BYTES, &m_mtr);
			mlog_write_ull(PAGE_HEADER + PAGE_INDEX_ID + new_page,
				       m_index->id, &m_mtr);
		}
	} else {
		new_block = btr_block_get(
			page_id_t(m_index->table->space_id, m_page_no),
			page_size_t(m_index->table->space->flags),
			RW_X_LATCH, m_index, &m_mtr);

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);
		ut_ad(m_page_no == new_page_no);

		ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);

		btr_page_set_level(new_page, new_page_zip, m_level, &m_mtr);
	}

	if (!m_level && dict_index_is_sec_or_ibuf(m_index)) {
		page_update_max_trx_id(new_block, new_page_zip, m_trx_id,
				       &m_mtr);
	}

	m_block = new_block;
	m_page = new_page;
	m_page_zip = new_page_zip;
	m_page_no = new_page_no;
	m_cur_rec = page_get_infimum_rec(new_page);
	ut_ad(m_is_comp == !!page_is_comp(new_page));
	m_free_space = page_get_free_space_of_empty(m_is_comp);

	if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) {
		/* Keep the default behavior compatible with MySQL 5.6. */
		m_reserved_space = dict_index_get_space_reserve();
	} else {
		m_reserved_space =
			srv_page_size * (100 - innobase_fill_factor) / 100;
	}
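	/* For example, with srv_page_size = 16384 and
	innobase_fill_factor = 90, m_reserved_space is
	16384 * 10 / 100 = 1638 bytes that isSpaceAvailable()
	will keep free on each page. */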

	m_padding_space =
		srv_page_size - dict_index_zip_pad_optimal_page_size(m_index);
	m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
	m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);
	/* Temporarily reset PAGE_DIRECTION_B from PAGE_NO_DIRECTION to 0,
	without writing redo log, to ensure that needs_finish() will hold
	on an empty page. */
	ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION);
	m_page[PAGE_HEADER + PAGE_DIRECTION_B] = 0;
	ut_d(m_total_data = 0);
	/* See page_copy_rec_list_end_to_created_page() */
	ut_d(page_header_set_field(m_page, NULL, PAGE_HEAP_TOP,
				   srv_page_size - 1));

	return(DB_SUCCESS);
}

/** Insert a record in the page.
@param[in]	rec		record
@param[in]	offsets		record offsets */
void
PageBulk::insert(
	const rec_t*		rec,
	rec_offs*		offsets)
{
	ulint		rec_size;

	ut_ad(m_heap != NULL);

	rec_size = rec_offs_size(offsets);
	ut_d(const bool is_leaf = page_rec_is_leaf(m_cur_rec));

#ifdef UNIV_DEBUG
	/* Check whether records are in order. */
	if (!page_rec_is_infimum_low(page_offset(m_cur_rec))) {
		rec_t*	old_rec = m_cur_rec;
		rec_offs* old_offsets = rec_get_offsets(
			old_rec, m_index, NULL, is_leaf
			? m_index->n_core_fields : 0,
			ULINT_UNDEFINED, &m_heap);

		ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index)
		      > 0);
	}

	m_total_data += rec_size;
#endif /* UNIV_DEBUG */

	/* 1. Copy the record to page. */
	rec_t*	insert_rec = rec_copy(m_heap_top, rec, offsets);
	rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets);

	/* 2. Insert the record in the linked list. */
	/* 3. Set the n_owned field in the inserted record to zero,
	and set the heap_no field. */
	if (m_is_comp) {
		ulint next_offs = rec_get_next_offs(m_cur_rec, TRUE);
		rec_set_next_offs_new(insert_rec, next_offs);
		rec_set_next_offs_new(m_cur_rec, page_offset(insert_rec));

		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	} else {
		ulint next_offs = rec_get_next_offs(m_cur_rec, FALSE);
		rec_set_next_offs_old(insert_rec, next_offs);
		rec_set_next_offs_old(m_cur_rec, page_offset(insert_rec));

		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	}

	/* 4. Set member variables. */
	ulint		slot_size;
	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);

	ut_ad(m_free_space >= rec_size + slot_size);
	ut_ad(m_heap_top + rec_size < m_page + srv_page_size);

	m_free_space -= rec_size + slot_size;
	m_heap_top += rec_size;
	m_rec_no += 1;

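	/* If a flush observer is attached (innodb_log_optimize_ddl),
	the mini-transaction runs in MTR_LOG_NO_REDO mode and the
	observer will flush the pages to the data files before the DDL
	transaction commits, so no redo log record is written for this
	insert. */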
	if (!m_flush_observer && !m_page_zip) {
		/* For ROW_FORMAT=COMPRESSED, redo log may be written
		in PageBulk::compress(). */
		page_cur_insert_rec_write_log(insert_rec, rec_size,
					      m_cur_rec, m_index, &m_mtr);
	}

	m_cur_rec = insert_rec;
}

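/** Check whether inserts to this page must still be finalized by finish().
@return true if finish() has work to do; false if the page directory and
header are already consistent with the page contents (finish() has
already run) */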
inline bool PageBulk::needs_finish() const
{
  ut_ad(page_align(m_cur_rec) == m_block->frame);
  ut_ad(m_page == m_block->frame);
  if (!m_page[PAGE_HEADER + PAGE_DIRECTION_B])
    return true;
  ulint heap_no, n_heap= page_header_get_field(m_page, PAGE_N_HEAP);
  ut_ad((n_heap & 0x7fff) >= PAGE_HEAP_NO_USER_LOW);
  if (n_heap & 0x8000)
  {
    n_heap&= 0x7fff;
    heap_no= rec_get_heap_no_new(m_cur_rec);
    if (heap_no == PAGE_HEAP_NO_INFIMUM &&
	page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END)
      return false;
  }
  else
  {
    heap_no= rec_get_heap_no_old(m_cur_rec);
    if (heap_no == PAGE_HEAP_NO_INFIMUM &&
	page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END)
      return false;
  }
  return heap_no != n_heap - 1;
}

/** Mark the end of inserts to the page. Scan all records to build the
page directory, and set the page header fields.
Note: this mirrors page_copy_rec_list_end_to_created_page(). */
void
PageBulk::finish()
{
	ut_ad(!dict_index_is_spatial(m_index));

	if (!needs_finish()) {
		return;
	}

	ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no)
	      <= page_get_free_space_of_empty(m_is_comp));
#ifdef UNIV_DEBUG
	/* See page_copy_rec_list_end_to_created_page() */
	if (m_rec_no) {
		page_dir_set_n_slots(m_page, NULL, srv_page_size / 2);
	}
	mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
			ulint(m_heap_top - m_page));
#endif

	ulint	count = 0;
	ulint	n_recs = 0;
	ulint	slot_index = 0;
	rec_t*	insert_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	page_dir_slot_t* slot = NULL;

	/* Set owner & dir. */
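	/* Give each full directory slot (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
	owned records, i.e. half of the maximum, imitating the slot layout
	that the regular insert path would produce. */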
	while (!page_rec_is_supremum(insert_rec)) {
		count++;
		n_recs++;

		if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) {

			slot_index++;

			slot = page_dir_get_nth_slot(m_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}

		insert_rec = page_rec_get_next(insert_rec);
	}

	if (slot_index > 0
	    && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
		<= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the last two directory slots. This is done
		so that this function imitates exactly the equivalent task
		performed using page_cur_insert_rec(), which database
		recovery uses to reproduce the work of this function.
		Imitating it exactly makes it possible to check the
		correctness of recovery. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}

	slot = page_dir_get_nth_slot(m_page, 1 + slot_index);
	page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	ut_ad(!page_get_instant(m_page));

	if (!m_rec_no) {
		/* Restore PAGE_DIRECTION_B from 0 to
		PAGE_NO_DIRECTION like it should be on an empty page,
		again without writing redo log. */
		m_page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_NO_DIRECTION;
	} else if (!m_flush_observer && !m_page_zip) {
		mlog_write_ulint(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
				 2 + slot_index, MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
				 ulint(m_heap_top - m_page),
				 MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_N_HEAP + m_page,
				 (PAGE_HEAP_NO_USER_LOW + m_rec_no)
				 | ulint(m_is_comp) << 15,
				 MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no,
				 MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_LAST_INSERT + m_page,
				 ulint(m_cur_rec - m_page),
				 MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page,
				 PAGE_RIGHT, MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0,
				 MLOG_2BYTES, &m_mtr);
	} else {
		/* For ROW_FORMAT=COMPRESSED, redo log may be written
		in PageBulk::compress(). */
		mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
				2 + slot_index);
		mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
				ulint(m_heap_top - m_page));
		mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + m_page,
				(PAGE_HEAP_NO_USER_LOW + m_rec_no)
				| ulint(m_is_comp) << 15);
		mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
		mach_write_to_2(PAGE_HEADER + PAGE_LAST_INSERT + m_page,
				ulint(m_cur_rec - m_page));
		mach_write_to_2(PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page,
				PAGE_RIGHT);
		mach_write_to_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0);
	}

	ut_ad(!needs_finish());
	ut_ad(page_validate(m_page, m_index));
}

/** Commit the inserts done to the page.
@param[in]	success	whether all inserts succeeded */
void PageBulk::commit(bool success)
{
  finish();
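  /* Update the change buffer bitmap (IBUF_BITMAP_FREE bits) for a
  secondary index leaf page, so that later change buffering knows how
  much free space the bulk load left on the page. */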
  if (success && !dict_index_is_clust(m_index) && page_is_leaf(m_page))
    ibuf_set_bitmap_for_bulk_load(m_block, innobase_fill_factor == 100);
  m_mtr.commit();
}

/** Compress a page of a ROW_FORMAT=COMPRESSED table.
@return	true	on success, or if compression was not needed
@return	false	on compression failure */
bool
PageBulk::compress()
{
	ut_ad(m_page_zip != NULL);

	return(page_zip_compress(m_page_zip, m_page, m_index,
				 page_zip_level, NULL, &m_mtr));
}

/** Get node pointer
@return node pointer */
dtuple_t*
PageBulk::getNodePtr()
{
	rec_t*		first_rec;
	dtuple_t*	node_ptr;

	/* Create node pointer */
	first_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	ut_a(page_rec_is_user_rec(first_rec));
	node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no,
					     m_heap, m_level);

	return(node_ptr);
}

/** Get the split rec in the left page. We split a page in half when
compression fails, and the split rec and all records after it will be
copied to the right page.
@return split rec */
rec_t*
PageBulk::getSplitRec()
{
	rec_t*		rec;
	rec_offs*	offsets;
	ulint		total_used_size;
	ulint		total_recs_size;
	ulint		n_recs;

	ut_ad(m_page_zip != NULL);
	ut_ad(m_rec_no >= 2);
	ut_ad(!m_index->is_instant());

	ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space);
	total_used_size = page_get_free_space_of_empty(m_is_comp)
		- m_free_space;

	total_recs_size = 0;
	n_recs = 0;
	offsets = NULL;
	rec = page_get_infimum_rec(m_page);
	const ulint n_core = page_is_leaf(m_page) ? m_index->n_core_fields : 0;

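	/* Walk the records in order, accumulating record sizes plus the
	page directory space they need, until roughly half of the used
	space lies before the candidate split point. */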
	do {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));

		offsets = rec_get_offsets(rec, m_index, offsets, n_core,
					  ULINT_UNDEFINED, &m_heap);
		total_recs_size += rec_offs_size(offsets);
		n_recs++;
	} while (total_recs_size + page_dir_calc_reserved_space(n_recs)
		 < total_used_size / 2);

	/* Keep at least one record on the left page */
	if (page_rec_is_infimum(page_rec_get_prev(rec))) {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));
	}

	return(rec);
}

/** Copy all records from the split rec onward, including the split rec
itself.
@param[in]	split_rec	split rec */
void
PageBulk::copyIn(
	rec_t*		split_rec)
{
	rec_t*		rec = split_rec;
	rec_offs*	offsets = NULL;

	ut_ad(m_rec_no == 0);
	ut_ad(page_rec_is_user_rec(rec));

	const ulint n_core = page_rec_is_leaf(rec)
		? m_index->n_core_fields : 0;

	do {
		offsets = rec_get_offsets(rec, m_index, offsets, n_core,
					  ULINT_UNDEFINED, &m_heap);

		insert(rec, offsets);

		rec = page_rec_get_next(rec);
	} while (!page_rec_is_supremum(rec));

	ut_ad(m_rec_no > 0);
}

/** Remove all records from the split rec onward, including the split rec
itself.
@param[in]	split_rec	split rec */
void
PageBulk::copyOut(
	rec_t*		split_rec)
{
	rec_t*		rec;
	rec_t*		last_rec;
	ulint		n;

	/* Suppose that before copyOut we have 5 records on the page:
	infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec.

	After copyOut we have 2 records on the page:
	infimum->r1->r2->supremum. Slot adjustment is not done. */

	rec = page_rec_get_next(page_get_infimum_rec(m_page));
	last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
	n = 0;

	while (rec != split_rec) {
		rec = page_rec_get_next(rec);
		n++;
	}

	ut_ad(n > 0);

	/* Set last record's next in page */
	rec_offs*	offsets = NULL;
	rec = page_rec_get_prev(split_rec);
	const ulint n_core = page_rec_is_leaf(split_rec)
		? m_index->n_core_fields : 0;

	offsets = rec_get_offsets(rec, m_index, offsets, n_core,
				  ULINT_UNDEFINED, &m_heap);
	page_rec_set_next(rec, page_get_supremum_rec(m_page));

	/* Set related members */
	m_cur_rec = rec;
	m_heap_top = rec_get_end(rec, offsets);

	offsets = rec_get_offsets(last_rec, m_index, offsets, n_core,
				  ULINT_UNDEFINED, &m_heap);

	m_free_space += ulint(rec_get_end(last_rec, offsets) - m_heap_top)
		+ page_dir_calc_reserved_space(m_rec_no)
		- page_dir_calc_reserved_space(n);
	ut_ad(lint(m_free_space) > 0);
	m_rec_no = n;

#ifdef UNIV_DEBUG
	m_total_data -= ulint(rec_get_end(last_rec, offsets) - m_heap_top);
#endif /* UNIV_DEBUG */
}

/** Set next page
@param[in]	next_page_no	next page no */
inline void PageBulk::setNext(ulint next_page_no)
{
	if (UNIV_LIKELY_NULL(m_page_zip)) {
		/* For ROW_FORMAT=COMPRESSED, redo log may be written
		in PageBulk::compress(). */
		mach_write_to_4(m_page + FIL_PAGE_NEXT, next_page_no);
	} else {
		mlog_write_ulint(m_page + FIL_PAGE_NEXT, next_page_no,
				 MLOG_4BYTES, &m_mtr);
	}
}

/** Set previous page
@param[in]	prev_page_no	previous page no */
inline void PageBulk::setPrev(ulint prev_page_no)
{
	if (UNIV_LIKELY_NULL(m_page_zip)) {
		/* For ROW_FORMAT=COMPRESSED, redo log may be written
		in PageBulk::compress(). */
		mach_write_to_4(m_page + FIL_PAGE_PREV, prev_page_no);
	} else {
		mlog_write_ulint(m_page + FIL_PAGE_PREV, prev_page_no,
				 MLOG_4BYTES, &m_mtr);
	}
}

/** Check if the required space is available in the page for the rec
to be inserted. We check the fill factor and padding here.
@param[in]	rec_size	required length
@return true	if space is available */
bool
PageBulk::isSpaceAvailable(
	ulint		rec_size)
{
	ulint	slot_size;
	ulint	required_space;

	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);
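	/* page_dir_calc_reserved_space() reserves PAGE_DIR_SLOT_SIZE
	(2) bytes for every PAGE_DIR_SLOT_MIN_N_OWNED (4) records, so
	slot_size is usually 0 and becomes nonzero only when the new
	record pushes the directory over the next slot boundary. */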

	required_space = rec_size + slot_size;

	if (required_space > m_free_space) {
		ut_ad(m_rec_no > 0);
		return false;
	}

	/* The fill factor and padding apply to both leaf and non-leaf
	pages. Note: we keep at least 2 records on a page to avoid the
	B-tree growing too tall. */
	if (m_rec_no >= 2
	    && ((m_page_zip == NULL && m_free_space - required_space
		 < m_reserved_space)
		|| (m_page_zip != NULL && m_free_space - required_space
		    < m_padding_space))) {
		return(false);
	}

	return(true);
}

/** Check whether the record needs to be stored externally.
@return false if the entire record can be stored locally on the page */
bool
PageBulk::needExt(
	const dtuple_t*		tuple,
	ulint			rec_size)
{
	return(page_zip_rec_needs_ext(rec_size, m_is_comp,
		dtuple_get_n_fields(tuple), m_block->page.size));
}

/** Store an externally stored record.
Since the record has not been logged yet, we do not log the update to
the record; the BLOB data is logged first, and then the record is logged
in bulk mode.
@param[in]	big_rec		externally stored record
@param[in]	offsets		record offsets
@return	error code */
dberr_t
PageBulk::storeExt(
	const big_rec_t*	big_rec,
	rec_offs*		offsets)
{
	finish();

	/* Note: not all fields are initialized in btr_pcur. */
	btr_pcur_t	btr_pcur;
	btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
	btr_pcur.latch_mode = BTR_MODIFY_LEAF;
	btr_pcur.btr_cur.index = m_index;
	btr_pcur.btr_cur.page_cur.index = m_index;
	btr_pcur.btr_cur.page_cur.rec = m_cur_rec;
	btr_pcur.btr_cur.page_cur.offsets = offsets;
	btr_pcur.btr_cur.page_cur.block = m_block;

	dberr_t	err = btr_store_big_rec_extern_fields(
		&btr_pcur, offsets, big_rec, &m_mtr, BTR_STORE_INSERT_BULK);

	/* Reset m_block and m_cur_rec from the page cursor, because the
	block may be changed during the BLOB insert. (FIXME: Can it
	really?) */
	ut_ad(m_block == btr_pcur.btr_cur.page_cur.block);

	m_block = btr_pcur.btr_cur.page_cur.block;
	m_cur_rec = btr_pcur.btr_cur.page_cur.rec;
	m_page = buf_block_get_frame(m_block);

	return(err);
}

/** Release the block by committing the mtr.
Note: log_free_check requires holding no lock/latch in the current thread. */
void
PageBulk::release()
{
	finish();

	/* We fix the block because we will re-pin it soon. */
	buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);

	/* No other threads can modify this block. */
	m_modify_clock = buf_block_get_modify_clock(m_block);
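	/* The saved modify clock lets latch() re-acquire the page with
	buf_page_optimistic_get(): if the clock is unchanged, the page
	was neither modified nor evicted while we held no latch. */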

	m_mtr.commit();
}

/** Start mtr and latch the block */
dberr_t
PageBulk::latch()
{
	m_mtr.start();

	if (m_flush_observer) {
		m_mtr.set_log_mode(MTR_LOG_NO_REDO);
		m_mtr.set_flush_observer(m_flush_observer);
	} else {
		m_index->set_modified(m_mtr);
	}

	ut_ad(m_block->page.buf_fix_count);

	/* In case the block is S-latched by page_cleaner. */
	if (!buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
				     __FILE__, __LINE__, &m_mtr)) {
		m_block = buf_page_get_gen(page_id_t(m_index->table->space_id,
						     m_page_no),
					   univ_page_size, RW_X_LATCH,
					   m_block, BUF_GET_IF_IN_POOL,
					   __FILE__, __LINE__, &m_mtr, &m_err);

		if (m_err != DB_SUCCESS) {
			return (m_err);
		}

		ut_ad(m_block != NULL);
	}

	buf_block_buf_fix_dec(m_block);

	ut_ad(m_block->page.buf_fix_count);

	ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top);

	return (m_err);
}

/** Split a page
@param[in]	page_bulk	page to split
@param[in]	next_page_bulk	next page
@return	error code */
dberr_t
BtrBulk::pageSplit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk)
{
	ut_ad(page_bulk->getPageZip() != NULL);

	if (page_bulk->getRecNo() <= 1) {
		return(DB_TOO_BIG_RECORD);
	}

	/* Initialize a new page */
	PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL,
			       page_bulk->getLevel(), m_flush_observer);
	dberr_t	err = new_page_bulk.init();
	if (err != DB_SUCCESS) {
		return(err);
	}

	/* Copy the upper half to the new page. */
	rec_t*	split_rec = page_bulk->getSplitRec();
	new_page_bulk.copyIn(split_rec);
	page_bulk->copyOut(split_rec);

	/* Commit the pages after split. */
	err = pageCommit(page_bulk, &new_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	err = pageCommit(&new_page_bulk, next_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	return(err);
}

/** Commit (finish) a page. We set the next/prev page numbers, compress
the page if the table is compressed (splitting it if compression fails),
insert a node pointer into the father page if needed, and commit the
mini-transaction.
@param[in]	page_bulk	page to commit
@param[in]	next_page_bulk	next page
@param[in]	insert_father	false when page_bulk is a root page and
				true when it's a non-root page
@return	error code */
dberr_t
BtrBulk::pageCommit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk,
	bool		insert_father)
{
	page_bulk->finish();

	/* Set page links */
	if (next_page_bulk != NULL) {
		ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel());

		page_bulk->setNext(next_page_bulk->getPageNo());
		next_page_bulk->setPrev(page_bulk->getPageNo());
	} else {
		/* If a page is released and latched again, we need to
		mark it modified in the mini-transaction. */
		page_bulk->setNext(FIL_NULL);
	}

	ut_ad(!rw_lock_own_flagged(&m_index->lock,
				   RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX
				   | RW_LOCK_FLAG_S));

	/* Compress page if it's a compressed table. */
	if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) {
		return(pageSplit(page_bulk, next_page_bulk));
	}

	/* Insert node pointer to father page. */
	if (insert_father) {
		dtuple_t*	node_ptr = page_bulk->getNodePtr();
		dberr_t		err = insert(node_ptr, page_bulk->getLevel()+1);

		if (err != DB_SUCCESS) {
			return(err);
		}
	}

	/* Commit mtr. */
	page_bulk->commit(true);

	return(DB_SUCCESS);
}

/** If a redo log flush or checkpoint may be pending, release all
latches, run log_free_check(), and re-latch the pages. */
inline void BtrBulk::logFreeCheck()
{
	if (log_sys.check_flush_or_checkpoint) {
		release();

		log_free_check();

		latch();
	}
}

/** Release all latches */
void
BtrBulk::release()
{
	ut_ad(m_root_level + 1 == m_page_bulks.size());

	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*    page_bulk = m_page_bulks.at(level);

		page_bulk->release();
	}
}

/** Re-latch all latches */
void
BtrBulk::latch()
{
	ut_ad(m_root_level + 1 == m_page_bulks.size());

	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*    page_bulk = m_page_bulks.at(level);
		page_bulk->latch();
	}
}

/** Insert a tuple into a page at the given level
@param[in]	tuple	tuple to insert
@param[in]	level	B-tree level
@return error code */
dberr_t
BtrBulk::insert(
	dtuple_t*	tuple,
	ulint		level)
{
	bool		is_left_most = false;
	dberr_t		err = DB_SUCCESS;

	/* Check if we need to create a PageBulk for the level. */
	if (level + 1 > m_page_bulks.size()) {
		PageBulk*	new_page_bulk
			= UT_NEW_NOKEY(PageBulk(m_index, m_trx->id, FIL_NULL,
						level, m_flush_observer));
		err = new_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(new_page_bulk);
			return(err);
		}

		m_page_bulks.push_back(new_page_bulk);
		ut_ad(level + 1 == m_page_bulks.size());
		m_root_level = level;

		is_left_most = true;
	}

	ut_ad(m_page_bulks.size() > level);

	PageBulk*	page_bulk = m_page_bulks.at(level);

	if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) {
		/* The node pointer must be marked as the predefined minimum
		record, as there is no lower alphabetical limit to records in
		the leftmost node of a level: */
		dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple)
					    | REC_INFO_MIN_REC_FLAG);
	}

	ulint		n_ext = 0;
	ulint		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	big_rec_t*	big_rec = NULL;
	rec_t*		rec = NULL;
	rec_offs*	offsets = NULL;

	if (page_bulk->needExt(tuple, rec_size)) {
		/* The record is so big that we have to store some fields
		externally on separate database pages */
		big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext);

		if (big_rec == NULL) {
			return(DB_TOO_BIG_RECORD);
		}

		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	}

	if (page_bulk->getPageZip() != NULL
	    && page_zip_is_too_big(m_index, tuple)) {
		err = DB_TOO_BIG_RECORD;
		goto func_exit;
	}

	if (!page_bulk->isSpaceAvailable(rec_size)) {
		/* Create a sibling page_bulk. */
		PageBulk*	sibling_page_bulk;
		sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx->id,
							  FIL_NULL, level,
							  m_flush_observer));
		err = sibling_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Commit page bulk. */
		err = pageCommit(page_bulk, sibling_page_bulk, true);
		if (err != DB_SUCCESS) {
			pageAbort(sibling_page_bulk);
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Set new page bulk to page_bulks. */
		ut_ad(sibling_page_bulk->getLevel() <= m_root_level);
		m_page_bulks.at(level) = sibling_page_bulk;

		UT_DELETE(page_bulk);
		page_bulk = sibling_page_bulk;

		/* Important: log_free_check whether we need a checkpoint. */
		if (page_is_leaf(sibling_page_bulk->getPage())) {
			if (trx_is_interrupted(m_trx)) {
				if (m_flush_observer) {
					m_flush_observer->interrupted();
				}

				err = DB_INTERRUPTED;
				goto func_exit;
			}

			/* Wake up page cleaner to flush dirty pages. */
			srv_inc_activity_count();
			os_event_set(buf_flush_event);

			logFreeCheck();
		}
	}

	/* Convert tuple to rec. */
	rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc(
		page_bulk->m_heap, rec_size)), m_index, tuple, n_ext);
	offsets = rec_get_offsets(rec, m_index, offsets, level
				  ? 0 : m_index->n_core_fields,
				  ULINT_UNDEFINED, &page_bulk->m_heap);

	page_bulk->insert(rec, offsets);

	if (big_rec != NULL) {
		ut_ad(dict_index_is_clust(m_index));
		ut_ad(page_bulk->getLevel() == 0);
		ut_ad(page_bulk == m_page_bulks.at(0));

		/* Release all pages above the leaf level */
		for (ulint level = 1; level <= m_root_level; level++) {
			m_page_bulks.at(level)->release();
		}

		err = page_bulk->storeExt(big_rec, offsets);

		/* Latch the non-leaf pages again */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*    page_bulk = m_page_bulks.at(level);
			page_bulk->latch();
		}
	}

func_exit:
	if (big_rec != NULL) {
		dtuple_convert_back_big_rec(m_index, tuple, big_rec);
	}

	return(err);
}

/** Finish the B-tree bulk load: commit the last page in each level
and, if no error has occurred, copy the last page at the top level to
the root page of the index.
@param[in]	err	whether bulk load was successful until now
@return error code */
dberr_t
BtrBulk::finish(dberr_t	err)
{
	ulint		last_page_no = FIL_NULL;

	ut_ad(!m_index->table->is_temporary());

	if (m_page_bulks.size() == 0) {
		/* The table is empty. The root page of the index tree
		is already in a consistent state. No need to flush. */
		return(err);
	}

	ut_ad(m_root_level + 1 == m_page_bulks.size());

	/* Finish all page bulks */
	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*	page_bulk = m_page_bulks.at(level);

		last_page_no = page_bulk->getPageNo();

		if (err == DB_SUCCESS) {
			err = pageCommit(page_bulk, NULL,
					 level != m_root_level);
		}

		if (err != DB_SUCCESS) {
			pageAbort(page_bulk);
		}

		UT_DELETE(page_bulk);
	}

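	/* The root page number is fixed, because it is recorded in the
	data dictionary. So instead of turning the last top-level page
	into the root, copy its records into the existing root page and
	free that last page. */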
	if (err == DB_SUCCESS) {
		rec_t*		first_rec;
		mtr_t		mtr;
		buf_block_t*	last_block;
		PageBulk	root_page_bulk(m_index, m_trx->id,
					       m_index->page, m_root_level,
					       m_flush_observer);

		mtr.start();
		m_index->set_modified(mtr);
		mtr_x_lock_index(m_index, &mtr);

		ut_ad(last_page_no != FIL_NULL);
		last_block = btr_block_get(
			page_id_t(m_index->table->space_id, last_page_no),
			page_size_t(m_index->table->space->flags),
			RW_X_LATCH, m_index, &mtr);
		first_rec = page_rec_get_next(
			page_get_infimum_rec(last_block->frame));
		ut_ad(page_rec_is_user_rec(first_rec));

		/* Copy last page to root page. */
		err = root_page_bulk.init();
		if (err != DB_SUCCESS) {
			mtr.commit();
			return(err);
		}
		root_page_bulk.copyIn(first_rec);
		root_page_bulk.finish();

		/* Remove last page. */
		btr_page_free(m_index, last_block, &mtr);

		/* Do not flush the last page. */
		last_block->page.flush_observer = NULL;

		mtr.commit();

		err = pageCommit(&root_page_bulk, NULL, false);
		ut_ad(err == DB_SUCCESS);
	}

	ut_ad(!sync_check_iterate(dict_sync_check()));

	ut_ad(err != DB_SUCCESS
	      || btr_validate_index(m_index, NULL, false) == DB_SUCCESS);
	return(err);
}