1 /*-------------------------------------------------------------------------
2 *
3 * autoprewarm.c
4 * Periodically dump information about the blocks present in
5 * shared_buffers, and reload them on server restart.
6 *
7 * Due to locking considerations, we can't actually begin prewarming
8 * until the server reaches a consistent state. We need the catalogs
9 * to be consistent so that we can figure out which relation to lock,
10 * and we need to lock the relations so that we don't try to prewarm
11 * pages from a relation that is in the process of being dropped.
12 *
13 * While prewarming, autoprewarm will use two workers. There's a
14 * master worker that reads and sorts the list of blocks to be
15 * prewarmed and then launches a per-database worker for each
16 * relevant database in turn. The former keeps running after the
17 * initial prewarm is complete to update the dump file periodically.
18 *
19 * Copyright (c) 2016-2018, PostgreSQL Global Development Group
20 *
21 * IDENTIFICATION
22 * contrib/pg_prewarm/autoprewarm.c
23 *
24 *-------------------------------------------------------------------------
25 */
26
27 #include "postgres.h"
28
29 #include <unistd.h>
30
31 #include "access/heapam.h"
32 #include "access/xact.h"
33 #include "catalog/pg_class.h"
34 #include "catalog/pg_type.h"
35 #include "miscadmin.h"
36 #include "pgstat.h"
37 #include "postmaster/bgworker.h"
38 #include "storage/buf_internals.h"
39 #include "storage/dsm.h"
40 #include "storage/ipc.h"
41 #include "storage/latch.h"
42 #include "storage/lwlock.h"
43 #include "storage/proc.h"
44 #include "storage/procsignal.h"
45 #include "storage/shmem.h"
46 #include "storage/smgr.h"
47 #include "tcop/tcopprot.h"
48 #include "utils/acl.h"
49 #include "utils/guc.h"
50 #include "utils/memutils.h"
51 #include "utils/rel.h"
52 #include "utils/relfilenodemap.h"
53 #include "utils/resowner.h"
54
/*
 * Name of the block dump file.  Dumps are first written to this name with a
 * ".tmp" suffix and then renamed into place.
 */
