1 /*--------------------------------------------------------------------
2 * bgworker.c
3 * POSTGRES pluggable background workers implementation
4 *
5 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/postmaster/bgworker.c
9 *
10 *-------------------------------------------------------------------------
11 */
12
13 #include "postgres.h"
14
15 #include "access/parallel.h"
16 #include "libpq/pqsignal.h"
17 #include "miscadmin.h"
18 #include "pgstat.h"
19 #include "port/atomics.h"
20 #include "postmaster/bgworker_internals.h"
21 #include "postmaster/interrupt.h"
22 #include "postmaster/postmaster.h"
23 #include "replication/logicallauncher.h"
24 #include "replication/logicalworker.h"
25 #include "storage/dsm.h"
26 #include "storage/ipc.h"
27 #include "storage/latch.h"
28 #include "storage/lwlock.h"
29 #include "storage/pg_shmem.h"
30 #include "storage/pmsignal.h"
31 #include "storage/proc.h"
32 #include "storage/procsignal.h"
33 #include "storage/shmem.h"
34 #include "tcop/tcopprot.h"
35 #include "utils/ascii.h"
36 #include "utils/ps_status.h"
37 #include "utils/timeout.h"
38
39 /*
40 * The postmaster's list of registered background workers, in private memory.
41 */
42 slist_head BackgroundWorkerList = SLIST_STATIC_INIT(BackgroundWorkerList);
43
44 /*
45 * BackgroundWorkerSlots exist in shared memory and can be accessed (via
46 * the BackgroundWorkerArray) by both the postmaster and by regular backends.
47 * However, the postmaster cannot take locks, even spinlocks, because this
48 * might allow it to crash or become wedged if shared memory gets corrupted.
49 * Such an outcome is intolerable. Therefore, we need a lockless protocol
50 * for coordinating access to this data.
51 *
52 * The 'in_use' flag is used to hand off responsibility for the slot between
53 * the postmaster and the rest of the system. When 'in_use' is false,
54 * the postmaster will ignore the slot entirely, except for the 'in_use' flag
55 * itself, which it may read. In this state, regular backends may modify the
56 * slot. Once a backend sets 'in_use' to true, the slot becomes the
57 * responsibility of the postmaster. Regular backends may no longer modify it,
58 * but the postmaster may examine it. Thus, a backend initializing a slot
59 * must fully initialize the slot - and insert a write memory barrier - before
60 * marking it as in use.
61 *
62 * As an exception, however, even when the slot is in use, regular backends
63 * may set the 'terminate' flag for a slot, telling the postmaster not
64 * to restart it. Once the background worker is no longer running, the slot
65 * will be released for reuse.
66 *
67 * In addition to coordinating with the postmaster, backends modifying this
68 * data structure must coordinate with each other. Since they can take locks,
69 * this is straightforward: any backend wishing to manipulate a slot must
70 * take BackgroundWorkerLock in exclusive mode. Backends wishing to read
71 * data that might get concurrently modified by other backends should take
72 * this lock in shared mode. No matter what, backends reading this data
73 * structure must be able to tolerate concurrent modifications by the
74 * postmaster.
75 */
typedef struct BackgroundWorkerSlot
{
	bool		in_use;			/* see handoff protocol in comment above */
	bool		terminate;		/* backend asks postmaster not to restart */
	pid_t		pid;			/* InvalidPid = not started yet; 0 = dead */
	uint64		generation;		/* incremented when slot is recycled */
	BackgroundWorker worker;	/* copy of the registration data */
} BackgroundWorkerSlot;
84
85 /*
86 * In order to limit the total number of parallel workers (according to
87 * max_parallel_workers GUC), we maintain the number of active parallel
88 * workers. Since the postmaster cannot take locks, two variables are used for
89 * this purpose: the number of registered parallel workers (modified by the
90 * backends, protected by BackgroundWorkerLock) and the number of terminated
91 * parallel workers (modified only by the postmaster, lockless). The active
92 * number of parallel workers is the number of registered workers minus the
93 * terminated ones. These counters can of course overflow, but it's not
94 * important here since the subtraction will still give the right number.
95 */
typedef struct BackgroundWorkerArray
{
	int			total_slots;	/* copy of max_worker_processes */
	uint32		parallel_register_count;	/* backends, under BackgroundWorkerLock */
	uint32		parallel_terminate_count;	/* postmaster only, lockless */
	BackgroundWorkerSlot slot[FLEXIBLE_ARRAY_MEMBER];	/* total_slots entries */
} BackgroundWorkerArray;
103
struct BackgroundWorkerHandle
{
	int			slot;			/* index into BackgroundWorkerData->slot[] */
	uint64		generation;		/* slot generation at registration time, so a
								 * recycled slot isn't mistaken for ours */
};
109
110 static BackgroundWorkerArray *BackgroundWorkerData;
111
112 /*
113 * List of internal background worker entry points. We need this for
i() -> Self114 * reasons explained in LookupBackgroundWorkerFunction(), below.
115 */
/* Maps internal entry-point names to their function addresses. */
static const struct
{
	const char *fn_name;
	bgworker_main_type fn_addr;
}			InternalBGWorkers[] =

