1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2012, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 
19 /**************************************************//**
20 @file btr/btr0btr.c
21 The B-tree
22 
23 Created 6/2/1994 Heikki Tuuri
24 *******************************************************/
25 
26 #include "btr0btr.h"
27 
28 #ifdef UNIV_NONINL
29 #include "btr0btr.ic"
30 #endif
31 
32 #include "fsp0fsp.h"
33 #include "page0page.h"
34 #include "page0zip.h"
35 
36 #ifndef UNIV_HOTBACKUP
37 #include "btr0cur.h"
38 #include "btr0sea.h"
39 #include "btr0pcur.h"
40 #include "rem0cmp.h"
41 #include "lock0lock.h"
42 #include "ibuf0ibuf.h"
43 #include "trx0trx.h"
44 
45 /**************************************************************//**
46 Checks if the page in the cursor can be merged with given page.
47 If necessary, re-organize the merge_page.
48 @return	TRUE if possible to merge. */
49 UNIV_INTERN
50 ibool
51 btr_can_merge_with_page(
52 /*====================*/
53 	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
54 	ulint		page_no,	/*!< in: a sibling page */
55 	buf_block_t**	merge_block,	/*!< out: the merge block */
56 	mtr_t*		mtr);		/*!< in: mini-transaction */
57 
58 #endif /* UNIV_HOTBACKUP */
59 
60 /**************************************************************//**
61 Report that an index page is corrupted. */
62 UNIV_INTERN
63 void
btr_corruption_report(const buf_block_t * block,const dict_index_t * index)64 btr_corruption_report(
65 /*==================*/
66 	const buf_block_t*	block,	/*!< in: corrupted block */
67 	const dict_index_t*	index)	/*!< in: index tree */
68 {
69 	fprintf(stderr, "InnoDB: flag mismatch in space %u page %u"
70 		" index %s of table %s\n",
71 		(unsigned) buf_block_get_space(block),
72 		(unsigned) buf_block_get_page_no(block),
73 		index->name, index->table_name);
74 	if (block->page.zip.data) {
75 		buf_page_print(block->page.zip.data,
76 			       buf_block_get_zip_size(block),
77 			       BUF_PAGE_PRINT_NO_CRASH);
78 	}
79 	buf_page_print(buf_block_get_frame(block), 0, 0);
80 }
81 
82 #ifndef UNIV_HOTBACKUP
83 #ifdef UNIV_BLOB_DEBUG
84 # include "srv0srv.h"
85 # include "ut0rbt.h"
86 
/** TRUE when messages about index->blobs modification are enabled. */
static ibool btr_blob_dbg_msg;

/** Issue a message about an operation on index->blobs.
Prints the lookup key (ref_page_no, ref_heap_no, ref_field_no),
the BLOB start page, the caller-supplied context string, and the
ownership/delete flags of the entry.
@param op	operation
@param b	the entry being subjected to the operation
@param ctx	the context of the operation */
#define btr_blob_dbg_msg_issue(op, b, ctx)			\
	fprintf(stderr, op " %u:%u:%u->%u %s(%u,%u,%u)\n",	\
		(b)->ref_page_no, (b)->ref_heap_no,		\
		(b)->ref_field_no, (b)->blob_page_no, ctx,	\
		(b)->owner, (b)->always_owner, (b)->del)
99 
100 /** Insert to index->blobs a reference to an off-page column.
101 @param index	the index tree
102 @param b	the reference
103 @param ctx	context (for logging) */
104 UNIV_INTERN
105 void
btr_blob_dbg_rbt_insert(
/*====================*/
	dict_index_t*		index,	/*!< in/out: index tree */
	const btr_blob_dbg_t*	b,	/*!< in: the reference */
	const char*		ctx)	/*!< in: context (for logging) */
{
	/* Optionally log the operation before taking the mutex;
	the message itself does not need the protection. */
	if (btr_blob_dbg_msg) {
		btr_blob_dbg_msg_issue("insert", b, ctx);
	}
	/* index->blobs_mutex serializes all modifications of the map. */
	mutex_enter(&index->blobs_mutex);
	/* The entry serves as both key and value in the tree. */
	rbt_insert(index->blobs, b, b);
	mutex_exit(&index->blobs_mutex);
}
119 
120 /** Remove from index->blobs a reference to an off-page column.
121 @param index	the index tree
122 @param b	the reference
123 @param ctx	context (for logging) */
124 UNIV_INTERN
125 void
btr_blob_dbg_rbt_delete(
/*====================*/
	dict_index_t*		index,	/*!< in/out: index tree */
	const btr_blob_dbg_t*	b,	/*!< in: the reference */
	const char*		ctx)	/*!< in: context (for logging) */
{
	/* Optionally log the operation before taking the mutex. */
	if (btr_blob_dbg_msg) {
		btr_blob_dbg_msg_issue("delete", b, ctx);
	}
	/* index->blobs_mutex serializes all modifications of the map. */
	mutex_enter(&index->blobs_mutex);
	/* The entry must exist; a failed delete indicates a
	bookkeeping bug elsewhere. */
	ut_a(rbt_delete(index->blobs, b));
	mutex_exit(&index->blobs_mutex);
}
139 
140 /**************************************************************//**
141 Comparator for items (btr_blob_dbg_t) in index->blobs.
142 The key in index->blobs is (ref_page_no, ref_heap_no, ref_field_no).
143 @return negative, 0 or positive if *a<*b, *a=*b, *a>*b */
144 static
145 int
btr_blob_dbg_cmp(const void * a,const void * b)146 btr_blob_dbg_cmp(
147 /*=============*/
148 	const void*	a,	/*!< in: first btr_blob_dbg_t to compare */
149 	const void*	b)	/*!< in: second btr_blob_dbg_t to compare */
150 {
151 	const btr_blob_dbg_t*	aa	= a;
152 	const btr_blob_dbg_t*	bb	= b;
153 
154 	ut_ad(aa != NULL);
155 	ut_ad(bb != NULL);
156 
157 	if (aa->ref_page_no != bb->ref_page_no) {
158 		return(aa->ref_page_no < bb->ref_page_no ? -1 : 1);
159 	}
160 	if (aa->ref_heap_no != bb->ref_heap_no) {
161 		return(aa->ref_heap_no < bb->ref_heap_no ? -1 : 1);
162 	}
163 	if (aa->ref_field_no != bb->ref_field_no) {
164 		return(aa->ref_field_no < bb->ref_field_no ? -1 : 1);
165 	}
166 	return(0);
167 }
168 
169 /**************************************************************//**
170 Add a reference to an off-page column to the index->blobs map. */
171 UNIV_INTERN
172 void
btr_blob_dbg_add_blob(
/*==================*/
	const rec_t*	rec,		/*!< in: clustered index record */
	ulint		field_no,	/*!< in: off-page column number */
	ulint		page_no,	/*!< in: start page of the column */
	dict_index_t*	index,		/*!< in/out: index tree */
	const char*	ctx)		/*!< in: context (for logging) */
{
	const page_t*	page = page_align(rec);
	btr_blob_dbg_t	blob_ref;

	/* The debug map must exist, the column must lie outside the
	unique prefix, and the record must not be delete-marked. */
	ut_a(index->blobs);
	ut_a(field_no >= index->n_uniq);
	ut_a(!rec_get_deleted_flag(rec, page_is_comp(page)));

	/* Build the map entry for this off-page column reference. */
	blob_ref.ref_page_no = page_get_page_no(page);
	blob_ref.ref_heap_no = page_rec_get_heap_no(rec);
	blob_ref.ref_field_no = field_no;
	blob_ref.blob_page_no = page_no;
	blob_ref.owner = TRUE;
	blob_ref.always_owner = TRUE;
	blob_ref.del = FALSE;

	btr_blob_dbg_rbt_insert(index, &blob_ref, ctx);
}
196 
197 /**************************************************************//**
198 Add to index->blobs any references to off-page columns from a record.
199 @return number of references added */
200 UNIV_INTERN
201 ulint
btr_blob_dbg_add_rec(
/*=================*/
	const rec_t*	rec,	/*!< in: record */
	dict_index_t*	index,	/*!< in/out: index */
	const ulint*	offsets,/*!< in: offsets */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint		count	= 0;
	ulint		i;
	btr_blob_dbg_t	b;
	ibool		del;

	ut_ad(rec_offs_validate(rec, index, offsets));

	/* Nothing to do for records without off-page columns. */
	if (!rec_offs_any_extern(offsets)) {
		return(0);
	}

	/* The lookup key of index->blobs is
	(ref_page_no, ref_heap_no, ref_field_no);
	only ref_field_no varies in the loop below. */
	b.ref_page_no = page_get_page_no(page_align(rec));
	b.ref_heap_no = page_rec_get_heap_no(rec);
	del = (rec_get_deleted_flag(rec, rec_offs_comp(offsets)) != 0);

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		if (rec_offs_nth_extern(offsets, i)) {
			ulint		len;
			const byte*	field_ref = rec_get_nth_field(
				rec, offsets, i, &len);

			ut_a(len != UNIV_SQL_NULL);
			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
			/* The BLOB pointer occupies the last
			BTR_EXTERN_FIELD_REF_SIZE bytes of the field. */
			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;

			if (!memcmp(field_ref, field_ref_zero,
				    BTR_EXTERN_FIELD_REF_SIZE)) {
				/* the column has not been stored yet */
				continue;
			}

			b.ref_field_no = i;
			b.blob_page_no = mach_read_from_4(
				field_ref + BTR_EXTERN_PAGE_NO);
			/* Off-page columns can only occur after the
			unique fields of the record. */
			ut_a(b.ref_field_no >= index->n_uniq);
			/* A set BTR_EXTERN_OWNER_FLAG means that this
			record does NOT own the BLOB. */
			b.always_owner = b.owner
				= !(field_ref[BTR_EXTERN_LEN]
				    & BTR_EXTERN_OWNER_FLAG);
			b.del = del;

			btr_blob_dbg_rbt_insert(index, &b, ctx);
			count++;
		}
	}

	return(count);
}
256 
257 /**************************************************************//**
258 Display the references to off-page columns.
259 This function is to be called from a debugger,
260 for example when a breakpoint on ut_dbg_assertion_failed is hit. */
261 UNIV_INTERN
262 void
btr_blob_dbg_print(const dict_index_t * index)263 btr_blob_dbg_print(
264 /*===============*/
265 	const dict_index_t*	index)	/*!< in: index tree */
266 {
267 	const ib_rbt_node_t*	node;
268 
269 	if (!index->blobs) {
270 		return;
271 	}
272 
273 	/* We intentionally do not acquire index->blobs_mutex here.
274 	This function is to be called from a debugger, and the caller
275 	should make sure that the index->blobs_mutex is held. */
276 
277 	for (node = rbt_first(index->blobs);
278 	     node != NULL; node = rbt_next(index->blobs, node)) {
279 		const btr_blob_dbg_t*	b
280 			= rbt_value(btr_blob_dbg_t, node);
281 		fprintf(stderr, "%u:%u:%u->%u%s%s%s\n",
282 			b->ref_page_no, b->ref_heap_no, b->ref_field_no,
283 			b->blob_page_no,
284 			b->owner ? "" : "(disowned)",
285 			b->always_owner ? "" : "(has disowned)",
286 			b->del ? "(deleted)" : "");
287 	}
288 }
289 
290 /**************************************************************//**
291 Remove from index->blobs any references to off-page columns from a record.
292 @return number of references removed */
293 UNIV_INTERN
294 ulint
btr_blob_dbg_remove_rec(
/*====================*/
	const rec_t*	rec,	/*!< in: record */
	dict_index_t*	index,	/*!< in/out: index */
	const ulint*	offsets,/*!< in: offsets */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint		i;
	ulint		count	= 0;
	btr_blob_dbg_t	b;

	ut_ad(rec_offs_validate(rec, index, offsets));

	/* Nothing to do for records without off-page columns. */
	if (!rec_offs_any_extern(offsets)) {
		return(0);
	}

	/* The lookup key of index->blobs is
	(ref_page_no, ref_heap_no, ref_field_no);
	only ref_field_no varies in the loop below. */
	b.ref_page_no = page_get_page_no(page_align(rec));
	b.ref_heap_no = page_rec_get_heap_no(rec);

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		if (rec_offs_nth_extern(offsets, i)) {
			ulint		len;
			const byte*	field_ref = rec_get_nth_field(
				rec, offsets, i, &len);

			ut_a(len != UNIV_SQL_NULL);
			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
			/* The BLOB pointer occupies the last
			BTR_EXTERN_FIELD_REF_SIZE bytes of the field. */
			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;

			b.ref_field_no = i;
			b.blob_page_no = mach_read_from_4(
				field_ref + BTR_EXTERN_PAGE_NO);

			switch (b.blob_page_no) {
			case 0:
				/* The column has not been stored yet.
				The BLOB pointer must be all zero.
				There cannot be a BLOB starting at
				page 0, because page 0 is reserved for
				the tablespace header. */
				ut_a(!memcmp(field_ref, field_ref_zero,
					     BTR_EXTERN_FIELD_REF_SIZE));
				/* fall through */
			case FIL_NULL:
				/* the column has been freed already */
				continue;
			}

			btr_blob_dbg_rbt_delete(index, &b, ctx);
			count++;
		}
	}

	return(count);
}
351 
352 /**************************************************************//**
353 Check that there are no references to off-page columns from or to
354 the given page. Invoked when freeing or clearing a page.
355 @return TRUE when no orphan references exist */
356 UNIV_INTERN
357 ibool
btr_blob_dbg_is_empty(
/*==================*/
	dict_index_t*	index,		/*!< in: index */
	ulint		page_no)	/*!< in: page number */
{
	const ib_rbt_node_t*	node;
	ibool			success	= TRUE;

	/* No map means no references to check. */
	if (!index->blobs) {
		return(success);
	}

	mutex_enter(&index->blobs_mutex);

	/* Scan the whole map for entries that involve page_no,
	either as the referencing page or as the BLOB start page. */
	for (node = rbt_first(index->blobs);
	     node != NULL; node = rbt_next(index->blobs, node)) {
		const btr_blob_dbg_t*	b
			= rbt_value(btr_blob_dbg_t, node);

		if (b->ref_page_no != page_no && b->blob_page_no != page_no) {
			continue;
		}

		fprintf(stderr,
			"InnoDB: orphan BLOB ref%s%s%s %u:%u:%u->%u\n",
			b->owner ? "" : "(disowned)",
			b->always_owner ? "" : "(has disowned)",
			b->del ? "(deleted)" : "",
			b->ref_page_no, b->ref_heap_no, b->ref_field_no,
			b->blob_page_no);

		/* The only tolerated reference is a disowned,
		delete-marked BLOB that starts at page_no; anything
		else makes the check fail. */
		if (b->blob_page_no != page_no || b->owner || !b->del) {
			success = FALSE;
		}
	}

	mutex_exit(&index->blobs_mutex);
	return(success);
}
397 
398 /**************************************************************//**
399 Count and process all references to off-page columns on a page.
400 @return number of references processed */
401 UNIV_INTERN
402 ulint
btr_blob_dbg_op(
/*============*/
	const page_t*		page,	/*!< in: B-tree leaf page */
	const rec_t*		rec,	/*!< in: record to start from
					(NULL to process the whole page) */
	dict_index_t*		index,	/*!< in/out: index */
	const char*		ctx,	/*!< in: context (for logging) */
	const btr_blob_dbg_op_f	op)	/*!< in: operation on records */
{
	ulint		count	= 0;
	mem_heap_t*	heap	= NULL;
	/* Stack-based offsets array; rec_get_offsets() falls back to
	allocating from heap only when this does not suffice. */
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets	= offsets_;
	rec_offs_init(offsets_);

	ut_a(fil_page_get_type(page) == FIL_PAGE_INDEX);
	ut_a(!rec || page_align(rec) == page);

	/* Only leaf pages of a clustered index can contain
	references to off-page columns. */
	if (!index->blobs || !page_is_leaf(page)
	    || !dict_index_is_clust(index)) {
		return(0);
	}

	if (rec == NULL) {
		rec = page_get_infimum_rec(page);
	}

	/* Apply op() to each record from rec up to, but excluding,
	the page supremum.  (For the infimum, op() is a no-op because
	it has no off-page columns.) */
	do {
		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		count += op(rec, index, offsets, ctx);
		rec = page_rec_get_next_const(rec);
	} while (!page_rec_is_supremum(rec));

	/* Free the heap only if rec_get_offsets() allocated one. */
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	return(count);
}
443 
444 /**************************************************************//**
445 Count and add to index->blobs any references to off-page columns
446 from records on a page.
447 @return number of references added */
448 UNIV_INTERN
449 ulint
btr_blob_dbg_add(
/*=============*/
	const page_t*	page,	/*!< in: rewritten page */
	dict_index_t*	index,	/*!< in/out: index */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint	n_added;

	/* No references involving this page may exist beforehand. */
	btr_blob_dbg_assert_empty(index, page_get_page_no(page));

	n_added = btr_blob_dbg_op(page, NULL, index, ctx,
				  btr_blob_dbg_add_rec);

	return(n_added);
}
460 
461 /**************************************************************//**
462 Count and remove from index->blobs any references to off-page columns
463 from records on a page.
464 Used when reorganizing a page, before copying the records.
465 @return number of references removed */
466 UNIV_INTERN
467 ulint
btr_blob_dbg_remove(
/*================*/
	const page_t*	page,	/*!< in: b-tree page */
	dict_index_t*	index,	/*!< in/out: index */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint	n_removed = btr_blob_dbg_op(page, NULL, index, ctx,
					    btr_blob_dbg_remove_rec);

	/* Check that no references exist. */
	btr_blob_dbg_assert_empty(index, page_get_page_no(page));

	return(n_removed);
}
484 
485 /**************************************************************//**
486 Restore in index->blobs any references to off-page columns
487 Used when page reorganize fails due to compressed page overflow. */
488 UNIV_INTERN
489 void
btr_blob_dbg_restore(
/*=================*/
	const page_t*	npage,	/*!< in: page that failed to compress  */
	const page_t*	page,	/*!< in: copy of original page */
	dict_index_t*	index,	/*!< in/out: index */
	const char*	ctx)	/*!< in: context (for logging) */
{
	ulint	n_removed;
	ulint	n_added;

	/* Both buffers must be images of the same B-tree page. */
	ut_a(page_get_page_no(npage) == page_get_page_no(page));
	ut_a(page_get_space_id(npage) == page_get_space_id(page));

	/* Drop the references of the failed page, then re-register
	those of the preserved original copy. */
	n_removed = btr_blob_dbg_remove(npage, index, ctx);
	n_added = btr_blob_dbg_add(page, index, ctx);

	/* Every reference that was dropped must have been re-added. */
	ut_a(n_added == n_removed);
}
507 
508 /**************************************************************//**
509 Modify the 'deleted' flag of a record. */
510 UNIV_INTERN
511 void
btr_blob_dbg_set_deleted_flag(
/*==========================*/
	const rec_t*		rec,	/*!< in: record */
	dict_index_t*		index,	/*!< in/out: index */
	const ulint*		offsets,/*!< in: rec_get_offs(rec, index) */
	ibool			del)	/*!< in: TRUE=deleted, FALSE=exists */
{
	const ib_rbt_node_t*	node;
	btr_blob_dbg_t		b;
	btr_blob_dbg_t*		c;
	ulint			i;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_a(dict_index_is_clust(index));
	ut_a(del == !!del);/* must be FALSE==0 or TRUE==1 */

	/* Nothing to update without off-page columns or a map. */
	if (!rec_offs_any_extern(offsets) || !index->blobs) {

		return;
	}

	/* The lookup key of index->blobs is
	(ref_page_no, ref_heap_no, ref_field_no);
	only ref_field_no varies in the loop below. */
	b.ref_page_no = page_get_page_no(page_align(rec));
	b.ref_heap_no = page_rec_get_heap_no(rec);

	for (i = 0; i < rec_offs_n_fields(offsets); i++) {
		if (rec_offs_nth_extern(offsets, i)) {
			ulint		len;
			const byte*	field_ref = rec_get_nth_field(
				rec, offsets, i, &len);

			ut_a(len != UNIV_SQL_NULL);
			ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
			/* The BLOB pointer occupies the last
			BTR_EXTERN_FIELD_REF_SIZE bytes of the field. */
			field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;

			b.ref_field_no = i;
			b.blob_page_no = mach_read_from_4(
				field_ref + BTR_EXTERN_PAGE_NO);

			/* Both cases below are fatal: a stored BLOB
			pointer must refer to a real page here. */
			switch (b.blob_page_no) {
			case 0:
				ut_a(memcmp(field_ref, field_ref_zero,
					    BTR_EXTERN_FIELD_REF_SIZE));
				/* page number 0 is for the
				page allocation bitmap */
				/* fall through */
			case FIL_NULL:
				/* the column has been freed already */
				ut_error;
			}

			mutex_enter(&index->blobs_mutex);
			node = rbt_lookup(index->blobs, &b);
			ut_a(node);

			c = rbt_value(btr_blob_dbg_t, node);
			/* The flag should be modified. */
			c->del = del;
			if (btr_blob_dbg_msg) {
				/* Copy the entry so that the message
				can be issued after releasing the
				mutex. */
				b = *c;
				mutex_exit(&index->blobs_mutex);
				btr_blob_dbg_msg_issue("del_mk", &b, "");
			} else {
				mutex_exit(&index->blobs_mutex);
			}
		}
	}
}
578 
579 /**************************************************************//**
580 Change the ownership of an off-page column. */
581 UNIV_INTERN
582 void
btr_blob_dbg_owner(
/*===============*/
	const rec_t*		rec,	/*!< in: record */
	dict_index_t*		index,	/*!< in/out: index */
	const ulint*		offsets,/*!< in: rec_get_offs(rec, index) */
	ulint			i,	/*!< in: ith field in rec */
	ibool			own)	/*!< in: TRUE=owned, FALSE=disowned */
{
	const ib_rbt_node_t*	node;
	btr_blob_dbg_t		b;
	const byte*		field_ref;
	ulint			len;

	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_a(rec_offs_nth_extern(offsets, i));

	field_ref = rec_get_nth_field(rec, offsets, i, &len);
	ut_a(len != UNIV_SQL_NULL);
	ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
	/* The BLOB pointer occupies the last
	BTR_EXTERN_FIELD_REF_SIZE bytes of the field. */
	field_ref += len - BTR_EXTERN_FIELD_REF_SIZE;

	/* Build the lookup key and read the current ownership bit.
	A set BTR_EXTERN_OWNER_FLAG means the record does NOT own
	the BLOB. */
	b.ref_page_no = page_get_page_no(page_align(rec));
	b.ref_heap_no = page_rec_get_heap_no(rec);
	b.ref_field_no = i;
	b.owner = !(field_ref[BTR_EXTERN_LEN] & BTR_EXTERN_OWNER_FLAG);
	b.blob_page_no = mach_read_from_4(field_ref + BTR_EXTERN_PAGE_NO);

	/* The on-page flag must already reflect the requested state;
	the caller updates the page before calling this function. */
	ut_a(b.owner == own);

	mutex_enter(&index->blobs_mutex);
	node = rbt_lookup(index->blobs, &b);
	/* row_ins_clust_index_entry_by_modify() invokes
	btr_cur_unmark_extern_fields() also for the newly inserted
	references, which are all zero bytes until the columns are stored.
	The node lookup must fail if and only if that is the case. */
	ut_a(!memcmp(field_ref, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE)
	     == !node);

	if (node) {
		btr_blob_dbg_t*	c = rbt_value(btr_blob_dbg_t, node);
		/* Some code sets ownership from TRUE to TRUE.
		We do not allow changing ownership from FALSE to FALSE. */
		ut_a(own || c->owner);

		c->owner = own;
		if (!own) {
			/* Remember that this entry was disowned
			at least once. */
			c->always_owner = FALSE;
		}
	}

	mutex_exit(&index->blobs_mutex);
}
635 #endif /* UNIV_BLOB_DEBUG */
636 
637 /*
638 Latching strategy of the InnoDB B-tree
639 --------------------------------------
640 A tree latch protects all non-leaf nodes of the tree. Each node of a tree
641 also has a latch of its own.
642 
643 A B-tree operation normally first acquires an S-latch on the tree. It
644 searches down the tree and releases the tree latch when it has the
645 leaf node latch. To save CPU time we do not acquire any latch on
646 non-leaf nodes of the tree during a search, those pages are only bufferfixed.
647 
648 If an operation needs to restructure the tree, it acquires an X-latch on
649 the tree before searching to a leaf node. If it needs, for example, to
650 split a leaf,
651 (1) InnoDB decides the split point in the leaf,
652 (2) allocates a new page,
653 (3) inserts the appropriate node pointer to the first non-leaf level,
654 (4) releases the tree X-latch,
(5) and then moves records from the leaf to the newly allocated page.
656 
657 Node pointers
658 -------------
659 Leaf pages of a B-tree contain the index records stored in the
660 tree. On levels n > 0 we store 'node pointers' to pages on level
661 n - 1. For each page there is exactly one node pointer stored:
thus our tree is an ordinary B-tree, not a B-link tree.
663 
664 A node pointer contains a prefix P of an index record. The prefix
665 is long enough so that it determines an index record uniquely.
666 The file page number of the child page is added as the last
667 field. To the child page we can store node pointers or index records
668 which are >= P in the alphabetical order, but < P1 if there is
669 a next node pointer on the level, and P1 is its prefix.
670 
671 If a node pointer with a prefix P points to a non-leaf child,
672 then the leftmost record in the child must have the same
673 prefix P. If it points to a leaf node, the child is not required
674 to contain any record with a prefix equal to P. The leaf case
675 is decided this way to allow arbitrary deletions in a leaf node
676 without touching upper levels of the tree.
677 
678 We have predefined a special minimum record which we
679 define as the smallest record in any alphabetical order.
680 A minimum record is denoted by setting a bit in the record
681 header. A minimum record acts as the prefix of a node pointer
682 which points to a leftmost node on any level of the tree.
683 
684 File page allocation
685 --------------------
686 In the root node of a B-tree there are two file segment headers.
687 The leaf pages of a tree are allocated from one file segment, to
688 make them consecutive on disk if possible. From the other file segment
689 we allocate pages for the non-leaf levels of the tree.
690 */
691 
692 #ifdef UNIV_BTR_DEBUG
693 /**************************************************************//**
694 Checks a file segment header within a B-tree root page.
695 @return	TRUE if valid */
696 static
697 ibool
btr_root_fseg_validate(
/*===================*/
	const fseg_header_t*	seg_header,	/*!< in: segment header */
	ulint			space)		/*!< in: tablespace identifier */
{
	ulint	offset;

	/* The segment header must refer to its own tablespace. */
	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);

	/* Its byte offset must lie within the usable page area,
	between the page header and the page trailer. */
	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);
	ut_a(offset >= FIL_PAGE_DATA);
	ut_a(offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);

	return(TRUE);
}
710 #endif /* UNIV_BTR_DEBUG */
711 
712 /**************************************************************//**
713 Gets the root node of a tree and x-latches it.
714 @return	root page, x-latched */
715 static
716 buf_block_t*
btr_root_block_get(
/*===============*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ulint		space;
	ulint		zip_size;
	ulint		root_page_no;
	buf_block_t*	block;

	/* Locate the root page from the index metadata. */
	space = dict_index_get_space(index);
	zip_size = dict_table_zip_size(index->table);
	root_page_no = dict_index_get_page(index);

	/* Fetch the root page, x-latching it within mtr. */
	block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH,
			      index, mtr);
	btr_assert_not_corrupted(block, index);