#define AUTOPREWARM_FILE "autoprewarm.blocks"
56
57 /* Metadata for each block we dump. */
58 typedef struct BlockInfoRecord
59 {
60 Oid database;
61 Oid tablespace;
62 Oid filenode;
63 ForkNumber forknum;
64 BlockNumber blocknum;
65 } BlockInfoRecord;
66
67 /* Shared state information for autoprewarm bgworker. */
68 typedef struct AutoPrewarmSharedState
69 {
70 LWLock lock; /* mutual exclusion */
71 pid_t bgworker_pid; /* for main bgworker */
72 pid_t pid_using_dumpfile; /* for autoprewarm or block dump */
73
74 /* Following items are for communication with per-database worker */
75 dsm_handle block_info_handle;
76 Oid database;
77 int prewarm_start_idx;
78 int prewarm_stop_idx;
79 int prewarmed_blocks;
80 } AutoPrewarmSharedState;
81
82 void _PG_init(void);
83 void autoprewarm_main(Datum main_arg);
84 void autoprewarm_database_main(Datum main_arg);
85
86 PG_FUNCTION_INFO_V1(autoprewarm_start_worker);
87 PG_FUNCTION_INFO_V1(autoprewarm_dump_now);
88
89 static void apw_load_buffers(void);
90 static int apw_dump_now(bool is_bgworker, bool dump_unlogged);
91 static void apw_start_master_worker(void);
92 static void apw_start_database_worker(void);
93 static bool apw_init_shmem(void);
94 static void apw_detach_shmem(int code, Datum arg);
95 static int apw_compare_blockinfo(const void *p, const void *q);
96 static void apw_sigterm_handler(SIGNAL_ARGS);
97 static void apw_sighup_handler(SIGNAL_ARGS);
98
99 /* Flags set by signal handlers */
100 static volatile sig_atomic_t got_sigterm = false;
101 static volatile sig_atomic_t got_sighup = false;
102
103 /* Pointer to shared-memory state. */
104 static AutoPrewarmSharedState *apw_state = NULL;
105
106 /* GUC variables. */
107 static bool autoprewarm = true; /* start worker? */
108 static int autoprewarm_interval; /* dump interval */
109
110 /*
111 * Module load callback.
112 */
113 void
_PG_init(void)114 _PG_init(void)
115 {
116 DefineCustomIntVariable("pg_prewarm.autoprewarm_interval",
117 "Sets the interval between dumps of shared buffers",
118 "If set to zero, time-based dumping is disabled.",
119 &autoprewarm_interval,
120 300,
121 0, INT_MAX / 1000,
122 PGC_SIGHUP,
123 GUC_UNIT_S,
124 NULL,
125 NULL,
126 NULL);
127
128 if (!process_shared_preload_libraries_in_progress)
129 return;
130
131 /* can't define PGC_POSTMASTER variable after startup */
132 DefineCustomBoolVariable("pg_prewarm.autoprewarm",
133 "Starts the autoprewarm worker.",
134 NULL,
135 &autoprewarm,
136 true,
137 PGC_POSTMASTER,
138 0,
139 NULL,
140 NULL,
141 NULL);
142
143 EmitWarningsOnPlaceholders("pg_prewarm");
144
145 RequestAddinShmemSpace(MAXALIGN(sizeof(AutoPrewarmSharedState)));
146
147 /* Register autoprewarm worker, if enabled. */
148 if (autoprewarm)
149 apw_start_master_worker();
150 }
151
152 /*
153 * Main entry point for the master autoprewarm process. Per-database workers
154 * have a separate entry point.
155 */
156 void
autoprewarm_main(Datum main_arg)157 autoprewarm_main(Datum main_arg)
158 {
159 bool first_time = true;
160 bool final_dump_allowed = true;
161 TimestampTz last_dump_time = 0;
162
163 /* Establish signal handlers; once that's done, unblock signals. */
164 pqsignal(SIGTERM, apw_sigterm_handler);
165 pqsignal(SIGHUP, apw_sighup_handler);
166 pqsignal(SIGUSR1, procsignal_sigusr1_handler);
167 BackgroundWorkerUnblockSignals();
168
169 /* Create (if necessary) and attach to our shared memory area. */
170 if (apw_init_shmem())
171 first_time = false;
172
173 /* Set on-detach hook so that our PID will be cleared on exit. */
174 on_shmem_exit(apw_detach_shmem, 0);
175
176 /*
177 * Store our PID in the shared memory area --- unless there's already
178 * another worker running, in which case just exit.
179 */
180 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
181 if (apw_state->bgworker_pid != InvalidPid)
182 {
183 LWLockRelease(&apw_state->lock);
184 ereport(LOG,
185 (errmsg("autoprewarm worker is already running under PID %lu",
186 (unsigned long) apw_state->bgworker_pid)));
187 return;
188 }
189 apw_state->bgworker_pid = MyProcPid;
190 LWLockRelease(&apw_state->lock);
191
192 /*
193 * Preload buffers from the dump file only if we just created the shared
194 * memory region. Otherwise, it's either already been done or shouldn't
195 * be done - e.g. because the old dump file has been overwritten since the
196 * server was started.
197 *
198 * There's not much point in performing a dump immediately after we finish
199 * preloading; so, if we do end up preloading, consider the last dump time
200 * to be equal to the current time.
201 *
202 * If apw_load_buffers() is terminated early by a shutdown request,
203 * prevent dumping out our state below the loop, because we'd effectively
204 * just truncate the saved state to however much we'd managed to preload.
205 */
206 if (first_time)
207 {
208 apw_load_buffers();
209 final_dump_allowed = !got_sigterm;
210 last_dump_time = GetCurrentTimestamp();
211 }
212
213 /* Periodically dump buffers until terminated. */
214 while (!got_sigterm)
215 {
216 int rc;
217
218 /* In case of a SIGHUP, just reload the configuration. */
219 if (got_sighup)
220 {
221 got_sighup = false;
222 ProcessConfigFile(PGC_SIGHUP);
223 }
224
225 if (autoprewarm_interval <= 0)
226 {
227 /* We're only dumping at shutdown, so just wait forever. */
228 rc = WaitLatch(&MyProc->procLatch,
229 WL_LATCH_SET | WL_POSTMASTER_DEATH,
230 -1L,
231 PG_WAIT_EXTENSION);
232 }
233 else
234 {
235 TimestampTz next_dump_time;
236 long delay_in_ms;
237
238 /* Compute the next dump time. */
239 next_dump_time =
240 TimestampTzPlusMilliseconds(last_dump_time,
241 autoprewarm_interval * 1000);
242 delay_in_ms =
243 TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
244 next_dump_time);
245
246 /* Perform a dump if it's time. */
247 if (delay_in_ms <= 0)
248 {
249 last_dump_time = GetCurrentTimestamp();
250 apw_dump_now(true, false);
251 continue;
252 }
253
254 /* Sleep until the next dump time. */
255 rc = WaitLatch(&MyProc->procLatch,
256 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
257 delay_in_ms,
258 PG_WAIT_EXTENSION);
259 }
260
261 /* Reset the latch, bail out if postmaster died, otherwise loop. */
262 ResetLatch(&MyProc->procLatch);
263 if (rc & WL_POSTMASTER_DEATH)
264 proc_exit(1);
265 }
266
267 /*
268 * Dump one last time. We assume this is probably the result of a system
269 * shutdown, although it's possible that we've merely been terminated.
270 */
271 if (final_dump_allowed)
272 apw_dump_now(true, true);
273 }
274
275 /*
276 * Read the dump file and launch per-database workers one at a time to
277 * prewarm the buffers found there.
278 */
279 static void
apw_load_buffers(void)280 apw_load_buffers(void)
281 {
282 FILE *file = NULL;
283 int num_elements,
284 i;
285 BlockInfoRecord *blkinfo;
286 dsm_segment *seg;
287
288 /*
289 * Skip the prewarm if the dump file is in use; otherwise, prevent any
290 * other process from writing it while we're using it.
291 */
292 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
293 if (apw_state->pid_using_dumpfile == InvalidPid)
294 apw_state->pid_using_dumpfile = MyProcPid;
295 else
296 {
297 LWLockRelease(&apw_state->lock);
298 ereport(LOG,
299 (errmsg("skipping prewarm because block dump file is being written by PID %lu",
300 (unsigned long) apw_state->pid_using_dumpfile)));
301 return;
302 }
303 LWLockRelease(&apw_state->lock);
304
305 /*
306 * Open the block dump file. Exit quietly if it doesn't exist, but report
307 * any other error.
308 */
309 file = AllocateFile(AUTOPREWARM_FILE, "r");
310 if (!file)
311 {
312 if (errno == ENOENT)
313 {
314 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
315 apw_state->pid_using_dumpfile = InvalidPid;
316 LWLockRelease(&apw_state->lock);
317 return; /* No file to load. */
318 }
319 ereport(ERROR,
320 (errcode_for_file_access(),
321 errmsg("could not read file \"%s\": %m",
322 AUTOPREWARM_FILE)));
323 }
324
325 /* First line of the file is a record count. */
326 if (fscanf(file, "<<%d>>\n", &num_elements) != 1)
327 ereport(ERROR,
328 (errcode_for_file_access(),
329 errmsg("could not read from file \"%s\": %m",
330 AUTOPREWARM_FILE)));
331
332 /* Allocate a dynamic shared memory segment to store the record data. */
333 seg = dsm_create(sizeof(BlockInfoRecord) * num_elements, 0);
334 blkinfo = (BlockInfoRecord *) dsm_segment_address(seg);
335
336 /* Read records, one per line. */
337 for (i = 0; i < num_elements; i++)
338 {
339 unsigned forknum;
340
341 if (fscanf(file, "%u,%u,%u,%u,%u\n", &blkinfo[i].database,
342 &blkinfo[i].tablespace, &blkinfo[i].filenode,
343 &forknum, &blkinfo[i].blocknum) != 5)
344 ereport(ERROR,
345 (errmsg("autoprewarm block dump file is corrupted at line %d",
346 i + 1)));
347 blkinfo[i].forknum = forknum;
348 }
349
350 FreeFile(file);
351
352 /* Sort the blocks to be loaded. */
353 pg_qsort(blkinfo, num_elements, sizeof(BlockInfoRecord),
354 apw_compare_blockinfo);
355
356 /* Populate shared memory state. */
357 apw_state->block_info_handle = dsm_segment_handle(seg);
358 apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0;
359 apw_state->prewarmed_blocks = 0;
360
361 /* Get the info position of the first block of the next database. */
362 while (apw_state->prewarm_start_idx < num_elements)
363 {
364 int j = apw_state->prewarm_start_idx;
365 Oid current_db = blkinfo[j].database;
366
367 /*
368 * Advance the prewarm_stop_idx to the first BlockRecordInfo that does
369 * not belong to this database.
370 */
371 j++;
372 while (j < num_elements)
373 {
374 if (current_db != blkinfo[j].database)
375 {
376 /*
377 * Combine BlockRecordInfos for global objects with those of
378 * the database.
379 */
380 if (current_db != InvalidOid)
381 break;
382 current_db = blkinfo[j].database;
383 }
384
385 j++;
386 }
387
388 /*
389 * If we reach this point with current_db == InvalidOid, then only
390 * BlockRecordInfos belonging to global objects exist. We can't
391 * prewarm without a database connection, so just bail out.
392 */
393 if (current_db == InvalidOid)
394 break;
395
396 /* Configure stop point and database for next per-database worker. */
397 apw_state->prewarm_stop_idx = j;
398 apw_state->database = current_db;
399 Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx);
400
401 /* If we've run out of free buffers, don't launch another worker. */
402 if (!have_free_buffer())
403 break;
404
405 /*
406 * Likewise, don't launch if we've already been told to shut down.
407 * (The launch would fail anyway, but we might as well skip it.)
408 */
409 if (got_sigterm)
410 break;
411
412 /*
413 * Start a per-database worker to load blocks for this database; this
414 * function will return once the per-database worker exits.
415 */
416 apw_start_database_worker();
417
418 /* Prepare for next database. */
419 apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx;
420 }
421
422 /* Clean up. */
423 dsm_detach(seg);
424 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
425 apw_state->block_info_handle = DSM_HANDLE_INVALID;
426 apw_state->pid_using_dumpfile = InvalidPid;
427 LWLockRelease(&apw_state->lock);
428
429 /* Report our success, if we were able to finish. */
430 if (!got_sigterm)
431 ereport(LOG,
432 (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
433 apw_state->prewarmed_blocks, num_elements)));
434 }
435
436 /*
437 * Prewarm all blocks for one database (and possibly also global objects, if
438 * those got grouped with this database).
439 */
440 void
autoprewarm_database_main(Datum main_arg)441 autoprewarm_database_main(Datum main_arg)
442 {
443 int pos;
444 BlockInfoRecord *block_info;
445 Relation rel = NULL;
446 BlockNumber nblocks = 0;
447 BlockInfoRecord *old_blk = NULL;
448 dsm_segment *seg;
449
450 /* Establish signal handlers; once that's done, unblock signals. */
451 pqsignal(SIGTERM, die);
452 BackgroundWorkerUnblockSignals();
453
454 /* Connect to correct database and get block information. */
455 apw_init_shmem();
456 seg = dsm_attach(apw_state->block_info_handle);
457 if (seg == NULL)
458 ereport(ERROR,
459 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
460 errmsg("could not map dynamic shared memory segment")));
461 BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0);
462 block_info = (BlockInfoRecord *) dsm_segment_address(seg);
463 pos = apw_state->prewarm_start_idx;
464
465 /*
466 * Loop until we run out of blocks to prewarm or until we run out of free
467 * buffers.
468 */
469 while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
470 {
471 BlockInfoRecord *blk = &block_info[pos++];
472 Buffer buf;
473
474 CHECK_FOR_INTERRUPTS();
475
476 /*
477 * Quit if we've reached records for another database. If previous
478 * blocks are of some global objects, then continue pre-warming.
479 */
480 if (old_blk != NULL && old_blk->database != blk->database &&
481 old_blk->database != 0)
482 break;
483
484 /*
485 * As soon as we encounter a block of a new relation, close the old
486 * relation. Note that rel will be NULL if try_relation_open failed
487 * previously; in that case, there is nothing to close.
488 */
489 if (old_blk != NULL && old_blk->filenode != blk->filenode &&
490 rel != NULL)
491 {
492 relation_close(rel, AccessShareLock);
493 rel = NULL;
494 CommitTransactionCommand();
495 }
496
497 /*
498 * Try to open each new relation, but only once, when we first
499 * encounter it. If it's been dropped, skip the associated blocks.
500 */
501 if (old_blk == NULL || old_blk->filenode != blk->filenode)
502 {
503 Oid reloid;
504
505 Assert(rel == NULL);
506 StartTransactionCommand();
507 reloid = RelidByRelfilenode(blk->tablespace, blk->filenode);
508 if (OidIsValid(reloid))
509 rel = try_relation_open(reloid, AccessShareLock);
510
511 if (!rel)
512 CommitTransactionCommand();
513 }
514 if (!rel)
515 {
516 old_blk = blk;
517 continue;
518 }
519
520 /* Once per fork, check for fork existence and size. */
521 if (old_blk == NULL ||
522 old_blk->filenode != blk->filenode ||
523 old_blk->forknum != blk->forknum)
524 {
525 RelationOpenSmgr(rel);
526
527 /*
528 * smgrexists is not safe for illegal forknum, hence check whether
529 * the passed forknum is valid before using it in smgrexists.
530 */
531 if (blk->forknum > InvalidForkNumber &&
532 blk->forknum <= MAX_FORKNUM &&
533 smgrexists(rel->rd_smgr, blk->forknum))
534 nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
535 else
536 nblocks = 0;
537 }
538
539 /* Check whether blocknum is valid and within fork file size. */
540 if (blk->blocknum >= nblocks)
541 {
542 /* Move to next forknum. */
543 old_blk = blk;
544 continue;
545 }
546
547 /* Prewarm buffer. */
548 buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
549 NULL);
550 if (BufferIsValid(buf))
551 {
552 apw_state->prewarmed_blocks++;
553 ReleaseBuffer(buf);
554 }
555
556 old_blk = blk;
557 }
558
559 dsm_detach(seg);
560
561 /* Release lock on previous relation. */
562 if (rel)
563 {
564 relation_close(rel, AccessShareLock);
565 CommitTransactionCommand();
566 }
567 }
568
569 /*
570 * Dump information on blocks in shared buffers. We use a text format here
571 * so that it's easy to understand and even change the file contents if
572 * necessary.
573 * Returns the number of blocks dumped.
574 */
575 static int
apw_dump_now(bool is_bgworker,bool dump_unlogged)576 apw_dump_now(bool is_bgworker, bool dump_unlogged)
577 {
578 int num_blocks;
579 int i;
580 int ret;
581 BlockInfoRecord *block_info_array;
582 BufferDesc *bufHdr;
583 FILE *file;
584 char transient_dump_file_path[MAXPGPATH];
585 pid_t pid;
586
587 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
588 pid = apw_state->pid_using_dumpfile;
589 if (apw_state->pid_using_dumpfile == InvalidPid)
590 apw_state->pid_using_dumpfile = MyProcPid;
591 LWLockRelease(&apw_state->lock);
592
593 if (pid != InvalidPid)
594 {
595 if (!is_bgworker)
596 ereport(ERROR,
597 (errmsg("could not perform block dump because dump file is being used by PID %lu",
598 (unsigned long) apw_state->pid_using_dumpfile)));
599
600 ereport(LOG,
601 (errmsg("skipping block dump because it is already being performed by PID %lu",
602 (unsigned long) apw_state->pid_using_dumpfile)));
603 return 0;
604 }
605
606 block_info_array =
607 (BlockInfoRecord *) palloc(sizeof(BlockInfoRecord) * NBuffers);
608
609 for (num_blocks = 0, i = 0; i < NBuffers; i++)
610 {
611 uint32 buf_state;
612
613 CHECK_FOR_INTERRUPTS();
614
615 bufHdr = GetBufferDescriptor(i);
616
617 /* Lock each buffer header before inspecting. */
618 buf_state = LockBufHdr(bufHdr);
619
620 /*
621 * Unlogged tables will be automatically truncated after a crash or
622 * unclean shutdown. In such cases we need not prewarm them. Dump them
623 * only if requested by caller.
624 */
625 if (buf_state & BM_TAG_VALID &&
626 ((buf_state & BM_PERMANENT) || dump_unlogged))
627 {
628 block_info_array[num_blocks].database = bufHdr->tag.rnode.dbNode;
629 block_info_array[num_blocks].tablespace = bufHdr->tag.rnode.spcNode;
630 block_info_array[num_blocks].filenode = bufHdr->tag.rnode.relNode;
631 block_info_array[num_blocks].forknum = bufHdr->tag.forkNum;
632 block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum;
633 ++num_blocks;
634 }
635
636 UnlockBufHdr(bufHdr, buf_state);
637 }
638
639 snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE);
640 file = AllocateFile(transient_dump_file_path, "w");
641 if (!file)
642 ereport(ERROR,
643 (errcode_for_file_access(),
644 errmsg("could not open file \"%s\": %m",
645 transient_dump_file_path)));
646
647 ret = fprintf(file, "<<%d>>\n", num_blocks);
648 if (ret < 0)
649 {
650 int save_errno = errno;
651
652 FreeFile(file);
653 unlink(transient_dump_file_path);
654 errno = save_errno;
655 ereport(ERROR,
656 (errcode_for_file_access(),
657 errmsg("could not write to file \"%s\" : %m",
658 transient_dump_file_path)));
659 }
660
661 for (i = 0; i < num_blocks; i++)
662 {
663 CHECK_FOR_INTERRUPTS();
664
665 ret = fprintf(file, "%u,%u,%u,%u,%u\n",
666 block_info_array[i].database,
667 block_info_array[i].tablespace,
668 block_info_array[i].filenode,
669 (uint32) block_info_array[i].forknum,
670 block_info_array[i].blocknum);
671 if (ret < 0)
672 {
673 int save_errno = errno;
674
675 FreeFile(file);
676 unlink(transient_dump_file_path);
677 errno = save_errno;
678 ereport(ERROR,
679 (errcode_for_file_access(),
680 errmsg("could not write to file \"%s\" : %m",
681 transient_dump_file_path)));
682 }
683 }
684
685 pfree(block_info_array);
686
687 /*
688 * Rename transient_dump_file_path to AUTOPREWARM_FILE to make things
689 * permanent.
690 */
691 ret = FreeFile(file);
692 if (ret != 0)
693 {
694 int save_errno = errno;
695
696 unlink(transient_dump_file_path);
697 errno = save_errno;
698 ereport(ERROR,
699 (errcode_for_file_access(),
700 errmsg("could not close file \"%s\" : %m",
701 transient_dump_file_path)));
702 }
703
704 (void) durable_rename(transient_dump_file_path, AUTOPREWARM_FILE, ERROR);
705 apw_state->pid_using_dumpfile = InvalidPid;
706
707 ereport(DEBUG1,
708 (errmsg("wrote block details for %d blocks", num_blocks)));
709 return num_blocks;
710 }
711
712 /*
713 * SQL-callable function to launch autoprewarm.
714 */
715 Datum
autoprewarm_start_worker(PG_FUNCTION_ARGS)716 autoprewarm_start_worker(PG_FUNCTION_ARGS)
717 {
718 pid_t pid;
719
720 if (!autoprewarm)
721 ereport(ERROR,
722 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
723 errmsg("autoprewarm is disabled")));
724
725 apw_init_shmem();
726 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
727 pid = apw_state->bgworker_pid;
728 LWLockRelease(&apw_state->lock);
729
730 if (pid != InvalidPid)
731 ereport(ERROR,
732 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
733 errmsg("autoprewarm worker is already running under PID %lu",
734 (unsigned long) pid)));
735
736 apw_start_master_worker();
737
738 PG_RETURN_VOID();
739 }
740
741 /*
742 * SQL-callable function to perform an immediate block dump.
743 *
744 * Note: this is declared to return int8, as insurance against some
745 * very distant day when we might make NBuffers wider than int.
746 */
747 Datum
autoprewarm_dump_now(PG_FUNCTION_ARGS)748 autoprewarm_dump_now(PG_FUNCTION_ARGS)
749 {
750 int num_blocks;
751
752 apw_init_shmem();
753
754 PG_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
755 {
756 num_blocks = apw_dump_now(false, true);
757 }
758 PG_END_ENSURE_ERROR_CLEANUP(apw_detach_shmem, 0);
759
760 PG_RETURN_INT64((int64) num_blocks);
761 }
762
763 /*
764 * Allocate and initialize autoprewarm related shared memory, if not already
765 * done, and set up backend-local pointer to that state. Returns true if an
766 * existing shared memory segment was found.
767 */
768 static bool
apw_init_shmem(void)769 apw_init_shmem(void)
770 {
771 bool found;
772
773 LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
774 apw_state = ShmemInitStruct("autoprewarm",
775 sizeof(AutoPrewarmSharedState),
776 &found);
777 if (!found)
778 {
779 /* First time through ... */
780 LWLockInitialize(&apw_state->lock, LWLockNewTrancheId());
781 apw_state->bgworker_pid = InvalidPid;
782 apw_state->pid_using_dumpfile = InvalidPid;
783 }
784 LWLockRelease(AddinShmemInitLock);
785
786 LWLockRegisterTranche(apw_state->lock.tranche, "autoprewarm");
787
788 return found;
789 }
790
791 /*
792 * Clear our PID from autoprewarm shared state.
793 */
794 static void
apw_detach_shmem(int code,Datum arg)795 apw_detach_shmem(int code, Datum arg)
796 {
797 LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE);
798 if (apw_state->pid_using_dumpfile == MyProcPid)
799 apw_state->pid_using_dumpfile = InvalidPid;
800 if (apw_state->bgworker_pid == MyProcPid)
801 apw_state->bgworker_pid = InvalidPid;
802 LWLockRelease(&apw_state->lock);
803 }
804
805 /*
806 * Start autoprewarm master worker process.
807 */
808 static void
apw_start_master_worker(void)809 apw_start_master_worker(void)
810 {
811 BackgroundWorker worker;
812 BackgroundWorkerHandle *handle;
813 BgwHandleStatus status;
814 pid_t pid;
815
816 MemSet(&worker, 0, sizeof(BackgroundWorker));
817 worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
818 worker.bgw_start_time = BgWorkerStart_ConsistentState;
819 strcpy(worker.bgw_library_name, "pg_prewarm");
820 strcpy(worker.bgw_function_name, "autoprewarm_main");
821 strcpy(worker.bgw_name, "autoprewarm master");
822 strcpy(worker.bgw_type, "autoprewarm master");
823
824 if (process_shared_preload_libraries_in_progress)
825 {
826 RegisterBackgroundWorker(&worker);
827 return;
828 }
829
830 /* must set notify PID to wait for startup */
831 worker.bgw_notify_pid = MyProcPid;
832
833 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
834 ereport(ERROR,
835 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
836 errmsg("could not register background process"),
837 errhint("You may need to increase max_worker_processes.")));
838
839 status = WaitForBackgroundWorkerStartup(handle, &pid);
840 if (status != BGWH_STARTED)
841 ereport(ERROR,
842 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
843 errmsg("could not start background process"),
844 errhint("More details may be available in the server log.")));
845 }
846
847 /*
848 * Start autoprewarm per-database worker process.
849 */
850 static void
apw_start_database_worker(void)851 apw_start_database_worker(void)
852 {
853 BackgroundWorker worker;
854 BackgroundWorkerHandle *handle;
855
856 MemSet(&worker, 0, sizeof(BackgroundWorker));
857 worker.bgw_flags =
858 BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
859 worker.bgw_start_time = BgWorkerStart_ConsistentState;
860 worker.bgw_restart_time = BGW_NEVER_RESTART;
861 strcpy(worker.bgw_library_name, "pg_prewarm");
862 strcpy(worker.bgw_function_name, "autoprewarm_database_main");
863 strcpy(worker.bgw_name, "autoprewarm worker");
864 strcpy(worker.bgw_type, "autoprewarm worker");
865
866 /* must set notify PID to wait for shutdown */
867 worker.bgw_notify_pid = MyProcPid;
868
869 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
870 ereport(ERROR,
871 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
872 errmsg("registering dynamic bgworker autoprewarm failed"),
873 errhint("Consider increasing configuration parameter \"max_worker_processes\".")));
874
875 /*
876 * Ignore return value; if it fails, postmaster has died, but we have
877 * checks for that elsewhere.
878 */
879 WaitForBackgroundWorkerShutdown(handle);
880 }
881
/*
 * Compare one member of the records pointed to by "a" and "b".  If the
 * values differ, return from the enclosing function with -1 or 1; if they
 * are equal, fall through to the next statement.
 */
