1 /*-------------------------------------------------------------------------
2  *
3  * slru.c
4  *		Simple LRU buffering for transaction status logfiles
5  *
6  * We use a simple least-recently-used scheme to manage a pool of page
7  * buffers.  Under ordinary circumstances we expect that write
8  * traffic will occur mostly to the latest page (and to the just-prior
9  * page, soon after a page transition).  Read traffic will probably touch
10  * a larger span of pages, but in any case a fairly small number of page
11  * buffers should be sufficient.  So, we just search the buffers using plain
12  * linear search; there's no need for a hashtable or anything fancy.
13  * The management algorithm is straight LRU except that we will never swap
14  * out the latest page (since we know it's going to be hit again eventually).
15  *
16  * We use a control LWLock to protect the shared data structures, plus
17  * per-buffer LWLocks that synchronize I/O for each buffer.  The control lock
18  * must be held to examine or modify any shared state.  A process that is
19  * reading in or writing out a page buffer does not hold the control lock,
20  * only the per-buffer lock for the buffer it is working on.
21  *
22  * "Holding the control lock" means exclusive lock in all cases except for
23  * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
24  * the implications of that.
25  *
26  * When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
27  * before releasing the control lock.  The per-buffer lock is released after
28  * completing the I/O, re-acquiring the control lock, and updating the shared
29  * state.  (Deadlock is not possible here, because we never try to initiate
30  * I/O when someone else is already doing I/O on the same buffer.)
31  * To wait for I/O to complete, release the control lock, acquire the
32  * per-buffer lock in shared mode, immediately release the per-buffer lock,
33  * reacquire the control lock, and then recheck state (since arbitrary things
34  * could have happened while we didn't have the lock).
35  *
36  * As with the regular buffer manager, it is possible for another process
37  * to re-dirty a page that is currently being written out.  This is handled
38  * by re-setting the page's page_dirty flag.
39  *
40  *
41  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
42  * Portions Copyright (c) 1994, Regents of the University of California
43  *
44  * src/backend/access/transam/slru.c
45  *
46  *-------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49 
50 #include <fcntl.h>
51 #include <sys/stat.h>
52 #include <unistd.h>
53 
54 #include "access/slru.h"
55 #include "access/transam.h"
56 #include "access/xlog.h"
57 #include "pgstat.h"
58 #include "storage/fd.h"
59 #include "storage/shmem.h"
60 #include "miscadmin.h"
61 
62 
/*
 * Construct the on-disk file name for segment number "seg" of an SLRU area:
 * the area's directory plus the segment number as four uppercase hex digits.
 */
#define SlruFileName(ctl, path, seg) \
	snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
65 
66 /*
67  * During SimpleLruFlush(), we will usually not need to write/fsync more
68  * than one or two physical files, but we may need to write several pages
69  * per file.  We can consolidate the I/O requests by leaving files open
70  * until control returns to SimpleLruFlush().  This data structure remembers
71  * which files are open.
72  */
#define MAX_FLUSH_BUFFERS	16

typedef struct SlruFlushData
{
	int			num_files;		/* # files actually open */
	int			fd[MAX_FLUSH_BUFFERS];	/* their FD's */
	int			segno[MAX_FLUSH_BUFFERS];	/* their log seg#s */
} SlruFlushData;

/* Pointer form passed around by the internal write routines (NULL = no batch) */
typedef struct SlruFlushData *SlruFlush;
83 
84 /*
85  * Macro to mark a buffer slot "most recently used".  Note multiple evaluation
86  * of arguments!
87  *
88  * The reason for the if-test is that there are often many consecutive
89  * accesses to the same page (particularly the latest page).  By suppressing
90  * useless increments of cur_lru_count, we reduce the probability that old
91  * pages' counts will "wrap around" and make them appear recently used.
92  *
93  * We allow this code to be executed concurrently by multiple processes within
94  * SimpleLruReadPage_ReadOnly().  As long as int reads and writes are atomic,
95  * this should not cause any completely-bogus values to enter the computation.
96  * However, it is possible for either cur_lru_count or individual
97  * page_lru_count entries to be "reset" to lower values than they should have,
98  * in case a process is delayed while it executes this macro.  With care in
99  * SlruSelectLRUPage(), this does little harm, and in any case the absolute
100  * worst possible consequence is a nonoptimal choice of page to evict.  The
101  * gain from allowing concurrent reads of SLRU pages seems worth it.
102  */
#define SlruRecentlyUsed(shared, slotno)	\
	do { \
		/* snapshot the global usage counter (may run under shared lock) */ \
		int		new_lru_count = (shared)->cur_lru_count; \
		if (new_lru_count != (shared)->page_lru_count[slotno]) { \
			/* advance the counter only when this slot isn't already newest */ \
			(shared)->cur_lru_count = ++new_lru_count; \
			(shared)->page_lru_count[slotno] = new_lru_count; \
		} \
	} while (0)
111 
/* Saved info for SlruReportIOError */
typedef enum
{
	SLRU_OPEN_FAILED,
	SLRU_SEEK_FAILED,
	SLRU_READ_FAILED,
	SLRU_WRITE_FAILED,
	SLRU_FSYNC_FAILED,
	SLRU_CLOSE_FAILED
} SlruErrorCause;

/*
 * The physical-I/O helpers cannot ereport(ERROR) directly, since shared
 * state must be cleaned up first (see SlruPhysicalReadPage comments).
 * Instead they stash the failure cause and errno here for a later call to
 * SlruReportIOError.
 */
static SlruErrorCause slru_errcause;
static int	slru_errno;
125 
126 
/* Forward declarations of internal helpers; see each definition for details */
static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata);
static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
								  SlruFlush fdata);
static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
static int	SlruSelectLRUPage(SlruCtl ctl, int pageno);

static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
									  int segpage, void *data);
static void SlruInternalDeleteSegment(SlruCtl ctl, char *filename);
139 
140 /*
141  * Initialization of shared memory
142  */
143 
/*
 * Compute the amount of shared memory needed for an SLRU area with nslots
 * page buffers and nlsns LSN groups per page (nlsns may be zero for areas
 * that don't track group LSNs).
 *
 * NOTE: the sequence of array sizes added here must stay in exact sync with
 * the pointer arithmetic in SimpleLruInit(), which carves the same arrays
 * out of this allocation in the same order.
 */