#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root = buf_block_get_frame(block);

		/* A root page contains two file segment headers:
		one for the leaf pages and one for the non-leaf
		pages; validate both. */
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
#endif /* UNIV_BTR_DEBUG */

	return(block);
}
747 
748 /**************************************************************//**
749 Gets the root node of a tree and x-latches it.
750 @return	root page, x-latched */
751 UNIV_INTERN
752 page_t*
btr_root_get(
/*=========*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
{
	/* Fetch and x-latch the root block, then expose its frame. */
	buf_block_t*	root_block = btr_root_block_get(index, mtr);

	return(buf_block_get_frame(root_block));
}
760 
761 /*************************************************************//**
762 Gets pointer to the previous user record in the tree. It is assumed that
763 the caller has appropriate latches on the page and its neighbor.
764 @return	previous user record, NULL if there is none */
765 UNIV_INTERN
766 rec_t*
btr_get_prev_user_rec(
/*==================*/
	rec_t*	rec,	/*!< in: record on leaf level */
	mtr_t*	mtr)	/*!< in: mtr holding a latch on the page, and if
			needed, also to the previous page */
{
	page_t*	page;
	page_t*	prev_page;
	ulint	prev_page_no;

	/* First try the previous record on the same page. */
	if (!page_rec_is_infimum(rec)) {

		rec_t*	prev_rec = page_rec_get_prev(rec);

		if (!page_rec_is_infimum(prev_rec)) {

			return(prev_rec);
		}
	}

	/* rec is the first user record on its page; follow the
	page-list pointer to the previous page, if there is one. */
	page = page_align(rec);
	prev_page_no = btr_page_get_prev(page, mtr);

	if (prev_page_no != FIL_NULL) {

		ulint		space;
		ulint		zip_size;
		buf_block_t*	prev_block;

		space = page_get_space_id(page);
		zip_size = fil_space_get_zip_size(space);

		/* No new latch is taken here (no_latch): the caller
		must already hold one, which the assertion verifies. */
		prev_block = buf_page_get_with_no_latch(space, zip_size,
							prev_page_no, mtr);
		prev_page = buf_block_get_frame(prev_block);
		/* The caller must already have a latch to the brother */
		ut_ad(mtr_memo_contains(mtr, prev_block,
					MTR_MEMO_PAGE_S_FIX)
		      || mtr_memo_contains(mtr, prev_block,
					   MTR_MEMO_PAGE_X_FIX));
#ifdef UNIV_BTR_DEBUG
		/* The sibling link must be consistent in both
		directions, and both pages must use the same format. */
		ut_a(page_is_comp(prev_page) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_page, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		/* Return the last user record of the previous page. */
		return(page_rec_get_prev(page_get_supremum_rec(prev_page)));
	}

	/* rec was the leftmost user record of the whole level. */
	return(NULL);
}
818 
819 /*************************************************************//**
820 Gets pointer to the next user record in the tree. It is assumed that the
821 caller has appropriate latches on the page and its neighbor.
822 @return	next user record, NULL if there is none */
823 UNIV_INTERN
824 rec_t*
btr_get_next_user_rec(
/*==================*/
	rec_t*	rec,	/*!< in: record on leaf level */
	mtr_t*	mtr)	/*!< in: mtr holding a latch on the page, and if
			needed, also to the next page */
{
	page_t*	page;
	page_t*	next_page;
	ulint	next_page_no;

	/* First try the next record on the same page. */
	if (!page_rec_is_supremum(rec)) {

		rec_t*	next_rec = page_rec_get_next(rec);

		if (!page_rec_is_supremum(next_rec)) {

			return(next_rec);
		}
	}

	/* rec is the last user record on its page; follow the
	page-list pointer to the next page, if there is one. */
	page = page_align(rec);
	next_page_no = btr_page_get_next(page, mtr);

	if (next_page_no != FIL_NULL) {
		ulint		space;
		ulint		zip_size;
		buf_block_t*	next_block;

		space = page_get_space_id(page);
		zip_size = fil_space_get_zip_size(space);

		/* No new latch is taken here (no_latch): the caller
		must already hold one, which the assertion verifies. */
		next_block = buf_page_get_with_no_latch(space, zip_size,
							next_page_no, mtr);
		next_page = buf_block_get_frame(next_block);
		/* The caller must already have a latch to the brother */
		ut_ad(mtr_memo_contains(mtr, next_block, MTR_MEMO_PAGE_S_FIX)
		      || mtr_memo_contains(mtr, next_block,
					   MTR_MEMO_PAGE_X_FIX));
#ifdef UNIV_BTR_DEBUG
		/* The sibling link must be consistent in both
		directions, and both pages must use the same format. */
		ut_a(page_is_comp(next_page) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_page, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		/* Return the first user record of the next page. */
		return(page_rec_get_next(page_get_infimum_rec(next_page)));
	}

	/* rec was the rightmost user record of the whole level. */
	return(NULL);
}
874 
875 /**************************************************************//**
876 Creates a new index page (not the root, and also not
877 used in page reorganization).  @see btr_page_empty(). */
878 static
879 void
btr_page_create(
/*============*/
	buf_block_t*	block,	/*!< in/out: page to be created */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		page = buf_block_get_frame(block);

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	/* The BLOB debug map must contain no stale references
	involving a page that is being (re)created. */
	btr_blob_dbg_assert_empty(index, buf_block_get_page_no(block));

	if (UNIV_LIKELY_NULL(page_zip)) {
		/* Compressed page: the level is passed to
		page_create_zip() directly. */
		page_create_zip(block, index, level, mtr);
	} else {
		page_create(block, mtr, dict_table_is_comp(index->table));
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, level, mtr);
	}

	/* NOTE(review): presumably this flag requests validation of
	the page at flush time — confirm in the buffer pool code. */
	block->check_index_page_at_flush = TRUE;

	/* Stamp the page with the id of the owning index. */
	btr_page_set_index_id(page, page_zip, index->id, mtr);
}
905 
906 /**************************************************************//**
907 Allocates a new file page to be used in an ibuf tree. Takes the page from
908 the free list of the tree, which must contain pages!
909 @return	new allocated block, x-latched */
910 static
911 buf_block_t*
btr_page_alloc_for_ibuf(
/*====================*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
{
	fil_addr_t	node_addr;
	page_t*		root;
	page_t*		new_page;
	buf_block_t*	new_block;

	root = btr_root_get(index, mtr);

	/* The ibuf tree keeps its own free list of pages, anchored
	in the root page header. */
	node_addr = flst_get_first(root + PAGE_HEADER
				   + PAGE_BTR_IBUF_FREE_LIST, mtr);
	/* The caller guarantees the free list is non-empty. */
	ut_a(node_addr.page != FIL_NULL);

	/* Fetch and x-latch the first page of the free list. */
	new_block = buf_page_get(dict_index_get_space(index),
				 dict_table_zip_size(index->table),
				 node_addr.page, RW_X_LATCH, mtr);
	new_page = buf_block_get_frame(new_block);
	buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);

	/* Detach the page from the free list. */
	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		    new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
		    mtr);
	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));

	return(new_block);
}
942 
943 /**************************************************************//**
944 Allocates a new file page to be used in an index tree. NOTE: we assume
945 that the caller has made the reservation for free extents!
946 @retval NULL if no page could be allocated
947 @retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
948 (init_mtr == mtr, or the page was not previously freed in mtr)
949 @retval block (not allocated or initialized) otherwise */
950 static __attribute__((nonnull, warn_unused_result))
951 buf_block_t*
btr_page_alloc_low(dict_index_t * index,ulint hint_page_no,byte file_direction,ulint level,mtr_t * mtr,mtr_t * init_mtr)952 btr_page_alloc_low(
953 /*===============*/
954 	dict_index_t*	index,		/*!< in: index */
955 	ulint		hint_page_no,	/*!< in: hint of a good page */
956 	byte		file_direction,	/*!< in: direction where a possible
957 					page split is made */
958 	ulint		level,		/*!< in: level where the page is placed
959 					in the tree */
960 	mtr_t*		mtr,		/*!< in/out: mini-transaction
961 					for the allocation */
962 	mtr_t*		init_mtr)	/*!< in/out: mtr or another
963 					mini-transaction in which the
964 					page should be initialized.
965 					If init_mtr!=mtr, but the page
966 					is already X-latched in mtr, do
967 					not initialize the page. */
968 {
969 	fseg_header_t*	seg_header;
970 	page_t*		root;
971 
972 	root = btr_root_get(index, mtr);
973 
974 	if (level == 0) {
975 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
976 	} else {
977 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
978 	}
979 
980 	/* Parameter TRUE below states that the caller has made the
981 	reservation for free extents, and thus we know that a page can
982 	be allocated: */
983 
984 	return(fseg_alloc_free_page_general(
985 		       seg_header, hint_page_no, file_direction,
986 		       TRUE, mtr, init_mtr));
987 }
988 
989 /**************************************************************//**
990 Allocates a new file page to be used in an index tree. NOTE: we assume
991 that the caller has made the reservation for free extents!
992 @retval NULL if no page could be allocated
993 @retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
994 (init_mtr == mtr, or the page was not previously freed in mtr)
995 @retval block (not allocated or initialized) otherwise */
996 UNIV_INTERN
997 buf_block_t*
btr_page_alloc(dict_index_t * index,ulint hint_page_no,byte file_direction,ulint level,mtr_t * mtr,mtr_t * init_mtr)998 btr_page_alloc(
999 /*===========*/
1000 	dict_index_t*	index,		/*!< in: index */
1001 	ulint		hint_page_no,	/*!< in: hint of a good page */
1002 	byte		file_direction,	/*!< in: direction where a possible
1003 					page split is made */
1004 	ulint		level,		/*!< in: level where the page is placed
1005 					in the tree */
1006 	mtr_t*		mtr,		/*!< in/out: mini-transaction
1007 					for the allocation */
1008 	mtr_t*		init_mtr)	/*!< in/out: mini-transaction
1009 					for x-latching and initializing
1010 					the page */
1011 {
1012 	buf_block_t*	new_block;
1013 
1014 	if (dict_index_is_ibuf(index)) {
1015 
1016 		return(btr_page_alloc_for_ibuf(index, mtr));
1017 	}
1018 
1019 	new_block = btr_page_alloc_low(
1020 		index, hint_page_no, file_direction, level, mtr, init_mtr);
1021 
1022 	if (new_block) {
1023 		buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
1024 	}
1025 
1026 	return(new_block);
1027 }
1028 
1029 /**************************************************************//**
1030 Gets the number of pages in a B-tree.
1031 @return	number of pages, or ULINT_UNDEFINED if the index is unavailable */
1032 UNIV_INTERN
1033 ulint
btr_get_size(dict_index_t * index,ulint flag,mtr_t * mtr)1034 btr_get_size(
1035 /*=========*/
1036 	dict_index_t*	index,	/*!< in: index */
1037 	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
1038 	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
1039 				is s-latched */
1040 {
1041 	fseg_header_t*	seg_header;
1042 	page_t*		root;
1043 	ulint		n;
1044 	ulint		dummy;
1045 
1046 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
1047 				MTR_MEMO_S_LOCK));
1048 
1049 	if (index->page == FIL_NULL
1050 	    || index->to_be_dropped
1051 	    || *index->name == TEMP_INDEX_PREFIX) {
1052 		return(ULINT_UNDEFINED);
1053 	}
1054 
1055 	root = btr_root_get(index, mtr);
1056 
1057 	if (flag == BTR_N_LEAF_PAGES) {
1058 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
1059 
1060 		fseg_n_reserved_pages(seg_header, &n, mtr);
1061 
1062 	} else if (flag == BTR_TOTAL_SIZE) {
1063 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
1064 
1065 		n = fseg_n_reserved_pages(seg_header, &dummy, mtr);
1066 
1067 		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
1068 
1069 		n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
1070 	} else {
1071 		ut_error;
1072 	}
1073 
1074 	return(n);
1075 }
1076 
/**************************************************************//**
Frees a page used in an ibuf tree. Puts the page to the free list of the
ibuf tree. */
static
void
btr_page_free_for_ibuf(
/*===================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		root;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	root = btr_root_get(index, mtr);

	/* Instead of returning the page to the file segment, link it
	into the free list kept in the ibuf tree root page; see
	btr_page_alloc_for_ibuf() for the matching allocation. */
	flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		       buf_block_get_frame(block)
		       + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);

	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));
}
1100 
/**************************************************************//**
Frees a file page used in an index tree. Can be used also to (BLOB)
external storage pages, because the page level 0 can be given as an
argument. */
UNIV_INTERN
void
btr_page_free_low(
/*==============*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	ulint		level,	/*!< in: page level */
	mtr_t*		mtr)	/*!< in: mtr */
{
	fseg_header_t*	seg_header;
	page_t*		root;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	/* The page gets invalid for optimistic searches: increment the frame
	modify clock */

	buf_block_modify_clock_inc(block);
	btr_blob_dbg_assert_empty(index, buf_block_get_page_no(block));

	if (dict_index_is_ibuf(index)) {
		/* Ibuf tree pages go back to the tree's own free list,
		not to a file segment. */
		btr_page_free_for_ibuf(index, block, mtr);

		return;
	}

	root = btr_root_get(index, mtr);

	/* Level-0 pages were allocated from the leaf segment, all
	other pages from the non-leaf segment; mirror the choice in
	btr_page_alloc_low(). */
	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

	fseg_free_page(seg_header,
		       buf_block_get_space(block),
		       buf_block_get_page_no(block), mtr);
}
1143 
1144 /**************************************************************//**
1145 Frees a file page used in an index tree. NOTE: cannot free field external
1146 storage pages because the page must contain info on its level. */
1147 UNIV_INTERN
1148 void
btr_page_free(dict_index_t * index,buf_block_t * block,mtr_t * mtr)1149 btr_page_free(
1150 /*==========*/
1151 	dict_index_t*	index,	/*!< in: index tree */
1152 	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
1153 	mtr_t*		mtr)	/*!< in: mtr */
1154 {
1155 	const page_t*	page	= buf_block_get_frame(block);
1156 	ulint		level	= btr_page_get_level(page, mtr);
1157 
1158 	ut_ad(fil_page_get_type(block->frame) == FIL_PAGE_INDEX);
1159 	btr_page_free_low(index, block, level, mtr);
1160 }
1161 
/**************************************************************//**
Sets the child node file address in a node pointer. */
UNIV_INLINE
void
btr_node_ptr_set_child_page_no(
/*===========================*/
	rec_t*		rec,	/*!< in: node pointer record */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
				part will be updated, or NULL */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		page_no,/*!< in: child node address */
	mtr_t*		mtr)	/*!< in: mtr */
{
	byte*	field;
	ulint	len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
	/* Node pointer records exist only on non-leaf pages. */
	ut_ad(!page_is_leaf(page_align(rec)));
	ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

	/* The child address is in the last field */
	field = rec_get_nth_field(rec, offsets,
				  rec_offs_n_fields(offsets) - 1, &len);

	ut_ad(len == REC_NODE_PTR_SIZE);

	if (UNIV_LIKELY_NULL(page_zip)) {
		/* On a compressed page the change must also be applied
		to the compressed image, with its own redo logging. */
		page_zip_write_node_ptr(page_zip, rec,
					rec_offs_data_size(offsets),
					page_no, mtr);
	} else {
		mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
	}
}
1196 
1197 /************************************************************//**
1198 Returns the child page of a node pointer and x-latches it.
1199 @return	child page, x-latched */
1200 static
1201 buf_block_t*
btr_node_ptr_get_child(const rec_t * node_ptr,dict_index_t * index,const ulint * offsets,mtr_t * mtr)1202 btr_node_ptr_get_child(
1203 /*===================*/
1204 	const rec_t*	node_ptr,/*!< in: node pointer */
1205 	dict_index_t*	index,	/*!< in: index */
1206 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
1207 	mtr_t*		mtr)	/*!< in: mtr */
1208 {
1209 	ulint	page_no;
1210 	ulint	space;
1211 
1212 	ut_ad(rec_offs_validate(node_ptr, index, offsets));
1213 	space = page_get_space_id(page_align(node_ptr));
1214 	page_no = btr_node_ptr_get_child_page_no(node_ptr, offsets);
1215 
1216 	return(btr_block_get(space, dict_table_zip_size(index->table),
1217 			     page_no, RW_X_LATCH, index, mtr));
1218 }
1219 
/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an x-latch on the tree.
@return	rec_get_offsets() of the node pointer record */
static
ulint*
btr_page_get_father_node_ptr_func(
/*==============================*/
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
				out: cursor on node pointer record,
				its page x-latched */
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dtuple_t*	tuple;
	rec_t*		user_rec;
	rec_t*		node_ptr;
	ulint		level;
	ulint		page_no;
	dict_index_t*	index;

	page_no = buf_block_get_page_no(btr_cur_get_block(cursor));
	index = btr_cur_get_index(cursor);

	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
				MTR_MEMO_X_LOCK));

	/* The root page has no father. */
	ut_ad(dict_index_get_page(index) != page_no);

	level = btr_page_get_level(btr_cur_get_page(cursor), mtr);

	user_rec = btr_cur_get_rec(cursor);
	ut_a(page_rec_is_user_rec(user_rec));
	/* Build a node pointer tuple from the user record and search
	for it one level higher up in the tree. */
	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);

	btr_cur_search_to_nth_level(index, level + 1, tuple, PAGE_CUR_LE,
				    BTR_CONT_MODIFY_TREE, cursor, 0,
				    file, line, mtr);

	node_ptr = btr_cur_get_rec(cursor);
	ut_ad(!page_rec_is_comp(node_ptr)
	      || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
	offsets = rec_get_offsets(node_ptr, index, offsets,
				  ULINT_UNDEFINED, &heap);

	if (UNIV_UNLIKELY(btr_node_ptr_get_child_page_no(node_ptr, offsets)
			  != page_no)) {
		/* The node pointer found does not point back to the
		child page: the tree is corrupted.  Dump both pages and
		the records involved, then crash. */
		rec_t*	print_rec;
		fputs("InnoDB: Dump of the child page:\n", stderr);
		buf_page_print(page_align(user_rec), 0,
			       BUF_PAGE_PRINT_NO_CRASH);
		fputs("InnoDB: Dump of the parent page:\n", stderr);
		buf_page_print(page_align(node_ptr), 0,
			       BUF_PAGE_PRINT_NO_CRASH);

		fputs("InnoDB: Corruption of an index tree: table ", stderr);
		ut_print_name(stderr, NULL, TRUE, index->table_name);
		fputs(", index ", stderr);
		ut_print_name(stderr, NULL, FALSE, index->name);
		fprintf(stderr, ",\n"
			"InnoDB: father ptr page no %lu, child page no %lu\n",
			(ulong)
			btr_node_ptr_get_child_page_no(node_ptr, offsets),
			(ulong) page_no);
		print_rec = page_rec_get_next(
			page_get_infimum_rec(page_align(user_rec)));
		offsets = rec_get_offsets(print_rec, index,
					  offsets, ULINT_UNDEFINED, &heap);
		page_rec_print(print_rec, offsets);
		offsets = rec_get_offsets(node_ptr, index, offsets,
					  ULINT_UNDEFINED, &heap);
		page_rec_print(node_ptr, offsets);

		fputs("InnoDB: You should dump + drop + reimport the table"
		      " to fix the\n"
		      "InnoDB: corruption. If the crash happens at "
		      "the database startup, see\n"
		      "InnoDB: " REFMAN "forcing-innodb-recovery.html about\n"
		      "InnoDB: forcing recovery. "
		      "Then dump + drop + reimport.\n", stderr);

		ut_error;
	}

	return(offsets);
}
1309 
/* Convenience wrapper that supplies the caller's source file and line
to btr_page_get_father_node_ptr_func() for diagnostics. */
#define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
	btr_page_get_father_node_ptr_func(of,heap,cur,__FILE__,__LINE__,mtr)
