1 /*-------------------------------------------------------------------------
2  *
3  * hash_xlog.c
4  *	  WAL replay logic for hash index.
5  *
6  *
7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/hash/hash_xlog.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/bufmask.h"
18 #include "access/hash.h"
19 #include "access/hash_xlog.h"
20 #include "access/xlogutils.h"
21 #include "access/xlog.h"
22 #include "access/transam.h"
23 #include "storage/procarray.h"
24 #include "miscadmin.h"
25 
26 /*
27  * replay a hash index meta page
28  */
29 static void
30 hash_xlog_init_meta_page(XLogReaderState *record)
31 {
32 	XLogRecPtr	lsn = record->EndRecPtr;
33 	Page		page;
34 	Buffer		metabuf;
35 	ForkNumber	forknum;
36 
37 	xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);
38 
39 	/* create the index' metapage */
40 	metabuf = XLogInitBufferForRedo(record, 0);
41 	Assert(BufferIsValid(metabuf));
42 	_hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid,
43 						  xlrec->ffactor, true);
44 	page = (Page) BufferGetPage(metabuf);
45 	PageSetLSN(page, lsn);
46 	MarkBufferDirty(metabuf);
47 
48 	/*
49 	 * Force the on-disk state of init forks to always be in sync with the
50 	 * state in shared buffers.  See XLogReadBufferForRedoExtended.  We need
51 	 * special handling for init forks as create index operations don't log a
52 	 * full page image of the metapage.
53 	 */
54 	XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
55 	if (forknum == INIT_FORKNUM)
56 		FlushOneBuffer(metabuf);
57 
58 	/* all done */
59 	UnlockReleaseBuffer(metabuf);
60 }
61 
62 /*
63  * replay a hash index bitmap page
64  */
65 static void
66 hash_xlog_init_bitmap_page(XLogReaderState *record)
67 {
68 	XLogRecPtr	lsn = record->EndRecPtr;
69 	Buffer		bitmapbuf;
70 	Buffer		metabuf;
71 	Page		page;
72 	HashMetaPage metap;
73 	uint32		num_buckets;
74 	ForkNumber	forknum;
75 
76 	xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);
77 
78 	/*
79 	 * Initialize bitmap page
80 	 */
81 	bitmapbuf = XLogInitBufferForRedo(record, 0);
82 	_hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
83 	PageSetLSN(BufferGetPage(bitmapbuf), lsn);
84 	MarkBufferDirty(bitmapbuf);
85 
86 	/*
87 	 * Force the on-disk state of init forks to always be in sync with the
88 	 * state in shared buffers.  See XLogReadBufferForRedoExtended.  We need
89 	 * special handling for init forks as create index operations don't log a
90 	 * full page image of the metapage.
91 	 */
92 	XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
93 	if (forknum == INIT_FORKNUM)
94 		FlushOneBuffer(bitmapbuf);
95 	UnlockReleaseBuffer(bitmapbuf);
96 
97 	/* add the new bitmap page to the metapage's list of bitmaps */
98 	if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
99 	{
100 		/*
101 		 * Note: in normal operation, we'd update the metapage while still
102 		 * holding lock on the bitmap page.  But during replay it's not
103 		 * necessary to hold that lock, since nobody can see it yet; the
104 		 * creating transaction hasn't yet committed.
105 		 */
106 		page = BufferGetPage(metabuf);
107 		metap = HashPageGetMeta(page);
108 
109 		num_buckets = metap->hashm_maxbucket + 1;
110 		metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
111 		metap->hashm_nmaps++;
112 
113 		PageSetLSN(page, lsn);
114 		MarkBufferDirty(metabuf);
115 
116 		XLogRecGetBlockTag(record, 1, NULL, &forknum, NULL);
117 		if (forknum == INIT_FORKNUM)
118 			FlushOneBuffer(metabuf);
119 	}
120 	if (BufferIsValid(metabuf))
121 		UnlockReleaseBuffer(metabuf);
122 }
123 
124 /*
125  * replay a hash index insert without split
126  */
127 static void
128 hash_xlog_insert(XLogReaderState *record)
129 {
130 	HashMetaPage metap;
131 	XLogRecPtr	lsn = record->EndRecPtr;
132 	xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
133 	Buffer		buffer;
134 	Page		page;
135 
136 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
137 	{
138 		Size		datalen;
139 		char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);
140 
141 		page = BufferGetPage(buffer);
142 
143 		if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
144 						false, false) == InvalidOffsetNumber)
145 			elog(PANIC, "hash_xlog_insert: failed to add item");
146 
147 		PageSetLSN(page, lsn);
148 		MarkBufferDirty(buffer);
149 	}
150 	if (BufferIsValid(buffer))
151 		UnlockReleaseBuffer(buffer);
152 
153 	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
154 	{
155 		/*
156 		 * Note: in normal operation, we'd update the metapage while still
157 		 * holding lock on the page we inserted into.  But during replay it's
158 		 * not necessary to hold that lock, since no other index updates can
159 		 * be happening concurrently.
160 		 */
161 		page = BufferGetPage(buffer);
162 		metap = HashPageGetMeta(page);
163 		metap->hashm_ntuples += 1;
164 
165 		PageSetLSN(page, lsn);
166 		MarkBufferDirty(buffer);
167 	}
168 	if (BufferIsValid(buffer))
169 		UnlockReleaseBuffer(buffer);
170 }
171 
172 /*
173  * replay addition of overflow page for hash index
174  */
175 static void
176 hash_xlog_add_ovfl_page(XLogReaderState *record)
177 {
178 	XLogRecPtr	lsn = record->EndRecPtr;
179 	xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record);
180 	Buffer		leftbuf;
181 	Buffer		ovflbuf;
182 	Buffer		metabuf;
183 	BlockNumber leftblk;
184 	BlockNumber rightblk;
185 	BlockNumber newmapblk = InvalidBlockNumber;
186 	Page		ovflpage;
187 	HashPageOpaque ovflopaque;
188 	uint32	   *num_bucket;
189 	char	   *data;
190 	Size		datalen PG_USED_FOR_ASSERTS_ONLY;
191 	bool		new_bmpage = false;
192 
193 	XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
194 	XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);
195 
196 	ovflbuf = XLogInitBufferForRedo(record, 0);
197 	Assert(BufferIsValid(ovflbuf));
198 
199 	data = XLogRecGetBlockData(record, 0, &datalen);
200 	num_bucket = (uint32 *) data;
201 	Assert(datalen == sizeof(uint32));
202 	_hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
203 				  true);
204 	/* update backlink */
205 	ovflpage = BufferGetPage(ovflbuf);
206 	ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
207 	ovflopaque->hasho_prevblkno = leftblk;
208 
209 	PageSetLSN(ovflpage, lsn);
210 	MarkBufferDirty(ovflbuf);
211 
212 	if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
213 	{
214 		Page		leftpage;
215 		HashPageOpaque leftopaque;
216 
217 		leftpage = BufferGetPage(leftbuf);
218 		leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
219 		leftopaque->hasho_nextblkno = rightblk;
220 
221 		PageSetLSN(leftpage, lsn);
222 		MarkBufferDirty(leftbuf);
223 	}
224 
225 	if (BufferIsValid(leftbuf))
226 		UnlockReleaseBuffer(leftbuf);
227 	UnlockReleaseBuffer(ovflbuf);
228 
229 	/*
230 	 * Note: in normal operation, we'd update the bitmap and meta page while
231 	 * still holding lock on the overflow pages.  But during replay it's not
232 	 * necessary to hold those locks, since no other index updates can be
233 	 * happening concurrently.
234 	 */
235 	if (XLogRecHasBlockRef(record, 2))
236 	{
237 		Buffer		mapbuffer;
238 
239 		if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO)
240 		{
241 			Page		mappage = (Page) BufferGetPage(mapbuffer);
242 			uint32	   *freep = NULL;
243 			char	   *data;
244 			uint32	   *bitmap_page_bit;
245 
246 			freep = HashPageGetBitmap(mappage);
247 
248 			data = XLogRecGetBlockData(record, 2, &datalen);
249 			bitmap_page_bit = (uint32 *) data;
250 
251 			SETBIT(freep, *bitmap_page_bit);
252 
253 			PageSetLSN(mappage, lsn);
254 			MarkBufferDirty(mapbuffer);
255 		}
256 		if (BufferIsValid(mapbuffer))
257 			UnlockReleaseBuffer(mapbuffer);
258 	}
259 
260 	if (XLogRecHasBlockRef(record, 3))
261 	{
262 		Buffer		newmapbuf;
263 
264 		newmapbuf = XLogInitBufferForRedo(record, 3);
265 
266 		_hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);
267 
268 		new_bmpage = true;
269 		newmapblk = BufferGetBlockNumber(newmapbuf);
270 
271 		MarkBufferDirty(newmapbuf);
272 		PageSetLSN(BufferGetPage(newmapbuf), lsn);
273 
274 		UnlockReleaseBuffer(newmapbuf);
275 	}
276 
277 	if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO)
278 	{
279 		HashMetaPage metap;
280 		Page		page;
281 		uint32	   *firstfree_ovflpage;
282 
283 		data = XLogRecGetBlockData(record, 4, &datalen);
284 		firstfree_ovflpage = (uint32 *) data;
285 
286 		page = BufferGetPage(metabuf);
287 		metap = HashPageGetMeta(page);
288 		metap->hashm_firstfree = *firstfree_ovflpage;
289 
290 		if (!xlrec->bmpage_found)
291 		{
292 			metap->hashm_spares[metap->hashm_ovflpoint]++;
293 
294 			if (new_bmpage)
295 			{
296 				Assert(BlockNumberIsValid(newmapblk));
297 
298 				metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
299 				metap->hashm_nmaps++;
300 				metap->hashm_spares[metap->hashm_ovflpoint]++;
301 			}
302 		}
303 
304 		PageSetLSN(page, lsn);
305 		MarkBufferDirty(metabuf);
306 	}
307 	if (BufferIsValid(metabuf))
308 		UnlockReleaseBuffer(metabuf);
309 }
310 
311 /*
312  * replay allocation of page for split operation
313  */
314 static void
315 hash_xlog_split_allocate_page(XLogReaderState *record)
316 {
317 	XLogRecPtr	lsn = record->EndRecPtr;
318 	xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
319 	Buffer		oldbuf;
320 	Buffer		newbuf;
321 	Buffer		metabuf;
322 	Size		datalen PG_USED_FOR_ASSERTS_ONLY;
323 	char	   *data;
324 	XLogRedoAction action;
325 
326 	/*
327 	 * To be consistent with normal operation, here we take cleanup locks on
328 	 * both the old and new buckets even though there can't be any concurrent
329 	 * inserts.
330 	 */
331 
332 	/* replay the record for old bucket */
333 	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf);
334 
335 	/*
336 	 * Note that we still update the page even if it was restored from a full
337 	 * page image, because the special space is not included in the image.
338 	 */
339 	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
340 	{
341 		Page		oldpage;
342 		HashPageOpaque oldopaque;
343 
344 		oldpage = BufferGetPage(oldbuf);
345 		oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
346 
347 		oldopaque->hasho_flag = xlrec->old_bucket_flag;
348 		oldopaque->hasho_prevblkno = xlrec->new_bucket;
349 
350 		PageSetLSN(oldpage, lsn);
351 		MarkBufferDirty(oldbuf);
352 	}
353 
354 	/* replay the record for new bucket */
355 	newbuf = XLogInitBufferForRedo(record, 1);
356 	_hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
357 				  xlrec->new_bucket_flag, true);
358 	if (!IsBufferCleanupOK(newbuf))
359 		elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
360 	MarkBufferDirty(newbuf);
361 	PageSetLSN(BufferGetPage(newbuf), lsn);
362 
363 	/*
364 	 * We can release the lock on old bucket early as well but doing here to
365 	 * consistent with normal operation.
366 	 */
367 	if (BufferIsValid(oldbuf))
368 		UnlockReleaseBuffer(oldbuf);
369 	if (BufferIsValid(newbuf))
370 		UnlockReleaseBuffer(newbuf);
371 
372 	/*
373 	 * Note: in normal operation, we'd update the meta page while still
374 	 * holding lock on the old and new bucket pages.  But during replay it's
375 	 * not necessary to hold those locks, since no other bucket splits can be
376 	 * happening concurrently.
377 	 */
378 
379 	/* replay the record for metapage changes */
380 	if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
381 	{
382 		Page		page;
383 		HashMetaPage metap;
384 
385 		page = BufferGetPage(metabuf);
386 		metap = HashPageGetMeta(page);
387 		metap->hashm_maxbucket = xlrec->new_bucket;
388 
389 		data = XLogRecGetBlockData(record, 2, &datalen);
390 
391 		if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
392 		{
393 			uint32		lowmask;
394 			uint32	   *highmask;
395 
396 			/* extract low and high masks. */
397 			memcpy(&lowmask, data, sizeof(uint32));
398 			highmask = (uint32 *) ((char *) data + sizeof(uint32));
399 
400 			/* update metapage */
401 			metap->hashm_lowmask = lowmask;
402 			metap->hashm_highmask = *highmask;
403 
404 			data += sizeof(uint32) * 2;
405 		}
406 
407 		if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
408 		{
409 			uint32		ovflpoint;
410 			uint32	   *ovflpages;
411 
412 			/* extract information of overflow pages. */
413 			memcpy(&ovflpoint, data, sizeof(uint32));
414 			ovflpages = (uint32 *) ((char *) data + sizeof(uint32));
415 
416 			/* update metapage */
417 			metap->hashm_spares[ovflpoint] = *ovflpages;
418 			metap->hashm_ovflpoint = ovflpoint;
419 		}
420 
421 		MarkBufferDirty(metabuf);
422 		PageSetLSN(BufferGetPage(metabuf), lsn);
423 	}
424 
425 	if (BufferIsValid(metabuf))
426 		UnlockReleaseBuffer(metabuf);
427 }
428 
429 /*
430  * replay of split operation
431  */
432 static void
433 hash_xlog_split_page(XLogReaderState *record)
434 {
435 	Buffer		buf;
436 
437 	if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED)
438 		elog(ERROR, "Hash split record did not contain a full-page image");
439 
440 	UnlockReleaseBuffer(buf);
441 }
442 
443 /*
444  * replay completion of split operation
445  */
446 static void
447 hash_xlog_split_complete(XLogReaderState *record)
448 {
449 	XLogRecPtr	lsn = record->EndRecPtr;
450 	xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
451 	Buffer		oldbuf;
452 	Buffer		newbuf;
453 	XLogRedoAction action;
454 
455 	/* replay the record for old bucket */
456 	action = XLogReadBufferForRedo(record, 0, &oldbuf);
457 
458 	/*
459 	 * Note that we still update the page even if it was restored from a full
460 	 * page image, because the bucket flag is not included in the image.
461 	 */
462 	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
463 	{
464 		Page		oldpage;
465 		HashPageOpaque oldopaque;
466 
467 		oldpage = BufferGetPage(oldbuf);
468 		oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
469 
470 		oldopaque->hasho_flag = xlrec->old_bucket_flag;
471 
472 		PageSetLSN(oldpage, lsn);
473 		MarkBufferDirty(oldbuf);
474 	}
475 	if (BufferIsValid(oldbuf))
476 		UnlockReleaseBuffer(oldbuf);
477 
478 	/* replay the record for new bucket */
479 	action = XLogReadBufferForRedo(record, 1, &newbuf);
480 
481 	/*
482 	 * Note that we still update the page even if it was restored from a full
483 	 * page image, because the bucket flag is not included in the image.
484 	 */
485 	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
486 	{
487 		Page		newpage;
488 		HashPageOpaque nopaque;
489 
490 		newpage = BufferGetPage(newbuf);
491 		nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage);
492 
493 		nopaque->hasho_flag = xlrec->new_bucket_flag;
494 
495 		PageSetLSN(newpage, lsn);
496 		MarkBufferDirty(newbuf);
497 	}
498 	if (BufferIsValid(newbuf))
499 		UnlockReleaseBuffer(newbuf);
500 }
501 
502 /*
503  * replay move of page contents for squeeze operation of hash index
504  */
505 static void
506 hash_xlog_move_page_contents(XLogReaderState *record)
507 {
508 	XLogRecPtr	lsn = record->EndRecPtr;
509 	xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
510 	Buffer		bucketbuf = InvalidBuffer;
511 	Buffer		writebuf = InvalidBuffer;
512 	Buffer		deletebuf = InvalidBuffer;
513 	XLogRedoAction action;
514 
515 	/*
516 	 * Ensure we have a cleanup lock on primary bucket page before we start
517 	 * with the actual replay operation.  This is to ensure that neither a
518 	 * scan can start nor a scan can be already-in-progress during the replay
519 	 * of this operation.  If we allow scans during this operation, then they
520 	 * can miss some records or show the same record multiple times.
521 	 */
522 	if (xldata->is_prim_bucket_same_wrt)
523 		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
524 	else
525 	{
526 		/*
527 		 * we don't care for return value as the purpose of reading bucketbuf
528 		 * is to ensure a cleanup lock on primary bucket page.
529 		 */
530 		(void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
531 
532 		action = XLogReadBufferForRedo(record, 1, &writebuf);
533 	}
534 
535 	/* replay the record for adding entries in overflow buffer */
536 	if (action == BLK_NEEDS_REDO)
537 	{
538 		Page		writepage;
539 		char	   *begin;
540 		char	   *data;
541 		Size		datalen;
542 		uint16		ninserted = 0;
543 
544 		data = begin = XLogRecGetBlockData(record, 1, &datalen);
545 
546 		writepage = (Page) BufferGetPage(writebuf);
547 
548 		if (xldata->ntups > 0)
549 		{
550 			OffsetNumber *towrite = (OffsetNumber *) data;
551 
552 			data += sizeof(OffsetNumber) * xldata->ntups;
553 
554 			while (data - begin < datalen)
555 			{
556 				IndexTuple	itup = (IndexTuple) data;
557 				Size		itemsz;
558 				OffsetNumber l;
559 
560 				itemsz = IndexTupleSize(itup);
561 				itemsz = MAXALIGN(itemsz);
562 
563 				data += itemsz;
564 
565 				l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
566 				if (l == InvalidOffsetNumber)
567 					elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
568 						 (int) itemsz);
569 
570 				ninserted++;
571 			}
572 		}
573 
574 		/*
575 		 * number of tuples inserted must be same as requested in REDO record.
576 		 */
577 		Assert(ninserted == xldata->ntups);
578 
579 		PageSetLSN(writepage, lsn);
580 		MarkBufferDirty(writebuf);
581 	}
582 
583 	/* replay the record for deleting entries from overflow buffer */
584 	if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO)
585 	{
586 		Page		page;
587 		char	   *ptr;
588 		Size		len;
589 
590 		ptr = XLogRecGetBlockData(record, 2, &len);
591 
592 		page = (Page) BufferGetPage(deletebuf);
593 
594 		if (len > 0)
595 		{
596 			OffsetNumber *unused;
597 			OffsetNumber *unend;
598 
599 			unused = (OffsetNumber *) ptr;
600 			unend = (OffsetNumber *) ((char *) ptr + len);
601 
602 			if ((unend - unused) > 0)
603 				PageIndexMultiDelete(page, unused, unend - unused);
604 		}
605 
606 		PageSetLSN(page, lsn);
607 		MarkBufferDirty(deletebuf);
608 	}
609 
610 	/*
611 	 * Replay is complete, now we can release the buffers. We release locks at
612 	 * end of replay operation to ensure that we hold lock on primary bucket
613 	 * page till end of operation.  We can optimize by releasing the lock on
614 	 * write buffer as soon as the operation for same is complete, if it is
615 	 * not same as primary bucket page, but that doesn't seem to be worth
616 	 * complicating the code.
617 	 */
618 	if (BufferIsValid(deletebuf))
619 		UnlockReleaseBuffer(deletebuf);
620 
621 	if (BufferIsValid(writebuf))
622 		UnlockReleaseBuffer(writebuf);
623 
624 	if (BufferIsValid(bucketbuf))
625 		UnlockReleaseBuffer(bucketbuf);
626 }
627 
628 /*
629  * replay squeeze page operation of hash index
630  */
631 static void
632 hash_xlog_squeeze_page(XLogReaderState *record)
633 {
634 	XLogRecPtr	lsn = record->EndRecPtr;
635 	xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
636 	Buffer		bucketbuf = InvalidBuffer;
637 	Buffer		writebuf;
638 	Buffer		ovflbuf;
639 	Buffer		prevbuf = InvalidBuffer;
640 	Buffer		mapbuf;
641 	XLogRedoAction action;
642 
643 	/*
644 	 * Ensure we have a cleanup lock on primary bucket page before we start
645 	 * with the actual replay operation.  This is to ensure that neither a
646 	 * scan can start nor a scan can be already-in-progress during the replay
647 	 * of this operation.  If we allow scans during this operation, then they
648 	 * can miss some records or show the same record multiple times.
649 	 */
650 	if (xldata->is_prim_bucket_same_wrt)
651 		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
652 	else
653 	{
654 		/*
655 		 * we don't care for return value as the purpose of reading bucketbuf
656 		 * is to ensure a cleanup lock on primary bucket page.
__construct($exceptions = null)657 		 */
658 		(void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
659 
660 		action = XLogReadBufferForRedo(record, 1, &writebuf);
661 	}
662 
663 	/* replay the record for adding entries in overflow buffer */
664 	if (action == BLK_NEEDS_REDO)
665 	{
666 		Page		writepage;
__destruct()667 		char	   *begin;
668 		char	   *data;
669 		Size		datalen;
670 		uint16		ninserted = 0;
671 
672 		data = begin = XLogRecGetBlockData(record, 1, &datalen);
673 
674 		writepage = (Page) BufferGetPage(writebuf);
675 
676 		if (xldata->ntups > 0)
677 		{
678 			OffsetNumber *towrite = (OffsetNumber *) data;
679 
680 			data += sizeof(OffsetNumber) * xldata->ntups;
681 
682 			while (data - begin < datalen)
683 			{
684 				IndexTuple	itup = (IndexTuple) data;
685 				Size		itemsz;
mailPassthru($to, $subject, $body, $header, $params)686 				OffsetNumber l;
687 
688 				itemsz = IndexTupleSize(itup);
689 				itemsz = MAXALIGN(itemsz);
690 
691 				data += itemsz;
692 
693 				l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
694 				if (l == InvalidOffsetNumber)
695 					elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
696 						 (int) itemsz);
697 
698 				ninserted++;
699 			}
700 		}
701 
702 		/*
703 		 * number of tuples inserted must be same as requested in REDO record.
704 		 */
705 		Assert(ninserted == xldata->ntups);
706 
707 		/*
708 		 * if the page on which are adding tuples is a page previous to freed
709 		 * overflow page, then update its nextblno.
710 		 */
edebug($str)711 		if (xldata->is_prev_bucket_same_wrt)
712 		{
713 			HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);
714 
715 			writeopaque->hasho_nextblkno = xldata->nextblkno;
716 		}
717 
718 		PageSetLSN(writepage, lsn);
719 		MarkBufferDirty(writebuf);
720 	}
721 
722 	/* replay the record for initializing overflow buffer */
723 	if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
724 	{
725 		Page		ovflpage;
726 		HashPageOpaque ovflopaque;
727 
728 		ovflpage = BufferGetPage(ovflbuf);
729 
730 		_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
731 
732 		ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
733 
734 		ovflopaque->hasho_prevblkno = InvalidBlockNumber;
735 		ovflopaque->hasho_nextblkno = InvalidBlockNumber;
736 		ovflopaque->hasho_bucket = -1;
737 		ovflopaque->hasho_flag = LH_UNUSED_PAGE;
738 		ovflopaque->hasho_page_id = HASHO_PAGE_ID;
739 
740 		PageSetLSN(ovflpage, lsn);
741 		MarkBufferDirty(ovflbuf);
742 	}
743 	if (BufferIsValid(ovflbuf))
744 		UnlockReleaseBuffer(ovflbuf);
745 
746 	/* replay the record for page previous to the freed overflow page */
747 	if (!xldata->is_prev_bucket_same_wrt &&
748 		XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO)
749 	{
750 		Page		prevpage = BufferGetPage(prevbuf);
751 		HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
isHTML($isHtml = true)752 
753 		prevopaque->hasho_nextblkno = xldata->nextblkno;
754 
755 		PageSetLSN(prevpage, lsn);
756 		MarkBufferDirty(prevbuf);
757 	}
758 	if (BufferIsValid(prevbuf))
759 		UnlockReleaseBuffer(prevbuf);
760 
761 	/* replay the record for page next to the freed overflow page */
762 	if (XLogRecHasBlockRef(record, 4))
763 	{
764 		Buffer		nextbuf;
isSMTP()765 
766 		if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO)
767 		{
768 			Page		nextpage = BufferGetPage(nextbuf);
769 			HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
770 
771 			nextopaque->hasho_prevblkno = xldata->prevblkno;
772 
773 			PageSetLSN(nextpage, lsn);
774 			MarkBufferDirty(nextbuf);
775 		}
776 		if (BufferIsValid(nextbuf))
777 			UnlockReleaseBuffer(nextbuf);
778 	}
779 
780 	if (BufferIsValid(writebuf))
781 		UnlockReleaseBuffer(writebuf);
782 
isSendmail()783 	if (BufferIsValid(bucketbuf))
784 		UnlockReleaseBuffer(bucketbuf);
785 
786 	/*
787 	 * Note: in normal operation, we'd update the bitmap and meta page while
788 	 * still holding lock on the primary bucket page and overflow pages.  But
789 	 * during replay it's not necessary to hold those locks, since no other
790 	 * index updates can be happening concurrently.
791 	 */
792 	/* replay the record for bitmap page */
793 	if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO)
794 	{
795 		Page		mappage = (Page) BufferGetPage(mapbuf);
796 		uint32	   *freep = NULL;
797 		char	   *data;
798 		uint32	   *bitmap_page_bit;
isQmail()799 		Size		datalen;
800 
801 		freep = HashPageGetBitmap(mappage);
802 
803 		data = XLogRecGetBlockData(record, 5, &datalen);
804 		bitmap_page_bit = (uint32 *) data;
805 
806 		CLRBIT(freep, *bitmap_page_bit);
807 
808 		PageSetLSN(mappage, lsn);
809 		MarkBufferDirty(mapbuf);
810 	}
811 	if (BufferIsValid(mapbuf))
812 		UnlockReleaseBuffer(mapbuf);
813 
814 	/* replay the record for meta page */
815 	if (XLogRecHasBlockRef(record, 6))
816 	{
addAddress($address, $name = '')817 		Buffer		metabuf;
818 
819 		if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO)
820 		{
821 			HashMetaPage metap;
822 			Page		page;
823 			char	   *data;
824 			uint32	   *firstfree_ovflpage;
825 			Size		datalen;
826 
827 			data = XLogRecGetBlockData(record, 6, &datalen);
828 			firstfree_ovflpage = (uint32 *) data;
addCC($address, $name = '')829 
830 			page = BufferGetPage(metabuf);
831 			metap = HashPageGetMeta(page);
832 			metap->hashm_firstfree = *firstfree_ovflpage;
833 
834 			PageSetLSN(page, lsn);
835 			MarkBufferDirty(metabuf);
836 		}
837 		if (BufferIsValid(metabuf))
838 			UnlockReleaseBuffer(metabuf);
839 	}
840 }
addBCC($address, $name = '')841 
842 /*
843  * replay delete operation of hash index
844  */
845 static void
846 hash_xlog_delete(XLogReaderState *record)
847 {
848 	XLogRecPtr	lsn = record->EndRecPtr;
849 	xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
850 	Buffer		bucketbuf = InvalidBuffer;
851 	Buffer		deletebuf;
852 	Page		page;
853 	XLogRedoAction action;
854 
855 	/*
856 	 * Ensure we have a cleanup lock on primary bucket page before we start
857 	 * with the actual replay operation.  This is to ensure that neither a
858 	 * scan can start nor a scan can be already-in-progress during the replay
859 	 * of this operation.  If we allow scans during this operation, then they
860 	 * can miss some records or show the same record multiple times.
861 	 */
862 	if (xldata->is_primary_bucket_page)
863 		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
864 	else
865 	{
866 		/*
867 		 * we don't care for return value as the purpose of reading bucketbuf
868 		 * is to ensure a cleanup lock on primary bucket page.
869 		 */
870 		(void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
871 
872 		action = XLogReadBufferForRedo(record, 1, &deletebuf);
873 	}
874 
875 	/* replay the record for deleting entries in bucket page */
876 	if (action == BLK_NEEDS_REDO)
877 	{
878 		char	   *ptr;
879 		Size		len;
880 
881 		ptr = XLogRecGetBlockData(record, 1, &len);
882 
883 		page = (Page) BufferGetPage(deletebuf);
884 
885 		if (len > 0)
886 		{
887 			OffsetNumber *unused;
888 			OffsetNumber *unend;
889 
890 			unused = (OffsetNumber *) ptr;
891 			unend = (OffsetNumber *) ((char *) ptr + len);
892 
893 			if ((unend - unused) > 0)
894 				PageIndexMultiDelete(page, unused, unend - unused);
895 		}
896 
897 		/*
898 		 * Mark the page as not containing any LP_DEAD items only if
899 		 * clear_dead_marking flag is set to true. See comments in
900 		 * hashbucketcleanup() for details.
901 		 */
902 		if (xldata->clear_dead_marking)
903 		{
904 			HashPageOpaque pageopaque;
905 
906 			pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
907 			pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
908 		}
909 
910 		PageSetLSN(page, lsn);
911 		MarkBufferDirty(deletebuf);
912 	}
913 	if (BufferIsValid(deletebuf))
914 		UnlockReleaseBuffer(deletebuf);
915 
916 	if (BufferIsValid(bucketbuf))
917 		UnlockReleaseBuffer(bucketbuf);
918 }
919 
920 /*
921  * replay split cleanup flag operation for primary bucket page.
922  */
923 static void
924 hash_xlog_split_cleanup(XLogReaderState *record)
925 {
926 	XLogRecPtr	lsn = record->EndRecPtr;
927 	Buffer		buffer;
928 	Page		page;
929 
930 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
931 	{
932 		HashPageOpaque bucket_opaque;
933 
934 		page = (Page) BufferGetPage(buffer);
935 
936 		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
937 		bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
938 		PageSetLSN(page, lsn);
939 		MarkBufferDirty(buffer);
940 	}
941 	if (BufferIsValid(buffer))
942 		UnlockReleaseBuffer(buffer);
943 }
944 
945 /*
946  * replay for update meta page
947  */
948 static void
949 hash_xlog_update_meta_page(XLogReaderState *record)
950 {
951 	HashMetaPage metap;
952 	XLogRecPtr	lsn = record->EndRecPtr;
953 	xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
954 	Buffer		metabuf;
955 	Page		page;
956 
957 	if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
parseAddresses($addrstr, $useimap = true)958 	{
959 		page = BufferGetPage(metabuf);
960 		metap = HashPageGetMeta(page);
961 
962 		metap->hashm_ntuples = xldata->ntuples;
963 
964 		PageSetLSN(page, lsn);
965 		MarkBufferDirty(metabuf);
966 	}
967 	if (BufferIsValid(metabuf))
968 		UnlockReleaseBuffer(metabuf);
969 }
970 
971 /*
972  * replay delete operation in hash index to remove
973  * tuples marked as DEAD during index tuple insertion.
974  */
975 static void
976 hash_xlog_vacuum_one_page(XLogReaderState *record)
977 {
978 	XLogRecPtr	lsn = record->EndRecPtr;
979 	xl_hash_vacuum_one_page *xldata;
980 	Buffer		buffer;
981 	Buffer		metabuf;
982 	Page		page;
983 	XLogRedoAction action;
984 	HashPageOpaque pageopaque;
985 
986 	xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
987 
988 	/*
989 	 * If we have any conflict processing to do, it must happen before we
990 	 * update the page.
991 	 *
992 	 * Hash index records that are marked as LP_DEAD and being removed during
993 	 * hash index tuple insertion can conflict with standby queries. You might
994 	 * think that vacuum records would conflict as well, but we've handled
995 	 * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
996 	 * cleaned by the vacuum of the heap and so we can resolve any conflicts
997 	 * just once when that arrives.  After that we know that no conflicts
998 	 * exist from individual hash index vacuum records on that index.
999 	 */
1000 	if (InHotStandby)
1001 	{
1002 		RelFileNode rnode;
1003 
1004 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
1005 		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
1006 	}
1007 
1008 	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
1009 
1010 	if (action == BLK_NEEDS_REDO)
setFrom($address, $name = '', $auto = true)1011 	{
1012 		page = (Page) BufferGetPage(buffer);
1013 
1014 		if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
1015 		{
1016 			OffsetNumber *unused;
1017 
1018 			unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);
1019 
1020 			PageIndexMultiDelete(page, unused, xldata->ntuples);
1021 		}
1022 
1023 		/*
1024 		 * Mark the page as not containing any LP_DEAD items. See comments in
1025 		 * _hash_vacuum_one_page() for details.
1026 		 */
1027 		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
1028 		pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
1029 
1030 		PageSetLSN(page, lsn);
1031 		MarkBufferDirty(buffer);
1032 	}
1033 	if (BufferIsValid(buffer))
1034 		UnlockReleaseBuffer(buffer);
1035 
1036 	if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
1037 	{
1038 		Page		metapage;
1039 		HashMetaPage metap;
1040 
1041 		metapage = BufferGetPage(metabuf);
1042 		metap = HashPageGetMeta(metapage);
1043 
getLastMessageID()1044 		metap->hashm_ntuples -= xldata->ntuples;
1045 
1046 		PageSetLSN(metapage, lsn);
1047 		MarkBufferDirty(metabuf);
1048 	}
1049 	if (BufferIsValid(metabuf))
1050 		UnlockReleaseBuffer(metabuf);
1051 }
1052 
1053 void
1054 hash_redo(XLogReaderState *record)
1055 {
1056 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1057 
1058 	switch (info)
1059 	{
1060 		case XLOG_HASH_INIT_META_PAGE:
1061 			hash_xlog_init_meta_page(record);
1062 			break;
1063 		case XLOG_HASH_INIT_BITMAP_PAGE:
1064 			hash_xlog_init_bitmap_page(record);
1065 			break;
1066 		case XLOG_HASH_INSERT:
1067 			hash_xlog_insert(record);
validateAddress($address, $patternselect = null)1068 			break;
1069 		case XLOG_HASH_ADD_OVFL_PAGE:
1070 			hash_xlog_add_ovfl_page(record);
1071 			break;
1072 		case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
1073 			hash_xlog_split_allocate_page(record);
1074 			break;
1075 		case XLOG_HASH_SPLIT_PAGE:
1076 			hash_xlog_split_page(record);
1077 			break;
1078 		case XLOG_HASH_SPLIT_COMPLETE:
1079 			hash_xlog_split_complete(record);
1080 			break;
1081 		case XLOG_HASH_MOVE_PAGE_CONTENTS:
1082 			hash_xlog_move_page_contents(record);
1083 			break;
1084 		case XLOG_HASH_SQUEEZE_PAGE:
1085 			hash_xlog_squeeze_page(record);
1086 			break;
1087 		case XLOG_HASH_DELETE:
1088 			hash_xlog_delete(record);
1089 			break;
1090 		case XLOG_HASH_SPLIT_CLEANUP:
1091 			hash_xlog_split_cleanup(record);
1092 			break;
1093 		case XLOG_HASH_UPDATE_META_PAGE:
1094 			hash_xlog_update_meta_page(record);
1095 			break;
1096 		case XLOG_HASH_VACUUM_ONE_PAGE:
1097 			hash_xlog_vacuum_one_page(record);
1098 			break;
1099 		default:
1100 			elog(PANIC, "hash_redo: unknown op code %u", info);
1101 	}
1102 }
1103 
1104 /*
1105  * Mask a hash page before performing consistency checks on it.
1106  */
1107 void
1108 hash_mask(char *pagedata, BlockNumber blkno)
1109 {
1110 	Page		page = (Page) pagedata;
1111 	HashPageOpaque opaque;
1112 	int			pagetype;
1113 
1114 	mask_page_lsn_and_checksum(page);
1115 
1116 	mask_page_hint_bits(page);
1117 	mask_unused_space(page);
1118 
1119 	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
1120 
1121 	pagetype = opaque->hasho_flag & LH_PAGE_TYPE;
1122 	if (pagetype == LH_UNUSED_PAGE)
1123 	{
1124 		/*
1125 		 * Mask everything on a UNUSED page.
1126 		 */
1127 		mask_page_content(page);
1128 	}
1129 	else if (pagetype == LH_BUCKET_PAGE ||
1130 			 pagetype == LH_OVERFLOW_PAGE)
1131 	{
1132 		/*
1133 		 * In hash bucket and overflow pages, it is possible to modify the
1134 		 * LP_FLAGS without emitting any WAL record. Hence, mask the line
1135 		 * pointer flags. See hashgettuple(), _hash_kill_items() for details.
1136 		 */
1137 		mask_lp_flags(page);
1138 	}
1139 
1140 	/*
1141 	 * It is possible that the hint bit LH_PAGE_HAS_DEAD_TUPLES may remain
1142 	 * unlogged. So, mask it. See _hash_kill_items() for details.
1143 	 */
1144 	opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
1145 }
1146