1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License, version 2.0,
8 as published by the Free Software Foundation.
9 
10 This program is also distributed with certain software (including
11 but not limited to OpenSSL) that is licensed under separate terms,
12 as designated in a particular file or component or in included license
13 documentation.  The authors of MySQL hereby grant you an additional
14 permission to link the program and your derivative works with the
15 separately licensed software that they have included with MySQL.
16 
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 GNU General Public License, version 2.0, for more details.
21 
22 You should have received a copy of the GNU General Public License along with
23 this program; if not, write to the Free Software Foundation, Inc.,
24 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
25 
26 *****************************************************************************/
27 
28 /**************************************************//**
29 @file btr/btr0btr.cc
30 The B-tree
31 
32 Created 6/2/1994 Heikki Tuuri
33 *******************************************************/
34 
35 #include "btr0btr.h"
36 
37 #ifdef UNIV_NONINL
38 #include "btr0btr.ic"
39 #endif
40 
41 #include "fsp0fsp.h"
42 #include "page0page.h"
43 #include "page0zip.h"
44 
45 #ifndef UNIV_HOTBACKUP
46 #include "btr0cur.h"
47 #include "btr0sea.h"
48 #include "btr0pcur.h"
49 #include "rem0cmp.h"
50 #include "lock0lock.h"
51 #include "ibuf0ibuf.h"
52 #include "trx0trx.h"
53 #include "srv0mon.h"
54 
55 /**************************************************************//**
56 Checks if the page in the cursor can be merged with given page.
57 If necessary, re-organize the merge_page.
58 @return	TRUE if possible to merge. */
59 UNIV_INTERN
60 ibool
61 btr_can_merge_with_page(
62 /*====================*/
63 	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
64 	ulint		page_no,	/*!< in: a sibling page */
65 	buf_block_t**	merge_block,	/*!< out: the merge block */
66 	mtr_t*		mtr);		/*!< in: mini-transaction */
67 
68 #endif /* UNIV_HOTBACKUP */
69 
70 /**************************************************************//**
71 Report that an index page is corrupted. */
72 UNIV_INTERN
73 void
btr_corruption_report(const buf_block_t * block,const dict_index_t * index)74 btr_corruption_report(
75 /*==================*/
76 	const buf_block_t*	block,	/*!< in: corrupted block */
77 	const dict_index_t*	index)	/*!< in: index tree */
78 {
79 	fprintf(stderr, "InnoDB: flag mismatch in space %u page %u"
80 		" index %s of table %s\n",
81 		(unsigned) buf_block_get_space(block),
82 		(unsigned) buf_block_get_page_no(block),
83 		index->name, index->table_name);
84 	if (block->page.zip.data) {
85 		buf_page_print(block->page.zip.data,
86 			       buf_block_get_zip_size(block),
87 			       BUF_PAGE_PRINT_NO_CRASH);
88 	}
89 	buf_page_print(buf_nonnull_block_get_frame(block), 0, 0);
90 }
91 
92 #ifndef UNIV_HOTBACKUP
93 #ifdef UNIV_BLOB_DEBUG
94 # include "srv0srv.h"
95 # include "ut0rbt.h"
96 
97 /** TRUE when messages about index->blobs modification are enabled. */
98 static ibool btr_blob_dbg_msg;
99 
100 /** Issue a message about an operation on index->blobs.
101 @param op	operation
102 @param b	the entry being subjected to the operation
103 @param ctx	the context of the operation */
104 #define btr_blob_dbg_msg_issue(op, b, ctx)			\
105 	fprintf(stderr, op " %u:%u:%u->%u %s(%u,%u,%u)\n",	\
106 		(b)->ref_page_no, (b)->ref_heap_no,		\
107 		(b)->ref_field_no, (b)->blob_page_no, ctx,	\
108 		(b)->owner, (b)->always_owner, (b)->del)
109 
110 /** Insert to index->blobs a reference to an off-page column.
111 @param index	the index tree
112 @param b	the reference
113 @param ctx	context (for logging) */
114 UNIV_INTERN
115 void
btr_blob_dbg_rbt_insert(dict_index_t * index,const btr_blob_dbg_t * b,const char * ctx)116 btr_blob_dbg_rbt_insert(
117 /*====================*/
118 	dict_index_t*		index,	/*!< in/out: index tree */
119 	const btr_blob_dbg_t*	b,	/*!< in: the reference */
120 	const char*		ctx)	/*!< in: context (for logging) */
121 {
122 	if (btr_blob_dbg_msg) {
123 		btr_blob_dbg_msg_issue("insert", b, ctx);
124 	}
125 	mutex_enter(&index->blobs_mutex);
126 	rbt_insert(index->blobs, b, b);
127 	mutex_exit(&index->blobs_mutex);
128 }
129 
130 /** Remove from index->blobs a reference to an off-page column.
131 @param index	the index tree
132 @param b	the reference
133 @param ctx	context (for logging) */
134 UNIV_INTERN
135 void
btr_blob_dbg_rbt_delete(dict_index_t * index,const btr_blob_dbg_t * b,const char * ctx)136 btr_blob_dbg_rbt_delete(
137 /*====================*/
138 	dict_index_t*		index,	/*!< in/out: index tree */
139 	const btr_blob_dbg_t*	b,	/*!< in: the reference */
140 	const char*		ctx)	/*!< in: context (for logging) */
141 {
142 	if (btr_blob_dbg_msg) {
143 		btr_blob_dbg_msg_issue("delete", b, ctx);
144 	}
145 	mutex_enter(&index->blobs_mutex);
146 	ut_a(rbt_delete(index->blobs, b));
147 	mutex_exit(&index->blobs_mutex);
148 }
149 
150 /**************************************************************//**
151 Comparator for items (btr_blob_dbg_t) in index->blobs.
152 The key in index->blobs is (ref_page_no, ref_heap_no, ref_field_no).
153 @return negative, 0 or positive if *a<*b, *a=*b, *a>*b */
154 static
155 int
btr_blob_dbg_cmp(const void * a,const void * b)156 btr_blob_dbg_cmp(
157 /*=============*/
158 	const void*	a,	/*!< in: first btr_blob_dbg_t to compare */
159 	const void*	b)	/*!< in: second btr_blob_dbg_t to compare */
160 {
161 	const btr_blob_dbg_t*	aa = static_cast<const btr_blob_dbg_t*>(a);
162 	const btr_blob_dbg_t*	bb = static_cast<const btr_blob_dbg_t*>(b);
163 
164 	ut_ad(aa != NULL);
165 	ut_ad(bb != NULL);
166 
167 	if (aa->ref_page_no != bb->ref_page_no) {
168 		return(aa->ref_page_no < bb->ref_page_no ? -1 : 1);
169 	}
170 	if (aa->ref_heap_no != bb->ref_heap_no) {
171 		return(aa->ref_heap_no < bb->ref_heap_no ? -1 : 1);
172 	}
173 	if (aa->ref_field_no != bb->ref_field_no) {
174 		return(aa->ref_field_no < bb->ref_field_no ? -1 : 1);
175 	}
176 	return(0);
177 }
178 
179 /**************************************************************//**
180 Add a reference to an off-page column to the index->blobs map. */
181 UNIV_INTERN
182 void
btr_blob_dbg_add_blob(const rec_t * rec,ulint field_no,ulint page_no,dict_index_t * index,const char * ctx)183 btr_blob_dbg_add_blob(
184 /*==================*/
185 	const rec_t*	rec,		/*!< in: clustered index record */
186 	ulint		field_no,	/*!< in: off-page column number */
187 	ulint		page_no,	/*!< in: start page of the column */
188 	dict_index_t*	index,		/*!< in/out: index tree */
189 	const char*	ctx)		/*!< in: context (for logging) */
190 {
191 	btr_blob_dbg_t	b;
192 	const page_t*	page	= page_align(rec);
193 
194 	ut_a(index->blobs);
195 
196 	b.blob_page_no = page_no;
197 	b.ref_page_no = page_get_page_no(page);
198 	b.ref_heap_no = page_rec_get_heap_no(rec);
199 	b.ref_field_no = field_no;
200 	ut_a(b.ref_field_no >= index->n_uniq);
201 	b.always_owner = b.owner = TRUE;
202 	b.del = FALSE;
203 	ut_a(!rec_get_deleted_flag(rec, page_is_comp(page)));
204 	btr_blob_dbg_rbt_insert(index, &b, ctx);
205 }
206 
207 /**************************************************************//**
208 Add to index->blobs any references to off-page columns from a record.
209 @return number of references added */
210 UNIV_INTERN
211 ulint
btr_blob_dbg_add_rec(const rec_t * rec,dict_index_t * index,const ulint * offsets,const char * ctx)212 btr_blob_dbg_add_rec(
213 /*=================*/
214 	const rec_t*	rec,	/*!< in: record */
215 	dict_index_t*	index,	/*!< in/out: index */
216 	const ulint*	offsets,/*!< in: offsets */
217 	const char*	ctx)	/*!< in: context (for logging) */
218 {
219 	ulint		count	= 0;
220 	ulint		i;
221 	btr_blob_dbg_t	b;
222 	ibool		del;
223 
224 	ut_ad(rec_offs_validate(rec, index, offsets));
225 
226 	if (!rec_offs_any_extern(offsets)) {
227 		return(0);
228 	}
229 
230 	b.ref_page_no = page_get_page_no(page_align(rec));
231 	b.ref_heap_no = page_rec_get_heap_no(rec);
232 	del = (rec_get_deleted_flag(rec, rec_offs_comp(offsets)) != 0);
233 
234 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
235 		if (rec_offs_nth_extern(offsets, i)) {
236 			ulint		len;
237 			const byte*	field_ref = rec_get_nth_field(
238 				rec, offsets, i, &len);
239 
240 			ut_a(len != UNIV_SQL_NULL);
241 			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
242 			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;
243 
244 			if (!memcmp(field_ref, field_ref_zero,
245 				    BTR_EXTERN_FIELD_REF_SIZE)) {
246 				/* the column has not been stored yet */
247 				continue;
248 			}
249 
250 			b.ref_field_no = i;
251 			b.blob_page_no = mach_read_from_4(
252 				field_ref + BTR_EXTERN_PAGE_NO);
253 			ut_a(b.ref_field_no >= index->n_uniq);
254 			b.always_owner = b.owner
255 				= !(field_ref[BTR_EXTERN_LEN]
256 				    & BTR_EXTERN_OWNER_FLAG);
257 			b.del = del;
258 
259 			btr_blob_dbg_rbt_insert(index, &b, ctx);
260 			count++;
261 		}
262 	}
263 
264 	return(count);
265 }
266 
267 /**************************************************************//**
268 Display the references to off-page columns.
269 This function is to be called from a debugger,
270 for example when a breakpoint on ut_dbg_assertion_failed is hit. */
271 UNIV_INTERN
272 void
btr_blob_dbg_print(const dict_index_t * index)273 btr_blob_dbg_print(
274 /*===============*/
275 	const dict_index_t*	index)	/*!< in: index tree */
276 {
277 	const ib_rbt_node_t*	node;
278 
279 	if (!index->blobs) {
280 		return;
281 	}
282 
283 	/* We intentionally do not acquire index->blobs_mutex here.
284 	This function is to be called from a debugger, and the caller
285 	should make sure that the index->blobs_mutex is held. */
286 
287 	for (node = rbt_first(index->blobs);
288 	     node != NULL; node = rbt_next(index->blobs, node)) {
289 		const btr_blob_dbg_t*	b
290 			= rbt_value(btr_blob_dbg_t, node);
291 		fprintf(stderr, "%u:%u:%u->%u%s%s%s\n",
292 			b->ref_page_no, b->ref_heap_no, b->ref_field_no,
293 			b->blob_page_no,
294 			b->owner ? "" : "(disowned)",
295 			b->always_owner ? "" : "(has disowned)",
296 			b->del ? "(deleted)" : "");
297 	}
298 }
299 
300 /**************************************************************//**
301 Remove from index->blobs any references to off-page columns from a record.
302 @return number of references removed */
303 UNIV_INTERN
304 ulint
btr_blob_dbg_remove_rec(const rec_t * rec,dict_index_t * index,const ulint * offsets,const char * ctx)305 btr_blob_dbg_remove_rec(
306 /*====================*/
307 	const rec_t*	rec,	/*!< in: record */
308 	dict_index_t*	index,	/*!< in/out: index */
309 	const ulint*	offsets,/*!< in: offsets */
310 	const char*	ctx)	/*!< in: context (for logging) */
311 {
312 	ulint		i;
313 	ulint		count	= 0;
314 	btr_blob_dbg_t	b;
315 
316 	ut_ad(rec_offs_validate(rec, index, offsets));
317 
318 	if (!rec_offs_any_extern(offsets)) {
319 		return(0);
320 	}
321 
322 	b.ref_page_no = page_get_page_no(page_align(rec));
323 	b.ref_heap_no = page_rec_get_heap_no(rec);
324 
325 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
326 		if (rec_offs_nth_extern(offsets, i)) {
327 			ulint		len;
328 			const byte*	field_ref = rec_get_nth_field(
329 				rec, offsets, i, &len);
330 
331 			ut_a(len != UNIV_SQL_NULL);
332 			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
333 			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;
334 
335 			b.ref_field_no = i;
336 			b.blob_page_no = mach_read_from_4(
337 				field_ref + BTR_EXTERN_PAGE_NO);
338 
339 			switch (b.blob_page_no) {
340 			case 0:
341 				/* The column has not been stored yet.
342 				The BLOB pointer must be all zero.
343 				There cannot be a BLOB starting at
344 				page 0, because page 0 is reserved for
345 				the tablespace header. */
346 				ut_a(!memcmp(field_ref, field_ref_zero,
347 					     BTR_EXTERN_FIELD_REF_SIZE));
348 				/* fall through */
349 			case FIL_NULL:
350 				/* the column has been freed already */
351 				continue;
352 			}
353 
354 			btr_blob_dbg_rbt_delete(index, &b, ctx);
355 			count++;
356 		}
357 	}
358 
359 	return(count);
360 }
361 
362 /**************************************************************//**
363 Check that there are no references to off-page columns from or to
364 the given page. Invoked when freeing or clearing a page.
365 @return TRUE when no orphan references exist */
366 UNIV_INTERN
367 ibool
btr_blob_dbg_is_empty(dict_index_t * index,ulint page_no)368 btr_blob_dbg_is_empty(
369 /*==================*/
370 	dict_index_t*	index,		/*!< in: index */
371 	ulint		page_no)	/*!< in: page number */
372 {
373 	const ib_rbt_node_t*	node;
374 	ibool			success	= TRUE;
375 
376 	if (!index->blobs) {
377 		return(success);
378 	}
379 
380 	mutex_enter(&index->blobs_mutex);
381 
382 	for (node = rbt_first(index->blobs);
383 	     node != NULL; node = rbt_next(index->blobs, node)) {
384 		const btr_blob_dbg_t*	b
385 			= rbt_value(btr_blob_dbg_t, node);
386 
387 		if (b->ref_page_no != page_no && b->blob_page_no != page_no) {
388 			continue;
389 		}
390 
391 		fprintf(stderr,
392 			"InnoDB: orphan BLOB ref%s%s%s %u:%u:%u->%u\n",
393 			b->owner ? "" : "(disowned)",
394 			b->always_owner ? "" : "(has disowned)",
395 			b->del ? "(deleted)" : "",
396 			b->ref_page_no, b->ref_heap_no, b->ref_field_no,
397 			b->blob_page_no);
398 
399 		if (b->blob_page_no != page_no || b->owner || !b->del) {
400 			success = FALSE;
401 		}
402 	}
403 
404 	mutex_exit(&index->blobs_mutex);
405 	return(success);
406 }
407 
408 /**************************************************************//**
409 Count and process all references to off-page columns on a page.
410 @return number of references processed */
411 UNIV_INTERN
412 ulint
btr_blob_dbg_op(const page_t * page,const rec_t * rec,dict_index_t * index,const char * ctx,const btr_blob_dbg_op_f op)413 btr_blob_dbg_op(
414 /*============*/
415 	const page_t*		page,	/*!< in: B-tree leaf page */
416 	const rec_t*		rec,	/*!< in: record to start from
417 					(NULL to process the whole page) */
418 	dict_index_t*		index,	/*!< in/out: index */
419 	const char*		ctx,	/*!< in: context (for logging) */
420 	const btr_blob_dbg_op_f	op)	/*!< in: operation on records */
421 {
422 	ulint		count	= 0;
423 	mem_heap_t*	heap	= NULL;
424 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
425 	ulint*		offsets	= offsets_;
426 	rec_offs_init(offsets_);
427 
428 	ut_a(fil_page_get_type(page) == FIL_PAGE_INDEX);
429 	ut_a(!rec || page_align(rec) == page);
430 
431 	if (!index->blobs || !page_is_leaf(page)
432 	    || !dict_index_is_clust(index)) {
433 		return(0);
434 	}
435 
436 	if (rec == NULL) {
437 		rec = page_get_infimum_rec(page);
438 	}
439 
440 	do {
441 		offsets = rec_get_offsets(rec, index, offsets,
442 					  ULINT_UNDEFINED, &heap);
443 		count += op(rec, index, offsets, ctx);
444 		rec = page_rec_get_next_const(rec);
445 	} while (!page_rec_is_supremum(rec));
446 
447 	if (heap) {
448 		mem_heap_free(heap);
449 	}
450 
451 	return(count);
452 }
453 
454 /**************************************************************//**
455 Count and add to index->blobs any references to off-page columns
456 from records on a page.
457 @return number of references added */
458 UNIV_INTERN
459 ulint
btr_blob_dbg_add(const page_t * page,dict_index_t * index,const char * ctx)460 btr_blob_dbg_add(
461 /*=============*/
462 	const page_t*	page,	/*!< in: rewritten page */
463 	dict_index_t*	index,	/*!< in/out: index */
464 	const char*	ctx)	/*!< in: context (for logging) */
465 {
466 	btr_blob_dbg_assert_empty(index, page_get_page_no(page));
467 
468 	return(btr_blob_dbg_op(page, NULL, index, ctx, btr_blob_dbg_add_rec));
469 }
470 
471 /**************************************************************//**
472 Count and remove from index->blobs any references to off-page columns
473 from records on a page.
474 Used when reorganizing a page, before copying the records.
475 @return number of references removed */
476 UNIV_INTERN
477 ulint
btr_blob_dbg_remove(const page_t * page,dict_index_t * index,const char * ctx)478 btr_blob_dbg_remove(
479 /*================*/
480 	const page_t*	page,	/*!< in: b-tree page */
481 	dict_index_t*	index,	/*!< in/out: index */
482 	const char*	ctx)	/*!< in: context (for logging) */
483 {
484 	ulint	count;
485 
486 	count = btr_blob_dbg_op(page, NULL, index, ctx,
487 				btr_blob_dbg_remove_rec);
488 
489 	/* Check that no references exist. */
490 	btr_blob_dbg_assert_empty(index, page_get_page_no(page));
491 
492 	return(count);
493 }
494 
495 /**************************************************************//**
496 Restore in index->blobs any references to off-page columns
497 Used when page reorganize fails due to compressed page overflow. */
498 UNIV_INTERN
499 void
btr_blob_dbg_restore(const page_t * npage,const page_t * page,dict_index_t * index,const char * ctx)500 btr_blob_dbg_restore(
501 /*=================*/
502 	const page_t*	npage,	/*!< in: page that failed to compress  */
503 	const page_t*	page,	/*!< in: copy of original page */
504 	dict_index_t*	index,	/*!< in/out: index */
505 	const char*	ctx)	/*!< in: context (for logging) */
506 {
507 	ulint	removed;
508 	ulint	added;
509 
510 	ut_a(page_get_page_no(npage) == page_get_page_no(page));
511 	ut_a(page_get_space_id(npage) == page_get_space_id(page));
512 
513 	removed = btr_blob_dbg_remove(npage, index, ctx);
514 	added = btr_blob_dbg_add(page, index, ctx);
515 	ut_a(added == removed);
516 }
517 
518 /**************************************************************//**
519 Modify the 'deleted' flag of a record. */
520 UNIV_INTERN
521 void
btr_blob_dbg_set_deleted_flag(const rec_t * rec,dict_index_t * index,const ulint * offsets,ibool del)522 btr_blob_dbg_set_deleted_flag(
523 /*==========================*/
524 	const rec_t*		rec,	/*!< in: record */
525 	dict_index_t*		index,	/*!< in/out: index */
526 	const ulint*		offsets,/*!< in: rec_get_offs(rec, index) */
527 	ibool			del)	/*!< in: TRUE=deleted, FALSE=exists */
528 {
529 	const ib_rbt_node_t*	node;
530 	btr_blob_dbg_t		b;
531 	btr_blob_dbg_t*		c;
532 	ulint			i;
533 
534 	ut_ad(rec_offs_validate(rec, index, offsets));
535 	ut_a(dict_index_is_clust(index));
536 	ut_a(del == !!del);/* must be FALSE==0 or TRUE==1 */
537 
538 	if (!rec_offs_any_extern(offsets) || !index->blobs) {
539 
540 		return;
541 	}
542 
543 	b.ref_page_no = page_get_page_no(page_align(rec));
544 	b.ref_heap_no = page_rec_get_heap_no(rec);
545 
546 	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
547 		if (rec_offs_nth_extern(offsets, i)) {
548 			ulint		len;
549 			const byte*	field_ref = rec_get_nth_field(
550 				rec, offsets, i, &len);
551 
552 			ut_a(len != UNIV_SQL_NULL);
553 			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
554 			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;
555 
556 			b.ref_field_no = i;
557 			b.blob_page_no = mach_read_from_4(
558 				field_ref + BTR_EXTERN_PAGE_NO);
559 
560 			switch (b.blob_page_no) {
561 			case 0:
562 				ut_a(memcmp(field_ref, field_ref_zero,
563 					    BTR_EXTERN_FIELD_REF_SIZE));
564 				/* page number 0 is for the
565 				page allocation bitmap */
566 			case FIL_NULL:
567 				/* the column has been freed already */
568 				ut_error;
569 			}
570 
571 			mutex_enter(&index->blobs_mutex);
572 			node = rbt_lookup(index->blobs, &b);
573 			ut_a(node);
574 
575 			c = rbt_value(btr_blob_dbg_t, node);
576 			/* The flag should be modified. */
577 			c->del = del;
578 			if (btr_blob_dbg_msg) {
579 				b = *c;
580 				mutex_exit(&index->blobs_mutex);
581 				btr_blob_dbg_msg_issue("del_mk", &b, "");
582 			} else {
583 				mutex_exit(&index->blobs_mutex);
584 			}
585 		}
586 	}
587 }
588 
589 /**************************************************************//**
590 Change the ownership of an off-page column. */
591 UNIV_INTERN
592 void
btr_blob_dbg_owner(const rec_t * rec,dict_index_t * index,const ulint * offsets,ulint i,ibool own)593 btr_blob_dbg_owner(
594 /*===============*/
595 	const rec_t*		rec,	/*!< in: record */
596 	dict_index_t*		index,	/*!< in/out: index */
597 	const ulint*		offsets,/*!< in: rec_get_offs(rec, index) */
598 	ulint			i,	/*!< in: ith field in rec */
599 	ibool			own)	/*!< in: TRUE=owned, FALSE=disowned */
600 {
601 	const ib_rbt_node_t*	node;
602 	btr_blob_dbg_t		b;
603 	const byte*		field_ref;
604 	ulint			len;
605 
606 	ut_ad(rec_offs_validate(rec, index, offsets));
607 	ut_a(rec_offs_nth_extern(offsets, i));
608 
609 	field_ref = rec_get_nth_field(rec, offsets, i, &len);
610 	ut_a(len != UNIV_SQL_NULL);
611 	ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
612 	field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;
613 
614 	b.ref_page_no = page_get_page_no(page_align(rec));
615 	b.ref_heap_no = page_rec_get_heap_no(rec);
616 	b.ref_field_no = i;
617 	b.owner = !(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG);
618 	b.blob_page_no = mach_read_from_4(field_ref + BTR_EXTERN_PAGE_NO);
619 
620 	ut_a(b.owner == own);
621 
622 	mutex_enter(&index->blobs_mutex);
623 	node = rbt_lookup(index->blobs, &b);
624 	/* row_ins_clust_index_entry_by_modify() invokes
625 	btr_cur_unmark_extern_fields() also for the newly inserted
626 	references, which are all zero bytes until the columns are stored.
627 	The node lookup must fail if and only if that is the case. */
628 	ut_a(!memcmp(field_ref, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE)
629 	     == !node);
630 
631 	if (node) {
632 		btr_blob_dbg_t*	c = rbt_value(btr_blob_dbg_t, node);
633 		/* Some code sets ownership from TRUE to TRUE.
634 		We do not allow changing ownership from FALSE to FALSE. */
635 		ut_a(own || c->owner);
636 
637 		c->owner = own;
638 		if (!own) {
639 			c->always_owner = FALSE;
640 		}
641 	}
642 
643 	mutex_exit(&index->blobs_mutex);
644 }
645 #endif /* UNIV_BLOB_DEBUG */
646 
647 /*
648 Latching strategy of the InnoDB B-tree
649 --------------------------------------
650 A tree latch protects all non-leaf nodes of the tree. Each node of a tree
651 also has a latch of its own.
652 
653 A B-tree operation normally first acquires an S-latch on the tree. It
654 searches down the tree and releases the tree latch when it has the
655 leaf node latch. To save CPU time we do not acquire any latch on
656 non-leaf nodes of the tree during a search, those pages are only bufferfixed.
657 
658 If an operation needs to restructure the tree, it acquires an X-latch on
659 the tree before searching to a leaf node. If it needs, for example, to
660 split a leaf,
661 (1) InnoDB decides the split point in the leaf,
662 (2) allocates a new page,
663 (3) inserts the appropriate node pointer to the first non-leaf level,
664 (4) releases the tree X-latch,
665 (5) and then moves records from the leaf to the new allocated page.
666 
667 Node pointers
668 -------------
669 Leaf pages of a B-tree contain the index records stored in the
670 tree. On levels n > 0 we store 'node pointers' to pages on level
671 n - 1. For each page there is exactly one node pointer stored:
672 thus the our tree is an ordinary B-tree, not a B-link tree.
673 
674 A node pointer contains a prefix P of an index record. The prefix
675 is long enough so that it determines an index record uniquely.
676 The file page number of the child page is added as the last
677 field. To the child page we can store node pointers or index records
678 which are >= P in the alphabetical order, but < P1 if there is
679 a next node pointer on the level, and P1 is its prefix.
680 
681 If a node pointer with a prefix P points to a non-leaf child,
682 then the leftmost record in the child must have the same
683 prefix P. If it points to a leaf node, the child is not required
684 to contain any record with a prefix equal to P. The leaf case
685 is decided this way to allow arbitrary deletions in a leaf node
686 without touching upper levels of the tree.
687 
688 We have predefined a special minimum record which we
689 define as the smallest record in any alphabetical order.
690 A minimum record is denoted by setting a bit in the record
691 header. A minimum record acts as the prefix of a node pointer
692 which points to a leftmost node on any level of the tree.
693 
694 File page allocation
695 --------------------
696 In the root node of a B-tree there are two file segment headers.
697 The leaf pages of a tree are allocated from one file segment, to
698 make them consecutive on disk if possible. From the other file segment
699 we allocate pages for the non-leaf levels of the tree.
700 */
701 
702 #ifdef UNIV_BTR_DEBUG
703 /**************************************************************//**
704 Checks a file segment header within a B-tree root page.
705 @return	TRUE if valid */
706 static
707 ibool
btr_root_fseg_validate(const fseg_header_t * seg_header,ulint space)708 btr_root_fseg_validate(
709 /*===================*/
710 	const fseg_header_t*	seg_header,	/*!< in: segment header */
711 	ulint			space)		/*!< in: tablespace identifier */
712 {
713 	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);
714 
715 	if (UNIV_UNLIKELY(srv_pass_corrupt_table != 0)) {
716 		return (mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space)
717 			&& (offset >= FIL_PAGE_DATA)
718 			&& (offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
719 	}
720 
721 	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
722 	ut_a(offset >= FIL_PAGE_DATA);
723 	ut_a(offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
724 	return(TRUE);
725 }
726 #endif /* UNIV_BTR_DEBUG */
727 
728 /**************************************************************//**
729 Gets the root node of a tree and x- or s-latches it.
730 @return	root page, x- or s-latched */
731 static
732 buf_block_t*
btr_root_block_get(const dict_index_t * index,ulint mode,mtr_t * mtr)733 btr_root_block_get(
734 /*===============*/
735 	const dict_index_t*	index,	/*!< in: index tree */
736 	ulint			mode,	/*!< in: either RW_S_LATCH
737 					or RW_X_LATCH */
738 	mtr_t*			mtr)	/*!< in: mtr */
739 {
740 	ulint		space;
741 	ulint		zip_size;
742 	ulint		root_page_no;
743 	buf_block_t*	block;
744 
745 	space = dict_index_get_space(index);
746 	zip_size = dict_table_zip_size(index->table);
747 	root_page_no = dict_index_get_page(index);
748 
749 	block = btr_block_get(space, zip_size, root_page_no, mode, index, mtr);
750 
751 	SRV_CORRUPT_TABLE_CHECK(block, return(0););
752 
753 	btr_assert_not_corrupted(block, index);
754 #ifdef UNIV_BTR_DEBUG
755 	if (!dict_index_is_ibuf(index)) {
756 		const page_t*	root = buf_block_get_frame(block);
757 
758 		if (UNIV_UNLIKELY(srv_pass_corrupt_table != 0)) {
759 			if (!btr_root_fseg_validate(FIL_PAGE_DATA
760 						    + PAGE_BTR_SEG_LEAF
761 						    + root, space))
762 				return(NULL);
763 			if (!btr_root_fseg_validate(FIL_PAGE_DATA
764 						    + PAGE_BTR_SEG_TOP
765 						    + root, space))
766 				return(NULL);
767 			return(block);
768 		}
769 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
770 					    + root, space));
771 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
772 					    + root, space));
773 	}
774 #endif /* UNIV_BTR_DEBUG */
775 
776 	return(block);
777 }
778 
779 /**************************************************************//**
780 Gets the root node of a tree and x-latches it.
781 @return	root page, x-latched */
782 UNIV_INTERN
783 page_t*
btr_root_get(const dict_index_t * index,mtr_t * mtr)784 btr_root_get(
785 /*=========*/
786 	const dict_index_t*	index,	/*!< in: index tree */
787 	mtr_t*			mtr)	/*!< in: mtr */
788 {
789 	return(buf_block_get_frame(btr_root_block_get(index, RW_X_LATCH,
790 						      mtr)));
791 }
792 
793 /**************************************************************//**
794 Gets the height of the B-tree (the level of the root, when the leaf
795 level is assumed to be 0). The caller must hold an S or X latch on
796 the index.
797 @return	tree height (level of the root) */
798 UNIV_INTERN
799 ulint
btr_height_get(dict_index_t * index,mtr_t * mtr)800 btr_height_get(
801 /*===========*/
802 	dict_index_t*	index,	/*!< in: index tree */
803 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
804 {
805 	ulint		height;
806 	buf_block_t*	root_block;
807 
808 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
809 				MTR_MEMO_S_LOCK)
810 	      || mtr_memo_contains(mtr, dict_index_get_lock(index),
811 				MTR_MEMO_X_LOCK));
812 
813         /* S latches the page */
814         root_block = btr_root_block_get(index, RW_S_LATCH, mtr);
815 	ut_ad(root_block); // The index must not be corrupted
816 
817 	height = btr_page_get_level(buf_nonnull_block_get_frame(root_block),
818 				    mtr);
819 
820         /* Release the S latch on the root page. */
821         mtr_memo_release(mtr, root_block, MTR_MEMO_PAGE_S_FIX);
822 #ifdef UNIV_SYNC_DEBUG
823         sync_thread_reset_level(&root_block->lock);
824 #endif /* UNIV_SYNC_DEBUG */
825 
826 	return(height);
827 }
828 
829 /**************************************************************//**
830 Checks a file segment header within a B-tree root page and updates
831 the segment header space id.
832 @return	TRUE if valid */
833 static
834 bool
btr_root_fseg_adjust_on_import(fseg_header_t * seg_header,page_zip_des_t * page_zip,ulint space,mtr_t * mtr)835 btr_root_fseg_adjust_on_import(
836 /*===========================*/
837 	fseg_header_t*	seg_header,	/*!< in/out: segment header */
838 	page_zip_des_t*	page_zip,	/*!< in/out: compressed page,
839 					or NULL */
840 	ulint		space,		/*!< in: tablespace identifier */
841 	mtr_t*		mtr)		/*!< in/out: mini-transaction */
842 {
843 	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);
844 
845 	if (offset < FIL_PAGE_DATA
846 	    || offset > UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) {
847 
848 		return(FALSE);
849 
850 	} else if (page_zip) {
851 		mach_write_to_4(seg_header + FSEG_HDR_SPACE, space);
852 		page_zip_write_header(page_zip, seg_header + FSEG_HDR_SPACE,
853 				      4, mtr);
854 	} else {
855 		mlog_write_ulint(seg_header + FSEG_HDR_SPACE,
856 				 space, MLOG_4BYTES, mtr);
857 	}
858 
859 	return(TRUE);
860 }
861 
862 /**************************************************************//**
863 Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
864 @return error code, or DB_SUCCESS */
865 UNIV_INTERN
866 dberr_t
btr_root_adjust_on_import(const dict_index_t * index)867 btr_root_adjust_on_import(
868 /*======================*/
869 	const dict_index_t*	index)	/*!< in: index tree */
870 {
871 	dberr_t		err;
872 	mtr_t		mtr;
873 	page_t*		page;
874 	buf_block_t*	block;
875 	page_zip_des_t*	page_zip;
876 	dict_table_t*	table		= index->table;
877 	ulint		space_id	= dict_index_get_space(index);
878 	ulint		zip_size	= dict_table_zip_size(table);
879 	ulint		root_page_no	= dict_index_get_page(index);
880 
881 	mtr_start(&mtr);
882 
883 	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
884 
885 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_3",
886 			return(DB_CORRUPTION););
887 
888 	block = btr_block_get(
889 		space_id, zip_size, root_page_no, RW_X_LATCH, index, &mtr);
890 
891 	page = buf_block_get_frame(block);
892 	page_zip = buf_block_get_page_zip(block);
893 
894 	/* Check that this is a B-tree page and both the PREV and NEXT
895 	pointers are FIL_NULL, because the root page does not have any
896 	siblings. */
897 	if (fil_page_get_type(page) != FIL_PAGE_INDEX
898 	    || fil_page_get_prev(page) != FIL_NULL
899 	    || fil_page_get_next(page) != FIL_NULL) {
900 
901 		err = DB_CORRUPTION;
902 
903 	} else if (dict_index_is_clust(index)) {
904 		bool	page_is_compact_format;
905 
906 		page_is_compact_format = page_is_comp(page) > 0;
907 
908 		/* Check if the page format and table format agree. */
909 		if (page_is_compact_format != dict_table_is_comp(table)) {
910 			err = DB_CORRUPTION;
911 		} else {
912 
913 			/* Check that the table flags and the tablespace
914 			flags match. */
915 			ulint	flags = fil_space_get_flags(table->space);
916 
917 			if (flags
918 			    && flags != dict_tf_to_fsp_flags(table->flags)) {
919 
920 				err = DB_CORRUPTION;
921 			} else {
922 				err = DB_SUCCESS;
923 			}
924 		}
925 	} else {
926 		err = DB_SUCCESS;
927 	}
928 
929 	/* Check and adjust the file segment headers, if all OK so far. */
930 	if (err == DB_SUCCESS
931 	    && (!btr_root_fseg_adjust_on_import(
932 			FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
933 			+ page, page_zip, space_id, &mtr)
934 		|| !btr_root_fseg_adjust_on_import(
935 			FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
936 			+ page, page_zip, space_id, &mtr))) {
937 
938 		err = DB_CORRUPTION;
939 	}
940 
941 	mtr_commit(&mtr);
942 
943 	return(err);
944 }
945 
946 /*************************************************************//**
947 Gets pointer to the previous user record in the tree. It is assumed that
948 the caller has appropriate latches on the page and its neighbor.
949 @return	previous user record, NULL if there is none */
950 UNIV_INTERN
951 rec_t*
btr_get_prev_user_rec(rec_t * rec,mtr_t * mtr)952 btr_get_prev_user_rec(
953 /*==================*/
954 	rec_t*	rec,	/*!< in: record on leaf level */
955 	mtr_t*	mtr)	/*!< in: mtr holding a latch on the page, and if
956 			needed, also to the previous page */
957 {
958 	page_t*	page;
959 	page_t*	prev_page;
960 	ulint	prev_page_no;
961 
962 	if (!page_rec_is_infimum(rec)) {
963 
964 		rec_t*	prev_rec = page_rec_get_prev(rec);
965 
966 		if (!page_rec_is_infimum(prev_rec)) {
967 
968 			return(prev_rec);
969 		}
970 	}
971 
972 	page = page_align(rec);
973 	prev_page_no = btr_page_get_prev(page, mtr);
974 
975 	if (prev_page_no != FIL_NULL) {
976 
977 		ulint		space;
978 		ulint		zip_size;
979 		buf_block_t*	prev_block;
980 
981 		space = page_get_space_id(page);
982 		zip_size = fil_space_get_zip_size(space);
983 
984 		prev_block = buf_page_get_with_no_latch(space, zip_size,
985 							prev_page_no, mtr);
986 		prev_page = buf_block_get_frame(prev_block);
987 		/* The caller must already have a latch to the brother */
988 		ut_ad(mtr_memo_contains(mtr, prev_block,
989 					MTR_MEMO_PAGE_S_FIX)
990 		      || mtr_memo_contains(mtr, prev_block,
991 					   MTR_MEMO_PAGE_X_FIX));
992 #ifdef UNIV_BTR_DEBUG
993 		ut_a(page_is_comp(prev_page) == page_is_comp(page));
994 		ut_a(btr_page_get_next(prev_page, mtr)
995 		     == page_get_page_no(page));
996 #endif /* UNIV_BTR_DEBUG */
997 
998 		return(page_rec_get_prev(page_get_supremum_rec(prev_page)));
999 	}
1000 
1001 	return(NULL);
1002 }
1003 
1004 /*************************************************************//**
1005 Gets pointer to the next user record in the tree. It is assumed that the
1006 caller has appropriate latches on the page and its neighbor.
1007 @return	next user record, NULL if there is none */
1008 UNIV_INTERN
1009 rec_t*
btr_get_next_user_rec(rec_t * rec,mtr_t * mtr)1010 btr_get_next_user_rec(
1011 /*==================*/
1012 	rec_t*	rec,	/*!< in: record on leaf level */
1013 	mtr_t*	mtr)	/*!< in: mtr holding a latch on the page, and if
1014 			needed, also to the next page */
1015 {
1016 	page_t*	page;
1017 	page_t*	next_page;
1018 	ulint	next_page_no;
1019 
1020 	if (!page_rec_is_supremum(rec)) {
1021 
1022 		rec_t*	next_rec = page_rec_get_next(rec);
1023 
1024 		if (!page_rec_is_supremum(next_rec)) {
1025 
1026 			return(next_rec);
1027 		}
1028 	}
1029 
1030 	page = page_align(rec);
1031 	next_page_no = btr_page_get_next(page, mtr);
1032 
1033 	if (next_page_no != FIL_NULL) {
1034 		ulint		space;
1035 		ulint		zip_size;
1036 		buf_block_t*	next_block;
1037 
1038 		space = page_get_space_id(page);
1039 		zip_size = fil_space_get_zip_size(space);
1040 
1041 		next_block = buf_page_get_with_no_latch(space, zip_size,
1042 							next_page_no, mtr);
1043 		next_page = buf_block_get_frame(next_block);
1044 		/* The caller must already have a latch to the brother */
1045 		ut_ad(mtr_memo_contains(mtr, next_block, MTR_MEMO_PAGE_S_FIX)
1046 		      || mtr_memo_contains(mtr, next_block,
1047 					   MTR_MEMO_PAGE_X_FIX));
1048 #ifdef UNIV_BTR_DEBUG
1049 		ut_a(page_is_comp(next_page) == page_is_comp(page));
1050 		ut_a(btr_page_get_prev(next_page, mtr)
1051 		     == page_get_page_no(page));
1052 #endif /* UNIV_BTR_DEBUG */
1053 
1054 		return(page_rec_get_next(page_get_infimum_rec(next_page)));
1055 	}
1056 
1057 	return(NULL);
1058 }
1059 
1060 /**************************************************************//**
1061 Creates a new index page (not the root, and also not
1062 used in page reorganization).  @see btr_page_empty(). */
1063 UNIV_INTERN
1064 void
btr_page_create(buf_block_t * block,page_zip_des_t * page_zip,dict_index_t * index,ulint level,mtr_t * mtr)1065 btr_page_create(
1066 /*============*/
1067 	buf_block_t*	block,	/*!< in/out: page to be created */
1068 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
1069 	dict_index_t*	index,	/*!< in: index */
1070 	ulint		level,	/*!< in: the B-tree level of the page */
1071 	mtr_t*		mtr)	/*!< in: mtr */
1072 {
1073 	page_t*		page = buf_block_get_frame(block);
1074 
1075 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1076 	btr_blob_dbg_assert_empty(index, buf_block_get_page_no(block));
1077 
1078 	if (page_zip) {
1079 		page_create_zip(block, index, level, 0, mtr);
1080 	} else {
1081 		page_create(block, mtr, dict_table_is_comp(index->table));
1082 		/* Set the level of the new index page */
1083 		btr_page_set_level(page, NULL, level, mtr);
1084 	}
1085 
1086 	block->check_index_page_at_flush = TRUE;
1087 
1088 	btr_page_set_index_id(page, page_zip, index->id, mtr);
1089 }
1090 
1091 /**************************************************************//**
1092 Allocates a new file page to be used in an ibuf tree. Takes the page from
1093 the free list of the tree, which must contain pages!
1094 @return	new allocated block, x-latched */
1095 static
1096 buf_block_t*
btr_page_alloc_for_ibuf(dict_index_t * index,mtr_t * mtr)1097 btr_page_alloc_for_ibuf(
1098 /*====================*/
1099 	dict_index_t*	index,	/*!< in: index tree */
1100 	mtr_t*		mtr)	/*!< in: mtr */
1101 {
1102 	fil_addr_t	node_addr;
1103 	page_t*		root;
1104 	page_t*		new_page;
1105 	buf_block_t*	new_block;
1106 
1107 	root = btr_root_get(index, mtr);
1108 
1109 	node_addr = flst_get_first(root + PAGE_HEADER
1110 				   + PAGE_BTR_IBUF_FREE_LIST, mtr);
1111 	ut_a(node_addr.page != FIL_NULL);
1112 
1113 	new_block = buf_page_get(dict_index_get_space(index),
1114 				 dict_table_zip_size(index->table),
1115 				 node_addr.page, RW_X_LATCH, mtr);
1116 	new_page = buf_block_get_frame(new_block);
1117 	buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);
1118 
1119 	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
1120 		    new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
1121 		    mtr);
1122 	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
1123 			    mtr));
1124 
1125 	return(new_block);
1126 }
1127 
1128 /**************************************************************//**
1129 Allocates a new file page to be used in an index tree. NOTE: we assume
1130 that the caller has made the reservation for free extents!
1131 @retval NULL if no page could be allocated
1132 @retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
1133 (init_mtr == mtr, or the page was not previously freed in mtr)
1134 @retval block (not allocated or initialized) otherwise */
1135 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1136 buf_block_t*
btr_page_alloc_low(dict_index_t * index,ulint hint_page_no,byte file_direction,ulint level,mtr_t * mtr,mtr_t * init_mtr)1137 btr_page_alloc_low(
1138 /*===============*/
1139 	dict_index_t*	index,		/*!< in: index */
1140 	ulint		hint_page_no,	/*!< in: hint of a good page */
1141 	byte		file_direction,	/*!< in: direction where a possible
1142 					page split is made */
1143 	ulint		level,		/*!< in: level where the page is placed
1144 					in the tree */
1145 	mtr_t*		mtr,		/*!< in/out: mini-transaction
1146 					for the allocation */
1147 	mtr_t*		init_mtr)	/*!< in/out: mtr or another
1148 					mini-transaction in which the
1149 					page should be initialized.
1150 					If init_mtr!=mtr, but the page
1151 					is already X-latched in mtr, do
1152 					not initialize the page. */
1153 {
1154 	fseg_header_t*	seg_header;
1155 	page_t*		root;
1156 
1157 	root = btr_root_get(index, mtr);
1158 
1159 	if (level == 0) {
1160 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
1161 	} else {
1162 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
1163 	}
1164 
1165 	/* Parameter TRUE below states that the caller has made the
1166 	reservation for free extents, and thus we know that a page can
1167 	be allocated: */
1168 
1169 	return(fseg_alloc_free_page_general(
1170 		       seg_header, hint_page_no, file_direction,
1171 		       TRUE, mtr, init_mtr));
1172 }
1173 
1174 /**************************************************************//**
1175 Allocates a new file page to be used in an index tree. NOTE: we assume
1176 that the caller has made the reservation for free extents!
1177 @retval NULL if no page could be allocated
1178 @retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
1179 (init_mtr == mtr, or the page was not previously freed in mtr)
1180 @retval block (not allocated or initialized) otherwise */
1181 UNIV_INTERN
1182 buf_block_t*
btr_page_alloc(dict_index_t * index,ulint hint_page_no,byte file_direction,ulint level,mtr_t * mtr,mtr_t * init_mtr)1183 btr_page_alloc(
1184 /*===========*/
1185 	dict_index_t*	index,		/*!< in: index */
1186 	ulint		hint_page_no,	/*!< in: hint of a good page */
1187 	byte		file_direction,	/*!< in: direction where a possible
1188 					page split is made */
1189 	ulint		level,		/*!< in: level where the page is placed
1190 					in the tree */
1191 	mtr_t*		mtr,		/*!< in/out: mini-transaction
1192 					for the allocation */
1193 	mtr_t*		init_mtr)	/*!< in/out: mini-transaction
1194 					for x-latching and initializing
1195 					the page */
1196 {
1197 	buf_block_t*	new_block;
1198 
1199 	if (dict_index_is_ibuf(index)) {
1200 
1201 		return(btr_page_alloc_for_ibuf(index, mtr));
1202 	}
1203 
1204 	new_block = btr_page_alloc_low(
1205 		index, hint_page_no, file_direction, level, mtr, init_mtr);
1206 
1207 	if (new_block) {
1208 		buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
1209 	}
1210 
1211 	return(new_block);
1212 }
1213 
1214 /**************************************************************//**
1215 Gets the number of pages in a B-tree.
1216 @return	number of pages, or ULINT_UNDEFINED if the index is unavailable */
1217 UNIV_INTERN
1218 ulint
btr_get_size(dict_index_t * index,ulint flag,mtr_t * mtr)1219 btr_get_size(
1220 /*=========*/
1221 	dict_index_t*	index,	/*!< in: index */
1222 	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
1223 	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
1224 				is s-latched */
1225 {
1226 	fseg_header_t*	seg_header;
1227 	page_t*		root;
1228 	ulint		n;
1229 	ulint		dummy;
1230 
1231 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
1232 				MTR_MEMO_S_LOCK));
1233 
1234 	if (index->page == FIL_NULL || dict_index_is_online_ddl(index)
1235 	    || *index->name == TEMP_INDEX_PREFIX) {
1236 		return(ULINT_UNDEFINED);
1237 	}
1238 
1239 	root = btr_root_get(index, mtr);
1240 
1241 	SRV_CORRUPT_TABLE_CHECK(root,
1242 	{
1243 		mtr_commit(mtr);
1244 		return(ULINT_UNDEFINED);
1245 	});
1246 
1247 	if (flag == BTR_N_LEAF_PAGES) {
1248 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
1249 
1250 		fseg_n_reserved_pages(seg_header, &n, mtr);
1251 
1252 	} else if (flag == BTR_TOTAL_SIZE) {
1253 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
1254 
1255 		n = fseg_n_reserved_pages(seg_header, &dummy, mtr);
1256 
1257 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
1258 
1259 		n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
1260 	} else {
1261 		ut_error;
1262 	}
1263 
1264 	return(n);
1265 }
1266 
1267 /**************************************************************//**
1268 Frees a page used in an ibuf tree. Puts the page to the free list of the
1269 ibuf tree. */
1270 static
1271 void
btr_page_free_for_ibuf(dict_index_t * index,buf_block_t * block,mtr_t * mtr)1272 btr_page_free_for_ibuf(
1273 /*===================*/
1274 	dict_index_t*	index,	/*!< in: index tree */
1275 	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
1276 	mtr_t*		mtr)	/*!< in: mtr */
1277 {
1278 	page_t*		root;
1279 
1280 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1281 	root = btr_root_get(index, mtr);
1282 
1283 	flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
1284 		       buf_block_get_frame(block)
1285 		       + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
1286 
1287 	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
1288 			    mtr));
1289 }
1290 
1291 /**************************************************************//**
1292 Frees a file page used in an index tree. Can be used also to (BLOB)
1293 external storage pages, because the page level 0 can be given as an
1294 argument. */
1295 UNIV_INTERN
1296 void
btr_page_free_low(dict_index_t * index,buf_block_t * block,ulint level,mtr_t * mtr)1297 btr_page_free_low(
1298 /*==============*/
1299 	dict_index_t*	index,	/*!< in: index tree */
1300 	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
1301 	ulint		level,	/*!< in: page level */
1302 	mtr_t*		mtr)	/*!< in: mtr */
1303 {
1304 	fseg_header_t*	seg_header;
1305 	page_t*		root;
1306 
1307 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1308 	/* The page gets invalid for optimistic searches: increment the frame
1309 	modify clock */
1310 
1311 	buf_block_modify_clock_inc(block);
1312 	btr_blob_dbg_assert_empty(index, buf_block_get_page_no(block));
1313 
1314 	if (dict_index_is_ibuf(index)) {
1315 
1316 		btr_page_free_for_ibuf(index, block, mtr);
1317 
1318 		return;
1319 	}
1320 
1321 	root = btr_root_get(index, mtr);
1322 
1323 	if (level == 0) {
1324 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
1325 	} else {
1326 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
1327 	}
1328 
1329 	fseg_free_page(seg_header,
1330 		       buf_block_get_space(block),
1331 		       buf_block_get_page_no(block), mtr);
1332 
1333 	/* The page was marked free in the allocation bitmap, but it
1334 	should remain buffer-fixed until mtr_commit(mtr) or until it
1335 	is explicitly freed from the mini-transaction. */
1336 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1337 	/* TODO: Discard any operations on the page from the redo log
1338 	and remove the block from the flush list and the buffer pool.
1339 	This would free up buffer pool earlier and reduce writes to
1340 	both the tablespace and the redo log. */
1341 }
1342 
1343 /**************************************************************//**
1344 Frees a file page used in an index tree. NOTE: cannot free field external
1345 storage pages because the page must contain info on its level. */
1346 UNIV_INTERN
1347 void
btr_page_free(dict_index_t * index,buf_block_t * block,mtr_t * mtr)1348 btr_page_free(
1349 /*==========*/
1350 	dict_index_t*	index,	/*!< in: index tree */
1351 	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
1352 	mtr_t*		mtr)	/*!< in: mtr */
1353 {
1354 	const page_t*	page	= buf_nonnull_block_get_frame(block);
1355 	ulint		level	= btr_page_get_level(page, mtr);
1356 
1357 	ut_ad(fil_page_get_type(block->frame) == FIL_PAGE_INDEX);
1358 	btr_page_free_low(index, block, level, mtr);
1359 }
1360 
1361 /**************************************************************//**
1362 Sets the child node file address in a node pointer. */
1363 UNIV_INLINE
1364 void
btr_node_ptr_set_child_page_no(rec_t * rec,page_zip_des_t * page_zip,const ulint * offsets,ulint page_no,mtr_t * mtr)1365 btr_node_ptr_set_child_page_no(
1366 /*===========================*/
1367 	rec_t*		rec,	/*!< in: node pointer record */
1368 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
1369 				part will be updated, or NULL */
1370 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
1371 	ulint		page_no,/*!< in: child node address */
1372 	mtr_t*		mtr)	/*!< in: mtr */
1373 {
1374 	byte*	field;
1375 	ulint	len;
1376 
1377 	ut_ad(rec_offs_validate(rec, NULL, offsets));
1378 	ut_ad(!page_is_leaf(page_align(rec)));
1379 	ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));
1380 
1381 	/* The child address is in the last field */
1382 	field = rec_get_nth_field(rec, offsets,
1383 				  rec_offs_n_fields(offsets) - 1, &len);
1384 
1385 	ut_ad(len == REC_NODE_PTR_SIZE);
1386 
1387 	if (page_zip) {
1388 		page_zip_write_node_ptr(page_zip, rec,
1389 					rec_offs_data_size(offsets),
1390 					page_no, mtr);
1391 	} else {
1392 		mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
1393 	}
1394 }
1395 
1396 /************************************************************//**
1397 Returns the child page of a node pointer and x-latches it.
1398 @return	child page, x-latched */
1399 static
1400 buf_block_t*
btr_node_ptr_get_child(const rec_t * node_ptr,dict_index_t * index,const ulint * offsets,mtr_t * mtr)1401 btr_node_ptr_get_child(
1402 /*===================*/
1403 	const rec_t*	node_ptr,/*!< in: node pointer */
1404 	dict_index_t*	index,	/*!< in: index */
1405 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
1406 	mtr_t*		mtr)	/*!< in: mtr */
1407 {
1408 	ulint	page_no;
1409 	ulint	space;
1410 
1411 	ut_ad(rec_offs_validate(node_ptr, index, offsets));
1412 	space = page_get_space_id(page_align(node_ptr));
1413 	page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
1414 
1415 	return(btr_block_get(space, dict_table_zip_size(index->table),
1416 			     page_no, RW_X_LATCH, index, mtr));
1417 }
1418 
1419 /************************************************************//**
1420 Returns the upper level node pointer to a page. It is assumed that mtr holds
1421 an x-latch on the tree.
1422 @return	rec_get_offsets() of the node pointer record */
1423 static
1424 ulint*
btr_page_get_father_node_ptr_func(ulint * offsets,mem_heap_t * heap,btr_cur_t * cursor,const char * file,ulint line,mtr_t * mtr)1425 btr_page_get_father_node_ptr_func(
1426 /*==============================*/
1427 	ulint*		offsets,/*!< in: work area for the return value */
1428 	mem_heap_t*	heap,	/*!< in: memory heap to use */
1429 	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
1430 				out: cursor on node pointer record,
1431 				its page x-latched */
1432 	const char*	file,	/*!< in: file name */
1433 	ulint		line,	/*!< in: line where called */
1434 	mtr_t*		mtr)	/*!< in: mtr */
1435 {
1436 	dtuple_t*	tuple;
1437 	rec_t*		user_rec;
1438 	rec_t*		node_ptr;
1439 	ulint		level;
1440 	ulint		page_no;
1441 	dict_index_t*	index;
1442 
1443 	page_no = buf_block_get_page_no(btr_cur_get_block(cursor));
1444 	index = btr_cur_get_index(cursor);
1445 
1446 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
1447 				MTR_MEMO_X_LOCK));
1448 
1449 	ut_ad(dict_index_get_page(index) != page_no);
1450 
1451 	level = btr_page_get_level(btr_cur_get_page(cursor), mtr);
1452 
1453 	user_rec = btr_cur_get_rec(cursor);
1454 	ut_a(page_rec_is_user_rec(user_rec));
1455 	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
1456 
1457 	btr_cur_search_to_nth_level(index, level + 1, tuple, PAGE_CUR_LE,
1458 				    BTR_CONT_MODIFY_TREE, cursor, 0,
1459 				    file, line, mtr);
1460 
1461 	node_ptr = btr_cur_get_rec(cursor);
1462 	ut_ad(!page_rec_is_comp(node_ptr)
1463 	      || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
1464 	offsets = rec_get_offsets(node_ptr, index, offsets,
1465 				  ULINT_UNDEFINED, &heap);
1466 
1467 	if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
1468 		rec_t*	print_rec;
1469 		fputs("InnoDB: Dump of the child page:\n", stderr);
1470 		buf_page_print(page_align(user_rec), 0,
1471 			       BUF_PAGE_PRINT_NO_CRASH);
1472 		fputs("InnoDB: Dump of the parent page:\n", stderr);
1473 		buf_page_print(page_align(node_ptr), 0,
1474 			       BUF_PAGE_PRINT_NO_CRASH);
1475 
1476 		fputs("InnoDB: Corruption of an index tree: table ", stderr);
1477 		ut_print_name(stderr, NULL, TRUE, index->table_name);
1478 		fputs(", index ", stderr);
1479 		ut_print_name(stderr, NULL, FALSE, index->name);
1480 		fprintf(stderr, ",\n"
1481 			"InnoDB: father ptr page no %lu, child page no %lu\n",
1482 			(ulong)
1483 			btr_node_ptr_get_child_page_no(node_ptr, offsets),
1484 			(ulong) page_no);
1485 		print_rec = page_rec_get_next(
1486 			page_get_infimum_rec(page_align(user_rec)));
1487 		offsets = rec_get_offsets(print_rec, index,
1488 					  offsets, ULINT_UNDEFINED, &heap);
1489 		page_rec_print(print_rec, offsets);
1490 		offsets = rec_get_offsets(node_ptr, index, offsets,
1491 					  ULINT_UNDEFINED, &heap);
1492 		page_rec_print(node_ptr, offsets);
1493 
1494 		fputs("InnoDB: You should dump + drop + reimport the table"
1495 		      " to fix the\n"
1496 		      "InnoDB: corruption. If the crash happens at "
1497 		      "the database startup, see\n"
1498 		      "InnoDB: " REFMAN "forcing-innodb-recovery.html about\n"
1499 		      "InnoDB: forcing recovery. "
1500 		      "Then dump + drop + reimport.\n", stderr);
1501 
1502 		ut_error;
1503 	}
1504 
1505 	return(offsets);
1506 }
1507 
1508 #define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
1509 	btr_page_get_father_node_ptr_func(of,heap,cur,__FILE__,__LINE__,mtr)
1510 
1511 /************************************************************//**
1512 Returns the upper level node pointer to a page. It is assumed that mtr holds
1513 an x-latch on the tree.
1514 @return	rec_get_offsets() of the node pointer record */
1515 static
1516 ulint*
btr_page_get_father_block(ulint * offsets,mem_heap_t * heap,dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * cursor)1517 btr_page_get_father_block(
1518 /*======================*/
1519 	ulint*		offsets,/*!< in: work area for the return value */
1520 	mem_heap_t*	heap,	/*!< in: memory heap to use */
1521 	dict_index_t*	index,	/*!< in: b-tree index */
1522 	buf_block_t*	block,	/*!< in: child page in the index */
1523 	mtr_t*		mtr,	/*!< in: mtr */
1524 	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
1525 				its page x-latched */
1526 {
1527 	rec_t*	rec
1528 		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
1529 								 block)));
1530 	btr_cur_position(index, rec, block, cursor);
1531 	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
1532 }
1533 
1534 /************************************************************//**
1535 Seeks to the upper level node pointer to a page.
1536 It is assumed that mtr holds an x-latch on the tree. */
1537 static
1538 void
btr_page_get_father(dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * cursor)1539 btr_page_get_father(
1540 /*================*/
1541 	dict_index_t*	index,	/*!< in: b-tree index */
1542 	buf_block_t*	block,	/*!< in: child page in the index */
1543 	mtr_t*		mtr,	/*!< in: mtr */
1544 	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
1545 				its page x-latched */
1546 {
1547 	mem_heap_t*	heap;
1548 	rec_t*		rec
1549 		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
1550 								 block)));
1551 	btr_cur_position(index, rec, block, cursor);
1552 
1553 	heap = mem_heap_create(100);
1554 	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
1555 	mem_heap_free(heap);
1556 }
1557 
1558 /************************************************************//**
1559 Creates the root node for a new index tree.
1560 @return	page number of the created root, FIL_NULL if did not succeed */
1561 UNIV_INTERN
1562 ulint
btr_create(ulint type,ulint space,ulint zip_size,index_id_t index_id,dict_index_t * index,mtr_t * mtr)1563 btr_create(
1564 /*=======*/
1565 	ulint		type,	/*!< in: type of the index */
1566 	ulint		space,	/*!< in: space where created */
1567 	ulint		zip_size,/*!< in: compressed page size in bytes
1568 				or 0 for uncompressed pages */
1569 	index_id_t	index_id,/*!< in: index id */
1570 	dict_index_t*	index,	/*!< in: index */
1571 	mtr_t*		mtr)	/*!< in: mini-transaction handle */
1572 {
1573 	ulint		page_no;
1574 	buf_block_t*	block;
1575 	buf_frame_t*	frame;
1576 	page_t*		page;
1577 	page_zip_des_t*	page_zip;
1578 
1579 	/* Create the two new segments (one, in the case of an ibuf tree) for
1580 	the index tree; the segment headers are put on the allocated root page
1581 	(for an ibuf tree, not in the root, but on a separate ibuf header
1582 	page) */
1583 
1584 	if (type & DICT_IBUF) {
1585 		/* Allocate first the ibuf header page */
1586 		buf_block_t*	ibuf_hdr_block = fseg_create(
1587 			space, 0,
1588 			IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);
1589 
1590 		buf_block_dbg_add_level(
1591 			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);
1592 
1593 		ut_ad(buf_block_get_page_no(ibuf_hdr_block)
1594 		      == IBUF_HEADER_PAGE_NO);
1595 		/* Allocate then the next page to the segment: it will be the
1596 		tree root page */
1597 
1598 		block = fseg_alloc_free_page(
1599 			buf_block_get_frame(ibuf_hdr_block)
1600 			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
1601 			IBUF_TREE_ROOT_PAGE_NO,
1602 			FSP_UP, mtr);
1603 		ut_ad(buf_block_get_page_no(block) == IBUF_TREE_ROOT_PAGE_NO);
1604 	} else {
1605 #ifdef UNIV_BLOB_DEBUG
1606 		if ((type & DICT_CLUSTERED) && !index->blobs) {
1607 			mutex_create(PFS_NOT_INSTRUMENTED,
1608 				     &index->blobs_mutex, SYNC_ANY_LATCH);
1609 			index->blobs = rbt_create(sizeof(btr_blob_dbg_t),
1610 						  btr_blob_dbg_cmp);
1611 		}
1612 #endif /* UNIV_BLOB_DEBUG */
1613 		block = fseg_create(space, 0,
1614 				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
1615 	}
1616 
1617 	if (block == NULL) {
1618 
1619 		return(FIL_NULL);
1620 	}
1621 
1622 	page_no = buf_block_get_page_no(block);
1623 	frame = buf_block_get_frame(block);
1624 
1625 	if (type & DICT_IBUF) {
1626 		/* It is an insert buffer tree: initialize the free list */
1627 		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);
1628 
1629 		ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);
1630 
1631 		flst_init(frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
1632 	} else {
1633 		/* It is a non-ibuf tree: create a file segment for leaf
1634 		pages */
1635 		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
1636 
1637 		if (!fseg_create(space, page_no,
1638 				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) {
1639 			/* Not enough space for new segment, free root
1640 			segment before return. */
1641 			btr_free_root(space, zip_size, page_no, mtr);
1642 
1643 			return(FIL_NULL);
1644 		}
1645 
1646 		/* The fseg create acquires a second latch on the page,
1647 		therefore we must declare it: */
1648 		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
1649 	}
1650 
1651 	/* Create a new index page on the allocated segment page */
1652 	page_zip = buf_block_get_page_zip(block);
1653 
1654 	if (page_zip) {
1655 		page = page_create_zip(block, index, 0, 0, mtr);
1656 	} else {
1657 		page = page_create(block, mtr,
1658 				   dict_table_is_comp(index->table));
1659 		/* Set the level of the new index page */
1660 		btr_page_set_level(page, NULL, 0, mtr);
1661 	}
1662 
1663 	block->check_index_page_at_flush = TRUE;
1664 
1665 	/* Set the index id of the page */
1666 	btr_page_set_index_id(page, page_zip, index_id, mtr);
1667 
1668 	/* Set the next node and previous node fields */
1669 	btr_page_set_next(page, page_zip, FIL_NULL, mtr);
1670 	btr_page_set_prev(page, page_zip, FIL_NULL, mtr);
1671 
1672 	/* We reset the free bits for the page to allow creation of several
1673 	trees in the same mtr, otherwise the latch on a bitmap page would
1674 	prevent it because of the latching order */
1675 
1676 	if (!(type & DICT_CLUSTERED)) {
1677 		ibuf_reset_free_bits(block);
1678 	}
1679 
1680 	/* In the following assertion we test that two records of maximum
1681 	allowed size fit on the root page: this fact is needed to ensure
1682 	correctness of split algorithms */
1683 
1684 	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);
1685 
1686 	return(page_no);
1687 }
1688 
1689 /************************************************************//**
1690 Frees a B-tree except the root page, which MUST be freed after this
1691 by calling btr_free_root. */
1692 UNIV_INTERN
1693 void
btr_free_but_not_root(ulint space,ulint zip_size,ulint root_page_no)1694 btr_free_but_not_root(
1695 /*==================*/
1696 	ulint	space,		/*!< in: space where created */
1697 	ulint	zip_size,	/*!< in: compressed page size in bytes
1698 				or 0 for uncompressed pages */
1699 	ulint	root_page_no)	/*!< in: root page number */
1700 {
1701 	ibool	finished;
1702 	page_t*	root;
1703 	mtr_t	mtr;
1704 
1705 leaf_loop:
1706 	mtr_start(&mtr);
1707 
1708 	root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH,
1709 			    NULL, &mtr);
1710 
1711 	SRV_CORRUPT_TABLE_CHECK(root,
1712 	{
1713 		mtr_commit(&mtr);
1714 		return;
1715 	});
1716 
1717 #ifdef UNIV_BTR_DEBUG
1718 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
1719 				    + root, space));
1720 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1721 				    + root, space));
1722 #endif /* UNIV_BTR_DEBUG */
1723 
1724 	/* NOTE: page hash indexes are dropped when a page is freed inside
1725 	fsp0fsp. */
1726 
1727 	finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
1728 				  &mtr);
1729 	mtr_commit(&mtr);
1730 
1731 	if (!finished) {
1732 
1733 		goto leaf_loop;
1734 	}
1735 top_loop:
1736 	mtr_start(&mtr);
1737 
1738 	root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH,
1739 			    NULL, &mtr);
1740 
1741 	SRV_CORRUPT_TABLE_CHECK(root,
1742 	{
1743 		mtr_commit(&mtr);
1744 		return;
1745 	});
1746 
1747 #ifdef UNIV_BTR_DEBUG
1748 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1749 				    + root, space));
1750 #endif /* UNIV_BTR_DEBUG */
1751 
1752 	finished = fseg_free_step_not_header(
1753 		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, &mtr);
1754 	mtr_commit(&mtr);
1755 
1756 	if (!finished) {
1757 
1758 		goto top_loop;
1759 	}
1760 }
1761 
1762 /************************************************************//**
1763 Frees the B-tree root page. Other tree MUST already have been freed. */
1764 UNIV_INTERN
1765 void
btr_free_root(ulint space,ulint zip_size,ulint root_page_no,mtr_t * mtr)1766 btr_free_root(
1767 /*==========*/
1768 	ulint	space,		/*!< in: space where created */
1769 	ulint	zip_size,	/*!< in: compressed page size in bytes
1770 				or 0 for uncompressed pages */
1771 	ulint	root_page_no,	/*!< in: root page number */
1772 	mtr_t*	mtr)		/*!< in/out: mini-transaction */
1773 {
1774 	buf_block_t*	block;
1775 	fseg_header_t*	header;
1776 
1777 	block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH,
1778 			      NULL, mtr);
1779 
1780 	SRV_CORRUPT_TABLE_CHECK(block, return;);
1781 
1782 	btr_search_drop_page_hash_index(block);
1783 
1784 	header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
1785 #ifdef UNIV_BTR_DEBUG
1786 	ut_a(btr_root_fseg_validate(header, space));
1787 #endif /* UNIV_BTR_DEBUG */
1788 
1789 	while (!fseg_free_step(header, mtr)) {
1790 		/* Free the entire segment in small steps. */
1791 	}
1792 }
1793 #endif /* !UNIV_HOTBACKUP */
1794 
1795 /*************************************************************//**
1796 Reorganizes an index page.
1797 
1798 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1799 if this is a compressed leaf page in a secondary index. This has to
1800 be done either within the same mini-transaction, or by invoking
1801 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1802 IBUF_BITMAP_FREE is unaffected by reorganization.
1803 
1804 @retval true if the operation was successful
1805 @retval false if it is a compressed page, and recompression failed */
1806 UNIV_INTERN
1807 bool
btr_page_reorganize_low(bool recovery,ulint z_level,page_cur_t * cursor,dict_index_t * index,mtr_t * mtr)1808 btr_page_reorganize_low(
1809 /*====================*/
1810 	bool		recovery,/*!< in: true if called in recovery:
1811 				locks should not be updated, i.e.,
1812 				there cannot exist locks on the
1813 				page, and a hash index should not be
1814 				dropped: it cannot exist */
1815 	ulint		z_level,/*!< in: compression level to be used
1816 				if dealing with compressed page */
1817 	page_cur_t*	cursor,	/*!< in/out: page cursor */
1818 	dict_index_t*	index,	/*!< in: the index tree of the page */
1819 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1820 {
1821 	buf_block_t*	block		= page_cur_get_block(cursor);
1822 #ifndef UNIV_HOTBACKUP
1823 	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
1824 #endif /* !UNIV_HOTBACKUP */
1825 	page_t*		page		= buf_block_get_frame(block);
1826 	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
1827 	buf_block_t*	temp_block;
1828 	page_t*		temp_page;
1829 	ulint		log_mode;
1830 	ulint		data_size1;
1831 	ulint		data_size2;
1832 	ulint		max_ins_size1;
1833 	ulint		max_ins_size2;
1834 	bool		success		= false;
1835 	ulint		pos;
1836 	bool		log_compressed;
1837 
1838 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1839 	btr_assert_not_corrupted(block, index);
1840 #ifdef UNIV_ZIP_DEBUG
1841 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
1842 #endif /* UNIV_ZIP_DEBUG */
1843 	data_size1 = page_get_data_size(page);
1844 	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);
1845 
1846 	/* Turn logging off */
1847 	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
1848 
1849 #ifndef UNIV_HOTBACKUP
1850 	temp_block = buf_block_alloc(buf_pool);
1851 #else /* !UNIV_HOTBACKUP */
1852 	ut_ad(block == back_block1);
1853 	temp_block = back_block2;
1854 #endif /* !UNIV_HOTBACKUP */
1855 	temp_page = temp_block->frame;
1856 
1857 	MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);
1858 
1859 	/* Copy the old page to temporary space */
1860 	buf_frame_copy(temp_page, page);
1861 
1862 #ifndef UNIV_HOTBACKUP
1863 	if (!recovery) {
1864 		btr_search_drop_page_hash_index(block);
1865 	}
1866 
1867 	block->check_index_page_at_flush = TRUE;
1868 #endif /* !UNIV_HOTBACKUP */
1869 	btr_blob_dbg_remove(page, index, "btr_page_reorganize");
1870 
1871 	/* Save the cursor position. */
1872 	pos = page_rec_get_n_recs_before(page_cur_get_rec(cursor));
1873 
1874 	/* Recreate the page: note that global data on page (possible
1875 	segment headers, next page-field, etc.) is preserved intact */
1876 
1877 	page_create(block, mtr, dict_table_is_comp(index->table));
1878 
1879 	/* Copy the records from the temporary space to the recreated page;
1880 	do not copy the lock bits yet */
1881 
1882 	page_copy_rec_list_end_no_locks(block, temp_block,
1883 					page_get_infimum_rec(temp_page),
1884 					index, mtr);
1885 
1886 	if (dict_index_is_sec_or_ibuf(index) && page_is_leaf(page)) {
1887 		/* Copy max trx id to recreated page */
1888 		trx_id_t	max_trx_id = page_get_max_trx_id(temp_page);
1889 		page_set_max_trx_id(block, NULL, max_trx_id, mtr);
1890 		/* In crash recovery, dict_index_is_sec_or_ibuf() always
1891 		holds, even for clustered indexes.  max_trx_id is
1892 		unused in clustered index pages. */
1893 		ut_ad(max_trx_id != 0 || recovery);
1894 	}
1895 
1896 	/* If innodb_log_compressed_pages is ON, page reorganize should log the
1897 	compressed page image.*/
1898 	log_compressed = page_zip && page_zip_log_pages;
1899 
1900 	if (log_compressed) {
1901 		mtr_set_log_mode(mtr, log_mode);
1902 	}
1903 
1904 	if (page_zip
1905 	    && !page_zip_compress(page_zip, page, index, z_level, mtr)) {
1906 
1907 		/* Restore the old page and exit. */
1908 		btr_blob_dbg_restore(page, temp_page, index,
1909 				     "btr_page_reorganize_compress_fail");
1910 
1911 #if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
1912 		/* Check that the bytes that we skip are identical. */
1913 		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
1914 		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
1915 			     PAGE_HEADER + PAGE_N_RECS + temp_page,
1916 			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
1917 		ut_a(!memcmp(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + page,
1918 			     UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + temp_page,
1919 			     FIL_PAGE_DATA_END));
1920 #endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
1921 
1922 		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
1923 		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
1924 		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
1925 		       UNIV_PAGE_SIZE - PAGE_DATA - FIL_PAGE_DATA_END);
1926 
1927 #if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
1928 		ut_a(!memcmp(page, temp_page, UNIV_PAGE_SIZE));
1929 #endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
1930 
1931 		goto func_exit;
1932 	}
1933 
1934 #ifndef UNIV_HOTBACKUP
1935 	if (!recovery) {
1936 		/* Update the record lock bitmaps */
1937 		lock_move_reorganize_page(block, temp_block);
1938 	}
1939 #endif /* !UNIV_HOTBACKUP */
1940 
1941 	data_size2 = page_get_data_size(page);
1942 	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);
1943 
1944 	if (data_size1 != data_size2 || max_ins_size1 != max_ins_size2) {
1945 		buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
1946 		buf_page_print(temp_page, 0, BUF_PAGE_PRINT_NO_CRASH);
1947 
1948 		fprintf(stderr,
1949 			"InnoDB: Error: page old data size %lu"
1950 			" new data size %lu\n"
1951 			"InnoDB: Error: page old max ins size %lu"
1952 			" new max ins size %lu\n"
1953 			"InnoDB: Submit a detailed bug report"
1954 			" to http://bugs.mysql.com\n",
1955 			(unsigned long) data_size1, (unsigned long) data_size2,
1956 			(unsigned long) max_ins_size1,
1957 			(unsigned long) max_ins_size2);
1958 		ut_ad(0);
1959 	} else {
1960 		success = true;
1961 	}
1962 
1963 	/* Restore the cursor position. */
1964 	if (pos > 0) {
1965 		cursor->rec = page_rec_get_nth(page, pos);
1966 	} else {
1967 		ut_ad(cursor->rec == page_get_infimum_rec(page));
1968 	}
1969 
1970 func_exit:
1971 #ifdef UNIV_ZIP_DEBUG
1972 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
1973 #endif /* UNIV_ZIP_DEBUG */
1974 #ifndef UNIV_HOTBACKUP
1975 	buf_block_free(temp_block);
1976 #endif /* !UNIV_HOTBACKUP */
1977 
1978 	/* Restore logging mode */
1979 	mtr_set_log_mode(mtr, log_mode);
1980 
1981 #ifndef UNIV_HOTBACKUP
1982 	if (success) {
1983 		byte	type;
1984 		byte*	log_ptr;
1985 
1986 		/* Write the log record */
1987 		if (page_zip) {
1988 			ut_ad(page_is_comp(page));
1989 			type = MLOG_ZIP_PAGE_REORGANIZE;
1990 		} else if (page_is_comp(page)) {
1991 			type = MLOG_COMP_PAGE_REORGANIZE;
1992 		} else {
1993 			type = MLOG_PAGE_REORGANIZE;
1994 		}
1995 
1996 		log_ptr = log_compressed
1997 			? NULL
1998 			: mlog_open_and_write_index(
1999 				mtr, page, index, type,
2000 				page_zip ? 1 : 0);
2001 
2002 		/* For compressed pages write the compression level. */
2003 		if (log_ptr && page_zip) {
2004 			mach_write_to_1(log_ptr, z_level);
2005 			mlog_close(mtr, log_ptr + 1);
2006 		}
2007 
2008 		MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
2009 	}
2010 #endif /* !UNIV_HOTBACKUP */
2011 
2012 	return(success);
2013 }
2014 
2015 /*************************************************************//**
2016 Reorganizes an index page.
2017 
2018 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
2019 if this is a compressed leaf page in a secondary index. This has to
2020 be done either within the same mini-transaction, or by invoking
2021 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
2022 IBUF_BITMAP_FREE is unaffected by reorganization.
2023 
2024 @retval true if the operation was successful
2025 @retval false if it is a compressed page, and recompression failed */
2026 static MY_ATTRIBUTE((nonnull))
2027 bool
btr_page_reorganize_block(bool recovery,ulint z_level,buf_block_t * block,dict_index_t * index,mtr_t * mtr)2028 btr_page_reorganize_block(
2029 /*======================*/
2030 	bool		recovery,/*!< in: true if called in recovery:
2031 				locks should not be updated, i.e.,
2032 				there cannot exist locks on the
2033 				page, and a hash index should not be
2034 				dropped: it cannot exist */
2035 	ulint		z_level,/*!< in: compression level to be used
2036 				if dealing with compressed page */
2037 	buf_block_t*	block,	/*!< in/out: B-tree page */
2038 	dict_index_t*	index,	/*!< in: the index tree of the page */
2039 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
2040 {
2041 	page_cur_t	cur;
2042 	page_cur_set_before_first(block, &cur);
2043 
2044 	return(btr_page_reorganize_low(recovery, z_level, &cur, index, mtr));
2045 }
2046 
2047 #ifndef UNIV_HOTBACKUP
2048 /*************************************************************//**
2049 Reorganizes an index page.
2050 
2051 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
2052 if this is a compressed leaf page in a secondary index. This has to
2053 be done either within the same mini-transaction, or by invoking
2054 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
2055 IBUF_BITMAP_FREE is unaffected by reorganization.
2056 
2057 @retval true if the operation was successful
2058 @retval false if it is a compressed page, and recompression failed */
2059 UNIV_INTERN
2060 bool
btr_page_reorganize(page_cur_t * cursor,dict_index_t * index,mtr_t * mtr)2061 btr_page_reorganize(
2062 /*================*/
2063 	page_cur_t*	cursor,	/*!< in/out: page cursor */
2064 	dict_index_t*	index,	/*!< in: the index tree of the page */
2065 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
2066 {
2067 	return(btr_page_reorganize_low(false, page_zip_level,
2068 				       cursor, index, mtr));
2069 }
2070 #endif /* !UNIV_HOTBACKUP */
2071 
2072 /***********************************************************//**
2073 Parses a redo log record of reorganizing a page.
2074 @return	end of log record or NULL */
2075 UNIV_INTERN
2076 byte*
btr_parse_page_reorganize(byte * ptr,byte * end_ptr,dict_index_t * index,bool compressed,buf_block_t * block,mtr_t * mtr)2077 btr_parse_page_reorganize(
2078 /*======================*/
2079 	byte*		ptr,	/*!< in: buffer */
2080 	byte*		end_ptr,/*!< in: buffer end */
2081 	dict_index_t*	index,	/*!< in: record descriptor */
2082 	bool		compressed,/*!< in: true if compressed page */
2083 	buf_block_t*	block,	/*!< in: page to be reorganized, or NULL */
2084 	mtr_t*		mtr)	/*!< in: mtr or NULL */
2085 {
2086 	ulint	level;
2087 
2088 	ut_ad(ptr != NULL);
2089 	ut_ad(end_ptr != NULL);
2090 
2091 	/* If dealing with a compressed page the record has the
2092 	compression level used during original compression written in
2093 	one byte. Otherwise record is empty. */
2094 	if (compressed) {
2095 		if (ptr == end_ptr) {
2096 			return(NULL);
2097 		}
2098 
2099 		level = mach_read_from_1(ptr);
2100 
2101 		ut_a(level <= 9);
2102 		++ptr;
2103 	} else {
2104 		level = page_zip_level;
2105 	}
2106 
2107 	if (block != NULL) {
2108 		btr_page_reorganize_block(true, level, block, index, mtr);
2109 	}
2110 
2111 	return(ptr);
2112 }
2113 
2114 #ifndef UNIV_HOTBACKUP
2115 /*************************************************************//**
2116 Empties an index page.  @see btr_page_create(). */
2117 UNIV_INTERN
2118 void
btr_page_empty(buf_block_t * block,page_zip_des_t * page_zip,dict_index_t * index,ulint level,mtr_t * mtr)2119 btr_page_empty(
2120 /*===========*/
2121 	buf_block_t*	block,	/*!< in: page to be emptied */
2122 	page_zip_des_t*	page_zip,/*!< out: compressed page, or NULL */
2123 	dict_index_t*	index,	/*!< in: index of the page */
2124 	ulint		level,	/*!< in: the B-tree level of the page */
2125 	mtr_t*		mtr)	/*!< in: mtr */
2126 {
2127 	page_t*	page = buf_block_get_frame(block);
2128 
2129 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2130 	ut_ad(page_zip == buf_block_get_page_zip(block));
2131 #ifdef UNIV_ZIP_DEBUG
2132 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2133 #endif /* UNIV_ZIP_DEBUG */
2134 
2135 	btr_search_drop_page_hash_index(block);
2136 	btr_blob_dbg_remove(page, index, "btr_page_empty");
2137 
2138 	/* Recreate the page: note that global data on page (possible
2139 	segment headers, next page-field, etc.) is preserved intact */
2140 
2141 	if (page_zip) {
2142 		page_create_zip(block, index, level, 0, mtr);
2143 	} else {
2144 		page_create(block, mtr, dict_table_is_comp(index->table));
2145 		btr_page_set_level(page, NULL, level, mtr);
2146 	}
2147 
2148 	block->check_index_page_at_flush = TRUE;
2149 }
2150 
2151 /*************************************************************//**
2152 Makes tree one level higher by splitting the root, and inserts
2153 the tuple. It is assumed that mtr contains an x-latch on the tree.
2154 NOTE that the operation of this function must always succeed,
2155 we cannot reverse it: therefore enough free disk space must be
2156 guaranteed to be available before this function is called.
2157 @return	inserted record or NULL if run out of space */
2158 UNIV_INTERN
2159 rec_t*
btr_root_raise_and_insert(ulint flags,btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap,const dtuple_t * tuple,ulint n_ext,mtr_t * mtr)2160 btr_root_raise_and_insert(
2161 /*======================*/
2162 	ulint		flags,	/*!< in: undo logging and locking flags */
2163 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
2164 				on the root page; when the function returns,
2165 				the cursor is positioned on the predecessor
2166 				of the inserted record */
2167 	ulint**		offsets,/*!< out: offsets on inserted record */
2168 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
2169 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2170 	ulint		n_ext,	/*!< in: number of externally stored columns */
2171 	mtr_t*		mtr)	/*!< in: mtr */
2172 {
2173 	dict_index_t*	index;
2174 	page_t*		root;
2175 	page_t*		new_page;
2176 	ulint		new_page_no;
2177 	rec_t*		rec;
2178 	dtuple_t*	node_ptr;
2179 	ulint		level;
2180 	rec_t*		node_ptr_rec;
2181 	page_cur_t*	page_cursor;
2182 	page_zip_des_t*	root_page_zip;
2183 	page_zip_des_t*	new_page_zip;
2184 	buf_block_t*	root_block;
2185 	buf_block_t*	new_block;
2186 
2187 	root = btr_cur_get_page(cursor);
2188 	root_block = btr_cur_get_block(cursor);
2189 	root_page_zip = buf_block_get_page_zip(root_block);
2190 	ut_ad(!page_is_empty(root));
2191 	index = btr_cur_get_index(cursor);
2192 #ifdef UNIV_ZIP_DEBUG
2193 	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
2194 #endif /* UNIV_ZIP_DEBUG */
2195 #ifdef UNIV_BTR_DEBUG
2196 	if (!dict_index_is_ibuf(index)) {
2197 		ulint	space = dict_index_get_space(index);
2198 
2199 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
2200 					    + root, space));
2201 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
2202 					    + root, space));
2203 	}
2204 
2205 	ut_a(dict_index_get_page(index) == page_get_page_no(root));
2206 #endif /* UNIV_BTR_DEBUG */
2207 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
2208 				MTR_MEMO_X_LOCK));
2209 	ut_ad(mtr_memo_contains(mtr, root_block, MTR_MEMO_PAGE_X_FIX));
2210 
2211 	/* Allocate a new page to the tree. Root splitting is done by first
2212 	moving the root records to the new page, emptying the root, putting
2213 	a node pointer to the new page, and then splitting the new page. */
2214 
2215 	level = btr_page_get_level(root, mtr);
2216 
2217 	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);
2218 
2219 	if (new_block == NULL && os_has_said_disk_full) {
2220 		return(NULL);
2221         }
2222 
2223 	new_page = buf_block_get_frame(new_block);
2224 	new_page_zip = buf_block_get_page_zip(new_block);
2225 	ut_a(!new_page_zip == !root_page_zip);
2226 	ut_a(!new_page_zip
2227 	     || page_zip_get_size(new_page_zip)
2228 	     == page_zip_get_size(root_page_zip));
2229 
2230 	btr_page_create(new_block, new_page_zip, index, level, mtr);
2231 
2232 	/* Set the next node and previous node fields of new page */
2233 	btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
2234 	btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);
2235 
2236 	/* Copy the records from root to the new page one by one. */
2237 
2238 	if (0
2239 #ifdef UNIV_ZIP_COPY
2240 	    || new_page_zip
2241 #endif /* UNIV_ZIP_COPY */
2242 	    || !page_copy_rec_list_end(new_block, root_block,
2243 				       page_get_infimum_rec(root),
2244 				       index, mtr)) {
2245 		ut_a(new_page_zip);
2246 
2247 		/* Copy the page byte for byte. */
2248 		page_zip_copy_recs(new_page_zip, new_page,
2249 				   root_page_zip, root, index, mtr);
2250 
2251 		/* Update the lock table and possible hash index. */
2252 
2253 		lock_move_rec_list_end(new_block, root_block,
2254 				       page_get_infimum_rec(root));
2255 
2256 		btr_search_move_or_delete_hash_entries(new_block, root_block,
2257 						       index);
2258 	}
2259 
2260 	/* If this is a pessimistic insert which is actually done to
2261 	perform a pessimistic update then we have stored the lock
2262 	information of the record to be inserted on the infimum of the
2263 	root page: we cannot discard the lock structs on the root page */
2264 
2265 	lock_update_root_raise(new_block, root_block);
2266 
2267 	/* Create a memory heap where the node pointer is stored */
2268 	if (!*heap) {
2269 		*heap = mem_heap_create(1000);
2270 	}
2271 
2272 	rec = page_rec_get_next(page_get_infimum_rec(new_page));
2273 	new_page_no = buf_block_get_page_no(new_block);
2274 
2275 	/* Build the node pointer (= node key and page address) for the
2276 	child */
2277 
2278 	node_ptr = dict_index_build_node_ptr(
2279 		index, rec, new_page_no, *heap, level);
2280 	/* The node pointer must be marked as the predefined minimum record,
2281 	as there is no lower alphabetical limit to records in the leftmost
2282 	node of a level: */
2283 	dtuple_set_info_bits(node_ptr,
2284 			     dtuple_get_info_bits(node_ptr)
2285 			     | REC_INFO_MIN_REC_FLAG);
2286 
2287 	/* Rebuild the root page to get free space */
2288 	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);
2289 
2290 	/* Set the next node and previous node fields, although
2291 	they should already have been set.  The previous node field
2292 	must be FIL_NULL if root_page_zip != NULL, because the
2293 	REC_INFO_MIN_REC_FLAG (of the first user record) will be
2294 	set if and only if btr_page_get_prev() == FIL_NULL. */
2295 	btr_page_set_next(root, root_page_zip, FIL_NULL, mtr);
2296 	btr_page_set_prev(root, root_page_zip, FIL_NULL, mtr);
2297 
2298 	page_cursor = btr_cur_get_page_cur(cursor);
2299 
2300 	/* Insert node pointer to the root */
2301 
2302 	page_cur_set_before_first(root_block, page_cursor);
2303 
2304 	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
2305 					     index, offsets, heap, 0, mtr);
2306 
2307 	/* The root page should only contain the node pointer
2308 	to new_page at this point.  Thus, the data should fit. */
2309 	ut_a(node_ptr_rec);
2310 
2311 	/* We play safe and reset the free bits for the new page */
2312 
2313 #if 0
2314 	fprintf(stderr, "Root raise new page no %lu\n", new_page_no);
2315 #endif
2316 
2317 	if (!dict_index_is_clust(index)) {
2318 		ibuf_reset_free_bits(new_block);
2319 	}
2320 
2321 	/* Reposition the cursor to the child node */
2322 	page_cur_search(new_block, index, tuple,
2323 			PAGE_CUR_LE, page_cursor);
2324 
2325 	/* Split the child and insert tuple */
2326 	return(btr_page_split_and_insert(flags, cursor, offsets, heap,
2327 					 tuple, n_ext, mtr));
2328 }
2329 
2330 /*************************************************************//**
2331 Decides if the page should be split at the convergence point of inserts
2332 converging to the left.
2333 @return	TRUE if split recommended */
2334 UNIV_INTERN
2335 ibool
btr_page_get_split_rec_to_left(btr_cur_t * cursor,rec_t ** split_rec)2336 btr_page_get_split_rec_to_left(
2337 /*===========================*/
2338 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
2339 	rec_t**		split_rec) /*!< out: if split recommended,
2340 				the first record on upper half page,
2341 				or NULL if tuple to be inserted should
2342 				be first */
2343 {
2344 	page_t*	page;
2345 	rec_t*	insert_point;
2346 	rec_t*	infimum;
2347 
2348 	page = btr_cur_get_page(cursor);
2349 	insert_point = btr_cur_get_rec(cursor);
2350 
2351 	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
2352 	    == page_rec_get_next(insert_point)) {
2353 
2354 		infimum = page_get_infimum_rec(page);
2355 
2356 		/* If the convergence is in the middle of a page, include also
2357 		the record immediately before the new insert to the upper
2358 		page. Otherwise, we could repeatedly move from page to page
2359 		lots of records smaller than the convergence point. */
2360 
2361 		if (infimum != insert_point
2362 		    && page_rec_get_next(infimum) != insert_point) {
2363 
2364 			*split_rec = insert_point;
2365 		} else {
2366 			*split_rec = page_rec_get_next(insert_point);
2367 		}
2368 
2369 		return(TRUE);
2370 	}
2371 
2372 	return(FALSE);
2373 }
2374 
2375 /*************************************************************//**
2376 Decides if the page should be split at the convergence point of inserts
2377 converging to the right.
2378 @return	TRUE if split recommended */
2379 UNIV_INTERN
2380 ibool
btr_page_get_split_rec_to_right(btr_cur_t * cursor,rec_t ** split_rec)2381 btr_page_get_split_rec_to_right(
2382 /*============================*/
2383 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
2384 	rec_t**		split_rec) /*!< out: if split recommended,
2385 				the first record on upper half page,
2386 				or NULL if tuple to be inserted should
2387 				be first */
2388 {
2389 	page_t*	page;
2390 	rec_t*	insert_point;
2391 
2392 	page = btr_cur_get_page(cursor);
2393 	insert_point = btr_cur_get_rec(cursor);
2394 
2395 	/* We use eager heuristics: if the new insert would be right after
2396 	the previous insert on the same page, we assume that there is a
2397 	pattern of sequential inserts here. */
2398 
2399 	if (page_header_get_ptr(page, PAGE_LAST_INSERT) == insert_point) {
2400 
2401 		rec_t*	next_rec;
2402 
2403 		next_rec = page_rec_get_next(insert_point);
2404 
2405 		if (page_rec_is_supremum(next_rec)) {
2406 split_at_new:
2407 			/* Split at the new record to insert */
2408 			*split_rec = NULL;
2409 		} else {
2410 			rec_t*	next_next_rec = page_rec_get_next(next_rec);
2411 			if (page_rec_is_supremum(next_next_rec)) {
2412 
2413 				goto split_at_new;
2414 			}
2415 
2416 			/* If there are >= 2 user records up from the insert
2417 			point, split all but 1 off. We want to keep one because
2418 			then sequential inserts can use the adaptive hash
2419 			index, as they can do the necessary checks of the right
2420 			search position just by looking at the records on this
2421 			page. */
2422 
2423 			*split_rec = next_next_rec;
2424 		}
2425 
2426 		return(TRUE);
2427 	}
2428 
2429 	return(FALSE);
2430 }
2431 
2432 /*************************************************************//**
2433 Calculates a split record such that the tuple will certainly fit on
2434 its half-page when the split is performed. We assume in this function
2435 only that the cursor page has at least one user record.
2436 @return split record, or NULL if tuple will be the first record on
2437 the lower or upper half-page (determined by btr_page_tuple_smaller()) */
2438 static
2439 rec_t*
btr_page_get_split_rec(btr_cur_t * cursor,const dtuple_t * tuple,ulint n_ext)2440 btr_page_get_split_rec(
2441 /*===================*/
2442 	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
2443 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2444 	ulint		n_ext)	/*!< in: number of externally stored columns */
2445 {
2446 	page_t*		page;
2447 	page_zip_des_t*	page_zip;
2448 	ulint		insert_size;
2449 	ulint		free_space;
2450 	ulint		total_data;
2451 	ulint		total_n_recs;
2452 	ulint		total_space;
2453 	ulint		incl_data;
2454 	rec_t*		ins_rec;
2455 	rec_t*		rec;
2456 	rec_t*		next_rec;
2457 	ulint		n;
2458 	mem_heap_t*	heap;
2459 	ulint*		offsets;
2460 
2461 	page = btr_cur_get_page(cursor);
2462 
2463 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2464 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2465 
2466 	page_zip = btr_cur_get_page_zip(cursor);
2467 	if (page_zip) {
2468 		/* Estimate the free space of an empty compressed page. */
2469 		ulint	free_space_zip = page_zip_empty_size(
2470 			cursor->index->n_fields,
2471 			page_zip_get_size(page_zip));
2472 
2473 		if (free_space > (ulint) free_space_zip) {
2474 			free_space = (ulint) free_space_zip;
2475 		}
2476 	}
2477 
2478 	/* free_space is now the free space of a created new page */
2479 
2480 	total_data   = page_get_data_size(page) + insert_size;
2481 	total_n_recs = page_get_n_recs(page) + 1;
2482 	ut_ad(total_n_recs >= 2);
2483 	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);
2484 
2485 	n = 0;
2486 	incl_data = 0;
2487 	ins_rec = btr_cur_get_rec(cursor);
2488 	rec = page_get_infimum_rec(page);
2489 
2490 	heap = NULL;
2491 	offsets = NULL;
2492 
2493 	/* We start to include records to the left half, and when the
2494 	space reserved by them exceeds half of total_space, then if
2495 	the included records fit on the left page, they will be put there
2496 	if something was left over also for the right page,
2497 	otherwise the last included record will be the first on the right
2498 	half page */
2499 
2500 	do {
2501 		/* Decide the next record to include */
2502 		if (rec == ins_rec) {
2503 			rec = NULL;	/* NULL denotes that tuple is
2504 					now included */
2505 		} else if (rec == NULL) {
2506 			rec = page_rec_get_next(ins_rec);
2507 		} else {
2508 			rec = page_rec_get_next(rec);
2509 		}
2510 
2511 		if (rec == NULL) {
2512 			/* Include tuple */
2513 			incl_data += insert_size;
2514 		} else {
2515 			offsets = rec_get_offsets(rec, cursor->index,
2516 						  offsets, ULINT_UNDEFINED,
2517 						  &heap);
2518 			incl_data += rec_offs_size(offsets);
2519 		}
2520 
2521 		n++;
2522 	} while (incl_data + page_dir_calc_reserved_space(n)
2523 		 < total_space / 2);
2524 
2525 	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
2526 		/* The next record will be the first on
2527 		the right half page if it is not the
2528 		supremum record of page */
2529 
2530 		if (rec == ins_rec) {
2531 			rec = NULL;
2532 
2533 			goto func_exit;
2534 		} else if (rec == NULL) {
2535 			next_rec = page_rec_get_next(ins_rec);
2536 		} else {
2537 			next_rec = page_rec_get_next(rec);
2538 		}
2539 		ut_ad(next_rec);
2540 		if (!page_rec_is_supremum(next_rec)) {
2541 			rec = next_rec;
2542 		}
2543 	}
2544 
2545 func_exit:
2546 	if (heap) {
2547 		mem_heap_free(heap);
2548 	}
2549 	return(rec);
2550 }
2551 
2552 /*************************************************************//**
2553 Returns TRUE if the insert fits on the appropriate half-page with the
2554 chosen split_rec.
2555 @return	true if fits */
2556 static MY_ATTRIBUTE((nonnull(1,3,4,6), warn_unused_result))
2557 bool
btr_page_insert_fits(btr_cur_t * cursor,const rec_t * split_rec,ulint ** offsets,const dtuple_t * tuple,ulint n_ext,mem_heap_t ** heap)2558 btr_page_insert_fits(
2559 /*=================*/
2560 	btr_cur_t*	cursor,	/*!< in: cursor at which insert
2561 				should be made */
2562 	const rec_t*	split_rec,/*!< in: suggestion for first record
2563 				on upper half-page, or NULL if
2564 				tuple to be inserted should be first */
2565 	ulint**		offsets,/*!< in: rec_get_offsets(
2566 				split_rec, cursor->index); out: garbage */
2567 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2568 	ulint		n_ext,	/*!< in: number of externally stored columns */
2569 	mem_heap_t**	heap)	/*!< in: temporary memory heap */
2570 {
2571 	page_t*		page;
2572 	ulint		insert_size;
2573 	ulint		free_space;
2574 	ulint		total_data;
2575 	ulint		total_n_recs;
2576 	const rec_t*	rec;
2577 	const rec_t*	end_rec;
2578 
2579 	page = btr_cur_get_page(cursor);
2580 
2581 	ut_ad(!split_rec
2582 	      || !page_is_comp(page) == !rec_offs_comp(*offsets));
2583 	ut_ad(!split_rec
2584 	      || rec_offs_validate(split_rec, cursor->index, *offsets));
2585 
2586 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2587 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2588 
2589 	/* free_space is now the free space of a created new page */
2590 
2591 	total_data   = page_get_data_size(page) + insert_size;
2592 	total_n_recs = page_get_n_recs(page) + 1;
2593 
2594 	/* We determine which records (from rec to end_rec, not including
2595 	end_rec) will end up on the other half page from tuple when it is
2596 	inserted. */
2597 
2598 	if (split_rec == NULL) {
2599 		rec = page_rec_get_next(page_get_infimum_rec(page));
2600 		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));
2601 
2602 	} else if (cmp_dtuple_rec(tuple, split_rec, *offsets) >= 0) {
2603 
2604 		rec = page_rec_get_next(page_get_infimum_rec(page));
2605 		end_rec = split_rec;
2606 	} else {
2607 		rec = split_rec;
2608 		end_rec = page_get_supremum_rec(page);
2609 	}
2610 
2611 	if (total_data + page_dir_calc_reserved_space(total_n_recs)
2612 	    <= free_space) {
2613 
2614 		/* Ok, there will be enough available space on the
2615 		half page where the tuple is inserted */
2616 
2617 		return(true);
2618 	}
2619 
2620 	while (rec != end_rec) {
2621 		/* In this loop we calculate the amount of reserved
2622 		space after rec is removed from page. */
2623 
2624 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2625 					   ULINT_UNDEFINED, heap);
2626 
2627 		total_data -= rec_offs_size(*offsets);
2628 		total_n_recs--;
2629 
2630 		if (total_data + page_dir_calc_reserved_space(total_n_recs)
2631 		    <= free_space) {
2632 
2633 			/* Ok, there will be enough available space on the
2634 			half page where the tuple is inserted */
2635 
2636 			return(true);
2637 		}
2638 
2639 		rec = page_rec_get_next_const(rec);
2640 	}
2641 
2642 	return(false);
2643 }
2644 
2645 /*******************************************************//**
2646 Inserts a data tuple to a tree on a non-leaf level. It is assumed
2647 that mtr holds an x-latch on the tree. */
2648 UNIV_INTERN
2649 void
btr_insert_on_non_leaf_level_func(ulint flags,dict_index_t * index,ulint level,dtuple_t * tuple,const char * file,ulint line,mtr_t * mtr)2650 btr_insert_on_non_leaf_level_func(
2651 /*==============================*/
2652 	ulint		flags,	/*!< in: undo logging and locking flags */
2653 	dict_index_t*	index,	/*!< in: index */
2654 	ulint		level,	/*!< in: level, must be > 0 */
2655 	dtuple_t*	tuple,	/*!< in: the record to be inserted */
2656 	const char*	file,	/*!< in: file name */
2657 	ulint		line,	/*!< in: line where called */
2658 	mtr_t*		mtr)	/*!< in: mtr */
2659 {
2660 	big_rec_t*	dummy_big_rec;
2661 	btr_cur_t	cursor;
2662 	dberr_t		err;
2663 	rec_t*		rec;
2664 	ulint*		offsets	= NULL;
2665 	mem_heap_t*	heap = NULL;
2666 
2667 	ut_ad(level > 0);
2668 
2669 	btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_LE,
2670 				    BTR_CONT_MODIFY_TREE,
2671 				    &cursor, 0, file, line, mtr);
2672 
2673 	ut_ad(cursor.flag == BTR_CUR_BINARY);
2674 
2675 	err = btr_cur_optimistic_insert(
2676 		flags
2677 		| BTR_NO_LOCKING_FLAG
2678 		| BTR_KEEP_SYS_FLAG
2679 		| BTR_NO_UNDO_LOG_FLAG,
2680 		&cursor, &offsets, &heap,
2681 		tuple, &rec, &dummy_big_rec, 0, NULL, mtr);
2682 
2683 	if (err == DB_FAIL) {
2684 		err = btr_cur_pessimistic_insert(flags
2685 						 | BTR_NO_LOCKING_FLAG
2686 						 | BTR_KEEP_SYS_FLAG
2687 						 | BTR_NO_UNDO_LOG_FLAG,
2688 						 &cursor, &offsets, &heap,
2689 						 tuple, &rec,
2690 						 &dummy_big_rec, 0, NULL, mtr);
2691 		ut_a(err == DB_SUCCESS);
2692 	}
2693 	mem_heap_free(heap);
2694 }
2695 
2696 /**************************************************************//**
2697 Attaches the halves of an index page on the appropriate level in an
2698 index tree. */
2699 static MY_ATTRIBUTE((nonnull))
2700 void
btr_attach_half_pages(ulint flags,dict_index_t * index,buf_block_t * block,const rec_t * split_rec,buf_block_t * new_block,ulint direction,mtr_t * mtr)2701 btr_attach_half_pages(
2702 /*==================*/
2703 	ulint		flags,		/*!< in: undo logging and
2704 					locking flags */
2705 	dict_index_t*	index,		/*!< in: the index tree */
2706 	buf_block_t*	block,		/*!< in/out: page to be split */
2707 	const rec_t*	split_rec,	/*!< in: first record on upper
2708 					half page */
2709 	buf_block_t*	new_block,	/*!< in/out: the new half page */
2710 	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
2711 	mtr_t*		mtr)		/*!< in: mtr */
2712 {
2713 	ulint		space;
2714 	ulint		zip_size;
2715 	ulint		prev_page_no;
2716 	ulint		next_page_no;
2717 	ulint		level;
2718 	page_t*		page		= buf_nonnull_block_get_frame(block);
2719 	page_t*		lower_page;
2720 	page_t*		upper_page;
2721 	ulint		lower_page_no;
2722 	ulint		upper_page_no;
2723 	page_zip_des_t*	lower_page_zip;
2724 	page_zip_des_t*	upper_page_zip;
2725 	dtuple_t*	node_ptr_upper;
2726 	mem_heap_t*	heap;
2727 
2728 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2729 	ut_ad(mtr_memo_contains(mtr, new_block, MTR_MEMO_PAGE_X_FIX));
2730 
2731 	/* Create a memory heap where the data tuple is stored */
2732 	heap = mem_heap_create(1024);
2733 
2734 	/* Based on split direction, decide upper and lower pages */
2735 	if (direction == FSP_DOWN) {
2736 
2737 		btr_cur_t	cursor;
2738 		ulint*		offsets;
2739 
2740 		lower_page = buf_nonnull_block_get_frame(new_block);
2741 		lower_page_no = buf_block_get_page_no(new_block);
2742 		lower_page_zip = buf_block_get_page_zip(new_block);
2743 		upper_page = buf_nonnull_block_get_frame(block);
2744 		upper_page_no = buf_block_get_page_no(block);
2745 		upper_page_zip = buf_block_get_page_zip(block);
2746 
2747 		/* Look up the index for the node pointer to page */
2748 		offsets = btr_page_get_father_block(NULL, heap, index,
2749 						    block, mtr, &cursor);
2750 
2751 		/* Replace the address of the old child node (= page) with the
2752 		address of the new lower half */
2753 
2754 		btr_node_ptr_set_child_page_no(
2755 			btr_cur_get_rec(&cursor),
2756 			btr_cur_get_page_zip(&cursor),
2757 			offsets, lower_page_no, mtr);
2758 		mem_heap_empty(heap);
2759 	} else {
2760 		lower_page = buf_nonnull_block_get_frame(block);
2761 		lower_page_no = buf_block_get_page_no(block);
2762 		lower_page_zip = buf_block_get_page_zip(block);
2763 		upper_page = buf_nonnull_block_get_frame(new_block);
2764 		upper_page_no = buf_block_get_page_no(new_block);
2765 		upper_page_zip = buf_block_get_page_zip(new_block);
2766 	}
2767 
2768 	/* Get the level of the split pages */
2769 	level = btr_page_get_level(buf_nonnull_block_get_frame(block), mtr);
2770 	ut_ad(level
2771 	      == btr_page_get_level(buf_block_get_frame(new_block), mtr));
2772 
2773 	/* Build the node pointer (= node key and page address) for the upper
2774 	half */
2775 
2776 	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
2777 						   upper_page_no, heap, level);
2778 
2779 	/* Insert it next to the pointer to the lower half. Note that this
2780 	may generate recursion leading to a split on the higher level. */
2781 
2782 	btr_insert_on_non_leaf_level(flags, index, level + 1,
2783 				     node_ptr_upper, mtr);
2784 
2785 	/* Free the memory heap */
2786 	mem_heap_free(heap);
2787 
2788 	/* Get the previous and next pages of page */
2789 
2790 	prev_page_no = btr_page_get_prev(page, mtr);
2791 	next_page_no = btr_page_get_next(page, mtr);
2792 	space = buf_block_get_space(block);
2793 	zip_size = buf_block_get_zip_size(block);
2794 
2795 	/* Update page links of the level */
2796 
2797 	if (prev_page_no != FIL_NULL) {
2798 		buf_block_t*	prev_block = btr_block_get(
2799 			space, zip_size, prev_page_no, RW_X_LATCH, index, mtr);
2800 #ifdef UNIV_BTR_DEBUG
2801 		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
2802 		ut_a(btr_page_get_next(prev_block->frame, mtr)
2803 		     == buf_block_get_page_no(block));
2804 #endif /* UNIV_BTR_DEBUG */
2805 
2806 		btr_page_set_next(buf_block_get_frame(prev_block),
2807 				  buf_block_get_page_zip(prev_block),
2808 				  lower_page_no, mtr);
2809 	}
2810 
2811 	if (next_page_no != FIL_NULL) {
2812 		buf_block_t*	next_block = btr_block_get(
2813 			space, zip_size, next_page_no, RW_X_LATCH, index, mtr);
2814 #ifdef UNIV_BTR_DEBUG
2815 		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
2816 		ut_a(btr_page_get_prev(next_block->frame, mtr)
2817 		     == page_get_page_no(page));
2818 #endif /* UNIV_BTR_DEBUG */
2819 
2820 		btr_page_set_prev(buf_block_get_frame(next_block),
2821 				  buf_block_get_page_zip(next_block),
2822 				  upper_page_no, mtr);
2823 	}
2824 
2825 	btr_page_set_prev(lower_page, lower_page_zip, prev_page_no, mtr);
2826 	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
2827 
2828 	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);
2829 	btr_page_set_next(upper_page, upper_page_zip, next_page_no, mtr);
2830 }
2831 
2832 /*************************************************************//**
2833 Determine if a tuple is smaller than any record on the page.
2834 @return TRUE if smaller */
2835 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2836 bool
btr_page_tuple_smaller(btr_cur_t * cursor,const dtuple_t * tuple,ulint ** offsets,ulint n_uniq,mem_heap_t ** heap)2837 btr_page_tuple_smaller(
2838 /*===================*/
2839 	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
2840 	const dtuple_t*	tuple,	/*!< in: tuple to consider */
2841 	ulint**		offsets,/*!< in/out: temporary storage */
2842 	ulint		n_uniq,	/*!< in: number of unique fields
2843 				in the index page records */
2844 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2845 {
2846 	buf_block_t*	block;
2847 	const rec_t*	first_rec;
2848 	page_cur_t	pcur;
2849 
2850 	/* Read the first user record in the page. */
2851 	block = btr_cur_get_block(cursor);
2852 	page_cur_set_before_first(block, &pcur);
2853 	page_cur_move_to_next(&pcur);
2854 	first_rec = page_cur_get_rec(&pcur);
2855 
2856 	*offsets = rec_get_offsets(
2857 		first_rec, cursor->index, *offsets,
2858 		n_uniq, heap);
2859 
2860 	return(cmp_dtuple_rec(tuple, first_rec, *offsets) < 0);
2861 }
2862 
2863 /** Insert the tuple into the right sibling page, if the cursor is at the end
2864 of a page.
2865 @param[in]	flags	undo logging and locking flags
2866 @param[in,out]	cursor	cursor at which to insert; when the function succeeds,
2867 			the cursor is positioned before the insert point.
2868 @param[out]	offsets	offsets on inserted record
2869 @param[in,out]	heap	memory heap for allocating offsets
2870 @param[in]	tuple	tuple to insert
2871 @param[in]	n_ext	number of externally stored columns
2872 @param[in,out]	mtr	mini-transaction
2873 @return	inserted record (first record on the right sibling page);
2874 	the cursor will be positioned on the page infimum
2875 @retval	NULL if the operation was not performed */
2876 static
2877 rec_t*
btr_insert_into_right_sibling(ulint flags,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * heap,const dtuple_t * tuple,ulint n_ext,mtr_t * mtr)2878 btr_insert_into_right_sibling(
2879 	ulint		flags,
2880 	btr_cur_t*	cursor,
2881 	ulint**		offsets,
2882 	mem_heap_t*	heap,
2883 	const dtuple_t*	tuple,
2884 	ulint		n_ext,
2885 	mtr_t*		mtr)
2886 {
2887 	buf_block_t*	block = btr_cur_get_block(cursor);
2888 	page_t*		page = buf_block_get_frame(block);
2889 	ulint		next_page_no = btr_page_get_next(page, mtr);
2890 
2891 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(cursor->index),
2892 				MTR_MEMO_X_LOCK));
2893 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2894 	ut_ad(heap);
2895 
2896 	if (next_page_no == FIL_NULL || !page_rec_is_supremum(
2897 			page_rec_get_next(btr_cur_get_rec(cursor)))) {
2898 
2899 		return(NULL);
2900 	}
2901 
2902 	page_cur_t	next_page_cursor;
2903 	buf_block_t*	next_block;
2904 	page_t*		next_page;
2905 	btr_cur_t	next_father_cursor;
2906 	rec_t*		rec = NULL;
2907 	ulint		zip_size = buf_block_get_zip_size(block);
2908 	ulint		max_size;
2909 
2910 	next_block = btr_block_get(
2911 		buf_block_get_space(block), zip_size,
2912 		next_page_no, RW_X_LATCH, cursor->index, mtr);
2913 	next_page = buf_block_get_frame(next_block);
2914 
2915 	bool	is_leaf = page_is_leaf(next_page);
2916 
2917 	btr_page_get_father(
2918 		cursor->index, next_block, mtr, &next_father_cursor);
2919 
2920 	page_cur_search(
2921 		next_block, cursor->index, tuple, PAGE_CUR_LE,
2922 		&next_page_cursor);
2923 
2924 	max_size = page_get_max_insert_size_after_reorganize(next_page, 1);
2925 
2926 	/* Extends gap lock for the next page */
2927 	lock_update_split_left(next_block, block);
2928 
2929 	rec = page_cur_tuple_insert(
2930 		&next_page_cursor, tuple, cursor->index, offsets, &heap,
2931 		n_ext, mtr);
2932 
2933 	if (rec == NULL) {
2934 		if (zip_size && is_leaf
2935 		    && !dict_index_is_clust(cursor->index)) {
2936 			/* Reset the IBUF_BITMAP_FREE bits, because
2937 			page_cur_tuple_insert() will have attempted page
2938 			reorganize before failing. */
2939 			ibuf_reset_free_bits(next_block);
2940 		}
2941 		return(NULL);
2942 	}
2943 
2944 	ibool	compressed;
2945 	dberr_t	err;
2946 	ulint	level = btr_page_get_level(next_page, mtr);
2947 
2948 	/* adjust cursor position */
2949 	*btr_cur_get_page_cur(cursor) = next_page_cursor;
2950 
2951 	ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
2952 	ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);
2953 
2954 	/* We have to change the parent node pointer */
2955 
2956 	compressed = btr_cur_pessimistic_delete(
2957 		&err, TRUE, &next_father_cursor,
2958 		BTR_CREATE_FLAG, RB_NONE, mtr);
2959 
2960 	ut_a(err == DB_SUCCESS);
2961 
2962 	if (!compressed) {
2963 		btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
2964 	}
2965 
2966 	dtuple_t*	node_ptr = dict_index_build_node_ptr(
2967 		cursor->index, rec, buf_block_get_page_no(next_block),
2968 		heap, level);
2969 
2970 	btr_insert_on_non_leaf_level(
2971 		flags, cursor->index, level + 1, node_ptr, mtr);
2972 
2973 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
2974 
2975 	if (is_leaf && !dict_index_is_clust(cursor->index)) {
2976 		/* Update the free bits of the B-tree page in the
2977 		insert buffer bitmap. */
2978 
2979 		if (zip_size) {
2980 			ibuf_update_free_bits_zip(next_block, mtr);
2981 		} else {
2982 			ibuf_update_free_bits_if_full(
2983 				next_block, max_size,
2984 				rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
2985 		}
2986 	}
2987 
2988 	return(rec);
2989 }
2990 
2991 /*************************************************************//**
2992 Splits an index page to halves and inserts the tuple. It is assumed
2993 that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
2994 released within this function! NOTE that the operation of this
2995 function must always succeed, we cannot reverse it: therefore enough
2996 free disk space (2 pages) must be guaranteed to be available before
2997 this function is called.
2998 
2999 @return inserted record or NULL if run out of space */
3000 UNIV_INTERN
3001 rec_t*
btr_page_split_and_insert(ulint flags,btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap,const dtuple_t * tuple,ulint n_ext,mtr_t * mtr)3002 btr_page_split_and_insert(
3003 /*======================*/
3004 	ulint		flags,	/*!< in: undo logging and locking flags */
3005 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
3006 				function returns, the cursor is positioned
3007 				on the predecessor of the inserted record */
3008 	ulint**		offsets,/*!< out: offsets on inserted record */
3009 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
3010 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
3011 	ulint		n_ext,	/*!< in: number of externally stored columns */
3012 	mtr_t*		mtr)	/*!< in: mtr */
3013 {
3014 	buf_block_t*	block;
3015 	page_t*		page;
3016 	page_zip_des_t*	page_zip;
3017 	ulint		page_no;
3018 	byte		direction;
3019 	ulint		hint_page_no;
3020 	buf_block_t*	new_block;
3021 	page_t*		new_page;
3022 	page_zip_des_t*	new_page_zip;
3023 	rec_t*		split_rec;
3024 	buf_block_t*	left_block;
3025 	buf_block_t*	right_block;
3026 	buf_block_t*	insert_block;
3027 	page_cur_t*	page_cursor;
3028 	rec_t*		first_rec;
3029 	byte*		buf = 0; /* remove warning */
3030 	rec_t*		move_limit;
3031 	ibool		insert_will_fit;
3032 	ibool		insert_left;
3033 	ulint		n_iterations = 0;
3034 	rec_t*		rec;
3035 	ulint		n_uniq;
3036 
3037 	if (!*heap) {
3038 		*heap = mem_heap_create(1024);
3039 	}
3040 	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
3041 func_start:
3042 	mem_heap_empty(*heap);
3043 	*offsets = NULL;
3044 
3045 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(cursor->index),
3046 				MTR_MEMO_X_LOCK));
3047 	ut_ad(!dict_index_is_online_ddl(cursor->index)
3048 	      || (flags & BTR_CREATE_FLAG)
3049 	      || dict_index_is_clust(cursor->index));
3050 #ifdef UNIV_SYNC_DEBUG
3051 	ut_ad(rw_lock_own(dict_index_get_lock(cursor->index), RW_LOCK_EX));
3052 #endif /* UNIV_SYNC_DEBUG */
3053 
3054 	block = btr_cur_get_block(cursor);
3055 	page = buf_block_get_frame(block);
3056 	page_zip = buf_block_get_page_zip(block);
3057 
3058 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3059 	ut_ad(!page_is_empty(page));
3060 
3061 	/* try to insert to the next page if possible before split */
3062 	rec = btr_insert_into_right_sibling(
3063 		flags, cursor, offsets, *heap, tuple, n_ext, mtr);
3064 
3065 	if (rec != NULL) {
3066 		return(rec);
3067 	}
3068 
3069 	page_no = buf_block_get_page_no(block);
3070 
3071 	/* 1. Decide the split record; split_rec == NULL means that the
3072 	tuple to be inserted should be the first record on the upper
3073 	half-page */
3074 	insert_left = FALSE;
3075 
3076 	if (n_iterations > 0) {
3077 		direction = FSP_UP;
3078 		hint_page_no = page_no + 1;
3079 		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);
3080 
3081 		if (split_rec == NULL) {
3082 			insert_left = btr_page_tuple_smaller(
3083 				cursor, tuple, offsets, n_uniq, heap);
3084 		}
3085 	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
3086 		direction = FSP_UP;
3087 		hint_page_no = page_no + 1;
3088 
3089 	} else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
3090 		direction = FSP_DOWN;
3091 		hint_page_no = page_no - 1;
3092 		ut_ad(split_rec);
3093 	} else {
3094 		direction = FSP_UP;
3095 		hint_page_no = page_no + 1;
3096 
3097 		/* If there is only one record in the index page, we
3098 		can't split the node in the middle by default. We need
3099 		to determine whether the new record will be inserted
3100 		to the left or right. */
3101 
3102 		if (page_get_n_recs(page) > 1) {
3103 			split_rec = page_get_middle_rec(page);
3104 		} else if (btr_page_tuple_smaller(cursor, tuple,
3105 						  offsets, n_uniq, heap)) {
3106 			split_rec = page_rec_get_next(
3107 				page_get_infimum_rec(page));
3108 		} else {
3109 			split_rec = NULL;
3110 		}
3111 	}
3112 
3113 	DBUG_EXECUTE_IF("disk_is_full",
3114 			os_has_said_disk_full = true;
3115                         return(NULL););
3116 
3117 	/* 2. Allocate a new page to the index */
3118 	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
3119 				   btr_page_get_level(page, mtr), mtr, mtr);
3120 
3121 	if (new_block == NULL && os_has_said_disk_full) {
3122 		return(NULL);
3123         }
3124 
3125 	new_page = buf_block_get_frame(new_block);
3126 	new_page_zip = buf_block_get_page_zip(new_block);
3127 	btr_page_create(new_block, new_page_zip, cursor->index,
3128 			btr_page_get_level(page, mtr), mtr);
3129 
3130 	/* 3. Calculate the first record on the upper half-page, and the
3131 	first record (move_limit) on original page which ends up on the
3132 	upper half */
3133 
3134 	if (split_rec) {
3135 		first_rec = move_limit = split_rec;
3136 
3137 		*offsets = rec_get_offsets(split_rec, cursor->index, *offsets,
3138 					   n_uniq, heap);
3139 
3140 		insert_left = cmp_dtuple_rec(tuple, split_rec, *offsets) < 0;
3141 
3142 		if (!insert_left && new_page_zip && n_iterations > 0) {
3143 			/* If a compressed page has already been split,
3144 			avoid further splits by inserting the record
3145 			to an empty page. */
3146 			split_rec = NULL;
3147 			goto insert_empty;
3148 		}
3149 	} else if (insert_left) {
3150 		ut_a(n_iterations > 0);
3151 		first_rec = page_rec_get_next(page_get_infimum_rec(page));
3152 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
3153 	} else {
3154 insert_empty:
3155 		ut_ad(!split_rec);
3156 		ut_ad(!insert_left);
3157 		buf = (byte*) mem_alloc(rec_get_converted_size(cursor->index,
3158 							       tuple, n_ext));
3159 
3160 		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
3161 						      tuple, n_ext);
3162 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
3163 	}
3164 
3165 	/* 4. Do first the modifications in the tree structure */
3166 
3167 	btr_attach_half_pages(flags, cursor->index, block,
3168 			      first_rec, new_block, direction, mtr);
3169 
3170 	/* If the split is made on the leaf level and the insert will fit
3171 	on the appropriate half-page, we may release the tree x-latch.
3172 	We can then move the records after releasing the tree latch,
3173 	thus reducing the tree latch contention. */
3174 
3175 	if (split_rec) {
3176 		insert_will_fit = !new_page_zip
3177 			&& btr_page_insert_fits(cursor, split_rec,
3178 						offsets, tuple, n_ext, heap);
3179 	} else {
3180 		if (!insert_left) {
3181 			mem_free(buf);
3182 			buf = NULL;
3183 		}
3184 
3185 		insert_will_fit = !new_page_zip
3186 			&& btr_page_insert_fits(cursor, NULL,
3187 						offsets, tuple, n_ext, heap);
3188 	}
3189 
3190 	if (insert_will_fit && page_is_leaf(page)
3191 	    && !dict_index_is_online_ddl(cursor->index)) {
3192 
3193 		mtr_memo_release(mtr, dict_index_get_lock(cursor->index),
3194 				 MTR_MEMO_X_LOCK);
3195 	}
3196 
3197 	/* 5. Move then the records to the new page */
3198 	if (direction == FSP_DOWN) {
3199 		/*		fputs("Split left\n", stderr); */
3200 
3201 		if (0
3202 #ifdef UNIV_ZIP_COPY
3203 		    || page_zip
3204 #endif /* UNIV_ZIP_COPY */
3205 		    || !page_move_rec_list_start(new_block, block, move_limit,
3206 						 cursor->index, mtr)) {
3207 			/* For some reason, compressing new_page failed,
3208 			even though it should contain fewer records than
3209 			the original page.  Copy the page byte for byte
3210 			and then delete the records from both pages
3211 			as appropriate.  Deleting will always succeed. */
3212 			ut_a(new_page_zip);
3213 
3214 			page_zip_copy_recs(new_page_zip, new_page,
3215 					   page_zip, page, cursor->index, mtr);
3216 			page_delete_rec_list_end(move_limit - page + new_page,
3217 						 new_block, cursor->index,
3218 						 ULINT_UNDEFINED,
3219 						 ULINT_UNDEFINED, mtr);
3220 
3221 			/* Update the lock table and possible hash index. */
3222 
3223 			lock_move_rec_list_start(
3224 				new_block, block, move_limit,
3225 				new_page + PAGE_NEW_INFIMUM);
3226 
3227 			btr_search_move_or_delete_hash_entries(
3228 				new_block, block, cursor->index);
3229 
3230 			/* Delete the records from the source page. */
3231 
3232 			page_delete_rec_list_start(move_limit, block,
3233 						   cursor->index, mtr);
3234 		}
3235 
3236 		left_block = new_block;
3237 		right_block = block;
3238 
3239 		lock_update_split_left(right_block, left_block);
3240 	} else {
3241 		/*		fputs("Split right\n", stderr); */
3242 
3243 		if (0
3244 #ifdef UNIV_ZIP_COPY
3245 		    || page_zip
3246 #endif /* UNIV_ZIP_COPY */
3247 		    || !page_move_rec_list_end(new_block, block, move_limit,
3248 					       cursor->index, mtr)) {
3249 			/* For some reason, compressing new_page failed,
3250 			even though it should contain fewer records than
3251 			the original page.  Copy the page byte for byte
3252 			and then delete the records from both pages
3253 			as appropriate.  Deleting will always succeed. */
3254 			ut_a(new_page_zip);
3255 
3256 			page_zip_copy_recs(new_page_zip, new_page,
3257 					   page_zip, page, cursor->index, mtr);
3258 			page_delete_rec_list_start(move_limit - page
3259 						   + new_page, new_block,
3260 						   cursor->index, mtr);
3261 
3262 			/* Update the lock table and possible hash index. */
3263 
3264 			lock_move_rec_list_end(new_block, block, move_limit);
3265 
3266 			btr_search_move_or_delete_hash_entries(
3267 				new_block, block, cursor->index);
3268 
3269 			/* Delete the records from the source page. */
3270 
3271 			page_delete_rec_list_end(move_limit, block,
3272 						 cursor->index,
3273 						 ULINT_UNDEFINED,
3274 						 ULINT_UNDEFINED, mtr);
3275 		}
3276 
3277 		left_block = block;
3278 		right_block = new_block;
3279 
3280 		lock_update_split_right(right_block, left_block);
3281 	}
3282 
3283 #ifdef UNIV_ZIP_DEBUG
3284 	if (page_zip) {
3285 		ut_a(page_zip_validate(page_zip, page, cursor->index));
3286 		ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
3287 	}
3288 #endif /* UNIV_ZIP_DEBUG */
3289 
3290 	/* At this point, split_rec, move_limit and first_rec may point
3291 	to garbage on the old page. */
3292 
3293 	/* 6. The split and the tree modification is now completed. Decide the
3294 	page where the tuple should be inserted */
3295 
3296 	if (insert_left) {
3297 		insert_block = left_block;
3298 	} else {
3299 		insert_block = right_block;
3300 	}
3301 
3302 	/* 7. Reposition the cursor for insert and try insertion */
3303 	page_cursor = btr_cur_get_page_cur(cursor);
3304 
3305 	page_cur_search(insert_block, cursor->index, tuple,
3306 			PAGE_CUR_LE, page_cursor);
3307 
3308 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
3309 				    offsets, heap, n_ext, mtr);
3310 
3311 #ifdef UNIV_ZIP_DEBUG
3312 	{
3313 		page_t*		insert_page
3314 			= buf_block_get_frame(insert_block);
3315 
3316 		page_zip_des_t*	insert_page_zip
3317 			= buf_block_get_page_zip(insert_block);
3318 
3319 		ut_a(!insert_page_zip
3320 		     || page_zip_validate(insert_page_zip, insert_page,
3321 					  cursor->index));
3322 	}
3323 #endif /* UNIV_ZIP_DEBUG */
3324 
3325 	if (rec != NULL) {
3326 
3327 		goto func_exit;
3328 	}
3329 
3330 	/* 8. If insert did not fit, try page reorganization.
3331 	For compressed pages, page_cur_tuple_insert() will have
3332 	attempted this already. */
3333 
3334 	if (page_cur_get_page_zip(page_cursor)
3335 	    || !btr_page_reorganize(page_cursor, cursor->index, mtr)) {
3336 
3337 		goto insert_failed;
3338 	}
3339 
3340 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
3341 				    offsets, heap, n_ext, mtr);
3342 
3343 	if (rec == NULL) {
3344 		/* The insert did not fit on the page: loop back to the
3345 		start of the function for a new split */
3346 insert_failed:
3347 		/* We play safe and reset the free bits */
3348 		if (!dict_index_is_clust(cursor->index)) {
3349 			ibuf_reset_free_bits(new_block);
3350 			ibuf_reset_free_bits(block);
3351 		}
3352 
3353 		/* fprintf(stderr, "Split second round %lu\n",
3354 		page_get_page_no(page)); */
3355 		n_iterations++;
3356 		ut_ad(n_iterations < 2
3357 		      || buf_block_get_page_zip(insert_block));
3358 		ut_ad(!insert_will_fit);
3359 
3360 		goto func_start;
3361 	}
3362 
3363 func_exit:
3364 	/* Insert fit on the page: update the free bits for the
3365 	left and right pages in the same mtr */
3366 
3367 	if (!dict_index_is_clust(cursor->index) && page_is_leaf(page)) {
3368 		ibuf_update_free_bits_for_two_pages_low(
3369 			buf_block_get_zip_size(left_block),
3370 			left_block, right_block, mtr);
3371 	}
3372 
3373 #if 0
3374 	fprintf(stderr, "Split and insert done %lu %lu\n",
3375 		buf_block_get_page_no(left_block),
3376 		buf_block_get_page_no(right_block));
3377 #endif
3378 	MONITOR_INC(MONITOR_INDEX_SPLIT);
3379 
3380 	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
3381 	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
3382 
3383 	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
3384 	return(rec);
3385 }
3386 
3387 #ifdef UNIV_SYNC_DEBUG
3388 /*************************************************************//**
3389 Removes a page from the level list of pages.
3390 @param space	in: space where removed
3391 @param zip_size	in: compressed page size in bytes, or 0 for uncompressed
3392 @param page	in/out: page to remove
3393 @param index	in: index tree
3394 @param mtr	in/out: mini-transaction */
3395 # define btr_level_list_remove(space,zip_size,page,index,mtr)		\
3396 	btr_level_list_remove_func(space,zip_size,page,index,mtr)
3397 #else /* UNIV_SYNC_DEBUG */
3398 /*************************************************************//**
3399 Removes a page from the level list of pages.
3400 @param space	in: space where removed
3401 @param zip_size	in: compressed page size in bytes, or 0 for uncompressed
3402 @param page	in/out: page to remove
3403 @param index	in: index tree
3404 @param mtr	in/out: mini-transaction */
3405 # define btr_level_list_remove(space,zip_size,page,index,mtr)		\
3406 	btr_level_list_remove_func(space,zip_size,page,mtr)
3407 #endif /* UNIV_SYNC_DEBUG */
3408 
3409 /*************************************************************//**
3410 Removes a page from the level list of pages. */
3411 static
3412 void
btr_level_list_remove_func(ulint space,ulint zip_size,page_t * page,const dict_index_t * index,mtr_t * mtr)3413 btr_level_list_remove_func(
3414 /*=======================*/
3415 	ulint			space,	/*!< in: space where removed */
3416 	ulint			zip_size,/*!< in: compressed page size in bytes
3417 					or 0 for uncompressed pages */
3418 	page_t*			page,	/*!< in/out: page to remove */
3419 #ifdef UNIV_SYNC_DEBUG
3420 	const dict_index_t*	index,	/*!< in: index tree */
3421 #endif /* UNIV_SYNC_DEBUG */
3422 	mtr_t*			mtr)	/*!< in/out: mini-transaction */
3423 {
3424 	ulint	prev_page_no;
3425 	ulint	next_page_no;
3426 
3427 	ut_ad(page != NULL);
3428 	ut_ad(mtr != NULL);
3429 	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
3430 	ut_ad(space == page_get_space_id(page));
3431 	/* Get the previous and next page numbers of page */
3432 
3433 	prev_page_no = btr_page_get_prev(page, mtr);
3434 	next_page_no = btr_page_get_next(page, mtr);
3435 
3436 	/* Update page links of the level */
3437 
3438 	if (prev_page_no != FIL_NULL) {
3439 		buf_block_t*	prev_block
3440 			= btr_block_get(space, zip_size, prev_page_no,
3441 					RW_X_LATCH, index, mtr);
3442 		page_t*		prev_page
3443 			= buf_block_get_frame(prev_block);
3444 #ifdef UNIV_BTR_DEBUG
3445 		ut_a(page_is_comp(prev_page) == page_is_comp(page));
3446 		ut_a(btr_page_get_next(prev_page, mtr)
3447 		     == page_get_page_no(page));
3448 #endif /* UNIV_BTR_DEBUG */
3449 
3450 		btr_page_set_next(prev_page,
3451 				  buf_block_get_page_zip(prev_block),
3452 				  next_page_no, mtr);
3453 	}
3454 
3455 	if (next_page_no != FIL_NULL) {
3456 		buf_block_t*	next_block
3457 			= btr_block_get(space, zip_size, next_page_no,
3458 					RW_X_LATCH, index, mtr);
3459 		page_t*		next_page
3460 			= buf_block_get_frame(next_block);
3461 #ifdef UNIV_BTR_DEBUG
3462 		ut_a(page_is_comp(next_page) == page_is_comp(page));
3463 		ut_a(btr_page_get_prev(next_page, mtr)
3464 		     == page_get_page_no(page));
3465 #endif /* UNIV_BTR_DEBUG */
3466 
3467 		btr_page_set_prev(next_page,
3468 				  buf_block_get_page_zip(next_block),
3469 				  prev_page_no, mtr);
3470 	}
3471 }
3472 
3473 /****************************************************************//**
3474 Writes the redo log record for setting an index record as the predefined
3475 minimum record. */
3476 UNIV_INLINE
3477 void
btr_set_min_rec_mark_log(rec_t * rec,byte type,mtr_t * mtr)3478 btr_set_min_rec_mark_log(
3479 /*=====================*/
3480 	rec_t*	rec,	/*!< in: record */
3481 	byte	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or MLOG_REC_MIN_MARK */
3482 	mtr_t*	mtr)	/*!< in: mtr */
3483 {
3484 	mlog_write_initial_log_record(rec, type, mtr);
3485 
3486 	/* Write rec offset as a 2-byte ulint */
3487 	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
3488 }
3489 #else /* !UNIV_HOTBACKUP */
3490 # define btr_set_min_rec_mark_log(rec,comp,mtr) ((void) 0)
3491 #endif /* !UNIV_HOTBACKUP */
3492 
3493 /****************************************************************//**
3494 Parses the redo log record for setting an index record as the predefined
3495 minimum record.
3496 @return	end of log record or NULL */
3497 UNIV_INTERN
3498 byte*
btr_parse_set_min_rec_mark(byte * ptr,byte * end_ptr,ulint comp,page_t * page,mtr_t * mtr)3499 btr_parse_set_min_rec_mark(
3500 /*=======================*/
3501 	byte*	ptr,	/*!< in: buffer */
3502 	byte*	end_ptr,/*!< in: buffer end */
3503 	ulint	comp,	/*!< in: nonzero=compact page format */
3504 	page_t*	page,	/*!< in: page or NULL */
3505 	mtr_t*	mtr)	/*!< in: mtr or NULL */
3506 {
3507 	rec_t*	rec;
3508 
3509 	if (end_ptr < ptr + 2) {
3510 
3511 		return(NULL);
3512 	}
3513 
3514 	if (page) {
3515 		ut_a(!page_is_comp(page) == !comp);
3516 
3517 		rec = page + mach_read_from_2(ptr);
3518 
3519 		btr_set_min_rec_mark(rec, mtr);
3520 	}
3521 
3522 	return(ptr + 2);
3523 }
3524 
3525 /****************************************************************//**
3526 Sets a record as the predefined minimum record. */
3527 UNIV_INTERN
3528 void
btr_set_min_rec_mark(rec_t * rec,mtr_t * mtr)3529 btr_set_min_rec_mark(
3530 /*=================*/
3531 	rec_t*	rec,	/*!< in: record */
3532 	mtr_t*	mtr)	/*!< in: mtr */
3533 {
3534 	ulint	info_bits;
3535 
3536 	if (page_rec_is_comp(rec)) {
3537 		info_bits = rec_get_info_bits(rec, TRUE);
3538 
3539 		rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3540 
3541 		btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
3542 	} else {
3543 		info_bits = rec_get_info_bits(rec, FALSE);
3544 
3545 		rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3546 
3547 		btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
3548 	}
3549 }
3550 
3551 #ifndef UNIV_HOTBACKUP
3552 /*************************************************************//**
3553 Deletes on the upper level the node pointer to a page. */
3554 UNIV_INTERN
3555 void
btr_node_ptr_delete(dict_index_t * index,buf_block_t * block,mtr_t * mtr)3556 btr_node_ptr_delete(
3557 /*================*/
3558 	dict_index_t*	index,	/*!< in: index tree */
3559 	buf_block_t*	block,	/*!< in: page whose node pointer is deleted */
3560 	mtr_t*		mtr)	/*!< in: mtr */
3561 {
3562 	btr_cur_t	cursor;
3563 	ibool		compressed;
3564 	dberr_t		err;
3565 
3566 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3567 
3568 	/* Delete node pointer on father page */
3569 	btr_page_get_father(index, block, mtr, &cursor);
3570 
3571 	compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor,
3572 						BTR_CREATE_FLAG, RB_NONE, mtr);
3573 	ut_a(err == DB_SUCCESS);
3574 
3575 	if (!compressed) {
3576 		btr_cur_compress_if_useful(&cursor, FALSE, mtr);
3577 	}
3578 }
3579 
3580 /*************************************************************//**
3581 If page is the only on its level, this function moves its records to the
3582 father page, thus reducing the tree height.
3583 @return father block */
3584 static
3585 buf_block_t*
btr_lift_page_up(dict_index_t * index,buf_block_t * block,mtr_t * mtr)3586 btr_lift_page_up(
3587 /*=============*/
3588 	dict_index_t*	index,	/*!< in: index tree */
3589 	buf_block_t*	block,	/*!< in: page which is the only on its level;
3590 				must not be empty: use
3591 				btr_discard_only_page_on_level if the last
3592 				record from the page should be removed */
3593 	mtr_t*		mtr)	/*!< in: mtr */
3594 {
3595 	buf_block_t*	father_block;
3596 	page_t*		father_page;
3597 	ulint		page_level;
3598 	page_zip_des_t*	father_page_zip;
3599 	page_t*		page		= buf_block_get_frame(block);
3600 	ulint		root_page_no;
3601 	buf_block_t*	blocks[BTR_MAX_LEVELS];
3602 	ulint		n_blocks;	/*!< last used index in blocks[] */
3603 	ulint		i;
3604 	bool		lift_father_up;
3605 	buf_block_t*	block_orig	= block;
3606 
3607 	ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
3608 	ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
3609 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3610 
3611 	page_level = btr_page_get_level(page, mtr);
3612 	root_page_no = dict_index_get_page(index);
3613 
3614 	{
3615 		btr_cur_t	cursor;
3616 		ulint*		offsets	= NULL;
3617 		mem_heap_t*	heap	= mem_heap_create(
3618 			sizeof(*offsets)
3619 			* (REC_OFFS_HEADER_SIZE + 1 + 1 + index->n_fields));
3620 		buf_block_t*	b;
3621 
3622 		offsets = btr_page_get_father_block(offsets, heap, index,
3623 						    block, mtr, &cursor);
3624 		father_block = btr_cur_get_block(&cursor);
3625 		father_page_zip = buf_block_get_page_zip(father_block);
3626 		father_page = buf_block_get_frame(father_block);
3627 
3628 		n_blocks = 0;
3629 
3630 		/* Store all ancestor pages so we can reset their
3631 		levels later on.  We have to do all the searches on
3632 		the tree now because later on, after we've replaced
3633 		the first level, the tree is in an inconsistent state
3634 		and can not be searched. */
3635 		for (b = father_block;
3636 		     buf_block_get_page_no(b) != root_page_no; ) {
3637 			ut_a(n_blocks < BTR_MAX_LEVELS);
3638 
3639 			offsets = btr_page_get_father_block(offsets, heap,
3640 							    index, b,
3641 							    mtr, &cursor);
3642 
3643 			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
3644 		}
3645 
3646 		lift_father_up = (n_blocks && page_level == 0);
3647 		if (lift_father_up) {
3648 			/* The father page also should be the only on its level (not
3649 			root). We should lift up the father page at first.
3650 			Because the leaf page should be lifted up only for root page.
3651 			The freeing page is based on page_level (==0 or !=0)
3652 			to choose segment. If the page_level is changed ==0 from !=0,
3653 			later freeing of the page doesn't find the page allocation
3654 			to be freed.*/
3655 
3656 			block = father_block;
3657 			page = buf_block_get_frame(block);
3658 			page_level = btr_page_get_level(page, mtr);
3659 
3660 			ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
3661 			ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
3662 			ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3663 
3664 			father_block = blocks[0];
3665 			father_page_zip = buf_block_get_page_zip(father_block);
3666 			father_page = buf_block_get_frame(father_block);
3667 		}
3668 
3669 		mem_heap_free(heap);
3670 	}
3671 
3672 	btr_search_drop_page_hash_index(block);
3673 
3674 	/* Make the father empty */
3675 	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
3676 	page_level++;
3677 
3678 	/* Copy the records to the father page one by one. */
3679 	if (0
3680 #ifdef UNIV_ZIP_COPY
3681 	    || father_page_zip
3682 #endif /* UNIV_ZIP_COPY */
3683 	    || !page_copy_rec_list_end(father_block, block,
3684 				       page_get_infimum_rec(page),
3685 				       index, mtr)) {
3686 		const page_zip_des_t*	page_zip
3687 			= buf_block_get_page_zip(block);
3688 		ut_a(father_page_zip);
3689 		ut_a(page_zip);
3690 
3691 		/* Copy the page byte for byte. */
3692 		page_zip_copy_recs(father_page_zip, father_page,
3693 				   page_zip, page, index, mtr);
3694 
3695 		/* Update the lock table and possible hash index. */
3696 
3697 		lock_move_rec_list_end(father_block, block,
3698 				       page_get_infimum_rec(page));
3699 
3700 		btr_search_move_or_delete_hash_entries(father_block, block,
3701 						       index);
3702 	}
3703 
3704 	btr_blob_dbg_remove(page, index, "btr_lift_page_up");
3705 	lock_update_copy_and_discard(father_block, block);
3706 
3707 	/* Go upward to root page, decrementing levels by one. */
3708 	for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
3709 		page_t*		page	= buf_block_get_frame(blocks[i]);
3710 		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);
3711 
3712 		ut_ad(btr_page_get_level(page, mtr) == page_level + 1);
3713 
3714 		btr_page_set_level(page, page_zip, page_level, mtr);
3715 #ifdef UNIV_ZIP_DEBUG
3716 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3717 #endif /* UNIV_ZIP_DEBUG */
3718 	}
3719 
3720 	/* Free the file page */
3721 	btr_page_free(index, block, mtr);
3722 
3723 	/* We play it safe and reset the free bits for the father */
3724 	if (!dict_index_is_clust(index)) {
3725 		ibuf_reset_free_bits(father_block);
3726 	}
3727 	ut_ad(page_validate(father_page, index));
3728 	ut_ad(btr_check_node_ptr(index, father_block, mtr));
3729 
3730 	return(lift_father_up ? block_orig : father_block);
3731 }
3732 
3733 /*************************************************************//**
3734 Tries to merge the page first to the left immediate brother if such a
3735 brother exists, and the node pointers to the current page and to the brother
3736 reside on the same page. If the left brother does not satisfy these
3737 conditions, looks at the right brother. If the page is the only one on that
3738 level lifts the records of the page to the father page, thus reducing the
3739 tree height. It is assumed that mtr holds an x-latch on the tree and on the
3740 page. If cursor is on the leaf level, mtr must also hold x-latches to the
3741 brothers, if they exist.
3742 @return	TRUE on success */
3743 UNIV_INTERN
3744 ibool
btr_compress(btr_cur_t * cursor,ibool adjust,mtr_t * mtr)3745 btr_compress(
3746 /*=========*/
3747 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to merge
3748 				or lift; the page must not be empty:
3749 				when deleting records, use btr_discard_page()
3750 				if the page would become empty */
3751 	ibool		adjust,	/*!< in: TRUE if should adjust the
3752 				cursor position even if compression occurs */
3753 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3754 {
3755 	dict_index_t*	index;
3756 	ulint		space;
3757 	ulint		zip_size;
3758 	ulint		left_page_no;
3759 	ulint		right_page_no;
3760 	buf_block_t*	merge_block;
3761 	page_t*		merge_page = NULL;
3762 	page_zip_des_t*	merge_page_zip;
3763 	ibool		is_left;
3764 	buf_block_t*	block;
3765 	page_t*		page;
3766 	btr_cur_t	father_cursor;
3767 	mem_heap_t*	heap;
3768 	ulint*		offsets;
3769 	ulint		nth_rec = 0; /* remove bogus warning */
3770 	DBUG_ENTER("btr_compress");
3771 
3772 	block = btr_cur_get_block(cursor);
3773 	page = btr_cur_get_page(cursor);
3774 	index = btr_cur_get_index(cursor);
3775 
3776 	btr_assert_not_corrupted(block, index);
3777 
3778 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
3779 				MTR_MEMO_X_LOCK));
3780 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3781 	space = dict_index_get_space(index);
3782 	zip_size = dict_table_zip_size(index->table);
3783 
3784 	MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);
3785 
3786 	left_page_no = btr_page_get_prev(page, mtr);
3787 	right_page_no = btr_page_get_next(page, mtr);
3788 
3789 #ifdef UNIV_DEBUG
3790 	if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3791 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3792 			page_rec_get_next(page_get_infimum_rec(page)),
3793 			page_is_comp(page)));
3794 	}
3795 #endif /* UNIV_DEBUG */
3796 
3797 	heap = mem_heap_create(100);
3798 	offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
3799 					    &father_cursor);
3800 
3801 	if (adjust) {
3802 		nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
3803 		ut_ad(nth_rec > 0);
3804 	}
3805 
3806 	if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
3807 		/* The page is the only one on the level, lift the records
3808 		to the father */
3809 
3810 		merge_block = btr_lift_page_up(index, block, mtr);
3811 		goto func_exit;
3812 	}
3813 
3814 	/* Decide the page to which we try to merge and which will inherit
3815 	the locks */
3816 
3817 	is_left = btr_can_merge_with_page(cursor, left_page_no,
3818 					  &merge_block, mtr);
3819 
3820 	DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
3821 
3822 	if(!is_left
3823 	   && !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
3824 				       mtr)) {
3825 		goto err_exit;
3826 	}
3827 
3828 	merge_page = buf_block_get_frame(merge_block);
3829 
3830 #ifdef UNIV_BTR_DEBUG
3831 	if (is_left) {
3832                 ut_a(btr_page_get_next(merge_page, mtr)
3833                      == buf_block_get_page_no(block));
3834 	} else {
3835                ut_a(btr_page_get_prev(merge_page, mtr)
3836                      == buf_block_get_page_no(block));
3837 	}
3838 #endif /* UNIV_BTR_DEBUG */
3839 
3840 	ut_ad(page_validate(merge_page, index));
3841 
3842 	merge_page_zip = buf_block_get_page_zip(merge_block);
3843 #ifdef UNIV_ZIP_DEBUG
3844 	if (merge_page_zip) {
3845 		const page_zip_des_t*	page_zip
3846 			= buf_block_get_page_zip(block);
3847 		ut_a(page_zip);
3848 		ut_a(page_zip_validate(merge_page_zip, merge_page, index));
3849 		ut_a(page_zip_validate(page_zip, page, index));
3850 	}
3851 #endif /* UNIV_ZIP_DEBUG */
3852 
3853 	/* Move records to the merge page */
3854 	if (is_left) {
3855 		rec_t*	orig_pred = page_copy_rec_list_start(
3856 			merge_block, block, page_get_supremum_rec(page),
3857 			index, mtr);
3858 
3859 		if (!orig_pred) {
3860 			goto err_exit;
3861 		}
3862 
3863 		btr_search_drop_page_hash_index(block);
3864 
3865 		/* Remove the page from the level list */
3866 		btr_level_list_remove(space, zip_size, page, index, mtr);
3867 
3868 		btr_node_ptr_delete(index, block, mtr);
3869 		lock_update_merge_left(merge_block, orig_pred, block);
3870 
3871 		if (adjust) {
3872 			nth_rec += page_rec_get_n_recs_before(orig_pred);
3873 		}
3874 	} else {
3875 		rec_t*		orig_succ;
3876 		ibool		compressed;
3877 		dberr_t		err;
3878 		btr_cur_t	cursor2;
3879 					/* father cursor pointing to node ptr
3880 					of the right sibling */
3881 #ifdef UNIV_BTR_DEBUG
3882 		byte		fil_page_prev[4];
3883 #endif /* UNIV_BTR_DEBUG */
3884 
3885 		btr_page_get_father(index, merge_block, mtr, &cursor2);
3886 
3887 		if (merge_page_zip && left_page_no == FIL_NULL) {
3888 
3889 			/* The function page_zip_compress(), which will be
3890 			invoked by page_copy_rec_list_end() below,
3891 			requires that FIL_PAGE_PREV be FIL_NULL.
3892 			Clear the field, but prepare to restore it. */
3893 #ifdef UNIV_BTR_DEBUG
3894 			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3895 #endif /* UNIV_BTR_DEBUG */
3896 #if FIL_NULL != 0xffffffff
3897 # error "FIL_NULL != 0xffffffff"
3898 #endif
3899 			memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
3900 		}
3901 
3902 		orig_succ = page_copy_rec_list_end(merge_block, block,
3903 						   page_get_infimum_rec(page),
3904 						   cursor->index, mtr);
3905 
3906 		if (!orig_succ) {
3907 			ut_a(merge_page_zip);
3908 #ifdef UNIV_BTR_DEBUG
3909 			if (left_page_no == FIL_NULL) {
3910 				/* FIL_PAGE_PREV was restored from
3911 				merge_page_zip. */
3912 				ut_a(!memcmp(fil_page_prev,
3913 					     merge_page + FIL_PAGE_PREV, 4));
3914 			}
3915 #endif /* UNIV_BTR_DEBUG */
3916 			goto err_exit;
3917 		}
3918 
3919 		btr_search_drop_page_hash_index(block);
3920 
3921 #ifdef UNIV_BTR_DEBUG
3922 		if (merge_page_zip && left_page_no == FIL_NULL) {
3923 
3924 			/* Restore FIL_PAGE_PREV in order to avoid an assertion
3925 			failure in btr_level_list_remove(), which will set
3926 			the field again to FIL_NULL.  Even though this makes
3927 			merge_page and merge_page_zip inconsistent for a
3928 			split second, it is harmless, because the pages
3929 			are X-latched. */
3930 			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
3931 		}
3932 #endif /* UNIV_BTR_DEBUG */
3933 
3934 		/* Remove the page from the level list */
3935 		btr_level_list_remove(space, zip_size, page, index, mtr);
3936 
3937 		/* Replace the address of the old child node (= page) with the
3938 		address of the merge page to the right */
3939 		btr_node_ptr_set_child_page_no(
3940 			btr_cur_get_rec(&father_cursor),
3941 			btr_cur_get_page_zip(&father_cursor),
3942 			offsets, right_page_no, mtr);
3943 
3944 		compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor2,
3945 							BTR_CREATE_FLAG,
3946 							RB_NONE, mtr);
3947 		ut_a(err == DB_SUCCESS);
3948 
3949 		if (!compressed) {
3950 			btr_cur_compress_if_useful(&cursor2, FALSE, mtr);
3951 		}
3952 
3953 		lock_update_merge_right(merge_block, orig_succ, block);
3954 	}
3955 
3956 	btr_blob_dbg_remove(page, index, "btr_compress");
3957 
3958 	if (!dict_index_is_clust(index) && page_is_leaf(merge_page)) {
3959 		/* Update the free bits of the B-tree page in the
3960 		insert buffer bitmap.  This has to be done in a
3961 		separate mini-transaction that is committed before the
3962 		main mini-transaction.  We cannot update the insert
3963 		buffer bitmap in this mini-transaction, because
3964 		btr_compress() can be invoked recursively without
3965 		committing the mini-transaction in between.  Since
3966 		insert buffer bitmap pages have a lower rank than
3967 		B-tree pages, we must not access other pages in the
3968 		same mini-transaction after accessing an insert buffer
3969 		bitmap page. */
3970 
3971 		/* The free bits in the insert buffer bitmap must
3972 		never exceed the free space on a page.  It is safe to
3973 		decrement or reset the bits in the bitmap in a
3974 		mini-transaction that is committed before the
3975 		mini-transaction that affects the free space. */
3976 
3977 		/* It is unsafe to increment the bits in a separately
3978 		committed mini-transaction, because in crash recovery,
3979 		the free bits could momentarily be set too high. */
3980 
3981 		if (zip_size) {
3982 			/* Because the free bits may be incremented
3983 			and we cannot update the insert buffer bitmap
3984 			in the same mini-transaction, the only safe
3985 			thing we can do here is the pessimistic
3986 			approach: reset the free bits. */
3987 			ibuf_reset_free_bits(merge_block);
3988 		} else {
3989 			/* On uncompressed pages, the free bits will
3990 			never increase here.  Thus, it is safe to
3991 			write the bits accurately in a separate
3992 			mini-transaction. */
3993 			ibuf_update_free_bits_if_full(merge_block,
3994 						      UNIV_PAGE_SIZE,
3995 						      ULINT_UNDEFINED);
3996 		}
3997 	}
3998 
3999 	ut_ad(page_validate(merge_page, index));
4000 #ifdef UNIV_ZIP_DEBUG
4001 	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page,
4002 						  index));
4003 #endif /* UNIV_ZIP_DEBUG */
4004 
4005 	/* Free the file page */
4006 	btr_page_free(index, block, mtr);
4007 
4008 	ut_ad(btr_check_node_ptr(index, merge_block, mtr));
4009 func_exit:
4010 	mem_heap_free(heap);
4011 
4012 	if (adjust) {
4013 		ut_ad(nth_rec > 0);
4014 		btr_cur_position(
4015 			index,
4016 			page_rec_get_nth(merge_block->frame, nth_rec),
4017 			merge_block, cursor);
4018 	}
4019 
4020 	MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);
4021 
4022 	DBUG_RETURN(TRUE);
4023 
4024 err_exit:
4025 	/* We play it safe and reset the free bits. */
4026 	if (zip_size
4027 	    && merge_page
4028 	    && page_is_leaf(merge_page)
4029 	    && !dict_index_is_clust(index)) {
4030 		ibuf_reset_free_bits(merge_block);
4031 	}
4032 
4033 	mem_heap_free(heap);
4034 	DBUG_RETURN(FALSE);
4035 }
4036 
4037 /*************************************************************//**
4038 Discards a page that is the only page on its level.  This will empty
4039 the whole B-tree, leaving just an empty root page.  This function
4040 should never be reached, because btr_compress(), which is invoked in
4041 delete operations, calls btr_lift_page_up() to flatten the B-tree. */
4042 static
4043 void
btr_discard_only_page_on_level(dict_index_t * index,buf_block_t * block,mtr_t * mtr)4044 btr_discard_only_page_on_level(
4045 /*===========================*/
4046 	dict_index_t*	index,	/*!< in: index tree */
4047 	buf_block_t*	block,	/*!< in: page which is the only on its level */
4048 	mtr_t*		mtr)	/*!< in: mtr */
4049 {
4050 	ulint		page_level = 0;
4051 	trx_id_t	max_trx_id;
4052 
4053 	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
4054 	max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));
4055 
4056 	while (buf_block_get_page_no(block) != dict_index_get_page(index)) {
4057 		btr_cur_t	cursor;
4058 		buf_block_t*	father;
4059 		const page_t*	page	= buf_block_get_frame(block);
4060 
4061 		ut_a(page_get_n_recs(page) == 1);
4062 		ut_a(page_level == btr_page_get_level(page, mtr));
4063 		ut_a(btr_page_get_prev(page, mtr) == FIL_NULL);
4064 		ut_a(btr_page_get_next(page, mtr) == FIL_NULL);
4065 
4066 		ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4067 		btr_search_drop_page_hash_index(block);
4068 
4069 		btr_page_get_father(index, block, mtr, &cursor);
4070 		father = btr_cur_get_block(&cursor);
4071 
4072 		lock_update_discard(father, PAGE_HEAP_NO_SUPREMUM, block);
4073 
4074 		/* Free the file page */
4075 		btr_page_free(index, block, mtr);
4076 
4077 		block = father;
4078 		page_level++;
4079 	}
4080 
4081 	/* block is the root page, which must be empty, except
4082 	for the node pointer to the (now discarded) block(s). */
4083 
4084 #ifdef UNIV_BTR_DEBUG
4085 	if (!dict_index_is_ibuf(index)) {
4086 		const page_t*	root	= buf_block_get_frame(block);
4087 		const ulint	space	= dict_index_get_space(index);
4088 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
4089 					    + root, space));
4090 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
4091 					    + root, space));
4092 	}
4093 #endif /* UNIV_BTR_DEBUG */
4094 
4095 	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
4096 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
4097 
4098 	if (!dict_index_is_clust(index)) {
4099 		/* We play it safe and reset the free bits for the root */
4100 		ibuf_reset_free_bits(block);
4101 
4102 		ut_a(max_trx_id);
4103 		page_set_max_trx_id(block,
4104 				    buf_block_get_page_zip(block),
4105 				    max_trx_id, mtr);
4106 	}
4107 }
4108 
4109 /*************************************************************//**
4110 Discards a page from a B-tree. This is used to remove the last record from
4111 a B-tree page: the whole page must be removed at the same time. This cannot
4112 be used for the root page, which is allowed to be empty. */
4113 UNIV_INTERN
4114 void
btr_discard_page(btr_cur_t * cursor,mtr_t * mtr)4115 btr_discard_page(
4116 /*=============*/
4117 	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
4118 				the root page */
4119 	mtr_t*		mtr)	/*!< in: mtr */
4120 {
4121 	dict_index_t*	index;
4122 	ulint		space;
4123 	ulint		zip_size;
4124 	ulint		left_page_no;
4125 	ulint		right_page_no;
4126 	buf_block_t*	merge_block;
4127 	page_t*		merge_page;
4128 	buf_block_t*	block;
4129 	page_t*		page;
4130 	rec_t*		node_ptr;
4131 
4132 	block = btr_cur_get_block(cursor);
4133 	index = btr_cur_get_index(cursor);
4134 
4135 	ut_ad(dict_index_get_page(index) != buf_block_get_page_no(block));
4136 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
4137 				MTR_MEMO_X_LOCK));
4138 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4139 	space = dict_index_get_space(index);
4140 	zip_size = dict_table_zip_size(index->table);
4141 
4142 	MONITOR_INC(MONITOR_INDEX_DISCARD);
4143 
4144 	/* Decide the page which will inherit the locks */
4145 
4146 	left_page_no = btr_page_get_prev(buf_nonnull_block_get_frame(block),
4147 					 mtr);
4148 	right_page_no = btr_page_get_next(buf_nonnull_block_get_frame(block),
4149 					  mtr);
4150 
4151 	if (left_page_no != FIL_NULL) {
4152 		merge_block = btr_block_get(space, zip_size, left_page_no,
4153 					    RW_X_LATCH, index, mtr);
4154 		merge_page = buf_block_get_frame(merge_block);
4155 #ifdef UNIV_BTR_DEBUG
4156 		ut_a(btr_page_get_next(merge_page, mtr)
4157 		     == buf_block_get_page_no(block));
4158 #endif /* UNIV_BTR_DEBUG */
4159 	} else if (right_page_no != FIL_NULL) {
4160 		merge_block = btr_block_get(space, zip_size, right_page_no,
4161 					    RW_X_LATCH, index, mtr);
4162 		merge_page = buf_block_get_frame(merge_block);
4163 #ifdef UNIV_BTR_DEBUG
4164 		ut_a(btr_page_get_prev(merge_page, mtr)
4165 		     == buf_block_get_page_no(block));
4166 #endif /* UNIV_BTR_DEBUG */
4167 	} else {
4168 		btr_discard_only_page_on_level(index, block, mtr);
4169 
4170 		return;
4171 	}
4172 
4173 	page = buf_block_get_frame(block);
4174 	ut_a(page_is_comp(merge_page) == page_is_comp(page));
4175 	btr_search_drop_page_hash_index(block);
4176 
4177 	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
4178 
4179 		/* We have to mark the leftmost node pointer on the right
4180 		side page as the predefined minimum record */
4181 		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));
4182 
4183 		ut_ad(page_rec_is_user_rec(node_ptr));
4184 
4185 		/* This will make page_zip_validate() fail on merge_page
4186 		until btr_level_list_remove() completes.  This is harmless,
4187 		because everything will take place within a single
4188 		mini-transaction and because writing to the redo log
4189 		is an atomic operation (performed by mtr_commit()). */
4190 		btr_set_min_rec_mark(node_ptr, mtr);
4191 	}
4192 
4193 	btr_node_ptr_delete(index, block, mtr);
4194 
4195 	/* Remove the page from the level list */
4196 	btr_level_list_remove(space, zip_size, page, index, mtr);
4197 #ifdef UNIV_ZIP_DEBUG
4198 	{
4199 		page_zip_des_t*	merge_page_zip
4200 			= buf_block_get_page_zip(merge_block);
4201 		ut_a(!merge_page_zip
4202 		     || page_zip_validate(merge_page_zip, merge_page, index));
4203 	}
4204 #endif /* UNIV_ZIP_DEBUG */
4205 
4206 	if (left_page_no != FIL_NULL) {
4207 		lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
4208 				    block);
4209 	} else {
4210 		lock_update_discard(merge_block,
4211 				    lock_get_min_heap_no(merge_block),
4212 				    block);
4213 	}
4214 
4215 	btr_blob_dbg_remove(page, index, "btr_discard_page");
4216 
4217 	/* Free the file page */
4218 	btr_page_free(index, block, mtr);
4219 
4220 	ut_ad(btr_check_node_ptr(index, merge_block, mtr));
4221 }
4222 
4223 #ifdef UNIV_BTR_PRINT
4224 /*************************************************************//**
4225 Prints size info of a B-tree. */
4226 UNIV_INTERN
4227 void
btr_print_size(dict_index_t * index)4228 btr_print_size(
4229 /*===========*/
4230 	dict_index_t*	index)	/*!< in: index tree */
4231 {
4232 	page_t*		root;
4233 	fseg_header_t*	seg;
4234 	mtr_t		mtr;
4235 
4236 	if (dict_index_is_ibuf(index)) {
4237 		fputs("Sorry, cannot print info of an ibuf tree:"
4238 		      " use ibuf functions\n", stderr);
4239 
4240 		return;
4241 	}
4242 
4243 	mtr_start(&mtr);
4244 
4245 	root = btr_root_get(index, &mtr);
4246 
4247 	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4248 
4249 	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
4250 	fseg_print(seg, &mtr);
4251 
4252 	if (!dict_index_is_univ(index)) {
4253 
4254 		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
4255 
4256 		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
4257 		fseg_print(seg, &mtr);
4258 	}
4259 
4260 	mtr_commit(&mtr);
4261 }
4262 
4263 /************************************************************//**
4264 Prints recursively index tree pages. */
4265 static
4266 void
btr_print_recursive(dict_index_t * index,buf_block_t * block,ulint width,mem_heap_t ** heap,ulint ** offsets,mtr_t * mtr)4267 btr_print_recursive(
4268 /*================*/
4269 	dict_index_t*	index,	/*!< in: index tree */
4270 	buf_block_t*	block,	/*!< in: index page */
4271 	ulint		width,	/*!< in: print this many entries from start
4272 				and end */
4273 	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
4274 	ulint**		offsets,/*!< in/out: buffer for rec_get_offsets() */
4275 	mtr_t*		mtr)	/*!< in: mtr */
4276 {
4277 	const page_t*	page	= buf_block_get_frame(block);
4278 	page_cur_t	cursor;
4279 	ulint		n_recs;
4280 	ulint		i	= 0;
4281 	mtr_t		mtr2;
4282 
4283 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4284 	fprintf(stderr, "NODE ON LEVEL %lu page number %lu\n",
4285 		(ulong) btr_page_get_level(page, mtr),
4286 		(ulong) buf_block_get_page_no(block));
4287 
4288 	page_print(block, index, width, width);
4289 
4290 	n_recs = page_get_n_recs(page);
4291 
4292 	page_cur_set_before_first(block, &cursor);
4293 	page_cur_move_to_next(&cursor);
4294 
4295 	while (!page_cur_is_after_last(&cursor)) {
4296 
4297 		if (page_is_leaf(page)) {
4298 
4299 			/* If this is the leaf level, do nothing */
4300 
4301 		} else if ((i <= width) || (i >= n_recs - width)) {
4302 
4303 			const rec_t*	node_ptr;
4304 
4305 			mtr_start(&mtr2);
4306 
4307 			node_ptr = page_cur_get_rec(&cursor);
4308 
4309 			*offsets = rec_get_offsets(node_ptr, index, *offsets,
4310 						   ULINT_UNDEFINED, heap);
4311 			btr_print_recursive(index,
4312 					    btr_node_ptr_get_child(node_ptr,
4313 								   index,
4314 								   *offsets,
4315 								   &mtr2),
4316 					    width, heap, offsets, &mtr2);
4317 			mtr_commit(&mtr2);
4318 		}
4319 
4320 		page_cur_move_to_next(&cursor);
4321 		i++;
4322 	}
4323 }
4324 
4325 /**************************************************************//**
4326 Prints directories and other info of all nodes in the tree. */
4327 UNIV_INTERN
4328 void
btr_print_index(dict_index_t * index,ulint width)4329 btr_print_index(
4330 /*============*/
4331 	dict_index_t*	index,	/*!< in: index */
4332 	ulint		width)	/*!< in: print this many entries from start
4333 				and end */
4334 {
4335 	mtr_t		mtr;
4336 	buf_block_t*	root;
4337 	mem_heap_t*	heap	= NULL;
4338 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
4339 	ulint*		offsets	= offsets_;
4340 	rec_offs_init(offsets_);
4341 
4342 	fputs("--------------------------\n"
4343 	      "INDEX TREE PRINT\n", stderr);
4344 
4345 	mtr_start(&mtr);
4346 
4347 	root = btr_root_block_get(index, RW_X_LATCH, &mtr);
4348 
4349 	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
4350 	if (heap) {
4351 		mem_heap_free(heap);
4352 	}
4353 
4354 	mtr_commit(&mtr);
4355 
4356 	btr_validate_index(index, 0);
4357 }
4358 #endif /* UNIV_BTR_PRINT */
4359 
4360 #ifdef UNIV_DEBUG
4361 /************************************************************//**
4362 Checks that the node pointer to a page is appropriate.
4363 @return	TRUE */
4364 UNIV_INTERN
4365 ibool
btr_check_node_ptr(dict_index_t * index,buf_block_t * block,mtr_t * mtr)4366 btr_check_node_ptr(
4367 /*===============*/
4368 	dict_index_t*	index,	/*!< in: index tree */
4369 	buf_block_t*	block,	/*!< in: index page */
4370 	mtr_t*		mtr)	/*!< in: mtr */
4371 {
4372 	mem_heap_t*	heap;
4373 	dtuple_t*	tuple;
4374 	ulint*		offsets;
4375 	btr_cur_t	cursor;
4376 	page_t*		page = buf_block_get_frame(block);
4377 
4378 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4379 	if (dict_index_get_page(index) == buf_block_get_page_no(block)) {
4380 
4381 		return(TRUE);
4382 	}
4383 
4384 	heap = mem_heap_create(256);
4385 	offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
4386 					    &cursor);
4387 
4388 	if (page_is_leaf(page)) {
4389 
4390 		goto func_exit;
4391 	}
4392 
4393 	tuple = dict_index_build_node_ptr(
4394 		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
4395 		btr_page_get_level(page, mtr));
4396 
4397 	ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
4398 func_exit:
4399 	mem_heap_free(heap);
4400 
4401 	return(TRUE);
4402 }
4403 #endif /* UNIV_DEBUG */
4404 
4405 /************************************************************//**
4406 Display identification information for a record. */
4407 static
4408 void
btr_index_rec_validate_report(const page_t * page,const rec_t * rec,const dict_index_t * index)4409 btr_index_rec_validate_report(
4410 /*==========================*/
4411 	const page_t*		page,	/*!< in: index page */
4412 	const rec_t*		rec,	/*!< in: index record */
4413 	const dict_index_t*	index)	/*!< in: index */
4414 {
4415 	fputs("InnoDB: Record in ", stderr);
4416 	dict_index_name_print(stderr, NULL, index);
4417 	fprintf(stderr, ", page %lu, at offset %lu\n",
4418 		page_get_page_no(page), (ulint) page_offset(rec));
4419 }
4420 
4421 /************************************************************//**
4422 Checks the size and number of fields in a record based on the definition of
4423 the index.
4424 @return	TRUE if ok */
4425 UNIV_INTERN
4426 ibool
btr_index_rec_validate(const rec_t * rec,const dict_index_t * index,ibool dump_on_error)4427 btr_index_rec_validate(
4428 /*===================*/
4429 	const rec_t*		rec,		/*!< in: index record */
4430 	const dict_index_t*	index,		/*!< in: index */
4431 	ibool			dump_on_error)	/*!< in: TRUE if the function
4432 						should print hex dump of record
4433 						and page on error */
4434 {
4435 	ulint		len;
4436 	ulint		n;
4437 	ulint		i;
4438 	const page_t*	page;
4439 	mem_heap_t*	heap	= NULL;
4440 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
4441 	ulint*		offsets	= offsets_;
4442 	rec_offs_init(offsets_);
4443 
4444 	page = page_align(rec);
4445 
4446 	if (dict_index_is_univ(index)) {
4447 		/* The insert buffer index tree can contain records from any
4448 		other index: we cannot check the number of fields or
4449 		their length */
4450 
4451 		return(TRUE);
4452 	}
4453 
4454 	if ((ibool)!!page_is_comp(page) != dict_table_is_comp(index->table)) {
4455 		btr_index_rec_validate_report(page, rec, index);
4456 		fprintf(stderr, "InnoDB: compact flag=%lu, should be %lu\n",
4457 			(ulong) !!page_is_comp(page),
4458 			(ulong) dict_table_is_comp(index->table));
4459 
4460 		return(FALSE);
4461 	}
4462 
4463 	n = dict_index_get_n_fields(index);
4464 
4465 	if (!page_is_comp(page) && rec_get_n_fields_old(rec) != n) {
4466 		btr_index_rec_validate_report(page, rec, index);
4467 		fprintf(stderr, "InnoDB: has %lu fields, should have %lu\n",
4468 			(ulong) rec_get_n_fields_old(rec), (ulong) n);
4469 
4470 		if (dump_on_error) {
4471 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4472 
4473 			fputs("InnoDB: corrupt record ", stderr);
4474 			rec_print_old(stderr, rec);
4475 			putc('\n', stderr);
4476 		}
4477 		return(FALSE);
4478 	}
4479 
4480 	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
4481 
4482 	for (i = 0; i < n; i++) {
4483 		ulint	fixed_size = dict_col_get_fixed_size(
4484 			dict_index_get_nth_col(index, i), page_is_comp(page));
4485 
4486 		rec_get_nth_field_offs(offsets, i, &len);
4487 
4488 		/* Note that if fixed_size != 0, it equals the
4489 		length of a fixed-size column in the clustered index.
4490 		A prefix index of the column is of fixed, but different
4491 		length.  When fixed_size == 0, prefix_len is the maximum
4492 		length of the prefix index column. */
4493 
4494 		if ((dict_index_get_nth_field(index, i)->prefix_len == 0
4495 		     && len != UNIV_SQL_NULL && fixed_size
4496 		     && len != fixed_size)
4497 		    || (dict_index_get_nth_field(index, i)->prefix_len > 0
4498 			&& len != UNIV_SQL_NULL
4499 			&& len
4500 			> dict_index_get_nth_field(index, i)->prefix_len)) {
4501 
4502 			btr_index_rec_validate_report(page, rec, index);
4503 			fprintf(stderr,
4504 				"InnoDB: field %lu len is %lu,"
4505 				" should be %lu\n",
4506 				(ulong) i, (ulong) len, (ulong) fixed_size);
4507 
4508 			if (dump_on_error) {
4509 				buf_page_print(page, 0,
4510 					       BUF_PAGE_PRINT_NO_CRASH);
4511 
4512 				fputs("InnoDB: corrupt record ", stderr);
4513 				rec_print_new(stderr, rec, offsets);
4514 				putc('\n', stderr);
4515 			}
4516 			if (heap) {
4517 				mem_heap_free(heap);
4518 			}
4519 			return(FALSE);
4520 		}
4521 	}
4522 
4523 	if (heap) {
4524 		mem_heap_free(heap);
4525 	}
4526 	return(TRUE);
4527 }
4528 
4529 /************************************************************//**
4530 Checks the size and number of fields in records based on the definition of
4531 the index.
4532 @return	TRUE if ok */
4533 static
4534 ibool
btr_index_page_validate(buf_block_t * block,dict_index_t * index)4535 btr_index_page_validate(
4536 /*====================*/
4537 	buf_block_t*	block,	/*!< in: index page */
4538 	dict_index_t*	index)	/*!< in: index */
4539 {
4540 	page_cur_t	cur;
4541 	ibool		ret	= TRUE;
4542 #ifndef DBUG_OFF
4543 	ulint		nth	= 1;
4544 #endif /* !DBUG_OFF */
4545 
4546 	page_cur_set_before_first(block, &cur);
4547 
4548 	/* Directory slot 0 should only contain the infimum record. */
4549 	DBUG_EXECUTE_IF("check_table_rec_next",
4550 			ut_a(page_rec_get_nth_const(
4551 				     page_cur_get_page(&cur), 0)
4552 			     == cur.rec);
4553 			ut_a(page_dir_slot_get_n_owned(
4554 				     page_dir_get_nth_slot(
4555 					     page_cur_get_page(&cur), 0))
4556 			     == 1););
4557 
4558 	page_cur_move_to_next(&cur);
4559 
4560 	for (;;) {
4561 		if (page_cur_is_after_last(&cur)) {
4562 
4563 			break;
4564 		}
4565 
4566 		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4567 
4568 			return(FALSE);
4569 		}
4570 
4571 		/* Verify that page_rec_get_nth_const() is correctly
4572 		retrieving each record. */
4573 		DBUG_EXECUTE_IF("check_table_rec_next",
4574 				ut_a(cur.rec == page_rec_get_nth_const(
4575 					     page_cur_get_page(&cur),
4576 					     page_rec_get_n_recs_before(
4577 						     cur.rec)));
4578 				ut_a(nth++ == page_rec_get_n_recs_before(
4579 					     cur.rec)););
4580 
4581 		page_cur_move_to_next(&cur);
4582 	}
4583 
4584 	return(ret);
4585 }
4586 
4587 /************************************************************//**
4588 Report an error on one page of an index tree. */
4589 static
4590 void
btr_validate_report1(dict_index_t * index,ulint level,const buf_block_t * block)4591 btr_validate_report1(
4592 /*=================*/
4593 	dict_index_t*		index,	/*!< in: index */
4594 	ulint			level,	/*!< in: B-tree level */
4595 	const buf_block_t*	block)	/*!< in: index page */
4596 {
4597 	fprintf(stderr, "InnoDB: Error in page %lu of ",
4598 		buf_block_get_page_no(block));
4599 	dict_index_name_print(stderr, NULL, index);
4600 	if (level) {
4601 		fprintf(stderr, ", index tree level %lu", level);
4602 	}
4603 	putc('\n', stderr);
4604 }
4605 
4606 /************************************************************//**
4607 Report an error on two pages of an index tree. */
4608 static
4609 void
btr_validate_report2(const dict_index_t * index,ulint level,const buf_block_t * block1,const buf_block_t * block2)4610 btr_validate_report2(
4611 /*=================*/
4612 	const dict_index_t*	index,	/*!< in: index */
4613 	ulint			level,	/*!< in: B-tree level */
4614 	const buf_block_t*	block1,	/*!< in: first index page */
4615 	const buf_block_t*	block2)	/*!< in: second index page */
4616 {
4617 	fprintf(stderr, "InnoDB: Error in pages %lu and %lu of ",
4618 		buf_block_get_page_no(block1),
4619 		buf_block_get_page_no(block2));
4620 	dict_index_name_print(stderr, NULL, index);
4621 	if (level) {
4622 		fprintf(stderr, ", index tree level %lu", level);
4623 	}
4624 	putc('\n', stderr);
4625 }
4626 
4627 /************************************************************//**
4628 Validates index tree level.
4629 @return	TRUE if ok */
4630 static
4631 bool
btr_validate_level(dict_index_t * index,const trx_t * trx,ulint level)4632 btr_validate_level(
4633 /*===============*/
4634 	dict_index_t*	index,	/*!< in: index tree */
4635 	const trx_t*	trx,	/*!< in: transaction or NULL */
4636 	ulint		level)	/*!< in: level number */
4637 {
4638 	ulint		space;
4639 	ulint		space_flags;
4640 	ulint		zip_size;
4641 	buf_block_t*	block;
4642 	page_t*		page;
4643 	buf_block_t*	right_block = 0; /* remove warning */
4644 	page_t*		right_page = 0; /* remove warning */
4645 	page_t*		father_page;
4646 	btr_cur_t	node_cur;
4647 	btr_cur_t	right_node_cur;
4648 	rec_t*		rec;
4649 	ulint		right_page_no;
4650 	ulint		left_page_no;
4651 	page_cur_t	cursor;
4652 	dtuple_t*	node_ptr_tuple;
4653 	bool		ret	= true;
4654 	mtr_t		mtr;
4655 	mem_heap_t*	heap	= mem_heap_create(256);
4656 	fseg_header_t*	seg;
4657 	ulint*		offsets	= NULL;
4658 	ulint*		offsets2= NULL;
4659 #ifdef UNIV_ZIP_DEBUG
4660 	page_zip_des_t*	page_zip;
4661 #endif /* UNIV_ZIP_DEBUG */
4662 
4663 	mtr_start(&mtr);
4664 
4665 	mtr_x_lock(dict_index_get_lock(index), &mtr);
4666 
4667 	block = btr_root_block_get(index, RW_X_LATCH, &mtr);
4668 	page = buf_block_get_frame(block);
4669 	seg = page + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4670 
4671 	space = dict_index_get_space(index);
4672 	zip_size = dict_table_zip_size(index->table);
4673 
4674 	fil_space_get_latch(space, &space_flags);
4675 
4676 	if (zip_size != dict_tf_get_zip_size(space_flags)) {
4677 
4678 		ib_logf(IB_LOG_LEVEL_WARN,
4679 			"Flags mismatch: table=%lu, tablespace=%lu",
4680 			(ulint) index->table->flags, (ulint) space_flags);
4681 
4682 		mtr_commit(&mtr);
4683 
4684 		return(false);
4685 	}
4686 
4687 	while (level != btr_page_get_level(page, &mtr)) {
4688 		const rec_t*	node_ptr;
4689 
4690 		if (fseg_page_is_free(seg,
4691 				      block->page.space, block->page.offset)) {
4692 
4693 			btr_validate_report1(index, level, block);
4694 
4695 			ib_logf(IB_LOG_LEVEL_WARN, "page is free");
4696 
4697 			ret = false;
4698 		}
4699 
4700 		ut_a(space == buf_block_get_space(block));
4701 		ut_a(space == page_get_space_id(page));
4702 #ifdef UNIV_ZIP_DEBUG
4703 		page_zip = buf_block_get_page_zip(block);
4704 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4705 #endif /* UNIV_ZIP_DEBUG */
4706 		ut_a(!page_is_leaf(page));
4707 
4708 		page_cur_set_before_first(block, &cursor);
4709 		page_cur_move_to_next(&cursor);
4710 
4711 		node_ptr = page_cur_get_rec(&cursor);
4712 		offsets = rec_get_offsets(node_ptr, index, offsets,
4713 					  ULINT_UNDEFINED, &heap);
4714 		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
4715 		page = buf_block_get_frame(block);
4716 	}
4717 
4718 	/* Now we are on the desired level. Loop through the pages on that
4719 	level. */
4720 
4721 	if (level == 0) {
4722 		/* Leaf pages are managed in their own file segment. */
4723 		seg -= PAGE_BTR_SEG_TOP - PAGE_BTR_SEG_LEAF;
4724 	}
4725 
4726 loop:
4727 	mem_heap_empty(heap);
4728 	offsets = offsets2 = NULL;
4729 	mtr_x_lock(dict_index_get_lock(index), &mtr);
4730 
4731 #ifdef UNIV_ZIP_DEBUG
4732 	page_zip = buf_block_get_page_zip(block);
4733 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4734 #endif /* UNIV_ZIP_DEBUG */
4735 
4736 	ut_a(block->page.space == space);
4737 
4738 	if (fseg_page_is_free(seg, block->page.space, block->page.offset)) {
4739 
4740 		btr_validate_report1(index, level, block);
4741 
4742 		ib_logf(IB_LOG_LEVEL_WARN, "Page is marked as free");
4743 		ret = false;
4744 
4745 	} else if (btr_page_get_index_id(page) != index->id) {
4746 
4747 		ib_logf(IB_LOG_LEVEL_ERROR,
4748 			"Page index id " IB_ID_FMT " != data dictionary "
4749 			"index id " IB_ID_FMT,
4750 			btr_page_get_index_id(page), index->id);
4751 
4752 		ret = false;
4753 
4754 	} else if (!page_validate(page, index)) {
4755 
4756 		btr_validate_report1(index, level, block);
4757 		ret = false;
4758 
4759 	} else if (level == 0 && !btr_index_page_validate(block, index)) {
4760 
4761 		/* We are on level 0. Check that the records have the right
4762 		number of fields, and field lengths are right. */
4763 
4764 		ret = false;
4765 	}
4766 
4767 	ut_a(btr_page_get_level(page, &mtr) == level);
4768 
4769 	right_page_no = btr_page_get_next(page, &mtr);
4770 	left_page_no = btr_page_get_prev(page, &mtr);
4771 
4772 	ut_a(!page_is_empty(page)
4773 	     || (level == 0
4774 		 && page_get_page_no(page) == dict_index_get_page(index)));
4775 
4776 	if (right_page_no != FIL_NULL) {
4777 		const rec_t*	right_rec;
4778 		right_block = btr_block_get(space, zip_size, right_page_no,
4779 					    RW_X_LATCH, index, &mtr);
4780 		right_page = buf_block_get_frame(right_block);
4781 		if (btr_page_get_prev(right_page, &mtr)
4782 		    != page_get_page_no(page)) {
4783 
4784 			btr_validate_report2(index, level, block, right_block);
4785 			fputs("InnoDB: broken FIL_PAGE_NEXT"
4786 			      " or FIL_PAGE_PREV links\n", stderr);
4787 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4788 			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4789 
4790 			ret = false;
4791 		}
4792 
4793 		if (page_is_comp(right_page) != page_is_comp(page)) {
4794 			btr_validate_report2(index, level, block, right_block);
4795 			fputs("InnoDB: 'compact' flag mismatch\n", stderr);
4796 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4797 			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4798 
4799 			ret = false;
4800 
4801 			goto node_ptr_fails;
4802 		}
4803 
4804 		rec = page_rec_get_prev(page_get_supremum_rec(page));
4805 		right_rec = page_rec_get_next(page_get_infimum_rec(
4806 						      right_page));
4807 		offsets = rec_get_offsets(rec, index,
4808 					  offsets, ULINT_UNDEFINED, &heap);
4809 		offsets2 = rec_get_offsets(right_rec, index,
4810 					   offsets2, ULINT_UNDEFINED, &heap);
4811 		if (cmp_rec_rec(rec, right_rec, offsets, offsets2,
4812 			        index) >= 0) {
4813 
4814 			btr_validate_report2(index, level, block, right_block);
4815 
4816 			fputs("InnoDB: records in wrong order"
4817 			      " on adjacent pages\n", stderr);
4818 
4819 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4820 			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4821 
4822 			fputs("InnoDB: record ", stderr);
4823 			rec = page_rec_get_prev(page_get_supremum_rec(page));
4824 			rec_print(stderr, rec, index);
4825 			putc('\n', stderr);
4826 			fputs("InnoDB: record ", stderr);
4827 			rec = page_rec_get_next(
4828 				page_get_infimum_rec(right_page));
4829 			rec_print(stderr, rec, index);
4830 			putc('\n', stderr);
4831 
4832 			ret = false;
4833 		}
4834 	}
4835 
4836 	if (level > 0 && left_page_no == FIL_NULL) {
4837 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
4838 			     page_rec_get_next(page_get_infimum_rec(page)),
4839 			     page_is_comp(page)));
4840 	}
4841 
4842 	if (buf_block_get_page_no(block) != dict_index_get_page(index)) {
4843 
4844 		/* Check father node pointers */
4845 
4846 		rec_t*	node_ptr;
4847 
4848 		offsets = btr_page_get_father_block(offsets, heap, index,
4849 						    block, &mtr, &node_cur);
4850 		father_page = btr_cur_get_page(&node_cur);
4851 		node_ptr = btr_cur_get_rec(&node_cur);
4852 
4853 		btr_cur_position(
4854 			index, page_rec_get_prev(page_get_supremum_rec(page)),
4855 			block, &node_cur);
4856 		offsets = btr_page_get_father_node_ptr(offsets, heap,
4857 						       &node_cur, &mtr);
4858 
4859 		if (node_ptr != btr_cur_get_rec(&node_cur)
4860 		    || btr_node_ptr_get_child_page_no(node_ptr, offsets)
4861 				     != buf_block_get_page_no(block)) {
4862 
4863 			btr_validate_report1(index, level, block);
4864 
4865 			fputs("InnoDB: node pointer to the page is wrong\n",
4866 			      stderr);
4867 
4868 			buf_page_print(father_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4869 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4870 
4871 			fputs("InnoDB: node ptr ", stderr);
4872 			rec_print(stderr, node_ptr, index);
4873 
4874 			rec = btr_cur_get_rec(&node_cur);
4875 			fprintf(stderr, "\n"
4876 				"InnoDB: node ptr child page n:o %lu\n",
4877 				(ulong) btr_node_ptr_get_child_page_no(
4878 					rec, offsets));
4879 
4880 			fputs("InnoDB: record on page ", stderr);
4881 			rec_print_new(stderr, rec, offsets);
4882 			putc('\n', stderr);
4883 			ret = false;
4884 
4885 			goto node_ptr_fails;
4886 		}
4887 
4888 		if (!page_is_leaf(page)) {
4889 			node_ptr_tuple = dict_index_build_node_ptr(
4890 				index,
4891 				page_rec_get_next(page_get_infimum_rec(page)),
4892 				0, heap, btr_page_get_level(page, &mtr));
4893 
4894 			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
4895 					   offsets)) {
4896 				const rec_t* first_rec = page_rec_get_next(
4897 					page_get_infimum_rec(page));
4898 
4899 				btr_validate_report1(index, level, block);
4900 
4901 				buf_page_print(father_page, 0,
4902 					       BUF_PAGE_PRINT_NO_CRASH);
4903 				buf_page_print(page, 0,
4904 					       BUF_PAGE_PRINT_NO_CRASH);
4905 
4906 				fputs("InnoDB: Error: node ptrs differ"
4907 				      " on levels > 0\n"
4908 				      "InnoDB: node ptr ", stderr);
4909 				rec_print_new(stderr, node_ptr, offsets);
4910 				fputs("InnoDB: first rec ", stderr);
4911 				rec_print(stderr, first_rec, index);
4912 				putc('\n', stderr);
4913 				ret = false;
4914 
4915 				goto node_ptr_fails;
4916 			}
4917 		}
4918 
4919 		if (left_page_no == FIL_NULL) {
4920 			ut_a(node_ptr == page_rec_get_next(
4921 				     page_get_infimum_rec(father_page)));
4922 			ut_a(btr_page_get_prev(father_page, &mtr) == FIL_NULL);
4923 		}
4924 
4925 		if (right_page_no == FIL_NULL) {
4926 			ut_a(node_ptr == page_rec_get_prev(
4927 				     page_get_supremum_rec(father_page)));
4928 			ut_a(btr_page_get_next(father_page, &mtr) == FIL_NULL);
4929 		} else {
4930 			const rec_t*	right_node_ptr
4931 				= page_rec_get_next(node_ptr);
4932 
4933 			offsets = btr_page_get_father_block(
4934 				offsets, heap, index, right_block,
4935 				&mtr, &right_node_cur);
4936 			if (right_node_ptr
4937 			    != page_get_supremum_rec(father_page)) {
4938 
4939 				if (btr_cur_get_rec(&right_node_cur)
4940 				    != right_node_ptr) {
4941 					ret = false;
4942 					fputs("InnoDB: node pointer to"
4943 					      " the right page is wrong\n",
4944 					      stderr);
4945 
4946 					btr_validate_report1(index, level,
4947 							     block);
4948 
4949 					buf_page_print(
4950 						father_page, 0,
4951 						BUF_PAGE_PRINT_NO_CRASH);
4952 					buf_page_print(
4953 						page, 0,
4954 						BUF_PAGE_PRINT_NO_CRASH);
4955 					buf_page_print(
4956 						right_page, 0,
4957 						BUF_PAGE_PRINT_NO_CRASH);
4958 				}
4959 			} else {
4960 				page_t*	right_father_page
4961 					= btr_cur_get_page(&right_node_cur);
4962 
4963 				if (btr_cur_get_rec(&right_node_cur)
4964 				    != page_rec_get_next(
4965 					    page_get_infimum_rec(
4966 						    right_father_page))) {
4967 					ret = false;
4968 					fputs("InnoDB: node pointer 2 to"
4969 					      " the right page is wrong\n",
4970 					      stderr);
4971 
4972 					btr_validate_report1(index, level,
4973 							     block);
4974 
4975 					buf_page_print(
4976 						father_page, 0,
4977 						BUF_PAGE_PRINT_NO_CRASH);
4978 					buf_page_print(
4979 						right_father_page, 0,
4980 						BUF_PAGE_PRINT_NO_CRASH);
4981 					buf_page_print(
4982 						page, 0,
4983 						BUF_PAGE_PRINT_NO_CRASH);
4984 					buf_page_print(
4985 						right_page, 0,
4986 						BUF_PAGE_PRINT_NO_CRASH);
4987 				}
4988 
4989 				if (page_get_page_no(right_father_page)
4990 				    != btr_page_get_next(father_page, &mtr)) {
4991 
4992 					ret = false;
4993 					fputs("InnoDB: node pointer 3 to"
4994 					      " the right page is wrong\n",
4995 					      stderr);
4996 
4997 					btr_validate_report1(index, level,
4998 							     block);
4999 
5000 					buf_page_print(
5001 						father_page, 0,
5002 						BUF_PAGE_PRINT_NO_CRASH);
5003 					buf_page_print(
5004 						right_father_page, 0,
5005 						BUF_PAGE_PRINT_NO_CRASH);
5006 					buf_page_print(
5007 						page, 0,
5008 						BUF_PAGE_PRINT_NO_CRASH);
5009 					buf_page_print(
5010 						right_page, 0,
5011 						BUF_PAGE_PRINT_NO_CRASH);
5012 				}
5013 			}
5014 		}
5015 	}
5016 
5017 node_ptr_fails:
5018 	/* Commit the mini-transaction to release the latch on 'page'.
5019 	Re-acquire the latch on right_page, which will become 'page'
5020 	on the next loop.  The page has already been checked. */
5021 	mtr_commit(&mtr);
5022 
5023 	if (trx_is_interrupted(trx)) {
5024 		/* On interrupt, return the current status. */
5025 	} else if (right_page_no != FIL_NULL) {
5026 
5027 		mtr_start(&mtr);
5028 
5029 		block = btr_block_get(
5030 			space, zip_size, right_page_no,
5031 			RW_X_LATCH, index, &mtr);
5032 
5033 		page = buf_block_get_frame(block);
5034 
5035 		goto loop;
5036 	}
5037 
5038 	mem_heap_free(heap);
5039 
5040 	return(ret);
5041 }
5042 
5043 /**************************************************************//**
5044 Checks the consistency of an index tree.
5045 @return	TRUE if ok */
5046 UNIV_INTERN
5047 bool
btr_validate_index(dict_index_t * index,const trx_t * trx)5048 btr_validate_index(
5049 /*===============*/
5050 	dict_index_t*	index,	/*!< in: index */
5051 	const trx_t*	trx)	/*!< in: transaction or NULL */
5052 {
5053 	/* Full Text index are implemented by auxiliary tables,
5054 	not the B-tree */
5055 	if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
5056 		return(true);
5057 	}
5058 
5059 	mtr_t		mtr;
5060 
5061 	mtr_start(&mtr);
5062 
5063 	mtr_x_lock(dict_index_get_lock(index), &mtr);
5064 
5065 	bool	ok = true;
5066 	page_t*	root = btr_root_get(index, &mtr);
5067 
5068 	SRV_CORRUPT_TABLE_CHECK(root,
5069 	{
5070 		mtr_commit(&mtr);
5071 		return(FALSE);
5072 	});
5073 
5074 	ulint	n = btr_page_get_level(root, &mtr);
5075 
5076 	for (ulint i = 0; i <= n; ++i) {
5077 
5078 		if (!btr_validate_level(index, trx, n - i)) {
5079 			ok = false;
5080 			break;
5081 		}
5082 	}
5083 
5084 	mtr_commit(&mtr);
5085 
5086 	return(ok);
5087 }
5088 
5089 /**************************************************************//**
5090 Checks if the page in the cursor can be merged with given page.
5091 If necessary, re-organize the merge_page.
5092 @return	TRUE if possible to merge. */
5093 UNIV_INTERN
5094 ibool
btr_can_merge_with_page(btr_cur_t * cursor,ulint page_no,buf_block_t ** merge_block,mtr_t * mtr)5095 btr_can_merge_with_page(
5096 /*====================*/
5097 	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
5098 	ulint		page_no,	/*!< in: a sibling page */
5099 	buf_block_t**	merge_block,	/*!< out: the merge block */
5100 	mtr_t*		mtr)		/*!< in: mini-transaction */
5101 {
5102 	dict_index_t*	index;
5103 	page_t*		page;
5104 	ulint		space;
5105 	ulint		zip_size;
5106 	ulint		n_recs;
5107 	ulint		data_size;
5108         ulint           max_ins_size_reorg;
5109 	ulint		max_ins_size;
5110 	buf_block_t*	mblock;
5111 	page_t*		mpage;
5112 	DBUG_ENTER("btr_can_merge_with_page");
5113 
5114 	if (page_no == FIL_NULL) {
5115 		goto error;
5116 	}
5117 
5118 	index = btr_cur_get_index(cursor);
5119 	page  = btr_cur_get_page(cursor);
5120 	space = dict_index_get_space(index);
5121         zip_size = dict_table_zip_size(index->table);
5122 
5123 	mblock = btr_block_get(space, zip_size, page_no, RW_X_LATCH, index,
5124 			       mtr);
5125 	mpage = buf_block_get_frame(mblock);
5126 
5127         n_recs = page_get_n_recs(page);
5128         data_size = page_get_data_size(page);
5129 
5130         max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
5131                 mpage, n_recs);
5132 
5133 	if (data_size > max_ins_size_reorg) {
5134 		goto error;
5135 	}
5136 
5137 	/* If compression padding tells us that merging will result in
5138 	too packed up page i.e.: which is likely to cause compression
5139 	failure then don't merge the pages. */
5140 	if (zip_size && page_is_leaf(mpage)
5141 	    && (page_get_data_size(mpage) + data_size
5142 		>= dict_index_zip_pad_optimal_page_size(index))) {
5143 
5144 		goto error;
5145 	}
5146 
5147 
5148 	max_ins_size = page_get_max_insert_size(mpage, n_recs);
5149 
5150 	if (data_size > max_ins_size) {
5151 
5152 		/* We have to reorganize mpage */
5153 
5154 		if (!btr_page_reorganize_block(
5155 			    false, page_zip_level, mblock, index, mtr)) {
5156 
5157 			goto error;
5158 		}
5159 
5160 		max_ins_size = page_get_max_insert_size(mpage, n_recs);
5161 
5162 		ut_ad(page_validate(mpage, index));
5163 		ut_ad(max_ins_size == max_ins_size_reorg);
5164 
5165 		if (data_size > max_ins_size) {
5166 
5167 			/* Add fault tolerance, though this should
5168 			never happen */
5169 
5170 			goto error;
5171 		}
5172 	}
5173 
5174 	*merge_block = mblock;
5175 	DBUG_RETURN(TRUE);
5176 
5177 error:
5178 	*merge_block = NULL;
5179 	DBUG_RETURN(FALSE);
5180 }
5181 
5182 #endif /* !UNIV_HOTBACKUP */
5183