1312 
1313 /************************************************************//**
1314 Returns the upper level node pointer to a page. It is assumed that mtr holds
1315 an x-latch on the tree.
1316 @return	rec_get_offsets() of the node pointer record */
1317 static
1318 ulint*
btr_page_get_father_block(ulint * offsets,mem_heap_t * heap,dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * cursor)1319 btr_page_get_father_block(
1320 /*======================*/
1321 	ulint*		offsets,/*!< in: work area for the return value */
1322 	mem_heap_t*	heap,	/*!< in: memory heap to use */
1323 	dict_index_t*	index,	/*!< in: b-tree index */
1324 	buf_block_t*	block,	/*!< in: child page in the index */
1325 	mtr_t*		mtr,	/*!< in: mtr */
1326 	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
1327 				its page x-latched */
1328 {
1329 	rec_t*	rec
1330 		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
1331 								 block)));
1332 	btr_cur_position(index, rec, block, cursor);
1333 	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
1334 }
1335 
1336 /************************************************************//**
1337 Seeks to the upper level node pointer to a page.
1338 It is assumed that mtr holds an x-latch on the tree. */
1339 static
1340 void
btr_page_get_father(dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * cursor)1341 btr_page_get_father(
1342 /*================*/
1343 	dict_index_t*	index,	/*!< in: b-tree index */
1344 	buf_block_t*	block,	/*!< in: child page in the index */
1345 	mtr_t*		mtr,	/*!< in: mtr */
1346 	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
1347 				its page x-latched */
1348 {
1349 	mem_heap_t*	heap;
1350 	rec_t*		rec
1351 		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
1352 								 block)));
1353 	btr_cur_position(index, rec, block, cursor);
1354 
1355 	heap = mem_heap_create(100);
1356 	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
1357 	mem_heap_free(heap);
1358 }
1359 
/************************************************************//**
Creates the root node for a new index tree.
@return	page number of the created root, FIL_NULL if did not succeed */
UNIV_INTERN
ulint
btr_create(
/*=======*/
	ulint		type,	/*!< in: type of the index */
	ulint		space,	/*!< in: space where created */
	ulint		zip_size,/*!< in: compressed page size in bytes
				or 0 for uncompressed pages */
	index_id_t	index_id,/*!< in: index id */
	dict_index_t*	index,	/*!< in: index */
	mtr_t*		mtr)	/*!< in: mini-transaction handle */
{
	ulint		page_no;
	buf_block_t*	block;
	buf_frame_t*	frame;
	page_t*		page;
	page_zip_des_t*	page_zip;

	/* Create the two new segments (one, in the case of an ibuf tree) for
	the index tree; the segment headers are put on the allocated root page
	(for an ibuf tree, not in the root, but on a separate ibuf header
	page) */

	if (type & DICT_IBUF) {
		/* Allocate first the ibuf header page */
		buf_block_t*	ibuf_hdr_block = fseg_create(
			space, 0,
			IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);

		buf_block_dbg_add_level(
			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(buf_block_get_page_no(ibuf_hdr_block)
		      == IBUF_HEADER_PAGE_NO);
		/* Allocate then the next page to the segment: it will be the
		tree root page */

		block = fseg_alloc_free_page(
			buf_block_get_frame(ibuf_hdr_block)
			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
			IBUF_TREE_ROOT_PAGE_NO,
			FSP_UP, mtr);
		ut_ad(buf_block_get_page_no(block) == IBUF_TREE_ROOT_PAGE_NO);
	} else {
#ifdef UNIV_BLOB_DEBUG
		if ((type & DICT_CLUSTERED) && !index->blobs) {
			mutex_create(PFS_NOT_INSTRUMENTED,
				     &index->blobs_mutex, SYNC_ANY_LATCH);
			index->blobs = rbt_create(sizeof(btr_blob_dbg_t),
						  btr_blob_dbg_cmp);
		}
#endif /* UNIV_BLOB_DEBUG */
		/* Create the non-leaf segment; its header goes on the
		root page itself. */
		block = fseg_create(space, 0,
				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
	}

	if (block == NULL) {
		/* Could not allocate the root page. */

		return(FIL_NULL);
	}

	page_no = buf_block_get_page_no(block);
	frame = buf_block_get_frame(block);

	if (type & DICT_IBUF) {
		/* It is an insert buffer tree: initialize the free list */
		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);

		flst_init(frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
	} else {
		/* It is a non-ibuf tree: create a file segment for leaf
		pages */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

		if (!fseg_create(space, page_no,
				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) {
			/* Not enough space for new segment, free root
			segment before return. */
			btr_free_root(space, zip_size, page_no, mtr);

			return(FIL_NULL);
		}

		/* The fseg create acquires a second latch on the page,
		therefore we must declare it: */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
	}

	/* Create a new index page on the allocated segment page */
	page_zip = buf_block_get_page_zip(block);

	if (UNIV_LIKELY_NULL(page_zip)) {
		page = page_create_zip(block, index, 0, mtr);
	} else {
		page = page_create(block, mtr,
				   dict_table_is_comp(index->table));
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, 0, mtr);
	}

	block->check_index_page_at_flush = TRUE;

	/* Set the index id of the page */
	btr_page_set_index_id(page, page_zip, index_id, mtr);

	/* Set the next node and previous node fields */
	btr_page_set_next(page, page_zip, FIL_NULL, mtr);
	btr_page_set_prev(page, page_zip, FIL_NULL, mtr);

	/* We reset the free bits for the page to allow creation of several
	trees in the same mtr, otherwise the latch on a bitmap page would
	prevent it because of the latching order */

	if (!(type & DICT_CLUSTERED)) {
		ibuf_reset_free_bits(block);
	}

	/* In the following assertion we test that two records of maximum
	allowed size fit on the root page: this fact is needed to ensure
	correctness of split algorithms */

	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

	return(page_no);
}
1490 
1491 /************************************************************//**
1492 Frees a B-tree except the root page, which MUST be freed after this
1493 by calling btr_free_root. */
1494 UNIV_INTERN
1495 void
btr_free_but_not_root(ulint space,ulint zip_size,ulint root_page_no)1496 btr_free_but_not_root(
1497 /*==================*/
1498 	ulint	space,		/*!< in: space where created */
1499 	ulint	zip_size,	/*!< in: compressed page size in bytes
1500 				or 0 for uncompressed pages */
1501 	ulint	root_page_no)	/*!< in: root page number */
1502 {
1503 	ibool	finished;
1504 	page_t*	root;
1505 	mtr_t	mtr;
1506 
1507 leaf_loop:
1508 	mtr_start(&mtr);
1509 
1510 	root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH,
1511 			    NULL, &mtr);
1512 #ifdef UNIV_BTR_DEBUG
1513 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
1514 				    + root, space));
1515 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1516 				    + root, space));
1517 #endif /* UNIV_BTR_DEBUG */
1518 
1519 	/* NOTE: page hash indexes are dropped when a page is freed inside
1520 	fsp0fsp. */
1521 
1522 	finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
1523 				  &mtr);
1524 	mtr_commit(&mtr);
1525 
1526 	if (!finished) {
1527 
1528 		goto leaf_loop;
1529 	}
1530 top_loop:
1531 	mtr_start(&mtr);
1532 
1533 	root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH,
1534 			    NULL, &mtr);
1535 #ifdef UNIV_BTR_DEBUG
1536 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1537 				    + root, space));
1538 #endif /* UNIV_BTR_DEBUG */
1539 
1540 	finished = fseg_free_step_not_header(
1541 		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, &mtr);
1542 	mtr_commit(&mtr);
1543 
1544 	if (!finished) {
1545 
1546 		goto top_loop;
1547 	}
1548 }
1549 
/************************************************************//**
Frees the B-tree root page. Other tree MUST already have been freed. */
UNIV_INTERN
void
btr_free_root(
/*==========*/
	ulint	space,		/*!< in: space where created */
	ulint	zip_size,	/*!< in: compressed page size in bytes
				or 0 for uncompressed pages */
	ulint	root_page_no,	/*!< in: root page number */
	mtr_t*	mtr)		/*!< in/out: mini-transaction */
{
	buf_block_t*	block;
	fseg_header_t*	header;

	block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH,
			      NULL, mtr);

	/* Drop any adaptive hash index entries built on the root page. */
	btr_search_drop_page_hash_index(block);

	/* The non-leaf segment header resides on the root page itself. */
	header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(header, space));
#endif /* UNIV_BTR_DEBUG */

	/* Step until the whole segment, including the root page, is freed. */
	while (!fseg_free_step(header, mtr));
}
1577 #endif /* !UNIV_HOTBACKUP */
1578 
/*************************************************************//**
Reorganizes an index page.
@return	TRUE on success, FALSE on failure */
static
ibool
btr_page_reorganize_low(
/*====================*/
	ibool		recovery,/*!< in: TRUE if called in recovery:
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
	buf_block_t*	block,	/*!< in: page to be reorganized */
	dict_index_t*	index,	/*!< in: record descriptor */
	mtr_t*		mtr)	/*!< in: mtr */
{
#ifndef UNIV_HOTBACKUP
	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
#endif /* !UNIV_HOTBACKUP */
	page_t*		page		= buf_block_get_frame(block);
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	buf_block_t*	temp_block;
	page_t*		temp_page;
	ulint		log_mode;
	ulint		data_size1;
	ulint		data_size2;
	ulint		max_ins_size1;
	ulint		max_ins_size2;
	ibool		success		= FALSE;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	btr_assert_not_corrupted(block, index);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	/* Remember the data size and max insert size; the
	reorganization must not change them. */
	data_size1 = page_get_data_size(page);
	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);

#ifndef UNIV_HOTBACKUP
	/* Write the log record */
	mlog_open_and_write_index(mtr, page, index, page_is_comp(page)
				  ? MLOG_COMP_PAGE_REORGANIZE
				  : MLOG_PAGE_REORGANIZE, 0);
#endif /* !UNIV_HOTBACKUP */

	/* Turn logging off: recovery replays the single record written
	above (see btr_parse_page_reorganize()) instead of the
	individual changes made below. */
	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

#ifndef UNIV_HOTBACKUP
	temp_block = buf_block_alloc(buf_pool);
#else /* !UNIV_HOTBACKUP */
	ut_ad(block == back_block1);
	temp_block = back_block2;
#endif /* !UNIV_HOTBACKUP */
	temp_page = temp_block->frame;

	/* Copy the old page to temporary space */
	buf_frame_copy(temp_page, page);

#ifndef UNIV_HOTBACKUP
	if (UNIV_LIKELY(!recovery)) {
		btr_search_drop_page_hash_index(block);
	}

	block->check_index_page_at_flush = TRUE;
