1 /*****************************************************************************
2
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2017, 2021, MariaDB Corporation.
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *****************************************************************************/
20
21 /**************************************************//**
22 @file page/page0page.cc
23 Index page routines
24
25 Created 2/2/1994 Heikki Tuuri
26 *******************************************************/
27
28 #include "page0page.h"
29 #include "page0cur.h"
30 #include "page0zip.h"
31 #include "buf0buf.h"
32 #include "buf0checksum.h"
33 #include "btr0btr.h"
34 #include "srv0srv.h"
35 #include "lock0lock.h"
36 #include "fut0lst.h"
37 #include "btr0sea.h"
38 #include "trx0sys.h"
39 #include <algorithm>
40
41 /* THE INDEX PAGE
42 ==============
43
44 The index page consists of a page header which contains the page's
45 id and other information. On top of it are the index records
46 in a heap linked into a one way linear list according to alphabetic order.
47
48 Just below page end is an array of pointers which we call page directory,
49 to about every sixth record in the list. The pointers are placed in
50 the directory in the alphabetical order of the records pointed to,
enabling us to make binary search using the array. Each slot number I
in the directory points to a record, where a 4-bit field contains a count
53 of those records which are in the linear list between pointer I and
54 the pointer I - 1 in the directory, including the record
55 pointed to by pointer I and not including the record pointed to by I - 1.
56 We say that the record pointed to by slot I, or that slot I, owns
57 these records. The count is always kept in the range 4 to 8, with
58 the exception that it is 1 for the first slot, and 1--8 for the second slot.
59
60 An essentially binary search can be performed in the list of index
61 records, like we could do if we had pointer to every record in the
62 page directory. The data structure is, however, more efficient when
63 we are doing inserts, because most inserts are just pushed on a heap.
64 Only every 8th insert requires block move in the directory pointer
65 table, which itself is quite small. A record is deleted from the page
66 by just taking it off the linear list and updating the number of owned
67 records-field of the record which owns it, and updating the page directory,
68 if necessary. A special case is the one when the record owns itself.
69 Because the overhead of inserts is so small, we may also increase the
70 page size from the projected default of 8 kB to 64 kB without too
71 much loss of efficiency in inserts. Bigger page becomes actual
72 when the disk transfer rate compared to seek and latency time rises.
73 On the present system, the page size is set so that the page transfer
74 time (3 ms) is 20 % of the disk random access time (15 ms).
75
76 When the page is split, merged, or becomes full but contains deleted
77 records, we have to reorganize the page.
78
79 Assuming a page size of 8 kB, a typical index page of a secondary
80 index contains 300 index entries, and the size of the page directory
81 is 50 x 4 bytes = 200 bytes. */
82
/***************************************************************//**
Looks for the directory slot which owns the given record.
@return the directory slot number */
ulint
page_dir_find_owner_slot(
/*=====================*/
	const rec_t*	rec)	/*!< in: the physical record */
{
	ut_ad(page_rec_check(rec));

	const page_t* page = page_align(rec);
	const page_dir_slot_t* first_slot = page_dir_get_nth_slot(page, 0);
	const page_dir_slot_t* slot = page_dir_get_nth_slot(
		page, ulint(page_dir_get_n_slots(page)) - 1);
	const rec_t*		r = rec;

	/* Follow the singly-linked record list forward until we reach
	the record that owns rec, i.e. the first one whose n_owned
	field is nonzero.  The next-pointers must stay between the
	supremum record and the page directory. */
	if (page_is_comp(page)) {
		while (rec_get_n_owned_new(r) == 0) {
			r = rec_get_next_ptr_const(r, TRUE);
			ut_ad(r >= page + PAGE_NEW_SUPREMUM);
			ut_ad(r < page + (srv_page_size - PAGE_DIR));
		}
	} else {
		while (rec_get_n_owned_old(r) == 0) {
			r = rec_get_next_ptr_const(r, FALSE);
			ut_ad(r >= page + PAGE_OLD_SUPREMUM);
			ut_ad(r < page + (srv_page_size - PAGE_DIR));
		}
	}

	/* Encode the owner's page offset exactly as it is stored in a
	directory slot, so that slots can be compared as raw 16-bit
	values in the loop below. */
	uint16	rec_offs_bytes = mach_encode_2(ulint(r - page));

	/* Scan the directory from the last slot towards slot 0 until
	we find the slot that points to the owner record. */
	while (UNIV_LIKELY(*(uint16*) slot != rec_offs_bytes)) {

		if (UNIV_UNLIKELY(slot == first_slot)) {
			/* No directory slot references the owner
			record: the page is corrupt.  Dump what we can
			and abort. */
			ib::error() << "Probable data corruption on page "
				    << page_get_page_no(page)
				    << ". Original record on that page;";

			if (page_is_comp(page)) {
				fputs("(compact record)", stderr);
			} else {
				rec_print_old(stderr, rec);
			}

			ib::error() << "Cannot find the dir slot for this"
				" record on that page;";

			if (page_is_comp(page)) {
				fputs("(compact record)", stderr);
			} else {
				rec_print_old(stderr, page
					      + mach_decode_2(rec_offs_bytes));
			}

			ut_error;
		}

		slot += PAGE_DIR_SLOT_SIZE;
	}

	/* The directory grows downward from the page end (slot 0 has
	the highest address); convert the address difference back into
	a slot index. */
	return(((ulint) (first_slot - slot)) / PAGE_DIR_SLOT_SIZE);
}
146
147 /**************************************************************//**
148 Used to check the consistency of a directory slot.
149 @return TRUE if succeed */
150 static
151 ibool
page_dir_slot_check(const page_dir_slot_t * slot)152 page_dir_slot_check(
153 /*================*/
154 const page_dir_slot_t* slot) /*!< in: slot */
155 {
156 const page_t* page;
157 ulint n_slots;
158 ulint n_owned;
159
160 ut_a(slot);
161
162 page = page_align(slot);
163
164 n_slots = page_dir_get_n_slots(page);
165
166 ut_a(slot <= page_dir_get_nth_slot(page, 0));
167 ut_a(slot >= page_dir_get_nth_slot(page, n_slots - 1));
168
169 ut_a(page_rec_check(page_dir_slot_get_rec(slot)));
170
171 if (page_is_comp(page)) {
172 n_owned = rec_get_n_owned_new(page_dir_slot_get_rec(slot));
173 } else {
174 n_owned = rec_get_n_owned_old(page_dir_slot_get_rec(slot));
175 }
176
177 if (slot == page_dir_get_nth_slot(page, 0)) {
178 ut_a(n_owned == 1);
179 } else if (slot == page_dir_get_nth_slot(page, n_slots - 1)) {
180 ut_a(n_owned >= 1);
181 ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
182 } else {
183 ut_a(n_owned >= PAGE_DIR_SLOT_MIN_N_OWNED);
184 ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
185 }
186
187 return(TRUE);
188 }
189
/*************************************************************//**
Sets the max trx id field value. */
void
page_set_max_trx_id(
/*================*/
	buf_block_t*	block,	/*!< in/out: page */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	trx_id_t	trx_id,	/*!< in: transaction id */
	mtr_t*		mtr)	/*!< in/out: mini-transaction, or NULL */
{
	/* NOTE(review): the assertion tolerates mtr == NULL, but mtr
	is dereferenced unconditionally below; callers seem to always
	pass a valid mtr -- confirm before relying on the NULL case. */
	ut_ad(!mtr || mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(!page_zip || page_zip == &block->page.zip);
	/* The field is 8-byte aligned within the frame, so the
	aligned write/copy helpers below are safe. */
	static_assert((PAGE_HEADER + PAGE_MAX_TRX_ID) % 8 == 0, "alignment");
	byte *max_trx_id= my_assume_aligned<8>(PAGE_MAX_TRX_ID +
					       PAGE_HEADER + block->frame);

	/* Write the field on the uncompressed frame through the
	mini-transaction. */
	mtr->write<8>(*block, max_trx_id, trx_id);
	if (UNIV_LIKELY_NULL(page_zip))
		/* Mirror the bytes into the compressed page image. */
		memcpy_aligned<8>(&page_zip->data[PAGE_MAX_TRX_ID + PAGE_HEADER],
				  max_trx_id, 8);
}
211
212 /** Persist the AUTO_INCREMENT value on a clustered index root page.
213 @param[in,out] block clustered index root page
214 @param[in] index clustered index
215 @param[in] autoinc next available AUTO_INCREMENT value
216 @param[in,out] mtr mini-transaction
217 @param[in] reset whether to reset the AUTO_INCREMENT
218 to a possibly smaller value than currently
219 exists in the page */
220 void
page_set_autoinc(buf_block_t * block,ib_uint64_t autoinc,mtr_t * mtr,bool reset)221 page_set_autoinc(
222 buf_block_t* block,
223 ib_uint64_t autoinc,
224 mtr_t* mtr,
225 bool reset)
226 {
227 ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX |
228 MTR_MEMO_PAGE_SX_FIX));
229
230 byte *field= my_assume_aligned<8>(PAGE_HEADER + PAGE_ROOT_AUTO_INC +
231 block->frame);
232 ib_uint64_t old= mach_read_from_8(field);
233 if (old == autoinc || (old > autoinc && !reset))
234 return; /* nothing to update */
235
236 mtr->write<8>(*block, field, autoinc);
237 if (UNIV_LIKELY_NULL(block->page.zip.data))
238 memcpy_aligned<8>(PAGE_HEADER + PAGE_ROOT_AUTO_INC + block->page.zip.data,
239 field, 8);
240 }
241
/** The page infimum and supremum of an empty page in ROW_FORMAT=REDUNDANT.
This byte image is copied verbatim to PAGE_DATA by page_create_low(). */
static const byte infimum_supremum_redundant[] = {
	/* the infimum record */
	0x08/*end offset*/,
	0x01/*n_owned*/,
	0x00, 0x00/*heap_no=0*/,
	0x03/*n_fields=1, 1-byte offsets*/,
	0x00, 0x74/* pointer to supremum */,
	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
	/* the supremum record */
	0x09/*end offset*/,
	0x01/*n_owned*/,
	0x00, 0x08/*heap_no=1*/,
	0x03/*n_fields=1, 1-byte offsets*/,
	0x00, 0x00/* end of record list */,
	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm', 0
};
259
/** The page infimum and supremum of an empty page in ROW_FORMAT=COMPACT.
This byte image is copied verbatim to PAGE_DATA by page_create_low().
Note that, unlike the REDUNDANT variant, the supremum string carries
no trailing NUL byte. */
static const byte infimum_supremum_compact[] = {
	/* the infimum record */
	0x01/*n_owned=1*/,
	0x00, 0x02/* heap_no=0, REC_STATUS_INFIMUM */,
	0x00, 0x0d/* pointer to supremum */,
	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
	/* the supremum record */
	0x01/*n_owned=1*/,
	0x00, 0x0b/* heap_no=1, REC_STATUS_SUPREMUM */,
	0x00, 0x00/* end of record list */,
	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm'
};
273
/** Create an index page.
Initializes the page header, installs the predefined infimum and
supremum records, clears the record area, and sets up a two-slot
page directory pointing at them.
@param[in,out]	block	buffer block
@param[in]	comp	nonzero=compact page format */
void page_create_low(const buf_block_t* block, bool comp)
{
	page_t*		page;

	/* The PAGE_BTR_IBUF_FREE_LIST header fields must fit into the
	header area that precedes the record area at PAGE_DATA. */
	compile_time_assert(PAGE_BTR_IBUF_FREE_LIST + FLST_BASE_NODE_SIZE
			    <= PAGE_DATA);
	compile_time_assert(PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE
			    <= PAGE_DATA);

	page = block->frame;

	fil_page_set_type(page, FIL_PAGE_INDEX);

	/* Zero the private part of the page header, then set the few
	fields that must be nonzero.  Only the low-order byte of the
	16-bit PAGE_N_DIR_SLOTS field needs writing: two slots, one
	for infimum and one for supremum. */
	memset(page + PAGE_HEADER, 0, PAGE_HEADER_PRIV_END);
	page[PAGE_HEADER + PAGE_N_DIR_SLOTS + 1] = 2;
	page[PAGE_HEADER + PAGE_INSTANT] = 0;
	page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_NO_DIRECTION;

	if (comp) {
		page[PAGE_HEADER + PAGE_N_HEAP] = 0x80;/*page_is_comp()*/
		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_NEW_SUPREMUM_END;
		/* Install the predefined infimum/supremum records. */
		memcpy(page + PAGE_DATA, infimum_supremum_compact,
		       sizeof infimum_supremum_compact);
		/* Clear everything between the supremum and the page
		directory at the end of the page. */
		memset(page
		       + PAGE_NEW_SUPREMUM_END, 0,
		       srv_page_size - PAGE_DIR - PAGE_NEW_SUPREMUM_END);
		/* Set up the two directory slots: the slot at the
		highest address points to the infimum, the one below
		it to the supremum.  Only the low-order byte of each
		slot needs writing; the memset above zeroed the rest. */
		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
			= PAGE_NEW_SUPREMUM;
		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
			= PAGE_NEW_INFIMUM;
	} else {
		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_OLD_SUPREMUM_END;
		/* Install the predefined infimum/supremum records. */
		memcpy(page + PAGE_DATA, infimum_supremum_redundant,
		       sizeof infimum_supremum_redundant);
		/* Clear everything between the supremum and the page
		directory at the end of the page. */
		memset(page
		       + PAGE_OLD_SUPREMUM_END, 0,
		       srv_page_size - PAGE_DIR - PAGE_OLD_SUPREMUM_END);
		/* Directory slots, as in the compact branch above. */
		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
			= PAGE_OLD_SUPREMUM;
		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
			= PAGE_OLD_INFIMUM;
	}
}
322
/** Create an uncompressed index page.
@param[in,out]	block	buffer block
@param[in,out]	mtr	mini-transaction
@param[in]	comp	set unless ROW_FORMAT=REDUNDANT */
void page_create(buf_block_t *block, mtr_t *mtr, bool comp)
{
  /* Record the page creation in the mini-transaction before
  modifying the frame. */
  mtr->page_create(*block, comp);
  /* The page becomes invalid for optimistic searches. */
  buf_block_modify_clock_inc(block);
  /* Initialize the frame contents. */
  page_create_low(block, comp);
}
333
/**********************************************************//**
Create a compressed B-tree index page.  The uncompressed frame is
initialized first and then compressed; compression of a freshly
created page must succeed. */
void
page_create_zip(
/*============*/
	buf_block_t*		block,		/*!< in/out: a buffer frame
						where the page is created */
	dict_index_t*		index,		/*!< in: the index of the
						page */
	ulint			level,		/*!< in: the B-tree level
						of the page */
	trx_id_t		max_trx_id,	/*!< in: PAGE_MAX_TRX_ID */
	mtr_t*			mtr)		/*!< in/out: mini-transaction
						handle */
{
	ut_ad(block);
	ut_ad(buf_block_get_page_zip(block));
	ut_ad(dict_table_is_comp(index->table));

	/* PAGE_MAX_TRX_ID or PAGE_ROOT_AUTO_INC are always 0 for
	temporary tables. */
	ut_ad(max_trx_id == 0 || !index->table->is_temporary());
	/* In secondary indexes and the change buffer, PAGE_MAX_TRX_ID
	must be zero on non-leaf pages. max_trx_id can be 0 when the
	index consists of an empty root (leaf) page. */
	ut_ad(max_trx_id == 0
	      || level == 0
	      || !dict_index_is_sec_or_ibuf(index)
	      || index->table->is_temporary());
	/* In the clustered index, PAGE_ROOT_AUTOINC or
	PAGE_MAX_TRX_ID must be 0 on other pages than the root. */
	ut_ad(level == 0 || max_trx_id == 0
	      || !dict_index_is_sec_or_ibuf(index)
	      || index->table->is_temporary());

	/* The page becomes invalid for optimistic searches; build the
	uncompressed frame first. */
	buf_block_modify_clock_inc(block);
	page_create_low(block, true);

	if (index->is_spatial()) {
		/* R-tree pages carry their own page type and a split
		sequence number, which is zeroed here in both the
		uncompressed and the compressed frame. */
		mach_write_to_2(FIL_PAGE_TYPE + block->frame, FIL_PAGE_RTREE);
		memset(block->frame + FIL_RTREE_SPLIT_SEQ_NUM, 0, 8);
		memset(block->page.zip.data + FIL_RTREE_SPLIT_SEQ_NUM, 0, 8);
	}

	/* Write the level and PAGE_MAX_TRX_ID directly to the frame;
	the whole page is compressed below. */
	mach_write_to_2(PAGE_HEADER + PAGE_LEVEL + block->frame, level);
	mach_write_to_8(PAGE_HEADER + PAGE_MAX_TRX_ID + block->frame,
			max_trx_id);

	if (!page_zip_compress(block, index, page_zip_level, mtr)) {
		/* The compression of a newly created
		page should always succeed. */
		ut_error;
	}
}
388
/**********************************************************//**
Empty a previously created B-tree index page, preserving the
PAGE_MAX_TRX_ID of a secondary index leaf page and the
PAGE_ROOT_AUTO_INC of a root page. */
void
page_create_empty(
/*==============*/
	buf_block_t*	block,	/*!< in/out: B-tree block */
	dict_index_t*	index,	/*!< in: the index of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	trx_id_t	max_trx_id;
	page_zip_des_t*	page_zip= buf_block_get_page_zip(block);

	ut_ad(fil_page_index_page_check(block->frame));
	ut_ad(!index->is_dummy);
	ut_ad(block->page.id().space() == index->table->space->id);

	/* Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it is not
	required for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && !index->table->is_temporary()
	    && page_is_leaf(block->frame)) {
		/* Carry PAGE_MAX_TRX_ID over to the re-created page. */
		max_trx_id = page_get_max_trx_id(block->frame);
		ut_ad(max_trx_id);
	} else if (block->page.id().page_no() == index->page) {
		/* Preserve PAGE_ROOT_AUTO_INC. */
		max_trx_id = page_get_max_trx_id(block->frame);
	} else {
		max_trx_id = 0;
	}

	if (page_zip) {
		ut_ad(!index->table->is_temporary());
		/* Re-create the compressed page at its current level. */
		page_create_zip(block, index,
				page_header_get_field(block->frame,
						      PAGE_LEVEL),
				max_trx_id, mtr);
	} else {
		page_create(block, mtr, index->table->not_redundant());
		if (index->is_spatial()) {
			/* page_create() set FIL_PAGE_INDEX; restore
			FIL_PAGE_RTREE by patching only the low-order
			byte of the page type (the high bytes match,
			as the static_assert verifies). */
			static_assert(((FIL_PAGE_INDEX & 0xff00)
				       | byte(FIL_PAGE_RTREE))
				      == FIL_PAGE_RTREE, "compatibility");
			mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
				      byte(FIL_PAGE_RTREE));
			/* Clear the split sequence number if set. */
			if (mach_read_from_8(block->frame
					     + FIL_RTREE_SPLIT_SEQ_NUM)) {
				mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM,
					    8, 0);
			}
		}

		/* Restore the preserved value, if any. */
		if (max_trx_id) {
			mtr->write<8>(*block, PAGE_HEADER + PAGE_MAX_TRX_ID
				      + block->frame, max_trx_id);
		}
	}
}
448
/*************************************************************//**
Differs from page_copy_rec_list_end, because this function does not
touch the lock table and max trx id on page or compress the page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit(). */
void
page_copy_rec_list_end_no_locks(
/*============================*/
	buf_block_t*	new_block,	/*!< in: index page to copy to */
	buf_block_t*	block,		/*!< in: index page of rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_cur_t	cur1;
	page_cur_t	cur2;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	page_cur_position(rec, block, &cur1);

	/* Never copy the infimum record itself; start from its
	successor instead. */
	if (page_cur_is_before_first(&cur1)) {

		page_cur_move_to_next(&cur1);
	}

	btr_assert_not_corrupted(new_block, index);
	ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
	/* Sanity check: the directory slot at the very end of
	new_page must point to the infimum record. */
	ut_a(mach_read_from_2(new_page + srv_page_size - 10) == (ulint)
	     (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));
	const ulint n_core = page_is_leaf(block->frame)
		? index->n_core_fields : 0;

	page_cur_set_before_first(new_block, &cur2);

	/* Copy records from the original page to the new page */

	while (!page_cur_is_after_last(&cur1)) {
		rec_t*	ins_rec;
		offsets = rec_get_offsets(cur1.rec, index, offsets, n_core,
					  ULINT_UNDEFINED, &heap);
		ins_rec = page_cur_insert_rec_low(&cur2, index,
						  cur1.rec, offsets, mtr);
		if (UNIV_UNLIKELY(!ins_rec)) {
			/* The insert failed: report the cursor
			positions and abort. */
			ib::fatal() << "Rec offset " << page_offset(rec)
				<< ", cur1 offset " << page_offset(cur1.rec)
				<< ", cur2 offset " << page_offset(cur2.rec);
		}

		page_cur_move_to_next(&cur1);
		ut_ad(!(rec_get_info_bits(cur1.rec, page_is_comp(new_page))
			& REC_INFO_MIN_REC_FLAG));
		/* Keep inserting right after the record just copied. */
		cur2.rec = ins_rec;
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
514
/*************************************************************//**
Copies records from page to new_page, from a given record onward,
including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the original successor of the infimum record on
new_page, or NULL on zip overflow (new_block will be decompressed) */
rec_t*
page_copy_rec_list_end(
/*===================*/
	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
	buf_block_t*	block,		/*!< in: index page containing rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
	page_t*		page		= block->frame;
	rec_t*		ret		= page_rec_get_next(
		page_get_infimum_rec(new_page));
	ulint		num_moved	= 0;
	rtr_rec_move_t*	rec_move	= NULL;
	mem_heap_t*	heap		= NULL;
	ut_ad(page_align(rec) == page);

#ifdef UNIV_ZIP_DEBUG
	if (new_page_zip) {
		page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
		ut_a(page_zip);

		/* Strict page_zip_validate() may fail here.
		Furthermore, btr_compress() may set FIL_PAGE_PREV to
		FIL_NULL on new_page while leaving it intact on
		new_page_zip.  So, we cannot validate new_page_zip. */
		ut_a(page_zip_validate_low(page_zip, page, index, TRUE));
	}
#endif /* UNIV_ZIP_DEBUG */
	ut_ad(buf_block_get_frame(block) == page);
	ut_ad(page_is_leaf(page) == page_is_leaf(new_page));
	ut_ad(page_is_comp(page) == page_is_comp(new_page));
	/* Here, "ret" may be pointing to a user record or the
	predefined supremum record. */

	/* For a compressed destination, suppress logging of the
	individual inserts; the changes are logged as a whole by
	page_zip_compress() or page_zip_reorganize() below. */
	const mtr_log_t log_mode = new_page_zip
		? mtr->set_log_mode(MTR_LOG_NONE) : MTR_LOG_NONE;
	const bool was_empty = page_dir_get_n_heap(new_page)
		== PAGE_HEAP_NO_USER_LOW;
	/* Snapshot the PAGE_LAST_INSERT..PAGE_N_DIRECTION header
	bytes; they are restored below if the destination page
	started out empty. */
	alignas(2) byte h[PAGE_N_DIRECTION + 2 - PAGE_LAST_INSERT];
	memcpy_aligned<2>(h, PAGE_HEADER + PAGE_LAST_INSERT + new_page,
			  sizeof h);

	if (index->is_spatial()) {
		ulint	max_to_move = page_get_n_recs(
			buf_block_get_frame(block));
		heap = mem_heap_create(256);

		rec_move = static_cast<rtr_rec_move_t*>(
			mem_heap_alloc(heap, max_to_move * sizeof *rec_move));

		/* For spatial index, we need to insert recs one by one
		to keep recs ordered. */
		rtr_page_copy_rec_list_end_no_locks(new_block,
						    block, rec, index,
						    heap, rec_move,
						    max_to_move,
						    &num_moved,
						    mtr);
	} else {
		page_copy_rec_list_end_no_locks(new_block, block, rec,
						index, mtr);
		if (was_empty) {
			/* Restore the header bytes saved above. */
			mtr->memcpy<mtr_t::MAYBE_NOP>(*new_block, PAGE_HEADER
						      + PAGE_LAST_INSERT
						      + new_page, h, sizeof h);
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it is not
	required for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page)
	    && !index->table->is_temporary()) {
		ut_ad(!was_empty || page_dir_get_n_heap(new_page)
		      == PAGE_HEAP_NO_USER_LOW
		      + page_header_get_field(new_page, PAGE_N_RECS));
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page), mtr);
	}

	if (new_page_zip) {
		/* Restore the logging mode and compress the result. */
		mtr_set_log_mode(mtr, log_mode);

		if (!page_zip_compress(new_block, index,
				       page_zip_level, mtr)) {
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ulint	ret_pos
				= page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the successor of
			the predefined infimum record.  It must still
			have at least one predecessor (the predefined
			infimum record, or a freshly copied record
			that is smaller than "ret"). */
			ut_a(ret_pos > 0);

			if (!page_zip_reorganize(new_block, index,
						 page_zip_level, mtr)) {

				/* Reorganization failed as well:
				restore the uncompressed frame and
				report NULL to the caller. */
				if (!page_zip_decompress(new_page_zip,
							 new_page, FALSE)) {
					ut_error;
				}
				ut_ad(page_validate(new_page, index));

				if (heap) {
					mem_heap_free(heap);
				}

				return(NULL);
			} else {
				/* The page was reorganized:
				Seek to ret_pos. */
				ret = page_rec_get_nth(new_page, ret_pos);
			}
		}
	}

	/* Update the lock table and possible hash index */

	if (dict_table_is_locking_disabled(index->table)) {
	} else if (rec_move && dict_index_is_spatial(index)) {
		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
	} else {
		lock_move_rec_list_end(new_block, block, rec);
	}

	if (heap) {
		mem_heap_free(heap);
	}

	btr_search_move_or_delete_hash_entries(new_block, block);

	return(ret);
}
670
/*************************************************************//**
Copies records from page to new_page, up to the given record,
NOT including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the original predecessor of the supremum record on
new_page, or NULL on zip overflow (new_block will be decompressed) */
rec_t*
page_copy_rec_list_start(
/*=====================*/
	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
	buf_block_t*	block,		/*!< in: index page containing rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	ut_ad(page_align(rec) == block->frame);

	page_t*		new_page	= buf_block_get_frame(new_block);
	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
	page_cur_t	cur1;
	page_cur_t	cur2;
	mem_heap_t*	heap		= NULL;
	ulint		num_moved	= 0;
	rtr_rec_move_t*	rec_move	= NULL;
	rec_t*		ret
		= page_rec_get_prev(page_get_supremum_rec(new_page));
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	/* Here, "ret" may be pointing to a user record or the
	predefined infimum record. */

	if (page_rec_is_infimum(rec)) {
		/* There are no records before rec: nothing to copy. */
		return(ret);
	}

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* Suppress logging of the individual inserts; the
		changes are logged as a whole by page_zip_compress()
		or page_zip_reorganize() below. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	/* cur1 starts at the first user record of the source page;
	cur2 appends after the current last record of new_page. */
	page_cur_set_before_first(block, &cur1);
	page_cur_move_to_next(&cur1);

	page_cur_position(ret, new_block, &cur2);

	const ulint n_core = page_rec_is_leaf(rec) ? index->n_core_fields : 0;

	/* Copy records from the original page to the new page */
	if (index->is_spatial()) {
		ut_ad(!index->is_instant());
		ulint		max_to_move = page_get_n_recs(
			buf_block_get_frame(block));
		heap = mem_heap_create(256);

		rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
				heap,
				sizeof (*rec_move) * max_to_move));

		/* For spatial index, we need to insert recs one by one
		to keep recs ordered. */
		rtr_page_copy_rec_list_start_no_locks(new_block,
						      block, rec, index, heap,
						      rec_move, max_to_move,
						      &num_moved, mtr);
	} else {
		while (page_cur_get_rec(&cur1) != rec) {
			offsets = rec_get_offsets(cur1.rec, index, offsets,
						  n_core,
						  ULINT_UNDEFINED, &heap);
			cur2.rec = page_cur_insert_rec_low(&cur2, index,
							   cur1.rec, offsets,
							   mtr);
			ut_a(cur2.rec);

			page_cur_move_to_next(&cur1);
			ut_ad(!(rec_get_info_bits(cur1.rec,
						  page_is_comp(new_page))
				& REC_INFO_MIN_REC_FLAG));
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it is not
	required for MVCC. */
	if (n_core && dict_index_is_sec_or_ibuf(index)
	    && !index->table->is_temporary()) {
		page_update_max_trx_id(new_block,
				       new_page_zip,
				       page_get_max_trx_id(block->frame),
				       mtr);
	}

	if (new_page_zip) {
		/* Restore the logging mode and compress the result. */
		mtr_set_log_mode(mtr, log_mode);

		DBUG_EXECUTE_IF("page_copy_rec_list_start_compress_fail",
				goto zip_reorganize;);

		if (!page_zip_compress(new_block, index,
				       page_zip_level, mtr)) {
			ulint	ret_pos;
#ifndef DBUG_OFF
zip_reorganize:
#endif /* DBUG_OFF */
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ret_pos = page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the predecessor
			of the predefined supremum record.  If it was
			the predefined infimum record, then it would
			still be the infimum, and we would have
			ret_pos == 0. */

			if (UNIV_UNLIKELY
			    (!page_zip_reorganize(new_block, index,
						  page_zip_level, mtr))) {

				/* Reorganization failed as well:
				restore the uncompressed frame and
				report NULL to the caller. */
				if (UNIV_UNLIKELY
				    (!page_zip_decompress(new_page_zip,
							  new_page, FALSE))) {
					ut_error;
				}
				ut_ad(page_validate(new_page, index));

				if (UNIV_LIKELY_NULL(heap)) {
					mem_heap_free(heap);
				}

				return(NULL);
			}

			/* The page was reorganized: Seek to ret_pos. */
			ret = page_rec_get_nth(new_page, ret_pos);
		}
	}

	/* Update the lock table and possible hash index */

	if (dict_table_is_locking_disabled(index->table)) {
	} else if (dict_index_is_spatial(index)) {
		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
	} else {
		lock_move_rec_list_start(new_block, block, rec, ret);
	}

	if (heap) {
		mem_heap_free(heap);
	}

	btr_search_move_or_delete_hash_entries(new_block, block);

	return(ret);
}
837
838 /*************************************************************//**
839 Deletes records from a page from a given record onward, including that record.
840 The infimum and supremum records are not deleted. */
841 void
page_delete_rec_list_end(rec_t * rec,buf_block_t * block,dict_index_t * index,ulint n_recs,ulint size,mtr_t * mtr)842 page_delete_rec_list_end(
843 /*=====================*/
844 rec_t* rec, /*!< in: pointer to record on page */
845 buf_block_t* block, /*!< in: buffer block of the page */
846 dict_index_t* index, /*!< in: record descriptor */
847 ulint n_recs, /*!< in: number of records to delete,
848 or ULINT_UNDEFINED if not known */
849 ulint size, /*!< in: the sum of the sizes of the
850 records in the end of the chain to
851 delete, or ULINT_UNDEFINED if not known */
852 mtr_t* mtr) /*!< in: mtr */
853 {
854 ut_ad(size == ULINT_UNDEFINED || size < srv_page_size);
855 ut_ad(page_align(rec) == block->frame);
856 ut_ad(index->table->not_redundant() == !!page_is_comp(block->frame));
857 #ifdef UNIV_ZIP_DEBUG
858 ut_a(!block->page.zip.data ||
859 page_zip_validate(&block->page.zip, block->frame, index));
860 #endif /* UNIV_ZIP_DEBUG */
861
862 if (page_rec_is_supremum(rec))
863 {
864 ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED);
865 /* Nothing to do, there are no records bigger than the page supremum. */
866 return;
867 }
868
869 if (page_rec_is_infimum(rec) || n_recs == page_get_n_recs(block->frame) ||
870 rec == (page_is_comp(block->frame)
871 ? page_rec_get_next_low(block->frame + PAGE_NEW_INFIMUM, 1)
872 : page_rec_get_next_low(block->frame + PAGE_OLD_INFIMUM, 0)))
873 {
874 /* We are deleting all records. */
875 page_create_empty(block, index, mtr);
876 return;
877 }
878
879 #if 0 // FIXME: consider deleting the last record as a special case
880 if (page_rec_is_last(rec))
881 {
882 page_cur_t cursor= { index, rec, offsets, block };
883 page_cur_delete_rec(&cursor, index, offsets, mtr);
884 return;
885 }
886 #endif
887
888 /* The page becomes invalid for optimistic searches */
889 buf_block_modify_clock_inc(block);
890
891 const ulint n_core= page_is_leaf(block->frame) ? index->n_core_fields : 0;
892 mem_heap_t *heap= nullptr;
893 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
894 rec_offs *offsets= offsets_;
895 rec_offs_init(offsets_);
896
897 #if 1 // FIXME: remove this, and write minimal amount of log! */
898 if (UNIV_LIKELY_NULL(block->page.zip.data))
899 {
900 ut_ad(page_is_comp(block->frame));
901 do
902 {
903 page_cur_t cur;
904 page_cur_position(rec, block, &cur);
905 offsets= rec_get_offsets(rec, index, offsets, n_core,
906 ULINT_UNDEFINED, &heap);
907 rec= rec_get_next_ptr(rec, TRUE);
908 #ifdef UNIV_ZIP_DEBUG
909 ut_a(page_zip_validate(&block->page.zip, block->frame, index));
910 #endif /* UNIV_ZIP_DEBUG */
911 page_cur_delete_rec(&cur, index, offsets, mtr);
912 }
913 while (page_offset(rec) != PAGE_NEW_SUPREMUM);
914
915 if (UNIV_LIKELY_NULL(heap))
916 mem_heap_free(heap);
917 return;
918 }
919 #endif
920
921 byte *prev_rec= page_rec_get_prev(rec);
922 byte *last_rec= page_rec_get_prev(page_get_supremum_rec(block->frame));
923
924 // FIXME: consider a special case of shrinking PAGE_HEAP_TOP
925
926 const bool scrub= srv_immediate_scrub_data_uncompressed;
927 if (scrub || size == ULINT_UNDEFINED || n_recs == ULINT_UNDEFINED)
928 {
929 rec_t *rec2= rec;
930 /* Calculate the sum of sizes and the number of records */
931 size= 0;
932 n_recs= 0;
933
934 do
935 {
936 offsets = rec_get_offsets(rec2, index, offsets, n_core,
937 ULINT_UNDEFINED, &heap);
938 ulint s= rec_offs_size(offsets);
939 ut_ad(ulint(rec2 - block->frame) + s - rec_offs_extra_size(offsets) <
940 srv_page_size);
941 ut_ad(size + s < srv_page_size);
942 size+= s;
943 n_recs++;
944
945 if (scrub)
946 mtr->memset(block, page_offset(rec2), rec_offs_data_size(offsets), 0);
947
948 rec2 = page_rec_get_next(rec2);
949 }
950 while (!page_rec_is_supremum(rec2));
951
952 if (UNIV_LIKELY_NULL(heap))
953 mem_heap_free(heap);
954 }
955
956 ut_ad(size < srv_page_size);
957
958 ulint slot_index, n_owned;
959 {
960 const rec_t *owner_rec= rec;
961 ulint count= 0;
962
963 if (page_is_comp(block->frame))
964 while (!(n_owned= rec_get_n_owned_new(owner_rec)))
965 {
966 count++;
967 owner_rec= rec_get_next_ptr_const(owner_rec, TRUE);
968 }
969 else
970 while (!(n_owned= rec_get_n_owned_old(owner_rec)))
971 {
972 count++;
973 owner_rec= rec_get_next_ptr_const(owner_rec, FALSE);
974 }
975
976 ut_ad(n_owned > count);
977 n_owned-= count;
978 slot_index= page_dir_find_owner_slot(owner_rec);
979 ut_ad(slot_index > 0);
980 }
981
982 mtr->write<2,mtr_t::MAYBE_NOP>(*block, my_assume_aligned<2>
983 (PAGE_N_DIR_SLOTS + PAGE_HEADER +
984 block->frame), slot_index + 1);
985 mtr->write<2,mtr_t::MAYBE_NOP>(*block, my_assume_aligned<2>
986 (PAGE_LAST_INSERT + PAGE_HEADER +
987 block->frame), 0U);
988 /* Catenate the deleted chain segment to the page free list */
989 alignas(4) byte page_header[4];
990 byte *page_free= my_assume_aligned<4>(PAGE_HEADER + PAGE_FREE +
991 block->frame);
992 const uint16_t free= page_header_get_field(block->frame, PAGE_FREE);
993 static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility");
994
995 mach_write_to_2(page_header, page_offset(rec));
996 mach_write_to_2(my_assume_aligned<2>(page_header + 2),
997 mach_read_from_2(my_assume_aligned<2>(page_free + 2)) +
998 size);
999 mtr->memcpy(*block, page_free, page_header, 4);
1000
1001 byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
1002 block->frame);
1003 mtr->write<2>(*block, page_n_recs,
1004 ulint{mach_read_from_2(page_n_recs)} - n_recs);
1005
1006 /* Update the page directory; there is no need to balance the number
1007 of the records owned by the supremum record, as it is allowed to be
1008 less than PAGE_DIR_SLOT_MIN_N_OWNED */
1009 page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, slot_index);
1010
1011 if (page_is_comp(block->frame))
1012 {
1013 mtr->write<2,mtr_t::MAYBE_NOP>(*block, slot, PAGE_NEW_SUPREMUM);
1014 byte *owned= PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED + block->frame;
1015 byte new_owned= static_cast<byte>((*owned & ~REC_N_OWNED_MASK) |
1016 n_owned << REC_N_OWNED_SHIFT);
1017 #if 0 // FIXME: implement minimal logging for ROW_FORMAT=COMPRESSED
1018 if (UNIV_LIKELY_NULL(block->page.zip.data))
1019 {
1020 *owned= new_owned;
1021 memcpy_aligned<2>(PAGE_N_DIR_SLOTS + PAGE_HEADER + block->page.zip.data,
1022 PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame,
1023 PAGE_N_RECS + 2 - PAGE_N_DIR_SLOTS);
1024 // TODO: the equivalent of page_zip_dir_delete() for all records
1025 mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
1026 (PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
1027 mach_write_to_2(last_rec - REC_NEXT, free
1028 ? static_cast<uint16_t>(free - page_offset(last_rec))
1029 : 0U);
1030 return;
1031 }
1032 #endif
1033 mtr->write<1,mtr_t::MAYBE_NOP>(*block, owned, new_owned);
1034 mtr->write<2>(*block, prev_rec - REC_NEXT, static_cast<uint16_t>
1035 (PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
1036 mtr->write<2>(*block, last_rec - REC_NEXT, free
1037 ? static_cast<uint16_t>(free - page_offset(last_rec))
1038 : 0U);
1039 }
1040 else
1041 {
1042 mtr->write<2,mtr_t::MAYBE_NOP>(*block, slot, PAGE_OLD_SUPREMUM);
1043 byte *owned= PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED + block->frame;
1044 byte new_owned= static_cast<byte>((*owned & ~REC_N_OWNED_MASK) |
1045 n_owned << REC_N_OWNED_SHIFT);
1046 mtr->write<1,mtr_t::MAYBE_NOP>(*block, owned, new_owned);
1047 mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM);
1048 mtr->write<2>(*block, last_rec - REC_NEXT, free);
1049 }
1050 }
1051
1052 /*************************************************************//**
1053 Deletes records from page, up to the given record, NOT including
1054 that record. Infimum and supremum records are not deleted. */
1055 void
page_delete_rec_list_start(rec_t * rec,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1056 page_delete_rec_list_start(
1057 /*=======================*/
1058 rec_t* rec, /*!< in: record on page */
1059 buf_block_t* block, /*!< in: buffer block of the page */
1060 dict_index_t* index, /*!< in: record descriptor */
1061 mtr_t* mtr) /*!< in: mtr */
1062 {
1063 page_cur_t cur1;
1064 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
1065 rec_offs* offsets = offsets_;
1066 mem_heap_t* heap = NULL;
1067
1068 rec_offs_init(offsets_);
1069
1070 ut_ad(page_align(rec) == block->frame);
1071 ut_ad((ibool) !!page_rec_is_comp(rec)
1072 == dict_table_is_comp(index->table));
1073 #ifdef UNIV_ZIP_DEBUG
1074 {
1075 page_zip_des_t* page_zip= buf_block_get_page_zip(block);
1076 page_t* page = buf_block_get_frame(block);
1077
1078 /* page_zip_validate() would detect a min_rec_mark mismatch
1079 in btr_page_split_and_insert()
1080 between btr_attach_half_pages() and insert_page = ...
1081 when btr_page_get_split_rec_to_left() holds
1082 (direction == FSP_DOWN). */
1083 ut_a(!page_zip
1084 || page_zip_validate_low(page_zip, page, index, TRUE));
1085 }
1086 #endif /* UNIV_ZIP_DEBUG */
1087
1088 if (page_rec_is_infimum(rec)) {
1089 return;
1090 }
1091
1092 if (page_rec_is_supremum(rec)) {
1093 /* We are deleting all records. */
1094 page_create_empty(block, index, mtr);
1095 return;
1096 }
1097
1098 page_cur_set_before_first(block, &cur1);
1099 page_cur_move_to_next(&cur1);
1100
1101 const ulint n_core = page_rec_is_leaf(rec)
1102 ? index->n_core_fields : 0;
1103
1104 while (page_cur_get_rec(&cur1) != rec) {
1105 offsets = rec_get_offsets(page_cur_get_rec(&cur1), index,
1106 offsets, n_core,
1107 ULINT_UNDEFINED, &heap);
1108 page_cur_delete_rec(&cur1, index, offsets, mtr);
1109 }
1110
1111 if (UNIV_LIKELY_NULL(heap)) {
1112 mem_heap_free(heap);
1113 }
1114 }
1115
1116 /*************************************************************//**
1117 Moves record list end to another page. Moved records include
1118 split_rec.
1119
1120 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1121 if new_block is a compressed leaf page in a secondary index.
1122 This has to be done either within the same mini-transaction,
1123 or by invoking ibuf_reset_free_bits() before mtr_commit().
1124
1125 @return TRUE on success; FALSE on compression failure (new_block will
1126 be decompressed) */
1127 ibool
page_move_rec_list_end(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1128 page_move_rec_list_end(
1129 /*===================*/
1130 buf_block_t* new_block, /*!< in/out: index page where to move */
1131 buf_block_t* block, /*!< in: index page from where to move */
1132 rec_t* split_rec, /*!< in: first record to move */
1133 dict_index_t* index, /*!< in: record descriptor */
1134 mtr_t* mtr) /*!< in: mtr */
1135 {
1136 page_t* new_page = buf_block_get_frame(new_block);
1137 ulint old_data_size;
1138 ulint new_data_size;
1139 ulint old_n_recs;
1140 ulint new_n_recs;
1141
1142 ut_ad(!dict_index_is_spatial(index));
1143
1144 old_data_size = page_get_data_size(new_page);
1145 old_n_recs = page_get_n_recs(new_page);
1146 #ifdef UNIV_ZIP_DEBUG
1147 {
1148 page_zip_des_t* new_page_zip
1149 = buf_block_get_page_zip(new_block);
1150 page_zip_des_t* page_zip
1151 = buf_block_get_page_zip(block);
1152 ut_a(!new_page_zip == !page_zip);
1153 ut_a(!new_page_zip
1154 || page_zip_validate(new_page_zip, new_page, index));
1155 ut_a(!page_zip
1156 || page_zip_validate(page_zip, page_align(split_rec),
1157 index));
1158 }
1159 #endif /* UNIV_ZIP_DEBUG */
1160
1161 if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_block, block,
1162 split_rec, index, mtr))) {
1163 return(FALSE);
1164 }
1165
1166 new_data_size = page_get_data_size(new_page);
1167 new_n_recs = page_get_n_recs(new_page);
1168
1169 ut_ad(new_data_size >= old_data_size);
1170
1171 page_delete_rec_list_end(split_rec, block, index,
1172 new_n_recs - old_n_recs,
1173 new_data_size - old_data_size, mtr);
1174
1175 return(TRUE);
1176 }
1177
1178 /*************************************************************//**
1179 Moves record list start to another page. Moved records do not include
1180 split_rec.
1181
1182 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1183 if new_block is a compressed leaf page in a secondary index.
1184 This has to be done either within the same mini-transaction,
1185 or by invoking ibuf_reset_free_bits() before mtr_commit().
1186
1187 @return TRUE on success; FALSE on compression failure */
1188 ibool
page_move_rec_list_start(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1189 page_move_rec_list_start(
1190 /*=====================*/
1191 buf_block_t* new_block, /*!< in/out: index page where to move */
1192 buf_block_t* block, /*!< in/out: page containing split_rec */
1193 rec_t* split_rec, /*!< in: first record not to move */
1194 dict_index_t* index, /*!< in: record descriptor */
1195 mtr_t* mtr) /*!< in: mtr */
1196 {
1197 if (UNIV_UNLIKELY(!page_copy_rec_list_start(new_block, block,
1198 split_rec, index, mtr))) {
1199 return(FALSE);
1200 }
1201
1202 page_delete_rec_list_start(split_rec, block, index, mtr);
1203
1204 return(TRUE);
1205 }
1206
1207 /************************************************************//**
1208 Returns the nth record of the record list.
1209 This is the inverse function of page_rec_get_n_recs_before().
1210 @return nth record */
1211 const rec_t*
page_rec_get_nth_const(const page_t * page,ulint nth)1212 page_rec_get_nth_const(
1213 /*===================*/
1214 const page_t* page, /*!< in: page */
1215 ulint nth) /*!< in: nth record */
1216 {
1217 const page_dir_slot_t* slot;
1218 ulint i;
1219 ulint n_owned;
1220 const rec_t* rec;
1221
1222 if (nth == 0) {
1223 return(page_get_infimum_rec(page));
1224 }
1225
1226 ut_ad(nth < srv_page_size / (REC_N_NEW_EXTRA_BYTES + 1));
1227
1228 for (i = 0;; i++) {
1229
1230 slot = page_dir_get_nth_slot(page, i);
1231 n_owned = page_dir_slot_get_n_owned(slot);
1232
1233 if (n_owned > nth) {
1234 break;
1235 } else {
1236 nth -= n_owned;
1237 }
1238 }
1239
1240 ut_ad(i > 0);
1241 slot = page_dir_get_nth_slot(page, i - 1);
1242 rec = page_dir_slot_get_rec(slot);
1243
1244 if (page_is_comp(page)) {
1245 do {
1246 rec = page_rec_get_next_low(rec, TRUE);
1247 ut_ad(rec);
1248 } while (nth--);
1249 } else {
1250 do {
1251 rec = page_rec_get_next_low(rec, FALSE);
1252 ut_ad(rec);
1253 } while (nth--);
1254 }
1255
1256 return(rec);
1257 }
1258
1259 /***************************************************************//**
1260 Returns the number of records before the given record in chain.
1261 The number includes infimum and supremum records.
1262 @return number of records */
1263 ulint
page_rec_get_n_recs_before(const rec_t * rec)1264 page_rec_get_n_recs_before(
1265 /*=======================*/
1266 const rec_t* rec) /*!< in: the physical record */
1267 {
1268 const page_dir_slot_t* slot;
1269 const rec_t* slot_rec;
1270 const page_t* page;
1271 ulint i;
1272 lint n = 0;
1273
1274 ut_ad(page_rec_check(rec));
1275
1276 page = page_align(rec);
1277 if (page_is_comp(page)) {
1278 while (rec_get_n_owned_new(rec) == 0) {
1279
1280 rec = rec_get_next_ptr_const(rec, TRUE);
1281 n--;
1282 }
1283
1284 for (i = 0; ; i++) {
1285 slot = page_dir_get_nth_slot(page, i);
1286 slot_rec = page_dir_slot_get_rec(slot);
1287
1288 n += lint(rec_get_n_owned_new(slot_rec));
1289
1290 if (rec == slot_rec) {
1291
1292 break;
1293 }
1294 }
1295 } else {
1296 while (rec_get_n_owned_old(rec) == 0) {
1297
1298 rec = rec_get_next_ptr_const(rec, FALSE);
1299 n--;
1300 }
1301
1302 for (i = 0; ; i++) {
1303 slot = page_dir_get_nth_slot(page, i);
1304 slot_rec = page_dir_slot_get_rec(slot);
1305
1306 n += lint(rec_get_n_owned_old(slot_rec));
1307
1308 if (rec == slot_rec) {
1309
1310 break;
1311 }
1312 }
1313 }
1314
1315 n--;
1316
1317 ut_ad(n >= 0);
1318 ut_ad((ulong) n < srv_page_size / (REC_N_NEW_EXTRA_BYTES + 1));
1319
1320 return((ulint) n);
1321 }
1322
1323 /************************************************************//**
1324 Prints record contents including the data relevant only in
1325 the index page context. */
1326 void
page_rec_print(const rec_t * rec,const rec_offs * offsets)1327 page_rec_print(
1328 /*===========*/
1329 const rec_t* rec, /*!< in: physical record */
1330 const rec_offs* offsets)/*!< in: record descriptor */
1331 {
1332 ut_a(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
1333 rec_print_new(stderr, rec, offsets);
1334 if (page_rec_is_comp(rec)) {
1335 ib::info() << "n_owned: " << rec_get_n_owned_new(rec)
1336 << "; heap_no: " << rec_get_heap_no_new(rec)
1337 << "; next rec: " << rec_get_next_offs(rec, TRUE);
1338 } else {
1339 ib::info() << "n_owned: " << rec_get_n_owned_old(rec)
1340 << "; heap_no: " << rec_get_heap_no_old(rec)
1341 << "; next rec: " << rec_get_next_offs(rec, FALSE);
1342 }
1343
1344 page_rec_check(rec);
1345 rec_validate(rec, offsets);
1346 }
1347
1348 #ifdef UNIV_BTR_PRINT
1349 /***************************************************************//**
1350 This is used to print the contents of the directory for
1351 debugging purposes. */
1352 void
page_dir_print(page_t * page,ulint pr_n)1353 page_dir_print(
1354 /*===========*/
1355 page_t* page, /*!< in: index page */
1356 ulint pr_n) /*!< in: print n first and n last entries */
1357 {
1358 ulint n;
1359 ulint i;
1360 page_dir_slot_t* slot;
1361
1362 n = page_dir_get_n_slots(page);
1363
1364 fprintf(stderr, "--------------------------------\n"
1365 "PAGE DIRECTORY\n"
1366 "Page address %p\n"
1367 "Directory stack top at offs: %lu; number of slots: %lu\n",
1368 page, (ulong) page_offset(page_dir_get_nth_slot(page, n - 1)),
1369 (ulong) n);
1370 for (i = 0; i < n; i++) {
1371 slot = page_dir_get_nth_slot(page, i);
1372 if ((i == pr_n) && (i < n - pr_n)) {
1373 fputs(" ... \n", stderr);
1374 }
1375 if ((i < pr_n) || (i >= n - pr_n)) {
1376 fprintf(stderr,
1377 "Contents of slot: %lu: n_owned: %lu,"
1378 " rec offs: %lu\n",
1379 (ulong) i,
1380 (ulong) page_dir_slot_get_n_owned(slot),
1381 (ulong)
1382 page_offset(page_dir_slot_get_rec(slot)));
1383 }
1384 }
1385 fprintf(stderr, "Total of %lu records\n"
1386 "--------------------------------\n",
1387 (ulong) (PAGE_HEAP_NO_USER_LOW + page_get_n_recs(page)));
1388 }
1389
1390 /***************************************************************//**
1391 This is used to print the contents of the page record list for
1392 debugging purposes. */
1393 void
page_print_list(buf_block_t * block,dict_index_t * index,ulint pr_n)1394 page_print_list(
1395 /*============*/
1396 buf_block_t* block, /*!< in: index page */
1397 dict_index_t* index, /*!< in: dictionary index of the page */
1398 ulint pr_n) /*!< in: print n first and n last entries */
1399 {
1400 page_t* page = block->frame;
1401 page_cur_t cur;
1402 ulint count;
1403 ulint n_recs;
1404 mem_heap_t* heap = NULL;
1405 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
1406 rec_offs* offsets = offsets_;
1407 rec_offs_init(offsets_);
1408
1409 ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
1410
1411 fprint(stderr,
1412 "--------------------------------\n"
1413 "PAGE RECORD LIST\n"
1414 "Page address %p\n", page);
1415
1416 n_recs = page_get_n_recs(page);
1417
1418 page_cur_set_before_first(block, &cur);
1419 count = 0;
1420 for (;;) {
1421 offsets = rec_get_offsets(cur.rec, index, offsets,
1422 page_rec_is_leaf(cur.rec),
1423 ULINT_UNDEFINED, &heap);
1424 page_rec_print(cur.rec, offsets);
1425
1426 if (count == pr_n) {
1427 break;
1428 }
1429 if (page_cur_is_after_last(&cur)) {
1430 break;
1431 }
1432 page_cur_move_to_next(&cur);
1433 count++;
1434 }
1435
1436 if (n_recs > 2 * pr_n) {
1437 fputs(" ... \n", stderr);
1438 }
1439
1440 while (!page_cur_is_after_last(&cur)) {
1441 page_cur_move_to_next(&cur);
1442
1443 if (count + pr_n >= n_recs) {
1444 offsets = rec_get_offsets(cur.rec, index, offsets,
1445 page_rec_is_leaf(cur.rec),
1446 ULINT_UNDEFINED, &heap);
1447 page_rec_print(cur.rec, offsets);
1448 }
1449 count++;
1450 }
1451
1452 fprintf(stderr,
1453 "Total of %lu records \n"
1454 "--------------------------------\n",
1455 (ulong) (count + 1));
1456
1457 if (UNIV_LIKELY_NULL(heap)) {
1458 mem_heap_free(heap);
1459 }
1460 }
1461
/***************************************************************//**
Prints the info in a page header. */
void
page_header_print(
/*==============*/
	const page_t*	page)	/*!< in: index page whose header to dump */
{
	/* Dump the PAGE_HEADER fields to stderr in one formatted
	block; debugging aid compiled only under UNIV_BTR_PRINT. */
	fprintf(stderr,
		"--------------------------------\n"
		"PAGE HEADER INFO\n"
		"Page address %p, n records %u (%s)\n"
		"n dir slots %u, heap top %u\n"
		"Page n heap %u, free %u, garbage %u\n"
		"Page last insert %u, direction %u, n direction %u\n",
		page, page_header_get_field(page, PAGE_N_RECS),
		page_is_comp(page) ? "compact format" : "original format",
		page_header_get_field(page, PAGE_N_DIR_SLOTS),
		page_header_get_field(page, PAGE_HEAP_TOP),
		page_dir_get_n_heap(page),
		page_header_get_field(page, PAGE_FREE),
		page_header_get_field(page, PAGE_GARBAGE),
		page_header_get_field(page, PAGE_LAST_INSERT),
		page_get_direction(page),
		page_header_get_field(page, PAGE_N_DIRECTION));
}
1487
1488 /***************************************************************//**
1489 This is used to print the contents of the page for
1490 debugging purposes. */
1491 void
page_print(buf_block_t * block,dict_index_t * index,ulint dn,ulint rn)1492 page_print(
1493 /*=======*/
1494 buf_block_t* block, /*!< in: index page */
1495 dict_index_t* index, /*!< in: dictionary index of the page */
1496 ulint dn, /*!< in: print dn first and last entries
1497 in directory */
1498 ulint rn) /*!< in: print rn first and last records
1499 in directory */
1500 {
1501 page_t* page = block->frame;
1502
1503 page_header_print(page);
1504 page_dir_print(page, dn);
1505 page_print_list(block, index, rn);
1506 }
1507 #endif /* UNIV_BTR_PRINT */
1508
1509 /***************************************************************//**
1510 The following is used to validate a record on a page. This function
1511 differs from rec_validate as it can also check the n_owned field and
1512 the heap_no field.
1513 @return TRUE if ok */
1514 ibool
page_rec_validate(const rec_t * rec,const rec_offs * offsets)1515 page_rec_validate(
1516 /*==============*/
1517 const rec_t* rec, /*!< in: physical record */
1518 const rec_offs* offsets)/*!< in: array returned by rec_get_offsets() */
1519 {
1520 ulint n_owned;
1521 ulint heap_no;
1522 const page_t* page;
1523
1524 page = page_align(rec);
1525 ut_a(!page_is_comp(page) == !rec_offs_comp(offsets));
1526
1527 page_rec_check(rec);
1528 rec_validate(rec, offsets);
1529
1530 if (page_rec_is_comp(rec)) {
1531 n_owned = rec_get_n_owned_new(rec);
1532 heap_no = rec_get_heap_no_new(rec);
1533 } else {
1534 n_owned = rec_get_n_owned_old(rec);
1535 heap_no = rec_get_heap_no_old(rec);
1536 }
1537
1538 if (UNIV_UNLIKELY(!(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED))) {
1539 ib::warn() << "Dir slot of rec " << page_offset(rec)
1540 << ", n owned too big " << n_owned;
1541 return(FALSE);
1542 }
1543
1544 if (UNIV_UNLIKELY(!(heap_no < page_dir_get_n_heap(page)))) {
1545 ib::warn() << "Heap no of rec " << page_offset(rec)
1546 << " too big " << heap_no << " "
1547 << page_dir_get_n_heap(page);
1548 return(FALSE);
1549 }
1550
1551 return(TRUE);
1552 }
1553
1554 #ifdef UNIV_DEBUG
1555 /***************************************************************//**
1556 Checks that the first directory slot points to the infimum record and
1557 the last to the supremum. This function is intended to track if the
1558 bug fixed in 4.0.14 has caused corruption to users' databases. */
1559 void
page_check_dir(const page_t * page)1560 page_check_dir(
1561 /*===========*/
1562 const page_t* page) /*!< in: index page */
1563 {
1564 ulint n_slots;
1565 ulint infimum_offs;
1566 ulint supremum_offs;
1567
1568 n_slots = page_dir_get_n_slots(page);
1569 infimum_offs = mach_read_from_2(page_dir_get_nth_slot(page, 0));
1570 supremum_offs = mach_read_from_2(page_dir_get_nth_slot(page,
1571 n_slots - 1));
1572
1573 if (UNIV_UNLIKELY(!page_rec_is_infimum_low(infimum_offs))) {
1574
1575 ib::fatal() << "Page directory corruption: infimum not"
1576 " pointed to";
1577 }
1578
1579 if (UNIV_UNLIKELY(!page_rec_is_supremum_low(supremum_offs))) {
1580
1581 ib::fatal() << "Page directory corruption: supremum not"
1582 " pointed to";
1583 }
1584 }
1585 #endif /* UNIV_DEBUG */
1586
/***************************************************************//**
This function checks the consistency of an index page when we do not
know the index. This is also resilient so that this should never crash
even if the page is total garbage.
@return TRUE if ok */
ibool
page_simple_validate_old(
/*=====================*/
	const page_t*	page)	/*!< in: index page in ROW_FORMAT=REDUNDANT */
{
	const page_dir_slot_t*	slot;
	ulint			slot_no;
	ulint			n_slots;
	const rec_t*		rec;
	const byte*		rec_heap_top;
	ulint			count;		/* records seen so far */
	ulint			own_count;	/* records since last owner */
	ibool			ret = FALSE;

	ut_a(!page_is_comp(page));

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

	if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
		ib::error() << "Nonsensical number of page dir slots: "
			<< n_slots;
		goto func_exit;
	}

	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);

	if (UNIV_UNLIKELY(rec_heap_top
			  > page_dir_get_nth_slot(page, n_slots - 1))) {
		ib::error()
			<< "Record heap and dir overlap on a page, heap top "
			<< page_header_get_field(page, PAGE_HEAP_TOP)
			<< ", dir "
			<< page_offset(page_dir_get_nth_slot(page,
							     n_slots - 1));

		goto func_exit;
	}

	/* Validate the record list in a loop checking also that it is
	consistent with the page record directory. */

	count = 0;
	own_count = 1;
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	rec = page_get_infimum_rec(page);

	for (;;) {
		/* No record may live above PAGE_HEAP_TOP. */
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Record " << (rec - page)
				<< " is above rec heap top "
				<< (rec_heap_top - page);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) != 0)) {
			/* This is a record pointed to by a dir slot:
			its n_owned must equal the number of records
			walked since the previous owner, and the
			current slot must point exactly at it. */
			if (UNIV_UNLIKELY(rec_get_n_owned_old(rec)
					  != own_count)) {

				ib::error() << "Wrong owned count "
					<< rec_get_n_owned_old(rec)
					<< ", " << own_count << ", rec "
					<< (rec - page);

				goto func_exit;
			}

			if (UNIV_UNLIKELY
			    (page_dir_slot_get_rec(slot) != rec)) {
				ib::error() << "Dir slot does not point"
					" to right rec " << (rec - page);

				goto func_exit;
			}

			own_count = 0;

			if (!page_rec_is_supremum(rec)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		if (page_rec_is_supremum(rec)) {

			break;
		}

		if (UNIV_UNLIKELY
		    (rec_get_next_offs(rec, FALSE) < FIL_PAGE_DATA
		     || rec_get_next_offs(rec, FALSE) >= srv_page_size)) {

			ib::error() << "Next record offset nonsensical "
				<< rec_get_next_offs(rec, FALSE) << " for rec "
				<< (rec - page);

			goto func_exit;
		}

		count++;

		/* A page holds far fewer records than bytes, so more
		iterations than srv_page_size implies a cycle. */
		if (UNIV_UNLIKELY(count > srv_page_size)) {
			ib::error() << "Page record list appears"
				" to be circular " << count;
			goto func_exit;
		}

		rec = page_rec_get_next_const(rec);
		own_count++;
	}

	if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
		ib::error() << "n owned is zero in a supremum rec";

		goto func_exit;
	}

	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
		ib::error() << "n slots wrong "
			<< slot_no << ", " << (n_slots - 1);
		goto func_exit;
	}

	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
			  + PAGE_HEAP_NO_USER_LOW
			  != count + 1)) {
		ib::error() << "n recs wrong "
			<< page_header_get_field(page, PAGE_N_RECS)
			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);

		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
				  || rec >= page + srv_page_size)) {
			ib::error() << "Free list record has"
				" a nonsensical offset " << (rec - page);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Free list record " << (rec - page)
				<< " is above rec heap top "
				<< (rec_heap_top - page);

			goto func_exit;
		}

		count++;

		if (UNIV_UNLIKELY(count > srv_page_size)) {
			ib::error() << "Page free list appears"
				" to be circular " << count;
			goto func_exit;
		}

		/* A next-record offset of 0 terminates the free list. */
		ulint offs = rec_get_next_offs(rec, FALSE);
		if (!offs) {
			break;
		}
		if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
				  || offs >= srv_page_size)) {
			ib::error() << "Page free list is corrupted " << count;
			goto func_exit;
		}

		rec = page + offs;
	}

	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {

		ib::error() << "N heap is wrong "
			<< page_dir_get_n_heap(page) << ", " << (count + 1);

		goto func_exit;
	}

	ret = TRUE;

