1 /*------------------------------------------------------------------------- 2 * 3 * autoprewarm.c 4 * Periodically dump information about the blocks present in 5 * shared_buffers, and reload them on server restart. 6 * 7 * Due to locking considerations, we can't actually begin prewarming 8 * until the server reaches a consistent state. We need the catalogs 9 * to be consistent so that we can figure out which relation to lock, 10 * and we need to lock the relations so that we don't try to prewarm 11 * pages from a relation that is in the process of being dropped. 12 * 13 * While prewarming, autoprewarm will use two workers. There's a 14 * master worker that reads and sorts the list of blocks to be 15 * prewarmed and then launches a per-database worker for each 16 * relevant database in turn. The former keeps running after the 17 * initial prewarm is complete to update the dump file periodically. 18 * 19 * Copyright (c) 2016-2019, PostgreSQL Global Development Group 20 * 21 * IDENTIFICATION 22 * contrib/pg_prewarm/autoprewarm.c 23 * 24 *------------------------------------------------------------------------- 25 */ 26 27 #include "postgres.h" 28 29 #include <unistd.h> 30 31 #include "access/relation.h" 32 #include "access/xact.h" 33 #include "catalog/pg_class.h" 34 #include "catalog/pg_type.h" 35 #include "miscadmin.h" 36 #include "pgstat.h" 37 #include "postmaster/bgworker.h" 38 #include "storage/buf_internals.h" 39 #include "storage/dsm.h" 40 #include "storage/ipc.h" 41 #include "storage/latch.h" 42 #include "storage/lwlock.h" 43 #include "storage/proc.h" 44 #include "storage/procsignal.h" 45 #include "storage/shmem.h" 46 #include "storage/smgr.h" 47 #include "tcop/tcopprot.h" 48 #include "utils/acl.h" 49 #include "utils/guc.h" 50 #include "utils/memutils.h" 51 #include "utils/rel.h" 52 #include "utils/relfilenodemap.h" 53 #include "utils/resowner.h" 54 55 #define AUTOPREWARM_FILE "autoprewarm.blocks" 56 57 /* Metadata for each block we dump. 
*/
typedef struct BlockInfoRecord
{
	Oid			database;		/* database OID; InvalidOid is treated as
								 * "global object" by apw_load_buffers */
	Oid			tablespace;		/* tablespace OID of the block's relation */
	Oid			filenode;		/* relfilenode of the block's relation */
	ForkNumber	forknum;		/* fork the block belongs to */
	BlockNumber blocknum;		/* block number within that fork */
} BlockInfoRecord;

/* Shared state information for autoprewarm bgworker. */
typedef struct AutoPrewarmSharedState
{
	LWLock		lock;			/* mutual exclusion */
	pid_t		bgworker_pid;	/* for main bgworker */
	pid_t		pid_using_dumpfile; /* for autoprewarm or block dump */

	/* Following items are for communication with per-database worker */
	dsm_handle	block_info_handle;	/* DSM segment holding the sorted
									 * BlockInfoRecord array */
	Oid			database;		/* database the next worker connects to */
	int			prewarm_start_idx;	/* first array index for the worker */
	int			prewarm_stop_idx;	/* one past the last index for the worker */
	int			prewarmed_blocks;	/* running count of blocks read in */
} AutoPrewarmSharedState;

/* Entry points that must be visible to the loader / bgworker machinery. */
void		_PG_init(void);
void		autoprewarm_main(Datum main_arg);
void		autoprewarm_database_main(Datum main_arg);

/* SQL-callable functions. */
PG_FUNCTION_INFO_V1(autoprewarm_start_worker);
PG_FUNCTION_INFO_V1(autoprewarm_dump_now);