#endif /* !UNIV_HOTBACKUP */
	btr_blob_dbg_remove(page, index, "btr_page_reorganize");

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

	page_create(block, mtr, dict_table_is_comp(index->table));

	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

	page_copy_rec_list_end_no_locks(block, temp_block,
					page_get_infimum_rec(temp_page),
					index, mtr);

	if (dict_index_is_sec_or_ibuf(index) && page_is_leaf(page)) {
		/* Copy max trx id to recreated page */
		trx_id_t	max_trx_id = page_get_max_trx_id(temp_page);
		page_set_max_trx_id(block, NULL, max_trx_id, mtr);
		/* In crash recovery, dict_index_is_sec_or_ibuf() always
		returns TRUE, even for clustered indexes.  max_trx_id is
		unused in clustered index pages. */
		ut_ad(max_trx_id != 0 || recovery);
	}

	if (UNIV_LIKELY_NULL(page_zip)
	    && UNIV_UNLIKELY
	    (!page_zip_compress(page_zip, page, index, NULL))) {

		/* Restore the old page and exit. */
		btr_blob_dbg_restore(page, temp_page, index,
				     "btr_page_reorganize_compress_fail");

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		/* Check that the bytes that we skip are identical. */
		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
			     PAGE_HEADER + PAGE_N_RECS + temp_page,
			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
		ut_a(!memcmp(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + page,
			     UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + temp_page,
			     FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
		       UNIV_PAGE_SIZE - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		ut_a(!memcmp(page, temp_page, UNIV_PAGE_SIZE));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		goto func_exit;
	}

#ifndef UNIV_HOTBACKUP
	if (UNIV_LIKELY(!recovery)) {
		/* Update the record lock bitmaps */
		lock_move_reorganize_page(block, temp_block);
	}
#endif /* !UNIV_HOTBACKUP */

	data_size2 = page_get_data_size(page);
	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

	/* A reorganization must preserve the data size and the max
	insert size; a mismatch indicates a bug or corruption. */
	if (UNIV_UNLIKELY(data_size1 != data_size2)
	    || UNIV_UNLIKELY(max_ins_size1 != max_ins_size2)) {
		buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
		buf_page_print(temp_page, 0, BUF_PAGE_PRINT_NO_CRASH);
		fprintf(stderr,
			"InnoDB: Error: page old data size %lu"
			" new data size %lu\n"
			"InnoDB: Error: page old max ins size %lu"
			" new max ins size %lu\n"
			"InnoDB: Submit a detailed bug report"
			" to http://bugs.mysql.com\n",
			(unsigned long) data_size1, (unsigned long) data_size2,
			(unsigned long) max_ins_size1,
			(unsigned long) max_ins_size2);
		ut_ad(0);
	} else {
		success = TRUE;
	}

func_exit:
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
#ifndef UNIV_HOTBACKUP
	buf_block_free(temp_block);
#endif /* !UNIV_HOTBACKUP */

	/* Restore logging mode */
	mtr_set_log_mode(mtr, log_mode);

	return(success);
}
1741 
1742 #ifndef UNIV_HOTBACKUP
1743 /*************************************************************//**
1744 Reorganizes an index page.
1745 IMPORTANT: if btr_page_reorganize() is invoked on a compressed leaf
1746 page of a non-clustered index, the caller must update the insert
1747 buffer free bits in the same mini-transaction in such a way that the
1748 modification will be redo-logged.
1749 @return	TRUE on success, FALSE on failure */
1750 UNIV_INTERN
1751 ibool
btr_page_reorganize(buf_block_t * block,dict_index_t * index,mtr_t * mtr)1752 btr_page_reorganize(
1753 /*================*/
1754 	buf_block_t*	block,	/*!< in: page to be reorganized */
1755 	dict_index_t*	index,	/*!< in: record descriptor */
1756 	mtr_t*		mtr)	/*!< in: mtr */
1757 {
1758 	return(btr_page_reorganize_low(FALSE, block, index, mtr));
1759 }
1760 #endif /* !UNIV_HOTBACKUP */
1761 
1762 /***********************************************************//**
1763 Parses a redo log record of reorganizing a page.
1764 @return	end of log record or NULL */
1765 UNIV_INTERN
1766 byte*
btr_parse_page_reorganize(byte * ptr,byte * end_ptr,dict_index_t * index,buf_block_t * block,mtr_t * mtr)1767 btr_parse_page_reorganize(
1768 /*======================*/
1769 	byte*		ptr,	/*!< in: buffer */
1770 	byte*		end_ptr __attribute__((unused)),
1771 				/*!< in: buffer end */
1772 	dict_index_t*	index,	/*!< in: record descriptor */
1773 	buf_block_t*	block,	/*!< in: page to be reorganized, or NULL */
1774 	mtr_t*		mtr)	/*!< in: mtr or NULL */
1775 {
1776 	ut_ad(ptr && end_ptr);
1777 
1778 	/* The record is empty, except for the record initial part */
1779 
1780 	if (UNIV_LIKELY(block != NULL)) {
1781 		btr_page_reorganize_low(TRUE, block, index, mtr);
1782 	}
1783 
1784 	return(ptr);
1785 }
1786 
1787 #ifndef UNIV_HOTBACKUP
1788 /*************************************************************//**
1789 Empties an index page.  @see btr_page_create(). */
1790 static
1791 void
btr_page_empty(buf_block_t * block,page_zip_des_t * page_zip,dict_index_t * index,ulint level,mtr_t * mtr)1792 btr_page_empty(
1793 /*===========*/
1794 	buf_block_t*	block,	/*!< in: page to be emptied */
1795 	page_zip_des_t*	page_zip,/*!< out: compressed page, or NULL */
1796 	dict_index_t*	index,	/*!< in: index of the page */
1797 	ulint		level,	/*!< in: the B-tree level of the page */
1798 	mtr_t*		mtr)	/*!< in: mtr */
1799 {
1800 	page_t*	page = buf_block_get_frame(block);
1801 
1802 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1803 	ut_ad(page_zip == buf_block_get_page_zip(block));
1804 #ifdef UNIV_ZIP_DEBUG
1805 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
1806 #endif /* UNIV_ZIP_DEBUG */
1807 
1808 	btr_search_drop_page_hash_index(block);
1809 	btr_blob_dbg_remove(page, index, "btr_page_empty");
1810 
1811 	/* Recreate the page: note that global data on page (possible
1812 	segment headers, next page-field, etc.) is preserved intact */
1813 
1814 	if (UNIV_LIKELY_NULL(page_zip)) {
1815 		page_create_zip(block, index, level, mtr);
1816 	} else {
1817 		page_create(block, mtr, dict_table_is_comp(index->table));
1818 		btr_page_set_level(page, NULL, level, mtr);
1819 	}
1820 
1821 	block->check_index_page_at_flush = TRUE;
1822 }
1823 
/*************************************************************//**
Makes tree one level higher by splitting the root, and inserts
the tuple. It is assumed that mtr contains an x-latch on the tree.
NOTE that the operation of this function must always succeed,
we cannot reverse it: therefore enough free disk space must be
guaranteed to be available before this function is called.
@return	inserted record */
UNIV_INTERN
rec_t*
btr_root_raise_and_insert(
/*======================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
				on the root page; when the function returns,
				the cursor is positioned on the predecessor
				of the inserted record */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dict_index_t*	index;
	page_t*		root;
	page_t*		new_page;
	ulint		new_page_no;
	rec_t*		rec;
	mem_heap_t*	heap;
	dtuple_t*	node_ptr;
	ulint		level;
	rec_t*		node_ptr_rec;
	page_cur_t*	page_cursor;
	page_zip_des_t*	root_page_zip;
	page_zip_des_t*	new_page_zip;
	buf_block_t*	root_block;
	buf_block_t*	new_block;

	root = btr_cur_get_page(cursor);
	root_block = btr_cur_get_block(cursor);
	root_page_zip = buf_block_get_page_zip(root_block);
	ut_ad(page_get_n_recs(root) > 0);
	index = btr_cur_get_index(cursor);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
#endif /* UNIV_ZIP_DEBUG */
#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		ulint	space = dict_index_get_space(index);

		/* The root page carries the leaf and non-leaf file
		segment headers; sanity-check them in debug builds. */
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}

	ut_a(dict_index_get_page(index) == page_get_page_no(root));
#endif /* UNIV_BTR_DEBUG */
	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
				MTR_MEMO_X_LOCK));
	ut_ad(mtr_memo_contains(mtr, root_block, MTR_MEMO_PAGE_X_FIX));

	/* Allocate a new page to the tree. Root splitting is done by first
	moving the root records to the new page, emptying the root, putting
	a node pointer to the new page, and then splitting the new page. */

	level = btr_page_get_level(root, mtr);

	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);
	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
	/* The new page must be compressed if and only if the root is,
	and with the same compressed page size. */
	ut_a(!new_page_zip == !root_page_zip);
	ut_a(!new_page_zip
	     || page_zip_get_size(new_page_zip)
	     == page_zip_get_size(root_page_zip));

	btr_page_create(new_block, new_page_zip, index, level, mtr);

	/* Set the next node and previous node fields of new page */
	btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
	btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);

	/* Copy the records from root to the new page one by one. */

	if (0
#ifdef UNIV_ZIP_COPY
	    || new_page_zip
#endif /* UNIV_ZIP_COPY */
	    || UNIV_UNLIKELY
	    (!page_copy_rec_list_end(new_block, root_block,
				     page_get_infimum_rec(root),
				     index, mtr))) {
		/* The record-by-record copy can only fail for a
		compressed page (recompression failure). */
		ut_a(new_page_zip);

		/* Copy the page byte for byte. */
		page_zip_copy_recs(new_page_zip, new_page,
				   root_page_zip, root, index, mtr);

		/* Update the lock table and possible hash index. */

		lock_move_rec_list_end(new_block, root_block,
				       page_get_infimum_rec(root));

		btr_search_move_or_delete_hash_entries(new_block, root_block,
						       index);
	}

	/* If this is a pessimistic insert which is actually done to
	perform a pessimistic update then we have stored the lock
	information of the record to be inserted on the infimum of the
	root page: we cannot discard the lock structs on the root page */

	lock_update_root_raise(new_block, root_block);

	/* Create a memory heap where the node pointer is stored */
	heap = mem_heap_create(100);

	rec = page_rec_get_next(page_get_infimum_rec(new_page));
	new_page_no = buf_block_get_page_no(new_block);

	/* Build the node pointer (= node key and page address) for the
	child */

	node_ptr = dict_index_build_node_ptr(index, rec, new_page_no, heap,
					     level);
	/* The node pointer must be marked as the predefined minimum record,
	as there is no lower alphabetical limit to records in the leftmost
	node of a level: */
	dtuple_set_info_bits(node_ptr,
			     dtuple_get_info_bits(node_ptr)
			     | REC_INFO_MIN_REC_FLAG);

	/* Rebuild the root page to get free space */
	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);

	/* Set the next node and previous node fields, although
	they should already have been set.  The previous node field
	must be FIL_NULL if root_page_zip != NULL, because the
	REC_INFO_MIN_REC_FLAG (of the first user record) will be
	set if and only if btr_page_get_prev() == FIL_NULL. */
	btr_page_set_next(root, root_page_zip, FIL_NULL, mtr);
	btr_page_set_prev(root, root_page_zip, FIL_NULL, mtr);

	page_cursor = btr_cur_get_page_cur(cursor);

	/* Insert node pointer to the root */

	page_cur_set_before_first(root_block, page_cursor);

	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
					     index, 0, mtr);

	/* The root page should only contain the node pointer
	to new_page at this point.  Thus, the data should fit. */
	ut_a(node_ptr_rec);

	/* Free the memory heap */
	mem_heap_free(heap);

	/* We play safe and reset the free bits for the new page */

#if 0
	fprintf(stderr, "Root raise new page no %lu\n", new_page_no);
#endif

	if (!dict_index_is_clust(index)) {
		/* Only secondary indexes use insert buffer free bits */
		ibuf_reset_free_bits(new_block);
	}

	/* Reposition the cursor to the child node */
	page_cur_search(new_block, index, tuple,
			PAGE_CUR_LE, page_cursor);

	/* Split the child and insert tuple */
	return(btr_page_split_and_insert(cursor, tuple, n_ext, mtr));
}
1996 
1997 /*************************************************************//**
1998 Decides if the page should be split at the convergence point of inserts
1999 converging to the left.
2000 @return	TRUE if split recommended */
2001 UNIV_INTERN
2002 ibool
btr_page_get_split_rec_to_left(btr_cur_t * cursor,rec_t ** split_rec)2003 btr_page_get_split_rec_to_left(
2004 /*===========================*/
2005 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
2006 	rec_t**		split_rec) /*!< out: if split recommended,
2007 				the first record on upper half page,
2008 				or NULL if tuple to be inserted should
2009 				be first */
2010 {
2011 	page_t*	page;
2012 	rec_t*	insert_point;
2013 	rec_t*	infimum;
2014 
2015 	page = btr_cur_get_page(cursor);
2016 	insert_point = btr_cur_get_rec(cursor);
2017 
2018 	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
2019 	    == page_rec_get_next(insert_point)) {
2020 
2021 		infimum = page_get_infimum_rec(page);
2022 
2023 		/* If the convergence is in the middle of a page, include also
2024 		the record immediately before the new insert to the upper
2025 		page. Otherwise, we could repeatedly move from page to page
2026 		lots of records smaller than the convergence point. */
2027 
2028 		if (infimum != insert_point
2029 		    && page_rec_get_next(infimum) != insert_point) {
2030 
2031 			*split_rec = insert_point;
2032 		} else {
2033 			*split_rec = page_rec_get_next(insert_point);
2034 		}
2035 
2036 		return(TRUE);
2037 	}
2038 
2039 	return(FALSE);
2040 }
2041 
2042 /*************************************************************//**
2043 Decides if the page should be split at the convergence point of inserts
2044 converging to the right.
2045 @return	TRUE if split recommended */
2046 UNIV_INTERN
2047 ibool
btr_page_get_split_rec_to_right(btr_cur_t * cursor,rec_t ** split_rec)2048 btr_page_get_split_rec_to_right(
2049 /*============================*/
2050 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
2051 	rec_t**		split_rec) /*!< out: if split recommended,
2052 				the first record on upper half page,
2053 				or NULL if tuple to be inserted should
2054 				be first */
2055 {
2056 	page_t*	page;
2057 	rec_t*	insert_point;
2058 
2059 	page = btr_cur_get_page(cursor);
2060 	insert_point = btr_cur_get_rec(cursor);
2061 
2062 	/* We use eager heuristics: if the new insert would be right after
2063 	the previous insert on the same page, we assume that there is a
2064 	pattern of sequential inserts here. */
2065 
2066 	if (UNIV_LIKELY(page_header_get_ptr(page, PAGE_LAST_INSERT)
2067 			== insert_point)) {
2068 
2069 		rec_t*	next_rec;
2070 
2071 		next_rec = page_rec_get_next(insert_point);
2072 
2073 		if (page_rec_is_supremum(next_rec)) {
2074 split_at_new:
2075 			/* Split at the new record to insert */
2076 			*split_rec = NULL;
2077 		} else {
2078 			rec_t*	next_next_rec = page_rec_get_next(next_rec);
2079 			if (page_rec_is_supremum(next_next_rec)) {
2080 
2081 				goto split_at_new;
2082 			}
2083 
2084 			/* If there are >= 2 user records up from the insert
2085 			point, split all but 1 off. We want to keep one because
2086 			then sequential inserts can use the adaptive hash
2087 			index, as they can do the necessary checks of the right
2088 			search position just by looking at the records on this
2089 			page. */
2090 
2091 			*split_rec = next_next_rec;
2092 		}
2093 
2094 		return(TRUE);
2095 	}
2096 
2097 	return(FALSE);
2098 }
2099 
/*************************************************************//**
Calculates a split record such that the tuple will certainly fit on
its half-page when the split is performed. We assume in this function
only that the cursor page has at least one user record.
@return split record, or NULL if tuple will be the first record on
the lower or upper half-page (determined by btr_page_tuple_smaller()) */
static
rec_t*
btr_page_get_split_rec(
/*===================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext)	/*!< in: number of externally stored columns */
{
	page_t*		page;
	page_zip_des_t*	page_zip;
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	ulint		total_space;
	ulint		incl_data;
	rec_t*		ins_rec;
	rec_t*		rec;
	rec_t*		next_rec;
	ulint		n;
	mem_heap_t*	heap;
	ulint*		offsets;

	page = btr_cur_get_page(cursor);

	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

	page_zip = btr_cur_get_page_zip(cursor);
	if (UNIV_LIKELY_NULL(page_zip)) {
		/* Estimate the free space of an empty compressed page. */
		ulint	free_space_zip = page_zip_empty_size(
			cursor->index->n_fields,
			page_zip_get_size(page_zip));

		/* Use the smaller of the two capacities, so the chosen
		half also fits on the compressed page. */
		if (UNIV_LIKELY(free_space > (ulint) free_space_zip)) {
			free_space = (ulint) free_space_zip;
		}
	}

	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = page_get_n_recs(page) + 1;
	ut_ad(total_n_recs >= 2);
	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);

	n = 0;
	incl_data = 0;
	ins_rec = btr_cur_get_rec(cursor);
	rec = page_get_infimum_rec(page);

	heap = NULL;
	offsets = NULL;

	/* We start to include records to the left half, and when the
	space reserved by them exceeds half of total_space, then if
	the included records fit on the left page, they will be put there
	if something was left over also for the right page,
	otherwise the last included record will be the first on the right
	half page */

	/* In this loop, rec walks through the records in key order with
	the to-be-inserted tuple spliced in after ins_rec; the value NULL
	is used as a sentinel meaning "the tuple itself". */
	do {
		/* Decide the next record to include */
		if (rec == ins_rec) {
			rec = NULL;	/* NULL denotes that tuple is
					now included */
		} else if (rec == NULL) {
			/* The tuple was just included; continue with the
			record that follows the insert position. */
			rec = page_rec_get_next(ins_rec);
		} else {
			rec = page_rec_get_next(rec);
		}

		if (rec == NULL) {
			/* Include tuple */
			incl_data += insert_size;
		} else {
			offsets = rec_get_offsets(rec, cursor->index,
						  offsets, ULINT_UNDEFINED,
						  &heap);
			incl_data += rec_offs_size(offsets);
		}

		n++;
	} while (incl_data + page_dir_calc_reserved_space(n)
		 < total_space / 2);

	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
		/* The next record will be the first on
		the right half page if it is not the
		supremum record of page */

		if (rec == ins_rec) {
			rec = NULL;

			goto func_exit;
		} else if (rec == NULL) {
			next_rec = page_rec_get_next(ins_rec);
		} else {
			next_rec = page_rec_get_next(rec);
		}
		ut_ad(next_rec);
		if (!page_rec_is_supremum(next_rec)) {
			rec = next_rec;
		}
	}

