1 /*****************************************************************************
2
3 Copyright (c) 2014, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file btr/btr0bulk.cc
22 The B-tree bulk load
23
24 Created 03/11/2014 Shaohua Wang
25 *******************************************************/
26
27 #include "btr0bulk.h"
28 #include "btr0btr.h"
29 #include "btr0cur.h"
30 #include "btr0pcur.h"
31 #include "ibuf0ibuf.h"
32 #include "page0page.h"
33 #include "trx0trx.h"
34
/** Innodb B-tree index fill factor for bulk load.
100 means "fill pages completely" (see PageBulk::init() for how a
clustered index then falls back to dict_index_get_space_reserve()). */
uint	innobase_fill_factor;
/** whether to reduce redo logging during ALTER TABLE */
my_bool	innodb_log_optimize_ddl;
39
/** Initialize members, allocate page if needed and start mtr.
Note: we commit all mtrs on failure.
@return error code. */
dberr_t
PageBulk::init()
{
	buf_block_t*	new_block;
	page_t*		new_page;
	page_zip_des_t*	new_page_zip;
	ulint		new_page_no;

	ut_ad(m_heap == NULL);
	m_heap = mem_heap_create(1000);

	m_mtr.start();

	if (m_flush_observer) {
		/* Redo logging is suppressed for the page contents;
		the flush observer ensures the pages are flushed to the
		data files before the DDL operation commits. */
		m_mtr.set_log_mode(MTR_LOG_NO_REDO);
		m_mtr.set_flush_observer(m_flush_observer);
	} else {
		m_index->set_modified(m_mtr);
	}

	if (m_page_no == FIL_NULL) {
		mtr_t	alloc_mtr;

		/* We commit redo log for allocation by a separate mtr,
		because we don't guarantee pages are committed following
		the allocation order, and we will always generate redo log
		for page allocation, even when creating a new tablespace. */
		alloc_mtr.start();
		m_index->set_modified(alloc_mtr);

		ulint	n_reserved;
		bool	success;
		success = fsp_reserve_free_extents(&n_reserved,
						   m_index->table->space,
						   1, FSP_NORMAL, &alloc_mtr);
		if (!success) {
			alloc_mtr.commit();
			m_mtr.commit();
			return(DB_OUT_OF_FILE_SPACE);
		}

		/* Allocate a new page. */
		new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level,
					   &alloc_mtr, &m_mtr);

		m_index->table->space->release_free_extents(n_reserved);

		alloc_mtr.commit();

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);

		if (new_page_zip) {
			page_create_zip(new_block, m_index, m_level, 0,
					NULL, &m_mtr);
			/* Reset FIL_PAGE_PREV and FIL_PAGE_NEXT (8 bytes
			starting at FIL_PAGE_PREV) to FIL_NULL (0xffffffff),
			then log the change through the compressed header. */
			memset(FIL_PAGE_PREV + new_page, 0xff, 8);
			page_zip_write_header(new_page_zip,
					      FIL_PAGE_PREV + new_page,
					      8, &m_mtr);
			mach_write_to_8(PAGE_HEADER + PAGE_INDEX_ID + new_page,
					m_index->id);
			page_zip_write_header(new_page_zip,
					      PAGE_HEADER + PAGE_INDEX_ID
					      + new_page, 8, &m_mtr);
		} else {
			ut_ad(!dict_index_is_spatial(m_index));
			page_create(new_block, &m_mtr,
				    dict_table_is_comp(m_index->table),
				    false);
			mlog_write_ulint(FIL_PAGE_PREV + new_page, FIL_NULL,
					 MLOG_4BYTES, &m_mtr);
			mlog_write_ulint(FIL_PAGE_NEXT + new_page, FIL_NULL,
					 MLOG_4BYTES, &m_mtr);
			mlog_write_ulint(PAGE_HEADER + PAGE_LEVEL + new_page,
					 m_level, MLOG_2BYTES, &m_mtr);
			mlog_write_ull(PAGE_HEADER + PAGE_INDEX_ID + new_page,
				       m_index->id, &m_mtr);
		}
	} else {
		/* Reuse an already allocated page (e.g. the index root). */
		new_block = btr_block_get(
			page_id_t(m_index->table->space_id, m_page_no),
			page_size_t(m_index->table->space->flags),
			RW_X_LATCH, m_index, &m_mtr);

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);
		ut_ad(m_page_no == new_page_no);

		/* The page must be empty (no user records in the heap). */
		ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);

		btr_page_set_level(new_page, new_page_zip, m_level, &m_mtr);
	}

	if (!m_level && dict_index_is_sec_or_ibuf(m_index)) {
		page_update_max_trx_id(new_block, new_page_zip, m_trx_id,
				       &m_mtr);
	}

	m_block = new_block;
	m_page = new_page;
	m_page_zip = new_page_zip;
	m_page_no = new_page_no;
	m_cur_rec = page_get_infimum_rec(new_page);
	ut_ad(m_is_comp == !!page_is_comp(new_page));
	m_free_space = page_get_free_space_of_empty(m_is_comp);

	if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) {
		/* Keep default behavior compatible with 5.6 */
		m_reserved_space = dict_index_get_space_reserve();
	} else {
		m_reserved_space =
			srv_page_size * (100 - innobase_fill_factor) / 100;
	}

	m_padding_space =
		srv_page_size - dict_index_zip_pad_optimal_page_size(m_index);
	m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
	m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);
	/* Temporarily reset PAGE_DIRECTION_B from PAGE_NO_DIRECTION to 0,
	without writing redo log, to ensure that needs_finish() will hold
	on an empty page. */
	ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION);
	m_page[PAGE_HEADER + PAGE_DIRECTION_B] = 0;
	ut_d(m_total_data = 0);
	/* See page_copy_rec_list_end_to_created_page() */
	ut_d(page_header_set_field(m_page, NULL, PAGE_HEAP_TOP,
				   srv_page_size - 1));

	return(DB_SUCCESS);
}
175
/** Insert a record in the page.
The record must sort strictly after the last inserted record
(verified under UNIV_DEBUG).
@param[in]	rec	record to copy into the page
@param[in]	offsets	record offsets of rec */
void
PageBulk::insert(
	const rec_t*	rec,
	rec_offs*	offsets)
{
	ulint		rec_size;

	ut_ad(m_heap != NULL);

	rec_size = rec_offs_size(offsets);
	ut_d(const bool is_leaf = page_rec_is_leaf(m_cur_rec));

#ifdef UNIV_DEBUG
	/* Check whether records are in order. */
	if (!page_rec_is_infimum_low(page_offset(m_cur_rec))) {
		rec_t*	old_rec = m_cur_rec;
		rec_offs* old_offsets = rec_get_offsets(
			old_rec, m_index, NULL, is_leaf
			? m_index->n_core_fields : 0,
			ULINT_UNDEFINED, &m_heap);

		ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index)
		      > 0);
	}

	m_total_data += rec_size;
