/*****************************************************************************

Copyright (c) 2014, 2019, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0bulk.cc
The B-tree bulk load

Created 03/11/2014 Shaohua Wang
*******************************************************/

#include "btr0bulk.h"
#include "btr0btr.h"
#include "btr0cur.h"
#include "btr0pcur.h"
#include "ibuf0ibuf.h"
#include "page0page.h"
#include "trx0trx.h"

/** InnoDB B-tree index fill factor for bulk load. */
uint innobase_fill_factor;

/** Initialize members, allocate a page if needed, and start the mtr.
Note: we commit all mtrs on failure.
@return error code. */
dberr_t
PageBulk::init()
{
        buf_block_t*    new_block;
        page_t*         new_page;

        ut_ad(m_heap == NULL);
        m_heap = mem_heap_create(1000);

        m_mtr.start();
        m_index->set_modified(m_mtr);

        if (m_page_no == FIL_NULL) {
                mtr_t   alloc_mtr;

                /* We commit the redo log for the allocation in a
                separate mtr, because we do not guarantee that pages
                are committed in allocation order, and we will always
                generate redo log for page allocation, even when
                creating a new tablespace. */
                alloc_mtr.start();
                m_index->set_modified(alloc_mtr);

                uint32_t n_reserved;
                if (!fsp_reserve_free_extents(&n_reserved,
                                              m_index->table->space,
                                              1, FSP_NORMAL, &alloc_mtr)) {
                        alloc_mtr.commit();
                        m_mtr.commit();
                        return(DB_OUT_OF_FILE_SPACE);
                }

                /* Allocate a new page. */
                new_block = btr_page_alloc(m_index, 0, FSP_UP, m_level,
                                           &alloc_mtr, &m_mtr);

                m_index->table->space->release_free_extents(n_reserved);

                alloc_mtr.commit();

                new_page = buf_block_get_frame(new_block);
                m_page_no = new_block->page.id().page_no();

                byte* index_id = my_assume_aligned<2>
                        (PAGE_HEADER + PAGE_INDEX_ID + new_page);
                compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4);
                compile_time_assert(FIL_NULL == 0xffffffff);
                memset_aligned<8>(new_page + FIL_PAGE_PREV, 0xff, 8);

                if (UNIV_LIKELY_NULL(new_block->page.zip.data)) {
                        mach_write_to_8(index_id, m_index->id);
                        page_create_zip(new_block, m_index, m_level, 0,
                                        &m_mtr);
                } else {
                        ut_ad(!m_index->is_spatial());
                        page_create(new_block, &m_mtr,
                                    m_index->table->not_redundant());
                        m_mtr.memset(*new_block, FIL_PAGE_PREV, 8, 0xff);
                        m_mtr.write<2,mtr_t::MAYBE_NOP>(*new_block, PAGE_HEADER
                                                        + PAGE_LEVEL
                                                        + new_page, m_level);
                        m_mtr.write<8>(*new_block, index_id, m_index->id);
                }
        } else {
                new_block = btr_block_get(*m_index, m_page_no, RW_X_LATCH,
                                          false, &m_mtr);

                new_page = buf_block_get_frame(new_block);
                ut_ad(new_block->page.id().page_no() == m_page_no);

                ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);

                btr_page_set_level(new_block, m_level, &m_mtr);
        }

        m_page_zip = buf_block_get_page_zip(new_block);

        if (!m_level && dict_index_is_sec_or_ibuf(m_index)) {
                page_update_max_trx_id(new_block, m_page_zip, m_trx_id,
                                       &m_mtr);
        }

        m_block = new_block;
        m_page = new_page;
        m_cur_rec = page_get_infimum_rec(new_page);
        ut_ad(m_is_comp == !!page_is_comp(new_page));
        m_free_space = page_get_free_space_of_empty(m_is_comp);

        if (innobase_fill_factor == 100 && dict_index_is_clust(m_index)) {
                /* Keep the default behavior compatible with 5.6 */
                m_reserved_space = dict_index_get_space_reserve();
        } else {
                m_reserved_space =
                        srv_page_size * (100 - innobase_fill_factor) / 100;
        }

        m_padding_space =
                srv_page_size - dict_index_zip_pad_optimal_page_size(m_index);
        m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP);
        m_rec_no = page_header_get_field(new_page, PAGE_N_RECS);
        /* Temporarily reset PAGE_DIRECTION_B from PAGE_NO_DIRECTION to 0,
        without writing redo log, to ensure that needs_finish() will hold
        on an empty page. */
        ut_ad(m_page[PAGE_HEADER + PAGE_DIRECTION_B] == PAGE_NO_DIRECTION);
        m_page[PAGE_HEADER + PAGE_DIRECTION_B] = 0;
        ut_d(m_total_data = 0);

        return(DB_SUCCESS);
}
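
/* Worked example (illustrative, not part of the original source): with
the default srv_page_size of 16384 bytes and innobase_fill_factor=80,
PageBulk::init() computes

        m_reserved_space = 16384 * (100 - 80) / 100 = 3276 bytes,

so isSpaceAvailable() will stop accepting records once the remaining
free space would drop below roughly 3276 bytes. With
innobase_fill_factor=100 on a clustered index, the 5.6-compatible
dict_index_get_space_reserve() value (1/16 of the page) is used
instead. */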

/** Insert a record in the page.
@tparam fmt the page format
@param[in,out]	rec	record
@param[in]	offsets	record offsets */
template<PageBulk::format fmt>
inline void PageBulk::insertPage(rec_t *rec, rec_offs *offsets)
{
  ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED));
  ut_ad((fmt != REDUNDANT) == m_is_comp);
  ut_ad(page_align(m_heap_top) == m_page);
  ut_ad(m_heap);

  const ulint rec_size= rec_offs_size(offsets);
  const ulint extra_size= rec_offs_extra_size(offsets);
  ut_ad(page_align(m_heap_top + rec_size) == m_page);
  ut_d(const bool is_leaf= page_rec_is_leaf(m_cur_rec));

#ifdef UNIV_DEBUG
  /* Check whether records are in order. */
  if (page_offset(m_cur_rec) !=
      (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM))
  {
    const rec_t *old_rec= m_cur_rec;
    rec_offs *old_offsets= rec_get_offsets(old_rec, m_index, nullptr, is_leaf
                                           ? m_index->n_core_fields : 0,
                                           ULINT_UNDEFINED, &m_heap);
    ut_ad(cmp_rec_rec(rec, old_rec, offsets, old_offsets, m_index) > 0);
  }

  m_total_data+= rec_size;
#endif /* UNIV_DEBUG */

  rec_t* const insert_rec= m_heap_top + extra_size;

  /* Insert the record in the linked list. */
  if (fmt != REDUNDANT)
  {
    const rec_t *next_rec= m_page +
      page_offset(m_cur_rec + mach_read_from_2(m_cur_rec - REC_NEXT));
    if (fmt != COMPRESSED)
      m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT,
                     static_cast<uint16_t>(insert_rec - m_cur_rec));
    else
    {
      mach_write_to_2(m_cur_rec - REC_NEXT,
                      static_cast<uint16_t>(insert_rec - m_cur_rec));
      memcpy(m_heap_top, rec - extra_size, rec_size);
    }

    rec_t * const this_rec= fmt != COMPRESSED
      ? const_cast<rec_t*>(rec) : insert_rec;
    rec_set_bit_field_1(this_rec, 0, REC_NEW_N_OWNED, REC_N_OWNED_MASK,
                        REC_N_OWNED_SHIFT);
    rec_set_bit_field_2(this_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no,
                        REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
    mach_write_to_2(this_rec - REC_NEXT,
                    static_cast<uint16_t>(next_rec - insert_rec));
  }
  else
  {
    memcpy(const_cast<rec_t*>(rec) - REC_NEXT, m_cur_rec - REC_NEXT, 2);
    m_mtr.write<2>(*m_block, m_cur_rec - REC_NEXT, page_offset(insert_rec));
    rec_set_bit_field_1(const_cast<rec_t*>(rec), 0,
                        REC_OLD_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
    rec_set_bit_field_2(const_cast<rec_t*>(rec),
                        PAGE_HEAP_NO_USER_LOW + m_rec_no,
                        REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
  }

  if (fmt == COMPRESSED)
    /* We already wrote the record. Log is written in PageBulk::compress(). */;
  else if (page_offset(m_cur_rec) ==
           (fmt == REDUNDANT ? PAGE_OLD_INFIMUM : PAGE_NEW_INFIMUM))
    m_mtr.memcpy(*m_block, m_heap_top, rec - extra_size, rec_size);
  else
  {
    /* Try to copy common prefix from the preceding record. */
    const byte *r= rec - extra_size;
    const byte * const insert_rec_end= m_heap_top + rec_size;
    byte *b= m_heap_top;

    /* Skip any unchanged prefix of the record. */
    for (; *b == *r; b++, r++);

    ut_ad(b < insert_rec_end);

    const byte *c= m_cur_rec - (rec - r);
    const byte * const c_end= std::min(m_cur_rec + rec_offs_data_size(offsets),
                                       m_heap_top);

    /* Try to copy any bytes of the preceding record. */
    if (UNIV_LIKELY(c >= m_page && c < c_end))
    {
      const byte *cm= c;
      byte *bm= b;
      const byte *rm= r;
      for (; cm < c_end && *rm == *cm; cm++, bm++, rm++);
      ut_ad(bm <= insert_rec_end);
      size_t len= static_cast<size_t>(rm - r);
      ut_ad(!memcmp(r, c, len));
      if (len > 2)
      {
        memcpy(b, c, len);
        m_mtr.memmove(*m_block, page_offset(b), page_offset(c), len);
        c= cm;
        b= bm;
        r= rm;
      }
    }

    if (c < m_cur_rec)
    {
      if (!rec_offs_data_size(offsets))
      {
no_data:
        m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c);
        goto rec_done;
      }
      /* Some header bytes differ. Compare the data separately. */
      const byte *cd= m_cur_rec;
      byte *bd= insert_rec;
      const byte *rd= rec;
      /* Skip any unchanged prefix of the record. */
      for (;; cd++, bd++, rd++)
        if (bd == insert_rec_end)
          goto no_data;
        else if (*bd != *rd)
          break;

      /* Try to copy any data bytes of the preceding record. */
      if (c_end - cd > 2)
      {
        const byte *cdm= cd;
        const byte *rdm= rd;
        for (; cdm < c_end && *rdm == *cdm; cdm++, rdm++)
          ut_ad(rdm - rd + bd <= insert_rec_end);
        size_t len= static_cast<size_t>(rdm - rd);
        ut_ad(!memcmp(rd, cd, len));
        if (len > 2)
        {
          m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, m_cur_rec - c);
          memcpy(bd, cd, len);
          m_mtr.memmove(*m_block, page_offset(bd), page_offset(cd), len);
          c= cdm;
          b= rdm - rd + bd;
          r= rdm;
        }
      }
    }

    if (size_t len= static_cast<size_t>(insert_rec_end - b))
      m_mtr.memcpy<mtr_t::FORCED>(*m_block, b, r, len);
  }

rec_done:
  ut_ad(fmt == COMPRESSED || !memcmp(m_heap_top, rec - extra_size, rec_size));
  rec_offs_make_valid(insert_rec, m_index, is_leaf, offsets);

  /* Update the member variables. */
  ulint slot_size= page_dir_calc_reserved_space(m_rec_no + 1) -
    page_dir_calc_reserved_space(m_rec_no);

  ut_ad(m_free_space >= rec_size + slot_size);
  ut_ad(m_heap_top + rec_size < m_page + srv_page_size);

  m_free_space-= rec_size + slot_size;
  m_heap_top+= rec_size;
  m_rec_no++;
  m_cur_rec= insert_rec;
}
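
/* Note on the redo logging above (explanatory, not part of the original
source): because bulk-loaded records arrive in key order, consecutive
records usually share a long common prefix. Rather than logging every
inserted record in full, insertPage() logs a MEMMOVE within the page for
any run of 3 or more bytes that matches the preceding record, and MEMCPY
records only for the bytes that differ. For two consecutive records such
as (42, "row_000041_payload") and (43, "row_000042_payload"), only the
few diverging bytes end up in the redo log; the shared prefix is
replayed as a copy within the page. */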

/** Insert a record in the page.
@param[in]	rec	record
@param[in]	offsets	record offsets */
inline void PageBulk::insert(const rec_t *rec, rec_offs *offsets)
{
  byte rec_hdr[REC_N_OLD_EXTRA_BYTES];
  static_assert(REC_N_OLD_EXTRA_BYTES > REC_N_NEW_EXTRA_BYTES, "file format");

  if (UNIV_LIKELY_NULL(m_page_zip))
    insertPage<COMPRESSED>(const_cast<rec_t*>(rec), offsets);
  else if (m_is_comp)
  {
    /* insertPage() rewrites the header of the source record in place;
    save it here and restore it afterwards, so that the caller's copy
    of the record remains unchanged. */
    memcpy(rec_hdr, rec - REC_N_NEW_EXTRA_BYTES, REC_N_NEW_EXTRA_BYTES);
    insertPage<DYNAMIC>(const_cast<rec_t*>(rec), offsets);
    memcpy(const_cast<rec_t*>(rec) - REC_N_NEW_EXTRA_BYTES, rec_hdr,
           REC_N_NEW_EXTRA_BYTES);
  }
  else
  {
    memcpy(rec_hdr, rec - REC_N_OLD_EXTRA_BYTES, REC_N_OLD_EXTRA_BYTES);
    insertPage<REDUNDANT>(const_cast<rec_t*>(rec), offsets);
    memcpy(const_cast<rec_t*>(rec) - REC_N_OLD_EXTRA_BYTES, rec_hdr,
           REC_N_OLD_EXTRA_BYTES);
  }
}

/** Set the number of owned records in the uncompressed page of
a ROW_FORMAT=COMPRESSED record without redo-logging. */
static void rec_set_n_owned_zip(rec_t *rec, ulint n_owned)
{
  rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED,
                      REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
}

/** Mark the end of inserts to the page. Scan all records to build
the page directory, and set the page header members.
@tparam fmt page format */
template<PageBulk::format fmt>
inline void PageBulk::finishPage()
{
  ut_ad((m_page_zip != nullptr) == (fmt == COMPRESSED));
  ut_ad((fmt != REDUNDANT) == m_is_comp);

  ulint count= 0;
  ulint n_recs= 0;
  byte *slot= my_assume_aligned<2>(m_page + srv_page_size -
                                   (PAGE_DIR + PAGE_DIR_SLOT_SIZE));
  const page_dir_slot_t *const slot0= slot;
  compile_time_assert(PAGE_DIR_SLOT_SIZE == 2);
  if (fmt != REDUNDANT)
  {
    uint16_t offset= mach_read_from_2(PAGE_NEW_INFIMUM - REC_NEXT + m_page);
    ut_ad(offset >= PAGE_NEW_SUPREMUM - PAGE_NEW_INFIMUM);
    offset= static_cast<uint16_t>(offset + PAGE_NEW_INFIMUM);
    /* Set owner & dir. */
    while (offset != PAGE_NEW_SUPREMUM)
    {
      ut_ad(offset >= PAGE_NEW_SUPREMUM);
      ut_ad(offset < page_offset(slot));
      count++;
      n_recs++;

      if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)
      {
        slot-= PAGE_DIR_SLOT_SIZE;
        mach_write_to_2(slot, offset);

        if (fmt != COMPRESSED)
          page_rec_set_n_owned<false>(m_block, m_page + offset, count, true,
                                      &m_mtr);
        else
          rec_set_n_owned_zip(m_page + offset, count);

        count= 0;
      }

      uint16_t next= static_cast<uint16_t>
        ((mach_read_from_2(m_page + offset - REC_NEXT) + offset) &
         (srv_page_size - 1));
      ut_ad(next);
      offset= next;
    }

    if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <=
                          PAGE_DIR_SLOT_MAX_N_OWNED))
    {
      /* Merge the last two slots, like page_cur_insert_rec_low() does. */
      count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

      rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
      if (fmt != COMPRESSED)
        page_rec_set_n_owned<false>(m_block, rec, 0, true, &m_mtr);
      else
        rec_set_n_owned_zip(rec, 0);
    }
    else
      slot-= PAGE_DIR_SLOT_SIZE;

    mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
    if (fmt != COMPRESSED)
      page_rec_set_n_owned<false>(m_block, m_page + PAGE_NEW_SUPREMUM,
                                  count + 1, true, &m_mtr);
    else
      rec_set_n_owned_zip(m_page + PAGE_NEW_SUPREMUM, count + 1);
  }
  else
  {
    rec_t *insert_rec= m_page +
      mach_read_from_2(PAGE_OLD_INFIMUM - REC_NEXT + m_page);

    /* Set owner & dir. */
    while (insert_rec != m_page + PAGE_OLD_SUPREMUM)
    {
      count++;
      n_recs++;

      if (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)
      {
        slot-= PAGE_DIR_SLOT_SIZE;
        mach_write_to_2(slot, page_offset(insert_rec));
        page_rec_set_n_owned<false>(m_block, insert_rec, count, false, &m_mtr);
        count= 0;
      }

      insert_rec= m_page + mach_read_from_2(insert_rec - REC_NEXT);
    }

    if (slot0 != slot && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 <=
                          PAGE_DIR_SLOT_MAX_N_OWNED))
    {
      /* Merge the last two slots, like page_cur_insert_rec_low() does. */
      count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

      rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
      page_rec_set_n_owned<false>(m_block, rec, 0, false, &m_mtr);
    }
    else
      slot-= PAGE_DIR_SLOT_SIZE;

    mach_write_to_2(slot, PAGE_OLD_SUPREMUM);
    page_rec_set_n_owned<false>(m_block, m_page + PAGE_OLD_SUPREMUM, count + 1,
                                false, &m_mtr);
  }

  if (!m_rec_no);
  else if (fmt != COMPRESSED)
  {
    static_assert(PAGE_N_DIR_SLOTS == 0, "compatibility");
    alignas(8) byte page_header[PAGE_N_HEAP + 2];
    mach_write_to_2(page_header + PAGE_N_DIR_SLOTS,
                    1 + (slot0 - slot) / PAGE_DIR_SLOT_SIZE);
    mach_write_to_2(page_header + PAGE_HEAP_TOP, m_heap_top - m_page);
    mach_write_to_2(page_header + PAGE_N_HEAP,
                    (PAGE_HEAP_NO_USER_LOW + m_rec_no) |
                    uint16_t{fmt != REDUNDANT} << 15);
    m_mtr.memcpy(*m_block, PAGE_HEADER + m_page, page_header,
                 sizeof page_header);
    m_mtr.write<2>(*m_block, PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
    m_mtr.memcpy(*m_block, page_offset(slot), slot0 - slot);
  }
  else
  {
    /* For ROW_FORMAT=COMPRESSED, redo log may be written in
    PageBulk::compress(). */
    mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page,
                    1 + (slot0 - slot) / PAGE_DIR_SLOT_SIZE);
    mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page,
                    static_cast<ulint>(m_heap_top - m_page));
    mach_write_to_2(PAGE_HEADER + PAGE_N_HEAP + m_page,
                    (PAGE_HEAP_NO_USER_LOW + m_rec_no) | 1U << 15);
    mach_write_to_2(PAGE_HEADER + PAGE_N_RECS + m_page, m_rec_no);
  }
}
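
/* Worked example (illustrative, not part of the original source):
PAGE_DIR_SLOT_MAX_N_OWNED is 8, so finishPage() closes a directory slot
after every (8 + 1) / 2 = 4 records. With 10 user records it creates
two slots owning 4 records each and reaches the supremum with count=2.
Since 2 + 1 + 4 <= 8, the last full slot is merged into the supremum
slot: the merged slot's record gets n_owned=0 and the supremum ends up
owning 2 + 4 + 1 = 7 records, matching what page_cur_insert_rec_low()
would have produced for ordinary inserts. */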

inline bool PageBulk::needs_finish() const
{
  ut_ad(page_align(m_cur_rec) == m_block->frame);
  ut_ad(m_page == m_block->frame);
  if (!m_page[PAGE_HEADER + PAGE_DIRECTION_B])
    return true;
  ulint heap_no, n_heap= page_header_get_field(m_page, PAGE_N_HEAP);
  ut_ad((n_heap & 0x7fff) >= PAGE_HEAP_NO_USER_LOW);
  if (n_heap & 0x8000)
  {
    n_heap&= 0x7fff;
    heap_no= rec_get_heap_no_new(m_cur_rec);
    if (heap_no == PAGE_HEAP_NO_INFIMUM &&
        page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END)
      return false;
  }
  else
  {
    heap_no= rec_get_heap_no_old(m_cur_rec);
    if (heap_no == PAGE_HEAP_NO_INFIMUM &&
        page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END)
      return false;
  }
  return heap_no != n_heap - 1;
}
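
/* Explanatory note (not part of the original source): on a page freshly
created by PageBulk::init(), m_cur_rec is the infimum and PAGE_HEAP_TOP
still points just past the supremum, so the heap-number checks above
would return false and finish() would be skipped on an empty page.
That is why init() temporarily zeroes PAGE_DIRECTION_B: the first test
above then makes needs_finish() hold until finish() restores
PAGE_NO_DIRECTION. */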

/** Mark the end of inserts to the page. Scan all records to build
the page directory, and set the page header members. */
inline void PageBulk::finish()
{
  ut_ad(!m_index->is_spatial());

  if (!needs_finish());
  else if (UNIV_LIKELY_NULL(m_page_zip))
    finishPage<COMPRESSED>();
  else if (m_is_comp)
    finishPage<DYNAMIC>();
  else
    finishPage<REDUNDANT>();

  /* In MariaDB 10.2, 10.3, 10.4, we would initialize
  PAGE_DIRECTION_B, PAGE_N_DIRECTION, PAGE_LAST_INSERT
  in the same way as we would during normal INSERT operations.
  Starting with MariaDB Server 10.5, bulk insert will not
  touch those fields. */
  ut_ad(!m_page[PAGE_HEADER + PAGE_INSTANT]);
  /* Restore the temporary change of PageBulk::init() that was necessary to
  ensure that PageBulk::needs_finish() holds on an empty page. */
  m_page[PAGE_HEADER + PAGE_DIRECTION_B]= PAGE_NO_DIRECTION;

  ut_ad(!page_header_get_field(m_page, PAGE_FREE));
  ut_ad(!page_header_get_field(m_page, PAGE_GARBAGE));
  ut_ad(!page_header_get_field(m_page, PAGE_LAST_INSERT));
  ut_ad(!page_header_get_field(m_page, PAGE_N_DIRECTION));
  ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no) <=
        page_get_free_space_of_empty(m_is_comp));
  ut_ad(!needs_finish());
  ut_ad(page_validate(m_page, m_index));
}

/** Commit the inserts done to the page.
@param[in]	success	whether all inserts succeeded */
void PageBulk::commit(bool success)
{
  finish();
  if (success && !dict_index_is_clust(m_index) && page_is_leaf(m_page))
    ibuf_set_bitmap_for_bulk_load(m_block, innobase_fill_factor == 100);
  m_mtr.commit();
}

/** Compress a page of a ROW_FORMAT=COMPRESSED table.
@return true if compression succeeded or was not needed
@return false if compression failed */
bool
PageBulk::compress()
{
        ut_ad(m_page_zip != NULL);

        return page_zip_compress(m_block, m_index, page_zip_level, &m_mtr);
}

/** Get node pointer
@return node pointer */
dtuple_t*
PageBulk::getNodePtr()
{
        rec_t*          first_rec;
        dtuple_t*       node_ptr;

        /* Create node pointer */
        first_rec = page_rec_get_next(page_get_infimum_rec(m_page));
        ut_a(page_rec_is_user_rec(first_rec));
        node_ptr = dict_index_build_node_ptr(m_index, first_rec, m_page_no,
                                             m_heap, m_level);

        return(node_ptr);
}

/** Get the split record in the left page. We split a page in half when
compression fails, and records from the split record onward will be
copied to the right page.
@return split record */
rec_t*
PageBulk::getSplitRec()
{
        rec_t*          rec;
        rec_offs*       offsets;
        ulint           total_used_size;
        ulint           total_recs_size;
        ulint           n_recs;

        ut_ad(m_page_zip != NULL);
        ut_ad(m_rec_no >= 2);
        ut_ad(!m_index->is_instant());

        ut_ad(page_get_free_space_of_empty(m_is_comp) > m_free_space);
        total_used_size = page_get_free_space_of_empty(m_is_comp)
                - m_free_space;

        total_recs_size = 0;
        n_recs = 0;
        offsets = NULL;
        rec = page_get_infimum_rec(m_page);
        const ulint n_core = page_is_leaf(m_page) ? m_index->n_core_fields : 0;

        do {
                rec = page_rec_get_next(rec);
                ut_ad(page_rec_is_user_rec(rec));

                offsets = rec_get_offsets(rec, m_index, offsets, n_core,
                                          ULINT_UNDEFINED, &m_heap);
                total_recs_size += rec_offs_size(offsets);
                n_recs++;
        } while (total_recs_size + page_dir_calc_reserved_space(n_recs)
                 < total_used_size / 2);

        /* Keep at least one record on the left page */
        if (page_rec_is_infimum(page_rec_get_prev(rec))) {
                rec = page_rec_get_next(rec);
                ut_ad(page_rec_is_user_rec(rec));
        }

        return(rec);
}
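
/* Worked example (illustrative, not part of the original source):
suppose the page holds records r1..r5 of about 200 bytes each, so
total_used_size is roughly 1000 bytes plus directory overhead. The loop
above accumulates r1 (~200 bytes) and r2 (~400 bytes), both below half
of the total, and stops at r3 (~600 bytes), so getSplitRec() returns
r3: r1 and r2 stay on the left page, and r3..r5 will be copied to the
right page, as in the example in PageBulk::copyOut(). */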

/** Copy all records from the split record onward, including the split
record itself.
@param[in]	split_rec	split record */
void
PageBulk::copyIn(
        rec_t*          split_rec)
{
        rec_t*          rec = split_rec;
        rec_offs*       offsets = NULL;

        ut_ad(m_rec_no == 0);
        ut_ad(page_rec_is_user_rec(rec));

        const ulint n_core = page_rec_is_leaf(rec)
                ? m_index->n_core_fields : 0;

        do {
                offsets = rec_get_offsets(rec, m_index, offsets, n_core,
                                          ULINT_UNDEFINED, &m_heap);

                insert(rec, offsets);

                rec = page_rec_get_next(rec);
        } while (!page_rec_is_supremum(rec));

        ut_ad(m_rec_no > 0);
}

/** Remove all records from the split record onward, including the split
record itself.
@param[in]	split_rec	split record */
void
PageBulk::copyOut(
        rec_t*          split_rec)
{
        rec_t*          rec;
        rec_t*          last_rec;
        ulint           n;

        /* Suppose that before copyOut() we have 5 records on the page:
        infimum->r1->r2->r3->r4->r5->supremum, and r3 is the split record.

        After copyOut() we have 2 records on the page:
        infimum->r1->r2->supremum. Slot adjustment is not done. */

        rec = page_rec_get_next(page_get_infimum_rec(m_page));
        last_rec = page_rec_get_prev(page_get_supremum_rec(m_page));
        n = 0;

        while (rec != split_rec) {
                rec = page_rec_get_next(rec);
                n++;
        }

        ut_ad(n > 0);

        /* Set the last record's next pointer in the page */
        rec_offs* offsets = NULL;
        rec = page_rec_get_prev(split_rec);
        const ulint n_core = page_rec_is_leaf(split_rec)
                ? m_index->n_core_fields : 0;

        offsets = rec_get_offsets(rec, m_index, offsets, n_core,
                                  ULINT_UNDEFINED, &m_heap);
        mach_write_to_2(rec - REC_NEXT, m_is_comp
                        ? static_cast<uint16_t>
                        (PAGE_NEW_SUPREMUM - page_offset(rec))
                        : PAGE_OLD_SUPREMUM);

        /* Set the related members */
        m_cur_rec = rec;
        m_heap_top = rec_get_end(rec, offsets);

        offsets = rec_get_offsets(last_rec, m_index, offsets, n_core,
                                  ULINT_UNDEFINED, &m_heap);

        m_free_space += ulint(rec_get_end(last_rec, offsets) - m_heap_top)
                + page_dir_calc_reserved_space(m_rec_no)
                - page_dir_calc_reserved_space(n);
        ut_ad(lint(m_free_space) > 0);
        m_rec_no = n;

#ifdef UNIV_DEBUG
        m_total_data -= ulint(rec_get_end(last_rec, offsets) - m_heap_top);
#endif /* UNIV_DEBUG */
}

/** Set the next page number.
@param[in]	next_page_no	next page number */
inline void PageBulk::setNext(ulint next_page_no)
{
  if (UNIV_LIKELY_NULL(m_page_zip))
    /* For ROW_FORMAT=COMPRESSED, redo log may be written
    in PageBulk::compress(). */
    mach_write_to_4(m_page + FIL_PAGE_NEXT, next_page_no);
  else
    m_mtr.write<4>(*m_block, m_page + FIL_PAGE_NEXT, next_page_no);
}

/** Set the previous page number.
@param[in]	prev_page_no	previous page number */
inline void PageBulk::setPrev(ulint prev_page_no)
{
  if (UNIV_LIKELY_NULL(m_page_zip))
    /* For ROW_FORMAT=COMPRESSED, redo log may be written
    in PageBulk::compress(). */
    mach_write_to_4(m_page + FIL_PAGE_PREV, prev_page_no);
  else
    m_mtr.write<4>(*m_block, m_page + FIL_PAGE_PREV, prev_page_no);
}

/** Check whether the required space is available in the page for the
record to be inserted. We check the fill factor and padding here.
@param[in]	rec_size	required space for the record
@return true if space is available */
bool
PageBulk::isSpaceAvailable(
        ulint   rec_size)
{
        ulint   slot_size;
        ulint   required_space;

        slot_size = page_dir_calc_reserved_space(m_rec_no + 1)
                - page_dir_calc_reserved_space(m_rec_no);

        required_space = rec_size + slot_size;

        if (required_space > m_free_space) {
                ut_ad(m_rec_no > 0);
                return false;
        }

        /* Fill factor & padding apply to both leaf and non-leaf pages.
        Note: we keep at least 2 records in a page to avoid the B-tree
        growing too tall. */
        if (m_rec_no >= 2
            && ((m_page_zip == NULL && m_free_space - required_space
                 < m_reserved_space)
                || (m_page_zip != NULL && m_free_space - required_space
                    < m_padding_space))) {
                return(false);
        }

        return(true);
}
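
/* Worked example (illustrative, not part of the original source): on an
uncompressed 16KiB page with innobase_fill_factor=80, m_reserved_space
is 3276 bytes. A record needing 120 bytes plus its share of a directory
slot is accepted only while the free space remaining afterwards would
stay at or above 3276 bytes; past that point isSpaceAvailable() returns
false and BtrBulk::insert() switches to a sibling page. The
m_rec_no >= 2 check guarantees at least two records per page, so that
the tree does not degenerate into one record per page and grow
needlessly tall. */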

/** Check whether the record needs to be stored externally.
@return false if the entire record can be stored locally on the page */
bool
PageBulk::needExt(
        const dtuple_t* tuple,
        ulint           rec_size)
{
        return page_zip_rec_needs_ext(rec_size, m_is_comp,
                                      dtuple_get_n_fields(tuple),
                                      m_block->zip_size());
}

/** Store an externally stored record. Because the record has not been
logged yet, we do not log the update to the record; the BLOB data is
logged first, and then the record is logged in bulk mode.
@param[in]	big_rec	externally stored fields
@param[in]	offsets	record offsets
@return error code */
dberr_t
PageBulk::storeExt(
        const big_rec_t*        big_rec,
        rec_offs*               offsets)
{
        finish();

        /* Note: not all fields are initialized in btr_pcur. */
        btr_pcur_t      btr_pcur;
        btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED;
        btr_pcur.latch_mode = BTR_MODIFY_LEAF;
        btr_pcur.btr_cur.index = m_index;
        btr_pcur.btr_cur.page_cur.index = m_index;
        btr_pcur.btr_cur.page_cur.rec = m_cur_rec;
        btr_pcur.btr_cur.page_cur.offsets = offsets;
        btr_pcur.btr_cur.page_cur.block = m_block;

        dberr_t err = btr_store_big_rec_extern_fields(
                &btr_pcur, offsets, big_rec, &m_mtr, BTR_STORE_INSERT_BULK);

        /* Reset m_block and m_cur_rec from the page cursor, because the
        block may be changed during the blob insert. (FIXME: Can it
        really?) */
        ut_ad(m_block == btr_pcur.btr_cur.page_cur.block);

        m_block = btr_pcur.btr_cur.page_cur.block;
        m_cur_rec = btr_pcur.btr_cur.page_cur.rec;
        m_page = buf_block_get_frame(m_block);

        return(err);
}

/** Release the block by committing the mtr.
Note: log_free_check requires holding no lock/latch in the current thread. */
void
PageBulk::release()
{
        finish();

        /* We fix the block because we will re-pin it soon. */
        buf_block_buf_fix_inc(m_block, __FILE__, __LINE__);

        /* No other threads can modify this block. */
        m_modify_clock = buf_block_get_modify_clock(m_block);

        m_mtr.commit();
}

/** Start mtr and latch the block */
dberr_t
PageBulk::latch()
{
        m_mtr.start();
        m_index->set_modified(m_mtr);

        ut_ad(m_block->page.buf_fix_count());

        /* In case the block is S-latched by page_cleaner. */
        if (!buf_page_optimistic_get(RW_X_LATCH, m_block, m_modify_clock,
                                     __FILE__, __LINE__, &m_mtr)) {
                m_block = buf_page_get_gen(page_id_t(m_index->table->space_id,
                                                     m_page_no),
                                           0, RW_X_LATCH,
                                           m_block, BUF_GET_IF_IN_POOL,
                                           __FILE__, __LINE__, &m_mtr, &m_err);

                if (m_err != DB_SUCCESS) {
                        return (m_err);
                }

                ut_ad(m_block != NULL);
        }

        buf_block_buf_fix_dec(m_block);

        ut_ad(m_block->page.buf_fix_count());

        ut_ad(m_cur_rec > m_page && m_cur_rec < m_heap_top);

        return (m_err);
}

/** Split a page
@param[in]	page_bulk	page to split
@param[in]	next_page_bulk	next page
@return error code */
dberr_t
BtrBulk::pageSplit(
        PageBulk*       page_bulk,
        PageBulk*       next_page_bulk)
{
        ut_ad(page_bulk->getPageZip() != NULL);

        if (page_bulk->getRecNo() <= 1) {
                return(DB_TOO_BIG_RECORD);
        }

        /* Initialize a new page */
        PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL,
                               page_bulk->getLevel());
        dberr_t err = new_page_bulk.init();
        if (err != DB_SUCCESS) {
                return(err);
        }

        /* Copy the upper half to the new page. */
        rec_t*  split_rec = page_bulk->getSplitRec();
        new_page_bulk.copyIn(split_rec);
        page_bulk->copyOut(split_rec);

        /* Commit the pages after the split. */
        err = pageCommit(page_bulk, &new_page_bulk, true);
        if (err != DB_SUCCESS) {
                pageAbort(&new_page_bulk);
                return(err);
        }

        err = pageCommit(&new_page_bulk, next_page_bulk, true);
        if (err != DB_SUCCESS) {
                pageAbort(&new_page_bulk);
                return(err);
        }

        return(err);
}

/** Commit (finish) a page. We set the next/prev page numbers, compress
the page if it belongs to a compressed table and split it if compression
fails, insert a node pointer into the parent page if needed, and commit
the mini-transaction.
@param[in]	page_bulk	page to commit
@param[in]	next_page_bulk	next page
@param[in]	insert_father	whether to insert a node pointer into
				the parent page (false only for the
				root page)
@return error code */
dberr_t
BtrBulk::pageCommit(
        PageBulk*       page_bulk,
        PageBulk*       next_page_bulk,
        bool            insert_father)
{
        page_bulk->finish();

        /* Set the page links */
        if (next_page_bulk != NULL) {
                ut_ad(page_bulk->getLevel() == next_page_bulk->getLevel());

                page_bulk->setNext(next_page_bulk->getPageNo());
                next_page_bulk->setPrev(page_bulk->getPageNo());
        } else {
                ut_ad(!page_has_next(page_bulk->getPage()));
                /* If a page is released and latched again, we need to
                mark it modified in the mini-transaction. */
                page_bulk->set_modified();
        }

        ut_ad(!rw_lock_own_flagged(&m_index->lock,
                                   RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX
                                   | RW_LOCK_FLAG_S));

        /* Compress the page if it belongs to a compressed table. */
        if (page_bulk->getPageZip() != NULL && !page_bulk->compress()) {
                return(pageSplit(page_bulk, next_page_bulk));
        }

        /* Insert a node pointer into the parent page. */
        if (insert_father) {
                dtuple_t*       node_ptr = page_bulk->getNodePtr();
                dberr_t         err = insert(node_ptr,
                                             page_bulk->getLevel() + 1);

                if (err != DB_SUCCESS) {
                        return(err);
                }
        }

        /* Commit the mtr. */
        page_bulk->commit(true);

        return(DB_SUCCESS);
}

/** If the log is close to overflowing, release all latches, wait for
the log margin check, and re-latch. */
inline void BtrBulk::logFreeCheck()
{
        if (log_sys.check_flush_or_checkpoint()) {
                release();

                log_check_margins();

                latch();
        }
}

/** Release all latches */
void
BtrBulk::release()
{
        ut_ad(m_root_level + 1 == m_page_bulks.size());

        for (ulint level = 0; level <= m_root_level; level++) {
                PageBulk*       page_bulk = m_page_bulks.at(level);

                page_bulk->release();
        }
}

/** Re-latch all latches */
void
BtrBulk::latch()
{
        ut_ad(m_root_level + 1 == m_page_bulks.size());

        for (ulint level = 0; level <= m_root_level; level++) {
                PageBulk*       page_bulk = m_page_bulks.at(level);
                page_bulk->latch();
        }
}

/** Insert a tuple to a page in a level
@param[in]	tuple	tuple to insert
@param[in]	level	B-tree level
@return error code */
dberr_t
BtrBulk::insert(
        dtuple_t*       tuple,
        ulint           level)
{
        bool    is_left_most = false;
        dberr_t err = DB_SUCCESS;

        /* Check if we need to create a PageBulk for the level. */
        if (level + 1 > m_page_bulks.size()) {
                PageBulk*       new_page_bulk
                        = UT_NEW_NOKEY(PageBulk(m_index, m_trx->id, FIL_NULL,
                                                level));
                err = new_page_bulk->init();
                if (err != DB_SUCCESS) {
                        UT_DELETE(new_page_bulk);
                        return(err);
                }

                m_page_bulks.push_back(new_page_bulk);
                ut_ad(level + 1 == m_page_bulks.size());
                m_root_level = level;

                is_left_most = true;
        }

        ut_ad(m_page_bulks.size() > level);

        PageBulk*       page_bulk = m_page_bulks.at(level);

        if (is_left_most && level > 0 && page_bulk->getRecNo() == 0) {
                /* The node pointer must be marked as the predefined minimum
                record, as there is no lower alphabetical limit to records in
                the leftmost node of a level: */
                dtuple_set_info_bits(tuple, dtuple_get_info_bits(tuple)
                                     | REC_INFO_MIN_REC_FLAG);
        }

        ulint           n_ext = 0;
        ulint           rec_size = rec_get_converted_size(m_index, tuple,
                                                          n_ext);
        big_rec_t*      big_rec = NULL;
        rec_t*          rec = NULL;
        rec_offs*       offsets = NULL;

        if (page_bulk->needExt(tuple, rec_size)) {
                /* The record is so big that we have to store some fields
                externally on separate database pages */
                big_rec = dtuple_convert_big_rec(m_index, 0, tuple, &n_ext);

                if (big_rec == NULL) {
                        return(DB_TOO_BIG_RECORD);
                }

                rec_size = rec_get_converted_size(m_index, tuple, n_ext);
        }

        if (page_bulk->getPageZip() != NULL
            && page_zip_is_too_big(m_index, tuple)) {
                err = DB_TOO_BIG_RECORD;
                goto func_exit;
        }

        if (!page_bulk->isSpaceAvailable(rec_size)) {
                /* Create a sibling page_bulk. */
                PageBulk*       sibling_page_bulk;
                sibling_page_bulk = UT_NEW_NOKEY(PageBulk(m_index, m_trx->id,
                                                          FIL_NULL, level));
                err = sibling_page_bulk->init();
                if (err != DB_SUCCESS) {
                        UT_DELETE(sibling_page_bulk);
                        goto func_exit;
                }

                /* Commit the page bulk. */
                err = pageCommit(page_bulk, sibling_page_bulk, true);
                if (err != DB_SUCCESS) {
                        pageAbort(sibling_page_bulk);
                        UT_DELETE(sibling_page_bulk);
                        goto func_exit;
                }

                /* Set the new page bulk to page_bulks. */
                ut_ad(sibling_page_bulk->getLevel() <= m_root_level);
                m_page_bulks.at(level) = sibling_page_bulk;

                UT_DELETE(page_bulk);
                page_bulk = sibling_page_bulk;

                /* Important: logFreeCheck() checks whether we need
                a checkpoint. */
                if (page_is_leaf(sibling_page_bulk->getPage())) {
                        if (trx_is_interrupted(m_trx)) {
                                err = DB_INTERRUPTED;
                                goto func_exit;
                        }

                        srv_inc_activity_count();
                        logFreeCheck();
                }
        }

        /* Convert the tuple to a rec. */
        rec = rec_convert_dtuple_to_rec(static_cast<byte*>(mem_heap_alloc(
                page_bulk->m_heap, rec_size)), m_index, tuple, n_ext);
        offsets = rec_get_offsets(rec, m_index, offsets, level
                                  ? 0 : m_index->n_core_fields,
                                  ULINT_UNDEFINED, &page_bulk->m_heap);

        page_bulk->insert(rec, offsets);

        if (big_rec != NULL) {
                ut_ad(dict_index_is_clust(m_index));
                ut_ad(page_bulk->getLevel() == 0);
                ut_ad(page_bulk == m_page_bulks.at(0));

                /* Release all pages above the leaf level */
                for (ulint level = 1; level <= m_root_level; level++) {
                        m_page_bulks.at(level)->release();
                }

                err = page_bulk->storeExt(big_rec, offsets);

                /* Latch them again */
                for (ulint level = 1; level <= m_root_level; level++) {
                        PageBulk*       page_bulk = m_page_bulks.at(level);
                        page_bulk->latch();
                }
        }

func_exit:
        if (big_rec != NULL) {
                dtuple_convert_back_big_rec(m_index, tuple, big_rec);
        }

        return(err);
}
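
/* Explanatory note (not part of the original source): m_page_bulks[0]
is the page currently being filled at the leaf level, and m_page_bulks[i]
the current page at level i. A new level appears lazily: when a full
page is committed, pageCommit() calls insert(node_ptr, level + 1), and
the branch at the top of this function then creates the PageBulk for
that level and raises m_root_level. The leftmost node pointer of each
non-leaf level is tagged with REC_INFO_MIN_REC_FLAG, exactly as in an
ordinary B-tree. */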

/** Finish the B-tree bulk load. We commit the last page in each level
and copy the last page in the top level to the root page of the index
if no error occurred.
@param[in]	err	error status of the bulk load so far
@return error code */
dberr_t
BtrBulk::finish(dberr_t err)
{
        uint32_t last_page_no = FIL_NULL;

        ut_ad(!m_index->table->is_temporary());

        if (m_page_bulks.size() == 0) {
                /* The table is empty. The root page of the index tree
                is already in a consistent state. No need to flush. */
                return(err);
        }

        ut_ad(m_root_level + 1 == m_page_bulks.size());

        /* Finish all page bulks */
        for (ulint level = 0; level <= m_root_level; level++) {
                PageBulk*       page_bulk = m_page_bulks.at(level);

                last_page_no = page_bulk->getPageNo();

                if (err == DB_SUCCESS) {
                        err = pageCommit(page_bulk, NULL,
                                         level != m_root_level);
                }

                if (err != DB_SUCCESS) {
                        pageAbort(page_bulk);
                }

                UT_DELETE(page_bulk);
        }

        if (err == DB_SUCCESS) {
                rec_t*          first_rec;
                mtr_t           mtr;
                buf_block_t*    last_block;
                PageBulk        root_page_bulk(m_index, m_trx->id,
                                               m_index->page, m_root_level);

                mtr.start();
                m_index->set_modified(mtr);
                mtr_x_lock_index(m_index, &mtr);

                ut_ad(last_page_no != FIL_NULL);
                last_block = btr_block_get(*m_index, last_page_no, RW_X_LATCH,
                                           false, &mtr);
                first_rec = page_rec_get_next(
                        page_get_infimum_rec(last_block->frame));
                ut_ad(page_rec_is_user_rec(first_rec));

                /* Copy the last page to the root page. */
                err = root_page_bulk.init();
                if (err != DB_SUCCESS) {
                        mtr.commit();
                        return(err);
                }
                root_page_bulk.copyIn(first_rec);
                root_page_bulk.finish();

                /* Remove the last page. */
                btr_page_free(m_index, last_block, &mtr);

                mtr.commit();

                err = pageCommit(&root_page_bulk, NULL, false);
                ut_ad(err == DB_SUCCESS);
        }

        ut_ad(!sync_check_iterate(dict_sync_check()));

        ut_ad(err != DB_SUCCESS
              || btr_validate_index(m_index, NULL) == DB_SUCCESS);
        return(err);
}
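
/* Usage sketch (illustrative only, not part of the original source;
it assumes a dict_index_t and a trx_t are available and that the input
tuples are pre-sorted in key order, as the caller in
row_merge_insert_index_tuples() guarantees):

        BtrBulk bulk(index, trx);
        dberr_t err = DB_SUCCESS;

        for (dtuple_t* tuple : sorted_tuples) {
                err = bulk.insert(tuple);
                if (err != DB_SUCCESS) {
                        break;
                }
        }

        err = bulk.finish(err);

finish() must be called even on failure, with the accumulated error
status, so that the partially built pages are aborted and freed. */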