func_exit:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	/* rec == NULL here means: split right before the tuple. */
	return(rec);
}
2219 
/*************************************************************//**
Returns TRUE if the insert fits on the appropriate half-page with the
chosen split_rec.
@return	TRUE if fits */
static
ibool
btr_page_insert_fits(
/*=================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which insert
				should be made */
	const rec_t*	split_rec,/*!< in: suggestion for first record
				on upper half-page, or NULL if
				tuple to be inserted should be first */
	const ulint*	offsets,/*!< in: rec_get_offsets(
				split_rec, cursor->index) */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mem_heap_t*	heap)	/*!< in: temporary memory heap */
{
	page_t*		page;
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	const rec_t*	rec;
	const rec_t*	end_rec;
	ulint*		offs;

	page = btr_cur_get_page(cursor);

	/* offsets must be supplied if and only if split_rec is */
	ut_ad(!split_rec == !offsets);
	ut_ad(!offsets
	      || !page_is_comp(page) == !rec_offs_comp(offsets));
	ut_ad(!offsets
	      || rec_offs_validate(split_rec, cursor->index, offsets));

	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = page_get_n_recs(page) + 1;

	/* We determine which records (from rec to end_rec, not including
	end_rec) will end up on the other half page from tuple when it is
	inserted. */

	if (split_rec == NULL) {
		/* The tuple becomes the first record on the upper half;
		everything up to and including the cursor record moves
		to the other half. */
		rec = page_rec_get_next(page_get_infimum_rec(page));
		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));

	} else if (cmp_dtuple_rec(tuple, split_rec, offsets) >= 0) {

		/* The tuple goes to the upper half; the lower-half
		records are the ones that leave the tuple's page. */
		rec = page_rec_get_next(page_get_infimum_rec(page));
		end_rec = split_rec;
	} else {
		/* The tuple goes to the lower half; the upper-half
		records are the ones that leave the tuple's page. */
		rec = split_rec;
		end_rec = page_get_supremum_rec(page);
	}

	if (total_data + page_dir_calc_reserved_space(total_n_recs)
	    <= free_space) {

		/* Ok, there will be enough available space on the
		half page where the tuple is inserted */

		return(TRUE);
	}

	offs = NULL;

	while (rec != end_rec) {
		/* In this loop we calculate the amount of reserved
		space after rec is removed from page. */

		offs = rec_get_offsets(rec, cursor->index, offs,
				       ULINT_UNDEFINED, &heap);

		total_data -= rec_offs_size(offs);
		total_n_recs--;

		if (total_data + page_dir_calc_reserved_space(total_n_recs)
		    <= free_space) {

			/* Ok, there will be enough available space on the
			half page where the tuple is inserted */

			return(TRUE);
		}

		rec = page_rec_get_next_const(rec);
	}

	return(FALSE);
}
2316 
2317 /*******************************************************//**
2318 Inserts a data tuple to a tree on a non-leaf level. It is assumed
2319 that mtr holds an x-latch on the tree. */
2320 UNIV_INTERN
2321 void
btr_insert_on_non_leaf_level_func(dict_index_t * index,ulint level,dtuple_t * tuple,const char * file,ulint line,mtr_t * mtr)2322 btr_insert_on_non_leaf_level_func(
2323 /*==============================*/
2324 	dict_index_t*	index,	/*!< in: index */
2325 	ulint		level,	/*!< in: level, must be > 0 */
2326 	dtuple_t*	tuple,	/*!< in: the record to be inserted */
2327 	const char*	file,	/*!< in: file name */
2328 	ulint		line,	/*!< in: line where called */
2329 	mtr_t*		mtr)	/*!< in: mtr */
2330 {
2331 	big_rec_t*	dummy_big_rec;
2332 	btr_cur_t	cursor;
2333 	ulint		err;
2334 	rec_t*		rec;
2335 
2336 	ut_ad(level > 0);
2337 
2338 	btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_LE,
2339 				    BTR_CONT_MODIFY_TREE,
2340 				    &cursor, 0, file, line, mtr);
2341 
2342 	ut_ad(cursor.flag == BTR_CUR_BINARY);
2343 
2344 	err = btr_cur_optimistic_insert(
2345 		BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG
2346 		| BTR_NO_UNDO_LOG_FLAG, &cursor, tuple, &rec,
2347 		&dummy_big_rec, 0, NULL, mtr);
2348 
2349 	if (err == DB_FAIL) {
2350 		err = btr_cur_pessimistic_insert(
2351 			BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG
2352 			| BTR_NO_UNDO_LOG_FLAG,
2353 			&cursor, tuple, &rec, &dummy_big_rec, 0, NULL, mtr);
2354 		ut_a(err == DB_SUCCESS);
2355 	}
2356 }
2357 
/**************************************************************//**
Attaches the halves of an index page on the appropriate level in an
index tree. */
static
void
btr_attach_half_pages(
/*==================*/
	dict_index_t*	index,		/*!< in: the index tree */
	buf_block_t*	block,		/*!< in/out: page to be split */
	rec_t*		split_rec,	/*!< in: first record on upper
					half page */
	buf_block_t*	new_block,	/*!< in/out: the new half page */
	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
	mtr_t*		mtr)		/*!< in: mtr */
{
	ulint		space;
	ulint		zip_size;
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		level;
	page_t*		page		= buf_block_get_frame(block);
	page_t*		lower_page;
	page_t*		upper_page;
	ulint		lower_page_no;
	ulint		upper_page_no;
	page_zip_des_t*	lower_page_zip;
	page_zip_des_t*	upper_page_zip;
	dtuple_t*	node_ptr_upper;
	mem_heap_t*	heap;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(mtr_memo_contains(mtr, new_block, MTR_MEMO_PAGE_X_FIX));

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);

	/* Based on split direction, decide upper and lower pages */
	if (direction == FSP_DOWN) {

		btr_cur_t	cursor;
		ulint*		offsets;

		/* The new page takes the records *below* split_rec, so it
		is the lower half and inherits the original page's position
		in the father node. */
		lower_page = buf_block_get_frame(new_block);
		lower_page_no = buf_block_get_page_no(new_block);
		lower_page_zip = buf_block_get_page_zip(new_block);
		upper_page = buf_block_get_frame(block);
		upper_page_no = buf_block_get_page_no(block);
		upper_page_zip = buf_block_get_page_zip(block);

		/* Look up the index for the node pointer to page */
		offsets = btr_page_get_father_block(NULL, heap, index,
						    block, mtr, &cursor);

		/* Replace the address of the old child node (= page) with the
		address of the new lower half */

		btr_node_ptr_set_child_page_no(
			btr_cur_get_rec(&cursor),
			btr_cur_get_page_zip(&cursor),
			offsets, lower_page_no, mtr);
		/* Reuse the heap for the node pointer tuple below */
		mem_heap_empty(heap);
	} else {
		/* FSP_UP: the original page keeps the lower records and
		the new page becomes the upper half. */
		lower_page = buf_block_get_frame(block);
		lower_page_no = buf_block_get_page_no(block);
		lower_page_zip = buf_block_get_page_zip(block);
		upper_page = buf_block_get_frame(new_block);
		upper_page_no = buf_block_get_page_no(new_block);
		upper_page_zip = buf_block_get_page_zip(new_block);
	}

	/* Get the level of the split pages */
	level = btr_page_get_level(buf_block_get_frame(block), mtr);
	ut_ad(level
	      == btr_page_get_level(buf_block_get_frame(new_block), mtr));

	/* Build the node pointer (= node key and page address) for the upper
	half */

	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
						   upper_page_no, heap, level);

	/* Insert it next to the pointer to the lower half. Note that this
	may generate recursion leading to a split on the higher level. */

	btr_insert_on_non_leaf_level(index, level + 1, node_ptr_upper, mtr);

	/* Free the memory heap */
	mem_heap_free(heap);

	/* Get the previous and next pages of page */

	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
	space = buf_block_get_space(block);
	zip_size = buf_block_get_zip_size(block);

	/* Update page links of the level */

	if (prev_page_no != FIL_NULL) {
		buf_block_t*	prev_block = btr_block_get(
			space, zip_size, prev_page_no, RW_X_LATCH, index, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_block->frame, mtr)
		     == buf_block_get_page_no(block));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_next(buf_block_get_frame(prev_block),
				  buf_block_get_page_zip(prev_block),
				  lower_page_no, mtr);
	}

	if (next_page_no != FIL_NULL) {
		buf_block_t*	next_block = btr_block_get(
			space, zip_size, next_page_no, RW_X_LATCH, index, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_block->frame, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_prev(buf_block_get_frame(next_block),
				  buf_block_get_page_zip(next_block),
				  upper_page_no, mtr);
	}

	/* Finally link the two halves together:
	prev <-> lower <-> upper <-> next */
	btr_page_set_prev(lower_page, lower_page_zip, prev_page_no, mtr);
	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);

	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);
	btr_page_set_next(upper_page, upper_page_zip, next_page_no, mtr);
}
2490 
2491 /*************************************************************//**
2492 Determine if a tuple is smaller than any record on the page.
2493 @return TRUE if smaller */
2494 static
2495 ibool
btr_page_tuple_smaller(btr_cur_t * cursor,const dtuple_t * tuple,ulint * offsets,ulint n_uniq,mem_heap_t ** heap)2496 btr_page_tuple_smaller(
2497 /*===================*/
2498 	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
2499 	const dtuple_t*	tuple,	/*!< in: tuple to consider */
2500 	ulint*		offsets,/*!< in/out: temporary storage */
2501 	ulint		n_uniq,	/*!< in: number of unique fields
2502 				in the index page records */
2503 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2504 {
2505 	buf_block_t*	block;
2506 	const rec_t*	first_rec;
2507 	page_cur_t	pcur;
2508 
2509 	/* Read the first user record in the page. */
2510 	block = btr_cur_get_block(cursor);
2511 	page_cur_set_before_first(block, &pcur);
2512 	page_cur_move_to_next(&pcur);
2513 	first_rec = page_cur_get_rec(&pcur);
2514 
2515 	offsets = rec_get_offsets(
2516 		first_rec, cursor->index, offsets,
2517 		n_uniq, heap);
2518 
2519 	return(cmp_dtuple_rec(tuple, first_rec, offsets) < 0);
2520 }
2521 
2522 /*************************************************************//**
2523 Splits an index page to halves and inserts the tuple. It is assumed
2524 that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
2525 released within this function! NOTE that the operation of this
2526 function must always succeed, we cannot reverse it: therefore enough
2527 free disk space (2 pages) must be guaranteed to be available before
2528 this function is called.
2529 
2530 @return inserted record */
2531 UNIV_INTERN
2532 rec_t*
btr_page_split_and_insert(btr_cur_t * cursor,const dtuple_t * tuple,ulint n_ext,mtr_t * mtr)2533 btr_page_split_and_insert(
2534 /*======================*/
2535 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
2536 				function returns, the cursor is positioned
2537 				on the predecessor of the inserted record */
2538 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2539 	ulint		n_ext,	/*!< in: number of externally stored columns */
2540 	mtr_t*		mtr)	/*!< in: mtr */
2541 {
2542 	buf_block_t*	block;
2543 	page_t*		page;
2544 	page_zip_des_t*	page_zip;
2545 	ulint		page_no;
2546 	byte		direction;
2547 	ulint		hint_page_no;
2548 	buf_block_t*	new_block;
2549 	page_t*		new_page;
2550 	page_zip_des_t*	new_page_zip;
2551 	rec_t*		split_rec;
2552 	buf_block_t*	left_block;
2553 	buf_block_t*	right_block;
2554 	buf_block_t*	insert_block;
2555 	page_cur_t*	page_cursor;
2556 	rec_t*		first_rec;
2557 	byte*		buf = 0; /* remove warning */
2558 	rec_t*		move_limit;
2559 	ibool		insert_will_fit;
2560 	ibool		insert_left;
2561 	ulint		n_iterations = 0;
2562 	rec_t*		rec;
2563 	mem_heap_t*	heap;
2564 	ulint		n_uniq;
2565 	ulint*		offsets;
2566 
2567 	heap = mem_heap_create(1024);
2568 	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
2569 func_start:
2570 	mem_heap_empty(heap);
2571 	offsets = NULL;
2572 
2573 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(cursor->index),
2574 				MTR_MEMO_X_LOCK));
2575 #ifdef UNIV_SYNC_DEBUG
2576 	ut_ad(rw_lock_own(dict_index_get_lock(cursor->index), RW_LOCK_EX));
2577 #endif /* UNIV_SYNC_DEBUG */
2578 
2579 	block = btr_cur_get_block(cursor);
2580 	page = buf_block_get_frame(block);
2581 	page_zip = buf_block_get_page_zip(block);
2582 
2583 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2584 	ut_ad(page_get_n_recs(page) >= 1);
2585 
2586 	page_no = buf_block_get_page_no(block);
2587 
2588 	/* 1. Decide the split record; split_rec == NULL means that the
2589 	tuple to be inserted should be the first record on the upper
2590 	half-page */
2591 	insert_left = FALSE;
2592 
2593 	if (n_iterations > 0) {
2594 		direction = FSP_UP;
2595 		hint_page_no = page_no + 1;
2596 		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);
2597 
2598 		if (UNIV_UNLIKELY(split_rec == NULL)) {
2599 			insert_left = btr_page_tuple_smaller(
2600 				cursor, tuple, offsets, n_uniq, &heap);
2601 		}
2602 	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
2603 		direction = FSP_UP;
2604 		hint_page_no = page_no + 1;
2605 
2606 	} else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
2607 		direction = FSP_DOWN;
2608 		hint_page_no = page_no - 1;
2609 		ut_ad(split_rec);
2610 	} else {
2611 		direction = FSP_UP;
2612 		hint_page_no = page_no + 1;
2613 
2614 		/* If there is only one record in the index page, we
2615 		can't split the node in the middle by default. We need
2616 		to determine whether the new record will be inserted
2617 		to the left or right. */
2618 
2619 		if (page_get_n_recs(page) > 1) {
2620 			split_rec = page_get_middle_rec(page);
2621 		} else if (btr_page_tuple_smaller(cursor, tuple,
2622 						  offsets, n_uniq, &heap)) {
2623 			split_rec = page_rec_get_next(
2624 				page_get_infimum_rec(page));
2625 		} else {
2626 			split_rec = NULL;
2627 		}
2628 	}
2629 
2630 	/* 2. Allocate a new page to the index */
2631 	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
2632 				   btr_page_get_level(page, mtr), mtr, mtr);
2633 	new_page = buf_block_get_frame(new_block);
2634 	new_page_zip = buf_block_get_page_zip(new_block);
2635 	btr_page_create(new_block, new_page_zip, cursor->index,
2636 			btr_page_get_level(page, mtr), mtr);
2637 
2638 	/* 3. Calculate the first record on the upper half-page, and the
2639 	first record (move_limit) on original page which ends up on the
2640 	upper half */
2641 
2642 	if (split_rec) {
2643 		first_rec = move_limit = split_rec;
2644 
2645 		offsets = rec_get_offsets(split_rec, cursor->index, offsets,
2646 					  n_uniq, &heap);
2647 
2648 		insert_left = cmp_dtuple_rec(tuple, split_rec, offsets) < 0;
2649 
2650 		if (UNIV_UNLIKELY(!insert_left && new_page_zip
2651 				  && n_iterations > 0)) {
2652 			/* If a compressed page has already been split,
2653 			avoid further splits by inserting the record
2654 			to an empty page. */
2655 			split_rec = NULL;
2656 			goto insert_empty;
2657 		}
2658 	} else if (UNIV_UNLIKELY(insert_left)) {
2659 		ut_a(n_iterations > 0);
2660 		first_rec = page_rec_get_next(page_get_infimum_rec(page));
2661 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2662 	} else {
2663 insert_empty:
2664 		ut_ad(!split_rec);
2665 		ut_ad(!insert_left);
2666 		buf = mem_alloc(rec_get_converted_size(cursor->index,
2667 						       tuple, n_ext));
2668 
2669 		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
2670 						      tuple, n_ext);
2671 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2672 	}
2673 
2674 	/* 4. Do first the modifications in the tree structure */
2675 
2676 	btr_attach_half_pages(cursor->index, block,
2677 			      first_rec, new_block, direction, mtr);
2678 
2679 	/* If the split is made on the leaf level and the insert will fit
2680 	on the appropriate half-page, we may release the tree x-latch.
2681 	We can then move the records after releasing the tree latch,
2682 	thus reducing the tree latch contention. */
2683 
2684 	if (split_rec) {
2685 		insert_will_fit = !new_page_zip
2686 			&& btr_page_insert_fits(cursor, split_rec,
2687 						offsets, tuple, n_ext, heap);
2688 	} else {
2689 		if (!insert_left) {
2690 			mem_free(buf);
2691 			buf = NULL;
2692 		}
2693 
2694 		insert_will_fit = !new_page_zip
2695 			&& btr_page_insert_fits(cursor, NULL,
2696 						NULL, tuple, n_ext, heap);
2697 	}
2698 
2699 	if (insert_will_fit && page_is_leaf(page)) {
2700 
2701 		mtr_memo_release(mtr, dict_index_get_lock(cursor->index),
2702 				 MTR_MEMO_X_LOCK);
2703 	}
2704 
2705 	/* 5. Move then the records to the new page */
2706 	if (direction == FSP_DOWN) {
2707 		/*		fputs("Split left\n", stderr); */
2708 
2709 		if (0
2710 #ifdef UNIV_ZIP_COPY
2711 		    || page_zip
2712 #endif /* UNIV_ZIP_COPY */
2713 		    || UNIV_UNLIKELY
2714 		    (!page_move_rec_list_start(new_block, block, move_limit,
2715 					       cursor->index, mtr))) {
2716 			/* For some reason, compressing new_page failed,
2717 			even though it should contain fewer records than
2718 			the original page.  Copy the page byte for byte
2719 			and then delete the records from both pages
2720 			as appropriate.  Deleting will always succeed. */
2721 			ut_a(new_page_zip);
2722 
2723 			page_zip_copy_recs(new_page_zip, new_page,
2724 					   page_zip, page, cursor->index, mtr);
2725 			page_delete_rec_list_end(move_limit - page + new_page,
2726 						 new_block, cursor->index,
2727 						 ULINT_UNDEFINED,
2728 						 ULINT_UNDEFINED, mtr);
2729 
2730 			/* Update the lock table and possible hash index. */
2731 
2732 			lock_move_rec_list_start(
2733 				new_block, block, move_limit,
2734 				new_page + PAGE_NEW_INFIMUM);
2735 
2736 			btr_search_move_or_delete_hash_entries(
2737 				new_block, block, cursor->index);
2738 
2739 			/* Delete the records from the source page. */
2740 
2741 			page_delete_rec_list_start(move_limit, block,
2742 						   cursor->index, mtr);
2743 		}
2744 
2745 		left_block = new_block;
2746 		right_block = block;
2747 
2748 		lock_update_split_left(right_block, left_block);
2749 	} else {
2750 		/*		fputs("Split right\n", stderr); */
2751 
2752 		if (0
2753 #ifdef UNIV_ZIP_COPY
2754 		    || page_zip
2755 #endif /* UNIV_ZIP_COPY */
2756 		    || UNIV_UNLIKELY
2757 		    (!page_move_rec_list_end(new_block, block, move_limit,
2758 					     cursor->index, mtr))) {
2759 			/* For some reason, compressing new_page failed,
2760 			even though it should contain fewer records than
2761 			the original page.  Copy the page byte for byte
2762 			and then delete the records from both pages
2763 			as appropriate.  Deleting will always succeed. */
2764 			ut_a(new_page_zip);
2765 
2766 			page_zip_copy_recs(new_page_zip, new_page,
2767 					   page_zip, page, cursor->index, mtr);
2768 			page_delete_rec_list_start(move_limit - page
2769 						   + new_page, new_block,
2770 						   cursor->index, mtr);
2771 
2772 			/* Update the lock table and possible hash index. */
2773 
2774 			lock_move_rec_list_end(new_block, block, move_limit);
2775 
2776 			btr_search_move_or_delete_hash_entries(
2777 				new_block, block, cursor->index);
2778 
2779 			/* Delete the records from the source page. */
2780 
2781 			page_delete_rec_list_end(move_limit, block,
2782 						 cursor->index,
2783 						 ULINT_UNDEFINED,
2784 						 ULINT_UNDEFINED, mtr);
2785 		}
2786 
2787 		left_block = block;
2788 		right_block = new_block;
2789 
2790 		lock_update_split_right(right_block, left_block);
2791 	}
2792 
2793 #ifdef UNIV_ZIP_DEBUG
2794 	if (UNIV_LIKELY_NULL(page_zip)) {
2795 		ut_a(page_zip_validate(page_zip, page, cursor->index));
2796 		ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
2797 	}
2798 #endif /* UNIV_ZIP_DEBUG */
2799 
2800 	/* At this point, split_rec, move_limit and first_rec may point
2801 	to garbage on the old page. */
2802 
2803 	/* 6. The split and the tree modification is now completed. Decide the
2804 	page where the tuple should be inserted */
2805 
2806 	if (insert_left) {
2807 		insert_block = left_block;
2808 	} else {
2809 		insert_block = right_block;
2810 	}
2811 
2812 	/* 7. Reposition the cursor for insert and try insertion */
2813 	page_cursor = btr_cur_get_page_cur(cursor);
2814 
2815 	page_cur_search(insert_block, cursor->index, tuple,
2816 			PAGE_CUR_LE, page_cursor);
2817 
2818 	rec = page_cur_tuple_insert(page_cursor, tuple,
2819 				    cursor->index, n_ext, mtr);
2820 
2821 #ifdef UNIV_ZIP_DEBUG
2822 	{
2823 		page_t*		insert_page
2824 			= buf_block_get_frame(insert_block);
2825 
2826 		page_zip_des_t*	insert_page_zip
2827 			= buf_block_get_page_zip(insert_block);
2828 
2829 		ut_a(!insert_page_zip
2830 		     || page_zip_validate(insert_page_zip, insert_page,
2831 					  cursor->index));
2832 	}
2833 #endif /* UNIV_ZIP_DEBUG */
2834 
2835 	if (UNIV_LIKELY(rec != NULL)) {
2836 
2837 		goto func_exit;
2838 	}
2839 
2840 	/* 8. If insert did not fit, try page reorganization */
2841 
2842 	if (UNIV_UNLIKELY
2843 	    (!btr_page_reorganize(insert_block, cursor->index, mtr))) {
2844 
2845 		goto insert_failed;
2846 	}
2847 
2848 	page_cur_search(insert_block, cursor->index, tuple,
2849 			PAGE_CUR_LE, page_cursor);
2850 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
2851 				    n_ext, mtr);
2852 
2853 	if (UNIV_UNLIKELY(rec == NULL)) {
2854 		/* The insert did not fit on the page: loop back to the
2855 		start of the function for a new split */
2856 insert_failed:
2857 		/* We play safe and reset the free bits for new_page */
2858 		if (!dict_index_is_clust(cursor->index)) {
2859 			ibuf_reset_free_bits(new_block);
2860 		}
2861 
2862 		/* fprintf(stderr, "Split second round %lu\n",
2863 		page_get_page_no(page)); */
2864 		n_iterations++;
2865 		ut_ad(n_iterations < 2
2866 		      || buf_block_get_page_zip(insert_block));
2867 		ut_ad(!insert_will_fit);
2868 
2869 		goto func_start;
2870 	}
2871 
2872 func_exit:
2873 	/* Insert fit on the page: update the free bits for the
2874 	left and right pages in the same mtr */
2875 
2876 	if (!dict_index_is_clust(cursor->index) && page_is_leaf(page)) {
2877 		ibuf_update_free_bits_for_two_pages_low(
2878 			buf_block_get_zip_size(left_block),
2879 			left_block, right_block, mtr);
2880 	}
2881 
2882 #if 0
2883 	fprintf(stderr, "Split and insert done %lu %lu\n",
2884 		buf_block_get_page_no(left_block),
2885 		buf_block_get_page_no(right_block));
2886 #endif
2887 
2888 	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
2889 	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
2890 
2891 	mem_heap_free(heap);
2892 	return(rec);
2893 }
2894 
#ifdef UNIV_SYNC_DEBUG
/*************************************************************//**
Removes a page from the level list of pages.
@param space	in: space where removed
@param zip_size	in: compressed page size in bytes, or 0 for uncompressed
@param page	in/out: page to remove
@param index	in: index tree
@param mtr	in/out: mini-transaction */
# define btr_level_list_remove(space,zip_size,page,index,mtr)		\
	btr_level_list_remove_func(space,zip_size,page,index,mtr)
#else /* UNIV_SYNC_DEBUG */
/*************************************************************//**
Removes a page from the level list of pages.
NOTE: in this configuration the index argument is accepted for a
uniform call syntax but discarded, because
btr_level_list_remove_func() only takes an index parameter when
UNIV_SYNC_DEBUG is defined (it is used there for latching checks only).
@param space	in: space where removed
@param zip_size	in: compressed page size in bytes, or 0 for uncompressed
@param page	in/out: page to remove
@param index	in: index tree (ignored in this configuration)
@param mtr	in/out: mini-transaction */
# define btr_level_list_remove(space,zip_size,page,index,mtr)		\
	btr_level_list_remove_func(space,zip_size,page,mtr)
