/*-------------------------------------------------------------------------
 *
 * nbtsort.c
 *		Build a btree from sorted input by loading leaf pages sequentially.
 *
 * NOTES
 *
 * We use tuplesort.c to sort the given index tuples into order.
 * Then we scan the index tuples in order and build the btree pages
 * for each level.  We load source tuples into leaf-level pages.
 * Whenever we fill a page at one level, we add a link to it to its
 * parent level (starting a new parent level if necessary).  When
 * done, we write out each final page on each level, adding it to
 * its parent level.  When we have only one page on a level, it must be
 * the root -- it can be attached to the btree metapage and we are done.
 *
 * This code is moderately slow (~10% slower) compared to the regular
 * btree (insertion) build code on sorted or well-clustered data.  On
 * random data, however, the insertion build code is unusable -- the
 * difference on a 60MB heap is a factor of 15 because the random
 * probes into the btree thrash the buffer pool.  (NOTE: the above
 * "10%" estimate is probably obsolete, since it refers to an old and
 * not very good external sort implementation that used to exist in
 * this module.  tuplesort.c is almost certainly faster.)
 *
 * It is not wise to pack the pages entirely full, since then *any*
 * insertion would cause a split (and not only of the leaf page; the need
 * for a split would cascade right up the tree).  The steady-state load
 * factor for btrees is usually estimated at 70%.  We choose to pack leaf
 * pages to the user-controllable fill factor (default 90%) while upper pages
 * are always packed to 70%.  This gives us reasonable density (there aren't
 * many upper pages if the keys are reasonable-size) without risking a lot of
 * cascading splits during early insertions.
 *
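 * For illustration, assuming the default BLCKSZ of 8192 and the default
 * leaf fillfactor of 90, a leaf page is treated as full once its free
 * space drops below about 819 bytes (10% of the page), while upper pages
 * use the fixed 70% target and count as full below roughly 2457 bytes
 * (30% of the page); see _bt_pagestate(), where btps_full is set.
 *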
 * Formerly the index pages being built were kept in shared buffers, but
 * that is of no value (since other backends have no interest in them yet)
 * and it created locking problems for CHECKPOINT, because the upper-level
 * pages were held exclusive-locked for long periods.  Now we just build
 * the pages in local memory and smgrwrite or smgrextend them as we finish
 * them.  They will need to be re-read into shared buffers on first use after
 * the build finishes.
 *
 * Since the index will never be used unless it is completely built,
 * from a crash-recovery point of view there is no need to WAL-log the
 * steps of the build.  After completing the index build, we can just sync
 * the whole file to disk using smgrimmedsync() before exiting this module.
 * This can be seen to be sufficient for crash recovery by considering that
 * it's effectively equivalent to what would happen if a CHECKPOINT occurred
 * just after the index build.  However, it is clearly not sufficient if the
 * DBA is using the WAL log for PITR or replication purposes, since another
 * machine would not be able to reconstruct the index from WAL.  Therefore,
 * we log the completed index pages to WAL if and only if WAL archiving is
 * active.
 *
 * This code isn't concerned about the FSM at all. The caller is responsible
 * for initializing that.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtsort.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/nbtree.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"


/*
 * Status record for spooling/sorting phase.  (Note we may have two of
 * these due to the special requirements for uniqueness-checking with
 * dead tuples.)
 */
struct BTSpool
{
	Tuplesortstate *sortstate;	/* state data for tuplesort.c */
	Relation	heap;
	Relation	index;
	bool		isunique;
};

/*
 * Status record for a btree page being built.  We have one of these
 * for each active tree level.
 *
 * The reason we need to store a copy of the minimum key is that we'll
 * need to propagate it to the parent node when this page is linked
 * into its parent.  However, if the page is not a leaf page, the first
 * entry on the page doesn't need to contain a key, so we will not have
 * stored the key itself on the page.  (You might think we could skip
 * copying the minimum key on leaf pages, but actually we must have a
 * writable copy anyway because we'll poke the page's address into it
 * before passing it up to the parent...)
 */
typedef struct BTPageState
{
	Page		btps_page;		/* workspace for page building */
	BlockNumber btps_blkno;		/* block # to write this page at */
	IndexTuple	btps_minkey;	/* copy of minimum key (first item) on page */
	OffsetNumber btps_lastoff;	/* last item offset loaded */
	uint32		btps_level;		/* tree level (0 = leaf) */
	Size		btps_full;		/* "full" if less than this much free space */
	struct BTPageState *btps_next;	/* link to parent level, if any */
} BTPageState;
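
/*
 * The btps_next links run from the leaf-level BTPageState up toward the
 * current topmost level, so the active levels form a simple singly-linked
 * list; _bt_uppershutdown() walks that list to finish off each level.
 */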

/*
 * Overall status record for index writing phase.
 */
typedef struct BTWriteState
{
	Relation	heap;
	Relation	index;
	bool		btws_use_wal;	/* dump pages to WAL? */
	BlockNumber btws_pages_alloced; /* # pages allocated */
	BlockNumber btws_pages_written; /* # pages written out */
	Page		btws_zeropage;	/* workspace for filling zeroes */
} BTWriteState;


static Page _bt_blnewpage(uint32 level);
static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
static void _bt_slideleft(Page page);
static void _bt_sortaddtup(Page page, Size itemsize,
			   IndexTuple itup, OffsetNumber itup_off);
static void _bt_buildadd(BTWriteState *wstate, BTPageState *state,
			 IndexTuple itup);
static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state);
static void _bt_load(BTWriteState *wstate,
		 BTSpool *btspool, BTSpool *btspool2);


/*
 * Interface routines
 */

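/*
 * For orientation, here is a rough, simplified sketch of how these entry
 * points are normally driven by the index build code in nbtree.c (the
 * variable names below are illustrative, not the actual callers'):
 *
 *		BTSpool *spool = _bt_spoolinit(heap, index, indexInfo->ii_Unique, false);
 *		... for each heap tuple, via the build callback ...
 *			_bt_spool(spool, &htup->t_self, values, isnull);
 *		_bt_leafbuild(spool, NULL);
 *		_bt_spooldestroy(spool);
 *
 * A unique-index build additionally creates a second spool (with
 * isdead = true) for tuples from dead heap rows and passes it as the
 * second argument to _bt_leafbuild().
 */
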
/*
 * create and initialize a spool structure
 */
BTSpool *
_bt_spoolinit(Relation heap, Relation index, bool isunique, bool isdead)
{
	BTSpool    *btspool = (BTSpool *) palloc0(sizeof(BTSpool));
	int			btKbytes;

	btspool->heap = heap;
	btspool->index = index;
	btspool->isunique = isunique;

	/*
	 * We size the sort area as maintenance_work_mem rather than work_mem to
	 * speed index creation.  This should be OK since a single backend can't
	 * run multiple index creations in parallel.  Note that creation of a
	 * unique index actually requires two BTSpool objects.  We expect that the
	 * second one (for dead tuples) won't get very full, so we give it only
	 * work_mem.
	 */
	btKbytes = isdead ? work_mem : maintenance_work_mem;
	btspool->sortstate = tuplesort_begin_index_btree(heap, index, isunique,
													 btKbytes, false);

	return btspool;
}

/*
 * clean up a spool structure and its substructures.
 */
void
_bt_spooldestroy(BTSpool *btspool)
{
	tuplesort_end(btspool->sortstate);
	pfree(btspool);
}

/*
 * spool an index entry into the sort file.
 */
void
_bt_spool(BTSpool *btspool, ItemPointer self, Datum *values, bool *isnull)
{
	tuplesort_putindextuplevalues(btspool->sortstate, btspool->index,
								  self, values, isnull);
}

/*
 * given a spool loaded by successive calls to _bt_spool,
 * create an entire btree.
 */
void
_bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
{
	BTWriteState wstate;

#ifdef BTREE_BUILD_STATS
	if (log_btree_build_stats)
	{
		ShowUsage("BTREE BUILD (Spool) STATISTICS");
		ResetUsage();
	}
#endif							/* BTREE_BUILD_STATS */

	tuplesort_performsort(btspool->sortstate);
	if (btspool2)
		tuplesort_performsort(btspool2->sortstate);

	wstate.heap = btspool->heap;
	wstate.index = btspool->index;

	/*
	 * We need to log index creation in WAL iff WAL archiving/streaming is
	 * enabled UNLESS the index isn't WAL-logged anyway.
	 */
	wstate.btws_use_wal = XLogIsNeeded() && RelationNeedsWAL(wstate.index);
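
	/*
	 * For example, a build under wal_level = minimal skips WAL here and
	 * relies on the final smgrimmedsync() in _bt_load() instead, while an
	 * index on a temporary or unlogged table skips both, since its contents
	 * need not survive a crash anyway.
	 */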

	/* reserve the metapage */
	wstate.btws_pages_alloced = BTREE_METAPAGE + 1;
	wstate.btws_pages_written = 0;
	wstate.btws_zeropage = NULL;	/* until needed */

	_bt_load(&wstate, btspool, btspool2);
}


/*
 * Internal routines.
 */


/*
 * allocate workspace for a new, clean btree page, not linked to any siblings.
 */
static Page
_bt_blnewpage(uint32 level)
{
	Page		page;
	BTPageOpaque opaque;

	page = (Page) palloc(BLCKSZ);

	/* Zero the page and set up standard page header info */
	_bt_pageinit(page, BLCKSZ);

	/* Initialize BT opaque state */
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	opaque->btpo_prev = opaque->btpo_next = P_NONE;
	opaque->btpo.level = level;
	opaque->btpo_flags = (level > 0) ? 0 : BTP_LEAF;
	opaque->btpo_cycleid = 0;

	/* Make the P_HIKEY line pointer appear allocated */
	((PageHeader) page)->pd_lower += sizeof(ItemIdData);

	return page;
}

/*
 * emit a completed btree page, and release the working storage.
 */
static void
_bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
{
	/* Ensure rd_smgr is open (could have been closed by relcache flush!) */
	RelationOpenSmgr(wstate->index);

	/* XLOG stuff */
	if (wstate->btws_use_wal)
	{
		/* We use the heap NEWPAGE record type for this */
		log_newpage(&wstate->index->rd_node, MAIN_FORKNUM, blkno, page, true);
	}

	/*
	 * If we have to write pages nonsequentially, fill in the space with
	 * zeroes until we come back and overwrite.  This is not logically
	 * necessary on standard Unix filesystems (unwritten space will read as
	 * zeroes anyway), but it should help to avoid fragmentation. The dummy
	 * pages aren't WAL-logged though.
	 */
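	/*
	 * (The metapage is the usual case: block 0 is reserved up front but only
	 * written at the very end of the build, in _bt_uppershutdown(), so it is
	 * normally zero-filled here when block 1 is first extended and then
	 * overwritten via smgrwrite() later.)
	 */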
	while (blkno > wstate->btws_pages_written)
	{
		if (!wstate->btws_zeropage)
			wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
		/* don't set checksum for all-zero page */
		smgrextend(wstate->index->rd_smgr, MAIN_FORKNUM,
				   wstate->btws_pages_written++,
				   (char *) wstate->btws_zeropage,
				   true);
	}

	PageSetChecksumInplace(page, blkno);

	/*
	 * Now write the page.  There's no need for smgr to schedule an fsync for
	 * this write; we'll do it ourselves before ending the build.
	 */
	if (blkno == wstate->btws_pages_written)
	{
		/* extending the file... */
		smgrextend(wstate->index->rd_smgr, MAIN_FORKNUM, blkno,
				   (char *) page, true);
		wstate->btws_pages_written++;
	}
	else
	{
		/* overwriting a block we zero-filled before */
		smgrwrite(wstate->index->rd_smgr, MAIN_FORKNUM, blkno,
				  (char *) page, true);
	}

	pfree(page);
}

/*
 * allocate and initialize a new BTPageState.  the returned structure
 * is suitable for immediate use by _bt_buildadd.
 */
static BTPageState *
_bt_pagestate(BTWriteState *wstate, uint32 level)
{
	BTPageState *state = (BTPageState *) palloc0(sizeof(BTPageState));

	/* create initial page for level */
	state->btps_page = _bt_blnewpage(level);

	/* and assign it a page position */
	state->btps_blkno = wstate->btws_pages_alloced++;

	state->btps_minkey = NULL;
	/* initialize lastoff so first item goes into P_FIRSTKEY */
	state->btps_lastoff = P_HIKEY;
	state->btps_level = level;
	/* set "full" threshold based on level.  See notes at head of file. */
	if (level > 0)
		state->btps_full = (BLCKSZ * (100 - BTREE_NONLEAF_FILLFACTOR) / 100);
	else
		state->btps_full = RelationGetTargetPageFreeSpace(wstate->index,
														  BTREE_DEFAULT_FILLFACTOR);
	/* no parent level, yet */
	state->btps_next = NULL;

	return state;
}

/*
 * slide an array of ItemIds back one slot (from P_FIRSTKEY to
 * P_HIKEY, overwriting P_HIKEY).  we need to do this when we discover
 * that we have built an ItemId array in what has turned out to be a
 * P_RIGHTMOST page.
 */
static void
_bt_slideleft(Page page)
{
	OffsetNumber off;
	OffsetNumber maxoff;
	ItemId		previi;
	ItemId		thisii;

	if (!PageIsEmpty(page))
	{
		maxoff = PageGetMaxOffsetNumber(page);
		previi = PageGetItemId(page, P_HIKEY);
		for (off = P_FIRSTKEY; off <= maxoff; off = OffsetNumberNext(off))
		{
			thisii = PageGetItemId(page, off);
			*previi = *thisii;
			previi = thisii;
		}
		((PageHeader) page)->pd_lower -= sizeof(ItemIdData);
	}
}

/*
 * Add an item to a page being built.
 *
 * The main difference between this routine and a bare PageAddItem call
 * is that this code knows that the leftmost data item on a non-leaf
 * btree page doesn't need to have a key.  Therefore, it strips such
 * items down to just the item header.
 *
 * This is almost like nbtinsert.c's _bt_pgaddtup(), but we can't use
 * that because it assumes that P_RIGHTMOST() will return the correct
 * answer for the page.  Here, we don't know yet if the page will be
 * rightmost.  Offset P_FIRSTKEY is always the first data key.
 */
static void
_bt_sortaddtup(Page page,
			   Size itemsize,
			   IndexTuple itup,
			   OffsetNumber itup_off)
{
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	IndexTupleData trunctuple;

	if (!P_ISLEAF(opaque) && itup_off == P_FIRSTKEY)
	{
		trunctuple = *itup;
		trunctuple.t_info = sizeof(IndexTupleData);
		itup = &trunctuple;
		itemsize = sizeof(IndexTupleData);
	}
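
	/*
	 * When the truncation above applies, the stored item is just the bare
	 * IndexTupleData header -- typically 8 bytes, the t_tid downlink plus
	 * t_info -- with all key data omitted; nbtsearch.c never examines the
	 * key of this leftmost ("minus infinity") downlink.
	 */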

	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
					false, false) == InvalidOffsetNumber)
		elog(ERROR, "failed to add item to the index page");
}

/*----------
 * Add an item to a disk page from the sort output.
 *
 * We must be careful to observe the page layout conventions of nbtsearch.c:
 * - rightmost pages start data items at P_HIKEY instead of at P_FIRSTKEY.
 * - on non-leaf pages, the key portion of the first item need not be
 *   stored, we should store only the link.
 *
 * A leaf page being built looks like:
 *
 * +----------------+---------------------------------+
 * | PageHeaderData | linp0 linp1 linp2 ...           |
 * +-----------+----+---------------------------------+
 * | ... linpN |                                      |
 * +-----------+--------------------------------------+
 * |     ^ last                                       |
 * |                                                  |
 * +-------------+------------------------------------+
 * |             | itemN ...                          |
 * +-------------+------------------+-----------------+
 * |          ... item3 item2 item1 | "special space" |
 * +--------------------------------+-----------------+
 *
 * Contrast this with the diagram in bufpage.h; note the mismatch
 * between linps and items.  This is because we reserve linp0 as a
 * placeholder for the pointer to the "high key" item; when we have
 * filled up the page, we will set linp0 to point to itemN and clear
 * linpN.  On the other hand, if we find this is the last (rightmost)
 * page, we leave the items alone and slide the linp array over.
 *
 * 'last' pointer indicates the last offset added to the page.
 *----------
 */
static void
_bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
{
	Page		npage;
	BlockNumber nblkno;
	OffsetNumber last_off;
	Size		pgspc;
	Size		itupsz;

	/*
	 * This is a handy place to check for cancel interrupts during the btree
	 * load phase of index creation.
	 */
	CHECK_FOR_INTERRUPTS();

	npage = state->btps_page;
	nblkno = state->btps_blkno;
	last_off = state->btps_lastoff;

	pgspc = PageGetFreeSpace(npage);
	itupsz = IndexTupleDSize(*itup);
	itupsz = MAXALIGN(itupsz);

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itupsz doesn't
	 * include the ItemId.
	 *
	 * NOTE: similar code appears in _bt_insertonpg() to defend against
	 * oversize items being inserted into an already-existing index. But
	 * during creation of an index, we don't go through there.
	 */
	if (itupsz > BTMaxItemSize(npage))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
						itupsz, BTMaxItemSize(npage),
						RelationGetRelationName(wstate->index)),
				 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
						 "Consider a function index of an MD5 hash of the value, "
						 "or use full text indexing."),
				 errtableconstraint(wstate->heap,
									RelationGetRelationName(wstate->index))));

	/*
	 * Check to see if page is "full".  It's definitely full if the item won't
	 * fit.  Otherwise, compare to the target freespace derived from the
	 * fillfactor.  However, we must put at least two items on each page, so
	 * disregard fillfactor if we don't have that many.
	 */
	if (pgspc < itupsz || (pgspc < state->btps_full && last_off > P_FIRSTKEY))
	{
		/*
		 * Finish off the page and write it out.
		 */
		Page		opage = npage;
		BlockNumber oblkno = nblkno;
		ItemId		ii;
		ItemId		hii;
		IndexTuple	oitup;

		/* Create new page of same level */
		npage = _bt_blnewpage(state->btps_level);

		/* and assign it a page position */
		nblkno = wstate->btws_pages_alloced++;

		/*
		 * We copy the last item on the page into the new page, and then
		 * rearrange the old page so that the 'last item' becomes its high key
		 * rather than a true data item.  There had better be at least two
		 * items on the page already, else the page would be empty of useful
		 * data.
		 */
		Assert(last_off > P_FIRSTKEY);
		ii = PageGetItemId(opage, last_off);
		oitup = (IndexTuple) PageGetItem(opage, ii);
		_bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);

		/*
		 * Move 'last' into the high key position on opage
		 */
		hii = PageGetItemId(opage, P_HIKEY);
		*hii = *ii;
		ItemIdSetUnused(ii);	/* redundant */
		((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);

		/*
		 * Link the old page into its parent, using its minimum key. If we
		 * don't have a parent, we have to create one; this adds a new btree
		 * level.
		 */
		if (state->btps_next == NULL)
			state->btps_next = _bt_pagestate(wstate, state->btps_level + 1);

		Assert(state->btps_minkey != NULL);
		ItemPointerSet(&(state->btps_minkey->t_tid), oblkno, P_HIKEY);
		_bt_buildadd(wstate, state->btps_next, state->btps_minkey);
		pfree(state->btps_minkey);

		/*
		 * Save a copy of the minimum key for the new page.  We have to copy
		 * it off the old page, not the new one, in case we are not at leaf
		 * level.
		 */
		state->btps_minkey = CopyIndexTuple(oitup);

		/*
		 * Set the sibling links for both pages.
		 */
		{
			BTPageOpaque oopaque = (BTPageOpaque) PageGetSpecialPointer(opage);
			BTPageOpaque nopaque = (BTPageOpaque) PageGetSpecialPointer(npage);

			oopaque->btpo_next = nblkno;
			nopaque->btpo_prev = oblkno;
			nopaque->btpo_next = P_NONE;	/* redundant */
		}

		/*
		 * Write out the old page.  We never need to touch it again, so we can
		 * free the opage workspace too.
		 */
		_bt_blwritepage(wstate, opage, oblkno);

		/*
		 * Reset last_off to point to new page
		 */
		last_off = P_FIRSTKEY;
	}

	/*
	 * If the new item is the first for its page, stash a copy for later. Note
	 * this will only happen for the first item on a level; on later pages,
	 * the first item for a page is copied from the prior page in the code
	 * above.
	 */
	if (last_off == P_HIKEY)
	{
		Assert(state->btps_minkey == NULL);
		state->btps_minkey = CopyIndexTuple(itup);
	}

	/*
	 * Add the new item into the current page.
	 */
	last_off = OffsetNumberNext(last_off);
	_bt_sortaddtup(npage, itupsz, itup, last_off);

	state->btps_page = npage;
	state->btps_blkno = nblkno;
	state->btps_lastoff = last_off;
}

/*
 * Finish writing out the completed btree.
 */
static void
_bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
{
	BTPageState *s;
	BlockNumber rootblkno = P_NONE;
	uint32		rootlevel = 0;
	Page		metapage;

	/*
	 * Each iteration of this loop completes one more level of the tree.
	 */
	for (s = state; s != NULL; s = s->btps_next)
	{
		BlockNumber blkno;
		BTPageOpaque opaque;

		blkno = s->btps_blkno;
		opaque = (BTPageOpaque) PageGetSpecialPointer(s->btps_page);

		/*
		 * We have to link the last page on this level to somewhere.
		 *
		 * If we're at the top, it's the root, so attach it to the metapage.
		 * Otherwise, add an entry for it to its parent using its minimum key.
		 * This may cause the last page of the parent level to split, but
		 * that's not a problem -- we haven't gotten to it yet.
		 */
		if (s->btps_next == NULL)
		{
			opaque->btpo_flags |= BTP_ROOT;
			rootblkno = blkno;
			rootlevel = s->btps_level;
		}
		else
		{
			Assert(s->btps_minkey != NULL);
			ItemPointerSet(&(s->btps_minkey->t_tid), blkno, P_HIKEY);
			_bt_buildadd(wstate, s->btps_next, s->btps_minkey);
			pfree(s->btps_minkey);
			s->btps_minkey = NULL;
		}

		/*
		 * This is the rightmost page, so the ItemId array needs to be slid
		 * back one slot.  Then we can dump out the page.
		 */
		_bt_slideleft(s->btps_page);
		_bt_blwritepage(wstate, s->btps_page, s->btps_blkno);
		s->btps_page = NULL;	/* writepage freed the workspace */
	}
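
	/*
	 * In the degenerate case where the whole index fits on a single leaf
	 * page, the loop above runs exactly once: that page keeps BTP_LEAF,
	 * additionally gets BTP_ROOT, and rootlevel stays 0, so the metapage
	 * built below points directly at it.
	 */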

	/*
	 * As the last step in the process, construct the metapage and make it
	 * point to the new root (unless we had no data at all, in which case it's
	 * set to point to "P_NONE").  This changes the index to the "valid" state
	 * by filling in a valid magic number in the metapage.
	 */
	metapage = (Page) palloc(BLCKSZ);
	_bt_initmetapage(metapage, rootblkno, rootlevel);
	_bt_blwritepage(wstate, metapage, BTREE_METAPAGE);
}

/*
 * Read tuples in correct sort order from tuplesort, and load them into
 * btree leaves.
 */
static void
_bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
{
	BTPageState *state = NULL;
	bool		merge = (btspool2 != NULL);
	IndexTuple	itup,
				itup2 = NULL;
	bool		load1;
	TupleDesc	tupdes = RelationGetDescr(wstate->index);
	int			i,
				keysz = RelationGetNumberOfAttributes(wstate->index);
	ScanKey		indexScanKey = NULL;
	SortSupport sortKeys;

	if (merge)
	{
		/*
		 * Another BTSpool for dead tuples exists. Now we have to merge
		 * btspool and btspool2.
		 */
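
		/*
		 * (btspool2 is only created for unique-index builds: tuples from
		 * dead heap rows are spooled there so they don't trigger spurious
		 * uniqueness failures, and the merge below interleaves the two
		 * sorted streams so the finished index is still fully ordered.)
		 */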

		/* the preparation of merge */
		itup = tuplesort_getindextuple(btspool->sortstate, true);
		itup2 = tuplesort_getindextuple(btspool2->sortstate, true);
		indexScanKey = _bt_mkscankey_nodata(wstate->index);

		/* Prepare SortSupport data for each column */
		sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));

		for (i = 0; i < keysz; i++)
		{
			SortSupport sortKey = sortKeys + i;
			ScanKey		scanKey = indexScanKey + i;
			int16		strategy;

			sortKey->ssup_cxt = CurrentMemoryContext;
			sortKey->ssup_collation = scanKey->sk_collation;
			sortKey->ssup_nulls_first =
				(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
			sortKey->ssup_attno = scanKey->sk_attno;
			/* Abbreviation is not supported here */
			sortKey->abbreviate = false;

			AssertState(sortKey->ssup_attno != 0);

			strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
				BTGreaterStrategyNumber : BTLessStrategyNumber;

			PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
		}

		_bt_freeskey(indexScanKey);

		for (;;)
		{
			load1 = true;		/* load BTSpool next ? */
			if (itup2 == NULL)
			{
				if (itup == NULL)
					break;
			}
			else if (itup != NULL)
			{
				for (i = 1; i <= keysz; i++)
				{
					SortSupport entry;
					Datum		attrDatum1,
								attrDatum2;
					bool		isNull1,
								isNull2;
					int32		compare;

					entry = sortKeys + i - 1;
					attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
					attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);

					compare = ApplySortComparator(attrDatum1, isNull1,
												  attrDatum2, isNull2,
												  entry);
					if (compare > 0)
					{
						load1 = false;
						break;
					}
					else if (compare < 0)
						break;
				}
			}
			else
				load1 = false;

			/* When we see first tuple, create first index page */
			if (state == NULL)
				state = _bt_pagestate(wstate, 0);

			if (load1)
			{
				_bt_buildadd(wstate, state, itup);
				itup = tuplesort_getindextuple(btspool->sortstate, true);
			}
			else
			{
				_bt_buildadd(wstate, state, itup2);
				itup2 = tuplesort_getindextuple(btspool2->sortstate, true);
			}
		}
		pfree(sortKeys);
	}
	else
	{
		/* merge is unnecessary */
		while ((itup = tuplesort_getindextuple(btspool->sortstate,
											   true)) != NULL)
		{
			/* When we see first tuple, create first index page */
			if (state == NULL)
				state = _bt_pagestate(wstate, 0);

			_bt_buildadd(wstate, state, itup);
		}
	}

	/* Close down final pages and write the metapage */
	_bt_uppershutdown(wstate, state);

	/*
	 * If the index is WAL-logged, we must fsync it down to disk before it's
	 * safe to commit the transaction.  (For a non-WAL-logged index we don't
	 * care since the index will be uninteresting after a crash anyway.)
	 *
	 * It's obvious that we must do this when not WAL-logging the build. It's
	 * less obvious that we have to do it even if we did WAL-log the index
	 * pages.  The reason is that since we're building outside shared buffers,
	 * a CHECKPOINT occurring during the build has no way to flush the
	 * previously written data to disk (indeed it won't know the index even
	 * exists).  A crash later on would replay WAL from the checkpoint,
	 * therefore it wouldn't replay our earlier WAL entries. If we do not
	 * fsync those pages here, they might still not be on disk when the crash
	 * occurs.
	 */
	if (RelationNeedsWAL(wstate->index))
	{
		RelationOpenSmgr(wstate->index);
		smgrimmedsync(wstate->index->rd_smgr, MAIN_FORKNUM);
	}
}