1 /*****************************************************************************
2
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2017, 2021, MariaDB Corporation.
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *****************************************************************************/
20
21 /**************************************************//**
22 @file page/page0page.cc
23 Index page routines
24
25 Created 2/2/1994 Heikki Tuuri
26 *******************************************************/
27
28 #include "page0page.h"
29 #include "page0cur.h"
30 #include "page0zip.h"
31 #include "buf0buf.h"
32 #include "buf0checksum.h"
33 #include "btr0btr.h"
34 #include "srv0srv.h"
35 #include "lock0lock.h"
36 #include "fut0lst.h"
37 #include "btr0sea.h"
38 #include "trx0sys.h"
39 #include <algorithm>
40
41 /* THE INDEX PAGE
42 ==============
43
44 The index page consists of a page header which contains the page's
45 id and other information. On top of it are the index records
46 in a heap linked into a one way linear list according to alphabetic order.
47
48 Just below page end is an array of pointers which we call page directory,
49 to about every sixth record in the list. The pointers are placed in
50 the directory in the alphabetical order of the records pointed to,
51 enabling us to make binary search using the array. Each slot n:o I
52 in the directory points to a record, where a 4-bit field contains a count
53 of those records which are in the linear list between pointer I and
54 the pointer I - 1 in the directory, including the record
55 pointed to by pointer I and not including the record pointed to by I - 1.
56 We say that the record pointed to by slot I, or that slot I, owns
57 these records. The count is always kept in the range 4 to 8, with
58 the exception that it is 1 for the first slot, and 1--8 for the second slot.
59
60 An essentially binary search can be performed in the list of index
61 records, like we could do if we had pointer to every record in the
62 page directory. The data structure is, however, more efficient when
63 we are doing inserts, because most inserts are just pushed on a heap.
64 Only every 8th insert requires block move in the directory pointer
65 table, which itself is quite small. A record is deleted from the page
66 by just taking it off the linear list and updating the number of owned
67 records-field of the record which owns it, and updating the page directory,
68 if necessary. A special case is the one when the record owns itself.
69 Because the overhead of inserts is so small, we may also increase the
70 page size from the projected default of 8 kB to 64 kB without too
71 much loss of efficiency in inserts. Bigger page becomes actual
72 when the disk transfer rate compared to seek and latency time rises.
73 On the present system, the page size is set so that the page transfer
74 time (3 ms) is 20 % of the disk random access time (15 ms).
75
76 When the page is split, merged, or becomes full but contains deleted
77 records, we have to reorganize the page.
78
79 Assuming a page size of 8 kB, a typical index page of a secondary
80 index contains 300 index entries, and the size of the page directory
81 is 50 x 4 bytes = 200 bytes. */
82
/***************************************************************//**
Looks for the directory slot which owns the given record.
@return the directory slot number */
ulint
page_dir_find_owner_slot(
/*=====================*/
	const rec_t*	rec)	/*!< in: the physical record */
{
	ut_ad(page_rec_check(rec));

	const page_t* page = page_align(rec);
	const page_dir_slot_t* first_slot = page_dir_get_nth_slot(page, 0);
	const page_dir_slot_t* slot = page_dir_get_nth_slot(
		page, ulint(page_dir_get_n_slots(page)) - 1);
	const rec_t* r = rec;

	/* Step 1: find the record that owns rec by following the singly
	linked record list until a record with a nonzero n_owned field
	is reached. The asserts keep r within the record area of the
	page (above the infimum/supremum, below the page directory). */
	if (page_is_comp(page)) {
		while (rec_get_n_owned_new(r) == 0) {
			r = rec_get_next_ptr_const(r, TRUE);
			ut_ad(r >= page + PAGE_NEW_SUPREMUM);
			ut_ad(r < page + (srv_page_size - PAGE_DIR));
		}
	} else {
		while (rec_get_n_owned_old(r) == 0) {
			r = rec_get_next_ptr_const(r, FALSE);
			ut_ad(r >= page + PAGE_OLD_SUPREMUM);
			ut_ad(r < page + (srv_page_size - PAGE_DIR));
		}
	}

	/* Encode the owner's page offset in the same big-endian form in
	which it is stored in a directory slot, so the scan below can
	compare 2 bytes at a time. */
	uint16 rec_offs_bytes = mach_encode_2(ulint(r - page));

	/* Step 2: scan the directory from the last slot towards slot 0
	for the slot that points to the owner record. */
	while (UNIV_LIKELY(*(uint16*) slot != rec_offs_bytes)) {

		if (UNIV_UNLIKELY(slot == first_slot)) {
			/* The owner record is not referenced by any
			directory slot: the page must be corrupted.
			Print diagnostics and crash. */
			ib::error() << "Probable data corruption on page "
				<< page_get_page_no(page)
				<< ". Original record on that page;";

			if (page_is_comp(page)) {
				fputs("(compact record)", stderr);
			} else {
				rec_print_old(stderr, rec);
			}

			ib::error() << "Cannot find the dir slot for this"
				" record on that page;";

			if (page_is_comp(page)) {
				fputs("(compact record)", stderr);
			} else {
				rec_print_old(stderr, page
					      + mach_decode_2(rec_offs_bytes));
			}

			ut_error;
		}

		slot += PAGE_DIR_SLOT_SIZE;
	}

	/* Slots are stored in order of decreasing address, so the slot
	number is the distance from slot 0, in slot-size units. */
	return(((ulint) (first_slot - slot)) / PAGE_DIR_SLOT_SIZE);
}
146
/**************************************************************//**
Used to check the consistency of a directory slot. Asserts (ut_a)
that the slot lies within the page directory, that it points to a
valid record, and that the record's n_owned count is within the
limits allowed for the slot's position.
@return TRUE if succeed */
static
ibool
page_dir_slot_check(
/*================*/
	const page_dir_slot_t*	slot)	/*!< in: slot */
{
	const page_t*	page;
	ulint		n_slots;
	ulint		n_owned;

	ut_a(slot);

	page = page_align(slot);

	n_slots = page_dir_get_n_slots(page);

	/* The directory grows downwards in memory: slot 0 has the
	highest address and slot n_slots - 1 the lowest. */
	ut_a(slot <= page_dir_get_nth_slot(page, 0));
	ut_a(slot >= page_dir_get_nth_slot(page, n_slots - 1));

	ut_a(page_rec_check(page_dir_slot_get_rec(slot)));

	if (page_is_comp(page)) {
		n_owned = rec_get_n_owned_new(page_dir_slot_get_rec(slot));
	} else {
		n_owned = rec_get_n_owned_old(page_dir_slot_get_rec(slot));
	}

	if (slot == page_dir_get_nth_slot(page, 0)) {
		/* Slot 0 points to the infimum record, which owns
		only itself. */
		ut_a(n_owned == 1);
	} else if (slot == page_dir_get_nth_slot(page, n_slots - 1)) {
		/* The last slot may own fewer records than the
		minimum required of an interior slot. */
		ut_a(n_owned >= 1);
		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
	} else {
		ut_a(n_owned >= PAGE_DIR_SLOT_MIN_N_OWNED);
		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
	}

	return(TRUE);
}
189
/*************************************************************//**
Sets the max trx id field value in the page header. */
void
page_set_max_trx_id(
/*================*/
	buf_block_t*	block,	/*!< in/out: page */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	trx_id_t	trx_id,	/*!< in: transaction id */
	mtr_t*		mtr)	/*!< in/out: mini-transaction, or NULL */
{
	page_t*		page		= buf_block_get_frame(block);
	ut_ad(!mtr || mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

	/* It is not necessary to write this change to the redo log, as
	during a database recovery we assume that the max trx id of every
	page is the maximum trx id assigned before the crash. */

	if (page_zip) {
		/* ROW_FORMAT=COMPRESSED: the field must also be
		updated in the compressed page image. */
		mach_write_to_8(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), trx_id);
		page_zip_write_header(page_zip,
				      page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
				      8, mtr);
	} else if (mtr) {
		/* Write through the mini-transaction, generating a
		redo log record for the change. */
		mlog_write_ull(page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
			       trx_id, mtr);
	} else {
		/* No mini-transaction: write directly to the frame. */
		mach_write_to_8(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), trx_id);
	}
}
219
/** Persist the AUTO_INCREMENT value on a clustered index root page.
The value is only written if it is larger than the currently stored
one, unless reset==true, in which case it is written unconditionally.
@param[in,out]	block	clustered index root page
@param[in]	index	clustered index
@param[in]	autoinc	next available AUTO_INCREMENT value
@param[in,out]	mtr	mini-transaction
@param[in]	reset	whether to reset the AUTO_INCREMENT
			to a possibly smaller value than currently
			exists in the page */
