1 /*-------------------------------------------------------------------------
2  *
3  * slru.c
4  *		Simple LRU buffering for transaction status logfiles
5  *
6  * We use a simple least-recently-used scheme to manage a pool of page
7  * buffers.  Under ordinary circumstances we expect that write
8  * traffic will occur mostly to the latest page (and to the just-prior
9  * page, soon after a page transition).  Read traffic will probably touch
10  * a larger span of pages, but in any case a fairly small number of page
11  * buffers should be sufficient.  So, we just search the buffers using plain
12  * linear search; there's no need for a hashtable or anything fancy.
13  * The management algorithm is straight LRU except that we will never swap
14  * out the latest page (since we know it's going to be hit again eventually).
15  *
16  * We use a control LWLock to protect the shared data structures, plus
17  * per-buffer LWLocks that synchronize I/O for each buffer.  The control lock
18  * must be held to examine or modify any shared state.  A process that is
19  * reading in or writing out a page buffer does not hold the control lock,
20  * only the per-buffer lock for the buffer it is working on.
21  *
22  * "Holding the control lock" means exclusive lock in all cases except for
23  * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
24  * the implications of that.
25  *
26  * When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
27  * before releasing the control lock.  The per-buffer lock is released after
28  * completing the I/O, re-acquiring the control lock, and updating the shared
29  * state.  (Deadlock is not possible here, because we never try to initiate
30  * I/O when someone else is already doing I/O on the same buffer.)
31  * To wait for I/O to complete, release the control lock, acquire the
32  * per-buffer lock in shared mode, immediately release the per-buffer lock,
33  * reacquire the control lock, and then recheck state (since arbitrary things
34  * could have happened while we didn't have the lock).
35  *
36  * As with the regular buffer manager, it is possible for another process
37  * to re-dirty a page that is currently being written out.  This is handled
38  * by re-setting the page's page_dirty flag.
39  *
40  *
41  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
42  * Portions Copyright (c) 1994, Regents of the University of California
43  *
44  * src/backend/access/transam/slru.c
45  *
46  *-------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49 
50 #include <fcntl.h>
51 #include <sys/stat.h>
52 #include <unistd.h>
53 
54 #include "access/slru.h"
55 #include "access/transam.h"
56 #include "access/xlog.h"
57 #include "pgstat.h"
58 #include "storage/fd.h"
59 #include "storage/shmem.h"
60 #include "miscadmin.h"
61 
62 
63 #define SlruFileName(ctl, path, seg) \
64 	snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
65 
66 /*
67  * During SimpleLruFlush(), we will usually not need to write/fsync more
68  * than one or two physical files, but we may need to write several pages
69  * per file.  We can consolidate the I/O requests by leaving files open
70  * until control returns to SimpleLruFlush().  This data structure remembers
71  * which files are open.
72  */
73 #define MAX_FLUSH_BUFFERS	16
74 
75 typedef struct SlruFlushData
76 {
77 	int			num_files;		/* # files actually open */
78 	int			fd[MAX_FLUSH_BUFFERS];	/* their FD's */
79 	int			segno[MAX_FLUSH_BUFFERS];	/* their log seg#s */
80 } SlruFlushData;
81 
82 typedef struct SlruFlushData *SlruFlush;
83 
84 /*
85  * Macro to mark a buffer slot "most recently used".  Note multiple evaluation
86  * of arguments!
87  *
88  * The reason for the if-test is that there are often many consecutive
89  * accesses to the same page (particularly the latest page).  By suppressing
90  * useless increments of cur_lru_count, we reduce the probability that old
91  * pages' counts will "wrap around" and make them appear recently used.
92  *
93  * We allow this code to be executed concurrently by multiple processes within
94  * SimpleLruReadPage_ReadOnly().  As long as int reads and writes are atomic,
95  * this should not cause any completely-bogus values to enter the computation.
96  * However, it is possible for either cur_lru_count or individual
97  * page_lru_count entries to be "reset" to lower values than they should have,
98  * in case a process is delayed while it executes this macro.  With care in
99  * SlruSelectLRUPage(), this does little harm, and in any case the absolute
100  * worst possible consequence is a nonoptimal choice of page to evict.  The
101  * gain from allowing concurrent reads of SLRU pages seems worth it.
102  */
103 #define SlruRecentlyUsed(shared, slotno)	\
104 	do { \
105 		int		new_lru_count = (shared)->cur_lru_count; \
106 		if (new_lru_count != (shared)->page_lru_count[slotno]) { \
107 			(shared)->cur_lru_count = ++new_lru_count; \
108 			(shared)->page_lru_count[slotno] = new_lru_count; \
109 		} \
110 	} while (0)
111 
112 /* Saved info for SlruReportIOError */
113 typedef enum
114 {
115 	SLRU_OPEN_FAILED,
116 	SLRU_SEEK_FAILED,
117 	SLRU_READ_FAILED,
118 	SLRU_WRITE_FAILED,
119 	SLRU_FSYNC_FAILED,
120 	SLRU_CLOSE_FAILED
121 } SlruErrorCause;
122 
123 static SlruErrorCause slru_errcause;
124 static int	slru_errno;
125 
126 
127 static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
128 static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
129 static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata);
130 static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);
131 static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
132 					  SlruFlush fdata);
133 static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
134 static int	SlruSelectLRUPage(SlruCtl ctl, int pageno);
135 
136 static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
137 						  int segpage, void *data);
138 static void SlruInternalDeleteSegment(SlruCtl ctl, char *filename);
139 
140 /*
141  * Initialization of shared memory
142  */
143 
144 Size
SimpleLruShmemSize(int nslots,int nlsns)145 SimpleLruShmemSize(int nslots, int nlsns)
146 {
147 	Size		sz;
148 
149 	/* we assume nslots isn't so large as to risk overflow */
150 	sz = MAXALIGN(sizeof(SlruSharedData));
151 	sz += MAXALIGN(nslots * sizeof(char *));	/* page_buffer[] */
152 	sz += MAXALIGN(nslots * sizeof(SlruPageStatus));	/* page_status[] */
153 	sz += MAXALIGN(nslots * sizeof(bool));	/* page_dirty[] */
154 	sz += MAXALIGN(nslots * sizeof(int));	/* page_number[] */
155 	sz += MAXALIGN(nslots * sizeof(int));	/* page_lru_count[] */
156 	sz += MAXALIGN(nslots * sizeof(LWLockPadded));	/* buffer_locks[] */
157 
158 	if (nlsns > 0)
159 		sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));	/* group_lsn[] */
160 
161 	return BUFFERALIGN(sz) + BLCKSZ * nslots;
162 }
163 
164 void
SimpleLruInit(SlruCtl ctl,const char * name,int nslots,int nlsns,LWLock * ctllock,const char * subdir,int tranche_id)165 SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
166 			  LWLock *ctllock, const char *subdir, int tranche_id)
167 {
168 	SlruShared	shared;
169 	bool		found;
170 
171 	shared = (SlruShared) ShmemInitStruct(name,
172 										  SimpleLruShmemSize(nslots, nlsns),
173 										  &found);
174 
175 	if (!IsUnderPostmaster)
176 	{
177 		/* Initialize locks and shared memory area */
178 		char	   *ptr;
179 		Size		offset;
180 		int			slotno;
181 
182 		Assert(!found);
183 
184 		memset(shared, 0, sizeof(SlruSharedData));
185 
186 		shared->ControlLock = ctllock;
187 
188 		shared->num_slots = nslots;
189 		shared->lsn_groups_per_page = nlsns;
190 
191 		shared->cur_lru_count = 0;
192 
193 		/* shared->latest_page_number will be set later */
194 
195 		ptr = (char *) shared;
196 		offset = MAXALIGN(sizeof(SlruSharedData));
197 		shared->page_buffer = (char **) (ptr + offset);
198 		offset += MAXALIGN(nslots * sizeof(char *));
199 		shared->page_status = (SlruPageStatus *) (ptr + offset);
200 		offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
201 		shared->page_dirty = (bool *) (ptr + offset);
202 		offset += MAXALIGN(nslots * sizeof(bool));
203 		shared->page_number = (int *) (ptr + offset);
204 		offset += MAXALIGN(nslots * sizeof(int));
205 		shared->page_lru_count = (int *) (ptr + offset);
206 		offset += MAXALIGN(nslots * sizeof(int));
207 
208 		/* Initialize LWLocks */
209 		shared->buffer_locks = (LWLockPadded *) (ptr + offset);
210 		offset += MAXALIGN(nslots * sizeof(LWLockPadded));
211 
212 		if (nlsns > 0)
213 		{
214 			shared->group_lsn = (XLogRecPtr *) (ptr + offset);
215 			offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
216 		}
217 
218 		Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH);
219 		strlcpy(shared->lwlock_tranche_name, name, SLRU_MAX_NAME_LENGTH);
220 		shared->lwlock_tranche_id = tranche_id;
221 
222 		ptr += BUFFERALIGN(offset);
223 		for (slotno = 0; slotno < nslots; slotno++)
224 		{
225 			LWLockInitialize(&shared->buffer_locks[slotno].lock,
226 							 shared->lwlock_tranche_id);
227 
228 			shared->page_buffer[slotno] = ptr;
229 			shared->page_status[slotno] = SLRU_PAGE_EMPTY;
230 			shared->page_dirty[slotno] = false;
231 			shared->page_lru_count[slotno] = 0;
232 			ptr += BLCKSZ;
233 		}
234 
235 		/* Should fit to estimated shmem size */
236 		Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns));
237 	}
238 	else
239 		Assert(found);
240 
241 	/* Register SLRU tranche in the main tranches array */
242 	LWLockRegisterTranche(shared->lwlock_tranche_id,
243 						  shared->lwlock_tranche_name);
244 
245 	/*
246 	 * Initialize the unshared control struct, including directory path. We
247 	 * assume caller set PagePrecedes.
248 	 */
249 	ctl->shared = shared;
250 	ctl->do_fsync = true;		/* default behavior */
251 	StrNCpy(ctl->Dir, subdir, sizeof(ctl->Dir));
252 }
253 
254 /*
255  * Initialize (or reinitialize) a page to zeroes.
256  *
257  * The page is not actually written, just set up in shared memory.
258  * The slot number of the new page is returned.
259  *
260  * Control lock must be held at entry, and will be held at exit.
261  */
262 int
SimpleLruZeroPage(SlruCtl ctl,int pageno)263 SimpleLruZeroPage(SlruCtl ctl, int pageno)
264 {
265 	SlruShared	shared = ctl->shared;
266 	int			slotno;
267 
268 	/* Find a suitable buffer slot for the page */
269 	slotno = SlruSelectLRUPage(ctl, pageno);
270 	Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
271 		   (shared->page_status[slotno] == SLRU_PAGE_VALID &&
272 			!shared->page_dirty[slotno]) ||
273 		   shared->page_number[slotno] == pageno);
274 
275 	/* Mark the slot as containing this page */
276 	shared->page_number[slotno] = pageno;
277 	shared->page_status[slotno] = SLRU_PAGE_VALID;
278 	shared->page_dirty[slotno] = true;
279 	SlruRecentlyUsed(shared, slotno);
280 
281 	/* Set the buffer to zeroes */
282 	MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
283 
284 	/* Set the LSNs for this new page to zero */
285 	SimpleLruZeroLSNs(ctl, slotno);
286 
287 	/* Assume this page is now the latest active page */
288 	shared->latest_page_number = pageno;
289 
290 	return slotno;
291 }
292 
293 /*
294  * Zero all the LSNs we store for this slru page.
295  *
296  * This should be called each time we create a new page, and each time we read
297  * in a page from disk into an existing buffer.  (Such an old page cannot
298  * have any interesting LSNs, since we'd have flushed them before writing
299  * the page in the first place.)
300  *
301  * This assumes that InvalidXLogRecPtr is bitwise-all-0.
302  */
303 static void
SimpleLruZeroLSNs(SlruCtl ctl,int slotno)304 SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
305 {
306 	SlruShared	shared = ctl->shared;
307 
308 	if (shared->lsn_groups_per_page > 0)
309 		MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
310 			   shared->lsn_groups_per_page * sizeof(XLogRecPtr));
311 }
312 
313 /*
314  * Wait for any active I/O on a page slot to finish.  (This does not
315  * guarantee that new I/O hasn't been started before we return, though.
316  * In fact the slot might not even contain the same page anymore.)
317  *
318  * Control lock must be held at entry, and will be held at exit.
319  */
320 static void
SimpleLruWaitIO(SlruCtl ctl,int slotno)321 SimpleLruWaitIO(SlruCtl ctl, int slotno)
322 {
323 	SlruShared	shared = ctl->shared;
324 
325 	/* See notes at top of file */
326 	LWLockRelease(shared->ControlLock);
327 	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
328 	LWLockRelease(&shared->buffer_locks[slotno].lock);
329 	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
330 
331 	/*
332 	 * If the slot is still in an io-in-progress state, then either someone
333 	 * already started a new I/O on the slot, or a previous I/O failed and
334 	 * neglected to reset the page state.  That shouldn't happen, really, but
335 	 * it seems worth a few extra cycles to check and recover from it. We can
336 	 * cheaply test for failure by seeing if the buffer lock is still held (we
337 	 * assume that transaction abort would release the lock).
338 	 */
339 	if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
340 		shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
341 	{
342 		if (LWLockConditionalAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED))
343 		{
344 			/* indeed, the I/O must have failed */
345 			if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
346 				shared->page_status[slotno] = SLRU_PAGE_EMPTY;
347 			else				/* write_in_progress */
348 			{
349 				shared->page_status[slotno] = SLRU_PAGE_VALID;
350 				shared->page_dirty[slotno] = true;
351 			}
352 			LWLockRelease(&shared->buffer_locks[slotno].lock);
353 		}
354 	}
355 }
356 
357 /*
358  * Find a page in a shared buffer, reading it in if necessary.
359  * The page number must correspond to an already-initialized page.
360  *
361  * If write_ok is true then it is OK to return a page that is in
362  * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
363  * that modification of the page is safe.  If write_ok is false then we
364  * will not return the page until it is not undergoing active I/O.
365  *
366  * The passed-in xid is used only for error reporting, and may be
367  * InvalidTransactionId if no specific xid is associated with the action.
368  *
369  * Return value is the shared-buffer slot number now holding the page.
370  * The buffer's LRU access info is updated.
371  *
372  * Control lock must be held at entry, and will be held at exit.
373  */
374 int
SimpleLruReadPage(SlruCtl ctl,int pageno,bool write_ok,TransactionId xid)375 SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
376 				  TransactionId xid)
377 {
378 	SlruShared	shared = ctl->shared;
379 
380 	/* Outer loop handles restart if we must wait for someone else's I/O */
381 	for (;;)
382 	{
383 		int			slotno;
384 		bool		ok;
385 
386 		/* See if page already is in memory; if not, pick victim slot */
387 		slotno = SlruSelectLRUPage(ctl, pageno);
388 
389 		/* Did we find the page in memory? */
390 		if (shared->page_number[slotno] == pageno &&
391 			shared->page_status[slotno] != SLRU_PAGE_EMPTY)
392 		{
393 			/*
394 			 * If page is still being read in, we must wait for I/O.  Likewise
395 			 * if the page is being written and the caller said that's not OK.
396 			 */
397 			if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
398 				(shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
399 				 !write_ok))
400 			{
401 				SimpleLruWaitIO(ctl, slotno);
402 				/* Now we must recheck state from the top */
403 				continue;
404 			}
405 			/* Otherwise, it's ready to use */
406 			SlruRecentlyUsed(shared, slotno);
407 			return slotno;
408 		}
409 
410 		/* We found no match; assert we selected a freeable slot */
411 		Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
412 			   (shared->page_status[slotno] == SLRU_PAGE_VALID &&
413 				!shared->page_dirty[slotno]));
414 
415 		/* Mark the slot read-busy */
416 		shared->page_number[slotno] = pageno;
417 		shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
418 		shared->page_dirty[slotno] = false;
419 
420 		/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
421 		LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
422 
423 		/* Release control lock while doing I/O */
424 		LWLockRelease(shared->ControlLock);
425 
426 		/* Do the read */
427 		ok = SlruPhysicalReadPage(ctl, pageno, slotno);
428 
429 		/* Set the LSNs for this newly read-in page to zero */
430 		SimpleLruZeroLSNs(ctl, slotno);
431 
432 		/* Re-acquire control lock and update page state */
433 		LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
434 
435 		Assert(shared->page_number[slotno] == pageno &&
436 			   shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
437 			   !shared->page_dirty[slotno]);
438 
439 		shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;
440 
441 		LWLockRelease(&shared->buffer_locks[slotno].lock);
442 
443 		/* Now it's okay to ereport if we failed */
444 		if (!ok)
445 			SlruReportIOError(ctl, pageno, xid);
446 
447 		SlruRecentlyUsed(shared, slotno);
448 		return slotno;
449 	}
450 }
451 
452 /*
453  * Find a page in a shared buffer, reading it in if necessary.
454  * The page number must correspond to an already-initialized page.
455  * The caller must intend only read-only access to the page.
456  *
457  * The passed-in xid is used only for error reporting, and may be
458  * InvalidTransactionId if no specific xid is associated with the action.
459  *
460  * Return value is the shared-buffer slot number now holding the page.
461  * The buffer's LRU access info is updated.
462  *
463  * Control lock must NOT be held at entry, but will be held at exit.
464  * It is unspecified whether the lock will be shared or exclusive.
465  */
466 int
SimpleLruReadPage_ReadOnly(SlruCtl ctl,int pageno,TransactionId xid)467 SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
468 {
469 	SlruShared	shared = ctl->shared;
470 	int			slotno;
471 
472 	/* Try to find the page while holding only shared lock */
473 	LWLockAcquire(shared->ControlLock, LW_SHARED);
474 
475 	/* See if page is already in a buffer */
476 	for (slotno = 0; slotno < shared->num_slots; slotno++)
477 	{
478 		if (shared->page_number[slotno] == pageno &&
479 			shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
480 			shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
481 		{
482 			/* See comments for SlruRecentlyUsed macro */
483 			SlruRecentlyUsed(shared, slotno);
484 			return slotno;
485 		}
486 	}
487 
488 	/* No luck, so switch to normal exclusive lock and do regular read */
489 	LWLockRelease(shared->ControlLock);
490 	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
491 
492 	return SimpleLruReadPage(ctl, pageno, true, xid);
493 }
494 
495 /*
496  * Write a page from a shared buffer, if necessary.
497  * Does nothing if the specified slot is not dirty.
498  *
499  * NOTE: only one write attempt is made here.  Hence, it is possible that
500  * the page is still dirty at exit (if someone else re-dirtied it during
501  * the write).  However, we *do* attempt a fresh write even if the page
502  * is already being written; this is for checkpoints.
503  *
504  * Control lock must be held at entry, and will be held at exit.
505  */
506 static void
SlruInternalWritePage(SlruCtl ctl,int slotno,SlruFlush fdata)507 SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
508 {
509 	SlruShared	shared = ctl->shared;
510 	int			pageno = shared->page_number[slotno];
511 	bool		ok;
512 
513 	/* If a write is in progress, wait for it to finish */
514 	while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
515 		   shared->page_number[slotno] == pageno)
516 	{
517 		SimpleLruWaitIO(ctl, slotno);
518 	}
519 
520 	/*
521 	 * Do nothing if page is not dirty, or if buffer no longer contains the
522 	 * same page we were called for.
523 	 */
524 	if (!shared->page_dirty[slotno] ||
525 		shared->page_status[slotno] != SLRU_PAGE_VALID ||
526 		shared->page_number[slotno] != pageno)
527 		return;
528 
529 	/*
530 	 * Mark the slot write-busy, and clear the dirtybit.  After this point, a
531 	 * transaction status update on this page will mark it dirty again.
532 	 */
533 	shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
534 	shared->page_dirty[slotno] = false;
535 
536 	/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
537 	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
538 
539 	/* Release control lock while doing I/O */
540 	LWLockRelease(shared->ControlLock);
541 
542 	/* Do the write */
543 	ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
544 
545 	/* If we failed, and we're in a flush, better close the files */
546 	if (!ok && fdata)
547 	{
548 		int			i;
549 
550 		for (i = 0; i < fdata->num_files; i++)
551 			CloseTransientFile(fdata->fd[i]);
552 	}
553 
554 	/* Re-acquire control lock and update page state */
555 	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
556 
557 	Assert(shared->page_number[slotno] == pageno &&
558 		   shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);
559 
560 	/* If we failed to write, mark the page dirty again */
561 	if (!ok)
562 		shared->page_dirty[slotno] = true;
563 
564 	shared->page_status[slotno] = SLRU_PAGE_VALID;
565 
566 	LWLockRelease(&shared->buffer_locks[slotno].lock);
567 
568 	/* Now it's okay to ereport if we failed */
569 	if (!ok)
570 		SlruReportIOError(ctl, pageno, InvalidTransactionId);
571 }
572 
573 /*
574  * Wrapper of SlruInternalWritePage, for external callers.
575  * fdata is always passed a NULL here.
576  */
577 void
SimpleLruWritePage(SlruCtl ctl,int slotno)578 SimpleLruWritePage(SlruCtl ctl, int slotno)
579 {
580 	SlruInternalWritePage(ctl, slotno, NULL);
581 }
582 
583 /*
584  * Return whether the given page exists on disk.
585  *
586  * A false return means that either the file does not exist, or that it's not
587  * large enough to contain the given page.
588  */
589 bool
SimpleLruDoesPhysicalPageExist(SlruCtl ctl,int pageno)590 SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int pageno)
591 {
592 	int			segno = pageno / SLRU_PAGES_PER_SEGMENT;
593 	int			rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
594 	int			offset = rpageno * BLCKSZ;
595 	char		path[MAXPGPATH];
596 	int			fd;
597 	bool		result;
598 	off_t		endpos;
599 
600 	SlruFileName(ctl, path, segno);
601 
602 	fd = OpenTransientFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
603 	if (fd < 0)
604 	{
605 		/* expected: file doesn't exist */
606 		if (errno == ENOENT)
607 			return false;
608 
609 		/* report error normally */
610 		slru_errcause = SLRU_OPEN_FAILED;
611 		slru_errno = errno;
612 		SlruReportIOError(ctl, pageno, 0);
613 	}
614 
615 	if ((endpos = lseek(fd, 0, SEEK_END)) < 0)
616 	{
617 		slru_errcause = SLRU_SEEK_FAILED;
618 		slru_errno = errno;
619 		SlruReportIOError(ctl, pageno, 0);
620 	}
621 
622 	result = endpos >= (off_t) (offset + BLCKSZ);
623 
624 	CloseTransientFile(fd);
625 	return result;
626 }
627 
628 /*
629  * Physical read of a (previously existing) page into a buffer slot
630  *
631  * On failure, we cannot just ereport(ERROR) since caller has put state in
632  * shared memory that must be undone.  So, we return FALSE and save enough
633  * info in static variables to let SlruReportIOError make the report.
634  *
635  * For now, assume it's not worth keeping a file pointer open across
636  * read/write operations.  We could cache one virtual file pointer ...
637  */
638 static bool
SlruPhysicalReadPage(SlruCtl ctl,int pageno,int slotno)639 SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
640 {
641 	SlruShared	shared = ctl->shared;
642 	int			segno = pageno / SLRU_PAGES_PER_SEGMENT;
643 	int			rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
644 	int			offset = rpageno * BLCKSZ;
645 	char		path[MAXPGPATH];
646 	int			fd;
647 
648 	SlruFileName(ctl, path, segno);
649 
650 	/*
651 	 * In a crash-and-restart situation, it's possible for us to receive
652 	 * commands to set the commit status of transactions whose bits are in
653 	 * already-truncated segments of the commit log (see notes in
654 	 * SlruPhysicalWritePage).  Hence, if we are InRecovery, allow the case
655 	 * where the file doesn't exist, and return zeroes instead.
656 	 */
657 	fd = OpenTransientFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
658 	if (fd < 0)
659 	{
660 		if (errno != ENOENT || !InRecovery)
661 		{
662 			slru_errcause = SLRU_OPEN_FAILED;
663 			slru_errno = errno;
664 			return false;
665 		}
666 
667 		ereport(LOG,
668 				(errmsg("file \"%s\" doesn't exist, reading as zeroes",
669 						path)));
670 		MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
671 		return true;
672 	}
673 
674 	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
675 	{
676 		slru_errcause = SLRU_SEEK_FAILED;
677 		slru_errno = errno;
678 		CloseTransientFile(fd);
679 		return false;
680 	}
681 
682 	errno = 0;
683 	pgstat_report_wait_start(WAIT_EVENT_SLRU_READ);
684 	if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
685 	{
686 		pgstat_report_wait_end();
687 		slru_errcause = SLRU_READ_FAILED;
688 		slru_errno = errno;
689 		CloseTransientFile(fd);
690 		return false;
691 	}
692 	pgstat_report_wait_end();
693 
694 	if (CloseTransientFile(fd))
695 	{
696 		slru_errcause = SLRU_CLOSE_FAILED;
697 		slru_errno = errno;
698 		return false;
699 	}
700 
701 	return true;
702 }
703 
704 /*
705  * Physical write of a page from a buffer slot
706  *
707  * On failure, we cannot just ereport(ERROR) since caller has put state in
708  * shared memory that must be undone.  So, we return FALSE and save enough
709  * info in static variables to let SlruReportIOError make the report.
710  *
711  * For now, assume it's not worth keeping a file pointer open across
712  * independent read/write operations.  We do batch operations during
713  * SimpleLruFlush, though.
714  *
715  * fdata is NULL for a standalone write, pointer to open-file info during
716  * SimpleLruFlush.
717  */
718 static bool
SlruPhysicalWritePage(SlruCtl ctl,int pageno,int slotno,SlruFlush fdata)719 SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
720 {
721 	SlruShared	shared = ctl->shared;
722 	int			segno = pageno / SLRU_PAGES_PER_SEGMENT;
723 	int			rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
724 	int			offset = rpageno * BLCKSZ;
725 	char		path[MAXPGPATH];
726 	int			fd = -1;
727 
728 	/*
729 	 * Honor the write-WAL-before-data rule, if appropriate, so that we do not
730 	 * write out data before associated WAL records.  This is the same action
731 	 * performed during FlushBuffer() in the main buffer manager.
732 	 */
733 	if (shared->group_lsn != NULL)
734 	{
735 		/*
736 		 * We must determine the largest async-commit LSN for the page. This
737 		 * is a bit tedious, but since this entire function is a slow path
738 		 * anyway, it seems better to do this here than to maintain a per-page
739 		 * LSN variable (which'd need an extra comparison in the
740 		 * transaction-commit path).
741 		 */
742 		XLogRecPtr	max_lsn;
743 		int			lsnindex,
744 					lsnoff;
745 
746 		lsnindex = slotno * shared->lsn_groups_per_page;
747 		max_lsn = shared->group_lsn[lsnindex++];
748 		for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
749 		{
750 			XLogRecPtr	this_lsn = shared->group_lsn[lsnindex++];
751 
752 			if (max_lsn < this_lsn)
753 				max_lsn = this_lsn;
754 		}
755 
756 		if (!XLogRecPtrIsInvalid(max_lsn))
757 		{
758 			/*
759 			 * As noted above, elog(ERROR) is not acceptable here, so if
760 			 * XLogFlush were to fail, we must PANIC.  This isn't much of a
761 			 * restriction because XLogFlush is just about all critical
762 			 * section anyway, but let's make sure.
763 			 */
764 			START_CRIT_SECTION();
765 			XLogFlush(max_lsn);
766 			END_CRIT_SECTION();
767 		}
768 	}
769 
770 	/*
771 	 * During a Flush, we may already have the desired file open.
772 	 */
773 	if (fdata)
774 	{
775 		int			i;
776 
777 		for (i = 0; i < fdata->num_files; i++)
778 		{
779 			if (fdata->segno[i] == segno)
780 			{
781 				fd = fdata->fd[i];
782 				break;
783 			}
784 		}
785 	}
786 
787 	if (fd < 0)
788 	{
789 		/*
790 		 * If the file doesn't already exist, we should create it.  It is
791 		 * possible for this to need to happen when writing a page that's not
792 		 * first in its segment; we assume the OS can cope with that. (Note:
793 		 * it might seem that it'd be okay to create files only when
794 		 * SimpleLruZeroPage is called for the first page of a segment.
795 		 * However, if after a crash and restart the REDO logic elects to
796 		 * replay the log from a checkpoint before the latest one, then it's
797 		 * possible that we will get commands to set transaction status of
798 		 * transactions that have already been truncated from the commit log.
799 		 * Easiest way to deal with that is to accept references to
800 		 * nonexistent files here and in SlruPhysicalReadPage.)
801 		 *
802 		 * Note: it is possible for more than one backend to be executing this
803 		 * code simultaneously for different pages of the same file. Hence,
804 		 * don't use O_EXCL or O_TRUNC or anything like that.
805 		 */
806 		SlruFileName(ctl, path, segno);
807 		fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY,
808 							   S_IRUSR | S_IWUSR);
809 		if (fd < 0)
810 		{
811 			slru_errcause = SLRU_OPEN_FAILED;
812 			slru_errno = errno;
813 			return false;
814 		}
815 
816 		if (fdata)
817 		{
818 			if (fdata->num_files < MAX_FLUSH_BUFFERS)
819 			{
820 				fdata->fd[fdata->num_files] = fd;
821 				fdata->segno[fdata->num_files] = segno;
822 				fdata->num_files++;
823 			}
824 			else
825 			{
826 				/*
827 				 * In the unlikely event that we exceed MAX_FLUSH_BUFFERS,
828 				 * fall back to treating it as a standalone write.
829 				 */
830 				fdata = NULL;
831 			}
832 		}
833 	}
834 
835 	if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
836 	{
837 		slru_errcause = SLRU_SEEK_FAILED;
838 		slru_errno = errno;
839 		if (!fdata)
840 			CloseTransientFile(fd);
841 		return false;
842 	}
843 
844 	errno = 0;
845 	pgstat_report_wait_start(WAIT_EVENT_SLRU_WRITE);
846 	if (write(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
847 	{
848 		pgstat_report_wait_end();
849 		/* if write didn't set errno, assume problem is no disk space */
850 		if (errno == 0)
851 			errno = ENOSPC;
852 		slru_errcause = SLRU_WRITE_FAILED;
853 		slru_errno = errno;
854 		if (!fdata)
855 			CloseTransientFile(fd);
856 		return false;
857 	}
858 	pgstat_report_wait_end();
859 
860 	/*
861 	 * If not part of Flush, need to fsync now.  We assume this happens
862 	 * infrequently enough that it's not a performance issue.
863 	 */
864 	if (!fdata)
865 	{
866 		pgstat_report_wait_start(WAIT_EVENT_SLRU_SYNC);
867 		if (ctl->do_fsync && pg_fsync(fd))
868 		{
869 			pgstat_report_wait_end();
870 			slru_errcause = SLRU_FSYNC_FAILED;
871 			slru_errno = errno;
872 			CloseTransientFile(fd);
873 			return false;
874 		}
875 		pgstat_report_wait_end();
876 
877 		if (CloseTransientFile(fd))
878 		{
879 			slru_errcause = SLRU_CLOSE_FAILED;
880 			slru_errno = errno;
881 			return false;
882 		}
883 	}
884 
885 	return true;
886 }
887 
888 /*
889  * Issue the error message after failure of SlruPhysicalReadPage or
890  * SlruPhysicalWritePage.  Call this after cleaning up shared-memory state.
891  */
892 static void
SlruReportIOError(SlruCtl ctl,int pageno,TransactionId xid)893 SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
894 {
895 	int			segno = pageno / SLRU_PAGES_PER_SEGMENT;
896 	int			rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
897 	int			offset = rpageno * BLCKSZ;
898 	char		path[MAXPGPATH];
899 
900 	SlruFileName(ctl, path, segno);
901 	errno = slru_errno;
902 	switch (slru_errcause)
903 	{
904 		case SLRU_OPEN_FAILED:
905 			ereport(ERROR,
906 					(errcode_for_file_access(),
907 					 errmsg("could not access status of transaction %u", xid),
908 					 errdetail("Could not open file \"%s\": %m.", path)));
909 			break;
910 		case SLRU_SEEK_FAILED:
911 			ereport(ERROR,
912 					(errcode_for_file_access(),
913 					 errmsg("could not access status of transaction %u", xid),
914 					 errdetail("Could not seek in file \"%s\" to offset %u: %m.",
915 							   path, offset)));
916 			break;
917 		case SLRU_READ_FAILED:
918 			ereport(ERROR,
919 					(errcode_for_file_access(),
920 					 errmsg("could not access status of transaction %u", xid),
921 					 errdetail("Could not read from file \"%s\" at offset %u: %m.",
922 							   path, offset)));
923 			break;
924 		case SLRU_WRITE_FAILED:
925 			ereport(ERROR,
926 					(errcode_for_file_access(),
927 					 errmsg("could not access status of transaction %u", xid),
928 					 errdetail("Could not write to file \"%s\" at offset %u: %m.",
929 							   path, offset)));
930 			break;
931 		case SLRU_FSYNC_FAILED:
932 			ereport(data_sync_elevel(ERROR),
933 					(errcode_for_file_access(),
934 					 errmsg("could not access status of transaction %u", xid),
935 					 errdetail("Could not fsync file \"%s\": %m.",
936 							   path)));
937 			break;
938 		case SLRU_CLOSE_FAILED:
939 			ereport(ERROR,
940 					(errcode_for_file_access(),
941 					 errmsg("could not access status of transaction %u", xid),
942 					 errdetail("Could not close file \"%s\": %m.",
943 							   path)));
944 			break;
945 		default:
946 			/* can't get here, we trust */
947 			elog(ERROR, "unrecognized SimpleLru error cause: %d",
948 				 (int) slru_errcause);
949 			break;
950 	}
951 }
952 
953 /*
954  * Select the slot to re-use when we need a free slot.
955  *
956  * The target page number is passed because we need to consider the
957  * possibility that some other process reads in the target page while
958  * we are doing I/O to free a slot.  Hence, check or recheck to see if
959  * any slot already holds the target page, and return that slot if so.
960  * Thus, the returned slot is *either* a slot already holding the pageno
961  * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
962  * or CLEAN).
963  *
964  * Control lock must be held at entry, and will be held at exit.
965  */
966 static int
SlruSelectLRUPage(SlruCtl ctl,int pageno)967 SlruSelectLRUPage(SlruCtl ctl, int pageno)
968 {
969 	SlruShared	shared = ctl->shared;
970 
971 	/* Outer loop handles restart after I/O */
972 	for (;;)
973 	{
974 		int			slotno;
975 		int			cur_count;
976 		int			bestvalidslot = 0;	/* keep compiler quiet */
977 		int			best_valid_delta = -1;
978 		int			best_valid_page_number = 0; /* keep compiler quiet */
979 		int			bestinvalidslot = 0;	/* keep compiler quiet */
980 		int			best_invalid_delta = -1;
981 		int			best_invalid_page_number = 0;	/* keep compiler quiet */
982 
983 		/* See if page already has a buffer assigned */
984 		for (slotno = 0; slotno < shared->num_slots; slotno++)
985 		{
986 			if (shared->page_number[slotno] == pageno &&
987 				shared->page_status[slotno] != SLRU_PAGE_EMPTY)
988 				return slotno;
989 		}
990 
991 		/*
992 		 * If we find any EMPTY slot, just select that one. Else choose a
993 		 * victim page to replace.  We normally take the least recently used
994 		 * valid page, but we will never take the slot containing
995 		 * latest_page_number, even if it appears least recently used.  We
996 		 * will select a slot that is already I/O busy only if there is no
997 		 * other choice: a read-busy slot will not be least recently used once
998 		 * the read finishes, and waiting for an I/O on a write-busy slot is
999 		 * inferior to just picking some other slot.  Testing shows the slot
1000 		 * we pick instead will often be clean, allowing us to begin a read at
1001 		 * once.
1002 		 *
1003 		 * Normally the page_lru_count values will all be different and so
1004 		 * there will be a well-defined LRU page.  But since we allow
1005 		 * concurrent execution of SlruRecentlyUsed() within
1006 		 * SimpleLruReadPage_ReadOnly(), it is possible that multiple pages
1007 		 * acquire the same lru_count values.  In that case we break ties by
1008 		 * choosing the furthest-back page.
1009 		 *
1010 		 * Notice that this next line forcibly advances cur_lru_count to a
1011 		 * value that is certainly beyond any value that will be in the
1012 		 * page_lru_count array after the loop finishes.  This ensures that
1013 		 * the next execution of SlruRecentlyUsed will mark the page newly
1014 		 * used, even if it's for a page that has the current counter value.
1015 		 * That gets us back on the path to having good data when there are
1016 		 * multiple pages with the same lru_count.
1017 		 */
1018 		cur_count = (shared->cur_lru_count)++;
1019 		for (slotno = 0; slotno < shared->num_slots; slotno++)
1020 		{
1021 			int			this_delta;
1022 			int			this_page_number;
1023 
1024 			if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1025 				return slotno;
1026 			this_delta = cur_count - shared->page_lru_count[slotno];
1027 			if (this_delta < 0)
1028 			{
1029 				/*
1030 				 * Clean up in case shared updates have caused cur_count
1031 				 * increments to get "lost".  We back off the page counts,
1032 				 * rather than trying to increase cur_count, to avoid any
1033 				 * question of infinite loops or failure in the presence of
1034 				 * wrapped-around counts.
1035 				 */
1036 				shared->page_lru_count[slotno] = cur_count;
1037 				this_delta = 0;
1038 			}
1039 			this_page_number = shared->page_number[slotno];
1040 			if (this_page_number == shared->latest_page_number)
1041 				continue;
1042 			if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1043 			{
1044 				if (this_delta > best_valid_delta ||
1045 					(this_delta == best_valid_delta &&
1046 					 ctl->PagePrecedes(this_page_number,
1047 									   best_valid_page_number)))
1048 				{
1049 					bestvalidslot = slotno;
1050 					best_valid_delta = this_delta;
1051 					best_valid_page_number = this_page_number;
1052 				}
1053 			}
1054 			else
1055 			{
1056 				if (this_delta > best_invalid_delta ||
1057 					(this_delta == best_invalid_delta &&
1058 					 ctl->PagePrecedes(this_page_number,
1059 									   best_invalid_page_number)))
1060 				{
1061 					bestinvalidslot = slotno;
1062 					best_invalid_delta = this_delta;
1063 					best_invalid_page_number = this_page_number;
1064 				}
1065 			}
1066 		}
1067 
1068 		/*
1069 		 * If all pages (except possibly the latest one) are I/O busy, we'll
1070 		 * have to wait for an I/O to complete and then retry.  In that
1071 		 * unhappy case, we choose to wait for the I/O on the least recently
1072 		 * used slot, on the assumption that it was likely initiated first of
1073 		 * all the I/Os in progress and may therefore finish first.
1074 		 */
1075 		if (best_valid_delta < 0)
1076 		{
1077 			SimpleLruWaitIO(ctl, bestinvalidslot);
1078 			continue;
1079 		}
1080 
1081 		/*
1082 		 * If the selected page is clean, we're set.
1083 		 */
1084 		if (!shared->page_dirty[bestvalidslot])
1085 			return bestvalidslot;
1086 
1087 		/*
1088 		 * Write the page.
1089 		 */
1090 		SlruInternalWritePage(ctl, bestvalidslot, NULL);
1091 
1092 		/*
1093 		 * Now loop back and try again.  This is the easiest way of dealing
1094 		 * with corner cases such as the victim page being re-dirtied while we
1095 		 * wrote it.
1096 		 */
1097 	}
1098 }
1099 
1100 /*
1101  * Flush dirty pages to disk during checkpoint or database shutdown
1102  */
1103 void
SimpleLruFlush(SlruCtl ctl,bool allow_redirtied)1104 SimpleLruFlush(SlruCtl ctl, bool allow_redirtied)
1105 {
1106 	SlruShared	shared = ctl->shared;
1107 	SlruFlushData fdata;
1108 	int			slotno;
1109 	int			pageno = 0;
1110 	int			i;
1111 	bool		ok;
1112 
1113 	/*
1114 	 * Find and write dirty pages
1115 	 */
1116 	fdata.num_files = 0;
1117 
1118 	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1119 
1120 	for (slotno = 0; slotno < shared->num_slots; slotno++)
1121 	{
1122 		SlruInternalWritePage(ctl, slotno, &fdata);
1123 
1124 		/*
1125 		 * In some places (e.g. checkpoints), we cannot assert that the slot
1126 		 * is clean now, since another process might have re-dirtied it
1127 		 * already.  That's okay.
1128 		 */
1129 		Assert(allow_redirtied ||
1130 			   shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
1131 			   (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1132 				!shared->page_dirty[slotno]));
1133 	}
1134 
1135 	LWLockRelease(shared->ControlLock);
1136 
1137 	/*
1138 	 * Now fsync and close any files that were open
1139 	 */
1140 	ok = true;
1141 	for (i = 0; i < fdata.num_files; i++)
1142 	{
1143 		pgstat_report_wait_start(WAIT_EVENT_SLRU_FLUSH_SYNC);
1144 		if (ctl->do_fsync && pg_fsync(fdata.fd[i]))
1145 		{
1146 			slru_errcause = SLRU_FSYNC_FAILED;
1147 			slru_errno = errno;
1148 			pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1149 			ok = false;
1150 		}
1151 		pgstat_report_wait_end();
1152 
1153 		if (CloseTransientFile(fdata.fd[i]))
1154 		{
1155 			slru_errcause = SLRU_CLOSE_FAILED;
1156 			slru_errno = errno;
1157 			pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1158 			ok = false;
1159 		}
1160 	}
1161 	if (!ok)
1162 		SlruReportIOError(ctl, pageno, InvalidTransactionId);
1163 
1164 	/* Ensure that directory entries for new files are on disk. */
1165 	if (ctl->do_fsync)
1166 		fsync_fname(ctl->Dir, true);
1167 }
1168 
1169 /*
1170  * Remove all segments before the one holding the passed page number
1171  *
1172  * All SLRUs prevent concurrent calls to this function, either with an LWLock
1173  * or by calling it only as part of a checkpoint.  Mutual exclusion must begin
1174  * before computing cutoffPage.  Mutual exclusion must end after any limit
1175  * update that would permit other backends to write fresh data into the
1176  * segment immediately preceding the one containing cutoffPage.  Otherwise,
1177  * when the SLRU is quite full, SimpleLruTruncate() might delete that segment
1178  * after it has accrued freshly-written data.
1179  */
1180 void
SimpleLruTruncate(SlruCtl ctl,int cutoffPage)1181 SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
1182 {
1183 	SlruShared	shared = ctl->shared;
1184 	int			slotno;
1185 
1186 	/*
1187 	 * Scan shared memory and remove any pages preceding the cutoff page, to
1188 	 * ensure we won't rewrite them later.  (Since this is normally called in
1189 	 * or just after a checkpoint, any dirty pages should have been flushed
1190 	 * already ... we're just being extra careful here.)
1191 	 */
1192 	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1193 
1194 restart:;
1195 
1196 	/*
1197 	 * While we are holding the lock, make an important safety check: the
1198 	 * current endpoint page must not be eligible for removal.
1199 	 */
1200 	if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
1201 	{
1202 		LWLockRelease(shared->ControlLock);
1203 		ereport(LOG,
1204 				(errmsg("could not truncate directory \"%s\": apparent wraparound",
1205 						ctl->Dir)));
1206 		return;
1207 	}
1208 
1209 	for (slotno = 0; slotno < shared->num_slots; slotno++)
1210 	{
1211 		if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1212 			continue;
1213 		if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
1214 			continue;
1215 
1216 		/*
1217 		 * If page is clean, just change state to EMPTY (expected case).
1218 		 */
1219 		if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1220 			!shared->page_dirty[slotno])
1221 		{
1222 			shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1223 			continue;
1224 		}
1225 
1226 		/*
1227 		 * Hmm, we have (or may have) I/O operations acting on the page, so
1228 		 * we've got to wait for them to finish and then start again. This is
1229 		 * the same logic as in SlruSelectLRUPage.  (XXX if page is dirty,
1230 		 * wouldn't it be OK to just discard it without writing it?
1231 		 * SlruMayDeleteSegment() uses a stricter qualification, so we might
1232 		 * not delete this page in the end; even if we don't delete it, we
1233 		 * won't have cause to read its data again.  For now, keep the logic
1234 		 * the same as it was.)
1235 		 */
1236 		if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1237 			SlruInternalWritePage(ctl, slotno, NULL);
1238 		else
1239 			SimpleLruWaitIO(ctl, slotno);
1240 		goto restart;
1241 	}
1242 
1243 	LWLockRelease(shared->ControlLock);
1244 
1245 	/* Now we can remove the old segment(s) */
1246 	(void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
1247 }
1248 
1249 /*
1250  * Delete an individual SLRU segment, identified by the filename.
1251  *
1252  * NB: This does not touch the SLRU buffers themselves, callers have to ensure
1253  * they either can't yet contain anything, or have already been cleaned out.
1254  */
1255 static void
SlruInternalDeleteSegment(SlruCtl ctl,char * filename)1256 SlruInternalDeleteSegment(SlruCtl ctl, char *filename)
1257 {
1258 	char		path[MAXPGPATH];
1259 
1260 	snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
1261 	ereport(DEBUG2,
1262 			(errmsg("removing file \"%s\"", path)));
1263 	unlink(path);
1264 }
1265 
1266 /*
1267  * Delete an individual SLRU segment, identified by the segment number.
1268  */
1269 void
SlruDeleteSegment(SlruCtl ctl,int segno)1270 SlruDeleteSegment(SlruCtl ctl, int segno)
1271 {
1272 	SlruShared	shared = ctl->shared;
1273 	int			slotno;
1274 	char		path[MAXPGPATH];
1275 	bool		did_write;
1276 
1277 	/* Clean out any possibly existing references to the segment. */
1278 	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
1279 restart:
1280 	did_write = false;
1281 	for (slotno = 0; slotno < shared->num_slots; slotno++)
1282 	{
1283 		int			pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
1284 
1285 		if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1286 			continue;
1287 
1288 		/* not the segment we're looking for */
1289 		if (pagesegno != segno)
1290 			continue;
1291 
1292 		/* If page is clean, just change state to EMPTY (expected case). */
1293 		if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1294 			!shared->page_dirty[slotno])
1295 		{
1296 			shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1297 			continue;
1298 		}
1299 
1300 		/* Same logic as SimpleLruTruncate() */
1301 		if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1302 			SlruInternalWritePage(ctl, slotno, NULL);
1303 		else
1304 			SimpleLruWaitIO(ctl, slotno);
1305 
1306 		did_write = true;
1307 	}
1308 
1309 	/*
1310 	 * Be extra careful and re-check. The IO functions release the control
1311 	 * lock, so new pages could have been read in.
1312 	 */
1313 	if (did_write)
1314 		goto restart;
1315 
1316 	snprintf(path, MAXPGPATH, "%s/%04X", ctl->Dir, segno);
1317 	ereport(DEBUG2,
1318 			(errmsg("removing file \"%s\"", path)));
1319 	unlink(path);
1320 
1321 	LWLockRelease(shared->ControlLock);
1322 }
1323 
1324 /*
1325  * Determine whether a segment is okay to delete.
1326  *
1327  * segpage is the first page of the segment, and cutoffPage is the oldest (in
1328  * PagePrecedes order) page in the SLRU containing still-useful data.  Since
1329  * every core PagePrecedes callback implements "wrap around", check the
1330  * segment's first and last pages:
1331  *
1332  * first<cutoff  && last<cutoff:  yes
1333  * first<cutoff  && last>=cutoff: no; cutoff falls inside this segment
1334  * first>=cutoff && last<cutoff:  no; wrap point falls inside this segment
1335  * first>=cutoff && last>=cutoff: no; every page of this segment is too young
1336  */
1337 static bool
SlruMayDeleteSegment(SlruCtl ctl,int segpage,int cutoffPage)1338 SlruMayDeleteSegment(SlruCtl ctl, int segpage, int cutoffPage)
1339 {
1340 	int			seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
1341 
1342 	Assert(segpage % SLRU_PAGES_PER_SEGMENT == 0);
1343 
1344 	return (ctl->PagePrecedes(segpage, cutoffPage) &&
1345 			ctl->PagePrecedes(seg_last_page, cutoffPage));
1346 }
1347 
1348 #ifdef USE_ASSERT_CHECKING
1349 static void
SlruPagePrecedesTestOffset(SlruCtl ctl,int per_page,uint32 offset)1350 SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
1351 {
1352 	TransactionId lhs,
1353 				rhs;
1354 	int			newestPage,
1355 				oldestPage;
1356 	TransactionId newestXact,
1357 				oldestXact;
1358 
1359 	/*
1360 	 * Compare an XID pair having undefined order (see RFC 1982), a pair at
1361 	 * "opposite ends" of the XID space.  TransactionIdPrecedes() treats each
1362 	 * as preceding the other.  If RHS is oldestXact, LHS is the first XID we
1363 	 * must not assign.
1364 	 */
1365 	lhs = per_page + offset;	/* skip first page to avoid non-normal XIDs */
1366 	rhs = lhs + (1U << 31);
1367 	Assert(TransactionIdPrecedes(lhs, rhs));
1368 	Assert(TransactionIdPrecedes(rhs, lhs));
1369 	Assert(!TransactionIdPrecedes(lhs - 1, rhs));
1370 	Assert(TransactionIdPrecedes(rhs, lhs - 1));
1371 	Assert(TransactionIdPrecedes(lhs + 1, rhs));
1372 	Assert(!TransactionIdPrecedes(rhs, lhs + 1));
1373 	Assert(!TransactionIdFollowsOrEquals(lhs, rhs));
1374 	Assert(!TransactionIdFollowsOrEquals(rhs, lhs));
1375 	Assert(!ctl->PagePrecedes(lhs / per_page, lhs / per_page));
1376 	Assert(!ctl->PagePrecedes(lhs / per_page, rhs / per_page));
1377 	Assert(!ctl->PagePrecedes(rhs / per_page, lhs / per_page));
1378 	Assert(!ctl->PagePrecedes((lhs - per_page) / per_page, rhs / per_page));
1379 	Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 3 * per_page) / per_page));
1380 	Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 2 * per_page) / per_page));
1381 	Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 1 * per_page) / per_page)
1382 		   || (1U << 31) % per_page != 0);	/* See CommitTsPagePrecedes() */
1383 	Assert(ctl->PagePrecedes((lhs + 1 * per_page) / per_page, rhs / per_page)
1384 		   || (1U << 31) % per_page != 0);
1385 	Assert(ctl->PagePrecedes((lhs + 2 * per_page) / per_page, rhs / per_page));
1386 	Assert(ctl->PagePrecedes((lhs + 3 * per_page) / per_page, rhs / per_page));
1387 	Assert(!ctl->PagePrecedes(rhs / per_page, (lhs + per_page) / per_page));
1388 
1389 	/*
1390 	 * GetNewTransactionId() has assigned the last XID it can safely use, and
1391 	 * that XID is in the *LAST* page of the second segment.  We must not
1392 	 * delete that segment.
1393 	 */
1394 	newestPage = 2 * SLRU_PAGES_PER_SEGMENT - 1;
1395 	newestXact = newestPage * per_page + offset;
1396 	Assert(newestXact / per_page == newestPage);
1397 	oldestXact = newestXact + 1;
1398 	oldestXact -= 1U << 31;
1399 	oldestPage = oldestXact / per_page;
1400 	Assert(!SlruMayDeleteSegment(ctl,
1401 								 (newestPage -
1402 								  newestPage % SLRU_PAGES_PER_SEGMENT),
1403 								 oldestPage));
1404 
1405 	/*
1406 	 * GetNewTransactionId() has assigned the last XID it can safely use, and
1407 	 * that XID is in the *FIRST* page of the second segment.  We must not
1408 	 * delete that segment.
1409 	 */
1410 	newestPage = SLRU_PAGES_PER_SEGMENT;
1411 	newestXact = newestPage * per_page + offset;
1412 	Assert(newestXact / per_page == newestPage);
1413 	oldestXact = newestXact + 1;
1414 	oldestXact -= 1U << 31;
1415 	oldestPage = oldestXact / per_page;
1416 	Assert(!SlruMayDeleteSegment(ctl,
1417 								 (newestPage -
1418 								  newestPage % SLRU_PAGES_PER_SEGMENT),
1419 								 oldestPage));
1420 }
1421 
1422 /*
1423  * Unit-test a PagePrecedes function.
1424  *
1425  * This assumes every uint32 >= FirstNormalTransactionId is a valid key.  It
1426  * assumes each value occupies a contiguous, fixed-size region of SLRU bytes.
1427  * (MultiXactMemberCtl separates flags from XIDs.  AsyncCtl has
1428  * variable-length entries, no keys, and no random access.  These unit tests
1429  * do not apply to them.)
1430  */
1431 void
SlruPagePrecedesUnitTests(SlruCtl ctl,int per_page)1432 SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page)
1433 {
1434 	/* Test first, middle and last entries of a page. */
1435 	SlruPagePrecedesTestOffset(ctl, per_page, 0);
1436 	SlruPagePrecedesTestOffset(ctl, per_page, per_page / 2);
1437 	SlruPagePrecedesTestOffset(ctl, per_page, per_page - 1);
1438 }
1439 #endif
1440 
1441 /*
1442  * SlruScanDirectory callback
1443  *		This callback reports true if there's any segment wholly prior to the
1444  *		one containing the page passed as "data".
1445  */
1446 bool
SlruScanDirCbReportPresence(SlruCtl ctl,char * filename,int segpage,void * data)1447 SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
1448 {
1449 	int			cutoffPage = *(int *) data;
1450 
1451 	if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
1452 		return true;			/* found one; don't iterate any more */
1453 
1454 	return false;				/* keep going */
1455 }
1456 
1457 /*
1458  * SlruScanDirectory callback.
1459  *		This callback deletes segments prior to the one passed in as "data".
1460  */
1461 static bool
SlruScanDirCbDeleteCutoff(SlruCtl ctl,char * filename,int segpage,void * data)1462 SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
1463 {
1464 	int			cutoffPage = *(int *) data;
1465 
1466 	if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
1467 		SlruInternalDeleteSegment(ctl, filename);
1468 
1469 	return false;				/* keep going */
1470 }
1471 
1472 /*
1473  * SlruScanDirectory callback.
1474  *		This callback deletes all segments.
1475  */
1476 bool
SlruScanDirCbDeleteAll(SlruCtl ctl,char * filename,int segpage,void * data)1477 SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
1478 {
1479 	SlruInternalDeleteSegment(ctl, filename);
1480 
1481 	return false;				/* keep going */
1482 }
1483 
1484 /*
1485  * Scan the SimpleLRU directory and apply a callback to each file found in it.
1486  *
1487  * If the callback returns true, the scan is stopped.  The last return value
1488  * from the callback is returned.
1489  *
1490  * The callback receives the following arguments: 1. the SlruCtl struct for the
1491  * slru being truncated; 2. the filename being considered; 3. the page number
1492  * for the first page of that file; 4. a pointer to the opaque data given to us
1493  * by the caller.
1494  *
1495  * Note that the ordering in which the directory is scanned is not guaranteed.
1496  *
1497  * Note that no locking is applied.
1498  */
1499 bool
SlruScanDirectory(SlruCtl ctl,SlruScanCallback callback,void * data)1500 SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
1501 {
1502 	bool		retval = false;
1503 	DIR		   *cldir;
1504 	struct dirent *clde;
1505 	int			segno;
1506 	int			segpage;
1507 
1508 	cldir = AllocateDir(ctl->Dir);
1509 	while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
1510 	{
1511 		size_t		len;
1512 
1513 		len = strlen(clde->d_name);
1514 
1515 		if ((len == 4 || len == 5 || len == 6) &&
1516 			strspn(clde->d_name, "0123456789ABCDEF") == len)
1517 		{
1518 			segno = (int) strtol(clde->d_name, NULL, 16);
1519 			segpage = segno * SLRU_PAGES_PER_SEGMENT;
1520 
1521 			elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
1522 				 ctl->Dir, clde->d_name);
1523 			retval = callback(ctl, clde->d_name, segpage, data);
1524 			if (retval)
1525 				break;
1526 		}
1527 	}
1528 	FreeDir(cldir);
1529 
1530 	return retval;
1531 }
1532