1 /*-------------------------------------------------------------------------
2  *
3  * rumentrypage.c
4  *	  page utilities routines for the postgres inverted index access method.
5  *
6  *
7  * Portions Copyright (c) 2015-2019, Postgres Professional
8  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
9  * Portions Copyright (c) 1994, Regents of the University of California
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "rum.h"
17 
18 /*
19  * Read item pointers with additional information from leaf data page.
20  * Information is stored in the same manner as in leaf data pages.
21  */
22 void
rumReadTuple(RumState * rumstate,OffsetNumber attnum,IndexTuple itup,RumItem * items,bool copyAddInfo)23 rumReadTuple(RumState * rumstate, OffsetNumber attnum,
24 			 IndexTuple itup, RumItem * items, bool copyAddInfo)
25 {
26 	Pointer		ptr = RumGetPosting(itup);
27 	RumItem		item;
28 	int			nipd = RumGetNPosting(itup),
29 				i;
30 
31 	ItemPointerSetMin(&item.iptr);
32 	for (i = 0; i < nipd; i++)
33 	{
34 		ptr = rumDataPageLeafRead(ptr, attnum, &item, copyAddInfo, rumstate);
35 		items[i] = item;
36 	}
37 }
38 
39 /*
40  * Read only item pointers from leaf data page.
41  * Information is stored in the same manner as in leaf data pages.
42  */
43 void
rumReadTuplePointers(RumState * rumstate,OffsetNumber attnum,IndexTuple itup,ItemPointerData * ipd)44 rumReadTuplePointers(RumState * rumstate, OffsetNumber attnum,
45 					 IndexTuple itup, ItemPointerData *ipd)
46 {
47 	Pointer		ptr = RumGetPosting(itup);
48 	int			nipd = RumGetNPosting(itup),
49 				i;
50 	RumItem		item;
51 
52 	ItemPointerSetMin(&item.iptr);
53 	for (i = 0; i < nipd; i++)
54 	{
55 		ptr = rumDataPageLeafReadPointer(ptr, attnum, &item, rumstate);
56 		ipd[i] = item.iptr;
57 	}
58 }
59 
60 /*
61  * Form a non-leaf entry tuple by copying the key data from the given tuple,
62  * which can be either a leaf or non-leaf entry tuple.
63  *
64  * Any posting list in the source tuple is not copied.  The specified child
65  * block number is inserted into t_tid.
66  */
67 static IndexTuple
RumFormInteriorTuple(RumBtree btree,IndexTuple itup,Page page,BlockNumber childblk)68 RumFormInteriorTuple(RumBtree btree, IndexTuple itup, Page page,
69 					 BlockNumber childblk)
70 {
71 	IndexTuple	nitup;
72 	RumNullCategory category;
73 
74 	if (RumPageIsLeaf(page) && !RumIsPostingTree(itup))
75 	{
76 		/* Tuple contains a posting list, just copy stuff before that */
77 		uint32		origsize = RumGetPostingOffset(itup);
78 
79 		origsize = MAXALIGN(origsize);
80 		nitup = (IndexTuple) palloc(origsize);
81 		memcpy(nitup, itup, origsize);
82 		/* ... be sure to fix the size header field ... */
83 		nitup->t_info &= ~INDEX_SIZE_MASK;
84 		nitup->t_info |= origsize;
85 	}
86 	else
87 	{
88 		/* Copy the tuple as-is */
89 		nitup = (IndexTuple) palloc(IndexTupleSize(itup));
90 		memcpy(nitup, itup, IndexTupleSize(itup));
91 	}
92 
93 	/* Now insert the correct downlink */
94 	RumSetDownlink(nitup, childblk);
95 
96 	rumtuple_get_key(btree->rumstate, itup, &category);
97 	if (category != RUM_CAT_NORM_KEY)
98 	{
99 		Assert(IndexTupleHasNulls(itup));
100 		nitup->t_info |= INDEX_NULL_MASK;
101 		RumSetNullCategory(nitup, category);
102 	}
103 
104 	return nitup;
105 }
106 
107 /*
108  * Entry tree is a "static", ie tuple never deletes from it,
109  * so we don't use right bound, we use rightmost key instead.
110  */
111 static IndexTuple
getRightMostTuple(Page page)112 getRightMostTuple(Page page)
113 {
114 	OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
115 
116 	Assert(maxoff != InvalidOffsetNumber);
117 
118 	return (IndexTuple) PageGetItem(page, PageGetItemId(page, maxoff));
119 }
120 
121 static bool
entryIsMoveRight(RumBtree btree,Page page)122 entryIsMoveRight(RumBtree btree, Page page)
123 {
124 	IndexTuple	itup;
125 	OffsetNumber attnum;
126 	Datum		key;
127 	RumNullCategory category;
128 
129 	if (RumPageRightMost(page))
130 		return false;
131 
132 	itup = getRightMostTuple(page);
133 	attnum = rumtuple_get_attrnum(btree->rumstate, itup);
134 	key = rumtuple_get_key(btree->rumstate, itup, &category);
135 
136 	if (rumCompareAttEntries(btree->rumstate,
137 				   btree->entryAttnum, btree->entryKey, btree->entryCategory,
138 							 attnum, key, category) > 0)
139 		return true;
140 
141 	return false;
142 }
143 
144 /*
145  * Find correct tuple in non-leaf page. It supposed that
146  * page correctly chosen and searching value SHOULD be on page
147  */
148 static BlockNumber
entryLocateEntry(RumBtree btree,RumBtreeStack * stack)149 entryLocateEntry(RumBtree btree, RumBtreeStack * stack)
150 {
151 	OffsetNumber low,
152 				high,
153 				maxoff;
154 	IndexTuple	itup = NULL;
155 	int			result;
156 	Page		page = BufferGetPage(stack->buffer);
157 
158 	Assert(!RumPageIsLeaf(page));
159 	Assert(!RumPageIsData(page));
160 
161 	if (btree->fullScan)
162 	{
163 		stack->off = FirstOffsetNumber;
164 		stack->predictNumber *= PageGetMaxOffsetNumber(page);
165 		return btree->getLeftMostPage(btree, page);
166 	}
167 
168 	low = FirstOffsetNumber;
169 	maxoff = high = PageGetMaxOffsetNumber(page);
170 	Assert(high >= low);
171 
172 	high++;
173 
174 	while (high > low)
175 	{
176 		OffsetNumber mid = low + ((high - low) / 2);
177 
178 		if (mid == maxoff && RumPageRightMost(page))
179 		{
180 			/* Right infinity */
181 			result = -1;
182 		}
183 		else
184 		{
185 			OffsetNumber attnum;
186 			Datum		key;
187 			RumNullCategory category;
188 
189 			itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
190 			attnum = rumtuple_get_attrnum(btree->rumstate, itup);
191 			key = rumtuple_get_key(btree->rumstate, itup, &category);
192 			result = rumCompareAttEntries(btree->rumstate,
193 										  btree->entryAttnum,
194 										  btree->entryKey,
195 										  btree->entryCategory,
196 										  attnum, key, category);
197 		}
198 
199 		if (result == 0)
200 		{
201 			stack->off = mid;
202 			Assert(RumGetDownlink(itup) != RUM_ROOT_BLKNO);
203 			return RumGetDownlink(itup);
204 		}
205 		else if (result > 0)
206 			low = mid + 1;
207 		else
208 			high = mid;
209 	}
210 
211 	Assert(high >= FirstOffsetNumber && high <= maxoff);
212 
213 	stack->off = high;
214 	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, high));
215 	Assert(RumGetDownlink(itup) != RUM_ROOT_BLKNO);
216 	return RumGetDownlink(itup);
217 }
218 
219 /*
220  * Searches correct position for value on leaf page.
221  * Page should be correctly chosen.
222  * Returns true if value found on page.
223  */
224 static bool
entryLocateLeafEntry(RumBtree btree,RumBtreeStack * stack)225 entryLocateLeafEntry(RumBtree btree, RumBtreeStack * stack)
226 {
227 	Page		page = BufferGetPage(stack->buffer);
228 	OffsetNumber low,
229 				high;
230 
231 	Assert(RumPageIsLeaf(page));
232 	Assert(!RumPageIsData(page));
233 
234 	if (btree->fullScan)
235 	{
236 		stack->off = FirstOffsetNumber;
237 		return true;
238 	}
239 
240 	low = FirstOffsetNumber;
241 	high = PageGetMaxOffsetNumber(page);
242 
243 	if (high < low)
244 	{
245 		stack->off = FirstOffsetNumber;
246 		return false;
247 	}
248 
249 	high++;
250 
251 	while (high > low)
252 	{
253 		OffsetNumber mid = low + ((high - low) / 2);
254 		IndexTuple	itup;
255 		OffsetNumber attnum;
256 		Datum		key;
257 		RumNullCategory category;
258 		int			result;
259 
260 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
261 		attnum = rumtuple_get_attrnum(btree->rumstate, itup);
262 		key = rumtuple_get_key(btree->rumstate, itup, &category);
263 		result = rumCompareAttEntries(btree->rumstate,
264 									  btree->entryAttnum,
265 									  btree->entryKey,
266 									  btree->entryCategory,
267 									  attnum, key, category);
268 		if (result == 0)
269 		{
270 			stack->off = mid;
271 			return true;
272 		}
273 		else if (result > 0)
274 			low = mid + 1;
275 		else
276 			high = mid;
277 	}
278 
279 	stack->off = high;
280 	return false;
281 }
282 
283 static OffsetNumber
entryFindChildPtr(RumBtree btree,Page page,BlockNumber blkno,OffsetNumber storedOff)284 entryFindChildPtr(RumBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
285 {
286 	OffsetNumber i,
287 				maxoff = PageGetMaxOffsetNumber(page);
288 	IndexTuple	itup;
289 
290 	Assert(!RumPageIsLeaf(page));
291 	Assert(!RumPageIsData(page));
292 
293 	/* if page isn't changed, we returns storedOff */
294 	if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
295 	{
296 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, storedOff));
297 		if (RumGetDownlink(itup) == blkno)
298 			return storedOff;
299 
300 		/*
301 		 * we hope, that needed pointer goes to right. It's true if there
302 		 * wasn't a deletion
303 		 */
304 		for (i = storedOff + 1; i <= maxoff; i++)
305 		{
306 			itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
307 			if (RumGetDownlink(itup) == blkno)
308 				return i;
309 		}
310 		maxoff = storedOff - 1;
311 	}
312 
313 	/* last chance */
314 	for (i = FirstOffsetNumber; i <= maxoff; i++)
315 	{
316 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
317 		if (RumGetDownlink(itup) == blkno)
318 			return i;
319 	}
320 
321 	return InvalidOffsetNumber;
322 }
323 
324 static BlockNumber
entryGetLeftMostPage(RumBtree btree,Page page)325 entryGetLeftMostPage(RumBtree btree, Page page)
326 {
327 	IndexTuple	itup;
328 
329 	Assert(!RumPageIsLeaf(page));
330 	Assert(!RumPageIsData(page));
331 	Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
332 
333 	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
334 	return RumGetDownlink(itup);
335 }
336 
337 static bool
entryIsEnoughSpace(RumBtree btree,Buffer buf,OffsetNumber off)338 entryIsEnoughSpace(RumBtree btree, Buffer buf, OffsetNumber off)
339 {
340 	Size		itupsz = 0;
341 	Page		page = BufferGetPage(buf);
342 
343 	Assert(btree->entry);
344 	Assert(!RumPageIsData(page));
345 
346 	if (btree->isDelete)
347 	{
348 		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
349 
350 		itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
351 	}
352 
353 	if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
354 		return true;
355 
356 	return false;
357 }
358 
359 /*
360  * Delete tuple on leaf page if tuples existed and we
361  * should update it, update old child blkno to new right page
362  * if child split occurred
363  */
364 static BlockNumber
entryPreparePage(RumBtree btree,Page page,OffsetNumber off)365 entryPreparePage(RumBtree btree, Page page, OffsetNumber off)
366 {
367 	BlockNumber ret = InvalidBlockNumber;
368 
369 	Assert(btree->entry);
370 	Assert(!RumPageIsData(page));
371 
372 	if (btree->isDelete)
373 	{
374 		Assert(RumPageIsLeaf(page));
375 		PageIndexTupleDelete(page, off);
376 	}
377 
378 	if (!RumPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
379 	{
380 		IndexTuple	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
381 
382 		RumSetDownlink(itup, btree->rightblkno);
383 		ret = btree->rightblkno;
384 	}
385 
386 	btree->rightblkno = InvalidBlockNumber;
387 
388 	return ret;
389 }
390 
391 /*
392  * Place tuple on page and fills WAL record
393  */
394 static void
entryPlaceToPage(RumBtree btree,Page page,OffsetNumber off)395 entryPlaceToPage(RumBtree btree, Page page, OffsetNumber off)
396 {
397 	OffsetNumber placed;
398 
399 	entryPreparePage(btree, page, off);
400 
401 	placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
402 	if (placed != off)
403 		elog(ERROR, "failed to add item to index page in \"%s\"",
404 			 RelationGetRelationName(btree->index));
405 
406 	btree->entry = NULL;
407 }
408 
409 /*
410  * Place tuple and split page, original buffer(lbuf) leaves untouched,
411  * returns shadow page of lbuf filled new data.
412  * Tuples are distributed between pages by equal size on its, not
413  * an equal number!
414  */
415 static Page
entrySplitPage(RumBtree btree,Buffer lbuf,Buffer rbuf,Page lPage,Page rPage,OffsetNumber off)416 entrySplitPage(RumBtree btree, Buffer lbuf, Buffer rbuf,
417 			   Page lPage, Page rPage, OffsetNumber off)
418 {
419 	OffsetNumber i,
420 				maxoff,
421 				separator = InvalidOffsetNumber;
422 	Size		totalsize = 0;
423 	Size		lsize = 0,
424 				size;
425 	char	   *ptr;
426 	IndexTuple	itup,
427 				leftrightmost = NULL;
428 	Page		page;
429 	Page		newlPage = PageGetTempPageCopy(lPage);
430 	Size		pageSize = PageGetPageSize(newlPage);
431 
432 	static char tupstore[2 * BLCKSZ];
433 
434 	entryPreparePage(btree, newlPage, off);
435 
436 	maxoff = PageGetMaxOffsetNumber(newlPage);
437 	ptr = tupstore;
438 
439 	for (i = FirstOffsetNumber; i <= maxoff; i++)
440 	{
441 		if (i == off)
442 		{
443 			size = MAXALIGN(IndexTupleSize(btree->entry));
444 			memcpy(ptr, btree->entry, size);
445 			ptr += size;
446 			totalsize += size + sizeof(ItemIdData);
447 		}
448 
449 		itup = (IndexTuple) PageGetItem(newlPage, PageGetItemId(newlPage, i));
450 		size = MAXALIGN(IndexTupleSize(itup));
451 		memcpy(ptr, itup, size);
452 		ptr += size;
453 		totalsize += size + sizeof(ItemIdData);
454 	}
455 
456 	if (off == maxoff + 1)
457 	{
458 		size = MAXALIGN(IndexTupleSize(btree->entry));
459 		memcpy(ptr, btree->entry, size);
460 		totalsize += size + sizeof(ItemIdData);
461 	}
462 
463 	RumInitPage(rPage, RumPageGetOpaque(newlPage)->flags, pageSize);
464 	RumInitPage(newlPage, RumPageGetOpaque(rPage)->flags, pageSize);
465 
466 	ptr = tupstore;
467 	maxoff++;
468 	lsize = 0;
469 
470 	page = newlPage;
471 	for (i = FirstOffsetNumber; i <= maxoff; i++)
472 	{
473 		itup = (IndexTuple) ptr;
474 
475 		if (lsize > totalsize / 2)
476 		{
477 			if (separator == InvalidOffsetNumber)
478 				separator = i - 1;
479 			page = rPage;
480 		}
481 		else
482 		{
483 			leftrightmost = itup;
484 			lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
485 		}
486 
487 		if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
488 			elog(ERROR, "failed to add item to index page in \"%s\"",
489 				 RelationGetRelationName(btree->index));
490 		ptr += MAXALIGN(IndexTupleSize(itup));
491 	}
492 
493 	btree->entry = RumFormInteriorTuple(btree, leftrightmost, newlPage,
494 										BufferGetBlockNumber(lbuf));
495 
496 	btree->rightblkno = BufferGetBlockNumber(rbuf);
497 
498 	return newlPage;
499 }
500 
501 /*
502  * return newly allocated rightmost tuple
503  */
504 IndexTuple
rumPageGetLinkItup(RumBtree btree,Buffer buf,Page page)505 rumPageGetLinkItup(RumBtree btree, Buffer buf, Page page)
506 {
507 	IndexTuple	itup,
508 				nitup;
509 
510 	itup = getRightMostTuple(page);
511 	nitup = RumFormInteriorTuple(btree, itup, page, BufferGetBlockNumber(buf));
512 
513 	return nitup;
514 }
515 
516 /*
517  * Fills new root by rightest values from child.
518  * Also called from rumxlog, should not use btree
519  */
520 void
rumEntryFillRoot(RumBtree btree,Buffer root,Buffer lbuf,Buffer rbuf,Page page,Page lpage,Page rpage)521 rumEntryFillRoot(RumBtree btree, Buffer root, Buffer lbuf, Buffer rbuf,
522 				 Page page, Page lpage, Page rpage)
523 {
524 	IndexTuple	itup;
525 
526 	itup = rumPageGetLinkItup(btree, lbuf, lpage);
527 	if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
528 		elog(ERROR, "failed to add item to index root page");
529 	pfree(itup);
530 
531 	itup = rumPageGetLinkItup(btree, rbuf, rpage);
532 	if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
533 		elog(ERROR, "failed to add item to index root page");
534 	pfree(itup);
535 }
536 
537 /*
538  * Set up RumBtree for entry page access
539  *
540  * Note: during WAL recovery, there may be no valid data in rumstate
541  * other than a faked-up Relation pointer; the key datum is bogus too.
542  */
543 void
rumPrepareEntryScan(RumBtree btree,OffsetNumber attnum,Datum key,RumNullCategory category,RumState * rumstate)544 rumPrepareEntryScan(RumBtree btree, OffsetNumber attnum,
545 					Datum key, RumNullCategory category,
546 					RumState * rumstate)
547 {
548 	memset(btree, 0, sizeof(RumBtreeData));
549 
550 	btree->index = rumstate->index;
551 	btree->rumstate = rumstate;
552 
553 	btree->findChildPage = entryLocateEntry;
554 	btree->isMoveRight = entryIsMoveRight;
555 	btree->findItem = entryLocateLeafEntry;
556 	btree->findChildPtr = entryFindChildPtr;
557 	btree->getLeftMostPage = entryGetLeftMostPage;
558 	btree->isEnoughSpace = entryIsEnoughSpace;
559 	btree->placeToPage = entryPlaceToPage;
560 	btree->splitPage = entrySplitPage;
561 	btree->fillRoot = rumEntryFillRoot;
562 
563 	btree->isData = false;
564 	btree->searchMode = false;
565 	btree->fullScan = false;
566 
567 	btree->entryAttnum = attnum;
568 	btree->entryKey = key;
569 	btree->entryCategory = category;
570 	btree->isDelete = false;
571 }
572