void
page_set_autoinc(
	buf_block_t*		block,
	const dict_index_t*	index MY_ATTRIBUTE((unused)),
	ib_uint64_t		autoinc,
	mtr_t*			mtr,
	bool			reset)
{
	ut_ad(mtr_memo_contains_flagged(
		      mtr, block, MTR_MEMO_PAGE_X_FIX | MTR_MEMO_PAGE_SX_FIX));
	ut_ad(index->is_primary());
	ut_ad(index->page == block->page.id.page_no());
	ut_ad(index->table->space_id == block->page.id.space());

	/* The PAGE_ROOT_AUTO_INC field in the page header holds the
	persisted AUTO_INCREMENT value on the root page. */
	byte*	field = PAGE_HEADER + PAGE_ROOT_AUTO_INC
		+ buf_block_get_frame(block);
	if (!reset && mach_read_from_8(field) >= autoinc) {
		/* nothing to update: the stored value is already at
		least as large, and we are not resetting */
	} else if (page_zip_des_t* page_zip = buf_block_get_page_zip(block)) {
		/* ROW_FORMAT=COMPRESSED: update the uncompressed frame
		and the compressed page header. */
		mach_write_to_8(field, autoinc);
		page_zip_write_header(page_zip, field, 8, mtr);
	} else {
		/* Redo-logged write to the uncompressed page. */
		mlog_write_ull(field, autoinc, mtr);
	}
}
253
254 /**********************************************************//**
255 Writes a log record of page creation. */
256 UNIV_INLINE
257 void
page_create_write_log(buf_frame_t * frame,mtr_t * mtr,ibool comp,bool is_rtree)258 page_create_write_log(
259 /*==================*/
260 buf_frame_t* frame, /*!< in: a buffer frame where the page is
261 created */
262 mtr_t* mtr, /*!< in: mini-transaction handle */
263 ibool comp, /*!< in: TRUE=compact page format */
264 bool is_rtree) /*!< in: whether it is R-tree */
265 {
266 mlog_id_t type;
267
268 if (is_rtree) {
269 type = comp ? MLOG_COMP_PAGE_CREATE_RTREE
270 : MLOG_PAGE_CREATE_RTREE;
271 } else {
272 type = comp ? MLOG_COMP_PAGE_CREATE : MLOG_PAGE_CREATE;
273 }
274
275 mlog_write_initial_log_record(frame, type, mtr);
276 }
277
/** The page infimum and supremum of an empty page in ROW_FORMAT=REDUNDANT.
This byte image is copied to offset PAGE_DATA by page_create_low(). */
static const byte infimum_supremum_redundant[] = {
	/* the infimum record */
	0x08/*end offset*/,
	0x01/*n_owned*/,
	0x00, 0x00/*heap_no=0*/,
	0x03/*n_fields=1, 1-byte offsets*/,
	0x00, 0x74/* pointer to supremum */,
	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
	/* the supremum record */
	0x09/*end offset*/,
	0x01/*n_owned*/,
	0x00, 0x08/*heap_no=1*/,
	0x03/*n_fields=1, 1-byte offsets*/,
	0x00, 0x00/* end of record list */,
	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm', 0
};
295
/** The page infimum and supremum of an empty page in ROW_FORMAT=COMPACT.
This byte image is copied to offset PAGE_DATA by page_create_low(). */
static const byte infimum_supremum_compact[] = {
	/* the infimum record */
	0x01/*n_owned=1*/,
	0x00, 0x02/* heap_no=0, REC_STATUS_INFIMUM */,
	0x00, 0x0d/* pointer to supremum */,
	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
	/* the supremum record */
	0x01/*n_owned=1*/,
	0x00, 0x0b/* heap_no=1, REC_STATUS_SUPREMUM */,
	0x00, 0x00/* end of record list */,
	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm'
};
309
/**********************************************************//**
The index page creation function. Initializes the page header, copies
the predefined infimum and supremum records into place, zero-fills the
free space, and sets up the two initial page directory slots. No redo
logging is done here; callers log the creation separately.
@return pointer to the page */
static
page_t*
page_create_low(
/*============*/
	buf_block_t*	block,	/*!< in: a buffer block where the
				page is created */
	ulint		comp,	/*!< in: nonzero=compact page format */
	bool		is_rtree) /*!< in: if it is an R-Tree page */
{
	page_t*		page;

	/* The change buffer free list info in the header must not
	overlap the record area that starts at PAGE_DATA. */
	compile_time_assert(PAGE_BTR_IBUF_FREE_LIST + FLST_BASE_NODE_SIZE
			    <= PAGE_DATA);
	compile_time_assert(PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE
			    <= PAGE_DATA);

	/* Invalidate any adaptive hash index or cursor positions that
	refer to the old contents of this block. */
	buf_block_modify_clock_inc(block);

	page = buf_block_get_frame(block);

	if (is_rtree) {
		fil_page_set_type(page, FIL_PAGE_RTREE);
	} else {
		fil_page_set_type(page, FIL_PAGE_INDEX);
	}

	/* Zero the private part of the page header, then set the
	fields that must be nonzero: 2 directory slots (infimum and
	supremum), no instant-ADD metadata, no insert direction. */
	memset(page + PAGE_HEADER, 0, PAGE_HEADER_PRIV_END);
	page[PAGE_HEADER + PAGE_N_DIR_SLOTS + 1] = 2;
	page[PAGE_HEADER + PAGE_INSTANT] = 0;
	page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_NO_DIRECTION;

	if (comp) {
		page[PAGE_HEADER + PAGE_N_HEAP] = 0x80;/*page_is_comp()*/
		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_NEW_SUPREMUM_END;
		/* Install the predefined infimum and supremum records
		and zero-fill everything up to the page directory. */
		memcpy(page + PAGE_DATA, infimum_supremum_compact,
		       sizeof infimum_supremum_compact);
		memset(page
		       + PAGE_NEW_SUPREMUM_END, 0,
		       srv_page_size - PAGE_DIR - PAGE_NEW_SUPREMUM_END);
		/* Directory slot 1 (lower address) points to supremum,
		slot 0 (higher address) to infimum; only the low offset
		byte needs writing, the high byte is already 0. */
		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
			= PAGE_NEW_SUPREMUM;
		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
			= PAGE_NEW_INFIMUM;
	} else {
		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_OLD_SUPREMUM_END;
		memcpy(page + PAGE_DATA, infimum_supremum_redundant,
		       sizeof infimum_supremum_redundant);
		memset(page
		       + PAGE_OLD_SUPREMUM_END, 0,
		       srv_page_size - PAGE_DIR - PAGE_OLD_SUPREMUM_END);
		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
			= PAGE_OLD_SUPREMUM;
		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
			= PAGE_OLD_INFIMUM;
	}

	return(page);
}
373
374 /** Parses a redo log record of creating a page.
375 @param[in,out] block buffer block, or NULL
376 @param[in] comp nonzero=compact page format
377 @param[in] is_rtree whether it is rtree page */
378 void
page_parse_create(buf_block_t * block,ulint comp,bool is_rtree)379 page_parse_create(
380 buf_block_t* block,
381 ulint comp,
382 bool is_rtree)
383 {
384 if (block != NULL) {
385 page_create_low(block, comp, is_rtree);
386 }
387 }
388
389 /**********************************************************//**
390 Create an uncompressed B-tree or R-tree index page.
391 @return pointer to the page */
392 page_t*
page_create(buf_block_t * block,mtr_t * mtr,ulint comp,bool is_rtree)393 page_create(
394 /*========*/
395 buf_block_t* block, /*!< in: a buffer block where the
396 page is created */
397 mtr_t* mtr, /*!< in: mini-transaction handle */
398 ulint comp, /*!< in: nonzero=compact page format */
399 bool is_rtree) /*!< in: whether it is a R-Tree page */
400 {
401 ut_ad(mtr->is_named_space(block->page.id.space()));
402 page_create_write_log(buf_block_get_frame(block), mtr, comp, is_rtree);
403 return(page_create_low(block, comp, is_rtree));
404 }
405
/**********************************************************//**
Create a compressed B-tree index page. Initializes the uncompressed
frame, sets PAGE_LEVEL and PAGE_MAX_TRX_ID, and compresses the result.
@return pointer to the page */
page_t*
page_create_zip(
/*============*/
	buf_block_t*		block,		/*!< in/out: a buffer frame
						where the page is created */
	dict_index_t*		index,		/*!< in: the index of the
						page */
	ulint			level,		/*!< in: the B-tree level
						of the page */
	trx_id_t		max_trx_id,	/*!< in: PAGE_MAX_TRX_ID */
	mtr_t*			mtr)		/*!< in/out: mini-transaction
						handle */
{
	page_t*		page;
	page_zip_des_t*	page_zip = buf_block_get_page_zip(block);

	ut_ad(block);
	ut_ad(page_zip);
	ut_ad(dict_table_is_comp(index->table));

	/* PAGE_MAX_TRX_ID or PAGE_ROOT_AUTO_INC are always 0 for
	temporary tables. */
	ut_ad(max_trx_id == 0 || !index->table->is_temporary());
	/* In secondary indexes and the change buffer, PAGE_MAX_TRX_ID
	must be zero on non-leaf pages. max_trx_id can be 0 when the
	index consists of an empty root (leaf) page. */
	ut_ad(max_trx_id == 0
	      || level == 0
	      || !dict_index_is_sec_or_ibuf(index)
	      || index->table->is_temporary());
	/* In the clustered index, PAGE_ROOT_AUTOINC or
	PAGE_MAX_TRX_ID must be 0 on other pages than the root. */
	ut_ad(level == 0 || max_trx_id == 0
	      || !dict_index_is_sec_or_ibuf(index)
	      || index->table->is_temporary());

	/* Compressed pages always use the compact record format. */
	page = page_create_low(block, TRUE, dict_index_is_spatial(index));
	mach_write_to_2(PAGE_HEADER + PAGE_LEVEL + page, level);
	mach_write_to_8(PAGE_HEADER + PAGE_MAX_TRX_ID + page, max_trx_id);

	if (!page_zip_compress(page_zip, page, index, page_zip_level, mtr)) {
		/* The compression of a newly created
		page should always succeed. */
		ut_error;
	}

	return(page);
}
457
/**********************************************************//**
Empty a previously created B-tree index page, preserving
PAGE_MAX_TRX_ID (secondary index leaf pages) or PAGE_ROOT_AUTO_INC
(clustered index root page) where required. */
void
page_create_empty(
/*==============*/
	buf_block_t*	block,	/*!< in/out: B-tree block */
	dict_index_t*	index,	/*!< in: the index of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	trx_id_t	max_trx_id;
	page_t*		page	= buf_block_get_frame(block);
	page_zip_des_t*	page_zip= buf_block_get_page_zip(block);

	ut_ad(fil_page_index_page_check(page));
	ut_ad(!index->is_dummy);
	ut_ad(block->page.id.space() == index->table->space->id);

	/* Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && !index->table->is_temporary()
	    && page_is_leaf(page)) {
		/* Preserve PAGE_MAX_TRX_ID across the re-creation. */
		max_trx_id = page_get_max_trx_id(page);
		ut_ad(max_trx_id);
	} else if (block->page.id.page_no() == index->page) {
		/* Preserve PAGE_ROOT_AUTO_INC. */
		max_trx_id = page_get_max_trx_id(page);
	} else {
		max_trx_id = 0;
	}

	if (page_zip) {
		/* page_create_zip() writes max_trx_id into the header
		of the freshly created page before compressing it. */
		ut_ad(!index->table->is_temporary());
		page_create_zip(block, index,
				page_header_get_field(page, PAGE_LEVEL),
				max_trx_id, mtr);
	} else {
		page_create(block, mtr, page_is_comp(page),
			    dict_index_is_spatial(index));

		if (max_trx_id) {
			/* Restore the preserved value with a
			redo-logged write. */
			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID + page,
				       max_trx_id, mtr);
		}
	}
}
506
/*************************************************************//**
Copies records from the page of rec to new_page, from rec onward,
including rec. Differs from page_copy_rec_list_end, because this
function does not touch the lock table and max trx id on page or
compress the page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit(). */
void
page_copy_rec_list_end_no_locks(
/*============================*/
	buf_block_t*	new_block,	/*!< in: index page to copy to */
	buf_block_t*	block,		/*!< in: index page of rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_cur_t	cur1;
	rec_t*		cur2;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	page_cur_position(rec, block, &cur1);

	if (page_cur_is_before_first(&cur1)) {
		/* rec is the infimum: start copying from the first
		user record instead. */
		page_cur_move_to_next(&cur1);
	}

	btr_assert_not_corrupted(new_block, index);
	ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
	/* The bytes at srv_page_size - 10 are the second directory
	slot; it must still point to the infimum, i.e. new_page must
	not contain garbage in its directory. */
	ut_a(mach_read_from_2(new_page + srv_page_size - 10) == (ulint)
	     (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));
	/* n_core_fields only applies to leaf-page records; 0 tells
	rec_get_offsets() that these are node pointer records. */
	const ulint n_core = page_is_leaf(block->frame)
		? index->n_core_fields : 0;

	cur2 = page_get_infimum_rec(buf_block_get_frame(new_block));

	/* Copy records from the original page to the new page,
	inserting each record after the previously inserted one. */
	while (!page_cur_is_after_last(&cur1)) {
		rec_t*	ins_rec;
		offsets = rec_get_offsets(cur1.rec, index, offsets, n_core,
					  ULINT_UNDEFINED, &heap);
		ins_rec = page_cur_insert_rec_low(cur2, index,
						  cur1.rec, offsets, mtr);
		if (UNIV_UNLIKELY(!ins_rec)) {
			/* An insert on a fresh page should never fail;
			report the cursor positions and crash. */
			ib::fatal() << "Rec offset " << page_offset(rec)
				<< ", cur1 offset " << page_offset(cur1.rec)
				<< ", cur2 offset " << page_offset(cur2);
		}

		page_cur_move_to_next(&cur1);
		ut_ad(!(rec_get_info_bits(cur1.rec, page_is_comp(new_page))
			& REC_INFO_MIN_REC_FLAG));
		cur2 = ins_rec;
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
572
/*************************************************************//**
Copies records from page to new_page, from a given record onward,
including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the original successor of the infimum record on
new_page, or NULL on zip overflow (new_block will be decompressed) */
rec_t*
page_copy_rec_list_end(
/*===================*/
	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
	buf_block_t*	block,		/*!< in: index page containing rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
	page_t*		page		= page_align(rec);
	rec_t*		ret		= page_rec_get_next(
		page_get_infimum_rec(new_page));
	ulint		num_moved	= 0;
	rtr_rec_move_t*	rec_move	= NULL;
	mem_heap_t*	heap		= NULL;

#ifdef UNIV_ZIP_DEBUG
	if (new_page_zip) {
		page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
		ut_a(page_zip);

		/* Strict page_zip_validate() may fail here.
		Furthermore, btr_compress() may set FIL_PAGE_PREV to
		FIL_NULL on new_page while leaving it intact on
		new_page_zip.  So, we cannot validate new_page_zip. */
		ut_a(page_zip_validate_low(page_zip, page, index, TRUE));
	}
#endif /* UNIV_ZIP_DEBUG */
	ut_ad(buf_block_get_frame(block) == page);
	ut_ad(page_is_leaf(page) == page_is_leaf(new_page));
	ut_ad(page_is_comp(page) == page_is_comp(new_page));
	/* Here, "ret" may be pointing to a user record or the
	predefined supremum record. */

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* Suppress redo logging while modifying the
		uncompressed copy; the logging happens when the page
		is compressed (or reorganized) below. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	if (page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW) {
		/* new_page is empty: use the bulk path that builds
		the page from scratch. */
		page_copy_rec_list_end_to_created_page(new_page, rec,
						       index, mtr);
	} else {
		if (dict_index_is_spatial(index)) {
			ulint	max_to_move = page_get_n_recs(
						buf_block_get_frame(block));
			heap = mem_heap_create(256);

			rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
					heap,
					sizeof (*rec_move) * max_to_move));

			/* For spatial index, we need to insert recs one by one
			to keep recs ordered. */
			rtr_page_copy_rec_list_end_no_locks(new_block,
							    block, rec, index,
							    heap, rec_move,
							    max_to_move,
							    &num_moved,
							    mtr);
		} else {
			page_copy_rec_list_end_no_locks(new_block, block, rec,
							index, mtr);
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page)
	    && !index->table->is_temporary()) {
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page), mtr);
	}

	if (new_page_zip) {
		mtr_set_log_mode(mtr, log_mode);

		if (!page_zip_compress(new_page_zip, new_page, index,
				       page_zip_level, mtr)) {
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ulint	ret_pos
				= page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the successor of
			the predefined infimum record.  It must still
			have at least one predecessor (the predefined
			infimum record, or a freshly copied record
			that is smaller than "ret"). */
			ut_a(ret_pos > 0);

			if (!page_zip_reorganize(new_block, index, mtr)) {
				/* Reorganization failed too: restore
				the uncompressed page from the
				compressed image and report failure. */
				if (!page_zip_decompress(new_page_zip,
							 new_page, FALSE)) {
					ut_error;
				}
				ut_ad(page_validate(new_page, index));

				if (heap) {
					mem_heap_free(heap);
				}

				return(NULL);
			} else {
				/* The page was reorganized:
				Seek to ret_pos. */
				ret = new_page + PAGE_NEW_INFIMUM;

				do {
					ret = rec_get_next_ptr(ret, TRUE);
				} while (--ret_pos);
			}
		}
	}

	/* Update the lock table and possible hash index */

	if (dict_table_is_locking_disabled(index->table)) {
	} else if (rec_move && dict_index_is_spatial(index)) {
		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
	} else {
		lock_move_rec_list_end(new_block, block, rec);
	}

	if (heap) {
		mem_heap_free(heap);
	}

	btr_search_move_or_delete_hash_entries(new_block, block);

	return(ret);
}
726
/*************************************************************//**
Copies records from page to new_page, up to the given record,
NOT including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the original predecessor of the supremum record on
new_page, or NULL on zip overflow (new_block will be decompressed) */
rec_t*
page_copy_rec_list_start(
/*=====================*/
	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
	buf_block_t*	block,		/*!< in: index page containing rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	ut_ad(page_align(rec) == block->frame);

	page_t*		new_page	= buf_block_get_frame(new_block);
	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
	page_cur_t	cur1;
	rec_t*		cur2;
	mem_heap_t*	heap		= NULL;
	ulint		num_moved	= 0;
	rtr_rec_move_t*	rec_move	= NULL;
	rec_t*		ret
		= page_rec_get_prev(page_get_supremum_rec(new_page));
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	/* Here, "ret" may be pointing to a user record or the
	predefined infimum record. */

	if (page_rec_is_infimum(rec)) {
		/* Nothing precedes the infimum: no records to copy. */
		return(ret);
	}

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* Suppress redo logging while modifying the
		uncompressed copy; the logging happens when the page
		is compressed (or reorganized) below. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	page_cur_set_before_first(block, &cur1);
	page_cur_move_to_next(&cur1);

	cur2 = ret;

	/* n_core_fields only applies to leaf-page records; 0 tells
	rec_get_offsets() that these are node pointer records. */
	const ulint	n_core = page_rec_is_leaf(rec)
		? index->n_core_fields : 0;

	/* Copy records from the original page to the new page */
	if (index->is_spatial()) {
		ut_ad(!index->is_instant());
		ulint		max_to_move = page_get_n_recs(
						buf_block_get_frame(block));
		heap = mem_heap_create(256);

		rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
				heap,
				sizeof (*rec_move) * max_to_move));

		/* For spatial index, we need to insert recs one by one
		to keep recs ordered. */
		rtr_page_copy_rec_list_start_no_locks(new_block,
						      block, rec, index, heap,
						      rec_move, max_to_move,
						      &num_moved, mtr);
	} else {
		while (page_cur_get_rec(&cur1) != rec) {
			offsets = rec_get_offsets(cur1.rec, index, offsets,
						  n_core,
						  ULINT_UNDEFINED, &heap);
			cur2 = page_cur_insert_rec_low(cur2, index,
						       cur1.rec, offsets, mtr);
			ut_a(cur2);

			page_cur_move_to_next(&cur1);
			ut_ad(!(rec_get_info_bits(cur1.rec,
						  page_is_comp(new_page))
				& REC_INFO_MIN_REC_FLAG));
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (n_core && dict_index_is_sec_or_ibuf(index)
	    && !index->table->is_temporary()) {
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page_align(rec)),
				       mtr);
	}

	if (new_page_zip) {
		mtr_set_log_mode(mtr, log_mode);

		DBUG_EXECUTE_IF("page_copy_rec_list_start_compress_fail",
				goto zip_reorganize;);

		if (!page_zip_compress(new_page_zip, new_page, index,
				       page_zip_level, mtr)) {
			ulint	ret_pos;
#ifndef DBUG_OFF
zip_reorganize:
#endif /* DBUG_OFF */
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ret_pos = page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the predecessor
			of the predefined supremum record.  If it was
			the predefined infimum record, then it would
			still be the infimum, and we would have
			ret_pos == 0. */

			if (UNIV_UNLIKELY
			    (!page_zip_reorganize(new_block, index, mtr))) {
				/* Reorganization failed too: restore
				the uncompressed page from the
				compressed image and report failure. */
				if (UNIV_UNLIKELY
				    (!page_zip_decompress(new_page_zip,
							  new_page, FALSE))) {
					ut_error;
				}
				ut_ad(page_validate(new_page, index));

				if (UNIV_LIKELY_NULL(heap)) {
					mem_heap_free(heap);
				}

				return(NULL);
			}

			/* The page was reorganized: Seek to ret_pos. */
			ret = page_rec_get_nth(new_page, ret_pos);
		}
	}

	/* Update the lock table and possible hash index */

	if (dict_table_is_locking_disabled(index->table)) {
	} else if (dict_index_is_spatial(index)) {
		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
	} else {
		lock_move_rec_list_start(new_block, block, rec, ret);
	}

	if (heap) {
		mem_heap_free(heap);
	}

	btr_search_move_or_delete_hash_entries(new_block, block);

	return(ret);
}
890
891 /**********************************************************//**
892 Writes a log record of a record list end or start deletion. */
893 UNIV_INLINE
894 void
page_delete_rec_list_write_log(rec_t * rec,dict_index_t * index,mlog_id_t type,mtr_t * mtr)895 page_delete_rec_list_write_log(
896 /*===========================*/
897 rec_t* rec, /*!< in: record on page */
898 dict_index_t* index, /*!< in: record descriptor */
899 mlog_id_t type, /*!< in: operation type:
900 MLOG_LIST_END_DELETE, ... */
901 mtr_t* mtr) /*!< in: mtr */
902 {
903 byte* log_ptr;
904 ut_ad(type == MLOG_LIST_END_DELETE
905 || type == MLOG_LIST_START_DELETE
906 || type == MLOG_COMP_LIST_END_DELETE
907 || type == MLOG_COMP_LIST_START_DELETE);
908
909 log_ptr = mlog_open_and_write_index(mtr, rec, index, type, 2);
910 if (log_ptr) {
911 /* Write the parameter as a 2-byte ulint */
912 mach_write_to_2(log_ptr, page_offset(rec));
913 mlog_close(mtr, log_ptr + 2);
914 }
915 }
916
917 /**********************************************************//**
918 Parses a log record of a record list end or start deletion.
919 @return end of log record or NULL */
920 byte*
page_parse_delete_rec_list(mlog_id_t type,byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)921 page_parse_delete_rec_list(
922 /*=======================*/
923 mlog_id_t type, /*!< in: MLOG_LIST_END_DELETE,
924 MLOG_LIST_START_DELETE,
925 MLOG_COMP_LIST_END_DELETE or
926 MLOG_COMP_LIST_START_DELETE */
927 byte* ptr, /*!< in: buffer */
928 byte* end_ptr,/*!< in: buffer end */
929 buf_block_t* block, /*!< in/out: buffer block or NULL */
930 dict_index_t* index, /*!< in: record descriptor */
931 mtr_t* mtr) /*!< in: mtr or NULL */
932 {
933 page_t* page;
934 ulint offset;
935
936 ut_ad(type == MLOG_LIST_END_DELETE
937 || type == MLOG_LIST_START_DELETE
938 || type == MLOG_COMP_LIST_END_DELETE
939 || type == MLOG_COMP_LIST_START_DELETE);
940
941 /* Read the record offset as a 2-byte ulint */
942
943 if (end_ptr < ptr + 2) {
944
945 return(NULL);
946 }
947
948 offset = mach_read_from_2(ptr);
949 ptr += 2;
950
951 if (!block) {
952
953 return(ptr);
954 }
955
956 page = buf_block_get_frame(block);
957
958 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
959
960 if (type == MLOG_LIST_END_DELETE
961 || type == MLOG_COMP_LIST_END_DELETE) {
962 page_delete_rec_list_end(page + offset, block, index,
963 ULINT_UNDEFINED, ULINT_UNDEFINED,
964 mtr);
965 } else {
966 page_delete_rec_list_start(page + offset, block, index, mtr);
967 }
968
969 return(ptr);
970 }
971
/*************************************************************//**
Deletes records from a page from a given record onward, including that record.
The infimum and supremum records are not deleted. */
void
page_delete_rec_list_end(
/*=====================*/
	rec_t*		rec,	/*!< in: pointer to record on page */
	buf_block_t*	block,	/*!< in: buffer block of the page */
	dict_index_t*	index,	/*!< in: record descriptor */
	ulint		n_recs,	/*!< in: number of records to delete,
				or ULINT_UNDEFINED if not known */
	ulint		size,	/*!< in: the sum of the sizes of the
				records in the end of the chain to
				delete, or ULINT_UNDEFINED if not known */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_dir_slot_t*slot;
	ulint	slot_index;
	rec_t*	last_rec;
	rec_t*	prev_rec;
	ulint	n_owned;
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	page_t*		page		= page_align(rec);
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(size == ULINT_UNDEFINED || size < srv_page_size);
	ut_ad(!page_zip || page_rec_is_comp(rec));
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	if (page_rec_is_supremum(rec)) {
		ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED);
		/* Nothing to do, there are no records bigger than the
		page supremum. */
		return;
	}

	if (recv_recovery_is_on()) {
		/* If we are replaying a redo log record, we must
		replay it exactly. Since MySQL 5.6.11, we should be
		generating a redo log record for page creation if
		the page would become empty. Thus, this branch should
		only be executed when applying redo log that was
		generated by an older version of MySQL. */
	} else if (page_rec_is_infimum(rec)
		   || n_recs == page_get_n_recs(page)) {
delete_all:
		/* We are deleting all records. */
		page_create_empty(block, index, mtr);
		return;
	} else if (page_is_comp(page)) {
		if (page_rec_get_next_low(page + PAGE_NEW_INFIMUM, 1) == rec) {
			/* We are deleting everything from the first
			user record onwards. */
			goto delete_all;
		}
	} else {
		if (page_rec_get_next_low(page + PAGE_OLD_INFIMUM, 0) == rec) {
			/* We are deleting everything from the first
			user record onwards. */
			goto delete_all;
		}
	}

	/* Reset the last insert info in the page header and increment
	the modify clock for the frame */

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);

	/* The page gets invalid for optimistic searches: increment the
	frame modify clock */

	buf_block_modify_clock_inc(block);

	/* Write a single redo record covering the whole list deletion;
	the per-record work below is not logged individually. */
	page_delete_rec_list_write_log(rec, index, page_is_comp(page)
				       ? MLOG_COMP_LIST_END_DELETE
				       : MLOG_LIST_END_DELETE, mtr);

	/* Number of core fields for rec_get_offsets(): on non-leaf
	pages 0 means the records are treated as node pointers. */
	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;

	if (page_zip) {
		mtr_log_t	log_mode;

		ut_a(page_is_comp(page));
		/* Individual deletes are not logged */

		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

		/* On a ROW_FORMAT=COMPRESSED page, delete the records
		one at a time through page_cur_delete_rec(), so that
		the compressed page image stays in sync. */
		do {
			page_cur_t	cur;
			page_cur_position(rec, block, &cur);

			offsets = rec_get_offsets(rec, index, offsets, n_core,
						  ULINT_UNDEFINED, &heap);
			/* Save the successor before rec is deleted. */
			rec = rec_get_next_ptr(rec, TRUE);
#ifdef UNIV_ZIP_DEBUG
			ut_a(page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
			page_cur_delete_rec(&cur, index, offsets, mtr);
		} while (page_offset(rec) != PAGE_NEW_SUPREMUM);

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}

		/* Restore log mode */

		mtr_set_log_mode(mtr, log_mode);
		return;
	}

	/* Uncompressed page: detach the whole chain at once and hang
	it on the free list. Note the neighbors of the chain first. */
	prev_rec = page_rec_get_prev(rec);

	last_rec = page_rec_get_prev(page_get_supremum_rec(page));

	bool scrub = srv_immediate_scrub_data_uncompressed;
	if ((size == ULINT_UNDEFINED) || (n_recs == ULINT_UNDEFINED) ||
	    scrub) {
		rec_t*		rec2	= rec;
		/* Calculate the sum of sizes and the number of records */
		size = 0;
		n_recs = 0;

		do {
			ulint	s;
			offsets = rec_get_offsets(rec2, index, offsets, n_core,
						  ULINT_UNDEFINED, &heap);
			s = rec_offs_size(offsets);
			ut_ad(ulint(rec2 - page) + s
			      - rec_offs_extra_size(offsets)
			      < srv_page_size);
			ut_ad(size + s < srv_page_size);
			size += s;
			n_recs++;

			if (scrub) {
				/* scrub record: overwrite the data
				payload with zero bytes so deleted row
				contents do not linger on the page */
				memset(rec2, 0, rec_offs_data_size(offsets));
			}

			rec2 = page_rec_get_next(rec2);
		} while (!page_rec_is_supremum(rec2));

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	}

	ut_ad(size < srv_page_size);

	/* Update the page directory; there is no need to balance the number
	of the records owned by the supremum record, as it is allowed to be
	less than PAGE_DIR_SLOT_MIN_N_OWNED */

	if (page_is_comp(page)) {
		rec_t*	rec2	= rec;
		ulint	count	= 0;

		/* Walk forward to the record owning rec's directory
		slot, counting the non-owning records in between. */
		while (rec_get_n_owned_new(rec2) == 0) {
			count++;

			rec2 = rec_get_next_ptr(rec2, TRUE);
		}

		ut_ad(rec_get_n_owned_new(rec2) > count);

		/* After deletion, the slot will own the records that
		preceded rec in rec2's ownership range. */
		n_owned = rec_get_n_owned_new(rec2) - count;
		slot_index = page_dir_find_owner_slot(rec2);
		ut_ad(slot_index > 0);
		slot = page_dir_get_nth_slot(page, slot_index);
	} else {
		rec_t*	rec2	= rec;
		ulint	count	= 0;

		/* Same as above, for ROW_FORMAT=REDUNDANT records. */
		while (rec_get_n_owned_old(rec2) == 0) {
			count++;

			rec2 = rec_get_next_ptr(rec2, FALSE);
		}

		ut_ad(rec_get_n_owned_old(rec2) > count);

		n_owned = rec_get_n_owned_old(rec2) - count;
		slot_index = page_dir_find_owner_slot(rec2);
		ut_ad(slot_index > 0);
		slot = page_dir_get_nth_slot(page, slot_index);
	}

	/* Make the supremum the owner of the surviving records of the
	slot, and truncate the directory after that slot. */
	page_dir_slot_set_rec(slot, page_get_supremum_rec(page));
	page_dir_slot_set_n_owned(slot, NULL, n_owned);

	page_dir_set_n_slots(page, NULL, slot_index + 1);

	/* Remove the record chain segment from the record chain */
	page_rec_set_next(prev_rec, page_get_supremum_rec(page));

	/* Catenate the deleted chain segment to the page free list */

	page_rec_set_next(last_rec, page_header_get_ptr(page, PAGE_FREE));
	page_header_set_ptr(page, NULL, PAGE_FREE, rec);

	/* Account the freed bytes as garbage and decrement the record
	count in the page header. */
	page_header_set_field(page, NULL, PAGE_GARBAGE, size
			      + page_header_get_field(page, PAGE_GARBAGE));

	ut_ad(page_get_n_recs(page) > n_recs);
	page_header_set_field(page, NULL, PAGE_N_RECS,
			      (ulint)(page_get_n_recs(page) - n_recs));
}
1184
/*************************************************************//**
Deletes records from page, up to the given record, NOT including
that record. Infimum and supremum records are not deleted. */
void
page_delete_rec_list_start(
/*=======================*/
	rec_t*		rec,	/*!< in: record on page */
	buf_block_t*	block,	/*!< in: buffer block of the page */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_cur_t	cur1;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	mem_heap_t*	heap		= NULL;

	rec_offs_init(offsets_);

	ut_ad(page_align(rec) == block->frame);
	ut_ad((ibool) !!page_rec_is_comp(rec)
	      == dict_table_is_comp(index->table));
#ifdef UNIV_ZIP_DEBUG
	{
		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
		page_t*		page = buf_block_get_frame(block);

		/* page_zip_validate() would detect a min_rec_mark mismatch
		in btr_page_split_and_insert()
		between btr_attach_half_pages() and insert_page = ...
		when btr_page_get_split_rec_to_left() holds
		(direction == FSP_DOWN). */
		ut_a(!page_zip
		     || page_zip_validate_low(page_zip, page, index, TRUE));
	}
#endif /* UNIV_ZIP_DEBUG */

	if (page_rec_is_infimum(rec)) {
		/* No user records precede the infimum: nothing to do. */
		return;
	}

	if (page_rec_is_supremum(rec)) {
		/* We are deleting all records. */
		page_create_empty(block, index, mtr);
		return;
	}

	mlog_id_t	type;

	if (page_rec_is_comp(rec)) {
		type = MLOG_COMP_LIST_START_DELETE;
	} else {
		type = MLOG_LIST_START_DELETE;
	}

	/* One redo record covers the whole operation; the per-record
	deletes below run with redo logging disabled. */
	page_delete_rec_list_write_log(rec, index, type, mtr);

	page_cur_set_before_first(block, &cur1);
	page_cur_move_to_next(&cur1);

	/* Individual deletes are not logged */

	mtr_log_t	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	const ulint	n_core = page_rec_is_leaf(rec)
		? index->n_core_fields : 0;

	/* Delete from the first user record up to, but not
	including, rec. */
	while (page_cur_get_rec(&cur1) != rec) {
		offsets = rec_get_offsets(page_cur_get_rec(&cur1), index,
					  offsets, n_core,
					  ULINT_UNDEFINED, &heap);
		page_cur_delete_rec(&cur1, index, offsets, mtr);
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	/* Restore log mode */

	mtr_set_log_mode(mtr, log_mode);
}
1265
1266 /*************************************************************//**
1267 Moves record list end to another page. Moved records include
1268 split_rec.
1269
1270 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1271 if new_block is a compressed leaf page in a secondary index.
1272 This has to be done either within the same mini-transaction,
1273 or by invoking ibuf_reset_free_bits() before mtr_commit().
1274
1275 @return TRUE on success; FALSE on compression failure (new_block will
1276 be decompressed) */
1277 ibool
page_move_rec_list_end(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1278 page_move_rec_list_end(
1279 /*===================*/
1280 buf_block_t* new_block, /*!< in/out: index page where to move */
1281 buf_block_t* block, /*!< in: index page from where to move */
1282 rec_t* split_rec, /*!< in: first record to move */
1283 dict_index_t* index, /*!< in: record descriptor */
1284 mtr_t* mtr) /*!< in: mtr */
1285 {
1286 page_t* new_page = buf_block_get_frame(new_block);
1287 ulint old_data_size;
1288 ulint new_data_size;
1289 ulint old_n_recs;
1290 ulint new_n_recs;
1291
1292 ut_ad(!dict_index_is_spatial(index));
1293
1294 old_data_size = page_get_data_size(new_page);
1295 old_n_recs = page_get_n_recs(new_page);
1296 #ifdef UNIV_ZIP_DEBUG
1297 {
1298 page_zip_des_t* new_page_zip
1299 = buf_block_get_page_zip(new_block);
1300 page_zip_des_t* page_zip
1301 = buf_block_get_page_zip(block);
1302 ut_a(!new_page_zip == !page_zip);
1303 ut_a(!new_page_zip
1304 || page_zip_validate(new_page_zip, new_page, index));
1305 ut_a(!page_zip
1306 || page_zip_validate(page_zip, page_align(split_rec),
1307 index));
1308 }
1309 #endif /* UNIV_ZIP_DEBUG */
1310
1311 if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_block, block,
1312 split_rec, index, mtr))) {
1313 return(FALSE);
1314 }
1315
1316 new_data_size = page_get_data_size(new_page);
1317 new_n_recs = page_get_n_recs(new_page);
1318
1319 ut_ad(new_data_size >= old_data_size);
1320
1321 page_delete_rec_list_end(split_rec, block, index,
1322 new_n_recs - old_n_recs,
1323 new_data_size - old_data_size, mtr);
1324
1325 return(TRUE);
1326 }
1327
1328 /*************************************************************//**
1329 Moves record list start to another page. Moved records do not include
1330 split_rec.
1331
1332 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1333 if new_block is a compressed leaf page in a secondary index.
1334 This has to be done either within the same mini-transaction,
1335 or by invoking ibuf_reset_free_bits() before mtr_commit().
1336
1337 @return TRUE on success; FALSE on compression failure */
1338 ibool
page_move_rec_list_start(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1339 page_move_rec_list_start(
1340 /*=====================*/
1341 buf_block_t* new_block, /*!< in/out: index page where to move */
1342 buf_block_t* block, /*!< in/out: page containing split_rec */
1343 rec_t* split_rec, /*!< in: first record not to move */
1344 dict_index_t* index, /*!< in: record descriptor */
1345 mtr_t* mtr) /*!< in: mtr */
1346 {
1347 if (UNIV_UNLIKELY(!page_copy_rec_list_start(new_block, block,
1348 split_rec, index, mtr))) {
1349 return(FALSE);
1350 }
1351
1352 page_delete_rec_list_start(split_rec, block, index, mtr);
1353
1354 return(TRUE);
1355 }
1356
/**************************************************************//**
Used to delete one slot from the directory. This function updates
also n_owned fields in the records, so that the first slot after
the deleted one inherits the records of the deleted slot. */
UNIV_INLINE
void
page_dir_delete_slot(
/*=================*/
	page_t*		page,	/*!< in/out: the index page */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	ulint		slot_no)/*!< in: slot to be deleted */
{
	page_dir_slot_t*	slot;
	ulint			n_owned;
	ulint			i;
	ulint			n_slots;

	ut_ad(!page_zip || page_is_comp(page));
	/* Slot 0 and the last slot must never be deleted; see
	page_check_dir(): they point to the infimum and supremum. */
	ut_ad(slot_no > 0);
	ut_ad(slot_no + 1 < page_dir_get_n_slots(page));

	n_slots = page_dir_get_n_slots(page);

	/* 1. Reset the n_owned fields of the slots to be
	deleted */
	slot = page_dir_get_nth_slot(page, slot_no);
	n_owned = page_dir_slot_get_n_owned(slot);
	page_dir_slot_set_n_owned(slot, page_zip, 0);

	/* 2. Update the n_owned value of the first non-deleted slot */

	slot = page_dir_get_nth_slot(page, slot_no + 1);
	page_dir_slot_set_n_owned(slot, page_zip,
				  n_owned + page_dir_slot_get_n_owned(slot));

	/* 3. Destroy the slot by copying slots: every slot above
	slot_no moves down by one position. */
	for (i = slot_no + 1; i < n_slots; i++) {
		rec_t*	rec = (rec_t*)
			page_dir_slot_get_rec(page_dir_get_nth_slot(page, i));
		page_dir_slot_set_rec(page_dir_get_nth_slot(page, i - 1), rec);
	}

	/* 4. Zero out the last slot, which will be removed */
	mach_write_to_2(page_dir_get_nth_slot(page, n_slots - 1), 0);

	/* 5. Update the page header */
	page_header_set_field(page, page_zip, PAGE_N_DIR_SLOTS, n_slots - 1);
}
1405
/**************************************************************//**
Used to add one slot to the directory. Does not set the record pointer
in the added slot or update n_owned values: this is the responsibility
of the caller. */
UNIV_INLINE
void
page_dir_add_slot(
/*==============*/
	page_t*		page,	/*!< in/out: the index page */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	ulint		start)	/*!< in: the slot above which the new slots
				are added */
{
	page_dir_slot_t*	slot;
	ulint			n_slots;

	n_slots = page_dir_get_n_slots(page);

	/* The new slot must go strictly below the last (supremum)
	slot. */
	ut_ad(start < n_slots - 1);

	/* Update the page header */
	page_dir_set_n_slots(page, page_zip, n_slots + 1);

	/* Move slots up: slots start+1 .. n_slots-1 become slots
	start+2 .. n_slots, freeing slot start+1 for the caller.
	NOTE(review): the directory lives just below the page end (see
	the file header comment), so higher slot numbers sit at lower
	addresses; the memmove below shifts the block of existing
	slots toward the new lowest-address slot. */
	slot = page_dir_get_nth_slot(page, n_slots);
	memmove(slot, slot + PAGE_DIR_SLOT_SIZE,
		(n_slots - 1 - start) * PAGE_DIR_SLOT_SIZE);
}
1434
/****************************************************************//**
Splits a directory slot which owns too many records. */
void
page_dir_split_slot(
/*================*/
	page_t*		page,	/*!< in/out: index page */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose
				uncompressed part will be written, or NULL */
	ulint		slot_no)/*!< in: the directory slot */
{
	rec_t*			rec;
	page_dir_slot_t*	new_slot;
	page_dir_slot_t*	prev_slot;
	page_dir_slot_t*	slot;
	ulint			i;
	ulint			n_owned;

	ut_ad(!page_zip || page_is_comp(page));
	ut_ad(slot_no > 0);

	slot = page_dir_get_nth_slot(page, slot_no);

	n_owned = page_dir_slot_get_n_owned(slot);
	/* The caller splits exactly when the maximum is exceeded. */
	ut_ad(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED + 1);

	/* 1. We loop to find a record approximately in the middle of the
	records owned by the slot. */

	prev_slot = page_dir_get_nth_slot(page, slot_no - 1);
	rec = (rec_t*) page_dir_slot_get_rec(prev_slot);

	for (i = 0; i < n_owned / 2; i++) {
		rec = page_rec_get_next(rec);
	}

	/* Both halves must still satisfy the minimum ownership. */
	ut_ad(n_owned / 2 >= PAGE_DIR_SLOT_MIN_N_OWNED);

	/* 2. We add one directory slot immediately below the slot to be
	split. */

	page_dir_add_slot(page, page_zip, slot_no - 1);

	/* The added slot is now number slot_no, and the old slot is
	now number slot_no + 1 */

	new_slot = page_dir_get_nth_slot(page, slot_no);
	slot = page_dir_get_nth_slot(page, slot_no + 1);

	/* 3. We store the appropriate values to the new slot:
	it owns the first half (n_owned / 2) of the records,
	ending at rec. */

	page_dir_slot_set_rec(new_slot, rec);
	page_dir_slot_set_n_owned(new_slot, page_zip, n_owned / 2);

	/* 4. Finally, we update the number of records field of the
	original slot */

	page_dir_slot_set_n_owned(slot, page_zip, n_owned - (n_owned / 2));
}
1493
/*************************************************************//**
Tries to balance the given directory slot with too few records with the upper
neighbor, so that there are at least the minimum number of records owned by
the slot; this may result in the merging of two slots. */
void
page_dir_balance_slot(
/*==================*/
	page_t*		page,	/*!< in/out: index page */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	ulint		slot_no)/*!< in: the directory slot */
{
	page_dir_slot_t*	slot;
	page_dir_slot_t*	up_slot;
	ulint			n_owned;
	ulint			up_n_owned;
	rec_t*			old_rec;
	rec_t*			new_rec;

	ut_ad(!page_zip || page_is_comp(page));
	/* Slot 0 (the infimum's) is never balanced. */
	ut_ad(slot_no > 0);

	slot = page_dir_get_nth_slot(page, slot_no);

	/* The last directory slot cannot be balanced with the upper
	neighbor, as there is none. */

	if (UNIV_UNLIKELY(slot_no + 1 == page_dir_get_n_slots(page))) {

		return;
	}

	up_slot = page_dir_get_nth_slot(page, slot_no + 1);

	n_owned = page_dir_slot_get_n_owned(slot);
	up_n_owned = page_dir_slot_get_n_owned(up_slot);

	/* The caller balances exactly when the slot has dropped
	just below the minimum. */
	ut_ad(n_owned == PAGE_DIR_SLOT_MIN_N_OWNED - 1);

	/* If the upper slot has the minimum value of n_owned, we will merge
	the two slots, therefore we assert: */
	ut_ad(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1 <= PAGE_DIR_SLOT_MAX_N_OWNED);

	if (up_n_owned > PAGE_DIR_SLOT_MIN_N_OWNED) {

		/* In this case we can just transfer one record owned
		by the upper slot to the property of the lower slot:
		the lower slot's boundary moves one record forward. */
		old_rec = (rec_t*) page_dir_slot_get_rec(slot);

		if (page_is_comp(page)) {
			new_rec = rec_get_next_ptr(old_rec, TRUE);

			rec_set_n_owned_new(old_rec, page_zip, 0);
			rec_set_n_owned_new(new_rec, page_zip, n_owned + 1);
		} else {
			new_rec = rec_get_next_ptr(old_rec, FALSE);

			rec_set_n_owned_old(old_rec, 0);
			rec_set_n_owned_old(new_rec, n_owned + 1);
		}

		page_dir_slot_set_rec(slot, new_rec);

		page_dir_slot_set_n_owned(up_slot, page_zip, up_n_owned -1);
	} else {
		/* In this case we may merge the two slots */
		page_dir_delete_slot(page, page_zip, slot_no);
	}
}
1562
1563 /************************************************************//**
1564 Returns the nth record of the record list.
1565 This is the inverse function of page_rec_get_n_recs_before().
1566 @return nth record */
1567 const rec_t*
page_rec_get_nth_const(const page_t * page,ulint nth)1568 page_rec_get_nth_const(
1569 /*===================*/
1570 const page_t* page, /*!< in: page */
1571 ulint nth) /*!< in: nth record */
1572 {
1573 const page_dir_slot_t* slot;
1574 ulint i;
1575 ulint n_owned;
1576 const rec_t* rec;
1577
1578 if (nth == 0) {
1579 return(page_get_infimum_rec(page));
1580 }
1581
1582 ut_ad(nth < srv_page_size / (REC_N_NEW_EXTRA_BYTES + 1));
1583
1584 for (i = 0;; i++) {
1585
1586 slot = page_dir_get_nth_slot(page, i);
1587 n_owned = page_dir_slot_get_n_owned(slot);
1588
1589 if (n_owned > nth) {
1590 break;
1591 } else {
1592 nth -= n_owned;
1593 }
1594 }
1595
1596 ut_ad(i > 0);
1597 slot = page_dir_get_nth_slot(page, i - 1);
1598 rec = page_dir_slot_get_rec(slot);
1599
1600 if (page_is_comp(page)) {
1601 do {
1602 rec = page_rec_get_next_low(rec, TRUE);
1603 ut_ad(rec);
1604 } while (nth--);
1605 } else {
1606 do {
1607 rec = page_rec_get_next_low(rec, FALSE);
1608 ut_ad(rec);
1609 } while (nth--);
1610 }
1611
1612 return(rec);
1613 }
1614
1615 /***************************************************************//**
1616 Returns the number of records before the given record in chain.
1617 The number includes infimum and supremum records.
1618 @return number of records */
1619 ulint
page_rec_get_n_recs_before(const rec_t * rec)1620 page_rec_get_n_recs_before(
1621 /*=======================*/
1622 const rec_t* rec) /*!< in: the physical record */
1623 {
1624 const page_dir_slot_t* slot;
1625 const rec_t* slot_rec;
1626 const page_t* page;
1627 ulint i;
1628 lint n = 0;
1629
1630 ut_ad(page_rec_check(rec));
1631
1632 page = page_align(rec);
1633 if (page_is_comp(page)) {
1634 while (rec_get_n_owned_new(rec) == 0) {
1635
1636 rec = rec_get_next_ptr_const(rec, TRUE);
1637 n--;
1638 }
1639
1640 for (i = 0; ; i++) {
1641 slot = page_dir_get_nth_slot(page, i);
1642 slot_rec = page_dir_slot_get_rec(slot);
1643
1644 n += lint(rec_get_n_owned_new(slot_rec));
1645
1646 if (rec == slot_rec) {
1647
1648 break;
1649 }
1650 }
1651 } else {
1652 while (rec_get_n_owned_old(rec) == 0) {
1653
1654 rec = rec_get_next_ptr_const(rec, FALSE);
1655 n--;
1656 }
1657
1658 for (i = 0; ; i++) {
1659 slot = page_dir_get_nth_slot(page, i);
1660 slot_rec = page_dir_slot_get_rec(slot);
1661
1662 n += lint(rec_get_n_owned_old(slot_rec));
1663
1664 if (rec == slot_rec) {
1665
1666 break;
1667 }
1668 }
1669 }
1670
1671 n--;
1672
1673 ut_ad(n >= 0);
1674 ut_ad((ulong) n < srv_page_size / (REC_N_NEW_EXTRA_BYTES + 1));
1675
1676 return((ulint) n);
1677 }
1678
1679 /************************************************************//**
1680 Prints record contents including the data relevant only in
1681 the index page context. */
1682 void
page_rec_print(const rec_t * rec,const rec_offs * offsets)1683 page_rec_print(
1684 /*===========*/
1685 const rec_t* rec, /*!< in: physical record */
1686 const rec_offs* offsets)/*!< in: record descriptor */
1687 {
1688 ut_a(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
1689 rec_print_new(stderr, rec, offsets);
1690 if (page_rec_is_comp(rec)) {
1691 ib::info() << "n_owned: " << rec_get_n_owned_new(rec)
1692 << "; heap_no: " << rec_get_heap_no_new(rec)
1693 << "; next rec: " << rec_get_next_offs(rec, TRUE);
1694 } else {
1695 ib::info() << "n_owned: " << rec_get_n_owned_old(rec)
1696 << "; heap_no: " << rec_get_heap_no_old(rec)
1697 << "; next rec: " << rec_get_next_offs(rec, FALSE);
1698 }
1699
1700 page_rec_check(rec);
1701 rec_validate(rec, offsets);
1702 }
1703
1704 #ifdef UNIV_BTR_PRINT
/***************************************************************//**
This is used to print the contents of the directory for
debugging purposes. */
void
page_dir_print(
/*===========*/
	page_t*	page,	/*!< in: index page */
	ulint	pr_n)	/*!< in: print n first and n last entries */
{
	ulint			n;
	ulint			i;
	page_dir_slot_t*	slot;

	n = page_dir_get_n_slots(page);

	fprintf(stderr, "--------------------------------\n"
		"PAGE DIRECTORY\n"
		"Page address %p\n"
		"Directory stack top at offs: %lu; number of slots: %lu\n",
		page, (ulong) page_offset(page_dir_get_nth_slot(page, n - 1)),
		(ulong) n);
	/* Print only the first pr_n and last pr_n slots, eliding the
	middle with an ellipsis. */
	for (i = 0; i < n; i++) {
		slot = page_dir_get_nth_slot(page, i);
		if ((i == pr_n) && (i < n - pr_n)) {
			fputs("    ...   \n", stderr);
		}
		if ((i < pr_n) || (i >= n - pr_n)) {
			fprintf(stderr,
				"Contents of slot: %lu: n_owned: %lu,"
				" rec offs: %lu\n",
				(ulong) i,
				(ulong) page_dir_slot_get_n_owned(slot),
				(ulong)
				page_offset(page_dir_slot_get_rec(slot)));
		}
	}
	/* PAGE_HEAP_NO_USER_LOW accounts for the infimum and supremum
	pseudo-records, which page_get_n_recs() does not include. */
	fprintf(stderr, "Total of %lu records\n"
		"--------------------------------\n",
		(ulong) (PAGE_HEAP_NO_USER_LOW + page_get_n_recs(page)));
}
1745
1746 /***************************************************************//**
1747 This is used to print the contents of the page record list for
1748 debugging purposes. */
1749 void
page_print_list(buf_block_t * block,dict_index_t * index,ulint pr_n)1750 page_print_list(
1751 /*============*/
1752 buf_block_t* block, /*!< in: index page */
1753 dict_index_t* index, /*!< in: dictionary index of the page */
1754 ulint pr_n) /*!< in: print n first and n last entries */
1755 {
1756 page_t* page = block->frame;
1757 page_cur_t cur;
1758 ulint count;
1759 ulint n_recs;
1760 mem_heap_t* heap = NULL;
1761 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
1762 rec_offs* offsets = offsets_;
1763 rec_offs_init(offsets_);
1764
1765 ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
1766
1767 fprint(stderr,
1768 "--------------------------------\n"
1769 "PAGE RECORD LIST\n"
1770 "Page address %p\n", page);
1771
1772 n_recs = page_get_n_recs(page);
1773
1774 page_cur_set_before_first(block, &cur);
1775 count = 0;
1776 for (;;) {
1777 offsets = rec_get_offsets(cur.rec, index, offsets,
1778 page_rec_is_leaf(cur.rec),
1779 ULINT_UNDEFINED, &heap);
1780 page_rec_print(cur.rec, offsets);
1781
1782 if (count == pr_n) {
1783 break;
1784 }
1785 if (page_cur_is_after_last(&cur)) {
1786 break;
1787 }
1788 page_cur_move_to_next(&cur);
1789 count++;
1790 }
1791
1792 if (n_recs > 2 * pr_n) {
1793 fputs(" ... \n", stderr);
1794 }
1795
1796 while (!page_cur_is_after_last(&cur)) {
1797 page_cur_move_to_next(&cur);
1798
1799 if (count + pr_n >= n_recs) {
1800 offsets = rec_get_offsets(cur.rec, index, offsets,
1801 page_rec_is_leaf(cur.rec),
1802 ULINT_UNDEFINED, &heap);
1803 page_rec_print(cur.rec, offsets);
1804 }
1805 count++;
1806 }
1807
1808 fprintf(stderr,
1809 "Total of %lu records \n"
1810 "--------------------------------\n",
1811 (ulong) (count + 1));
1812
1813 if (UNIV_LIKELY_NULL(heap)) {
1814 mem_heap_free(heap);
1815 }
1816 }
1817
/***************************************************************//**
Prints the info in a page header. */
void
page_header_print(
/*==============*/
	const page_t*	page)	/*!< in: index page */
{
	/* Dump the main PAGE_* header fields in a human-readable
	form for debugging. */
	fprintf(stderr,
		"--------------------------------\n"
		"PAGE HEADER INFO\n"
		"Page address %p, n records %u (%s)\n"
		"n dir slots %u, heap top %u\n"
		"Page n heap %u, free %u, garbage %u\n"
		"Page last insert %u, direction %u, n direction %u\n",
		page, page_header_get_field(page, PAGE_N_RECS),
		page_is_comp(page) ? "compact format" : "original format",
		page_header_get_field(page, PAGE_N_DIR_SLOTS),
		page_header_get_field(page, PAGE_HEAP_TOP),
		page_dir_get_n_heap(page),
		page_header_get_field(page, PAGE_FREE),
		page_header_get_field(page, PAGE_GARBAGE),
		page_header_get_field(page, PAGE_LAST_INSERT),
		page_get_direction(page),
		page_header_get_field(page, PAGE_N_DIRECTION));
}
1843
1844 /***************************************************************//**
1845 This is used to print the contents of the page for
1846 debugging purposes. */
1847 void
page_print(buf_block_t * block,dict_index_t * index,ulint dn,ulint rn)1848 page_print(
1849 /*=======*/
1850 buf_block_t* block, /*!< in: index page */
1851 dict_index_t* index, /*!< in: dictionary index of the page */
1852 ulint dn, /*!< in: print dn first and last entries
1853 in directory */
1854 ulint rn) /*!< in: print rn first and last records
1855 in directory */
1856 {
1857 page_t* page = block->frame;
1858
1859 page_header_print(page);
1860 page_dir_print(page, dn);
1861 page_print_list(block, index, rn);
1862 }
1863 #endif /* UNIV_BTR_PRINT */
1864
1865 /***************************************************************//**
1866 The following is used to validate a record on a page. This function
1867 differs from rec_validate as it can also check the n_owned field and
1868 the heap_no field.
1869 @return TRUE if ok */
1870 ibool
page_rec_validate(const rec_t * rec,const rec_offs * offsets)1871 page_rec_validate(
1872 /*==============*/
1873 const rec_t* rec, /*!< in: physical record */
1874 const rec_offs* offsets)/*!< in: array returned by rec_get_offsets() */
1875 {
1876 ulint n_owned;
1877 ulint heap_no;
1878 const page_t* page;
1879
1880 page = page_align(rec);
1881 ut_a(!page_is_comp(page) == !rec_offs_comp(offsets));
1882
1883 page_rec_check(rec);
1884 rec_validate(rec, offsets);
1885
1886 if (page_rec_is_comp(rec)) {
1887 n_owned = rec_get_n_owned_new(rec);
1888 heap_no = rec_get_heap_no_new(rec);
1889 } else {
1890 n_owned = rec_get_n_owned_old(rec);
1891 heap_no = rec_get_heap_no_old(rec);
1892 }
1893
1894 if (UNIV_UNLIKELY(!(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED))) {
1895 ib::warn() << "Dir slot of rec " << page_offset(rec)
1896 << ", n owned too big " << n_owned;
1897 return(FALSE);
1898 }
1899
1900 if (UNIV_UNLIKELY(!(heap_no < page_dir_get_n_heap(page)))) {
1901 ib::warn() << "Heap no of rec " << page_offset(rec)
1902 << " too big " << heap_no << " "
1903 << page_dir_get_n_heap(page);
1904 return(FALSE);
1905 }
1906
1907 return(TRUE);
1908 }
1909
#ifdef UNIV_DEBUG
/***************************************************************//**
Checks that the first directory slot points to the infimum record and
the last to the supremum. This function is intended to track if the
bug fixed in 4.0.14 has caused corruption to users' databases. */
void
page_check_dir(
/*===========*/
	const page_t*	page)	/*!< in: index page */
{
	const ulint	n_slots = page_dir_get_n_slots(page);

	/* Slot 0 must reference the infimum, the final slot the
	supremum; anything else is directory corruption. */
	const ulint	infimum_offs = mach_read_from_2(
		page_dir_get_nth_slot(page, 0));
	const ulint	supremum_offs = mach_read_from_2(
		page_dir_get_nth_slot(page, n_slots - 1));

	if (UNIV_UNLIKELY(!page_rec_is_infimum_low(infimum_offs))) {

		ib::fatal() << "Page directory corruption: infimum not"
			" pointed to";
	}

	if (UNIV_UNLIKELY(!page_rec_is_supremum_low(supremum_offs))) {

		ib::fatal() << "Page directory corruption: supremum not"
			" pointed to";
	}
}
#endif /* UNIV_DEBUG */
1942
/***************************************************************//**
This function checks the consistency of an index page when we do not
know the index. This is also resilient so that this should never crash
even if the page is total garbage.
@return TRUE if ok */
ibool
page_simple_validate_old(
/*=====================*/
	const page_t*	page)	/*!< in: index page in ROW_FORMAT=REDUNDANT */
{
	const page_dir_slot_t*	slot;
	ulint			slot_no;
	ulint			n_slots;
	const rec_t*		rec;
	const byte*		rec_heap_top;
	ulint			count;
	ulint			own_count;
	ibool			ret = FALSE;

	ut_a(!page_is_comp(page));

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

	/* Sanity bound: every page has at least the infimum and
	supremum slots, and srv_page_size / 4 is far more slots than
	could ever fit. */
	if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
		ib::error() << "Nonsensical number of page dir slots: "
			<< n_slots;
		goto func_exit;
	}

	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);

	/* The directory grows downwards from the page end; the record
	heap top must stay at or below the last (lowest-addressed) slot. */
	if (UNIV_UNLIKELY(rec_heap_top
			  > page_dir_get_nth_slot(page, n_slots - 1))) {
		ib::error()
			<< "Record heap and dir overlap on a page, heap top "
			<< page_header_get_field(page, PAGE_HEAP_TOP)
			<< ", dir "
			<< page_offset(page_dir_get_nth_slot(page,
							     n_slots - 1));

		goto func_exit;
	}

	/* Validate the record list in a loop checking also that it is
	consistent with the page record directory. */

	count = 0;
	/* own_count counts records since the previous slot-owning
	record; the infimum itself counts as the first owned record. */
	own_count = 1;
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	rec = page_get_infimum_rec(page);

	for (;;) {
		/* Every record in the singly-linked list must lie
		within the allocated record heap. */
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Record " << (rec - page)
				<< " is above rec heap top "
				<< (rec_heap_top - page);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) != 0)) {
			/* This is a record pointed to by a dir slot;
			its n_owned must equal the number of records
			accumulated since the previous owning record. */
			if (UNIV_UNLIKELY(rec_get_n_owned_old(rec)
					  != own_count)) {

				ib::error() << "Wrong owned count "
					<< rec_get_n_owned_old(rec)
					<< ", " << own_count << ", rec "
					<< (rec - page);

				goto func_exit;
			}

			if (UNIV_UNLIKELY
			    (page_dir_slot_get_rec(slot) != rec)) {
				ib::error() << "Dir slot does not point"
					" to right rec " << (rec - page);

				goto func_exit;
			}

			own_count = 0;

			if (!page_rec_is_supremum(rec)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		if (page_rec_is_supremum(rec)) {

			break;
		}

		/* The next-record pointer must land inside the page,
		past the file header. */
		if (UNIV_UNLIKELY
		    (rec_get_next_offs(rec, FALSE) < FIL_PAGE_DATA
		     || rec_get_next_offs(rec, FALSE) >= srv_page_size)) {

			ib::error() << "Next record offset nonsensical "
				<< rec_get_next_offs(rec, FALSE) << " for rec "
				<< (rec - page);

			goto func_exit;
		}

		count++;

		/* More iterations than bytes on the page means the
		list must contain a cycle. */
		if (UNIV_UNLIKELY(count > srv_page_size)) {
			ib::error() << "Page record list appears"
				" to be circular " << count;
			goto func_exit;
		}

		rec = page_rec_get_next_const(rec);
		own_count++;
	}

	/* The supremum must itself be owned by the last slot. */
	if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
		ib::error() << "n owned is zero in a supremum rec";

		goto func_exit;
	}

	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
		ib::error() << "n slots wrong "
			<< slot_no << ", " << (n_slots - 1);
		goto func_exit;
	}

	/* count covered the infimum plus all user records, so
	PAGE_N_RECS (user records only) + PAGE_HEAP_NO_USER_LOW must
	equal count + 1 (the supremum). */
	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
			  + PAGE_HEAP_NO_USER_LOW
			  != count + 1)) {
		ib::error() << "n recs wrong "
			<< page_header_get_field(page, PAGE_N_RECS)
			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);

		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
				  || rec >= page + srv_page_size)) {
			ib::error() << "Free list record has"
				" a nonsensical offset " << (rec - page);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Free list record " << (rec - page)
				<< " is above rec heap top "
				<< (rec_heap_top - page);

			goto func_exit;
		}

		/* count keeps accumulating: it now tallies every
		record allocated from the heap, used or free. */
		count++;

		if (UNIV_UNLIKELY(count > srv_page_size)) {
			ib::error() << "Page free list appears"
				" to be circular " << count;
			goto func_exit;
		}

		ulint offs = rec_get_next_offs(rec, FALSE);
		if (!offs) {
			/* End of the free list. */
			break;
		}
		if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
				  || offs >= srv_page_size)) {
			ib::error() << "Page free list is corrupted " << count;
			goto func_exit;
		}

		rec = page + offs;
	}

	/* The heap record count includes the supremum, hence count + 1. */
	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {

		ib::error() << "N heap is wrong "
			<< page_dir_get_n_heap(page) << ", " << (count + 1);

		goto func_exit;
	}

	ret = TRUE;