Size
SimpleLruShmemSize(int nslots, int nlsns)
{
	Size		sz;

	/* we assume nslots isn't so large as to risk overflow */
	sz = MAXALIGN(sizeof(SlruSharedData));
	sz += MAXALIGN(nslots * sizeof(char *));	/* page_buffer[] */
	sz += MAXALIGN(nslots * sizeof(SlruPageStatus));	/* page_status[] */
	sz += MAXALIGN(nslots * sizeof(bool));	/* page_dirty[] */
	sz += MAXALIGN(nslots * sizeof(int));	/* page_number[] */
	sz += MAXALIGN(nslots * sizeof(int));	/* page_lru_count[] */
	sz += MAXALIGN(nslots * sizeof(LWLockPadded));	/* buffer_locks[] */

	if (nlsns > 0)
		sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));	/* group_lsn[] */

	/* the page buffers themselves follow, on a buffer-aligned boundary */
	return BUFFERALIGN(sz) + BLCKSZ * nslots;
}
163 
/*
 * Initialize (or attach to) an SLRU shared-memory area.
 *
 * name: name of the shared-memory region (also used as LWLock tranche name)
 * nslots, nlsns: sizing parameters, as for SimpleLruShmemSize()
 * ctllock: LWLock to use as the area's control lock
 * subdir: directory holding this SLRU's segment files
 * tranche_id: LWLock tranche id for the per-buffer locks
 *
 * When run in the postmaster (or a standalone backend), this creates and
 * initializes the shared structures; in a child backend it merely attaches.
 * The caller is expected to set ctl->PagePrecedes afterwards.
 */