#endif /* UNIV_SYNC_DEBUG */
2916 
/*************************************************************//**
Removes a page from the doubly linked list of pages on its B-tree level:
the FIL_PAGE_NEXT pointer of the left neighbor and the FIL_PAGE_PREV
pointer of the right neighbor are redirected past "page".  The page
itself is neither modified nor freed here; the caller owns that. */
static __attribute__((nonnull))
void
btr_level_list_remove_func(
/*=======================*/
	ulint			space,	/*!< in: space where removed */
	ulint			zip_size,/*!< in: compressed page size in bytes
					or 0 for uncompressed pages */
	page_t*			page,	/*!< in/out: page to remove */
#ifdef UNIV_SYNC_DEBUG
	const dict_index_t*	index,	/*!< in: index tree */
#endif /* UNIV_SYNC_DEBUG */
	mtr_t*			mtr)	/*!< in/out: mini-transaction */
{
	ulint	prev_page_no;
	ulint	next_page_no;

	ut_ad(page && mtr);
	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
	ut_ad(space == page_get_space_id(page));
	/* Get the previous and next page numbers of page */

	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);

	/* Update page links of the level.  NOTE(review): when
	UNIV_SYNC_DEBUG is off there is no "index" parameter, so the
	btr_block_get() calls below are presumably macro invocations
	that drop their index argument before expansion — confirm
	against btr0btr.h. */

	if (prev_page_no != FIL_NULL) {
		buf_block_t*	prev_block
			= btr_block_get(space, zip_size, prev_page_no,
					RW_X_LATCH, index, mtr);
		page_t*		prev_page
			= buf_block_get_frame(prev_block);
#ifdef UNIV_BTR_DEBUG
		/* The left neighbor must be in the same format and must
		currently point at us. */
		ut_a(page_is_comp(prev_page) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_page, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		/* Make the left neighbor skip over "page". */
		btr_page_set_next(prev_page,
				  buf_block_get_page_zip(prev_block),
				  next_page_no, mtr);
	}

	if (next_page_no != FIL_NULL) {
		buf_block_t*	next_block
			= btr_block_get(space, zip_size, next_page_no,
					RW_X_LATCH, index, mtr);
		page_t*		next_page
			= buf_block_get_frame(next_block);
#ifdef UNIV_BTR_DEBUG
		/* The right neighbor must be in the same format and must
		currently point back at us. */
		ut_a(page_is_comp(next_page) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_page, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		/* Make the right neighbor skip over "page". */
		btr_page_set_prev(next_page,
				  buf_block_get_page_zip(next_block),
				  prev_page_no, mtr);
	}
}
2979 
/****************************************************************//**
Writes the redo log record for setting an index record as the predefined
minimum record.  The record is identified in the log body only by its
2-byte offset within the page; btr_parse_set_min_rec_mark() is the
matching parse/apply function on recovery. */
UNIV_INLINE
void
btr_set_min_rec_mark_log(
/*=====================*/
	rec_t*	rec,	/*!< in: record */
	byte	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or MLOG_REC_MIN_MARK */
	mtr_t*	mtr)	/*!< in: mtr */
{
	mlog_write_initial_log_record(rec, type, mtr);

	/* Write rec offset as a 2-byte ulint */
	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
}
2996 #else /* !UNIV_HOTBACKUP */
2997 # define btr_set_min_rec_mark_log(rec,comp,mtr) ((void) 0)
2998 #endif /* !UNIV_HOTBACKUP */
2999 
3000 /****************************************************************//**
3001 Parses the redo log record for setting an index record as the predefined
3002 minimum record.
3003 @return	end of log record or NULL */
3004 UNIV_INTERN
3005 byte*
btr_parse_set_min_rec_mark(byte * ptr,byte * end_ptr,ulint comp,page_t * page,mtr_t * mtr)3006 btr_parse_set_min_rec_mark(
3007 /*=======================*/
3008 	byte*	ptr,	/*!< in: buffer */
3009 	byte*	end_ptr,/*!< in: buffer end */
3010 	ulint	comp,	/*!< in: nonzero=compact page format */
3011 	page_t*	page,	/*!< in: page or NULL */
3012 	mtr_t*	mtr)	/*!< in: mtr or NULL */
3013 {
3014 	rec_t*	rec;
3015 
3016 	if (end_ptr < ptr + 2) {
3017 
3018 		return(NULL);
3019 	}
3020 
3021 	if (page) {
3022 		ut_a(!page_is_comp(page) == !comp);
3023 
3024 		rec = page + mach_read_from_2(ptr);
3025 
3026 		btr_set_min_rec_mark(rec, mtr);
3027 	}
3028 
3029 	return(ptr + 2);
3030 }
3031 
3032 /****************************************************************//**
3033 Sets a record as the predefined minimum record. */
3034 UNIV_INTERN
3035 void
btr_set_min_rec_mark(rec_t * rec,mtr_t * mtr)3036 btr_set_min_rec_mark(
3037 /*=================*/
3038 	rec_t*	rec,	/*!< in: record */
3039 	mtr_t*	mtr)	/*!< in: mtr */
3040 {
3041 	ulint	info_bits;
3042 
3043 	if (UNIV_LIKELY(page_rec_is_comp(rec))) {
3044 		info_bits = rec_get_info_bits(rec, TRUE);
3045 
3046 		rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3047 
3048 		btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
3049 	} else {
3050 		info_bits = rec_get_info_bits(rec, FALSE);
3051 
3052 		rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3053 
3054 		btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
3055 	}
3056 }
3057 
3058 #ifndef UNIV_HOTBACKUP
3059 /*************************************************************//**
3060 Deletes on the upper level the node pointer to a page. */
3061 UNIV_INTERN
3062 void
btr_node_ptr_delete(dict_index_t * index,buf_block_t * block,mtr_t * mtr)3063 btr_node_ptr_delete(
3064 /*================*/
3065 	dict_index_t*	index,	/*!< in: index tree */
3066 	buf_block_t*	block,	/*!< in: page whose node pointer is deleted */
3067 	mtr_t*		mtr)	/*!< in: mtr */
3068 {
3069 	btr_cur_t	cursor;
3070 	ibool		compressed;
3071 	ulint		err;
3072 
3073 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3074 
3075 	/* Delete node pointer on father page */
3076 	btr_page_get_father(index, block, mtr, &cursor);
3077 
3078 	compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor, RB_NONE,
3079 						mtr);
3080 	ut_a(err == DB_SUCCESS);
3081 
3082 	if (!compressed) {
3083 		btr_cur_compress_if_useful(&cursor, FALSE, mtr);
3084 	}
3085 }
3086 
/*************************************************************//**
If page is the only on its level, this function moves its records to the
father page, thus reducing the tree height.
@return father block */
static
buf_block_t*
btr_lift_page_up(
/*=============*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only on its level;
				must not be empty: use
				btr_discard_only_page_on_level if the last
				record from the page should be removed */
	mtr_t*		mtr)	/*!< in: mtr */
{
	buf_block_t*	father_block;
	page_t*		father_page;
	ulint		page_level;
	page_zip_des_t*	father_page_zip;
	page_t*		page		= buf_block_get_frame(block);
	ulint		root_page_no;
	buf_block_t*	blocks[BTR_MAX_LEVELS];
	ulint		n_blocks;	/*!< last used index in blocks[] */
	ulint		i;
	ibool		lift_father_up	= FALSE;
	buf_block_t*	block_orig	= block;

	/* Being alone on its level, the page must have no siblings. */
	ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
	ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

	page_level = btr_page_get_level(page, mtr);
	root_page_no = dict_index_get_page(index);

	{
		btr_cur_t	cursor;
		ulint*		offsets	= NULL;
		mem_heap_t*	heap	= mem_heap_create(
			sizeof(*offsets)
			* (REC_OFFS_HEADER_SIZE + 1 + 1 + index->n_fields));
		buf_block_t*	b;

		offsets = btr_page_get_father_block(offsets, heap, index,
						    block, mtr, &cursor);
		father_block = btr_cur_get_block(&cursor);
		father_page_zip = buf_block_get_page_zip(father_block);
		father_page = buf_block_get_frame(father_block);

		n_blocks = 0;

		/* Store all ancestor pages so we can reset their
		levels later on.  We have to do all the searches on
		the tree now because later on, after we've replaced
		the first level, the tree is in an inconsistent state
		and can not be searched. */
		for (b = father_block;
		     buf_block_get_page_no(b) != root_page_no; ) {
			ut_a(n_blocks < BTR_MAX_LEVELS);

			offsets = btr_page_get_father_block(offsets, heap,
							    index, b,
							    mtr, &cursor);

			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
		}

		if (n_blocks && page_level == 0) {
			/* The father page also should be the only on its level (not
			root). We should lift up the father page at first.
			Because the leaf page should be lifted up only for root page.
			The freeing page is based on page_level (==0 or !=0)
			to choose segment. If the page_level is changed ==0 from !=0,
			later freeing of the page doesn't find the page allocation
			to be freed.*/

			/* Retarget the lift: treat the father as the page
			being lifted, and its own father as the target. */
			lift_father_up = TRUE;
			block = father_block;
			page = buf_block_get_frame(block);
			page_level = btr_page_get_level(page, mtr);

			ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
			ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
			ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

			father_block = blocks[0];
			father_page_zip = buf_block_get_page_zip(father_block);
			father_page = buf_block_get_frame(father_block);
		}

		mem_heap_free(heap);
	}

	/* The child's records are about to move: its adaptive hash
	index entries would become stale. */
	btr_search_drop_page_hash_index(block);

	/* Make the father empty */
	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
	page_level++;

	/* Copy the records to the father page one by one. */
	if (0
#ifdef UNIV_ZIP_COPY
	    || father_page_zip
#endif /* UNIV_ZIP_COPY */
	    || UNIV_UNLIKELY
	    (!page_copy_rec_list_end(father_block, block,
				     page_get_infimum_rec(page),
				     index, mtr))) {
		/* Recompression of the father failed (possible only for
		compressed pages); fall back to a byte-wise page copy. */
		const page_zip_des_t*	page_zip
			= buf_block_get_page_zip(block);
		ut_a(father_page_zip);
		ut_a(page_zip);

		/* Copy the page byte for byte. */
		page_zip_copy_recs(father_page_zip, father_page,
				   page_zip, page, index, mtr);

		/* Update the lock table and possible hash index. */

		lock_move_rec_list_end(father_block, block,
				       page_get_infimum_rec(page));

		btr_search_move_or_delete_hash_entries(father_block, block,
						       index);
	}

	btr_blob_dbg_remove(page, index, "btr_lift_page_up");
	lock_update_copy_and_discard(father_block, block);

	/* Go upward to root page, decrementing levels by one. */
	for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
		page_t*		page	= buf_block_get_frame(blocks[i]);
		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);

		ut_ad(btr_page_get_level(page, mtr) == page_level + 1);

		btr_page_set_level(page, page_zip, page_level, mtr);
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	}

	/* Free the file page */
	btr_page_free(index, block, mtr);

	/* We play it safe and reset the free bits for the father */
	if (!dict_index_is_clust(index)) {
		ibuf_reset_free_bits(father_block);
	}
	ut_ad(page_validate(father_page, index));
	ut_ad(btr_check_node_ptr(index, father_block, mtr));

	/* When the father itself was lifted, the caller's original
	block is the one that survived; otherwise the father did. */
	return(lift_father_up ? block_orig : father_block);
}
3240 
/*************************************************************//**
Tries to merge the page first to the left immediate brother if such a
brother exists, and the node pointers to the current page and to the brother
reside on the same page. If the left brother does not satisfy these
conditions, looks at the right brother. If the page is the only one on that
level lifts the records of the page to the father page, thus reducing the
tree height. It is assumed that mtr holds an x-latch on the tree and on the
page. If cursor is on the leaf level, mtr must also hold x-latches to the
brothers, if they exist.
@return	TRUE on success */
UNIV_INTERN
ibool
btr_compress(
/*=========*/
	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to merge
				or lift; the page must not be empty:
				when deleting records, use btr_discard_page()
				if the page would become empty */
	ibool		adjust,	/*!< in: TRUE if should adjust the
				cursor position even if compression occurs */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	dict_index_t*	index;
	ulint		space;
	ulint		zip_size;
	ulint		left_page_no;
	ulint		right_page_no;
	buf_block_t*	merge_block;
	page_t*		merge_page = NULL;
	page_zip_des_t*	merge_page_zip;
	ibool		is_left;
	buf_block_t*	block;
	page_t*		page;
	btr_cur_t	father_cursor;
	mem_heap_t*	heap;
	ulint*		offsets;
	ulint		nth_rec = 0; /* remove bogus warning */
	DBUG_ENTER("btr_compress");

	block = btr_cur_get_block(cursor);
	page = btr_cur_get_page(cursor);
	index = btr_cur_get_index(cursor);

	btr_assert_not_corrupted(block, index);

	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
				MTR_MEMO_X_LOCK));
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	space = dict_index_get_space(index);
	zip_size = dict_table_zip_size(index->table);

	left_page_no = btr_page_get_prev(page, mtr);
	right_page_no = btr_page_get_next(page, mtr);

#ifdef UNIV_DEBUG
	/* On a non-leaf level, the first record of the leftmost page
	must carry the minimum-record mark. */
	if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
			page_rec_get_next(page_get_infimum_rec(page)),
			page_is_comp(page)));
	}
#endif /* UNIV_DEBUG */

	heap = mem_heap_create(100);
	offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
					    &father_cursor);

	if (adjust) {
		/* Remember the cursor's ordinal position so it can be
		re-established on the merge page at func_exit. */
		nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
		ut_ad(nth_rec > 0);
	}

	if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
		/* The page is the only one on the level, lift the records
		to the father */

		merge_block = btr_lift_page_up(index, block, mtr);
		goto func_exit;
	}

	/* Decide the page to which we try to merge and which will inherit
	the locks */

	is_left = btr_can_merge_with_page(cursor, left_page_no,
					  &merge_block, mtr);

	DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);

	if(!is_left
	   && !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
				       mtr)) {
		/* Neither sibling can absorb this page's records. */
		goto err_exit;
	}

	merge_page = buf_block_get_frame(merge_block);

#ifdef UNIV_BTR_DEBUG
	/* The chosen merge page must be our immediate sibling. */
	if (is_left) {
                ut_a(btr_page_get_next(merge_page, mtr)
                     == buf_block_get_page_no(block));
	} else {
               ut_a(btr_page_get_prev(merge_page, mtr)
                     == buf_block_get_page_no(block));
	}
#endif /* UNIV_BTR_DEBUG */

	ut_ad(page_validate(merge_page, index));

	merge_page_zip = buf_block_get_page_zip(merge_block);
#ifdef UNIV_ZIP_DEBUG
	if (UNIV_LIKELY_NULL(merge_page_zip)) {
		const page_zip_des_t*	page_zip
			= buf_block_get_page_zip(block);
		ut_a(page_zip);
		ut_a(page_zip_validate(merge_page_zip, merge_page, index));
		ut_a(page_zip_validate(page_zip, page, index));
	}
#endif /* UNIV_ZIP_DEBUG */

	/* Move records to the merge page */
	if (is_left) {
		/* Merge into the left sibling: append all our records
		after its existing ones. */
		rec_t*	orig_pred = page_copy_rec_list_start(
			merge_block, block, page_get_supremum_rec(page),
			index, mtr);

		if (UNIV_UNLIKELY(!orig_pred)) {
			goto err_exit;
		}

		btr_search_drop_page_hash_index(block);

		/* Remove the page from the level list */
		btr_level_list_remove(space, zip_size, page, index, mtr);

		btr_node_ptr_delete(index, block, mtr);
		lock_update_merge_left(merge_block, orig_pred, block);

		if (adjust) {
			/* The merge page's own records now precede ours;
			shift the remembered cursor position past them. */
			nth_rec += page_rec_get_n_recs_before(orig_pred);
		}
	} else {
		rec_t*		orig_succ;
		ibool		compressed;
		ulint		err;
		btr_cur_t	cursor2;
					/* father cursor pointing to node ptr
					of the right sibling */
#ifdef UNIV_BTR_DEBUG
		byte		fil_page_prev[4];
#endif /* UNIV_BTR_DEBUG */

		btr_page_get_father(index, merge_block, mtr, &cursor2);

		if (merge_page_zip && left_page_no == FIL_NULL) {
			/* The function page_zip_compress(), which will be
			invoked by page_copy_rec_list_end() below,
			requires that FIL_PAGE_PREV be FIL_NULL.
			Clear the field, but prepare to restore it. */
#ifdef UNIV_BTR_DEBUG
			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
#endif /* UNIV_BTR_DEBUG */
#if FIL_NULL != 0xffffffff
# error "FIL_NULL != 0xffffffff"
#endif
			memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
		}

		/* Merge into the right sibling: prepend all our records
		before its existing ones. */
		orig_succ = page_copy_rec_list_end(merge_block, block,
						   page_get_infimum_rec(page),
						   cursor->index, mtr);

		if (UNIV_UNLIKELY(!orig_succ)) {
			ut_a(merge_page_zip);
#ifdef UNIV_BTR_DEBUG
			if (left_page_no == FIL_NULL) {
				/* FIL_PAGE_PREV was restored from
				merge_page_zip. */
				ut_a(!memcmp(fil_page_prev,
					     merge_page + FIL_PAGE_PREV, 4));
			}
#endif /* UNIV_BTR_DEBUG */
			goto err_exit;
		}

		btr_search_drop_page_hash_index(block);

#ifdef UNIV_BTR_DEBUG
		if (merge_page_zip && left_page_no == FIL_NULL) {
			/* Restore FIL_PAGE_PREV in order to avoid an assertion
			failure in btr_level_list_remove(), which will set
			the field again to FIL_NULL.  Even though this makes
			merge_page and merge_page_zip inconsistent for a
			split second, it is harmless, because the pages
			are X-latched. */
			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
		}
#endif /* UNIV_BTR_DEBUG */

		/* Remove the page from the level list */
		btr_level_list_remove(space, zip_size, page, index, mtr);

		/* Replace the address of the old child node (= page) with the
		address of the merge page to the right */
		btr_node_ptr_set_child_page_no(
			btr_cur_get_rec(&father_cursor),
			btr_cur_get_page_zip(&father_cursor),
			offsets, right_page_no, mtr);

		/* Delete the right sibling's old node pointer; its
		records are now reachable through our (redirected) one. */
		compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor2,
							RB_NONE, mtr);
		ut_a(err == DB_SUCCESS);

		if (!compressed) {
			btr_cur_compress_if_useful(&cursor2, FALSE, mtr);
		}

		lock_update_merge_right(merge_block, orig_succ, block);
	}

	btr_blob_dbg_remove(page, index, "btr_compress");

	if (!dict_index_is_clust(index) && page_is_leaf(merge_page)) {
		/* Update the free bits of the B-tree page in the
		insert buffer bitmap.  This has to be done in a
		separate mini-transaction that is committed before the
		main mini-transaction.  We cannot update the insert
		buffer bitmap in this mini-transaction, because
		btr_compress() can be invoked recursively without
		committing the mini-transaction in between.  Since
		insert buffer bitmap pages have a lower rank than
		B-tree pages, we must not access other pages in the
		same mini-transaction after accessing an insert buffer
		bitmap page. */

		/* The free bits in the insert buffer bitmap must
		never exceed the free space on a page.  It is safe to
		decrement or reset the bits in the bitmap in a
		mini-transaction that is committed before the
		mini-transaction that affects the free space. */

		/* It is unsafe to increment the bits in a separately
		committed mini-transaction, because in crash recovery,
		the free bits could momentarily be set too high. */

		if (zip_size) {
			/* Because the free bits may be incremented
			and we cannot update the insert buffer bitmap
			in the same mini-transaction, the only safe
			thing we can do here is the pessimistic
			approach: reset the free bits. */
			ibuf_reset_free_bits(merge_block);
		} else {
			/* On uncompressed pages, the free bits will
			never increase here.  Thus, it is safe to
			write the bits accurately in a separate
			mini-transaction. */
			ibuf_update_free_bits_if_full(merge_block,
						      UNIV_PAGE_SIZE,
						      ULINT_UNDEFINED);
		}
	}

	ut_ad(page_validate(merge_page, index));
#ifdef UNIV_ZIP_DEBUG
	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page,
						  index));
