/*-------------------------------------------------------------------------
 *
 * localbuf.c
 *	  local buffer manager. Fast buffer manager for temporary tables,
 *	  which never need to be WAL-logged or checkpointed, etc.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/buffer/localbuf.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/parallel.h"
#include "catalog/catalog.h"
#include "executor/instrument.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/resowner_private.h"


/*#define LBDEBUG*/

/* entry for buffer lookup hashtable */
typedef struct
{
	BufferTag	key;			/* Tag of a disk page */
	int			id;				/* Associated local buffer's index */
} LocalBufferLookupEnt;

/*
 * Map a local BufferDesc to its data block pointer.
 * Note: this macro only works on local buffers, not shared ones!
 * (Local buf_ids are negative, starting at -2; see InitLocalBuffers.)
 */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]

int			NLocBuffer = 0;		/* until buffers are initialized */

BufferDesc *LocalBufferDescriptors = NULL;
Block	   *LocalBufferBlockPointers = NULL;
int32	   *LocalRefCount = NULL;

/* clock-sweep hand for LocalBufferAlloc's victim search */
static int	nextFreeLocalBuf = 0;

/* lookup table mapping BufferTag -> local buffer index; NULL until first use */
static HTAB *LocalBufHash = NULL;


static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);


/*
 * LocalPrefetchBuffer -
 *	  initiate asynchronous read of a block of a relation
 *
 * Do PrefetchBuffer's work for temporary relations.
 * No-op if prefetching isn't compiled in.
 */
void
LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum,
					BlockNumber blockNum)
{
#ifdef USE_PREFETCH
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;

	INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		/* Yes, so nothing to do */
		return;
	}

	/* Not in buffers, so initiate prefetch */
	smgrprefetch(smgr, forkNum, blockNum);
#endif							/* USE_PREFETCH */
}


/*
 * LocalBufferAlloc -
 *	  Find or create a local buffer for the given page of the given relation.
 *
 * API is similar to bufmgr.c's BufferAlloc, except that we do not need
 * to do any locking since this is all local.  Also, IO_IN_PROGRESS
 * does not get set.  Lastly, we support only default access strategy
 * (hence, usage_count is always advanced).
 */
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
				 bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;
	BufferDesc *bufHdr;
	int			b;
	int			trycounter;
	bool		found;
	uint32		buf_state;

	INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		b = hresult->id;
		bufHdr = GetLocalBufferDescriptor(b);
		Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
#ifdef LBDEBUG
		fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
				smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1);
#endif
		/*
		 * Unlocked reads/writes of the state are OK here: local buffers are
		 * backend-private, so nobody else can be touching them.
		 */
		buf_state = pg_atomic_read_u32(&bufHdr->state);

		/* this part is equivalent to PinBuffer for a shared buffer */
		if (LocalRefCount[b] == 0)
		{
			if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
			{
				buf_state += BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
			}
		}
		LocalRefCount[b]++;
		ResourceOwnerRememberBuffer(CurrentResourceOwner,
									BufferDescriptorGetBuffer(bufHdr));
		if (buf_state & BM_VALID)
			*foundPtr = true;
		else
		{
			/* Previous read attempt must have failed; try again */
			*foundPtr = false;
		}
		return bufHdr;
	}

#ifdef LBDEBUG
	fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
			smgr->smgr_rnode.node.relNode, forkNum, blockNum,
			-nextFreeLocalBuf - 1);
#endif

	/*
	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
	trycounter = NLocBuffer;
	for (;;)
	{
		b = nextFreeLocalBuf;

		if (++nextFreeLocalBuf >= NLocBuffer)
			nextFreeLocalBuf = 0;

		bufHdr = GetLocalBufferDescriptor(b);

		if (LocalRefCount[b] == 0)
		{
			buf_state = pg_atomic_read_u32(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				/* Recently used: decrement usage count and keep sweeping */
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
				trycounter = NLocBuffer;
			}
			else
			{
				/* Found a usable buffer */
				LocalRefCount[b]++;
				ResourceOwnerRememberBuffer(CurrentResourceOwner,
											BufferDescriptorGetBuffer(bufHdr));
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * this buffer is not referenced but it might still be dirty. if that's
	 * the case, write it out before reusing it!
	 */
	if (buf_state & BM_DIRTY)
	{
		SMgrRelation oreln;
		Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

		/* Find smgr relation for buffer */
		oreln = smgropen(bufHdr->tag.rnode, MyBackendId);

		PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);

		/* And write... */
		smgrwrite(oreln,
				  bufHdr->tag.forkNum,
				  bufHdr->tag.blockNum,
				  localpage,
				  false);

		/* Mark not-dirty now in case we error out below */
		buf_state &= ~BM_DIRTY;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		pgBufferUsage.local_blks_written++;
	}

	/*
	 * lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * Update the hash table: remove old entry, if any, and make new one.
	 */
	if (buf_state & BM_TAG_VALID)
	{
		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, (void *) &bufHdr->tag,
						HASH_REMOVE, NULL);
		if (!hresult)			/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		/* mark buffer invalid just in case hash insert fails */
		CLEAR_BUFFERTAG(bufHdr->tag);
		buf_state &= ~(BM_VALID | BM_TAG_VALID);
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
	}

	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
	if (found)					/* shouldn't happen */
		elog(ERROR, "local buffer hash table corrupted");
	hresult->id = b;

	/*
	 * it's all ours now.
	 */
	bufHdr->tag = newTag;
	buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
	buf_state |= BM_TAG_VALID;
	buf_state &= ~BUF_USAGECOUNT_MASK;
	buf_state += BUF_USAGECOUNT_ONE;
	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

	*foundPtr = false;
	return bufHdr;
}

