1 /*****************************************************************************
2
3 Copyright (c) 2014, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file btr/btr0bulk.cc
29 The B-tree bulk load
30
31 Created 03/11/2014 Shaohua Wang
32 *******************************************************/
33
34 #include "btr0bulk.h"
35 #include "btr0btr.h"
36 #include "btr0cur.h"
37 #include "btr0pcur.h"
38 #include "ibuf0ibuf.h"
39
/** Innodb B-tree index fill factor for bulk load, in percent.
At 100 the clustered index keeps the 5.6-compatible reserve; lower values
leave (100 - fill_factor)% of each page free (see PageBulk::init()). */
long	innobase_fill_factor;
42
/** Initialize members, allocate page if needed and start mtr.
Note: we commit all mtrs on failure.
@return error code. */
dberr_t
PageBulk::init()
{
	mtr_t*		mtr;
	buf_block_t*	new_block;
	page_t*		new_page;
	page_zip_des_t*	new_page_zip;
	ulint		new_page_no;

	ut_ad(m_heap == NULL);
	m_heap = mem_heap_create(1000);

	/* The mtr is allocated on m_heap: it stays open across the whole
	lifetime of this PageBulk and is committed in commit()/release(). */
	mtr = static_cast<mtr_t*>(
		mem_heap_alloc(m_heap, sizeof(mtr_t)));
	mtr_start(mtr);

	if (!dict_index_is_online_ddl(m_index)) {
		mtr_x_lock(dict_index_get_lock(m_index), mtr);
	}

	/* Page contents are not redo-logged; the flush observer tracks
	the dirtied pages instead. */
	mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);
	mtr_set_flush_observer(mtr, m_flush_observer);

	if (m_page_no == FIL_NULL) {
		mtr_t	alloc_mtr;

		/* We commit redo log for allocation by a separate mtr,
		because we don't guarantee pages are committed following
		the allocation order, and we will always generate redo log
		for page allocation, even when creating a new tablespace. */
		mtr_start(&alloc_mtr);
		alloc_mtr.set_named_space(dict_index_get_space(m_index));

		ulint	n_reserved;
		bool	success;
		success = fsp_reserve_free_extents(&n_reserved, m_index->space,
						   1, FSP_NORMAL, &alloc_mtr);
		if (!success) {
			/* Commit both mtrs before returning; see the note
			in the function comment. */
			mtr_commit(&alloc_mtr);
			mtr_commit(mtr);
			return(DB_OUT_OF_FILE_SPACE);
		}

		/* Allocate a new page. */
		new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level,
					   &alloc_mtr, mtr);

		if (n_reserved > 0) {
			fil_space_release_free_extents(m_index->space,
						       n_reserved);
		}

		mtr_commit(&alloc_mtr);

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);

		if (new_page_zip) {
			/* page_create_zip() also stores the page level. */
			page_create_zip(new_block, m_index, m_level, 0,
					NULL, mtr);
		} else {
			ut_ad(!dict_index_is_spatial(m_index));
			page_create(new_block, mtr,
				    dict_table_is_comp(m_index->table),
				    false);
			btr_page_set_level(new_page, NULL, m_level, mtr);
		}

		/* A freshly allocated page has no siblings yet. */
		btr_page_set_next(new_page, NULL, FIL_NULL, mtr);
		btr_page_set_prev(new_page, NULL, FIL_NULL, mtr);

		btr_page_set_index_id(new_page, NULL, m_index->id, mtr);
	} else {
		/* Reuse an existing empty page, e.g. the index root when
		the last page is copied to it in BtrBulk::finish(). */
		page_id_t	page_id(dict_index_get_space(m_index), m_page_no);
		page_size_t	page_size(dict_table_page_size(m_index->table));

		new_block = btr_block_get(page_id, page_size,
					  RW_X_LATCH, m_index, mtr);

		new_page = buf_block_get_frame(new_block);
		new_page_zip = buf_block_get_page_zip(new_block);
		new_page_no = page_get_page_no(new_page);
		ut_ad(m_page_no == new_page_no);

		/* The page must not contain any user records. */
		ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);

		btr_page_set_level(new_page, NULL, m_level, mtr);
	}

	if (dict_index_is_sec_or_ibuf(m_index)
	    && !dict_table_is_temporary(m_index->table)
	    && page_is_leaf(new_page)) {
		page_update_max_trx_id(new_block, NULL, m_trx_id, mtr);
	}

	m_mtr = mtr;
	m_block = new_block;
	/* Skip flush checks while the page is in a half-built state;
	re-enabled in PageBulk::finish(). */
	m_block->skip_flush_check = true;
	m_page = new_page;
	m_page_zip = new_page_zip;
	m_page_no = new_page_no;
	m_cur_rec = page_get_infimum_rec(new_page);
	ut_ad(m_is_comp == !!page_is_comp(new_page));
	m_free_space = page_get_free_space_of_empty(m_is_comp);

	if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) {
		/* Keep default behavior compatible with 5.6 */
		m_reserved_space = dict_index_get_space_reserve();
	} else {
		m_reserved_space =
			UNIV_PAGE_SIZE * (100 - innobase_fill_factor) / 100;
	}

	/* Padding is only consulted for compressed tables
	(see isSpaceAvailable()). */
	m_padding_space =
		UNIV_PAGE_SIZE - dict_index_zip_pad_optimal_page_size(m_index);
	m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
	m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);

	ut_d(m_total_data = 0);
	/* Temporarily set the heap top to the page end; the real value
	is written back in PageBulk::finish(). */
	page_header_set_field(m_page, NULL, PAGE_HEAP_TOP, UNIV_PAGE_SIZE - 1);

	return(DB_SUCCESS);
}
170
/** Insert a record in the page. The record is copied to the page heap
and appended after m_cur_rec; records must be supplied in key order.
@param[in]	rec	record to insert
@param[in]	offsets	record offsets of rec */
void
PageBulk::insert(
	const rec_t*	rec,
	ulint*		offsets)
{
	ulint		rec_size;

	ut_ad(m_heap != NULL);

	rec_size = rec_offs_size(offsets);

#ifdef UNIV_DEBUG
	/* Check whether records are in order. */
	if (!page_rec_is_infimum(m_cur_rec)) {
		rec_t*	old_rec = m_cur_rec;
		ulint*	old_offsets = rec_get_offsets(
			old_rec, m_index, NULL, ULINT_UNDEFINED, &m_heap);

		ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index,
				  page_is_spatial_non_leaf(old_rec, m_index))
		      > 0);
	}

	m_total_data += rec_size;