#endif /* UNIV_DEBUG */

	/* 1. Copy the record to page. */
	rec_t*	insert_rec = rec_copy(m_heap_top, rec, offsets);
	rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets);

	/* 2. Insert the record in the linked list. */
	/* 3. Set the n_owned field in the inserted record to zero,
	and set the heap_no field. */
	if (m_is_comp) {
		/* ROW_FORMAT=COMPACT or later ("new-style" records) */
		ulint next_offs = rec_get_next_offs(m_cur_rec, TRUE);
		rec_set_next_offs_new(insert_rec, next_offs);
		rec_set_next_offs_new(m_cur_rec, page_offset(insert_rec));

		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	} else {
		/* ROW_FORMAT=REDUNDANT ("old-style" records) */
		ulint next_offs = rec_get_next_offs(m_cur_rec, FALSE);
		rec_set_next_offs_old(insert_rec, next_offs);
		rec_set_next_offs_old(m_cur_rec, page_offset(insert_rec));

		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	}

	/* 4. Set member variables. */
	ulint	slot_size;
	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);

	ut_ad(m_free_space >= rec_size + slot_size);
	ut_ad(m_heap_top + rec_size < m_page + srv_page_size);

	m_free_space -= rec_size + slot_size;
	m_heap_top += rec_size;
	m_rec_no += 1;

	if (!m_flush_observer && !m_page_zip) {
		/* For ROW_FORMAT=COMPRESSED, redo log may be written
		in PageBulk::compress(). */
		page_cur_insert_rec_write_log(insert_rec, rec_size,
					      m_cur_rec, m_index, &m_mtr);
	}

	m_cur_rec = insert_rec;
}
253
/** Determine whether finish() still has work to do on this page.
init() temporarily sets PAGE_DIRECTION_B to 0; finish() restores it,
so a zero value means finish() has not run yet.
@return whether finish() needs to rebuild the page directory/header */
inline bool PageBulk::needs_finish() const
{
  ut_ad(page_align(m_cur_rec) == m_block->frame);
  ut_ad(m_page == m_block->frame);
  /* PAGE_DIRECTION_B == 0 only between init() and finish(). */
  if (!m_page[PAGE_HEADER + PAGE_DIRECTION_B])
    return true;
  ulint heap_no, n_heap= page_header_get_field(m_page, PAGE_N_HEAP);
  ut_ad((n_heap & 0x7fff) >= PAGE_HEAP_NO_USER_LOW);
  if (n_heap & 0x8000)
  {
    /* Bit 15 of PAGE_N_HEAP is set: "new-style" (comp) record format. */
    n_heap&= 0x7fff;
    heap_no= rec_get_heap_no_new(m_cur_rec);
    if (heap_no == PAGE_HEAP_NO_INFIMUM &&
        page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END)
      return false;
  }
  else
  {
    /* "Old-style" (redundant) record format. */
    heap_no= rec_get_heap_no_old(m_cur_rec);
    if (heap_no == PAGE_HEAP_NO_INFIMUM &&
        page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END)
      return false;
  }
  /* Finished pages have m_cur_rec pointing at the last heap record. */
  return heap_no != n_heap - 1;
}
279
/** Mark end of insertion to the page. Scan all records to set page dirs,
and set page header members.
Note: we refer to page_copy_rec_list_end_to_created_page. */
void
PageBulk::finish()
{
	ut_ad(!dict_index_is_spatial(m_index));

	/* Idempotent: skip if the directory/header are already current. */
	if (!needs_finish()) {
		return;
	}

	ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no)
	      <= page_get_free_space_of_empty(m_is_comp));
