1 /*-------------------------------------------------------------------------
2 *
3 * bufmgr.c
4 * buffer manager interface routines
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/storage/buffer/bufmgr.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 /*
16 * Principal entry points:
17 *
18 * ReadBuffer() -- find or create a buffer holding the requested page,
19 * and pin it so that no one can destroy it while this process
20 * is using it.
21 *
22 * ReleaseBuffer() -- unpin a buffer
23 *
24 * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
25 * The disk write is delayed until buffer replacement or checkpoint.
26 *
27 * See also these files:
28 * freelist.c -- chooses victim for buffer replacement
29 * buf_table.c -- manages the buffer lookup table
30 */
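/*
 * A minimal usage sketch (editor's illustration, not part of the original
 * file): how callers typically combine the entry points above. "rel" and
 * "blkno" are placeholders; WAL logging and critical sections are omitted.
 *
 *		Buffer	buf = ReadBuffer(rel, blkno);
 *
 *		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *		... modify BufferGetPage(buf) ...
 *		MarkBufferDirty(buf);
 *		UnlockReleaseBuffer(buf);
 */
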
31 #include "postgres.h"
32
33 #include <sys/file.h>
34 #include <unistd.h>
35
36 #include "access/xlog.h"
37 #include "catalog/catalog.h"
38 #include "catalog/storage.h"
39 #include "executor/instrument.h"
40 #include "lib/binaryheap.h"
41 #include "miscadmin.h"
42 #include "pg_trace.h"
43 #include "pgstat.h"
44 #include "postmaster/bgwriter.h"
45 #include "storage/buf_internals.h"
46 #include "storage/bufmgr.h"
47 #include "storage/ipc.h"
48 #include "storage/proc.h"
49 #include "storage/smgr.h"
50 #include "storage/standby.h"
51 #include "utils/rel.h"
52 #include "utils/resowner_private.h"
53 #include "utils/timestamp.h"
54
55
56 /* Note: these two macros only work on shared buffers, not local ones! */
57 #define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
58 #define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr)))
59
60 /* Note: this macro only works on local buffers, not shared ones! */
61 #define LocalBufHdrGetBlock(bufHdr) \
62 LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
63
64 /* Bits in SyncOneBuffer's return value */
65 #define BUF_WRITTEN 0x01
66 #define BUF_REUSABLE 0x02
67
68 #define DROP_RELS_BSEARCH_THRESHOLD 20
69
70 typedef struct PrivateRefCountEntry
71 {
72 Buffer buffer;
73 int32 refcount;
74 } PrivateRefCountEntry;
75
76 /* 64 bytes, about the size of a cache line on common systems */
77 #define REFCOUNT_ARRAY_ENTRIES 8
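/*
 * (Editor's arithmetic: each PrivateRefCountEntry is a 4-byte Buffer plus a
 * 4-byte int32 refcount, i.e. 8 bytes, so 8 entries occupy the 64 bytes
 * mentioned above.)
 */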
78
79 /*
80 * Status of buffers to checkpoint for a particular tablespace, used
81 * internally in BufferSync.
82 */
83 typedef struct CkptTsStatus
84 {
85 /* oid of the tablespace */
86 Oid tsId;
87
88 /*
89 * Checkpoint progress for this tablespace. To make progress comparable
90 * between tablespaces the progress is, for each tablespace, measured as a
91 * number between 0 and the total number of to-be-checkpointed pages. Each
92 * page checkpointed in this tablespace increments this space's progress
93 * by progress_slice.
94 */
95 float8 progress;
96 float8 progress_slice;
97
98 /* number of to-be checkpointed pages in this tablespace */
99 int num_to_scan;
100 /* already processed pages in this tablespace */
101 int num_scanned;
102
103 /* current offset in CkptBufferIds for this tablespace */
104 int index;
105 } CkptTsStatus;
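
/*
 * Worked example for progress_slice (editor's illustration, numbers
 * invented): if a checkpoint must write 1000 pages in total and this
 * tablespace owns 250 of them, progress_slice is 1000/250 = 4.0, so after
 * all 250 pages are written this tablespace's progress has advanced to
 * 1000 -- the same end point as every other tablespace, which is what makes
 * the per-tablespace progress values comparable.
 */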
106
107 /* GUC variables */
108 bool zero_damaged_pages = false;
109 int bgwriter_lru_maxpages = 100;
110 double bgwriter_lru_multiplier = 2.0;
111 bool track_io_timing = false;
112 int effective_io_concurrency = 0;
113
114 /*
115 * GUC variables about triggering kernel writeback for buffers written; OS
116 * dependent defaults are set via the GUC mechanism.
117 */
118 int checkpoint_flush_after = 0;
119 int bgwriter_flush_after = 0;
120 int backend_flush_after = 0;
121
122 /*
123 * How many buffers PrefetchBuffer callers should try to stay ahead of their
124 * ReadBuffer calls by. This is maintained by the assign hook for
125 * effective_io_concurrency. Zero means "never prefetch". This value is
126 * only used for buffers not belonging to tablespaces that have their
127 * effective_io_concurrency parameter set.
128 */
129 int target_prefetch_pages = 0;
130
131 /* local state for StartBufferIO and related functions */
132 static BufferDesc *InProgressBuf = NULL;
133 static bool IsForInput;
134
135 /* local state for LockBufferForCleanup */
136 static BufferDesc *PinCountWaitBuf = NULL;
137
138 /*
139 * Backend-Private refcount management:
140 *
141 * Each buffer also has a private refcount that keeps track of the number of
142 * times the buffer is pinned in the current process. This is so that the
143 * shared refcount needs to be modified only once if a buffer is pinned more
144 * than once by an individual backend. It's also used to check that no buffers
145 * are still pinned at the end of transactions and when exiting.
146 *
147 *
148 * To avoid - as we used to - requiring an array with NBuffers entries to keep
149 * track of per-buffer pin counts, we use a small sequentially searched array
150 * (PrivateRefCountArray) and an overflow hash table (PrivateRefCountHash) to
151 * keep track of backend local pins.
152 *
153 * As long as no more than REFCOUNT_ARRAY_ENTRIES buffers are pinned at once,
154 * all refcounts are tracked in the array; after that, new array entries
155 * displace old ones into the hash table. That way a frequently used entry
156 * can't get "stuck" in the hashtable while infrequent ones clog the array.
157 *
158 * Note that in most scenarios the number of pinned buffers will not exceed
159 * REFCOUNT_ARRAY_ENTRIES.
160 *
161 *
162 * To enter a buffer into the refcount tracking mechanism first reserve a free
163 * entry using ReservePrivateRefCountEntry() and then later, if necessary,
164 * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing
165 * memory allocations in NewPrivateRefCountEntry() which can be important
166 * because in some scenarios it's called with a spinlock held...
167 */
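/*
 * A minimal sketch of that calling pattern (editor's illustration; "buf" is
 * a placeholder shared Buffer):
 *
 *		ReservePrivateRefCountEntry();
 *		... code that may run while holding a buffer header spinlock ...
 *		ref = NewPrivateRefCountEntry(buf);
 *		ref->refcount++;
 */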
168 static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
169 static HTAB *PrivateRefCountHash = NULL;
170 static int32 PrivateRefCountOverflowed = 0;
171 static uint32 PrivateRefCountClock = 0;
172 static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
173
174 static void ReservePrivateRefCountEntry(void);
175 static PrivateRefCountEntry *NewPrivateRefCountEntry(Buffer buffer);
176 static PrivateRefCountEntry *GetPrivateRefCountEntry(Buffer buffer, bool do_move);
177 static inline int32 GetPrivateRefCount(Buffer buffer);
178 static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
179
180 /*
181 * Ensure that the PrivateRefCountArray has sufficient space to store one more
182 * entry. This has to be called before using NewPrivateRefCountEntry() to fill
183 * a new entry - but it's perfectly fine to not use a reserved entry.
184 */
185 static void
186 ReservePrivateRefCountEntry(void)
187 {
188 /* Already reserved (or freed), nothing to do */
189 if (ReservedRefCountEntry != NULL)
190 return;
191
192 /*
193 * First search for a free entry in the array; that'll be sufficient in the
194 * majority of cases.
195 */
196 {
197 int i;
198
199 for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
200 {
201 PrivateRefCountEntry *res;
202
203 res = &PrivateRefCountArray[i];
204
205 if (res->buffer == InvalidBuffer)
206 {
207 ReservedRefCountEntry = res;
208 return;
209 }
210 }
211 }
212
213 /*
214 * No luck. All array entries are full. Move one array entry into the hash
215 * table.
216 */
217 {
218 /*
219 * Move entry from the current clock position in the array into the
220 * hashtable. Use that slot.
221 */
222 PrivateRefCountEntry *hashent;
223 bool found;
224
225 /* select victim slot */
226 ReservedRefCountEntry =
227 &PrivateRefCountArray[PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
228
229 /* Better be used, otherwise we shouldn't get here. */
230 Assert(ReservedRefCountEntry->buffer != InvalidBuffer);
231
232 /* enter victim array entry into hashtable */
233 hashent = hash_search(PrivateRefCountHash,
234 (void *) &(ReservedRefCountEntry->buffer),
235 HASH_ENTER,
236 &found);
237 Assert(!found);
238 hashent->refcount = ReservedRefCountEntry->refcount;
239
240 /* clear the now free array slot */
241 ReservedRefCountEntry->buffer = InvalidBuffer;
242 ReservedRefCountEntry->refcount = 0;
243
244 PrivateRefCountOverflowed++;
245 }
246 }
247
248 /*
249 * Fill a previously reserved refcount entry.
250 */
251 static PrivateRefCountEntry *
252 NewPrivateRefCountEntry(Buffer buffer)
253 {
254 PrivateRefCountEntry *res;
255
256 /* only allowed to be called when a reservation has been made */
257 Assert(ReservedRefCountEntry != NULL);
258
259 /* use up the reserved entry */
260 res = ReservedRefCountEntry;
261 ReservedRefCountEntry = NULL;
262
263 /* and fill it */
264 res->buffer = buffer;
265 res->refcount = 0;
266
267 return res;
268 }
269
270 /*
271 * Return the PrivateRefCount entry for the passed buffer.
272 *
273 * Returns NULL if a buffer doesn't have a refcount entry. Otherwise, if
274 * do_move is true and the entry resides in the hashtable, the entry is
275 * optimized for frequent access by moving it to the array.
276 */
277 static PrivateRefCountEntry *
278 GetPrivateRefCountEntry(Buffer buffer, bool do_move)
279 {
280 PrivateRefCountEntry *res;
281 int i;
282
283 Assert(BufferIsValid(buffer));
284 Assert(!BufferIsLocal(buffer));
285
286 /*
287 * First search for references in the array; that'll be sufficient in the
288 * majority of cases.
289 */
290 for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
291 {
292 res = &PrivateRefCountArray[i];
293
294 if (res->buffer == buffer)
295 return res;
296 }
297
298 /*
299 * By here we know that the buffer, if already pinned, isn't residing in
300 * the array.
301 *
302 * Only look up the buffer in the hashtable if we've previously overflowed
303 * into it.
304 */
305 if (PrivateRefCountOverflowed == 0)
306 return NULL;
307
308 res = hash_search(PrivateRefCountHash,
309 (void *) &buffer,
310 HASH_FIND,
311 NULL);
312
313 if (res == NULL)
314 return NULL;
315 else if (!do_move)
316 {
317 /* caller doesn't want us to move the hash entry into the array */
318 return res;
319 }
320 else
321 {
322 /* move buffer from hashtable into the free array slot */
323 bool found;
324 PrivateRefCountEntry *free;
325
326 /* Ensure there's a free array slot */
327 ReservePrivateRefCountEntry();
328
329 /* Use up the reserved slot */
330 Assert(ReservedRefCountEntry != NULL);
331 free = ReservedRefCountEntry;
332 ReservedRefCountEntry = NULL;
333 Assert(free->buffer == InvalidBuffer);
334
335 /* and fill it */
336 free->buffer = buffer;
337 free->refcount = res->refcount;
338
339 /* delete from hashtable */
340 hash_search(PrivateRefCountHash,
341 (void *) &buffer,
342 HASH_REMOVE,
343 &found);
344 Assert(found);
345 Assert(PrivateRefCountOverflowed > 0);
346 PrivateRefCountOverflowed--;
347
348 return free;
349 }
350 }
351
352 /*
353 * Returns how many times the passed buffer is pinned by this backend.
354 *
355 * Only works for shared memory buffers!
356 */
357 static inline int32
358 GetPrivateRefCount(Buffer buffer)
359 {
360 PrivateRefCountEntry *ref;
361
362 Assert(BufferIsValid(buffer));
363 Assert(!BufferIsLocal(buffer));
364
365 /*
366 * Not moving the entry - that's ok for the current users, but we might
367 * want to change this one day.
368 */
369 ref = GetPrivateRefCountEntry(buffer, false);
370
371 if (ref == NULL)
372 return 0;
373 return ref->refcount;
374 }
375
376 /*
377 * Release resources used to track the reference count of a buffer which we no
378 * longer have pinned and don't want to pin again immediately.
379 */
380 static void
381 ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
382 {
383 Assert(ref->refcount == 0);
384
385 if (ref >= &PrivateRefCountArray[0] &&
386 ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
387 {
388 ref->buffer = InvalidBuffer;
389
390 /*
391 * Mark the just used entry as reserved - in many scenarios that
392 * allows us to avoid ever having to search the array/hash for free
393 * entries.
394 */
395 ReservedRefCountEntry = ref;
396 }
397 else
398 {
399 bool found;
400 Buffer buffer = ref->buffer;
401
402 hash_search(PrivateRefCountHash,
403 (void *) &buffer,
404 HASH_REMOVE,
405 &found);
406 Assert(found);
407 Assert(PrivateRefCountOverflowed > 0);
408 PrivateRefCountOverflowed--;
409 }
410 }
411
412 /*
413 * BufferIsPinned
414 * True iff the buffer is pinned (also checks for valid buffer number).
415 *
416 * NOTE: what we check here is that *this* backend holds a pin on
417 * the buffer. We do not care whether some other backend does.
418 */
419 #define BufferIsPinned(bufnum) \
420 ( \
421 !BufferIsValid(bufnum) ? \
422 false \
423 : \
424 BufferIsLocal(bufnum) ? \
425 (LocalRefCount[-(bufnum) - 1] > 0) \
426 : \
427 (GetPrivateRefCount(bufnum) > 0) \
428 )
429
430
431 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
432 ForkNumber forkNum, BlockNumber blockNum,
433 ReadBufferMode mode, BufferAccessStrategy strategy,
434 bool *hit);
435 static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
436 static void PinBuffer_Locked(BufferDesc *buf);
437 static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
438 static void BufferSync(int flags);
439 static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
440 static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context);
441 static void WaitIO(BufferDesc *buf);
442 static bool StartBufferIO(BufferDesc *buf, bool forInput);
443 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
444 uint32 set_flag_bits);
445 static void shared_buffer_write_error_callback(void *arg);
446 static void local_buffer_write_error_callback(void *arg);
447 static BufferDesc *BufferAlloc(SMgrRelation smgr,
448 char relpersistence,
449 ForkNumber forkNum,
450 BlockNumber blockNum,
451 BufferAccessStrategy strategy,
452 bool *foundPtr);
453 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
454 static void AtProcExit_Buffers(int code, Datum arg);
455 static void CheckForBufferLeaks(void);
456 static int rnode_comparator(const void *p1, const void *p2);
457 static int buffertag_comparator(const void *p1, const void *p2);
458 static int ckpt_buforder_comparator(const void *pa, const void *pb);
459 static int ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
460
461
462 /*
463 * ComputeIoConcurrency -- get the number of pages to prefetch for a given
464 * number of spindles.
465 */
466 bool
467 ComputeIoConcurrency(int io_concurrency, double *target)
468 {
469 double new_prefetch_pages = 0.0;
470 int i;
471
472 /*
473 * Make sure the io_concurrency value is within valid range; it may have
474 * been forced with a manual pg_tablespace update.
475 */
476 io_concurrency = Min(Max(io_concurrency, 0), MAX_IO_CONCURRENCY);
477
478 /*----------
479 * The user-visible GUC parameter is the number of drives (spindles),
480 * which we need to translate to a number-of-pages-to-prefetch target.
481 * The target value is stashed in *extra and then assigned to the actual
482 * variable by assign_effective_io_concurrency.
483 *
484 * The expected number of prefetch pages needed to keep N drives busy is:
485 *
486 * drives | I/O requests
487 * -------+----------------
488 * 1 | 1
489 * 2 | 2/1 + 2/2 = 3
490 * 3 | 3/1 + 3/2 + 3/3 = 5 1/2
491 * 4 | 4/1 + 4/2 + 4/3 + 4/4 = 8 1/3
492 * n | n * H(n)
493 *
494 * This is called the "coupon collector problem" and H(n) is called the
495 * harmonic series. This could be approximated by n * ln(n), but for
496 * reasonable numbers of drives we might as well just compute the series.
497 *
498 * Alternatively we could set the target to the number of pages necessary
499 * so that the expected number of active spindles is some arbitrary
500 * percentage of the total. This sounds the same but is actually slightly
501 * different. The result ends up being ln(1-P)/ln((n-1)/n) where P is
502 * that desired fraction.
503 *
504 * Experimental results show that both of these formulas aren't aggressive
505 * enough, but we don't really have any better proposals.
506 *
507 * Note that if io_concurrency = 0 (disabled), we must set target = 0.
508 *----------
509 */
510
511 for (i = 1; i <= io_concurrency; i++)
512 new_prefetch_pages += (double) io_concurrency / (double) i;
513
514 *target = new_prefetch_pages;
515
516 /* This range check shouldn't fail, but let's be paranoid */
517 return (new_prefetch_pages >= 0.0 && new_prefetch_pages < (double) INT_MAX);
518 }
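
/*
 * Worked example (editor's arithmetic): for io_concurrency = 10 the loop
 * above computes 10 * H(10) = 10 * (1 + 1/2 + ... + 1/10) ~= 29.3, i.e.
 * roughly 29 pages of prefetch distance to keep 10 spindles busy.
 */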
519
520 /*
521 * PrefetchBuffer -- initiate asynchronous read of a block of a relation
522 *
523 * This is named by analogy to ReadBuffer but doesn't actually allocate a
524 * buffer. Instead it tries to ensure that a future ReadBuffer for the given
525 * block will not be delayed by the I/O. Prefetching is optional.
526 * No-op if prefetching isn't compiled in.
527 */
528 void
529 PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
530 {
531 #ifdef USE_PREFETCH
532 Assert(RelationIsValid(reln));
533 Assert(BlockNumberIsValid(blockNum));
534
535 /* Open it at the smgr level if not already done */
536 RelationOpenSmgr(reln);
537
538 if (RelationUsesLocalBuffers(reln))
539 {
540 /* see comments in ReadBufferExtended */
541 if (RELATION_IS_OTHER_TEMP(reln))
542 ereport(ERROR,
543 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
544 errmsg("cannot access temporary tables of other sessions")));
545
546 /* pass it off to localbuf.c */
547 LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
548 }
549 else
550 {
551 BufferTag newTag; /* identity of requested block */
552 uint32 newHash; /* hash value for newTag */
553 LWLock *newPartitionLock; /* buffer partition lock for it */
554 int buf_id;
555
556 /* create a tag so we can look up the buffer */
557 INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
558 forkNum, blockNum);
559
560 /* determine its hash code and partition lock ID */
561 newHash = BufTableHashCode(&newTag);
562 newPartitionLock = BufMappingPartitionLock(newHash);
563
564 /* see if the block is in the buffer pool already */
565 LWLockAcquire(newPartitionLock, LW_SHARED);
566 buf_id = BufTableLookup(&newTag, newHash);
567 LWLockRelease(newPartitionLock);
568
569 /* If not in buffers, initiate prefetch */
570 if (buf_id < 0)
571 smgrprefetch(reln->rd_smgr, forkNum, blockNum);
572
573 /*
574 * If the block *is* in buffers, we do nothing. This is not really
575 * ideal: the block might be just about to be evicted, which would be
576 * stupid since we know we are going to need it soon. But the only
577 * easy answer is to bump the usage_count, which does not seem like a
578 * great solution: when the caller does ultimately touch the block,
579 * usage_count would get bumped again, resulting in too much
580 * favoritism for blocks that are involved in a prefetch sequence. A
581 * real fix would involve some additional per-buffer state, and it's
582 * not clear that there's enough of a problem to justify that.
583 */
584 }
585 #endif /* USE_PREFETCH */
586 }
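
/*
 * Caller sketch (editor's illustration): a scan that knows which block it
 * will need next can issue the prefetch some distance ahead of the
 * corresponding read. "rel", "blkno", "nblocks" and "distance" are
 * placeholders.
 *
 *		if (blkno + distance < nblocks)
 *			PrefetchBuffer(rel, MAIN_FORKNUM, blkno + distance);
 *		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, NULL);
 *
 * where "distance" would typically be derived from target_prefetch_pages or
 * the tablespace's effective_io_concurrency setting.
 */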
587
588
589 /*
590 * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
591 * fork with RBM_NORMAL mode and default strategy.
592 */
593 Buffer
594 ReadBuffer(Relation reln, BlockNumber blockNum)
595 {
596 return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
597 }
598
599 /*
600 * ReadBufferExtended -- returns a buffer containing the requested
601 * block of the requested relation. If the blknum
602 * requested is P_NEW, extend the relation file and
603 * allocate a new block. (Caller is responsible for
604 * ensuring that only one backend tries to extend a
605 * relation at the same time!)
606 *
607 * Returns: the buffer number for the buffer containing
608 * the block read. The returned buffer has been pinned.
609 * Does not return on error --- elog's instead.
610 *
611 * Assume that when this function is called, reln has already been opened.
612 *
613 * In RBM_NORMAL mode, the page is read from disk, and the page header is
614 * validated. An error is thrown if the page header is not valid. (But
615 * note that an all-zero page is considered "valid"; see PageIsVerified().)
616 *
617 * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
618 * valid, the page is zeroed instead of throwing an error. This is intended
619 * for non-critical data, where the caller is prepared to repair errors.
620 *
621 * In RBM_ZERO_AND_LOCK mode, if the page isn't in buffer cache already, it's
622 * filled with zeros instead of reading it from disk. Useful when the caller
623 * is going to fill the page from scratch, since this saves I/O and avoids
624 * unnecessary failure if the page-on-disk has corrupt page headers.
625 * The page is returned locked to ensure that the caller has a chance to
626 * initialize the page before it's made visible to others.
627 * Caution: do not use this mode to read a page that is beyond the relation's
628 * current physical EOF; that is likely to cause problems in md.c when
629 * the page is modified and written out. P_NEW is OK, though.
630 *
631 * RBM_ZERO_AND_CLEANUP_LOCK is the same as RBM_ZERO_AND_LOCK, but acquires
632 * a cleanup-strength lock on the page.
633 *
634 * RBM_NORMAL_NO_LOG mode is treated the same as RBM_NORMAL here.
635 *
636 * If strategy is not NULL, a nondefault buffer access strategy is used.
637 * See buffer/README for details.
638 */
639 Buffer
640 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
641 ReadBufferMode mode, BufferAccessStrategy strategy)
642 {
643 bool hit;
644 Buffer buf;
645
646 /* Open it at the smgr level if not already done */
647 RelationOpenSmgr(reln);
648
649 /*
650 * Reject attempts to read non-local temporary relations; we would be
651 * likely to get wrong data since we have no visibility into the owning
652 * session's local buffers.
653 */
654 if (RELATION_IS_OTHER_TEMP(reln))
655 ereport(ERROR,
656 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
657 errmsg("cannot access temporary tables of other sessions")));
658
659 /*
660 * Read the buffer, and update pgstat counters to reflect a cache hit or
661 * miss.
662 */
663 pgstat_count_buffer_read(reln);
664 buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
665 forkNum, blockNum, mode, strategy, &hit);
666 if (hit)
667 pgstat_count_buffer_hit(reln);
668 return buf;
669 }
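
/*
 * Example of the zero-and-lock modes (editor's sketch): allocating a page
 * the caller will initialize itself, so the on-disk contents are never read.
 * "rel" is a placeholder; a real caller also needs WAL logging and a
 * critical section around the modification.
 *
 *		buf = ReadBufferExtended(rel, MAIN_FORKNUM, P_NEW,
 *								 RBM_ZERO_AND_LOCK, NULL);
 *		page = BufferGetPage(buf);
 *		PageInit(page, BufferGetPageSize(buf), 0);
 *		MarkBufferDirty(buf);
 *		UnlockReleaseBuffer(buf);
 */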
670
671
672 /*
673 * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
674 * a relcache entry for the relation.
675 *
676 * NB: At present, this function may only be used on permanent relations, which
677 * is OK, because we only use it during XLOG replay. If in the future we
678 * want to use it on temporary or unlogged relations, we could pass additional
679 * parameters.
680 */
681 Buffer
682 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
683 BlockNumber blockNum, ReadBufferMode mode,
684 BufferAccessStrategy strategy)
685 {
686 bool hit;
687
688 SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
689
690 Assert(InRecovery);
691
692 return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
693 mode, strategy, &hit);
694 }
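
/*
 * Usage sketch (editor's illustration): during WAL replay, redo code that
 * only has a RelFileNode can read a block without a relcache entry.
 * "rnode" and "blkno" are placeholders.
 *
 *		buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
 *										RBM_NORMAL, NULL);
 */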
695
696
697 /*
698 * ReadBuffer_common -- common logic for all ReadBuffer variants
699 *
700 * *hit is set to true if the request was satisfied from shared buffer cache.
701 */
702 static Buffer
703 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
704 BlockNumber blockNum, ReadBufferMode mode,
705 BufferAccessStrategy strategy, bool *hit)
706 {
707 BufferDesc *bufHdr;
708 Block bufBlock;
709 bool found;
710 bool isExtend;
711 bool isLocalBuf = SmgrIsTemp(smgr);
712
713 *hit = false;
714
715 /* Make sure we will have room to remember the buffer pin */
716 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
717
718 isExtend = (blockNum == P_NEW);
719
720 TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
721 smgr->smgr_rnode.node.spcNode,
722 smgr->smgr_rnode.node.dbNode,
723 smgr->smgr_rnode.node.relNode,
724 smgr->smgr_rnode.backend,
725 isExtend);
726
727 /* Substitute proper block number if caller asked for P_NEW */
728 if (isExtend)
729 {
730 blockNum = smgrnblocks(smgr, forkNum);
731 /* Fail if relation is already at maximum possible length */
732 if (blockNum == P_NEW)
733 ereport(ERROR,
734 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
735 errmsg("cannot extend relation %s beyond %u blocks",
736 relpath(smgr->smgr_rnode, forkNum),
737 P_NEW)));
738 }
739
740 if (isLocalBuf)
741 {
742 bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
743 if (found)
744 pgBufferUsage.local_blks_hit++;
745 else
746 pgBufferUsage.local_blks_read++;
747 }
748 else
749 {
750 /*
751 * look up the buffer. IO_IN_PROGRESS is set if the requested block is
752 * not currently in memory.
753 */
754 bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
755 strategy, &found);
756 if (found)
757 pgBufferUsage.shared_blks_hit++;
758 else
759 pgBufferUsage.shared_blks_read++;
760 }
761
762 /* At this point we do NOT hold any locks. */
763
764 /* if it was already in the buffer pool, we're done */
765 if (found)
766 {
767 if (!isExtend)
768 {
769 /* Just need to update stats before we exit */
770 *hit = true;
771 VacuumPageHit++;
772
773 if (VacuumCostActive)
774 VacuumCostBalance += VacuumCostPageHit;
775
776 TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
777 smgr->smgr_rnode.node.spcNode,
778 smgr->smgr_rnode.node.dbNode,
779 smgr->smgr_rnode.node.relNode,
780 smgr->smgr_rnode.backend,
781 isExtend,
782 found);
783
784 /*
785 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be
786 * locked on return.
787 */
788 if (!isLocalBuf)
789 {
790 if (mode == RBM_ZERO_AND_LOCK)
791 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
792 LW_EXCLUSIVE);
793 else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
794 LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
795 }
796
797 return BufferDescriptorGetBuffer(bufHdr);
798 }
799
800 /*
801 * We get here only in the corner case where we are trying to extend
802 * the relation but we found a pre-existing buffer marked BM_VALID.
803 * This can happen because mdread doesn't complain about reads beyond
804 * EOF (when zero_damaged_pages is ON) and so a previous attempt to
805 * read a block beyond EOF could have left a "valid" zero-filled
806 * buffer. Unfortunately, we have also seen this case occurring
807 * because of buggy Linux kernels that sometimes return an
808 * lseek(SEEK_END) result that doesn't account for a recent write. In
809 * that situation, the pre-existing buffer would contain valid data
810 * that we don't want to overwrite. Since the legitimate case should
811 * always have left a zero-filled buffer, complain if not PageIsNew.
812 */
813 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
814 if (!PageIsNew((Page) bufBlock))
815 ereport(ERROR,
816 (errmsg("unexpected data beyond EOF in block %u of relation %s",
817 blockNum, relpath(smgr->smgr_rnode, forkNum)),
818 errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
819
820 /*
821 * We *must* do smgrextend before succeeding, else the page will not
822 * be reserved by the kernel, and the next P_NEW call will decide to
823 * return the same page. Clear the BM_VALID bit, do the StartBufferIO
824 * call that BufferAlloc didn't, and proceed.
825 */
826 if (isLocalBuf)
827 {
828 /* Only need to adjust flags */
829 uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
830
831 Assert(buf_state & BM_VALID);
832 buf_state &= ~BM_VALID;
833 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
834 }
835 else
836 {
837 /*
838 * Loop to handle the very small possibility that someone re-sets
839 * BM_VALID between our clearing it and StartBufferIO inspecting
840 * it.
841 */
842 do
843 {
844 uint32 buf_state = LockBufHdr(bufHdr);
845
846 Assert(buf_state & BM_VALID);
847 buf_state &= ~BM_VALID;
848 UnlockBufHdr(bufHdr, buf_state);
849 } while (!StartBufferIO(bufHdr, true));
850 }
851 }
852
853 /*
854 * if we have gotten to this point, we have allocated a buffer for the
855 * page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
856 * if it's a shared buffer.
857 *
858 * Note: if smgrextend fails, we will end up with a buffer that is
859 * allocated but not marked BM_VALID. P_NEW will still select the same
860 * block number (because the relation didn't get any longer on disk) and
861 * so future attempts to extend the relation will find the same buffer (if
862 * it's not been recycled) but come right back here to try smgrextend
863 * again.
864 */
865 Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
866
867 bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
868
869 if (isExtend)
870 {
871 /* new buffers are zero-filled */
872 MemSet((char *) bufBlock, 0, BLCKSZ);
873 /* don't set checksum for all-zero page */
874 smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
875
876 /*
877 * NB: we're *not* doing a ScheduleBufferTagForWriteback here,
878 * although we're essentially performing a write. At least on Linux,
879 * doing so defeats the 'delayed allocation' mechanism, leading to
880 * increased file fragmentation.
881 */
882 }
883 else
884 {
885 /*
886 * Read in the page, unless the caller intends to overwrite it and
887 * just wants us to allocate a buffer.
888 */
889 if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
890 MemSet((char *) bufBlock, 0, BLCKSZ);
891 else
892 {
893 instr_time io_start,
894 io_time;
895
896 if (track_io_timing)
897 INSTR_TIME_SET_CURRENT(io_start);
898
899 smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
900
901 if (track_io_timing)
902 {
903 INSTR_TIME_SET_CURRENT(io_time);
904 INSTR_TIME_SUBTRACT(io_time, io_start);
905 pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
906 INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
907 }
908
909 /* check for garbage data */
910 if (!PageIsVerified((Page) bufBlock, blockNum))
911 {
912 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
913 {
914 ereport(WARNING,
915 (errcode(ERRCODE_DATA_CORRUPTED),
916 errmsg("invalid page in block %u of relation %s; zeroing out page",
917 blockNum,
918 relpath(smgr->smgr_rnode, forkNum))));
919 MemSet((char *) bufBlock, 0, BLCKSZ);
920 }
921 else
922 ereport(ERROR,
923 (errcode(ERRCODE_DATA_CORRUPTED),
924 errmsg("invalid page in block %u of relation %s",
925 blockNum,
926 relpath(smgr->smgr_rnode, forkNum))));
927 }
928 }
929 }
930
931 /*
932 * In RBM_ZERO_AND_LOCK mode, grab the buffer content lock before marking
933 * the page as valid, to make sure that no other backend sees the zeroed
934 * page before the caller has had a chance to initialize it.
935 *
936 * Since no-one else can be looking at the page contents yet, there is no
937 * difference between an exclusive lock and a cleanup-strength lock. (Note
938 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
939 * they assert that the buffer is already valid.)
940 */
941 if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
942 !isLocalBuf)
943 {
944 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
945 }
946
947 if (isLocalBuf)
948 {
949 /* Only need to adjust flags */
950 uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
951
952 buf_state |= BM_VALID;
953 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
954 }
955 else
956 {
957 /* Set BM_VALID, terminate IO, and wake up any waiters */
958 TerminateBufferIO(bufHdr, false, BM_VALID);
959 }
960
961 VacuumPageMiss++;
962 if (VacuumCostActive)
963 VacuumCostBalance += VacuumCostPageMiss;
964
965 TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
966 smgr->smgr_rnode.node.spcNode,
967 smgr->smgr_rnode.node.dbNode,
968 smgr->smgr_rnode.node.relNode,
969 smgr->smgr_rnode.backend,
970 isExtend,
971 found);
972
973 return BufferDescriptorGetBuffer(bufHdr);
974 }
975
976 /*
977 * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared
978 * buffer. If no buffer exists already, selects a replacement
979 * victim and evicts the old page, but does NOT read in new page.
980 *
981 * "strategy" can be a buffer replacement strategy object, or NULL for
982 * the default strategy. The selected buffer's usage_count is advanced when
983 * using the default strategy, but otherwise possibly not (see PinBuffer).
984 *
985 * The returned buffer is pinned and is already marked as holding the
986 * desired page. If it already did have the desired page, *foundPtr is
987 * set TRUE. Otherwise, *foundPtr is set FALSE and the buffer is marked
988 * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
989 *
990 * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
991 * we keep it for simplicity in ReadBuffer.
992 *
993 * No locks are held either at entry or exit.
994 */
995 static BufferDesc *
996 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
997 BlockNumber blockNum,
998 BufferAccessStrategy strategy,
999 bool *foundPtr)
1000 {
1001 BufferTag newTag; /* identity of requested block */
1002 uint32 newHash; /* hash value for newTag */
1003 LWLock *newPartitionLock; /* buffer partition lock for it */
1004 BufferTag oldTag; /* previous identity of selected buffer */
1005 uint32 oldHash; /* hash value for oldTag */
1006 LWLock *oldPartitionLock; /* buffer partition lock for it */
1007 uint32 oldFlags;
1008 int buf_id;
1009 BufferDesc *buf;
1010 bool valid;
1011 uint32 buf_state;
1012
1013 /* create a tag so we can look up the buffer */
1014 INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
1015
1016 /* determine its hash code and partition lock ID */
1017 newHash = BufTableHashCode(&newTag);
1018 newPartitionLock = BufMappingPartitionLock(newHash);
1019
1020 /* see if the block is in the buffer pool already */
1021 LWLockAcquire(newPartitionLock, LW_SHARED);
1022 buf_id = BufTableLookup(&newTag, newHash);
1023 if (buf_id >= 0)
1024 {
1025 /*
1026 * Found it. Now, pin the buffer so no one can steal it from the
1027 * buffer pool, and check to see if the correct data has been loaded
1028 * into the buffer.
1029 */
1030 buf = GetBufferDescriptor(buf_id);
1031
1032 valid = PinBuffer(buf, strategy);
1033
1034 /* Can release the mapping lock as soon as we've pinned it */
1035 LWLockRelease(newPartitionLock);
1036
1037 *foundPtr = TRUE;
1038
1039 if (!valid)
1040 {
1041 /*
1042 * We can only get here if (a) someone else is still reading in
1043 * the page, or (b) a previous read attempt failed. We have to
1044 * wait for any active read attempt to finish, and then set up our
1045 * own read attempt if the page is still not BM_VALID.
1046 * StartBufferIO does it all.
1047 */
1048 if (StartBufferIO(buf, true))
1049 {
1050 /*
1051 * If we get here, previous attempts to read the buffer must
1052 * have failed ... but we shall bravely try again.
1053 */
1054 *foundPtr = FALSE;
1055 }
1056 }
1057
1058 return buf;
1059 }
1060
1061 /*
1062 * Didn't find it in the buffer pool. We'll have to initialize a new
1063 * buffer. Remember to unlock the mapping lock while doing the work.
1064 */
1065 LWLockRelease(newPartitionLock);
1066
1067 /* Loop here in case we have to try another victim buffer */
1068 for (;;)
1069 {
1070 /*
1071 * Ensure, while the spinlock's not yet held, that there's a free
1072 * refcount entry.
1073 */
1074 ReservePrivateRefCountEntry();
1075
1076 /*
1077 * Select a victim buffer. The buffer is returned with its header
1078 * spinlock still held!
1079 */
1080 buf = StrategyGetBuffer(strategy, &buf_state);
1081
1082 Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
1083
1084 /* Must copy buffer flags while we still hold the spinlock */
1085 oldFlags = buf_state & BUF_FLAG_MASK;
1086
1087 /* Pin the buffer and then release the buffer spinlock */
1088 PinBuffer_Locked(buf);
1089
1090 /*
1091 * If the buffer was dirty, try to write it out. There is a race
1092 * condition here, in that someone might dirty it after we released it
1093 * above, or even while we are writing it out (since our share-lock
1094 * won't prevent hint-bit updates). We will recheck the dirty bit
1095 * after re-locking the buffer header.
1096 */
1097 if (oldFlags & BM_DIRTY)
1098 {
1099 /*
1100 * We need a share-lock on the buffer contents to write it out
1101 * (else we might write invalid data, eg because someone else is
1102 * compacting the page contents while we write). We must use a
1103 * conditional lock acquisition here to avoid deadlock. Even
1104 * though the buffer was not pinned (and therefore surely not
1105 * locked) when StrategyGetBuffer returned it, someone else could
1106 * have pinned and exclusive-locked it by the time we get here. If
1107 * we try to get the lock unconditionally, we'd block waiting for
1108 * them; if they later block waiting for us, deadlock ensues.
1109 * (This has been observed to happen when two backends are both
1110 * trying to split btree index pages, and the second one just
1111 * happens to be trying to split the page the first one got from
1112 * StrategyGetBuffer.)
1113 */
1114 if (LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
1115 LW_SHARED))
1116 {
1117 /*
1118 * If using a nondefault strategy, and writing the buffer
1119 * would require a WAL flush, let the strategy decide whether
1120 * to go ahead and write/reuse the buffer or to choose another
1121 * victim. We need lock to inspect the page LSN, so this
1122 * can't be done inside StrategyGetBuffer.
1123 */
1124 if (strategy != NULL)
1125 {
1126 XLogRecPtr lsn;
1127
1128 /* Read the LSN while holding buffer header lock */
1129 buf_state = LockBufHdr(buf);
1130 lsn = BufferGetLSN(buf);
1131 UnlockBufHdr(buf, buf_state);
1132
1133 if (XLogNeedsFlush(lsn) &&
1134 StrategyRejectBuffer(strategy, buf))
1135 {
1136 /* Drop lock/pin and loop around for another buffer */
1137 LWLockRelease(BufferDescriptorGetContentLock(buf));
1138 UnpinBuffer(buf, true);
1139 continue;
1140 }
1141 }
1142
1143 /* OK, do the I/O */
1144 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
1145 smgr->smgr_rnode.node.spcNode,
1146 smgr->smgr_rnode.node.dbNode,
1147 smgr->smgr_rnode.node.relNode);
1148
1149 FlushBuffer(buf, NULL);
1150 LWLockRelease(BufferDescriptorGetContentLock(buf));
1151
1152 ScheduleBufferTagForWriteback(&BackendWritebackContext,
1153 &buf->tag);
1154
1155 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
1156 smgr->smgr_rnode.node.spcNode,
1157 smgr->smgr_rnode.node.dbNode,
1158 smgr->smgr_rnode.node.relNode);
1159 }
1160 else
1161 {
1162 /*
1163 * Someone else has locked the buffer, so give it up and loop
1164 * back to get another one.
1165 */
1166 UnpinBuffer(buf, true);
1167 continue;
1168 }
1169 }
1170
1171 /*
1172 * To change the association of a valid buffer, we'll need to have
1173 * exclusive lock on both the old and new mapping partitions.
1174 */
1175 if (oldFlags & BM_TAG_VALID)
1176 {
1177 /*
1178 * Need to compute the old tag's hashcode and partition lock ID.
1179 * XXX is it worth storing the hashcode in BufferDesc so we need
1180 * not recompute it here? Probably not.
1181 */
1182 oldTag = buf->tag;
1183 oldHash = BufTableHashCode(&oldTag);
1184 oldPartitionLock = BufMappingPartitionLock(oldHash);
1185
1186 /*
1187 * Must lock the lower-numbered partition first to avoid
1188 * deadlocks.
1189 */
1190 if (oldPartitionLock < newPartitionLock)
1191 {
1192 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1193 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1194 }
1195 else if (oldPartitionLock > newPartitionLock)
1196 {
1197 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1198 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1199 }
1200 else
1201 {
1202 /* only one partition, only one lock */
1203 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1204 }
1205 }
1206 else
1207 {
1208 /* if it wasn't valid, we need only the new partition */
1209 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
1210 /* remember we have no old-partition lock or tag */
1211 oldPartitionLock = NULL;
1212 /* this just keeps the compiler quiet about uninit variables */
1213 oldHash = 0;
1214 }
1215
1216 /*
1217 * Try to make a hashtable entry for the buffer under its new tag.
1218 * This could fail because while we were writing someone else
1219 * allocated another buffer for the same block we want to read in.
1220 * Note that we have not yet removed the hashtable entry for the old
1221 * tag.
1222 */
1223 buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
1224
1225 if (buf_id >= 0)
1226 {
1227 /*
1228 * Got a collision. Someone has already done what we were about to
1229 * do. We'll just handle this as if it were found in the buffer
1230 * pool in the first place. First, give up the buffer we were
1231 * planning to use.
1232 */
1233 UnpinBuffer(buf, true);
1234
1235 /* Can give up that buffer's mapping partition lock now */
1236 if (oldPartitionLock != NULL &&
1237 oldPartitionLock != newPartitionLock)
1238 LWLockRelease(oldPartitionLock);
1239
1240 /* remaining code should match code at top of routine */
1241
1242 buf = GetBufferDescriptor(buf_id);
1243
1244 valid = PinBuffer(buf, strategy);
1245
1246 /* Can release the mapping lock as soon as we've pinned it */
1247 LWLockRelease(newPartitionLock);
1248
1249 *foundPtr = TRUE;
1250
1251 if (!valid)
1252 {
1253 /*
1254 * We can only get here if (a) someone else is still reading
1255 * in the page, or (b) a previous read attempt failed. We
1256 * have to wait for any active read attempt to finish, and
1257 * then set up our own read attempt if the page is still not
1258 * BM_VALID. StartBufferIO does it all.
1259 */
1260 if (StartBufferIO(buf, true))
1261 {
1262 /*
1263 * If we get here, previous attempts to read the buffer
1264 * must have failed ... but we shall bravely try again.
1265 */
1266 *foundPtr = FALSE;
1267 }
1268 }
1269
1270 return buf;
1271 }
1272
1273 /*
1274 * Need to lock the buffer header too in order to change its tag.
1275 */
1276 buf_state = LockBufHdr(buf);
1277
1278 /*
1279 * Somebody could have pinned or re-dirtied the buffer while we were
1280 * doing the I/O and making the new hashtable entry. If so, we can't
1281 * recycle this buffer; we must undo everything we've done and start
1282 * over with a new victim buffer.
1283 */
1284 oldFlags = buf_state & BUF_FLAG_MASK;
1285 if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY))
1286 break;
1287
1288 UnlockBufHdr(buf, buf_state);
1289 BufTableDelete(&newTag, newHash);
1290 if (oldPartitionLock != NULL &&
1291 oldPartitionLock != newPartitionLock)
1292 LWLockRelease(oldPartitionLock);
1293 LWLockRelease(newPartitionLock);
1294 UnpinBuffer(buf, true);
1295 }
1296
1297 /*
1298 * Okay, it's finally safe to rename the buffer.
1299 *
1300 * Clearing BM_VALID here is necessary, clearing the dirtybits is just
1301 * paranoia. We also reset the usage_count since any recency of use of
1302 * the old content is no longer relevant. (The usage_count starts out at
1303 * 1 so that the buffer can survive one clock-sweep pass.)
1304 *
1305 * Make sure BM_PERMANENT is set for buffers that must be written at every
1306 * checkpoint. Unlogged buffers only need to be written at shutdown
1307 * checkpoints, except for their "init" forks, which need to be treated
1308 * just like permanent relations.
1309 */
1310 buf->tag = newTag;
1311 buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
1312 BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
1313 BUF_USAGECOUNT_MASK);
1314 if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
1315 buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
1316 else
1317 buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
1318
1319 UnlockBufHdr(buf, buf_state);
1320
1321 if (oldPartitionLock != NULL)
1322 {
1323 BufTableDelete(&oldTag, oldHash);
1324 if (oldPartitionLock != newPartitionLock)
1325 LWLockRelease(oldPartitionLock);
1326 }
1327
1328 LWLockRelease(newPartitionLock);
1329
1330 /*
1331 * Buffer contents are currently invalid. Try to get the io_in_progress
1332 * lock. If StartBufferIO returns false, then someone else managed to
1333 * read it before we did, so there's nothing left for BufferAlloc() to do.
1334 */
1335 if (StartBufferIO(buf, true))
1336 *foundPtr = FALSE;
1337 else
1338 *foundPtr = TRUE;
1339
1340 return buf;
1341 }
1342
1343 /*
1344 * InvalidateBuffer -- mark a shared buffer invalid and return it to the
1345 * freelist.
1346 *
1347 * The buffer header spinlock must be held at entry. We drop it before
1348 * returning. (This is sane because the caller must have locked the
1349 * buffer in order to be sure it should be dropped.)
1350 *
1351 * This is used only in contexts such as dropping a relation. We assume
1352 * that no other backend could possibly be interested in using the page,
1353 * so the only reason the buffer might be pinned is if someone else is
1354 * trying to write it out. We have to let them finish before we can
1355 * reclaim the buffer.
1356 *
1357 * The buffer could get reclaimed by someone else while we are waiting
1358 * to acquire the necessary locks; if so, don't mess it up.
1359 */
1360 static void
1361 InvalidateBuffer(BufferDesc *buf)
1362 {
1363 BufferTag oldTag;
1364 uint32 oldHash; /* hash value for oldTag */
1365 LWLock *oldPartitionLock; /* buffer partition lock for it */
1366 uint32 oldFlags;
1367 uint32 buf_state;
1368
1369 /* Save the original buffer tag before dropping the spinlock */
1370 oldTag = buf->tag;
1371
1372 buf_state = pg_atomic_read_u32(&buf->state);
1373 Assert(buf_state & BM_LOCKED);
1374 UnlockBufHdr(buf, buf_state);
1375
1376 /*
1377 * Need to compute the old tag's hashcode and partition lock ID. XXX is it
1378 * worth storing the hashcode in BufferDesc so we need not recompute it
1379 * here? Probably not.
1380 */
1381 oldHash = BufTableHashCode(&oldTag);
1382 oldPartitionLock = BufMappingPartitionLock(oldHash);
1383
1384 retry:
1385
1386 /*
1387 * Acquire exclusive mapping lock in preparation for changing the buffer's
1388 * association.
1389 */
1390 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
1391
1392 /* Re-lock the buffer header */
1393 buf_state = LockBufHdr(buf);
1394
1395 /* If it's changed while we were waiting for lock, do nothing */
1396 if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
1397 {
1398 UnlockBufHdr(buf, buf_state);
1399 LWLockRelease(oldPartitionLock);
1400 return;
1401 }
1402
1403 /*
1404 * We assume the only reason for it to be pinned is that someone else is
1405 * flushing the page out. Wait for them to finish. (This could be an
1406 * infinite loop if the refcount is messed up... it would be nice to time
1407 * out after a while, but there seems no way to be sure how many loops may
1408 * be needed. Note that if the other guy has pinned the buffer but not
1409 * yet done StartBufferIO, WaitIO will fall through and we'll effectively
1410 * be busy-looping here.)
1411 */
1412 if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
1413 {
1414 UnlockBufHdr(buf, buf_state);
1415 LWLockRelease(oldPartitionLock);
1416 /* safety check: should definitely not be our *own* pin */
1417 if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
1418 elog(ERROR, "buffer is pinned in InvalidateBuffer");
1419 WaitIO(buf);
1420 goto retry;
1421 }
1422
1423 /*
1424 * Clear out the buffer's tag and flags. We must do this to ensure that
1425 * linear scans of the buffer array don't think the buffer is valid.
1426 */
1427 oldFlags = buf_state & BUF_FLAG_MASK;
1428 CLEAR_BUFFERTAG(buf->tag);
1429 buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
1430 UnlockBufHdr(buf, buf_state);
1431
1432 /*
1433 * Remove the buffer from the lookup hashtable, if it was in there.
1434 */
1435 if (oldFlags & BM_TAG_VALID)
1436 BufTableDelete(&oldTag, oldHash);
1437
1438 /*
1439 * Done with mapping lock.
1440 */
1441 LWLockRelease(oldPartitionLock);
1442
1443 /*
1444 * Insert the buffer at the head of the list of free buffers.
1445 */
1446 StrategyFreeBuffer(buf);
1447 }
1448
1449 /*
1450 * MarkBufferDirty
1451 *
1452 * Marks buffer contents as dirty (actual write happens later).
1453 *
1454 * Buffer must be pinned and exclusive-locked. (If caller does not hold
1455 * exclusive lock, then somebody could be in process of writing the buffer,
1456 * leading to risk of bad data written to disk.)
1457 */
1458 void
1459 MarkBufferDirty(Buffer buffer)
1460 {
1461 BufferDesc *bufHdr;
1462 uint32 buf_state;
1463 uint32 old_buf_state;
1464
1465 if (!BufferIsValid(buffer))
1466 elog(ERROR, "bad buffer ID: %d", buffer);
1467
1468 if (BufferIsLocal(buffer))
1469 {
1470 MarkLocalBufferDirty(buffer);
1471 return;
1472 }
1473
1474 bufHdr = GetBufferDescriptor(buffer - 1);
1475
1476 Assert(BufferIsPinned(buffer));
1477 Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
1478 LW_EXCLUSIVE));
1479
1480 old_buf_state = pg_atomic_read_u32(&bufHdr->state);
1481 for (;;)
1482 {
1483 if (old_buf_state & BM_LOCKED)
1484 old_buf_state = WaitBufHdrUnlocked(bufHdr);
1485
1486 buf_state = old_buf_state;
1487
1488 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
1489 buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
1490
1491 if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
1492 buf_state))
1493 break;
1494 }
1495
1496 /*
1497 * If the buffer was not dirty already, do vacuum accounting.
1498 */
1499 if (!(old_buf_state & BM_DIRTY))
1500 {
1501 VacuumPageDirty++;
1502 pgBufferUsage.shared_blks_dirtied++;
1503 if (VacuumCostActive)
1504 VacuumCostBalance += VacuumCostPageDirty;
1505 }
1506 }
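
/*
 * Typical pattern around MarkBufferDirty for a WAL-logged change (editor's
 * sketch; "buf" is a placeholder pinned buffer):
 *
 *		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *		START_CRIT_SECTION();
 *		... modify the page contents ...
 *		MarkBufferDirty(buf);
 *		... XLogInsert() the change, then PageSetLSN() the page ...
 *		END_CRIT_SECTION();
 *		UnlockReleaseBuffer(buf);
 */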
1507
1508 /*
1509 * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
1510 *
1511 * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
1512 * compared to calling the two routines separately. Now it's mainly just
1513 * a convenience function. However, if the passed buffer is valid and
1514 * already contains the desired block, we just return it as-is; and that
1515 * does save considerable work compared to a full release and reacquire.
1516 *
1517 * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
1518 * buffer actually needs to be released. This case is the same as ReadBuffer,
1519 * but can save some tests in the caller.
1520 */
1521 Buffer
1522 ReleaseAndReadBuffer(Buffer buffer,
1523 Relation relation,
1524 BlockNumber blockNum)
1525 {
1526 ForkNumber forkNum = MAIN_FORKNUM;
1527 BufferDesc *bufHdr;
1528
1529 if (BufferIsValid(buffer))
1530 {
1531 Assert(BufferIsPinned(buffer));
1532 if (BufferIsLocal(buffer))
1533 {
1534 bufHdr = GetLocalBufferDescriptor(-buffer - 1);
1535 if (bufHdr->tag.blockNum == blockNum &&
1536 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1537 bufHdr->tag.forkNum == forkNum)
1538 return buffer;
1539 ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
1540 LocalRefCount[-buffer - 1]--;
1541 }
1542 else
1543 {
1544 bufHdr = GetBufferDescriptor(buffer - 1);
1545 /* we have pin, so it's ok to examine tag without spinlock */
1546 if (bufHdr->tag.blockNum == blockNum &&
1547 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
1548 bufHdr->tag.forkNum == forkNum)
1549 return buffer;
1550 UnpinBuffer(bufHdr, true);
1551 }
1552 }
1553
1554 return ReadBuffer(relation, blockNum);
1555 }
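
/*
 * Caller sketch (editor's illustration): stepping along a chain of blocks
 * while holding at most one pin at a time; passing InvalidBuffer on the
 * first iteration is fine. "rel" and "blkno" are placeholders.
 *
 *		buf = InvalidBuffer;
 *		while (BlockNumberIsValid(blkno))
 *		{
 *			buf = ReleaseAndReadBuffer(buf, rel, blkno);
 *			... examine the page, compute the next blkno ...
 *		}
 *		if (BufferIsValid(buf))
 *			ReleaseBuffer(buf);
 */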
1556
1557 /*
1558 * PinBuffer -- make buffer unavailable for replacement.
1559 *
1560 * For the default access strategy, the buffer's usage_count is incremented
1561 * when we first pin it; for other strategies we just make sure the usage_count
1562 * isn't zero. (The idea of the latter is that we don't want synchronized
1563 * heap scans to inflate the count, but we need it to not be zero to discourage
1564 * other backends from stealing buffers from our ring. As long as we cycle
1565 * through the ring faster than the global clock-sweep cycles, buffers in
1566 * our ring won't be chosen as victims for replacement by other backends.)
1567 *
1568 * This should be applied only to shared buffers, never local ones.
1569 *
1570 * Since buffers are pinned/unpinned very frequently, pin buffers without
1571 * taking the buffer header lock; instead update the state variable in a loop of
1572 * CAS operations. Hopefully it's just a single CAS.
1573 *
1574 * Note that ResourceOwnerEnlargeBuffers must have been done already.
1575 *
1576 * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows
1577 * some callers to avoid an extra spinlock cycle.
1578 */
1579 static bool
1580 PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
1581 {
1582 Buffer b = BufferDescriptorGetBuffer(buf);
1583 bool result;
1584 PrivateRefCountEntry *ref;
1585
1586 ref = GetPrivateRefCountEntry(b, true);
1587
1588 if (ref == NULL)
1589 {
1590 uint32 buf_state;
1591 uint32 old_buf_state;
1592
1593 ReservePrivateRefCountEntry();
1594 ref = NewPrivateRefCountEntry(b);
1595
1596 old_buf_state = pg_atomic_read_u32(&buf->state);
1597 for (;;)
1598 {
1599 if (old_buf_state & BM_LOCKED)
1600 old_buf_state = WaitBufHdrUnlocked(buf);
1601
1602 buf_state = old_buf_state;
1603
1604 /* increase refcount */
1605 buf_state += BUF_REFCOUNT_ONE;
1606
1607 if (strategy == NULL)
1608 {
1609 /* Default case: increase usagecount unless already max. */
1610 if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
1611 buf_state += BUF_USAGECOUNT_ONE;
1612 }
1613 else
1614 {
1615 /*
1616 * Ring buffers shouldn't evict others from the pool. Thus we
1617 * don't make usagecount more than 1.
1618 */
1619 if (BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
1620 buf_state += BUF_USAGECOUNT_ONE;
1621 }
1622
1623 if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
1624 buf_state))
1625 {
1626 result = (buf_state & BM_VALID) != 0;
1627 break;
1628 }
1629 }
1630 }
1631 else
1632 {
1633 /* If we previously pinned the buffer, it must surely be valid */
1634 result = true;
1635 }
1636
1637 ref->refcount++;
1638 Assert(ref->refcount > 0);
1639 ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
1640 return result;
1641 }
1642
1643 /*
1644 * PinBuffer_Locked -- as above, but caller already locked the buffer header.
1645 * The spinlock is released before return.
1646 *
1647 * As this function is called with the spinlock held, the caller must have
1648 * previously called ReservePrivateRefCountEntry().
1649 *
1650 * Currently, no callers of this function want to modify the buffer's
1651 * usage_count at all, so there's no need for a strategy parameter.
1652 * Also we don't bother with a BM_VALID test (the caller could check that for
1653 * itself).
1654 *
1655 * Also all callers only ever use this function when it's known that the
1656 * buffer can't have a preexisting pin by this backend. That allows us to skip
1657 * searching the private refcount array & hash, which is a boon, because the
1658 * spinlock is still held.
1659 *
1660 * Note: use of this routine is frequently mandatory, not just an optimization
1661 * to save a spin lock/unlock cycle, because we need to pin a buffer before
1662 * its state can change under us.
1663 */
1664 static void
1665 PinBuffer_Locked(BufferDesc *buf)
1666 {
1667 Buffer b;
1668 PrivateRefCountEntry *ref;
1669 uint32 buf_state;
1670
1671 /*
1672 * As explained, we don't expect any preexisting pins. That allows us to
1673 * manipulate the PrivateRefCount entry after releasing the spinlock.
1674 */
1675 Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
1676
1677 /*
1678 * Since we hold the buffer spinlock, we can update the buffer state and
1679 * release the lock in one operation.
1680 */
1681 buf_state = pg_atomic_read_u32(&buf->state);
1682 Assert(buf_state & BM_LOCKED);
1683 buf_state += BUF_REFCOUNT_ONE;
1684 UnlockBufHdr(buf, buf_state);
1685
1686 b = BufferDescriptorGetBuffer(buf);
1687
1688 ref = NewPrivateRefCountEntry(b);
1689 ref->refcount++;
1690
1691 ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
1692 }
1693
1694 /*
1695 * UnpinBuffer -- make buffer available for replacement.
1696 *
1697 * This should be applied only to shared buffers, never local ones.
1698 *
1699 * Most but not all callers want CurrentResourceOwner to be adjusted.
1700 * Those that don't should pass fixOwner = FALSE.
1701 */
1702 static void
1703 UnpinBuffer(BufferDesc *buf, bool fixOwner)
1704 {
1705 PrivateRefCountEntry *ref;
1706 Buffer b = BufferDescriptorGetBuffer(buf);
1707
1708 /* not moving as we're likely deleting it soon anyway */
1709 ref = GetPrivateRefCountEntry(b, false);
1710 Assert(ref != NULL);
1711
1712 if (fixOwner)
1713 ResourceOwnerForgetBuffer(CurrentResourceOwner, b);
1714
1715 Assert(ref->refcount > 0);
1716 ref->refcount--;
1717 if (ref->refcount == 0)
1718 {
1719 uint32 buf_state;
1720 uint32 old_buf_state;
1721
1722 /* I'd better not still hold any locks on the buffer */
1723 Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
1724 Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
1725
1726 /*
1727 * Decrement the shared reference count.
1728 *
1729 * Since buffer spinlock holder can update status using just write,
1730 * it's not safe to use atomic decrement here; thus use a CAS loop.
1731 */
1732 old_buf_state = pg_atomic_read_u32(&buf->state);
1733 for (;;)
1734 {
1735 if (old_buf_state & BM_LOCKED)
1736 old_buf_state = WaitBufHdrUnlocked(buf);
1737
1738 buf_state = old_buf_state;
1739
1740 buf_state -= BUF_REFCOUNT_ONE;
1741
1742 if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
1743 buf_state))
1744 break;
1745 }
1746
1747 /* Support LockBufferForCleanup() */
1748 if (buf_state & BM_PIN_COUNT_WAITER)
1749 {
1750 /*
1751 * Acquire the buffer header lock, re-check that there's a waiter.
1752 * Another backend could have unpinned this buffer, and already
1753 * woken up the waiter. There's no danger of the buffer being
1754 * replaced after we unpinned it above, as it's pinned by the
1755 * waiter.
1756 */
1757 buf_state = LockBufHdr(buf);
1758
1759 if ((buf_state & BM_PIN_COUNT_WAITER) &&
1760 BUF_STATE_GET_REFCOUNT(buf_state) == 1)
1761 {
1762 /* we just released the last pin other than the waiter's */
1763 int wait_backend_pid = buf->wait_backend_pid;
1764
1765 buf_state &= ~BM_PIN_COUNT_WAITER;
1766 UnlockBufHdr(buf, buf_state);
1767 ProcSendSignal(wait_backend_pid);
1768 }
1769 else
1770 UnlockBufHdr(buf, buf_state);
1771 }
1772 ForgetPrivateRefCountEntry(ref);
1773 }
1774 }
1775
1776 /*
1777 * BufferSync -- Write out all dirty buffers in the pool.
1778 *
1779 * This is called at checkpoint time to write out all dirty shared buffers.
1780 * The checkpoint request flags should be passed in. If CHECKPOINT_IMMEDIATE
1781 * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN,
1782 * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_ALL is set, we write even
1783 * unlogged buffers, which are otherwise skipped. The remaining flags
1784 * currently have no effect here.
1785 */
1786 static void
1787 BufferSync(int flags)
1788 {
1789 uint32 buf_state;
1790 int buf_id;
1791 int num_to_scan;
1792 int num_spaces;
1793 int num_processed;
1794 int num_written;
1795 CkptTsStatus *per_ts_stat = NULL;
1796 Oid last_tsid;
1797 binaryheap *ts_heap;
1798 int i;
1799 int mask = BM_DIRTY;
1800 WritebackContext wb_context;
1801
1802 /* Make sure we can handle the pin inside SyncOneBuffer */
1803 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
1804
1805 /*
1806 * Unless this is a shutdown checkpoint or we have been explicitly told to
1807 * flush all buffers, we write only permanent, dirty buffers. But at shutdown or end of
1808 * recovery, we write all dirty buffers.
1809 */
1810 if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
1811 CHECKPOINT_FLUSH_ALL))))
1812 mask |= BM_PERMANENT;
1813
1814 /*
1815 * Loop over all buffers, and mark the ones that need to be written with
1816 * BM_CHECKPOINT_NEEDED. Count them as we go (num_to_scan), so that we
1817 * can estimate how much work needs to be done.
1818 *
1819 * This allows us to write only those pages that were dirty when the
1820 * checkpoint began, and not those that get dirtied while it proceeds.
1821 * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
1822 * later in this function, or by normal backends or the bgwriter cleaning
1823 * scan, the flag is cleared. Any buffer dirtied after this point won't
1824 * have the flag set.
1825 *
1826 * Note that if we fail to write some buffer, we may leave buffers with
1827 * BM_CHECKPOINT_NEEDED still set. This is OK since any such buffer would
1828 * certainly need to be written for the next checkpoint attempt, too.
1829 */
1830 num_to_scan = 0;
1831 for (buf_id = 0; buf_id < NBuffers; buf_id++)
1832 {
1833 BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
1834
1835 /*
1836 * Header spinlock is enough to examine BM_DIRTY, see comment in
1837 * SyncOneBuffer.
1838 */
1839 buf_state = LockBufHdr(bufHdr);
1840
1841 if ((buf_state & mask) == mask)
1842 {
1843 CkptSortItem *item;
1844
1845 buf_state |= BM_CHECKPOINT_NEEDED;
1846
1847 item = &CkptBufferIds[num_to_scan++];
1848 item->buf_id = buf_id;
1849 item->tsId = bufHdr->tag.rnode.spcNode;
1850 item->relNode = bufHdr->tag.rnode.relNode;
1851 item->forkNum = bufHdr->tag.forkNum;
1852 item->blockNum = bufHdr->tag.blockNum;
1853 }
1854
1855 UnlockBufHdr(bufHdr, buf_state);
1856 }
1857
1858 if (num_to_scan == 0)
1859 return; /* nothing to do */
1860
1861 WritebackContextInit(&wb_context, &checkpoint_flush_after);
1862
1863 TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
1864
1865 /*
1866 * Sort buffers that need to be written to reduce the likelihood of random
1867 * IO. The sorting is also important for the implementation of balancing
1868 * writes between tablespaces. Without balancing writes we'd potentially
1869 * end up writing to the tablespaces one-by-one; possibly overloading the
1870 * underlying system.
1871 */
1872 qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
1873 ckpt_buforder_comparator);
1874
1875 num_spaces = 0;
1876
1877 /*
1878 * Allocate progress status for each tablespace with buffers that need to
1879 * be flushed. This requires the to-be-flushed array to be sorted.
1880 */
1881 last_tsid = InvalidOid;
1882 for (i = 0; i < num_to_scan; i++)
1883 {
1884 CkptTsStatus *s;
1885 Oid cur_tsid;
1886
1887 cur_tsid = CkptBufferIds[i].tsId;
1888
1889 /*
1890 * Grow array of per-tablespace status structs, every time a new
1891 * tablespace is found.
1892 */
1893 if (last_tsid == InvalidOid || last_tsid != cur_tsid)
1894 {
1895 Size sz;
1896
1897 num_spaces++;
1898
1899 /*
1900 * Not worth adding grow-by-power-of-2 logic here - even with a
1901 * few hundred tablespaces this should be fine.
1902 */
1903 sz = sizeof(CkptTsStatus) * num_spaces;
1904
1905 if (per_ts_stat == NULL)
1906 per_ts_stat = (CkptTsStatus *) palloc(sz);
1907 else
1908 per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
1909
1910 s = &per_ts_stat[num_spaces - 1];
1911 memset(s, 0, sizeof(*s));
1912 s->tsId = cur_tsid;
1913
1914 /*
1915 * The first buffer in this tablespace. As CkptBufferIds is sorted
1916 * by tablespace all (s->num_to_scan) buffers in this tablespace
1917 * will follow afterwards.
1918 */
1919 s->index = i;
1920
1921 /*
1922 * progress_slice will be determined once we know how many buffers
1923 * are in each tablespace, i.e. after this loop.
1924 */
1925
1926 last_tsid = cur_tsid;
1927 }
1928 else
1929 {
1930 s = &per_ts_stat[num_spaces - 1];
1931 }
1932
1933 s->num_to_scan++;
1934 }
1935
1936 Assert(num_spaces > 0);
1937
1938 /*
1939 * Build a min-heap over the write-progress in the individual tablespaces,
1940 * and compute how large a portion of the total progress a single
1941 * processed buffer is.
1942 */
1943 ts_heap = binaryheap_allocate(num_spaces,
1944 ts_ckpt_progress_comparator,
1945 NULL);
1946
1947 for (i = 0; i < num_spaces; i++)
1948 {
1949 CkptTsStatus *ts_stat = &per_ts_stat[i];
1950
1951 ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
1952
1953 binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
1954 }
1955
1956 binaryheap_build(ts_heap);
1957
1958 /*
1959 * Iterate through to-be-checkpointed buffers and write the ones (still)
1960 * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
1961 * tablespaces; otherwise the sorting would lead to only one tablespace
1962 * receiving writes at a time, making inefficient use of the hardware.
1963 */
1964 num_processed = 0;
1965 num_written = 0;
1966 while (!binaryheap_empty(ts_heap))
1967 {
1968 BufferDesc *bufHdr = NULL;
1969 CkptTsStatus *ts_stat = (CkptTsStatus *)
1970 DatumGetPointer(binaryheap_first(ts_heap));
1971
1972 buf_id = CkptBufferIds[ts_stat->index].buf_id;
1973 Assert(buf_id != -1);
1974
1975 bufHdr = GetBufferDescriptor(buf_id);
1976
1977 num_processed++;
1978
1979 /*
1980 * We don't need to acquire the lock here, because we're only looking
1981 * at a single bit. It's possible that someone else writes the buffer
1982 * and clears the flag right after we check, but that doesn't matter
1983 * since SyncOneBuffer will then do nothing. However, there is a
1984 * further race condition: it's conceivable that between the time we
1985 * examine the bit here and the time SyncOneBuffer acquires the lock,
1986 * someone else not only wrote the buffer but replaced it with another
1987 * page and dirtied it. In that improbable case, SyncOneBuffer will
1988 * write the buffer though we didn't need to. It doesn't seem worth
1989 * guarding against this, though.
1990 */
1991 if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
1992 {
1993 if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
1994 {
1995 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
1996 BgWriterStats.m_buf_written_checkpoints++;
1997 num_written++;
1998 }
1999 }
2000
2001 /*
2002 * Measure progress independently of whether we actually had to flush the
2003 * buffer - otherwise the writes would become unbalanced.
2004 */
2005 ts_stat->progress += ts_stat->progress_slice;
2006 ts_stat->num_scanned++;
2007 ts_stat->index++;
2008
2009 /* Have all the buffers from the tablespace been processed? */
2010 if (ts_stat->num_scanned == ts_stat->num_to_scan)
2011 {
2012 binaryheap_remove_first(ts_heap);
2013 }
2014 else
2015 {
2016 /* update heap with the new progress */
2017 binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
2018 }
2019
2020 /*
2021 * Sleep to throttle our I/O rate.
2022 */
2023 CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
2024 }
2025
2026 /* issue all pending flushes */
2027 IssuePendingWritebacks(&wb_context);
2028
2029 pfree(per_ts_stat);
2030 per_ts_stat = NULL;
2031 binaryheap_free(ts_heap);
2032
2033 /*
2034 * Update checkpoint statistics. As noted above, this doesn't include
2035 * buffers written by other backends or bgwriter scan.
2036 */
2037 CheckpointStats.ckpt_bufs_written += num_written;
2038
2039 TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);
2040 }
2041
2042 /*
2043 * BgBufferSync -- Write out some dirty buffers in the pool.
2044 *
2045 * This is called periodically by the background writer process.
2046 *
2047 * Returns true if it's appropriate for the bgwriter process to go into
2048 * low-power hibernation mode. (This happens if the strategy clock sweep
2049 * has been "lapped" and no buffer allocations have occurred recently,
2050 * or if the bgwriter has been effectively disabled by setting
2051 * bgwriter_lru_maxpages to 0.)
2052 */
2053 bool
2054 BgBufferSync(WritebackContext *wb_context)
2055 {
2056 /* info obtained from freelist.c */
2057 int strategy_buf_id;
2058 uint32 strategy_passes;
2059 uint32 recent_alloc;
2060
2061 /*
2062 * Information saved between calls so we can determine the strategy
2063 * point's advance rate and avoid scanning already-cleaned buffers.
2064 */
2065 static bool saved_info_valid = false;
2066 static int prev_strategy_buf_id;
2067 static uint32 prev_strategy_passes;
2068 static int next_to_clean;
2069 static uint32 next_passes;
2070
2071 /* Moving averages of allocation rate and clean-buffer density */
2072 static float smoothed_alloc = 0;
2073 static float smoothed_density = 10.0;
2074
2075 /* Potentially these could be tunables, but for now, not */
2076 float smoothing_samples = 16;
2077 float scan_whole_pool_milliseconds = 120000.0;
2078
2079 /* Used to compute how far we scan ahead */
2080 long strategy_delta;
2081 int bufs_to_lap;
2082 int bufs_ahead;
2083 float scans_per_alloc;
2084 int reusable_buffers_est;
2085 int upcoming_alloc_est;
2086 int min_scan_buffers;
2087
2088 /* Variables for the scanning loop proper */
2089 int num_to_scan;
2090 int num_written;
2091 int reusable_buffers;
2092
2093 /* Variables for final smoothed_density update */
2094 long new_strategy_delta;
2095 uint32 new_recent_alloc;
2096
2097 /*
2098 * Find out where the freelist clock sweep currently is, and how many
2099 * buffer allocations have happened since our last call.
2100 */
2101 strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
2102
2103 /* Report buffer alloc counts to pgstat */
2104 BgWriterStats.m_buf_alloc += recent_alloc;
2105
2106 /*
2107 * If we're not running the LRU scan, just stop after doing the stats
2108 * stuff. We mark the saved state invalid so that we can recover sanely
2109 * if LRU scan is turned back on later.
2110 */
2111 if (bgwriter_lru_maxpages <= 0)
2112 {
2113 saved_info_valid = false;
2114 return true;
2115 }
2116
2117 /*
2118 * Compute strategy_delta = how many buffers have been scanned by the
2119 * clock sweep since last time. If first time through, assume none. Then
2120 * see if we are still ahead of the clock sweep, and if so, how many
2121 * buffers we could scan before we'd catch up with it and "lap" it. Note:
2122 * weird-looking coding of xxx_passes comparisons is to avoid bogus
2123 * behavior when the passes counts wrap around.
2124 */
2125 if (saved_info_valid)
2126 {
2127 int32 passes_delta = strategy_passes - prev_strategy_passes;
2128
2129 strategy_delta = strategy_buf_id - prev_strategy_buf_id;
2130 strategy_delta += (long) passes_delta * NBuffers;
2131
2132 Assert(strategy_delta >= 0);
2133
2134 if ((int32) (next_passes - strategy_passes) > 0)
2135 {
2136 /* we're one pass ahead of the strategy point */
2137 bufs_to_lap = strategy_buf_id - next_to_clean;
2138 #ifdef BGW_DEBUG
2139 elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
2140 next_passes, next_to_clean,
2141 strategy_passes, strategy_buf_id,
2142 strategy_delta, bufs_to_lap);
2143 #endif
2144 }
2145 else if (next_passes == strategy_passes &&
2146 next_to_clean >= strategy_buf_id)
2147 {
2148 /* on same pass, but ahead or at least not behind */
2149 bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
2150 #ifdef BGW_DEBUG
2151 elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
2152 next_passes, next_to_clean,
2153 strategy_passes, strategy_buf_id,
2154 strategy_delta, bufs_to_lap);
2155 #endif
2156 }
2157 else
2158 {
2159 /*
2160 * We're behind, so skip forward to the strategy point and start
2161 * cleaning from there.
2162 */
2163 #ifdef BGW_DEBUG
2164 elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
2165 next_passes, next_to_clean,
2166 strategy_passes, strategy_buf_id,
2167 strategy_delta);
2168 #endif
2169 next_to_clean = strategy_buf_id;
2170 next_passes = strategy_passes;
2171 bufs_to_lap = NBuffers;
2172 }
2173 }
2174 else
2175 {
2176 /*
2177 * Initializing at startup or after LRU scanning had been off. Always
2178 * start at the strategy point.
2179 */
2180 #ifdef BGW_DEBUG
2181 elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
2182 strategy_passes, strategy_buf_id);
2183 #endif
2184 strategy_delta = 0;
2185 next_to_clean = strategy_buf_id;
2186 next_passes = strategy_passes;
2187 bufs_to_lap = NBuffers;
2188 }
2189
2190 /* Update saved info for next time */
2191 prev_strategy_buf_id = strategy_buf_id;
2192 prev_strategy_passes = strategy_passes;
2193 saved_info_valid = true;
2194
2195 /*
2196 * Compute how many buffers had to be scanned for each new allocation, ie,
2197 * 1/density of reusable buffers, and track a moving average of that.
2198 *
2199 * If the strategy point didn't move, we don't update the density estimate.
2200 */
2201 if (strategy_delta > 0 && recent_alloc > 0)
2202 {
2203 scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
2204 smoothed_density += (scans_per_alloc - smoothed_density) /
2205 smoothing_samples;
2206 }
2207
2208 /*
2209 * Estimate how many reusable buffers there are between the current
2210 * strategy point and where we've scanned ahead to, based on the smoothed
2211 * density estimate.
2212 */
2213 bufs_ahead = NBuffers - bufs_to_lap;
2214 reusable_buffers_est = (float) bufs_ahead / smoothed_density;
2215
2216 /*
2217 * Track a moving average of recent buffer allocations. Here, rather than
2218 * a true average we want a fast-attack, slow-decline behavior: we
2219 * immediately follow any increase.
2220 */
2221 if (smoothed_alloc <= (float) recent_alloc)
2222 smoothed_alloc = recent_alloc;
2223 else
2224 smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
2225 smoothing_samples;
2226
2227 /* Scale the estimate by a GUC to allow more aggressive tuning. */
2228 upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
2229
2230 /*
2231 * If recent_alloc remains at zero for many cycles, smoothed_alloc will
2232 * eventually underflow to zero, and the underflows produce annoying
2233 * kernel warnings on some platforms. Once upcoming_alloc_est has gone to
2234 * zero, there's no point in tracking smaller and smaller values of
2235 * smoothed_alloc, so just reset it to exactly zero to avoid this
2236 * syndrome. It will pop back up as soon as recent_alloc increases.
2237 */
2238 if (upcoming_alloc_est == 0)
2239 smoothed_alloc = 0;
2240
2241 /*
2242 * Even in cases where there's been little or no buffer allocation
2243 * activity, we want to make a small amount of progress through the buffer
2244 * cache so that as many reusable buffers as possible are clean after an
2245 * idle period.
2246 *
2247 * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
2248 * the BGW will be called during the scan_whole_pool time; slice the
2249 * buffer pool into that many sections.
2250 */
2251 min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
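	/* (With the default bgwriter_delay of 200 ms, that is NBuffers / 600 per call.) */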
2252
2253 if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
2254 {
2255 #ifdef BGW_DEBUG
2256 elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
2257 upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
2258 #endif
2259 upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
2260 }
2261
2262 /*
2263 * Now write out dirty reusable buffers, working forward from the
2264 * next_to_clean point, until we have lapped the strategy scan, or cleaned
2265 * enough buffers to match our estimate of the next cycle's allocation
2266 * requirements, or hit the bgwriter_lru_maxpages limit.
2267 */
2268
2269 /* Make sure we can handle the pin inside SyncOneBuffer */
2270 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
2271
2272 num_to_scan = bufs_to_lap;
2273 num_written = 0;
2274 reusable_buffers = reusable_buffers_est;
2275
2276 /* Execute the LRU scan */
2277 while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
2278 {
2279 int sync_state = SyncOneBuffer(next_to_clean, true,
2280 wb_context);
2281
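		/* Advance the scan position, wrapping around the end of the pool. */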
2282 if (++next_to_clean >= NBuffers)
2283 {
2284 next_to_clean = 0;
2285 next_passes++;
2286 }
2287 num_to_scan--;
2288
2289 if (sync_state & BUF_WRITTEN)
2290 {
2291 reusable_buffers++;
2292 if (++num_written >= bgwriter_lru_maxpages)
2293 {
2294 BgWriterStats.m_maxwritten_clean++;
2295 break;
2296 }
2297 }
2298 else if (sync_state & BUF_REUSABLE)
2299 reusable_buffers++;
2300 }
2301
2302 BgWriterStats.m_buf_written_clean += num_written;
2303
2304 #ifdef BGW_DEBUG
2305 elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
2306 recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
2307 smoothed_density, reusable_buffers_est, upcoming_alloc_est,
2308 bufs_to_lap - num_to_scan,
2309 num_written,
2310 reusable_buffers - reusable_buffers_est);
2311 #endif
2312
2313 /*
2314 * Consider the above scan as being like a new allocation scan.
2315 * Characterize its density and update the smoothed one based on it. This
2316 * effectively halves the moving average period in cases where both the
2317 * strategy and the background writer are doing some useful scanning,
2318 * which is helpful because a long memory isn't as desirable on the
2319 * density estimates.
2320 */
2321 new_strategy_delta = bufs_to_lap - num_to_scan;
2322 new_recent_alloc = reusable_buffers - reusable_buffers_est;
2323 if (new_strategy_delta > 0 && new_recent_alloc > 0)
2324 {
2325 scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
2326 smoothed_density += (scans_per_alloc - smoothed_density) /
2327 smoothing_samples;
2328
2329 #ifdef BGW_DEBUG
2330 elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
2331 new_recent_alloc, new_strategy_delta,
2332 scans_per_alloc, smoothed_density);
2333 #endif
2334 }
2335
2336 /* Return true if OK to hibernate */
2337 return (bufs_to_lap == 0 && recent_alloc == 0);
2338 }
2339
2340 /*
2341 * SyncOneBuffer -- process a single buffer during syncing.
2342 *
2343 * If skip_recently_used is true, we don't write currently-pinned buffers, nor
2344 * buffers marked recently used, as these are not replacement candidates.
2345 *
2346 * Returns a bitmask containing the following flag bits:
2347 * BUF_WRITTEN: we wrote the buffer.
2348 * BUF_REUSABLE: buffer is available for replacement, ie, it has
2349 * pin count 0 and usage count 0.
2350 *
2351 * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
2352 * after locking it, but we don't care all that much.)
2353 *
2354 * Note: caller must have done ResourceOwnerEnlargeBuffers.
2355 */
2356 static int
2357 SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
2358 {
2359 BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
2360 int result = 0;
2361 uint32 buf_state;
2362 BufferTag tag;
2363
2364 ReservePrivateRefCountEntry();
2365
2366 /*
2367 * Check whether buffer needs writing.
2368 *
2369 * We can make this check without taking the buffer content lock so long
2370 * as we mark pages dirty in access methods *before* logging changes with
2371 * XLogInsert(): if someone marks the buffer dirty just after our check we
2372 * don't worry because our checkpoint.redo points before the log record for
2373 * the upcoming changes, so we are not required to write such a dirty buffer.
2374 */
2375 buf_state = LockBufHdr(bufHdr);
2376
2377 if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
2378 BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
2379 {
2380 result |= BUF_REUSABLE;
2381 }
2382 else if (skip_recently_used)
2383 {
2384 /* Caller told us not to write recently-used buffers */
2385 UnlockBufHdr(bufHdr, buf_state);
2386 return result;
2387 }
2388
2389 if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
2390 {
2391 /* It's clean, so nothing to do */
2392 UnlockBufHdr(bufHdr, buf_state);
2393 return result;
2394 }
2395
2396 /*
2397 * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the
2398 * buffer is clean by the time we've locked it.)
2399 */
2400 PinBuffer_Locked(bufHdr);
2401 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
2402
2403 FlushBuffer(bufHdr, NULL);
2404
2405 LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
2406
2407 tag = bufHdr->tag;
2408
2409 UnpinBuffer(bufHdr, true);
2410
2411 ScheduleBufferTagForWriteback(wb_context, &tag);
2412
2413 return result | BUF_WRITTEN;
2414 }
2415
2416 /*
2417 * AtEOXact_Buffers - clean up at end of transaction.
2418 *
2419 * As of PostgreSQL 8.0, buffer pins should get released by the
2420 * ResourceOwner mechanism. This routine is just a debugging
2421 * cross-check that no pins remain.
2422 */
2423 void
2424 AtEOXact_Buffers(bool isCommit)
2425 {
2426 CheckForBufferLeaks();
2427
2428 AtEOXact_LocalBuffers(isCommit);
2429
2430 Assert(PrivateRefCountOverflowed == 0);
2431 }
2432
2433 /*
2434 * Initialize access to shared buffer pool
2435 *
2436 * This is called during backend startup (whether standalone or under the
2437 * postmaster). It sets up for this backend's access to the already-existing
2438 * buffer pool.
2439 *
2440 * NB: this is called before InitProcess(), so we do not have a PGPROC and
2441 * cannot do LWLockAcquire; hence we can't actually access stuff in
2442 * shared memory yet. We are only initializing local data here.
2443 * (See also InitBufferPoolBackend)
2444 */
2445 void
2446 InitBufferPoolAccess(void)
2447 {
2448 HASHCTL hash_ctl;
2449
2450 memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));
2451
2452 MemSet(&hash_ctl, 0, sizeof(hash_ctl));
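	/* The private refcount hash is keyed by Buffer number. */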
2453 hash_ctl.keysize = sizeof(int32);
2454 hash_ctl.entrysize = sizeof(PrivateRefCountEntry);
2455
2456 PrivateRefCountHash = hash_create("PrivateRefCount", 100, &hash_ctl,
2457 HASH_ELEM | HASH_BLOBS);
2458 }
2459
2460 /*
2461 * InitBufferPoolBackend --- second-stage initialization of a new backend
2462 *
2463 * This is called after we have acquired a PGPROC and so can safely get
2464 * LWLocks. We don't currently need to do anything at this stage ...
2465 * except register a shmem-exit callback. AtProcExit_Buffers needs LWLock
2466 * access, and thereby has to be called at the corresponding phase of
2467 * backend shutdown.
2468 */
2469 void
2470 InitBufferPoolBackend(void)
2471 {
2472 on_shmem_exit(AtProcExit_Buffers, 0);
2473 }
2474
2475 /*
2476 * During backend exit, ensure that we released all shared-buffer locks and
2477 * assert that we have no remaining pins.
2478 */
2479 static void
2480 AtProcExit_Buffers(int code, Datum arg)
2481 {
2482 AbortBufferIO();
2483 UnlockBuffers();
2484
2485 CheckForBufferLeaks();
2486
2487 /* localbuf.c needs a chance too */
2488 AtProcExit_LocalBuffers();
2489 }
2490
2491 /*
2492 * CheckForBufferLeaks - ensure this backend holds no buffer pins
2493 *
2494 * As of PostgreSQL 8.0, buffer pins should get released by the
2495 * ResourceOwner mechanism. This routine is just a debugging
2496 * cross-check that no pins remain.
2497 */
2498 static void
2499 CheckForBufferLeaks(void)
2500 {
2501 #ifdef USE_ASSERT_CHECKING
2502 int RefCountErrors = 0;
2503 PrivateRefCountEntry *res;
2504 int i;
2505
2506 /* check the array */
2507 for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
2508 {
2509 res = &PrivateRefCountArray[i];
2510
2511 if (res->buffer != InvalidBuffer)
2512 {
2513 PrintBufferLeakWarning(res->buffer);
2514 RefCountErrors++;
2515 }
2516 }
2517
2518 /* if necessary search the hash */
2519 if (PrivateRefCountOverflowed)
2520 {
2521 HASH_SEQ_STATUS hstat;
2522
2523 hash_seq_init(&hstat, PrivateRefCountHash);
2524 while ((res = (PrivateRefCountEntry *) hash_seq_search(&hstat)) != NULL)
2525 {
2526 PrintBufferLeakWarning(res->buffer);
2527 RefCountErrors++;
2528 }
2529
2530 }
2531
2532 Assert(RefCountErrors == 0);
2533 #endif
2534 }
2535
2536 /*
2537 * Helper routine to issue warnings when a buffer is unexpectedly pinned
2538 */
2539 void
2540 PrintBufferLeakWarning(Buffer buffer)
2541 {
2542 BufferDesc *buf;
2543 int32 loccount;
2544 char *path;
2545 BackendId backend;
2546 uint32 buf_state;
2547
2548 Assert(BufferIsValid(buffer));
2549 if (BufferIsLocal(buffer))
2550 {
2551 buf = GetLocalBufferDescriptor(-buffer - 1);
2552 loccount = LocalRefCount[-buffer - 1];
2553 backend = MyBackendId;
2554 }
2555 else
2556 {
2557 buf = GetBufferDescriptor(buffer - 1);
2558 loccount = GetPrivateRefCount(buffer);
2559 backend = InvalidBackendId;
2560 }
2561
2562 /* theoretically we should lock the bufhdr here */
2563 path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
2564 buf_state = pg_atomic_read_u32(&buf->state);
2565 elog(WARNING,
2566 "buffer refcount leak: [%03d] "
2567 "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
2568 buffer, path,
2569 buf->tag.blockNum, buf_state & BUF_FLAG_MASK,
2570 BUF_STATE_GET_REFCOUNT(buf_state), loccount);
2571 pfree(path);
2572 }
2573
2574 /*
2575 * CheckPointBuffers
2576 *
2577 * Flush all dirty blocks in buffer pool to disk at checkpoint time.
2578 *
2579 * Note: temporary relations do not participate in checkpoints, so they don't
2580 * need to be flushed.
2581 */
2582 void
2583 CheckPointBuffers(int flags)
2584 {
2585 TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
2586 CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
2587 BufferSync(flags);
2588 CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
2589 TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
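	/* Force all buffer data written above (and before) to stable storage. */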
2590 smgrsync();
2591 CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
2592 TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
2593 }
2594
2595
2596 /*
2597 * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
2598 */
2599 void
2600 BufmgrCommit(void)
2601 {
2602 /* Nothing to do in bufmgr anymore... */
2603 }
2604
2605 /*
2606 * BufferGetBlockNumber
2607 * Returns the block number associated with a buffer.
2608 *
2609 * Note:
2610 * Assumes that the buffer is valid and pinned, else the
2611 * value may be obsolete immediately...
2612 */
2613 BlockNumber
2614 BufferGetBlockNumber(Buffer buffer)
2615 {
2616 BufferDesc *bufHdr;
2617
2618 Assert(BufferIsPinned(buffer));
2619
2620 if (BufferIsLocal(buffer))
2621 bufHdr = GetLocalBufferDescriptor(-buffer - 1);
2622 else
2623 bufHdr = GetBufferDescriptor(buffer - 1);
2624
2625 /* pinned, so OK to read tag without spinlock */
2626 return bufHdr->tag.blockNum;
2627 }
2628
2629 /*
2630 * BufferGetTag
2631 * Returns the relfilenode, fork number and block number associated with
2632 * a buffer.
2633 */
2634 void
2635 BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
2636 BlockNumber *blknum)
2637 {
2638 BufferDesc *bufHdr;
2639
2640 /* Do the same checks as BufferGetBlockNumber. */
2641 Assert(BufferIsPinned(buffer));
2642
2643 if (BufferIsLocal(buffer))
2644 bufHdr = GetLocalBufferDescriptor(-buffer - 1);
2645 else
2646 bufHdr = GetBufferDescriptor(buffer - 1);
2647
2648 /* pinned, so OK to read tag without spinlock */
2649 *rnode = bufHdr->tag.rnode;
2650 *forknum = bufHdr->tag.forkNum;
2651 *blknum = bufHdr->tag.blockNum;
2652 }
2653
2654 /*
2655 * FlushBuffer
2656 * Physically write out a shared buffer.
2657 *
2658 * NOTE: this actually just passes the buffer contents to the kernel; the
2659 * real write to disk won't happen until the kernel feels like it. This
2660 * is okay from our point of view since we can redo the changes from WAL.
2661 * However, we will need to force the changes to disk via fsync before
2662 * we can checkpoint WAL.
2663 *
2664 * The caller must hold a pin on the buffer and have share-locked the
2665 * buffer contents. (Note: a share-lock does not prevent updates of
2666 * hint bits in the buffer, so the page could change while the write
2667 * is in progress, but we assume that that will not invalidate the data
2668 * written.)
2669 *
2670 * If the caller has an smgr reference for the buffer's relation, pass it
2671 * as the second parameter. If not, pass NULL.
2672 */
2673 static void
2674 FlushBuffer(BufferDesc *buf, SMgrRelation reln)
2675 {
2676 XLogRecPtr recptr;
2677 ErrorContextCallback errcallback;
2678 instr_time io_start,
2679 io_time;
2680 Block bufBlock;
2681 char *bufToWrite;
2682 uint32 buf_state;
2683
2684 /*
2685 * Acquire the buffer's io_in_progress lock. If StartBufferIO returns
2686 * false, then someone else flushed the buffer before we could, so we need
2687 * not do anything.
2688 */
2689 if (!StartBufferIO(buf, false))
2690 return;
2691
2692 /* Setup error traceback support for ereport() */
2693 errcallback.callback = shared_buffer_write_error_callback;
2694 errcallback.arg = (void *) buf;
2695 errcallback.previous = error_context_stack;
2696 error_context_stack = &errcallback;
2697
2698 /* Find smgr relation for buffer */
2699 if (reln == NULL)
2700 reln = smgropen(buf->tag.rnode, InvalidBackendId);
2701
2702 TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
2703 buf->tag.blockNum,
2704 reln->smgr_rnode.node.spcNode,
2705 reln->smgr_rnode.node.dbNode,
2706 reln->smgr_rnode.node.relNode);
2707
2708 buf_state = LockBufHdr(buf);
2709
2710 /*
2711 * Run PageGetLSN while holding header lock, since we don't have the
2712 * buffer locked exclusively in all cases.
2713 */
2714 recptr = BufferGetLSN(buf);
2715
2716 /* To check if block content changes while flushing. - vadim 01/17/97 */
2717 buf_state &= ~BM_JUST_DIRTIED;
2718 UnlockBufHdr(buf, buf_state);
2719
2720 /*
2721 * Force XLOG flush up to buffer's LSN. This implements the basic WAL
2722 * rule that log updates must hit disk before any of the data-file changes
2723 * they describe do.
2724 *
2725 * However, this rule does not apply to unlogged relations, which will be
2726 * lost after a crash anyway. Most unlogged relation pages do not bear
2727 * LSNs since we never emit WAL records for them, and therefore flushing
2728 * up through the buffer LSN would be useless, but harmless. However,
2729 * GiST indexes use LSNs internally to track page-splits, and therefore
2730 * unlogged GiST pages bear "fake" LSNs generated by
2731 * GetFakeLSNForUnloggedRel. It is unlikely but possible that the fake
2732 * LSN counter could advance past the WAL insertion point; and if it did
2733 * happen, attempting to flush WAL through that location would fail, with
2734 * disastrous system-wide consequences. To make sure that can't happen,
2735 * skip the flush if the buffer isn't permanent.
2736 */
2737 if (buf_state & BM_PERMANENT)
2738 XLogFlush(recptr);
2739
2740 /*
2741 * Now it's safe to write buffer to disk. Note that no one else should
2742 * have been able to write it while we were busy with log flushing because
2743 * we have the io_in_progress lock.
2744 */
2745 bufBlock = BufHdrGetBlock(buf);
2746
2747 /*
2748 * Update page checksum if desired. Since we have only shared lock on the
2749 * buffer, other processes might be updating hint bits in it, so we must
2750 * copy the page to private storage if we do checksumming.
2751 */
2752 bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
2753
2754 if (track_io_timing)
2755 INSTR_TIME_SET_CURRENT(io_start);
2756
2757 /*
2758 * bufToWrite is either the shared buffer or a copy, as appropriate.
2759 */
2760 smgrwrite(reln,
2761 buf->tag.forkNum,
2762 buf->tag.blockNum,
2763 bufToWrite,
2764 false);
2765
2766 if (track_io_timing)
2767 {
2768 INSTR_TIME_SET_CURRENT(io_time);
2769 INSTR_TIME_SUBTRACT(io_time, io_start);
2770 pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
2771 INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
2772 }
2773
2774 pgBufferUsage.shared_blks_written++;
2775
2776 /*
2777 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
2778 * end the io_in_progress state.
2779 */
2780 TerminateBufferIO(buf, true, 0);
2781
2782 TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
2783 buf->tag.blockNum,
2784 reln->smgr_rnode.node.spcNode,
2785 reln->smgr_rnode.node.dbNode,
2786 reln->smgr_rnode.node.relNode);
2787
2788 /* Pop the error context stack */
2789 error_context_stack = errcallback.previous;
2790 }
2791
2792 /*
2793 * RelationGetNumberOfBlocksInFork
2794 * Determines the current number of pages in the specified relation fork.
2795 */
2796 BlockNumber
2797 RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
2798 {
2799 /* Open it at the smgr level if not already done */
2800 RelationOpenSmgr(relation);
2801
2802 return smgrnblocks(relation->rd_smgr, forkNum);
2803 }
2804
2805 /*
2806 * BufferIsPermanent
2807 * Determines whether a buffer will potentially still be around after
2808 * a crash. Caller must hold a buffer pin.
2809 */
2810 bool
2811 BufferIsPermanent(Buffer buffer)
2812 {
2813 BufferDesc *bufHdr;
2814
2815 /* Local buffers are used only for temp relations. */
2816 if (BufferIsLocal(buffer))
2817 return false;
2818
2819 /* Make sure we've got a real buffer, and that we hold a pin on it. */
2820 Assert(BufferIsValid(buffer));
2821 Assert(BufferIsPinned(buffer));
2822
2823 /*
2824 * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
2825 * need not bother with the buffer header spinlock. Even if someone else
2826 * changes the buffer header state while we're doing this, the state is
2827 * changed atomically, so we'll read the old value or the new value, but
2828 * not random garbage.
2829 */
2830 bufHdr = GetBufferDescriptor(buffer - 1);
2831 return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
2832 }
2833
2834 /*
2835 * BufferGetLSNAtomic
2836 * Retrieves the LSN of the buffer atomically using a buffer header lock.
2837 * This is necessary for some callers who may not have an exclusive lock
2838 * on the buffer.
2839 */
2840 XLogRecPtr
2841 BufferGetLSNAtomic(Buffer buffer)
2842 {
2843 BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
2844 char *page = BufferGetPage(buffer);
2845 XLogRecPtr lsn;
2846 uint32 buf_state;
2847
2848 /*
2849 * If we don't need locking for correctness, fastpath out.
2850 */
2851 if (!XLogHintBitIsNeeded() || BufferIsLocal(buffer))
2852 return PageGetLSN(page);
2853
2854 /* Make sure we've got a real buffer, and that we hold a pin on it. */
2855 Assert(BufferIsValid(buffer));
2856 Assert(BufferIsPinned(buffer));
2857
2858 buf_state = LockBufHdr(bufHdr);
2859 lsn = PageGetLSN(page);
2860 UnlockBufHdr(bufHdr, buf_state);
2861
2862 return lsn;
2863 }
2864
2865 /* ---------------------------------------------------------------------
2866 * DropRelFileNodeBuffers
2867 *
2868 * This function removes from the buffer pool all the pages of the
2869 * specified relation fork that have block numbers >= firstDelBlock.
2870 * (In particular, with firstDelBlock = 0, all pages are removed.)
2871 * Dirty pages are simply dropped, without bothering to write them
2872 * out first. Therefore, this is NOT rollback-able, and so should be
2873 * used only with extreme caution!
2874 *
2875 * Currently, this is called only from smgr.c when the underlying file
2876 * is about to be deleted or truncated (firstDelBlock is needed for
2877 * the truncation case). The data in the affected pages would therefore
2878 * be deleted momentarily anyway, and there is no point in writing it.
2879 * It is the responsibility of higher-level code to ensure that the
2880 * deletion or truncation does not lose any data that could be needed
2881 * later. It is also the responsibility of higher-level code to ensure
2882 * that no other process could be trying to load more pages of the
2883 * relation into buffers.
2884 *
2885 * XXX currently it sequentially searches the buffer pool, should be
2886 * changed to more clever ways of searching. However, this routine
2887 * is used only in code paths that aren't very performance-critical,
2888 * and we shouldn't slow down the hot paths to make it faster ...
2889 * --------------------------------------------------------------------
2890 */
2891 void
2892 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
2893 BlockNumber firstDelBlock)
2894 {
2895 int i;
2896
2897 /* If it's a local relation, it's localbuf.c's problem. */
2898 if (RelFileNodeBackendIsTemp(rnode))
2899 {
2900 if (rnode.backend == MyBackendId)
2901 DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
2902 return;
2903 }
2904
2905 for (i = 0; i < NBuffers; i++)
2906 {
2907 BufferDesc *bufHdr = GetBufferDescriptor(i);
2908 uint32 buf_state;
2909
2910 /*
2911 * We can make this a tad faster by prechecking the buffer tag before
2912 * we attempt to lock the buffer; this saves a lot of lock
2913 * acquisitions in typical cases. It should be safe because the
2914 * caller must have AccessExclusiveLock on the relation, or some other
2915 * reason to be certain that no one is loading new pages of the rel
2916 * into the buffer pool. (Otherwise we might well miss such pages
2917 * entirely.) Therefore, while the tag might be changing while we
2918 * look at it, it can't be changing *to* a value we care about, only
2919 * *away* from such a value. So false negatives are impossible, and
2920 * false positives are safe because we'll recheck after getting the
2921 * buffer lock.
2922 *
2923 * We could check forkNum and blockNum as well as the rnode, but the
2924 * incremental win from doing so seems small.
2925 */
2926 if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
2927 continue;
2928
2929 buf_state = LockBufHdr(bufHdr);
2930 if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
2931 bufHdr->tag.forkNum == forkNum &&
2932 bufHdr->tag.blockNum >= firstDelBlock)
2933 InvalidateBuffer(bufHdr); /* releases spinlock */
2934 else
2935 UnlockBufHdr(bufHdr, buf_state);
2936 }
2937 }
2938
2939 /* ---------------------------------------------------------------------
2940 * DropRelFileNodesAllBuffers
2941 *
2942 * This function removes from the buffer pool all the pages of all
2943 * forks of the specified relations. It's equivalent to calling
2944 * DropRelFileNodeBuffers once per fork per relation with
2945 * firstDelBlock = 0.
2946 * --------------------------------------------------------------------
2947 */
2948 void
2949 DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
2950 {
2951 int i,
2952 n = 0;
2953 RelFileNode *nodes;
2954 bool use_bsearch;
2955
2956 if (nnodes == 0)
2957 return;
2958
2959 nodes = palloc(sizeof(RelFileNode) * nnodes); /* non-local relations */
2960
2961 /* If it's a local relation, it's localbuf.c's problem. */
2962 for (i = 0; i < nnodes; i++)
2963 {
2964 if (RelFileNodeBackendIsTemp(rnodes[i]))
2965 {
2966 if (rnodes[i].backend == MyBackendId)
2967 DropRelFileNodeAllLocalBuffers(rnodes[i].node);
2968 }
2969 else
2970 nodes[n++] = rnodes[i].node;
2971 }
2972
2973 /*
2974 * If there are no non-local relations, then we're done. Release the
2975 * memory and return.
2976 */
2977 if (n == 0)
2978 {
2979 pfree(nodes);
2980 return;
2981 }
2982
2983 /*
2984 * For a low number of relations to drop, just use a simple walk-through to
2985 * save the bsearch overhead. The threshold to use is more a guess than
2986 * an exactly determined value, as it depends on many factors (CPU and RAM
2987 * speeds, amount of shared buffers etc.).
2988 */
2989 use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;
2990
2991 /* sort the list of rnodes if necessary */
2992 if (use_bsearch)
2993 pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);
2994
2995 for (i = 0; i < NBuffers; i++)
2996 {
2997 RelFileNode *rnode = NULL;
2998 BufferDesc *bufHdr = GetBufferDescriptor(i);
2999 uint32 buf_state;
3000
3001 /*
3002 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3003 * and saves some cycles.
3004 */
3005
3006 if (!use_bsearch)
3007 {
3008 int j;
3009
3010 for (j = 0; j < n; j++)
3011 {
3012 if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
3013 {
3014 rnode = &nodes[j];
3015 break;
3016 }
3017 }
3018 }
3019 else
3020 {
3021 rnode = bsearch((const void *) &(bufHdr->tag.rnode),
3022 nodes, n, sizeof(RelFileNode),
3023 rnode_comparator);
3024 }
3025
3026 /* buffer doesn't belong to any of the given relfilenodes; skip it */
3027 if (rnode == NULL)
3028 continue;
3029
3030 buf_state = LockBufHdr(bufHdr);
3031 if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
3032 InvalidateBuffer(bufHdr); /* releases spinlock */
3033 else
3034 UnlockBufHdr(bufHdr, buf_state);
3035 }
3036
3037 pfree(nodes);
3038 }
3039
3040 /* ---------------------------------------------------------------------
3041 * DropDatabaseBuffers
3042 *
3043 * This function removes all the buffers in the buffer cache for a
3044 * particular database. Dirty pages are simply dropped, without
3045 * bothering to write them out first. This is used when we destroy a
3046 * database, to avoid trying to flush data to disk when the directory
3047 * tree no longer exists. Implementation is pretty similar to
3048 * DropRelFileNodeBuffers() which is for destroying just one relation.
3049 * --------------------------------------------------------------------
3050 */
3051 void
3052 DropDatabaseBuffers(Oid dbid)
3053 {
3054 int i;
3055
3056 /*
3057 * We needn't consider local buffers, since by assumption the target
3058 * database isn't our own.
3059 */
3060
3061 for (i = 0; i < NBuffers; i++)
3062 {
3063 BufferDesc *bufHdr = GetBufferDescriptor(i);
3064 uint32 buf_state;
3065
3066 /*
3067 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3068 * and saves some cycles.
3069 */
3070 if (bufHdr->tag.rnode.dbNode != dbid)
3071 continue;
3072
3073 buf_state = LockBufHdr(bufHdr);
3074 if (bufHdr->tag.rnode.dbNode == dbid)
3075 InvalidateBuffer(bufHdr); /* releases spinlock */
3076 else
3077 UnlockBufHdr(bufHdr, buf_state);
3078 }
3079 }
3080
3081 /* -----------------------------------------------------------------
3082 * PrintBufferDescs
3083 *
3084 * this function prints all the buffer descriptors, for debugging
3085 * use only.
3086 * -----------------------------------------------------------------
3087 */
3088 #ifdef NOT_USED
3089 void
3090 PrintBufferDescs(void)
3091 {
3092 int i;
3093
3094 for (i = 0; i < NBuffers; ++i)
3095 {
3096 BufferDesc *buf = GetBufferDescriptor(i);
3097 Buffer b = BufferDescriptorGetBuffer(buf);
3098
3099 /* theoretically we should lock the bufhdr here */
3100 elog(LOG,
3101 "[%02d] (freeNext=%d, rel=%s, "
3102 "blockNum=%u, flags=0x%x, refcount=%u %d)",
3103 i, buf->freeNext,
3104 relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
3105 buf->tag.blockNum, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK,
3106 BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf->state)), GetPrivateRefCount(b));
3107 }
3108 }
3109 #endif
3110
3111 #ifdef NOT_USED
3112 void
3113 PrintPinnedBufs(void)
3114 {
3115 int i;
3116
3117 for (i = 0; i < NBuffers; ++i)
3118 {
3119 BufferDesc *buf = GetBufferDescriptor(i);
3120 Buffer b = BufferDescriptorGetBuffer(buf);
3121
3122 if (GetPrivateRefCount(b) > 0)
3123 {
3124 /* theoretically we should lock the bufhdr here */
3125 elog(LOG,
3126 "[%02d] (freeNext=%d, rel=%s, "
3127 "blockNum=%u, flags=0x%x, refcount=%u %d)",
3128 i, buf->freeNext,
3129 relpathperm(buf->tag.rnode, buf->tag.forkNum),
3130 buf->tag.blockNum, pg_atomic_read_u32(&buf->state) & BUF_FLAG_MASK,
3131 BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf->state)), GetPrivateRefCount(b));
3132 }
3133 }
3134 }
3135 #endif
3136
3137 /* ---------------------------------------------------------------------
3138 * FlushRelationBuffers
3139 *
3140 * This function writes all dirty pages of a relation out to disk
3141 * (or more accurately, out to kernel disk buffers), ensuring that the
3142 * kernel has an up-to-date view of the relation.
3143 *
3144 * Generally, the caller should be holding AccessExclusiveLock on the
3145 * target relation to ensure that no other backend is busy dirtying
3146 * more blocks of the relation; the effects can't be expected to last
3147 * after the lock is released.
3148 *
3149 * XXX currently it sequentially searches the buffer pool, should be
3150 * changed to more clever ways of searching. This routine is not
3151 * used in any performance-critical code paths, so it's not worth
3152 * adding additional overhead to normal paths to make it go faster;
3153 * but see also DropRelFileNodeBuffers.
3154 * --------------------------------------------------------------------
3155 */
3156 void
3157 FlushRelationBuffers(Relation rel)
3158 {
3159 int i;
3160 BufferDesc *bufHdr;
3161
3162 /* Open rel at the smgr level if not already done */
3163 RelationOpenSmgr(rel);
3164
3165 if (RelationUsesLocalBuffers(rel))
3166 {
3167 for (i = 0; i < NLocBuffer; i++)
3168 {
3169 uint32 buf_state;
3170
3171 bufHdr = GetLocalBufferDescriptor(i);
3172 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
3173 ((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
3174 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3175 {
3176 ErrorContextCallback errcallback;
3177 Page localpage;
3178
3179 localpage = (char *) LocalBufHdrGetBlock(bufHdr);
3180
3181 /* Setup error traceback support for ereport() */
3182 errcallback.callback = local_buffer_write_error_callback;
3183 errcallback.arg = (void *) bufHdr;
3184 errcallback.previous = error_context_stack;
3185 error_context_stack = &errcallback;
3186
3187 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
3188
3189 smgrwrite(rel->rd_smgr,
3190 bufHdr->tag.forkNum,
3191 bufHdr->tag.blockNum,
3192 localpage,
3193 false);
3194
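				/*
				 * Only this backend can access its local buffers, so an
				 * unlocked update of the state word is safe here.
				 */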
3195 buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED);
3196 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
3197
3198 /* Pop the error context stack */
3199 error_context_stack = errcallback.previous;
3200 }
3201 }
3202
3203 return;
3204 }
3205
3206 /* Make sure we can handle the pin inside the loop */
3207 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3208
3209 for (i = 0; i < NBuffers; i++)
3210 {
3211 uint32 buf_state;
3212
3213 bufHdr = GetBufferDescriptor(i);
3214
3215 /*
3216 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3217 * and saves some cycles.
3218 */
3219 if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
3220 continue;
3221
3222 ReservePrivateRefCountEntry();
3223
3224 buf_state = LockBufHdr(bufHdr);
3225 if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
3226 (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3227 {
3228 PinBuffer_Locked(bufHdr);
3229 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
3230 FlushBuffer(bufHdr, rel->rd_smgr);
3231 LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
3232 UnpinBuffer(bufHdr, true);
3233 }
3234 else
3235 UnlockBufHdr(bufHdr, buf_state);
3236 }
3237 }
3238
3239 /* ---------------------------------------------------------------------
3240 * FlushDatabaseBuffers
3241 *
3242 * This function writes all dirty pages of a database out to disk
3243 * (or more accurately, out to kernel disk buffers), ensuring that the
3244 * kernel has an up-to-date view of the database.
3245 *
3246 * Generally, the caller should be holding an appropriate lock to ensure
3247 * no other backend is active in the target database; otherwise more
3248 * pages could get dirtied.
3249 *
3250 * Note we don't worry about flushing any pages of temporary relations.
3251 * It's assumed these wouldn't be interesting.
3252 * --------------------------------------------------------------------
3253 */
3254 void
3255 FlushDatabaseBuffers(Oid dbid)
3256 {
3257 int i;
3258 BufferDesc *bufHdr;
3259
3260 /* Make sure we can handle the pin inside the loop */
3261 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3262
3263 for (i = 0; i < NBuffers; i++)
3264 {
3265 uint32 buf_state;
3266
3267 bufHdr = GetBufferDescriptor(i);
3268
3269 /*
3270 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
3271 * and saves some cycles.
3272 */
3273 if (bufHdr->tag.rnode.dbNode != dbid)
3274 continue;
3275
3276 ReservePrivateRefCountEntry();
3277
3278 buf_state = LockBufHdr(bufHdr);
3279 if (bufHdr->tag.rnode.dbNode == dbid &&
3280 (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
3281 {
3282 PinBuffer_Locked(bufHdr);
3283 LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
3284 FlushBuffer(bufHdr, NULL);
3285 LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
3286 UnpinBuffer(bufHdr, true);
3287 }
3288 else
3289 UnlockBufHdr(bufHdr, buf_state);
3290 }
3291 }
3292
3293 /*
3294 * Flush a pinned buffer that has been locked, in shared or exclusive mode,
3295 * out to the OS.
3296 */
3297 void
3298 FlushOneBuffer(Buffer buffer)
3299 {
3300 BufferDesc *bufHdr;
3301
3302 /* currently not needed, but no fundamental reason not to support */
3303 Assert(!BufferIsLocal(buffer));
3304
3305 Assert(BufferIsPinned(buffer));
3306
3307 bufHdr = GetBufferDescriptor(buffer - 1);
3308
3309 Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
3310
3311 FlushBuffer(bufHdr, NULL);
3312 }
3313
3314 /*
3315 * ReleaseBuffer -- release the pin on a buffer
3316 */
3317 void
3318 ReleaseBuffer(Buffer buffer)
3319 {
3320 if (!BufferIsValid(buffer))
3321 elog(ERROR, "bad buffer ID: %d", buffer);
3322
3323 if (BufferIsLocal(buffer))
3324 {
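		/* Local buffers are tracked in the per-backend LocalRefCount array. */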
3325 ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
3326
3327 Assert(LocalRefCount[-buffer - 1] > 0);
3328 LocalRefCount[-buffer - 1]--;
3329 return;
3330 }
3331
3332 UnpinBuffer(GetBufferDescriptor(buffer - 1), true);
3333 }
3334
3335 /*
3336 * UnlockReleaseBuffer -- release the content lock and pin on a buffer
3337 *
3338 * This is just a shorthand for a common combination.
3339 */
3340 void
3341 UnlockReleaseBuffer(Buffer buffer)
3342 {
3343 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3344 ReleaseBuffer(buffer);
3345 }
3346
3347 /*
3348 * IncrBufferRefCount
3349 * Increment the pin count on a buffer that we have *already* pinned
3350 * at least once.
3351 *
3352 * This function cannot be used on a buffer we do not have pinned,
3353 * because it doesn't change the shared buffer state.
3354 */
3355 void
3356 IncrBufferRefCount(Buffer buffer)
3357 {
3358 Assert(BufferIsPinned(buffer));
3359 ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
3360 if (BufferIsLocal(buffer))
3361 LocalRefCount[-buffer - 1]++;
3362 else
3363 {
3364 PrivateRefCountEntry *ref;
3365
3366 ref = GetPrivateRefCountEntry(buffer, true);
3367 Assert(ref != NULL);
3368 ref->refcount++;
3369 }
3370 ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
3371 }
3372
3373 /*
3374 * MarkBufferDirtyHint
3375 *
3376 * Mark a buffer dirty for non-critical changes.
3377 *
3378 * This is essentially the same as MarkBufferDirty, except:
3379 *
3380 * 1. The caller does not write WAL; so if checksums are enabled, we may need
3381 * to write an XLOG_FPI WAL record to protect against torn pages.
3382 * 2. The caller might have only share-lock instead of exclusive-lock on the
3383 * buffer's content lock.
3384 * 3. This function does not guarantee that the buffer is always marked dirty
3385 * (due to a race condition), so it cannot be used for important changes.
3386 */
3387 void
3388 MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
3389 {
3390 BufferDesc *bufHdr;
3391 Page page = BufferGetPage(buffer);
3392
3393 if (!BufferIsValid(buffer))
3394 elog(ERROR, "bad buffer ID: %d", buffer);
3395
3396 if (BufferIsLocal(buffer))
3397 {
3398 MarkLocalBufferDirty(buffer);
3399 return;
3400 }
3401
3402 bufHdr = GetBufferDescriptor(buffer - 1);
3403
3404 Assert(GetPrivateRefCount(buffer) > 0);
3405 /* here, either share or exclusive lock is OK */
3406 Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
3407
3408 /*
3409 * This routine might get called many times on the same page, if we are
3410 * making the first scan after commit of an xact that added/deleted many
3411 * tuples. So, be as quick as we can if the buffer is already dirty. We
3412 * do this by not acquiring spinlock if it looks like the status bits are
3413 * already set. Since we make this test unlocked, there's a chance we
3414 * might fail to notice that the flags have just been cleared, and failed
3415 * to reset them, due to memory-ordering issues. But since this function
3416 * is only intended to be used in cases where failing to write out the
3417 * data would be harmless anyway, it doesn't really matter.
3418 */
3419 if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
3420 (BM_DIRTY | BM_JUST_DIRTIED))
3421 {
3422 XLogRecPtr lsn = InvalidXLogRecPtr;
3423 bool dirtied = false;
3424 bool delayChkpt = false;
3425 uint32 buf_state;
3426
3427 /*
3428 * If we need to protect hint bit updates from torn writes, WAL-log a
3429 * full page image of the page. This full page image is only necessary
3430 * if the hint bit update is the first change to the page since the
3431 * last checkpoint.
3432 *
3433 * We don't check full_page_writes here because that logic is included
3434 * when we call XLogInsert() since the value changes dynamically.
3435 */
3436 if (XLogHintBitIsNeeded() &&
3437 (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
3438 {
3439 /*
3440 * If we're in recovery we cannot dirty a page because of a hint.
3441 * We can set the hint, just not dirty the page as a result, so the
3442 * hint is lost when we evict the page or shut down.
3443 *
3444 * See src/backend/storage/page/README for longer discussion.
3445 */
3446 if (RecoveryInProgress())
3447 return;
3448
3449 /*
3450 * If the block is already dirty because we either made a change
3451 * or set a hint already, then we don't need to write a full page
3452 * image. Note that aggressive cleaning of blocks dirtied by hint
3453 * bit setting would increase the call rate. Bulk setting of hint
3454 * bits would reduce the call rate...
3455 *
3456 * We must issue the WAL record before we mark the buffer dirty.
3457 * Otherwise we might write the page before we write the WAL. That
3458 * causes a race condition, since a checkpoint might occur between
3459 * writing the WAL record and marking the buffer dirty. We solve
3460 * that with a kluge, but one that is already in use during
3461 * transaction commit to prevent race conditions. Basically, we
3462 * simply prevent the checkpoint WAL record from being written
3463 * until we have marked the buffer dirty. We don't start the
3464 * checkpoint flush until we have marked dirty, so our checkpoint
3465 * must flush the change to disk successfully or the checkpoint
3466 * never gets written, in which case crash recovery will fix things up.
3467 *
3468 * It's possible we may enter here without an xid, so it is
3469 * essential that CreateCheckpoint waits for virtual transactions
3470 * rather than full transactionids.
3471 */
3472 MyPgXact->delayChkpt = delayChkpt = true;
3473 lsn = XLogSaveBufferForHint(buffer, buffer_std);
3474 }
3475
3476 buf_state = LockBufHdr(bufHdr);
3477
3478 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3479
3480 if (!(buf_state & BM_DIRTY))
3481 {
3482 dirtied = true; /* Means "will be dirtied by this action" */
3483
3484 /*
3485 * Set the page LSN if we wrote a backup block. We aren't supposed
3486 * to set this when only holding a share lock, but as long as we
3487 * serialise it somehow we're OK. We choose to set LSN while
3488 * holding the buffer header lock, which causes any reader of an
3489 * LSN who holds only a share lock to also obtain a buffer header
3490 * lock before using PageGetLSN(), which is enforced in
3491 * BufferGetLSNAtomic().
3492 *
3493 * If checksums are enabled, you might think we should reset the
3494 * checksum here. That will happen when the page is written
3495 * sometime later in this checkpoint cycle.
3496 */
3497 if (!XLogRecPtrIsInvalid(lsn))
3498 PageSetLSN(page, lsn);
3499 }
3500
3501 buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
3502 UnlockBufHdr(bufHdr, buf_state);
3503
3504 if (delayChkpt)
3505 MyPgXact->delayChkpt = false;
3506
3507 if (dirtied)
3508 {
3509 VacuumPageDirty++;
3510 pgBufferUsage.shared_blks_dirtied++;
3511 if (VacuumCostActive)
3512 VacuumCostBalance += VacuumCostPageDirty;
3513 }
3514 }
3515 }
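
/*
 * Illustrative sketch (not part of bufmgr.c): how a caller typically records
 * a hint-style change.  Hypothetical helper; the pin and a share (or
 * stronger) content lock are assumed to be held already, as required above.
 */
#ifdef NOT_USED
static void
example_record_hint(Buffer buffer, bool page_has_standard_layout)
{
	Page		page = BufferGetPage(buffer);

	/*
	 * ... update some non-critical, recomputable state on "page" here, e.g.
	 * a tuple hint bit; losing the update in a crash must be acceptable ...
	 */
	(void) page;

	/* buffer_std is true only for pages with the standard page layout */
	MarkBufferDirtyHint(buffer, page_has_standard_layout);
}
#endif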
3516
3517 /*
3518 * Release buffer content locks for shared buffers.
3519 *
3520 * Used to clean up after errors.
3521 *
3522 * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
3523 * of releasing buffer content locks per se; the only thing we need to deal
3524 * with here is clearing any PIN_COUNT request that was in progress.
3525 */
3526 void
3527 UnlockBuffers(void)
3528 {
3529 BufferDesc *buf = PinCountWaitBuf;
3530
3531 if (buf)
3532 {
3533 uint32 buf_state;
3534
3535 buf_state = LockBufHdr(buf);
3536
3537 /*
3538 * Don't complain if flag bit not set; it could have been reset but we
3539 * got a cancel/die interrupt before getting the signal.
3540 */
3541 if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
3542 buf->wait_backend_pid == MyProcPid)
3543 buf_state &= ~BM_PIN_COUNT_WAITER;
3544
3545 UnlockBufHdr(buf, buf_state);
3546
3547 PinCountWaitBuf = NULL;
3548 }
3549 }
3550
3551 /*
3552 * Acquire or release the content_lock for the buffer.
3553 */
3554 void
3555 LockBuffer(Buffer buffer, int mode)
3556 {
3557 BufferDesc *buf;
3558
3559 Assert(BufferIsValid(buffer));
3560 if (BufferIsLocal(buffer))
3561 return; /* local buffers need no lock */
3562
3563 buf = GetBufferDescriptor(buffer - 1);
3564
3565 if (mode == BUFFER_LOCK_UNLOCK)
3566 LWLockRelease(BufferDescriptorGetContentLock(buf));
3567 else if (mode == BUFFER_LOCK_SHARE)
3568 LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
3569 else if (mode == BUFFER_LOCK_EXCLUSIVE)
3570 LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
3571 else
3572 elog(ERROR, "unrecognized buffer lock mode: %d", mode);
3573 }
3574
3575 /*
3576 * Acquire the content_lock for the buffer, but only if we don't have to wait.
3577 *
3578 * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
3579 */
3580 bool
3581 ConditionalLockBuffer(Buffer buffer)
3582 {
3583 BufferDesc *buf;
3584
3585 Assert(BufferIsValid(buffer));
3586 if (BufferIsLocal(buffer))
3587 return true; /* act as though we got it */
3588
3589 buf = GetBufferDescriptor(buffer - 1);
3590
3591 return LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
3592 LW_EXCLUSIVE);
3593 }
3594
3595 /*
3596 * LockBufferForCleanup - lock a buffer in preparation for deleting items
3597 *
3598 * Items may be deleted from a disk page only when the caller (a) holds an
3599 * exclusive lock on the buffer and (b) has observed that no other backend
3600 * holds a pin on the buffer. If there is a pin, then the other backend
3601 * might have a pointer into the buffer (for example, a heapscan reference
3602 * to an item --- see README for more details). It's OK if a pin is added
3603 * after the cleanup starts, however; the newly-arrived backend will be
3604 * unable to look at the page until we release the exclusive lock.
3605 *
3606 * To implement this protocol, a would-be deleter must pin the buffer and
3607 * then call LockBufferForCleanup(). LockBufferForCleanup() is similar to
3608 * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
3609 * it has successfully observed pin count = 1.
3610 */
3611 void
3612 LockBufferForCleanup(Buffer buffer)
3613 {
3614 BufferDesc *bufHdr;
3615
3616 Assert(BufferIsValid(buffer));
3617 Assert(PinCountWaitBuf == NULL);
3618
3619 if (BufferIsLocal(buffer))
3620 {
3621 /* There should be exactly one pin */
3622 if (LocalRefCount[-buffer - 1] != 1)
3623 elog(ERROR, "incorrect local pin count: %d",
3624 LocalRefCount[-buffer - 1]);
3625 /* Nobody else to wait for */
3626 return;
3627 }
3628
3629 /* There should be exactly one local pin */
3630 if (GetPrivateRefCount(buffer) != 1)
3631 elog(ERROR, "incorrect local pin count: %d",
3632 GetPrivateRefCount(buffer));
3633
3634 bufHdr = GetBufferDescriptor(buffer - 1);
3635
3636 for (;;)
3637 {
3638 uint32 buf_state;
3639
3640 /* Try to acquire lock */
3641 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
3642 buf_state = LockBufHdr(bufHdr);
3643
3644 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3645 if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
3646 {
3647 /* Successfully acquired exclusive lock with pincount 1 */
3648 UnlockBufHdr(bufHdr, buf_state);
3649 return;
3650 }
3651 /* Failed, so mark myself as waiting for pincount 1 */
3652 if (buf_state & BM_PIN_COUNT_WAITER)
3653 {
3654 UnlockBufHdr(bufHdr, buf_state);
3655 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3656 elog(ERROR, "multiple backends attempting to wait for pincount 1");
3657 }
3658 bufHdr->wait_backend_pid = MyProcPid;
3659 PinCountWaitBuf = bufHdr;
3660 buf_state |= BM_PIN_COUNT_WAITER;
3661 UnlockBufHdr(bufHdr, buf_state);
3662 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3663
3664 /* Wait to be signaled by UnpinBuffer() */
3665 if (InHotStandby)
3666 {
3667 /* Publish the bufid that Startup process waits on */
3668 SetStartupBufferPinWaitBufId(buffer - 1);
3669 /* Set alarm and then wait to be signaled by UnpinBuffer() */
3670 ResolveRecoveryConflictWithBufferPin();
3671 /* Reset the published bufid */
3672 SetStartupBufferPinWaitBufId(-1);
3673 }
3674 else
3675 ProcWaitForSignal(PG_WAIT_BUFFER_PIN);
3676
3677 /*
3678 * Remove flag marking us as waiter. Normally this will not be set
3679 * anymore, but ProcWaitForSignal() can return for other signals as
3680 * well. We take care to only reset the flag if we're the waiter, as
3681 * theoretically another backend could have started waiting. That's
3682 * impossible with the current usages due to table level locking, but
3683 * better be safe.
3684 */
3685 buf_state = LockBufHdr(bufHdr);
3686 if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
3687 bufHdr->wait_backend_pid == MyProcPid)
3688 buf_state &= ~BM_PIN_COUNT_WAITER;
3689 UnlockBufHdr(bufHdr, buf_state);
3690
3691 PinCountWaitBuf = NULL;
3692 /* Loop back and try again */
3693 }
3694 }
3695
3696 /*
3697 * Check called from RecoveryConflictInterrupt handler when Startup
3698 * process requests cancellation of all pin holders that are blocking it.
3699 */
3700 bool
3701 HoldingBufferPinThatDelaysRecovery(void)
3702 {
3703 int bufid = GetStartupBufferPinWaitBufId();
3704
3705 /*
3706 * If we get woken slowly then it's possible that the Startup process was
3707 * already woken by other backends before we got here. It's also possible
3708 * that we get here via multiple interrupts or interrupts at inappropriate
3709 * times, so make sure we do nothing if the bufid is not set.
3710 */
3711 if (bufid < 0)
3712 return false;
3713
3714 if (GetPrivateRefCount(bufid + 1) > 0)
3715 return true;
3716
3717 return false;
3718 }
3719
3720 /*
3721 * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
3722 *
3723 * We won't loop, but just check once to see if the pin count is OK. If
3724 * not, return FALSE with no lock held.
3725 */
3726 bool
3727 ConditionalLockBufferForCleanup(Buffer buffer)
3728 {
3729 BufferDesc *bufHdr;
3730 uint32 buf_state,
3731 refcount;
3732
3733 Assert(BufferIsValid(buffer));
3734
3735 if (BufferIsLocal(buffer))
3736 {
3737 refcount = LocalRefCount[-buffer - 1];
3738 /* There should be exactly one pin */
3739 Assert(refcount > 0);
3740 if (refcount != 1)
3741 return false;
3742 /* Nobody else to wait for */
3743 return true;
3744 }
3745
3746 /* There should be exactly one local pin */
3747 refcount = GetPrivateRefCount(buffer);
3748 Assert(refcount);
3749 if (refcount != 1)
3750 return false;
3751
3752 /* Try to acquire lock */
3753 if (!ConditionalLockBuffer(buffer))
3754 return false;
3755
3756 bufHdr = GetBufferDescriptor(buffer - 1);
3757 buf_state = LockBufHdr(bufHdr);
3758 refcount = BUF_STATE_GET_REFCOUNT(buf_state);
3759
3760 Assert(refcount > 0);
3761 if (refcount == 1)
3762 {
3763 /* Successfully acquired exclusive lock with pincount 1 */
3764 UnlockBufHdr(bufHdr, buf_state);
3765 return true;
3766 }
3767
3768 /* Failed, so release the lock */
3769 UnlockBufHdr(bufHdr, buf_state);
3770 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
3771 return false;
3772 }
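
/*
 * Illustrative sketch (not part of bufmgr.c): the non-blocking cleanup-lock
 * pattern used by opportunistic cleanup paths.  Hypothetical function; the
 * caller is assumed to hold exactly one pin on "buf".
 */
#ifdef NOT_USED
static bool
example_try_cleanup(Buffer buf)
{
	if (!ConditionalLockBufferForCleanup(buf))
	{
		/* somebody else holds a pin; skip this page rather than wait */
		ReleaseBuffer(buf);
		return false;
	}

	/* we now hold an exclusive lock and are the only pin holder */
	/* ... prune or defragment the page here ... */

	UnlockReleaseBuffer(buf);
	return true;
}
#endif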
3773
3774 /*
3775 * IsBufferCleanupOK - as above, but we already have the lock
3776 *
3777 * Check whether it's OK to perform cleanup on a buffer we've already
3778 * locked. If we observe that the pin count is 1, our exclusive lock
3779 * happens to be a cleanup lock, and we can proceed with anything that
3780 * would have been allowable had we sought a cleanup lock originally.
3781 */
3782 bool
3783 IsBufferCleanupOK(Buffer buffer)
3784 {
3785 BufferDesc *bufHdr;
3786 uint32 buf_state;
3787
3788 Assert(BufferIsValid(buffer));
3789
3790 if (BufferIsLocal(buffer))
3791 {
3792 /* There should be exactly one pin */
3793 if (LocalRefCount[-buffer - 1] != 1)
3794 return false;
3795 /* Nobody else to wait for */
3796 return true;
3797 }
3798
3799 /* There should be exactly one local pin */
3800 if (GetPrivateRefCount(buffer) != 1)
3801 return false;
3802
3803 bufHdr = GetBufferDescriptor(buffer - 1);
3804
3805 /* caller must hold exclusive lock on buffer */
3806 Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
3807 LW_EXCLUSIVE));
3808
3809 buf_state = LockBufHdr(bufHdr);
3810
3811 Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
3812 if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
3813 {
3814 /* pincount is OK. */
3815 UnlockBufHdr(bufHdr, buf_state);
3816 return true;
3817 }
3818
3819 UnlockBufHdr(bufHdr, buf_state);
3820 return false;
3821 }
3822
3823
3824 /*
3825 * Functions for buffer I/O handling
3826 *
3827 * Note: We assume that nested buffer I/O never occurs.
3828 * i.e., at most one io_in_progress lock is held per proc.
3829 *
3830 * Also note that these are used only for shared buffers, not local ones.
3831 */
3832
3833 /*
3834 * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
3835 */
3836 static void
3837 WaitIO(BufferDesc *buf)
3838 {
3839 /*
3840 * Changed to wait until there's no IO - Inoue 01/13/2000
3841 *
3842 * Note this is *necessary* because an error abort in the process doing
3843 * I/O could release the io_in_progress_lock prematurely. See
3844 * AbortBufferIO.
3845 */
3846 for (;;)
3847 {
3848 uint32 buf_state;
3849
3850 /*
3851 * It may not be necessary to acquire the spinlock to check the flag
3852 * here, but since this test is essential for correctness, we'd better
3853 * play it safe.
3854 */
3855 buf_state = LockBufHdr(buf);
3856 UnlockBufHdr(buf, buf_state);
3857
3858 if (!(buf_state & BM_IO_IN_PROGRESS))
3859 break;
3860 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
3861 LWLockRelease(BufferDescriptorGetIOLock(buf));
3862 }
3863 }
3864
3865 /*
3866 * StartBufferIO: begin I/O on this buffer
3867 * (Assumptions)
3868 * My process is executing no IO
3869 * The buffer is Pinned
3870 *
3871 * In some scenarios there are race conditions in which multiple backends
3872 * could attempt the same I/O operation concurrently. If someone else
3873 * has already started I/O on this buffer then we will block on the
3874 * io_in_progress lock until he's done.
3875 *
3876 * Input operations are only attempted on buffers that are not BM_VALID,
3877 * and output operations only on buffers that are BM_VALID and BM_DIRTY,
3878 * so we can always tell if the work is already done.
3879 *
3880 * Returns TRUE if we successfully marked the buffer as I/O busy,
3881 * FALSE if someone else already did the work.
3882 */
3883 static bool
3884 StartBufferIO(BufferDesc *buf, bool forInput)
3885 {
3886 uint32 buf_state;
3887
3888 Assert(!InProgressBuf);
3889
3890 for (;;)
3891 {
3892 /*
3893 * Grab the io_in_progress lock so that other processes can wait for
3894 * me to finish the I/O.
3895 */
3896 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
3897
3898 buf_state = LockBufHdr(buf);
3899
3900 if (!(buf_state & BM_IO_IN_PROGRESS))
3901 break;
3902
3903 /*
3904 * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
3905 * lock isn't held is if the process doing the I/O is recovering from
3906 * an error (see AbortBufferIO). If that's the case, we must wait for
3907 * him to get unwedged.
3908 */
3909 UnlockBufHdr(buf, buf_state);
3910 LWLockRelease(BufferDescriptorGetIOLock(buf));
3911 WaitIO(buf);
3912 }
3913
3914 /* Once we get here, there is definitely no I/O active on this buffer */
3915
3916 if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
3917 {
3918 /* someone else already did the I/O */
3919 UnlockBufHdr(buf, buf_state);
3920 LWLockRelease(BufferDescriptorGetIOLock(buf));
3921 return false;
3922 }
3923
3924 buf_state |= BM_IO_IN_PROGRESS;
3925 UnlockBufHdr(buf, buf_state);
3926
3927 InProgressBuf = buf;
3928 IsForInput = forInput;
3929
3930 return true;
3931 }
3932
3933 /*
3934 * TerminateBufferIO: release a buffer we were doing I/O on
3935 * (Assumptions)
3936 * My process is executing IO for the buffer
3937 * BM_IO_IN_PROGRESS bit is set for the buffer
3938 * We hold the buffer's io_in_progress lock
3939 * The buffer is Pinned
3940 *
3941 * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
3942 * buffer's BM_DIRTY flag. This is appropriate when terminating a
3943 * successful write. The check on BM_JUST_DIRTIED is necessary to avoid
3944 * marking the buffer clean if it was re-dirtied while we were writing.
3945 *
3946 * set_flag_bits gets ORed into the buffer's flags. It must include
3947 * BM_IO_ERROR in a failure case. For successful completion it could
3948 * be 0, or BM_VALID if we just finished reading in the page.
3949 */
3950 static void
3951 TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
3952 {
3953 uint32 buf_state;
3954
3955 Assert(buf == InProgressBuf);
3956
3957 buf_state = LockBufHdr(buf);
3958
3959 Assert(buf_state & BM_IO_IN_PROGRESS);
3960
3961 buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
3962 if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
3963 buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
3964
3965 buf_state |= set_flag_bits;
3966 UnlockBufHdr(buf, buf_state);
3967
3968 InProgressBuf = NULL;
3969
3970 LWLockRelease(BufferDescriptorGetIOLock(buf));
3971 }
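
/*
 * Illustrative sketch (not part of bufmgr.c): the shape of a page read using
 * the I/O protocol above, loosely following what ReadBuffer_common() does.
 * Hypothetical helper; error handling and retries are omitted.
 */
#ifdef NOT_USED
static void
example_read_into_buffer(SMgrRelation reln, BufferDesc *buf)
{
	/* returns false if some other backend already completed the read */
	if (!StartBufferIO(buf, true))
		return;

	smgrread(reln, buf->tag.forkNum, buf->tag.blockNum,
			 (char *) BufHdrGetBlock(buf));

	/* mark the buffer valid and wake anyone blocked in WaitIO() */
	TerminateBufferIO(buf, false, BM_VALID);
}
#endif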
3972
3973 /*
3974 * AbortBufferIO: Clean up any active buffer I/O after an error.
3975 *
3976 * All LWLocks we might have held have been released,
3977 * but we haven't yet released buffer pins, so the buffer is still pinned.
3978 *
3979 * If I/O was in progress, we always set BM_IO_ERROR, even though it's
3980 * possible the error condition wasn't related to the I/O.
3981 */
3982 void
3983 AbortBufferIO(void)
3984 {
3985 BufferDesc *buf = InProgressBuf;
3986
3987 if (buf)
3988 {
3989 uint32 buf_state;
3990
3991 /*
3992 * Since LWLockReleaseAll has already been called, we're not holding
3993 * the buffer's io_in_progress_lock. We have to re-acquire it so that
3994 * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
3995 * buffer will be in a busy spin until we succeed in doing this.
3996 */
3997 LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
3998
3999 buf_state = LockBufHdr(buf);
4000 Assert(buf_state & BM_IO_IN_PROGRESS);
4001 if (IsForInput)
4002 {
4003 Assert(!(buf_state & BM_DIRTY));
4004
4005 /* We'd better not think buffer is valid yet */
4006 Assert(!(buf_state & BM_VALID));
4007 UnlockBufHdr(buf, buf_state);
4008 }
4009 else
4010 {
4011 Assert(buf_state & BM_DIRTY);
4012 UnlockBufHdr(buf, buf_state);
4013 /* Issue notice if this is not the first failure... */
4014 if (buf_state & BM_IO_ERROR)
4015 {
4016 /* Buffer is pinned, so we can read tag without spinlock */
4017 char *path;
4018
4019 path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
4020 ereport(WARNING,
4021 (errcode(ERRCODE_IO_ERROR),
4022 errmsg("could not write block %u of %s",
4023 buf->tag.blockNum, path),
4024 errdetail("Multiple failures --- write error might be permanent.")));
4025 pfree(path);
4026 }
4027 }
4028 TerminateBufferIO(buf, false, BM_IO_ERROR);
4029 }
4030 }
4031
4032 /*
4033 * Error context callback for errors occurring during shared buffer writes.
4034 */
4035 static void
4036 shared_buffer_write_error_callback(void *arg)
4037 {
4038 BufferDesc *bufHdr = (BufferDesc *) arg;
4039
4040 /* Buffer is pinned, so we can read the tag without locking the spinlock */
4041 if (bufHdr != NULL)
4042 {
4043 char *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
4044
4045 errcontext("writing block %u of relation %s",
4046 bufHdr->tag.blockNum, path);
4047 pfree(path);
4048 }
4049 }
4050
4051 /*
4052 * Error context callback for errors occurring during local buffer writes.
4053 */
4054 static void
4055 local_buffer_write_error_callback(void *arg)
4056 {
4057 BufferDesc *bufHdr = (BufferDesc *) arg;
4058
4059 if (bufHdr != NULL)
4060 {
4061 char *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
4062 bufHdr->tag.forkNum);
4063
4064 errcontext("writing block %u of relation %s",
4065 bufHdr->tag.blockNum, path);
4066 pfree(path);
4067 }
4068 }
4069
4070 /*
4071 * RelFileNode qsort/bsearch comparator; see RelFileNodeEquals.
4072 */
4073 static int
4074 rnode_comparator(const void *p1, const void *p2)
4075 {
4076 RelFileNode n1 = *(const RelFileNode *) p1;
4077 RelFileNode n2 = *(const RelFileNode *) p2;
4078
4079 if (n1.relNode < n2.relNode)
4080 return -1;
4081 else if (n1.relNode > n2.relNode)
4082 return 1;
4083
4084 if (n1.dbNode < n2.dbNode)
4085 return -1;
4086 else if (n1.dbNode > n2.dbNode)
4087 return 1;
4088
4089 if (n1.spcNode < n2.spcNode)
4090 return -1;
4091 else if (n1.spcNode > n2.spcNode)
4092 return 1;
4093 else
4094 return 0;
4095 }
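
/*
 * Illustrative sketch (not part of bufmgr.c): rnode_comparator is suitable
 * for both qsort() and bsearch(), which is how the bulk relation-drop paths
 * use it.  Hypothetical function and array.
 */
#ifdef NOT_USED
static bool
example_rnode_member(RelFileNode *nodes, int nnodes, RelFileNode key)
{
	/* sort once ... */
	qsort(nodes, nnodes, sizeof(RelFileNode), rnode_comparator);

	/* ... then probe with the same comparator */
	return bsearch(&key, nodes, nnodes,
				   sizeof(RelFileNode), rnode_comparator) != NULL;
}
#endif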
4096
4097 /*
4098 * Lock buffer header - set BM_LOCKED in buffer state.
4099 */
4100 uint32
4101 LockBufHdr(BufferDesc *desc)
4102 {
4103 SpinDelayStatus delayStatus;
4104 uint32 old_buf_state;
4105
4106 init_local_spin_delay(&delayStatus);
4107
4108 while (true)
4109 {
4110 /* set BM_LOCKED flag */
4111 old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
4112 /* if it wasn't set before we're OK */
4113 if (!(old_buf_state & BM_LOCKED))
4114 break;
4115 perform_spin_delay(&delayStatus);
4116 }
4117 finish_spin_delay(&delayStatus);
4118 return old_buf_state | BM_LOCKED;
4119 }
4120
4121 /*
4122 * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's
4123 * state at that point.
4124 *
4125 * Obviously the buffer could be locked by the time the value is returned, so
4126 * this is primarily useful in CAS style loops.
4127 */
4128 static uint32
4129 WaitBufHdrUnlocked(BufferDesc *buf)
4130 {
4131 SpinDelayStatus delayStatus;
4132 uint32 buf_state;
4133
4134 init_local_spin_delay(&delayStatus);
4135
4136 buf_state = pg_atomic_read_u32(&buf->state);
4137
4138 while (buf_state & BM_LOCKED)
4139 {
4140 perform_spin_delay(&delayStatus);
4141 buf_state = pg_atomic_read_u32(&buf->state);
4142 }
4143
4144 finish_spin_delay(&delayStatus);
4145
4146 return buf_state;
4147 }
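
/*
 * Illustrative sketch (not part of bufmgr.c): the CAS-style loop that
 * WaitBufHdrUnlocked() is intended for, modelled loosely on PinBuffer().
 * Hypothetical helper that sets an arbitrary flag bit without taking the
 * buffer header spinlock.
 */
#ifdef NOT_USED
static void
example_set_flag_cas(BufferDesc *buf, uint32 flag)
{
	uint32		old_buf_state = pg_atomic_read_u32(&buf->state);
	uint32		buf_state;

	for (;;)
	{
		/*
		 * While BM_LOCKED is set, the spinlock holder assumes nobody else
		 * modifies the state word, so wait for it to become free first.
		 */
		if (old_buf_state & BM_LOCKED)
			old_buf_state = WaitBufHdrUnlocked(buf);

		buf_state = old_buf_state | flag;

		if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
										   buf_state))
			break;				/* installed the new state */
		/* on failure the CAS refreshed old_buf_state; just retry */
	}
}
#endif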
4148
4149 /*
4150 * BufferTag comparator.
4151 */
4152 static int
4153 buffertag_comparator(const void *a, const void *b)
4154 {
4155 const BufferTag *ba = (const BufferTag *) a;
4156 const BufferTag *bb = (const BufferTag *) b;
4157 int ret;
4158
4159 ret = rnode_comparator(&ba->rnode, &bb->rnode);
4160
4161 if (ret != 0)
4162 return ret;
4163
4164 if (ba->forkNum < bb->forkNum)
4165 return -1;
4166 if (ba->forkNum > bb->forkNum)
4167 return 1;
4168
4169 if (ba->blockNum < bb->blockNum)
4170 return -1;
4171 if (ba->blockNum > bb->blockNum)
4172 return 1;
4173
4174 return 0;
4175 }
4176
4177 /*
4178 * Comparator determining the writeout order in a checkpoint.
4179 *
4180 * It is important that tablespaces are compared first; the logic balancing
4181 * writes between tablespaces relies on it.
4182 */
4183 static int
4184 ckpt_buforder_comparator(const void *pa, const void *pb)
4185 {
4186 const CkptSortItem *a = (const CkptSortItem *) pa;
4187 const CkptSortItem *b = (const CkptSortItem *) pb;
4188
4189 /* compare tablespace */
4190 if (a->tsId < b->tsId)
4191 return -1;
4192 else if (a->tsId > b->tsId)
4193 return 1;
4194 /* compare relation */
4195 if (a->relNode < b->relNode)
4196 return -1;
4197 else if (a->relNode > b->relNode)
4198 return 1;
4199 /* compare fork */
4200 else if (a->forkNum < b->forkNum)
4201 return -1;
4202 else if (a->forkNum > b->forkNum)
4203 return 1;
4204 /* compare block number */
4205 else if (a->blockNum < b->blockNum)
4206 return -1;
4207 else if (a->blockNum > b->blockNum)
4208 return 1;
4209 /* equal page IDs are unlikely, but not impossible */
4210 return 0;
4211 }
4212
4213 /*
4214 * Comparator for a Min-Heap over the per-tablespace checkpoint completion
4215 * progress.
4216 */
4217 static int
4218 ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
4219 {
4220 CkptTsStatus *sa = (CkptTsStatus *) a;
4221 CkptTsStatus *sb = (CkptTsStatus *) b;
4222
4223 /* we want a min-heap, so return 1 when a < b */
4224 if (sa->progress < sb->progress)
4225 return 1;
4226 else if (sa->progress == sb->progress)
4227 return 0;
4228 else
4229 return -1;
4230 }
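
/*
 * Illustrative sketch (not part of bufmgr.c): how BufferSync-style code can
 * combine this comparator with lib/binaryheap to always pick the tablespace
 * that is furthest behind.  Hypothetical function; "per_ts_stat" stands in
 * for the real per-tablespace status array.
 */
#ifdef NOT_USED
static CkptTsStatus *
example_pick_laggard(CkptTsStatus *per_ts_stat, int num_spaces)
{
	binaryheap *heap;
	CkptTsStatus *result;
	int			i;

	heap = binaryheap_allocate(num_spaces, ts_ckpt_progress_comparator, NULL);
	for (i = 0; i < num_spaces; i++)
		binaryheap_add_unordered(heap, PointerGetDatum(&per_ts_stat[i]));
	binaryheap_build(heap);

	/* the top of the min-heap is the tablespace with the least progress */
	result = (CkptTsStatus *) DatumGetPointer(binaryheap_first(heap));

	binaryheap_free(heap);
	return result;
}
#endif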
4231
4232 /*
4233 * Initialize a writeback context, discarding potential previous state.
4234 *
4235 * *max_pending is a pointer instead of an immediate value, so the coalesce
4236 * limit can easily be changed by the GUC mechanism, and so calling code does
4237 * not have to check the current configuration. A value of 0 means that no
4238 * writeback control will be performed.
4239 */
4240 void
4241 WritebackContextInit(WritebackContext *context, int *max_pending)
4242 {
4243 Assert(*max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
4244
4245 context->max_pending = max_pending;
4246 context->nr_pending = 0;
4247 }
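
/*
 * Illustrative sketch (not part of bufmgr.c): how a write loop drives a
 * WritebackContext, in the style of the checkpointer.  Hypothetical function;
 * it assumes the checkpoint_flush_after GUC variable is in scope, and elides
 * the actual buffer-writing loop.
 */
#ifdef NOT_USED
static void
example_writeback_usage(void)
{
	WritebackContext wb_context;

	/* passing a pointer lets later GUC changes take effect immediately */
	WritebackContextInit(&wb_context, &checkpoint_flush_after);

	/*
	 * ... for each buffer flushed, call
	 * ScheduleBufferTagForWriteback(&wb_context, &tag); pending requests are
	 * issued automatically once the configured limit is reached ...
	 */

	/* issue whatever is still queued at the end of the pass */
	IssuePendingWritebacks(&wb_context);
}
#endif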
4248
4249 /*
4250 * Add buffer to list of pending writeback requests.
4251 */
4252 void
4253 ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag)
4254 {
4255 PendingWriteback *pending;
4256
4257 /*
4258 * Add buffer to the pending writeback array, unless writeback control is
4259 * disabled.
4260 */
4261 if (*context->max_pending > 0)
4262 {
4263 Assert(*context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
4264
4265 pending = &context->pending_writebacks[context->nr_pending++];
4266
4267 pending->tag = *tag;
4268 }
4269
4270 /*
4271 * Perform pending flushes if the writeback limit is exceeded. This
4272 * includes the case where previously an item has been added, but control
4273 * is now disabled.
4274 */
4275 if (context->nr_pending >= *context->max_pending)
4276 IssuePendingWritebacks(context);
4277 }
4278
4279 /*
4280 * Issue all pending writeback requests, previously scheduled with
4281 * ScheduleBufferTagForWriteback, to the OS.
4282 *
4283 * Because this is only used to improve the OS's I/O scheduling, we try never
4284 * to error out - it's just a hint.
4285 */
4286 void
4287 IssuePendingWritebacks(WritebackContext *context)
4288 {
4289 int i;
4290
4291 if (context->nr_pending == 0)
4292 return;
4293
4294 /*
4295 * Executing the writes in-order can make them a lot faster, and allows us to
4296 * merge writeback requests for consecutive blocks into larger writebacks.
4297 */
4298 qsort(&context->pending_writebacks, context->nr_pending,
4299 sizeof(PendingWriteback), buffertag_comparator);
4300
4301 /*
4302 * Coalesce neighbouring writes, but nothing else. For that we iterate
4303 * through the now-sorted array of pending flushes, and look forward to
4304 * find all neighbouring (or identical) writes.
4305 */
4306 for (i = 0; i < context->nr_pending; i++)
4307 {
4308 PendingWriteback *cur;
4309 PendingWriteback *next;
4310 SMgrRelation reln;
4311 int ahead;
4312 BufferTag tag;
4313 Size nblocks = 1;
4314
4315 cur = &context->pending_writebacks[i];
4316 tag = cur->tag;
4317
4318 /*
4319 * Peek ahead, into following writeback requests, to see if they can
4320 * be combined with the current one.
4321 */
4322 for (ahead = 0; i + ahead + 1 < context->nr_pending; ahead++)
4323 {
4324 next = &context->pending_writebacks[i + ahead + 1];
4325
4326 /* different file, stop */
4327 if (!RelFileNodeEquals(cur->tag.rnode, next->tag.rnode) ||
4328 cur->tag.forkNum != next->tag.forkNum)
4329 break;
4330
4331 /* ok, block queued twice, skip */
4332 if (cur->tag.blockNum == next->tag.blockNum)
4333 continue;
4334
4335 /* only merge consecutive writes */
4336 if (cur->tag.blockNum + 1 != next->tag.blockNum)
4337 break;
4338
4339 nblocks++;
4340 cur = next;
4341 }
4342
4343 i += ahead;
4344
4345 /* and finally tell the kernel to write the data to storage */
4346 reln = smgropen(tag.rnode, InvalidBackendId);
4347 smgrwriteback(reln, tag.forkNum, tag.blockNum, nblocks);
4348 }
4349
4350 context->nr_pending = 0;
4351 }
4352
4353
4354 /*
4355 * Implement slower/larger portions of TestForOldSnapshot
4356 *
4357 * Smaller/faster portions are put inline, but the entire set of logic is too
4358 * big for that.
4359 */
4360 void
4361 TestForOldSnapshot_impl(Snapshot snapshot, Relation relation)
4362 {
4363 if (RelationAllowsEarlyPruning(relation)
4364 && (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp())
4365 ereport(ERROR,
4366 (errcode(ERRCODE_SNAPSHOT_TOO_OLD),
4367 errmsg("snapshot too old")));
4368 }
4369