/*-------------------------------------------------------------------------
 * launcher.c
 *	   PostgreSQL logical replication worker launcher process
 *
 * Copyright (c) 2016-2017, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/launcher.c
 *
 * NOTES
 *	  This module contains the logical replication worker launcher which
 *	  uses the background worker infrastructure to start the logical
 *	  replication workers for every enabled subscription.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"

#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/xact.h"

#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"

#include "libpq/pqsignal.h"

#include "postmaster/bgworker.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"

#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"

#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"

#include "tcop/tcopprot.h"

#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/snapmgr.h"

/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

int			max_logical_replication_workers = 4;
int			max_sync_workers_per_subscription = 2;

LogicalRepWorker *MyLogicalRepWorker = NULL;

typedef struct LogicalRepCtxStruct
{
	/* Supervisor process. */
	pid_t		launcher_pid;

	/* Background workers. */
	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

LogicalRepCtxStruct *LogicalRepCtx;

typedef struct LogicalRepWorkerId
{
	Oid			subid;
	Oid			relid;
} LogicalRepWorkerId;

typedef struct StopWorkersData
{
	int			nestDepth;		/* Sub-transaction nest level */
	List	   *workers;		/* List of LogicalRepWorkerId */
	struct StopWorkersData *parent; /* This need not be an immediate
									 * subtransaction parent */
} StopWorkersData;

/*
 * Stack of StopWorkersData elements. Each stack element contains the workers
 * to be stopped for that subtransaction.
 */
static StopWorkersData *on_commit_stop_workers = NULL;
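
/*
 * Illustrative example (not exhaustive): if a transaction requests worker
 * stops at nest depths 1 and 3, the stack looks like
 *
 *		on_commit_stop_workers -> {nestDepth = 3} -> {nestDepth = 1}
 *
 * AtEOSubXact_ApplyLauncher() merges or discards the top element as
 * subtransactions end, and AtEOXact_ApplyLauncher() finally performs the
 * stops for whatever survives to top-level commit.
 */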

static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;

static bool on_commit_launcher_wakeup = false;

Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);


/*
 * Load the list of subscriptions.
 *
 * Only the fields interesting for worker start/stop functions are filled for
 * each subscription.
 */
static List *
get_subscription_list(void)
{
	List	   *res = NIL;
	Relation	rel;
	HeapScanDesc scan;
	HeapTuple	tup;
	MemoryContext resultcxt;

	/* This is the context that we will allocate our output data in */
	resultcxt = CurrentMemoryContext;

	/*
	 * Start a transaction so we can access pg_subscription, and get a
	 * snapshot.  We don't have a use for the snapshot itself, but we're
	 * interested in the secondary effect that it sets RecentGlobalXmin.
	 * (This is critical for anything that reads heap pages, because HOT may
	 * decide to prune them even if the process doesn't attempt to modify any
	 * tuples.)
	 */
	StartTransactionCommand();
	(void) GetTransactionSnapshot();

	rel = heap_open(SubscriptionRelationId, AccessShareLock);
	scan = heap_beginscan_catalog(rel, 0, NULL);

	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
	{
		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
		Subscription *sub;
		MemoryContext oldcxt;

		/*
		 * Allocate our results in the caller's context, not the
		 * transaction's. We do this inside the loop, and restore the
		 * original context at the end, so that leaky things like
		 * heap_getnext() are not called in a potentially long-lived context.
		 */
		oldcxt = MemoryContextSwitchTo(resultcxt);

		sub = (Subscription *) palloc0(sizeof(Subscription));
		sub->oid = HeapTupleGetOid(tup);
		sub->dbid = subform->subdbid;
		sub->owner = subform->subowner;
		sub->enabled = subform->subenabled;
		sub->name = pstrdup(NameStr(subform->subname));
		/* We don't fill fields we are not interested in. */

		res = lappend(res, sub);
		MemoryContextSwitchTo(oldcxt);
	}

	heap_endscan(scan);
	heap_close(rel, AccessShareLock);

	CommitTransactionCommand();

	return res;
}
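
/*
 * Illustrative caller pattern (a sketch of what ApplyLauncherMain() below
 * actually does): allocate a short-lived context for the result, switch to
 * it around the call, and free everything with one MemoryContextDelete():
 *
 *		subctx = AllocSetContextCreate(TopMemoryContext, "sublist", ...);
 *		oldctx = MemoryContextSwitchTo(subctx);
 *		sublist = get_subscription_list();
 *		... walk sublist ...
 *		MemoryContextSwitchTo(oldctx);
 *		MemoryContextDelete(subctx);
 */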

/*
 * Wait for a background worker to start up and attach to the shmem context.
 *
 * This is only needed for cleaning up the shared memory in case the worker
 * fails to attach.
 */
static void
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
							   uint16 generation,
							   BackgroundWorkerHandle *handle)
{
	BgwHandleStatus status;
	int			rc;

	for (;;)
	{
		pid_t		pid;

		CHECK_FOR_INTERRUPTS();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Worker either died or has started; no need to do anything. */
		if (!worker->in_use || worker->proc)
		{
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		LWLockRelease(LogicalRepWorkerLock);

		/* Check if worker has died before attaching, and clean up after it. */
		status = GetBackgroundWorkerPid(handle, &pid);

		if (status == BGWH_STOPPED)
		{
			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
			/* Ensure that this was indeed the worker we waited for. */
			if (generation == worker->generation)
				logicalrep_worker_cleanup(worker);
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		/*
		 * We need timeout because we generally don't get notified via latch
		 * about the worker attach. But we don't expect to have to wait long.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}

	return;
}

/*
 * Walks the workers array and searches for one that matches the given
 * subscription id and relid.
 */
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
	int			i;
	LogicalRepWorker *res = NULL;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->in_use && w->subid == subid && w->relid == relid &&
			(!only_running || w->proc))
		{
			res = w;
			break;
		}
	}

	return res;
}
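
/*
 * Illustrative usage (a sketch; see logicalrep_worker_wakeup() below for the
 * real thing): the caller must hold LogicalRepWorkerLock for as long as it
 * dereferences the returned pointer, since the slot can be recycled for a
 * different worker the moment the lock is released:
 *
 *		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *		worker = logicalrep_worker_find(subid, relid, true);
 *		if (worker)
 *			logicalrep_worker_wakeup_ptr(worker);
 *		LWLockRelease(LogicalRepWorkerLock);
 */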

/*
 * Similar to logicalrep_worker_find(), but returns a list of all workers for
 * the subscription, instead of just one.
 */
List *
logicalrep_workers_find(Oid subid, bool only_running)
{
	int			i;
	List	   *res = NIL;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached workers for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->in_use && w->subid == subid && (!only_running || w->proc))
			res = lappend(res, w);
	}

	return res;
}

/*
 * Start new apply background worker, if possible.
 */
void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
						 Oid relid)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;
	uint16		generation;
	int			i;
	int			slot = 0;
	LogicalRepWorker *worker = NULL;
	int			nsyncworkers;
	TimestampTz now;

	ereport(DEBUG1,
			(errmsg("starting logical replication worker for subscription \"%s\"",
					subname)));

	/* Report this after the initial starting message for consistency. */
	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("cannot start logical replication workers when max_replication_slots = 0")));

	/*
	 * We need to do the modification of the shared memory under lock so that
	 * we have a consistent view.
	 */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

retry:
	/* Find unused worker slot. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (!w->in_use)
		{
			worker = w;
			slot = i;
			break;
		}
	}

	nsyncworkers = logicalrep_sync_worker_count(subid);

	now = GetCurrentTimestamp();

	/*
	 * If we didn't find a free slot, try to do garbage collection.  We do
	 * this because if some worker failed to start up and its parent has
	 * crashed while waiting, the in_use state was never cleared.
	 */
	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
	{
		bool		did_cleanup = false;

		for (i = 0; i < max_logical_replication_workers; i++)
		{
			LogicalRepWorker *w = &LogicalRepCtx->workers[i];

			/*
			 * If the worker was marked in use but didn't manage to attach in
			 * time, clean it up.
			 */
			if (w->in_use && !w->proc &&
				TimestampDifferenceExceeds(w->launch_time, now,
										   wal_receiver_timeout))
			{
				elog(WARNING,
					 "logical replication worker for subscription %u took too long to start; canceled",
					 w->subid);

				logicalrep_worker_cleanup(w);
				did_cleanup = true;
			}
		}

		if (did_cleanup)
			goto retry;
	}

	/*
	 * If we reached the sync worker limit per subscription, just exit
	 * silently as we might get here because of an otherwise harmless race
	 * condition.
	 */
	if (nsyncworkers >= max_sync_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return;
	}

	/*
	 * However, if there are no more free worker slots, inform the user
	 * before exiting.
	 */
	if (worker == NULL)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of logical replication worker slots"),
				 errhint("You might need to increase max_logical_replication_workers.")));
		return;
	}

	/* Prepare the worker slot. */
	worker->launch_time = now;
	worker->in_use = true;
	worker->generation++;
	worker->proc = NULL;
	worker->dbid = dbid;
	worker->userid = userid;
	worker->subid = subid;
	worker->relid = relid;
	worker->relstate = SUBREL_STATE_UNKNOWN;
	worker->relstate_lsn = InvalidXLogRecPtr;
	worker->last_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->last_send_time);
	TIMESTAMP_NOBEGIN(worker->last_recv_time);
	worker->reply_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->reply_time);

	/* Before releasing lock, remember generation for future identification. */
	generation = worker->generation;

	LWLockRelease(LogicalRepWorkerLock);

	/* Register the new dynamic worker. */
	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
	if (OidIsValid(relid))
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication worker for subscription %u sync %u", subid, relid);
	else
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication worker for subscription %u", subid);

	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	bgw.bgw_notify_pid = MyProcPid;
	bgw.bgw_main_arg = Int32GetDatum(slot);

	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
	{
		/* Failed to start worker, so clean up the worker slot. */
		LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
		Assert(generation == worker->generation);
		logicalrep_worker_cleanup(worker);
		LWLockRelease(LogicalRepWorkerLock);

		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of background worker slots"),
				 errhint("You might need to increase max_worker_processes.")));
		return;
	}

	/* Now wait until it attaches. */
	WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
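
/*
 * Illustrative calls (a sketch): the launcher starts the per-subscription
 * apply worker by passing InvalidOid as relid, while table synchronization
 * code passes the relation being synchronized, e.g.:
 *
 *		logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
 *								 sub->owner, InvalidOid);
 *		logicalrep_worker_launch(MyLogicalRepWorker->dbid, MySubscription->oid,
 *								 MySubscription->name, MyLogicalRepWorker->userid,
 *								 rstate->relid);
 */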

/*
 * Stop the logical replication worker for subid/relid, if any, and wait until
 * it detaches from the slot.
 */
void
logicalrep_worker_stop(Oid subid, Oid relid)
{
	LogicalRepWorker *worker;
	uint16		generation;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = logicalrep_worker_find(subid, relid, false);

	/* No worker, nothing to do. */
	if (!worker)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return;
	}

	/*
	 * Remember the generation of our worker so we can later check whether
	 * what we see is still the same one.
	 */
	generation = worker->generation;

	/*
	 * If we found a worker but it does not have proc set then it is still
	 * starting up; wait for it to finish starting and then kill it.
	 */
	while (worker->in_use && !worker->proc)
	{
		int			rc;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Recheck worker status. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/*
		 * Check whether the worker slot is no longer used, which would mean
		 * that the worker has exited, or whether the worker generation is
		 * different, meaning that a different worker has taken the slot.
		 */
		if (!worker->in_use || worker->generation != generation)
		{
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		/* Worker has assigned proc, so it has started. */
		if (worker->proc)
			break;
	}

	/* Now terminate the worker ... */
	kill(worker->proc->pid, SIGTERM);

	/* ... and wait for it to die. */
	for (;;)
	{
		int			rc;

		/* is it gone? */
		if (!worker->proc || worker->generation != generation)
			break;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	}

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Request worker for specified sub/rel to be stopped on commit.
 */
void
logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
{
	int			nestDepth = GetCurrentTransactionNestLevel();
	LogicalRepWorkerId *wid;
	MemoryContext oldctx;

	/* Make sure we store the info in a context that survives until commit. */
	oldctx = MemoryContextSwitchTo(TopTransactionContext);

	/* Check that previous transactions were properly cleaned up. */
	Assert(on_commit_stop_workers == NULL ||
		   nestDepth >= on_commit_stop_workers->nestDepth);

	/*
	 * Push a new stack element if we don't already have one for the current
	 * nestDepth.
	 */
	if (on_commit_stop_workers == NULL ||
		nestDepth > on_commit_stop_workers->nestDepth)
	{
		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));

		newdata->nestDepth = nestDepth;
		newdata->workers = NIL;
		newdata->parent = on_commit_stop_workers;
		on_commit_stop_workers = newdata;
	}

	/*
	 * Finally add a new worker into the worker list of the current
	 * subtransaction.
	 */
	wid = palloc(sizeof(LogicalRepWorkerId));
	wid->subid = subid;
	wid->relid = relid;
	on_commit_stop_workers->workers =
		lappend(on_commit_stop_workers->workers, wid);

	MemoryContextSwitchTo(oldctx);
}
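
/*
 * For example (illustrative), subscription DDL that removes a relation from
 * pg_subscription_rel can defer stopping its sync worker until the
 * surrounding transaction actually commits:
 *
 *		logicalrep_worker_stop_at_commit(subid, relid);
 *
 * On commit, AtEOXact_ApplyLauncher() below calls
 * logicalrep_worker_stop(subid, relid) for each recorded entry; on abort the
 * request is simply discarded.
 */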

/*
 * Wake up (using latch) any logical replication worker for specified sub/rel.
 */
void
logicalrep_worker_wakeup(Oid subid, Oid relid)
{
	LogicalRepWorker *worker;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = logicalrep_worker_find(subid, relid, true);

	if (worker)
		logicalrep_worker_wakeup_ptr(worker);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Wake up (using latch) the specified logical replication worker.
 *
 * Caller must hold lock, else worker->proc could change under us.
 */
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	SetLatch(&worker->proc->procLatch);
}

/*
 * Attach to a slot.
 */
void
logicalrep_worker_attach(int slot)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	Assert(slot >= 0 && slot < max_logical_replication_workers);
	MyLogicalRepWorker = &LogicalRepCtx->workers[slot];

	if (!MyLogicalRepWorker->in_use)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is empty, cannot attach",
						slot)));
	}

	if (MyLogicalRepWorker->proc)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is already used by "
						"another worker, cannot attach", slot)));
	}

	MyLogicalRepWorker->proc = MyProc;
	before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);

	LWLockRelease(LogicalRepWorkerLock);
}
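
/*
 * Illustrative flow (a sketch): the slot index travels to the worker via
 * bgw_main_arg, so a worker entry point attaches roughly like this:
 *
 *		void
 *		ApplyWorkerMain(Datum main_arg)
 *		{
 *			int		worker_slot = DatumGetInt32(main_arg);
 *
 *			logicalrep_worker_attach(worker_slot);
 *			...
 *		}
 */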

/*
 * Detach the worker (cleans up the worker info).
 */
static void
logicalrep_worker_detach(void)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	logicalrep_worker_cleanup(MyLogicalRepWorker);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Clean up worker info.
 */
static void
logicalrep_worker_cleanup(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));

	worker->in_use = false;
	worker->proc = NULL;
	worker->dbid = InvalidOid;
	worker->userid = InvalidOid;
	worker->subid = InvalidOid;
	worker->relid = InvalidOid;
}

/*
 * Cleanup function for logical replication launcher.
 *
 * Called on logical replication launcher exit.
 */
static void
logicalrep_launcher_onexit(int code, Datum arg)
{
	LogicalRepCtx->launcher_pid = 0;
}

/*
 * Cleanup function.
 *
 * Called on logical replication worker exit.
 */
static void
logicalrep_worker_onexit(int code, Datum arg)
{
	/* Disconnect gracefully from the remote side. */
	if (LogRepWorkerWalRcvConn)
		walrcv_disconnect(LogRepWorkerWalRcvConn);

	logicalrep_worker_detach();

	ApplyLauncherWakeup();
}

/* SIGHUP: set flag to reload configuration at next convenient time */
static void
logicalrep_launcher_sighup(SIGNAL_ARGS)
{
	int			save_errno = errno;

	got_SIGHUP = true;

	/* Waken anything waiting on the process latch */
	SetLatch(MyLatch);

	errno = save_errno;
}

/*
 * Count the number of registered (not necessarily running) sync workers
 * for a subscription.
 */
int
logicalrep_sync_worker_count(Oid subid)
{
	int			i;
	int			res = 0;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->subid == subid && OidIsValid(w->relid))
			res++;
	}

	return res;
}

/*
 * ApplyLauncherShmemSize
 *		Compute space needed for replication launcher shared memory
 */
Size
ApplyLauncherShmemSize(void)
{
	Size		size;

	/*
	 * Need the fixed struct and the array of LogicalRepWorker.
	 */
	size = sizeof(LogicalRepCtxStruct);
	size = MAXALIGN(size);
	size = add_size(size, mul_size(max_logical_replication_workers,
								   sizeof(LogicalRepWorker)));
	return size;
}
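
/*
 * The resulting layout (a sketch) is the fixed header followed by the worker
 * array, e.g. with max_logical_replication_workers = 4:
 *
 *		| launcher_pid | workers[0] | workers[1] | workers[2] | workers[3] |
 *
 * hence the MAXALIGN'd struct size plus max_logical_replication_workers
 * copies of LogicalRepWorker.
 */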

/*
 * ApplyLauncherRegister
 *		Register a background worker running the logical replication launcher.
 */
void
ApplyLauncherRegister(void)
{
	BackgroundWorker bgw;

	if (max_logical_replication_workers == 0)
		return;

	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
	snprintf(bgw.bgw_name, BGW_MAXLEN,
			 "logical replication launcher");
	bgw.bgw_restart_time = 5;
	bgw.bgw_notify_pid = 0;
	bgw.bgw_main_arg = (Datum) 0;

	RegisterBackgroundWorker(&bgw);
}

/*
 * ApplyLauncherShmemInit
 *		Allocate and initialize replication launcher shared memory
 */
void
ApplyLauncherShmemInit(void)
{
	bool		found;

	LogicalRepCtx = (LogicalRepCtxStruct *)
		ShmemInitStruct("Logical Replication Launcher Data",
						ApplyLauncherShmemSize(),
						&found);

	if (!found)
	{
		int			slot;

		memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());

		/* Initialize memory and spin locks for each worker slot. */
		for (slot = 0; slot < max_logical_replication_workers; slot++)
		{
			LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];

			memset(worker, 0, sizeof(LogicalRepWorker));
			SpinLockInit(&worker->relmutex);
		}
	}
}
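
/*
 * Illustrative wiring (a sketch): the shmem bootstrap code sizes and then
 * initializes this area alongside the other subsystems, roughly:
 *
 *		size = add_size(size, ApplyLauncherShmemSize());
 *		...
 *		ApplyLauncherShmemInit();
 */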

/*
 * Check whether current transaction has manipulated logical replication
 * workers.
 */
bool
XactManipulatesLogicalReplicationWorkers(void)
{
	return (on_commit_stop_workers != NULL);
}

/*
 * Wakeup the launcher on commit if requested.
 */
void
AtEOXact_ApplyLauncher(bool isCommit)
{
	Assert(on_commit_stop_workers == NULL ||
		   (on_commit_stop_workers->nestDepth == 1 &&
			on_commit_stop_workers->parent == NULL));

	if (isCommit)
	{
		ListCell   *lc;

		if (on_commit_stop_workers != NULL)
		{
			List	   *workers = on_commit_stop_workers->workers;

			foreach(lc, workers)
			{
				LogicalRepWorkerId *wid = lfirst(lc);

				logicalrep_worker_stop(wid->subid, wid->relid);
			}
		}

		if (on_commit_launcher_wakeup)
			ApplyLauncherWakeup();
	}

	/*
	 * No need to pfree on_commit_stop_workers. It was allocated in
	 * transaction memory context, which is going to be cleaned soon.
	 */
	on_commit_stop_workers = NULL;
	on_commit_launcher_wakeup = false;
}

/*
 * On commit, merge the current on_commit_stop_workers list into the
 * immediate parent, if present.
 * On rollback, discard the current on_commit_stop_workers list.
 * Pop out the stack.
 */
void
AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
{
	StopWorkersData *parent;

	/* Exit immediately if there's no work to do at this level. */
	if (on_commit_stop_workers == NULL ||
		on_commit_stop_workers->nestDepth < nestDepth)
		return;

	Assert(on_commit_stop_workers->nestDepth == nestDepth);

	parent = on_commit_stop_workers->parent;

	if (isCommit)
	{
		/*
		 * If the upper stack element is not an immediate parent
		 * subtransaction, just decrement the notional nesting depth without
		 * doing any real work.  Else, we need to merge the current workers
		 * list into the parent.
		 */
		if (!parent || parent->nestDepth < nestDepth - 1)
		{
			on_commit_stop_workers->nestDepth--;
			return;
		}

		parent->workers =
			list_concat(parent->workers, on_commit_stop_workers->workers);
	}
	else
	{
		/*
		 * Abandon everything that was done at this nesting level.
		 * Explicitly free memory to avoid a transaction-lifespan leak.
		 */
		list_free_deep(on_commit_stop_workers->workers);
	}

	/*
	 * We have taken care of the current subtransaction workers list for both
	 * abort or commit.  So we are ready to pop the stack.
	 */
	pfree(on_commit_stop_workers);
	on_commit_stop_workers = parent;
}
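
/*
 * Worked example (illustrative): with stop requests recorded at nest depths
 * 1 and 3, the stack is {3} -> {1}.  When the depth-3 subtransaction
 * commits, the parent element sits at depth 1 (not 2), so the top element's
 * nestDepth is merely decremented to 2; when the depth-2 level then commits,
 * the element is merged into the depth-1 list and popped.
 */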

/*
 * Request wakeup of the launcher on commit of the transaction.
 *
 * This is used to signal the launcher to stop sleeping and process the
 * subscriptions when the current transaction commits. Should be used when a
 * new tuple is added to the pg_subscription catalog.
 */
void
ApplyLauncherWakeupAtCommit(void)
{
	if (!on_commit_launcher_wakeup)
		on_commit_launcher_wakeup = true;
}

static void
ApplyLauncherWakeup(void)
{
	if (LogicalRepCtx->launcher_pid != 0)
		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
}

/*
 * Main loop for the apply launcher process.
 */
void
ApplyLauncherMain(Datum main_arg)
{
	TimestampTz last_start_time = 0;

	ereport(DEBUG1,
			(errmsg("logical replication launcher started")));

	before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);

	Assert(LogicalRepCtx->launcher_pid == 0);
	LogicalRepCtx->launcher_pid = MyProcPid;

	/* Establish signal handlers. */
	pqsignal(SIGHUP, logicalrep_launcher_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Establish connection to nailed catalogs (we only ever access
	 * pg_subscription).
	 */
	BackgroundWorkerInitializeConnection(NULL, NULL);

	/* Enter main loop */
	for (;;)
	{
		int			rc;
		List	   *sublist;
		ListCell   *lc;
		MemoryContext subctx;
		MemoryContext oldctx;
		TimestampTz now;
		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;

		CHECK_FOR_INTERRUPTS();

		now = GetCurrentTimestamp();

		/* Limit the start retry to once per wal_retrieve_retry_interval */
		if (TimestampDifferenceExceeds(last_start_time, now,
									   wal_retrieve_retry_interval))
		{
			/* Use temporary context for the database list and worker info. */
			subctx = AllocSetContextCreate(TopMemoryContext,
										   "Logical Replication Launcher sublist",
										   ALLOCSET_DEFAULT_MINSIZE,
										   ALLOCSET_DEFAULT_INITSIZE,
										   ALLOCSET_DEFAULT_MAXSIZE);
			oldctx = MemoryContextSwitchTo(subctx);

			/* Search for subscriptions to start or stop. */
			sublist = get_subscription_list();

			/* Start the missing workers for enabled subscriptions. */
			foreach(lc, sublist)
			{
				Subscription *sub = (Subscription *) lfirst(lc);
				LogicalRepWorker *w;

				if (!sub->enabled)
					continue;

				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
				LWLockRelease(LogicalRepWorkerLock);

				if (w == NULL)
				{
					last_start_time = now;
					wait_time = wal_retrieve_retry_interval;

					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
											 sub->owner, InvalidOid);
				}
			}

			/* Switch back to original memory context. */
			MemoryContextSwitchTo(oldctx);
			/* Clean the temporary memory. */
			MemoryContextDelete(subctx);
		}
		else
		{
			/*
			 * The wait in the previous cycle was interrupted in less than
			 * wal_retrieve_retry_interval since the last worker was started.
			 * This usually means the worker crashed, so retry again after
			 * wal_retrieve_retry_interval.
			 */
			wait_time = wal_retrieve_retry_interval;
		}

		/* Wait for more work. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   wait_time,
					   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	/* Not reachable */
}

/*
 * Is current process the logical replication launcher?
 */
bool
IsLogicalLauncher(void)
{
	return LogicalRepCtx->launcher_pid == MyProcPid;
}

/*
 * Returns state of the subscriptions.
 */
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_COLS	8
	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
	int			i;
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);

	/* Make sure we get consistent view of the workers. */
	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		/* for each row */
		Datum		values[PG_STAT_GET_SUBSCRIPTION_COLS];
		bool		nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
		int			worker_pid;
		LogicalRepWorker worker;

		memcpy(&worker, &LogicalRepCtx->workers[i],
			   sizeof(LogicalRepWorker));
		if (!worker.proc || !IsBackendPid(worker.proc->pid))
			continue;

		if (OidIsValid(subid) && worker.subid != subid)
			continue;

		worker_pid = worker.proc->pid;

		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

		values[0] = ObjectIdGetDatum(worker.subid);
		if (OidIsValid(worker.relid))
			values[1] = ObjectIdGetDatum(worker.relid);
		else
			nulls[1] = true;
		values[2] = Int32GetDatum(worker_pid);
		if (XLogRecPtrIsInvalid(worker.last_lsn))
			nulls[3] = true;
		else
			values[3] = LSNGetDatum(worker.last_lsn);
		if (worker.last_send_time == 0)
			nulls[4] = true;
		else
			values[4] = TimestampTzGetDatum(worker.last_send_time);
		if (worker.last_recv_time == 0)
			nulls[5] = true;
		else
			values[5] = TimestampTzGetDatum(worker.last_recv_time);
		if (XLogRecPtrIsInvalid(worker.reply_lsn))
			nulls[6] = true;
		else
			values[6] = LSNGetDatum(worker.reply_lsn);
		if (worker.reply_time == 0)
			nulls[7] = true;
		else
			values[7] = TimestampTzGetDatum(worker.reply_time);

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);

		/*
		 * If only a single subscription was requested, and we found it,
		 * break.
		 */
		if (OidIsValid(subid))
			break;
	}

	LWLockRelease(LogicalRepWorkerLock);

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupstore);

	return (Datum) 0;
}
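
/*
 * Illustrative SQL usage: this function backs the pg_stat_subscription
 * system view, e.g. (16394 is a hypothetical subscription OID):
 *
 *		SELECT * FROM pg_stat_get_subscription(NULL);	-- all workers
 *		SELECT * FROM pg_stat_get_subscription(16394);	-- one subscription
 */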