#endif /* UNIV_DEBUG */

	/* 1. Copy the record to page. */
	rec_t*	insert_rec = rec_copy(m_heap_top, rec, offsets);
	rec_offs_make_valid(insert_rec, m_index, offsets);

	/* 2. Insert the record in the linked list. */
	rec_t*	next_rec = page_rec_get_next(m_cur_rec);

	page_rec_set_next(insert_rec, next_rec);
	page_rec_set_next(m_cur_rec, insert_rec);

	/* 3. Set the n_owned field in the inserted record to zero,
	and set the heap_no field. The directory ownership (n_owned)
	is assigned later, in PageBulk::finish(). */
	if (m_is_comp) {
		rec_set_n_owned_new(insert_rec, NULL, 0);
		rec_set_heap_no_new(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	} else {
		rec_set_n_owned_old(insert_rec, 0);
		rec_set_heap_no_old(insert_rec,
				    PAGE_HEAP_NO_USER_LOW + m_rec_no);
	}

	/* 4. Set member variables. */
	ulint	slot_size;
	/* One more record may also grow the page directory. */
	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);

	/* The caller must have checked isSpaceAvailable() first. */
	ut_ad(m_free_space >= rec_size + slot_size);
	ut_ad(m_heap_top + rec_size < m_page + UNIV_PAGE_SIZE);

	m_free_space -= rec_size + slot_size;
	m_heap_top += rec_size;
	m_rec_no += 1;
	m_cur_rec = insert_rec;
}
235
/** Mark end of insertion to the page. Scan all records to set page dirs,
and set page header members.
Note: we refer to page_copy_rec_list_end_to_created_page. */
void
PageBulk::finish()
{
	ut_ad(m_rec_no > 0);

#ifdef UNIV_DEBUG
	ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no)
	      <= page_get_free_space_of_empty(m_is_comp));

	/* To pass the debug tests we have to set these dummy values
	in the debug version */
	page_dir_set_n_slots(m_page, NULL, UNIV_PAGE_SIZE / 2);