#endif /* UNIV_ZIP_DEBUG */

	/* Free the file page */
	btr_page_free(index, block, mtr);

	ut_ad(btr_check_node_ptr(index, merge_block, mtr));
func_exit:
	mem_heap_free(heap);

	if (adjust) {
		/* Re-position the caller's cursor at the same logical
		record, now living on the merge page. */
		ut_ad(nth_rec > 0);
		btr_cur_position(
			index,
			page_rec_get_nth(merge_block->frame, nth_rec),
			merge_block, cursor);
	}
	DBUG_RETURN(TRUE);

err_exit:
	/* We play it safe and reset the free bits. */
	if (zip_size
	    && merge_page
	    && page_is_leaf(merge_page)
	    && !dict_index_is_clust(index)) {
		ibuf_reset_free_bits(merge_block);
	}

	mem_heap_free(heap);
	DBUG_RETURN(FALSE);
}
3536 
/*************************************************************//**
Discards a page that is the only page on its level.  This will empty
the whole B-tree, leaving just an empty root page.  This function
should never be reached, because btr_compress(), which is invoked in
delete operations, calls btr_lift_page_up() to flatten the B-tree. */
static
void
btr_discard_only_page_on_level(
/*===========================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only on its level */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ulint		page_level = 0;
	trx_id_t	max_trx_id;

	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
	max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));

	/* Walk up from the given page towards the root, freeing each
	page on the way; each such page must be the sole page on its
	level and hold exactly one record (the node pointer chain). */
	while (buf_block_get_page_no(block) != dict_index_get_page(index)) {
		btr_cur_t	cursor;
		buf_block_t*	father;
		const page_t*	page	= buf_block_get_frame(block);

		ut_a(page_get_n_recs(page) == 1);
		ut_a(page_level == btr_page_get_level(page, mtr));
		ut_a(btr_page_get_prev(page, mtr) == FIL_NULL);
		ut_a(btr_page_get_next(page, mtr) == FIL_NULL);

		ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
		btr_search_drop_page_hash_index(block);

		btr_page_get_father(index, block, mtr, &cursor);
		father = btr_cur_get_block(&cursor);

		lock_update_discard(father, PAGE_HEAP_NO_SUPREMUM, block);

		/* Free the file page */
		btr_page_free(index, block, mtr);

		block = father;
		page_level++;
	}

	/* block is the root page, which must be empty, except
	for the node pointer to the (now discarded) block(s). */