void
SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
			  LWLock *ctllock, const char *subdir, int tranche_id)
{
	SlruShared	shared;
	bool		found;

	shared = (SlruShared) ShmemInitStruct(name,
										  SimpleLruShmemSize(nslots, nlsns),
										  &found);

	if (!IsUnderPostmaster)
	{
		/* Initialize locks and shared memory area */
		char	   *ptr;
		Size		offset;
		int			slotno;

		Assert(!found);

		memset(shared, 0, sizeof(SlruSharedData));

		shared->ControlLock = ctllock;

		shared->num_slots = nslots;
		shared->lsn_groups_per_page = nlsns;

		shared->cur_lru_count = 0;

		/* shared->latest_page_number will be set later */

		/*
		 * Carve the per-slot arrays out of the allocation.  The order and
		 * alignment here must match SimpleLruShmemSize() exactly.
		 */
		ptr = (char *) shared;
		offset = MAXALIGN(sizeof(SlruSharedData));
		shared->page_buffer = (char **) (ptr + offset);
		offset += MAXALIGN(nslots * sizeof(char *));
		shared->page_status = (SlruPageStatus *) (ptr + offset);
		offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
		shared->page_dirty = (bool *) (ptr + offset);
		offset += MAXALIGN(nslots * sizeof(bool));
		shared->page_number = (int *) (ptr + offset);
		offset += MAXALIGN(nslots * sizeof(int));
		shared->page_lru_count = (int *) (ptr + offset);
		offset += MAXALIGN(nslots * sizeof(int));

		/* Initialize LWLocks */
		shared->buffer_locks = (LWLockPadded *) (ptr + offset);
		offset += MAXALIGN(nslots * sizeof(LWLockPadded));

		if (nlsns > 0)
		{
			shared->group_lsn = (XLogRecPtr *) (ptr + offset);
			offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
		}

		Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH);
		strlcpy(shared->lwlock_tranche_name, name, SLRU_MAX_NAME_LENGTH);
		shared->lwlock_tranche_id = tranche_id;

		/* Page buffers start at the next buffer-aligned boundary */
		ptr += BUFFERALIGN(offset);
		for (slotno = 0; slotno < nslots; slotno++)
		{
			LWLockInitialize(&shared->buffer_locks[slotno].lock,
							 shared->lwlock_tranche_id);

			shared->page_buffer[slotno] = ptr;
			shared->page_status[slotno] = SLRU_PAGE_EMPTY;
			shared->page_dirty[slotno] = false;
			shared->page_lru_count[slotno] = 0;
			ptr += BLCKSZ;
		}

		/* Should fit to estimated shmem size */
		Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns));
	}
	else
		Assert(found);

	/* Register SLRU tranche in the main tranches array */
	LWLockRegisterTranche(shared->lwlock_tranche_id,
						  shared->lwlock_tranche_name);

	/*
	 * Initialize the unshared control struct, including directory path. We
	 * assume caller set PagePrecedes.
	 */
	ctl->shared = shared;
	ctl->do_fsync = true;		/* default behavior */
	StrNCpy(ctl->Dir, subdir, sizeof(ctl->Dir));
}
253 
254 /*
255  * Initialize (or reinitialize) a page to zeroes.
256  *
257  * The page is not actually written, just set up in shared memory.
258  * The slot number of the new page is returned.
259  *
260  * Control lock must be held at entry, and will be held at exit.
261  */
int
SimpleLruZeroPage(SlruCtl ctl, int pageno)
{
	SlruShared	shared = ctl->shared;
	int			slotno;

	/* Find a suitable buffer slot for the page */
	slotno = SlruSelectLRUPage(ctl, pageno);
	/* Slot must be empty, clean-and-replaceable, or already hold this page */
	Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
		   (shared->page_status[slotno] == SLRU_PAGE_VALID &&
			!shared->page_dirty[slotno]) ||
		   shared->page_number[slotno] == pageno);

	/* Mark the slot as containing this page */
	shared->page_number[slotno] = pageno;
	shared->page_status[slotno] = SLRU_PAGE_VALID;
	/* dirty immediately: the zeroed contents have never been written out */
	shared->page_dirty[slotno] = true;
	SlruRecentlyUsed(shared, slotno);

	/* Set the buffer to zeroes */
	MemSet(shared->page_buffer[slotno], 0, BLCKSZ);

	/* Set the LSNs for this new page to zero */
	SimpleLruZeroLSNs(ctl, slotno);

	/* Assume this page is now the latest active page */
	shared->latest_page_number = pageno;

	return slotno;
}
292 
293 /*
294  * Zero all the LSNs we store for this slru page.
295  *
296  * This should be called each time we create a new page, and each time we read
297  * in a page from disk into an existing buffer.  (Such an old page cannot
298  * have any interesting LSNs, since we'd have flushed them before writing
299  * the page in the first place.)
300  *
301  * This assumes that InvalidXLogRecPtr is bitwise-all-0.
302  */
303 static void
SimpleLruZeroLSNs(SlruCtl ctl,int slotno)304 SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
305 {
306 	SlruShared	shared = ctl->shared;
307 
308 	if (shared->lsn_groups_per_page > 0)
309 		MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
310 			   shared->lsn_groups_per_page * sizeof(XLogRecPtr));
311 }
312 
313 /*
314  * Wait for any active I/O on a page slot to finish.  (This does not
315  * guarantee that new I/O hasn't been started before we return, though.
316  * In fact the slot might not even contain the same page anymore.)
317  *
318  * Control lock must be held at entry, and will be held at exit.
319  */
static void
SimpleLruWaitIO(SlruCtl ctl, int slotno)
{
	SlruShared	shared = ctl->shared;

	/*
	 * See notes at top of file: drop the control lock, then wait for the
	 * I/O by briefly taking the per-buffer lock in shared mode (the I/O
	 * owner holds it exclusively until done), then reacquire control.
	 */
	LWLockRelease(shared->ControlLock);
	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
	LWLockRelease(&shared->buffer_locks[slotno].lock);
	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

	/*
	 * If the slot is still in an io-in-progress state, then either someone
	 * already started a new I/O on the slot, or a previous I/O failed and
	 * neglected to reset the page state.  That shouldn't happen, really, but
	 * it seems worth a few extra cycles to check and recover from it. We can
	 * cheaply test for failure by seeing if the buffer lock is still held (we
	 * assume that transaction abort would release the lock).
	 */
	if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
		shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
	{
		if (LWLockConditionalAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED))
		{
			/* indeed, the I/O must have failed */
			if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
				shared->page_status[slotno] = SLRU_PAGE_EMPTY;
			else				/* write_in_progress */
			{
				/* a failed write leaves the page valid but still dirty */
				shared->page_status[slotno] = SLRU_PAGE_VALID;
				shared->page_dirty[slotno] = true;
			}
			LWLockRelease(&shared->buffer_locks[slotno].lock);
		}
	}
}
356 
357 /*
358  * Find a page in a shared buffer, reading it in if necessary.
359  * The page number must correspond to an already-initialized page.
360  *
361  * If write_ok is true then it is OK to return a page that is in
362  * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
363  * that modification of the page is safe.  If write_ok is false then we
364  * will not return the page until it is not undergoing active I/O.
365  *
366  * The passed-in xid is used only for error reporting, and may be
367  * InvalidTransactionId if no specific xid is associated with the action.
368  *
369  * Return value is the shared-buffer slot number now holding the page.
370  * The buffer's LRU access info is updated.
371  *
372  * Control lock must be held at entry, and will be held at exit.
373  */
int
SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
				  TransactionId xid)
{
	SlruShared	shared = ctl->shared;

	/* Outer loop handles restart if we must wait for someone else's I/O */
	for (;;)
	{
		int			slotno;
		bool		ok;

		/* See if page already is in memory; if not, pick victim slot */
		slotno = SlruSelectLRUPage(ctl, pageno);

		/* Did we find the page in memory? */
		if (shared->page_number[slotno] == pageno &&
			shared->page_status[slotno] != SLRU_PAGE_EMPTY)
		{
			/*
			 * If page is still being read in, we must wait for I/O.  Likewise
			 * if the page is being written and the caller said that's not OK.
			 */
			if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
				(shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
				 !write_ok))
			{
				SimpleLruWaitIO(ctl, slotno);
				/* Now we must recheck state from the top */
				continue;
			}
			/* Otherwise, it's ready to use */
			SlruRecentlyUsed(shared, slotno);
			return slotno;
		}

		/* We found no match; assert we selected a freeable slot */
		Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
			   (shared->page_status[slotno] == SLRU_PAGE_VALID &&
				!shared->page_dirty[slotno]));

		/* Mark the slot read-busy */
		shared->page_number[slotno] = pageno;
		shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
		shared->page_dirty[slotno] = false;

		/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
		LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);

		/* Release control lock while doing I/O */
		LWLockRelease(shared->ControlLock);

		/* Do the read */
		ok = SlruPhysicalReadPage(ctl, pageno, slotno);

		/* Set the LSNs for this newly read-in page to zero */
		SimpleLruZeroLSNs(ctl, slotno);

		/* Re-acquire control lock and update page state */
		LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

		/* Nobody else can have touched a slot marked READ_IN_PROGRESS */
		Assert(shared->page_number[slotno] == pageno &&
			   shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
			   !shared->page_dirty[slotno]);

		/* On failure, the slot reverts to EMPTY so it can be reused */
		shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;

		LWLockRelease(&shared->buffer_locks[slotno].lock);

		/* Now it's okay to ereport if we failed */
		if (!ok)
			SlruReportIOError(ctl, pageno, xid);

		SlruRecentlyUsed(shared, slotno);
		return slotno;
	}
}
451 
452 /*
453  * Find a page in a shared buffer, reading it in if necessary.
454  * The page number must correspond to an already-initialized page.
455  * The caller must intend only read-only access to the page.
456  *
457  * The passed-in xid is used only for error reporting, and may be
458  * InvalidTransactionId if no specific xid is associated with the action.
459  *
460  * Return value is the shared-buffer slot number now holding the page.
461  * The buffer's LRU access info is updated.
462  *
463  * Control lock must NOT be held at entry, but will be held at exit.
464  * It is unspecified whether the lock will be shared or exclusive.
465  */
int
SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
{
	SlruShared	shared = ctl->shared;
	int			slotno;

	/* Try to find the page while holding only shared lock */
	LWLockAcquire(shared->ControlLock, LW_SHARED);

	/* See if page is already in a buffer */
	for (slotno = 0; slotno < shared->num_slots; slotno++)
	{
		if (shared->page_number[slotno] == pageno &&
			shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
			shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
		{
			/* See comments for SlruRecentlyUsed macro */
			SlruRecentlyUsed(shared, slotno);
			/* Return with the control lock still held (shared mode) */
			return slotno;
		}
	}

	/* No luck, so switch to normal exclusive lock and do regular read */
	LWLockRelease(shared->ControlLock);
	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

	/* write_ok = true: WRITE_IN_PROGRESS is fine for read-only access */
	return SimpleLruReadPage(ctl, pageno, true, xid);
}
494 
495 /*
496  * Write a page from a shared buffer, if necessary.
497  * Does nothing if the specified slot is not dirty.
498  *
499  * NOTE: only one write attempt is made here.  Hence, it is possible that
500  * the page is still dirty at exit (if someone else re-dirtied it during
501  * the write).  However, we *do* attempt a fresh write even if the page
502  * is already being written; this is for checkpoints.
503  *
504  * Control lock must be held at entry, and will be held at exit.
505  */
static void
SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
{
	SlruShared	shared = ctl->shared;
	int			pageno = shared->page_number[slotno];
	bool		ok;

	/*
	 * If a write is in progress, wait for it to finish.  Recheck the page
	 * number each time, since the slot could have been recycled while we
	 * didn't hold the control lock inside SimpleLruWaitIO.
	 */
	while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
		   shared->page_number[slotno] == pageno)
	{
		SimpleLruWaitIO(ctl, slotno);
	}

	/*
	 * Do nothing if page is not dirty, or if buffer no longer contains the
	 * same page we were called for.
	 */
	if (!shared->page_dirty[slotno] ||
		shared->page_status[slotno] != SLRU_PAGE_VALID ||
		shared->page_number[slotno] != pageno)
		return;

	/*
	 * Mark the slot write-busy, and clear the dirtybit.  After this point, a
	 * transaction status update on this page will mark it dirty again.
	 */
	shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
	shared->page_dirty[slotno] = false;

	/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);

	/* Release control lock while doing I/O */
	LWLockRelease(shared->ControlLock);

	/* Do the write */
	ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);

	/* If we failed, and we're in a flush, better close the files */
	if (!ok && fdata)
	{
		int			i;

		for (i = 0; i < fdata->num_files; i++)
			CloseTransientFile(fdata->fd[i]);
	}

	/* Re-acquire control lock and update page state */
	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

	/* Nobody else can have recycled a slot marked WRITE_IN_PROGRESS */
	Assert(shared->page_number[slotno] == pageno &&
		   shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);

	/* If we failed to write, mark the page dirty again */
	if (!ok)
		shared->page_dirty[slotno] = true;

	shared->page_status[slotno] = SLRU_PAGE_VALID;

	LWLockRelease(&shared->buffer_locks[slotno].lock);

	/* Now it's okay to ereport if we failed */
	if (!ok)
		SlruReportIOError(ctl, pageno, InvalidTransactionId);
}
572 
/*
 * Wrapper of SlruInternalWritePage, for external callers.
 * fdata is always passed as NULL here, i.e. this is a standalone write
 * rather than part of a batched SimpleLruFlush.
 *
 * Control lock must be held at entry, and will be held at exit
 * (same contract as SlruInternalWritePage).
 */