func_exit:
	return(ret);
}
2141
/***************************************************************//**
This function checks the consistency of an index page when we do not
know the index. This is also resilient so that this should never crash
even if the page is total garbage.
@return TRUE if ok */
ibool
page_simple_validate_new(
/*=====================*/
	const page_t*	page)	/*!< in: index page in ROW_FORMAT!=REDUNDANT */
{
	const page_dir_slot_t*	slot;
	ulint			slot_no;
	ulint			n_slots;
	const rec_t*		rec;
	const byte*		rec_heap_top;
	ulint			count;
	ulint			own_count;
	ibool			ret = FALSE;

	ut_a(page_is_comp(page));

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

	/* Sanity bound: every page has at least the infimum and
	supremum slots, and srv_page_size / 4 is far more slots than
	could ever fit. */
	if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
		ib::error() << "Nonsensical number of page dir slots: "
			<< n_slots;
		goto func_exit;
	}

	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);

	/* The directory grows downwards from the page end; the record
	heap top must stay at or below the last (lowest-addressed) slot. */
	if (UNIV_UNLIKELY(rec_heap_top
			  > page_dir_get_nth_slot(page, n_slots - 1))) {

		ib::error() << "Record heap and dir overlap on a page,"
			" heap top "
			<< page_header_get_field(page, PAGE_HEAP_TOP)
			<< ", dir " << page_offset(
				page_dir_get_nth_slot(page, n_slots - 1));

		goto func_exit;
	}

	/* Validate the record list in a loop checking also that it is
	consistent with the page record directory. */

	count = 0;
	/* own_count counts records since the previous slot-owning
	record; the infimum itself counts as the first owned record. */
	own_count = 1;
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	rec = page_get_infimum_rec(page);

	for (;;) {
		/* Every record in the singly-linked list must lie
		within the allocated record heap. */
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {

			ib::error() << "Record " << page_offset(rec)
				<< " is above rec heap top "
				<< page_offset(rec_heap_top);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) != 0)) {
			/* This is a record pointed to by a dir slot;
			its n_owned must equal the number of records
			accumulated since the previous owning record. */
			if (UNIV_UNLIKELY(rec_get_n_owned_new(rec)
					  != own_count)) {

				ib::error() << "Wrong owned count "
					<< rec_get_n_owned_new(rec) << ", "
					<< own_count << ", rec "
					<< page_offset(rec);

				goto func_exit;
			}

			if (UNIV_UNLIKELY
			    (page_dir_slot_get_rec(slot) != rec)) {
				ib::error() << "Dir slot does not point"
					" to right rec " << page_offset(rec);

				goto func_exit;
			}

			own_count = 0;

			if (!page_rec_is_supremum(rec)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		if (page_rec_is_supremum(rec)) {

			break;
		}

		/* The next-record pointer must land inside the page,
		past the file header. */
		if (UNIV_UNLIKELY
		    (rec_get_next_offs(rec, TRUE) < FIL_PAGE_DATA
		     || rec_get_next_offs(rec, TRUE) >= srv_page_size)) {

			ib::error() << "Next record offset nonsensical "
				<< rec_get_next_offs(rec, TRUE)
				<< " for rec " << page_offset(rec);

			goto func_exit;
		}

		count++;

		/* More iterations than bytes on the page means the
		list must contain a cycle. */
		if (UNIV_UNLIKELY(count > srv_page_size)) {
			ib::error() << "Page record list appears to be"
				" circular " << count;
			goto func_exit;
		}

		rec = page_rec_get_next_const(rec);
		own_count++;
	}

	/* The supremum must itself be owned by the last slot. */
	if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
		ib::error() << "n owned is zero in a supremum rec";

		goto func_exit;
	}

	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
		ib::error() << "n slots wrong " << slot_no << ", "
			<< (n_slots - 1);
		goto func_exit;
	}

	/* count covered the infimum plus all user records, so
	PAGE_N_RECS (user records only) + PAGE_HEAP_NO_USER_LOW must
	equal count + 1 (the supremum). */
	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
			  + PAGE_HEAP_NO_USER_LOW
			  != count + 1)) {
		ib::error() << "n recs wrong "
			<< page_header_get_field(page, PAGE_N_RECS)
			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);

		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
				  || rec >= page + srv_page_size)) {

			ib::error() << "Free list record has"
				" a nonsensical offset " << page_offset(rec);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Free list record " << page_offset(rec)
				<< " is above rec heap top "
				<< page_offset(rec_heap_top);

			goto func_exit;
		}

		/* count keeps accumulating: it now tallies every
		record allocated from the heap, used or free. */
		count++;

		if (UNIV_UNLIKELY(count > srv_page_size)) {
			ib::error() << "Page free list appears to be"
				" circular " << count;
			goto func_exit;
		}

		const ulint offs = rec_get_next_offs(rec, TRUE);
		if (!offs) {
			/* End of the free list. */
			break;
		}
		if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
				  || offs >= srv_page_size)) {
			ib::error() << "Page free list is corrupted " << count;
			goto func_exit;
		}

		rec = page + offs;
	}

	/* The heap record count includes the supremum, hence count + 1. */
	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {

		ib::error() << "N heap is wrong "
			<< page_dir_get_n_heap(page) << ", " << (count + 1);

		goto func_exit;
	}

	ret = TRUE;

