1 /*-------------------------------------------------------------------------
2 *
3 * nbtinsert.c
4 * Item insertion in Lehman and Yao btrees for Postgres.
5 *
6 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/access/nbtree/nbtinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "access/transam.h"
21 #include "access/xloginsert.h"
22 #include "miscadmin.h"
23 #include "storage/lmgr.h"
24 #include "storage/predicate.h"
25 #include "utils/tqual.h"
26
27
28 typedef struct
29 {
30 /* context data for _bt_checksplitloc */
31 Size newitemsz; /* size of new item to be inserted */
32 int fillfactor; /* needed when splitting rightmost page */
33 bool is_leaf; /* T if splitting a leaf page */
34 bool is_rightmost; /* T if splitting a rightmost page */
35 OffsetNumber newitemoff; /* where the new item is to be inserted */
36 int leftspace; /* space available for items on left page */
37 int rightspace; /* space available for items on right page */
38 int olddataitemstotal; /* space taken by old items */
39
40 bool have_split; /* found a valid split? */
41
42 /* these fields valid only if have_split is true */
43 bool newitemonleft; /* new item on left or right of best split */
44 OffsetNumber firstright; /* best split point */
45 int best_delta; /* best size delta so far */
46 } FindSplitData;
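/*
 * Illustrative use of FindSplitData (a sketch, not executable in isolation):
 * _bt_findsplitloc() fills in the input fields once and then calls
 * _bt_checksplitloc() for every candidate split point; that routine updates
 * have_split, newitemonleft, firstright and best_delta whenever it finds a
 * feasible split with a smaller delta than the best one seen so far.
 *
 *      FindSplitData state;
 *      state.newitemsz = newitemsz + sizeof(ItemIdData);
 *      state.newitemoff = newitemoff;
 *      state.leftspace = state.rightspace = usable space on a page;
 *      state.olddataitemstotal = total size of the existing data items;
 *      state.have_split = false;
 *      for (off = P_FIRSTDATAKEY(opaque); off <= maxoff;
 *           off = OffsetNumberNext(off))
 *          _bt_checksplitloc(&state, off, newitemonleft-or-not,
 *                            olddataitemstoleft, itemsz);
 *      if (state.have_split)
 *          return state.firstright;    the caller also reads newitemonleft
 */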
47
48
49 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
50
51 static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
52 Relation heapRel, Buffer buf, OffsetNumber offset,
53 ScanKey itup_scankey,
54 IndexUniqueCheck checkUnique, bool *is_unique,
55 uint32 *speculativeToken);
56 static void _bt_findinsertloc(Relation rel,
57 Buffer *bufptr,
58 OffsetNumber *offsetptr,
59 int keysz,
60 ScanKey scankey,
61 IndexTuple newtup,
62 BTStack stack,
63 Relation heapRel);
64 static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
65 BTStack stack,
66 IndexTuple itup,
67 OffsetNumber newitemoff,
68 bool split_only_page);
69 static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
70 OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
71 IndexTuple newitem, bool newitemonleft);
72 static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
73 BTStack stack, bool is_root, bool is_only);
74 static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
75 OffsetNumber newitemoff,
76 Size newitemsz,
77 bool *newitemonleft);
78 static void _bt_checksplitloc(FindSplitData *state,
79 OffsetNumber firstoldonright, bool newitemonleft,
80 int dataitemstoleft, Size firstoldonrightsz);
81 static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
82 OffsetNumber itup_off);
83 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
84 int keysz, ScanKey scankey);
85 static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
86
87
88 /*
89 * _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
90 *
91 * This routine is called by the public interface routine, btinsert.
92 * By here, itup is filled in, including the TID.
93 *
94 * If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
95 * will allow duplicates. Otherwise (UNIQUE_CHECK_YES or
96 * UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
97 * For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
98 * don't actually insert.
99 *
100 * The result value is only significant for UNIQUE_CHECK_PARTIAL:
101 * it must be TRUE if the entry is known unique, else FALSE.
102 * (In the current implementation we'll also return TRUE after a
103 * successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
104 * that's just a coding artifact.)
105 */
106 bool
107 _bt_doinsert(Relation rel, IndexTuple itup,
108 IndexUniqueCheck checkUnique, Relation heapRel)
109 {
110 bool is_unique = false;
111 int natts = rel->rd_rel->relnatts;
112 ScanKey itup_scankey;
113 BTStack stack;
114 Buffer buf;
115 OffsetNumber offset;
116
117 /* we need an insertion scan key to do our search, so build one */
118 itup_scankey = _bt_mkscankey(rel, itup);
119
120 top:
121 /* find the first page containing this key */
122 stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
123
124 offset = InvalidOffsetNumber;
125
126 /* trade in our read lock for a write lock */
127 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
128 LockBuffer(buf, BT_WRITE);
129
130 /*
131 * If the page was split between the time that we surrendered our read
132 * lock and acquired our write lock, then this page may no longer be the
133 * right place for the key we want to insert. In this case, we need to
134 * move right in the tree. See Lehman and Yao for an excruciatingly
135 * precise description.
136 */
137 buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
138 true, stack, BT_WRITE, NULL);
139
140 /*
141 * If we're not allowing duplicates, make sure the key isn't already in
142 * the index.
143 *
144 * NOTE: obviously, _bt_check_unique can only detect keys that are already
145 * in the index; so it cannot defend against concurrent insertions of the
146 * same key. We protect against that by means of holding a write lock on
147 * the target page. Any other would-be inserter of the same key must
148 * acquire a write lock on the same target page, so only one would-be
149 * inserter can be making the check at one time. Furthermore, once we are
150 * past the check we hold write locks continuously until we have performed
151 * our insertion, so no later inserter can fail to see our insertion.
152 * (This requires some care in _bt_insertonpg.)
153 *
154 * If we must wait for another xact, we release the lock while waiting,
155 * and then must start over completely.
156 *
157 * For a partial uniqueness check, we don't wait for the other xact. Just
158 * let the tuple in and return false for possibly non-unique, or true for
159 * definitely unique.
160 */
161 if (checkUnique != UNIQUE_CHECK_NO)
162 {
163 TransactionId xwait;
164 uint32 speculativeToken;
165
166 offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
167 xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
168 checkUnique, &is_unique, &speculativeToken);
169
170 if (TransactionIdIsValid(xwait))
171 {
172 /* Have to wait for the other guy ... */
173 _bt_relbuf(rel, buf);
174
175 /*
176 * If it's a speculative insertion, wait for it to finish (ie. to
177 * go ahead with the insertion, or kill the tuple). Otherwise
178 * wait for the transaction to finish as usual.
179 */
180 if (speculativeToken)
181 SpeculativeInsertionWait(xwait, speculativeToken);
182 else
183 XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
184
185 /* start over... */
186 _bt_freestack(stack);
187 goto top;
188 }
189 }
190
191 if (checkUnique != UNIQUE_CHECK_EXISTING)
192 {
193 /*
194 * The only conflict predicate locking cares about for indexes is when
195 * an index tuple insert conflicts with an existing lock. Since the
196 * actual location of the insert is hard to predict because of the
197 * random search used to prevent O(N^2) performance when there are
198 * many duplicate entries, we can just use the "first valid" page.
199 */
200 CheckForSerializableConflictIn(rel, NULL, buf);
201 /* do the insertion */
202 _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
203 stack, heapRel);
204 _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
205 }
206 else
207 {
208 /* just release the buffer */
209 _bt_relbuf(rel, buf);
210 }
211
212 /* be tidy */
213 _bt_freestack(stack);
214 _bt_freeskey(itup_scankey);
215
216 return is_unique;
217 }
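/*
 * For context: the am-level wrapper in nbtree.c is the usual caller of
 * _bt_doinsert().  A minimal caller looks roughly like this (sketch, not
 * verbatim):
 *
 *      IndexTuple  itup;
 *
 *      itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
 *      itup->t_tid = *ht_ctid;         the heap TID this entry points at
 *      result = _bt_doinsert(rel, itup, checkUnique, heapRel);
 *      pfree(itup);
 */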
218
219 /*
220 * _bt_check_unique() -- Check for violation of unique index constraint
221 *
222 * offset points to the first possible item that could conflict. It can
223 * also point to end-of-page, which means that the first tuple to check
224 * is the first tuple on the next page.
225 *
226 * Returns InvalidTransactionId if there is no conflict, else an xact ID
227 * we must wait for to see if it commits a conflicting tuple. If an actual
228 * conflict is detected, no return --- just ereport(). If an xact ID is
229 * returned, and the conflicting tuple still has a speculative insertion in
230 * progress, *speculativeToken is set to non-zero, and the caller can wait for
231 * the verdict on the insertion using SpeculativeInsertionWait().
232 *
233 * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
234 * InvalidTransactionId because we don't want to wait. In this case we
235 * set *is_unique to false if there is a potential conflict, and the
236 * core code must redo the uniqueness check later.
237 */
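/*
 * Sketch (mirroring _bt_doinsert above) of how a caller is expected to
 * consume the three output channels, shown only to make the contract
 * concrete:
 *
 *      xwait = _bt_check_unique(rel, itup, heapRel, buf, offset,
 *                               itup_scankey, checkUnique,
 *                               &is_unique, &speculativeToken);
 *      if (TransactionIdIsValid(xwait))
 *      {
 *          release the buffer lock, then either
 *          SpeculativeInsertionWait(xwait, speculativeToken) if the token
 *          is set, or XactLockTableWait(...), and finally retry from the
 *          top of the descent;
 *      }
 *      else if (checkUnique == UNIQUE_CHECK_PARTIAL && !is_unique)
 *          the core code must recheck uniqueness later.
 */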
238 static TransactionId
239 _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
240 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
241 IndexUniqueCheck checkUnique, bool *is_unique,
242 uint32 *speculativeToken)
243 {
244 TupleDesc itupdesc = RelationGetDescr(rel);
245 int natts = rel->rd_rel->relnatts;
246 SnapshotData SnapshotDirty;
247 OffsetNumber maxoff;
248 Page page;
249 BTPageOpaque opaque;
250 Buffer nbuf = InvalidBuffer;
251 bool found = false;
252
253 /* Assume unique until we find a duplicate */
254 *is_unique = true;
255
256 InitDirtySnapshot(SnapshotDirty);
257
258 page = BufferGetPage(buf);
259 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
260 maxoff = PageGetMaxOffsetNumber(page);
261
262 /*
263 * Scan over all equal tuples, looking for live conflicts.
264 */
265 for (;;)
266 {
267 ItemId curitemid;
268 IndexTuple curitup;
269 BlockNumber nblkno;
270
271 /*
272 * make sure the offset points to an actual item before trying to
273 * examine it...
274 */
275 if (offset <= maxoff)
276 {
277 curitemid = PageGetItemId(page, offset);
278
279 /*
280 * We can skip items that are marked killed.
281 *
282 * Formerly, we applied _bt_isequal() before checking the kill
283 * flag, so as to fall out of the item loop as soon as possible.
284 * However, in the presence of heavy update activity an index may
285 * contain many killed items with the same key; running
286 * _bt_isequal() on each killed item gets expensive. Furthermore
287 * it is likely that the non-killed version of each key appears
288 * first, so that we didn't actually get to exit any sooner
289 * anyway. So now we just advance over killed items as quickly as
290 * we can. We only apply _bt_isequal() when we get to a non-killed
291 * item or the end of the page.
292 */
293 if (!ItemIdIsDead(curitemid))
294 {
295 ItemPointerData htid;
296 bool all_dead;
297
298 /*
299 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
300 * how we handle NULLs - and so we must not use _bt_compare
301 * for real equality checks, but only for ordering/finding items
302 * on pages. - vadim 03/24/97
303 */
304 if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
305 break; /* we're past all the equal tuples */
306
307 /* okay, we gotta fetch the heap tuple ... */
308 curitup = (IndexTuple) PageGetItem(page, curitemid);
309 htid = curitup->t_tid;
310
311 /*
312 * If we are doing a recheck, we expect to find the tuple we
313 * are rechecking. It's not a duplicate, but we have to keep
314 * scanning.
315 */
316 if (checkUnique == UNIQUE_CHECK_EXISTING &&
317 ItemPointerCompare(&htid, &itup->t_tid) == 0)
318 {
319 found = true;
320 }
321
322 /*
323 * We check the whole HOT-chain to see if there is any tuple
324 * that satisfies SnapshotDirty. This is necessary because we
325 * have just a single index entry for the entire chain.
326 */
327 else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
328 &all_dead))
329 {
330 TransactionId xwait;
331
332 /*
333 * It is a duplicate. If we are only doing a partial
334 * check, then don't bother checking if the tuple is being
335 * updated in another transaction. Just return the fact
336 * that it is a potential conflict and leave the full
337 * check till later.
338 */
339 if (checkUnique == UNIQUE_CHECK_PARTIAL)
340 {
341 if (nbuf != InvalidBuffer)
342 _bt_relbuf(rel, nbuf);
343 *is_unique = false;
344 return InvalidTransactionId;
345 }
346
347 /*
348 * If this tuple is being updated by other transaction
349 * then we have to wait for its commit/abort.
350 */
351 xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
352 SnapshotDirty.xmin : SnapshotDirty.xmax;
353
354 if (TransactionIdIsValid(xwait))
355 {
356 if (nbuf != InvalidBuffer)
357 _bt_relbuf(rel, nbuf);
358 /* Tell _bt_doinsert to wait... */
359 *speculativeToken = SnapshotDirty.speculativeToken;
360 return xwait;
361 }
362
363 /*
364 * Otherwise we have a definite conflict. But before
365 * complaining, look to see if the tuple we want to insert
366 * is itself now committed dead --- if so, don't complain.
367 * This is a waste of time in normal scenarios but we must
368 * do it to support CREATE INDEX CONCURRENTLY.
369 *
370 * We must follow HOT-chains here because during
371 * concurrent index build, we insert the root TID though
372 * the actual tuple may be somewhere in the HOT-chain.
373 * While following the chain we might not stop at the
374 * exact tuple which triggered the insert, but that's OK
375 * because if we find a live tuple anywhere in this chain,
376 * we have a unique key conflict. The other live tuple is
377 * not part of this chain because it had a different index
378 * entry.
379 */
380 htid = itup->t_tid;
381 if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
382 {
383 /* Normal case --- it's still live */
384 }
385 else
386 {
387 /*
388 * It's been deleted, so no error, and no need to
389 * continue searching
390 */
391 break;
392 }
393
394 /*
395 * Check for a conflict-in as we would if we were going to
396 * write to this page. We aren't actually going to write,
397 * but we want a chance to report SSI conflicts that would
398 * otherwise be masked by this unique constraint
399 * violation.
400 */
401 CheckForSerializableConflictIn(rel, NULL, buf);
402
403 /*
404 * This is a definite conflict. Break the tuple down into
405 * datums and report the error. But first, make sure we
406 * release the buffer locks we're holding ---
407 * BuildIndexValueDescription could make catalog accesses,
408 * which in the worst case might touch this same index and
409 * cause deadlocks.
410 */
411 if (nbuf != InvalidBuffer)
412 _bt_relbuf(rel, nbuf);
413 _bt_relbuf(rel, buf);
414
415 {
416 Datum values[INDEX_MAX_KEYS];
417 bool isnull[INDEX_MAX_KEYS];
418 char *key_desc;
419
420 index_deform_tuple(itup, RelationGetDescr(rel),
421 values, isnull);
422
423 key_desc = BuildIndexValueDescription(rel, values,
424 isnull);
425
426 ereport(ERROR,
427 (errcode(ERRCODE_UNIQUE_VIOLATION),
428 errmsg("duplicate key value violates unique constraint \"%s\"",
429 RelationGetRelationName(rel)),
430 key_desc ? errdetail("Key %s already exists.",
431 key_desc) : 0,
432 errtableconstraint(heapRel,
433 RelationGetRelationName(rel))));
434 }
435 }
436 else if (all_dead)
437 {
438 /*
439 * The conflicting tuple (or whole HOT chain) is dead to
440 * everyone, so we may as well mark the index entry
441 * killed.
442 */
443 ItemIdMarkDead(curitemid);
444 opaque->btpo_flags |= BTP_HAS_GARBAGE;
445
446 /*
447 * Mark buffer with a dirty hint, since state is not
448 * crucial. Be sure to mark the proper buffer dirty.
449 */
450 if (nbuf != InvalidBuffer)
451 MarkBufferDirtyHint(nbuf, true);
452 else
453 MarkBufferDirtyHint(buf, true);
454 }
455 }
456 }
457
458 /*
459 * Advance to next tuple to continue checking.
460 */
461 if (offset < maxoff)
462 offset = OffsetNumberNext(offset);
463 else
464 {
465 /* If scankey == hikey we gotta check the next page too */
466 if (P_RIGHTMOST(opaque))
467 break;
468 if (!_bt_isequal(itupdesc, page, P_HIKEY,
469 natts, itup_scankey))
470 break;
471 /* Advance to next non-dead page --- there must be one */
472 for (;;)
473 {
474 nblkno = opaque->btpo_next;
475 nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
476 page = BufferGetPage(nbuf);
477 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
478 if (!P_IGNORE(opaque))
479 break;
480 if (P_RIGHTMOST(opaque))
481 elog(ERROR, "fell off the end of index \"%s\"",
482 RelationGetRelationName(rel));
483 }
484 maxoff = PageGetMaxOffsetNumber(page);
485 offset = P_FIRSTDATAKEY(opaque);
486 }
487 }
488
489 /*
490 * If we are doing a recheck then we should have found the tuple we are
491 * checking. Otherwise there's something very wrong --- probably, the
492 * index is on a non-immutable expression.
493 */
494 if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
495 ereport(ERROR,
496 (errcode(ERRCODE_INTERNAL_ERROR),
497 errmsg("failed to re-find tuple within index \"%s\"",
498 RelationGetRelationName(rel)),
499 errhint("This may be because of a non-immutable index expression."),
500 errtableconstraint(heapRel,
501 RelationGetRelationName(rel))));
502
503 if (nbuf != InvalidBuffer)
504 _bt_relbuf(rel, nbuf);
505
506 return InvalidTransactionId;
507 }
508
509
510 /*
511 * _bt_findinsertloc() -- Finds an insert location for a tuple
512 *
513 * If the new key is equal to one or more existing keys, we can
514 * legitimately place it anywhere in the series of equal keys --- in fact,
515 * if the new key is equal to the page's "high key" we can place it on
516 * the next page. If it is equal to the high key, and there's not room
517 * to insert the new tuple on the current page without splitting, then
518 * we can move right hoping to find more free space and avoid a split.
519 * (We should not move right indefinitely, however, since that leads to
520 * O(N^2) insertion behavior in the presence of many equal keys.)
521 * Once we have chosen the page to put the key on, we'll insert it before
522 * any existing equal keys because of the way _bt_binsrch() works.
523 *
524 * If there's not enough room on the page, we try to make room by
525 * removing any LP_DEAD tuples.
526 *
527 * On entry, *bufptr and *offsetptr point to the first legal position
528 * where the new tuple could be inserted. The caller should hold an
529 * exclusive lock on *bufptr. *offsetptr can also be set to
530 * InvalidOffsetNumber, in which case the function will search for the
531 * right location within the page if needed. On exit, they point to the
532 * chosen insert location. If _bt_findinsertloc decides to move right,
533 * the lock and pin on the original page will be released and the new
534 * page returned to the caller is exclusively locked instead.
535 *
536 * newtup is the new tuple we're inserting, and scankey is an insertion
537 * type scan key for it.
538 */
539 static void
540 _bt_findinsertloc(Relation rel,
541 Buffer *bufptr,
542 OffsetNumber *offsetptr,
543 int keysz,
544 ScanKey scankey,
545 IndexTuple newtup,
546 BTStack stack,
547 Relation heapRel)
548 {
549 Buffer buf = *bufptr;
550 Page page = BufferGetPage(buf);
551 Size itemsz;
552 BTPageOpaque lpageop;
553 bool movedright,
554 vacuumed;
555 OffsetNumber newitemoff;
556 OffsetNumber firstlegaloff = *offsetptr;
557
558 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
559
560 itemsz = IndexTupleDSize(*newtup);
561 itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
562 * need to be consistent */
563
564 /*
565 * Check whether the item can fit on a btree page at all. (Eventually, we
566 * ought to try to apply TOAST methods if not.) We actually need to be
567 * able to fit three items on every page, so restrict any one item to 1/3
568 * the per-page available space. Note that at this point, itemsz doesn't
569 * include the ItemId.
570 *
571 * NOTE: if you change this, see also the similar code in _bt_buildadd().
572 */
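/*
 * As a worked figure (assuming the default 8 kB BLCKSZ): BTMaxItemSize is
 * roughly (8192 - page header - 3 line pointers - btree special space) / 3,
 * which comes out to about 2712 bytes; that is the limit reported by the
 * error message below on a stock build.
 */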
573 if (itemsz > BTMaxItemSize(page))
574 ereport(ERROR,
575 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
576 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
577 itemsz, BTMaxItemSize(page),
578 RelationGetRelationName(rel)),
579 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
580 "Consider a function index of an MD5 hash of the value, "
581 "or use full text indexing."),
582 errtableconstraint(heapRel,
583 RelationGetRelationName(rel))));
584
585 /*----------
586 * If we will need to split the page to put the item on this page,
587 * check whether we can put the tuple somewhere to the right,
588 * instead. Keep scanning right until we
589 * (a) find a page with enough free space,
590 * (b) reach the last page where the tuple can legally go, or
591 * (c) get tired of searching.
592 * (c) is not flippant; it is important because if there are many
593 * pages' worth of equal keys, it's better to split one of the early
594 * pages than to scan all the way to the end of the run of equal keys
595 * on every insert. We implement "get tired" as a random choice,
596 * since stopping after scanning a fixed number of pages wouldn't work
597 * well (we'd never reach the right-hand side of previously split
598 * pages). Currently the probability of moving right is set at 0.99,
599 * which may seem too high to change the behavior much, but it does an
600 * excellent job of preventing O(N^2) behavior with many equal keys.
601 *----------
602 */
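/*
 * Worked out, "get tired" means: random() <= MAX_RANDOM_VALUE / 100 is true
 * about 1% of the time, so we keep moving right past each full page of
 * equal keys with probability ~0.99, and the expected number of pages
 * visited before we give up and split is on the order of 100.
 */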
603 movedright = false;
604 vacuumed = false;
605 while (PageGetFreeSpace(page) < itemsz)
606 {
607 Buffer rbuf;
608 BlockNumber rblkno;
609
610 /*
611 * before considering moving right, see if we can obtain enough space
612 * by erasing LP_DEAD items
613 */
614 if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
615 {
616 _bt_vacuum_one_page(rel, buf, heapRel);
617
618 /*
619 * remember that we vacuumed this page, because that makes the
620 * hint supplied by the caller invalid
621 */
622 vacuumed = true;
623
624 if (PageGetFreeSpace(page) >= itemsz)
625 break; /* OK, now we have enough space */
626 }
627
628 /*
629 * nope, so check conditions (b) and (c) enumerated above
630 */
631 if (P_RIGHTMOST(lpageop) ||
632 _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
633 random() <= (MAX_RANDOM_VALUE / 100))
634 break;
635
636 /*
637 * step right to next non-dead page
638 *
639 * must write-lock that page before releasing write lock on current
640 * page; else someone else's _bt_check_unique scan could fail to see
641 * our insertion. write locks on intermediate dead pages won't do
642 * because we don't know when they will get de-linked from the tree.
643 */
644 rbuf = InvalidBuffer;
645
646 rblkno = lpageop->btpo_next;
647 for (;;)
648 {
649 rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
650 page = BufferGetPage(rbuf);
651 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
652
653 /*
654 * If this page was incompletely split, finish the split now. We
655 * do this while holding a lock on the left sibling, which is not
656 * good because finishing the split could be a fairly lengthy
657 * operation. But this should happen very seldom.
658 */
659 if (P_INCOMPLETE_SPLIT(lpageop))
660 {
661 _bt_finish_split(rel, rbuf, stack);
662 rbuf = InvalidBuffer;
663 continue;
664 }
665
666 if (!P_IGNORE(lpageop))
667 break;
668 if (P_RIGHTMOST(lpageop))
669 elog(ERROR, "fell off the end of index \"%s\"",
670 RelationGetRelationName(rel));
671
672 rblkno = lpageop->btpo_next;
673 }
674 _bt_relbuf(rel, buf);
675 buf = rbuf;
676 movedright = true;
677 vacuumed = false;
678 }
679
680 /*
681 * Now we are on the right page, so find the insert position. If we moved
682 * right at all, we know we should insert at the start of the page. If we
683 * didn't move right, we can use the firstlegaloff hint if the caller
684 * supplied one, unless we vacuumed the page which might have moved tuples
685 * around making the hint invalid. If we didn't move right or can't use
686 * the hint, find the position by searching.
687 */
688 if (movedright)
689 newitemoff = P_FIRSTDATAKEY(lpageop);
690 else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
691 newitemoff = firstlegaloff;
692 else
693 newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
694
695 *bufptr = buf;
696 *offsetptr = newitemoff;
697 }
698
699 /*----------
700 * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
701 *
702 * This recursive procedure does the following things:
703 *
704 * + if necessary, splits the target page (making sure that the
705 * split is equitable as far as post-insert free space goes).
706 * + inserts the tuple.
707 * + if the page was split, pops the parent stack, and finds the
708 * right place to insert the new child pointer (by walking
709 * right using information stored in the parent stack).
710 * + invokes itself with the appropriate tuple for the right
711 * child page on the parent.
712 * + updates the metapage if a true root or fast root is split.
713 *
714 * On entry, we must have the correct buffer in which to do the
715 * insertion, and the buffer must be pinned and write-locked. On return,
716 * we will have dropped both the pin and the lock on the buffer.
717 *
718 * When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
719 * page we're inserting the downlink for. This function will clear the
720 * INCOMPLETE_SPLIT flag on it, and release the buffer.
721 *
722 * The locking interactions in this code are critical. You should
723 * grok Lehman and Yao's paper before making any changes. In addition,
724 * you need to understand how we disambiguate duplicate keys in this
725 * implementation, in order to be able to find our location using
726 * L&Y "move right" operations. Since we may insert duplicate user
727 * keys, and since these dups may propagate up the tree, we use the
728 * 'afteritem' parameter to position ourselves correctly for the
729 * insertion on internal pages.
730 *----------
731 */
732 static void
733 _bt_insertonpg(Relation rel,
734 Buffer buf,
735 Buffer cbuf,
736 BTStack stack,
737 IndexTuple itup,
738 OffsetNumber newitemoff,
739 bool split_only_page)
740 {
741 Page page;
742 BTPageOpaque lpageop;
743 OffsetNumber firstright = InvalidOffsetNumber;
744 Size itemsz;
745
746 page = BufferGetPage(buf);
747 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
748
749 /* child buffer must be given iff inserting on an internal page */
750 Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
751
752 /* The caller should've finished any incomplete splits already. */
753 if (P_INCOMPLETE_SPLIT(lpageop))
754 elog(ERROR, "cannot insert to incompletely split page %u",
755 BufferGetBlockNumber(buf));
756
757 itemsz = IndexTupleDSize(*itup);
758 itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
759 * need to be consistent */
760
761 /*
762 * Do we need to split the page to fit the item on it?
763 *
764 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
765 * so this comparison is correct even though we appear to be accounting
766 * only for the item and not for its line pointer.
767 */
768 if (PageGetFreeSpace(page) < itemsz)
769 {
770 bool is_root = P_ISROOT(lpageop);
771 bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
772 bool newitemonleft;
773 Buffer rbuf;
774
775 /* Choose the split point */
776 firstright = _bt_findsplitloc(rel, page,
777 newitemoff, itemsz,
778 &newitemonleft);
779
780 /* split the buffer into left and right halves */
781 rbuf = _bt_split(rel, buf, cbuf, firstright,
782 newitemoff, itemsz, itup, newitemonleft);
783 PredicateLockPageSplit(rel,
784 BufferGetBlockNumber(buf),
785 BufferGetBlockNumber(rbuf));
786
787 /*----------
788 * By here,
789 *
790 * + our target page has been split;
791 * + the original tuple has been inserted;
792 * + we have write locks on both the old (left half)
793 * and new (right half) buffers, after the split; and
794 * + we know the key we want to insert into the parent
795 * (it's the "high key" on the left child page).
796 *
797 * We're ready to do the parent insertion. We need to hold onto the
798 * locks for the child pages until we locate the parent, but we can
799 * release them before doing the actual insertion (see Lehman and Yao
800 * for the reasoning).
801 *----------
802 */
803 _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
804 }
805 else
806 {
807 Buffer metabuf = InvalidBuffer;
808 Page metapg = NULL;
809 BTMetaPageData *metad = NULL;
810 OffsetNumber itup_off;
811 BlockNumber itup_blkno;
812
813 itup_off = newitemoff;
814 itup_blkno = BufferGetBlockNumber(buf);
815
816 /*
817 * If we are doing this insert because we split a page that was the
818 * only one on its tree level, but was not the root, it may have been
819 * the "fast root". We need to ensure that the fast root link points
820 * at or above the current page. We can safely acquire a lock on the
821 * metapage here --- see comments for _bt_newroot().
822 */
823 if (split_only_page)
824 {
825 Assert(!P_ISLEAF(lpageop));
826
827 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
828 metapg = BufferGetPage(metabuf);
829 metad = BTPageGetMeta(metapg);
830
831 if (metad->btm_fastlevel >= lpageop->btpo.level)
832 {
833 /* no update wanted */
834 _bt_relbuf(rel, metabuf);
835 metabuf = InvalidBuffer;
836 }
837 }
838
839 /* Do the update. No ereport(ERROR) until changes are logged */
840 START_CRIT_SECTION();
841
842 if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
843 elog(PANIC, "failed to add new item to block %u in index \"%s\"",
844 itup_blkno, RelationGetRelationName(rel));
845
846 MarkBufferDirty(buf);
847
848 if (BufferIsValid(metabuf))
849 {
850 metad->btm_fastroot = itup_blkno;
851 metad->btm_fastlevel = lpageop->btpo.level;
852 MarkBufferDirty(metabuf);
853 }
854
855 /* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
856 if (BufferIsValid(cbuf))
857 {
858 Page cpage = BufferGetPage(cbuf);
859 BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
860
861 Assert(P_INCOMPLETE_SPLIT(cpageop));
862 cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
863 MarkBufferDirty(cbuf);
864 }
865
866 /* XLOG stuff */
867 if (RelationNeedsWAL(rel))
868 {
869 xl_btree_insert xlrec;
870 xl_btree_metadata xlmeta;
871 uint8 xlinfo;
872 XLogRecPtr recptr;
873 IndexTupleData trunctuple;
874
875 xlrec.offnum = itup_off;
876
877 XLogBeginInsert();
878 XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
879
880 if (P_ISLEAF(lpageop))
881 xlinfo = XLOG_BTREE_INSERT_LEAF;
882 else
883 {
884 /*
885 * Register the left child whose INCOMPLETE_SPLIT flag was
886 * cleared.
887 */
888 XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
889
890 xlinfo = XLOG_BTREE_INSERT_UPPER;
891 }
892
893 if (BufferIsValid(metabuf))
894 {
895 xlmeta.root = metad->btm_root;
896 xlmeta.level = metad->btm_level;
897 xlmeta.fastroot = metad->btm_fastroot;
898 xlmeta.fastlevel = metad->btm_fastlevel;
899
900 XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
901 XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));
902
903 xlinfo = XLOG_BTREE_INSERT_META;
904 }
905
906 /* Read comments in _bt_pgaddtup */
907 XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
908 if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
909 {
910 trunctuple = *itup;
911 trunctuple.t_info = sizeof(IndexTupleData);
912 XLogRegisterBufData(0, (char *) &trunctuple,
913 sizeof(IndexTupleData));
914 }
915 else
916 XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
917
918 recptr = XLogInsert(RM_BTREE_ID, xlinfo);
919
920 if (BufferIsValid(metabuf))
921 {
922 PageSetLSN(metapg, recptr);
923 }
924 if (BufferIsValid(cbuf))
925 {
926 PageSetLSN(BufferGetPage(cbuf), recptr);
927 }
928
929 PageSetLSN(page, recptr);
930 }
931
932 END_CRIT_SECTION();
933
934 /* release buffers */
935 if (BufferIsValid(metabuf))
936 _bt_relbuf(rel, metabuf);
937 if (BufferIsValid(cbuf))
938 _bt_relbuf(rel, cbuf);
939 _bt_relbuf(rel, buf);
940 }
941 }
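/*
 * To make the recursion described in the header comment concrete, a leaf
 * split propagates upward roughly like this (sketch):
 *
 *      _bt_insertonpg(leaf)                page has no room
 *          -> _bt_findsplitloc() + _bt_split()
 *          -> _bt_insert_parent(left, right, stack, is_root, is_only)
 *              -> _bt_getstackbuf() relocates the parent via the stack
 *              -> _bt_insertonpg(parent, downlink for the new right page)
 *                  which may split again and recurse further, ending in
 *                  _bt_newroot() if the true root itself was split.
 */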
942
943 /*
944 * _bt_split() -- split a page in the btree.
945 *
946 * On entry, buf is the page to split, and is pinned and write-locked.
947 * firstright is the item index of the first item to be moved to the
948 * new right page. newitemoff etc. tell us about the new item that
949 * must be inserted along with the data from the old page.
950 *
951 * When splitting a non-leaf page, 'cbuf' is the left-sibling of the
952 * page we're inserting the downlink for. This function will clear the
953 * INCOMPLETE_SPLIT flag on it, and release the buffer.
954 *
955 * Returns the new right sibling of buf, pinned and write-locked.
956 * The pin and lock on buf are maintained.
957 */
958 static Buffer
959 _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
960 OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
961 bool newitemonleft)
962 {
963 Buffer rbuf;
964 Page origpage;
965 Page leftpage,
966 rightpage;
967 BlockNumber origpagenumber,
968 rightpagenumber;
969 BTPageOpaque ropaque,
970 lopaque,
971 oopaque;
972 Buffer sbuf = InvalidBuffer;
973 Page spage = NULL;
974 BTPageOpaque sopaque = NULL;
975 Size itemsz;
976 ItemId itemid;
977 IndexTuple item;
978 OffsetNumber leftoff,
979 rightoff;
980 OffsetNumber maxoff;
981 OffsetNumber i;
982 bool isroot;
983 bool isleaf;
984
985 /* Acquire a new page to split into */
986 rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
987
988 /*
989 * origpage is the original page to be split. leftpage is a temporary
990 * buffer that receives the left-sibling data, which will be copied back
991 * into origpage on success. rightpage is the new page that receives the
992 * right-sibling data. If we fail before reaching the critical section,
993 * origpage hasn't been modified and leftpage is only workspace. In
994 * principle we shouldn't need to worry about rightpage either, because it
995 * hasn't been linked into the btree page structure; but to avoid leaving
996 * possibly-confusing junk behind, we are careful to rewrite rightpage as
997 * zeroes before throwing any error.
998 */
999 origpage = BufferGetPage(buf);
1000 leftpage = PageGetTempPage(origpage);
1001 rightpage = BufferGetPage(rbuf);
1002
1003 origpagenumber = BufferGetBlockNumber(buf);
1004 rightpagenumber = BufferGetBlockNumber(rbuf);
1005
1006 _bt_pageinit(leftpage, BufferGetPageSize(buf));
1007 /* rightpage was already initialized by _bt_getbuf */
1008
1009 /*
1010 * Copy the original page's LSN into leftpage, which will become the
1011 * updated version of the page. We need this because XLogInsert will
1012 * examine the LSN and possibly dump it in a page image.
1013 */
1014 PageSetLSN(leftpage, PageGetLSN(origpage));
1015
1016 /* init btree private data */
1017 oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
1018 lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
1019 ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
1020
1021 isroot = P_ISROOT(oopaque);
1022 isleaf = P_ISLEAF(oopaque);
1023
1024 /* if we're splitting this page, it won't be the root when we're done */
1025 /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
1026 lopaque->btpo_flags = oopaque->btpo_flags;
1027 lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
1028 ropaque->btpo_flags = lopaque->btpo_flags;
1029 /* set flag in left page indicating that the right page has no downlink */
1030 lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
1031 lopaque->btpo_prev = oopaque->btpo_prev;
1032 lopaque->btpo_next = rightpagenumber;
1033 ropaque->btpo_prev = origpagenumber;
1034 ropaque->btpo_next = oopaque->btpo_next;
1035 lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
1036 /* Since we already have write-lock on both pages, ok to read cycleid */
1037 lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
1038 ropaque->btpo_cycleid = lopaque->btpo_cycleid;
1039
1040 /*
1041 * If the page we're splitting is not the rightmost page at its level in
1042 * the tree, then the first entry on the page is the high key for the
1043 * page. We need to copy that to the right half. Otherwise (meaning the
1044 * rightmost page case), all the items on the right half will be user
1045 * data.
1046 */
1047 rightoff = P_HIKEY;
1048
1049 if (!P_RIGHTMOST(oopaque))
1050 {
1051 itemid = PageGetItemId(origpage, P_HIKEY);
1052 itemsz = ItemIdGetLength(itemid);
1053 item = (IndexTuple) PageGetItem(origpage, itemid);
1054 if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
1055 false, false) == InvalidOffsetNumber)
1056 {
1057 memset(rightpage, 0, BufferGetPageSize(rbuf));
1058 elog(ERROR, "failed to add hikey to the right sibling"
1059 " while splitting block %u of index \"%s\"",
1060 origpagenumber, RelationGetRelationName(rel));
1061 }
1062 rightoff = OffsetNumberNext(rightoff);
1063 }
1064
1065 /*
1066 * The "high key" for the new left page will be the first key that's going
1067 * to go into the new right page. This might be either the existing data
1068 * item at position firstright, or the incoming tuple.
1069 */
1070 leftoff = P_HIKEY;
1071 if (!newitemonleft && newitemoff == firstright)
1072 {
1073 /* incoming tuple will become first on right page */
1074 itemsz = newitemsz;
1075 item = newitem;
1076 }
1077 else
1078 {
1079 /* existing item at firstright will become first on right page */
1080 itemid = PageGetItemId(origpage, firstright);
1081 itemsz = ItemIdGetLength(itemid);
1082 item = (IndexTuple) PageGetItem(origpage, itemid);
1083 }
1084 if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
1085 false, false) == InvalidOffsetNumber)
1086 {
1087 memset(rightpage, 0, BufferGetPageSize(rbuf));
1088 elog(ERROR, "failed to add hikey to the left sibling"
1089 " while splitting block %u of index \"%s\"",
1090 origpagenumber, RelationGetRelationName(rel));
1091 }
1092 leftoff = OffsetNumberNext(leftoff);
1093
1094 /*
1095 * Now transfer all the data items to the appropriate page.
1096 *
1097 * Note: we *must* insert at least the right page's items in item-number
1098 * order, for the benefit of _bt_restore_page().
1099 */
1100 maxoff = PageGetMaxOffsetNumber(origpage);
1101
1102 for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
1103 {
1104 itemid = PageGetItemId(origpage, i);
1105 itemsz = ItemIdGetLength(itemid);
1106 item = (IndexTuple) PageGetItem(origpage, itemid);
1107
1108 /* does new item belong before this one? */
1109 if (i == newitemoff)
1110 {
1111 if (newitemonleft)
1112 {
1113 if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
1114 {
1115 memset(rightpage, 0, BufferGetPageSize(rbuf));
1116 elog(ERROR, "failed to add new item to the left sibling"
1117 " while splitting block %u of index \"%s\"",
1118 origpagenumber, RelationGetRelationName(rel));
1119 }
1120 leftoff = OffsetNumberNext(leftoff);
1121 }
1122 else
1123 {
1124 if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1125 {
1126 memset(rightpage, 0, BufferGetPageSize(rbuf));
1127 elog(ERROR, "failed to add new item to the right sibling"
1128 " while splitting block %u of index \"%s\"",
1129 origpagenumber, RelationGetRelationName(rel));
1130 }
1131 rightoff = OffsetNumberNext(rightoff);
1132 }
1133 }
1134
1135 /* decide which page to put it on */
1136 if (i < firstright)
1137 {
1138 if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
1139 {
1140 memset(rightpage, 0, BufferGetPageSize(rbuf));
1141 elog(ERROR, "failed to add old item to the left sibling"
1142 " while splitting block %u of index \"%s\"",
1143 origpagenumber, RelationGetRelationName(rel));
1144 }
1145 leftoff = OffsetNumberNext(leftoff);
1146 }
1147 else
1148 {
1149 if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
1150 {
1151 memset(rightpage, 0, BufferGetPageSize(rbuf));
1152 elog(ERROR, "failed to add old item to the right sibling"
1153 " while splitting block %u of index \"%s\"",
1154 origpagenumber, RelationGetRelationName(rel));
1155 }
1156 rightoff = OffsetNumberNext(rightoff);
1157 }
1158 }
1159
1160 /* cope with possibility that newitem goes at the end */
1161 if (i <= newitemoff)
1162 {
1163 /*
1164 * Can't have newitemonleft here; that would imply we were told to put
1165 * *everything* on the left page, which cannot fit (if it could, we'd
1166 * not be splitting the page).
1167 */
1168 Assert(!newitemonleft);
1169 if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
1170 {
1171 memset(rightpage, 0, BufferGetPageSize(rbuf));
1172 elog(ERROR, "failed to add new item to the right sibling"
1173 " while splitting block %u of index \"%s\"",
1174 origpagenumber, RelationGetRelationName(rel));
1175 }
1176 rightoff = OffsetNumberNext(rightoff);
1177 }
1178
1179 /*
1180 * We have to grab the right sibling (if any) and fix the prev pointer
1181 * there. We are guaranteed that this is deadlock-free since no other
1182 * writer will be holding a lock on that page and trying to move left, and
1183 * all readers release locks on a page before trying to fetch its
1184 * neighbors.
1185 */
1186
1187 if (!P_RIGHTMOST(oopaque))
1188 {
1189 sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
1190 spage = BufferGetPage(sbuf);
1191 sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
1192 if (sopaque->btpo_prev != origpagenumber)
1193 {
1194 memset(rightpage, 0, BufferGetPageSize(rbuf));
1195 elog(ERROR, "right sibling's left-link doesn't match: "
1196 "block %u links to %u instead of expected %u in index \"%s\"",
1197 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
1198 RelationGetRelationName(rel));
1199 }
1200
1201 /*
1202 * Check to see if we can set the SPLIT_END flag in the right-hand
1203 * split page; this can save some I/O for vacuum since it need not
1204 * proceed to the right sibling. We can set the flag if the right
1205 * sibling has a different cycleid: that means it could not be part of
1206 * a group of pages that were all split off from the same ancestor
1207 * page. If you're confused, imagine that page A splits to A B and
1208 * then again, yielding A C B, while vacuum is in progress. Tuples
1209 * originally in A could now be in either B or C, hence vacuum must
1210 * examine both pages. But if D, our right sibling, has a different
1211 * cycleid then it could not contain any tuples that were in A when
1212 * the vacuum started.
1213 */
1214 if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
1215 ropaque->btpo_flags |= BTP_SPLIT_END;
1216 }
1217
1218 /*
1219 * Right sibling is locked, new siblings are prepared, but original page
1220 * is not updated yet.
1221 *
1222 * NO EREPORT(ERROR) till right sibling is updated. We can get away with
1223 * not starting the critical section till here because we haven't been
1224 * scribbling on the original page yet; see comments above.
1225 */
1226 START_CRIT_SECTION();
1227
1228 /*
1229 * By here, the original data page has been split into two new halves, and
1230 * these are correct. The algorithm requires that the left page never
1231 * move during a split, so we copy the new left page back on top of the
1232 * original. Note that this is not a waste of time, since we also require
1233 * (in the page management code) that the center of a page always be
1234 * clean, and the most efficient way to guarantee this is just to compact
1235 * the data by reinserting it into a new left page. (XXX the latter
1236 * comment is probably obsolete; but in any case it's good to not scribble
1237 * on the original page until we enter the critical section.)
1238 *
1239 * We need to do this before writing the WAL record, so that XLogInsert
1240 * can WAL log an image of the page if necessary.
1241 */
1242 PageRestoreTempPage(leftpage, origpage);
1243 /* leftpage, lopaque must not be used below here */
1244
1245 MarkBufferDirty(buf);
1246 MarkBufferDirty(rbuf);
1247
1248 if (!P_RIGHTMOST(ropaque))
1249 {
1250 sopaque->btpo_prev = rightpagenumber;
1251 MarkBufferDirty(sbuf);
1252 }
1253
1254 /*
1255 * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
1256 * a split.
1257 */
1258 if (!isleaf)
1259 {
1260 Page cpage = BufferGetPage(cbuf);
1261 BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
1262
1263 cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
1264 MarkBufferDirty(cbuf);
1265 }
1266
1267 /* XLOG stuff */
1268 if (RelationNeedsWAL(rel))
1269 {
1270 xl_btree_split xlrec;
1271 uint8 xlinfo;
1272 XLogRecPtr recptr;
1273
1274 xlrec.level = ropaque->btpo.level;
1275 xlrec.firstright = firstright;
1276 xlrec.newitemoff = newitemoff;
1277
1278 XLogBeginInsert();
1279 XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);
1280
1281 XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
1282 XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
1283 /* Log the right sibling, because we've changed its prev-pointer. */
1284 if (!P_RIGHTMOST(ropaque))
1285 XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
1286 if (BufferIsValid(cbuf))
1287 XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
1288
1289 /*
1290 * Log the new item, if it was inserted on the left page. (If it was
1291 * put on the right page, we don't need to explicitly WAL log it
1292 * because it's included with all the other items on the right page.)
1293 * Show the new item as belonging to the left page buffer, so that it
1294 * is not stored if XLogInsert decides it needs a full-page image of
1295 * the left page. We store the offset anyway, though, to support
1296 * archive compression of these records.
1297 */
1298 if (newitemonleft)
1299 XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
1300
1301 /* Log left page */
1302 if (!isleaf)
1303 {
1304 /*
1305 * We must also log the left page's high key, because the right
1306 * page's leftmost key is suppressed on non-leaf levels. Show it
1307 * as belonging to the left page buffer, so that it is not stored
1308 * if XLogInsert decides it needs a full-page image of the left
1309 * page.
1310 */
1311 itemid = PageGetItemId(origpage, P_HIKEY);
1312 item = (IndexTuple) PageGetItem(origpage, itemid);
1313 XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
1314 }
1315
1316 /*
1317 * Log the contents of the right page in the format understood by
1318 * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
1319 * because we're going to recreate the whole page anyway, so it should
1320 * never be stored by XLogInsert.
1321 *
1322 * Direct access to page is not good but faster - we should implement
1323 * some new func in page API. Note we only store the tuples
1324 * themselves, knowing that they were inserted in item-number order
1325 * and so the item pointers can be reconstructed. See comments for
1326 * _bt_restore_page().
1327 */
1328 XLogRegisterBufData(1,
1329 (char *) rightpage + ((PageHeader) rightpage)->pd_upper,
1330 ((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
1331
1332 if (isroot)
1333 xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
1334 else
1335 xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
1336
1337 recptr = XLogInsert(RM_BTREE_ID, xlinfo);
1338
1339 PageSetLSN(origpage, recptr);
1340 PageSetLSN(rightpage, recptr);
1341 if (!P_RIGHTMOST(ropaque))
1342 {
1343 PageSetLSN(spage, recptr);
1344 }
1345 if (!isleaf)
1346 {
1347 PageSetLSN(BufferGetPage(cbuf), recptr);
1348 }
1349 }
1350
1351 END_CRIT_SECTION();
1352
1353 /* release the old right sibling */
1354 if (!P_RIGHTMOST(ropaque))
1355 _bt_relbuf(rel, sbuf);
1356
1357 /* release the child */
1358 if (!isleaf)
1359 _bt_relbuf(rel, cbuf);
1360
1361 /* split's done */
1362 return rbuf;
1363 }
1364
1365 /*
1366 * _bt_findsplitloc() -- find an appropriate place to split a page.
1367 *
1368 * The idea here is to equalize the free space that will be on each split
1369 * page, *after accounting for the inserted tuple*. (If we fail to account
1370 * for it, we might find ourselves with too little room on the page that
1371 * it needs to go into!)
1372 *
1373 * If the page is the rightmost page on its level, we instead try to arrange
1374 * to leave the left split page fillfactor% full. In this way, when we are
1375 * inserting successively increasing keys (consider sequences, timestamps,
1376 * etc) we will end up with a tree whose pages are about fillfactor% full,
1377 * instead of the 50% full result that we'd get without this special case.
1378 * This is the same as nbtsort.c produces for a newly-created tree. Note
1379 * that leaf and nonleaf pages use different fillfactors.
1380 *
1381 * We are passed the intended insert position of the new tuple, expressed as
1382 * the offsetnumber of the tuple it must go in front of. (This could be
1383 * maxoff+1 if the tuple is to go at the end.)
1384 *
1385 * We return the index of the first existing tuple that should go on the
1386 * righthand page, plus a boolean indicating whether the new tuple goes on
1387 * the left or right page. The bool is necessary to disambiguate the case
1388 * where firstright == newitemoff.
1389 */
1390 static OffsetNumber
1391 _bt_findsplitloc(Relation rel,
1392 Page page,
1393 OffsetNumber newitemoff,
1394 Size newitemsz,
1395 bool *newitemonleft)
1396 {
1397 BTPageOpaque opaque;
1398 OffsetNumber offnum;
1399 OffsetNumber maxoff;
1400 ItemId itemid;
1401 FindSplitData state;
1402 int leftspace,
1403 rightspace,
1404 goodenough,
1405 olddataitemstotal,
1406 olddataitemstoleft;
1407 bool goodenoughfound;
1408
1409 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1410
1411 /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1412 newitemsz += sizeof(ItemIdData);
1413
1414 /* Total free space available on a btree page, after fixed overhead */
1415 leftspace = rightspace =
1416 PageGetPageSize(page) - SizeOfPageHeaderData -
1417 MAXALIGN(sizeof(BTPageOpaqueData));
1418
1419 /* The right page will have the same high key as the old page */
1420 if (!P_RIGHTMOST(opaque))
1421 {
1422 itemid = PageGetItemId(page, P_HIKEY);
1423 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1424 sizeof(ItemIdData));
1425 }
1426
1427 /* Count up total space in data items without actually scanning 'em */
1428 olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
1429
1430 state.newitemsz = newitemsz;
1431 state.is_leaf = P_ISLEAF(opaque);
1432 state.is_rightmost = P_RIGHTMOST(opaque);
1433 state.have_split = false;
1434 if (state.is_leaf)
1435 state.fillfactor = RelationGetFillFactor(rel,
1436 BTREE_DEFAULT_FILLFACTOR);
1437 else
1438 state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1439 state.newitemonleft = false; /* these just to keep compiler quiet */
1440 state.firstright = 0;
1441 state.best_delta = 0;
1442 state.leftspace = leftspace;
1443 state.rightspace = rightspace;
1444 state.olddataitemstotal = olddataitemstotal;
1445 state.newitemoff = newitemoff;
1446
1447 /*
1448 * Finding the best possible split would require checking all the possible
1449 * split points, because of the high-key and left-key special cases.
1450 * That's probably more work than it's worth; instead, stop as soon as we
1451 * find a "good-enough" split, where good-enough is defined as an
1452 * imbalance in free space of no more than pagesize/16 (arbitrary...) This
1453 * should let us stop near the middle on most pages, instead of plowing to
1454 * the end.
1455 */
1456 goodenough = leftspace / 16;
1457
1458 /*
1459 * Scan through the data items and calculate space usage for a split at
1460 * each possible position.
1461 */
1462 olddataitemstoleft = 0;
1463 goodenoughfound = false;
1464 maxoff = PageGetMaxOffsetNumber(page);
1465
1466 for (offnum = P_FIRSTDATAKEY(opaque);
1467 offnum <= maxoff;
1468 offnum = OffsetNumberNext(offnum))
1469 {
1470 Size itemsz;
1471
1472 itemid = PageGetItemId(page, offnum);
1473 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1474
1475 /*
1476 * Will the new item go to left or right of split?
1477 */
1478 if (offnum > newitemoff)
1479 _bt_checksplitloc(&state, offnum, true,
1480 olddataitemstoleft, itemsz);
1481
1482 else if (offnum < newitemoff)
1483 _bt_checksplitloc(&state, offnum, false,
1484 olddataitemstoleft, itemsz);
1485 else
1486 {
1487 /* need to try it both ways! */
1488 _bt_checksplitloc(&state, offnum, true,
1489 olddataitemstoleft, itemsz);
1490
1491 _bt_checksplitloc(&state, offnum, false,
1492 olddataitemstoleft, itemsz);
1493 }
1494
1495 /* Abort scan once we find a good-enough choice */
1496 if (state.have_split && state.best_delta <= goodenough)
1497 {
1498 goodenoughfound = true;
1499 break;
1500 }
1501
1502 olddataitemstoleft += itemsz;
1503 }
1504
1505 /*
1506 * If the new item goes as the last item, check for splitting so that all
1507 * the old items go to the left page and the new item goes to the right
1508 * page.
1509 */
1510 if (newitemoff > maxoff && !goodenoughfound)
1511 _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
1512
1513 /*
1514 * I believe it is not possible to fail to find a feasible split, but just
1515 * in case ...
1516 */
1517 if (!state.have_split)
1518 elog(ERROR, "could not find a feasible split point for index \"%s\"",
1519 RelationGetRelationName(rel));
1520
1521 *newitemonleft = state.newitemonleft;
1522 return state.firstright;
1523 }
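/*
 * Example of the rightmost-page special case (numbers assumed): with the
 * default leaf fillfactor of 90, _bt_checksplitloc() minimizes
 * |90 * leftfree - 10 * rightfree|, which drives leftfree toward roughly
 * one tenth of rightfree.  So for strictly increasing keys each split
 * leaves the existing (left) page about 90% full and starts the new right
 * page nearly empty, matching what nbtsort.c produces for a freshly built
 * index.
 */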
1524
1525 /*
1526 * Subroutine to analyze a particular possible split choice (ie, firstright
1527 * and newitemonleft settings), and record the best split so far in *state.
1528 *
1529 * firstoldonright is the offset of the first item on the original page
1530 * that goes to the right page, and firstoldonrightsz is the size of that
1531 * tuple. firstoldonright can be > max offset, which means that all the old
1532 * items go to the left page and only the new item goes to the right page.
1533 * In that case, firstoldonrightsz is not used.
1534 *
1535 * olddataitemstoleft is the total size of all old items to the left of
1536 * firstoldonright.
1537 */
1538 static void
1539 _bt_checksplitloc(FindSplitData *state,
1540 OffsetNumber firstoldonright,
1541 bool newitemonleft,
1542 int olddataitemstoleft,
1543 Size firstoldonrightsz)
1544 {
1545 int leftfree,
1546 rightfree;
1547 Size firstrightitemsz;
1548 bool newitemisfirstonright;
1549
1550 /* Is the new item going to be the first item on the right page? */
1551 newitemisfirstonright = (firstoldonright == state->newitemoff
1552 && !newitemonleft);
1553
1554 if (newitemisfirstonright)
1555 firstrightitemsz = state->newitemsz;
1556 else
1557 firstrightitemsz = firstoldonrightsz;
1558
1559 /* Account for all the old tuples */
1560 leftfree = state->leftspace - olddataitemstoleft;
1561 rightfree = state->rightspace -
1562 (state->olddataitemstotal - olddataitemstoleft);
1563
1564 /*
1565 * The first item on the right page becomes the high key of the left page;
1566 * therefore it counts against left space as well as right space.
1567 */
1568 leftfree -= firstrightitemsz;
1569
1570 /* account for the new item */
1571 if (newitemonleft)
1572 leftfree -= (int) state->newitemsz;
1573 else
1574 rightfree -= (int) state->newitemsz;
1575
1576 /*
1577 * If we are not on the leaf level, we will be able to discard the key
1578 * data from the first item that winds up on the right page.
1579 */
1580 if (!state->is_leaf)
1581 rightfree += (int) firstrightitemsz -
1582 (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1583
1584 /*
1585 * If feasible split point, remember best delta.
1586 */
1587 if (leftfree >= 0 && rightfree >= 0)
1588 {
1589 int delta;
1590
1591 if (state->is_rightmost)
1592 {
1593 /*
1594 * If splitting a rightmost page, try to put (100-fillfactor)% of
1595 * free space on left page. See comments for _bt_findsplitloc.
1596 */
1597 delta = (state->fillfactor * leftfree)
1598 - ((100 - state->fillfactor) * rightfree);
1599 }
1600 else
1601 {
1602 /* Otherwise, aim for equal free space on both sides */
1603 delta = leftfree - rightfree;
1604 }
1605
1606 if (delta < 0)
1607 delta = -delta;
1608 if (!state->have_split || delta < state->best_delta)
1609 {
1610 state->have_split = true;
1611 state->newitemonleft = newitemonleft;
1612 state->firstright = firstoldonright;
1613 state->best_delta = delta;
1614 }
1615 }
1616 }
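/*
 * Illustrative arithmetic for the delta computation above (hypothetical
 * numbers, fillfactor = 90):
 *
 * For a rightmost split leaving leftfree = 800 and rightfree = 7200,
 *		delta = 90 * 800 - (100 - 90) * 7200 = 72000 - 72000 = 0,
 * the best possible score, so the existing (left) page stays about 90% full
 * and the nearly empty right page absorbs future ascending-key insertions.
 * An even split (leftfree = rightfree = 4000) would score
 *		delta = 90 * 4000 - 10 * 4000 = 320000
 * and lose.  For a non-rightmost split the rule is simply
 *		delta = |leftfree - rightfree|,
 * so equal free space on both halves wins.
 */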
1617
1618 /*
1619 * _bt_insert_parent() -- Insert downlink into parent after a page split.
1620 *
1621 * On entry, buf and rbuf are the left and right split pages, which we
1622 * still hold write locks on per the L&Y algorithm. We release the
1623 * write locks once we have write lock on the parent page. (Any sooner,
1624 * and it'd be possible for some other process to try to split or delete
1625 * one of these pages, and get confused because it cannot find the downlink.)
1626 *
1627 * stack - stack showing how we got here. May be NULL in cases that don't
1628 * have to be efficient (concurrent ROOT split, WAL recovery)
1629 * is_root - we split the true root
1630 * is_only - we split a page alone on its level (might have been fast root)
1631 */
1632 static void
1633 _bt_insert_parent(Relation rel,
1634 Buffer buf,
1635 Buffer rbuf,
1636 BTStack stack,
1637 bool is_root,
1638 bool is_only)
1639 {
1640 /*
1641 * Here we have to do something Lehman and Yao don't talk about: deal with
1642 * a root split and construction of a new root. If our stack is empty
1643 * then we have just split a node on what had been the root level when we
1644 * descended the tree. If it was still the root then we perform a
1645 * new-root construction. If it *wasn't* the root anymore, search to find
1646 * the next higher level that someone constructed meanwhile, and find the
1647 * right place to insert as for the normal case.
1648 *
1649 * If we have to search for the parent level, we do so by re-descending
1650 * from the root. This is not super-efficient, but it's rare enough not
1651 * to matter.
1652 */
1653 if (is_root)
1654 {
1655 Buffer rootbuf;
1656
1657 Assert(stack == NULL);
1658 Assert(is_only);
1659 /* create a new root node and update the metapage */
1660 rootbuf = _bt_newroot(rel, buf, rbuf);
1661 /* release the split buffers */
1662 _bt_relbuf(rel, rootbuf);
1663 _bt_relbuf(rel, rbuf);
1664 _bt_relbuf(rel, buf);
1665 }
1666 else
1667 {
1668 BlockNumber bknum = BufferGetBlockNumber(buf);
1669 BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1670 Page page = BufferGetPage(buf);
1671 IndexTuple new_item;
1672 BTStackData fakestack;
1673 IndexTuple ritem;
1674 Buffer pbuf;
1675
1676 if (stack == NULL)
1677 {
1678 BTPageOpaque lpageop;
1679
1680 elog(DEBUG2, "concurrent ROOT page split");
1681 lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1682 /* Find the leftmost page at the next level up */
1683 pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
1684 NULL);
1685 /* Set up a phony stack entry pointing there */
1686 stack = &fakestack;
1687 stack->bts_blkno = BufferGetBlockNumber(pbuf);
1688 stack->bts_offset = InvalidOffsetNumber;
1689 /* bts_btentry will be initialized below */
1690 stack->bts_parent = NULL;
1691 _bt_relbuf(rel, pbuf);
1692 }
1693
1694 /* get high key from left page == lowest key on new right page */
1695 ritem = (IndexTuple) PageGetItem(page,
1696 PageGetItemId(page, P_HIKEY));
1697
1698 /* form an index tuple that points at the new right page */
1699 new_item = CopyIndexTuple(ritem);
1700 ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
1701
1702 /*
1703 * Find the parent buffer and get the parent page.
1704 *
1705 * Oops - if we were moved right then we need to change stack item! We
1706 * want to find parent pointing to where we are, right ? - vadim
1707 * 05/27/97
1708 */
1709 ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
1710 pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1711
1712 /*
1713 * Now we can unlock the right child. The left child will be unlocked
1714 * by _bt_insertonpg().
1715 */
1716 _bt_relbuf(rel, rbuf);
1717
1718 /* Check for error only after writing children */
1719 if (pbuf == InvalidBuffer)
1720 elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
1721 RelationGetRelationName(rel), bknum, rbknum);
1722
1723 /* Recursively update the parent */
1724 _bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
1725 new_item, stack->bts_offset + 1,
1726 is_only);
1727
1728 /* be tidy */
1729 pfree(new_item);
1730 }
1731 }
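/*
 * Informal sketch of how splits cascade up the tree:
 *
 *	_bt_insertonpg(level N)      -- target page is full
 *		-> _bt_split()           -- produces left (buf) and right (rbuf) pages
 *		-> _bt_insert_parent(buf, rbuf, ...)
 *			-> _bt_insertonpg(level N+1) with the new downlink, which may
 *			   itself split and recurse another level up, until the downlink
 *			   fits or _bt_newroot() builds a new root.
 */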
1732
1733 /*
1734 * _bt_finish_split() -- Finish an incomplete split
1735 *
1736 * A crash or other failure can leave a split incomplete. The insertion
1737 * routines won't insert onto a page that is incompletely split.
1738 * Before inserting on such a page, call _bt_finish_split().
1739 *
1740 * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked
1741 * and unpinned.
1742 */
1743 void
1744 _bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
1745 {
1746 Page lpage = BufferGetPage(lbuf);
1747 BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
1748 Buffer rbuf;
1749 Page rpage;
1750 BTPageOpaque rpageop;
1751 bool was_root;
1752 bool was_only;
1753
1754 Assert(P_INCOMPLETE_SPLIT(lpageop));
1755
1756 /* Lock right sibling, the one missing the downlink */
1757 rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
1758 rpage = BufferGetPage(rbuf);
1759 rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
1760
1761 /* Could this be a root split? */
1762 if (!stack)
1763 {
1764 Buffer metabuf;
1765 Page metapg;
1766 BTMetaPageData *metad;
1767
1768 /* acquire lock on the metapage */
1769 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1770 metapg = BufferGetPage(metabuf);
1771 metad = BTPageGetMeta(metapg);
1772
1773 was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
1774
1775 _bt_relbuf(rel, metabuf);
1776 }
1777 else
1778 was_root = false;
1779
1780 /* Was this the only page on the level before split? */
1781 was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
1782
1783 elog(DEBUG1, "finishing incomplete split of %u/%u",
1784 BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
1785
1786 _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
1787 }
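/*
 * Typical caller pattern (see _bt_getstackbuf below): when a write-locked
 * page turns out to have P_INCOMPLETE_SPLIT set, the caller invokes
 * _bt_finish_split(), which consumes the lock and pin on lbuf, and then
 * re-reads the page before retrying its own insertion.
 */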
1788
1789 /*
1790 * _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1791 * we last looked at in the parent.
1792 *
1793 * This is possible because we save the downlink from the parent item,
1794 * which is enough to uniquely identify it. Insertions into the parent
1795 * level could cause the item to move right; deletions could cause it
1796 * to move left, but not left of the page we previously found it in.
1797 *
1798 * Adjusts bts_blkno & bts_offset if changed.
1799 *
1800 * Returns InvalidBuffer if item not found (should not happen).
1801 */
1802 Buffer
1803 _bt_getstackbuf(Relation rel, BTStack stack, int access)
1804 {
1805 BlockNumber blkno;
1806 OffsetNumber start;
1807
1808 blkno = stack->bts_blkno;
1809 start = stack->bts_offset;
1810
1811 for (;;)
1812 {
1813 Buffer buf;
1814 Page page;
1815 BTPageOpaque opaque;
1816
1817 buf = _bt_getbuf(rel, blkno, access);
1818 page = BufferGetPage(buf);
1819 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1820
1821 if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
1822 {
1823 _bt_finish_split(rel, buf, stack->bts_parent);
1824 continue;
1825 }
1826
1827 if (!P_IGNORE(opaque))
1828 {
1829 OffsetNumber offnum,
1830 minoff,
1831 maxoff;
1832 ItemId itemid;
1833 IndexTuple item;
1834
1835 minoff = P_FIRSTDATAKEY(opaque);
1836 maxoff = PageGetMaxOffsetNumber(page);
1837
1838 /*
1839 * start = InvalidOffsetNumber means "search the whole page". We
1840		 * need this test anyway due to the possibility that the page has a
1841		 * high key now when it didn't before.
1842 */
1843 if (start < minoff)
1844 start = minoff;
1845
1846 /*
1847		 * Need this check too, to guard against the possibility that the page
1848		 * has split since we visited it originally.
1849 */
1850 if (start > maxoff)
1851 start = OffsetNumberNext(maxoff);
1852
1853 /*
1854 * These loops will check every item on the page --- but in an
1855 * order that's attuned to the probability of where it actually
1856 * is. Scan to the right first, then to the left.
1857 */
1858 for (offnum = start;
1859 offnum <= maxoff;
1860 offnum = OffsetNumberNext(offnum))
1861 {
1862 itemid = PageGetItemId(page, offnum);
1863 item = (IndexTuple) PageGetItem(page, itemid);
1864 if (BTEntrySame(item, &stack->bts_btentry))
1865 {
1866 /* Return accurate pointer to where link is now */
1867 stack->bts_blkno = blkno;
1868 stack->bts_offset = offnum;
1869 return buf;
1870 }
1871 }
1872
1873 for (offnum = OffsetNumberPrev(start);
1874 offnum >= minoff;
1875 offnum = OffsetNumberPrev(offnum))
1876 {
1877 itemid = PageGetItemId(page, offnum);
1878 item = (IndexTuple) PageGetItem(page, itemid);
1879 if (BTEntrySame(item, &stack->bts_btentry))
1880 {
1881 /* Return accurate pointer to where link is now */
1882 stack->bts_blkno = blkno;
1883 stack->bts_offset = offnum;
1884 return buf;
1885 }
1886 }
1887 }
1888
1889 /*
1890 * The item we're looking for moved right at least one page.
1891 */
1892 if (P_RIGHTMOST(opaque))
1893 {
1894 _bt_relbuf(rel, buf);
1895 return InvalidBuffer;
1896 }
1897 blkno = opaque->btpo_next;
1898 start = InvalidOffsetNumber;
1899 _bt_relbuf(rel, buf);
1900 }
1901 }
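/*
 * Search-order sketch: starting from the remembered offset S on a page with
 * data items at [minoff..maxoff], the loops above probe
 *		S, S+1, ..., maxoff,   then   S-1, S-2, ..., minoff,
 * and if the downlink still is not found they follow btpo_next to the right
 * sibling and search that page in full.  Concurrent insertions can only have
 * moved the downlink to the right, and deletions cannot have moved it left
 * of the page it was last seen on, so the walk terminates.
 */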
1902
1903 /*
1904 * _bt_newroot() -- Create a new root page for the index.
1905 *
1906 * We've just split the old root page and need to create a new one.
1907 * In order to do this, we add a new root page to the file, then lock
1908 * the metadata page and update it. This is guaranteed to be deadlock-
1909 * free, because all readers release their locks on the metadata page
1910 * before trying to lock the root, and all writers lock the root before
1911 * trying to lock the metadata page. We have a write lock on the old
1912 * root page, so we have not introduced any cycles into the waits-for
1913 * graph.
1914 *
1915 * On entry, lbuf (the old root) and rbuf (its new peer) are write-
1916 * locked. On exit, a new root page exists with entries for the
1917 * two new children, metapage is updated and unlocked/unpinned.
1918 * The new root buffer is returned to the caller, which has to unlock/unpin
1919 * lbuf, rbuf & rootbuf.
1920 */
1921 static Buffer
1922 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1923 {
1924 Buffer rootbuf;
1925 Page lpage,
1926 rootpage;
1927 BlockNumber lbkno,
1928 rbkno;
1929 BlockNumber rootblknum;
1930 BTPageOpaque rootopaque;
1931 BTPageOpaque lopaque;
1932 ItemId itemid;
1933 IndexTuple item;
1934 IndexTuple left_item;
1935 Size left_item_sz;
1936 IndexTuple right_item;
1937 Size right_item_sz;
1938 Buffer metabuf;
1939 Page metapg;
1940 BTMetaPageData *metad;
1941
1942 lbkno = BufferGetBlockNumber(lbuf);
1943 rbkno = BufferGetBlockNumber(rbuf);
1944 lpage = BufferGetPage(lbuf);
1945 lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
1946
1947 /* get a new root page */
1948 rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1949 rootpage = BufferGetPage(rootbuf);
1950 rootblknum = BufferGetBlockNumber(rootbuf);
1951
1952 /* acquire lock on the metapage */
1953 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1954 metapg = BufferGetPage(metabuf);
1955 metad = BTPageGetMeta(metapg);
1956
1957 /*
1958 * Create downlink item for left page (old root). Since this will be the
1959 * first item in a non-leaf page, it implicitly has minus-infinity key
1960 * value, so we need not store any actual key in it.
1961 */
1962 left_item_sz = sizeof(IndexTupleData);
1963 left_item = (IndexTuple) palloc(left_item_sz);
1964 left_item->t_info = left_item_sz;
1965 ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
1966
1967 /*
1968 * Create downlink item for right page. The key for it is obtained from
1969 * the "high key" position in the left page.
1970 */
1971 itemid = PageGetItemId(lpage, P_HIKEY);
1972 right_item_sz = ItemIdGetLength(itemid);
1973 item = (IndexTuple) PageGetItem(lpage, itemid);
1974 right_item = CopyIndexTuple(item);
1975 ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
1976
1977 /* NO EREPORT(ERROR) from here till newroot op is logged */
1978 START_CRIT_SECTION();
1979
1980 /* set btree special data */
1981 rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1982 rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1983 rootopaque->btpo_flags = BTP_ROOT;
1984 rootopaque->btpo.level =
1985 ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1986 rootopaque->btpo_cycleid = 0;
1987
1988 /* update metapage data */
1989 metad->btm_root = rootblknum;
1990 metad->btm_level = rootopaque->btpo.level;
1991 metad->btm_fastroot = rootblknum;
1992 metad->btm_fastlevel = rootopaque->btpo.level;
1993
1994 /*
1995 * Insert the left page pointer into the new root page. The root page is
1996 * the rightmost page on its level so there is no "high key" in it; the
1997 * two items will go into positions P_HIKEY and P_FIRSTKEY.
1998 *
1999 * Note: we *must* insert the two items in item-number order, for the
2000 * benefit of _bt_restore_page().
2001 */
2002 if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
2003 false, false) == InvalidOffsetNumber)
2004 elog(PANIC, "failed to add leftkey to new root page"
2005 " while splitting block %u of index \"%s\"",
2006 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
2007
2008 /*
2009 * insert the right page pointer into the new root page.
2010 */
2011 if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
2012 false, false) == InvalidOffsetNumber)
2013 elog(PANIC, "failed to add rightkey to new root page"
2014 " while splitting block %u of index \"%s\"",
2015 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
2016
2017 /* Clear the incomplete-split flag in the left child */
2018 Assert(P_INCOMPLETE_SPLIT(lopaque));
2019 lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
2020 MarkBufferDirty(lbuf);
2021
2022 MarkBufferDirty(rootbuf);
2023 MarkBufferDirty(metabuf);
2024
2025 /* XLOG stuff */
2026 if (RelationNeedsWAL(rel))
2027 {
2028 xl_btree_newroot xlrec;
2029 XLogRecPtr recptr;
2030 xl_btree_metadata md;
2031
2032 xlrec.rootblk = rootblknum;
2033 xlrec.level = metad->btm_level;
2034
2035 XLogBeginInsert();
2036 XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);
2037
2038 XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
2039 XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
2040 XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
2041
2042 md.root = rootblknum;
2043 md.level = metad->btm_level;
2044 md.fastroot = rootblknum;
2045 md.fastlevel = metad->btm_level;
2046
2047 XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));
2048
2049 /*
2050		 * Direct access to the page is not ideal, but it is faster; we should
2051		 * implement a new function in the page API for this.
2052 */
2053 XLogRegisterBufData(0,
2054 (char *) rootpage + ((PageHeader) rootpage)->pd_upper,
2055 ((PageHeader) rootpage)->pd_special -
2056 ((PageHeader) rootpage)->pd_upper);
2057
2058 recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);
2059
2060 PageSetLSN(lpage, recptr);
2061 PageSetLSN(rootpage, recptr);
2062 PageSetLSN(metapg, recptr);
2063 }
2064
2065 END_CRIT_SECTION();
2066
2067 /* done with metapage */
2068 _bt_relbuf(rel, metabuf);
2069
2070 pfree(left_item);
2071 pfree(right_item);
2072
2073 return rootbuf;
2074 }
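/*
 * Resulting layout of the new root (sketch): being the rightmost page on its
 * level it has no high key, so its two downlinks occupy positions P_HIKEY
 * and P_FIRSTKEY:
 *
 *		P_HIKEY:     keyless ("minus infinity") tuple -> old root (lbuf)
 *		P_FIRSTKEY:  copy of lbuf's high key          -> new right page (rbuf)
 */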
2075
2076 /*
2077 * _bt_pgaddtup() -- add a tuple to a particular page in the index.
2078 *
2079 * This routine adds the tuple to the page as requested. It does
2080 * not affect pin/lock status, but you'd better have a write lock
2081 * and pin on the target buffer! Don't forget to write and release
2082 * the buffer afterwards, either.
2083 *
2084 * The main difference between this routine and a bare PageAddItem call
2085 * is that this code knows that the leftmost index tuple on a non-leaf
2086 * btree page doesn't need to have a key. Therefore, it strips such
2087 * tuples down to just the tuple header. CAUTION: this works ONLY if
2088 * we insert the tuples in order, so that the given itup_off does
2089 * represent the final position of the tuple!
2090 */
2091 static bool
2092 _bt_pgaddtup(Page page,
2093 Size itemsize,
2094 IndexTuple itup,
2095 OffsetNumber itup_off)
2096 {
2097 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2098 IndexTupleData trunctuple;
2099
2100 if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
2101 {
2102 trunctuple = *itup;
2103 trunctuple.t_info = sizeof(IndexTupleData);
2104 itup = &trunctuple;
2105 itemsize = sizeof(IndexTupleData);
2106 }
2107
2108 if (PageAddItem(page, (Item) itup, itemsize, itup_off,
2109 false, false) == InvalidOffsetNumber)
2110 return false;
2111
2112 return true;
2113 }
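/*
 * Truncation example (illustrative sizes): when the leftmost data tuple of a
 * non-leaf page is added through this routine, only its IndexTupleData
 * header survives -- 8 bytes on typical platforms: the 6-byte t_tid still
 * pointing at the child block, plus the 2-byte t_info now set to
 * sizeof(IndexTupleData).  The key it used to carry is dropped; comparisons
 * treat this first downlink as minus infinity.
 */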
2114
2115 /*
2116 * _bt_isequal - used in _bt_doinsert in check for duplicates.
2117 *
2118 * This is very similar to _bt_compare, except for NULL handling.
2119 * The rule is simple: NOT_NULL is never equal to NULL, and NULL is never equal to NULL either.
2120 */
2121 static bool
2122 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
2123 int keysz, ScanKey scankey)
2124 {
2125 IndexTuple itup;
2126 int i;
2127
2128 /* Better be comparing to a leaf item */
2129 Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
2130
2131 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
2132
2133 for (i = 1; i <= keysz; i++)
2134 {
2135 AttrNumber attno;
2136 Datum datum;
2137 bool isNull;
2138 int32 result;
2139
2140 attno = scankey->sk_attno;
2141 Assert(attno == i);
2142 datum = index_getattr(itup, attno, itupdesc, &isNull);
2143
2144 /* NULLs are never equal to anything */
2145 if (isNull || (scankey->sk_flags & SK_ISNULL))
2146 return false;
2147
2148 result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
2149 scankey->sk_collation,
2150 datum,
2151 scankey->sk_argument));
2152
2153 if (result != 0)
2154 return false;
2155
2156 scankey++;
2157 }
2158
2159 /* if we get here, the keys are equal */
2160 return true;
2161 }
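/*
 * NULL-handling sketch (hypothetical single-column values):
 *
 *		scankey		index tuple		_bt_isequal
 *		-------		-----------		-----------
 *		42			42				true
 *		42			NULL			false
 *		NULL		42				false
 *		NULL		NULL			false
 *
 * Hence tuples with NULL key columns never register as duplicates during a
 * uniqueness check, matching SQL's treatment of NULLs as distinct.
 */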
2162
2163 /*
2164 * _bt_vacuum_one_page - vacuum just one index page.
2165 *
2166 * Try to remove LP_DEAD items from the given page. The passed buffer
2167 * must be exclusive-locked, but unlike a real VACUUM, we don't need a
2168 * super-exclusive "cleanup" lock (see nbtree/README).
2169 */
2170 static void
2171 _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
2172 {
2173 OffsetNumber deletable[MaxOffsetNumber];
2174 int ndeletable = 0;
2175 OffsetNumber offnum,
2176 minoff,
2177 maxoff;
2178 Page page = BufferGetPage(buffer);
2179 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2180
2181 /*
2182 * Scan over all items to see which ones need to be deleted according to
2183 * LP_DEAD flags.
2184 */
2185 minoff = P_FIRSTDATAKEY(opaque);
2186 maxoff = PageGetMaxOffsetNumber(page);
2187 for (offnum = minoff;
2188 offnum <= maxoff;
2189 offnum = OffsetNumberNext(offnum))
2190 {
2191 ItemId itemId = PageGetItemId(page, offnum);
2192
2193 if (ItemIdIsDead(itemId))
2194 deletable[ndeletable++] = offnum;
2195 }
2196
2197 if (ndeletable > 0)
2198 _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
2199
2200 /*
2201 * Note: if we didn't find any LP_DEAD items, then the page's
2202 * BTP_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
2203 * separate write to clear it, however. We will clear it when we split
2204 * the page.
2205 */
2206 }
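/*
 * Context note: in this file, this opportunistic cleanup is reached from
 * _bt_findinsertloc() when a leaf page lacks room for an incoming tuple but
 * carries the BTP_HAS_GARBAGE hint, in the hope that removing LP_DEAD items
 * avoids an immediate page split.
 */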
2207