void
SimpleLruWritePage(SlruCtl ctl, int slotno)
{
	SlruInternalWritePage(ctl, slotno, NULL);
}
582 
583 /*
584  * Return whether the given page exists on disk.
585  *
586  * A false return means that either the file does not exist, or that it's not
587  * large enough to contain the given page.
588  */
589 bool
SimpleLruDoesPhysicalPageExist(SlruCtl ctl,int pageno)590 SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
591 {
592 	int			segno = pageno / SLRU_PAGES_PER_SEGMENT;
593 	int			rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
594 	int			offset = rpageno * BLCKSZ;
595 	char		path[MAXPGPATH];
596 	int			fd;
597 	bool		result;
598 	off_t		endpos;
599 
600 	SlruFileName(ctl, path, segno);
601 
602 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
603 	if (fd < 0)
604 	{
605 		/* expected: file doesn't exist */
606 		if (errno == ENOENT)
607 			return false;
608 
609 		/* report error normally */
610 		slru_errcause = SLRU_OPEN_FAILED;
611 		slru_errno = errno;
612 		SlruReportIOError(ctl, pageno, 0);
613 	}
614 
615 	if ((endpos = lseek(fd, 0, SEEK_END)) < 0)
616 	{
617 		slru_errcause = SLRU_SEEK_FAILED;
618 		slru_errno = errno;
619 		SlruReportIOError(ctl, pageno, 0);
620 	}
621 
622 	result = endpos >= (off_t) (offset + BLCKSZ);
623 
624 	if (CloseTransientFile(fd))
625 	{
626 		slru_errcause = SLRU_CLOSE_FAILED;
627 		slru_errno = errno;
628 		return false;
629 	}
630 
631 	return result;
632 }
633 
634 /*
635  * Physical read of a (previously existing) page into a buffer slot
636  *
637  * On failure, we cannot just ereport(ERROR) since caller has put state in
638  * shared memory that must be undone.  So, we return false and save enough
639  * info in static variables to let SlruReportIOError make the report.
640  *
641  * For now, assume it's not worth keeping a file pointer open across
642  * read/write operations.  We could cache one virtual file pointer ...
643  */
static bool
SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
{
	SlruShared	shared = ctl->shared;
	int			segno = pageno / SLRU_PAGES_PER_SEGMENT;
	int			rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
	int			offset = rpageno * BLCKSZ;
	char		path[MAXPGPATH];
	int			fd;

	SlruFileName(ctl, path, segno);

	/*
	 * In a crash-and-restart situation, it's possible for us to receive
	 * commands to set the commit status of transactions whose bits are in
	 * already-truncated segments of the commit log (see notes in
	 * SlruPhysicalWritePage).  Hence, if we are InRecovery, allow the case
	 * where the file doesn't exist, and return zeroes instead.
	 */
	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
	if (fd < 0)
	{
		if (errno != ENOENT || !InRecovery)
		{
			slru_errcause = SLRU_OPEN_FAILED;
			slru_errno = errno;
			return false;
		}

		ereport(LOG,
				(errmsg("file \"%s\" doesn't exist, reading as zeroes",
						path)));
		MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
		return true;
	}

	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
	{
		slru_errcause = SLRU_SEEK_FAILED;
		slru_errno = errno;
		CloseTransientFile(fd);
		return false;
	}

	/* Clear errno first so a short read (errno still 0) is distinguishable */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_SLRU_READ);
	if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
	{
		pgstat_report_wait_end();
		slru_errcause = SLRU_READ_FAILED;
		slru_errno = errno;
		CloseTransientFile(fd);
		return false;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd))
	{
		slru_errcause = SLRU_CLOSE_FAILED;
		slru_errno = errno;
		return false;
	}

	return true;
}
709 
710 /*
711  * Physical write of a page from a buffer slot
712  *
713  * On failure, we cannot just ereport(ERROR) since caller has put state in
714  * shared memory that must be undone.  So, we return false and save enough
715  * info in static variables to let SlruReportIOError make the report.
716  *
717  * For now, assume it's not worth keeping a file pointer open across
718  * independent read/write operations.  We do batch operations during
719  * SimpleLruFlush, though.
720  *
721  * fdata is NULL for a standalone write, pointer to open-file info during
722  * SimpleLruFlush.
723  */
static bool
SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
{
	SlruShared	shared = ctl->shared;
	int			segno = pageno / SLRU_PAGES_PER_SEGMENT;
	int			rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
	int			offset = rpageno * BLCKSZ;
	char		path[MAXPGPATH];
	int			fd = -1;

	/*
	 * Honor the write-WAL-before-data rule, if appropriate, so that we do not
	 * write out data before associated WAL records.  This is the same action
	 * performed during FlushBuffer() in the main buffer manager.
	 */
	if (shared->group_lsn != NULL)
	{
		/*
		 * We must determine the largest async-commit LSN for the page. This
		 * is a bit tedious, but since this entire function is a slow path
		 * anyway, it seems better to do this here than to maintain a per-page
		 * LSN variable (which'd need an extra comparison in the
		 * transaction-commit path).
		 */
		XLogRecPtr	max_lsn;
		int			lsnindex,
					lsnoff;

		lsnindex = slotno * shared->lsn_groups_per_page;
		max_lsn = shared->group_lsn[lsnindex++];
		for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
		{
			XLogRecPtr	this_lsn = shared->group_lsn[lsnindex++];

			if (max_lsn < this_lsn)
				max_lsn = this_lsn;
		}

		if (!XLogRecPtrIsInvalid(max_lsn))
		{
			/*
			 * As noted above, elog(ERROR) is not acceptable here, so if
			 * XLogFlush were to fail, we must PANIC.  This isn't much of a
			 * restriction because XLogFlush is just about all critical
			 * section anyway, but let's make sure.
			 */
			START_CRIT_SECTION();
			XLogFlush(max_lsn);
			END_CRIT_SECTION();
		}
	}

	/*
	 * During a Flush, we may already have the desired file open.
	 */
	if (fdata)
	{
		int			i;

		for (i = 0; i < fdata->num_files; i++)
		{
			if (fdata->segno[i] == segno)
			{
				fd = fdata->fd[i];
				break;
			}
		}
	}

	if (fd < 0)
	{
		/*
		 * If the file doesn't already exist, we should create it.  It is
		 * possible for this to need to happen when writing a page that's not
		 * first in its segment; we assume the OS can cope with that. (Note:
		 * it might seem that it'd be okay to create files only when
		 * SimpleLruZeroPage is called for the first page of a segment.
		 * However, if after a crash and restart the REDO logic elects to
		 * replay the log from a checkpoint before the latest one, then it's
		 * possible that we will get commands to set transaction status of
		 * transactions that have already been truncated from the commit log.
		 * Easiest way to deal with that is to accept references to
		 * nonexistent files here and in SlruPhysicalReadPage.)
		 *
		 * Note: it is possible for more than one backend to be executing this
		 * code simultaneously for different pages of the same file. Hence,
		 * don't use O_EXCL or O_TRUNC or anything like that.
		 */
		SlruFileName(ctl, path, segno);
		fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY);
		if (fd < 0)
		{
			slru_errcause = SLRU_OPEN_FAILED;
			slru_errno = errno;
			return false;
		}

		/* Remember the new FD for later batched fsync, if there's room */
		if (fdata)
		{
			if (fdata->num_files < MAX_FLUSH_BUFFERS)
			{
				fdata->fd[fdata->num_files] = fd;
				fdata->segno[fdata->num_files] = segno;
				fdata->num_files++;
			}
			else
			{
				/*
				 * In the unlikely event that we exceed MAX_FLUSH_BUFFERS,
				 * fall back to treating it as a standalone write.
				 */
				fdata = NULL;
			}
		}
	}

	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
	{
		slru_errcause = SLRU_SEEK_FAILED;
		slru_errno = errno;
		/* only close the FD if it isn't tracked (and owned) by fdata */
		if (!fdata)
			CloseTransientFile(fd);
		return false;
	}

	/* Clear errno first so a short write (errno still 0) is detectable */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_SLRU_WRITE);
	if (write(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
	{
		pgstat_report_wait_end();
		/* if write didn't set errno, assume problem is no disk space */
		if (errno == 0)
			errno = ENOSPC;
		slru_errcause = SLRU_WRITE_FAILED;
		slru_errno = errno;
		if (!fdata)
			CloseTransientFile(fd);
		return false;
	}
	pgstat_report_wait_end();

	/*
	 * If not part of Flush, need to fsync now.  We assume this happens
	 * infrequently enough that it's not a performance issue.
	 */
	if (!fdata)
	{
		pgstat_report_wait_start(WAIT_EVENT_SLRU_SYNC);
		if (ctl->do_fsync && pg_fsync(fd))
		{
			pgstat_report_wait_end();
			slru_errcause = SLRU_FSYNC_FAILED;
			slru_errno = errno;
			CloseTransientFile(fd);
			return false;
		}
		pgstat_report_wait_end();

		if (CloseTransientFile(fd))
		{
			slru_errcause = SLRU_CLOSE_FAILED;
			slru_errno = errno;
			return false;
		}
	}

	return true;
}
892 
893 /*
894  * Issue the error message after failure of SlruPhysicalReadPage or
895  * SlruPhysicalWritePage.  Call this after cleaning up shared-memory state.
896  */
static void
SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
{
	int			segno = pageno / SLRU_PAGES_PER_SEGMENT;
	int			rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
	int			offset = rpageno * BLCKSZ;
	char		path[MAXPGPATH];

	SlruFileName(ctl, path, segno);

	/*
	 * Restore the errno saved at the time of the failure, so that %m in the
	 * errdetail strings below reports the original cause rather than
	 * whatever intervening calls may have set.
	 */
	errno = slru_errno;
	switch (slru_errcause)
	{
		case SLRU_OPEN_FAILED:
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not access status of transaction %u", xid),
					 errdetail("Could not open file \"%s\": %m.", path)));
			break;
		case SLRU_SEEK_FAILED:
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not access status of transaction %u", xid),
					 errdetail("Could not seek in file \"%s\" to offset %u: %m.",
							   path, offset)));
			break;
		case SLRU_READ_FAILED:
			/* errno == 0 here means a short read, not a system error */
			if (errno)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not access status of transaction %u", xid),
						 errdetail("Could not read from file \"%s\" at offset %u: %m.",
								   path, offset)));
			else
				ereport(ERROR,
						(errmsg("could not access status of transaction %u", xid),
						 errdetail("Could not read from file \"%s\" at offset %u: read too few bytes.", path, offset)));
			break;
		case SLRU_WRITE_FAILED:
			/* errno == 0 here means a short write (see SlruPhysicalWritePage) */
			if (errno)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not access status of transaction %u", xid),
						 errdetail("Could not write to file \"%s\" at offset %u: %m.",
								   path, offset)));
			else
				ereport(ERROR,
						(errmsg("could not access status of transaction %u", xid),
						 errdetail("Could not write to file \"%s\" at offset %u: wrote too few bytes.",
								   path, offset)));
			break;
		case SLRU_FSYNC_FAILED:
			/* elevel may be promoted to PANIC per data_sync_retry policy */
			ereport(data_sync_elevel(ERROR),
					(errcode_for_file_access(),
					 errmsg("could not access status of transaction %u", xid),
					 errdetail("Could not fsync file \"%s\": %m.",
							   path)));
			break;
		case SLRU_CLOSE_FAILED:
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not access status of transaction %u", xid),
					 errdetail("Could not close file \"%s\": %m.",
							   path)));
			break;
		default:
			/* can't get here, we trust */
			elog(ERROR, "unrecognized SimpleLru error cause: %d",
				 (int) slru_errcause);
			break;
	}
}
968 
969 /*
970  * Select the slot to re-use when we need a free slot.
971  *
972  * The target page number is passed because we need to consider the
973  * possibility that some other process reads in the target page while
974  * we are doing I/O to free a slot.  Hence, check or recheck to see if
975  * any slot already holds the target page, and return that slot if so.
976  * Thus, the returned slot is *either* a slot already holding the pageno
977  * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
978  * or CLEAN).
979  *
980  * Control lock must be held at entry, and will be held at exit.
981  */
static int
SlruSelectLRUPage(SlruCtl ctl, int pageno)
{
	SlruShared	shared = ctl->shared;

	/* Outer loop handles restart after I/O */
	for (;;)
	{
		int			slotno;
		int			cur_count;
		int			bestvalidslot = 0;	/* keep compiler quiet */
		int			best_valid_delta = -1;
		int			best_valid_page_number = 0; /* keep compiler quiet */
		int			bestinvalidslot = 0;	/* keep compiler quiet */
		int			best_invalid_delta = -1;
		int			best_invalid_page_number = 0;	/* keep compiler quiet */

		/* See if page already has a buffer assigned */
		for (slotno = 0; slotno < shared->num_slots; slotno++)
		{
			if (shared->page_number[slotno] == pageno &&
				shared->page_status[slotno] != SLRU_PAGE_EMPTY)
				return slotno;
		}

		/*
		 * If we find any EMPTY slot, just select that one. Else choose a
		 * victim page to replace.  We normally take the least recently used
		 * valid page, but we will never take the slot containing
		 * latest_page_number, even if it appears least recently used.  We
		 * will select a slot that is already I/O busy only if there is no
		 * other choice: a read-busy slot will not be least recently used once
		 * the read finishes, and waiting for an I/O on a write-busy slot is
		 * inferior to just picking some other slot.  Testing shows the slot
		 * we pick instead will often be clean, allowing us to begin a read at
		 * once.
		 *
		 * Normally the page_lru_count values will all be different and so
		 * there will be a well-defined LRU page.  But since we allow
		 * concurrent execution of SlruRecentlyUsed() within
		 * SimpleLruReadPage_ReadOnly(), it is possible that multiple pages
		 * acquire the same lru_count values.  In that case we break ties by
		 * choosing the furthest-back page.
		 *
		 * Notice that this next line forcibly advances cur_lru_count to a
		 * value that is certainly beyond any value that will be in the
		 * page_lru_count array after the loop finishes.  This ensures that
		 * the next execution of SlruRecentlyUsed will mark the page newly
		 * used, even if it's for a page that has the current counter value.
		 * That gets us back on the path to having good data when there are
		 * multiple pages with the same lru_count.
		 */
		cur_count = (shared->cur_lru_count)++;
		for (slotno = 0; slotno < shared->num_slots; slotno++)
		{
			int			this_delta;
			int			this_page_number;

			if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
				return slotno;
			/* Age of this slot; larger delta = less recently used */
			this_delta = cur_count - shared->page_lru_count[slotno];
			if (this_delta < 0)
			{
				/*
				 * Clean up in case shared updates have caused cur_count
				 * increments to get "lost".  We back off the page counts,
				 * rather than trying to increase cur_count, to avoid any
				 * question of infinite loops or failure in the presence of
				 * wrapped-around counts.
				 */
				shared->page_lru_count[slotno] = cur_count;
				this_delta = 0;
			}
			this_page_number = shared->page_number[slotno];
			/* Never evict the latest page; it'll be hit again soon */
			if (this_page_number == shared->latest_page_number)
				continue;
			if (shared->page_status[slotno] == SLRU_PAGE_VALID)
			{
				/* Track oldest non-I/O-busy candidate (preferred victim) */
				if (this_delta > best_valid_delta ||
					(this_delta == best_valid_delta &&
					 ctl->PagePrecedes(this_page_number,
									   best_valid_page_number)))
				{
					bestvalidslot = slotno;
					best_valid_delta = this_delta;
					best_valid_page_number = this_page_number;
				}
			}
			else
			{
				/* Track oldest I/O-busy slot, used only as a last resort */
				if (this_delta > best_invalid_delta ||
					(this_delta == best_invalid_delta &&
					 ctl->PagePrecedes(this_page_number,
									   best_invalid_page_number)))
				{
					bestinvalidslot = slotno;
					best_invalid_delta = this_delta;
					best_invalid_page_number = this_page_number;
				}
			}
		}

		/*
		 * If all pages (except possibly the latest one) are I/O busy, we'll
		 * have to wait for an I/O to complete and then retry.  In that
		 * unhappy case, we choose to wait for the I/O on the least recently
		 * used slot, on the assumption that it was likely initiated first of
		 * all the I/Os in progress and may therefore finish first.
		 */
		if (best_valid_delta < 0)
		{
			SimpleLruWaitIO(ctl, bestinvalidslot);
			continue;
		}

		/*
		 * If the selected page is clean, we're set.
		 */
		if (!shared->page_dirty[bestvalidslot])
			return bestvalidslot;

		/*
		 * Write the page.
		 */
		SlruInternalWritePage(ctl, bestvalidslot, NULL);

		/*
		 * Now loop back and try again.  This is the easiest way of dealing
		 * with corner cases such as the victim page being re-dirtied while we
		 * wrote it.
		 */
	}
}
1115 
1116 /*
1117  * Flush dirty pages to disk during checkpoint or database shutdown
1118  */
void
SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
{
	SlruShared	shared = ctl->shared;
	SlruFlushData fdata;		/* accumulates open fds across page writes */
	int			slotno;
	int			pageno = 0;		/* page reported if fsync/close fails below */
	int			i;
	bool		ok;

	/*
	 * Find and write dirty pages
	 */
	fdata.num_files = 0;

	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

	for (slotno = 0; slotno < shared->num_slots; slotno++)
	{
		/*
		 * Passing fdata defers the per-file fsync: the write path keeps the
		 * file open in fdata instead of syncing and closing it immediately.
		 */
		SlruInternalWritePage(ctl, slotno, &fdata);

		/*
		 * In some places (e.g. checkpoints), we cannot assert that the slot
		 * is clean now, since another process might have re-dirtied it
		 * already.  That's okay.
		 */
		Assert(allow_redirtied ||
			   shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
			   (shared->page_status[slotno] == SLRU_PAGE_VALID &&
				!shared->page_dirty[slotno]));
	}

	LWLockRelease(shared->ControlLock);

	/*
	 * Now fsync and close any files that were open
	 */
	ok = true;
	for (i = 0; i < fdata.num_files; i++)
	{
		pgstat_report_wait_start(WAIT_EVENT_SLRU_FLUSH_SYNC);
		if (ctl->do_fsync && pg_fsync(fdata.fd[i]))
		{
			slru_errcause = SLRU_FSYNC_FAILED;
			slru_errno = errno;
			pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
			ok = false;
		}
		pgstat_report_wait_end();

		/* Close even if fsync failed, so we don't leak the fd */
		if (CloseTransientFile(fdata.fd[i]))
		{
			slru_errcause = SLRU_CLOSE_FAILED;
			slru_errno = errno;
			pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
			ok = false;
		}
	}
	/* Report only the last failure; earlier ones are overwritten above */
	if (!ok)
		SlruReportIOError(ctl, pageno, InvalidTransactionId);

	/* Ensure that directory entries for new files are on disk. */
	if (ctl->do_fsync)
		fsync_fname(ctl->Dir, true);
}
1184 
1185 /*
1186  * Remove all segments before the one holding the passed page number
1187  *
1188  * All SLRUs prevent concurrent calls to this function, either with an LWLock
1189  * or by calling it only as part of a checkpoint.  Mutual exclusion must begin
1190  * before computing cutoffPage.  Mutual exclusion must end after any limit
1191  * update that would permit other backends to write fresh data into the
1192  * segment immediately preceding the one containing cutoffPage.  Otherwise,
1193  * when the SLRU is quite full, SimpleLruTruncate() might delete that segment
1194  * after it has accrued freshly-written data.
1195  */
void
SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
{
	SlruShared	shared = ctl->shared;
	int			slotno;

	/*
	 * Scan shared memory and remove any pages preceding the cutoff page, to
	 * ensure we won't rewrite them later.  (Since this is normally called in
	 * or just after a checkpoint, any dirty pages should have been flushed
	 * already ... we're just being extra careful here.)
	 */
	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);

