/*-------------------------------------------------------------------------
 *
 * localbuf.c
 *	  local buffer manager. Fast buffer manager for temporary tables,
 *	  which never need to be WAL-logged or checkpointed, etc.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/buffer/localbuf.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/parallel.h"
#include "catalog/catalog.h"
#include "executor/instrument.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/resowner_private.h"


/*#define LBDEBUG*/

/* entry for buffer lookup hashtable */
typedef struct
{
	BufferTag	key;			/* Tag of a disk page */
	int			id;				/* Associated local buffer's index */
} LocalBufferLookupEnt;

/* Note: this macro only works on local buffers, not shared ones! */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
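/*
 * For example, InitLocalBuffers() below assigns buf_id = -i - 2 to the i'th
 * local buffer, so buf_id = -2 maps to LocalBufferBlockPointers[0],
 * buf_id = -3 to LocalBufferBlockPointers[1], and in general
 * -((-i - 2) + 2) recovers the array index i.
 */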

int			NLocBuffer = 0;		/* until buffers are initialized */

BufferDesc *LocalBufferDescriptors = NULL;
Block	   *LocalBufferBlockPointers = NULL;
int32	   *LocalRefCount = NULL;

static int	nextFreeLocalBuf = 0;

static HTAB *LocalBufHash = NULL;


static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);


/*
 * LocalPrefetchBuffer -
 *	  initiate asynchronous read of a block of a relation
 *
 * Do PrefetchBuffer's work for temporary relations.
 * No-op if prefetching isn't compiled in.
 */
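/*
 * (USE_PREFETCH is normally defined only when the platform provides
 * posix_fadvise; see pg_config_manual.h.  Elsewhere, this function compiles
 * to an empty body.)
 */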
void
LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum,
					BlockNumber blockNum)
{
#ifdef USE_PREFETCH
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;

	INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		/* Yes, so nothing to do */
		return;
	}

	/* Not in buffers, so initiate prefetch */
	smgrprefetch(smgr, forkNum, blockNum);
#endif							/* USE_PREFETCH */
}


/*
 * LocalBufferAlloc -
 *	  Find or create a local buffer for the given page of the given relation.
 *
 * API is similar to bufmgr.c's BufferAlloc, except that we do not need
 * to do any locking since this is all local.  Also, IO_IN_PROGRESS
 * does not get set.  Lastly, we support only default access strategy
 * (hence, usage_count is always advanced).
 */
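/*
 * A condensed sketch of the expected call pattern, modeled on
 * ReadBuffer_common() in bufmgr.c (error handling and statistics omitted):
 *
 *		bool		found;
 *		BufferDesc *bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum,
 *											  &found);
 *
 *		if (!found)
 *		{
 *			smgrread(smgr, forkNum, blockNum,
 *					 (char *) LocalBufHdrGetBlock(bufHdr));
 *			... caller verifies the page, then sets BM_VALID in
 *			bufHdr->state ...
 *		}
 */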
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
				 bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;
	BufferDesc *bufHdr;
	int			b;
	int			trycounter;
	bool		found;
	uint32		buf_state;

	INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		b = hresult->id;
		bufHdr = GetLocalBufferDescriptor(b);
		Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
#ifdef LBDEBUG
		fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
				smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1);
#endif
		buf_state = pg_atomic_read_u32(&bufHdr->state);

		/* this part is equivalent to PinBuffer for a shared buffer */
		if (LocalRefCount[b] == 0)
		{
			if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
			{
				buf_state += BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
			}
		}
		LocalRefCount[b]++;
		ResourceOwnerRememberBuffer(CurrentResourceOwner,
									BufferDescriptorGetBuffer(bufHdr));
		if (buf_state & BM_VALID)
			*foundPtr = true;
		else
		{
			/* Previous read attempt must have failed; try again */
			*foundPtr = false;
		}
		return bufHdr;
	}

#ifdef LBDEBUG
	fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
			smgr->smgr_rnode.node.relNode, forkNum, blockNum,
			-nextFreeLocalBuf - 1);
#endif

	/*
	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
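	/*
	 * For example, with NLocBuffer = 4, usage counts {0, 2, 1, 0}, nothing
	 * pinned, and the sweep starting at slot 1: slots 1 and 2 get their
	 * counts decremented, then slot 3 (count 0) is taken.  Pinned buffers
	 * (LocalRefCount > 0) are skipped outright; after NLocBuffer consecutive
	 * skips with no usage-count decrement in between, we give up and error
	 * out.
	 */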
	trycounter = NLocBuffer;
	for (;;)
	{
		b = nextFreeLocalBuf;

		if (++nextFreeLocalBuf >= NLocBuffer)
			nextFreeLocalBuf = 0;

		bufHdr = GetLocalBufferDescriptor(b);

		if (LocalRefCount[b] == 0)
		{
			buf_state = pg_atomic_read_u32(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
				trycounter = NLocBuffer;
			}
			else
			{
				/* Found a usable buffer */
				LocalRefCount[b]++;
				ResourceOwnerRememberBuffer(CurrentResourceOwner,
											BufferDescriptorGetBuffer(bufHdr));
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * This buffer is not referenced, but it might still be dirty.  If so,
	 * write it out before reusing it!
	 */
	if (buf_state & BM_DIRTY)
	{
		SMgrRelation oreln;
		Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

		/* Find smgr relation for buffer */
		oreln = smgropen(bufHdr->tag.rnode, MyBackendId);

		PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
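		/* (a no-op unless data checksums are enabled and the page isn't new) */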

		/* And write... */
		smgrwrite(oreln,
				  bufHdr->tag.forkNum,
				  bufHdr->tag.blockNum,
				  localpage,
				  false);

		/* Mark not-dirty now in case we error out below */
		buf_state &= ~BM_DIRTY;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		pgBufferUsage.local_blks_written++;
	}
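
	/*
	 * Note that, unlike shared-buffer eviction, no WAL flush or fsync is
	 * needed above: per the header comment, temporary-table pages never have
	 * to survive a crash.
	 */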

	/*
	 * Lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * Update the hash table: remove old entry, if any, and make new one.
	 */
	if (buf_state & BM_TAG_VALID)
	{
		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, (void *) &bufHdr->tag,
						HASH_REMOVE, NULL);
		if (!hresult)			/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		/* mark buffer invalid just in case hash insert fails */
		CLEAR_BUFFERTAG(bufHdr->tag);
		buf_state &= ~(BM_VALID | BM_TAG_VALID);
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
	}

	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
	if (found)					/* shouldn't happen */
		elog(ERROR, "local buffer hash table corrupted");
	hresult->id = b;

	/*
	 * It's all ours now.
	 */
	bufHdr->tag = newTag;
	buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
	buf_state |= BM_TAG_VALID;
	buf_state &= ~BUF_USAGECOUNT_MASK;
	buf_state += BUF_USAGECOUNT_ONE;
	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

	*foundPtr = false;
	return bufHdr;
}

/*
 * MarkLocalBufferDirty -
 *	  mark a local buffer dirty
 */
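/*
 * Reached from bufmgr.c's MarkBufferDirty(), which hands local buffers off
 * to us.  As elsewhere in this file, no locking or atomic read-modify-write
 * is needed, since only the owning backend ever touches a local buffer.
 */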
void
MarkLocalBufferDirty(Buffer buffer)
{
	int			bufid;
	BufferDesc *bufHdr;
	uint32		buf_state;

	Assert(BufferIsLocal(buffer));

#ifdef LBDEBUG
	fprintf(stderr, "LB DIRTY %d\n", buffer);
#endif

	bufid = -(buffer + 1);
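	/* e.g., the first local buffer has Buffer value -1 and bufid 0 */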

	Assert(LocalRefCount[bufid] > 0);

	bufHdr = GetLocalBufferDescriptor(bufid);

	buf_state = pg_atomic_read_u32(&bufHdr->state);

	if (!(buf_state & BM_DIRTY))
		pgBufferUsage.local_blks_dirtied++;

	buf_state |= BM_DIRTY;

	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}

/*
 * DropRelFileNodeLocalBuffers
 *		This function removes from the buffer pool all the pages of the
 *		specified relation that have block numbers >= firstDelBlock.
 *		(In particular, with firstDelBlock = 0, all pages are removed.)
 *		Dirty pages are simply dropped, without bothering to write them
 *		out first.  Therefore, this is NOT rollback-able, and so should be
 *		used only with extreme caution!
 *
 *		See DropRelFileNodeBuffers in bufmgr.c for more notes.
 */
void
DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
							BlockNumber firstDelBlock)
{
	int			i;

	for (i = 0; i < NLocBuffer; i++)
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
		LocalBufferLookupEnt *hresult;
		uint32		buf_state;

		buf_state = pg_atomic_read_u32(&bufHdr->state);

		if ((buf_state & BM_TAG_VALID) &&
			RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
			bufHdr->tag.forkNum == forkNum &&
			bufHdr->tag.blockNum >= firstDelBlock)
		{
			if (LocalRefCount[i] != 0)
				elog(ERROR, "block %u of %s is still referenced (local %u)",
					 bufHdr->tag.blockNum,
					 relpathbackend(bufHdr->tag.rnode, MyBackendId,
									bufHdr->tag.forkNum),
					 LocalRefCount[i]);
			/* Remove entry from hashtable */
			hresult = (LocalBufferLookupEnt *)
				hash_search(LocalBufHash, (void *) &bufHdr->tag,
							HASH_REMOVE, NULL);
			if (!hresult)		/* shouldn't happen */
				elog(ERROR, "local buffer hash table corrupted");
			/* Mark buffer invalid */
			CLEAR_BUFFERTAG(bufHdr->tag);
			buf_state &= ~BUF_FLAG_MASK;
			buf_state &= ~BUF_USAGECOUNT_MASK;
			pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
		}
	}
}

/*
 * DropRelFileNodeAllLocalBuffers
 *		This function removes from the buffer pool all pages of all forks
 *		of the specified relation.
 *
 *		See DropRelFileNodeAllBuffers in bufmgr.c for more notes.
 */
void
DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
{
	int			i;

	for (i = 0; i < NLocBuffer; i++)
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
		LocalBufferLookupEnt *hresult;
		uint32		buf_state;

		buf_state = pg_atomic_read_u32(&bufHdr->state);

		if ((buf_state & BM_TAG_VALID) &&
			RelFileNodeEquals(bufHdr->tag.rnode, rnode))
		{
			if (LocalRefCount[i] != 0)
				elog(ERROR, "block %u of %s is still referenced (local %u)",
					 bufHdr->tag.blockNum,
					 relpathbackend(bufHdr->tag.rnode, MyBackendId,
									bufHdr->tag.forkNum),
					 LocalRefCount[i]);
			/* Remove entry from hashtable */
			hresult = (LocalBufferLookupEnt *)
				hash_search(LocalBufHash, (void *) &bufHdr->tag,
							HASH_REMOVE, NULL);
			if (!hresult)		/* shouldn't happen */
				elog(ERROR, "local buffer hash table corrupted");
			/* Mark buffer invalid */
			CLEAR_BUFFERTAG(bufHdr->tag);
			buf_state &= ~BUF_FLAG_MASK;
			buf_state &= ~BUF_USAGECOUNT_MASK;
			pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
		}
	}
}

/*
 * InitLocalBuffers -
 *	  init the local buffer cache. Since most queries (esp. multi-user ones)
 *	  don't involve local buffers, we delay allocating actual memory for the
 *	  buffers until we need them; just make the buffer headers here.
 */
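/*
 * (num_temp_buffers reflects the temp_buffers GUC: by default 1024 blocks,
 * i.e. 8MB at the standard 8kB BLCKSZ.  Since the arrays allocated here are
 * never resized, temp_buffers can only be changed before the session first
 * touches a temporary table.)
 */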
static void
InitLocalBuffers(void)
{
	int			nbufs = num_temp_buffers;
	HASHCTL		info;
	int			i;

	/*
	 * Parallel workers can't access data in temporary tables, because they
	 * have no visibility into the local buffers of their leader.  This is a
	 * convenient, low-cost place to provide a backstop check for that.  Note
	 * that we don't wish to prevent a parallel worker from accessing catalog
	 * metadata about a temp table, so checks at higher levels would be
	 * inappropriate.
	 */
	if (IsParallelWorker())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("cannot access temporary tables during a parallel operation")));

	/* Allocate and zero buffer headers and auxiliary arrays */
	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	nextFreeLocalBuf = 0;

	/* initialize fields that need to start off nonzero */
	for (i = 0; i < nbufs; i++)
	{
		BufferDesc *buf = GetLocalBufferDescriptor(i);

		/*
		 * Use a negative buf_id to mark this as a local buffer.  This is
		 * tricky: shared buffers start at 0, so we have to start at -2.
		 * (Note that BufferDescriptorGetBuffer adds 1 to buf_id, so our
		 * first buffer's id is -1.)
		 */
		buf->buf_id = -i - 2;

		/*
		 * Intentionally do not initialize the buffer's atomic variable
		 * (besides zeroing the underlying memory above). That way we get
		 * errors on platforms without atomics, if somebody (re-)introduces
		 * atomic operations for local buffers.
		 */
	}

	/* Create the lookup hash table */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(BufferTag);
	info.entrysize = sizeof(LocalBufferLookupEnt);

	LocalBufHash = hash_create("Local Buffer Lookup Table",
							   nbufs,
							   &info,
							   HASH_ELEM | HASH_BLOBS);

	if (!LocalBufHash)
		elog(ERROR, "could not initialize local buffer hash table");

	/* Initialization done, mark buffers allocated */
	NLocBuffer = nbufs;
}

/*
 * GetLocalBufferStorage - allocate memory for a local buffer
 *
 * The idea of this function is to aggregate our requests for storage
 * so that the memory manager doesn't see a whole lot of relatively small
 * requests.  Since we'll never give back a local buffer once it's created
 * within a particular process, there's no point in burdening memmgr with
 * separately managed chunks.
 */
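/*
 * For example, with the default temp_buffers = 1024, the requests made
 * below are for 16, 32, 64, 128, 256, and 512 buffers (1008 in total),
 * followed by one final request for the remaining 16: seven allocations
 * instead of 1024.
 */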
static Block
GetLocalBufferStorage(void)
{
	static char *cur_block = NULL;
	static int	next_buf_in_block = 0;
	static int	num_bufs_in_block = 0;
	static int	total_bufs_allocated = 0;
	static MemoryContext LocalBufferContext = NULL;

	char	   *this_buf;

	Assert(total_bufs_allocated < NLocBuffer);

	if (next_buf_in_block >= num_bufs_in_block)
	{
		/* Need to make a new request to memmgr */
		int			num_bufs;

		/*
		 * We allocate local buffers in a context of their own, so that the
		 * space eaten for them is easily recognizable in MemoryContextStats
		 * output.  Create the context on first use.
		 */
		if (LocalBufferContext == NULL)
			LocalBufferContext =
				AllocSetContextCreate(TopMemoryContext,
									  "LocalBufferContext",
									  ALLOCSET_DEFAULT_SIZES);

		/* Start with a 16-buffer request; subsequent ones double each time */
		num_bufs = Max(num_bufs_in_block * 2, 16);
		/* But not more than what we need for all remaining local bufs */
		num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
		/* And don't overflow MaxAllocSize, either */
		num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);

		cur_block = (char *) MemoryContextAlloc(LocalBufferContext,
												num_bufs * BLCKSZ);
		next_buf_in_block = 0;
		num_bufs_in_block = num_bufs;
	}

	/* Allocate next buffer in current memory block */
	this_buf = cur_block + next_buf_in_block * BLCKSZ;
	next_buf_in_block++;
	total_bufs_allocated++;

	return (Block) this_buf;
}

/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				Buffer		b = -i - 1;

				PrintBufferLeakWarning(b);
				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}

/*
 * AtEOXact_LocalBuffers - clean up at end of transaction.
 *
 * This is just like AtEOXact_Buffers, but for local buffers.
 */
void
AtEOXact_LocalBuffers(bool isCommit)
{
	CheckForLocalBufferLeaks();
}

/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelFileNodeBuffers while trying
	 * to drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}