1 /*-------------------------------------------------------------------------
2 *
3 * nbtinsert.c
4 * Item insertion in Lehman and Yao btrees for Postgres.
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/access/nbtree/nbtinsert.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "access/nbtree.h"
20 #include "access/nbtxlog.h"
21 #include "access/transam.h"
22 #include "access/xloginsert.h"
23 #include "miscadmin.h"
24 #include "storage/lmgr.h"
25 #include "storage/predicate.h"
26 #include "utils/tqual.h"
27
28
/*
 * Working state for _bt_findsplitloc/_bt_checksplitloc: holds the fixed
 * parameters of the page being split, plus the best candidate split point
 * found so far while scanning possible split positions.
 */
typedef struct
{
	/* context data for _bt_checksplitloc */
	Size		newitemsz;		/* size of new item to be inserted */
	int			fillfactor;		/* needed when splitting rightmost page */
	bool		is_leaf;		/* T if splitting a leaf page */
	bool		is_rightmost;	/* T if splitting a rightmost page */
	OffsetNumber newitemoff;	/* where the new item is to be inserted */
	int			leftspace;		/* space available for items on left page */
	int			rightspace;		/* space available for items on right page */
	int			olddataitemstotal;	/* space taken by old items */

	bool		have_split;		/* found a valid split? */

	/* these fields valid only if have_split is true */
	bool		newitemonleft;	/* new item on left or right of best split */
	OffsetNumber firstright;	/* best split point */
	int			best_delta;		/* best size delta so far */
} FindSplitData;
48
49
/* prototypes for internal (file-local) functions */
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
				 Relation heapRel, Buffer buf, OffsetNumber offset,
				 ScanKey itup_scankey,
				 IndexUniqueCheck checkUnique, bool *is_unique,
				 uint32 *speculativeToken);
static void _bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup,
				  BTStack stack,
				  Relation heapRel);
static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
		  OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
		  IndexTuple newitem, bool newitemonleft);
static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
				  BTStack stack, bool is_root, bool is_only);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright, bool newitemonleft,
				  int dataitemstoleft, Size firstoldonrightsz);
static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
			 OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
87
88
/*
 *	_bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *		This routine is called by the public interface routine, btinsert.
 *		By here, itup is filled in, including the TID.
 *
 *		If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
 *		will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
 *		UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
 *		For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
 *		don't actually insert.
 *
 *		The result value is only significant for UNIQUE_CHECK_PARTIAL:
 *		it must be TRUE if the entry is known unique, else FALSE.
 *		(In the current implementation we'll also return TRUE after a
 *		successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
 *		that's just a coding artifact.)
 */