#endif

	ulint	count = 0;		/* records owned by current slot */
	ulint	n_recs = 0;		/* user records scanned so far */
	ulint	slot_index = 0;
	rec_t*	insert_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	page_dir_slot_t*	slot = NULL;

	/* Set owner & dir. One directory slot is created for every
	(PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 records. */
	do {

		count++;
		n_recs++;

		if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2) {

			slot_index++;

			slot = page_dir_get_nth_slot(m_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}

		insert_rec = page_rec_get_next(insert_rec);
	} while (!page_rec_is_supremum(insert_rec));

	if (slot_index > 0
	    && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
		<= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the two last dir slots. This operation is
		here to make this function imitate exactly the equivalent
		task made using page_cur_insert_rec, which we use in database
		recovery to reproduce the task performed by this function.
		To be able to check the correctness of recovery, it is good
		that it imitates exactly. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}

	/* The final slot points to the supremum record and owns the
	remaining records plus the supremum itself. */
	slot = page_dir_get_nth_slot(m_page, 1 + slot_index);
	page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	ut_ad(!dict_index_is_spatial(m_index));
	/* Write the real header fields (PAGE_HEAP_TOP held a dummy value
	since init()). */
	page_dir_set_n_slots(m_page, NULL, 2 + slot_index);
	page_header_set_ptr(m_page, NULL, PAGE_HEAP_TOP, m_heap_top);
	page_dir_set_n_heap(m_page, NULL, PAGE_HEAP_NO_USER_LOW + m_rec_no);
	page_header_set_field(m_page, NULL, PAGE_N_RECS, m_rec_no);

	page_header_set_ptr(m_page, NULL, PAGE_LAST_INSERT, m_cur_rec);
	page_header_set_field(m_page, NULL, PAGE_DIRECTION, PAGE_RIGHT);
	page_header_set_field(m_page, NULL, PAGE_N_DIRECTION, 0);

	/* The page is consistent again; allow flush checks. */
	m_block->skip_flush_check = false;
}
313
314 /** Commit inserts done to the page
315 @param[in] success Flag whether all inserts succeed. */
316 void
commit(bool success)317 PageBulk::commit(
318 bool success)
319 {
320 if (success) {
321 ut_ad(page_validate(m_page, m_index));
322
323 /* Set no free space left and no buffered changes in ibuf. */
324 if (!dict_index_is_clust(m_index)
325 && !dict_table_is_temporary(m_index->table)
326 && page_is_leaf(m_page)) {
327 ibuf_set_bitmap_for_bulk_load(
328 m_block, innobase_fill_factor == 100);
329 }
330 }
331
332 mtr_commit(m_mtr);
333 }
334
335 /** Compress a page of compressed table
336 @return true compress successfully or no need to compress
337 @return false compress failed. */
338 bool
compress()339 PageBulk::compress()
340 {
341 ut_ad(m_page_zip != NULL);
342
343 return(page_zip_compress(m_page_zip, m_page, m_index,
344 page_zip_level, NULL, m_mtr));
345 }
346
347 /** Get node pointer
348 @return node pointer */
349 dtuple_t*
getNodePtr()350 PageBulk::getNodePtr()
351 {
352 rec_t* first_rec;
353 dtuple_t* node_ptr;
354
355 /* Create node pointer */
356 first_rec = page_rec_get_next(page_get_infimum_rec(m_page));
357 ut_a(page_rec_is_user_rec(first_rec));
358 node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no,
359 m_heap, m_level);
360
361 return(node_ptr);
362 }
363
/** Get the split rec in the left page. We split a page in half when
compression fails, and the split rec will be copied to the right page.
@return split rec */
rec_t*
PageBulk::getSplitRec()
{
	rec_t*	rec;
	ulint*	offsets;
	ulint	total_used_size;
	ulint	total_recs_size;
	ulint	n_recs;

	/* Splitting is only done for compressed pages. */
	ut_ad(m_page_zip != NULL);
	ut_ad(m_rec_no >= 2);

	ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space);
	total_used_size = page_get_free_space_of_empty(m_is_comp)
		- m_free_space;

	total_recs_size = 0;
	n_recs = 0;
	offsets = NULL;
	rec = page_get_infimum_rec(m_page);

	/* Walk the records until roughly half of the used space
	(records plus directory reservation) has been accumulated. */
	do {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));

		offsets = rec_get_offsets(rec, m_index,
					  offsets, ULINT_UNDEFINED,
					  &(m_heap));
		total_recs_size += rec_offs_size(offsets);
		n_recs++;
	} while (total_recs_size + page_dir_calc_reserved_space(n_recs)
		 < total_used_size / 2);

	/* Keep at least one record on left page */
	if (page_rec_is_infimum(page_rec_get_prev(rec))) {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));
	}

	return(rec);
}
408
409 /** Copy all records after split rec including itself.
410 @param[in] rec split rec */
411 void
copyIn(rec_t * split_rec)412 PageBulk::copyIn(
413 rec_t* split_rec)
414 {
415
416 rec_t* rec = split_rec;
417 ulint* offsets = NULL;
418
419 ut_ad(m_rec_no == 0);
420 ut_ad(page_rec_is_user_rec(rec));
421
422 do {
423 offsets = rec_get_offsets(rec, m_index, offsets,
424 ULINT_UNDEFINED, &(m_heap));
425
426 insert(rec, offsets);
427
428 rec = page_rec_get_next(rec);
429 } while (!page_rec_is_supremum(rec));
430
431 ut_ad(m_rec_no > 0);
432 }
433
/** Remove all records after split rec including itself.
@param[in]	split_rec	first record to remove */
void
PageBulk::copyOut(
	rec_t*	split_rec)
{
	rec_t*	rec;
	rec_t*	last_rec;
	ulint	n;

	/* Suppose before copyOut, we have 5 records on the page:
	infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec.

	after copyOut, we have 2 records on the page:
	infimum->r1->r2->supremum. slot adjustment is not done here
	(the directory is rebuilt in PageBulk::finish()). */

	rec = page_rec_get_next(page_get_infimum_rec(m_page));
	last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
	n = 0;

	/* Count the records that remain on this page. */
	while (rec != split_rec) {
		rec = page_rec_get_next(rec);
		n++;
	}

	ut_ad(n > 0);

	/* Set last record's next in page */
	ulint*	offsets = NULL;
	rec = page_rec_get_prev(split_rec);
	offsets = rec_get_offsets(rec, m_index,
				  offsets, ULINT_UNDEFINED,
				  &(m_heap));
	page_rec_set_next(rec, page_get_supremum_rec(m_page));

	/* Set related members */
	m_cur_rec = rec;
	m_heap_top = rec_get_end(rec, offsets);

	offsets = rec_get_offsets(last_rec, m_index,
				  offsets, ULINT_UNDEFINED,
				  &(m_heap));

	/* Reclaim the space of the removed records plus the difference
	in directory reservation. */
	m_free_space += rec_get_end(last_rec, offsets)
		- m_heap_top
		+ page_dir_calc_reserved_space(m_rec_no)
		- page_dir_calc_reserved_space(n);
	ut_ad(m_free_space > 0);
	m_rec_no = n;

#ifdef UNIV_DEBUG
	m_total_data -= rec_get_end(last_rec, offsets) - m_heap_top;
#endif /* UNIV_DEBUG */
}
488
/** Set the next (right sibling) page number of this page.
@param[in]	next_page_no	next page no, or FIL_NULL for none */
void
PageBulk::setNext(
	ulint	next_page_no)
{
	btr_page_set_next(m_page, NULL, next_page_no, m_mtr);
}
497
/** Set the previous (left sibling) page number of this page.
@param[in]	prev_page_no	previous page no, or FIL_NULL for none */
void
PageBulk::setPrev(
	ulint	prev_page_no)
{
	btr_page_set_prev(m_page, NULL, prev_page_no, m_mtr);
}
506
507 /** Check if required space is available in the page for the rec to be inserted.
508 We check fill factor & padding here.
509 @param[in] length required length
510 @return true if space is available */
511 bool
isSpaceAvailable(ulint rec_size)512 PageBulk::isSpaceAvailable(
513 ulint rec_size)
514 {
515 ulint slot_size;
516 ulint required_space;
517
518 slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
519 - page_dir_calc_reserved_space(m_rec_no);
520
521 required_space = rec_size + slot_size;
522
523 if (required_space > m_free_space) {
524 ut_ad(m_rec_no > 0);
525 return false;
526 }
527
528 /* Fillfactor & Padding apply to both leaf and non-leaf pages.
529 Note: we keep at least 2 records in a page to avoid B-tree level
530 growing too high. */
531 if (m_rec_no >= 2
532 && ((m_page_zip == NULL && m_free_space - required_space
533 < m_reserved_space)
534 || (m_page_zip != NULL && m_free_space - required_space
535 < m_padding_space))) {
536 return(false);
537 }
538
539 return(true);
540 }
541
542 /** Check whether the record needs to be stored externally.
543 @return false if the entire record can be stored locally on the page */
544 bool
needExt(const dtuple_t * tuple,ulint rec_size)545 PageBulk::needExt(
546 const dtuple_t* tuple,
547 ulint rec_size)
548 {
549 return(page_zip_rec_needs_ext(rec_size, m_is_comp,
550 dtuple_get_n_fields(tuple), m_block->page.size));
551 }
552
/** Store external record
Since the record is not logged yet, we don't log updates to the record.
The blob data is logged first, then the record is logged in bulk mode.
@param[in]	big_rec		external record (fields to store off-page)
@param[in]	offsets		record offsets of m_cur_rec
@return error code */
dberr_t
PageBulk::storeExt(
	const big_rec_t*	big_rec,
	ulint*			offsets)
{
	/* Note: not all fields are initialized in btr_pcur; presumably
	only the members btr_store_big_rec_extern_fields() reads are
	set up here — confirm against that function before changing. */
	btr_pcur_t	btr_pcur;
	btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
	btr_pcur.latch_mode = BTR_MODIFY_LEAF;
	btr_pcur.btr_cur.index = m_index;

	page_cur_t*	page_cur = &btr_pcur.btr_cur.page_cur;
	page_cur->index = m_index;
	page_cur->rec = m_cur_rec;
	page_cur->offsets = offsets;
	page_cur->block = m_block;

	dberr_t	err = btr_store_big_rec_extern_fields(
		&btr_pcur, NULL, offsets, big_rec, m_mtr,
		BTR_STORE_INSERT_BULK);

	/* The record may be relocated, but must keep its page offset. */
	ut_ad(page_offset(m_cur_rec) == page_offset(page_cur->rec));

	/* Reset m_block and m_cur_rec from page cursor, because
	block may be changed during blob insert. */
	m_block = page_cur->block;
	m_cur_rec = page_cur->rec;
	m_page = buf_block_get_frame(m_block);

	return(err);
}
590
/** Release block by committing mtr
Note: log_free_check requires holding no lock/latch in current thread. */
void
PageBulk::release()
{
	ut_ad(!dict_index_is_spatial(m_index));
	ut_ad(m_block->page.buf_fix_count > 0);

	/* We fix the block because we will re-pin it soon. */
	buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);

	/* No other threads can modify this block. */
	m_modify_clock = buf_block_get_modify_clock(m_block);

	/* Committing the mtr releases the page latch; the extra buffer
	fix taken above keeps the block pinned until latch() re-acquires
	it (see the fix-count diagram in latch()). */
	mtr_commit(m_mtr);
}
607
/** Start mtr and latch the block previously pinned by release(). */
void
PageBulk::latch()
{
	ibool	ret;

	mtr_start(m_mtr);

	if (!dict_index_is_online_ddl(m_index)) {
		mtr_x_lock(dict_index_get_lock(m_index), m_mtr);
	}

	/* Restore the same mtr settings as in init(). */
	mtr_set_log_mode(m_mtr, MTR_LOG_NO_REDO);
	mtr_set_flush_observer(m_mtr, m_flush_observer);

	ut_ad(m_block->page.buf_fix_count > 0);

	/* TODO: need a simple and wait version of buf_page_optimistic_get. */
	ret = buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
				      __FILE__, __LINE__, m_mtr);
	/* In case the block is S-latched by page_cleaner. */
	if (!ret) {
		page_id_t	page_id(dict_index_get_space(m_index), m_page_no);
		page_size_t	page_size(dict_table_page_size(m_index->table));

		/* Fall back to a pessimistic latch; the buffer fix taken
		in release() guarantees the block is still in the pool. */
		m_block = buf_page_get_gen(page_id, page_size, RW_X_LATCH,
					   m_block, BUF_GET_IF_IN_POOL,
					   __FILE__, __LINE__, m_mtr);
		ut_ad(m_block != NULL);
	}

	buf_block_buf_fix_dec(m_block);
	/*
	The caller is going to use the m_block, so it needs to be buffer-fixed even
	after the decrement above. This works like this:
	release(){ //initially buf_fix_count == N > 0
	  buf_fix_count++ // N+1
	  mtr_commit(){
	    buf_fix_count-- // N
	  }
	}//at the end buf_fix_count == N > 0
	latch(){//initially buf_fix_count == M > 0
	  buf_page_get_gen/buf_page_optimistic_get internally(){
	    buf_fix_count++ // M+1
	  }
	  buf_fix_count-- // M
	}//at the end buf_fix_count == M > 0
	*/
	ut_ad(m_block->page.buf_fix_count > 0);

	ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top);
}
660
#ifdef UNIV_DEBUG
/** Check whether m_mtr holds the index X/SX lock while the index is
being built online.
@return true if online DDL is active and m_mtr holds the index lock */
bool PageBulk::isIndexXLocked() {
  if (!dict_index_is_online_ddl(m_index)) {
    return (false);
  }

  return (mtr_memo_contains_flagged(m_mtr, dict_index_get_lock(m_index),
                                    MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
}
#endif // UNIV_DEBUG
669
/** Split a page when compression of it failed.
@param[in]	page_bulk	page to split
@param[in]	next_page_bulk	next page, or NULL
@return error code */
dberr_t
BtrBulk::pageSplit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk)
{
	ut_ad(page_bulk->getPageZip() != NULL);

	/* 1. Check if we have only one user record on the page;
	then the record itself is too big to compress. */
	if (page_bulk->getRecNo() <= 1) {
		return(DB_TOO_BIG_RECORD);
	}

	/* 2. create a new page. */
	PageBulk new_page_bulk(m_index, m_trx_id, FIL_NULL,
			       page_bulk->getLevel(), m_flush_observer);
	dberr_t	err = new_page_bulk.init();
	if (err != DB_SUCCESS) {
		return(err);
	}

	/* 3. copy the upper half to new page. */
	rec_t*	split_rec = page_bulk->getSplitRec();
	new_page_bulk.copyIn(split_rec);
	page_bulk->copyOut(split_rec);

	/* 4. commit the split page; it is linked before the new page. */
	err = pageCommit(page_bulk, &new_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	/* 5. commit the new page. */
	err = pageCommit(&new_page_bulk, next_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	return(err);
}
715
/** Commit(finish) a page. We set next/prev page no, compress a page of
compressed table and split the page if compression fails, insert a node
pointer to father page if needed, and commit mini-transaction.
@param[in]	page_bulk	page to commit
@param[in]	next_page_bulk	next page
@param[in]	insert_father	false when page_bulk is a root page and
				true when it's a non-root page
@return error code */
dberr_t
BtrBulk::pageCommit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk,
	bool		insert_father)
{
	page_bulk->finish();

	/* Set page links */
	if (next_page_bulk != NULL) {
		ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel());

		page_bulk->setNext(next_page_bulk->getPageNo());
		next_page_bulk->setPrev(page_bulk->getPageNo());
	} else {
		/** Set FIL_NULL explicitly even though it should already
		hold: suppose the page was released and latched again, we
		need to mark it modified in the mini-transaction. */
		page_bulk->setNext(FIL_NULL);
	}

	/* Assert that no locks are held during bulk load operation
	in case of a online ddl operation. Insert thread acquires index->lock
	to check the online status of index. During bulk load index,
	there are no concurrent insert of reads and hence, there is no
	need to acquire a lock in that case. */
	ut_ad(!page_bulk->isIndexXLocked());

	DBUG_EXECUTE_IF("innodb_bulk_load_sleep",
			os_thread_sleep(1000000););

	/* Compress page if it's a compressed table. */
	if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) {
		/* Compression failed: split the page in half and retry. */
		return(pageSplit(page_bulk, next_page_bulk));
	}

	/* Insert node pointer to father page. */
	if (insert_father) {
		dtuple_t*	node_ptr = page_bulk->getNodePtr();
		dberr_t		err = insert(node_ptr, page_bulk->getLevel()+1);

		if (err != DB_SUCCESS) {
			return(err);
		}
	}

	/* Commit mtr. */
	page_bulk->commit(true);

	return(DB_SUCCESS);
}
774
775 /** Log free check */
776 void
logFreeCheck()777 BtrBulk::logFreeCheck()
778 {
779 if (log_sys->check_flush_or_checkpoint) {
780 release();
781
782 log_free_check();
783
784 latch();
785 }
786 }
787
788 /** Release all latches */
789 void
release()790 BtrBulk::release()
791 {
792 ut_ad(m_root_level + 1 == m_page_bulks->size());
793
794 for (ulint level = 0; level <= m_root_level; level++) {
795 PageBulk* page_bulk = m_page_bulks->at(level);
796
797 page_bulk->release();
798 }
799 }
800
801 /** Re-latch all latches */
802 void
latch()803 BtrBulk::latch()
804 {
805 ut_ad(m_root_level + 1 == m_page_bulks->size());
806
807 for (ulint level = 0; level <= m_root_level; level++) {
808 PageBulk* page_bulk = m_page_bulks->at(level);
809 page_bulk->latch();
810 }
811 }
812
/** Insert a tuple to page in a level
@param[in]	tuple	tuple to insert
@param[in]	level	B-tree level
@return error code */
dberr_t
BtrBulk::insert(
	dtuple_t*	tuple,
	ulint		level)
{
	bool	is_left_most = false;
	dberr_t	err = DB_SUCCESS;

	ut_ad(m_heap != NULL);

	/* Check if we need to create a PageBulk for the level. */
	if (level + 1 > m_page_bulks->size()) {
		PageBulk*	new_page_bulk
			= UT_NEW_NOKEY(PageBulk(m_index, m_trx_id, FIL_NULL,
						level, m_flush_observer));
		err = new_page_bulk->init();
		if (err != DB_SUCCESS) {
			return(err);
		}

		DEBUG_SYNC_C("bulk_load_insert");
		m_page_bulks->push_back(new_page_bulk);
		ut_ad(level + 1 == m_page_bulks->size());
		m_root_level = level;

		is_left_most = true;
	}

	ut_ad(m_page_bulks->size() > level);

	PageBulk*	page_bulk = m_page_bulks->at(level);

	if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) {
		/* The node pointer must be marked as the predefined minimum
		record, as there is no lower alphabetical limit to records in
		the leftmost node of a level: */
		dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple)
				     | REC_INFO_MIN_REC_FLAG);
	}

	ulint		n_ext = 0;
	ulint		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	big_rec_t*	big_rec = NULL;
	rec_t*		rec = NULL;
	ulint*		offsets = NULL;

	if (page_bulk->needExt(tuple, rec_size)) {
		/* The record is so big that we have to store some fields
		externally on separate database pages */
		big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext);

		if (big_rec == NULL) {
			return(DB_TOO_BIG_RECORD);
		}

		/* Recompute the in-page size after fields were moved
		off-page. */
		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	}

	if (page_bulk->getPageZip() != NULL
	    && page_zip_is_too_big(m_index, tuple)) {
		err = DB_TOO_BIG_RECORD;
		goto func_exit;
	}

	if (!page_bulk->isSpaceAvailable(rec_size)) {
		/* Create a sibling page_bulk. */
		PageBulk*	sibling_page_bulk;
		sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx_id,
							  FIL_NULL, level,
							  m_flush_observer));
		err = sibling_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Commit page bulk. Note: this may recursively call
		insert() for the node pointer at level + 1. */
		err = pageCommit(page_bulk, sibling_page_bulk, true);
		if (err != DB_SUCCESS) {
			pageAbort(sibling_page_bulk);
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Set new page bulk to page_bulks. */
		ut_ad(sibling_page_bulk->getLevel() <= m_root_level);
		m_page_bulks->at(level) = sibling_page_bulk;

		UT_DELETE(page_bulk);
		page_bulk = sibling_page_bulk;

		/* Important: log_free_check whether we need a checkpoint. */
		if (page_is_leaf(sibling_page_bulk->getPage())) {
			/* Check whether trx is interrupted */
			if (m_flush_observer->check_interrupted()) {
				err = DB_INTERRUPTED;
				goto func_exit;
			}

			/* Wake up page cleaner to flush dirty pages. */
			srv_inc_activity_count();
			os_event_set(buf_flush_event);

			logFreeCheck();
		}

	}

	/* Convert tuple to rec. */
	rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc(
		page_bulk->m_heap, rec_size)), m_index, tuple, n_ext);
	offsets = rec_get_offsets(rec, m_index, offsets, ULINT_UNDEFINED,
				  &(page_bulk->m_heap));

	page_bulk->insert(rec, offsets);

	if (big_rec != NULL) {
		/* Big records only occur in the clustered index leaf. */
		ut_ad(dict_index_is_clust(m_index));
		ut_ad(page_bulk->getLevel() == 0);
		ut_ad(page_bulk == m_page_bulks->at(0));

		/* Release all latched but leaf node. */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*	page_bulk = m_page_bulks->at(level);

			page_bulk->release();
		}

		err = page_bulk->storeExt(big_rec, offsets);

		/* Latch */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*	page_bulk = m_page_bulks->at(level);
			page_bulk->latch();
		}
	}