#ifdef UNIV_DEBUG
	/* See page_copy_rec_list_end_to_created_page() */
	if (m_rec_no) {
		page_dir_set_n_slots(m_page, NULL, srv_page_size / 2);
	}
	mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
			ulint(m_heap_top - m_page));
#endif

	ulint	count = 0;
	ulint	n_recs = 0;
	ulint	slot_index = 0;
	rec_t*	insert_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	page_dir_slot_t* slot = NULL;

	/* Set owner & dir. Walk the record list and create a directory
	slot for every (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 records. */
	while (!page_rec_is_supremum(insert_rec)) {
		count++;
		n_recs++;

		if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) {

			slot_index++;

			slot = page_dir_get_nth_slot(m_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}

		insert_rec = page_rec_get_next(insert_rec);
	}

	if (slot_index > 0
	    && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
		<= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the two last dir slots. This operation is
		here to make this function imitate exactly the equivalent
		task made using page_cur_insert_rec, which we use in database
		recovery to reproduce the task performed by this function.
		To be able to check the correctness of recovery, it is good
		that it imitates exactly. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}

	/* The last slot always owns the supremum record. */
	slot = page_dir_get_nth_slot(m_page, 1 + slot_index);
	page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	ut_ad(!page_get_instant(m_page));

	if (!m_rec_no) {
		/* Restore PAGE_DIRECTION_B from 0 to
		PAGE_NO_DIRECTION like it should be on an empty page,
		again without writing redo log. */
		m_page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_NO_DIRECTION;
	} else if (!m_flush_observer && !m_page_zip) {
		/* Normal redo-logged path: write the header fields
		through the mini-transaction log. */
		mlog_write_ulint(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
				 2 + slot_index, MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
				 ulint(m_heap_top - m_page),
				 MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_N_HEAP + m_page,
				 (PAGE_HEAP_NO_USER_LOW + m_rec_no)
				 | ulint(m_is_comp) << 15,
				 MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no,
				 MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_LAST_INSERT + m_page,
				 ulint(m_cur_rec - m_page),
				 MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page,
				 PAGE_RIGHT, MLOG_2BYTES, &m_mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0,
				 MLOG_2BYTES, &m_mtr);
	} else {
		/* For ROW_FORMAT=COMPRESSED, redo log may be written
		in PageBulk::compress(). */
		mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
				2 + slot_index);
		mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
				ulint(m_heap_top - m_page));
		mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + m_page,
				(PAGE_HEAP_NO_USER_LOW + m_rec_no)
				| ulint(m_is_comp) << 15);
		mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
		mach_write_to_2(PAGE_HEADER + PAGE_LAST_INSERT + m_page,
				ulint(m_cur_rec - m_page));
		mach_write_to_2(PAGE_HEADER + PAGE_DIRECTION_B - 1 + m_page,
				PAGE_RIGHT);
		mach_write_to_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0);
	}

	ut_ad(!needs_finish());
	ut_ad(page_validate(m_page, m_index));
}
397
398 /** Commit inserts done to the page
399 @param[in] success Flag whether all inserts succeed. */
400 void PageBulk::commit(bool success)
401 {
402 finish();
403 if (success && !dict_index_is_clust(m_index) && page_is_leaf(m_page))
404 ibuf_set_bitmap_for_bulk_load(m_block, innobase_fill_factor == 100);
405 m_mtr.commit();
406 }
407
408 /** Compress a page of compressed table
409 @return true compress successfully or no need to compress
410 @return false compress failed. */
411 bool
412 PageBulk::compress()
413 {
414 ut_ad(m_page_zip != NULL);
415
416 return(page_zip_compress(m_page_zip, m_page, m_index,
417 page_zip_level, NULL, &m_mtr));
418 }
420 /** Get node pointer
421 @return node pointer */
422 dtuple_t*
423 PageBulk::getNodePtr()
424 {
425 rec_t* first_rec;
426 dtuple_t* node_ptr;
427
428 /* Create node pointer */
429 first_rec = page_rec_get_next(page_get_infimum_rec(m_page));
430 ut_a(page_rec_is_user_rec(first_rec));
431 node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no,
432 m_heap, m_level);
433
434 return(node_ptr);
435 }
436
/** Get split rec in left page. We split a page in half when compression
fails, and the split rec will be copied to right page.
@return split rec */
rec_t*
PageBulk::getSplitRec()
{
	rec_t*		rec;
	rec_offs*	offsets;
	ulint		total_used_size;
	ulint		total_recs_size;
	ulint		n_recs;

	ut_ad(m_page_zip != NULL);
	ut_ad(m_rec_no >= 2);
	ut_ad(!m_index->is_instant());

	ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space);
	/* Space consumed so far = empty-page free space minus what is left. */
	total_used_size = page_get_free_space_of_empty(m_is_comp)
		- m_free_space;

	total_recs_size = 0;
	n_recs = 0;
	offsets = NULL;
	rec = page_get_infimum_rec(m_page);
	const ulint n_core = page_is_leaf(m_page) ? m_index->n_core_fields : 0;

	/* Accumulate record sizes (plus directory overhead) until we
	pass the midpoint of the used space; that record becomes the
	first record of the right page. */
	do {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));

		offsets = rec_get_offsets(rec, m_index, offsets, n_core,
					  ULINT_UNDEFINED, &m_heap);
		total_recs_size += rec_offs_size(offsets);
		n_recs++;
	} while (total_recs_size + page_dir_calc_reserved_space(n_recs)
		 < total_used_size / 2);

	/* Keep at least one record on left page */
	if (page_rec_is_infimum(page_rec_get_prev(rec))) {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));
	}

	return(rec);
}
482
483 /** Copy all records after split rec including itself.
484 @param[in] rec split rec */
485 void
486 PageBulk::copyIn(
487 rec_t* split_rec)
488 {
489
490 rec_t* rec = split_rec;
491 rec_offs* offsets = NULL;
492
493 ut_ad(m_rec_no == 0);
494 ut_ad(page_rec_is_user_rec(rec));
495
496 const ulint n_core = page_rec_is_leaf(rec)
497 ? m_index->n_core_fields : 0;
WriteStream(PaStream * stream,const void * buffer,unsigned long framesRequested)498
499 do {
500 offsets = rec_get_offsets(rec, m_index, offsets, n_core,
501 ULINT_UNDEFINED, &m_heap);
502
503 insert(rec, offsets);
504
505 rec = page_rec_get_next(rec);
506 } while (!page_rec_is_supremum(rec));
507
508 ut_ad(m_rec_no > 0);
509 }
510
/** Remove all records after split rec including itself.
@param[in]	split_rec	first record to remove */
void
PageBulk::copyOut(
	rec_t*	split_rec)
{
	rec_t*		rec;
	rec_t*		last_rec;
	ulint		n;

	/* Suppose before copyOut, we have 5 records on the page:
	infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec.

	after copyOut, we have 2 records on the page:
	infimum->r1->r2->supremum. slot adjustment is not done. */

	rec = page_rec_get_next(page_get_infimum_rec(m_page));
	last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
	n = 0;

	/* Count the records that remain on this page. */
	while (rec != split_rec) {
		rec = page_rec_get_next(rec);
		n++;
	}

	ut_ad(n > 0);

	/* Set last record's next in page */
	rec_offs*	offsets = NULL;
	rec = page_rec_get_prev(split_rec);
	const ulint n_core = page_rec_is_leaf(split_rec)
		? m_index->n_core_fields : 0;

	offsets = rec_get_offsets(rec, m_index, offsets, n_core,
				  ULINT_UNDEFINED, &m_heap);
	page_rec_set_next(rec, page_get_supremum_rec(m_page));

	/* Set related members */
	m_cur_rec = rec;
	m_heap_top = rec_get_end(rec, offsets);

	offsets = rec_get_offsets(last_rec, m_index, offsets, n_core,
				  ULINT_UNDEFINED, &m_heap);

	/* Reclaim the space of the removed records plus the directory
	space no longer reserved for them. */
	m_free_space += ulint(rec_get_end(last_rec, offsets) - m_heap_top)
		+ page_dir_calc_reserved_space(m_rec_no)
		- page_dir_calc_reserved_space(n);
	ut_ad(lint(m_free_space) > 0);
	m_rec_no = n;

#ifdef UNIV_DEBUG
	m_total_data -= ulint(rec_get_end(last_rec, offsets) - m_heap_top);
#endif /* UNIV_DEBUG */
}
565
566 /** Set next page
567 @param[in] next_page_no next page no */
568 inline void PageBulk::setNext(ulint next_page_no)
569 {
570 if (UNIV_LIKELY_NULL(m_page_zip)) {
571 /* For ROW_FORMAT=COMPRESSED, redo log may be written
572 in PageBulk::compress(). */
573 mach_write_to_4(m_page + FIL_PAGE_NEXT, next_page_no);
574 } else {
575 mlog_write_ulint(m_page + FIL_PAGE_NEXT, next_page_no,
576 MLOG_4BYTES, &m_mtr);
577 }
578 }
579
580 /** Set previous page
581 @param[in] prev_page_no previous page no */
582 inline void PageBulk::setPrev(ulint prev_page_no)
583 {
584 if (UNIV_LIKELY_NULL(m_page_zip)) {
585 /* For ROW_FORMAT=COMPRESSED, redo log may be written
586 in PageBulk::compress(). */
587 mach_write_to_4(m_page + FIL_PAGE_PREV, prev_page_no);
588 } else {
589 mlog_write_ulint(m_page + FIL_PAGE_PREV, prev_page_no,
590 MLOG_4BYTES, &m_mtr);
waitUntilBlioWriteBufferIsEmpty(PaMacBlio * blio,double sampleRate,size_t framesPerBuffer)591 }
592 }
593
594 /** Check if required space is available in the page for the rec to be inserted.
595 We check fill factor & padding here.
596 @param[in] length required length
597 @return true if space is available */
598 bool
599 PageBulk::isSpaceAvailable(
600 ulint rec_size)
601 {
602 ulint slot_size;
603 ulint required_space;
604
605 slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
606 - page_dir_calc_reserved_space(m_rec_no);
607
608 required_space = rec_size + slot_size;
609
610 if (required_space > m_free_space) {
611 ut_ad(m_rec_no > 0);
612 return false;
613 }
614
615 /* Fillfactor & Padding apply to both leaf and non-leaf pages.
616 Note: we keep at least 2 records in a page to avoid B-tree level
617 growing too high. */
618 if (m_rec_no >= 2
619 && ((m_page_zip == NULL && m_free_space - required_space
620 < m_reserved_space)
621 || (m_page_zip != NULL && m_free_space - required_space
622 < m_padding_space))) {
623 return(false);
624 }
625
626 return(true);
627 }
628
629 /** Check whether the record needs to be stored externally.
GetStreamWriteAvailable(PaStream * stream)630 @return false if the entire record can be stored locally on the page */
631 bool
632 PageBulk::needExt(
633 const dtuple_t* tuple,
634 ulint rec_size)
635 {
636 return(page_zip_rec_needs_ext(rec_size, m_is_comp,
637 dtuple_get_n_fields(tuple), m_block->page.size));
638 }
639
/** Store external record.
Since the record is not logged yet, so we don't log update to the record.
the blob data is logged first, then the record is logged in bulk mode.
@param[in]	big_rec		external record
@param[in]	offsets		record offsets
@return error code */
dberr_t
PageBulk::storeExt(
	const big_rec_t*	big_rec,
	rec_offs*		offsets)
{
	/* The page must be in a consistent state before the BLOB code
	reads it, so finalize the directory/header first. */
	finish();

	/* Note: not all fields are initialized in btr_pcur. */
	btr_pcur_t	btr_pcur;
	btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
	btr_pcur.latch_mode = BTR_MODIFY_LEAF;
	btr_pcur.btr_cur.index = m_index;
	btr_pcur.btr_cur.page_cur.index = m_index;
	btr_pcur.btr_cur.page_cur.rec = m_cur_rec;
	btr_pcur.btr_cur.page_cur.offsets = offsets;
	btr_pcur.btr_cur.page_cur.block = m_block;

	dberr_t	err = btr_store_big_rec_extern_fields(
		&btr_pcur, offsets, big_rec, &m_mtr, BTR_STORE_INSERT_BULK);

	/* Reset m_block and m_cur_rec from page cursor, because
	block may be changed during blob insert. (FIXME: Can it really?) */
	ut_ad(m_block == btr_pcur.btr_cur.page_cur.block);

	m_block = btr_pcur.btr_cur.page_cur.block;
	m_cur_rec = btr_pcur.btr_cur.page_cur.rec;
	m_page = buf_block_get_frame(m_block);

	return(err);
}
676
/** Release block by committing mtr.
Note: log_free_check requires holding no lock/latch in current thread.
The block stays buffer-fixed so that latch() can re-acquire it. */
void
PageBulk::release()
{
	finish();

	/* We fix the block because we will re-pin it soon. */
	buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);

	/* No other threads can modify this block. Remember the modify
	clock so that latch() can detect whether the block was evicted. */
	m_modify_clock = buf_block_get_modify_clock(m_block);

	m_mtr.commit();
}
692
/** Start mtr and latch the block again after a release().
Tries an optimistic re-latch using the saved modify clock, and falls
back to a pessimistic buffer pool lookup if that fails.
@return error code */
dberr_t
PageBulk::latch()
{
	m_mtr.start();

	if (m_flush_observer) {
		/* Same logging mode as in init(): the flush observer
		takes responsibility for page durability. */
		m_mtr.set_log_mode(MTR_LOG_NO_REDO);
		m_mtr.set_flush_observer(m_flush_observer);
	} else {
		m_index->set_modified(m_mtr);
	}

	/* release() left the block buffer-fixed. */
	ut_ad(m_block->page.buf_fix_count);

	/* In case the block is S-latched by page_cleaner. */
	if (!buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
				     __FILE__, __LINE__, &m_mtr)) {
		m_block = buf_page_get_gen(page_id_t(m_index->table->space_id,
						     m_page_no),
					   univ_page_size, RW_X_LATCH,
					   m_block, BUF_GET_IF_IN_POOL,
					   __FILE__, __LINE__, &m_mtr, &m_err);

		if (m_err != DB_SUCCESS) {
			return (m_err);
		}

		ut_ad(m_block != NULL);
	}

	/* Undo the extra buffer-fix taken in release(). */
	buf_block_buf_fix_dec(m_block);

	ut_ad(m_block->page.buf_fix_count);

	ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top);

	return (m_err);
}
732
733 /** Split a page
734 @param[in] page_bulk page to split
735 @param[in] next_page_bulk next page
736 @return error code */
737 dberr_t
738 BtrBulk::pageSplit(
739 PageBulk* page_bulk,
740 PageBulk* next_page_bulk)
741 {
742 ut_ad(page_bulk->getPageZip() != NULL);
743
744 if (page_bulk->getRecNo() <= 1) {
745 return(DB_TOO_BIG_RECORD);
746 }
747
748 /* Initialize a new page */
749 PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL,
750 page_bulk->getLevel(), m_flush_observer);
751 dberr_t err = new_page_bulk.init();
752 if (err != DB_SUCCESS) {
753 return(err);
754 }
755
756 /* Copy the upper half to the new page. */
757 rec_t* split_rec = page_bulk->getSplitRec();
758 new_page_bulk.copyIn(split_rec);
759 page_bulk->copyOut(split_rec);
760
761 /* Commit the pages after split. */
762 err = pageCommit(page_bulk, &new_page_bulk, true);
763 if (err != DB_SUCCESS) {
764 pageAbort(&new_page_bulk);
765 return(err);
766 }
767
768 err = pageCommit(&new_page_bulk, next_page_bulk, true);
769 if (err != DB_SUCCESS) {
770 pageAbort(&new_page_bulk);
771 return(err);
772 }
773
774 return(err);
775 }
776
/** Commit(finish) a page. We set next/prev page no, compress a page of
compressed table and split the page if compression fails, insert a node
pointer to father page if needed, and commit mini-transaction.
@param[in]	page_bulk	page to commit
@param[in]	next_page_bulk	next page
@param[in]	insert_father	false when page_bulk is a root page and
				true when it's a non-root page
@return error code */
dberr_t
BtrBulk::pageCommit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk,
	bool		insert_father)
{
	page_bulk->finish();

	/* Set page links */
	if (next_page_bulk != NULL) {
		ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel());

		page_bulk->setNext(next_page_bulk->getPageNo());
		next_page_bulk->setPrev(page_bulk->getPageNo());
	} else {
		/** Suppose a page is released and latched again, we need to
		mark it modified in mini-transaction. */
		page_bulk->setNext(FIL_NULL);
	}

	ut_ad(!rw_lock_own_flagged(&m_index->lock,
				   RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX
				   | RW_LOCK_FLAG_S));

	/* Compress page if it's a compressed table. */
	if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) {
		/* Compression failed: split the page and retry
		(pageSplit() commits both halves itself). */
		return(pageSplit(page_bulk, next_page_bulk));
	}

	/* Insert node pointer to father page. */
	if (insert_father) {
		dtuple_t*	node_ptr = page_bulk->getNodePtr();
		dberr_t		err = insert(node_ptr, page_bulk->getLevel()+1);

		if (err != DB_SUCCESS) {
			return(err);
		}
	}

	/* Commit mtr. */
	page_bulk->commit(true);

	return(DB_SUCCESS);
}
829
830 /** Log free check */
831 inline void BtrBulk::logFreeCheck()
832 {
833 if (log_sys.check_flush_or_checkpoint) {
834 release();
835
836 log_free_check();
837
838 latch();
839 }
840 }
841
842 /** Release all latches */
843 void
844 BtrBulk::release()
845 {
846 ut_ad(m_root_level + 1 == m_page_bulks.size());
847
848 for (ulint level = 0; level <= m_root_level; level++) {
849 PageBulk* page_bulk = m_page_bulks.at(level);
850
851 page_bulk->release();
852 }
853 }
854
855 /** Re-latch all latches */
856 void
857 BtrBulk::latch()
858 {
859 ut_ad(m_root_level + 1 == m_page_bulks.size());
860
861 for (ulint level = 0; level <= m_root_level; level++) {
862 PageBulk* page_bulk = m_page_bulks.at(level);
863 page_bulk->latch();
864 }
865 }
866
/** Insert a tuple to page in a level.
Creates the PageBulk for the level on first use, allocates a sibling
page when the current one is full, and handles externally stored
(BLOB) fields for oversized clustered index records.
@param[in]	tuple	tuple to insert
@param[in]	level	B-tree level
@return error code */
dberr_t
BtrBulk::insert(
	dtuple_t*	tuple,
	ulint		level)
{
	bool	is_left_most = false;
	dberr_t	err = DB_SUCCESS;

	/* Check if we need to create a PageBulk for the level. */
	if (level + 1 > m_page_bulks.size()) {
		PageBulk*	new_page_bulk
			= UT_NEW_NOKEY(PageBulk(m_index, m_trx->id, FIL_NULL,
						level, m_flush_observer));
		err = new_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(new_page_bulk);
			return(err);
		}

		m_page_bulks.push_back(new_page_bulk);
		ut_ad(level + 1 == m_page_bulks.size());
		m_root_level = level;

		is_left_most = true;
	}

	ut_ad(m_page_bulks.size() > level);

	PageBulk*	page_bulk = m_page_bulks.at(level);

	if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) {
		/* The node pointer must be marked as the predefined minimum
		record, as there is no lower alphabetical limit to records in
		the leftmost node of a level: */
		dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple)
					    | REC_INFO_MIN_REC_FLAG);
	}

	ulint		n_ext = 0;
	ulint		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	big_rec_t*	big_rec = NULL;
	rec_t*		rec = NULL;
	rec_offs*	offsets = NULL;

	if (page_bulk->needExt(tuple, rec_size)) {
		/* The record is so big that we have to store some fields
		externally on separate database pages */
		big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext);

		if (big_rec == NULL) {
			return(DB_TOO_BIG_RECORD);
		}

		/* Recompute the size after moving fields out-of-page. */
		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	}

	if (page_bulk->getPageZip() != NULL
	    && page_zip_is_too_big(m_index, tuple)) {
		err = DB_TOO_BIG_RECORD;
		goto func_exit;
	}

	if (!page_bulk->isSpaceAvailable(rec_size)) {
		/* Create a sibling page_bulk. */
		PageBulk*	sibling_page_bulk;
		sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx->id,
							  FIL_NULL, level,
							  m_flush_observer));
		err = sibling_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Commit page bulk. */
		err = pageCommit(page_bulk, sibling_page_bulk, true);
		if (err != DB_SUCCESS) {
			pageAbort(sibling_page_bulk);
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Set new page bulk to page_bulks. */
		ut_ad(sibling_page_bulk->getLevel() <= m_root_level);
		m_page_bulks.at(level) = sibling_page_bulk;

		UT_DELETE(page_bulk);
		page_bulk = sibling_page_bulk;

		/* Important: log_free_check whether we need a checkpoint. */
		if (page_is_leaf(sibling_page_bulk->getPage())) {
			if (trx_is_interrupted(m_trx)) {
				if (m_flush_observer) {
					m_flush_observer->interrupted();
				}

				err = DB_INTERRUPTED;
				goto func_exit;
			}

			/* Wake up page cleaner to flush dirty pages. */
			srv_inc_activity_count();
			os_event_set(buf_flush_event);

			logFreeCheck();
		}

	}

	/* Convert tuple to rec. */
	rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc(
		page_bulk->m_heap, rec_size)), m_index, tuple, n_ext);
	offsets = rec_get_offsets(rec, m_index, offsets, level
				  ? 0 : m_index->n_core_fields,
				  ULINT_UNDEFINED, &page_bulk->m_heap);

	page_bulk->insert(rec, offsets);

	if (big_rec != NULL) {
		/* Externally stored fields exist only in the clustered
		index leaf level. */
		ut_ad(dict_index_is_clust(m_index));
		ut_ad(page_bulk->getLevel() == 0);
		ut_ad(page_bulk == m_page_bulks.at(0));

		/* Release all pages above the leaf level */
		for (ulint level = 1; level <= m_root_level; level++) {
			m_page_bulks.at(level)->release();
		}

		err = page_bulk->storeExt(big_rec, offsets);

		/* Latch */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*    page_bulk = m_page_bulks.at(level);
			page_bulk->latch();
		}
	}