bool
_bt_doinsert(Relation rel, IndexTuple itup,
			 IndexUniqueCheck checkUnique, Relation heapRel)
{
	bool		is_unique = false;
	int			natts = rel->rd_rel->relnatts;
	ScanKey		itup_scankey;
	BTStack		stack;
	Buffer		buf;
	OffsetNumber offset;

	/* we need an insertion scan key to do our search, so build one */
	itup_scankey = _bt_mkscankey(rel, itup);

top:
	/* find the first page containing this key */
	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);

	/* no insert-position hint yet; set by _bt_binsrch below if we do
	 * a uniqueness check, else left for _bt_findinsertloc to compute */
	offset = InvalidOffsetNumber;

	/* trade in our read lock for a write lock */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	LockBuffer(buf, BT_WRITE);

	/*
	 * If the page was split between the time that we surrendered our read
	 * lock and acquired our write lock, then this page may no longer be the
	 * right place for the key we want to insert.  In this case, we need to
	 * move right in the tree.  See Lehman and Yao for an excruciatingly
	 * precise description.
	 */
	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
						true, stack, BT_WRITE, NULL);

	/*
	 * If we're not allowing duplicates, make sure the key isn't already in
	 * the index.
	 *
	 * NOTE: obviously, _bt_check_unique can only detect keys that are already
	 * in the index; so it cannot defend against concurrent insertions of the
	 * same key.  We protect against that by means of holding a write lock on
	 * the target page.  Any other would-be inserter of the same key must
	 * acquire a write lock on the same target page, so only one would-be
	 * inserter can be making the check at one time.  Furthermore, once we are
	 * past the check we hold write locks continuously until we have performed
	 * our insertion, so no later inserter can fail to see our insertion.
	 * (This requires some care in _bt_insertonpg.)
	 *
	 * If we must wait for another xact, we release the lock while waiting,
	 * and then must start over completely.
	 *
	 * For a partial uniqueness check, we don't wait for the other xact. Just
	 * let the tuple in and return false for possibly non-unique, or true for
	 * definitely unique.
	 */
	if (checkUnique != UNIQUE_CHECK_NO)
	{
		TransactionId xwait;
		uint32		speculativeToken;

		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
								 checkUnique, &is_unique, &speculativeToken);

		if (TransactionIdIsValid(xwait))
		{
			/* Have to wait for the other guy ... */
			_bt_relbuf(rel, buf);

			/*
			 * If it's a speculative insertion, wait for it to finish (ie. to
			 * go ahead with the insertion, or kill the tuple).  Otherwise
			 * wait for the transaction to finish as usual.
			 */
			if (speculativeToken)
				SpeculativeInsertionWait(xwait, speculativeToken);
			else
				XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);

			/* start over... */
			_bt_freestack(stack);
			goto top;
		}
	}

	if (checkUnique != UNIQUE_CHECK_EXISTING)
	{
		/*
		 * The only conflict predicate locking cares about for indexes is when
		 * an index tuple insert conflicts with an existing lock.  Since the
		 * actual location of the insert is hard to predict because of the
		 * random search used to prevent O(N^2) performance when there are
		 * many duplicate entries, we can just use the "first valid" page.
		 */
		CheckForSerializableConflictIn(rel, NULL, buf);
		/* do the insertion */
		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
						  stack, heapRel);
		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
	}
	else
	{
		/* just release the buffer */
		_bt_relbuf(rel, buf);
	}

	/* be tidy */
	_bt_freestack(stack);
	_bt_freeskey(itup_scankey);

	return is_unique;
}
219
/*
 *	_bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict. It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().  If an xact ID is
 * returned, and the conflicting tuple still has a speculative insertion in
 * progress, *speculativeToken is set to non-zero, and the caller can wait for
 * the verdict on the insertion using SpeculativeInsertionWait().
 *
 * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
 * InvalidTransactionId because we don't want to wait.  In this case we
 * set *is_unique to false if there is a potential conflict, and the
 * core code must redo the uniqueness check later.
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
				 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
				 IndexUniqueCheck checkUnique, bool *is_unique,
				 uint32 *speculativeToken)
{
	TupleDesc	itupdesc = RelationGetDescr(rel);
	int			natts = rel->rd_rel->relnatts;
	SnapshotData SnapshotDirty;
	OffsetNumber maxoff;
	Page		page;
	BTPageOpaque opaque;
	Buffer		nbuf = InvalidBuffer;	/* pin/lock on a right-sibling page, if
										 * the scan moves past "buf" */
	bool		found = false;	/* for UNIQUE_CHECK_EXISTING: did we re-find
								 * the tuple being rechecked? */

	/* Assume unique until we find a duplicate */
	*is_unique = true;

	InitDirtySnapshot(SnapshotDirty);

	page = BufferGetPage(buf);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	maxoff = PageGetMaxOffsetNumber(page);

	/*
	 * Scan over all equal tuples, looking for live conflicts.
	 */
	for (;;)
	{
		ItemId		curitemid;
		IndexTuple	curitup;
		BlockNumber nblkno;

		/*
		 * make sure the offset points to an actual item before trying to
		 * examine it...
		 */
		if (offset <= maxoff)
		{
			curitemid = PageGetItemId(page, offset);

			/*
			 * We can skip items that are marked killed.
			 *
			 * Formerly, we applied _bt_isequal() before checking the kill
			 * flag, so as to fall out of the item loop as soon as possible.
			 * However, in the presence of heavy update activity an index may
			 * contain many killed items with the same key; running
			 * _bt_isequal() on each killed item gets expensive.  Furthermore
			 * it is likely that the non-killed version of each key appears
			 * first, so that we didn't actually get to exit any sooner
			 * anyway.  So now we just advance over killed items as quickly as
			 * we can.  We only apply _bt_isequal() when we get to a non-killed
			 * item or the end of the page.
			 */
			if (!ItemIdIsDead(curitemid))
			{
				ItemPointerData htid;
				bool		all_dead;

				/*
				 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
				 * how we handling NULLs - and so we must not use _bt_compare
				 * in real comparison, but only for ordering/finding items on
				 * pages. - vadim 03/24/97
				 */
				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
					break;		/* we're past all the equal tuples */

				/* okay, we gotta fetch the heap tuple ... */
				curitup = (IndexTuple) PageGetItem(page, curitemid);
				htid = curitup->t_tid;

				/*
				 * If we are doing a recheck, we expect to find the tuple we
				 * are rechecking.  It's not a duplicate, but we have to keep
				 * scanning.
				 */
				if (checkUnique == UNIQUE_CHECK_EXISTING &&
					ItemPointerCompare(&htid, &itup->t_tid) == 0)
				{
					found = true;
				}

				/*
				 * We check the whole HOT-chain to see if there is any tuple
				 * that satisfies SnapshotDirty.  This is necessary because we
				 * have just a single index entry for the entire chain.
				 */
				else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
										 &all_dead))
				{
					TransactionId xwait;

					/*
					 * It is a duplicate. If we are only doing a partial
					 * check, then don't bother checking if the tuple is being
					 * updated in another transaction. Just return the fact
					 * that it is a potential conflict and leave the full
					 * check till later.
					 */
					if (checkUnique == UNIQUE_CHECK_PARTIAL)
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						*is_unique = false;
						return InvalidTransactionId;
					}

					/*
					 * If this tuple is being updated by other transaction
					 * then we have to wait for its commit/abort.
					 */
					xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
						SnapshotDirty.xmin : SnapshotDirty.xmax;

					if (TransactionIdIsValid(xwait))
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						/* Tell _bt_doinsert to wait... */
						*speculativeToken = SnapshotDirty.speculativeToken;
						return xwait;
					}

					/*
					 * Otherwise we have a definite conflict.  But before
					 * complaining, look to see if the tuple we want to insert
					 * is itself now committed dead --- if so, don't complain.
					 * This is a waste of time in normal scenarios but we must
					 * do it to support CREATE INDEX CONCURRENTLY.
					 *
					 * We must follow HOT-chains here because during
					 * concurrent index build, we insert the root TID though
					 * the actual tuple may be somewhere in the HOT-chain.
					 * While following the chain we might not stop at the
					 * exact tuple which triggered the insert, but that's OK
					 * because if we find a live tuple anywhere in this chain,
					 * we have a unique key conflict.  The other live tuple is
					 * not part of this chain because it had a different index
					 * entry.
					 */
					htid = itup->t_tid;
					if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
					{
						/* Normal case --- it's still live */
					}
					else
					{
						/*
						 * It's been deleted, so no error, and no need to
						 * continue searching
						 */
						break;
					}

					/*
					 * Check for a conflict-in as we would if we were going to
					 * write to this page.  We aren't actually going to write,
					 * but we want a chance to report SSI conflicts that would
					 * otherwise be masked by this unique constraint
					 * violation.
					 */
					CheckForSerializableConflictIn(rel, NULL, buf);

					/*
					 * This is a definite conflict.  Break the tuple down into
					 * datums and report the error.  But first, make sure we
					 * release the buffer locks we're holding ---
					 * BuildIndexValueDescription could make catalog accesses,
					 * which in the worst case might touch this same index and
					 * cause deadlocks.
					 */
					if (nbuf != InvalidBuffer)
						_bt_relbuf(rel, nbuf);
					_bt_relbuf(rel, buf);

					{
						Datum		values[INDEX_MAX_KEYS];
						bool		isnull[INDEX_MAX_KEYS];
						char	   *key_desc;

						index_deform_tuple(itup, RelationGetDescr(rel),
										   values, isnull);

						key_desc = BuildIndexValueDescription(rel, values,
															  isnull);

						ereport(ERROR,
								(errcode(ERRCODE_UNIQUE_VIOLATION),
								 errmsg("duplicate key value violates unique constraint \"%s\"",
										RelationGetRelationName(rel)),
								 key_desc ? errdetail("Key %s already exists.",
													  key_desc) : 0,
								 errtableconstraint(heapRel,
													RelationGetRelationName(rel))));
					}
				}
				else if (all_dead)
				{
					/*
					 * The conflicting tuple (or whole HOT chain) is dead to
					 * everyone, so we may as well mark the index entry
					 * killed.
					 */
					ItemIdMarkDead(curitemid);
					opaque->btpo_flags |= BTP_HAS_GARBAGE;

					/*
					 * Mark buffer with a dirty hint, since state is not
					 * crucial. Be sure to mark the proper buffer dirty.
					 */
					if (nbuf != InvalidBuffer)
						MarkBufferDirtyHint(nbuf, true);
					else
						MarkBufferDirtyHint(buf, true);
				}
			}
		}

		/*
		 * Advance to next tuple to continue checking.
		 */
		if (offset < maxoff)
			offset = OffsetNumberNext(offset);
		else
		{
			/* If scankey == hikey we gotta check the next page too */
			if (P_RIGHTMOST(opaque))
				break;
			if (!_bt_isequal(itupdesc, page, P_HIKEY,
							 natts, itup_scankey))
				break;
			/* Advance to next non-dead page --- there must be one */
			for (;;)
			{
				nblkno = opaque->btpo_next;
				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
				page = BufferGetPage(nbuf);
				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(opaque))
					break;
				if (P_RIGHTMOST(opaque))
					elog(ERROR, "fell off the end of index \"%s\"",
						 RelationGetRelationName(rel));
			}
			maxoff = PageGetMaxOffsetNumber(page);
			offset = P_FIRSTDATAKEY(opaque);
		}
	}

	/*
	 * If we are doing a recheck then we should have found the tuple we are
	 * checking.  Otherwise there's something very wrong --- probably, the
	 * index is on a non-immutable expression.
	 */
	if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to re-find tuple within index \"%s\"",
						RelationGetRelationName(rel)),
				 errhint("This may be because of a non-immutable index expression."),
				 errtableconstraint(heapRel,
									RelationGetRelationName(rel))));

	if (nbuf != InvalidBuffer)
		_bt_relbuf(rel, nbuf);

	return InvalidTransactionId;
}
509
510
/*
 *	_bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *		If the new key is equal to one or more existing keys, we can
 *		legitimately place it anywhere in the series of equal keys --- in fact,
 *		if the new key is equal to the page's "high key" we can place it on
 *		the next page.  If it is equal to the high key, and there's not room
 *		to insert the new tuple on the current page without splitting, then
 *		we can move right hoping to find more free space and avoid a split.
 *		(We should not move right indefinitely, however, since that leads to
 *		O(N^2) insertion behavior in the presence of many equal keys.)
 *		Once we have chosen the page to put the key on, we'll insert it before
 *		any existing equal keys because of the way _bt_binsrch() works.
 *
 *		If there's not enough room in the space, we try to make room by
 *		removing any LP_DEAD tuples.
 *
 *		On entry, *bufptr and *offsetptr point to the first legal position
 *		where the new tuple could be inserted.  The caller should hold an
 *		exclusive lock on *bufptr.  *offsetptr can also be set to
 *		InvalidOffsetNumber, in which case the function will search for the
 *		right location within the page if needed.  On exit, they point to the
 *		chosen insert location.  If _bt_findinsertloc decides to move right,
 *		the lock and pin on the original page will be released and the new
 *		page returned to the caller is exclusively locked instead.
 *
 *		newtup is the new tuple we're inserting, and scankey is an insertion
 *		type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup,
				  BTStack stack,
				  Relation heapRel)
{
	Buffer		buf = *bufptr;
	Page		page = BufferGetPage(buf);
	Size		itemsz;
	BTPageOpaque lpageop;
	bool		movedright,		/* did we step right off the original page? */
				vacuumed;		/* did we vacuum the current page? */
	OffsetNumber newitemoff;
	OffsetNumber firstlegaloff = *offsetptr;

	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*newtup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itemsz doesn't
	 * include the ItemId.
	 *
	 * NOTE: if you change this, see also the similar code in _bt_buildadd().
	 */
	if (itemsz > BTMaxItemSize(page))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
						itemsz, BTMaxItemSize(page),
						RelationGetRelationName(rel)),
				 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
						 "Consider a function index of an MD5 hash of the value, "
						 "or use full text indexing."),
				 errtableconstraint(heapRel,
									RelationGetRelationName(rel))));

	/*----------
	 * If we will need to split the page to put the item on this page,
	 * check whether we can put the tuple somewhere to the right,
	 * instead.  Keep scanning right until we
	 *		(a) find a page with enough free space,
	 *		(b) reach the last page where the tuple can legally go, or
	 *		(c) get tired of searching.
	 * (c) is not flippant; it is important because if there are many
	 * pages' worth of equal keys, it's better to split one of the early
	 * pages than to scan all the way to the end of the run of equal keys
	 * on every insert.  We implement "get tired" as a random choice,
	 * since stopping after scanning a fixed number of pages wouldn't work
	 * well (we'd never reach the right-hand side of previously split
	 * pages).  Currently the probability of moving right is set at 0.99,
	 * which may seem too high to change the behavior much, but it does an
	 * excellent job of preventing O(N^2) behavior with many equal keys.
	 *----------
	 */
	movedright = false;
	vacuumed = false;
	while (PageGetFreeSpace(page) < itemsz)
	{
		Buffer		rbuf;
		BlockNumber rblkno;

		/*
		 * before considering moving right, see if we can obtain enough space
		 * by erasing LP_DEAD items
		 */
		if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
		{
			_bt_vacuum_one_page(rel, buf, heapRel);

			/*
			 * remember that we vacuumed this page, because that makes the
			 * hint supplied by the caller invalid
			 */
			vacuumed = true;

			if (PageGetFreeSpace(page) >= itemsz)
				break;			/* OK, now we have enough space */
		}

		/*
		 * nope, so check conditions (b) and (c) enumerated above
		 */
		if (P_RIGHTMOST(lpageop) ||
			_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
			random() <= (MAX_RANDOM_VALUE / 100))
			break;

		/*
		 * step right to next non-dead page
		 *
		 * must write-lock that page before releasing write lock on current
		 * page; else someone else's _bt_check_unique scan could fail to see
		 * our insertion.  write locks on intermediate dead pages won't do
		 * because we don't know when they will get de-linked from the tree.
		 */
		rbuf = InvalidBuffer;

		rblkno = lpageop->btpo_next;
		for (;;)
		{
			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
			page = BufferGetPage(rbuf);
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

			/*
			 * If this page was incompletely split, finish the split now. We
			 * do this while holding a lock on the left sibling, which is not
			 * good because finishing the split could be a fairly lengthy
			 * operation.  But this should happen very seldom.
			 */
			if (P_INCOMPLETE_SPLIT(lpageop))
			{
				_bt_finish_split(rel, rbuf, stack);
				rbuf = InvalidBuffer;
				continue;
			}

			if (!P_IGNORE(lpageop))
				break;
			if (P_RIGHTMOST(lpageop))
				elog(ERROR, "fell off the end of index \"%s\"",
					 RelationGetRelationName(rel));

			rblkno = lpageop->btpo_next;
		}
		_bt_relbuf(rel, buf);
		buf = rbuf;
		movedright = true;
		vacuumed = false;
	}

	/*
	 * Now we are on the right page, so find the insert position. If we moved
	 * right at all, we know we should insert at the start of the page. If we
	 * didn't move right, we can use the firstlegaloff hint if the caller
	 * supplied one, unless we vacuumed the page which might have moved tuples
	 * around making the hint invalid. If we didn't move right or can't use
	 * the hint, find the position by searching.
	 */
	if (movedright)
		newitemoff = P_FIRSTDATAKEY(lpageop);
	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
		newitemoff = firstlegaloff;
	else
		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

	*bufptr = buf;
	*offsetptr = newitemoff;
}
699
/*----------
 *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *		This recursive procedure does the following things:
 *
 *			+  if necessary, splits the target page (making sure that the
 *			   split is equitable as far as post-insert free space goes).
 *			+  inserts the tuple.
 *			+  if the page was split, pops the parent stack, and finds the
 *			   right place to insert the new child pointer (by walking
 *			   right using information stored in the parent stack).
 *			+  invokes itself with the appropriate tuple for the right
 *			   child page on the parent.
 *			+  updates the metapage if a true root or fast root is split.
 *
 *		On entry, we must have the correct buffer in which to do the
 *		insertion, and the buffer must be pinned and write-locked.  On return,
 *		we will have dropped both the pin and the lock on the buffer.
 *
 *		When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
 *		page we're inserting the downlink for.  This function will clear the
 *		INCOMPLETE_SPLIT flag on it, and release the buffer.
 *
 *		The locking interactions in this code are critical.  You should
 *		grok Lehman and Yao's paper before making any changes.  In addition,
 *		you need to understand how we disambiguate duplicate keys in this
 *		implementation, in order to be able to find our location using
 *		L&Y "move right" operations.  Since we may insert duplicate user
 *		keys, and since these dups may propagate up the tree, the caller
 *		passes an exact insertion position ('newitemoff') rather than just
 *		a key, so that we land correctly among duplicates on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
			   Buffer buf,
			   Buffer cbuf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page)
{
	Page		page;
	BTPageOpaque lpageop;
	OffsetNumber firstright = InvalidOffsetNumber;
	Size		itemsz;

	page = BufferGetPage(buf);
	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	/* child buffer must be given iff inserting on an internal page */
	Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));

	/* The caller should've finished any incomplete splits already. */
	if (P_INCOMPLETE_SPLIT(lpageop))
		elog(ERROR, "cannot insert to incompletely split page %u",
			 BufferGetBlockNumber(buf));

	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Do we need to split the page to fit the item on it?
	 *
	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
	 * so this comparison is correct even though we appear to be accounting
	 * only for the item and not for its line pointer.
	 */
	if (PageGetFreeSpace(page) < itemsz)
	{
		bool		is_root = P_ISROOT(lpageop);
		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
		bool		newitemonleft;
		Buffer		rbuf;

		/* Choose the split point */
		firstright = _bt_findsplitloc(rel, page,
									  newitemoff, itemsz,
									  &newitemonleft);

		/* split the buffer into left and right halves */
		rbuf = _bt_split(rel, buf, cbuf, firstright,
						 newitemoff, itemsz, itup, newitemonleft);
		PredicateLockPageSplit(rel,
							   BufferGetBlockNumber(buf),
							   BufferGetBlockNumber(rbuf));

		/*----------
		 * By here,
		 *
		 *		+  our target page has been split;
		 *		+  the original tuple has been inserted;
		 *		+  we have write locks on both the old (left half)
		 *		   and new (right half) buffers, after the split; and
		 *		+  we know the key we want to insert into the parent
		 *		   (it's the "high key" on the left child page).
		 *
		 * We're ready to do the parent insertion.  We need to hold onto the
		 * locks for the child pages until we locate the parent, but we can
		 * release them before doing the actual insertion (see Lehman and Yao
		 * for the reasoning).
		 *----------
		 */
		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
	}
	else
	{
		Buffer		metabuf = InvalidBuffer;
		Page		metapg = NULL;
		BTMetaPageData *metad = NULL;
		OffsetNumber itup_off;
		BlockNumber itup_blkno;

		itup_off = newitemoff;
		itup_blkno = BufferGetBlockNumber(buf);

		/*
		 * If we are doing this insert because we split a page that was the
		 * only one on its tree level, but was not the root, it may have been
		 * the "fast root".  We need to ensure that the fast root link points
		 * at or above the current page.  We can safely acquire a lock on the
		 * metapage here --- see comments for _bt_newroot().
		 */
		if (split_only_page)
		{
			Assert(!P_ISLEAF(lpageop));

			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
			metapg = BufferGetPage(metabuf);
			metad = BTPageGetMeta(metapg);

			if (metad->btm_fastlevel >= lpageop->btpo.level)
			{
				/* no update wanted */
				_bt_relbuf(rel, metabuf);
				metabuf = InvalidBuffer;
			}
		}

		/* Do the update.  No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
			elog(PANIC, "failed to add new item to block %u in index \"%s\"",
				 itup_blkno, RelationGetRelationName(rel));

		MarkBufferDirty(buf);

		if (BufferIsValid(metabuf))
		{
			/* update the fast-root link to point at this page's level */
			metad->btm_fastroot = itup_blkno;
			metad->btm_fastlevel = lpageop->btpo.level;
			MarkBufferDirty(metabuf);
		}

		/* clear INCOMPLETE_SPLIT flag on child if inserting a downlink */
		if (BufferIsValid(cbuf))
		{
			Page		cpage = BufferGetPage(cbuf);
			BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);

			Assert(P_INCOMPLETE_SPLIT(cpageop));
			cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
			MarkBufferDirty(cbuf);
		}

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			xl_btree_insert xlrec;
			xl_btree_metadata xlmeta;
			uint8		xlinfo;
			XLogRecPtr	recptr;
			IndexTupleData trunctuple;

			xlrec.offnum = itup_off;

			XLogBeginInsert();
			XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);

			if (P_ISLEAF(lpageop))
				xlinfo = XLOG_BTREE_INSERT_LEAF;
			else
			{
				/*
				 * Register the left child whose INCOMPLETE_SPLIT flag was
				 * cleared.
				 */
				XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);

				xlinfo = XLOG_BTREE_INSERT_UPPER;
			}

			if (BufferIsValid(metabuf))
			{
				xlmeta.root = metad->btm_root;
				xlmeta.level = metad->btm_level;
				xlmeta.fastroot = metad->btm_fastroot;
				xlmeta.fastlevel = metad->btm_fastlevel;

				XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);
				XLogRegisterBufData(2, (char *) &xlmeta, sizeof(xl_btree_metadata));

				xlinfo = XLOG_BTREE_INSERT_META;
			}

			/* Read comments in _bt_pgaddtup */
			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
				/*
				 * The first data key on an internal page has no key data;
				 * log a truncated tuple (header only) to match what
				 * _bt_pgaddtup stores.
				 */
				trunctuple = *itup;
				trunctuple.t_info = sizeof(IndexTupleData);
				XLogRegisterBufData(0, (char *) &trunctuple,
									sizeof(IndexTupleData));
			}
			else
				XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));

			recptr = XLogInsert(RM_BTREE_ID, xlinfo);

			if (BufferIsValid(metabuf))
			{
				PageSetLSN(metapg, recptr);
			}
			if (BufferIsValid(cbuf))
			{
				PageSetLSN(BufferGetPage(cbuf), recptr);
			}

			PageSetLSN(page, recptr);
		}

		END_CRIT_SECTION();

		/* release buffers */
		if (BufferIsValid(metabuf))
			_bt_relbuf(rel, metabuf);
		if (BufferIsValid(cbuf))
			_bt_relbuf(rel, cbuf);
		_bt_relbuf(rel, buf);
	}
}
943
944 /*
945 * _bt_split() -- split a page in the btree.
946 *
947 * On entry, buf is the page to split, and is pinned and write-locked.
948 * firstright is the item index of the first item to be moved to the
949 * new right page. newitemoff etc. tell us about the new item that
950 * must be inserted along with the data from the old page.
951 *
952 * When splitting a non-leaf page, 'cbuf' is the left-sibling of the
953 * page we're inserting the downlink for. This function will clear the
954 * INCOMPLETE_SPLIT flag on it, and release the buffer.
955 *
956 * Returns the new right sibling of buf, pinned and write-locked.
957 * The pin and lock on buf are maintained.
958 */
static Buffer
_bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
		  bool newitemonleft)
{
	Buffer		rbuf;			/* new right sibling page */
	Page		origpage;
	Page		leftpage,		/* temp workspace; copied over origpage later */
				rightpage;
	BlockNumber origpagenumber,
				rightpagenumber;
	BTPageOpaque ropaque,
				lopaque,
				oopaque;
	Buffer		sbuf = InvalidBuffer;	/* pre-existing right sibling, if any */
	Page		spage = NULL;
	BTPageOpaque sopaque = NULL;
	Size		itemsz;
	ItemId		itemid;
	IndexTuple	item;
	OffsetNumber leftoff,
				rightoff;
	OffsetNumber maxoff;
	OffsetNumber i;
	bool		isroot;
	bool		isleaf;

	/* Acquire a new page to split into */
	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);

	/*
	 * origpage is the original page to be split.  leftpage is a temporary
	 * buffer that receives the left-sibling data, which will be copied back
	 * into origpage on success.  rightpage is the new page that receives the
	 * right-sibling data.  If we fail before reaching the critical section,
	 * origpage hasn't been modified and leftpage is only workspace. In
	 * principle we shouldn't need to worry about rightpage either, because it
	 * hasn't been linked into the btree page structure; but to avoid leaving
	 * possibly-confusing junk behind, we are careful to rewrite rightpage as
	 * zeroes before throwing any error.
	 */
	origpage = BufferGetPage(buf);
	leftpage = PageGetTempPage(origpage);
	rightpage = BufferGetPage(rbuf);

	origpagenumber = BufferGetBlockNumber(buf);
	rightpagenumber = BufferGetBlockNumber(rbuf);

	_bt_pageinit(leftpage, BufferGetPageSize(buf));
	/* rightpage was already initialized by _bt_getbuf */

	/*
	 * Copy the original page's LSN into leftpage, which will become the
	 * updated version of the page.  We need this because XLogInsert will
	 * examine the LSN and possibly dump it in a page image.
	 */
	PageSetLSN(leftpage, PageGetLSN(origpage));

	/* init btree private data */
	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

	isroot = P_ISROOT(oopaque);
	isleaf = P_ISLEAF(oopaque);

	/* if we're splitting this page, it won't be the root when we're done */
	/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
	lopaque->btpo_flags = oopaque->btpo_flags;
	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
	ropaque->btpo_flags = lopaque->btpo_flags;
	/* set flag in left page indicating that the right page has no downlink */
	lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
	lopaque->btpo_prev = oopaque->btpo_prev;
	lopaque->btpo_next = rightpagenumber;
	ropaque->btpo_prev = origpagenumber;
	ropaque->btpo_next = oopaque->btpo_next;
	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
	/* Since we already have write-lock on both pages, ok to read cycleid */
	lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
	ropaque->btpo_cycleid = lopaque->btpo_cycleid;

	/*
	 * If the page we're splitting is not the rightmost page at its level in
	 * the tree, then the first entry on the page is the high key for the
	 * page.  We need to copy that to the right half.  Otherwise (meaning the
	 * rightmost page case), all the items on the right half will be user
	 * data.
	 */
	rightoff = P_HIKEY;

	if (!P_RIGHTMOST(oopaque))
	{
		itemid = PageGetItemId(origpage, P_HIKEY);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
						false, false) == InvalidOffsetNumber)
		{
			/* zero rightpage so we don't leave junk behind on error */
			memset(rightpage, 0, BufferGetPageSize(rbuf));
			elog(ERROR, "failed to add hikey to the right sibling"
				 " while splitting block %u of index \"%s\"",
				 origpagenumber, RelationGetRelationName(rel));
		}
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * The "high key" for the new left page will be the first key that's going
	 * to go into the new right page.  This might be either the existing data
	 * item at position firstright, or the incoming tuple.
	 */
	leftoff = P_HIKEY;
	if (!newitemonleft && newitemoff == firstright)
	{
		/* incoming tuple will become first on right page */
		itemsz = newitemsz;
		item = newitem;
	}
	else
	{
		/* existing item at firstright will become first on right page */
		itemid = PageGetItemId(origpage, firstright);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
	}
	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
					false, false) == InvalidOffsetNumber)
	{
		/* origpage is untouched; only zero the new right page */
		memset(rightpage, 0, BufferGetPageSize(rbuf));
		elog(ERROR, "failed to add hikey to the left sibling"
			 " while splitting block %u of index \"%s\"",
			 origpagenumber, RelationGetRelationName(rel));
	}
	leftoff = OffsetNumberNext(leftoff);

	/*
	 * Now transfer all the data items to the appropriate page.
	 *
	 * Note: we *must* insert at least the right page's items in item-number
	 * order, for the benefit of _bt_restore_page().
	 */
	maxoff = PageGetMaxOffsetNumber(origpage);

	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
	{
		itemid = PageGetItemId(origpage, i);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);

		/* does new item belong before this one? */
		if (i == newitemoff)
		{
			if (newitemonleft)
			{
				if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
				{
					memset(rightpage, 0, BufferGetPageSize(rbuf));
					elog(ERROR, "failed to add new item to the left sibling"
						 " while splitting block %u of index \"%s\"",
						 origpagenumber, RelationGetRelationName(rel));
				}
				leftoff = OffsetNumberNext(leftoff);
			}
			else
			{
				if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
				{
					memset(rightpage, 0, BufferGetPageSize(rbuf));
					elog(ERROR, "failed to add new item to the right sibling"
						 " while splitting block %u of index \"%s\"",
						 origpagenumber, RelationGetRelationName(rel));
				}
				rightoff = OffsetNumberNext(rightoff);
			}
		}

		/* decide which page to put it on */
		if (i < firstright)
		{
			if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
			{
				memset(rightpage, 0, BufferGetPageSize(rbuf));
				elog(ERROR, "failed to add old item to the left sibling"
					 " while splitting block %u of index \"%s\"",
					 origpagenumber, RelationGetRelationName(rel));
			}
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
			{
				memset(rightpage, 0, BufferGetPageSize(rbuf));
				elog(ERROR, "failed to add old item to the right sibling"
					 " while splitting block %u of index \"%s\"",
					 origpagenumber, RelationGetRelationName(rel));
			}
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/* cope with possibility that newitem goes at the end */
	if (i <= newitemoff)
	{
		/*
		 * Can't have newitemonleft here; that would imply we were told to put
		 * *everything* on the left page, which cannot fit (if it could, we'd
		 * not be splitting the page).
		 */
		Assert(!newitemonleft);
		if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
		{
			memset(rightpage, 0, BufferGetPageSize(rbuf));
			elog(ERROR, "failed to add new item to the right sibling"
				 " while splitting block %u of index \"%s\"",
				 origpagenumber, RelationGetRelationName(rel));
		}
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * We have to grab the right sibling (if any) and fix the prev pointer
	 * there.  We are guaranteed that this is deadlock-free since no other
	 * writer will be holding a lock on that page and trying to move left, and
	 * all readers release locks on a page before trying to fetch its
	 * neighbors.
	 */

	if (!P_RIGHTMOST(oopaque))
	{
		sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
		spage = BufferGetPage(sbuf);
		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
		if (sopaque->btpo_prev != origpagenumber)
		{
			/* cross-check of sibling links failed: index is corrupt */
			memset(rightpage, 0, BufferGetPageSize(rbuf));
			elog(ERROR, "right sibling's left-link doesn't match: "
				 "block %u links to %u instead of expected %u in index \"%s\"",
				 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
				 RelationGetRelationName(rel));
		}

		/*
		 * Check to see if we can set the SPLIT_END flag in the right-hand
		 * split page; this can save some I/O for vacuum since it need not
		 * proceed to the right sibling.  We can set the flag if the right
		 * sibling has a different cycleid: that means it could not be part of
		 * a group of pages that were all split off from the same ancestor
		 * page.  If you're confused, imagine that page A splits to A B and
		 * then again, yielding A C B, while vacuum is in progress.  Tuples
		 * originally in A could now be in either B or C, hence vacuum must
		 * examine both pages.  But if D, our right sibling, has a different
		 * cycleid then it could not contain any tuples that were in A when
		 * the vacuum started.
		 */
		if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
			ropaque->btpo_flags |= BTP_SPLIT_END;
	}

	/*
	 * Right sibling is locked, new siblings are prepared, but original page
	 * is not updated yet.
	 *
	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
	 * not starting the critical section till here because we haven't been
	 * scribbling on the original page yet; see comments above.
	 */
	START_CRIT_SECTION();

	/*
	 * By here, the original data page has been split into two new halves, and
	 * these are correct.  The algorithm requires that the left page never
	 * move during a split, so we copy the new left page back on top of the
	 * original.  Note that this is not a waste of time, since we also require
	 * (in the page management code) that the center of a page always be
	 * clean, and the most efficient way to guarantee this is just to compact
	 * the data by reinserting it into a new left page.  (XXX the latter
	 * comment is probably obsolete; but in any case it's good to not scribble
	 * on the original page until we enter the critical section.)
	 *
	 * We need to do this before writing the WAL record, so that XLogInsert
	 * can WAL log an image of the page if necessary.
	 */
	PageRestoreTempPage(leftpage, origpage);
	/* leftpage, lopaque must not be used below here */

	MarkBufferDirty(buf);
	MarkBufferDirty(rbuf);

	if (!P_RIGHTMOST(ropaque))
	{
		sopaque->btpo_prev = rightpagenumber;
		MarkBufferDirty(sbuf);
	}

	/*
	 * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
	 * a split.
	 */
	if (!isleaf)
	{
		Page		cpage = BufferGetPage(cbuf);
		BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);

		cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
		MarkBufferDirty(cbuf);
	}

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_btree_split xlrec;
		uint8		xlinfo;
		XLogRecPtr	recptr;

		xlrec.level = ropaque->btpo.level;
		xlrec.firstright = firstright;
		xlrec.newitemoff = newitemoff;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBtreeSplit);

		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
		XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
		/* Log the right sibling, because we've changed its prev-pointer. */
		if (!P_RIGHTMOST(ropaque))
			XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
		if (BufferIsValid(cbuf))
			XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);

		/*
		 * Log the new item, if it was inserted on the left page. (If it was
		 * put on the right page, we don't need to explicitly WAL log it
		 * because it's included with all the other items on the right page.)
		 * Show the new item as belonging to the left page buffer, so that it
		 * is not stored if XLogInsert decides it needs a full-page image of
		 * the left page.  We store the offset anyway, though, to support
		 * archive compression of these records.
		 */
		if (newitemonleft)
			XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));

		/* Log left page */
		if (!isleaf)
		{
			/*
			 * We must also log the left page's high key, because the right
			 * page's leftmost key is suppressed on non-leaf levels.  Show it
			 * as belonging to the left page buffer, so that it is not stored
			 * if XLogInsert decides it needs a full-page image of the left
			 * page.
			 */
			itemid = PageGetItemId(origpage, P_HIKEY);
			item = (IndexTuple) PageGetItem(origpage, itemid);
			XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
		}

		/*
		 * Log the contents of the right page in the format understood by
		 * _bt_restore_page().  We set lastrdata->buffer to InvalidBuffer,
		 * because we're going to recreate the whole page anyway, so it should
		 * never be stored by XLogInsert.
		 *
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.  Note we only store the tuples
		 * themselves, knowing that they were inserted in item-number order
		 * and so the item pointers can be reconstructed.  See comments for
		 * _bt_restore_page().
		 */
		XLogRegisterBufData(1,
							(char *) rightpage + ((PageHeader) rightpage)->pd_upper,
							((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);

		if (isroot)
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
		else
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;

		recptr = XLogInsert(RM_BTREE_ID, xlinfo);

		PageSetLSN(origpage, recptr);
		PageSetLSN(rightpage, recptr);
		if (!P_RIGHTMOST(ropaque))
		{
			PageSetLSN(spage, recptr);
		}
		if (!isleaf)
		{
			PageSetLSN(BufferGetPage(cbuf), recptr);
		}
	}

	END_CRIT_SECTION();

	/* release the old right sibling */
	if (!P_RIGHTMOST(ropaque))
		_bt_relbuf(rel, sbuf);

	/* release the child */
	if (!isleaf)
		_bt_relbuf(rel, cbuf);

	/* split's done */
	return rbuf;
}
1365
1366 /*
1367 * _bt_findsplitloc() -- find an appropriate place to split a page.
1368 *
1369 * The idea here is to equalize the free space that will be on each split
1370 * page, *after accounting for the inserted tuple*. (If we fail to account
1371 * for it, we might find ourselves with too little room on the page that
1372 * it needs to go into!)
1373 *
1374 * If the page is the rightmost page on its level, we instead try to arrange
1375 * to leave the left split page fillfactor% full. In this way, when we are
1376 * inserting successively increasing keys (consider sequences, timestamps,
1377 * etc) we will end up with a tree whose pages are about fillfactor% full,
1378 * instead of the 50% full result that we'd get without this special case.
1379 * This is the same as nbtsort.c produces for a newly-created tree. Note
1380 * that leaf and nonleaf pages use different fillfactors.
1381 *
1382 * We are passed the intended insert position of the new tuple, expressed as
1383 * the offsetnumber of the tuple it must go in front of. (This could be
1384 * maxoff+1 if the tuple is to go at the end.)
1385 *
1386 * We return the index of the first existing tuple that should go on the
1387 * righthand page, plus a boolean indicating whether the new tuple goes on
1388 * the left or right page. The bool is necessary to disambiguate the case
1389 * where firstright == newitemoff.
1390 */
1391 static OffsetNumber
_bt_findsplitloc(Relation rel,Page page,OffsetNumber newitemoff,Size newitemsz,bool * newitemonleft)1392 _bt_findsplitloc(Relation rel,
1393 Page page,
1394 OffsetNumber newitemoff,
1395 Size newitemsz,
1396 bool *newitemonleft)
1397 {
1398 BTPageOpaque opaque;
1399 OffsetNumber offnum;
1400 OffsetNumber maxoff;
1401 ItemId itemid;
1402 FindSplitData state;
1403 int leftspace,
1404 rightspace,
1405 goodenough,
1406 olddataitemstotal,
1407 olddataitemstoleft;
1408 bool goodenoughfound;
1409
1410 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1411
1412 /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
1413 newitemsz += sizeof(ItemIdData);
1414
1415 /* Total free space available on a btree page, after fixed overhead */
1416 leftspace = rightspace =
1417 PageGetPageSize(page) - SizeOfPageHeaderData -
1418 MAXALIGN(sizeof(BTPageOpaqueData));
1419
1420 /* The right page will have the same high key as the old page */
1421 if (!P_RIGHTMOST(opaque))
1422 {
1423 itemid = PageGetItemId(page, P_HIKEY);
1424 rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1425 sizeof(ItemIdData));
1426 }
1427
1428 /* Count up total space in data items without actually scanning 'em */
1429 olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
1430
1431 state.newitemsz = newitemsz;
1432 state.is_leaf = P_ISLEAF(opaque);
1433 state.is_rightmost = P_RIGHTMOST(opaque);
1434 state.have_split = false;
1435 if (state.is_leaf)
1436 state.fillfactor = RelationGetFillFactor(rel,
1437 BTREE_DEFAULT_FILLFACTOR);
1438 else
1439 state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
1440 state.newitemonleft = false; /* these just to keep compiler quiet */
1441 state.firstright = 0;
1442 state.best_delta = 0;
1443 state.leftspace = leftspace;
1444 state.rightspace = rightspace;
1445 state.olddataitemstotal = olddataitemstotal;
1446 state.newitemoff = newitemoff;
1447
1448 /*
1449 * Finding the best possible split would require checking all the possible
1450 * split points, because of the high-key and left-key special cases.
1451 * That's probably more work than it's worth; instead, stop as soon as we
1452 * find a "good-enough" split, where good-enough is defined as an
1453 * imbalance in free space of no more than pagesize/16 (arbitrary...) This
1454 * should let us stop near the middle on most pages, instead of plowing to
1455 * the end.
1456 */
1457 goodenough = leftspace / 16;
1458
1459 /*
1460 * Scan through the data items and calculate space usage for a split at
1461 * each possible position.
1462 */
1463 olddataitemstoleft = 0;
1464 goodenoughfound = false;
1465 maxoff = PageGetMaxOffsetNumber(page);
1466
1467 for (offnum = P_FIRSTDATAKEY(opaque);
1468 offnum <= maxoff;
1469 offnum = OffsetNumberNext(offnum))
1470 {
1471 Size itemsz;
1472
1473 itemid = PageGetItemId(page, offnum);
1474 itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1475
1476 /*
1477 * Will the new item go to left or right of split?
1478 */
1479 if (offnum > newitemoff)
1480 _bt_checksplitloc(&state, offnum, true,
1481 olddataitemstoleft, itemsz);
1482
1483 else if (offnum < newitemoff)
1484 _bt_checksplitloc(&state, offnum, false,
1485 olddataitemstoleft, itemsz);
1486 else
1487 {
1488 /* need to try it both ways! */
1489 _bt_checksplitloc(&state, offnum, true,
1490 olddataitemstoleft, itemsz);
1491
1492 _bt_checksplitloc(&state, offnum, false,
1493 olddataitemstoleft, itemsz);
1494 }
1495
1496 /* Abort scan once we find a good-enough choice */
1497 if (state.have_split && state.best_delta <= goodenough)
1498 {
1499 goodenoughfound = true;
1500 break;
1501 }
1502
1503 olddataitemstoleft += itemsz;
1504 }
1505
1506 /*
1507 * If the new item goes as the last item, check for splitting so that all
1508 * the old items go to the left page and the new item goes to the right
1509 * page.
1510 */
1511 if (newitemoff > maxoff && !goodenoughfound)
1512 _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
1513
1514 /*
1515 * I believe it is not possible to fail to find a feasible split, but just
1516 * in case ...
1517 */
1518 if (!state.have_split)
1519 elog(ERROR, "could not find a feasible split point for index \"%s\"",
1520 RelationGetRelationName(rel));
1521
1522 *newitemonleft = state.newitemonleft;
1523 return state.firstright;
1524 }
1525
1526 /*
1527 * Subroutine to analyze a particular possible split choice (ie, firstright
1528 * and newitemonleft settings), and record the best split so far in *state.
1529 *
1530 * firstoldonright is the offset of the first item on the original page
1531 * that goes to the right page, and firstoldonrightsz is the size of that
1532 * tuple. firstoldonright can be > max offset, which means that all the old
1533 * items go to the left page and only the new item goes to the right page.
1534 * In that case, firstoldonrightsz is not used.
1535 *
1536 * olddataitemstoleft is the total size of all old items to the left of
1537 * firstoldonright.
1538 */
1539 static void
_bt_checksplitloc(FindSplitData * state,OffsetNumber firstoldonright,bool newitemonleft,int olddataitemstoleft,Size firstoldonrightsz)1540 _bt_checksplitloc(FindSplitData *state,
1541 OffsetNumber firstoldonright,
1542 bool newitemonleft,
1543 int olddataitemstoleft,
1544 Size firstoldonrightsz)
1545 {
1546 int leftfree,
1547 rightfree;
1548 Size firstrightitemsz;
1549 bool newitemisfirstonright;
1550
1551 /* Is the new item going to be the first item on the right page? */
1552 newitemisfirstonright = (firstoldonright == state->newitemoff
1553 && !newitemonleft);
1554
1555 if (newitemisfirstonright)
1556 firstrightitemsz = state->newitemsz;
1557 else
1558 firstrightitemsz = firstoldonrightsz;
1559
1560 /* Account for all the old tuples */
1561 leftfree = state->leftspace - olddataitemstoleft;
1562 rightfree = state->rightspace -
1563 (state->olddataitemstotal - olddataitemstoleft);
1564
1565 /*
1566 * The first item on the right page becomes the high key of the left page;
1567 * therefore it counts against left space as well as right space.
1568 */
1569 leftfree -= firstrightitemsz;
1570
1571 /* account for the new item */
1572 if (newitemonleft)
1573 leftfree -= (int) state->newitemsz;
1574 else
1575 rightfree -= (int) state->newitemsz;
1576
1577 /*
1578 * If we are not on the leaf level, we will be able to discard the key
1579 * data from the first item that winds up on the right page.
1580 */
1581 if (!state->is_leaf)
1582 rightfree += (int) firstrightitemsz -
1583 (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
1584
1585 /*
1586 * If feasible split point, remember best delta.
1587 */
1588 if (leftfree >= 0 && rightfree >= 0)
1589 {
1590 int delta;
1591
1592 if (state->is_rightmost)
1593 {
1594 /*
1595 * If splitting a rightmost page, try to put (100-fillfactor)% of
1596 * free space on left page. See comments for _bt_findsplitloc.
1597 */
1598 delta = (state->fillfactor * leftfree)
1599 - ((100 - state->fillfactor) * rightfree);
1600 }
1601 else
1602 {
1603 /* Otherwise, aim for equal free space on both sides */
1604 delta = leftfree - rightfree;
1605 }
1606
1607 if (delta < 0)
1608 delta = -delta;
1609 if (!state->have_split || delta < state->best_delta)
1610 {
1611 state->have_split = true;
1612 state->newitemonleft = newitemonleft;
1613 state->firstright = firstoldonright;
1614 state->best_delta = delta;
1615 }
1616 }
1617 }
1618
1619 /*
1620 * _bt_insert_parent() -- Insert downlink into parent after a page split.
1621 *
1622 * On entry, buf and rbuf are the left and right split pages, which we
1623 * still hold write locks on per the L&Y algorithm. We release the
1624 * write locks once we have write lock on the parent page. (Any sooner,
1625 * and it'd be possible for some other process to try to split or delete
1626 * one of these pages, and get confused because it cannot find the downlink.)
1627 *
1628 * stack - stack showing how we got here. May be NULL in cases that don't
1629 * have to be efficient (concurrent ROOT split, WAL recovery)
1630 * is_root - we split the true root
1631 * is_only - we split a page alone on its level (might have been fast root)
1632 */
static void
_bt_insert_parent(Relation rel,
				  Buffer buf,
				  Buffer rbuf,
				  BTStack stack,
				  bool is_root,
				  bool is_only)
{
	/*
	 * Here we have to do something Lehman and Yao don't talk about: deal with
	 * a root split and construction of a new root.  If our stack is empty
	 * then we have just split a node on what had been the root level when we
	 * descended the tree.  If it was still the root then we perform a
	 * new-root construction.  If it *wasn't* the root anymore, search to find
	 * the next higher level that someone constructed meanwhile, and find the
	 * right place to insert as for the normal case.
	 *
	 * If we have to search for the parent level, we do so by re-descending
	 * from the root.  This is not super-efficient, but it's rare enough not
	 * to matter.
	 */
	if (is_root)
	{
		Buffer		rootbuf;

		Assert(stack == NULL);
		Assert(is_only);
		/* create a new root node and update the metapage */
		rootbuf = _bt_newroot(rel, buf, rbuf);
		/* release the split buffers */
		_bt_relbuf(rel, rootbuf);
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);
	}
	else
	{
		BlockNumber bknum = BufferGetBlockNumber(buf);
		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
		Page		page = BufferGetPage(buf);
		IndexTuple	new_item;		/* downlink tuple for the new right page */
		BTStackData fakestack;		/* local stand-in when stack == NULL */
		IndexTuple	ritem;
		Buffer		pbuf;

		if (stack == NULL)
		{
			BTPageOpaque lpageop;

			elog(DEBUG2, "concurrent ROOT page split");
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			/* Find the leftmost page at the next level up */
			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
									NULL);
			/* Set up a phony stack entry pointing there */
			stack = &fakestack;
			stack->bts_blkno = BufferGetBlockNumber(pbuf);
			/* InvalidOffsetNumber makes _bt_getstackbuf search the page */
			stack->bts_offset = InvalidOffsetNumber;
			/* bts_btentry will be initialized below */
			stack->bts_parent = NULL;
			_bt_relbuf(rel, pbuf);
		}

		/* get high key from left page == lowest key on new right page */
		ritem = (IndexTuple) PageGetItem(page,
										 PageGetItemId(page, P_HIKEY));

		/* form an index tuple that points at the new right page */
		new_item = CopyIndexTuple(ritem);
		ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);

		/*
		 * Find the parent buffer and get the parent page.
		 *
		 * Oops - if we were moved right then we need to change stack item! We
		 * want to find parent pointing to where we are, right ?	- vadim
		 * 05/27/97
		 */
		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

		/*
		 * Now we can unlock the right child.  The left child will be unlocked
		 * by _bt_insertonpg().
		 */
		_bt_relbuf(rel, rbuf);

		/* Check for error only after writing children */
		if (pbuf == InvalidBuffer)
			elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
				 RelationGetRelationName(rel), bknum, rbknum);

		/* Recursively update the parent */
		_bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
					   new_item, stack->bts_offset + 1,
					   is_only);

		/* be tidy: the downlink tuple was palloc'd by CopyIndexTuple */
		pfree(new_item);
	}
}
1733
1734 /*
1735 * _bt_finish_split() -- Finish an incomplete split
1736 *
1737 * A crash or other failure can leave a split incomplete. The insertion
1738 * routines won't allow to insert on a page that is incompletely split.
1739 * Before inserting on such a page, call _bt_finish_split().
1740 *
1741 * On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked
1742 * and unpinned.
1743 */
1744 void
_bt_finish_split(Relation rel,Buffer lbuf,BTStack stack)1745 _bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
1746 {
1747 Page lpage = BufferGetPage(lbuf);
1748 BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
1749 Buffer rbuf;
1750 Page rpage;
1751 BTPageOpaque rpageop;
1752 bool was_root;
1753 bool was_only;
1754
1755 Assert(P_INCOMPLETE_SPLIT(lpageop));
1756
1757 /* Lock right sibling, the one missing the downlink */
1758 rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
1759 rpage = BufferGetPage(rbuf);
1760 rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
1761
1762 /* Could this be a root split? */
1763 if (!stack)
1764 {
1765 Buffer metabuf;
1766 Page metapg;
1767 BTMetaPageData *metad;
1768
1769 /* acquire lock on the metapage */
1770 metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1771 metapg = BufferGetPage(metabuf);
1772 metad = BTPageGetMeta(metapg);
1773
1774 was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
1775
1776 _bt_relbuf(rel, metabuf);
1777 }
1778 else
1779 was_root = false;
1780
1781 /* Was this the only page on the level before split? */
1782 was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
1783
1784 elog(DEBUG1, "finishing incomplete split of %u/%u",
1785 BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
1786
1787 _bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
1788 }
1789
1790 /*
1791 * _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1792 * we last looked at in the parent.
1793 *
1794 * This is possible because we save the downlink from the parent item,
1795 * which is enough to uniquely identify it. Insertions into the parent
1796 * level could cause the item to move right; deletions could cause it
1797 * to move left, but not left of the page we previously found it in.
1798 *
1799 * Adjusts bts_blkno & bts_offset if changed.
1800 *
1801 * Returns InvalidBuffer if item not found (should not happen).
1802 */
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
	BlockNumber blkno;
	OffsetNumber start;

	/* Start at the page/offset we remembered when descending the tree */
	blkno = stack->bts_blkno;
	start = stack->bts_offset;

	for (;;)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, access);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		/*
		 * If we intend to modify this page and it still has an unfinished
		 * split, complete the split first, then retry this page from
		 * scratch.  NOTE(review): _bt_finish_split appears to release our
		 * lock and pin on buf, since the loop re-acquires it with
		 * _bt_getbuf — confirm against _bt_finish_split's contract.
		 */
		if (access == BT_WRITE && P_INCOMPLETE_SPLIT(opaque))
		{
			_bt_finish_split(rel, buf, stack->bts_parent);
			continue;
		}

		/* Search only pages that aren't deleted or half-dead */
		if (!P_IGNORE(opaque))
		{
			OffsetNumber offnum,
						minoff,
						maxoff;
			ItemId		itemid;
			IndexTuple	item;

			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);

			/*
			 * start = InvalidOffsetNumber means "search the whole page". We
			 * need this test anyway due to possibility that page has a high
			 * key now when it didn't before.
			 */
			if (start < minoff)
				start = minoff;

			/*
			 * Need this check too, to guard against possibility that page
			 * split since we visited it originally.
			 */
			if (start > maxoff)
				start = OffsetNumberNext(maxoff);

			/*
			 * These loops will check every item on the page --- but in an
			 * order that's attuned to the probability of where it actually
			 * is.  Scan to the right first, then to the left.
			 */
			for (offnum = start;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}

			/* Not found to the right of start; scan leftward back to minoff */
			for (offnum = OffsetNumberPrev(start);
				 offnum >= minoff;
				 offnum = OffsetNumberPrev(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}
		}

		/*
		 * The item we're looking for moved right at least one page.
		 */
		if (P_RIGHTMOST(opaque))
		{
			/* No right sibling to move to --- the item is gone (shouldn't happen) */
			_bt_relbuf(rel, buf);
			return InvalidBuffer;
		}
		/* Follow the right-link and search the whole next page */
		blkno = opaque->btpo_next;
		start = InvalidOffsetNumber;
		_bt_relbuf(rel, buf);
	}
}
1903
1904 /*
1905 * _bt_newroot() -- Create a new root page for the index.
1906 *
1907 * We've just split the old root page and need to create a new one.
1908 * In order to do this, we add a new root page to the file, then lock
1909 * the metadata page and update it. This is guaranteed to be deadlock-
1910 * free, because all readers release their locks on the metadata page
1911 * before trying to lock the root, and all writers lock the root before
1912 * trying to lock the metadata page. We have a write lock on the old
1913 * root page, so we have not introduced any cycles into the waits-for
1914 * graph.
1915 *
1916 * On entry, lbuf (the old root) and rbuf (its new peer) are write-
1917 * locked. On exit, a new root page exists with entries for the
1918 * two new children, metapage is updated and unlocked/unpinned.
1919 * The new root buffer is returned to caller which has to unlock/unpin
1920 * lbuf, rbuf & rootbuf.
1921 */
static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
	Buffer		rootbuf;
	Page		lpage,
				rootpage;
	BlockNumber lbkno,
				rbkno;
	BlockNumber rootblknum;
	BTPageOpaque rootopaque;
	BTPageOpaque lopaque;
	ItemId		itemid;
	IndexTuple	item;
	IndexTuple	left_item;
	Size		left_item_sz;
	IndexTuple	right_item;
	Size		right_item_sz;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *metad;

	/* Child block numbers go into the downlink TIDs below */
	lbkno = BufferGetBlockNumber(lbuf);
	rbkno = BufferGetBlockNumber(rbuf);
	lpage = BufferGetPage(lbuf);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);

	/* get a new root page */
	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	rootpage = BufferGetPage(rootbuf);
	rootblknum = BufferGetBlockNumber(rootbuf);

	/* acquire lock on the metapage */
	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
	metapg = BufferGetPage(metabuf);
	metad = BTPageGetMeta(metapg);

	/*
	 * Create downlink item for left page (old root).  Since this will be the
	 * first item in a non-leaf page, it implicitly has minus-infinity key
	 * value, so we need not store any actual key in it.  The TID carries the
	 * child block number; P_HIKEY serves as a dummy offset.
	 */
	left_item_sz = sizeof(IndexTupleData);
	left_item = (IndexTuple) palloc(left_item_sz);
	left_item->t_info = left_item_sz;
	ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);

	/*
	 * Create downlink item for right page.  The key for it is obtained from
	 * the "high key" position in the left page.
	 */
	itemid = PageGetItemId(lpage, P_HIKEY);
	right_item_sz = ItemIdGetLength(itemid);
	item = (IndexTuple) PageGetItem(lpage, itemid);
	right_item = CopyIndexTuple(item);
	ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);

	/* NO EREPORT(ERROR) from here till newroot op is logged */
	START_CRIT_SECTION();

	/* set btree special data: new root is alone on its level, one above lpage */
	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
	rootopaque->btpo_flags = BTP_ROOT;
	rootopaque->btpo.level =
		((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
	rootopaque->btpo_cycleid = 0;

	/* update metapage data: new root doubles as the fast root */
	metad->btm_root = rootblknum;
	metad->btm_level = rootopaque->btpo.level;
	metad->btm_fastroot = rootblknum;
	metad->btm_fastlevel = rootopaque->btpo.level;

	/*
	 * Insert the left page pointer into the new root page.  The root page is
	 * the rightmost page on its level so there is no "high key" in it; the
	 * two items will go into positions P_HIKEY and P_FIRSTKEY.
	 *
	 * Note: we *must* insert the two items in item-number order, for the
	 * benefit of _bt_restore_page().
	 */
	if (PageAddItem(rootpage, (Item) left_item, left_item_sz, P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add leftkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));

	/*
	 * insert the right page pointer into the new root page.
	 */
	if (PageAddItem(rootpage, (Item) right_item, right_item_sz, P_FIRSTKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add rightkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));

	/* Clear the incomplete-split flag in the left child */
	Assert(P_INCOMPLETE_SPLIT(lopaque));
	lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
	MarkBufferDirty(lbuf);

	/* Dirty the new pages before logging, per WAL protocol */
	MarkBufferDirty(rootbuf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_btree_newroot xlrec;
		XLogRecPtr	recptr;
		xl_btree_metadata md;

		xlrec.rootblk = rootblknum;
		xlrec.level = metad->btm_level;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBtreeNewroot);

		/* root and metapage are re-initialized wholesale during replay */
		XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
		XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
		XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT);

		md.root = rootblknum;
		md.level = metad->btm_level;
		md.fastroot = rootblknum;
		md.fastlevel = metad->btm_level;

		XLogRegisterBufData(2, (char *) &md, sizeof(xl_btree_metadata));

		/*
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.  Logs the root page's tuple data
		 * (pd_upper..pd_special) so replay can rebuild it.
		 */
		XLogRegisterBufData(0,
							(char *) rootpage + ((PageHeader) rootpage)->pd_upper,
							((PageHeader) rootpage)->pd_special -
							((PageHeader) rootpage)->pd_upper);

		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT);

		PageSetLSN(lpage, recptr);
		PageSetLSN(rootpage, recptr);
		PageSetLSN(metapg, recptr);
	}

	END_CRIT_SECTION();

	/* done with metapage */
	_bt_relbuf(rel, metabuf);

	pfree(left_item);
	pfree(right_item);

	/* caller must unlock/unpin lbuf, rbuf, and the returned rootbuf */
	return rootbuf;
}
2076
2077 /*
2078 * _bt_pgaddtup() -- add a tuple to a particular page in the index.
2079 *
2080 * This routine adds the tuple to the page as requested. It does
2081 * not affect pin/lock status, but you'd better have a write lock
2082 * and pin on the target buffer! Don't forget to write and release
2083 * the buffer afterwards, either.
2084 *
2085 * The main difference between this routine and a bare PageAddItem call
2086 * is that this code knows that the leftmost index tuple on a non-leaf
2087 * btree page doesn't need to have a key. Therefore, it strips such
2088 * tuples down to just the tuple header. CAUTION: this works ONLY if
2089 * we insert the tuples in order, so that the given itup_off does
2090 * represent the final position of the tuple!
2091 */
2092 static bool
_bt_pgaddtup(Page page,Size itemsize,IndexTuple itup,OffsetNumber itup_off)2093 _bt_pgaddtup(Page page,
2094 Size itemsize,
2095 IndexTuple itup,
2096 OffsetNumber itup_off)
2097 {
2098 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2099 IndexTupleData trunctuple;
2100
2101 if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
2102 {
2103 trunctuple = *itup;
2104 trunctuple.t_info = sizeof(IndexTupleData);
2105 itup = &trunctuple;
2106 itemsize = sizeof(IndexTupleData);
2107 }
2108
2109 if (PageAddItem(page, (Item) itup, itemsize, itup_off,
2110 false, false) == InvalidOffsetNumber)
2111 return false;
2112
2113 return true;
2114 }
2115
2116 /*
2117 * _bt_isequal - used in _bt_doinsert in check for duplicates.
2118 *
2119 * This is very similar to _bt_compare, except for NULL handling.
2120 * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
2121 */
2122 static bool
_bt_isequal(TupleDesc itupdesc,Page page,OffsetNumber offnum,int keysz,ScanKey scankey)2123 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
2124 int keysz, ScanKey scankey)
2125 {
2126 IndexTuple itup;
2127 int i;
2128
2129 /* Better be comparing to a leaf item */
2130 Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
2131
2132 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
2133
2134 for (i = 1; i <= keysz; i++)
2135 {
2136 AttrNumber attno;
2137 Datum datum;
2138 bool isNull;
2139 int32 result;
2140
2141 attno = scankey->sk_attno;
2142 Assert(attno == i);
2143 datum = index_getattr(itup, attno, itupdesc, &isNull);
2144
2145 /* NULLs are never equal to anything */
2146 if (isNull || (scankey->sk_flags & SK_ISNULL))
2147 return false;
2148
2149 result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
2150 scankey->sk_collation,
2151 datum,
2152 scankey->sk_argument));
2153
2154 if (result != 0)
2155 return false;
2156
2157 scankey++;
2158 }
2159
2160 /* if we get here, the keys are equal */
2161 return true;
2162 }
2163
2164 /*
2165 * _bt_vacuum_one_page - vacuum just one index page.
2166 *
2167 * Try to remove LP_DEAD items from the given page. The passed buffer
2168 * must be exclusive-locked, but unlike a real VACUUM, we don't need a
2169 * super-exclusive "cleanup" lock (see nbtree/README).
2170 */
2171 static void
_bt_vacuum_one_page(Relation rel,Buffer buffer,Relation heapRel)2172 _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
2173 {
2174 OffsetNumber deletable[MaxOffsetNumber];
2175 int ndeletable = 0;
2176 OffsetNumber offnum,
2177 minoff,
2178 maxoff;
2179 Page page = BufferGetPage(buffer);
2180 BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2181
2182 /*
2183 * Scan over all items to see which ones need to be deleted according to
2184 * LP_DEAD flags.
2185 */
2186 minoff = P_FIRSTDATAKEY(opaque);
2187 maxoff = PageGetMaxOffsetNumber(page);
2188 for (offnum = minoff;
2189 offnum <= maxoff;
2190 offnum = OffsetNumberNext(offnum))
2191 {
2192 ItemId itemId = PageGetItemId(page, offnum);
2193
2194 if (ItemIdIsDead(itemId))
2195 deletable[ndeletable++] = offnum;
2196 }
2197
2198 if (ndeletable > 0)
2199 _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
2200
2201 /*
2202 * Note: if we didn't find any LP_DEAD items, then the page's
2203 * BTP_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
2204 * separate write to clear it, however. We will clear it when we split
2205 * the page.
2206 */
2207 }
2208