1 /*------------------------------------------------------------------------- 2 * 3 * autoprewarm.c 4 * Periodically dump information about the blocks present in 5 * shared_buffers, and reload them on server restart. 6 * 7 * Due to locking considerations, we can't actually begin prewarming 8 * until the server reaches a consistent state. We need the catalogs 9 * to be consistent so that we can figure out which relation to lock, 10 * and we need to lock the relations so that we don't try to prewarm 11 * pages from a relation that is in the process of being dropped. 12 * 13 * While prewarming, autoprewarm will use two workers. There's a 14 * master worker that reads and sorts the list of blocks to be 15 * prewarmed and then launches a per-database worker for each 16 * relevant database in turn. The former keeps running after the 17 * initial prewarm is complete to update the dump file periodically. 18 * 19 * Copyright (c) 2016-2018, PostgreSQL Global Development Group 20 * 21 * IDENTIFICATION 22 * contrib/pg_prewarm/autoprewarm.c 23 * 24 *------------------------------------------------------------------------- 25 */ 26 27 #include "postgres.h" 28 29 #include <unistd.h> 30 31 #include "access/heapam.h" 32 #include "access/xact.h" 33 #include "catalog/pg_class.h" 34 #include "catalog/pg_type.h" 35 #include "miscadmin.h" 36 #include "pgstat.h" 37 #include "postmaster/bgworker.h" 38 #include "storage/buf_internals.h" 39 #include "storage/dsm.h" 40 #include "storage/ipc.h" 41 #include "storage/latch.h" 42 #include "storage/lwlock.h" 43 #include "storage/proc.h" 44 #include "storage/procsignal.h" 45 #include "storage/shmem.h" 46 #include "storage/smgr.h" 47 #include "tcop/tcopprot.h" 48 #include "utils/acl.h" 49 #include "utils/guc.h" 50 #include "utils/memutils.h" 51 #include "utils/rel.h" 52 #include "utils/relfilenodemap.h" 53 #include "utils/resowner.h" 54 55 #define AUTOPREWARM_FILE "autoprewarm.blocks" 56 57 /* Metadata for each block we dump. */ 58 typedef struct BlockInfoRecord 59 { 60 Oid database; 61 Oid tablespace; 62 Oid filenode; 63 ForkNumber forknum; 64 BlockNumber blocknum; 65 } BlockInfoRecord; 66 67 /* Shared state information for autoprewarm bgworker. */ 68 typedef struct AutoPrewarmSharedState 69 { 70 LWLock lock; /* mutual exclusion */ 71 pid_t bgworker_pid; /* for main bgworker */ 72 pid_t pid_using_dumpfile; /* for autoprewarm or block dump */ 73 74 /* Following items are for communication with per-database worker */ 75 dsm_handle block_info_handle; 76 Oid database; 77 int prewarm_start_idx; 78 int prewarm_stop_idx; 79 int prewarmed_blocks; 80 } AutoPrewarmSharedState; 81 82 void _PG_init(void); 83 void autoprewarm_main(Datum main_arg); 84 void autoprewarm_database_main(Datum main_arg); 85 86 PG_FUNCTION_INFO_V1(autoprewarm_start_worker); 87 PG_FUNCTION_INFO_V1(autoprewarm_dump_now); 88 89 static void apw_load_buffers(void); 90 static int apw_dump_now(bool is_bgworker, bool dump_unlogged); 91 static void apw_start_master_worker(void); 92 static void apw_start_database_worker(void); 93 static bool apw_init_shmem(void); 94 static void apw_detach_shmem(int code, Datum arg); 95 static int apw_compare_blockinfo(const void *p, const void *q); 96 static void apw_sigterm_handler(SIGNAL_ARGS); 97 static void apw_sighup_handler(SIGNAL_ARGS); 98 99 /* Flags set by signal handlers */ 100 static volatile sig_atomic_t got_sigterm = false; 101 static volatile sig_atomic_t got_sighup = false; 102 103 /* Pointer to shared-memory state. */ 104 static AutoPrewarmSharedState *apw_state = NULL; 105 106 /* GUC variables. */ 107 static bool autoprewarm = true; /* start worker? */ 108 static int autoprewarm_interval; /* dump interval */ 109 110 /* 111 * Module load callback. 112 */ 113 void 114 _PG_init(void) 115 { 116 DefineCustomIntVariable("pg_prewarm.autoprewarm_interval", 117 "Sets the interval between dumps of shared buffers", 118 "If set to zero, time-based dumping is disabled.", 119 &autoprewarm_interval, 120 300, 121 0, INT_MAX / 1000, 122 PGC_SIGHUP, 123 GUC_UNIT_S, 124 NULL, 125 NULL, 126 NULL); 127 128 if (!process_shared_preload_libraries_in_progress) 129 return; 130 131 /* can't define PGC_POSTMASTER variable after startup */ 132 DefineCustomBoolVariable("pg_prewarm.autoprewarm", 133 "Starts the autoprewarm worker.", 134 NULL, 135 &autoprewarm, 136 true, 137 PGC_POSTMASTER, 138 0, 139 NULL, 140 NULL, 141 NULL); 142 143 EmitWarningsOnPlaceholders("pg_prewarm"); 144 145 RequestAddinShmemSpace(MAXALIGN(sizeof(AutoPrewarmSharedState))); 146 147 /* Register autoprewarm worker, if enabled. */ 148 if (autoprewarm) 149 apw_start_master_worker(); 150 } 151 152 /* 153 * Main entry point for the master autoprewarm process. Per-database workers 154 * have a separate entry point. 155 */ 156 void 157 autoprewarm_main(Datum main_arg) 158 { 159 bool first_time = true; 160 bool final_dump_allowed = true; 161 TimestampTz last_dump_time = 0; 162 163 /* Establish signal handlers; once that's done, unblock signals. */ 164 pqsignal(SIGTERM, apw_sigterm_handler); 165 pqsignal(SIGHUP, apw_sighup_handler); 166 pqsignal(SIGUSR1, procsignal_sigusr1_handler); 167 BackgroundWorkerUnblockSignals(); 168 169 /* Create (if necessary) and attach to our shared memory area. */ 170 if (apw_init_shmem()) 171 first_time = false; 172 173 /* Set on-detach hook so that our PID will be cleared on exit. */ 174 on_shmem_exit(apw_detach_shmem, 0); 175 176 /* 177 * Store our PID in the shared memory area --- unless there's already 178 * another worker running, in which case just exit. 179 */ 180 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 181 if (apw_state->bgworker_pid != InvalidPid) 182 { 183 LWLockRelease(&apw_state->lock); 184 ereport(LOG, 185 (errmsg("autoprewarm worker is already running under PID %lu", 186 (unsigned long) apw_state->bgworker_pid))); 187 return; 188 } 189 apw_state->bgworker_pid = MyProcPid; 190 LWLockRelease(&apw_state->lock); 191 192 /* 193 * Preload buffers from the dump file only if we just created the shared 194 * memory region. Otherwise, it's either already been done or shouldn't 195 * be done - e.g. because the old dump file has been overwritten since the 196 * server was started. 197 * 198 * There's not much point in performing a dump immediately after we finish 199 * preloading; so, if we do end up preloading, consider the last dump time 200 * to be equal to the current time. 201 * 202 * If apw_load_buffers() is terminated early by a shutdown request, 203 * prevent dumping out our state below the loop, because we'd effectively 204 * just truncate the saved state to however much we'd managed to preload. 205 */ 206 if (first_time) 207 { 208 apw_load_buffers(); 209 final_dump_allowed = !got_sigterm; 210 last_dump_time = GetCurrentTimestamp(); 211 } 212 213 /* Periodically dump buffers until terminated. */ 214 while (!got_sigterm) 215 { 216 int rc; 217 218 /* In case of a SIGHUP, just reload the configuration. */ 219 if (got_sighup) 220 { 221 got_sighup = false; 222 ProcessConfigFile(PGC_SIGHUP); 223 } 224 225 if (autoprewarm_interval <= 0) 226 { 227 /* We're only dumping at shutdown, so just wait forever. */ 228 rc = WaitLatch(&MyProc->procLatch, 229 WL_LATCH_SET | WL_POSTMASTER_DEATH, 230 -1L, 231 PG_WAIT_EXTENSION); 232 } 233 else 234 { 235 TimestampTz next_dump_time; 236 long delay_in_ms; 237 238 /* Compute the next dump time. */ 239 next_dump_time = 240 TimestampTzPlusMilliseconds(last_dump_time, 241 autoprewarm_interval * 1000); 242 delay_in_ms = 243 TimestampDifferenceMilliseconds(GetCurrentTimestamp(), 244 next_dump_time); 245 246 /* Perform a dump if it's time. */ 247 if (delay_in_ms <= 0) 248 { 249 last_dump_time = GetCurrentTimestamp(); 250 apw_dump_now(true, false); 251 continue; 252 } 253 254 /* Sleep until the next dump time. */ 255 rc = WaitLatch(&MyProc->procLatch, 256 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 257 delay_in_ms, 258 PG_WAIT_EXTENSION); 259 } 260 261 /* Reset the latch, bail out if postmaster died, otherwise loop. */ 262 ResetLatch(&MyProc->procLatch); 263 if (rc & WL_POSTMASTER_DEATH) 264 proc_exit(1); 265 } 266 267 /* 268 * Dump one last time. We assume this is probably the result of a system 269 * shutdown, although it's possible that we've merely been terminated. 270 */ 271 if (final_dump_allowed) 272 apw_dump_now(true, true); 273 } 274 275 /* 276 * Read the dump file and launch per-database workers one at a time to 277 * prewarm the buffers found there. 278 */ 279 static void 280 apw_load_buffers(void) 281 { 282 FILE *file = NULL; 283 int num_elements, 284 i; 285 BlockInfoRecord *blkinfo; 286 dsm_segment *seg; 287 288 /* 289 * Skip the prewarm if the dump file is in use; otherwise, prevent any 290 * other process from writing it while we're using it. 291 */ 292 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 293 if (apw_state->pid_using_dumpfile == InvalidPid) 294 apw_state->pid_using_dumpfile = MyProcPid; 295 else 296 { 297 LWLockRelease(&apw_state->lock); 298 ereport(LOG, 299 (errmsg("skipping prewarm because block dump file is being written by PID %lu", 300 (unsigned long) apw_state->pid_using_dumpfile))); 301 return; 302 } 303 LWLockRelease(&apw_state->lock); 304 305 /* 306 * Open the block dump file. Exit quietly if it doesn't exist, but report 307 * any other error. 308 */ 309 file = AllocateFile(AUTOPREWARM_FILE, "r"); 310 if (!file) 311 { 312 if (errno == ENOENT) 313 { 314 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 315 apw_state->pid_using_dumpfile = InvalidPid; 316 LWLockRelease(&apw_state->lock); 317 return; /* No file to load. */ 318 } 319 ereport(ERROR, 320 (errcode_for_file_access(), 321 errmsg("could not read file \"%s\": %m", 322 AUTOPREWARM_FILE))); 323 } 324 325 /* First line of the file is a record count. */ 326 if (fscanf(file, "<<%d>>\n", &num_elements) != 1) 327 ereport(ERROR, 328 (errcode_for_file_access(), 329 errmsg("could not read from file \"%s\": %m", 330 AUTOPREWARM_FILE))); 331 332 /* Allocate a dynamic shared memory segment to store the record data. */ 333 seg = dsm_create(sizeof(BlockInfoRecord) * num_elements, 0); 334 blkinfo = (BlockInfoRecord *) dsm_segment_address(seg); 335 336 /* Read records, one per line. */ 337 for (i = 0; i < num_elements; i++) 338 { 339 unsigned forknum; 340 341 if (fscanf(file, "%u,%u,%u,%u,%u\n", &blkinfo[i].database, 342 &blkinfo[i].tablespace, &blkinfo[i].filenode, 343 &forknum, &blkinfo[i].blocknum) != 5) 344 ereport(ERROR, 345 (errmsg("autoprewarm block dump file is corrupted at line %d", 346 i + 1))); 347 blkinfo[i].forknum = forknum; 348 } 349 350 FreeFile(file); 351 352 /* Sort the blocks to be loaded. */ 353 pg_qsort(blkinfo, num_elements, sizeof(BlockInfoRecord), 354 apw_compare_blockinfo); 355 356 /* Populate shared memory state. */ 357 apw_state->block_info_handle = dsm_segment_handle(seg); 358 apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0; 359 apw_state->prewarmed_blocks = 0; 360 361 /* Get the info position of the first block of the next database. */ 362 while (apw_state->prewarm_start_idx < num_elements) 363 { 364 int j = apw_state->prewarm_start_idx; 365 Oid current_db = blkinfo[j].database; 366 367 /* 368 * Advance the prewarm_stop_idx to the first BlockRecordInfo that does 369 * not belong to this database. 370 */ 371 j++; 372 while (j < num_elements) 373 { 374 if (current_db != blkinfo[j].database) 375 { 376 /* 377 * Combine BlockRecordInfos for global objects with those of 378 * the database. 379 */ 380 if (current_db != InvalidOid) 381 break; 382 current_db = blkinfo[j].database; 383 } 384 385 j++; 386 } 387 388 /* 389 * If we reach this point with current_db == InvalidOid, then only 390 * BlockRecordInfos belonging to global objects exist. We can't 391 * prewarm without a database connection, so just bail out. 392 */ 393 if (current_db == InvalidOid) 394 break; 395 396 /* Configure stop point and database for next per-database worker. */ 397 apw_state->prewarm_stop_idx = j; 398 apw_state->database = current_db; 399 Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx); 400 401 /* If we've run out of free buffers, don't launch another worker. */ 402 if (!have_free_buffer()) 403 break; 404 405 /* 406 * Likewise, don't launch if we've already been told to shut down. 407 * (The launch would fail anyway, but we might as well skip it.) 408 */ 409 if (got_sigterm) 410 break; 411 412 /* 413 * Start a per-database worker to load blocks for this database; this 414 * function will return once the per-database worker exits. 415 */ 416 apw_start_database_worker(); 417 418 /* Prepare for next database. */ 419 apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx; 420 } 421 422 /* Clean up. */ 423 dsm_detach(seg); 424 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 425 apw_state->block_info_handle = DSM_HANDLE_INVALID; 426 apw_state->pid_using_dumpfile = InvalidPid; 427 LWLockRelease(&apw_state->lock); 428 429 /* Report our success, if we were able to finish. */ 430 if (!got_sigterm) 431 ereport(LOG, 432 (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks", 433 apw_state->prewarmed_blocks, num_elements))); 434 } 435 436 /* 437 * Prewarm all blocks for one database (and possibly also global objects, if 438 * those got grouped with this database). 439 */ 440 void 441 autoprewarm_database_main(Datum main_arg) 442 { 443 int pos; 444 BlockInfoRecord *block_info; 445 Relation rel = NULL; 446 BlockNumber nblocks = 0; 447 BlockInfoRecord *old_blk = NULL; 448 dsm_segment *seg; 449 450 /* Establish signal handlers; once that's done, unblock signals. */ 451 pqsignal(SIGTERM, die); 452 BackgroundWorkerUnblockSignals(); 453 454 /* Connect to correct database and get block information. */ 455 apw_init_shmem(); 456 seg = dsm_attach(apw_state->block_info_handle); 457 if (seg == NULL) 458 ereport(ERROR, 459 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 460 errmsg("could not map dynamic shared memory segment"))); 461 BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0); 462 block_info = (BlockInfoRecord *) dsm_segment_address(seg); 463 pos = apw_state->prewarm_start_idx; 464 465 /* 466 * Loop until we run out of blocks to prewarm or until we run out of free 467 * buffers. 468 */ 469 while (pos < apw_state->prewarm_stop_idx && have_free_buffer()) 470 { 471 BlockInfoRecord *blk = &block_info[pos++]; 472 Buffer buf; 473 474 CHECK_FOR_INTERRUPTS(); 475 476 /* 477 * Quit if we've reached records for another database. If previous 478 * blocks are of some global objects, then continue pre-warming. 479 */ 480 if (old_blk != NULL && old_blk->database != blk->database && 481 old_blk->database != 0) 482 break; 483 484 /* 485 * As soon as we encounter a block of a new relation, close the old 486 * relation. Note that rel will be NULL if try_relation_open failed 487 * previously; in that case, there is nothing to close. 488 */ 489 if (old_blk != NULL && old_blk->filenode != blk->filenode && 490 rel != NULL) 491 { 492 relation_close(rel, AccessShareLock); 493 rel = NULL; 494 CommitTransactionCommand(); 495 } 496 497 /* 498 * Try to open each new relation, but only once, when we first 499 * encounter it. If it's been dropped, skip the associated blocks. 500 */ 501 if (old_blk == NULL || old_blk->filenode != blk->filenode) 502 { 503 Oid reloid; 504 505 Assert(rel == NULL); 506 StartTransactionCommand(); 507 reloid = RelidByRelfilenode(blk->tablespace, blk->filenode); 508 if (OidIsValid(reloid)) 509 rel = try_relation_open(reloid, AccessShareLock); 510 511 if (!rel) 512 CommitTransactionCommand(); 513 } 514 if (!rel) 515 { 516 old_blk = blk; 517 continue; 518 } 519 520 /* Once per fork, check for fork existence and size. */ 521 if (old_blk == NULL || 522 old_blk->filenode != blk->filenode || 523 old_blk->forknum != blk->forknum) 524 { 525 RelationOpenSmgr(rel); 526 527 /* 528 * smgrexists is not safe for illegal forknum, hence check whether 529 * the passed forknum is valid before using it in smgrexists. 530 */ 531 if (blk->forknum > InvalidForkNumber && 532 blk->forknum <= MAX_FORKNUM && 533 smgrexists(rel->rd_smgr, blk->forknum)) 534 nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum); 535 else 536 nblocks = 0; 537 } 538 539 /* Check whether blocknum is valid and within fork file size. */ 540 if (blk->blocknum >= nblocks) 541 { 542 /* Move to next forknum. */ 543 old_blk = blk; 544 continue; 545 } 546 547 /* Prewarm buffer. */ 548 buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL, 549 NULL); 550 if (BufferIsValid(buf)) 551 { 552 apw_state->prewarmed_blocks++; 553 ReleaseBuffer(buf); 554 } 555 556 old_blk = blk; 557 } 558 559 dsm_detach(seg); 560 561 /* Release lock on previous relation. */ 562 if (rel) 563 { 564 relation_close(rel, AccessShareLock); 565 CommitTransactionCommand(); 566 } 567 } 568 569 /* 570 * Dump information on blocks in shared buffers. We use a text format here 571 * so that it's easy to understand and even change the file contents if 572 * necessary. 573 * Returns the number of blocks dumped. 574 */ 575 static int 576 apw_dump_now(bool is_bgworker, bool dump_unlogged) 577 { 578 int num_blocks; 579 int i; 580 int ret; 581 BlockInfoRecord *block_info_array; 582 BufferDesc *bufHdr; 583 FILE *file; 584 char transient_dump_file_path[MAXPGPATH]; 585 pid_t pid; 586 587 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 588 pid = apw_state->pid_using_dumpfile; 589 if (apw_state->pid_using_dumpfile == InvalidPid) 590 apw_state->pid_using_dumpfile = MyProcPid; 591 LWLockRelease(&apw_state->lock); 592 593 if (pid != InvalidPid) 594 { 595 if (!is_bgworker) 596 ereport(ERROR, 597 (errmsg("could not perform block dump because dump file is being used by PID %lu", 598 (unsigned long) apw_state->pid_using_dumpfile))); 599 600 ereport(LOG, 601 (errmsg("skipping block dump because it is already being performed by PID %lu", 602 (unsigned long) apw_state->pid_using_dumpfile))); 603 return 0; 604 } 605 606 block_info_array = 607 (BlockInfoRecord *) palloc(sizeof(BlockInfoRecord) * NBuffers); 608 609 for (num_blocks = 0, i = 0; i < NBuffers; i++) 610 { 611 uint32 buf_state; 612 613 CHECK_FOR_INTERRUPTS(); 614 615 bufHdr = GetBufferDescriptor(i); 616 617 /* Lock each buffer header before inspecting. */ 618 buf_state = LockBufHdr(bufHdr); 619 620 /* 621 * Unlogged tables will be automatically truncated after a crash or 622 * unclean shutdown. In such cases we need not prewarm them. Dump them 623 * only if requested by caller. 624 */ 625 if (buf_state & BM_TAG_VALID && 626 ((buf_state & BM_PERMANENT) || dump_unlogged)) 627 { 628 block_info_array[num_blocks].database = bufHdr->tag.rnode.dbNode; 629 block_info_array[num_blocks].tablespace = bufHdr->tag.rnode.spcNode; 630 block_info_array[num_blocks].filenode = bufHdr->tag.rnode.relNode; 631 block_info_array[num_blocks].forknum = bufHdr->tag.forkNum; 632 block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum; 633 ++num_blocks; 634 } 635 636 UnlockBufHdr(bufHdr, buf_state); 637 } 638 639 snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE); 640 file = AllocateFile(transient_dump_file_path, "w"); 641 if (!file) 642 ereport(ERROR, 643 (errcode_for_file_access(), 644 errmsg("could not open file \"%s\": %m", 645 transient_dump_file_path))); 646 647 ret = fprintf(file, "<<%d>>\n", num_blocks); 648 if (ret < 0) 649 { 650 int save_errno = errno; 651 652 FreeFile(file); 653 unlink(transient_dump_file_path); 654 errno = save_errno; 655 ereport(ERROR, 656 (errcode_for_file_access(), 657 errmsg("could not write to file \"%s\" : %m", 658 transient_dump_file_path))); 659 } 660 661 for (i = 0; i < num_blocks; i++) 662 { 663 CHECK_FOR_INTERRUPTS(); 664 665 ret = fprintf(file, "%u,%u,%u,%u,%u\n", 666 block_info_array[i].database, 667 block_info_array[i].tablespace, 668 block_info_array[i].filenode, 669 (uint32) block_info_array[i].forknum, 670 block_info_array[i].blocknum); 671 if (ret < 0) 672 { 673 int save_errno = errno; 674 675 FreeFile(file); 676 unlink(transient_dump_file_path); 677 errno = save_errno; 678 ereport(ERROR, 679 (errcode_for_file_access(), 680 errmsg("could not write to file \"%s\" : %m", 681 transient_dump_file_path))); 682 } 683 } 684 685 pfree(block_info_array); 686 687 /* 688 * Rename transient_dump_file_path to AUTOPREWARM_FILE to make things 689 * permanent. 690 */ 691 ret = FreeFile(file); 692 if (ret != 0) 693 { 694 int save_errno = errno; 695 696 unlink(transient_dump_file_path); 697 errno = save_errno; 698 ereport(ERROR, 699 (errcode_for_file_access(), 700 errmsg("could not close file \"%s\" : %m", 701 transient_dump_file_path))); 702 } 703 704 (void) durable_rename(transient_dump_file_path, AUTOPREWARM_FILE, ERROR); 705 apw_state->pid_using_dumpfile = InvalidPid; 706 707 ereport(DEBUG1, 708 (errmsg("wrote block details for %d blocks", num_blocks))); 709 return num_blocks; 710 } 711 712 /* 713 * SQL-callable function to launch autoprewarm. 714 */ 715 Datum 716 autoprewarm_start_worker(PG_FUNCTION_ARGS) 717 { 718 pid_t pid; 719 720 if (!autoprewarm) 721 ereport(ERROR, 722 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 723 errmsg("autoprewarm is disabled"))); 724 725 apw_init_shmem(); 726 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 727 pid = apw_state->bgworker_pid; 728 LWLockRelease(&apw_state->lock); 729 730 if (pid != InvalidPid) 731 ereport(ERROR, 732 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 733 errmsg("autoprewarm worker is already running under PID %lu", 734 (unsigned long) pid))); 735 736 apw_start_master_worker(); 737 738 PG_RETURN_VOID(); 739 } 740 741 /* 742 * SQL-callable function to perform an immediate block dump. 743 * 744 * Note: this is declared to return int8, as insurance against some 745 * very distant day when we might make NBuffers wider than int. 746 */ 747 Datum 748 autoprewarm_dump_now(PG_FUNCTION_ARGS) 749 { 750 int num_blocks; 751 752 apw_init_shmem(); 753 754 PG_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0); 755 { 756 num_blocks = apw_dump_now(false, true); 757 } 758 PG_END_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0); 759 760 PG_RETURN_INT64((int64) num_blocks); 761 } 762 763 /* 764 * Allocate and initialize autoprewarm related shared memory, if not already 765 * done, and set up backend-local pointer to that state. Returns true if an 766 * existing shared memory segment was found. 767 */ 768 static bool 769 apw_init_shmem(void) 770 { 771 bool found; 772 773 LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); 774 apw_state = ShmemInitStruct("autoprewarm", 775 sizeof(AutoPrewarmSharedState), 776 &found); 777 if (!found) 778 { 779 /* First time through ... */ 780 LWLockInitialize(&apw_state->lock, LWLockNewTrancheId()); 781 apw_state->bgworker_pid = InvalidPid; 782 apw_state->pid_using_dumpfile = InvalidPid; 783 } 784 LWLockRelease(AddinShmemInitLock); 785 786 LWLockRegisterTranche(apw_state->lock.tranche, "autoprewarm"); 787 788 return found; 789 } 790 791 /* 792 * Clear our PID from autoprewarm shared state. 793 */ 794 static void 795 apw_detach_shmem(int code, Datum arg) 796 { 797 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 798 if (apw_state->pid_using_dumpfile == MyProcPid) 799 apw_state->pid_using_dumpfile = InvalidPid; 800 if (apw_state->bgworker_pid == MyProcPid) 801 apw_state->bgworker_pid = InvalidPid; 802 LWLockRelease(&apw_state->lock); 803 } 804 805 /* 806 * Start autoprewarm master worker process. 807 */ 808 static void 809 apw_start_master_worker(void) 810 { 811 BackgroundWorker worker; 812 BackgroundWorkerHandle *handle; 813 BgwHandleStatus status; 814 pid_t pid; 815 816 MemSet(&worker, 0, sizeof(BackgroundWorker)); 817 worker.bgw_flags = BGWORKER_SHMEM_ACCESS; 818 worker.bgw_start_time = BgWorkerStart_ConsistentState; 819 strcpy(worker.bgw_library_name, "pg_prewarm"); 820 strcpy(worker.bgw_function_name, "autoprewarm_main"); 821 strcpy(worker.bgw_name, "autoprewarm master"); 822 strcpy(worker.bgw_type, "autoprewarm master"); 823 824 if (process_shared_preload_libraries_in_progress) 825 { 826 RegisterBackgroundWorker(&worker); 827 return; 828 } 829 830 /* must set notify PID to wait for startup */ 831 worker.bgw_notify_pid = MyProcPid; 832 833 if (!RegisterDynamicBackgroundWorker(&worker, &handle)) 834 ereport(ERROR, 835 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 836 errmsg("could not register background process"), 837 errhint("You may need to increase max_worker_processes."))); 838 839 status = WaitForBackgroundWorkerStartup(handle, &pid); 840 if (status != BGWH_STARTED) 841 ereport(ERROR, 842 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 843 errmsg("could not start background process"), 844 errhint("More details may be available in the server log."))); 845 } 846 847 /* 848 * Start autoprewarm per-database worker process. 849 */ 850 static void 851 apw_start_database_worker(void) 852 { 853 BackgroundWorker worker; 854 BackgroundWorkerHandle *handle; 855 856 MemSet(&worker, 0, sizeof(BackgroundWorker)); 857 worker.bgw_flags = 858 BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; 859 worker.bgw_start_time = BgWorkerStart_ConsistentState; 860 worker.bgw_restart_time = BGW_NEVER_RESTART; 861 strcpy(worker.bgw_library_name, "pg_prewarm"); 862 strcpy(worker.bgw_function_name, "autoprewarm_database_main"); 863 strcpy(worker.bgw_name, "autoprewarm worker"); 864 strcpy(worker.bgw_type, "autoprewarm worker"); 865 866 /* must set notify PID to wait for shutdown */ 867 worker.bgw_notify_pid = MyProcPid; 868 869 if (!RegisterDynamicBackgroundWorker(&worker, &handle)) 870 ereport(ERROR, 871 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 872 errmsg("registering dynamic bgworker autoprewarm failed"), 873 errhint("Consider increasing configuration parameter \"max_worker_processes\"."))); 874 875 /* 876 * Ignore return value; if it fails, postmaster has died, but we have 877 * checks for that elsewhere. 878 */ 879 WaitForBackgroundWorkerShutdown(handle); 880 } 881 882 /* Compare member elements to check whether they are not equal. */ 883 #define cmp_member_elem(fld) \ 884 do { \ 885 if (a->fld < b->fld) \ 886 return -1; \ 887 else if (a->fld > b->fld) \ 888 return 1; \ 889 } while(0) 890 891 /* 892 * apw_compare_blockinfo 893 * 894 * We depend on all records for a particular database being consecutive 895 * in the dump file; each per-database worker will preload blocks until 896 * it sees a block for some other database. Sorting by tablespace, 897 * filenode, forknum, and blocknum isn't critical for correctness, but 898 * helps us get a sequential I/O pattern. 899 */ 900 static int 901 apw_compare_blockinfo(const void *p, const void *q) 902 { 903 const BlockInfoRecord *a = (const BlockInfoRecord *) p; 904 const BlockInfoRecord *b = (const BlockInfoRecord *) q; 905 906 cmp_member_elem(database); 907 cmp_member_elem(tablespace); 908 cmp_member_elem(filenode); 909 cmp_member_elem(forknum); 910 cmp_member_elem(blocknum); 911 912 return 0; 913 } 914 915 /* 916 * Signal handler for SIGTERM 917 */ 918 static void 919 apw_sigterm_handler(SIGNAL_ARGS) 920 { 921 int save_errno = errno; 922 923 got_sigterm = true; 924 925 if (MyProc) 926 SetLatch(&MyProc->procLatch); 927 928 errno = save_errno; 929 } 930 931 /* 932 * Signal handler for SIGHUP 933 */ 934 static void 935 apw_sighup_handler(SIGNAL_ARGS) 936 { 937 int save_errno = errno; 938 939 got_sighup = true; 940 941 if (MyProc) 942 SetLatch(&MyProc->procLatch); 943 944 errno = save_errno; 945 } 946