1 /*-------------------------------------------------------------------------
2 *
3 * rumentrypage.c
4 * page utilities routines for the postgres inverted index access method.
5 *
6 *
7 * Portions Copyright (c) 2015-2019, Postgres Professional
8 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
9 * Portions Copyright (c) 1994, Regents of the University of California
10 *
11 *-------------------------------------------------------------------------
12 */
13
14 #include "postgres.h"
15
16 #include "rum.h"
17
18 /*
19 * Read item pointers with additional information from leaf data page.
20 * Information is stored in the same manner as in leaf data pages.
21 */
22 void
rumReadTuple(RumState * rumstate,OffsetNumber attnum,IndexTuple itup,RumItem * items,bool copyAddInfo)23 rumReadTuple(RumState * rumstate, OffsetNumber attnum,
24 IndexTuple itup, RumItem * items, bool copyAddInfo)
25 {
26 Pointer ptr = RumGetPosting(itup);
27 RumItem item;
28 int nipd = RumGetNPosting(itup),
29 i;
30
31 ItemPointerSetMin(&item.iptr);
32 for (i = 0; i < nipd; i++)
33 {
34 ptr = rumDataPageLeafRead(ptr, attnum, &item, copyAddInfo, rumstate);
35 items[i] = item;
36 }
37 }
38
39 /*
40 * Read only item pointers from leaf data page.
41 * Information is stored in the same manner as in leaf data pages.
42 */
43 void
rumReadTuplePointers(RumState * rumstate,OffsetNumber attnum,IndexTuple itup,ItemPointerData * ipd)44 rumReadTuplePointers(RumState * rumstate, OffsetNumber attnum,
45 IndexTuple itup, ItemPointerData *ipd)
46 {
47 Pointer ptr = RumGetPosting(itup);
48 int nipd = RumGetNPosting(itup),
49 i;
50 RumItem item;
51
52 ItemPointerSetMin(&item.iptr);
53 for (i = 0; i < nipd; i++)
54 {
55 ptr = rumDataPageLeafReadPointer(ptr, attnum, &item, rumstate);
56 ipd[i] = item.iptr;
57 }
58 }
59
60 /*
61 * Form a non-leaf entry tuple by copying the key data from the given tuple,
62 * which can be either a leaf or non-leaf entry tuple.
63 *
64 * Any posting list in the source tuple is not copied. The specified child
65 * block number is inserted into t_tid.
66 */
67 static IndexTuple
RumFormInteriorTuple(RumBtree btree,IndexTuple itup,Page page,BlockNumber childblk)68 RumFormInteriorTuple(RumBtree btree, IndexTuple itup, Page page,
69 BlockNumber childblk)
70 {
71 IndexTuple nitup;
72 RumNullCategory category;
73
74 if (RumPageIsLeaf(page) && !RumIsPostingTree(itup))
75 {
76 /* Tuple contains a posting list, just copy stuff before that */
77 uint32 origsize = RumGetPostingOffset(itup);
78
79 origsize = MAXALIGN(origsize);
80 nitup = (IndexTuple) palloc(origsize);
81 memcpy(nitup, itup, origsize);
82 /* ... be sure to fix the size header field ... */
83 nitup->t_info &= ~INDEX_SIZE_MASK;
84 nitup->t_info |= origsize;
85 }
86 else
87 {
88 /* Copy the tuple as-is */
89 nitup = (IndexTuple) palloc(IndexTupleSize(itup));
90 memcpy(nitup, itup, IndexTupleSize(itup));
91 }
92
93 /* Now insert the correct downlink */
94 RumSetDownlink(nitup, childblk);
95
96 rumtuple_get_key(btree->rumstate, itup, &category);
97 if (category != RUM_CAT_NORM_KEY)
98 {
99 Assert(IndexTupleHasNulls(itup));
100 nitup->t_info |= INDEX_NULL_MASK;
101 RumSetNullCategory(nitup, category);
102 }
103
104 return nitup;
105 }
106
107 /*
108 * Entry tree is a "static", ie tuple never deletes from it,
109 * so we don't use right bound, we use rightmost key instead.
110 */
111 static IndexTuple
getRightMostTuple(Page page)112 getRightMostTuple(Page page)
113 {
114 OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
115
116 Assert(maxoff != InvalidOffsetNumber);
117
118 return (IndexTuple) PageGetItem(page, PageGetItemId(page, maxoff));
119 }
120
121 static bool
entryIsMoveRight(RumBtree btree,Page page)122 entryIsMoveRight(RumBtree btree, Page page)
123 {
124 IndexTuple itup;
125 OffsetNumber attnum;
126 Datum key;
127 RumNullCategory category;
128
129 if (RumPageRightMost(page))
130 return false;
131
132 itup = getRightMostTuple(page);
133 attnum = rumtuple_get_attrnum(btree->rumstate, itup);
134 key = rumtuple_get_key(btree->rumstate, itup, &category);
135
136 if (rumCompareAttEntries(btree->rumstate,
137 btree->entryAttnum, btree->entryKey, btree->entryCategory,
138 attnum, key, category) > 0)
139 return true;
140
141 return false;
142 }
143
144 /*
145 * Find correct tuple in non-leaf page. It supposed that
146 * page correctly chosen and searching value SHOULD be on page
147 */
148 static BlockNumber
entryLocateEntry(RumBtree btree,RumBtreeStack * stack)149 entryLocateEntry(RumBtree btree, RumBtreeStack * stack)
150 {
151 OffsetNumber low,
152 high,
153 maxoff;
154 IndexTuple itup = NULL;
155 int result;
156 Page page = BufferGetPage(stack->buffer);
157
158 Assert(!RumPageIsLeaf(page));
159 Assert(!RumPageIsData(page));
160
161 if (btree->fullScan)
162 {
163 stack->off = FirstOffsetNumber;
164 stack->predictNumber *= PageGetMaxOffsetNumber(page);
165 return btree->getLeftMostPage(btree, page);
166 }
167
168 low = FirstOffsetNumber;
169 maxoff = high = PageGetMaxOffsetNumber(page);
170 Assert(high >= low);
171
172 high++;
173
174 while (high > low)
175 {
176 OffsetNumber mid = low + ((high - low) / 2);
177
178 if (mid == maxoff && RumPageRightMost(page))
179 {
180 /* Right infinity */
181 result = -1;
182 }
183 else
184 {
185 OffsetNumber attnum;
186 Datum key;
187 RumNullCategory category;
188
189 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
190 attnum = rumtuple_get_attrnum(btree->rumstate, itup);
191 key = rumtuple_get_key(btree->rumstate, itup, &category);
192 result = rumCompareAttEntries(btree->rumstate,
193 btree->entryAttnum,
194 btree->entryKey,
195 btree->entryCategory,
196 attnum, key, category);
197 }
198
199 if (result == 0)
200 {
201 stack->off = mid;
202 Assert(RumGetDownlink(itup) != RUM_ROOT_BLKNO);
203 return RumGetDownlink(itup);
204 }
205 else if (result > 0)
206 low = mid + 1;
207 else
208 high = mid;
209 }
210
211 Assert(high >= FirstOffsetNumber && high <= maxoff);
212
213 stack->off = high;
214 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, high));
215 Assert(RumGetDownlink(itup) != RUM_ROOT_BLKNO);
216 return RumGetDownlink(itup);
217 }
218
219 /*
220 * Searches correct position for value on leaf page.
221 * Page should be correctly chosen.
222 * Returns true if value found on page.
223 */
224 static bool
entryLocateLeafEntry(RumBtree btree,RumBtreeStack * stack)225 entryLocateLeafEntry(RumBtree btree, RumBtreeStack * stack)
226 {
227 Page page = BufferGetPage(stack->buffer);
228 OffsetNumber low,
229 high;
230
231 Assert(RumPageIsLeaf(page));
232 Assert(!RumPageIsData(page));
233
234 if (btree->fullScan)
235 {
236 stack->off = FirstOffsetNumber;
237 return true;
238 }
239
240 low = FirstOffsetNumber;
241 high = PageGetMaxOffsetNumber(page);
242
243 if (high < low)
244 {
245 stack->off = FirstOffsetNumber;
246 return false;
247 }
248
249 high++;
250
251 while (high > low)
252 {
253 OffsetNumber mid = low + ((high - low) / 2);
254 IndexTuple itup;
255 OffsetNumber attnum;
256 Datum key;
257 RumNullCategory category;
258 int result;
259
260 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, mid));
261 attnum = rumtuple_get_attrnum(btree->rumstate, itup);
262 key = rumtuple_get_key(btree->rumstate, itup, &category);
263 result = rumCompareAttEntries(btree->rumstate,
264 btree->entryAttnum,
265 btree->entryKey,
266 btree->entryCategory,
267 attnum, key, category);
268 if (result == 0)
269 {
270 stack->off = mid;
271 return true;
272 }
273 else if (result > 0)
274 low = mid + 1;
275 else
276 high = mid;
277 }
278
279 stack->off = high;
280 return false;
281 }
282
283 static OffsetNumber
entryFindChildPtr(RumBtree btree,Page page,BlockNumber blkno,OffsetNumber storedOff)284 entryFindChildPtr(RumBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
285 {
286 OffsetNumber i,
287 maxoff = PageGetMaxOffsetNumber(page);
288 IndexTuple itup;
289
290 Assert(!RumPageIsLeaf(page));
291 Assert(!RumPageIsData(page));
292
293 /* if page isn't changed, we returns storedOff */
294 if (storedOff >= FirstOffsetNumber && storedOff <= maxoff)
295 {
296 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, storedOff));
297 if (RumGetDownlink(itup) == blkno)
298 return storedOff;
299
300 /*
301 * we hope, that needed pointer goes to right. It's true if there
302 * wasn't a deletion
303 */
304 for (i = storedOff + 1; i <= maxoff; i++)
305 {
306 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
307 if (RumGetDownlink(itup) == blkno)
308 return i;
309 }
310 maxoff = storedOff - 1;
311 }
312
313 /* last chance */
314 for (i = FirstOffsetNumber; i <= maxoff; i++)
315 {
316 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
317 if (RumGetDownlink(itup) == blkno)
318 return i;
319 }
320
321 return InvalidOffsetNumber;
322 }
323
324 static BlockNumber
entryGetLeftMostPage(RumBtree btree,Page page)325 entryGetLeftMostPage(RumBtree btree, Page page)
326 {
327 IndexTuple itup;
328
329 Assert(!RumPageIsLeaf(page));
330 Assert(!RumPageIsData(page));
331 Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);
332
333 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
334 return RumGetDownlink(itup);
335 }
336
337 static bool
entryIsEnoughSpace(RumBtree btree,Buffer buf,OffsetNumber off)338 entryIsEnoughSpace(RumBtree btree, Buffer buf, OffsetNumber off)
339 {
340 Size itupsz = 0;
341 Page page = BufferGetPage(buf);
342
343 Assert(btree->entry);
344 Assert(!RumPageIsData(page));
345
346 if (btree->isDelete)
347 {
348 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
349
350 itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
351 }
352
353 if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
354 return true;
355
356 return false;
357 }
358
359 /*
360 * Delete tuple on leaf page if tuples existed and we
361 * should update it, update old child blkno to new right page
362 * if child split occurred
363 */
364 static BlockNumber
entryPreparePage(RumBtree btree,Page page,OffsetNumber off)365 entryPreparePage(RumBtree btree, Page page, OffsetNumber off)
366 {
367 BlockNumber ret = InvalidBlockNumber;
368
369 Assert(btree->entry);
370 Assert(!RumPageIsData(page));
371
372 if (btree->isDelete)
373 {
374 Assert(RumPageIsLeaf(page));
375 PageIndexTupleDelete(page, off);
376 }
377
378 if (!RumPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
379 {
380 IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
381
382 RumSetDownlink(itup, btree->rightblkno);
383 ret = btree->rightblkno;
384 }
385
386 btree->rightblkno = InvalidBlockNumber;
387
388 return ret;
389 }
390
391 /*
392 * Place tuple on page and fills WAL record
393 */
394 static void
entryPlaceToPage(RumBtree btree,Page page,OffsetNumber off)395 entryPlaceToPage(RumBtree btree, Page page, OffsetNumber off)
396 {
397 OffsetNumber placed;
398
399 entryPreparePage(btree, page, off);
400
401 placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
402 if (placed != off)
403 elog(ERROR, "failed to add item to index page in \"%s\"",
404 RelationGetRelationName(btree->index));
405
406 btree->entry = NULL;
407 }
408
409 /*
410 * Place tuple and split page, original buffer(lbuf) leaves untouched,
411 * returns shadow page of lbuf filled new data.
412 * Tuples are distributed between pages by equal size on its, not
413 * an equal number!
414 */
415 static Page
entrySplitPage(RumBtree btree,Buffer lbuf,Buffer rbuf,Page lPage,Page rPage,OffsetNumber off)416 entrySplitPage(RumBtree btree, Buffer lbuf, Buffer rbuf,
417 Page lPage, Page rPage, OffsetNumber off)
418 {
419 OffsetNumber i,
420 maxoff,
421 separator = InvalidOffsetNumber;
422 Size totalsize = 0;
423 Size lsize = 0,
424 size;
425 char *ptr;
426 IndexTuple itup,
427 leftrightmost = NULL;
428 Page page;
429 Page newlPage = PageGetTempPageCopy(lPage);
430 Size pageSize = PageGetPageSize(newlPage);
431
432 static char tupstore[2 * BLCKSZ];
433
434 entryPreparePage(btree, newlPage, off);
435
436 maxoff = PageGetMaxOffsetNumber(newlPage);
437 ptr = tupstore;
438
439 for (i = FirstOffsetNumber; i <= maxoff; i++)
440 {
441 if (i == off)
442 {
443 size = MAXALIGN(IndexTupleSize(btree->entry));
444 memcpy(ptr, btree->entry, size);
445 ptr += size;
446 totalsize += size + sizeof(ItemIdData);
447 }
448
449 itup = (IndexTuple) PageGetItem(newlPage, PageGetItemId(newlPage, i));
450 size = MAXALIGN(IndexTupleSize(itup));
451 memcpy(ptr, itup, size);
452 ptr += size;
453 totalsize += size + sizeof(ItemIdData);
454 }
455
456 if (off == maxoff + 1)
457 {
458 size = MAXALIGN(IndexTupleSize(btree->entry));
459 memcpy(ptr, btree->entry, size);
460 totalsize += size + sizeof(ItemIdData);
461 }
462
463 RumInitPage(rPage, RumPageGetOpaque(newlPage)->flags, pageSize);
464 RumInitPage(newlPage, RumPageGetOpaque(rPage)->flags, pageSize);
465
466 ptr = tupstore;
467 maxoff++;
468 lsize = 0;
469
470 page = newlPage;
471 for (i = FirstOffsetNumber; i <= maxoff; i++)
472 {
473 itup = (IndexTuple) ptr;
474
475 if (lsize > totalsize / 2)
476 {
477 if (separator == InvalidOffsetNumber)
478 separator = i - 1;
479 page = rPage;
480 }
481 else
482 {
483 leftrightmost = itup;
484 lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
485 }
486
487 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
488 elog(ERROR, "failed to add item to index page in \"%s\"",
489 RelationGetRelationName(btree->index));
490 ptr += MAXALIGN(IndexTupleSize(itup));
491 }
492
493 btree->entry = RumFormInteriorTuple(btree, leftrightmost, newlPage,
494 BufferGetBlockNumber(lbuf));
495
496 btree->rightblkno = BufferGetBlockNumber(rbuf);
497
498 return newlPage;
499 }
500
501 /*
502 * return newly allocated rightmost tuple
503 */
504 IndexTuple
rumPageGetLinkItup(RumBtree btree,Buffer buf,Page page)505 rumPageGetLinkItup(RumBtree btree, Buffer buf, Page page)
506 {
507 IndexTuple itup,
508 nitup;
509
510 itup = getRightMostTuple(page);
511 nitup = RumFormInteriorTuple(btree, itup, page, BufferGetBlockNumber(buf));
512
513 return nitup;
514 }
515
516 /*
517 * Fills new root by rightest values from child.
518 * Also called from rumxlog, should not use btree
519 */
520 void
rumEntryFillRoot(RumBtree btree,Buffer root,Buffer lbuf,Buffer rbuf,Page page,Page lpage,Page rpage)521 rumEntryFillRoot(RumBtree btree, Buffer root, Buffer lbuf, Buffer rbuf,
522 Page page, Page lpage, Page rpage)
523 {
524 IndexTuple itup;
525
526 itup = rumPageGetLinkItup(btree, lbuf, lpage);
527 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
528 elog(ERROR, "failed to add item to index root page");
529 pfree(itup);
530
531 itup = rumPageGetLinkItup(btree, rbuf, rpage);
532 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
533 elog(ERROR, "failed to add item to index root page");
534 pfree(itup);
535 }
536
537 /*
538 * Set up RumBtree for entry page access
539 *
540 * Note: during WAL recovery, there may be no valid data in rumstate
541 * other than a faked-up Relation pointer; the key datum is bogus too.
542 */
543 void
rumPrepareEntryScan(RumBtree btree,OffsetNumber attnum,Datum key,RumNullCategory category,RumState * rumstate)544 rumPrepareEntryScan(RumBtree btree, OffsetNumber attnum,
545 Datum key, RumNullCategory category,
546 RumState * rumstate)
547 {
548 memset(btree, 0, sizeof(RumBtreeData));
549
550 btree->index = rumstate->index;
551 btree->rumstate = rumstate;
552
553 btree->findChildPage = entryLocateEntry;
554 btree->isMoveRight = entryIsMoveRight;
555 btree->findItem = entryLocateLeafEntry;
556 btree->findChildPtr = entryFindChildPtr;
557 btree->getLeftMostPage = entryGetLeftMostPage;
558 btree->isEnoughSpace = entryIsEnoughSpace;
559 btree->placeToPage = entryPlaceToPage;
560 btree->splitPage = entrySplitPage;
561 btree->fillRoot = rumEntryFillRoot;
562
563 btree->isData = false;
564 btree->searchMode = false;
565 btree->fullScan = false;
566
567 btree->entryAttnum = attnum;
568 btree->entryKey = key;
569 btree->entryCategory = category;
570 btree->isDelete = false;
571 }
572