func_exit:
	return(ret);
}
2342
2343 /** Check the consistency of an index page.
2344 @param[in] page index page
2345 @param[in] index B-tree or R-tree index
2346 @return whether the page is valid */
page_validate(const page_t * page,const dict_index_t * index)2347 bool page_validate(const page_t* page, const dict_index_t* index)
2348 {
2349 const page_dir_slot_t* slot;
2350 const rec_t* rec;
2351 const rec_t* old_rec = NULL;
2352 const rec_t* first_rec = NULL;
2353 ulint offs = 0;
2354 ulint n_slots;
2355 ibool ret = TRUE;
2356 ulint i;
2357 rec_offs offsets_1[REC_OFFS_NORMAL_SIZE];
2358 rec_offs offsets_2[REC_OFFS_NORMAL_SIZE];
2359 rec_offs* offsets = offsets_1;
2360 rec_offs* old_offsets = offsets_2;
2361
2362 rec_offs_init(offsets_1);
2363 rec_offs_init(offsets_2);
2364
2365 #ifdef UNIV_GIS_DEBUG
2366 if (dict_index_is_spatial(index)) {
2367 fprintf(stderr, "Page no: %lu\n", page_get_page_no(page));
2368 }
2369 #endif /* UNIV_DEBUG */
2370
2371 if (UNIV_UNLIKELY((ibool) !!page_is_comp(page)
2372 != dict_table_is_comp(index->table))) {
2373 ib::error() << "'compact format' flag mismatch";
2374 func_exit2:
2375 ib::error() << "Apparent corruption in space "
2376 << page_get_space_id(page) << " page "
2377 << page_get_page_no(page)
2378 << " of index " << index->name
2379 << " of table " << index->table->name;
2380 return FALSE;
2381 }
2382
2383 if (page_is_comp(page)) {
2384 if (UNIV_UNLIKELY(!page_simple_validate_new(page))) {
2385 goto func_exit2;
2386 }
2387 } else {
2388 if (UNIV_UNLIKELY(!page_simple_validate_old(page))) {
2389 goto func_exit2;
2390 }
2391 }
2392
2393 /* Multiple transactions cannot simultaneously operate on the
2394 same temp-table in parallel.
2395 max_trx_id is ignored for temp tables because it not required
2396 for MVCC. */
2397 if (!page_is_leaf(page) || page_is_empty(page)
2398 || !dict_index_is_sec_or_ibuf(index)
2399 || index->table->is_temporary()) {
2400 } else if (trx_id_t sys_max_trx_id = trx_sys.get_max_trx_id()) {
2401 trx_id_t max_trx_id = page_get_max_trx_id(page);
2402
2403 if (max_trx_id == 0 || max_trx_id > sys_max_trx_id) {
2404 ib::error() << "PAGE_MAX_TRX_ID out of bounds: "
2405 << max_trx_id << ", " << sys_max_trx_id;
2406 ret = FALSE;
2407 }
2408 } else {
2409 ut_ad(srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);
2410 }
2411
2412 /* Check first that the record heap and the directory do not
2413 overlap. */
2414
2415 n_slots = page_dir_get_n_slots(page);
2416
2417 if (UNIV_UNLIKELY(!(page_header_get_ptr(page, PAGE_HEAP_TOP)
2418 <= page_dir_get_nth_slot(page, n_slots - 1)))) {
2419
2420 ib::warn() << "Record heap and directory overlap";
2421 goto func_exit2;
2422 }
2423
2424 switch (uint16_t type = fil_page_get_type(page)) {
2425 case FIL_PAGE_RTREE:
2426 if (!index->is_spatial()) {
2427 wrong_page_type:
2428 ib::warn() << "Wrong page type " << type;
2429 ret = FALSE;
2430 }
2431 break;
2432 case FIL_PAGE_TYPE_INSTANT:
2433 if (index->is_instant()
2434 && page_get_page_no(page) == index->page) {
2435 break;
2436 }
2437 goto wrong_page_type;
2438 case FIL_PAGE_INDEX:
2439 if (index->is_spatial()) {
2440 goto wrong_page_type;
2441 }
2442 if (index->is_instant()
2443 && page_get_page_no(page) == index->page) {
2444 goto wrong_page_type;
2445 }
2446 break;
2447 default:
2448 goto wrong_page_type;
2449 }
2450
2451 /* The following buffer is used to check that the
2452 records in the page record heap do not overlap */
2453 mem_heap_t* heap = mem_heap_create(srv_page_size + 200);;
2454 byte* buf = static_cast<byte*>(mem_heap_zalloc(heap, srv_page_size));
2455
2456 /* Validate the record list in a loop checking also that
2457 it is consistent with the directory. */
2458 ulint count = 0, data_size = 0, own_count = 1, slot_no = 0;
2459 ulint info_bits;
2460 slot_no = 0;
2461 slot = page_dir_get_nth_slot(page, slot_no);
2462
2463 rec = page_get_infimum_rec(page);
2464
2465 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
2466
2467 for (;;) {
2468 offsets = rec_get_offsets(rec, index, offsets, n_core,
2469 ULINT_UNDEFINED, &heap);
2470
2471 if (page_is_comp(page) && page_rec_is_user_rec(rec)
2472 && UNIV_UNLIKELY(rec_get_node_ptr_flag(rec)
2473 == page_is_leaf(page))) {
2474 ib::error() << "'node_ptr' flag mismatch";
2475 ret = FALSE;
2476 goto next_rec;
2477 }
2478
2479 if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
2480 ret = FALSE;
2481 goto next_rec;
2482 }
2483
2484 info_bits = rec_get_info_bits(rec, page_is_comp(page));
2485 if (info_bits
2486 & ~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG)) {
2487 ib::error() << "info_bits has an incorrect value "
2488 << info_bits;
2489 ret = false;
2490 }
2491
2492 if (rec == first_rec) {
2493 if (info_bits & REC_INFO_MIN_REC_FLAG) {
2494 if (page_has_prev(page)) {
2495 ib::error() << "REC_INFO_MIN_REC_FLAG "
2496 "is set on non-left page";
2497 ret = false;
2498 } else if (!page_is_leaf(page)) {
2499 /* leftmost node pointer page */
2500 } else if (!index->is_instant()) {
2501 ib::error() << "REC_INFO_MIN_REC_FLAG "
2502 "is set in a leaf-page record";
2503 ret = false;
2504 } else if (!(info_bits & REC_INFO_DELETED_FLAG)
2505 != !index->table->instant) {
2506 ib::error() << (index->table->instant
2507 ? "Metadata record "
2508 "is not delete-marked"
2509 : "Metadata record "
2510 "is delete-marked");
2511 ret = false;
2512 }
2513 } else if (!page_has_prev(page)
2514 && index->is_instant()) {
2515 ib::error() << "Metadata record is missing";
2516 ret = false;
2517 }
2518 } else if (info_bits & REC_INFO_MIN_REC_FLAG) {
2519 ib::error() << "REC_INFO_MIN_REC_FLAG record is not "
2520 "first in page";
2521 ret = false;
2522 }
2523
2524 if (page_is_comp(page)) {
2525 const rec_comp_status_t status = rec_get_status(rec);
2526 if (status != REC_STATUS_ORDINARY
2527 && status != REC_STATUS_NODE_PTR
2528 && status != REC_STATUS_INFIMUM
2529 && status != REC_STATUS_SUPREMUM
2530 && status != REC_STATUS_INSTANT) {
2531 ib::error() << "impossible record status "
2532 << status;
2533 ret = false;
2534 } else if (page_rec_is_infimum(rec)) {
2535 if (status != REC_STATUS_INFIMUM) {
2536 ib::error()
2537 << "infimum record has status "
2538 << status;
2539 ret = false;
2540 }
2541 } else if (page_rec_is_supremum(rec)) {
2542 if (status != REC_STATUS_SUPREMUM) {
2543 ib::error() << "supremum record has "
2544 "status "
2545 << status;
2546 ret = false;
2547 }
2548 } else if (!page_is_leaf(page)) {
2549 if (status != REC_STATUS_NODE_PTR) {
2550 ib::error() << "node ptr record has "
2551 "status "
2552 << status;
2553 ret = false;
2554 }
2555 } else if (!index->is_instant()
2556 && status == REC_STATUS_INSTANT) {
2557 ib::error() << "instantly added record in a "
2558 "non-instant index";
2559 ret = false;
2560 }
2561 }
2562
2563 /* Check that the records are in the ascending order */
2564 if (count >= PAGE_HEAP_NO_USER_LOW
2565 && !page_rec_is_supremum(rec)) {
2566
2567 int ret = cmp_rec_rec(
2568 rec, old_rec, offsets, old_offsets, index);
2569
2570 /* For spatial index, on nonleaf leavel, we
2571 allow recs to be equal. */
2572 if (ret <= 0 && !(ret == 0 && index->is_spatial()
2573 && !page_is_leaf(page))) {
2574
2575 ib::error() << "Records in wrong order";
2576
2577 fputs("\nInnoDB: previous record ", stderr);
2578 /* For spatial index, print the mbr info.*/
2579 if (index->type & DICT_SPATIAL) {
2580 putc('\n', stderr);
2581 rec_print_mbr_rec(stderr,
2582 old_rec, old_offsets);
2583 fputs("\nInnoDB: record ", stderr);
2584 putc('\n', stderr);
2585 rec_print_mbr_rec(stderr, rec, offsets);
2586 putc('\n', stderr);
2587 putc('\n', stderr);
2588
2589 } else {
2590 rec_print_new(stderr, old_rec, old_offsets);
2591 fputs("\nInnoDB: record ", stderr);
2592 rec_print_new(stderr, rec, offsets);
2593 putc('\n', stderr);
2594 }
2595
2596 ret = FALSE;
2597 }
2598 }
2599
2600 if (page_rec_is_user_rec(rec)) {
2601
2602 data_size += rec_offs_size(offsets);
2603
2604 #if defined(UNIV_GIS_DEBUG)
2605 /* For spatial index, print the mbr info.*/
2606 if (index->type & DICT_SPATIAL) {
2607 rec_print_mbr_rec(stderr, rec, offsets);
2608 putc('\n', stderr);
2609 }
2610 #endif /* UNIV_GIS_DEBUG */
2611 }
2612
2613 offs = page_offset(rec_get_start(rec, offsets));
2614 i = rec_offs_size(offsets);
2615 if (UNIV_UNLIKELY(offs + i >= srv_page_size)) {
2616 ib::error() << "Record offset out of bounds: "
2617 << offs << '+' << i;
2618 ret = FALSE;
2619 goto next_rec;
2620 }
2621 while (i--) {
2622 if (UNIV_UNLIKELY(buf[offs + i])) {
2623 ib::error() << "Record overlaps another: "
2624 << offs << '+' << i;
2625 ret = FALSE;
2626 break;
2627 }
2628 buf[offs + i] = 1;
2629 }
2630
2631 if (ulint rec_own_count = page_is_comp(page)
2632 ? rec_get_n_owned_new(rec)
2633 : rec_get_n_owned_old(rec)) {
2634 /* This is a record pointed to by a dir slot */
2635 if (UNIV_UNLIKELY(rec_own_count != own_count)) {
2636 ib::error() << "Wrong owned count at " << offs
2637 << ": " << rec_own_count
2638 << ", " << own_count;
2639 ret = FALSE;
2640 }
2641
2642 if (page_dir_slot_get_rec(slot) != rec) {
2643 ib::error() << "Dir slot does not"
2644 " point to right rec at " << offs;
2645 ret = FALSE;
2646 }
2647
2648 if (ret) {
2649 page_dir_slot_check(slot);
2650 }
2651
2652 own_count = 0;
2653 if (!page_rec_is_supremum(rec)) {
2654 slot_no++;
2655 slot = page_dir_get_nth_slot(page, slot_no);
2656 }
2657 }
2658
2659 next_rec:
2660 if (page_rec_is_supremum(rec)) {
2661 break;
2662 }
2663
2664 count++;
2665 own_count++;
2666 old_rec = rec;
2667 rec = page_rec_get_next_const(rec);
2668
2669 if (page_rec_is_infimum(old_rec)
2670 && page_rec_is_user_rec(rec)) {
2671 first_rec = rec;
2672 }
2673
2674 /* set old_offsets to offsets; recycle offsets */
2675 std::swap(old_offsets, offsets);
2676 }
2677
2678 if (page_is_comp(page)) {
2679 if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
2680
2681 goto n_owned_zero;
2682 }
2683 } else if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
2684 n_owned_zero:
2685 ib::error() << "n owned is zero at " << offs;
2686 ret = FALSE;
2687 }
2688
2689 if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
2690 ib::error() << "n slots wrong " << slot_no << " "
2691 << (n_slots - 1);
2692 ret = FALSE;
2693 }
2694
2695 if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
2696 + PAGE_HEAP_NO_USER_LOW
2697 != count + 1)) {
2698 ib::error() << "n recs wrong "
2699 << page_header_get_field(page, PAGE_N_RECS)
2700 + PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
2701 ret = FALSE;
2702 }
2703
2704 if (UNIV_UNLIKELY(data_size != page_get_data_size(page))) {
2705 ib::error() << "Summed data size " << data_size
2706 << ", returned by func " << page_get_data_size(page);
2707 ret = FALSE;
2708 }
2709
2710 /* Check then the free list */
2711 rec = page_header_get_ptr(page, PAGE_FREE);
2712
2713 while (rec != NULL) {
2714 offsets = rec_get_offsets(rec, index, offsets, n_core,
2715 ULINT_UNDEFINED, &heap);
2716 if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
2717 ret = FALSE;
2718 next_free:
2719 const ulint offs = rec_get_next_offs(
2720 rec, page_is_comp(page));
2721 if (!offs) {
2722 break;
2723 }
2724 if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
2725 || offs >= srv_page_size)) {
2726 ib::error() << "Page free list is corrupted";
2727 ret = FALSE;
2728 break;
2729 }
2730
2731 rec = page + offs;
2732 continue;
2733 }
2734
2735 count++;
2736 offs = page_offset(rec_get_start(rec, offsets));
2737 i = rec_offs_size(offsets);
2738 if (UNIV_UNLIKELY(offs + i >= srv_page_size)) {
2739 ib::error() << "Free record offset out of bounds: "
2740 << offs << '+' << i;
2741 ret = FALSE;
2742 goto next_free;
2743 }
2744 while (i--) {
2745 if (UNIV_UNLIKELY(buf[offs + i])) {
2746 ib::error() << "Free record overlaps another: "
2747 << offs << '+' << i;
2748 ret = FALSE;
2749 break;
2750 }
2751 buf[offs + i] = 1;
2752 }
2753
2754 goto next_free;
2755 }
2756
2757 if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
2758 ib::error() << "N heap is wrong "
2759 << page_dir_get_n_heap(page) << " " << count + 1;
2760 ret = FALSE;
2761 }
2762
2763 mem_heap_free(heap);
2764
2765 if (UNIV_UNLIKELY(!ret)) {
2766 goto func_exit2;
2767 }
2768
2769 return(ret);
2770 }
2771
2772 /***************************************************************//**
2773 Looks in the page record list for a record with the given heap number.
2774 @return record, NULL if not found */
2775 const rec_t*
page_find_rec_with_heap_no(const page_t * page,ulint heap_no)2776 page_find_rec_with_heap_no(
2777 /*=======================*/
2778 const page_t* page, /*!< in: index page */
2779 ulint heap_no)/*!< in: heap number */
2780 {
2781 const rec_t* rec;
2782
2783 if (page_is_comp(page)) {
2784 rec = page + PAGE_NEW_INFIMUM;
2785
2786 for (;;) {
2787 ulint rec_heap_no = rec_get_heap_no_new(rec);
2788
2789 if (rec_heap_no == heap_no) {
2790
2791 return(rec);
2792 } else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2793
2794 return(NULL);
2795 }
2796
2797 rec = page + rec_get_next_offs(rec, TRUE);
2798 }
2799 } else {
2800 rec = page + PAGE_OLD_INFIMUM;
2801
2802 for (;;) {
2803 ulint rec_heap_no = rec_get_heap_no_old(rec);
2804
2805 if (rec_heap_no == heap_no) {
2806
2807 return(rec);
2808 } else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2809
2810 return(NULL);
2811 }
2812
2813 rec = page + rec_get_next_offs(rec, FALSE);
2814 }
2815 }
2816 }
2817
/*******************************************************//**
Removes the record from a leaf page. This function does not log
any changes. It is used by the IMPORT tablespace functions.
The cursor is moved to the next record after the deleted one.
@return true if success, i.e., the page did not become too empty */
bool
page_delete_rec(
/*============*/
	const dict_index_t*	index,	/*!< in: The index that the record
					belongs to */
	page_cur_t*		pcur,	/*!< in/out: page cursor on record
					to delete */
	page_zip_des_t*
#ifdef UNIV_ZIP_DEBUG
	page_zip/*!< in: compressed page descriptor */
#endif
	,
	const rec_offs*		offsets)/*!< in: offsets for record */
{
	bool		no_compress_needed;
	buf_block_t*	block = pcur->block;
	page_t*		page = buf_block_get_frame(block);

	ut_ad(page_is_leaf(page));

	/* Decide whether deleting this record would leave the page a
	candidate for compression/merging.  Records with externally
	stored columns always take the "no compression needed" path. */
	if (!rec_offs_any_extern(offsets)
	    && ((page_get_data_size(page) - rec_offs_size(offsets)
		 < BTR_CUR_PAGE_COMPRESS_LIMIT(index))
	        || !page_has_siblings(page)
	        || (page_get_n_recs(page) < 2))) {

		ulint	root_page_no = dict_index_get_page(index);

		/* The page fillfactor will drop below a predefined
		minimum value, OR the level in the B-tree contains just
		one page, OR the page will become empty: we recommend
		compression if this is not the root page. */

		no_compress_needed = page_get_page_no(page) == root_page_no;
	} else {
		no_compress_needed = true;
	}

	if (no_compress_needed) {
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

		/* NOTE(review): the final 0 argument appears to be a
		NULL mtr, consistent with the "does not log any
		changes" contract above -- confirm against
		page_cur_delete_rec(). */
		page_cur_delete_rec(pcur, index, offsets, 0);

#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	}

	return(no_compress_needed);
}
2875
2876 /** Get the last non-delete-marked record on a page.
2877 @param[in] page index tree leaf page
2878 @return the last record, not delete-marked
2879 @retval infimum record if all records are delete-marked */
2880 const rec_t*
page_find_rec_max_not_deleted(const page_t * page)2881 page_find_rec_max_not_deleted(
2882 const page_t* page)
2883 {
2884 const rec_t* rec = page_get_infimum_rec(page);
2885 const rec_t* prev_rec = NULL; // remove warning
2886
2887 /* Because the page infimum is never delete-marked
2888 and never the metadata pseudo-record (MIN_REC_FLAG)),
2889 prev_rec will always be assigned to it first. */
2890 ut_ad(!rec_get_info_bits(rec, page_rec_is_comp(rec)));
2891 ut_ad(page_is_leaf(page));
2892
2893 if (page_is_comp(page)) {
2894 do {
2895 if (!(rec[-REC_NEW_INFO_BITS]
2896 & (REC_INFO_DELETED_FLAG
2897 | REC_INFO_MIN_REC_FLAG))) {
2898 prev_rec = rec;
2899 }
2900 rec = page_rec_get_next_low(rec, true);
2901 } while (rec != page + PAGE_NEW_SUPREMUM);
2902 } else {
2903 do {
2904 if (!(rec[-REC_OLD_INFO_BITS]
2905 & (REC_INFO_DELETED_FLAG
2906 | REC_INFO_MIN_REC_FLAG))) {
2907 prev_rec = rec;
2908 }
2909 rec = page_rec_get_next_low(rec, false);
2910 } while (rec != page + PAGE_OLD_SUPREMUM);
2911 }
2912 return(prev_rec);
2913 }
2914