func_exit:
	return(ret);
}
1785
/***************************************************************//**
This function checks the consistency of an index page when we do not
know the index. This is also resilient so that this should never crash
even if the page is total garbage.
@return TRUE if ok */
ibool
page_simple_validate_new(
/*=====================*/
	const page_t*	page)	/*!< in: index page in ROW_FORMAT!=REDUNDANT */
{
	const page_dir_slot_t*	slot;
	ulint			slot_no;
	ulint			n_slots;
	const rec_t*		rec;
	const byte*		rec_heap_top;
	ulint			count;		/* records seen so far */
	ulint			own_count;	/* records since last owner */
	ibool			ret = FALSE;

	ut_a(page_is_comp(page));

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

	if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
		ib::error() << "Nonsensical number of page dir slots: "
			<< n_slots;
		goto func_exit;
	}

	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);

	if (UNIV_UNLIKELY(rec_heap_top
			  > page_dir_get_nth_slot(page, n_slots - 1))) {

		ib::error() << "Record heap and dir overlap on a page,"
			" heap top "
			<< page_header_get_field(page, PAGE_HEAP_TOP)
			<< ", dir " << page_offset(
				page_dir_get_nth_slot(page, n_slots - 1));

		goto func_exit;
	}

	/* Validate the record list in a loop checking also that it is
	consistent with the page record directory. */

	count = 0;
	own_count = 1;
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	rec = page_get_infimum_rec(page);

	for (;;) {
		/* No record may live above PAGE_HEAP_TOP. */
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {

			ib::error() << "Record " << page_offset(rec)
				<< " is above rec heap top "
				<< page_offset(rec_heap_top);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) != 0)) {
			/* This is a record pointed to by a dir slot:
			its n_owned must equal the number of records
			walked since the previous owner, and the
			current slot must point exactly at it. */
			if (UNIV_UNLIKELY(rec_get_n_owned_new(rec)
					  != own_count)) {

				ib::error() << "Wrong owned count "
					<< rec_get_n_owned_new(rec) << ", "
					<< own_count << ", rec "
					<< page_offset(rec);

				goto func_exit;
			}

			if (UNIV_UNLIKELY
			    (page_dir_slot_get_rec(slot) != rec)) {
				ib::error() << "Dir slot does not point"
					" to right rec " << page_offset(rec);

				goto func_exit;
			}

			own_count = 0;

			if (!page_rec_is_supremum(rec)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		if (page_rec_is_supremum(rec)) {

			break;
		}

		if (UNIV_UNLIKELY
		    (rec_get_next_offs(rec, TRUE) < FIL_PAGE_DATA
		     || rec_get_next_offs(rec, TRUE) >= srv_page_size)) {

			ib::error() << "Next record offset nonsensical "
				<< rec_get_next_offs(rec, TRUE)
				<< " for rec " << page_offset(rec);

			goto func_exit;
		}

		count++;

		/* A page holds far fewer records than bytes, so more
		iterations than srv_page_size implies a cycle. */
		if (UNIV_UNLIKELY(count > srv_page_size)) {
			ib::error() << "Page record list appears to be"
				" circular " << count;
			goto func_exit;
		}

		rec = page_rec_get_next_const(rec);
		own_count++;
	}

	if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
		ib::error() << "n owned is zero in a supremum rec";

		goto func_exit;
	}

	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
		ib::error() << "n slots wrong " << slot_no << ", "
			<< (n_slots - 1);
		goto func_exit;
	}

	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
			  + PAGE_HEAP_NO_USER_LOW
			  != count + 1)) {
		ib::error() << "n recs wrong "
			<< page_header_get_field(page, PAGE_N_RECS)
			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);

		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
				  || rec >= page + srv_page_size)) {

			ib::error() << "Free list record has"
				" a nonsensical offset " << page_offset(rec);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Free list record " << page_offset(rec)
				<< " is above rec heap top "
				<< page_offset(rec_heap_top);

			goto func_exit;
		}

		count++;

		if (UNIV_UNLIKELY(count > srv_page_size)) {
			ib::error() << "Page free list appears to be"
				" circular " << count;
			goto func_exit;
		}

		/* A next-record offset of 0 terminates the free list. */
		const ulint offs = rec_get_next_offs(rec, TRUE);
		if (!offs) {
			break;
		}
		/* NOTE(review): this lower bound uses PAGE_OLD_INFIMUM
		although this validator handles ROW_FORMAT!=REDUNDANT
		pages; PAGE_NEW_INFIMUM would appear to be the matching
		constant — confirm against upstream before changing. */
		if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
				  || offs >= srv_page_size)) {
			ib::error() << "Page free list is corrupted " << count;
			goto func_exit;
		}

		rec = page + offs;
	}

	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {

		ib::error() << "N heap is wrong "
			<< page_dir_get_n_heap(page) << ", " << (count + 1);

		goto func_exit;
	}

	ret = TRUE;

