1 /*-------------------------------------------------------------------------
2  *
3  * bufmgr.c
4  *	  buffer manager interface routines
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/buffer/bufmgr.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * Principal entry points:
17  *
18  * ReadBuffer() -- find or create a buffer holding the requested page,
19  *		and pin it so that no one can destroy it while this process
20  *		is using it.
21  *
22  * ReleaseBuffer() -- unpin a buffer
23  *
24  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
25  *		The disk write is delayed until buffer replacement or checkpoint.
26  *
27  * See also these files:
28  *		freelist.c -- chooses victim for buffer replacement
29  *		buf_table.c -- manages the buffer lookup table
30  */
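
/*
 * A rough sketch of the typical caller pattern (error handling, critical
 * sections, and WAL logging omitted; rel/blkno/buf/page are placeholders):
 *
 *		buf = ReadBuffer(rel, blkno);
 *		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *		page = BufferGetPage(buf);
 *		... modify the page contents ...
 *		MarkBufferDirty(buf);
 *		UnlockReleaseBuffer(buf);
 */
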
31 #include "postgres.h"
32 
33 #include <sys/file.h>
34 #include <unistd.h>
35 
36 #include "access/xlog.h"
37 #include "catalog/catalog.h"
38 #include "catalog/storage.h"
39 #include "executor/instrument.h"
40 #include "lib/binaryheap.h"
41 #include "miscadmin.h"
42 #include "pg_trace.h"
43 #include "pgstat.h"
44 #include "postmaster/bgwriter.h"
45 #include "storage/buf_internals.h"
46 #include "storage/bufmgr.h"
47 #include "storage/ipc.h"
48 #include "storage/proc.h"
49 #include "storage/smgr.h"
50 #include "storage/standby.h"
51 #include "utils/rel.h"
52 #include "utils/resowner_private.h"
53 #include "utils/timestamp.h"
54 
55 
56 /* Note: these two macros only work on shared buffers, not local ones! */
57 #define BufHdrGetBlock(bufHdr)	((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
58 #define BufferGetLSN(bufHdr)	(PageGetLSN(BufHdrGetBlock(bufHdr)))
59 
60 /* Note: this macro only works on local buffers, not shared ones! */
61 #define LocalBufHdrGetBlock(bufHdr) \
62 	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
63 
64 /* Bits in SyncOneBuffer's return value */
65 #define BUF_WRITTEN				0x01
66 #define BUF_REUSABLE			0x02
67 
68 #define DROP_RELS_BSEARCH_THRESHOLD		20
69 
70 typedef struct PrivateRefCountEntry
71 {
72 	Buffer		buffer;
73 	int32		refcount;
74 } PrivateRefCountEntry;
75 
76 /* 64 bytes, about the size of a cache line on common systems */
77 #define REFCOUNT_ARRAY_ENTRIES 8
78 
79 /*
80  * Status of buffers to checkpoint for a particular tablespace, used
81  * internally in BufferSync.
82  */
83 typedef struct CkptTsStatus
84 {
85 	/* oid of the tablespace */
86 	Oid			tsId;
87 
88 	/*
89 	 * Checkpoint progress for this tablespace. To make progress comparable
90 	 * between tablespaces the progress is, for each tablespace, measured as a
91 	 * number between 0 and the total number of to-be-checkpointed pages. Each
92 	 * page checkpointed in this tablespace increments this space's progress
93 	 * by progress_slice.
94 	 */
95 	float8		progress;
96 	float8		progress_slice;
97 
98 	/* number of to-be-checkpointed pages in this tablespace */
99 	int			num_to_scan;
100 	/* already processed pages in this tablespace */
101 	int			num_scanned;
102 
103 	/* current offset in CkptBufferIds for this tablespace */
104 	int			index;
105 } CkptTsStatus;
106 
107 /* GUC variables */
108 bool		zero_damaged_pages = false;
109 int			bgwriter_lru_maxpages = 100;
110 double		bgwriter_lru_multiplier = 2.0;
111 bool		track_io_timing = false;
112 int			effective_io_concurrency = 0;
113 
114 /*
115  * GUC variables about triggering kernel writeback for buffers written; OS
116  * dependent defaults are set via the GUC mechanism.
117  */
118 int			checkpoint_flush_after = 0;
119 int			bgwriter_flush_after = 0;
120 int			backend_flush_after = 0;
121 
122 /*
123  * How many buffers PrefetchBuffer callers should try to stay ahead of their
124  * ReadBuffer calls by.  This is maintained by the assign hook for
125  * effective_io_concurrency.  Zero means "never prefetch".  This value is
126  * only used for buffers not belonging to tablespaces that have their
127  * effective_io_concurrency parameter set.
128  */
129 int			target_prefetch_pages = 0;
130 
131 /* local state for StartBufferIO and related functions */
132 static BufferDesc *InProgressBuf = NULL;
133 static bool IsForInput;
134 
135 /* local state for LockBufferForCleanup */
136 static BufferDesc *PinCountWaitBuf = NULL;
137 
138 /*
139  * Backend-Private refcount management:
140  *
141  * Each buffer also has a private refcount that keeps track of the number of
142  * times the buffer is pinned in the current process.  This is so that the
143  * shared refcount needs to be modified only once if a buffer is pinned more
144  * than once by an individual backend.  It's also used to check that no buffers
145  * are still pinned at the end of transactions and when exiting.
146  *
147  *
148  * To avoid - as we used to - requiring an array with NBuffers entries to keep
149  * track of local buffers, we use a small sequentially searched array
150  * (PrivateRefCountArray) and an overflow hash table (PrivateRefCountHash) to
151  * keep track of backend local pins.
152  *
153  * As long as no more than REFCOUNT_ARRAY_ENTRIES buffers are pinned at once,
154  * all refcounts are tracked in the array; after that, new array entries
155  * displace old ones into the hash table. That way a frequently used entry
156  * can't get "stuck" in the hashtable while infrequent ones clog the array.
157  *
158  * Note that in most scenarios the number of pinned buffers will not exceed
159  * REFCOUNT_ARRAY_ENTRIES.
160  *
161  *
162  * To enter a buffer into the refcount tracking mechanism first reserve a free
163  * entry using ReservePrivateRefCountEntry() and then later, if necessary,
164  * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing
165  * memory allocations in NewPrivateRefCountEntry() which can be important
166  * because in some scenarios it's called with a spinlock held...
167  */
168 static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
169 static HTAB *PrivateRefCountHash = NULL;
170 static int32 PrivateRefCountOverflowed = 0;
171 static uint32 PrivateRefCountClock = 0;
172 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
173 
174 static void ReservePrivateRefCountEntry(void);
175 static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
176 static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move);
177 static inline int32 GetPrivateRefCount(Buffer buffer);
178 static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
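
/*
 * The reserve-then-fill protocol described above, roughly as PinBuffer()
 * uses it for a buffer this backend has not pinned before:
 *
 *		ReservePrivateRefCountEntry();		(may move an entry to the hash)
 *		ref = NewPrivateRefCountEntry(buffer);	(uses the reserved slot, no alloc)
 *		... pin the buffer via CAS on its state word ...
 *		ref->refcount++;
 */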
179 
180 /*
181  * Ensure that the PrivateRefCountArray has sufficient space to store one more
182  * entry. This has to be called before using NewPrivateRefCountEntry() to fill
183  * a new entry - but it's perfectly fine to not use a reserved entry.
184  */
185 static void
186 ReservePrivateRefCountEntry(void)
187 {
188 	/* Already reserved (or freed), nothing to do */
189 	if (ReservedRefCountEntry != NULL)
190 		return;
191 
192 	/*
193 	 * First search for a free entry in the array, that'll be sufficient in the
194 	 * majority of cases.
195 	 */
196 	{
197 		int			i;
198 
199 		for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
200 		{
201 			PrivateRefCountEntry *res;
202 
203 			res = &PrivateRefCountArray[i];
204 
205 			if (res->buffer == InvalidBuffer)
206 			{
207 				ReservedRefCountEntry = res;
208 				return;
209 			}
210 		}
211 	}
212 
213 	/*
214 	 * No luck. All array entries are full. Move one array entry into the hash
215 	 * table.
216 	 */
217 	{
218 		/*
219 		 * Move entry from the current clock position in the array into the
220 		 * hashtable. Use that slot.
221 		 */
222 		PrivateRefCountEntry *hashent;
223 		bool		found;
224 
225 		/* select victim slot */
226 		ReservedRefCountEntry =
227 			&PrivateRefCountArray[PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
228 
229 		/* Better be used, otherwise we shouldn't get here. */
230 		Assert(ReservedRefCountEntry->buffer != InvalidBuffer);
231 
232 		/* enter victim array entry into hashtable */
233 		hashent = hash_search(PrivateRefCountHash,
234 							  (void *) &(ReservedRefCountEntry->buffer),
235 							  HASH_ENTER,
236 							  &found);
237 		Assert(!found);
238 		hashent->refcount = ReservedRefCountEntry->refcount;
239 
240 		/* clear the now free array slot */
241 		ReservedRefCountEntry->buffer = InvalidBuffer;
242 		ReservedRefCountEntry->refcount = 0;
243 
244 		PrivateRefCountOverflowed++;
245 	}
246 }
247 
248 /*
249  * Fill a previously reserved refcount entry.
250  */
251 static PrivateRefCountEntry *
252 NewPrivateRefCountEntry(Buffer buffer)
253 {
254 	PrivateRefCountEntry *res;
255 
256 	/* only allowed to be called when a reservation has been made */
257 	Assert(ReservedRefCountEntry != NULL);
258 
259 	/* use up the reserved entry */
260 	res = ReservedRefCountEntry;
261 	ReservedRefCountEntry = NULL;
262 
263 	/* and fill it */
264 	res->buffer = buffer;
265 	res->refcount = 0;
266 
267 	return res;
268 }
269 
270 /*
271  * Return the PrivateRefCount entry for the passed buffer.
272  *
273  * Returns NULL if a buffer doesn't have a refcount entry. Otherwise, if
274  * do_move is true and the entry resides in the hashtable, the entry is
275  * moved to the array to optimize for frequent access.
276  */
277 static PrivateRefCountEntry *
278 GetPrivateRefCountEntry(Buffer buffer, bool do_move)
279 {
280 	PrivateRefCountEntry *res;
281 	int			i;
282 
283 	Assert(BufferIsValid(buffer));
284 	Assert(!BufferIsLocal(buffer));
285 
286 	/*
287 	 * First search for references in the array, that'll be sufficient in the
288 	 * majority of cases.
289 	 */
290 	for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
291 	{
292 		res = &PrivateRefCountArray[i];
293 
294 		if (res->buffer == buffer)
295 			return res;
296 	}
297 
298 	/*
299 	 * By here we know that the buffer, if already pinned, isn't residing in
300 	 * the array.
301 	 *
302 	 * Only look up the buffer in the hashtable if we've previously overflowed
303 	 * into it.
304 	 */
305 	if (PrivateRefCountOverflowed == 0)
306 		return NULL;
307 
308 	res = hash_search(PrivateRefCountHash,
309 					  (void *) &buffer,
310 					  HASH_FIND,
311 					  NULL);
312 
313 	if (res == NULL)
314 		return NULL;
315 	else if (!do_move)
316 	{
317 		/* caller doesn't want us to move the hash entry into the array */
318 		return res;
319 	}
320 	else
321 	{
322 		/* move buffer from hashtable into the free array slot */
323 		bool		found;
324 		PrivateRefCountEntry *free;
325 
326 		/* Ensure there's a free array slot */
327 		ReservePrivateRefCountEntry();
328 
329 		/* Use up the reserved slot */
330 		Assert(ReservedRefCountEntry != NULL);
331 		free = ReservedRefCountEntry;
332 		ReservedRefCountEntry = NULL;
333 		Assert(free->buffer == InvalidBuffer);
334 
335 		/* and fill it */
336 		free->buffer = buffer;
337 		free->refcount = res->refcount;
338 
339 		/* delete from hashtable */
340 		hash_search(PrivateRefCountHash,
341 					(void *) &buffer,
342 					HASH_REMOVE,
343 					&found);
344 		Assert(found);
345 		Assert(PrivateRefCountOverflowed > 0);
346 		PrivateRefCountOverflowed--;
347 
348 		return free;
349 	}
350 }
351 
352 /*
353  * Returns how many times the passed buffer is pinned by this backend.
354  *
355  * Only works for shared memory buffers!
356  */
357 static inline int32
358 GetPrivateRefCount(Buffer buffer)
359 {
360 	PrivateRefCountEntry *ref;
361 
362 	Assert(BufferIsValid(buffer));
363 	Assert(!BufferIsLocal(buffer));
364 
365 	/*
366 	 * Not moving the entry - that's ok for the current users, but we might
367 	 * want to change this one day.
368 	 */
369 	ref = GetPrivateRefCountEntry(buffer, false);
370 
371 	if (ref == NULL)
372 		return 0;
373 	return ref->refcount;
374 }
375 
376 /*
377  * Release resources used to track the reference count of a buffer which we no
378  * longer have pinned and don't want to pin again immediately.
379  */
380 static void
381 ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
382 {
383 	Assert(ref->refcount == 0);
384 
385 	if (ref >= &PrivateRefCountArray[0] &&
386 		ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
387 	{
388 		ref->buffer = InvalidBuffer;
389 
390 		/*
391 		 * Mark the just used entry as reserved - in many scenarios that
392 		 * allows us to avoid ever having to search the array/hash for free
393 		 * entries.
394 		 */
395 		ReservedRefCountEntry = ref;
396 	}
397 	else
398 	{
399 		bool		found;
400 		Buffer		buffer = ref->buffer;
401 
402 		hash_search(PrivateRefCountHash,
403 					(void *) &buffer,
404 					HASH_REMOVE,
405 					&found);
406 		Assert(found);
407 		Assert(PrivateRefCountOverflowed > 0);
408 		PrivateRefCountOverflowed--;
409 	}
410 }
411 
412 /*
413  * BufferIsPinned
414  *		True iff the buffer is pinned (also checks for valid buffer number).
415  *
416  *		NOTE: what we check here is that *this* backend holds a pin on
417  *		the buffer.  We do not care whether some other backend does.
418  */
419 #define BufferIsPinned(bufnum) \
420 ( \
421 	!BufferIsValid(bufnum) ? \
422 		false \
423 	: \
424 		BufferIsLocal(bufnum) ? \
425 			(LocalRefCount[-(bufnum) - 1] > 0) \
426 		: \
427 	(GetPrivateRefCount(bufnum) > 0) \
428 )
429 
430 
431 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
432 				  ForkNumber forkNum, BlockNumber blockNum,
433 				  ReadBufferMode mode, BufferAccessStrategy strategy,
434 				  bool *hit);
435 static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
436 static void PinBuffer_Locked(BufferDesc *buf);
437 static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
438 static void BufferSync(int flags);
439 static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
440 static int	SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context);
441 static void WaitIO(BufferDesc *buf);
442 static bool StartBufferIO(BufferDesc *buf, bool forInput);
443 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
444 				  uint32 set_flag_bits);
445 static void shared_buffer_write_error_callback(void *arg);
446 static void local_buffer_write_error_callback(void *arg);
447 static BufferDesc *BufferAlloc(SMgrRelation smgr,
448 			char relpersistence,
449 			ForkNumber forkNum,
450 			BlockNumber blockNum,
451 			BufferAccessStrategy strategy,
452 			bool *foundPtr);
453 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
454 static void AtProcExit_Buffers(int code, Datum arg);
455 static void CheckForBufferLeaks(void);
456 static int	rnode_comparator(const void *p1, const void *p2);
457 static int	buffertag_comparator(const void *p1, const void *p2);
458 static int	ckpt_buforder_comparator(const void *pa, const void *pb);
459 static int	ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
460 
461 
462 /*
463  * ComputeIoConcurrency -- get the number of pages to prefetch for a given
464  *		number of spindles.
465  */
466 bool
467 ComputeIoConcurrency(int io_concurrency, double *target)
468 {
469 	double		new_prefetch_pages = 0.0;
470 	int			i;
471 
472 	/*
473 	 * Make sure the io_concurrency value is within valid range; it may have
474 	 * been forced with a manual pg_tablespace update.
475 	 */
476 	io_concurrency = Min(Max(io_concurrency, 0), MAX_IO_CONCURRENCY);
477 
478 	/*----------
479 	 * The user-visible GUC parameter is the number of drives (spindles),
480 	 * which we need to translate to a number-of-pages-to-prefetch target.
481 	 * The target value is stashed in *extra and then assigned to the actual
482 	 * variable by assign_effective_io_concurrency.
483 	 *
484 	 * The expected number of prefetch pages needed to keep N drives busy is:
485 	 *
486 	 * drives |   I/O requests
487 	 * -------+----------------
488 	 *		1 |   1
489 	 *		2 |   2/1 + 2/2 = 3
490 	 *		3 |   3/1 + 3/2 + 3/3 = 5 1/2
491 	 *		4 |   4/1 + 4/2 + 4/3 + 4/4 = 8 1/3
492 	 *		n |   n * H(n)
493 	 *
494 	 * This is called the "coupon collector problem" and H(n) is called the
495 	 * harmonic series.  This could be approximated by n * ln(n), but for
496 	 * reasonable numbers of drives we might as well just compute the series.
497 	 *
498 	 * Alternatively we could set the target to the number of pages necessary
499 	 * so that the expected number of active spindles is some arbitrary
500 	 * percentage of the total.  This sounds the same but is actually slightly
501 	 * different.  The result ends up being ln(1-P)/ln((n-1)/n) where P is
502 	 * that desired fraction.
503 	 *
504 	 * Experimental results show that both of these formulas aren't aggressive
505 	 * enough, but we don't really have any better proposals.
506 	 *
507 	 * Note that if io_concurrency = 0 (disabled), we must set target = 0.
508 	 *----------
509 	 */
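
	/*
	 * For example, io_concurrency = 10 gives 10 * H(10); the loop below
	 * computes 10/1 + 10/2 + ... + 10/10, which is about 29.3 prefetch pages.
	 */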
510 
511 	for (i = 1; i <= io_concurrency; i++)
512 		new_prefetch_pages += (double) io_concurrency / (double) i;
513 
514 	*target = new_prefetch_pages;
515 
516 	/* This range check shouldn't fail, but let's be paranoid */
517 	return (new_prefetch_pages >= 0.0 && new_prefetch_pages < (double) INT_MAX);
518 }
519 
520 /*
521  * PrefetchBuffer -- initiate asynchronous read of a block of a relation
522  *
523  * This is named by analogy to ReadBuffer but doesn't actually allocate a
524  * buffer.  Instead it tries to ensure that a future ReadBuffer for the given
525  * block will not be delayed by the I/O.  Prefetching is optional.
526  * No-op if prefetching isn't compiled in.
527  */
528 void
529 PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
530 {
531 #ifdef USE_PREFETCH
532 	Assert(RelationIsValid(reln));
533 	Assert(BlockNumberIsValid(blockNum));
534 
535 	/* Open it at the smgr level if not already done */
536 	RelationOpenSmgr(reln);
537 
538 	if (RelationUsesLocalBuffers(reln))
539 	{
540 		/* see comments in ReadBufferExtended */
541 		if (RELATION_IS_OTHER_TEMP(reln))
542 			ereport(ERROR,
543 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
544 					 errmsg("cannot access temporary tables of other sessions")));
545 
546 		/* pass it off to localbuf.c */
547 		LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
548 	}
549 	else
550 	{
551 		BufferTag	newTag;		/* identity of requested block */
552 		uint32		newHash;	/* hash value for newTag */
553 		LWLock	   *newPartitionLock;	/* buffer partition lock for it */
554 		int			buf_id;
555 
556 		/* create a tag so we can lookup the buffer */
557 		INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
558 					   forkNum, blockNum);
559 
560 		/* determine its hash code and partition lock ID */
561 		newHash = BufTableHashCode(&newTag);
562 		newPartitionLock = BufMappingPartitionLock(newHash);
563 
564 		/* see if the block is in the buffer pool already */
565 		LWLockAcquire(newPartitionLock, LW_SHARED);
566 		buf_id = BufTableLookup(&newTag, newHash);
567 		LWLockRelease(newPartitionLock);
568 
569 		/* If not in buffers, initiate prefetch */
570 		if (buf_id < 0)
571 			smgrprefetch(reln->rd_smgr, forkNum, blockNum);
572 
573 		/*
574 		 * If the block *is* in buffers, we do nothing.  This is not really
575 		 * ideal: the block might be just about to be evicted, which would be
576 		 * stupid since we know we are going to need it soon.  But the only
577 		 * easy answer is to bump the usage_count, which does not seem like a
578 		 * great solution: when the caller does ultimately touch the block,
579 		 * usage_count would get bumped again, resulting in too much
580 		 * favoritism for blocks that are involved in a prefetch sequence. A
581 		 * real fix would involve some additional per-buffer state, and it's
582 		 * not clear that there's enough of a problem to justify that.
583 		 */
584 	}
585 #endif							/* USE_PREFETCH */
586 }
587 
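/*
 * As a rough sketch (with hypothetical bookkeeping variables next_prefetch,
 * nblocks, and blkno), a caller scanning blocks sequentially would try to
 * stay target_prefetch_pages ahead of its ReadBuffer calls:
 *
 *		while (next_prefetch < nblocks &&
 *			   next_prefetch < blkno + target_prefetch_pages)
 *			PrefetchBuffer(rel, MAIN_FORKNUM, next_prefetch++);
 *		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, NULL);
 */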
588 
589 /*
590  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
591  *		fork with RBM_NORMAL mode and default strategy.
592  */
593 Buffer
594 ReadBuffer(Relation reln, BlockNumber blockNum)
595 {
596 	return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
597 }
598 
599 /*
600  * ReadBufferExtended -- returns a buffer containing the requested
601  *		block of the requested relation.  If the blknum
602  *		requested is P_NEW, extend the relation file and
603  *		allocate a new block.  (Caller is responsible for
604  *		ensuring that only one backend tries to extend a
605  *		relation at the same time!)
606  *
607  * Returns: the buffer number for the buffer containing
608  *		the block read.  The returned buffer has been pinned.
609  *		Does not return on error --- elog's instead.
610  *
611  * Assumes that reln has already been opened when this function is called.
612  *
613  * In RBM_NORMAL mode, the page is read from disk, and the page header is
614  * validated.  An error is thrown if the page header is not valid.  (But
615  * note that an all-zero page is considered "valid"; see PageIsVerified().)
616  *
617  * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
618  * valid, the page is zeroed instead of throwing an error. This is intended
619  * for non-critical data, where the caller is prepared to repair errors.
620  *
621  * In RBM_ZERO_AND_LOCK mode, if the page isn't in buffer cache already, it's
622  * filled with zeros instead of reading it from disk.  Useful when the caller
623  * is going to fill the page from scratch, since this saves I/O and avoids
624  * unnecessary failure if the page-on-disk has corrupt page headers.
625  * The page is returned locked to ensure that the caller has a chance to
626  * initialize the page before it's made visible to others.
627  * Caution: do not use this mode to read a page that is beyond the relation's
628  * current physical EOF; that is likely to cause problems in md.c when
629  * the page is modified and written out. P_NEW is OK, though.
630  *
631  * RBM_ZERO_AND_CLEANUP_LOCK is the same as RBM_ZERO_AND_LOCK, but acquires
632  * a cleanup-strength lock on the page.
633  *
634  * RBM_NORMAL_NO_LOG mode is treated the same as RBM_NORMAL here.
635  *
636  * If strategy is not NULL, a nondefault buffer access strategy is used.
637  * See buffer/README for details.
638  */
639 Buffer
640 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
641 				   ReadBufferMode mode, BufferAccessStrategy strategy)
642 {
643 	bool		hit;
644 	Buffer		buf;
645 
646 	/* Open it at the smgr level if not already done */
647 	RelationOpenSmgr(reln);
648 
649 	/*
650 	 * Reject attempts to read non-local temporary relations; we would be
651 	 * likely to get wrong data since we have no visibility into the owning
652 	 * session's local buffers.
653 	 */
654 	if (RELATION_IS_OTHER_TEMP(reln))
655 		ereport(ERROR,
656 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
657 				 errmsg("cannot access temporary tables of other sessions")));
658 
659 	/*
660 	 * Read the buffer, and update pgstat counters to reflect a cache hit or
661 	 * miss.
662 	 */
663 	pgstat_count_buffer_read(reln);
664 	buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
665 							forkNum, blockNum, mode, strategy, &hit);
666 	if (hit)
667 		pgstat_count_buffer_hit(reln);
668 	return buf;
669 }
670 
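/*
 * For illustration, a bulk sequential read with a nondefault strategy would
 * look roughly like this (assuming rel is already open):
 *
 *		BufferAccessStrategy strategy = GetAccessStrategy(BAS_BULKREAD);
 *
 *		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
 *								 strategy);
 *		... use the page ...
 *		ReleaseBuffer(buf);
 *		FreeAccessStrategy(strategy);
 */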
671 
672 /*
673  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
674  *		a relcache entry for the relation.
675  *
676  * NB: At present, this function may only be used on permanent relations, which
677  * is OK, because we only use it during XLOG replay.  If in the future we
678  * want to use it on temporary or unlogged relations, we could pass additional
679  * parameters.
680  */
681 Buffer
682 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
683 						  BlockNumber blockNum, ReadBufferMode mode,
684 						  BufferAccessStrategy strategy)
685 {
686 	bool		hit;
687 
688 	SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
689 
690 	Assert(InRecovery);
691 
692 	return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
693 							 mode, strategy, &hit);
694 }
695 
696 
697 /*
698  * ReadBuffer_common -- common logic for all ReadBuffer variants
699  *
700  * *hit is set to true if the request was satisfied from shared buffer cache.
701  */
702 static Buffer
703 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
704 				  BlockNumber blockNum, ReadBufferMode mode,
705 				  BufferAccessStrategy strategy, bool *hit)
706 {
707 	BufferDesc *bufHdr;
708 	Block		bufBlock;
709 	bool		found;
710 	bool		isExtend;
711 	bool		isLocalBuf = SmgrIsTemp(smgr);
712 
713 	*hit = false;
714 
715 	/* Make sure we will have room to remember the buffer pin */
716 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
717 
718 	isExtend = (blockNum == P_NEW);
719 
720 	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
721 									   smgr->smgr_rnode.node.spcNode,
722 									   smgr->smgr_rnode.node.dbNode,
723 									   smgr->smgr_rnode.node.relNode,
724 									   smgr->smgr_rnode.backend,
725 									   isExtend);
726 
727 	/* Substitute proper block number if caller asked for P_NEW */
728 	if (isExtend)
729 	{
730 		blockNum = smgrnblocks(smgr, forkNum);
731 		/* Fail if relation is already at maximum possible length */
732 		if (blockNum == P_NEW)
733 			ereport(ERROR,
734 					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
735 					 errmsg("cannot extend relation %s beyond %u blocks",
736 							relpath(smgr->smgr_rnode, forkNum),
737 							P_NEW)));
738 	}
739 
740 	if (isLocalBuf)
741 	{
742 		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
743 		if (found)
744 			pgBufferUsage.local_blks_hit++;
745 		else
746 			pgBufferUsage.local_blks_read++;
747 	}
748 	else
749 	{
750 		/*
751 		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
752 		 * not currently in memory.
753 		 */
754 		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
755 							 strategy, &found);
756 		if (found)
757 			pgBufferUsage.shared_blks_hit++;
758 		else
759 			pgBufferUsage.shared_blks_read++;
760 	}
761 
762 	/* At this point we do NOT hold any locks. */
763 
764 	/* if it was already in the buffer pool, we're done */
765 	if (found)
766 	{
767 		if (!isExtend)
768 		{
769 			/* Just need to update stats before we exit */
770 			*hit = true;
771 			VacuumPageHit++;
772 
773 			if (VacuumCostActive)
774 				VacuumCostBalance += VacuumCostPageHit;
775 
776 			TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
777 											  smgr->smgr_rnode.node.spcNode,
778 											  smgr->smgr_rnode.node.dbNode,
779 											  smgr->smgr_rnode.node.relNode,
780 											  smgr->smgr_rnode.backend,
781 											  isExtend,
782 											  found);
783 
784 			/*
785 			 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
786 			 * locked on return.
787 			 */
788 			if (!isLocalBuf)
789 			{
790 				if (mode == RBM_ZERO_AND_LOCK)
791 					LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
792 								  LW_EXCLUSIVE);
793 				else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
794 					LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
795 			}
796 
797 			return BufferDescriptorGetBuffer(bufHdr);
798 		}
799 
800 		/*
801 		 * We get here only in the corner case where we are trying to extend
802 		 * the relation but we found a pre-existing buffer marked BM_VALID.
803 		 * This can happen because mdread doesn't complain about reads beyond
804 		 * EOF (when zero_damaged_pages is ON) and so a previous attempt to
805 		 * read a block beyond EOF could have left a "valid" zero-filled
806 		 * buffer.  Unfortunately, we have also seen this case occurring
807 		 * because of buggy Linux kernels that sometimes return an
808 		 * lseek(SEEK_END) result that doesn't account for a recent write. In
809 		 * that situation, the pre-existing buffer would contain valid data
810 		 * that we don't want to overwrite.  Since the legitimate case should
811 		 * always have left a zero-filled buffer, complain if not PageIsNew.
812 		 */
813 		bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
814 		if (!PageIsNew((Page) bufBlock))
815 			ereport(ERROR,
816 					(errmsg("unexpected data beyond EOF in block %u of relation %s",
817 							blockNum, relpath(smgr->smgr_rnode, forkNum)),
818 					 errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
819 
820 		/*
821 		 * We *must* do smgrextend before succeeding, else the page will not
822 		 * be reserved by the kernel, and the next P_NEW call will decide to
823 		 * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
824 		 * call that BufferAlloc didn't, and proceed.
825 		 */
826 		if (isLocalBuf)
827 		{
828 			/* Only need to adjust flags */
829 			uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
830 
831 			Assert(buf_state & BM_VALID);
832 			buf_state &= ~BM_VALID;
833 			pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
834 		}
835 		else
836 		{
837 			/*
838 			 * Loop to handle the very small possibility that someone re-sets
839 			 * BM_VALID between our clearing it and StartBufferIO inspecting
840 			 * it.
841 			 */
842 			do
843 			{
844 				uint32		buf_state = LockBufHdr(bufHdr);
845 
846 				Assert(buf_state & BM_VALID);
847 				buf_state &= ~BM_VALID;
848 				UnlockBufHdr(bufHdr, buf_state);
849 			} while (!StartBufferIO(bufHdr, true));
850 		}
851 	}
852 
853 	/*
854 	 * if we have gotten to this point, we have allocated a buffer for the
855 	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
856 	 * if it's a shared buffer.
857 	 *
858 	 * Note: if smgrextend fails, we will end up with a buffer that is
859 	 * allocated but not marked BM_VALID.  P_NEW will still select the same
860 	 * block number (because the relation didn't get any longer on disk) and
861 	 * so future attempts to extend the relation will find the same buffer (if
862 	 * it's not been recycled) but come right back here to try smgrextend
863 	 * again.
864 	 */
865 	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));	/* spinlock not needed */
866 
867 	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
868 
869 	if (isExtend)
870 	{
871 		/* new buffers are zero-filled */
872 		MemSet((char *) bufBlock, 0, BLCKSZ);
873 		/* don't set checksum for all-zero page */
874 		smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
875 
876 		/*
877 		 * NB: we're *not* doing a ScheduleBufferTagForWriteback here,
878 		 * although we're essentially performing a write. At least on Linux,
879 		 * doing so defeats the 'delayed allocation' mechanism, leading to
880 		 * increased file fragmentation.
881 		 */
882 	}
883 	else
884 	{
885 		/*
886 		 * Read in the page, unless the caller intends to overwrite it and
887 		 * just wants us to allocate a buffer.
888 		 */
889 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
890 			MemSet((char *) bufBlock, 0, BLCKSZ);
891 		else
892 		{
893 			instr_time	io_start,
894 						io_time;
895 
896 			if (track_io_timing)
897 				INSTR_TIME_SET_CURRENT(io_start);
898 
899 			smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
900 
901 			if (track_io_timing)
902 			{
903 				INSTR_TIME_SET_CURRENT(io_time);
904 				INSTR_TIME_SUBTRACT(io_time, io_start);
905 				pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
906 				INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
907 			}
908 
909 			/* check for garbage data */
910 			if (!PageIsVerified((Page) bufBlock, blockNum))
911 			{
912 				if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
913 				{
914 					ereport(WARNING,
915 							(errcode(ERRCODE_DATA_CORRUPTED),
916 							 errmsg("invalid page in block %u of relation %s; zeroing out page",
917 									blockNum,
918 									relpath(smgr->smgr_rnode, forkNum))));
919 					MemSet((char *) bufBlock, 0, BLCKSZ);
920 				}
921 				else
922 					ereport(ERROR,
923 							(errcode(ERRCODE_DATA_CORRUPTED),
924 							 errmsg("invalid page in block %u of relation %s",
925 									blockNum,
926 									relpath(smgr->smgr_rnode, forkNum))));
927 			}
928 		}
929 	}
930 
931 	/*
932 	 * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
933 	 * the page as valid, to make sure that no other backend sees the zeroed
934 	 * page before the caller has had a chance to initialize it.
935 	 *
936 	 * Since no-one else can be looking at the page contents yet, there is no
937 	 * difference between an exclusive lock and a cleanup-strength lock. (Note
938 	 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
939 	 * they assert that the buffer is already valid.)
940 	 */
941 	if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
942 		!isLocalBuf)
943 	{
944 		LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
945 	}
946 
947 	if (isLocalBuf)
948 	{
949 		/* Only need to adjust flags */
950 		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
951 
952 		buf_state |= BM_VALID;
953 		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
954 	}
955 	else
956 	{
957 		/* Set BM_VALID, terminate IO, and wake up any waiters */
958 		TerminateBufferIO(bufHdr, false, BM_VALID);
959 	}
960 
961 	VacuumPageMiss++;
962 	if (VacuumCostActive)
963 		VacuumCostBalance += VacuumCostPageMiss;
964 
965 	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
966 									  smgr->smgr_rnode.node.spcNode,
967 									  smgr->smgr_rnode.node.dbNode,
968 									  smgr->smgr_rnode.node.relNode,
969 									  smgr->smgr_rnode.backend,
970 									  isExtend,
971 									  found);
972 
973 	return BufferDescriptorGetBuffer(bufHdr);
974 }
975 
976 /*
977  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
978  *		buffer.  If no buffer exists already, selects a replacement
979  *		victim and evicts the old page, but does NOT read in new page.
980  *
981  * "strategy" can be a buffer replacement strategy object, or NULL for
982  * the default strategy.  The selected buffer's usage_count is advanced when
983  * using the default strategy, but otherwise possibly not (see PinBuffer).
984  *
985  * The returned buffer is pinned and is already marked as holding the
986  * desired page.  If it already did have the desired page, *foundPtr is
987  * set TRUE.  Otherwise, *foundPtr is set FALSE and the buffer is marked
988  * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
989  *
990  * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
991  * we keep it for simplicity in ReadBuffer.
992  *
993  * No locks are held either at entry or exit.
994  */
995 static BufferDesc *
996 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
997 			BlockNumber blockNum,
998 			BufferAccessStrategy strategy,
999 			bool *foundPtr)
1000 {
1001 	BufferTag	newTag;			/* identity of requested block */
1002 	uint32		newHash;		/* hash value for newTag */
1003 	LWLock	   *newPartitionLock;	/* buffer partition lock for it */
1004 	BufferTag	oldTag;			/* previous identity of selected buffer */
1005 	uint32		oldHash;		/* hash value for oldTag */
1006 	LWLock	   *oldPartitionLock;	/* buffer partition lock for it */
1007 	uint32		oldFlags;
1008 	int			buf_id;
1009 	BufferDesc *buf;
1010 	bool		valid;
1011 	uint32		buf_state;
1012 
1013 	/* create a tag so we can lookup the buffer */
1014 	INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
1015 
1016 	/* determine its hash code and partition lock ID */
1017 	newHash = BufTableHashCode(&newTag);
1018 	newPartitionLock = BufMappingPartitionLock(newHash);
1019 
1020 	/* see if the block is in the buffer pool already */
1021 	LWLockAcquire(newPartitionLock, LW_SHARED);
1022 	buf_id = BufTableLookup(&newTag, newHash);
1023 	if (buf_id >= 0)
1024 	{
1025 		/*
1026 		 * Found it.  Now, pin the buffer so no one can steal it from the
1027 		 * buffer pool, and check to see if the correct data has been loaded
1028 		 * into the buffer.
1029 		 */
1030 		buf = GetBufferDescriptor(buf_id);
1031 
1032 		valid = PinBuffer(buf, strategy);
1033 
1034 		/* Can release the mapping lock as soon as we've pinned it */
1035 		LWLockRelease(newPartitionLock);
1036 
1037 		*foundPtr = TRUE;
1038 
1039 		if (!valid)
1040 		{
1041 			/*
1042 			 * We can only get here if (a) someone else is still reading in
1043 			 * the page, or (b) a previous read attempt failed.  We have to
1044 			 * wait for any active read attempt to finish, and then set up our
1045 			 * own read attempt if the page is still not BM_VALID.
1046 			 * StartBufferIO does it all.
1047 			 */
1048 			if (StartBufferIO(buf, true))
1049 			{
1050 				/*
1051 				 * If we get here, previous attempts to read the buffer must
1052 				 * have failed ... but we shall bravely try again.
1053 				 */
1054 				*foundPtr = FALSE;
1055 			}
1056 		}
1057 
1058 		return buf;
1059 	}
1060 
1061 	/*
1062 	 * Didn't find it in the buffer pool.  We'll have to initialize a new
1063 	 * buffer.  Remember to unlock the mapping lock while doing the work.
1064 	 */
1065 	LWLockRelease(newPartitionLock);
1066 
1067 	/* Loop here in case we have to try another victim buffer */
1068 	for (;;)
1069 	{
1070 		/*
1071 		 * Ensure, while the spinlock's not yet held, that there's a free
1072 		 * refcount entry.
1073 		 */
1074 		ReservePrivateRefCountEntry();
1075 
1076 		/*
1077 		 * Select a victim buffer.  The buffer is returned with its header
1078 		 * spinlock still held!
1079 		 */
1080 		buf = StrategyGetBuffer(strategy, &buf_state);
1081 
1082 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
1083 
1084 		/* Must copy buffer flags while we still hold the spinlock */
1085 		oldFlags = buf_state & BUF_FLAG_MASK;
1086 
1087 		/* Pin the buffer and then release the buffer spinlock */
1088 		PinBuffer_Locked(buf);
1089 
1090 		/*
1091 		 * If the buffer was dirty, try to write it out.  There is a race
1092 		 * condition here, in that someone might dirty it after we released it
1093 		 * above, or even while we are writing it out (since our share-lock
1094 		 * won't prevent hint-bit updates).  We will recheck the dirty bit
1095 		 * after re-locking the buffer header.
1096 		 */
1097 		if (oldFlags & BM_DIRTY)
1098 		{
1099 			/*
1100 			 * We need a share-lock on the buffer contents to write it out
1101 			 * (else we might write invalid data, eg because someone else is
1102 			 * compacting the page contents while we write).  We must use a
1103 			 * conditional lock acquisition here to avoid deadlock.  Even
1104 			 * though the buffer was not pinned (and therefore surely not
1105 			 * locked) when StrategyGetBuffer returned it, someone else could
1106 			 * have pinned and exclusive-locked it by the time we get here. If
1107 			 * we try to get the lock unconditionally, we'd block waiting for
1108 			 * them; if they later block waiting for us, deadlock ensues.
1109 			 * (This has been observed to happen when two backends are both
1110 			 * trying to split btree index pages, and the second one just
1111 			 * happens to be trying to split the page the first one got from
1112 			 * StrategyGetBuffer.)
1113 			 */
1114 			if (LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
1115 										 LW_SHARED))
1116 			{
1117 				/*
1118 				 * If using a nondefault strategy, and writing the buffer
1119 				 * would require a WAL flush, let the strategy decide whether
1120 				 * to go ahead and write/reuse the buffer or to choose another
1121 				 * victim.  We need lock to inspect the page LSN, so this
1122 				 * can't be done inside StrategyGetBuffer.
1123 				 */
1124 				if (strategy != NULL)
1125 				{
1126 					XLogRecPtr	lsn;
1127 
1128 					/* Read the LSN while holding buffer header lock */
1129 					buf_state = LockBufHdr(buf);
1130 					lsn = BufferGetLSN(buf);
1131 					UnlockBufHdr(buf, buf_state);
1132 
1133 					if (XLogNeedsFlush(lsn) &&
1134 						StrategyRejectBuffer(strategy, buf))
1135 					{
1136 						/* Drop lock/pin and loop around for another buffer */
1137 						LWLockRelease(BufferDescriptorGetContentLock(buf));
1138 						UnpinBuffer(buf, true);
1139 						continue;
1140 					}
1141 				}
1142 
1143 				/* OK, do the I/O */
1144 				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
1145 														  smgr->smgr_rnode.node.spcNode,
1146 														  smgr->smgr_rnode.node.dbNode,
1147 														  smgr->smgr_rnode.node.relNode);
1148 
1149 				FlushBuffer(buf, NULL);
1150 				LWLockRelease(BufferDescriptorGetContentLock(buf));
1151 
1152 				ScheduleBufferTagForWriteback(&BackendWritebackContext,
1153 											  &buf->tag);
1154 
1155 				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
1156 														 smgr->smgr_rnode.node.spcNode,
1157 														 smgr->smgr_rnode.node.dbNode,
1158 														 smgr->smgr_rnode.node.relNode);
1159 			}
1160 			else
1161 			{
1162 				/*
1163 				 * Someone else has locked the buffer, so give it up and loop
1164 				 * back to get another one.
1165 				 */
1166 				UnpinBuffer(buf, true);
1167 				continue;
1168 			}
1169 		}
1170 
1171 		/*
1172 		 * To change the association of a valid buffer, we'll need to have
1173 		 * exclusive lock on both the old and new mapping partitions.
1174 		 */
1175 		if (oldFlags & BM_TAG_VALID)
1176 		{
1177 			/*
1178 			 * Need to compute the old tag's hashcode and partition lock ID.
1179 			 * XXX is it worth storing the hashcode in BufferDesc so we need
1180 			 * not recompute it here?  Probably not.
1181 			 */
1182 			oldTag = buf->tag;
1183 			oldHash = BufTableHashCode(&oldTag);
1184 			oldPartitionLock = BufMappingPartitionLock(oldHash);
1185 
1186 			/*
1187 			 * Must lock the lower-numbered partition first to avoid
1188 			 * deadlocks.
1189 			 */
1190 			if (oldPartitionLock < newPartitionLock)
1191 			{
1192 				LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1193 				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1194 			}
1195 			else if (oldPartitionLock > newPartitionLock)
1196 			{
1197 				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1198 				LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1199 			}
1200 			else
1201 			{
1202 				/* only one partition, only one lock */
1203 				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1204 			}
1205 		}
1206 		else
1207 		{
1208 			/* if it wasn't valid, we need only the new partition */
1209 			LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1210 			/* remember we have no old-partition lock or tag */
1211 			oldPartitionLock = NULL;
1212 			/* this just keeps the compiler quiet about uninit variables */
1213 			oldHash = 0;
1214 		}
1215 
1216 		/*
1217 		 * Try to make a hashtable entry for the buffer under its new tag.
1218 		 * This could fail because while we were writing someone else
1219 		 * allocated another buffer for the same block we want to read in.
1220 		 * Note that we have not yet removed the hashtable entry for the old
1221 		 * tag.
1222 		 */
1223 		buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
1224 
1225 		if (buf_id >= 0)
1226 		{
1227 			/*
1228 			 * Got a collision. Someone has already done what we were about to
1229 			 * do. We'll just handle this as if it were found in the buffer
1230 			 * pool in the first place.  First, give up the buffer we were
1231 			 * planning to use.
1232 			 */
1233 			UnpinBuffer(buf, true);
1234 
1235 			/* Can give up that buffer's mapping partition lock now */
1236 			if (oldPartitionLock != NULL &&
1237 				oldPartitionLock != newPartitionLock)
1238 				LWLockRelease(oldPartitionLock);
1239 
1240 			/* remaining code should match code at top of routine */
1241 
1242 			buf = GetBufferDescriptor(buf_id);
1243 
1244 			valid = PinBuffer(buf, strategy);
1245 
1246 			/* Can release the mapping lock as soon as we've pinned it */
1247 			LWLockRelease(newPartitionLock);
1248 
1249 			*foundPtr = TRUE;
1250 
1251 			if (!valid)
1252 			{
1253 				/*
1254 				 * We can only get here if (a) someone else is still reading
1255 				 * in the page, or (b) a previous read attempt failed.  We
1256 				 * have to wait for any active read attempt to finish, and
1257 				 * then set up our own read attempt if the page is still not
1258 				 * BM_VALID.  StartBufferIO does it all.
1259 				 */
1260 				if (StartBufferIO(buf, true))
1261 				{
1262 					/*
1263 					 * If we get here, previous attempts to read the buffer
1264 					 * must have failed ... but we shall bravely try again.
1265 					 */
1266 					*foundPtr = FALSE;
1267 				}
1268 			}
1269 
1270 			return buf;
1271 		}
1272 
1273 		/*
1274 		 * Need to lock the buffer header too in order to change its tag.
1275 		 */
1276 		buf_state = LockBufHdr(buf);
1277 
1278 		/*
1279 		 * Somebody could have pinned or re-dirtied the buffer while we were
1280 		 * doing the I/O and making the new hashtable entry.  If so, we can't
1281 		 * recycle this buffer; we must undo everything we've done and start
1282 		 * over with a new victim buffer.
1283 		 */
1284 		oldFlags = buf_state & BUF_FLAG_MASK;
1285 		if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY))
1286 			break;
1287 
1288 		UnlockBufHdr(buf, buf_state);
1289 		BufTableDelete(&newTag, newHash);
1290 		if (oldPartitionLock != NULL &&
1291 			oldPartitionLock != newPartitionLock)
1292 			LWLockRelease(oldPartitionLock);
1293 		LWLockRelease(newPartitionLock);
1294 		UnpinBuffer(buf, true);
1295 	}
1296 
1297 	/*
1298 	 * Okay, it's finally safe to rename the buffer.
1299 	 *
1300 	 * Clearing BM_VALID here is necessary, clearing the dirtybits is just
1301 	 * paranoia.  We also reset the usage_count since any recency of use of
1302 	 * the old content is no longer relevant.  (The usage_count starts out at
1303 	 * 1 so that the buffer can survive one clock-sweep pass.)
1304 	 *
1305 	 * Make sure BM_PERMANENT is set for buffers that must be written at every
1306 	 * checkpoint.  Unlogged buffers only need to be written at shutdown
1307 	 * checkpoints, except for their "init" forks, which need to be treated
1308 	 * just like permanent relations.
1309 	 */
1310 	buf->tag = newTag;
1311 	buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
1312 				   BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
1313 				   BUF_USAGECOUNT_MASK);
1314 	if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
1315 		buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
1316 	else
1317 		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
1318 
1319 	UnlockBufHdr(buf, buf_state);
1320 
1321 	if (oldPartitionLock != NULL)
1322 	{
1323 		BufTableDelete(&oldTag, oldHash);
1324 		if (oldPartitionLock != newPartitionLock)
1325 			LWLockRelease(oldPartitionLock);
1326 	}
1327 
1328 	LWLockRelease(newPartitionLock);
1329 
1330 	/*
1331 	 * Buffer contents are currently invalid.  Try to get the io_in_progress
1332 	 * lock.  If StartBufferIO returns false, then someone else managed to
1333 	 * read it before we did, so there's nothing left for BufferAlloc() to do.
1334 	 */
1335 	if (StartBufferIO(buf, true))
1336 		*foundPtr = FALSE;
1337 	else
1338 		*foundPtr = TRUE;
1339 
1340 	return buf;
1341 }
1342 
1343 /*
1344  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
1345  * freelist.
1346  *
1347  * The buffer header spinlock must be held at entry.  We drop it before
1348  * returning.  (This is sane because the caller must have locked the
1349  * buffer in order to be sure it should be dropped.)
1350  *
1351  * This is used only in contexts such as dropping a relation.  We assume
1352  * that no other backend could possibly be interested in using the page,
1353  * so the only reason the buffer might be pinned is if someone else is
1354  * trying to write it out.  We have to let them finish before we can
1355  * reclaim the buffer.
1356  *
1357  * The buffer could get reclaimed by someone else while we are waiting
1358  * to acquire the necessary locks; if so, don't mess it up.
1359  */
1360 static void
1361 InvalidateBuffer(BufferDesc *buf)
1362 {
1363 	BufferTag	oldTag;
1364 	uint32		oldHash;		/* hash value for oldTag */
1365 	LWLock	   *oldPartitionLock;	/* buffer partition lock for it */
1366 	uint32		oldFlags;
1367 	uint32		buf_state;
1368 
1369 	/* Save the original buffer tag before dropping the spinlock */
1370 	oldTag = buf->tag;
1371 
1372 	buf_state = pg_atomic_read_u32(&buf->state);
1373 	Assert(buf_state & BM_LOCKED);
1374 	UnlockBufHdr(buf, buf_state);
1375 
1376 	/*
1377 	 * Need to compute the old tag's hashcode and partition lock ID. XXX is it
1378 	 * worth storing the hashcode in BufferDesc so we need not recompute it
1379 	 * here?  Probably not.
1380 	 */
1381 	oldHash = BufTableHashCode(&oldTag);
1382 	oldPartitionLock = BufMappingPartitionLock(oldHash);
1383 
1384 retry:
1385 
1386 	/*
1387 	 * Acquire exclusive mapping lock in preparation for changing the buffer's
1388 	 * association.
1389 	 */
1390 	LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1391 
1392 	/* Re-lock the buffer header */
1393 	buf_state = LockBufHdr(buf);
1394 
1395 	/* If it's changed while we were waiting for lock, do nothing */
1396 	if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
1397 	{
1398 		UnlockBufHdr(buf, buf_state);
1399 		LWLockRelease(oldPartitionLock);
1400 		return;
1401 	}
1402 
1403 	/*
1404 	 * We assume the only reason for it to be pinned is that someone else is
1405 	 * flushing the page out.  Wait for them to finish.  (This could be an
1406 	 * infinite loop if the refcount is messed up... it would be nice to time
1407 	 * out after awhile, but there seems no way to be sure how many loops may
1408 	 * be needed.  Note that if the other guy has pinned the buffer but not
1409 	 * yet done StartBufferIO, WaitIO will fall through and we'll effectively
1410 	 * be busy-looping here.)
1411 	 */
1412 	if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
1413 	{
1414 		UnlockBufHdr(buf, buf_state);
1415 		LWLockRelease(oldPartitionLock);
1416 		/* safety check: should definitely not be our *own* pin */
1417 		if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
1418 			elog(ERROR, "buffer is pinned in InvalidateBuffer");
1419 		WaitIO(buf);
1420 		goto retry;
1421 	}
1422 
1423 	/*
1424 	 * Clear out the buffer's tag and flags.  We must do this to ensure that
1425 	 * linear scans of the buffer array don't think the buffer is valid.
1426 	 */
1427 	oldFlags = buf_state & BUF_FLAG_MASK;
1428 	CLEAR_BUFFERTAG(buf->tag);
1429 	buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
1430 	UnlockBufHdr(buf, buf_state);
1431 
1432 	/*
1433 	 * Remove the buffer from the lookup hashtable, if it was in there.
1434 	 */
1435 	if (oldFlags & BM_TAG_VALID)
1436 		BufTableDelete(&oldTag, oldHash);
1437 
1438 	/*
1439 	 * Done with mapping lock.
1440 	 */
1441 	LWLockRelease(oldPartitionLock);
1442 
1443 	/*
1444 	 * Insert the buffer at the head of the list of free buffers.
1445 	 */
1446 	StrategyFreeBuffer(buf);
1447 }
1448 
1449 /*
1450  * MarkBufferDirty
1451  *
1452  *		Marks buffer contents as dirty (actual write happens later).
1453  *
1454  * Buffer must be pinned and exclusive-locked.  (If caller does not hold
1455  * exclusive lock, then somebody could be in process of writing the buffer,
1456  * leading to risk of bad data written to disk.)
1457  */
1458 void
1459 MarkBufferDirty(Buffer buffer)
1460 {
1461 	BufferDesc *bufHdr;
1462 	uint32		buf_state;
1463 	uint32		old_buf_state;
1464 
1465 	if (!BufferIsValid(buffer))
1466 		elog(ERROR, "bad buffer ID: %d", buffer);
1467 
1468 	if (BufferIsLocal(buffer))
1469 	{
1470 		MarkLocalBufferDirty(buffer);
1471 		return;
1472 	}
1473 
1474 	bufHdr = GetBufferDescriptor(buffer - 1);
1475 
1476 	Assert(BufferIsPinned(buffer));
1477 	Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
1478 								LW_EXCLUSIVE));
1479 
1480 	old_buf_state = pg_atomic_read_u32(&bufHdr->state);
1481 	for (;;)
1482 	{
1483 		if (old_buf_state & BM_LOCKED)
1484 			old_buf_state = WaitBufHdrUnlocked(bufHdr);
1485 
1486 		buf_state = old_buf_state;
1487 
1488 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
1489 		buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
1490 
1491 		if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
1492 										   buf_state))
1493 			break;
1494 	}
1495 
1496 	/*
1497 	 * If the buffer was not dirty already, do vacuum accounting.
1498 	 */
1499 	if (!(old_buf_state & BM_DIRTY))
1500 	{
1501 		VacuumPageDirty++;
1502 		pgBufferUsage.shared_blks_dirtied++;
1503 		if (VacuumCostActive)
1504 			VacuumCostBalance += VacuumCostPageDirty;
1505 	}
1506 }
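
/*
 * A rough sketch of the usual caller sequence around MarkBufferDirty() for a
 * WAL-logged change (record construction details omitted):
 *
 *		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *		START_CRIT_SECTION();
 *		... modify the page ...
 *		MarkBufferDirty(buf);
 *		... XLogInsert() the change, then PageSetLSN(page, recptr) ...
 *		END_CRIT_SECTION();
 *		UnlockReleaseBuffer(buf);
 */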
1507 
1508 /*
1509  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
1510  *
1511  * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
1512  * compared to calling the two routines separately.  Now it's mainly just
1513  * a convenience function.  However, if the passed buffer is valid and
1514  * already contains the desired block, we just return it as-is; and that
1515  * does save considerable work compared to a full release and reacquire.
1516  *
1517  * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
1518  * buffer actually needs to be released.  This case is the same as ReadBuffer,
1519  * but can save some tests in the caller.
1520  */
1521 Buffer
1522 ReleaseAndReadBuffer(Buffer buffer,
1523 					 Relation relation,
1524 					 BlockNumber blockNum)
1525 {
1526 	ForkNumber	forkNum = MAIN_FORKNUM;
1527 	BufferDesc *bufHdr;
1528 
1529 	if (BufferIsValid(buffer))
1530 	{
1531 		Assert(BufferIsPinned(buffer));
1532 		if (BufferIsLocal(buffer))
1533 		{
1534 			bufHdr = GetLocalBufferDescriptor(-buffer - 1);
1535 			if (bufHdr->tag.blockNum == blockNum &&
1536 				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1537 				bufHdr->tag.forkNum == forkNum)
1538 				return buffer;
1539 			ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
1540 			LocalRefCount[-buffer - 1]--;
1541 		}
1542 		else
1543 		{
1544 			bufHdr = GetBufferDescriptor(buffer - 1);
1545 			/* we have pin, so it's ok to examine tag without spinlock */
1546 			if (bufHdr->tag.blockNum == blockNum &&
1547 				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1548 				bufHdr->tag.forkNum == forkNum)
1549 				return buffer;
1550 			UnpinBuffer(bufHdr, true);
1551 		}
1552 	}
1553 
1554 	return ReadBuffer(relation, blockNum);
1555 }
1556 
1557 /*
1558  * PinBuffer -- make buffer unavailable for replacement.
1559  *
1560  * For the default access strategy, the buffer's usage_count is incremented
1561  * when we first pin it; for other strategies we just make sure the usage_count
1562  * isn't zero.  (The idea of the latter is that we don't want synchronized
1563  * heap scans to inflate the count, but we need it to not be zero to discourage
1564  * other backends from stealing buffers from our ring.  As long as we cycle
1565  * through the ring faster than the global clock-sweep cycles, buffers in
1566  * our ring won't be chosen as victims for replacement by other backends.)
1567  *
1568  * This should be applied only to shared buffers, never local ones.
1569  *
1570  * Since buffers are pinned/unpinned very frequently, pin buffers without
1571  * taking the buffer header lock; instead update the state variable in a loop of
1572  * CAS operations. Hopefully it's just a single CAS.
1573  *
1574  * Note that ResourceOwnerEnlargeBuffers must have been done already.
1575  *
1576  * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
1577  * some callers to avoid an extra spinlock cycle.
1578  */
1579 static bool
1580 PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
1581 {
1582 	Buffer		b = BufferDescriptorGetBuffer(buf);
1583 	bool		result;
1584 	PrivateRefCountEntry *ref;
1585 
1586 	ref = GetPrivateRefCountEntry(b, true);
1587 
1588 	if (ref == NULL)
1589 	{
1590 		uint32		buf_state;
1591 		uint32		old_buf_state;
1592 
1593 		ReservePrivateRefCountEntry();
1594 		ref = NewPrivateRefCountEntry(b);
1595 
1596 		old_buf_state = pg_atomic_read_u32(&buf->state);
1597 		for (;;)
1598 		{
1599 			if (old_buf_state & BM_LOCKED)
1600 				old_buf_state = WaitBufHdrUnlocked(buf);
1601 
1602 			buf_state = old_buf_state;
1603 
1604 			/* increase refcount */
1605 			buf_state += BUF_REFCOUNT_ONE;
1606 
1607 			if (strategy == NULL)
1608 			{
1609 				/* Default case: increase usagecount unless already max. */
1610 				if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
1611 					buf_state += BUF_USAGECOUNT_ONE;
1612 			}
1613 			else
1614 			{
1615 				/*
1616 				 * Ring buffers shouldn't evict others from the pool.  Thus we
1617 				 * don't let the usagecount exceed 1.
1618 				 */
1619 				if (BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
1620 					buf_state += BUF_USAGECOUNT_ONE;
1621 			}
1622 
1623 			if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
1624 											   buf_state))
1625 			{
1626 				result = (buf_state & BM_VALID) != 0;
1627 				break;
1628 			}
1629 		}
1630 	}
1631 	else
1632 	{
1633 		/* If we previously pinned the buffer, it must surely be valid */
1634 		result = true;
1635 	}
1636 
1637 	ref->refcount++;
1638 	Assert(ref->refcount > 0);
1639 	ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
1640 	return result;
1641 }
1642 
1643 /*
1644  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
1645  * The spinlock is released before return.
1646  *
1647  * As this function is called with the spinlock held, the caller has to
1648  * previously call ReservePrivateRefCountEntry().
1649  *
1650  * Currently, no callers of this function want to modify the buffer's
1651  * usage_count at all, so there's no need for a strategy parameter.
1652  * Also we don't bother with a BM_VALID test (the caller could check that for
1653  * itself).
1654  *
1655  * Also all callers only ever use this function when it's known that the
1656  * buffer can't have a preexisting pin by this backend. That allows us to skip
1657  * searching the private refcount array & hash, which is a boon, because the
1658  * spinlock is still held.
1659  *
1660  * Note: use of this routine is frequently mandatory, not just an optimization
1661  * to save a spin lock/unlock cycle, because we need to pin a buffer before
1662  * its state can change under us.
1663  */
1664 static void
1665 PinBuffer_Locked(BufferDesc *buf)
1666 {
1667 	Buffer		b;
1668 	PrivateRefCountEntry *ref;
1669 	uint32		buf_state;
1670 
1671 	/*
1672 	 * As explained, we don't expect any preexisting pins.  That allows us to
1673 	 * manipulate the PrivateRefCount after releasing the spinlock.
1674 	 */
1675 	Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
1676 
1677 	/*
1678 	 * Since we hold the buffer spinlock, we can update the buffer state and
1679 	 * release the lock in one operation.
1680 	 */
1681 	buf_state = pg_atomic_read_u32(&buf->state);
1682 	Assert(buf_state & BM_LOCKED);
1683 	buf_state += BUF_REFCOUNT_ONE;
1684 	UnlockBufHdr(buf, buf_state);
1685 
1686 	b = BufferDescriptorGetBuffer(buf);
1687 
1688 	ref = NewPrivateRefCountEntry(b);
1689 	ref->refcount++;
1690 
1691 	ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
1692 }
1693 
1694 /*
1695  * UnpinBuffer -- make buffer available for replacement.
1696  *
1697  * This should be applied only to shared buffers, never local ones.
1698  *
1699  * Most but not all callers want CurrentResourceOwner to be adjusted.
1700  * Those that don't should pass fixOwner = FALSE.
1701  */
1702 static void
1703 UnpinBuffer(BufferDesc *buf, bool fixOwner)
1704 {
1705 	PrivateRefCountEntry *ref;
1706 	Buffer		b = BufferDescriptorGetBuffer(buf);
1707 
1708 	/* not moving as we're likely deleting it soon anyway */
1709 	ref = GetPrivateRefCountEntry(b, false);
1710 	Assert(ref != NULL);
1711 
1712 	if (fixOwner)
1713 		ResourceOwnerForgetBuffer(CurrentResourceOwner, b);
1714 
1715 	Assert(ref->refcount > 0);
1716 	ref->refcount--;
1717 	if (ref->refcount == 0)
1718 	{
1719 		uint32		buf_state;
1720 		uint32		old_buf_state;
1721 
1722 		/* I'd better not still hold any locks on the buffer */
1723 		Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
1724 		Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
1725 
1726 		/*
1727 		 * Decrement the shared reference count.
1728 		 *
1729 		 * Since buffer spinlock holder can update status using just write,
1730 		 * it's not safe to use atomic decrement here; thus use a CAS loop.
1731 		 */
1732 		old_buf_state = pg_atomic_read_u32(&buf->state);
1733 		for (;;)
1734 		{
1735 			if (old_buf_state & BM_LOCKED)
1736 				old_buf_state = WaitBufHdrUnlocked(buf);
1737 
1738 			buf_state = old_buf_state;
1739 
1740 			buf_state -= BUF_REFCOUNT_ONE;
1741 
1742 			if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
1743 											   buf_state))
1744 				break;
1745 		}
1746 
1747 		/* Support LockBufferForCleanup() */
1748 		if (buf_state & BM_PIN_COUNT_WAITER)
1749 		{
1750 			/*
1751 			 * Acquire the buffer header lock, re-check that there's a waiter.
1752 			 * Another backend could have unpinned this buffer, and already
1753 			 * woken up the waiter.  There's no danger of the buffer being
1754 			 * replaced after we unpinned it above, as it's pinned by the
1755 			 * waiter.
1756 			 */
1757 			buf_state = LockBufHdr(buf);
1758 
1759 			if ((buf_state & BM_PIN_COUNT_WAITER) &&
1760 				BUF_STATE_GET_REFCOUNT(buf_state) == 1)
1761 			{
1762 				/* we just released the last pin other than the waiter's */
1763 				int			wait_backend_pid = buf->wait_backend_pid;
1764 
1765 				buf_state &= ~BM_PIN_COUNT_WAITER;
1766 				UnlockBufHdr(buf, buf_state);
1767 				ProcSendSignal(wait_backend_pid);
1768 			}
1769 			else
1770 				UnlockBufHdr(buf, buf_state);
1771 		}
1772 		ForgetPrivateRefCountEntry(ref);
1773 	}
1774 }
1775 
1776 /*
1777  * BufferSync -- Write out all dirty buffers in the pool.
1778  *
1779  * This is called at checkpoint time to write out all dirty shared buffers.
1780  * The checkpoint request flags should be passed in.  If CHECKPOINT_IMMEDIATE
1781  * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN,
1782  * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_ALL is set, we write even
1783  * unlogged buffers, which are otherwise skipped.  The remaining flags
1784  * currently have no effect here.
1785  */
1786 static void
1787 BufferSync(int flags)
1788 {
1789 	uint32		buf_state;
1790 	int			buf_id;
1791 	int			num_to_scan;
1792 	int			num_spaces;
1793 	int			num_processed;
1794 	int			num_written;
1795 	CkptTsStatus *per_ts_stat = NULL;
1796 	Oid			last_tsid;
1797 	binaryheap *ts_heap;
1798 	int			i;
1799 	int			mask = BM_DIRTY;
1800 	WritebackContext wb_context;
1801 
1802 	/* Make sure we can handle the pin inside SyncOneBuffer */
1803 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1804 
1805 	/*
1806 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
1807 	 * we write only permanent, dirty buffers.  But at shutdown or end of
1808 	 * recovery, we write all dirty buffers.
1809 	 */
1810 	if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
1811 					CHECKPOINT_FLUSH_ALL))))
1812 		mask |= BM_PERMANENT;
1813 
1814 	/*
1815 	 * Loop over all buffers, and mark the ones that need to be written with
1816 	 * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_scan), so that we
1817 	 * can estimate how much work needs to be done.
1818 	 *
1819 	 * This allows us to write only those pages that were dirty when the
1820 	 * checkpoint began, and not those that get dirtied while it proceeds.
1821 	 * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
1822 	 * later in this function, or by normal backends or the bgwriter cleaning
1823 	 * scan, the flag is cleared.  Any buffer dirtied after this point won't
1824 	 * have the flag set.
1825 	 *
1826 	 * Note that if we fail to write some buffer, we may leave buffers with
1827 	 * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
1828 	 * certainly need to be written for the next checkpoint attempt, too.
1829 	 */
1830 	num_to_scan = 0;
1831 	for (buf_id = 0; buf_id < NBuffers; buf_id++)
1832 	{
1833 		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
1834 
1835 		/*
1836 		 * Header spinlock is enough to examine BM_DIRTY, see comment in
1837 		 * SyncOneBuffer.
1838 		 */
1839 		buf_state = LockBufHdr(bufHdr);
1840 
1841 		if ((buf_state & mask) == mask)
1842 		{
1843 			CkptSortItem *item;
1844 
1845 			buf_state |= BM_CHECKPOINT_NEEDED;
1846 
1847 			item = &CkptBufferIds[num_to_scan++];
1848 			item->buf_id = buf_id;
1849 			item->tsId = bufHdr->tag.rnode.spcNode;
1850 			item->relNode = bufHdr->tag.rnode.relNode;
1851 			item->forkNum = bufHdr->tag.forkNum;
1852 			item->blockNum = bufHdr->tag.blockNum;
1853 		}
1854 
1855 		UnlockBufHdr(bufHdr, buf_state);
1856 	}
1857 
1858 	if (num_to_scan == 0)
1859 		return;					/* nothing to do */
1860 
1861 	WritebackContextInit(&wb_context, &checkpoint_flush_after);
1862 
1863 	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
1864 
1865 	/*
1866 	 * Sort buffers that need to be written to reduce the likelihood of random
1867 	 * IO. The sorting is also important for the implementation of balancing
1868 	 * writes between tablespaces. Without balancing writes we'd potentially
1869 	 * end up writing to the tablespaces one-by-one; possibly overloading the
1870 	 * underlying system.
1871 	 */
1872 	qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
1873 		  ckpt_buforder_comparator);
1874 
1875 	num_spaces = 0;
1876 
1877 	/*
1878 	 * Allocate progress status for each tablespace with buffers that need to
1879 	 * be flushed. This requires the to-be-flushed array to be sorted.
1880 	 */
1881 	last_tsid = InvalidOid;
1882 	for (i = 0; i < num_to_scan; i++)
1883 	{
1884 		CkptTsStatus *s;
1885 		Oid			cur_tsid;
1886 
1887 		cur_tsid = CkptBufferIds[i].tsId;
1888 
1889 		/*
1890 		 * Grow array of per-tablespace status structs, every time a new
1891 		 * tablespace is found.
1892 		 */
1893 		if (last_tsid == InvalidOid || last_tsid != cur_tsid)
1894 		{
1895 			Size		sz;
1896 
1897 			num_spaces++;
1898 
1899 			/*
1900 			 * Not worth adding grow-by-power-of-2 logic here - even with a
1901 			 * few hundred tablespaces this should be fine.
1902 			 */
1903 			sz = sizeof(CkptTsStatus) * num_spaces;
1904 
1905 			if (per_ts_stat == NULL)
1906 				per_ts_stat = (CkptTsStatus *) palloc(sz);
1907 			else
1908 				per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
1909 
1910 			s = &per_ts_stat[num_spaces - 1];
1911 			memset(s, 0, sizeof(*s));
1912 			s->tsId = cur_tsid;
1913 
1914 			/*
1915 			 * The first buffer in this tablespace. As CkptBufferIds is sorted
1916 			 * by tablespace all (s->num_to_scan) buffers in this tablespace
1917 			 * will follow afterwards.
1918 			 */
1919 			s->index = i;
1920 
1921 			/*
1922 			 * progress_slice will be determined once we know how many buffers
1923 			 * are in each tablespace, i.e. after this loop.
1924 			 */
1925 
1926 			last_tsid = cur_tsid;
1927 		}
1928 		else
1929 		{
1930 			s = &per_ts_stat[num_spaces - 1];
1931 		}
1932 
1933 		s->num_to_scan++;
1934 	}
1935 
1936 	Assert(num_spaces > 0);
1937 
1938 	/*
1939 	 * Build a min-heap over the write-progress in the individual tablespaces,
1940 	 * and compute how large a portion of the total progress a single
1941 	 * processed buffer is.
1942 	 */
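	/*
	 * Worked example (illustrative numbers only): if num_to_scan is 1200 in
	 * total and one tablespace accounts for 300 of those buffers, its
	 * progress_slice is 1200/300 = 4.0.  Writing all 300 of its buffers then
	 * advances its progress from 0 to 1200, the same scale every other
	 * tablespace uses, which is what makes the heap comparison meaningful.
	 */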
1943 	ts_heap = binaryheap_allocate(num_spaces,
1944 								  ts_ckpt_progress_comparator,
1945 								  NULL);
1946 
1947 	for (i = 0; i < num_spaces; i++)
1948 	{
1949 		CkptTsStatus *ts_stat = &per_ts_stat[i];
1950 
1951 		ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
1952 
1953 		binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
1954 	}
1955 
1956 	binaryheap_build(ts_heap);
1957 
1958 	/*
1959 	 * Iterate through to-be-checkpointed buffers and write the ones (still)
1960 	 * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
1961 	 * tablespaces; otherwise the sorting would lead to only one tablespace
1962 	 * receiving writes at a time, making inefficient use of the hardware.
1963 	 */
1964 	num_processed = 0;
1965 	num_written = 0;
1966 	while (!binaryheap_empty(ts_heap))
1967 	{
1968 		BufferDesc *bufHdr = NULL;
1969 		CkptTsStatus *ts_stat = (CkptTsStatus *)
1970 		DatumGetPointer(binaryheap_first(ts_heap));
1971 
1972 		buf_id = CkptBufferIds[ts_stat->index].buf_id;
1973 		Assert(buf_id != -1);
1974 
1975 		bufHdr = GetBufferDescriptor(buf_id);
1976 
1977 		num_processed++;
1978 
1979 		/*
1980 		 * We don't need to acquire the lock here, because we're only looking
1981 		 * at a single bit. It's possible that someone else writes the buffer
1982 		 * and clears the flag right after we check, but that doesn't matter
1983 		 * since SyncOneBuffer will then do nothing.  However, there is a
1984 		 * further race condition: it's conceivable that between the time we
1985 		 * examine the bit here and the time SyncOneBuffer acquires the lock,
1986 		 * someone else not only wrote the buffer but replaced it with another
1987 		 * page and dirtied it.  In that improbable case, SyncOneBuffer will
1988 		 * write the buffer though we didn't need to.  It doesn't seem worth
1989 		 * guarding against this, though.
1990 		 */
1991 		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
1992 		{
1993 			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
1994 			{
1995 				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
1996 				BgWriterStats.m_buf_written_checkpoints++;
1997 				num_written++;
1998 			}
1999 		}
2000 
2001 		/*
2002 		 * Measure progress independent of actually having to flush the buffer
2003 		 * - otherwise the writes become unbalanced.
2004 		 */
2005 		ts_stat->progress += ts_stat->progress_slice;
2006 		ts_stat->num_scanned++;
2007 		ts_stat->index++;
2008 
2009 		/* Have all the buffers from the tablespace been processed? */
2010 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
2011 		{
2012 			binaryheap_remove_first(ts_heap);
2013 		}
2014 		else
2015 		{
2016 			/* update heap with the new progress */
2017 			binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
2018 		}
2019 
2020 		/*
2021 		 * Sleep to throttle our I/O rate.
2022 		 */
2023 		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
2024 	}
2025 
2026 	/* issue all pending flushes */
2027 	IssuePendingWritebacks(&wb_context);
2028 
2029 	pfree(per_ts_stat);
2030 	per_ts_stat = NULL;
2031 	binaryheap_free(ts_heap);
2032 
2033 	/*
2034 	 * Update checkpoint statistics. As noted above, this doesn't include
2035 	 * buffers written by other backends or bgwriter scan.
2036 	 */
2037 	CheckpointStats.ckpt_bufs_written += num_written;
2038 
2039 	TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);
2040 }
2041 
2042 /*
2043  * BgBufferSync -- Write out some dirty buffers in the pool.
2044  *
2045  * This is called periodically by the background writer process.
2046  *
2047  * Returns true if it's appropriate for the bgwriter process to go into
2048  * low-power hibernation mode.  (This happens if the strategy clock sweep
2049  * has been "lapped" and no buffer allocations have occurred recently,
2050  * or if the bgwriter has been effectively disabled by setting
2051  * bgwriter_lru_maxpages to 0.)
2052  */
2053 bool
2054 BgBufferSync(WritebackContext *wb_context)
2055 {
2056 	/* info obtained from freelist.c */
2057 	int			strategy_buf_id;
2058 	uint32		strategy_passes;
2059 	uint32		recent_alloc;
2060 
2061 	/*
2062 	 * Information saved between calls so we can determine the strategy
2063 	 * point's advance rate and avoid scanning already-cleaned buffers.
2064 	 */
2065 	static bool saved_info_valid = false;
2066 	static int	prev_strategy_buf_id;
2067 	static uint32 prev_strategy_passes;
2068 	static int	next_to_clean;
2069 	static uint32 next_passes;
2070 
2071 	/* Moving averages of allocation rate and clean-buffer density */
2072 	static float smoothed_alloc = 0;
2073 	static float smoothed_density = 10.0;
2074 
2075 	/* Potentially these could be tunables, but for now, not */
2076 	float		smoothing_samples = 16;
2077 	float		scan_whole_pool_milliseconds = 120000.0;
2078 
2079 	/* Used to compute how far we scan ahead */
2080 	long		strategy_delta;
2081 	int			bufs_to_lap;
2082 	int			bufs_ahead;
2083 	float		scans_per_alloc;
2084 	int			reusable_buffers_est;
2085 	int			upcoming_alloc_est;
2086 	int			min_scan_buffers;
2087 
2088 	/* Variables for the scanning loop proper */
2089 	int			num_to_scan;
2090 	int			num_written;
2091 	int			reusable_buffers;
2092 
2093 	/* Variables for final smoothed_density update */
2094 	long		new_strategy_delta;
2095 	uint32		new_recent_alloc;
2096 
2097 	/*
2098 	 * Find out where the freelist clock sweep currently is, and how many
2099 	 * buffer allocations have happened since our last call.
2100 	 */
2101 	strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
2102 
2103 	/* Report buffer alloc counts to pgstat */
2104 	BgWriterStats.m_buf_alloc += recent_alloc;
2105 
2106 	/*
2107 	 * If we're not running the LRU scan, just stop after doing the stats
2108 	 * stuff.  We mark the saved state invalid so that we can recover sanely
2109 	 * if LRU scan is turned back on later.
2110 	 */
2111 	if (bgwriter_lru_maxpages <= 0)
2112 	{
2113 		saved_info_valid = false;
2114 		return true;
2115 	}
2116 
2117 	/*
2118 	 * Compute strategy_delta = how many buffers have been scanned by the
2119 	 * clock sweep since last time.  If first time through, assume none. Then
2120 	 * see if we are still ahead of the clock sweep, and if so, how many
2121 	 * buffers we could scan before we'd catch up with it and "lap" it. Note:
2122 	 * the weird-looking coding of the xxx_passes comparisons is to avoid bogus
2123 	 * behavior when the passes counts wrap around.
2124 	 */
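	/*
	 * Example of the wrap-around-safe arithmetic (illustrative numbers): with
	 * NBuffers = 1000, if the clock sweep moved from buffer 900 of pass 7 to
	 * buffer 100 of pass 8, then passes_delta = 1 and
	 * strategy_delta = (100 - 900) + 1 * 1000 = 200 buffers scanned.
	 */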
2125 	if (saved_info_valid)
2126 	{
2127 		int32		passes_delta = strategy_passes - prev_strategy_passes;
2128 
2129 		strategy_delta = strategy_buf_id - prev_strategy_buf_id;
2130 		strategy_delta += (long) passes_delta * NBuffers;
2131 
2132 		Assert(strategy_delta >= 0);
2133 
2134 		if ((int32) (next_passes - strategy_passes) > 0)
2135 		{
2136 			/* we're one pass ahead of the strategy point */
2137 			bufs_to_lap = strategy_buf_id - next_to_clean;
2138 #ifdef BGW_DEBUG
2139 			elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
2140 				 next_passes, next_to_clean,
2141 				 strategy_passes, strategy_buf_id,
2142 				 strategy_delta, bufs_to_lap);
2143 #endif
2144 		}
2145 		else if (next_passes == strategy_passes &&
2146 				 next_to_clean >= strategy_buf_id)
2147 		{
2148 			/* on same pass, but ahead or at least not behind */
2149 			bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
2150 #ifdef BGW_DEBUG
2151 			elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
2152 				 next_passes, next_to_clean,
2153 				 strategy_passes, strategy_buf_id,
2154 				 strategy_delta, bufs_to_lap);
2155 #endif
2156 		}
2157 		else
2158 		{
2159 			/*
2160 			 * We're behind, so skip forward to the strategy point and start
2161 			 * cleaning from there.
2162 			 */
2163 #ifdef BGW_DEBUG
2164 			elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
2165 				 next_passes, next_to_clean,
2166 				 strategy_passes, strategy_buf_id,
2167 				 strategy_delta);
2168 #endif
2169 			next_to_clean = strategy_buf_id;
2170 			next_passes = strategy_passes;
2171 			bufs_to_lap = NBuffers;
2172 		}
2173 	}
2174 	else
2175 	{
2176 		/*
2177 		 * Initializing at startup or after LRU scanning had been off. Always
2178 		 * start at the strategy point.
2179 		 */
2180 #ifdef BGW_DEBUG
2181 		elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
2182 			 strategy_passes, strategy_buf_id);
2183 #endif
2184 		strategy_delta = 0;
2185 		next_to_clean = strategy_buf_id;
2186 		next_passes = strategy_passes;
2187 		bufs_to_lap = NBuffers;
2188 	}
2189 
2190 	/* Update saved info for next time */
2191 	prev_strategy_buf_id = strategy_buf_id;
2192 	prev_strategy_passes = strategy_passes;
2193 	saved_info_valid = true;
2194 
2195 	/*
2196 	 * Compute how many buffers had to be scanned for each new allocation, ie,
2197 	 * 1/density of reusable buffers, and track a moving average of that.
2198 	 *
2199 	 * If the strategy point didn't move, we don't update the density estimate.
2200 	 */
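	/*
	 * For instance (illustrative numbers): if the clock sweep covered
	 * strategy_delta = 3000 buffers while recent_alloc = 200 allocations
	 * happened, scans_per_alloc is 15, and smoothed_density moves 1/16th of
	 * the way (smoothing_samples = 16) from its previous value toward 15.
	 */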
2201 	if (strategy_delta > 0 && recent_alloc > 0)
2202 	{
2203 		scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
2204 		smoothed_density += (scans_per_alloc - smoothed_density) /
2205 			smoothing_samples;
2206 	}
2207 
2208 	/*
2209 	 * Estimate how many reusable buffers there are between the current
2210 	 * strategy point and where we've scanned ahead to, based on the smoothed
2211 	 * density estimate.
2212 	 */
2213 	bufs_ahead = NBuffers - bufs_to_lap;
2214 	reusable_buffers_est = (float) bufs_ahead / smoothed_density;
2215 
2216 	/*
2217 	 * Track a moving average of recent buffer allocations.  Here, rather than
2218 	 * a true average we want a fast-attack, slow-decline behavior: we
2219 	 * immediately follow any increase.
2220 	 */
2221 	if (smoothed_alloc <= (float) recent_alloc)
2222 		smoothed_alloc = recent_alloc;
2223 	else
2224 		smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
2225 			smoothing_samples;
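	/*
	 * E.g. (illustrative): if smoothed_alloc is 50 and recent_alloc jumps to
	 * 400, smoothed_alloc snaps to 400 immediately; if recent_alloc then
	 * drops back to 0, smoothed_alloc decays by only 1/16th of the gap per
	 * cycle.
	 */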
2226 
2227 	/* Scale the estimate by a GUC to allow more aggressive tuning. */
2228 	upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
2229 
2230 	/*
2231 	 * If recent_alloc remains at zero for many cycles, smoothed_alloc will
2232 	 * eventually underflow to zero, and the underflows produce annoying
2233 	 * kernel warnings on some platforms.  Once upcoming_alloc_est has gone to
2234 	 * zero, there's no point in tracking smaller and smaller values of
2235 	 * smoothed_alloc, so just reset it to exactly zero to avoid this
2236 	 * syndrome.  It will pop back up as soon as recent_alloc increases.
2237 	 */
2238 	if (upcoming_alloc_est == 0)
2239 		smoothed_alloc = 0;
2240 
2241 	/*
2242 	 * Even in cases where there's been little or no buffer allocation
2243 	 * activity, we want to make a small amount of progress through the buffer
2244 	 * cache so that as many reusable buffers as possible are clean after an
2245 	 * idle period.
2246 	 *
2247 	 * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
2248 	 * the BGW will be called during the scan_whole_pool time; slice the
2249 	 * buffer pool into that many sections.
2250 	 */
2251 	min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
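	/*
	 * Rough illustration (assumed figures, not measured here): with the
	 * default bgwriter_delay of 200ms this is 120000/200 = 600 calls per
	 * full sweep, so with NBuffers = 16384 (128MB of shared_buffers at 8kB
	 * pages) min_scan_buffers comes out to about 27 buffers per call.
	 */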
2252 
2253 	if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
2254 	{
2255 #ifdef BGW_DEBUG
2256 		elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
2257 			 upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
2258 #endif
2259 		upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
2260 	}
2261 
2262 	/*
2263 	 * Now write out dirty reusable buffers, working forward from the
2264 	 * next_to_clean point, until we have lapped the strategy scan, or cleaned
2265 	 * enough buffers to match our estimate of the next cycle's allocation
2266 	 * requirements, or hit the bgwriter_lru_maxpages limit.
2267 	 */
2268 
2269 	/* Make sure we can handle the pin inside SyncOneBuffer */
2270 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2271 
2272 	num_to_scan = bufs_to_lap;
2273 	num_written = 0;
2274 	reusable_buffers = reusable_buffers_est;
2275 
2276 	/* Execute the LRU scan */
2277 	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
2278 	{
2279 		int			sync_state = SyncOneBuffer(next_to_clean, true,
2280 											   wb_context);
2281 
2282 		if (++next_to_clean >= NBuffers)
2283 		{
2284 			next_to_clean = 0;
2285 			next_passes++;
2286 		}
2287 		num_to_scan--;
2288 
2289 		if (sync_state & BUF_WRITTEN)
2290 		{
2291 			reusable_buffers++;
2292 			if (++num_written >= bgwriter_lru_maxpages)
2293 			{
2294 				BgWriterStats.m_maxwritten_clean++;
2295 				break;
2296 			}
2297 		}
2298 		else if (sync_state & BUF_REUSABLE)
2299 			reusable_buffers++;
2300 	}
2301 
2302 	BgWriterStats.m_buf_written_clean += num_written;
2303 
2304 #ifdef BGW_DEBUG
2305 	elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
2306 		 recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
2307 		 smoothed_density, reusable_buffers_est, upcoming_alloc_est,
2308 		 bufs_to_lap - num_to_scan,
2309 		 num_written,
2310 		 reusable_buffers - reusable_buffers_est);
2311 #endif
2312 
2313 	/*
2314 	 * Consider the above scan as being like a new allocation scan.
2315 	 * Characterize its density and update the smoothed one based on it. This
2316 	 * effectively halves the moving average period in cases where both the
2317 	 * strategy and the background writer are doing some useful scanning,
2318 	 * which is helpful because a long memory isn't as desirable on the
2319 	 * density estimates.
2320 	 */
2321 	new_strategy_delta = bufs_to_lap - num_to_scan;
2322 	new_recent_alloc = reusable_buffers - reusable_buffers_est;
2323 	if (new_strategy_delta > 0 && new_recent_alloc > 0)
2324 	{
2325 		scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
2326 		smoothed_density += (scans_per_alloc - smoothed_density) /
2327 			smoothing_samples;
2328 
2329 #ifdef BGW_DEBUG
2330 		elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
2331 			 new_recent_alloc, new_strategy_delta,
2332 			 scans_per_alloc, smoothed_density);
2333 #endif
2334 	}
2335 
2336 	/* Return true if OK to hibernate */
2337 	return (bufs_to_lap == 0 && recent_alloc == 0);
2338 }
2339 
2340 /*
2341  * SyncOneBuffer -- process a single buffer during syncing.
2342  *
2343  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
2344  * buffers marked recently used, as these are not replacement candidates.
2345  *
2346  * Returns a bitmask containing the following flag bits:
2347  *	BUF_WRITTEN: we wrote the buffer.
2348  *	BUF_REUSABLE: buffer is available for replacement, ie, it has
2349  *		pin count 0 and usage count 0.
2350  *
2351  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
2352  * after locking it, but we don't care all that much.)
2353  *
2354  * Note: caller must have done ResourceOwnerEnlargeBuffers.
2355  */
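/*
 * Typical caller pattern, as used by BufferSync() and BgBufferSync() in this
 * file:
 *
 *		if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
 *			num_written++;
 *
 * i.e. the checkpoint path only cares about the BUF_WRITTEN bit, while the
 * LRU scan also consults BUF_REUSABLE.
 */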
2356 static int
2357 SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
2358 {
2359 	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
2360 	int			result = 0;
2361 	uint32		buf_state;
2362 	BufferTag	tag;
2363 
2364 	ReservePrivateRefCountEntry();
2365 
2366 	/*
2367 	 * Check whether buffer needs writing.
2368 	 *
2369 	 * We can make this check without taking the buffer content lock so long
2370 	 * as we mark pages dirty in access methods *before* logging changes with
2371 	 * XLogInsert(): if someone marks the buffer dirty just after our check we
2372 	 * don't worry because our checkpoint.redo points before log record for
2373 	 * don't worry, because our checkpoint.redo point precedes the log record
2374 	 * for the upcoming changes, so we are not required to write that buffer.
2375 	buf_state = LockBufHdr(bufHdr);
2376 
2377 	if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
2378 		BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
2379 	{
2380 		result |= BUF_REUSABLE;
2381 	}
2382 	else if (skip_recently_used)
2383 	{
2384 		/* Caller told us not to write recently-used buffers */
2385 		UnlockBufHdr(bufHdr, buf_state);
2386 		return result;
2387 	}
2388 
2389 	if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
2390 	{
2391 		/* It's clean, so nothing to do */
2392 		UnlockBufHdr(bufHdr, buf_state);
2393 		return result;
2394 	}
2395 
2396 	/*
2397 	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
2398 	 * buffer is clean by the time we've locked it.)
2399 	 */
2400 	PinBuffer_Locked(bufHdr);
2401 	LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
2402 
2403 	FlushBuffer(bufHdr, NULL);
2404 
2405 	LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
2406 
2407 	tag = bufHdr->tag;
2408 
2409 	UnpinBuffer(bufHdr, true);
2410 
2411 	ScheduleBufferTagForWriteback(wb_context, &tag);
2412 
2413 	return result | BUF_WRITTEN;
2414 }
2415 
2416 /*
2417  *		AtEOXact_Buffers - clean up at end of transaction.
2418  *
2419  *		As of PostgreSQL 8.0, buffer pins should get released by the
2420  *		ResourceOwner mechanism.  This routine is just a debugging
2421  *		cross-check that no pins remain.
2422  */
2423 void
2424 AtEOXact_Buffers(bool isCommit)
2425 {
2426 	CheckForBufferLeaks();
2427 
2428 	AtEOXact_LocalBuffers(isCommit);
2429 
2430 	Assert(PrivateRefCountOverflowed == 0);
2431 }
2432 
2433 /*
2434  * Initialize access to shared buffer pool
2435  *
2436  * This is called during backend startup (whether standalone or under the
2437  * postmaster).  It sets up for this backend's access to the already-existing
2438  * buffer pool.
2439  *
2440  * NB: this is called before InitProcess(), so we do not have a PGPROC and
2441  * cannot do LWLockAcquire; hence we can't actually access stuff in
2442  * shared memory yet.  We are only initializing local data here.
2443  * (See also InitBufferPoolBackend)
2444  */
2445 void
2446 InitBufferPoolAccess(void)
2447 {
2448 	HASHCTL		hash_ctl;
2449 
2450 	memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
2451 
2452 	MemSet(&hash_ctl, 0, sizeof(hash_ctl));
2453 	hash_ctl.keysize = sizeof(int32);
2454 	hash_ctl.entrysize = sizeof(PrivateRefCountEntry);
2455 
2456 	PrivateRefCountHash = hash_create("PrivateRefCount", 100, &hash_ctl,
2457 									  HASH_ELEM | HASH_BLOBS);
2458 }
2459 
2460 /*
2461  * InitBufferPoolBackend --- second-stage initialization of a new backend
2462  *
2463  * This is called after we have acquired a PGPROC and so can safely get
2464  * LWLocks.  We don't currently need to do anything at this stage ...
2465  * except register a shmem-exit callback.  AtProcExit_Buffers needs LWLock
2466  * access, and thereby has to be called at the corresponding phase of
2467  * backend shutdown.
2468  */
2469 void
2470 InitBufferPoolBackend(void)
2471 {
2472 	on_shmem_exit(AtProcExit_Buffers, 0);
2473 }
2474 
2475 /*
2476  * During backend exit, ensure that we released all shared-buffer locks and
2477  * assert that we have no remaining pins.
2478  */
2479 static void
2480 AtProcExit_Buffers(int code, Datum arg)
2481 {
2482 	AbortBufferIO();
2483 	UnlockBuffers();
2484 
2485 	CheckForBufferLeaks();
2486 
2487 	/* localbuf.c needs a chance too */
2488 	AtProcExit_LocalBuffers();
2489 }
2490 
2491 /*
2492  *		CheckForBufferLeaks - ensure this backend holds no buffer pins
2493  *
2494  *		As of PostgreSQL 8.0, buffer pins should get released by the
2495  *		ResourceOwner mechanism.  This routine is just a debugging
2496  *		cross-check that no pins remain.
2497  */
2498 static void
2499 CheckForBufferLeaks(void)
2500 {
2501 #ifdef USE_ASSERT_CHECKING
2502 	int			RefCountErrors = 0;
2503 	PrivateRefCountEntry *res;
2504 	int			i;
2505 
2506 	/* check the array */
2507 	for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
2508 	{
2509 		res = &PrivateRefCountArray[i];
2510 
2511 		if (res->buffer != InvalidBuffer)
2512 		{
2513 			PrintBufferLeakWarning(res->buffer);
2514 			RefCountErrors++;
2515 		}
2516 	}
2517 
2518 	/* if necessary search the hash */
2519 	if (PrivateRefCountOverflowed)
2520 	{
2521 		HASH_SEQ_STATUS hstat;
2522 
2523 		hash_seq_init(&hstat, PrivateRefCountHash);
2524 		while ((res = (PrivateRefCountEntry *) hash_seq_search(&hstat)) != NULL)
2525 		{
2526 			PrintBufferLeakWarning(res->buffer);
2527 			RefCountErrors++;
2528 		}
2529 
2530 	}
2531 
2532 	Assert(RefCountErrors == 0);
2533 #endif
2534 }
2535 
2536 /*
2537  * Helper routine to issue warnings when a buffer is unexpectedly pinned
2538  */
2539 void
2540 PrintBufferLeakWarning(Buffer buffer)
2541 {
2542 	BufferDesc *buf;
2543 	int32		loccount;
2544 	char	   *path;
2545 	BackendId	backend;
2546 	uint32		buf_state;
2547 
2548 	Assert(BufferIsValid(buffer));
2549 	if (BufferIsLocal(buffer))
2550 	{
2551 		buf = GetLocalBufferDescriptor(-buffer - 1);
2552 		loccount = LocalRefCount[-buffer - 1];
2553 		backend = MyBackendId;
2554 	}
2555 	else
2556 	{
2557 		buf = GetBufferDescriptor(buffer - 1);
2558 		loccount = GetPrivateRefCount(buffer);
2559 		backend = InvalidBackendId;
2560 	}
2561 
2562 	/* theoretically we should lock the bufhdr here */
2563 	path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
2564 	buf_state = pg_atomic_read_u32(&buf->state);
2565 	elog(WARNING,
2566 		 "buffer refcount leak: [%03d] "
2567 		 "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
2568 		 buffer, path,
2569 		 buf->tag.blockNum, buf_state & BUF_FLAG_MASK,
2570 		 BUF_STATE_GET_REFCOUNT(buf_state), loccount);
2571 	pfree(path);
2572 }
2573 
2574 /*
2575  * CheckPointBuffers
2576  *
2577  * Flush all dirty blocks in buffer pool to disk at checkpoint time.
2578  *
2579  * Note: temporary relations do not participate in checkpoints, so they don't
2580  * need to be flushed.
2581  */
2582 void
2583 CheckPointBuffers(int flags)
2584 {
2585 	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
2586 	CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
2587 	BufferSync(flags);
2588 	CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
2589 	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
2590 	smgrsync();
2591 	CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
2592 	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
2593 }
2594 
2595 
2596 /*
2597  * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
2598  */
2599 void
2600 BufmgrCommit(void)
2601 {
2602 	/* Nothing to do in bufmgr anymore... */
2603 }
2604 
2605 /*
2606  * BufferGetBlockNumber
2607  *		Returns the block number associated with a buffer.
2608  *
2609  * Note:
2610  *		Assumes that the buffer is valid and pinned, else the
2611  *		value may be obsolete immediately...
2612  */
2613 BlockNumber
2614 BufferGetBlockNumber(Buffer buffer)
2615 {
2616 	BufferDesc *bufHdr;
2617 
2618 	Assert(BufferIsPinned(buffer));
2619 
2620 	if (BufferIsLocal(buffer))
2621 		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
2622 	else
2623 		bufHdr = GetBufferDescriptor(buffer - 1);
2624 
2625 	/* pinned, so OK to read tag without spinlock */
2626 	return bufHdr->tag.blockNum;
2627 }
2628 
2629 /*
2630  * BufferGetTag
2631  *		Returns the relfilenode, fork number and block number associated with
2632  *		a buffer.
2633  */
2634 void
2635 BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
2636 			 BlockNumber *blknum)
2637 {
2638 	BufferDesc *bufHdr;
2639 
2640 	/* Do the same checks as BufferGetBlockNumber. */
2641 	Assert(BufferIsPinned(buffer));
2642 
2643 	if (BufferIsLocal(buffer))
2644 		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
2645 	else
2646 		bufHdr = GetBufferDescriptor(buffer - 1);
2647 
2648 	/* pinned, so OK to read tag without spinlock */
2649 	*rnode = bufHdr->tag.rnode;
2650 	*forknum = bufHdr->tag.forkNum;
2651 	*blknum = bufHdr->tag.blockNum;
2652 }
2653 
2654 /*
2655  * FlushBuffer
2656  *		Physically write out a shared buffer.
2657  *
2658  * NOTE: this actually just passes the buffer contents to the kernel; the
2659  * real write to disk won't happen until the kernel feels like it.  This
2660  * is okay from our point of view since we can redo the changes from WAL.
2661  * However, we will need to force the changes to disk via fsync before
2662  * we can checkpoint WAL.
2663  *
2664  * The caller must hold a pin on the buffer and have share-locked the
2665  * buffer contents.  (Note: a share-lock does not prevent updates of
2666  * hint bits in the buffer, so the page could change while the write
2667  * is in progress, but we assume that that will not invalidate the data
2668  * written.)
2669  *
2670  * If the caller has an smgr reference for the buffer's relation, pass it
2671  * as the second parameter.  If not, pass NULL.
2672  */
2673 static void
2674 FlushBuffer(BufferDesc *buf, SMgrRelation reln)
2675 {
2676 	XLogRecPtr	recptr;
2677 	ErrorContextCallback errcallback;
2678 	instr_time	io_start,
2679 				io_time;
2680 	Block		bufBlock;
2681 	char	   *bufToWrite;
2682 	uint32		buf_state;
2683 
2684 	/*
2685 	 * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
2686 	 * false, then someone else flushed the buffer before we could, so we need
2687 	 * not do anything.
2688 	 */
2689 	if (!StartBufferIO(buf, false))
2690 		return;
2691 
2692 	/* Setup error traceback support for ereport() */
2693 	errcallback.callback = shared_buffer_write_error_callback;
2694 	errcallback.arg = (void *) buf;
2695 	errcallback.previous = error_context_stack;
2696 	error_context_stack = &errcallback;
2697 
2698 	/* Find smgr relation for buffer */
2699 	if (reln == NULL)
2700 		reln = smgropen(buf->tag.rnode, InvalidBackendId);
2701 
2702 	TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
2703 										buf->tag.blockNum,
2704 										reln->smgr_rnode.node.spcNode,
2705 										reln->smgr_rnode.node.dbNode,
2706 										reln->smgr_rnode.node.relNode);
2707 
2708 	buf_state = LockBufHdr(buf);
2709 
2710 	/*
2711 	 * Run PageGetLSN while holding header lock, since we don't have the
2712 	 * buffer locked exclusively in all cases.
2713 	 */
2714 	recptr = BufferGetLSN(buf);
2715 
2716 	/* To check if block content changes while flushing. - vadim 01/17/97 */
2717 	buf_state &= ~BM_JUST_DIRTIED;
2718 	UnlockBufHdr(buf, buf_state);
2719 
2720 	/*
2721 	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
2722 	 * rule that log updates must hit disk before any of the data-file changes
2723 	 * they describe do.
2724 	 *
2725 	 * However, this rule does not apply to unlogged relations, which will be
2726 	 * lost after a crash anyway.  Most unlogged relation pages do not bear
2727 	 * LSNs since we never emit WAL records for them, and therefore flushing
2728 	 * up through the buffer LSN would be useless, but harmless.  However,
2729 	 * GiST indexes use LSNs internally to track page-splits, and therefore
2730 	 * unlogged GiST pages bear "fake" LSNs generated by
2731 	 * GetFakeLSNForUnloggedRel.  It is unlikely but possible that the fake
2732 	 * LSN counter could advance past the WAL insertion point; and if it did
2733 	 * happen, attempting to flush WAL through that location would fail, with
2734 	 * disastrous system-wide consequences.  To make sure that can't happen,
2735 	 * skip the flush if the buffer isn't permanent.
2736 	 */
2737 	if (buf_state & BM_PERMANENT)
2738 		XLogFlush(recptr);
2739 
2740 	/*
2741 	 * Now it's safe to write buffer to disk. Note that no one else should
2742 	 * have been able to write it while we were busy with log flushing because
2743 	 * we have the io_in_progress lock.
2744 	 */
2745 	bufBlock = BufHdrGetBlock(buf);
2746 
2747 	/*
2748 	 * Update page checksum if desired.  Since we have only shared lock on the
2749 	 * buffer, other processes might be updating hint bits in it, so we must
2750 	 * copy the page to private storage if we do checksumming.
2751 	 */
2752 	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
2753 
2754 	if (track_io_timing)
2755 		INSTR_TIME_SET_CURRENT(io_start);
2756 
2757 	/*
2758 	 * bufToWrite is either the shared buffer or a copy, as appropriate.
2759 	 */
2760 	smgrwrite(reln,
2761 			  buf->tag.forkNum,
2762 			  buf->tag.blockNum,
2763 			  bufToWrite,
2764 			  false);
2765 
2766 	if (track_io_timing)
2767 	{
2768 		INSTR_TIME_SET_CURRENT(io_time);
2769 		INSTR_TIME_SUBTRACT(io_time, io_start);
2770 		pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
2771 		INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
2772 	}
2773 
2774 	pgBufferUsage.shared_blks_written++;
2775 
2776 	/*
2777 	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
2778 	 * end the io_in_progress state.
2779 	 */
2780 	TerminateBufferIO(buf, true, 0);
2781 
2782 	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
2783 									   buf->tag.blockNum,
2784 									   reln->smgr_rnode.node.spcNode,
2785 									   reln->smgr_rnode.node.dbNode,
2786 									   reln->smgr_rnode.node.relNode);
2787 
2788 	/* Pop the error context stack */
2789 	error_context_stack = errcallback.previous;
2790 }
2791 
2792 /*
2793  * RelationGetNumberOfBlocksInFork
2794  *		Determines the current number of pages in the specified relation fork.
2795  */
2796 BlockNumber
2797 RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
2798 {
2799 	/* Open it at the smgr level if not already done */
2800 	RelationOpenSmgr(relation);
2801 
2802 	return smgrnblocks(relation->rd_smgr, forkNum);
2803 }
2804 
2805 /*
2806  * BufferIsPermanent
2807  *		Determines whether a buffer will potentially still be around after
2808  *		a crash.  Caller must hold a buffer pin.
2809  */
2810 bool
2811 BufferIsPermanent(Buffer buffer)
2812 {
2813 	BufferDesc *bufHdr;
2814 
2815 	/* Local buffers are used only for temp relations. */
2816 	if (BufferIsLocal(buffer))
2817 		return false;
2818 
2819 	/* Make sure we've got a real buffer, and that we hold a pin on it. */
2820 	Assert(BufferIsValid(buffer));
2821 	Assert(BufferIsPinned(buffer));
2822 
2823 	/*
2824 	 * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
2825 	 * need not bother with the buffer header spinlock.  Even if someone else
2826 	 * changes the buffer header state while we're doing this, the state is
2827 	 * changed atomically, so we'll read the old value or the new value, but
2828 	 * not random garbage.
2829 	 */
2830 	bufHdr = GetBufferDescriptor(buffer - 1);
2831 	return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
2832 }
2833 
2834 /*
2835  * BufferGetLSNAtomic
2836  *		Retrieves the LSN of the buffer atomically using a buffer header lock.
2837  *		This is necessary for some callers who may not have an exclusive lock
2838  *		on the buffer.
2839  */
2840 XLogRecPtr
2841 BufferGetLSNAtomic(Buffer buffer)
2842 {
2843 	BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
2844 	char	   *page = BufferGetPage(buffer);
2845 	XLogRecPtr	lsn;
2846 	uint32		buf_state;
2847 
2848 	/*
2849 	 * If we don't need locking for correctness, fastpath out.
2850 	 */
2851 	if (!XLogHintBitIsNeeded() || BufferIsLocal(buffer))
2852 		return PageGetLSN(page);
2853 
2854 	/* Make sure we've got a real buffer, and that we hold a pin on it. */
2855 	Assert(BufferIsValid(buffer));
2856 	Assert(BufferIsPinned(buffer));
2857 
2858 	buf_state = LockBufHdr(bufHdr);
2859 	lsn = PageGetLSN(page);
2860 	UnlockBufHdr(bufHdr, buf_state);
2861 
2862 	return lsn;
2863 }
2864 
2865 /* ---------------------------------------------------------------------
2866  *		DropRelFileNodeBuffers
2867  *
2868  *		This function removes from the buffer pool all the pages of the
2869  *		specified relation fork that have block numbers >= firstDelBlock.
2870  *		(In particular, with firstDelBlock = 0, all pages are removed.)
2871  *		Dirty pages are simply dropped, without bothering to write them
2872  *		out first.  Therefore, this is NOT rollback-able, and so should be
2873  *		used only with extreme caution!
2874  *
2875  *		Currently, this is called only from smgr.c when the underlying file
2876  *		is about to be deleted or truncated (firstDelBlock is needed for
2877  *		the truncation case).  The data in the affected pages would therefore
2878  *		be deleted momentarily anyway, and there is no point in writing it.
2879  *		It is the responsibility of higher-level code to ensure that the
2880  *		deletion or truncation does not lose any data that could be needed
2881  *		later.  It is also the responsibility of higher-level code to ensure
2882  *		that no other process could be trying to load more pages of the
2883  *		relation into buffers.
2884  *
2885  *		XXX currently it sequentially searches the buffer pool, should be
2886  *		changed to more clever ways of searching.  However, this routine
2887  *		is used only in code paths that aren't very performance-critical,
2888  *		and we shouldn't slow down the hot paths to make it faster ...
2889  * --------------------------------------------------------------------
2890  */
2891 void
2892 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
2893 					   BlockNumber firstDelBlock)
2894 {
2895 	int			i;
2896 
2897 	/* If it's a local relation, it's localbuf.c's problem. */
2898 	if (RelFileNodeBackendIsTemp(rnode))
2899 	{
2900 		if (rnode.backend == MyBackendId)
2901 			DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
2902 		return;
2903 	}
2904 
2905 	for (i = 0; i < NBuffers; i++)
2906 	{
2907 		BufferDesc *bufHdr = GetBufferDescriptor(i);
2908 		uint32		buf_state;
2909 
2910 		/*
2911 		 * We can make this a tad faster by prechecking the buffer tag before
2912 		 * we attempt to lock the buffer; this saves a lot of lock
2913 		 * acquisitions in typical cases.  It should be safe because the
2914 		 * caller must have AccessExclusiveLock on the relation, or some other
2915 		 * reason to be certain that no one is loading new pages of the rel
2916 		 * into the buffer pool.  (Otherwise we might well miss such pages
2917 		 * entirely.)  Therefore, while the tag might be changing while we
2918 		 * look at it, it can't be changing *to* a value we care about, only
2919 		 * *away* from such a value.  So false negatives are impossible, and
2920 		 * false positives are safe because we'll recheck after getting the
2921 		 * buffer lock.
2922 		 *
2923 		 * We could check forkNum and blockNum as well as the rnode, but the
2924 		 * incremental win from doing so seems small.
2925 		 */
2926 		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2927 			continue;
2928 
2929 		buf_state = LockBufHdr(bufHdr);
2930 		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
2931 			bufHdr->tag.forkNum == forkNum &&
2932 			bufHdr->tag.blockNum >= firstDelBlock)
2933 			InvalidateBuffer(bufHdr);	/* releases spinlock */
2934 		else
2935 			UnlockBufHdr(bufHdr, buf_state);
2936 	}
2937 }
2938 
2939 /* ---------------------------------------------------------------------
2940  *		DropRelFileNodesAllBuffers
2941  *
2942  *		This function removes from the buffer pool all the pages of all
2943  *		forks of the specified relations.  It's equivalent to calling
2944  *		DropRelFileNodeBuffers once per fork per relation with
2945  *		firstDelBlock = 0.
2946  * --------------------------------------------------------------------
2947  */
2948 void
2949 DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
2950 {
2951 	int			i,
2952 				n = 0;
2953 	RelFileNode *nodes;
2954 	bool		use_bsearch;
2955 
2956 	if (nnodes == 0)
2957 		return;
2958 
2959 	nodes = palloc(sizeof(RelFileNode) * nnodes);	/* non-local relations */
2960 
2961 	/* If it's a local relation, it's localbuf.c's problem. */
2962 	for (i = 0; i < nnodes; i++)
2963 	{
2964 		if (RelFileNodeBackendIsTemp(rnodes[i]))
2965 		{
2966 			if (rnodes[i].backend == MyBackendId)
2967 				DropRelFileNodeAllLocalBuffers(rnodes[i].node);
2968 		}
2969 		else
2970 			nodes[n++] = rnodes[i].node;
2971 	}
2972 
2973 	/*
2974 	 * If there are no non-local relations, then we're done. Release the
2975 	 * memory and return.
2976 	 */
2977 	if (n == 0)
2978 	{
2979 		pfree(nodes);
2980 		return;
2981 	}
2982 
2983 	/*
2984 	 * For a small number of relations to drop, just use a simple walk-through
2985 	 * to save the bsearch overhead.  The threshold is more of a guess than an
2986 	 * exactly determined value, as it depends on many factors (CPU and RAM
2987 	 * speeds, amount of shared buffers etc.).
2988 	 */
2989 	use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;
2990 
2991 	/* sort the list of rnodes if necessary */
2992 	if (use_bsearch)
2993 		pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);
2994 
2995 	for (i = 0; i < NBuffers; i++)
2996 	{
2997 		RelFileNode *rnode = NULL;
2998 		BufferDesc *bufHdr = GetBufferDescriptor(i);
2999 		uint32		buf_state;
3000 
3001 		/*
3002 		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3003 		 * and saves some cycles.
3004 		 */
3005 
3006 		if (!use_bsearch)
3007 		{
3008 			int			j;
3009 
3010 			for (j = 0; j < n; j++)
3011 			{
3012 				if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
3013 				{
3014 					rnode = &nodes[j];
3015 					break;
3016 				}
3017 			}
3018 		}
3019 		else
3020 		{
3021 			rnode = bsearch((const void *) &(bufHdr->tag.rnode),
3022 							nodes, n, sizeof(RelFileNode),
3023 							rnode_comparator);
3024 		}
3025 
3026 		/* buffer doesn't belong to any of the given relfilenodes; skip it */
3027 		if (rnode == NULL)
3028 			continue;
3029 
3030 		buf_state = LockBufHdr(bufHdr);
3031 		if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
3032 			InvalidateBuffer(bufHdr);	/* releases spinlock */
3033 		else
3034 			UnlockBufHdr(bufHdr, buf_state);
3035 	}
3036 
3037 	pfree(nodes);
3038 }
3039 
3040 /* ---------------------------------------------------------------------
3041  *		DropDatabaseBuffers
3042  *
3043  *		This function removes all the buffers in the buffer cache for a
3044  *		particular database.  Dirty pages are simply dropped, without
3045  *		bothering to write them out first.  This is used when we destroy a
3046  *		database, to avoid trying to flush data to disk when the directory
3047  *		tree no longer exists.  Implementation is pretty similar to
3048  *		DropRelFileNodeBuffers() which is for destroying just one relation.
3049  * --------------------------------------------------------------------
3050  */
3051 void
3052 DropDatabaseBuffers(Oid dbid)
3053 {
3054 	int			i;
3055 
3056 	/*
3057 	 * We needn't consider local buffers, since by assumption the target
3058 	 * database isn't our own.
3059 	 */
3060 
3061 	for (i = 0; i < NBuffers; i++)
3062 	{
3063 		BufferDesc *bufHdr = GetBufferDescriptor(i);
3064 		uint32		buf_state;
3065 
3066 		/*
3067 		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3068 		 * and saves some cycles.
3069 		 */
3070 		if (bufHdr->tag.rnode.dbNode != dbid)
3071 			continue;
3072 
3073 		buf_state = LockBufHdr(bufHdr);
3074 		if (bufHdr->tag.rnode.dbNode == dbid)
3075 			InvalidateBuffer(bufHdr);	/* releases spinlock */
3076 		else
3077 			UnlockBufHdr(bufHdr, buf_state);
3078 	}
3079 }
3080 
3081 /* -----------------------------------------------------------------
3082  *		PrintBufferDescs
3083  *
3084  *		this function prints all the buffer descriptors, for debugging
3085  *		use only.
3086  * -----------------------------------------------------------------
3087  */
3088 #ifdef NOT_USED
3089 void
3090 PrintBufferDescs(void)
3091 {
3092 	int			i;
3093 
3094 	for (i = 0; i < NBuffers; ++i)
3095 	{
3096 		BufferDesc *buf = GetBufferDescriptor(i);
3097 		Buffer		b = BufferDescriptorGetBuffer(buf);
3098 
3099 		/* theoretically we should lock the bufhdr here */
3100 		elog(LOG,
3101 			 "[%02d] (freeNext=%d, rel=%s, "
3102 			 "blockNum=%u, flags=0x%x, refcount=%u %d)",
3103 			 i, buf->freeNext,
3104 			 relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
3105 			 buf->tag.blockNum, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK,
3106 			 BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf->state)), GetPrivateRefCount(b));
3107 	}
3108 }
3109 #endif
3110 
3111 #ifdef NOT_USED
3112 void
3113 PrintPinnedBufs(void)
3114 {
3115 	int			i;
3116 
3117 	for (i = 0; i < NBuffers; ++i)
3118 	{
3119 		BufferDesc *buf = GetBufferDescriptor(i);
3120 		Buffer		b = BufferDescriptorGetBuffer(buf);
3121 
3122 		if (GetPrivateRefCount(b) > 0)
3123 		{
3124 			/* theoretically we should lock the bufhdr here */
3125 			elog(LOG,
3126 				 "[%02d] (freeNext=%d, rel=%s, "
3127 				 "blockNum=%u, flags=0x%x, refcount=%u %d)",
3128 				 i, buf->freeNext,
3129 				 relpathperm(buf->tag.rnode, buf->tag.forkNum),
3130 				 buf->tag.blockNum, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK,
3131 				 BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf->state)), GetPrivateRefCount(b));
3132 		}
3133 	}
3134 }
3135 #endif
3136 
3137 /* ---------------------------------------------------------------------
3138  *		FlushRelationBuffers
3139  *
3140  *		This function writes all dirty pages of a relation out to disk
3141  *		(or more accurately, out to kernel disk buffers), ensuring that the
3142  *		kernel has an up-to-date view of the relation.
3143  *
3144  *		Generally, the caller should be holding AccessExclusiveLock on the
3145  *		target relation to ensure that no other backend is busy dirtying
3146  *		more blocks of the relation; the effects can't be expected to last
3147  *		after the lock is released.
3148  *
3149  *		XXX currently it sequentially searches the buffer pool, should be
3150  *		changed to more clever ways of searching.  This routine is not
3151  *		used in any performance-critical code paths, so it's not worth
3152  *		adding additional overhead to normal paths to make it go faster;
3153  *		but see also DropRelFileNodeBuffers.
3154  * --------------------------------------------------------------------
3155  */
3156 void
3157 FlushRelationBuffers(Relation rel)
3158 {
3159 	int			i;
3160 	BufferDesc *bufHdr;
3161 
3162 	/* Open rel at the smgr level if not already done */
3163 	RelationOpenSmgr(rel);
3164 
3165 	if (RelationUsesLocalBuffers(rel))
3166 	{
3167 		for (i = 0; i < NLocBuffer; i++)
3168 		{
3169 			uint32		buf_state;
3170 
3171 			bufHdr = GetLocalBufferDescriptor(i);
3172 			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
3173 				((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
3174 				 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3175 			{
3176 				ErrorContextCallback errcallback;
3177 				Page		localpage;
3178 
3179 				localpage = (char *) LocalBufHdrGetBlock(bufHdr);
3180 
3181 				/* Setup error traceback support for ereport() */
3182 				errcallback.callback = local_buffer_write_error_callback;
3183 				errcallback.arg = (void *) bufHdr;
3184 				errcallback.previous = error_context_stack;
3185 				error_context_stack = &errcallback;
3186 
3187 				PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
3188 
3189 				smgrwrite(rel->rd_smgr,
3190 						  bufHdr->tag.forkNum,
3191 						  bufHdr->tag.blockNum,
3192 						  localpage,
3193 						  false);
3194 
3195 				buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED);
3196 				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
3197 
3198 				/* Pop the error context stack */
3199 				error_context_stack = errcallback.previous;
3200 			}
3201 		}
3202 
3203 		return;
3204 	}
3205 
3206 	/* Make sure we can handle the pin inside the loop */
3207 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3208 
3209 	for (i = 0; i < NBuffers; i++)
3210 	{
3211 		uint32		buf_state;
3212 
3213 		bufHdr = GetBufferDescriptor(i);
3214 
3215 		/*
3216 		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3217 		 * and saves some cycles.
3218 		 */
3219 		if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
3220 			continue;
3221 
3222 		ReservePrivateRefCountEntry();
3223 
3224 		buf_state = LockBufHdr(bufHdr);
3225 		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
3226 			(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3227 		{
3228 			PinBuffer_Locked(bufHdr);
3229 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
3230 			FlushBuffer(bufHdr, rel->rd_smgr);
3231 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
3232 			UnpinBuffer(bufHdr, true);
3233 		}
3234 		else
3235 			UnlockBufHdr(bufHdr, buf_state);
3236 	}
3237 }
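
/*
 * A minimal usage sketch for FlushRelationBuffers(), assuming the caller
 * already holds AccessExclusiveLock on the relation as recommended above.
 * The function name flush_rel_example is hypothetical and relies only on
 * headers this file already includes; it is compiled out via NOT_USED.
 */
#ifdef NOT_USED
static void
flush_rel_example(Relation rel)
{
	/*
	 * With AccessExclusiveLock held, no other backend can dirty further
	 * blocks, so after this call the kernel has an up-to-date view of the
	 * relation (the data may still sit in kernel buffers, not on disk).
	 */
	FlushRelationBuffers(rel);

	/* To force the data to stable storage, also sync it at the smgr level. */
	RelationOpenSmgr(rel);
	smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM);
}
#endif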
3238 
3239 /* ---------------------------------------------------------------------
3240  *		FlushDatabaseBuffers
3241  *
3242  *		This function writes all dirty pages of a database out to disk
3243  *		(or more accurately, out to kernel disk buffers), ensuring that the
3244  *		kernel has an up-to-date view of the database.
3245  *
3246  *		Generally, the caller should be holding an appropriate lock to ensure
3247  *		no other backend is active in the target database; otherwise more
3248  *		pages could get dirtied.
3249  *
3250  *		Note we don't worry about flushing any pages of temporary relations.
3251  *		It's assumed these wouldn't be interesting.
3252  * --------------------------------------------------------------------
3253  */
3254 void
3255 FlushDatabaseBuffers(Oid dbid)
3256 {
3257 	int			i;
3258 	BufferDesc *bufHdr;
3259 
3260 	/* Make sure we can handle the pin inside the loop */
3261 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3262 
3263 	for (i = 0; i < NBuffers; i++)
3264 	{
3265 		uint32		buf_state;
3266 
3267 		bufHdr = GetBufferDescriptor(i);
3268 
3269 		/*
3270 		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3271 		 * and saves some cycles.
3272 		 */
3273 		if (bufHdr->tag.rnode.dbNode != dbid)
3274 			continue;
3275 
3276 		ReservePrivateRefCountEntry();
3277 
3278 		buf_state = LockBufHdr(bufHdr);
3279 		if (bufHdr->tag.rnode.dbNode == dbid &&
3280 			(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3281 		{
3282 			PinBuffer_Locked(bufHdr);
3283 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
3284 			FlushBuffer(bufHdr, NULL);
3285 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
3286 			UnpinBuffer(bufHdr, true);
3287 		}
3288 		else
3289 			UnlockBufHdr(bufHdr, buf_state);
3290 	}
3291 }
3292 
3293 /*
3294  * Flush a buffer (previously pinned and locked, either shared or
3295  * exclusively) to the OS.
3296  */
3297 void
3298 FlushOneBuffer(Buffer buffer)
3299 {
3300 	BufferDesc *bufHdr;
3301 
3302 	/* currently not needed, but there's no fundamental reason not to support it */
3303 	Assert(!BufferIsLocal(buffer));
3304 
3305 	Assert(BufferIsPinned(buffer));
3306 
3307 	bufHdr = GetBufferDescriptor(buffer - 1);
3308 
3309 	Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
3310 
3311 	FlushBuffer(bufHdr, NULL);
3312 }
3313 
3314 /*
3315  * ReleaseBuffer -- release the pin on a buffer
3316  */
3317 void
3318 ReleaseBuffer(Buffer buffer)
3319 {
3320 	if (!BufferIsValid(buffer))
3321 		elog(ERROR, "bad buffer ID: %d", buffer);
3322 
3323 	if (BufferIsLocal(buffer))
3324 	{
3325 		ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
3326 
3327 		Assert(LocalRefCount[-buffer - 1] > 0);
3328 		LocalRefCount[-buffer - 1]--;
3329 		return;
3330 	}
3331 
3332 	UnpinBuffer(GetBufferDescriptor(buffer - 1), true);
3333 }
3334 
3335 /*
3336  * UnlockReleaseBuffer -- release the content lock and pin on a buffer
3337  *
3338  * This is just a shorthand for a common combination.
3339  */
3340 void
3341 UnlockReleaseBuffer(Buffer buffer)
3342 {
3343 	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3344 	ReleaseBuffer(buffer);
3345 }
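
/*
 * A minimal sketch of the pin-lock-examine-release pattern that
 * UnlockReleaseBuffer() abbreviates, assuming a relation already opened and
 * suitably locked by the caller.  The function name read_block_example is
 * hypothetical; compiled out via NOT_USED.
 */
#ifdef NOT_USED
static void
read_block_example(Relation rel, BlockNumber blkno)
{
	Buffer		buf;
	Page		page;

	/* Pin the page; ReadBuffer() reads it from disk if it isn't cached. */
	buf = ReadBuffer(rel, blkno);

	/* A share lock is enough for read-only examination of the contents. */
	LockBuffer(buf, BUFFER_LOCK_SHARE);
	page = BufferGetPage(buf);

	elog(DEBUG1, "block %u has %d item pointers",
		 blkno, (int) PageGetMaxOffsetNumber(page));

	/* Drop the content lock and the pin in one call. */
	UnlockReleaseBuffer(buf);
}
#endif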
3346 
3347 /*
3348  * IncrBufferRefCount
3349  *		Increment the pin count on a buffer that we have *already* pinned
3350  *		at least once.
3351  *
3352  *		This function cannot be used on a buffer we do not have pinned,
3353  *		because it doesn't change the shared buffer state.
3354  */
3355 void
3356 IncrBufferRefCount(Buffer buffer)
3357 {
3358 	Assert(BufferIsPinned(buffer));
3359 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3360 	if (BufferIsLocal(buffer))
3361 		LocalRefCount[-buffer - 1]++;
3362 	else
3363 	{
3364 		PrivateRefCountEntry *ref;
3365 
3366 		ref = GetPrivateRefCountEntry(buffer, true);
3367 		Assert(ref != NULL);
3368 		ref->refcount++;
3369 	}
3370 	ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
3371 }
3372 
3373 /*
3374  * MarkBufferDirtyHint
3375  *
3376  *	Mark a buffer dirty for non-critical changes.
3377  *
3378  * This is essentially the same as MarkBufferDirty, except:
3379  *
3380  * 1. The caller does not write WAL; so if checksums are enabled, we may need
3381  *	  to write an XLOG_FPI WAL record to protect against torn pages.
3382  * 2. The caller might have only share-lock instead of exclusive-lock on the
3383  *	  buffer's content lock.
3384  * 3. This function does not guarantee that the buffer is always marked dirty
3385  *	  (due to a race condition), so it cannot be used for important changes.
3386  */
3387 void
3388 MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
3389 {
3390 	BufferDesc *bufHdr;
3391 	Page		page = BufferGetPage(buffer);
3392 
3393 	if (!BufferIsValid(buffer))
3394 		elog(ERROR, "bad buffer ID: %d", buffer);
3395 
3396 	if (BufferIsLocal(buffer))
3397 	{
3398 		MarkLocalBufferDirty(buffer);
3399 		return;
3400 	}
3401 
3402 	bufHdr = GetBufferDescriptor(buffer - 1);
3403 
3404 	Assert(GetPrivateRefCount(buffer) > 0);
3405 	/* here, either share or exclusive lock is OK */
3406 	Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
3407 
3408 	/*
3409 	 * This routine might get called many times on the same page, if we are
3410 	 * making the first scan after commit of an xact that added/deleted many
3411 	 * tuples. So, be as quick as we can if the buffer is already dirty.  We
3412 	 * do this by not acquiring spinlock if it looks like the status bits are
3413 	 * already set.  Since we make this test unlocked, there's a chance we
3414 	 * might fail to notice that the flags have just been cleared, and fail
3415 	 * to reset them, due to memory-ordering issues.  But since this function
3416 	 * is only intended to be used in cases where failing to write out the
3417 	 * data would be harmless anyway, it doesn't really matter.
3418 	 */
3419 	if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
3420 		(BM_DIRTY | BM_JUST_DIRTIED))
3421 	{
3422 		XLogRecPtr	lsn = InvalidXLogRecPtr;
3423 		bool		dirtied = false;
3424 		bool		delayChkpt = false;
3425 		uint32		buf_state;
3426 
3427 		/*
3428 		 * If we need to protect hint bit updates from torn writes, WAL-log a
3429 		 * full page image of the page. This full page image is only necessary
3430 		 * if the hint bit update is the first change to the page since the
3431 		 * last checkpoint.
3432 		 *
3433 		 * We don't check full_page_writes here because that logic is included
3434 		 * when we call XLogInsert() since the value changes dynamically.
3435 		 */
3436 		if (XLogHintBitIsNeeded() &&
3437 			(pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
3438 		{
3439 			/*
3440 			 * If we're in recovery we cannot dirty a page because of a hint.
3441 			 * We can still set the hint, just not dirty the page as a result,
3442 			 * so the hint is lost when we evict the page or shut down.
3443 			 *
3444 			 * See src/backend/storage/page/README for longer discussion.
3445 			 */
3446 			if (RecoveryInProgress())
3447 				return;
3448 
3449 			/*
3450 			 * If the block is already dirty because we either made a change
3451 			 * or set a hint already, then we don't need to write a full page
3452 			 * image.  Note that aggressive cleaning of blocks dirtied by hint
3453 			 * bit setting would increase the call rate. Bulk setting of hint
3454 			 * bits would reduce the call rate...
3455 			 *
3456 			 * We must issue the WAL record before we mark the buffer dirty.
3457 			 * Otherwise we might write the page before we write the WAL. That
3458 			 * causes a race condition, since a checkpoint might occur between
3459 			 * writing the WAL record and marking the buffer dirty. We solve
3460 			 * that with a kluge, but one that is already in use during
3461 			 * transaction commit to prevent race conditions. Basically, we
3462 			 * simply prevent the checkpoint WAL record from being written
3463 			 * until we have marked the buffer dirty. We don't start the
3464 			 * checkpoint flush until we have marked dirty, so our checkpoint
3465 			 * must flush the change to disk successfully or the checkpoint
3466 			 * never gets written, in which case crash recovery will fix things up.
3467 			 *
3468 			 * It's possible we may enter here without an xid, so it is
3469 			 * essential that CreateCheckpoint waits for virtual transactions
3470 			 * rather than full transactionids.
3471 			 */
3472 			MyPgXact->delayChkpt = delayChkpt = true;
3473 			lsn = XLogSaveBufferForHint(buffer, buffer_std);
3474 		}
3475 
3476 		buf_state = LockBufHdr(bufHdr);
3477 
3478 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3479 
3480 		if (!(buf_state & BM_DIRTY))
3481 		{
3482 			dirtied = true;		/* Means "will be dirtied by this action" */
3483 
3484 			/*
3485 			 * Set the page LSN if we wrote a backup block. We aren't supposed
3486 			 * to set this when only holding a share lock but as long as we
3487 			 * serialise it somehow we're OK. We choose to set LSN while
3488 			 * holding the buffer header lock, which causes any reader of an
3489 			 * LSN who holds only a share lock to also obtain a buffer header
3490 			 * lock before using PageGetLSN(), which is enforced in
3491 			 * BufferGetLSNAtomic().
3492 			 *
3493 			 * If checksums are enabled, you might think we should reset the
3494 			 * checksum here. That will happen when the page is written
3495 			 * sometime later in this checkpoint cycle.
3496 			 */
3497 			if (!XLogRecPtrIsInvalid(lsn))
3498 				PageSetLSN(page, lsn);
3499 		}
3500 
3501 		buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
3502 		UnlockBufHdr(bufHdr, buf_state);
3503 
3504 		if (delayChkpt)
3505 			MyPgXact->delayChkpt = false;
3506 
3507 		if (dirtied)
3508 		{
3509 			VacuumPageDirty++;
3510 			pgBufferUsage.shared_blks_dirtied++;
3511 			if (VacuumCostActive)
3512 				VacuumCostBalance += VacuumCostPageDirty;
3513 		}
3514 	}
3515 }
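
/*
 * A minimal sketch of a hint-style update under a share lock, the use case
 * MarkBufferDirtyHint() exists for.  PD_EXAMPLE_HINT and
 * set_page_hint_example are hypothetical stand-ins for a real hint update
 * (such as setting tuple infomask hint bits); the caller is assumed to hold
 * a pin and at least a share content lock.  Compiled out via NOT_USED.
 */
#ifdef NOT_USED
#define PD_EXAMPLE_HINT 0x0008	/* hypothetical pd_flags bit, for illustration */

static void
set_page_hint_example(Buffer buf)
{
	Page		page = BufferGetPage(buf);

	((PageHeader) page)->pd_flags |= PD_EXAMPLE_HINT;

	/*
	 * Non-critical change: the caller writes no WAL, the page has a standard
	 * layout (buffer_std = true), and losing the hint would be harmless.
	 */
	MarkBufferDirtyHint(buf, true);
}
#endif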
3516 
3517 /*
3518  * Release buffer content locks for shared buffers.
3519  *
3520  * Used to clean up after errors.
3521  *
3522  * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
3523  * of releasing buffer content locks per se; the only thing we need to deal
3524  * with here is clearing any PIN_COUNT request that was in progress.
3525  */
3526 void
3527 UnlockBuffers(void)
3528 {
3529 	BufferDesc *buf = PinCountWaitBuf;
3530 
3531 	if (buf)
3532 	{
3533 		uint32		buf_state;
3534 
3535 		buf_state = LockBufHdr(buf);
3536 
3537 		/*
3538 		 * Don't complain if flag bit not set; it could have been reset but we
3539 		 * got a cancel/die interrupt before getting the signal.
3540 		 */
3541 		if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
3542 			buf->wait_backend_pid == MyProcPid)
3543 			buf_state &= ~BM_PIN_COUNT_WAITER;
3544 
3545 		UnlockBufHdr(buf, buf_state);
3546 
3547 		PinCountWaitBuf = NULL;
3548 	}
3549 }
3550 
3551 /*
3552  * Acquire or release the content_lock for the buffer.
3553  */
3554 void
3555 LockBuffer(Buffer buffer, int mode)
3556 {
3557 	BufferDesc *buf;
3558 
3559 	Assert(BufferIsValid(buffer));
3560 	if (BufferIsLocal(buffer))
3561 		return;					/* local buffers need no lock */
3562 
3563 	buf = GetBufferDescriptor(buffer - 1);
3564 
3565 	if (mode == BUFFER_LOCK_UNLOCK)
3566 		LWLockRelease(BufferDescriptorGetContentLock(buf));
3567 	else if (mode == BUFFER_LOCK_SHARE)
3568 		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
3569 	else if (mode == BUFFER_LOCK_EXCLUSIVE)
3570 		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
3571 	else
3572 		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
3573 }
3574 
3575 /*
3576  * Acquire the content_lock for the buffer, but only if we don't have to wait.
3577  *
3578  * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
3579  */
3580 bool
3581 ConditionalLockBuffer(Buffer buffer)
3582 {
3583 	BufferDesc *buf;
3584 
3585 	Assert(BufferIsValid(buffer));
3586 	if (BufferIsLocal(buffer))
3587 		return true;			/* act as though we got it */
3588 
3589 	buf = GetBufferDescriptor(buffer - 1);
3590 
3591 	return LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
3592 									LW_EXCLUSIVE);
3593 }
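
/*
 * A minimal sketch of the try-lock pattern ConditionalLockBuffer() enables,
 * assuming the buffer is already pinned by the caller.  The function name
 * try_exclusive_example is hypothetical; compiled out via NOT_USED.
 */
#ifdef NOT_USED
static bool
try_exclusive_example(Buffer buf)
{
	if (!ConditionalLockBuffer(buf))
	{
		/* Somebody else holds the content lock; give up rather than wait. */
		return false;
	}

	/* ... do work that needs the exclusive lock on BufferGetPage(buf) ... */

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	return true;
}
#endif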
3594 
3595 /*
3596  * LockBufferForCleanup - lock a buffer in preparation for deleting items
3597  *
3598  * Items may be deleted from a disk page only when the caller (a) holds an
3599  * exclusive lock on the buffer and (b) has observed that no other backend
3600  * holds a pin on the buffer.  If there is a pin, then the other backend
3601  * might have a pointer into the buffer (for example, a heapscan reference
3602  * to an item --- see README for more details).  It's OK if a pin is added
3603  * after the cleanup starts, however; the newly-arrived backend will be
3604  * unable to look at the page until we release the exclusive lock.
3605  *
3606  * To implement this protocol, a would-be deleter must pin the buffer and
3607  * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
3608  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
3609  * it has successfully observed pin count = 1.
3610  */
3611 void
3612 LockBufferForCleanup(Buffer buffer)
3613 {
3614 	BufferDesc *bufHdr;
3615 
3616 	Assert(BufferIsValid(buffer));
3617 	Assert(PinCountWaitBuf == NULL);
3618 
3619 	if (BufferIsLocal(buffer))
3620 	{
3621 		/* There should be exactly one pin */
3622 		if (LocalRefCount[-buffer - 1] != 1)
3623 			elog(ERROR, "incorrect local pin count: %d",
3624 				 LocalRefCount[-buffer - 1]);
3625 		/* Nobody else to wait for */
3626 		return;
3627 	}
3628 
3629 	/* There should be exactly one local pin */
3630 	if (GetPrivateRefCount(buffer) != 1)
3631 		elog(ERROR, "incorrect local pin count: %d",
3632 			 GetPrivateRefCount(buffer));
3633 
3634 	bufHdr = GetBufferDescriptor(buffer - 1);
3635 
3636 	for (;;)
3637 	{
3638 		uint32		buf_state;
3639 
3640 		/* Try to acquire lock */
3641 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3642 		buf_state = LockBufHdr(bufHdr);
3643 
3644 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3645 		if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
3646 		{
3647 			/* Successfully acquired exclusive lock with pincount 1 */
3648 			UnlockBufHdr(bufHdr, buf_state);
3649 			return;
3650 		}
3651 		/* Failed, so mark myself as waiting for pincount 1 */
3652 		if (buf_state & BM_PIN_COUNT_WAITER)
3653 		{
3654 			UnlockBufHdr(bufHdr, buf_state);
3655 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3656 			elog(ERROR, "multiple backends attempting to wait for pincount 1");
3657 		}
3658 		bufHdr->wait_backend_pid = MyProcPid;
3659 		PinCountWaitBuf = bufHdr;
3660 		buf_state |= BM_PIN_COUNT_WAITER;
3661 		UnlockBufHdr(bufHdr, buf_state);
3662 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3663 
3664 		/* Wait to be signaled by UnpinBuffer() */
3665 		if (InHotStandby)
3666 		{
3667 			/* Publish the bufid that Startup process waits on */
3668 			SetStartupBufferPinWaitBufId(buffer - 1);
3669 			/* Set alarm and then wait to be signaled by UnpinBuffer() */
3670 			ResolveRecoveryConflictWithBufferPin();
3671 			/* Reset the published bufid */
3672 			SetStartupBufferPinWaitBufId(-1);
3673 		}
3674 		else
3675 			ProcWaitForSignal(PG_WAIT_BUFFER_PIN);
3676 
3677 		/*
3678 		 * Remove flag marking us as waiter. Normally this will not be set
3679 		 * anymore, but ProcWaitForSignal() can return for other signals as
3680 		 * well.  We take care to only reset the flag if we're the waiter, as
3681 		 * theoretically another backend could have started waiting. That's
3682 		 * impossible with the current usages due to table level locking, but
3683 		 * better be safe.
3684 		 */
3685 		buf_state = LockBufHdr(bufHdr);
3686 		if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
3687 			bufHdr->wait_backend_pid == MyProcPid)
3688 			buf_state &= ~BM_PIN_COUNT_WAITER;
3689 		UnlockBufHdr(bufHdr, buf_state);
3690 
3691 		PinCountWaitBuf = NULL;
3692 		/* Loop back and try again */
3693 	}
3694 }
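
/*
 * A minimal sketch of the cleanup-lock protocol described above, assuming a
 * caller (such as VACUUM) that may block until all other pins go away.  The
 * function name cleanup_block_example is hypothetical; compiled out via
 * NOT_USED.
 */
#ifdef NOT_USED
static void
cleanup_block_example(Relation rel, BlockNumber blkno)
{
	Buffer		buf;

	/* Step 1: acquire our own (single) pin. */
	buf = ReadBuffer(rel, blkno);

	/*
	 * Step 2: wait for exclusive lock plus pin count 1.  On return we hold a
	 * cleanup lock: no other backend can have a pointer into the page.
	 */
	LockBufferForCleanup(buf);

	/* ... it is now safe to delete or move items on BufferGetPage(buf) ... */

	UnlockReleaseBuffer(buf);
}
#endif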
3695 
3696 /*
3697  * Check called from RecoveryConflictInterrupt handler when Startup
3698  * process requests cancellation of all pin holders that are blocking it.
3699  */
3700 bool
3701 HoldingBufferPinThatDelaysRecovery(void)
3702 {
3703 	int			bufid = GetStartupBufferPinWaitBufId();
3704 
3705 	/*
3706 	 * If we get woken slowly then it's possible that the Startup process was
3707 	 * already woken by other backends before we got here.  It's also possible
3708 	 * that we get here via multiple interrupts or interrupts at inappropriate
3709 	 * times, so make sure we do nothing if the bufid is not set.
3710 	 */
3711 	if (bufid < 0)
3712 		return false;
3713 
3714 	if (GetPrivateRefCount(bufid + 1) > 0)
3715 		return true;
3716 
3717 	return false;
3718 }
3719 
3720 /*
3721  * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
3722  *
3723  * We won't loop, but just check once to see if the pin count is OK.  If
3724  * not, return FALSE with no lock held.
3725  */
3726 bool
3727 ConditionalLockBufferForCleanup(Buffer buffer)
3728 {
3729 	BufferDesc *bufHdr;
3730 	uint32		buf_state,
3731 				refcount;
3732 
3733 	Assert(BufferIsValid(buffer));
3734 
3735 	if (BufferIsLocal(buffer))
3736 	{
3737 		refcount = LocalRefCount[-buffer - 1];
3738 		/* There should be exactly one pin */
3739 		Assert(refcount > 0);
3740 		if (refcount != 1)
3741 			return false;
3742 		/* Nobody else to wait for */
3743 		return true;
3744 	}
3745 
3746 	/* There should be exactly one local pin */
3747 	refcount = GetPrivateRefCount(buffer);
3748 	Assert(refcount);
3749 	if (refcount != 1)
3750 		return false;
3751 
3752 	/* Try to acquire lock */
3753 	if (!ConditionalLockBuffer(buffer))
3754 		return false;
3755 
3756 	bufHdr = GetBufferDescriptor(buffer - 1);
3757 	buf_state = LockBufHdr(bufHdr);
3758 	refcount = BUF_STATE_GET_REFCOUNT(buf_state);
3759 
3760 	Assert(refcount > 0);
3761 	if (refcount == 1)
3762 	{
3763 		/* Successfully acquired exclusive lock with pincount 1 */
3764 		UnlockBufHdr(bufHdr, buf_state);
3765 		return true;
3766 	}
3767 
3768 	/* Failed, so release the lock */
3769 	UnlockBufHdr(bufHdr, buf_state);
3770 	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3771 	return false;
3772 }
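
/*
 * A minimal sketch of the opportunistic variant, assuming a caller that can
 * simply skip a page when the cleanup lock is not immediately available (as
 * lazy VACUUM does).  The function name maybe_cleanup_example is
 * hypothetical; the buffer is assumed to be pinned.  Compiled out via
 * NOT_USED.
 */
#ifdef NOT_USED
static bool
maybe_cleanup_example(Buffer buf)
{
	if (!ConditionalLockBufferForCleanup(buf))
	{
		/* Somebody else is using the page; leave it for a later pass. */
		return false;
	}

	/* ... cleanup work under the exclusive lock with pin count 1 ... */

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	return true;
}
#endif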
3773 
3774 /*
3775  * IsBufferCleanupOK - as above, but we already have the lock
3776  *
3777  * Check whether it's OK to perform cleanup on a buffer we've already
3778  * locked.  If we observe that the pin count is 1, our exclusive lock
3779  * happens to be a cleanup lock, and we can proceed with anything that
3780  * would have been allowable had we sought a cleanup lock originally.
3781  */
3782 bool
3783 IsBufferCleanupOK(Buffer buffer)
3784 {
3785 	BufferDesc *bufHdr;
3786 	uint32		buf_state;
3787 
3788 	Assert(BufferIsValid(buffer));
3789 
3790 	if (BufferIsLocal(buffer))
3791 	{
3792 		/* There should be exactly one pin */
3793 		if (LocalRefCount[-buffer - 1] != 1)
3794 			return false;
3795 		/* Nobody else to wait for */
3796 		return true;
3797 	}
3798 
3799 	/* There should be exactly one local pin */
3800 	if (GetPrivateRefCount(buffer) != 1)
3801 		return false;
3802 
3803 	bufHdr = GetBufferDescriptor(buffer - 1);
3804 
3805 	/* caller must hold exclusive lock on buffer */
3806 	Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
3807 								LW_EXCLUSIVE));
3808 
3809 	buf_state = LockBufHdr(bufHdr);
3810 
3811 	Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3812 	if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
3813 	{
3814 		/* pincount is OK. */
3815 		UnlockBufHdr(bufHdr, buf_state);
3816 		return true;
3817 	}
3818 
3819 	UnlockBufHdr(bufHdr, buf_state);
3820 	return false;
3821 }
3822 
3823 
3824 /*
3825  *	Functions for buffer I/O handling
3826  *
3827  *	Note: We assume that nested buffer I/O never occurs.
3828  *	i.e., at most one io_in_progress lock is held per proc.
3829  *
3830  *	Also note that these are used only for shared buffers, not local ones.
3831  */
3832 
3833 /*
3834  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
3835  */
3836 static void
3837 WaitIO(BufferDesc *buf)
3838 {
3839 	/*
3840 	 * Changed to wait until there's no IO - Inoue 01/13/2000
3841 	 *
3842 	 * Note this is *necessary* because an error abort in the process doing
3843 	 * I/O could release the io_in_progress_lock prematurely. See
3844 	 * AbortBufferIO.
3845 	 */
3846 	for (;;)
3847 	{
3848 		uint32		buf_state;
3849 
3850 		/*
3851 		 * It may not be necessary to acquire the spinlock to check the flag
3852 		 * here, but since this test is essential for correctness, we'd better
3853 		 * play it safe.
3854 		 */
3855 		buf_state = LockBufHdr(buf);
3856 		UnlockBufHdr(buf, buf_state);
3857 
3858 		if (!(buf_state & BM_IO_IN_PROGRESS))
3859 			break;
3860 		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
3861 		LWLockRelease(BufferDescriptorGetIOLock(buf));
3862 	}
3863 }
3864 
3865 /*
3866  * StartBufferIO: begin I/O on this buffer
3867  *	(Assumptions)
3868  *	My process is executing no IO
3869  *	The buffer is Pinned
3870  *
3871  * In some scenarios there are race conditions in which multiple backends
3872  * could attempt the same I/O operation concurrently.  If someone else
3873  * has already started I/O on this buffer then we will block on the
3874  * io_in_progress lock until he's done.
3875  *
3876  * Input operations are only attempted on buffers that are not BM_VALID,
3877  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
3878  * so we can always tell if the work is already done.
3879  *
3880  * Returns TRUE if we successfully marked the buffer as I/O busy,
3881  * FALSE if someone else already did the work.
3882  */
3883 static bool
3884 StartBufferIO(BufferDesc *buf, bool forInput)
3885 {
3886 	uint32		buf_state;
3887 
3888 	Assert(!InProgressBuf);
3889 
3890 	for (;;)
3891 	{
3892 		/*
3893 		 * Grab the io_in_progress lock so that other processes can wait for
3894 		 * me to finish the I/O.
3895 		 */
3896 		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
3897 
3898 		buf_state = LockBufHdr(buf);
3899 
3900 		if (!(buf_state & BM_IO_IN_PROGRESS))
3901 			break;
3902 
3903 		/*
3904 		 * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
3905 		 * lock isn't held is if the process doing the I/O is recovering from
3906 		 * an error (see AbortBufferIO).  If that's the case, we must wait for
3907 		 * him to get unwedged.
3908 		 */
3909 		UnlockBufHdr(buf, buf_state);
3910 		LWLockRelease(BufferDescriptorGetIOLock(buf));
3911 		WaitIO(buf);
3912 	}
3913 
3914 	/* Once we get here, there is definitely no I/O active on this buffer */
3915 
3916 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
3917 	{
3918 		/* someone else already did the I/O */
3919 		UnlockBufHdr(buf, buf_state);
3920 		LWLockRelease(BufferDescriptorGetIOLock(buf));
3921 		return false;
3922 	}
3923 
3924 	buf_state |= BM_IO_IN_PROGRESS;
3925 	UnlockBufHdr(buf, buf_state);
3926 
3927 	InProgressBuf = buf;
3928 	IsForInput = forInput;
3929 
3930 	return true;
3931 }
3932 
3933 /*
3934  * TerminateBufferIO: release a buffer we were doing I/O on
3935  *	(Assumptions)
3936  *	My process is executing IO for the buffer
3937  *	BM_IO_IN_PROGRESS bit is set for the buffer
3938  *	We hold the buffer's io_in_progress lock
3939  *	The buffer is Pinned
3940  *
3941  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
3942  * buffer's BM_DIRTY flag.  This is appropriate when terminating a
3943  * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
3944  * marking the buffer clean if it was re-dirtied while we were writing.
3945  *
3946  * set_flag_bits gets ORed into the buffer's flags.  It must include
3947  * BM_IO_ERROR in a failure case.  For successful completion it could
3948  * be 0, or BM_VALID if we just finished reading in the page.
3949  */
3950 static void
3951 TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
3952 {
3953 	uint32		buf_state;
3954 
3955 	Assert(buf == InProgressBuf);
3956 
3957 	buf_state = LockBufHdr(buf);
3958 
3959 	Assert(buf_state & BM_IO_IN_PROGRESS);
3960 
3961 	buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
3962 	if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
3963 		buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
3964 
3965 	buf_state |= set_flag_bits;
3966 	UnlockBufHdr(buf, buf_state);
3967 
3968 	InProgressBuf = NULL;
3969 
3970 	LWLockRelease(BufferDescriptorGetIOLock(buf));
3971 }
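
/*
 * A minimal sketch of how StartBufferIO()/TerminateBufferIO() bracket a
 * write, loosely modeled on FlushBuffer(); it assumes the caller holds a pin
 * and at least a share content lock.  The function name write_buffer_example
 * is hypothetical and the body is simplified: the real write path also
 * flushes WAL up to the page LSN, computes checksums and updates statistics.
 * Compiled out via NOT_USED.
 */
#ifdef NOT_USED
static void
write_buffer_example(BufferDesc *buf, SMgrRelation reln)
{
	/* If this returns false, some other backend already wrote the page. */
	if (!StartBufferIO(buf, false))
		return;

	smgrwrite(reln, buf->tag.forkNum, buf->tag.blockNum,
			  (char *) BufHdrGetBlock(buf), false);

	/* Clear BM_DIRTY (unless re-dirtied meanwhile) and release the IO lock. */
	TerminateBufferIO(buf, true, 0);
}
#endif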
3972 
3973 /*
3974  * AbortBufferIO: Clean up any active buffer I/O after an error.
3975  *
3976  *	All LWLocks we might have held have been released,
3977  *	but we haven't yet released buffer pins, so the buffer is still pinned.
3978  *
3979  *	If I/O was in progress, we always set BM_IO_ERROR, even though it's
3980  *	possible the error condition wasn't related to the I/O.
3981  */
3982 void
3983 AbortBufferIO(void)
3984 {
3985 	BufferDesc *buf = InProgressBuf;
3986 
3987 	if (buf)
3988 	{
3989 		uint32		buf_state;
3990 
3991 		/*
3992 		 * Since LWLockReleaseAll has already been called, we're not holding
3993 		 * the buffer's io_in_progress_lock. We have to re-acquire it so that
3994 		 * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
3995 		 * buffer will be in a busy spin until we succeed in doing this.
3996 		 */
3997 		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
3998 
3999 		buf_state = LockBufHdr(buf);
4000 		Assert(buf_state & BM_IO_IN_PROGRESS);
4001 		if (IsForInput)
4002 		{
4003 			Assert(!(buf_state & BM_DIRTY));
4004 
4005 			/* We'd better not think buffer is valid yet */
4006 			Assert(!(buf_state & BM_VALID));
4007 			UnlockBufHdr(buf, buf_state);
4008 		}
4009 		else
4010 		{
4011 			Assert(buf_state & BM_DIRTY);
4012 			UnlockBufHdr(buf, buf_state);
4013 			/* Issue notice if this is not the first failure... */
4014 			if (buf_state & BM_IO_ERROR)
4015 			{
4016 				/* Buffer is pinned, so we can read tag without spinlock */
4017 				char	   *path;
4018 
4019 				path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
4020 				ereport(WARNING,
4021 						(errcode(ERRCODE_IO_ERROR),
4022 						 errmsg("could not write block %u of %s",
4023 								buf->tag.blockNum, path),
4024 						 errdetail("Multiple failures --- write error might be permanent.")));
4025 				pfree(path);
4026 			}
4027 		}
4028 		TerminateBufferIO(buf, false, BM_IO_ERROR);
4029 	}
4030 }
4031 
4032 /*
4033  * Error context callback for errors occurring during shared buffer writes.
4034  */
4035 static void
4036 shared_buffer_write_error_callback(void *arg)
4037 {
4038 	BufferDesc *bufHdr = (BufferDesc *) arg;
4039 
4040 	/* Buffer is pinned, so we can read the tag without locking the spinlock */
4041 	if (bufHdr != NULL)
4042 	{
4043 		char	   *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
4044 
4045 		errcontext("writing block %u of relation %s",
4046 				   bufHdr->tag.blockNum, path);
4047 		pfree(path);
4048 	}
4049 }
4050 
4051 /*
4052  * Error context callback for errors occurring during local buffer writes.
4053  */
4054 static void
4055 local_buffer_write_error_callback(void *arg)
4056 {
4057 	BufferDesc *bufHdr = (BufferDesc *) arg;
4058 
4059 	if (bufHdr != NULL)
4060 	{
4061 		char	   *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
4062 										  bufHdr->tag.forkNum);
4063 
4064 		errcontext("writing block %u of relation %s",
4065 				   bufHdr->tag.blockNum, path);
4066 		pfree(path);
4067 	}
4068 }
4069 
4070 /*
4071  * RelFileNode qsort/bsearch comparator; see RelFileNodeEquals.
4072  */
4073 static int
4074 rnode_comparator(const void *p1, const void *p2)
4075 {
4076 	RelFileNode n1 = *(const RelFileNode *) p1;
4077 	RelFileNode n2 = *(const RelFileNode *) p2;
4078 
4079 	if (n1.relNode < n2.relNode)
4080 		return -1;
4081 	else if (n1.relNode > n2.relNode)
4082 		return 1;
4083 
4084 	if (n1.dbNode < n2.dbNode)
4085 		return -1;
4086 	else if (n1.dbNode > n2.dbNode)
4087 		return 1;
4088 
4089 	if (n1.spcNode < n2.spcNode)
4090 		return -1;
4091 	else if (n1.spcNode > n2.spcNode)
4092 		return 1;
4093 	else
4094 		return 0;
4095 }
4096 
4097 /*
4098  * Lock buffer header - set BM_LOCKED in buffer state.
4099  */
4100 uint32
4101 LockBufHdr(BufferDesc *desc)
4102 {
4103 	SpinDelayStatus delayStatus;
4104 	uint32		old_buf_state;
4105 
4106 	init_local_spin_delay(&delayStatus);
4107 
4108 	while (true)
4109 	{
4110 		/* set BM_LOCKED flag */
4111 		old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
4112 		/* if it wasn't set before we're OK */
4113 		if (!(old_buf_state & BM_LOCKED))
4114 			break;
4115 		perform_spin_delay(&delayStatus);
4116 	}
4117 	finish_spin_delay(&delayStatus);
4118 	return old_buf_state | BM_LOCKED;
4119 }
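
/*
 * A minimal sketch of the LockBufHdr()/UnlockBufHdr() pattern used all over
 * this file: take the header spinlock, briefly examine or adjust the state,
 * then release by writing the (possibly modified) state back.  The function
 * name read_refcount_example is hypothetical; compiled out via NOT_USED.
 */
#ifdef NOT_USED
static uint32
read_refcount_example(BufferDesc *desc)
{
	uint32		buf_state;
	uint32		refcount;

	buf_state = LockBufHdr(desc);
	refcount = BUF_STATE_GET_REFCOUNT(buf_state);

	/* Only trivial work is allowed while BM_LOCKED is set. */
	UnlockBufHdr(desc, buf_state);

	return refcount;
}
#endif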
4120 
4121 /*
4122  * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's
4123  * state at that point.
4124  *
4125  * Obviously the buffer could be locked by the time the value is returned, so
4126  * this is primarily useful in CAS style loops.
4127  */
4128 static uint32
4129 WaitBufHdrUnlocked(BufferDesc *buf)
4130 {
4131 	SpinDelayStatus delayStatus;
4132 	uint32		buf_state;
4133 
4134 	init_local_spin_delay(&delayStatus);
4135 
4136 	buf_state = pg_atomic_read_u32(&buf->state);
4137 
4138 	while (buf_state & BM_LOCKED)
4139 	{
4140 		perform_spin_delay(&delayStatus);
4141 		buf_state = pg_atomic_read_u32(&buf->state);
4142 	}
4143 
4144 	finish_spin_delay(&delayStatus);
4145 
4146 	return buf_state;
4147 }
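
/*
 * A minimal sketch of the CAS-style loop the comment above refers to,
 * modeled on MarkBufferDirty(): read the state, wait out BM_LOCKED with
 * WaitBufHdrUnlocked(), then compare-and-exchange the updated value,
 * retrying on concurrent changes.  The function name set_dirty_cas_example
 * is hypothetical; compiled out via NOT_USED.
 */
#ifdef NOT_USED
static void
set_dirty_cas_example(BufferDesc *buf)
{
	uint32		old_buf_state = pg_atomic_read_u32(&buf->state);

	for (;;)
	{
		uint32		buf_state;

		/* Can't make progress while the header spinlock is held. */
		if (old_buf_state & BM_LOCKED)
			old_buf_state = WaitBufHdrUnlocked(buf);

		buf_state = old_buf_state | BM_DIRTY | BM_JUST_DIRTIED;

		/* On failure old_buf_state is refreshed, and we simply retry. */
		if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
										   buf_state))
			break;
	}
}
#endif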
4148 
4149 /*
4150  * BufferTag comparator.
4151  */
4152 static int
4153 buffertag_comparator(const void *a, const void *b)
4154 {
4155 	const BufferTag *ba = (const BufferTag *) a;
4156 	const BufferTag *bb = (const BufferTag *) b;
4157 	int			ret;
4158 
4159 	ret = rnode_comparator(&ba->rnode, &bb->rnode);
4160 
4161 	if (ret != 0)
4162 		return ret;
4163 
4164 	if (ba->forkNum < bb->forkNum)
4165 		return -1;
4166 	if (ba->forkNum > bb->forkNum)
4167 		return 1;
4168 
4169 	if (ba->blockNum < bb->blockNum)
4170 		return -1;
4171 	if (ba->blockNum > bb->blockNum)
4172 		return 1;
4173 
4174 	return 0;
4175 }
4176 
4177 /*
4178  * Comparator determining the writeout order in a checkpoint.
4179  *
4180  * It is important that tablespaces are compared first; the logic balancing
4181  * writes between tablespaces relies on it.
4182  */
4183 static int
4184 ckpt_buforder_comparator(const void *pa, const void *pb)
4185 {
4186 	const CkptSortItem *a = (const CkptSortItem *) pa;
4187 	const CkptSortItem *b = (const CkptSortItem *) pb;
4188 
4189 	/* compare tablespace */
4190 	if (a->tsId < b->tsId)
4191 		return -1;
4192 	else if (a->tsId > b->tsId)
4193 		return 1;
4194 	/* compare relation */
4195 	if (a->relNode < b->relNode)
4196 		return -1;
4197 	else if (a->relNode > b->relNode)
4198 		return 1;
4199 	/* compare fork */
4200 	else if (a->forkNum < b->forkNum)
4201 		return -1;
4202 	else if (a->forkNum > b->forkNum)
4203 		return 1;
4204 	/* compare block number */
4205 	else if (a->blockNum < b->blockNum)
4206 		return -1;
4207 	else if (a->blockNum > b->blockNum)
4208 		return 1;
4209 	/* equal page IDs are unlikely, but not impossible */
4210 	return 0;
4211 }
4212 
4213 /*
4214  * Comparator for a Min-Heap over the per-tablespace checkpoint completion
4215  * progress.
4216  */
4217 static int
4218 ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
4219 {
4220 	CkptTsStatus *sa = (CkptTsStatus *) a;
4221 	CkptTsStatus *sb = (CkptTsStatus *) b;
4222 
4223 	/* we want a min-heap, so return 1 when a < b */
4224 	if (sa->progress < sb->progress)
4225 		return 1;
4226 	else if (sa->progress == sb->progress)
4227 		return 0;
4228 	else
4229 		return -1;
4230 }
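
/*
 * A minimal sketch of how a comparator like the one above drives a
 * binaryheap, in the spirit of BufferSync()'s per-tablespace balancing.
 * The element source and the "write one buffer" step are placeholders, and
 * ts_heap_example is hypothetical.  Compiled out via NOT_USED.
 */
#ifdef NOT_USED
static void
ts_heap_example(CkptTsStatus *per_ts_stat, int num_spaces)
{
	binaryheap *ts_heap;
	int			i;

	ts_heap = binaryheap_allocate(num_spaces,
								  ts_ckpt_progress_comparator,
								  NULL);

	for (i = 0; i < num_spaces; i++)
		binaryheap_add_unordered(ts_heap, PointerGetDatum(&per_ts_stat[i]));

	binaryheap_build(ts_heap);

	while (!binaryheap_empty(ts_heap))
	{
		/* The tablespace that has made the least progress goes next. */
		CkptTsStatus *ts = (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap));

		/* ... write one to-be-checkpointed buffer of this tablespace ... */
		ts->progress += ts->progress_slice;
		ts->num_scanned++;

		if (ts->num_scanned >= ts->num_to_scan)
			binaryheap_remove_first(ts_heap);
		else
			binaryheap_replace_first(ts_heap, PointerGetDatum(ts));
	}

	binaryheap_free(ts_heap);
}
#endif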
4231 
4232 /*
4233  * Initialize a writeback context, discarding potential previous state.
4234  *
4235  * *max_pending is a pointer instead of an immediate value, so the coalesce
4236  * limits can easily be changed by the GUC mechanism, and so calling code does
4237  * not have to check the current configuration.  A value of 0 means that no
4238  * writeback control will be performed.
4239  */
4240 void
4241 WritebackContextInit(WritebackContext *context, int *max_pending)
4242 {
4243 	Assert(*max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
4244 
4245 	context->max_pending = max_pending;
4246 	context->nr_pending = 0;
4247 }
4248 
4249 /*
4250  * Add buffer to list of pending writeback requests.
4251  */
4252 void
4253 ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag)
4254 {
4255 	PendingWriteback *pending;
4256 
4257 	/*
4258 	 * Add buffer to the pending writeback array, unless writeback control is
4259 	 * disabled.
4260 	 */
4261 	if (*context->max_pending > 0)
4262 	{
4263 		Assert(*context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
4264 
4265 		pending = &context->pending_writebacks[context->nr_pending++];
4266 
4267 		pending->tag = *tag;
4268 	}
4269 
4270 	/*
4271 	 * Perform pending flushes if the writeback limit is exceeded. This
4272 	 * includes the case where previously an item has been added, but control
4273 	 * is now disabled.
4274 	 */
4275 	if (context->nr_pending >= *context->max_pending)
4276 		IssuePendingWritebacks(context);
4277 }
4278 
4279 /*
4280  * Issue all pending writeback requests, previously scheduled with
4281  * ScheduleBufferTagForWriteback, to the OS.
4282  *
4283  * Because this is only used to improve the OS's I/O scheduling, we try never
4284  * to error out --- it's just a hint.
4285  */
4286 void
4287 IssuePendingWritebacks(WritebackContext *context)
4288 {
4289 	int			i;
4290 
4291 	if (context->nr_pending == 0)
4292 		return;
4293 
4294 	/*
4295 	 * Executing the writes in order can make them a lot faster, and allows us
4296 	 * to merge writeback requests for consecutive blocks into larger writebacks.
4297 	 */
4298 	qsort(&context->pending_writebacks, context->nr_pending,
4299 		  sizeof(PendingWriteback), buffertag_comparator);
4300 
4301 	/*
4302 	 * Coalesce neighbouring writes, but nothing else.  For that we iterate
4303 	 * through the now-sorted array of pending flushes, and look forward to
4304 	 * find all neighbouring (or identical) writes.
4305 	 */
4306 	for (i = 0; i < context->nr_pending; i++)
4307 	{
4308 		PendingWriteback *cur;
4309 		PendingWriteback *next;
4310 		SMgrRelation reln;
4311 		int			ahead;
4312 		BufferTag	tag;
4313 		Size		nblocks = 1;
4314 
4315 		cur = &context->pending_writebacks[i];
4316 		tag = cur->tag;
4317 
4318 		/*
4319 		 * Peek ahead, into following writeback requests, to see if they can
4320 		 * be combined with the current one.
4321 		 */
4322 		for (ahead = 0; i + ahead + 1 < context->nr_pending; ahead++)
4323 		{
4324 			next = &context->pending_writebacks[i + ahead + 1];
4325 
4326 			/* different file, stop */
4327 			if (!RelFileNodeEquals(cur->tag.rnode, next->tag.rnode) ||
4328 				cur->tag.forkNum != next->tag.forkNum)
4329 				break;
4330 
4331 			/* ok, block queued twice, skip */
4332 			if (cur->tag.blockNum == next->tag.blockNum)
4333 				continue;
4334 
4335 			/* only merge consecutive writes */
4336 			if (cur->tag.blockNum + 1 != next->tag.blockNum)
4337 				break;
4338 
4339 			nblocks++;
4340 			cur = next;
4341 		}
4342 
4343 		i += ahead;
4344 
4345 		/* and finally tell the kernel to write the data to storage */
4346 		reln = smgropen(tag.rnode, InvalidBackendId);
4347 		smgrwriteback(reln, tag.forkNum, tag.blockNum, nblocks);
4348 	}
4349 
4350 	context->nr_pending = 0;
4351 }
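
/*
 * A minimal sketch of the writeback-control flow, assuming a checkpoint-like
 * caller and the existing checkpoint_flush_after GUC; the list of written
 * tags is a placeholder and writeback_usage_example is hypothetical.
 * Compiled out via NOT_USED.
 */
#ifdef NOT_USED
static void
writeback_usage_example(BufferTag *written_tags, int ntags)
{
	WritebackContext wb_context;
	int			i;

	/* The coalesce limit tracks the GUC; a limit of 0 disables writeback control. */
	WritebackContextInit(&wb_context, &checkpoint_flush_after);

	/* Queue each written buffer; the context flushes itself when the limit is hit. */
	for (i = 0; i < ntags; i++)
		ScheduleBufferTagForWriteback(&wb_context, &written_tags[i]);

	/* Issue whatever is still pending once the write loop is done. */
	IssuePendingWritebacks(&wb_context);
}
#endif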
4352 
4353 
4354 /*
4355  * Implement slower/larger portions of TestForOldSnapshot
4356  *
4357  * Smaller/faster portions are put inline, but the entire set of logic is too
4358  * big for that.
4359  */
4360 void
4361 TestForOldSnapshot_impl(Snapshot snapshot, Relation relation)
4362 {
4363 	if (RelationAllowsEarlyPruning(relation)
4364 		&& (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp())
4365 		ereport(ERROR,
4366 				(errcode(ERRCODE_SNAPSHOT_TOO_OLD),
4367 				 errmsg("snapshot too old")));
4368 }
4369