/*
 * MarkLocalBufferDirty -
 *	  mark a local buffer dirty
 *
 * The buffer must already be pinned by this backend (asserted below).
 */
void
MarkLocalBufferDirty(Buffer buffer)
{
	int			bufid;
	BufferDesc *bufHdr;
	uint32		buf_state;

	Assert(BufferIsLocal(buffer));

#ifdef LBDEBUG
	fprintf(stderr, "LB DIRTY %d\n", buffer);
#endif

	/* Convert negative local Buffer number to array index */
	bufid = -(buffer + 1);

	Assert(LocalRefCount[bufid] > 0);

	bufHdr = GetLocalBufferDescriptor(bufid);

	buf_state = pg_atomic_read_u32(&bufHdr->state);

	/* Count the transition to dirty only once per write-out cycle */
	if (!(buf_state & BM_DIRTY))
		pgBufferUsage.local_blks_dirtied++;

	buf_state |= BM_DIRTY;

	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}

/*
 * DropRelFileNodeLocalBuffers
 *		This function removes from the buffer pool all the pages of the
 *		specified relation that have block numbers >= firstDelBlock.
 *		(In particular, with firstDelBlock = 0, all pages are removed.)
313 * Dirty pages are simply dropped, without bothering to write them 314 * out first. Therefore, this is NOT rollback-able, and so should be 315 * used only with extreme caution! 316 * 317 * See DropRelFileNodeBuffers in bufmgr.c for more notes. 318 */ 319 void 320 DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, 321 BlockNumber firstDelBlock) 322 { 323 int i; 324 325 for (i = 0; i < NLocBuffer; i++) 326 { 327 BufferDesc *bufHdr = GetLocalBufferDescriptor(i); 328 LocalBufferLookupEnt *hresult; 329 uint32 buf_state; 330 331 buf_state = pg_atomic_read_u32(&bufHdr->state); 332 333 if ((buf_state & BM_TAG_VALID) && 334 RelFileNodeEquals(bufHdr->tag.rnode, rnode) && 335 bufHdr->tag.forkNum == forkNum && 336 bufHdr->tag.blockNum >= firstDelBlock) 337 { 338 if (LocalRefCount[i] != 0) 339 elog(ERROR, "block %u of %s is still referenced (local %u)", 340 bufHdr->tag.blockNum, 341 relpathbackend(bufHdr->tag.rnode, MyBackendId, 342 bufHdr->tag.forkNum), 343 LocalRefCount[i]); 344 /* Remove entry from hashtable */ 345 hresult = (LocalBufferLookupEnt *) 346 hash_search(LocalBufHash, (void *) &bufHdr->tag, 347 HASH_REMOVE, NULL); 348 if (!hresult) /* shouldn't happen */ 349 elog(ERROR, "local buffer hash table corrupted"); 350 /* Mark buffer invalid */ 351 CLEAR_BUFFERTAG(bufHdr->tag); 352 buf_state &= ~BUF_FLAG_MASK; 353 buf_state &= ~BUF_USAGECOUNT_MASK; 354 pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); 355 } 356 } 357 } 358 359 /* 360 * DropRelFileNodeAllLocalBuffers 361 * This function removes from the buffer pool all pages of all forks 362 * of the specified relation. 363 * 364 * See DropRelFileNodeAllBuffers in bufmgr.c for more notes. 
 */
void
DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
{
	int			i;

	/* Scan all local buffers; drop any whose tag matches the relation. */
	for (i = 0; i < NLocBuffer; i++)
	{
		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
		LocalBufferLookupEnt *hresult;
		uint32		buf_state;

		buf_state = pg_atomic_read_u32(&bufHdr->state);

		if ((buf_state & BM_TAG_VALID) &&
			RelFileNodeEquals(bufHdr->tag.rnode, rnode))
		{
			/* A still-pinned buffer here indicates a caller bug */
			if (LocalRefCount[i] != 0)
				elog(ERROR, "block %u of %s is still referenced (local %u)",
					 bufHdr->tag.blockNum,
					 relpathbackend(bufHdr->tag.rnode, MyBackendId,
									bufHdr->tag.forkNum),
					 LocalRefCount[i]);
			/* Remove entry from hashtable */
			hresult = (LocalBufferLookupEnt *)
				hash_search(LocalBufHash, (void *) &bufHdr->tag,
							HASH_REMOVE, NULL);
			if (!hresult)		/* shouldn't happen */
				elog(ERROR, "local buffer hash table corrupted");
			/* Mark buffer invalid: clear tag, flags, and usage count */
			CLEAR_BUFFERTAG(bufHdr->tag);
			buf_state &= ~BUF_FLAG_MASK;
			buf_state &= ~BUF_USAGECOUNT_MASK;
			pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
		}
	}
}

