/*****************************************************************************

Copyright (c) 2014, 2019, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0bulk.cc
The B-tree bulk load

Created 03/11/2014 Shaohua Wang
*******************************************************/

#include "btr0bulk.h"
#include "btr0btr.h"
#include "btr0cur.h"
#include "btr0pcur.h"
#include "ibuf0ibuf.h"
#include "page0page.h"
#include "trx0trx.h"

/** InnoDB B-tree index fill factor for bulk load. */
uint	innobase_fill_factor;
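
/* A rough usage sketch, for orientation only (the names and the exact
calling sequence are illustrative; the real caller is the bulk-insert
path driven from row0merge.cc):

	BtrBulk	bulk(index, trx);
	dberr_t	err = DB_SUCCESS;
	// feed the tuples in index key order
	err = bulk.insert(tuple);
	// ... repeat for every tuple ...
	err = bulk.finish(err);
*/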

/** Initialize members, allocate page if needed and start mtr.
Note: we commit all mtrs on failure.
@return error code. */
dberr_t
PageBulk::init()
{
	buf_block_t*	new_block;
	page_t*		new_page;

	ut_ad(m_heap == NULL);
	m_heap = mem_heap_create(1000);

	m_mtr.start();
	m_index->set_modified(m_mtr);

	if (m_page_no == FIL_NULL) {
		mtr_t	alloc_mtr;

		/* We commit redo log for allocation by a separate mtr,
		because we don't guarantee pages are committed following
		the allocation order, and we will always generate redo log
		for page allocation, even when creating a new tablespace. */
		alloc_mtr.start();
		m_index->set_modified(alloc_mtr);

		uint32_t n_reserved;
		if (!fsp_reserve_free_extents(&n_reserved,
					      m_index->table->space,
					      1, FSP_NORMAL, &alloc_mtr)) {
			alloc_mtr.commit();
			m_mtr.commit();
			return(DB_OUT_OF_FILE_SPACE);
		}

		/* Allocate a new page. */
		new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level,
					   &alloc_mtr, &m_mtr);

		m_index->table->space->release_free_extents(n_reserved);

		alloc_mtr.commit();

		new_page = buf_block_get_frame(new_block);
		m_page_no = new_block->page.id().page_no();

		byte* index_id = my_assume_aligned<2>
			(PAGE_HEADER + PAGE_INDEX_ID + new_page);
		compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4);
		compile_time_assert(FIL_NULL == 0xffffffff);
		memset_aligned<8>(new_page + FIL_PAGE_PREV, 0xff, 8);

		if (UNIV_LIKELY_NULL(new_block->page.zip.data)) {
			mach_write_to_8(index_id, m_index->id);
			page_create_zip(new_block, m_index, m_level, 0,
					&m_mtr);
		} else {
			ut_ad(!m_index->is_spatial());
			page_create(new_block, &m_mtr,
				    m_index->table->not_redundant());
			m_mtr.memset(*new_block, FIL_PAGE_PREV, 8, 0xff);
			m_mtr.write<2,mtr_t::MAYBE_NOP>(*new_block, PAGE_HEADER
							+ PAGE_LEVEL
							+ new_page, m_level);
			m_mtr.write<8>(*new_block, index_id, m_index->id);
		}
	} else {
		new_block = btr_block_get(*m_index, m_page_no, RW_X_LATCH,
					  false, &m_mtr);

		new_page = buf_block_get_frame(new_block);
		ut_ad(new_block->page.id().page_no() == m_page_no);

		ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);

		btr_page_set_level(new_block, m_level, &m_mtr);
	}

	m_page_zip = buf_block_get_page_zip(new_block);

	if (!m_level && dict_index_is_sec_or_ibuf(m_index)) {
		page_update_max_trx_id(new_block, m_page_zip, m_trx_id,
				       &m_mtr);
	}

	m_block = new_block;
	m_page = new_page;
	m_cur_rec = page_get_infimum_rec(new_page);
	ut_ad(m_is_comp == !!page_is_comp(new_page));
	m_free_space = page_get_free_space_of_empty(m_is_comp);

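	/* For example (illustrative numbers only): with a 16KiB page and
	innodb_fill_factor=90, the else branch below would reserve
	16384 * (100 - 90) / 100 = 1638 bytes of the page. */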
	if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) {
		/* Keep default behavior compatible with 5.6 */
		m_reserved_space = dict_index_get_space_reserve();
	} else {
		m_reserved_space =
			srv_page_size * (100 - innobase_fill_factor) / 100;
	}

	m_padding_space =
		srv_page_size - dict_index_zip_pad_optimal_page_size(m_index);
	m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
	m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);
	/* Temporarily reset PAGE_DIRECTION_B from PAGE_NO_DIRECTION to 0,
	without writing redo log, to ensure that needs_finish() will hold
	on an empty page. */
	ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION);
	m_page[PAGE_HEADER + PAGE_DIRECTION_B] = 0;
	ut_d(m_total_data = 0);

	return(DB_SUCCESS);
}

/** Insert a record in the page.
@tparam fmt     the page format
@param[in,out]	rec		record
@param[in]	offsets		record offsets */
template<PageBulk::format fmt>
inline void PageBulk::insertPage(rec_t *rec, rec_offs *offsets)
{
  ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED));
  ut_ad((fmt != REDUNDANT) == m_is_comp);
  ut_ad(page_align(m_heap_top) == m_page);
  ut_ad(m_heap);

  const ulint rec_size= rec_offs_size(offsets);
  const ulint extra_size= rec_offs_extra_size(offsets);
  ut_ad(page_align(m_heap_top + rec_size) == m_page);
  ut_d(const bool is_leaf= page_rec_is_leaf(m_cur_rec));

#ifdef UNIV_DEBUG
  /* Check whether records are in order. */
  if (page_offset(m_cur_rec) !=
      (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM))
  {
    const rec_t *old_rec = m_cur_rec;
    rec_offs *old_offsets= rec_get_offsets(old_rec, m_index, nullptr, is_leaf
                                           ? m_index->n_core_fields : 0,
                                           ULINT_UNDEFINED, &m_heap);
    ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index) > 0);
  }

  m_total_data+= rec_size;
#endif /* UNIV_DEBUG */

  rec_t* const insert_rec= m_heap_top + extra_size;

  /* Insert the record in the linked list. */
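  /* In ROW_FORMAT=REDUNDANT, REC_NEXT holds the absolute page offset of the
  next record; in the other formats it holds an offset relative to the
  current record. The two branches below differ accordingly. */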
  if (fmt != REDUNDANT)
  {
    const rec_t *next_rec= m_page +
      page_offset(m_cur_rec + mach_read_from_2(m_cur_rec - REC_NEXT));
    if (fmt != COMPRESSED)
      m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT,
                     static_cast<uint16_t>(insert_rec - m_cur_rec));
    else
    {
      mach_write_to_2(m_cur_rec - REC_NEXT,
                      static_cast<uint16_t>(insert_rec - m_cur_rec));
      memcpy(m_heap_top, rec - extra_size, rec_size);
    }

    rec_t * const this_rec= fmt != COMPRESSED
      ? const_cast<rec_t*>(rec) : insert_rec;
    rec_set_bit_field_1(this_rec, 0, REC_NEW_N_OWNED, REC_N_OWNED_MASK,
                        REC_N_OWNED_SHIFT);
    rec_set_bit_field_2(this_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no,
                        REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
    mach_write_to_2(this_rec - REC_NEXT,
                    static_cast<uint16_t>(next_rec - insert_rec));
  }
  else
  {
    memcpy(const_cast<rec_t*>(rec) - REC_NEXT, m_cur_rec - REC_NEXT, 2);
    m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, page_offset(insert_rec));
    rec_set_bit_field_1(const_cast<rec_t*>(rec), 0,
                        REC_OLD_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
    rec_set_bit_field_2(const_cast<rec_t*>(rec),
                        PAGE_HEAP_NO_USER_LOW + m_rec_no,
                        REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
  }

  if (fmt == COMPRESSED)
    /* We already wrote the record. Log is written in PageBulk::compress(). */;
  else if (page_offset(m_cur_rec) ==
           (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM))
    m_mtr.memcpy(*m_block, m_heap_top, rec - extra_size, rec_size);
  else
  {
    /* Try to copy common prefix from the preceding record. */
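    /* Rather than logging the inserted record verbatim, the code below
    reuses any bytes that match the preceding record: matching runs are
    copied within the page and logged as MEMMOVE records, which normally
    produces less redo log than logging the full record bytes. */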
    const byte *r= rec - extra_size;
    const byte * const insert_rec_end= m_heap_top + rec_size;
    byte *b= m_heap_top;

    /* Skip any unchanged prefix of the record. */
    for (; *b == *r; b++, r++);

    ut_ad(b < insert_rec_end);

    const byte *c= m_cur_rec - (rec - r);
    const byte * const c_end= std::min(m_cur_rec + rec_offs_data_size(offsets),
                                       m_heap_top);

    /* Try to copy any bytes of the preceding record. */
    if (UNIV_LIKELY(c >= m_page && c < c_end))
    {
      const byte *cm= c;
      byte *bm= b;
      const byte *rm= r;
      for (; cm < c_end && *rm == *cm; cm++, bm++, rm++);
      ut_ad(bm <= insert_rec_end);
      size_t len= static_cast<size_t>(rm - r);
      ut_ad(!memcmp(r, c, len));
      if (len > 2)
      {
        memcpy(b, c, len);
        m_mtr.memmove(*m_block, page_offset(b), page_offset(c), len);
        c= cm;
        b= bm;
        r= rm;
      }
    }

    if (c < m_cur_rec)
    {
      if (!rec_offs_data_size(offsets))
      {
no_data:
        m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c);
        goto rec_done;
      }
      /* Some header bytes differ. Compare the data separately. */
      const byte *cd= m_cur_rec;
      byte *bd= insert_rec;
      const byte *rd= rec;
      /* Skip any unchanged prefix of the record. */
      for (;; cd++, bd++, rd++)
        if (bd == insert_rec_end)
          goto no_data;
        else if (*bd != *rd)
          break;

      /* Try to copy any data bytes of the preceding record. */
      if (c_end - cd > 2)
      {
        const byte *cdm= cd;
        const byte *rdm= rd;
        for (; cdm < c_end && *rdm == *cdm; cdm++, rdm++);
        ut_ad(rdm - rd + bd <= insert_rec_end);
        size_t len= static_cast<size_t>(rdm - rd);
        ut_ad(!memcmp(rd, cd, len));
        if (len > 2)
        {
          m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c);
          memcpy(bd, cd, len);
          m_mtr.memmove(*m_block, page_offset(bd), page_offset(cd), len);
          c= cdm;
          b= rdm - rd + bd;
          r= rdm;
        }
      }
    }

    if (size_t len= static_cast<size_t>(insert_rec_end - b))
      m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, len);
  }

rec_done:
  ut_ad(fmt == COMPRESSED || !memcmp(m_heap_top, rec - extra_size, rec_size));
  rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets);

  /* Update the member variables. */
  ulint slot_size= page_dir_calc_reserved_space(m_rec_no + 1) -
    page_dir_calc_reserved_space(m_rec_no);

  ut_ad(m_free_space >= rec_size + slot_size);
  ut_ad(m_heap_top + rec_size < m_page + srv_page_size);

  m_free_space-= rec_size + slot_size;
  m_heap_top+= rec_size;
  m_rec_no++;
  m_cur_rec= insert_rec;
}

/** Insert a record in the page.
@param[in]	rec		record
@param[in]	offsets		record offsets */
inline void PageBulk::insert(const rec_t *rec, rec_offs *offsets)
{
  byte rec_hdr[REC_N_OLD_EXTRA_BYTES];
  static_assert(REC_N_OLD_EXTRA_BYTES > REC_N_NEW_EXTRA_BYTES, "file format");

  if (UNIV_LIKELY_NULL(m_page_zip))
    insertPage<COMPRESSED>(const_cast<rec_t*>(rec), offsets);
  else if (m_is_comp)
  {
    memcpy(rec_hdr, rec - REC_N_NEW_EXTRA_BYTES, REC_N_NEW_EXTRA_BYTES);
    insertPage<DYNAMIC>(const_cast<rec_t*>(rec), offsets);
    memcpy(const_cast<rec_t*>(rec) - REC_N_NEW_EXTRA_BYTES, rec_hdr,
           REC_N_NEW_EXTRA_BYTES);
  }
  else
  {
    memcpy(rec_hdr, rec - REC_N_OLD_EXTRA_BYTES, REC_N_OLD_EXTRA_BYTES);
    insertPage<REDUNDANT>(const_cast<rec_t*>(rec), offsets);
    memcpy(const_cast<rec_t*>(rec) - REC_N_OLD_EXTRA_BYTES, rec_hdr,
           REC_N_OLD_EXTRA_BYTES);
  }
}

/** Set the number of owned records for a record in the uncompressed copy
of a ROW_FORMAT=COMPRESSED page, without writing redo log. */
static void rec_set_n_owned_zip(rec_t *rec, ulint n_owned)
{
  rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED,
                      REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
}

/** Mark end of insertion to the page. Scan all records to set page dirs,
and set page header members.
@tparam fmt  page format */
template<PageBulk::format fmt>
inline void PageBulk::finishPage()
{
  ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED));
  ut_ad((fmt != REDUNDANT) == m_is_comp);

  ulint count= 0;
  ulint n_recs= 0;
  byte *slot= my_assume_aligned<2>(m_page + srv_page_size -
                                   (PAGE_DIR + PAGE_DIR_SLOT_SIZE));
  const page_dir_slot_t *const slot0 = slot;
  compile_time_assert(PAGE_DIR_SLOT_SIZE == 2);
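  /* Build the sparse page directory in a single pass over the record list:
  a new slot is created for every (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
  records, and the final slot, pointing to the supremum, owns the rest,
  merging with the preceding slot (as page_cur_insert_rec_low() would)
  when the remainder is small enough. */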
  if (fmt != REDUNDANT)
  {
    uint16_t offset= mach_read_from_2(PAGE_NEW_INFIMUM - REC_NEXT + m_page);
    ut_ad(offset >= PAGE_NEW_SUPREMUM - PAGE_NEW_INFIMUM);
    offset= static_cast<uint16_t>(offset + PAGE_NEW_INFIMUM);
    /* Set owner & dir. */
    while (offset != PAGE_NEW_SUPREMUM)
    {
      ut_ad(offset >= PAGE_NEW_SUPREMUM);
      ut_ad(offset < page_offset(slot));
      count++;
      n_recs++;

      if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)
      {
        slot-= PAGE_DIR_SLOT_SIZE;
        mach_write_to_2(slot, offset);

        if (fmt != COMPRESSED)
          page_rec_set_n_owned<false>(m_block, m_page + offset, count, true,
                                      &m_mtr);
        else
          rec_set_n_owned_zip(m_page + offset, count);

        count= 0;
      }

      uint16_t next= static_cast<uint16_t>
        ((mach_read_from_2(m_page + offset - REC_NEXT) + offset) &
         (srv_page_size - 1));
      ut_ad(next);
      offset= next;
    }

    if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <=
                          PAGE_DIR_SLOT_MAX_N_OWNED))
    {
      /* Merge the last two slots, like page_cur_insert_rec_low() does. */
      count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

      rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
      if (fmt != COMPRESSED)
        page_rec_set_n_owned<false>(m_block, rec, 0, true, &m_mtr);
      else
        rec_set_n_owned_zip(rec, 0);
    }
    else
      slot-= PAGE_DIR_SLOT_SIZE;

    mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
    if (fmt != COMPRESSED)
      page_rec_set_n_owned<false>(m_block, m_page + PAGE_NEW_SUPREMUM,
                                  count + 1, true, &m_mtr);
    else
      rec_set_n_owned_zip(m_page + PAGE_NEW_SUPREMUM, count + 1);
  }
  else
  {
    rec_t *insert_rec= m_page +
      mach_read_from_2(PAGE_OLD_INFIMUM - REC_NEXT + m_page);

    /* Set owner & dir. */
    while (insert_rec != m_page + PAGE_OLD_SUPREMUM)
    {
      count++;
      n_recs++;

      if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)
      {
        slot-= PAGE_DIR_SLOT_SIZE;
        mach_write_to_2(slot, page_offset(insert_rec));
        page_rec_set_n_owned<false>(m_block, insert_rec, count, false, &m_mtr);
        count= 0;
      }

      insert_rec= m_page + mach_read_from_2(insert_rec - REC_NEXT);
    }

    if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <=
                          PAGE_DIR_SLOT_MAX_N_OWNED))
    {
      /* Merge the last two slots, like page_cur_insert_rec_low() does. */
      count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

      rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
      page_rec_set_n_owned<false>(m_block, rec, 0, false, &m_mtr);
    }
    else
      slot-= PAGE_DIR_SLOT_SIZE;

    mach_write_to_2(slot, PAGE_OLD_SUPREMUM);
    page_rec_set_n_owned<false>(m_block, m_page + PAGE_OLD_SUPREMUM, count + 1,
                                false, &m_mtr);
  }

  if (!m_rec_no);
  else if (fmt != COMPRESSED)
  {
    static_assert(PAGE_N_DIR_SLOTS == 0, "compatibility");
    alignas(8) byte page_header[PAGE_N_HEAP + 2];
    mach_write_to_2(page_header + PAGE_N_DIR_SLOTS,
                    1 + (slot0 - slot) / PAGE_DIR_SLOT_SIZE);
    mach_write_to_2(page_header + PAGE_HEAP_TOP, m_heap_top - m_page);
    mach_write_to_2(page_header + PAGE_N_HEAP,
                    (PAGE_HEAP_NO_USER_LOW + m_rec_no) |
                    uint16_t{fmt != REDUNDANT} << 15);
    m_mtr.memcpy(*m_block, PAGE_HEADER + m_page, page_header,
                 sizeof page_header);
    m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
    m_mtr.memcpy(*m_block, page_offset(slot), slot0 - slot);
  }
  else
  {
    /* For ROW_FORMAT=COMPRESSED, redo log may be written in
    PageBulk::compress(). */
    mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
                    1 + (slot0 - slot) / PAGE_DIR_SLOT_SIZE);
    mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
                    static_cast<ulint>(m_heap_top - m_page));
    mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + m_page,
                    (PAGE_HEAP_NO_USER_LOW + m_rec_no) | 1U << 15);
    mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
  }
}

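/** Check whether the page header and page directory still have to be
brought up to date (by finishPage()) to account for the records that
have been appended so far. */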
inline bool PageBulk::needs_finish() const
{
  ut_ad(page_align(m_cur_rec) == m_block->frame);
  ut_ad(m_page == m_block->frame);
  if (!m_page[PAGE_HEADER + PAGE_DIRECTION_B])
    return true;
  ulint heap_no, n_heap= page_header_get_field(m_page, PAGE_N_HEAP);
  ut_ad((n_heap & 0x7fff) >= PAGE_HEAP_NO_USER_LOW);
  if (n_heap & 0x8000)
  {
    n_heap&= 0x7fff;
    heap_no= rec_get_heap_no_new(m_cur_rec);
    if (heap_no == PAGE_HEAP_NO_INFIMUM &&
	page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END)
      return false;
  }
  else
  {
    heap_no= rec_get_heap_no_old(m_cur_rec);
    if (heap_no == PAGE_HEAP_NO_INFIMUM &&
	page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END)
      return false;
  }
  return heap_no != n_heap - 1;
}

/** Mark end of insertion to the page. Scan all records to set page dirs,
and set page header members. */
inline void PageBulk::finish()
{
  ut_ad(!m_index->is_spatial());

  if (!needs_finish());
  else if (UNIV_LIKELY_NULL(m_page_zip))
    finishPage<COMPRESSED>();
  else if (m_is_comp)
    finishPage<DYNAMIC>();
  else
    finishPage<REDUNDANT>();

  /* In MariaDB 10.2, 10.3, 10.4, we would initialize
  PAGE_DIRECTION_B, PAGE_N_DIRECTION, PAGE_LAST_INSERT
  in the same way as we would during normal INSERT operations.
  Starting with MariaDB Server 10.5, bulk insert will not
  touch those fields. */
  ut_ad(!m_page[PAGE_HEADER + PAGE_INSTANT]);
  /* Restore the temporary change of PageBulk::init() that was necessary to
  ensure that PageBulk::needs_finish() holds on an empty page. */
  m_page[PAGE_HEADER + PAGE_DIRECTION_B]= PAGE_NO_DIRECTION;

  ut_ad(!page_header_get_field(m_page, PAGE_FREE));
  ut_ad(!page_header_get_field(m_page, PAGE_GARBAGE));
  ut_ad(!page_header_get_field(m_page, PAGE_LAST_INSERT));
  ut_ad(!page_header_get_field(m_page, PAGE_N_DIRECTION));
  ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no) <=
        page_get_free_space_of_empty(m_is_comp));
  ut_ad(!needs_finish());
  ut_ad(page_validate(m_page, m_index));
}

/** Commit inserts done to the page.
@param[in]	success		whether all inserts succeeded */
void PageBulk::commit(bool success)
{
  finish();
  if (success && !dict_index_is_clust(m_index) && page_is_leaf(m_page))
    ibuf_set_bitmap_for_bulk_load(m_block, innobase_fill_factor == 100);
  m_mtr.commit();
}

/** Compress a page of a ROW_FORMAT=COMPRESSED table.
@return	true	if compression succeeded or was not needed
@return	false	if compression failed */
bool
PageBulk::compress()
{
	ut_ad(m_page_zip != NULL);

	return page_zip_compress(m_block, m_index, page_zip_level, &m_mtr);
}

/** Get the node pointer
@return node pointer */
dtuple_t*
PageBulk::getNodePtr()
{
	rec_t*		first_rec;
	dtuple_t*	node_ptr;

	/* Create node pointer */
	first_rec = page_rec_get_next(page_get_infimum_rec(m_page));
	ut_a(page_rec_is_user_rec(first_rec));
	node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no,
					     m_heap, m_level);

	return(node_ptr);
}

/** Get the split rec in the left page. We split a page in half when
compression fails, and the split rec will be copied to the right page.
@return split rec */
rec_t*
PageBulk::getSplitRec()
{
	rec_t*		rec;
	rec_offs*	offsets;
	ulint		total_used_size;
	ulint		total_recs_size;
	ulint		n_recs;

	ut_ad(m_page_zip != NULL);
	ut_ad(m_rec_no >= 2);
	ut_ad(!m_index->is_instant());

	ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space);
	total_used_size = page_get_free_space_of_empty(m_is_comp)
		- m_free_space;

	total_recs_size = 0;
	n_recs = 0;
	offsets = NULL;
	rec = page_get_infimum_rec(m_page);
	const ulint n_core = page_is_leaf(m_page) ? m_index->n_core_fields : 0;

	do {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));

		offsets = rec_get_offsets(rec, m_index, offsets, n_core,
					  ULINT_UNDEFINED, &m_heap);
		total_recs_size += rec_offs_size(offsets);
		n_recs++;
	} while (total_recs_size + page_dir_calc_reserved_space(n_recs)
		 < total_used_size / 2);

	/* Keep at least one record on the left page */
	if (page_rec_is_infimum(page_rec_get_prev(rec))) {
		rec = page_rec_get_next(rec);
		ut_ad(page_rec_is_user_rec(rec));
	}

	return(rec);
}

/** Copy all records after the split rec, including the split rec itself.
@param[in]	split_rec	split rec */
void
PageBulk::copyIn(
	rec_t*		split_rec)
{

	rec_t*		rec = split_rec;
	rec_offs*	offsets = NULL;

	ut_ad(m_rec_no == 0);
	ut_ad(page_rec_is_user_rec(rec));

	const ulint n_core = page_rec_is_leaf(rec)
		? m_index->n_core_fields : 0;

	do {
		offsets = rec_get_offsets(rec, m_index, offsets, n_core,
					  ULINT_UNDEFINED, &m_heap);

		insert(rec, offsets);

		rec = page_rec_get_next(rec);
	} while (!page_rec_is_supremum(rec));

	ut_ad(m_rec_no > 0);
}

/** Remove all records after the split rec, including the split rec itself.
@param[in]	split_rec	split rec	*/
void
PageBulk::copyOut(
	rec_t*		split_rec)
{
	rec_t*		rec;
	rec_t*		last_rec;
	ulint		n;

	/* Suppose before copyOut, we have 5 records on the page:
	infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split rec.

	After copyOut, we have 2 records on the page:
	infimum->r1->r2->supremum. Slot adjustment is not done. */

	rec = page_rec_get_next(page_get_infimum_rec(m_page));
	last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
	n = 0;

	while (rec != split_rec) {
		rec = page_rec_get_next(rec);
		n++;
	}

	ut_ad(n > 0);

	/* Set last record's next in page */
	rec_offs*	offsets = NULL;
	rec = page_rec_get_prev(split_rec);
	const ulint n_core = page_rec_is_leaf(split_rec)
		? m_index->n_core_fields : 0;

	offsets = rec_get_offsets(rec, m_index, offsets, n_core,
				  ULINT_UNDEFINED, &m_heap);
	mach_write_to_2(rec - REC_NEXT, m_is_comp
			? static_cast<uint16_t>
			(PAGE_NEW_SUPREMUM - page_offset(rec))
			: PAGE_OLD_SUPREMUM);

	/* Set related members */
	m_cur_rec = rec;
	m_heap_top = rec_get_end(rec, offsets);

	offsets = rec_get_offsets(last_rec, m_index, offsets, n_core,
				  ULINT_UNDEFINED, &m_heap);

	m_free_space += ulint(rec_get_end(last_rec, offsets) - m_heap_top)
		+ page_dir_calc_reserved_space(m_rec_no)
		- page_dir_calc_reserved_space(n);
	ut_ad(lint(m_free_space) > 0);
	m_rec_no = n;

#ifdef UNIV_DEBUG
	m_total_data -= ulint(rec_get_end(last_rec, offsets) - m_heap_top);
#endif /* UNIV_DEBUG */
}

/** Set the next page
@param[in]	next_page_no	next page no */
inline void PageBulk::setNext(ulint next_page_no)
{
  if (UNIV_LIKELY_NULL(m_page_zip))
    /* For ROW_FORMAT=COMPRESSED, redo log may be written
    in PageBulk::compress(). */
    mach_write_to_4(m_page + FIL_PAGE_NEXT, next_page_no);
  else
    m_mtr.write<4>(*m_block, m_page + FIL_PAGE_NEXT, next_page_no);
}

/** Set the previous page
@param[in]	prev_page_no	previous page no */
inline void PageBulk::setPrev(ulint prev_page_no)
{
  if (UNIV_LIKELY_NULL(m_page_zip))
    /* For ROW_FORMAT=COMPRESSED, redo log may be written
    in PageBulk::compress(). */
    mach_write_to_4(m_page + FIL_PAGE_PREV, prev_page_no);
  else
    m_mtr.write<4>(*m_block, m_page + FIL_PAGE_PREV, prev_page_no);
}

/** Check if the required space is available in the page for the rec
to be inserted. We check fill factor & padding here.
@param[in]	rec_size	required length
@return true	if space is available */
bool
PageBulk::isSpaceAvailable(
	ulint		rec_size)
{
	ulint	slot_size;
	ulint	required_space;

	slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
		- page_dir_calc_reserved_space(m_rec_no);

	required_space = rec_size + slot_size;

	if (required_space > m_free_space) {
		ut_ad(m_rec_no > 0);
		return false;
	}

	/* Fillfactor & Padding apply to both leaf and non-leaf pages.
	Note: we keep at least 2 records in a page to avoid the B-tree
	growing too high. */
	if (m_rec_no >= 2
	    && ((m_page_zip == NULL && m_free_space - required_space
		 < m_reserved_space)
		|| (m_page_zip != NULL && m_free_space - required_space
		    < m_padding_space))) {
		return(false);
	}

	return(true);
}

/** Check whether the record needs to be stored externally.
@return false if the entire record can be stored locally on the page */
bool
PageBulk::needExt(
	const dtuple_t*		tuple,
	ulint			rec_size)
{
	return page_zip_rec_needs_ext(rec_size, m_is_comp,
				      dtuple_get_n_fields(tuple),
				      m_block->zip_size());
}

/** Store an externally stored record.
Since the record itself is not logged yet, we do not log the update to
the record; the BLOB data is logged first, and then the record is logged
in bulk mode.
@param[in]	big_rec		external record
@param[in]	offsets		record offsets
@return	error code */
dberr_t
PageBulk::storeExt(
	const big_rec_t*	big_rec,
	rec_offs*		offsets)
{
	finish();

	/* Note: not all fields are initialized in btr_pcur. */
	btr_pcur_t	btr_pcur;
	btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
	btr_pcur.latch_mode = BTR_MODIFY_LEAF;
	btr_pcur.btr_cur.index = m_index;
	btr_pcur.btr_cur.page_cur.index = m_index;
	btr_pcur.btr_cur.page_cur.rec = m_cur_rec;
	btr_pcur.btr_cur.page_cur.offsets = offsets;
	btr_pcur.btr_cur.page_cur.block = m_block;

	dberr_t	err = btr_store_big_rec_extern_fields(
		&btr_pcur, offsets, big_rec, &m_mtr, BTR_STORE_INSERT_BULK);

	/* Reset m_block and m_cur_rec from page cursor, because
	block may be changed during blob insert. (FIXME: Can it really?) */
	ut_ad(m_block == btr_pcur.btr_cur.page_cur.block);

	m_block = btr_pcur.btr_cur.page_cur.block;
	m_cur_rec = btr_pcur.btr_cur.page_cur.rec;
	m_page = buf_block_get_frame(m_block);

	return(err);
}

/** Release the block by committing the mtr.
Note: log_free_check requires holding no lock/latch in the current thread. */
void
PageBulk::release()
{
	finish();

	/* We fix the block because we will re-pin it soon. */
	buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);

	/* No other threads can modify this block. */
	m_modify_clock = buf_block_get_modify_clock(m_block);

	m_mtr.commit();
}

/** Start mtr and latch the block */
dberr_t
PageBulk::latch()
{
	m_mtr.start();
	m_index->set_modified(m_mtr);

	ut_ad(m_block->page.buf_fix_count());

	/* In case the block is S-latched by page_cleaner. */
	if (!buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
				     __FILE__, __LINE__, &m_mtr)) {
		m_block = buf_page_get_gen(page_id_t(m_index->table->space_id,
						     m_page_no),
					   0, RW_X_LATCH,
					   m_block, BUF_GET_IF_IN_POOL,
					   __FILE__, __LINE__, &m_mtr, &m_err);

		if (m_err != DB_SUCCESS) {
			return (m_err);
		}

		ut_ad(m_block != NULL);
	}

	buf_block_buf_fix_dec(m_block);

	ut_ad(m_block->page.buf_fix_count());

	ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top);

	return (m_err);
}

/** Split a page
@param[in]	page_bulk	page to split
@param[in]	next_page_bulk	next page
@return	error code */
dberr_t
BtrBulk::pageSplit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk)
{
	ut_ad(page_bulk->getPageZip() != NULL);

	if (page_bulk->getRecNo() <= 1) {
		return(DB_TOO_BIG_RECORD);
	}

	/* Initialize a new page */
	PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL,
			       page_bulk->getLevel());
	dberr_t	err = new_page_bulk.init();
	if (err != DB_SUCCESS) {
		return(err);
	}

	/* Copy the upper half to the new page. */
	rec_t*	split_rec = page_bulk->getSplitRec();
	new_page_bulk.copyIn(split_rec);
	page_bulk->copyOut(split_rec);

	/* Commit the pages after split. */
	err = pageCommit(page_bulk, &new_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	err = pageCommit(&new_page_bulk, next_page_bulk, true);
	if (err != DB_SUCCESS) {
		pageAbort(&new_page_bulk);
		return(err);
	}

	return(err);
}

/** Commit (finish) a page. We set the next/prev page no, compress a page
of a compressed table and split the page if compression fails, insert a node
pointer to the father page if needed, and commit the mini-transaction.
@param[in]	page_bulk	page to commit
@param[in]	next_page_bulk	next page
@param[in]	insert_father	false when page_bulk is a root page and
				true when it's a non-root page
@return	error code */
dberr_t
BtrBulk::pageCommit(
	PageBulk*	page_bulk,
	PageBulk*	next_page_bulk,
	bool		insert_father)
{
	page_bulk->finish();

	/* Set page links */
	if (next_page_bulk != NULL) {
		ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel());

		page_bulk->setNext(next_page_bulk->getPageNo());
		next_page_bulk->setPrev(page_bulk->getPageNo());
	} else {
		ut_ad(!page_has_next(page_bulk->getPage()));
		/* If a page is released and latched again, we need to
		mark it modified in the mini-transaction. */
		page_bulk->set_modified();
	}

	ut_ad(!rw_lock_own_flagged(&m_index->lock,
				   RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX
				   | RW_LOCK_FLAG_S));

	/* Compress the page if it's a compressed table. */
	if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) {
		return(pageSplit(page_bulk, next_page_bulk));
	}

	/* Insert node pointer to father page. */
	if (insert_father) {
		dtuple_t*	node_ptr = page_bulk->getNodePtr();
		dberr_t		err = insert(node_ptr, page_bulk->getLevel()+1);

		if (err != DB_SUCCESS) {
			return(err);
		}
	}

	/* Commit mtr. */
	page_bulk->commit(true);

	return(DB_SUCCESS);
}

/** Log free check */
inline void BtrBulk::logFreeCheck()
{
	if (log_sys.check_flush_or_checkpoint()) {
		release();

		log_check_margins();

		latch();
	}
}

/** Release all latches */
void
BtrBulk::release()
{
	ut_ad(m_root_level + 1 == m_page_bulks.size());

	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*    page_bulk = m_page_bulks.at(level);

		page_bulk->release();
	}
}

/** Re-latch all latches */
void
BtrBulk::latch()
{
	ut_ad(m_root_level + 1 == m_page_bulks.size());

	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*    page_bulk = m_page_bulks.at(level);
		page_bulk->latch();
	}
}

/** Insert a tuple to a page in a level
@param[in]	tuple	tuple to insert
@param[in]	level	B-tree level
@return error code */
dberr_t
BtrBulk::insert(
	dtuple_t*	tuple,
	ulint		level)
{
	bool		is_left_most = false;
	dberr_t		err = DB_SUCCESS;

	/* Check if we need to create a PageBulk for the level. */
	if (level + 1 > m_page_bulks.size()) {
		PageBulk*	new_page_bulk
			= UT_NEW_NOKEY(PageBulk(m_index, m_trx->id, FIL_NULL,
						level));
		err = new_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(new_page_bulk);
			return(err);
		}

		m_page_bulks.push_back(new_page_bulk);
		ut_ad(level + 1 == m_page_bulks.size());
		m_root_level = level;

		is_left_most = true;
	}

	ut_ad(m_page_bulks.size() > level);

	PageBulk*	page_bulk = m_page_bulks.at(level);

	if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) {
		/* The node pointer must be marked as the predefined minimum
		record,	as there is no lower alphabetical limit to records in
		the leftmost node of a level: */
		dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple)
					    | REC_INFO_MIN_REC_FLAG);
	}

	ulint		n_ext = 0;
	ulint		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	big_rec_t*	big_rec = NULL;
	rec_t*		rec = NULL;
	rec_offs*	offsets = NULL;

	if (page_bulk->needExt(tuple, rec_size)) {
		/* The record is so big that we have to store some fields
		externally on separate database pages. */
		big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext);

		if (big_rec == NULL) {
			return(DB_TOO_BIG_RECORD);
		}

		rec_size = rec_get_converted_size(m_index, tuple, n_ext);
	}

	if (page_bulk->getPageZip() != NULL
	    && page_zip_is_too_big(m_index, tuple)) {
		err = DB_TOO_BIG_RECORD;
		goto func_exit;
	}

	if (!page_bulk->isSpaceAvailable(rec_size)) {
		/* Create a sibling page_bulk. */
		PageBulk*	sibling_page_bulk;
		sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx->id,
							  FIL_NULL, level));
		err = sibling_page_bulk->init();
		if (err != DB_SUCCESS) {
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Commit page bulk. */
		err = pageCommit(page_bulk, sibling_page_bulk, true);
		if (err != DB_SUCCESS) {
			pageAbort(sibling_page_bulk);
			UT_DELETE(sibling_page_bulk);
			goto func_exit;
		}

		/* Set new page bulk to page_bulks. */
		ut_ad(sibling_page_bulk->getLevel() <= m_root_level);
		m_page_bulks.at(level) = sibling_page_bulk;

		UT_DELETE(page_bulk);
		page_bulk = sibling_page_bulk;

		/* Important: call logFreeCheck() in case a checkpoint
		is needed. */
		if (page_is_leaf(sibling_page_bulk->getPage())) {
			if (trx_is_interrupted(m_trx)) {
				err = DB_INTERRUPTED;
				goto func_exit;
			}

			srv_inc_activity_count();
			logFreeCheck();
		}
	}

	/* Convert tuple to rec. */
	rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc(
		page_bulk->m_heap, rec_size)), m_index, tuple, n_ext);
	offsets = rec_get_offsets(rec, m_index, offsets, level
				  ? 0 : m_index->n_core_fields,
				  ULINT_UNDEFINED, &page_bulk->m_heap);

	page_bulk->insert(rec, offsets);

	if (big_rec != NULL) {
		ut_ad(dict_index_is_clust(m_index));
		ut_ad(page_bulk->getLevel() == 0);
		ut_ad(page_bulk == m_page_bulks.at(0));

		/* Release all pages above the leaf level */
		for (ulint level = 1; level <= m_root_level; level++) {
			m_page_bulks.at(level)->release();
		}

		err = page_bulk->storeExt(big_rec, offsets);

		/* Latch */
		for (ulint level = 1; level <= m_root_level; level++) {
			PageBulk*    page_bulk = m_page_bulks.at(level);
			page_bulk->latch();
		}
	}

func_exit:
	if (big_rec != NULL) {
		dtuple_convert_back_big_rec(m_index, tuple, big_rec);
	}

	return(err);
}

/** Btree bulk load finish. We commit the last page in each level
and copy the last page in the top level to the root page of the index
if no error occurs.
@param[in]	err	whether bulk load was successful until now
@return error code */
dberr_t
BtrBulk::finish(dberr_t	err)
{
	uint32_t last_page_no = FIL_NULL;

	ut_ad(!m_index->table->is_temporary());

	if (m_page_bulks.size() == 0) {
		/* The table is empty. The root page of the index tree
		is already in a consistent state. No need to flush. */
		return(err);
	}

	ut_ad(m_root_level + 1 == m_page_bulks.size());

	/* Finish all page bulks */
	for (ulint level = 0; level <= m_root_level; level++) {
		PageBulk*	page_bulk = m_page_bulks.at(level);

		last_page_no = page_bulk->getPageNo();

		if (err == DB_SUCCESS) {
			err = pageCommit(page_bulk, NULL,
					 level != m_root_level);
		}

		if (err != DB_SUCCESS) {
			pageAbort(page_bulk);
		}

		UT_DELETE(page_bulk);
	}

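	/* The root page number of the index must not change. Therefore the
	contents of the last page of the top level are copied into the
	pre-existing root page, and that now-redundant last page is freed. */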
	if (err == DB_SUCCESS) {
		rec_t*		first_rec;
		mtr_t		mtr;
		buf_block_t*	last_block;
		PageBulk	root_page_bulk(m_index, m_trx->id,
					       m_index->page, m_root_level);

		mtr.start();
		m_index->set_modified(mtr);
		mtr_x_lock_index(m_index, &mtr);

		ut_ad(last_page_no != FIL_NULL);
		last_block = btr_block_get(*m_index, last_page_no, RW_X_LATCH,
					   false, &mtr);
		first_rec = page_rec_get_next(
			page_get_infimum_rec(last_block->frame));
		ut_ad(page_rec_is_user_rec(first_rec));

		/* Copy last page to root page. */
		err = root_page_bulk.init();
		if (err != DB_SUCCESS) {
			mtr.commit();
			return(err);
		}
		root_page_bulk.copyIn(first_rec);
		root_page_bulk.finish();

		/* Remove last page. */
		btr_page_free(m_index, last_block, &mtr);

		mtr.commit();

		err = pageCommit(&root_page_bulk, NULL, false);
		ut_ad(err == DB_SUCCESS);
	}

	ut_ad(!sync_check_iterate(dict_sync_check()));

	ut_ad(err != DB_SUCCESS
	      || btr_validate_index(m_index, NULL) == DB_SUCCESS);
	return(err);
}