func_exit:
	return(ret);
}
1986
1987 /** Check the consistency of an index page.
1988 @param[in] page index page
1989 @param[in] index B-tree or R-tree index
1990 @return whether the page is valid */
page_validate(const page_t * page,const dict_index_t * index)1991 bool page_validate(const page_t* page, const dict_index_t* index)
1992 {
1993 const page_dir_slot_t* slot;
1994 const rec_t* rec;
1995 const rec_t* old_rec = NULL;
1996 const rec_t* first_rec = NULL;
1997 ulint offs = 0;
1998 ulint n_slots;
1999 ibool ret = TRUE;
2000 ulint i;
2001 rec_offs offsets_1[REC_OFFS_NORMAL_SIZE];
2002 rec_offs offsets_2[REC_OFFS_NORMAL_SIZE];
2003 rec_offs* offsets = offsets_1;
2004 rec_offs* old_offsets = offsets_2;
2005
2006 rec_offs_init(offsets_1);
2007 rec_offs_init(offsets_2);
2008
2009 #ifdef UNIV_GIS_DEBUG
2010 if (dict_index_is_spatial(index)) {
2011 fprintf(stderr, "Page no: %lu\n", page_get_page_no(page));
2012 }
2013 #endif /* UNIV_DEBUG */
2014
2015 if (UNIV_UNLIKELY((ibool) !!page_is_comp(page)
2016 != dict_table_is_comp(index->table))) {
2017 ib::error() << "'compact format' flag mismatch";
2018 func_exit2:
2019 ib::error() << "Apparent corruption in space "
2020 << page_get_space_id(page) << " page "
2021 << page_get_page_no(page)
2022 << " of index " << index->name
2023 << " of table " << index->table->name;
2024 return FALSE;
2025 }
2026
2027 if (page_is_comp(page)) {
2028 if (UNIV_UNLIKELY(!page_simple_validate_new(page))) {
2029 goto func_exit2;
2030 }
2031 } else {
2032 if (UNIV_UNLIKELY(!page_simple_validate_old(page))) {
2033 goto func_exit2;
2034 }
2035 }
2036
2037 /* Multiple transactions cannot simultaneously operate on the
2038 same temp-table in parallel.
2039 max_trx_id is ignored for temp tables because it not required
2040 for MVCC. */
2041 if (!page_is_leaf(page) || page_is_empty(page)
2042 || !dict_index_is_sec_or_ibuf(index)
2043 || index->table->is_temporary()) {
2044 } else if (trx_id_t sys_max_trx_id = trx_sys.get_max_trx_id()) {
2045 trx_id_t max_trx_id = page_get_max_trx_id(page);
2046
2047 if (max_trx_id == 0 || max_trx_id > sys_max_trx_id) {
2048 ib::error() << "PAGE_MAX_TRX_ID out of bounds: "
2049 << max_trx_id << ", " << sys_max_trx_id;
2050 ret = FALSE;
2051 }
2052 } else {
2053 ut_ad(srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);
2054 }
2055
2056 /* Check first that the record heap and the directory do not
2057 overlap. */
2058
2059 n_slots = page_dir_get_n_slots(page);
2060
2061 if (UNIV_UNLIKELY(!(page_header_get_ptr(page, PAGE_HEAP_TOP)
2062 <= page_dir_get_nth_slot(page, n_slots - 1)))) {
2063
2064 ib::warn() << "Record heap and directory overlap";
2065 goto func_exit2;
2066 }
2067
2068 switch (uint16_t type = fil_page_get_type(page)) {
2069 case FIL_PAGE_RTREE:
2070 if (!index->is_spatial()) {
2071 wrong_page_type:
2072 ib::warn() << "Wrong page type " << type;
2073 ret = FALSE;
2074 }
2075 break;
2076 case FIL_PAGE_TYPE_INSTANT:
2077 if (index->is_instant()
2078 && page_get_page_no(page) == index->page) {
2079 break;
2080 }
2081 goto wrong_page_type;
2082 case FIL_PAGE_INDEX:
2083 if (index->is_spatial()) {
2084 goto wrong_page_type;
2085 }
2086 if (index->is_instant()
2087 && page_get_page_no(page) == index->page) {
2088 goto wrong_page_type;
2089 }
2090 break;
2091 default:
2092 goto wrong_page_type;
2093 }
2094
2095 /* The following buffer is used to check that the
2096 records in the page record heap do not overlap */
2097 mem_heap_t* heap = mem_heap_create(srv_page_size + 200);;
2098 byte* buf = static_cast<byte*>(mem_heap_zalloc(heap, srv_page_size));
2099
2100 /* Validate the record list in a loop checking also that
2101 it is consistent with the directory. */
2102 ulint count = 0, data_size = 0, own_count = 1, slot_no = 0;
2103 ulint info_bits;
2104 slot_no = 0;
2105 slot = page_dir_get_nth_slot(page, slot_no);
2106
2107 rec = page_get_infimum_rec(page);
2108
2109 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
2110
2111 for (;;) {
2112 offsets = rec_get_offsets(rec, index, offsets, n_core,
2113 ULINT_UNDEFINED, &heap);
2114
2115 if (page_is_comp(page) && page_rec_is_user_rec(rec)
2116 && UNIV_UNLIKELY(rec_get_node_ptr_flag(rec)
2117 == page_is_leaf(page))) {
2118 ib::error() << "'node_ptr' flag mismatch";
2119 ret = FALSE;
2120 goto next_rec;
2121 }
2122
2123 if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
2124 ret = FALSE;
2125 goto next_rec;
2126 }
2127
2128 info_bits = rec_get_info_bits(rec, page_is_comp(page));
2129 if (info_bits
2130 & ~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG)) {
2131 ib::error() << "info_bits has an incorrect value "
2132 << info_bits;
2133 ret = false;
2134 }
2135
2136 if (rec == first_rec) {
2137 if (info_bits & REC_INFO_MIN_REC_FLAG) {
2138 if (page_has_prev(page)) {
2139 ib::error() << "REC_INFO_MIN_REC_FLAG "
2140 "is set on non-left page";
2141 ret = false;
2142 } else if (!page_is_leaf(page)) {
2143 /* leftmost node pointer page */
2144 } else if (!index->is_instant()) {
2145 ib::error() << "REC_INFO_MIN_REC_FLAG "
2146 "is set in a leaf-page record";
2147 ret = false;
2148 } else if (!(info_bits & REC_INFO_DELETED_FLAG)
2149 != !index->table->instant) {
2150 ib::error() << (index->table->instant
2151 ? "Metadata record "
2152 "is not delete-marked"
2153 : "Metadata record "
2154 "is delete-marked");
2155 ret = false;
2156 }
2157 } else if (!page_has_prev(page)
2158 && index->is_instant()) {
2159 ib::error() << "Metadata record is missing";
2160 ret = false;
2161 }
2162 } else if (info_bits & REC_INFO_MIN_REC_FLAG) {
2163 ib::error() << "REC_INFO_MIN_REC_FLAG record is not "
2164 "first in page";
2165 ret = false;
2166 }
2167
2168 if (page_is_comp(page)) {
2169 const rec_comp_status_t status = rec_get_status(rec);
2170 if (status != REC_STATUS_ORDINARY
2171 && status != REC_STATUS_NODE_PTR
2172 && status != REC_STATUS_INFIMUM
2173 && status != REC_STATUS_SUPREMUM
2174 && status != REC_STATUS_INSTANT) {
2175 ib::error() << "impossible record status "
2176 << status;
2177 ret = false;
2178 } else if (page_rec_is_infimum(rec)) {
2179 if (status != REC_STATUS_INFIMUM) {
2180 ib::error()
2181 << "infimum record has status "
2182 << status;
2183 ret = false;
2184 }
2185 } else if (page_rec_is_supremum(rec)) {
2186 if (status != REC_STATUS_SUPREMUM) {
2187 ib::error() << "supremum record has "
2188 "status "
2189 << status;
2190 ret = false;
2191 }
2192 } else if (!page_is_leaf(page)) {
2193 if (status != REC_STATUS_NODE_PTR) {
2194 ib::error() << "node ptr record has "
2195 "status "
2196 << status;
2197 ret = false;
2198 }
2199 } else if (!index->is_instant()
2200 && status == REC_STATUS_INSTANT) {
2201 ib::error() << "instantly added record in a "
2202 "non-instant index";
2203 ret = false;
2204 }
2205 }
2206
2207 /* Check that the records are in the ascending order */
2208 if (count >= PAGE_HEAP_NO_USER_LOW
2209 && !page_rec_is_supremum(rec)) {
2210
2211 int ret = cmp_rec_rec(
2212 rec, old_rec, offsets, old_offsets, index);
2213
2214 /* For spatial index, on nonleaf leavel, we
2215 allow recs to be equal. */
2216 if (ret <= 0 && !(ret == 0 && index->is_spatial()
2217 && !page_is_leaf(page))) {
2218
2219 ib::error() << "Records in wrong order";
2220
2221 fputs("\nInnoDB: previous record ", stderr);
2222 /* For spatial index, print the mbr info.*/
2223 if (index->type & DICT_SPATIAL) {
2224 putc('\n', stderr);
2225 rec_print_mbr_rec(stderr,
2226 old_rec, old_offsets);
2227 fputs("\nInnoDB: record ", stderr);
2228 putc('\n', stderr);
2229 rec_print_mbr_rec(stderr, rec, offsets);
2230 putc('\n', stderr);
2231 putc('\n', stderr);
2232
2233 } else {
2234 rec_print_new(stderr, old_rec, old_offsets);
2235 fputs("\nInnoDB: record ", stderr);
2236 rec_print_new(stderr, rec, offsets);
2237 putc('\n', stderr);
2238 }
2239
2240 ret = FALSE;
2241 }
2242 }
2243
2244 if (page_rec_is_user_rec(rec)) {
2245
2246 data_size += rec_offs_size(offsets);
2247
2248 #if defined(UNIV_GIS_DEBUG)
2249 /* For spatial index, print the mbr info.*/
2250 if (index->type & DICT_SPATIAL) {
2251 rec_print_mbr_rec(stderr, rec, offsets);
2252 putc('\n', stderr);
2253 }
2254 #endif /* UNIV_GIS_DEBUG */
2255 }
2256
2257 offs = page_offset(rec_get_start(rec, offsets));
2258 i = rec_offs_size(offsets);
2259 if (UNIV_UNLIKELY(offs + i >= srv_page_size)) {
2260 ib::error() << "Record offset out of bounds: "
2261 << offs << '+' << i;
2262 ret = FALSE;
2263 goto next_rec;
2264 }
2265 while (i--) {
2266 if (UNIV_UNLIKELY(buf[offs + i])) {
2267 ib::error() << "Record overlaps another: "
2268 << offs << '+' << i;
2269 ret = FALSE;
2270 break;
2271 }
2272 buf[offs + i] = 1;
2273 }
2274
2275 if (ulint rec_own_count = page_is_comp(page)
2276 ? rec_get_n_owned_new(rec)
2277 : rec_get_n_owned_old(rec)) {
2278 /* This is a record pointed to by a dir slot */
2279 if (UNIV_UNLIKELY(rec_own_count != own_count)) {
2280 ib::error() << "Wrong owned count at " << offs
2281 << ": " << rec_own_count
2282 << ", " << own_count;
2283 ret = FALSE;
2284 }
2285
2286 if (page_dir_slot_get_rec(slot) != rec) {
2287 ib::error() << "Dir slot does not"
2288 " point to right rec at " << offs;
2289 ret = FALSE;
2290 }
2291
2292 if (ret) {
2293 page_dir_slot_check(slot);
2294 }
2295
2296 own_count = 0;
2297 if (!page_rec_is_supremum(rec)) {
2298 slot_no++;
2299 slot = page_dir_get_nth_slot(page, slot_no);
2300 }
2301 }
2302
2303 next_rec:
2304 if (page_rec_is_supremum(rec)) {
2305 break;
2306 }
2307
2308 count++;
2309 own_count++;
2310 old_rec = rec;
2311 rec = page_rec_get_next_const(rec);
2312
2313 if (page_rec_is_infimum(old_rec)
2314 && page_rec_is_user_rec(rec)) {
2315 first_rec = rec;
2316 }
2317
2318 /* set old_offsets to offsets; recycle offsets */
2319 std::swap(old_offsets, offsets);
2320 }
2321
2322 if (page_is_comp(page)) {
2323 if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
2324
2325 goto n_owned_zero;
2326 }
2327 } else if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
2328 n_owned_zero:
2329 ib::error() << "n owned is zero at " << offs;
2330 ret = FALSE;
2331 }
2332
2333 if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
2334 ib::error() << "n slots wrong " << slot_no << " "
2335 << (n_slots - 1);
2336 ret = FALSE;
2337 }
2338
2339 if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
2340 + PAGE_HEAP_NO_USER_LOW
2341 != count + 1)) {
2342 ib::error() << "n recs wrong "
2343 << page_header_get_field(page, PAGE_N_RECS)
2344 + PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
2345 ret = FALSE;
2346 }
2347
2348 if (UNIV_UNLIKELY(data_size != page_get_data_size(page))) {
2349 ib::error() << "Summed data size " << data_size
2350 << ", returned by func " << page_get_data_size(page);
2351 ret = FALSE;
2352 }
2353
2354 /* Check then the free list */
2355 rec = page_header_get_ptr(page, PAGE_FREE);
2356
2357 while (rec != NULL) {
2358 offsets = rec_get_offsets(rec, index, offsets, n_core,
2359 ULINT_UNDEFINED, &heap);
2360 if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
2361 ret = FALSE;
2362 next_free:
2363 const ulint offs = rec_get_next_offs(
2364 rec, page_is_comp(page));
2365 if (!offs) {
2366 break;
2367 }
2368 if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
2369 || offs >= srv_page_size)) {
2370 ib::error() << "Page free list is corrupted";
2371 ret = FALSE;
2372 break;
2373 }
2374
2375 rec = page + offs;
2376 continue;
2377 }
2378
2379 count++;
2380 offs = page_offset(rec_get_start(rec, offsets));
2381 i = rec_offs_size(offsets);
2382 if (UNIV_UNLIKELY(offs + i >= srv_page_size)) {
2383 ib::error() << "Free record offset out of bounds: "
2384 << offs << '+' << i;
2385 ret = FALSE;
2386 goto next_free;
2387 }
2388 while (i--) {
2389 if (UNIV_UNLIKELY(buf[offs + i])) {
2390 ib::error() << "Free record overlaps another: "
2391 << offs << '+' << i;
2392 ret = FALSE;
2393 break;
2394 }
2395 buf[offs + i] = 1;
2396 }
2397
2398 goto next_free;
2399 }
2400
2401 if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
2402 ib::error() << "N heap is wrong "
2403 << page_dir_get_n_heap(page) << " " << count + 1;
2404 ret = FALSE;
2405 }
2406
2407 mem_heap_free(heap);
2408
2409 if (UNIV_UNLIKELY(!ret)) {
2410 goto func_exit2;
2411 }
2412
2413 return(ret);
2414 }
2415
2416 /***************************************************************//**
2417 Looks in the page record list for a record with the given heap number.
2418 @return record, NULL if not found */
2419 const rec_t*
page_find_rec_with_heap_no(const page_t * page,ulint heap_no)2420 page_find_rec_with_heap_no(
2421 /*=======================*/
2422 const page_t* page, /*!< in: index page */
2423 ulint heap_no)/*!< in: heap number */
2424 {
2425 const rec_t* rec;
2426
2427 if (page_is_comp(page)) {
2428 rec = page + PAGE_NEW_INFIMUM;
2429
2430 for (;;) {
2431 ulint rec_heap_no = rec_get_heap_no_new(rec);
2432
2433 if (rec_heap_no == heap_no) {
2434
2435 return(rec);
2436 } else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2437
2438 return(NULL);
2439 }
2440
2441 rec = page + rec_get_next_offs(rec, TRUE);
2442 }
2443 } else {
2444 rec = page + PAGE_OLD_INFIMUM;
2445
2446 for (;;) {
2447 ulint rec_heap_no = rec_get_heap_no_old(rec);
2448
2449 if (rec_heap_no == heap_no) {
2450
2451 return(rec);
2452 } else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2453
2454 return(NULL);
2455 }
2456
2457 rec = page + rec_get_next_offs(rec, FALSE);
2458 }
2459 }
2460 }
2461
2462 /** Get the last non-delete-marked record on a page.
2463 @param[in] page index tree leaf page
2464 @return the last record, not delete-marked
2465 @retval infimum record if all records are delete-marked */
2466 const rec_t*
page_find_rec_max_not_deleted(const page_t * page)2467 page_find_rec_max_not_deleted(
2468 const page_t* page)
2469 {
2470 const rec_t* rec = page_get_infimum_rec(page);
2471 const rec_t* prev_rec = NULL; // remove warning
2472
2473 /* Because the page infimum is never delete-marked
2474 and never the metadata pseudo-record (MIN_REC_FLAG)),
2475 prev_rec will always be assigned to it first. */
2476 ut_ad(!rec_get_info_bits(rec, page_rec_is_comp(rec)));
2477 ut_ad(page_is_leaf(page));
2478
2479 if (page_is_comp(page)) {
2480 do {
2481 if (!(rec[-REC_NEW_INFO_BITS]
2482 & (REC_INFO_DELETED_FLAG
2483 | REC_INFO_MIN_REC_FLAG))) {
2484 prev_rec = rec;
2485 }
2486 rec = page_rec_get_next_low(rec, true);
2487 } while (rec != page + PAGE_NEW_SUPREMUM);
2488 } else {
2489 do {
2490 if (!(rec[-REC_OLD_INFO_BITS]
2491 & (REC_INFO_DELETED_FLAG
2492 | REC_INFO_MIN_REC_FLAG))) {
2493 prev_rec = rec;
2494 }
2495 rec = page_rec_get_next_low(rec, false);
2496 } while (rec != page + PAGE_OLD_SUPREMUM);
2497 }
2498 return(prev_rec);
2499 }
2500