restart:;

	/*
	 * While we are holding the lock, make an important safety check: the
	 * current endpoint page must not be eligible for removal.
	 */
	if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
	{
		LWLockRelease(shared->ControlLock);
		ereport(LOG,
				(errmsg("could not truncate directory \"%s\": apparent wraparound",
						ctl->Dir)));
		return;
	}

	for (slotno = 0; slotno < shared->num_slots; slotno++)
	{
		if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
			continue;
		if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
			continue;

		/*
		 * If page is clean, just change state to EMPTY (expected case).
		 */
		if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
			!shared->page_dirty[slotno])
		{
			shared->page_status[slotno] = SLRU_PAGE_EMPTY;
			continue;
		}

		/*
		 * Hmm, we have (or may have) I/O operations acting on the page, so
		 * we've got to wait for them to finish and then start again. This is
		 * the same logic as in SlruSelectLRUPage.  (XXX if page is dirty,
		 * wouldn't it be OK to just discard it without writing it?
		 * SlruMayDeleteSegment() uses a stricter qualification, so we might
		 * not delete this page in the end; even if we don't delete it, we
		 * won't have cause to read its data again.  For now, keep the logic
		 * the same as it was.)
		 */
		if (shared->page_status[slotno] == SLRU_PAGE_VALID)
			SlruInternalWritePage(ctl, slotno, NULL);
		else
			SimpleLruWaitIO(ctl, slotno);

		/*
		 * Both paths above release and reacquire the control lock, so shared
		 * state may have changed arbitrarily; rescan from the top.
		 */
		goto restart;
	}

	LWLockRelease(shared->ControlLock);

	/* Now we can remove the old segment(s) */
	(void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
}
1264 
1265 /*
1266  * Delete an individual SLRU segment, identified by the filename.
1267  *
1268  * NB: This does not touch the SLRU buffers themselves, callers have to ensure
1269  * they either can't yet contain anything, or have already been cleaned out.
1270  */
static void
SlruInternalDeleteSegment(SlruCtl ctl, char *filename)
{
	char		path[MAXPGPATH];

	snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
	ereport(DEBUG2,
			(errmsg("removing file \"%s\"", path)));
	/*
	 * unlink() result is not checked here.  NOTE(review): presumably a
	 * leftover segment file is considered harmless — confirm.
	 */
	unlink(path);
}
1281 
1282 /*
1283  * Delete an individual SLRU segment, identified by the segment number.
1284  */
void
SlruDeleteSegment(SlruCtl ctl, int segno)
{
	SlruShared	shared = ctl->shared;
	int			slotno;
	char		path[MAXPGPATH];
	bool		did_write;		/* true if any pass released the control lock */

	/* Clean out any possibly existing references to the segment. */
	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
restart:
	did_write = false;
	for (slotno = 0; slotno < shared->num_slots; slotno++)
	{
		int			pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;

		if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
			continue;

		/* not the segment we're looking for */
		if (pagesegno != segno)
			continue;

		/* If page is clean, just change state to EMPTY (expected case). */
		if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
			!shared->page_dirty[slotno])
		{
			shared->page_status[slotno] = SLRU_PAGE_EMPTY;
			continue;
		}

		/* Same logic as SimpleLruTruncate() */
		if (shared->page_status[slotno] == SLRU_PAGE_VALID)
			SlruInternalWritePage(ctl, slotno, NULL);
		else
			SimpleLruWaitIO(ctl, slotno);

		did_write = true;
	}

	/*
	 * Be extra careful and re-check. The IO functions release the control
	 * lock, so new pages could have been read in.
	 */
	if (did_write)
		goto restart;

	/* Unlink while still holding the control lock, then release it */
	snprintf(path, MAXPGPATH, "%s/%04X", ctl->Dir, segno);
	ereport(DEBUG2,
			(errmsg("removing file \"%s\"", path)));
	unlink(path);

	LWLockRelease(shared->ControlLock);
}
1339 
1340 /*
1341  * Determine whether a segment is okay to delete.
1342  *
1343  * segpage is the first page of the segment, and cutoffPage is the oldest (in
1344  * PagePrecedes order) page in the SLRU containing still-useful data.  Since
1345  * every core PagePrecedes callback implements "wrap around", check the
1346  * segment's first and last pages:
1347  *
1348  * first<cutoff  && last<cutoff:  yes
1349  * first<cutoff  && last>=cutoff: no; cutoff falls inside this segment
1350  * first>=cutoff && last<cutoff:  no; wrap point falls inside this segment
1351  * first>=cutoff && last>=cutoff: no; every page of this segment is too young
1352  */
1353 static bool
SlruMayDeleteSegment(SlruCtl ctl,int segpage,int cutoffPage)1354 SlruMayDeleteSegment(SlruCtl ctl, int segpage, int cutoffPage)
1355 {
1356 	int			seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
1357 
1358 	Assert(segpage % SLRU_PAGES_PER_SEGMENT == 0);
1359 
1360 	return (ctl->PagePrecedes(segpage, cutoffPage) &&
1361 			ctl->PagePrecedes(seg_last_page, cutoffPage));
1362 }
1363 
1364 #ifdef USE_ASSERT_CHECKING
static void
SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
{
	TransactionId lhs,
				rhs;
	int			newestPage,
				oldestPage;
	TransactionId newestXact,
				oldestXact;

	/*
	 * Compare an XID pair having undefined order (see RFC 1982), a pair at
	 * "opposite ends" of the XID space.  TransactionIdPrecedes() treats each
	 * as preceding the other.  If RHS is oldestXact, LHS is the first XID we
	 * must not assign.
	 */
	lhs = per_page + offset;	/* skip first page to avoid non-normal XIDs */
	rhs = lhs + (1U << 31);
	/* XID-level sanity checks around the ambiguous antipodal pair */
	Assert(TransactionIdPrecedes(lhs, rhs));
	Assert(TransactionIdPrecedes(rhs, lhs));
	Assert(!TransactionIdPrecedes(lhs - 1, rhs));
	Assert(TransactionIdPrecedes(rhs, lhs - 1));
	Assert(TransactionIdPrecedes(lhs + 1, rhs));
	Assert(!TransactionIdPrecedes(rhs, lhs + 1));
	Assert(!TransactionIdFollowsOrEquals(lhs, rhs));
	Assert(!TransactionIdFollowsOrEquals(rhs, lhs));
	/* Page-level checks: a page never precedes itself or its antipode */
	Assert(!ctl->PagePrecedes(lhs / per_page, lhs / per_page));
	Assert(!ctl->PagePrecedes(lhs / per_page, rhs / per_page));
	Assert(!ctl->PagePrecedes(rhs / per_page, lhs / per_page));
	Assert(!ctl->PagePrecedes((lhs - per_page) / per_page, rhs / per_page));
	Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 3 * per_page) / per_page));
	Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 2 * per_page) / per_page));
	Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 1 * per_page) / per_page)
		   || (1U << 31) % per_page != 0);	/* See CommitTsPagePrecedes() */
	Assert(ctl->PagePrecedes((lhs + 1 * per_page) / per_page, rhs / per_page)
		   || (1U << 31) % per_page != 0);
	Assert(ctl->PagePrecedes((lhs + 2 * per_page) / per_page, rhs / per_page));
	Assert(ctl->PagePrecedes((lhs + 3 * per_page) / per_page, rhs / per_page));
	Assert(!ctl->PagePrecedes(rhs / per_page, (lhs + per_page) / per_page));

	/*
	 * GetNewTransactionId() has assigned the last XID it can safely use, and
	 * that XID is in the *LAST* page of the second segment.  We must not
	 * delete that segment.
	 */
	newestPage = 2 * SLRU_PAGES_PER_SEGMENT - 1;
	newestXact = newestPage * per_page + offset;
	Assert(newestXact / per_page == newestPage);
	oldestXact = newestXact + 1;
	oldestXact -= 1U << 31;		/* oldest XID is half the space behind */
	oldestPage = oldestXact / per_page;
	Assert(!SlruMayDeleteSegment(ctl,
								 (newestPage -
								  newestPage % SLRU_PAGES_PER_SEGMENT),
								 oldestPage));

	/*
	 * GetNewTransactionId() has assigned the last XID it can safely use, and
	 * that XID is in the *FIRST* page of the second segment.  We must not
	 * delete that segment.
	 */
	newestPage = SLRU_PAGES_PER_SEGMENT;
	newestXact = newestPage * per_page + offset;
	Assert(newestXact / per_page == newestPage);
	oldestXact = newestXact + 1;
	oldestXact -= 1U << 31;
	oldestPage = oldestXact / per_page;
	Assert(!SlruMayDeleteSegment(ctl,
								 (newestPage -
								  newestPage % SLRU_PAGES_PER_SEGMENT),
								 oldestPage));
}
1437 
1438 /*
1439  * Unit-test a PagePrecedes function.
1440  *
1441  * This assumes every uint32 >= FirstNormalTransactionId is a valid key.  It
1442  * assumes each value occupies a contiguous, fixed-size region of SLRU bytes.
1443  * (MultiXactMemberCtl separates flags from XIDs.  AsyncCtl has
1444  * variable-length entries, no keys, and no random access.  These unit tests
1445  * do not apply to them.)
1446  */
1447 void
SlruPagePrecedesUnitTests(SlruCtl ctl,int per_page)1448 SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page)
1449 {
1450 	/* Test first, middle and last entries of a page. */
1451 	SlruPagePrecedesTestOffset(ctl, per_page, 0);
1452 	SlruPagePrecedesTestOffset(ctl, per_page, per_page / 2);
1453 	SlruPagePrecedesTestOffset(ctl, per_page, per_page - 1);
1454 }
1455 #endif
1456 
1457 /*
1458  * SlruScanDirectory callback
1459  *		This callback reports true if there's any segment wholly prior to the
1460  *		one containing the page passed as "data".
1461  */
1462 bool
SlruScanDirCbReportPresence(SlruCtl ctl,char * filename,int segpage,void * data)1463 SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
1464 {
1465 	int			cutoffPage = *(int *) data;
1466 
1467 	if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
1468 		return true;			/* found one; don't iterate any more */
1469 
1470 	return false;				/* keep going */
1471 }
1472 
1473 /*
1474  * SlruScanDirectory callback.
1475  *		This callback deletes segments prior to the one passed in as "data".
1476  */
1477 static bool
SlruScanDirCbDeleteCutoff(SlruCtl ctl,char * filename,int segpage,void * data)1478 SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
1479 {
1480 	int			cutoffPage = *(int *) data;
1481 
1482 	if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
1483 		SlruInternalDeleteSegment(ctl, filename);
1484 
1485 	return false;				/* keep going */
1486 }
1487 
1488 /*
1489  * SlruScanDirectory callback.
1490  *		This callback deletes all segments.
1491  */
bool
SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
{
	/* Unconditionally remove every segment file the scan encounters */
	SlruInternalDeleteSegment(ctl, filename);

	return false;				/* keep going */
}
1499 
1500 /*
1501  * Scan the SimpleLRU directory and apply a callback to each file found in it.
1502  *
1503  * If the callback returns true, the scan is stopped.  The last return value
1504  * from the callback is returned.
1505  *
1506  * The callback receives the following arguments: 1. the SlruCtl struct for the
1507  * slru being truncated; 2. the filename being considered; 3. the page number
1508  * for the first page of that file; 4. a pointer to the opaque data given to us
1509  * by the caller.
1510  *
1511  * Note that the ordering in which the directory is scanned is not guaranteed.
1512  *
1513  * Note that no locking is applied.
1514  */
1515 bool
SlruScanDirectory(SlruCtl ctl,SlruScanCallback callback,void * data)1516 SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
1517 {
1518 	bool		retval = false;
1519 	DIR		   *cldir;
1520 	struct dirent *clde;
1521 	int			segno;
1522 	int			segpage;
1523 
1524 	cldir = AllocateDir(ctl->Dir);
1525 	while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
1526 	{
1527 		size_t		len;
1528 
1529 		len = strlen(clde->d_name);
1530 
1531 		if ((len == 4 || len == 5 || len == 6) &&
1532 			strspn(clde->d_name, "0123456789ABCDEF") == len)
1533 		{
1534 			segno = (int) strtol(clde->d_name, NULL, 16);
1535 			segpage = segno * SLRU_PAGES_PER_SEGMENT;
1536 
1537 			elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
1538 				 ctl->Dir, clde->d_name);
1539 			retval = callback(ctl, clde->d_name, segpage, data);
1540 			if (retval)
1541 				break;
1542 		}
1543 	}
1544 	FreeDir(cldir);
1545 
1546 	return retval;
1547 }
1548