func_exit:
	if (big_rec != NULL) {
		/* Restore the caller's tuple to its original shape. */
		dtuple_convert_back_big_rec(m_index, tuple, big_rec);
	}

	return(err);
}
961
/** Btree bulk load finish. We commit the last page in each level
and copy the last page in top level to the root page of the index
if no error occurs.
@param[in]	err	whether bulk load was successful until now
@return error code */
dberr_t
BtrBulk::finish(dberr_t err)
{
	ulint		last_page_no = FIL_NULL;

	ut_ad(!dict_table_is_temporary(m_index->table));

#ifdef UNIV_DEBUG
	/* Assert that the index online status has not changed */
	ut_ad(m_index->online_status == m_index_online);
#endif // UNIV_DEBUG

	if (m_page_bulks->size() == 0) {
		/* The table is empty. The root page of the index tree
		is already in a consistent state. No need to flush. */
		return(err);
	}

	ut_ad(m_root_level + 1 == m_page_bulks->size());

	/* Finish all page bulks, bottom level first. After the loop
	last_page_no is the page of the top (root) level. */
	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*	page_bulk = m_page_bulks->at(level);

		last_page_no = page_bulk->getPageNo();

		if (err == DB_SUCCESS) {
			/* The root level gets no father node pointer. */
			err = pageCommit(page_bulk, NULL,
					 level != m_root_level);
		}

		if (err != DB_SUCCESS) {
			pageAbort(page_bulk);
		}

		UT_DELETE(page_bulk);
	}

	if (err == DB_SUCCESS) {
		rec_t*		first_rec;
		mtr_t		mtr;
		buf_block_t*	last_block;
		page_t*		last_page;
		page_id_t	page_id(dict_index_get_space(m_index),
					last_page_no);
		page_size_t	page_size(dict_table_page_size(m_index->table));
		ulint		root_page_no = dict_index_get_page(m_index);
		PageBulk	root_page_bulk(m_index, m_trx_id,
					       root_page_no, m_root_level,
					       m_flush_observer);

		mtr_start(&mtr);
		mtr.set_named_space(dict_index_get_space(m_index));
		mtr_x_lock(dict_index_get_lock(m_index), &mtr);

		ut_ad(last_page_no != FIL_NULL);
		last_block = btr_block_get(page_id, page_size,
					   RW_X_LATCH, m_index, &mtr);
		last_page = buf_block_get_frame(last_block);
		first_rec = page_rec_get_next(page_get_infimum_rec(last_page));
		ut_ad(page_rec_is_user_rec(first_rec));

		/* Copy last page to root page. */
		err = root_page_bulk.init();
		if (err != DB_SUCCESS) {
			mtr_commit(&mtr);
			return(err);
		}
		root_page_bulk.copyIn(first_rec);

		/* Remove last page. */
		btr_page_free_low(m_index, last_block, m_root_level, &mtr);

		/* Do not flush the last page. */
		last_block->page.flush_observer = NULL;

		mtr_commit(&mtr);

		err = pageCommit(&root_page_bulk, NULL, false);
		ut_ad(err == DB_SUCCESS);
	}

#ifdef UNIV_DEBUG
	dict_sync_check	check(true);

	ut_ad(!sync_check_iterate(check));
#endif /* UNIV_DEBUG */

	ut_ad(err != DB_SUCCESS || btr_validate_index(m_index, NULL, false));
	return(err);
}
1058