/* File-local helpers. */
static void apw_load_buffers(void);
static int	apw_dump_now(bool is_bgworker, bool dump_unlogged);
static void apw_start_master_worker(void);
static void apw_start_database_worker(void);
static bool apw_init_shmem(void);
static void apw_detach_shmem(int code, Datum arg);
static int	apw_compare_blockinfo(const void *p, const void *q);
static void apw_sigterm_handler(SIGNAL_ARGS);
static void apw_sighup_handler(SIGNAL_ARGS);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_sigterm = false;
static volatile sig_atomic_t got_sighup = false;

/* Pointer to shared-memory state. */
static AutoPrewarmSharedState *apw_state = NULL;

/* GUC variables. */
static bool autoprewarm = true; /* start worker? */
static int	autoprewarm_interval;	/* dump interval */

/*
 * Module load callback.
112 */ 113 void 114 _PG_init(void) 115 { 116 DefineCustomIntVariable("pg_prewarm.autoprewarm_interval", 117 "Sets the interval between dumps of shared buffers", 118 "If set to zero, time-based dumping is disabled.", 119 &autoprewarm_interval, 120 300, 121 0, INT_MAX / 1000, 122 PGC_SIGHUP, 123 GUC_UNIT_S, 124 NULL, 125 NULL, 126 NULL); 127 128 if (!process_shared_preload_libraries_in_progress) 129 return; 130 131 /* can't define PGC_POSTMASTER variable after startup */ 132 DefineCustomBoolVariable("pg_prewarm.autoprewarm", 133 "Starts the autoprewarm worker.", 134 NULL, 135 &autoprewarm, 136 true, 137 PGC_POSTMASTER, 138 0, 139 NULL, 140 NULL, 141 NULL); 142 143 EmitWarningsOnPlaceholders("pg_prewarm"); 144 145 RequestAddinShmemSpace(MAXALIGN(sizeof(AutoPrewarmSharedState))); 146 147 /* Register autoprewarm worker, if enabled. */ 148 if (autoprewarm) 149 apw_start_master_worker(); 150 } 151 152 /* 153 * Main entry point for the master autoprewarm process. Per-database workers 154 * have a separate entry point. 155 */ 156 void 157 autoprewarm_main(Datum main_arg) 158 { 159 bool first_time = true; 160 bool final_dump_allowed = true; 161 TimestampTz last_dump_time = 0; 162 163 /* Establish signal handlers; once that's done, unblock signals. */ 164 pqsignal(SIGTERM, apw_sigterm_handler); 165 pqsignal(SIGHUP, apw_sighup_handler); 166 pqsignal(SIGUSR1, procsignal_sigusr1_handler); 167 BackgroundWorkerUnblockSignals(); 168 169 /* Create (if necessary) and attach to our shared memory area. */ 170 if (apw_init_shmem()) 171 first_time = false; 172 173 /* Set on-detach hook so that our PID will be cleared on exit. */ 174 on_shmem_exit(apw_detach_shmem, 0); 175 176 /* 177 * Store our PID in the shared memory area --- unless there's already 178 * another worker running, in which case just exit. 
179 */ 180 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 181 if (apw_state->bgworker_pid != InvalidPid) 182 { 183 LWLockRelease(&apw_state->lock); 184 ereport(LOG, 185 (errmsg("autoprewarm worker is already running under PID %lu", 186 (unsigned long) apw_state->bgworker_pid))); 187 return; 188 } 189 apw_state->bgworker_pid = MyProcPid; 190 LWLockRelease(&apw_state->lock); 191 192 /* 193 * Preload buffers from the dump file only if we just created the shared 194 * memory region. Otherwise, it's either already been done or shouldn't 195 * be done - e.g. because the old dump file has been overwritten since the 196 * server was started. 197 * 198 * There's not much point in performing a dump immediately after we finish 199 * preloading; so, if we do end up preloading, consider the last dump time 200 * to be equal to the current time. 201 * 202 * If apw_load_buffers() is terminated early by a shutdown request, 203 * prevent dumping out our state below the loop, because we'd effectively 204 * just truncate the saved state to however much we'd managed to preload. 205 */ 206 if (first_time) 207 { 208 apw_load_buffers(); 209 final_dump_allowed = !got_sigterm; 210 last_dump_time = GetCurrentTimestamp(); 211 } 212 213 /* Periodically dump buffers until terminated. */ 214 while (!got_sigterm) 215 { 216 /* In case of a SIGHUP, just reload the configuration. */ 217 if (got_sighup) 218 { 219 got_sighup = false; 220 ProcessConfigFile(PGC_SIGHUP); 221 } 222 223 if (autoprewarm_interval <= 0) 224 { 225 /* We're only dumping at shutdown, so just wait forever. */ 226 (void) WaitLatch(&MyProc->procLatch, 227 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 228 -1L, 229 PG_WAIT_EXTENSION); 230 } 231 else 232 { 233 TimestampTz next_dump_time; 234 long delay_in_ms; 235 236 /* Compute the next dump time. 
*/ 237 next_dump_time = 238 TimestampTzPlusMilliseconds(last_dump_time, 239 autoprewarm_interval * 1000); 240 delay_in_ms = 241 TimestampDifferenceMilliseconds(GetCurrentTimestamp(), 242 next_dump_time); 243 244 /* Perform a dump if it's time. */ 245 if (delay_in_ms <= 0) 246 { 247 last_dump_time = GetCurrentTimestamp(); 248 apw_dump_now(true, false); 249 continue; 250 } 251 252 /* Sleep until the next dump time. */ 253 (void) WaitLatch(&MyProc->procLatch, 254 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 255 delay_in_ms, 256 PG_WAIT_EXTENSION); 257 } 258 259 /* Reset the latch, loop. */ 260 ResetLatch(&MyProc->procLatch); 261 } 262 263 /* 264 * Dump one last time. We assume this is probably the result of a system 265 * shutdown, although it's possible that we've merely been terminated. 266 */ 267 if (final_dump_allowed) 268 apw_dump_now(true, true); 269 } 270 271 /* 272 * Read the dump file and launch per-database workers one at a time to 273 * prewarm the buffers found there. 274 */ 275 static void 276 apw_load_buffers(void) 277 { 278 FILE *file = NULL; 279 int num_elements, 280 i; 281 BlockInfoRecord *blkinfo; 282 dsm_segment *seg; 283 284 /* 285 * Skip the prewarm if the dump file is in use; otherwise, prevent any 286 * other process from writing it while we're using it. 287 */ 288 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 289 if (apw_state->pid_using_dumpfile == InvalidPid) 290 apw_state->pid_using_dumpfile = MyProcPid; 291 else 292 { 293 LWLockRelease(&apw_state->lock); 294 ereport(LOG, 295 (errmsg("skipping prewarm because block dump file is being written by PID %lu", 296 (unsigned long) apw_state->pid_using_dumpfile))); 297 return; 298 } 299 LWLockRelease(&apw_state->lock); 300 301 /* 302 * Open the block dump file. Exit quietly if it doesn't exist, but report 303 * any other error. 
304 */ 305 file = AllocateFile(AUTOPREWARM_FILE, "r"); 306 if (!file) 307 { 308 if (errno == ENOENT) 309 { 310 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 311 apw_state->pid_using_dumpfile = InvalidPid; 312 LWLockRelease(&apw_state->lock); 313 return; /* No file to load. */ 314 } 315 ereport(ERROR, 316 (errcode_for_file_access(), 317 errmsg("could not read file \"%s\": %m", 318 AUTOPREWARM_FILE))); 319 } 320 321 /* First line of the file is a record count. */ 322 if (fscanf(file, "<<%d>>\n", &num_elements) != 1) 323 ereport(ERROR, 324 (errcode_for_file_access(), 325 errmsg("could not read from file \"%s\": %m", 326 AUTOPREWARM_FILE))); 327 328 /* Allocate a dynamic shared memory segment to store the record data. */ 329 seg = dsm_create(sizeof(BlockInfoRecord) * num_elements, 0); 330 blkinfo = (BlockInfoRecord *) dsm_segment_address(seg); 331 332 /* Read records, one per line. */ 333 for (i = 0; i < num_elements; i++) 334 { 335 unsigned forknum; 336 337 if (fscanf(file, "%u,%u,%u,%u,%u\n", &blkinfo[i].database, 338 &blkinfo[i].tablespace, &blkinfo[i].filenode, 339 &forknum, &blkinfo[i].blocknum) != 5) 340 ereport(ERROR, 341 (errmsg("autoprewarm block dump file is corrupted at line %d", 342 i + 1))); 343 blkinfo[i].forknum = forknum; 344 } 345 346 FreeFile(file); 347 348 /* Sort the blocks to be loaded. */ 349 pg_qsort(blkinfo, num_elements, sizeof(BlockInfoRecord), 350 apw_compare_blockinfo); 351 352 /* Populate shared memory state. */ 353 apw_state->block_info_handle = dsm_segment_handle(seg); 354 apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0; 355 apw_state->prewarmed_blocks = 0; 356 357 /* Get the info position of the first block of the next database. */ 358 while (apw_state->prewarm_start_idx < num_elements) 359 { 360 int j = apw_state->prewarm_start_idx; 361 Oid current_db = blkinfo[j].database; 362 363 /* 364 * Advance the prewarm_stop_idx to the first BlockInfoRecord that does 365 * not belong to this database. 
366 */ 367 j++; 368 while (j < num_elements) 369 { 370 if (current_db != blkinfo[j].database) 371 { 372 /* 373 * Combine BlockInfoRecords for global objects with those of 374 * the database. 375 */ 376 if (current_db != InvalidOid) 377 break; 378 current_db = blkinfo[j].database; 379 } 380 381 j++; 382 } 383 384 /* 385 * If we reach this point with current_db == InvalidOid, then only 386 * BlockInfoRecords belonging to global objects exist. We can't 387 * prewarm without a database connection, so just bail out. 388 */ 389 if (current_db == InvalidOid) 390 break; 391 392 /* Configure stop point and database for next per-database worker. */ 393 apw_state->prewarm_stop_idx = j; 394 apw_state->database = current_db; 395 Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx); 396 397 /* If we've run out of free buffers, don't launch another worker. */ 398 if (!have_free_buffer()) 399 break; 400 401 /* 402 * Likewise, don't launch if we've already been told to shut down. 403 * (The launch would fail anyway, but we might as well skip it.) 404 */ 405 if (got_sigterm) 406 break; 407 408 /* 409 * Start a per-database worker to load blocks for this database; this 410 * function will return once the per-database worker exits. 411 */ 412 apw_start_database_worker(); 413 414 /* Prepare for next database. */ 415 apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx; 416 } 417 418 /* Clean up. */ 419 dsm_detach(seg); 420 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 421 apw_state->block_info_handle = DSM_HANDLE_INVALID; 422 apw_state->pid_using_dumpfile = InvalidPid; 423 LWLockRelease(&apw_state->lock); 424 425 /* Report our success, if we were able to finish. 
*/ 426 if (!got_sigterm) 427 ereport(LOG, 428 (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks", 429 apw_state->prewarmed_blocks, num_elements))); 430 } 431 432 /* 433 * Prewarm all blocks for one database (and possibly also global objects, if 434 * those got grouped with this database). 435 */ 436 void 437 autoprewarm_database_main(Datum main_arg) 438 { 439 int pos; 440 BlockInfoRecord *block_info; 441 Relation rel = NULL; 442 BlockNumber nblocks = 0; 443 BlockInfoRecord *old_blk = NULL; 444 dsm_segment *seg; 445 446 /* Establish signal handlers; once that's done, unblock signals. */ 447 pqsignal(SIGTERM, die); 448 BackgroundWorkerUnblockSignals(); 449 450 /* Connect to correct database and get block information. */ 451 apw_init_shmem(); 452 seg = dsm_attach(apw_state->block_info_handle); 453 if (seg == NULL) 454 ereport(ERROR, 455 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 456 errmsg("could not map dynamic shared memory segment"))); 457 BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0); 458 block_info = (BlockInfoRecord *) dsm_segment_address(seg); 459 pos = apw_state->prewarm_start_idx; 460 461 /* 462 * Loop until we run out of blocks to prewarm or until we run out of free 463 * buffers. 464 */ 465 while (pos < apw_state->prewarm_stop_idx && have_free_buffer()) 466 { 467 BlockInfoRecord *blk = &block_info[pos++]; 468 Buffer buf; 469 470 CHECK_FOR_INTERRUPTS(); 471 472 /* 473 * Quit if we've reached records for another database. If previous 474 * blocks are of some global objects, then continue pre-warming. 475 */ 476 if (old_blk != NULL && old_blk->database != blk->database && 477 old_blk->database != 0) 478 break; 479 480 /* 481 * As soon as we encounter a block of a new relation, close the old 482 * relation. Note that rel will be NULL if try_relation_open failed 483 * previously; in that case, there is nothing to close. 
484 */ 485 if (old_blk != NULL && old_blk->filenode != blk->filenode && 486 rel != NULL) 487 { 488 relation_close(rel, AccessShareLock); 489 rel = NULL; 490 CommitTransactionCommand(); 491 } 492 493 /* 494 * Try to open each new relation, but only once, when we first 495 * encounter it. If it's been dropped, skip the associated blocks. 496 */ 497 if (old_blk == NULL || old_blk->filenode != blk->filenode) 498 { 499 Oid reloid; 500 501 Assert(rel == NULL); 502 StartTransactionCommand(); 503 reloid = RelidByRelfilenode(blk->tablespace, blk->filenode); 504 if (OidIsValid(reloid)) 505 rel = try_relation_open(reloid, AccessShareLock); 506 507 if (!rel) 508 CommitTransactionCommand(); 509 } 510 if (!rel) 511 { 512 old_blk = blk; 513 continue; 514 } 515 516 /* Once per fork, check for fork existence and size. */ 517 if (old_blk == NULL || 518 old_blk->filenode != blk->filenode || 519 old_blk->forknum != blk->forknum) 520 { 521 RelationOpenSmgr(rel); 522 523 /* 524 * smgrexists is not safe for illegal forknum, hence check whether 525 * the passed forknum is valid before using it in smgrexists. 526 */ 527 if (blk->forknum > InvalidForkNumber && 528 blk->forknum <= MAX_FORKNUM && 529 smgrexists(rel->rd_smgr, blk->forknum)) 530 nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum); 531 else 532 nblocks = 0; 533 } 534 535 /* Check whether blocknum is valid and within fork file size. */ 536 if (blk->blocknum >= nblocks) 537 { 538 /* Move to next forknum. */ 539 old_blk = blk; 540 continue; 541 } 542 543 /* Prewarm buffer. */ 544 buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL, 545 NULL); 546 if (BufferIsValid(buf)) 547 { 548 apw_state->prewarmed_blocks++; 549 ReleaseBuffer(buf); 550 } 551 552 old_blk = blk; 553 } 554 555 dsm_detach(seg); 556 557 /* Release lock on previous relation. */ 558 if (rel) 559 { 560 relation_close(rel, AccessShareLock); 561 CommitTransactionCommand(); 562 } 563 } 564 565 /* 566 * Dump information on blocks in shared buffers. 
We use a text format here 567 * so that it's easy to understand and even change the file contents if 568 * necessary. 569 * Returns the number of blocks dumped. 570 */ 571 static int 572 apw_dump_now(bool is_bgworker, bool dump_unlogged) 573 { 574 int num_blocks; 575 int i; 576 int ret; 577 BlockInfoRecord *block_info_array; 578 BufferDesc *bufHdr; 579 FILE *file; 580 char transient_dump_file_path[MAXPGPATH]; 581 pid_t pid; 582 583 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 584 pid = apw_state->pid_using_dumpfile; 585 if (apw_state->pid_using_dumpfile == InvalidPid) 586 apw_state->pid_using_dumpfile = MyProcPid; 587 LWLockRelease(&apw_state->lock); 588 589 if (pid != InvalidPid) 590 { 591 if (!is_bgworker) 592 ereport(ERROR, 593 (errmsg("could not perform block dump because dump file is being used by PID %lu", 594 (unsigned long) apw_state->pid_using_dumpfile))); 595 596 ereport(LOG, 597 (errmsg("skipping block dump because it is already being performed by PID %lu", 598 (unsigned long) apw_state->pid_using_dumpfile))); 599 return 0; 600 } 601 602 block_info_array = 603 (BlockInfoRecord *) palloc(sizeof(BlockInfoRecord) * NBuffers); 604 605 for (num_blocks = 0, i = 0; i < NBuffers; i++) 606 { 607 uint32 buf_state; 608 609 CHECK_FOR_INTERRUPTS(); 610 611 bufHdr = GetBufferDescriptor(i); 612 613 /* Lock each buffer header before inspecting. */ 614 buf_state = LockBufHdr(bufHdr); 615 616 /* 617 * Unlogged tables will be automatically truncated after a crash or 618 * unclean shutdown. In such cases we need not prewarm them. Dump them 619 * only if requested by caller. 
620 */ 621 if (buf_state & BM_TAG_VALID && 622 ((buf_state & BM_PERMANENT) || dump_unlogged)) 623 { 624 block_info_array[num_blocks].database = bufHdr->tag.rnode.dbNode; 625 block_info_array[num_blocks].tablespace = bufHdr->tag.rnode.spcNode; 626 block_info_array[num_blocks].filenode = bufHdr->tag.rnode.relNode; 627 block_info_array[num_blocks].forknum = bufHdr->tag.forkNum; 628 block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum; 629 ++num_blocks; 630 } 631 632 UnlockBufHdr(bufHdr, buf_state); 633 } 634 635 snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE); 636 file = AllocateFile(transient_dump_file_path, "w"); 637 if (!file) 638 ereport(ERROR, 639 (errcode_for_file_access(), 640 errmsg("could not open file \"%s\": %m", 641 transient_dump_file_path))); 642 643 ret = fprintf(file, "<<%d>>\n", num_blocks); 644 if (ret < 0) 645 { 646 int save_errno = errno; 647 648 FreeFile(file); 649 unlink(transient_dump_file_path); 650 errno = save_errno; 651 ereport(ERROR, 652 (errcode_for_file_access(), 653 errmsg("could not write to file \"%s\": %m", 654 transient_dump_file_path))); 655 } 656 657 for (i = 0; i < num_blocks; i++) 658 { 659 CHECK_FOR_INTERRUPTS(); 660 661 ret = fprintf(file, "%u,%u,%u,%u,%u\n", 662 block_info_array[i].database, 663 block_info_array[i].tablespace, 664 block_info_array[i].filenode, 665 (uint32) block_info_array[i].forknum, 666 block_info_array[i].blocknum); 667 if (ret < 0) 668 { 669 int save_errno = errno; 670 671 FreeFile(file); 672 unlink(transient_dump_file_path); 673 errno = save_errno; 674 ereport(ERROR, 675 (errcode_for_file_access(), 676 errmsg("could not write to file \"%s\": %m", 677 transient_dump_file_path))); 678 } 679 } 680 681 pfree(block_info_array); 682 683 /* 684 * Rename transient_dump_file_path to AUTOPREWARM_FILE to make things 685 * permanent. 
686 */ 687 ret = FreeFile(file); 688 if (ret != 0) 689 { 690 int save_errno = errno; 691 692 unlink(transient_dump_file_path); 693 errno = save_errno; 694 ereport(ERROR, 695 (errcode_for_file_access(), 696 errmsg("could not close file \"%s\": %m", 697 transient_dump_file_path))); 698 } 699 700 (void) durable_rename(transient_dump_file_path, AUTOPREWARM_FILE, ERROR); 701 apw_state->pid_using_dumpfile = InvalidPid; 702 703 ereport(DEBUG1, 704 (errmsg("wrote block details for %d blocks", num_blocks))); 705 return num_blocks; 706 } 707 708 /* 709 * SQL-callable function to launch autoprewarm. 710 */ 711 Datum 712 autoprewarm_start_worker(PG_FUNCTION_ARGS) 713 { 714 pid_t pid; 715 716 if (!autoprewarm) 717 ereport(ERROR, 718 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 719 errmsg("autoprewarm is disabled"))); 720 721 apw_init_shmem(); 722 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 723 pid = apw_state->bgworker_pid; 724 LWLockRelease(&apw_state->lock); 725 726 if (pid != InvalidPid) 727 ereport(ERROR, 728 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 729 errmsg("autoprewarm worker is already running under PID %lu", 730 (unsigned long) pid))); 731 732 apw_start_master_worker(); 733 734 PG_RETURN_VOID(); 735 } 736 737 /* 738 * SQL-callable function to perform an immediate block dump. 739 * 740 * Note: this is declared to return int8, as insurance against some 741 * very distant day when we might make NBuffers wider than int. 742 */ 743 Datum 744 autoprewarm_dump_now(PG_FUNCTION_ARGS) 745 { 746 int num_blocks; 747 748 apw_init_shmem(); 749 750 PG_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0); 751 { 752 num_blocks = apw_dump_now(false, true); 753 } 754 PG_END_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0); 755 756 PG_RETURN_INT64((int64) num_blocks); 757 } 758 759 /* 760 * Allocate and initialize autoprewarm related shared memory, if not already 761 * done, and set up backend-local pointer to that state. 
Returns true if an 762 * existing shared memory segment was found. 763 */ 764 static bool 765 apw_init_shmem(void) 766 { 767 bool found; 768 769 LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); 770 apw_state = ShmemInitStruct("autoprewarm", 771 sizeof(AutoPrewarmSharedState), 772 &found); 773 if (!found) 774 { 775 /* First time through ... */ 776 LWLockInitialize(&apw_state->lock, LWLockNewTrancheId()); 777 apw_state->bgworker_pid = InvalidPid; 778 apw_state->pid_using_dumpfile = InvalidPid; 779 } 780 LWLockRelease(AddinShmemInitLock); 781 782 LWLockRegisterTranche(apw_state->lock.tranche, "autoprewarm"); 783 784 return found; 785 } 786 787 /* 788 * Clear our PID from autoprewarm shared state. 789 */ 790 static void 791 apw_detach_shmem(int code, Datum arg) 792 { 793 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); 794 if (apw_state->pid_using_dumpfile == MyProcPid) 795 apw_state->pid_using_dumpfile = InvalidPid; 796 if (apw_state->bgworker_pid == MyProcPid) 797 apw_state->bgworker_pid = InvalidPid; 798 LWLockRelease(&apw_state->lock); 799 } 800 801 /* 802 * Start autoprewarm master worker process. 
803 */ 804 static void 805 apw_start_master_worker(void) 806 { 807 BackgroundWorker worker; 808 BackgroundWorkerHandle *handle; 809 BgwHandleStatus status; 810 pid_t pid; 811 812 MemSet(&worker, 0, sizeof(BackgroundWorker)); 813 worker.bgw_flags = BGWORKER_SHMEM_ACCESS; 814 worker.bgw_start_time = BgWorkerStart_ConsistentState; 815 strcpy(worker.bgw_library_name, "pg_prewarm"); 816 strcpy(worker.bgw_function_name, "autoprewarm_main"); 817 strcpy(worker.bgw_name, "autoprewarm master"); 818 strcpy(worker.bgw_type, "autoprewarm master"); 819 820 if (process_shared_preload_libraries_in_progress) 821 { 822 RegisterBackgroundWorker(&worker); 823 return; 824 } 825 826 /* must set notify PID to wait for startup */ 827 worker.bgw_notify_pid = MyProcPid; 828 829 if (!RegisterDynamicBackgroundWorker(&worker, &handle)) 830 ereport(ERROR, 831 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 832 errmsg("could not register background process"), 833 errhint("You may need to increase max_worker_processes."))); 834 835 status = WaitForBackgroundWorkerStartup(handle, &pid); 836 if (status != BGWH_STARTED) 837 ereport(ERROR, 838 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 839 errmsg("could not start background process"), 840 errhint("More details may be available in the server log."))); 841 } 842 843 /* 844 * Start autoprewarm per-database worker process. 
845 */ 846 static void 847 apw_start_database_worker(void) 848 { 849 BackgroundWorker worker; 850 BackgroundWorkerHandle *handle; 851 852 MemSet(&worker, 0, sizeof(BackgroundWorker)); 853 worker.bgw_flags = 854 BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; 855 worker.bgw_start_time = BgWorkerStart_ConsistentState; 856 worker.bgw_restart_time = BGW_NEVER_RESTART; 857 strcpy(worker.bgw_library_name, "pg_prewarm"); 858 strcpy(worker.bgw_function_name, "autoprewarm_database_main"); 859 strcpy(worker.bgw_name, "autoprewarm worker"); 860 strcpy(worker.bgw_type, "autoprewarm worker"); 861 862 /* must set notify PID to wait for shutdown */ 863 worker.bgw_notify_pid = MyProcPid; 864 865 if (!RegisterDynamicBackgroundWorker(&worker, &handle)) 866 ereport(ERROR, 867 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 868 errmsg("registering dynamic bgworker autoprewarm failed"), 869 errhint("Consider increasing configuration parameter \"max_worker_processes\"."))); 870 871 /* 872 * Ignore return value; if it fails, postmaster has died, but we have 873 * checks for that elsewhere. 874 */ 875 WaitForBackgroundWorkerShutdown(handle); 876 } 877 878 /* Compare member elements to check whether they are not equal. */ 879 #define cmp_member_elem(fld) \ 880 do { \ 881 if (a->fld < b->fld) \ 882 return -1; \ 883 else if (a->fld > b->fld) \ 884 return 1; \ 885 } while(0) 886 887 /* 888 * apw_compare_blockinfo 889 * 890 * We depend on all records for a particular database being consecutive 891 * in the dump file; each per-database worker will preload blocks until 892 * it sees a block for some other database. Sorting by tablespace, 893 * filenode, forknum, and blocknum isn't critical for correctness, but 894 * helps us get a sequential I/O pattern. 
895 */ 896 static int 897 apw_compare_blockinfo(const void *p, const void *q) 898 { 899 const BlockInfoRecord *a = (const BlockInfoRecord *) p; 900 const BlockInfoRecord *b = (const BlockInfoRecord *) q; 901 902 cmp_member_elem(database); 903 cmp_member_elem(tablespace); 904 cmp_member_elem(filenode); 905 cmp_member_elem(forknum); 906 cmp_member_elem(blocknum); 907 908 return 0; 909 } 910 911 /* 912 * Signal handler for SIGTERM 913 */ 914 static void 915 apw_sigterm_handler(SIGNAL_ARGS) 916 { 917 int save_errno = errno; 918 919 got_sigterm = true; 920 921 if (MyProc) 922 SetLatch(&MyProc->procLatch); 923 924 errno = save_errno; 925 } 926 927 /* 928 * Signal handler for SIGHUP 929 */ 930 static void 931 apw_sighup_handler(SIGNAL_ARGS) 932 { 933 int save_errno = errno; 934 935 got_sighup = true; 936 937 if (MyProc) 938 SetLatch(&MyProc->procLatch); 939 940 errno = save_errno; 941 } 942