#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root	= buf_block_get_frame(block);
		const ulint	space	= dict_index_get_space(index);
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
#endif /* UNIV_BTR_DEBUG */

	/* Re-initialize the root as an empty leaf page (level 0). */
	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);

	if (!dict_index_is_clust(index)) {
		/* We play it safe and reset the free bits for the root */
		ibuf_reset_free_bits(block);

		/* Carry the saved PAGE_MAX_TRX_ID over to the new
		leaf root, as secondary-index leaf pages require it. */
		if (page_is_leaf(buf_block_get_frame(block))) {
			ut_a(max_trx_id);
			page_set_max_trx_id(block,
					    buf_block_get_page_zip(block),
					    max_trx_id, mtr);
		}
	}
}
3609 
3610 /*************************************************************//**
3611 Discards a page from a B-tree. This is used to remove the last record from
3612 a B-tree page: the whole page must be removed at the same time. This cannot
3613 be used for the root page, which is allowed to be empty. */
3614 UNIV_INTERN
3615 void
btr_discard_page(btr_cur_t * cursor,mtr_t * mtr)3616 btr_discard_page(
3617 /*=============*/
3618 	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
3619 				the root page */
3620 	mtr_t*		mtr)	/*!< in: mtr */
3621 {
3622 	dict_index_t*	index;
3623 	ulint		space;
3624 	ulint		zip_size;
3625 	ulint		left_page_no;
3626 	ulint		right_page_no;
3627 	buf_block_t*	merge_block;
3628 	page_t*		merge_page;
3629 	buf_block_t*	block;
3630 	page_t*		page;
3631 	rec_t*		node_ptr;
3632 
3633 	block = btr_cur_get_block(cursor);
3634 	index = btr_cur_get_index(cursor);
3635 
3636 	ut_ad(dict_index_get_page(index) != buf_block_get_page_no(block));
3637 	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
3638 				MTR_MEMO_X_LOCK));
3639 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3640 	space = dict_index_get_space(index);
3641 	zip_size = dict_table_zip_size(index->table);
3642 
3643 	/* Decide the page which will inherit the locks */
3644 
3645 	left_page_no = btr_page_get_prev(buf_block_get_frame(block), mtr);
3646 	right_page_no = btr_page_get_next(buf_block_get_frame(block), mtr);
3647 
3648 	if (left_page_no != FIL_NULL) {
3649 		merge_block = btr_block_get(space, zip_size, left_page_no,
3650 					    RW_X_LATCH, index, mtr);
3651 		merge_page = buf_block_get_frame(merge_block);
3652 #ifdef UNIV_BTR_DEBUG
3653 		ut_a(btr_page_get_next(merge_page, mtr)
3654 		     == buf_block_get_page_no(block));
3655 #endif /* UNIV_BTR_DEBUG */
3656 	} else if (right_page_no != FIL_NULL) {
3657 		merge_block = btr_block_get(space, zip_size, right_page_no,
3658 					    RW_X_LATCH, index, mtr);
3659 		merge_page = buf_block_get_frame(merge_block);
3660 #ifdef UNIV_BTR_DEBUG
3661 		ut_a(btr_page_get_prev(merge_page, mtr)
3662 		     == buf_block_get_page_no(block));
3663 #endif /* UNIV_BTR_DEBUG */
3664 	} else {
3665 		btr_discard_only_page_on_level(index, block, mtr);
3666 
3667 		return;
3668 	}
3669 
3670 	page = buf_block_get_frame(block);
3671 	ut_a(page_is_comp(merge_page) == page_is_comp(page));
3672 	btr_search_drop_page_hash_index(block);
3673 
3674 	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
3675 
3676 		/* We have to mark the leftmost node pointer on the right
3677 		side page as the predefined minimum record */
3678 		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));
3679 
3680 		ut_ad(page_rec_is_user_rec(node_ptr));
3681 
3682 		/* This will make page_zip_validate() fail on merge_page
3683 		until btr_level_list_remove() completes.  This is harmless,
3684 		because everything will take place within a single
3685 		mini-transaction and because writing to the redo log
3686 		is an atomic operation (performed by mtr_commit()). */
3687 		btr_set_min_rec_mark(node_ptr, mtr);
3688 	}
3689 
3690 	btr_node_ptr_delete(index, block, mtr);
3691 
3692 	/* Remove the page from the level list */
3693 	btr_level_list_remove(space, zip_size, page, index, mtr);
3694 #ifdef UNIV_ZIP_DEBUG
3695 	{
3696 		page_zip_des_t*	merge_page_zip
3697 			= buf_block_get_page_zip(merge_block);
3698 		ut_a(!merge_page_zip
3699 		     || page_zip_validate(merge_page_zip, merge_page, index));
3700 	}
3701 #endif /* UNIV_ZIP_DEBUG */
3702 
3703 	if (left_page_no != FIL_NULL) {
3704 		lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
3705 				    block);
3706 	} else {
3707 		lock_update_discard(merge_block,
3708 				    lock_get_min_heap_no(merge_block),
3709 				    block);
3710 	}
3711 
3712 	btr_blob_dbg_remove(page, index, "btr_discard_page");
3713 
3714 	/* Free the file page */
3715 	btr_page_free(index, block, mtr);
3716 
3717 	ut_ad(btr_check_node_ptr(index, merge_block, mtr));
3718 }
3719 
3720 #ifdef UNIV_BTR_PRINT
3721 /*************************************************************//**
3722 Prints size info of a B-tree. */
3723 UNIV_INTERN
3724 void
btr_print_size(dict_index_t * index)3725 btr_print_size(
3726 /*===========*/
3727 	dict_index_t*	index)	/*!< in: index tree */
3728 {
3729 	page_t*		root;
3730 	fseg_header_t*	seg;
3731 	mtr_t		mtr;
3732 
3733 	if (dict_index_is_ibuf(index)) {
3734 		fputs("Sorry, cannot print info of an ibuf tree:"
3735 		      " use ibuf functions\n", stderr);
3736 
3737 		return;
3738 	}
3739 
3740 	mtr_start(&mtr);
3741 
3742 	root = btr_root_get(index, &mtr);
3743 
3744 	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
3745 
3746 	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
3747 	fseg_print(seg, &mtr);
3748 
3749 	if (!(index->type & DICT_UNIVERSAL)) {
3750 
3751 		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
3752 
3753 		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
3754 		fseg_print(seg, &mtr);
3755 	}
3756 
3757 	mtr_commit(&mtr);
3758 }
3759 
3760 /************************************************************//**
3761 Prints recursively index tree pages. */
3762 static
3763 void
btr_print_recursive(dict_index_t * index,buf_block_t * block,ulint width,mem_heap_t ** heap,ulint ** offsets,mtr_t * mtr)3764 btr_print_recursive(
3765 /*================*/
3766 	dict_index_t*	index,	/*!< in: index tree */
3767 	buf_block_t*	block,	/*!< in: index page */
3768 	ulint		width,	/*!< in: print this many entries from start
3769 				and end */
3770 	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
3771 	ulint**		offsets,/*!< in/out: buffer for rec_get_offsets() */
3772 	mtr_t*		mtr)	/*!< in: mtr */
3773 {
3774 	const page_t*	page	= buf_block_get_frame(block);
3775 	page_cur_t	cursor;
3776 	ulint		n_recs;
3777 	ulint		i	= 0;
3778 	mtr_t		mtr2;
3779 
3780 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3781 	fprintf(stderr, "NODE ON LEVEL %lu page number %lu\n",
3782 		(ulong) btr_page_get_level(page, mtr),
3783 		(ulong) buf_block_get_page_no(block));
3784 
3785 	page_print(block, index, width, width);
3786 
3787 	n_recs = page_get_n_recs(page);
3788 
3789 	page_cur_set_before_first(block, &cursor);
3790 	page_cur_move_to_next(&cursor);
3791 
3792 	while (!page_cur_is_after_last(&cursor)) {
3793 
3794 		if (page_is_leaf(page)) {
3795 
3796 			/* If this is the leaf level, do nothing */
3797 
3798 		} else if ((i <= width) || (i >= n_recs - width)) {
3799 
3800 			const rec_t*	node_ptr;
3801 
3802 			mtr_start(&mtr2);
3803 
3804 			node_ptr = page_cur_get_rec(&cursor);
3805 
3806 			*offsets = rec_get_offsets(node_ptr, index, *offsets,
3807 						   ULINT_UNDEFINED, heap);
3808 			btr_print_recursive(index,
3809 					    btr_node_ptr_get_child(node_ptr,
3810 								   index,
3811 								   *offsets,
3812 								   &mtr2),
3813 					    width, heap, offsets, &mtr2);
3814 			mtr_commit(&mtr2);
3815 		}
3816 
3817 		page_cur_move_to_next(&cursor);
3818 		i++;
3819 	}
3820 }
3821 
3822 /**************************************************************//**
3823 Prints directories and other info of all nodes in the tree. */
3824 UNIV_INTERN
3825 void
btr_print_index(dict_index_t * index,ulint width)3826 btr_print_index(
3827 /*============*/
3828 	dict_index_t*	index,	/*!< in: index */
3829 	ulint		width)	/*!< in: print this many entries from start
3830 				and end */
3831 {
3832 	mtr_t		mtr;
3833 	buf_block_t*	root;
3834 	mem_heap_t*	heap	= NULL;
3835 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
3836 	ulint*		offsets	= offsets_;
3837 	rec_offs_init(offsets_);
3838 
3839 	fputs("--------------------------\n"
3840 	      "INDEX TREE PRINT\n", stderr);
3841 
3842 	mtr_start(&mtr);
3843 
3844 	root = btr_root_block_get(index, &mtr);
3845 
3846 	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
3847 	if (UNIV_LIKELY_NULL(heap)) {
3848 		mem_heap_free(heap);
3849 	}
3850 
3851 	mtr_commit(&mtr);
3852 
3853 	btr_validate_index(index, NULL);
3854 }
3855 #endif /* UNIV_BTR_PRINT */
3856 
3857 #ifdef UNIV_DEBUG
3858 /************************************************************//**
3859 Checks that the node pointer to a page is appropriate.
3860 @return	TRUE */
3861 UNIV_INTERN
3862 ibool
btr_check_node_ptr(dict_index_t * index,buf_block_t * block,mtr_t * mtr)3863 btr_check_node_ptr(
3864 /*===============*/
3865 	dict_index_t*	index,	/*!< in: index tree */
3866 	buf_block_t*	block,	/*!< in: index page */
3867 	mtr_t*		mtr)	/*!< in: mtr */
3868 {
3869 	mem_heap_t*	heap;
3870 	dtuple_t*	tuple;
3871 	ulint*		offsets;
3872 	btr_cur_t	cursor;
3873 	page_t*		page = buf_block_get_frame(block);
3874 
3875 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3876 	if (dict_index_get_page(index) == buf_block_get_page_no(block)) {
3877 
3878 		return(TRUE);
3879 	}
3880 
3881 	heap = mem_heap_create(256);
3882 	offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
3883 					    &cursor);
3884 
3885 	if (page_is_leaf(page)) {
3886 
3887 		goto func_exit;
3888 	}
3889 
3890 	tuple = dict_index_build_node_ptr(
3891 		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
3892 		btr_page_get_level(page, mtr));
3893 
3894 	ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
3895 func_exit:
3896 	mem_heap_free(heap);
3897 
3898 	return(TRUE);
3899 }
3900 #endif /* UNIV_DEBUG */
3901 
3902 /************************************************************//**
3903 Display identification information for a record. */
3904 static
3905 void
btr_index_rec_validate_report(const page_t * page,const rec_t * rec,const dict_index_t * index)3906 btr_index_rec_validate_report(
3907 /*==========================*/
3908 	const page_t*		page,	/*!< in: index page */
3909 	const rec_t*		rec,	/*!< in: index record */
3910 	const dict_index_t*	index)	/*!< in: index */
3911 {
3912 	fputs("InnoDB: Record in ", stderr);
3913 	dict_index_name_print(stderr, NULL, index);
3914 	fprintf(stderr, ", page %lu, at offset %lu\n",
3915 		page_get_page_no(page), (ulint) page_offset(rec));
3916 }
3917 
3918 /************************************************************//**
3919 Checks the size and number of fields in a record based on the definition of
3920 the index.
3921 @return	TRUE if ok */
3922 UNIV_INTERN
3923 ibool
btr_index_rec_validate(const rec_t * rec,const dict_index_t * index,ibool dump_on_error)3924 btr_index_rec_validate(
3925 /*===================*/
3926 	const rec_t*		rec,		/*!< in: index record */
3927 	const dict_index_t*	index,		/*!< in: index */
3928 	ibool			dump_on_error)	/*!< in: TRUE if the function
3929 						should print hex dump of record
3930 						and page on error */
3931 {
3932 	ulint		len;
3933 	ulint		n;
3934 	ulint		i;
3935 	const page_t*	page;
3936 	mem_heap_t*	heap	= NULL;
3937 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
3938 	ulint*		offsets	= offsets_;
3939 	rec_offs_init(offsets_);
3940 
3941 	page = page_align(rec);
3942 
3943 	if (UNIV_UNLIKELY(index->type & DICT_UNIVERSAL)) {
3944 		/* The insert buffer index tree can contain records from any
3945 		other index: we cannot check the number of fields or
3946 		their length */
3947 
3948 		return(TRUE);
3949 	}
3950 
3951 	if (UNIV_UNLIKELY((ibool)!!page_is_comp(page)
3952 			  != dict_table_is_comp(index->table))) {
3953 		btr_index_rec_validate_report(page, rec, index);
3954 		fprintf(stderr, "InnoDB: compact flag=%lu, should be %lu\n",
3955 			(ulong) !!page_is_comp(page),
3956 			(ulong) dict_table_is_comp(index->table));
3957 
3958 		return(FALSE);
3959 	}
3960 
3961 	n = dict_index_get_n_fields(index);
3962 
3963 	if (!page_is_comp(page)
3964 	    && UNIV_UNLIKELY(rec_get_n_fields_old(rec) != n)) {
3965 		btr_index_rec_validate_report(page, rec, index);
3966 		fprintf(stderr, "InnoDB: has %lu fields, should have %lu\n",
3967 			(ulong) rec_get_n_fields_old(rec), (ulong) n);
3968 
3969 		if (dump_on_error) {
3970 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
3971 
3972 			fputs("InnoDB: corrupt record ", stderr);
3973 			rec_print_old(stderr, rec);
3974 			putc('\n', stderr);
3975 		}
3976 		return(FALSE);
3977 	}
3978 
3979 	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
3980 
3981 	for (i = 0; i < n; i++) {
3982 		ulint	fixed_size = dict_col_get_fixed_size(
3983 			dict_index_get_nth_col(index, i), page_is_comp(page));
3984 
3985 		rec_get_nth_field_offs(offsets, i, &len);
3986 
3987 		/* Note that if fixed_size != 0, it equals the
3988 		length of a fixed-size column in the clustered index.
3989 		A prefix index of the column is of fixed, but different
3990 		length.  When fixed_size == 0, prefix_len is the maximum
3991 		length of the prefix index column. */
3992 
3993 		if ((dict_index_get_nth_field(index, i)->prefix_len == 0
3994 		     && len != UNIV_SQL_NULL && fixed_size
3995 		     && len != fixed_size)
3996 		    || (dict_index_get_nth_field(index, i)->prefix_len > 0
3997 			&& len != UNIV_SQL_NULL
3998 			&& len
3999 			> dict_index_get_nth_field(index, i)->prefix_len)) {
4000 
4001 			btr_index_rec_validate_report(page, rec, index);
4002 			fprintf(stderr,
4003 				"InnoDB: field %lu len is %lu,"
4004 				" should be %lu\n",
4005 				(ulong) i, (ulong) len, (ulong) fixed_size);
4006 
4007 			if (dump_on_error) {
4008 				buf_page_print(page, 0,
4009 					       BUF_PAGE_PRINT_NO_CRASH);
4010 
4011 				fputs("InnoDB: corrupt record ", stderr);
4012 				rec_print_new(stderr, rec, offsets);
4013 				putc('\n', stderr);
4014 			}
4015 			if (UNIV_LIKELY_NULL(heap)) {
4016 				mem_heap_free(heap);
4017 			}
4018 			return(FALSE);
4019 		}
4020 	}
4021 
4022 	if (UNIV_LIKELY_NULL(heap)) {
4023 		mem_heap_free(heap);
4024 	}
4025 	return(TRUE);
4026 }
4027 
4028 /************************************************************//**
4029 Checks the size and number of fields in records based on the definition of
4030 the index.
4031 @return	TRUE if ok */
4032 static
4033 ibool
btr_index_page_validate(buf_block_t * block,dict_index_t * index)4034 btr_index_page_validate(
4035 /*====================*/
4036 	buf_block_t*	block,	/*!< in: index page */
4037 	dict_index_t*	index)	/*!< in: index */
4038 {
4039 	page_cur_t	cur;
4040 	ibool		ret	= TRUE;
4041 #ifndef DBUG_OFF
4042 	ulint		nth	= 1;
4043 #endif /* !DBUG_OFF */
4044 
4045 	page_cur_set_before_first(block, &cur);
4046 
4047 	/* Directory slot 0 should only contain the infimum record. */
4048 	DBUG_EXECUTE_IF("check_table_rec_next",
4049 			ut_a(page_rec_get_nth_const(
4050 				     page_cur_get_page(&cur), 0)
4051 			     == cur.rec);
4052 			ut_a(page_dir_slot_get_n_owned(
4053 				     page_dir_get_nth_slot(
4054 					     page_cur_get_page(&cur), 0))
4055 			     == 1););
4056 
4057 	page_cur_move_to_next(&cur);
4058 
4059 	for (;;) {
4060 		if (page_cur_is_after_last(&cur)) {
4061 
4062 			break;
4063 		}
4064 
4065 		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4066 
4067 			return(FALSE);
4068 		}
4069 
4070 		/* Verify that page_rec_get_nth_const() is correctly
4071 		retrieving each record. */
4072 		DBUG_EXECUTE_IF("check_table_rec_next",
4073 				ut_a(cur.rec == page_rec_get_nth_const(
4074 					     page_cur_get_page(&cur),
4075 					     page_rec_get_n_recs_before(
4076 						     cur.rec)));
4077 				ut_a(nth++ == page_rec_get_n_recs_before(
4078 					     cur.rec)););
4079 
4080 		page_cur_move_to_next(&cur);
4081 	}
4082 
4083 	return(ret);
4084 }
4085 
4086 /************************************************************//**
4087 Report an error on one page of an index tree. */
4088 static
4089 void
btr_validate_report1(dict_index_t * index,ulint level,const buf_block_t * block)4090 btr_validate_report1(
4091 /*=================*/
4092 	dict_index_t*		index,	/*!< in: index */
4093 	ulint			level,	/*!< in: B-tree level */
4094 	const buf_block_t*	block)	/*!< in: index page */
4095 {
4096 	fprintf(stderr, "InnoDB: Error in page %lu of ",
4097 		buf_block_get_page_no(block));
4098 	dict_index_name_print(stderr, NULL, index);
4099 	if (level) {
4100 		fprintf(stderr, ", index tree level %lu", level);
4101 	}
4102 	putc('\n', stderr);
4103 }
4104 
4105 /************************************************************//**
4106 Report an error on two pages of an index tree. */
4107 static
4108 void
btr_validate_report2(const dict_index_t * index,ulint level,const buf_block_t * block1,const buf_block_t * block2)4109 btr_validate_report2(
4110 /*=================*/
4111 	const dict_index_t*	index,	/*!< in: index */
4112 	ulint			level,	/*!< in: B-tree level */
4113 	const buf_block_t*	block1,	/*!< in: first index page */
4114 	const buf_block_t*	block2)	/*!< in: second index page */
4115 {
4116 	fprintf(stderr, "InnoDB: Error in pages %lu and %lu of ",
4117 		buf_block_get_page_no(block1),
4118 		buf_block_get_page_no(block2));
4119 	dict_index_name_print(stderr, NULL, index);
4120 	if (level) {
4121 		fprintf(stderr, ", index tree level %lu", level);
4122 	}
4123 	putc('\n', stderr);
4124 }
4125 
4126 /************************************************************//**
4127 Validates index tree level.
4128 @return	TRUE if ok */
4129 static
4130 ibool
btr_validate_level(dict_index_t * index,trx_t * trx,ulint level)4131 btr_validate_level(
4132 /*===============*/
4133 	dict_index_t*	index,	/*!< in: index tree */
4134 	trx_t*		trx,	/*!< in: transaction or NULL */
4135 	ulint		level)	/*!< in: level number */
4136 {
4137 	ulint		space;
4138 	ulint		zip_size;
4139 	buf_block_t*	block;
4140 	page_t*		page;
4141 	buf_block_t*	right_block = 0; /* remove warning */
4142 	page_t*		right_page = 0; /* remove warning */
4143 	page_t*		father_page;
4144 	btr_cur_t	node_cur;
4145 	btr_cur_t	right_node_cur;
4146 	rec_t*		rec;
4147 	ulint		right_page_no;
4148 	ulint		left_page_no;
4149 	page_cur_t	cursor;
4150 	dtuple_t*	node_ptr_tuple;
4151 	ibool		ret	= TRUE;
4152 	mtr_t		mtr;
4153 	mem_heap_t*	heap	= mem_heap_create(256);
4154 	ulint*		offsets	= NULL;
4155 	ulint*		offsets2= NULL;
4156 #ifdef UNIV_ZIP_DEBUG
4157 	page_zip_des_t*	page_zip;
4158 #endif /* UNIV_ZIP_DEBUG */
4159 
4160 	mtr_start(&mtr);
4161 
4162 	mtr_x_lock(dict_index_get_lock(index), &mtr);
4163 
4164 	block = btr_root_block_get(index, &mtr);
4165 	page = buf_block_get_frame(block);
4166 
4167 	space = dict_index_get_space(index);
4168 	zip_size = dict_table_zip_size(index->table);
4169 
4170 	while (level != btr_page_get_level(page, &mtr)) {
4171 		const rec_t*	node_ptr;
4172 
4173 		ut_a(space == buf_block_get_space(block));
4174 		ut_a(space == page_get_space_id(page));
4175 #ifdef UNIV_ZIP_DEBUG
4176 		page_zip = buf_block_get_page_zip(block);
4177 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4178 #endif /* UNIV_ZIP_DEBUG */
4179 		ut_a(!page_is_leaf(page));
4180 
4181 		page_cur_set_before_first(block, &cursor);
4182 		page_cur_move_to_next(&cursor);
4183 
4184 		node_ptr = page_cur_get_rec(&cursor);
4185 		offsets = rec_get_offsets(node_ptr, index, offsets,
4186 					  ULINT_UNDEFINED, &heap);
4187 		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
4188 		page = buf_block_get_frame(block);
4189 	}
4190 
4191 	/* Now we are on the desired level. Loop through the pages on that
4192 	level. */
4193 loop:
4194 	if (trx_is_interrupted(trx)) {
4195 		mtr_commit(&mtr);
4196 		mem_heap_free(heap);
4197 		return(ret);
4198 	}
4199 	mem_heap_empty(heap);
4200 	offsets = offsets2 = NULL;
4201 	mtr_x_lock(dict_index_get_lock(index), &mtr);
4202 
4203 #ifdef UNIV_ZIP_DEBUG
4204 	page_zip = buf_block_get_page_zip(block);
4205 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4206 #endif /* UNIV_ZIP_DEBUG */
4207 
4208 	/* Check ordering etc. of records */
4209 
4210 	if (!page_validate(page, index)) {
4211 		btr_validate_report1(index, level, block);
4212 
4213 		ret = FALSE;
4214 	} else if (level == 0) {
4215 		/* We are on level 0. Check that the records have the right
4216 		number of fields, and field lengths are right. */
4217 
4218 		if (!btr_index_page_validate(block, index)) {
4219 
4220 			ret = FALSE;
4221 		}
4222 	}
4223 
4224 	ut_a(btr_page_get_level(page, &mtr) == level);
4225 
4226 	right_page_no = btr_page_get_next(page, &mtr);
4227 	left_page_no = btr_page_get_prev(page, &mtr);
4228 
4229 	ut_a(page_get_n_recs(page) > 0 || (level == 0
4230 					   && page_get_page_no(page)
4231 					   == dict_index_get_page(index)));
4232 
4233 	if (right_page_no != FIL_NULL) {
4234 		const rec_t*	right_rec;
4235 		right_block = btr_block_get(space, zip_size, right_page_no,
4236 					    RW_X_LATCH, index, &mtr);
4237 		right_page = buf_block_get_frame(right_block);
4238 		if (UNIV_UNLIKELY(btr_page_get_prev(right_page, &mtr)
4239 				  != page_get_page_no(page))) {
4240 			btr_validate_report2(index, level, block, right_block);
4241 			fputs("InnoDB: broken FIL_PAGE_NEXT"
4242 			      " or FIL_PAGE_PREV links\n", stderr);
4243 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4244 			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4245 
4246 			ret = FALSE;
4247 		}
4248 
4249 		if (UNIV_UNLIKELY(page_is_comp(right_page)
4250 				  != page_is_comp(page))) {
4251 			btr_validate_report2(index, level, block, right_block);
4252 			fputs("InnoDB: 'compact' flag mismatch\n", stderr);
4253 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4254 			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4255 
4256 			ret = FALSE;
4257 
4258 			goto node_ptr_fails;
4259 		}
4260 
4261 		rec = page_rec_get_prev(page_get_supremum_rec(page));
4262 		right_rec = page_rec_get_next(page_get_infimum_rec(
4263 						      right_page));
4264 		offsets = rec_get_offsets(rec, index,
4265 					  offsets, ULINT_UNDEFINED, &heap);
4266 		offsets2 = rec_get_offsets(right_rec, index,
4267 					   offsets2, ULINT_UNDEFINED, &heap);
4268 		if (UNIV_UNLIKELY(cmp_rec_rec(rec, right_rec,
4269 					      offsets, offsets2,
4270 					      index) >= 0)) {
4271 
4272 			btr_validate_report2(index, level, block, right_block);
4273 
4274 			fputs("InnoDB: records in wrong order"
4275 			      " on adjacent pages\n", stderr);
4276 
4277 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4278 			buf_page_print(right_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4279 
4280 			fputs("InnoDB: record ", stderr);
4281 			rec = page_rec_get_prev(page_get_supremum_rec(page));
4282 			rec_print(stderr, rec, index);
4283 			putc('\n', stderr);
4284 			fputs("InnoDB: record ", stderr);
4285 			rec = page_rec_get_next(
4286 				page_get_infimum_rec(right_page));
4287 			rec_print(stderr, rec, index);
4288 			putc('\n', stderr);
4289 
4290 			ret = FALSE;
4291 		}
4292 	}
4293 
4294 	if (level > 0 && left_page_no == FIL_NULL) {
4295 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
4296 			     page_rec_get_next(page_get_infimum_rec(page)),
4297 			     page_is_comp(page)));
4298 	}
4299 
4300 	if (buf_block_get_page_no(block) != dict_index_get_page(index)) {
4301 
4302 		/* Check father node pointers */
4303 
4304 		rec_t*	node_ptr;
4305 
4306 		offsets = btr_page_get_father_block(offsets, heap, index,
4307 						    block, &mtr, &node_cur);
4308 		father_page = btr_cur_get_page(&node_cur);
4309 		node_ptr = btr_cur_get_rec(&node_cur);
4310 
4311 		btr_cur_position(
4312 			index, page_rec_get_prev(page_get_supremum_rec(page)),
4313 			block, &node_cur);
4314 		offsets = btr_page_get_father_node_ptr(offsets, heap,
4315 						       &node_cur, &mtr);
4316 
4317 		if (UNIV_UNLIKELY(node_ptr != btr_cur_get_rec(&node_cur))
4318 		    || UNIV_UNLIKELY(btr_node_ptr_get_child_page_no(node_ptr,
4319 								    offsets)
4320 				     != buf_block_get_page_no(block))) {
4321 
4322 			btr_validate_report1(index, level, block);
4323 
4324 			fputs("InnoDB: node pointer to the page is wrong\n",
4325 			      stderr);
4326 
4327 			buf_page_print(father_page, 0, BUF_PAGE_PRINT_NO_CRASH);
4328 			buf_page_print(page, 0, BUF_PAGE_PRINT_NO_CRASH);
4329 
4330 			fputs("InnoDB: node ptr ", stderr);
4331 			rec_print(stderr, node_ptr, index);
4332 
4333 			rec = btr_cur_get_rec(&node_cur);
4334 			fprintf(stderr, "\n"
4335 				"InnoDB: node ptr child page n:o %lu\n",
4336 				(ulong) btr_node_ptr_get_child_page_no(
4337 					rec, offsets));
4338 
4339 			fputs("InnoDB: record on page ", stderr);
4340 			rec_print_new(stderr, rec, offsets);
4341 			putc('\n', stderr);
4342 			ret = FALSE;
4343 
4344 			goto node_ptr_fails;
4345 		}
4346 
4347 		if (!page_is_leaf(page)) {
4348 			node_ptr_tuple = dict_index_build_node_ptr(
4349 				index,
4350 				page_rec_get_next(page_get_infimum_rec(page)),
4351 				0, heap, btr_page_get_level(page, &mtr));
4352 
4353 			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
4354 					   offsets)) {
4355 				const rec_t* first_rec = page_rec_get_next(
4356 					page_get_infimum_rec(page));
4357 
4358 				btr_validate_report1(index, level, block);
4359 
4360 				buf_page_print(father_page, 0,
4361 					       BUF_PAGE_PRINT_NO_CRASH);
4362 				buf_page_print(page, 0,
4363 					       BUF_PAGE_PRINT_NO_CRASH);
4364 
4365 				fputs("InnoDB: Error: node ptrs differ"
4366 				      " on levels > 0\n"
4367 				      "InnoDB: node ptr ", stderr);
4368 				rec_print_new(stderr, node_ptr, offsets);
4369 				fputs("InnoDB: first rec ", stderr);
4370 				rec_print(stderr, first_rec, index);
4371 				putc('\n', stderr);
4372 				ret = FALSE;
4373 
4374 				goto node_ptr_fails;
4375 			}
4376 		}
4377 
4378 		if (left_page_no == FIL_NULL) {
4379 			ut_a(node_ptr == page_rec_get_next(
4380 				     page_get_infimum_rec(father_page)));
4381 			ut_a(btr_page_get_prev(father_page, &mtr) == FIL_NULL);
4382 		}
4383 
4384 		if (right_page_no == FIL_NULL) {
4385 			ut_a(node_ptr == page_rec_get_prev(
4386 				     page_get_supremum_rec(father_page)));
4387 			ut_a(btr_page_get_next(father_page, &mtr) == FIL_NULL);
4388 		} else {
4389 			const rec_t*	right_node_ptr
4390 				= page_rec_get_next(node_ptr);
4391 
4392 			offsets = btr_page_get_father_block(
4393 				offsets, heap, index, right_block,
4394 				&mtr, &right_node_cur);
4395 			if (right_node_ptr
4396 			    != page_get_supremum_rec(father_page)) {
4397 
4398 				if (btr_cur_get_rec(&right_node_cur)
4399 				    != right_node_ptr) {
4400 					ret = FALSE;
4401 					fputs("InnoDB: node pointer to"
4402 					      " the right page is wrong\n",
4403 					      stderr);
4404 
4405 					btr_validate_report1(index, level,
4406 							     block);
4407 
4408 					buf_page_print(
4409 						father_page, 0,
4410 						BUF_PAGE_PRINT_NO_CRASH);
4411 					buf_page_print(
4412 						page, 0,
4413 						BUF_PAGE_PRINT_NO_CRASH);
4414 					buf_page_print(
4415 						right_page, 0,
4416 						BUF_PAGE_PRINT_NO_CRASH);
4417 				}
4418 			} else {
4419 				page_t*	right_father_page
4420 					= btr_cur_get_page(&right_node_cur);
4421 
4422 				if (btr_cur_get_rec(&right_node_cur)
4423 				    != page_rec_get_next(
4424 					    page_get_infimum_rec(
4425 						    right_father_page))) {
4426 					ret = FALSE;
4427 					fputs("InnoDB: node pointer 2 to"
4428 					      " the right page is wrong\n",
4429 					      stderr);
4430 
4431 					btr_validate_report1(index, level,
4432 							     block);
4433 
4434 					buf_page_print(
4435 						father_page, 0,
4436 						BUF_PAGE_PRINT_NO_CRASH);
4437 					buf_page_print(
4438 						right_father_page, 0,
4439 						BUF_PAGE_PRINT_NO_CRASH);
4440 					buf_page_print(
4441 						page, 0,
4442 						BUF_PAGE_PRINT_NO_CRASH);
4443 					buf_page_print(
4444 						right_page, 0,
4445 						BUF_PAGE_PRINT_NO_CRASH);
4446 				}
4447 
4448 				if (page_get_page_no(right_father_page)
4449 				    != btr_page_get_next(father_page, &mtr)) {
4450 
4451 					ret = FALSE;
4452 					fputs("InnoDB: node pointer 3 to"
4453 					      " the right page is wrong\n",
4454 					      stderr);
4455 
4456 					btr_validate_report1(index, level,
4457 							     block);
4458 
4459 					buf_page_print(
4460 						father_page, 0,
4461 						BUF_PAGE_PRINT_NO_CRASH);
4462 					buf_page_print(
4463 						right_father_page, 0,
4464 						BUF_PAGE_PRINT_NO_CRASH);
4465 					buf_page_print(
4466 						page, 0,
4467 						BUF_PAGE_PRINT_NO_CRASH);
4468 					buf_page_print(
4469 						right_page, 0,
4470 						BUF_PAGE_PRINT_NO_CRASH);
4471 				}
4472 			}
4473 		}
4474 	}
4475 
4476 node_ptr_fails:
4477 	/* Commit the mini-transaction to release the latch on 'page'.
4478 	Re-acquire the latch on right_page, which will become 'page'
4479 	on the next loop.  The page has already been checked. */
4480 	mtr_commit(&mtr);
4481 
4482 	if (right_page_no != FIL_NULL) {
4483 		mtr_start(&mtr);
4484 
4485 		block = btr_block_get(space, zip_size, right_page_no,
4486 				      RW_X_LATCH, index, &mtr);
4487 		page = buf_block_get_frame(block);
4488 
4489 		goto loop;
4490 	}
4491 
4492 	mem_heap_free(heap);
4493 	return(ret);
4494 }
4495 
4496 /**************************************************************//**
4497 Checks the consistency of an index tree.
4498 @return	TRUE if ok */
4499 UNIV_INTERN
4500 ibool
btr_validate_index(dict_index_t * index,trx_t * trx)4501 btr_validate_index(
4502 /*===============*/
4503 	dict_index_t*	index,	/*!< in: index */
4504 	trx_t*		trx)	/*!< in: transaction or NULL */
4505 {
4506 	mtr_t	mtr;
4507 	page_t*	root;
4508 	ulint	i;
4509 	ulint	n;
4510 
4511 	mtr_start(&mtr);
4512 	mtr_x_lock(dict_index_get_lock(index), &mtr);
4513 
4514 	root = btr_root_get(index, &mtr);
4515 	n = btr_page_get_level(root, &mtr);
4516 
4517 	for (i = 0; i <= n && !trx_is_interrupted(trx); i++) {
4518 		if (!btr_validate_level(index, trx, n - i)) {
4519 
4520 			mtr_commit(&mtr);
4521 
4522 			return(FALSE);
4523 		}
4524 	}
4525 
4526 	mtr_commit(&mtr);
4527 
4528 	return(TRUE);
4529 }
4530 
4531 /**************************************************************//**
4532 Checks if the page in the cursor can be merged with given page.
4533 If necessary, re-organize the merge_page.
4534 @return	TRUE if possible to merge. */
4535 UNIV_INTERN
4536 ibool
btr_can_merge_with_page(btr_cur_t * cursor,ulint page_no,buf_block_t ** merge_block,mtr_t * mtr)4537 btr_can_merge_with_page(
4538 /*====================*/
4539 	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
4540 	ulint		page_no,	/*!< in: a sibling page */
4541 	buf_block_t**	merge_block,	/*!< out: the merge block */
4542 	mtr_t*		mtr)		/*!< in: mini-transaction */
4543 {
4544 	dict_index_t*	index;
4545 	page_t*		page;
4546 	ulint		space;
4547 	ulint		zip_size;
4548 	ulint		n_recs;
4549 	ulint		data_size;
4550         ulint           max_ins_size_reorg;
4551 	ulint		max_ins_size;
4552 	buf_block_t*	mblock;
4553 	page_t*		mpage;
4554 	DBUG_ENTER("btr_can_merge_with_page");
4555 
4556 	if (page_no == FIL_NULL) {
4557 		goto error;
4558 	}
4559 
4560 	index = btr_cur_get_index(cursor);
4561 	page  = btr_cur_get_page(cursor);
4562 	space = dict_index_get_space(index);
4563         zip_size = dict_table_zip_size(index->table);
4564 
4565 	mblock = btr_block_get(space, zip_size, page_no, RW_X_LATCH, index,
4566 			       mtr);
4567 	mpage = buf_block_get_frame(mblock);
4568 
4569         n_recs = page_get_n_recs(page);
4570         data_size = page_get_data_size(page);
4571 
4572         max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
4573                 mpage, n_recs);
4574 
4575 	if (data_size > max_ins_size_reorg) {
4576 		goto error;
4577 	}
4578 
4579 	max_ins_size = page_get_max_insert_size(mpage, n_recs);
4580 
4581 	if (data_size > max_ins_size) {
4582 
4583 		/* We have to reorganize mpage */
4584 
4585 		if (!btr_page_reorganize(mblock, index, mtr)) {
4586 
4587 			goto error;
4588 		}
4589 
4590 		max_ins_size = page_get_max_insert_size(mpage, n_recs);
4591 
4592 		ut_ad(page_validate(mpage, index));
4593 		ut_ad(max_ins_size == max_ins_size_reorg);
4594 
4595 		if (data_size > max_ins_size) {
4596 
4597 			/* Add fault tolerance, though this should
4598 			never happen */
4599 
4600 			goto error;
4601 		}
4602 	}
4603 
4604 	*merge_block = mblock;
4605 	DBUG_RETURN(TRUE);
4606 
4607 error:
4608 	*merge_block = NULL;
4609 	DBUG_RETURN(FALSE);
4610 }
4611 
4612 #endif /* !UNIV_HOTBACKUP */
4613