#define cmp_member_elem(fld)	\
do { \
	if (a->fld != b->fld) \
		return (a->fld < b->fld) ? -1 : 1; \
} while(0)
890
891 /*
892 * apw_compare_blockinfo
893 *
894 * We depend on all records for a particular database being consecutive
895 * in the dump file; each per-database worker will preload blocks until
896 * it sees a block for some other database. Sorting by tablespace,
897 * filenode, forknum, and blocknum isn't critical for correctness, but
898 * helps us get a sequential I/O pattern.
899 */
900 static int
apw_compare_blockinfo(const void * p,const void * q)901 apw_compare_blockinfo(const void *p, const void *q)
902 {
903 const BlockInfoRecord *a = (const BlockInfoRecord *) p;
904 const BlockInfoRecord *b = (const BlockInfoRecord *) q;
905
906 cmp_member_elem(database);
907 cmp_member_elem(tablespace);
908 cmp_member_elem(filenode);
909 cmp_member_elem(forknum);
910 cmp_member_elem(blocknum);
911
912 return 0;
913 }
914
915 /*
916 * Signal handler for SIGTERM
917 */
918 static void
apw_sigterm_handler(SIGNAL_ARGS)919 apw_sigterm_handler(SIGNAL_ARGS)
920 {
921 int save_errno = errno;
922
923 got_sigterm = true;
924
925 if (MyProc)
926 SetLatch(&MyProc->procLatch);
927
928 errno = save_errno;
929 }
930
931 /*
932 * Signal handler for SIGHUP
933 */
934 static void
apw_sighup_handler(SIGNAL_ARGS)935 apw_sighup_handler(SIGNAL_ARGS)
936 {
937 int save_errno = errno;
938
939 got_sighup = true;
940
941 if (MyProc)
942 SetLatch(&MyProc->procLatch);
943
944 errno = save_errno;
945 }
946