func_exit:
	if (big_rec != NULL) {
		/* Restore the in-memory tuple to its original shape. */
		dtuple_convert_back_big_rec(m_index, tuple, big_rec);
	}

	return(err);
}
1015
/** Btree bulk load finish. We commit the last page in each level
and copy the last page in top level to the root page of the index
if no error occurs.
@param[in]	err	whether bulk load was successful until now
@return error code */
dberr_t
BtrBulk::finish(dberr_t err)
{
	ulint		last_page_no = FIL_NULL;

	ut_ad(!m_index->table->is_temporary());

	if (m_page_bulks.size() == 0) {
		/* The table is empty. The root page of the index tree
		is already in a consistent state. No need to flush. */
		return(err);
	}

	ut_ad(m_root_level + 1 == m_page_bulks.size());

	/* Finish all page bulks */
	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*	page_bulk = m_page_bulks.at(level);

		/* Remember the last page committed; the top-level one
		will be copied into the root below. */
		last_page_no = page_bulk->getPageNo();

		if (err == DB_SUCCESS) {
			err = pageCommit(page_bulk, NULL,
					 level != m_root_level);
		}

		if (err != DB_SUCCESS) {
			pageAbort(page_bulk);
		}

		UT_DELETE(page_bulk);
	}

	if (err == DB_SUCCESS) {
		rec_t*		first_rec;
		mtr_t		mtr;
		buf_block_t*	last_block;
		PageBulk	root_page_bulk(m_index, m_trx->id,
					       m_index->page, m_root_level,
					       m_flush_observer);

		mtr.start();
		m_index->set_modified(mtr);
		mtr_x_lock_index(m_index, &mtr);

		ut_ad(last_page_no != FIL_NULL);
		last_block = btr_block_get(
			page_id_t(m_index->table->space_id, last_page_no),
			page_size_t(m_index->table->space->flags),
			RW_X_LATCH, m_index, &mtr);
		first_rec = page_rec_get_next(
			page_get_infimum_rec(last_block->frame));
		ut_ad(page_rec_is_user_rec(first_rec));

		/* Copy last page to root page. */
		err = root_page_bulk.init();
		if (err != DB_SUCCESS) {
			mtr.commit();
			return(err);
		}
		root_page_bulk.copyIn(first_rec);
		root_page_bulk.finish();

		/* Remove last page. */
		btr_page_free(m_index, last_block, &mtr);

		/* Do not flush the last page. */
		last_block->page.flush_observer = NULL;

		mtr.commit();

		err = pageCommit(&root_page_bulk, NULL, false);
		ut_ad(err == DB_SUCCESS);
	}

	ut_ad(!sync_check_iterate(dict_sync_check()));

	ut_ad(err != DB_SUCCESS
	      || btr_validate_index(m_index, NULL, false) == DB_SUCCESS);
	return(err);
}
1102