/*
 * InitLocalBuffers -
 *	  init the local buffer cache. Since most queries (esp. multi-user ones)
 *	  don't involve local buffers, we delay allocating actual memory for the
 *	  buffers until we need them; just make the buffer headers here.
 */
static void
InitLocalBuffers(void)
{
	int			nbufs = num_temp_buffers;
	HASHCTL		info;
	int			i;

	/*
	 * Parallel workers can't access data in temporary tables, because they
	 * have no visibility into the local buffers of their leader.  This is a
	 * convenient, low-cost place to provide a backstop check for that.  Note
	 * that we don't wish to prevent a parallel worker from accessing catalog
	 * metadata about a temp table, so checks at higher levels would be
	 * inappropriate.
	 */
	if (IsParallelWorker())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("cannot access temporary tables during a parallel operation")));

	/*
	 * Allocate and zero buffer headers and auxiliary arrays.  Plain calloc
	 * (not palloc) is used since these live for the whole backend lifetime.
	 */
	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	nextFreeLocalBuf = 0;

	/* initialize fields that need to start off nonzero */
	for (i = 0; i < nbufs; i++)
	{
		BufferDesc *buf = GetLocalBufferDescriptor(i);

		/*
		 * negative to indicate local buffer. This is tricky: shared buffers
		 * start with 0. We have to start with -2. (Note that the routine
		 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
		 * is -1.)
		 */
		buf->buf_id = -i - 2;

		/*
		 * Intentionally do not initialize the buffer's atomic variable
		 * (besides zeroing the underlying memory above). That way we get
		 * errors on platforms without atomics, if somebody (re-)introduces
		 * atomic operations for local buffers.
		 */
	}

	/* Create the lookup hash table */
	MemSet(&info, 0, sizeof(info));
	info.keysize = sizeof(BufferTag);
	info.entrysize = sizeof(LocalBufferLookupEnt);

	LocalBufHash = hash_create("Local Buffer Lookup Table",
							   nbufs,
							   &info,
							   HASH_ELEM | HASH_BLOBS);

	if (!LocalBufHash)
		elog(ERROR, "could not initialize local buffer hash table");

	/* Initialization done, mark buffers allocated */
	NLocBuffer = nbufs;
}

/*
 * GetLocalBufferStorage - allocate memory for a local buffer
 *
 * The idea of this function is to aggregate our requests for storage
 * so that the memory manager doesn't see a whole lot of relatively small
 * requests.  Since we'll never give back a local buffer once it's created
 * within a particular process, no point in burdening memmgr with separately
 * managed chunks.
 */
static Block
GetLocalBufferStorage(void)
{
	/* State persists across calls: current chunk and allocation progress */
	static char *cur_block = NULL;
	static int	next_buf_in_block = 0;
	static int	num_bufs_in_block = 0;
	static int	total_bufs_allocated = 0;
	static MemoryContext LocalBufferContext = NULL;

	char	   *this_buf;

	Assert(total_bufs_allocated < NLocBuffer);

	if (next_buf_in_block >= num_bufs_in_block)
	{
		/* Need to make a new request to memmgr */
		int			num_bufs;

		/*
		 * We allocate local buffers in a context of their own, so that the
		 * space eaten for them is easily recognizable in MemoryContextStats
		 * output.  Create the context on first use.
		 */
		if (LocalBufferContext == NULL)
			LocalBufferContext =
				AllocSetContextCreate(TopMemoryContext,
									  "LocalBufferContext",
									  ALLOCSET_DEFAULT_SIZES);

		/* Start with a 16-buffer request; subsequent ones double each time */
		num_bufs = Max(num_bufs_in_block * 2, 16);
		/* But not more than what we need for all remaining local bufs */
		num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
		/* And don't overflow MaxAllocSize, either */
		num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);

		cur_block = (char *) MemoryContextAlloc(LocalBufferContext,
												num_bufs * BLCKSZ);
		next_buf_in_block = 0;
		num_bufs_in_block = num_bufs;
	}

	/* Allocate next buffer in current memory block */
	this_buf = cur_block + next_buf_in_block * BLCKSZ;
	next_buf_in_block++;
	total_bufs_allocated++;

	return (Block) this_buf;
}

/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 * Compiled to a no-op unless assertions are enabled.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				/* Reconstruct the (negative) local Buffer number */
				Buffer		b = -i - 1;

				PrintBufferLeakWarning(b);
				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}

/*
 * AtEOXact_LocalBuffers - clean up at end of transaction.
 *
 * This is just like AtEOXact_Buffers, but for local buffers.
 */
void
AtEOXact_LocalBuffers(bool isCommit)
{
	CheckForLocalBufferLeaks();
}

/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelFileNodeBuffers while trying
	 * to drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}