{
	{
		"ParallelWorkerMain", ParallelWorkerMain
	},
	{
		"ApplyLauncherMain", ApplyLauncherMain
	},
	{
		"ApplyWorkerMain", ApplyWorkerMain
	}
};
unscale(&self, t: T) -> Self133
134 /* Private functions. */
135 static bgworker_main_type LookupBackgroundWorkerFunction(const char *libraryname, const char *funcname);
136
137
138 /*
powu(&self, exp: u32) -> Self139 * Calculate shared memory needed.
140 */
141 Size
142 BackgroundWorkerShmemSize(void)
143 {
144 Size size;
145
146 /* Array of workers is variably sized. */
147 size = offsetof(BackgroundWorkerArray, slot);
148 size = add_size(size, mul_size(max_worker_processes,
149 sizeof(BackgroundWorkerSlot)));
150
151 return size;
152 }
inv(&self) -> Self153
154 /*
155 * Initialize shared memory.
156 */
void
BackgroundWorkerShmemInit(void)
{
	bool		found;

	BackgroundWorkerData = ShmemInitStruct("Background Worker Data",
										   BackgroundWorkerShmemSize(),
										   &found);
	if (!IsUnderPostmaster)
	{
		slist_iter	siter;
		int			slotno = 0;

		BackgroundWorkerData->total_slots = max_worker_processes;
		BackgroundWorkerData->parallel_register_count = 0;
		BackgroundWorkerData->parallel_terminate_count = 0;

		/*
		 * Copy contents of worker list into shared memory. Record the shared
		 * memory slot assigned to each worker. This ensures a 1-to-1
		 * correspondence between the postmaster's private list and the array
		 * in shared memory.
		 */
		slist_foreach(siter, &BackgroundWorkerList)
		{
			BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
			RegisteredBgWorker *rw;

			rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur);
			Assert(slotno < max_worker_processes);
			slot->in_use = true;
			slot->terminate = false;
			slot->pid = InvalidPid;	/* worker has not been started yet */
			slot->generation = 0;
			rw->rw_shmem_slot = slotno;
			rw->rw_worker.bgw_notify_pid = 0;	/* might be reinit after crash */
			memcpy(&slot->worker, &rw->rw_worker, sizeof(BackgroundWorker));
			++slotno;
		}

		/*
		 * Mark any remaining slots as not in use.
		 */
		while (slotno < max_worker_processes)
		{
			BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];

			slot->in_use = false;
			++slotno;
		}
	}
	else
		/* In child processes the structure must already have been created. */
		Assert(found);
}
211
212 /*
213 * Search the postmaster's backend-private list of RegisteredBgWorker objects
214 * for the one that maps to the given slot number.
215 */
216 static RegisteredBgWorker *
217 FindRegisteredWorkerBySlotNumber(int slotno)
ln(&self) -> Self218 {
219 slist_iter siter;
220
221 slist_foreach(siter, &BackgroundWorkerList)
222 {
223 RegisteredBgWorker *rw;
224
225 rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur);
226 if (rw->rw_shmem_slot == slotno)
227 return rw;
228 }
229
230 return NULL;
231 }
sqrt(&self) -> Self232
233 /*
234 * Notice changes to shared memory made by other backends.
235 * Accept new worker requests only if allow_new_workers is true.
236 *
237 * This code runs in the postmaster, so we must be very careful not to assume
238 * that shared memory contents are sane. Otherwise, a rogue backend could
239 * take out the postmaster.
240 */
241 void
242 BackgroundWorkerStateChange(bool allow_new_workers)
243 {
244 int slotno;
245
246 /*
247 * The total number of slots stored in shared memory should match our
248 * notion of max_worker_processes. If it does not, something is very
249 * wrong. Further down, we always refer to this value as
250 * max_worker_processes, in case shared memory gets corrupted while we're
251 * looping.
252 */
253 if (max_worker_processes != BackgroundWorkerData->total_slots)
254 {
255 elog(LOG,
256 "inconsistent background worker state (max_worker_processes=%d, total_slots=%d",
257 max_worker_processes,
258 BackgroundWorkerData->total_slots);
259 return;
260 }
261
262 /*
263 * Iterate through slots, looking for newly-registered workers or workers
264 * who must die.
265 */
266 for (slotno = 0; slotno < max_worker_processes; ++slotno)
267 {
268 BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
269 RegisteredBgWorker *rw;
270
271 if (!slot->in_use)
272 continue;
273
274 /*
275 * Make sure we don't see the in_use flag before the updated slot
276 * contents.
277 */
278 pg_read_barrier();
279
280 /* See whether we already know about this worker. */
281 rw = FindRegisteredWorkerBySlotNumber(slotno);
282 if (rw != NULL)
283 {
284 /*
285 * In general, the worker data can't change after it's initially
286 * registered. However, someone can set the terminate flag.
287 */
288 if (slot->terminate && !rw->rw_terminate)
289 {
290 rw->rw_terminate = true;
291 if (rw->rw_pid != 0)
292 kill(rw->rw_pid, SIGTERM);
293 else
294 {
295 /* Report never-started, now-terminated worker as dead. */
296 ReportBackgroundWorkerPID(rw);
297 }
298 }
299 continue;
300 }
301
302 /*
303 * If we aren't allowing new workers, then immediately mark it for
304 * termination; the next stanza will take care of cleaning it up.
305 * Doing this ensures that any process waiting for the worker will get
306 * awoken, even though the worker will never be allowed to run.
307 */
308 if (!allow_new_workers)
309 slot->terminate = true;
310
311 /*
312 * If the worker is marked for termination, we don't need to add it to
313 * the registered workers list; we can just free the slot. However, if
314 * bgw_notify_pid is set, the process that registered the worker may
315 * need to know that we've processed the terminate request, so be sure
316 * to signal it.
317 */
318 if (slot->terminate)
319 {
320 int notify_pid;
321
322 /*
323 * We need a memory barrier here to make sure that the load of
324 * bgw_notify_pid and the update of parallel_terminate_count
325 * complete before the store to in_use.
326 */
327 notify_pid = slot->worker.bgw_notify_pid;
328 if ((slot->worker.bgw_flags & BGWORKER_CLASS_PARALLEL) != 0)
329 BackgroundWorkerData->parallel_terminate_count++;
330 slot->pid = 0;
331
332 pg_memory_barrier();
333 slot->in_use = false;
334
335 if (notify_pid != 0)
336 kill(notify_pid, SIGUSR1);
337
338 continue;
339 }
340
341 /*
342 * Copy the registration data into the registered workers list.
343 */
344 rw = malloc(sizeof(RegisteredBgWorker));
345 if (rw == NULL)
346 {
347 ereport(LOG,
348 (errcode(ERRCODE_OUT_OF_MEMORY),
349 errmsg("out of memory")));
350 return;
351 }
352
353 /*
354 * Copy strings in a paranoid way. If shared memory is corrupted, the
355 * source data might not even be NUL-terminated.
356 */
357 ascii_safe_strlcpy(rw->rw_worker.bgw_name,
358 slot->worker.bgw_name, BGW_MAXLEN);
359 ascii_safe_strlcpy(rw->rw_worker.bgw_type,
360 slot->worker.bgw_type, BGW_MAXLEN);
361 ascii_safe_strlcpy(rw->rw_worker.bgw_library_name,
362 slot->worker.bgw_library_name, BGW_MAXLEN);
363 ascii_safe_strlcpy(rw->rw_worker.bgw_function_name,
364 slot->worker.bgw_function_name, BGW_MAXLEN);
365
366 /*
367 * Copy various fixed-size fields.
368 *
369 * flags, start_time, and restart_time are examined by the postmaster,
370 * but nothing too bad will happen if they are corrupted. The
371 * remaining fields will only be examined by the child process. It
372 * might crash, but we won't.
373 */
374 rw->rw_worker.bgw_flags = slot->worker.bgw_flags;
375 rw->rw_worker.bgw_start_time = slot->worker.bgw_start_time;
376 rw->rw_worker.bgw_restart_time = slot->worker.bgw_restart_time;
377 rw->rw_worker.bgw_main_arg = slot->worker.bgw_main_arg;
378 memcpy(rw->rw_worker.bgw_extra, slot->worker.bgw_extra, BGW_EXTRALEN);
379
380 /*
381 * Copy the PID to be notified about state changes, but only if the
382 * postmaster knows about a backend with that PID. It isn't an error
383 * if the postmaster doesn't know about the PID, because the backend
384 * that requested the worker could have died (or been killed) just
385 * after doing so. Nonetheless, at least until we get some experience
386 * with how this plays out in the wild, log a message at a relative
387 * high debug level.
388 */
389 rw->rw_worker.bgw_notify_pid = slot->worker.bgw_notify_pid;
390 if (!PostmasterMarkPIDForWorkerNotify(rw->rw_worker.bgw_notify_pid))
391 {
392 elog(DEBUG1, "worker notification PID %lu is not valid",
393 (long) rw->rw_worker.bgw_notify_pid);
394 rw->rw_worker.bgw_notify_pid = 0;
395 }
396
397 /* Initialize postmaster bookkeeping. */
398 rw->rw_backend = NULL;
399 rw->rw_pid = 0;
400 rw->rw_child_slot = 0;
401 rw->rw_crashed_at = 0;
402 rw->rw_shmem_slot = slotno;
403 rw->rw_terminate = false;
404
405 /* Log it! */
406 ereport(DEBUG1,
407 (errmsg("registering background worker \"%s\"",
408 rw->rw_worker.bgw_name)));
409
410 slist_push_head(&BackgroundWorkerList, &rw->rw_lnode);
411 }
412 }
413
414 /*
415 * Forget about a background worker that's no longer needed.
416 *
417 * The worker must be identified by passing an slist_mutable_iter that
418 * points to it. This convention allows deletion of workers during
419 * searches of the worker list, and saves having to search the list again.
420 *
acos(&self) -> Self421 * Caller is responsible for notifying bgw_notify_pid, if appropriate.
422 *
423 * This function must be invoked only in the postmaster.
424 */
void
ForgetBackgroundWorker(slist_mutable_iter *cur)
{
	RegisteredBgWorker *rw;
	BackgroundWorkerSlot *slot;

	rw = slist_container(RegisteredBgWorker, rw_lnode, cur->cur);

	Assert(rw->rw_shmem_slot < max_worker_processes);
	slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
	Assert(slot->in_use);

	/*
	 * We need a memory barrier here to make sure that the update of
	 * parallel_terminate_count completes before the store to in_use.
	 */
	if ((rw->rw_worker.bgw_flags & BGWORKER_CLASS_PARALLEL) != 0)
		BackgroundWorkerData->parallel_terminate_count++;

	pg_memory_barrier();
	slot->in_use = false;		/* releases the slot for reuse by backends */

	ereport(DEBUG1,
			(errmsg("unregistering background worker \"%s\"",
					rw->rw_worker.bgw_name)));

	/* Remove from the postmaster's private list and free the entry. */
	slist_delete_current(cur);
	free(rw);
}
454
455 /*
456 * Report the PID of a newly-launched background worker in shared memory.
457 *
458 * This function should only be called from the postmaster.
459 */
void
ReportBackgroundWorkerPID(RegisteredBgWorker *rw)
{
	BackgroundWorkerSlot *slot;

	Assert(rw->rw_shmem_slot < max_worker_processes);
	slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
	/* Publish the worker's PID so interested backends can see it. */
	slot->pid = rw->rw_pid;

	/* Wake up the process that registered this worker, if any. */
	if (rw->rw_worker.bgw_notify_pid != 0)
		kill(rw->rw_worker.bgw_notify_pid, SIGUSR1);
}
472
473 /*
474 * Report that the PID of a background worker is now zero because a
475 * previously-running background worker has exited.
476 *
477 * This function should only be called from the postmaster.
478 */
void
ReportBackgroundWorkerExit(slist_mutable_iter *cur)
{
	RegisteredBgWorker *rw;
	BackgroundWorkerSlot *slot;
	int			notify_pid;

	rw = slist_container(RegisteredBgWorker, rw_lnode, cur->cur);

	Assert(rw->rw_shmem_slot < max_worker_processes);
	slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
	/* Publish rw_pid (now zero, per the header comment) in shared memory. */
	slot->pid = rw->rw_pid;
	/* Save the notify PID before ForgetBackgroundWorker can free rw. */
	notify_pid = rw->rw_worker.bgw_notify_pid;

	/*
	 * If this worker is slated for deregistration, do that before notifying
	 * the process which started it. Otherwise, if that process tries to
	 * reuse the slot immediately, it might not be available yet. In theory
	 * that could happen anyway if the process checks slot->pid at just the
	 * wrong moment, but this makes the window narrower.
	 */
	if (rw->rw_terminate ||
		rw->rw_worker.bgw_restart_time == BGW_NEVER_RESTART)
		ForgetBackgroundWorker(cur);

	if (notify_pid != 0)
		kill(notify_pid, SIGUSR1);
}
507
508 /*
509 * Cancel SIGUSR1 notifications for a PID belonging to an exiting backend.
510 *
511 * This function should only be called from the postmaster.
512 */
513 void
514 BackgroundWorkerStopNotifications(pid_t pid)
515 {
atanh(&self) -> Self516 slist_iter siter;
517
518 slist_foreach(siter, &BackgroundWorkerList)
519 {
520 RegisteredBgWorker *rw;
521
522 rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur);
523 if (rw->rw_worker.bgw_notify_pid == pid)
524 rw->rw_worker.bgw_notify_pid = 0;
525 }
526 }
527
528 /*
529 * Cancel any not-yet-started worker requests that have waiting processes.
530 *
531 * This is called during a normal ("smart" or "fast") database shutdown.
532 * After this point, no new background workers will be started, so anything
533 * that might be waiting for them needs to be kicked off its wait. We do
534 * that by cancelling the bgworker registration entirely, which is perhaps
535 * overkill, but since we're shutting down it does not matter whether the
536 * registration record sticks around.
537 *
538 * This function should only be called from the postmaster.
539 */
540 void
541 ForgetUnstartedBackgroundWorkers(void)
542 {
543 slist_mutable_iter iter;
544
545 slist_foreach_modify(iter, &BackgroundWorkerList)
546 {
547 RegisteredBgWorker *rw;
548 BackgroundWorkerSlot *slot;
549
550 rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur);
finv(&self) -> Complex<T>551 Assert(rw->rw_shmem_slot < max_worker_processes);
552 slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
553
554 /* If it's not yet started, and there's someone waiting ... */
555 if (slot->pid == InvalidPid &&
556 rw->rw_worker.bgw_notify_pid != 0)
557 {
558 /* ... then zap it, and notify the waiter */
559 int notify_pid = rw->rw_worker.bgw_notify_pid;
560
561 ForgetBackgroundWorker(&iter);
562 if (notify_pid != 0)
563 kill(notify_pid, SIGUSR1);
564 }
565 }
566 }
567
568 /*
569 * Reset background worker crash state.
570 *
571 * We assume that, after a crash-and-restart cycle, background workers without
572 * the never-restart flag should be restarted immediately, instead of waiting
573 * for bgw_restart_time to elapse. On the other hand, workers with that flag
574 * should be forgotten immediately, since we won't ever restart them.
575 *
576 * This function should only be called from the postmaster.
577 */
void
ResetBackgroundWorkerCrashTimes(void)
{
	slist_mutable_iter iter;

	slist_foreach_modify(iter, &BackgroundWorkerList)
	{
		RegisteredBgWorker *rw;

		rw = slist_container(RegisteredBgWorker, rw_lnode, iter.cur);

		if (rw->rw_worker.bgw_restart_time == BGW_NEVER_RESTART)
		{
			/*
			 * Workers marked BGW_NEVER_RESTART shouldn't get relaunched after
			 * the crash, so forget about them. (If we wait until after the
			 * crash to forget about them, and they are parallel workers,
			 * parallel_terminate_count will get incremented after we've
			 * already zeroed parallel_register_count, which would be bad.)
			 */
			ForgetBackgroundWorker(&iter);
		}
		else
		{
			/*
			 * The accounting which we do via parallel_register_count and
			 * parallel_terminate_count would get messed up if a worker marked
			 * parallel could survive a crash and restart cycle. All such
			 * workers should be marked BGW_NEVER_RESTART, and thus control
			 * should never reach this branch.
			 */
			Assert((rw->rw_worker.bgw_flags & BGWORKER_CLASS_PARALLEL) == 0);

			/*
			 * Allow this worker to be restarted immediately after we finish
			 * resetting.
			 */
			rw->rw_crashed_at = 0;

			/*
			 * If there was anyone waiting for it, they're history.
			 */
			rw->rw_worker.bgw_notify_pid = 0;
		}
	}
}
624
625 #ifdef EXEC_BACKEND
626 /*
627 * In EXEC_BACKEND mode, workers use this to retrieve their details from
628 * shared memory.
629 */
BackgroundWorker *
BackgroundWorkerEntry(int slotno)
{
	static BackgroundWorker myEntry;	/* static so we can return its address */
	BackgroundWorkerSlot *slot;

	Assert(slotno < BackgroundWorkerData->total_slots);
	slot = &BackgroundWorkerData->slot[slotno];
	Assert(slot->in_use);

	/* must copy this in case we don't intend to retain shmem access */
	memcpy(&myEntry, &slot->worker, sizeof myEntry);
	return &myEntry;
}
644 #endif
645
646 /*
647 * Complain about the BackgroundWorker definition using error level elevel.
648 * Return true if it looks ok, false if not (unless elevel >= ERROR, in
649 * which case we won't return at all in the not-OK case).
650 */
static bool
SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel)
{
	/* sanity check for flags */
	if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION)
	{
		/* Database access requires shared-memory access. */
		if (!(worker->bgw_flags & BGWORKER_SHMEM_ACCESS))
		{
			ereport(elevel,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("background worker \"%s\": must attach to shared memory in order to request a database connection",
							worker->bgw_name)));
			return false;
		}

		if (worker->bgw_start_time == BgWorkerStart_PostmasterStart)
		{
			ereport(elevel,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("background worker \"%s\": cannot request database access if starting at postmaster start",
							worker->bgw_name)));
			return false;
		}

		/* XXX other checks? */
	}

	/*
	 * Restart interval must be BGW_NEVER_RESTART or within [0, USECS_PER_DAY
	 * / 1000] -- NOTE(review): presumably an upper bound of one day; confirm
	 * the units against the BackgroundWorker documentation.
	 */
	if ((worker->bgw_restart_time < 0 &&
		 worker->bgw_restart_time != BGW_NEVER_RESTART) ||
		(worker->bgw_restart_time > USECS_PER_DAY / 1000))
	{
		ereport(elevel,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("background worker \"%s\": invalid restart interval",
						worker->bgw_name)));
		return false;
	}

	/*
	 * Parallel workers may not be configured for restart, because the
	 * parallel_register_count/parallel_terminate_count accounting can't
	 * handle parallel workers lasting through a crash-and-restart cycle.
	 */
	if (worker->bgw_restart_time != BGW_NEVER_RESTART &&
		(worker->bgw_flags & BGWORKER_CLASS_PARALLEL) != 0)
	{
		ereport(elevel,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("background worker \"%s\": parallel workers may not be configured for restart",
						worker->bgw_name)));
		return false;
	}

	/*
	 * If bgw_type is not filled in, use bgw_name.
	 */
	if (strcmp(worker->bgw_type, "") == 0)
		strcpy(worker->bgw_type, worker->bgw_name);

	return true;
}
712
713 /*
714 * Standard SIGTERM handler for background workers
715 */
mul_add(self, other: Complex<T>, add: Complex<T>) -> Complex<T>716 static void
717 bgworker_die(SIGNAL_ARGS)
718 {
719 PG_SETMASK(&BlockSig);
720
721 ereport(FATAL,
722 (errcode(ERRCODE_ADMIN_SHUTDOWN),
723 errmsg("terminating background worker \"%s\" due to administrator command",
724 MyBgworkerEntry->bgw_type)));
725 }
726
mul_add(self, other: &Complex<T>, add: &Complex<T>) -> Complex<T>727 /*
728 * Standard SIGUSR1 handler for unconnected workers
729 *
730 * Here, we want to make sure an unconnected worker will at least heed
731 * latch activity.
732 */
static void
bgworker_sigusr1_handler(SIGNAL_ARGS)
{
	int			save_errno = errno;	/* preserve errno for interrupted code */

	latch_sigusr1_handler();

	errno = save_errno;
}
742
743 /*
744 * Start a new background worker
745 *
746 * This is the main entry point for background worker, to be called from
747 * postmaster.
748 */
void
StartBackgroundWorker(void)
{
	sigjmp_buf	local_sigjmp_buf;
	BackgroundWorker *worker = MyBgworkerEntry;
	bgworker_main_type entrypt;

	if (worker == NULL)
		elog(FATAL, "unable to find bgworker entry");

	IsBackgroundWorker = true;

	/* Identify ourselves in pg_stat_activity and the process title. */
	MyBackendType = B_BG_WORKER;
	init_ps_display(worker->bgw_name);

	/*
	 * If we're not supposed to have shared memory access, then detach from
	 * shared memory. If we didn't request shared memory access, the
	 * postmaster won't force a cluster-wide restart if we exit unexpectedly,
	 * so we'd better make sure that we don't mess anything up that would
	 * require that sort of cleanup.
	 */
	if ((worker->bgw_flags & BGWORKER_SHMEM_ACCESS) == 0)
	{
		dsm_detach_all();
		PGSharedMemoryDetach();
	}

	SetProcessingMode(InitProcessing);

	/* Apply PostAuthDelay */
	if (PostAuthDelay > 0)
		pg_usleep(PostAuthDelay * 1000000L);

	/*
	 * Set up signal handlers.
	 */
	if (worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION)
	{
		/*
		 * SIGINT is used to signal canceling the current action
		 */
		pqsignal(SIGINT, StatementCancelHandler);
		pqsignal(SIGUSR1, procsignal_sigusr1_handler);
		pqsignal(SIGFPE, FloatExceptionHandler);

		/* XXX Any other handlers needed here? */
	}
	else
	{
		/* Unconnected workers only need latch wakeups on SIGUSR1. */
		pqsignal(SIGINT, SIG_IGN);
		pqsignal(SIGUSR1, bgworker_sigusr1_handler);
		pqsignal(SIGFPE, SIG_IGN);
	}
	pqsignal(SIGTERM, bgworker_die);
	pqsignal(SIGHUP, SIG_IGN);

	pqsignal(SIGQUIT, SignalHandlerForCrashExit);
	InitializeTimeouts();		/* establishes SIGALRM handler */

	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR2, SIG_IGN);
	pqsignal(SIGCHLD, SIG_DFL);

	/*
	 * If an exception is encountered, processing resumes here.
	 *
	 * We just need to clean up, report the error, and go away.
	 */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		/* Since not using PG_TRY, must reset error stack by hand */
		error_context_stack = NULL;

		/* Prevent interrupts while cleaning up */
		HOLD_INTERRUPTS();

		/*
		 * sigsetjmp will have blocked all signals, but we may need to accept
		 * signals while communicating with our parallel leader. Once we've
		 * done HOLD_INTERRUPTS() it should be safe to unblock signals.
		 */
		BackgroundWorkerUnblockSignals();

		/* Report the error to the parallel leader and the server log */
		EmitErrorReport();

		/*
		 * Do we need more cleanup here? For shmem-connected bgworkers, we
		 * will call InitProcess below, which will install ProcKill as exit
		 * callback. That will take care of releasing locks, etc.
		 */

		/* and go away */
		proc_exit(1);
	}

	/* We can now handle ereport(ERROR) */
	PG_exception_stack = &local_sigjmp_buf;

	/*
	 * If the background worker requests shared memory access, set that up
	 * now; else, detach all shared memory segments.
	 */
	if (worker->bgw_flags & BGWORKER_SHMEM_ACCESS)
	{
		/*
		 * Early initialization. Some of this could be useful even for
		 * background workers that aren't using shared memory, but they can
		 * call the individual startup routines for those subsystems if
		 * needed.
		 */
		BaseInit();

		/*
		 * Create a per-backend PGPROC struct in shared memory, except in the
		 * EXEC_BACKEND case where this was done in SubPostmasterMain. We must
		 * do this before we can use LWLocks (and in the EXEC_BACKEND case we
		 * already had to do some stuff with LWLocks).
		 */
#ifndef EXEC_BACKEND
		InitProcess();
#endif
	}

	/*
	 * Look up the entry point function, loading its library if necessary.
	 */
	entrypt = LookupBackgroundWorkerFunction(worker->bgw_library_name,
											 worker->bgw_function_name);

	/*
	 * Note that in normal processes, we would call InitPostgres here. For a
	 * worker, however, we don't know what database to connect to, yet; so we
	 * need to wait until the user code does it via
	 * BackgroundWorkerInitializeConnection().
	 */

	/*
	 * Now invoke the user-defined worker code
	 */
	entrypt(worker->bgw_main_arg);

	/* ... and if it returns, we're done */
	proc_exit(0);
}
895
896 /*
897 * Register a new static background worker.
898 *
899 * This can only be called directly from postmaster or in the _PG_init
neg(self) -> Self::Output900 * function of a module library that's loaded by shared_preload_libraries;
901 * otherwise it will have no effect.
902 */
void
RegisterBackgroundWorker(BackgroundWorker *worker)
{
	RegisteredBgWorker *rw;
	static int	numworkers = 0; /* count of workers registered so far */

	if (!IsUnderPostmaster)
		ereport(DEBUG1,
				(errmsg("registering background worker \"%s\"", worker->bgw_name)));

	/*
	 * Reject registrations that don't come from shared_preload_libraries
	 * processing or from the core server itself.
	 */
	if (!process_shared_preload_libraries_in_progress &&
		strcmp(worker->bgw_library_name, "postgres") != 0)
	{
		if (!IsUnderPostmaster)
			ereport(LOG,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("background worker \"%s\": must be registered in shared_preload_libraries",
							worker->bgw_name)));
		return;
	}

	if (!SanityCheckBackgroundWorker(worker, LOG))
		return;

	/* Notification PIDs are only meaningful for dynamic registration. */
	if (worker->bgw_notify_pid != 0)
	{
		ereport(LOG,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("background worker \"%s\": only dynamic background workers can request notification",
						worker->bgw_name)));
		return;
	}

	/*
	 * Enforce maximum number of workers. Note this is overly restrictive: we
	 * could allow more non-shmem-connected workers, because these don't count
	 * towards the MAX_BACKENDS limit elsewhere. For now, it doesn't seem
	 * important to relax this restriction.
	 */
	if (++numworkers > max_worker_processes)
	{
		ereport(LOG,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("too many background workers"),
				 errdetail_plural("Up to %d background worker can be registered with the current settings.",
								  "Up to %d background workers can be registered with the current settings.",
								  max_worker_processes,
								  max_worker_processes),
				 errhint("Consider increasing the configuration parameter \"max_worker_processes\".")));
		return;
	}

	/*
	 * Copy the registration data into the registered workers list.
	 */
	rw = malloc(sizeof(RegisteredBgWorker));
	if (rw == NULL)
	{
		ereport(LOG,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));
		return;
	}

	rw->rw_worker = *worker;
	rw->rw_backend = NULL;
	rw->rw_pid = 0;
	rw->rw_child_slot = 0;
	rw->rw_crashed_at = 0;
	rw->rw_terminate = false;

	slist_push_head(&BackgroundWorkerList, &rw->rw_lnode);
}
976
977 /*
978 * Register a new background worker from a regular backend.
979 *
980 * Returns true on success and false on failure. Failure typically indicates
981 * that no background worker slots are currently available.
982 *
983 * If handle != NULL, we'll set *handle to a pointer that can subsequently
984 * be used as an argument to GetBackgroundWorkerPid(). The caller can
985 * free this pointer using pfree(), if desired.
986 */
987 bool
988 RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
989 BackgroundWorkerHandle **handle)
990 {
991 int slotno;
992 bool success = false;
993 bool parallel;
994 uint64 generation = 0;
995
996 /*
997 * We can't register dynamic background workers from the postmaster. If
998 * this is a standalone backend, we're the only process and can't start
999 * any more. In a multi-process environment, it might be theoretically
1000 * possible, but we don't currently support it due to locking
1001 * considerations; see comments on the BackgroundWorkerSlot data
1002 * structure.
1003 */
1004 if (!IsUnderPostmaster)
1005 return false;
1006
1007 if (!SanityCheckBackgroundWorker(worker, ERROR))
1008 return false;
1009
1010 parallel = (worker->bgw_flags & BGWORKER_CLASS_PARALLEL) != 0;
1011
1012 LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE);
1013
1014 /*
1015 * If this is a parallel worker, check whether there are already too many
1016 * parallel workers; if so, don't register another one. Our view of
1017 * parallel_terminate_count may be slightly stale, but that doesn't really
1018 * matter: we would have gotten the same result if we'd arrived here
1019 * slightly earlier anyway. There's no help for it, either, since the
1020 * postmaster must not take locks; a memory barrier wouldn't guarantee
1021 * anything useful.
1022 */
1023 if (parallel && (BackgroundWorkerData->parallel_register_count -
1024 BackgroundWorkerData->parallel_terminate_count) >=
1025 max_parallel_workers)
1026 {
1027 Assert(BackgroundWorkerData->parallel_register_count -
1028 BackgroundWorkerData->parallel_terminate_count <=
1029 MAX_PARALLEL_WORKER_LIMIT);
1030 LWLockRelease(BackgroundWorkerLock);
1031 return false;
1032 }
1033
1034 /*
1035 * Look for an unused slot. If we find one, grab it.
1036 */
1037 for (slotno = 0; slotno < BackgroundWorkerData->total_slots; ++slotno)
1038 {
1039 BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
1040
1041 if (!slot->in_use)
1042 {
1043 memcpy(&slot->worker, worker, sizeof(BackgroundWorker));
1044 slot->pid = InvalidPid; /* indicates not started yet */
1045 slot->generation++;
1046 slot->terminate = false;
1047 generation = slot->generation;
add(self, other: T) -> Self::Output1048 if (parallel)
1049 BackgroundWorkerData->parallel_register_count++;
1050
1051 /*
1052 * Make sure postmaster doesn't see the slot as in use before it
1053 * sees the new contents.
1054 */
1055 pg_write_barrier();
1056
sub(self, other: T) -> Self::Output1057 slot->in_use = true;
1058 success = true;
1059 break;
1060 }
1061 }
1062
1063 LWLockRelease(BackgroundWorkerLock);
1064
1065 /* If we found a slot, tell the postmaster to notice the change. */
mul(self, other: T) -> Self::Output1066 if (success)
1067 SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);
1068
1069 /*
1070 * If we found a slot and the user has provided a handle, initialize it.
1071 */
1072 if (success && handle)
1073 {
1074 *handle = palloc(sizeof(BackgroundWorkerHandle));
div(self, other: T) -> Self::Output1075 (*handle)->slot = slotno;
1076 (*handle)->generation = generation;
1077 }
1078
1079 return success;
1080 }
1081
1082 /*
1083 * Get the PID of a dynamically-registered background worker.
rem(self, other: T) -> Self::Output1084 *
1085 * If the worker is determined to be running, the return value will be
1086 * BGWH_STARTED and *pidp will get the PID of the worker process. If the
1087 * postmaster has not yet attempted to start the worker, the return value will
1088 * be BGWH_NOT_YET_STARTED. Otherwise, the return value is BGWH_STOPPED.
1089 *
1090 * BGWH_STOPPED can indicate either that the worker is temporarily stopped
1091 * (because it is configured for automatic restart and exited non-zero),
1092 * or that the worker is permanently stopped (because it exited with exit
1093 * code 0, or was not configured for automatic restart), or even that the
1094 * worker was unregistered without ever starting (either because startup
1095 * failed and the worker is not configured for automatic restart, or because
1096 * TerminateBackgroundWorker was used before the worker was successfully
1097 * started).
1098 */
1099 BgwHandleStatus
1100 GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
1101 {
1102 BackgroundWorkerSlot *slot;
1103 pid_t pid;
1104
1105 Assert(handle->slot < max_worker_processes);
1106 slot = &BackgroundWorkerData->slot[handle->slot];
1107
1108 /*
1109 * We could probably arrange to synchronize access to data using memory
1110 * barriers only, but for now, let's just keep it simple and grab the
1111 * lock. It seems unlikely that there will be enough traffic here to
1112 * result in meaningful contention.
1113 */
1114 LWLockAcquire(BackgroundWorkerLock, LW_SHARED);
1115
1116 /*
1117 * The generation number can't be concurrently changed while we hold the
1118 * lock. The pid, which is updated by the postmaster, can change at any
1119 * time, but we assume such changes are atomic. So the value we read
1120 * won't be garbage, but it might be out of date by the time the caller
1121 * examines it (but that's unavoidable anyway).
1122 *
1123 * The in_use flag could be in the process of changing from true to false,
1124 * but if it is already false then it can't change further.
1125 */
1126 if (handle->generation != slot->generation || !slot->in_use)
1127 pid = 0;
1128 else
1129 pid = slot->pid;
1130
1131 /* All done. */
1132 LWLockRelease(BackgroundWorkerLock);
1133
1134 if (pid == 0)
1135 return BGWH_STOPPED;
1136 else if (pid == InvalidPid)
1137 return BGWH_NOT_YET_STARTED;
1138 *pidp = pid;
1139 return BGWH_STARTED;
1140 }
1141
1142 /*
1143 * Wait for a background worker to start up.
1144 *
1145 * This is like GetBackgroundWorkerPid(), except that if the worker has not
1146 * yet started, we wait for it to do so; thus, BGWH_NOT_YET_STARTED is never
1147 * returned. However, if the postmaster has died, we give up and return
1148 * BGWH_POSTMASTER_DIED, since it that case we know that startup will not
1149 * take place.
1150 *
1151 * The caller *must* have set our PID as the worker's bgw_notify_pid,
1152 * else we will not be awoken promptly when the worker's state changes.
1153 */
1154 BgwHandleStatus
1155 WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
1156 {
1157 BgwHandleStatus status;
1158 int rc;
1159
1160 for (;;)
1161 {
1162 pid_t pid;
1163
1164 CHECK_FOR_INTERRUPTS();
1165
1166 status = GetBackgroundWorkerPid(handle, &pid);
1167 if (status == BGWH_STARTED)
1168 *pidp = pid;
1169 if (status != BGWH_NOT_YET_STARTED)
1170 break;
1171
1172 rc = WaitLatch(MyLatch,
1173 WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
1174 WAIT_EVENT_BGWORKER_STARTUP);
1175
1176 if (rc & WL_POSTMASTER_DEATH)
1177 {
1178 status = BGWH_POSTMASTER_DIED;
1179 break;
1180 }
1181
1182 ResetLatch(MyLatch);
1183 }
1184
1185 return status;
1186 }
1187
1188 /*
1189 * Wait for a background worker to stop.
1190 *
1191 * If the worker hasn't yet started, or is running, we wait for it to stop
1192 * and then return BGWH_STOPPED. However, if the postmaster has died, we give
1193 * up and return BGWH_POSTMASTER_DIED, because it's the postmaster that
1194 * notifies us when a worker's state changes.
1195 *
1196 * The caller *must* have set our PID as the worker's bgw_notify_pid,
1197 * else we will not be awoken promptly when the worker's state changes.
1198 */
1199 BgwHandleStatus
1200 WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
1201 {
1202 BgwHandleStatus status;
1203 int rc;
1204
1205 for (;;)
1206 {
1207 pid_t pid;
1208
1209 CHECK_FOR_INTERRUPTS();
1210
1211 status = GetBackgroundWorkerPid(handle, &pid);
1212 if (status == BGWH_STOPPED)
1213 break;
1214
1215 rc = WaitLatch(MyLatch,
1216 WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
1217 WAIT_EVENT_BGWORKER_SHUTDOWN);
1218
1219 if (rc & WL_POSTMASTER_DEATH)
1220 {
1221 status = BGWH_POSTMASTER_DIED;
1222 break;
1223 }
1224
1225 ResetLatch(MyLatch);
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1226 }
1227
1228 return status;
1229 }
1230
1231 /*
1232 * Instruct the postmaster to terminate a background worker.
1233 *
1234 * Note that it's safe to do this without regard to whether the worker is
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1235 * still running, or even if the worker may already have exited and been
1236 * unregistered.
1237 */
1238 void
1239 TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
1240 {
1241 BackgroundWorkerSlot *slot;
1242 bool signal_postmaster = false;
1243
1244 Assert(handle->slot < max_worker_processes);
1245 slot = &BackgroundWorkerData->slot[handle->slot];
1246
1247 /* Set terminate flag in shared memory, unless slot has been reused. */
1248 LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE);
1249 if (handle->generation == slot->generation)
1250 {
1251 slot->terminate = true;
1252 signal_postmaster = true;
1253 }
1254 LWLockRelease(BackgroundWorkerLock);
1255
1256 /* Make sure the postmaster notices the change to shared memory. */
1257 if (signal_postmaster)
1258 SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);
1259 }
1260
1261 /*
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1262 * Look up (and possibly load) a bgworker entry point function.
1263 *
1264 * For functions contained in the core code, we use library name "postgres"
1265 * and consult the InternalBGWorkers array. External functions are
1266 * looked up, and loaded if necessary, using load_external_function().
1267 *
1268 * The point of this is to pass function names as strings across process
1269 * boundaries. We can't pass actual function addresses because of the
1270 * possibility that the function has been loaded at a different address
1271 * in a different process. This is obviously a hazard for functions in
1272 * loadable libraries, but it can happen even for functions in the core code
1273 * on platforms using EXEC_BACKEND (e.g., Windows).
1274 *
1275 * At some point it might be worthwhile to get rid of InternalBGWorkers[]
1276 * in favor of applying load_external_function() for core functions too;
1277 * but that raises portability issues that are not worth addressing now.
1278 */
1279 static bgworker_main_type
1280 LookupBackgroundWorkerFunction(const char *libraryname, const char *funcname)
1281 {
1282 /*
1283 * If the function is to be loaded from postgres itself, search the
1284 * InternalBGWorkers array.
1285 */
1286 if (strcmp(libraryname, "postgres") == 0)
1287 {
1288 int i;
1289
1290 for (i = 0; i < lengthof(InternalBGWorkers); i++)
1291 {
1292 if (strcmp(InternalBGWorkers[i].fn_name, funcname) == 0)
1293 return InternalBGWorkers[i].fn_addr;
1294 }
1295
1296 /* We can only reach this by programming error. */
1297 elog(ERROR, "internal function \"%s\" not found", funcname);
1298 }
1299
1300 /* Otherwise load from external library. */
1301 return (bgworker_main_type)
1302 load_external_function(libraryname, funcname, true, NULL);
1303 }
1304
1305 /*
1306 * Given a PID, get the bgw_type of the background worker. Returns NULL if
1307 * not a valid background worker.
1308 *
1309 * The return value is in static memory belonging to this function, so it has
1310 * to be used before calling this function again. This is so that the caller
1311 * doesn't have to worry about the background worker locking protocol.
1312 */
1313 const char *
1314 GetBackgroundWorkerTypeByPid(pid_t pid)
1315 {
1316 int slotno;
1317 bool found = false;
1318 static char result[BGW_MAXLEN];
1319
1320 LWLockAcquire(BackgroundWorkerLock, LW_SHARED);
1321
1322 for (slotno = 0; slotno < BackgroundWorkerData->total_slots; slotno++)
1323 {
1324 BackgroundWorkerSlot *slot = &BackgroundWorkerData->slot[slotno];
1325
1326 if (slot->pid > 0 && slot->pid == pid)
1327 {
1328 strcpy(result, slot->worker.bgw_type);
1329 found = true;
1330 break;
1331 }
1332 }
1333
1334 LWLockRelease(BackgroundWorkerLock);
1335
1336 if (!found)
1337 return NULL;
1338
1339 return result;
1340 }
1341