/*-------------------------------------------------------------------------
 * launcher.c
 *	   PostgreSQL logical replication worker launcher process
 *
 * Copyright (c) 2016-2017, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/launcher.c
 *
 * NOTES
 *	  This module contains the logical replication worker launcher which
 *	  uses the background worker infrastructure to start the logical
 *	  replication workers for every enabled subscription.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"

#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/xact.h"

#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"

#include "libpq/pqsignal.h"

#include "postmaster/bgworker.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"

#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"

#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"

#include "tcop/tcopprot.h"

#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/snapmgr.h"

/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

int			max_logical_replication_workers = 4;
int			max_sync_workers_per_subscription = 2;

LogicalRepWorker *MyLogicalRepWorker = NULL;

typedef struct LogicalRepCtxStruct
{
	/* Supervisor process. */
	pid_t		launcher_pid;

	/* Background workers. */
	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

LogicalRepCtxStruct *LogicalRepCtx;

typedef struct LogicalRepWorkerId
{
	Oid			subid;
	Oid			relid;
} LogicalRepWorkerId;

typedef struct StopWorkersData
{
	int			nestDepth;		/* Sub-transaction nest level */
	List	   *workers;		/* List of LogicalRepWorkerId */
	struct StopWorkersData *parent; /* This need not be an immediate
									 * subtransaction parent */
} StopWorkersData;

/*
 * Stack of StopWorkersData elements. Each stack element contains the workers
 * to be stopped for that subtransaction.
 */
static StopWorkersData *on_commit_stop_workers = NULL;
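
/*
 * Illustrative sketch (not compiled, just an example): with nested
 * subtransactions, each nesting level that requests a stop-at-commit gets
 * its own stack element, e.g.
 *
 *		level 1 requests a stop	-> push {nestDepth = 1, workers = [w1]}
 *		SAVEPOINT s1 (level 2) requests a stop
 *								-> push {nestDepth = 2, workers = [w2]}
 *
 * Subtransaction commit merges a level into its parent's list and abort
 * discards it; see AtEOSubXact_ApplyLauncher() and AtEOXact_ApplyLauncher()
 * below.
 */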

static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;

static bool on_commit_launcher_wakeup = false;

Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);


/*
 * Load the list of subscriptions.
 *
 * Only the fields interesting for worker start/stop functions are filled for
 * each subscription.
 */
static List *
get_subscription_list(void)
{
	List	   *res = NIL;
	Relation	rel;
	HeapScanDesc scan;
	HeapTuple	tup;
	MemoryContext resultcxt;

	/* This is the context that we will allocate our output data in */
	resultcxt = CurrentMemoryContext;

	/*
	 * Start a transaction so we can access pg_subscription, and get a snapshot.
	 * We don't have a use for the snapshot itself, but we're interested in
	 * the secondary effect that it sets RecentGlobalXmin.  (This is critical
	 * for anything that reads heap pages, because HOT may decide to prune
	 * them even if the process doesn't attempt to modify any tuples.)
	 */
	StartTransactionCommand();
	(void) GetTransactionSnapshot();

	rel = heap_open(SubscriptionRelationId, AccessShareLock);
	scan = heap_beginscan_catalog(rel, 0, NULL);

	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
	{
		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
		Subscription *sub;
		MemoryContext oldcxt;

		/*
		 * Allocate our results in the caller's context, not the
		 * transaction's. We do this inside the loop, and restore the original
		 * context at the end, so that leaky things like heap_getnext() are
		 * not called in a potentially long-lived context.
		 */
		oldcxt = MemoryContextSwitchTo(resultcxt);

		sub = (Subscription *) palloc0(sizeof(Subscription));
		sub->oid = HeapTupleGetOid(tup);
		sub->dbid = subform->subdbid;
		sub->owner = subform->subowner;
		sub->enabled = subform->subenabled;
		sub->name = pstrdup(NameStr(subform->subname));
		/* We don't fill fields we are not interested in. */

		res = lappend(res, sub);
		MemoryContextSwitchTo(oldcxt);
	}

	heap_endscan(scan);
	heap_close(rel, AccessShareLock);

	CommitTransactionCommand();

	return res;
}

/*
 * Wait for a background worker to start up and attach to the shmem context.
 *
 * This is only needed for cleaning up the shared memory in case the worker
 * fails to attach.
 */
static void
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
							   uint16 generation,
							   BackgroundWorkerHandle *handle)
{
	BgwHandleStatus status;
	int			rc;

	for (;;)
	{
		pid_t		pid;

		CHECK_FOR_INTERRUPTS();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Worker either died or has started; no need to do anything. */
		if (!worker->in_use || worker->proc)
		{
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		LWLockRelease(LogicalRepWorkerLock);

		/* Check if worker has died before attaching, and clean up after it. */
		status = GetBackgroundWorkerPid(handle, &pid);

		if (status == BGWH_STOPPED)
		{
			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
			/* Ensure that this was indeed the worker we waited for. */
			if (generation == worker->generation)
				logicalrep_worker_cleanup(worker);
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		/*
		 * We need timeout because we generally don't get notified via latch
		 * about the worker attach.  But we don't expect to have to wait long.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}

	return;
}

/*
 * Walks the workers array and searches for one that matches given
 * subscription id and relid.
 */
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
	int			i;
	LogicalRepWorker *res = NULL;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->in_use && w->subid == subid && w->relid == relid &&
			(!only_running || w->proc))
		{
			res = w;
			break;
		}
	}

	return res;
}
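
/*
 * A minimal caller sketch (not the only correct pattern): the lock must be
 * held across the lookup, and kept for as long as the returned pointer is
 * used, since it points into shared memory:
 *
 *		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *		worker = logicalrep_worker_find(subid, InvalidOid, true);
 *		if (worker)
 *			logicalrep_worker_wakeup_ptr(worker);
 *		LWLockRelease(LogicalRepWorkerLock);
 */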

/*
 * Similar to logicalrep_worker_find(), but returns list of all workers for
 * the subscription, instead of just one.
 */
List *
logicalrep_workers_find(Oid subid, bool only_running)
{
	int			i;
	List	   *res = NIL;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->in_use && w->subid == subid && (!only_running || w->proc))
			res = lappend(res, w);
	}

	return res;
}

/*
 * Start new apply background worker, if possible.
 */
void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
						 Oid relid)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;
	uint16		generation;
	int			i;
	int			slot = 0;
	LogicalRepWorker *worker = NULL;
	int			nsyncworkers;
	TimestampTz now;

	ereport(DEBUG1,
			(errmsg("starting logical replication worker for subscription \"%s\"",
					subname)));

	/* Report this after the initial starting message for consistency. */
	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("cannot start logical replication workers when max_replication_slots = 0")));

	/*
	 * We need to do the modification of the shared memory under lock so that
	 * we have a consistent view.
	 */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

retry:
	/* Find unused worker slot. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (!w->in_use)
		{
			worker = w;
			slot = i;
			break;
		}
	}

	nsyncworkers = logicalrep_sync_worker_count(subid);

	now = GetCurrentTimestamp();

	/*
	 * If we didn't find a free slot, try to do garbage collection.  The
	 * reason we do this is because if some worker failed to start up and its
	 * parent has crashed while waiting, the in_use state was never cleared.
	 */
	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
	{
		bool		did_cleanup = false;

		for (i = 0; i < max_logical_replication_workers; i++)
		{
			LogicalRepWorker *w = &LogicalRepCtx->workers[i];

			/*
			 * If the worker was marked in use but didn't manage to attach in
			 * time, clean it up.
			 */
			if (w->in_use && !w->proc &&
				TimestampDifferenceExceeds(w->launch_time, now,
										   wal_receiver_timeout))
			{
				elog(WARNING,
					 "logical replication worker for subscription %u took too long to start; canceled",
					 w->subid);

				logicalrep_worker_cleanup(w);
				did_cleanup = true;
			}
		}

		if (did_cleanup)
			goto retry;
	}

	/*
	 * If we reached the sync worker limit per subscription, just exit
	 * silently as we might get here because of an otherwise harmless race
	 * condition.
	 */
	if (nsyncworkers >= max_sync_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return;
	}

	/*
	 * However, if there are no more free worker slots, inform the user about it
	 * before exiting.
	 */
	if (worker == NULL)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of logical replication worker slots"),
				 errhint("You might need to increase max_logical_replication_workers.")));
		return;
	}

	/* Prepare the worker slot. */
	worker->launch_time = now;
	worker->in_use = true;
	worker->generation++;
	worker->proc = NULL;
	worker->dbid = dbid;
	worker->userid = userid;
	worker->subid = subid;
	worker->relid = relid;
	worker->relstate = SUBREL_STATE_UNKNOWN;
	worker->relstate_lsn = InvalidXLogRecPtr;
	worker->last_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->last_send_time);
	TIMESTAMP_NOBEGIN(worker->last_recv_time);
	worker->reply_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->reply_time);

	/* Before releasing lock, remember generation for future identification. */
	generation = worker->generation;

	LWLockRelease(LogicalRepWorkerLock);

	/* Register the new dynamic worker. */
	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
	if (OidIsValid(relid))
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication worker for subscription %u sync %u", subid, relid);
	else
		snprintf(bgw.bgw_name, BGW_MAXLEN,
				 "logical replication worker for subscription %u", subid);

	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	bgw.bgw_notify_pid = MyProcPid;
	bgw.bgw_main_arg = Int32GetDatum(slot);

	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
	{
		/* Failed to start worker, so clean up the worker slot. */
		LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
		Assert(generation == worker->generation);
		logicalrep_worker_cleanup(worker);
		LWLockRelease(LogicalRepWorkerLock);

		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of background worker slots"),
				 errhint("You might need to increase max_worker_processes.")));
		return;
	}

	/* Now wait until it attaches. */
	WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}

/*
 * Stop the logical replication worker for subid/relid, if any, and wait until
 * it detaches from the slot.
 */
void
logicalrep_worker_stop(Oid subid, Oid relid)
{
	LogicalRepWorker *worker;
	uint16		generation;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = logicalrep_worker_find(subid, relid, false);

	/* No worker, nothing to do. */
	if (!worker)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return;
	}

	/*
	 * Remember which generation our worker was, so we can check whether what
	 * we see is still the same one.
	 */
	generation = worker->generation;

	/*
	 * If we found a worker but it does not have proc set then it is still
	 * starting up; wait for it to finish starting and then kill it.
	 */
	while (worker->in_use && !worker->proc)
	{
		int			rc;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Recheck worker status. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/*
		 * Check whether the worker slot is no longer used, which would mean
		 * that the worker has exited, or whether the worker generation is
		 * different, meaning that a different worker has taken the slot.
		 */
		if (!worker->in_use || worker->generation != generation)
		{
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		/* Worker has assigned proc, so it has started. */
		if (worker->proc)
			break;
	}

	/* Now terminate the worker ... */
	kill(worker->proc->pid, SIGTERM);

	/* ... and wait for it to die. */
	for (;;)
	{
		int			rc;

		/* is it gone? */
		if (!worker->proc || worker->generation != generation)
			break;

		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	}

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Request worker for specified sub/rel to be stopped on commit.
 */
void
logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
{
	int			nestDepth = GetCurrentTransactionNestLevel();
	LogicalRepWorkerId *wid;
	MemoryContext oldctx;

	/* Make sure we store the info in context that survives until commit. */
	oldctx = MemoryContextSwitchTo(TopTransactionContext);

	/* Check that previous transactions were properly cleaned up. */
	Assert(on_commit_stop_workers == NULL ||
		   nestDepth >= on_commit_stop_workers->nestDepth);

	/*
	 * Push a new stack element if we don't already have one for the current
	 * nestDepth.
	 */
	if (on_commit_stop_workers == NULL ||
		nestDepth > on_commit_stop_workers->nestDepth)
	{
		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));

		newdata->nestDepth = nestDepth;
		newdata->workers = NIL;
		newdata->parent = on_commit_stop_workers;
		on_commit_stop_workers = newdata;
	}

	/*
	 * Finally add a new worker into the worker list of the current
	 * subtransaction.
	 */
	wid = palloc(sizeof(LogicalRepWorkerId));
	wid->subid = subid;
	wid->relid = relid;
	on_commit_stop_workers->workers =
		lappend(on_commit_stop_workers->workers, wid);

	MemoryContextSwitchTo(oldctx);
}
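
/*
 * Usage sketch (hypothetical caller): DDL that detaches a relation from a
 * subscription can defer stopping its sync worker until the surrounding
 * transaction is known to commit:
 *
 *		logicalrep_worker_stop_at_commit(subid, relid);
 *
 * At commit, AtEOXact_ApplyLauncher() walks the recorded list and calls
 * logicalrep_worker_stop() for each sub/rel pair; on abort the request is
 * simply discarded.
 */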

/*
 * Wake up (using latch) any logical replication worker for specified sub/rel.
 */
void
logicalrep_worker_wakeup(Oid subid, Oid relid)
{
	LogicalRepWorker *worker;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = logicalrep_worker_find(subid, relid, true);

	if (worker)
		logicalrep_worker_wakeup_ptr(worker);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Wake up (using latch) the specified logical replication worker.
 *
 * Caller must hold lock, else worker->proc could change under us.
 */
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	SetLatch(&worker->proc->procLatch);
}

/*
 * Attach to a slot.
 */
void
logicalrep_worker_attach(int slot)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	Assert(slot >= 0 && slot < max_logical_replication_workers);
	MyLogicalRepWorker = &LogicalRepCtx->workers[slot];

	if (!MyLogicalRepWorker->in_use)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is empty, cannot attach",
						slot)));
	}

	if (MyLogicalRepWorker->proc)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is already used by "
						"another worker, cannot attach", slot)));
	}

	MyLogicalRepWorker->proc = MyProc;
	before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Detach the worker (cleans up the worker info).
 */
static void
logicalrep_worker_detach(void)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	logicalrep_worker_cleanup(MyLogicalRepWorker);

	LWLockRelease(LogicalRepWorkerLock);
}

/*
 * Clean up worker info.
 */
static void
logicalrep_worker_cleanup(LogicalRepWorker *worker)
{
	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));

	worker->in_use = false;
	worker->proc = NULL;
	worker->dbid = InvalidOid;
	worker->userid = InvalidOid;
	worker->subid = InvalidOid;
	worker->relid = InvalidOid;
}

/*
 * Cleanup function for logical replication launcher.
 *
 * Called on logical replication launcher exit.
 */
static void
logicalrep_launcher_onexit(int code, Datum arg)
{
	LogicalRepCtx->launcher_pid = 0;
}

/*
 * Cleanup function.
 *
 * Called on logical replication worker exit.
 */
static void
logicalrep_worker_onexit(int code, Datum arg)
{
	/* Disconnect gracefully from the remote side. */
	if (LogRepWorkerWalRcvConn)
		walrcv_disconnect(LogRepWorkerWalRcvConn);

	logicalrep_worker_detach();

	ApplyLauncherWakeup();
}

/* SIGHUP: set flag to reload configuration at next convenient time */
static void
logicalrep_launcher_sighup(SIGNAL_ARGS)
{
	int			save_errno = errno;

	got_SIGHUP = true;

	/* Waken anything waiting on the process latch */
	SetLatch(MyLatch);

	errno = save_errno;
}

/*
 * Count the number of registered (not necessarily running) sync workers
 * for a subscription.
 */
int
logicalrep_sync_worker_count(Oid subid)
{
	int			i;
	int			res = 0;

	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	/* Search for attached worker for a given subscription id. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (w->subid == subid && OidIsValid(w->relid))
			res++;
	}

	return res;
}

/*
 * ApplyLauncherShmemSize
 *		Compute space needed for replication launcher shared memory
 */
Size
ApplyLauncherShmemSize(void)
{
	Size		size;

	/*
	 * Need the fixed struct and the array of LogicalRepWorker.
	 */
	size = sizeof(LogicalRepCtxStruct);
	size = MAXALIGN(size);
	size = add_size(size, mul_size(max_logical_replication_workers,
								   sizeof(LogicalRepWorker)));
	return size;
}
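
/*
 * For example (a sketch, with the default max_logical_replication_workers of
 * 4), the reservation works out to
 *
 *		MAXALIGN(sizeof(LogicalRepCtxStruct)) + 4 * sizeof(LogicalRepWorker)
 *
 * add_size()/mul_size() are used instead of plain arithmetic so that an
 * implausibly large GUC setting fails cleanly instead of silently
 * overflowing.
 */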

/*
 * ApplyLauncherRegister
 *		Register a background worker running the logical replication launcher.
 */
void
ApplyLauncherRegister(void)
{
	BackgroundWorker bgw;

	if (max_logical_replication_workers == 0)
		return;

	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
	snprintf(bgw.bgw_name, BGW_MAXLEN,
			 "logical replication launcher");
	bgw.bgw_restart_time = 5;
	bgw.bgw_notify_pid = 0;
	bgw.bgw_main_arg = (Datum) 0;

	RegisterBackgroundWorker(&bgw);
}

/*
 * ApplyLauncherShmemInit
 *		Allocate and initialize replication launcher shared memory
 */
void
ApplyLauncherShmemInit(void)
{
	bool		found;

	LogicalRepCtx = (LogicalRepCtxStruct *)
		ShmemInitStruct("Logical Replication Launcher Data",
						ApplyLauncherShmemSize(),
						&found);

	if (!found)
	{
		int			slot;

		memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());

		/* Initialize memory and spin locks for each worker slot. */
		for (slot = 0; slot < max_logical_replication_workers; slot++)
		{
			LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];

			memset(worker, 0, sizeof(LogicalRepWorker));
			SpinLockInit(&worker->relmutex);
		}
	}
}

/*
 * Check whether current transaction has manipulated logical replication
 * workers.
 */
bool
XactManipulatesLogicalReplicationWorkers(void)
{
	return (on_commit_stop_workers != NULL);
}

/*
 * Wakeup the launcher on commit if requested.
 */
void
AtEOXact_ApplyLauncher(bool isCommit)
{

	Assert(on_commit_stop_workers == NULL ||
		   (on_commit_stop_workers->nestDepth == 1 &&
			on_commit_stop_workers->parent == NULL));

	if (isCommit)
	{
		ListCell   *lc;

		if (on_commit_stop_workers != NULL)
		{
			List	   *workers = on_commit_stop_workers->workers;

			foreach(lc, workers)
			{
				LogicalRepWorkerId *wid = lfirst(lc);

				logicalrep_worker_stop(wid->subid, wid->relid);
			}
		}

		if (on_commit_launcher_wakeup)
			ApplyLauncherWakeup();
	}

	/*
	 * No need to pfree on_commit_stop_workers.  It was allocated in
	 * transaction memory context, which is going to be cleaned soon.
	 */
	on_commit_stop_workers = NULL;
	on_commit_launcher_wakeup = false;
}

/*
 * On commit, merge the current on_commit_stop_workers list into the
 * immediate parent, if present.
 * On rollback, discard the current on_commit_stop_workers list.
 * Pop out the stack.
 */
void
AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
{
	StopWorkersData *parent;

	/* Exit immediately if there's no work to do at this level. */
	if (on_commit_stop_workers == NULL ||
		on_commit_stop_workers->nestDepth < nestDepth)
		return;

	Assert(on_commit_stop_workers->nestDepth == nestDepth);

	parent = on_commit_stop_workers->parent;

	if (isCommit)
	{
		/*
		 * If the upper stack element is not an immediate parent
		 * subtransaction, just decrement the notional nesting depth without
		 * doing any real work.  Else, we need to merge the current workers
		 * list into the parent.
		 */
		if (!parent || parent->nestDepth < nestDepth - 1)
		{
			on_commit_stop_workers->nestDepth--;
			return;
		}

		parent->workers =
			list_concat(parent->workers, on_commit_stop_workers->workers);
	}
	else
	{
		/*
		 * Abandon everything that was done at this nesting level.  Explicitly
		 * free memory to avoid a transaction-lifespan leak.
		 */
		list_free_deep(on_commit_stop_workers->workers);
	}

	/*
	 * We have taken care of the current subtransaction's workers list for
	 * both abort and commit, so we are ready to pop the stack.
	 */
	pfree(on_commit_stop_workers);
	on_commit_stop_workers = parent;
}

/*
 * Request wakeup of the launcher on commit of the transaction.
 *
 * This is used to signal the launcher to stop sleeping and process the
 * subscriptions when the current transaction commits.  Should be used when a
 * new tuple was added to the pg_subscription catalog.
 */
void
ApplyLauncherWakeupAtCommit(void)
{
	if (!on_commit_launcher_wakeup)
		on_commit_launcher_wakeup = true;
}
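
/*
 * Hypothetical call-site sketch: a command that inserts a new row into
 * pg_subscription would typically follow the insertion with
 *
 *		ApplyLauncherWakeupAtCommit();
 *
 * so that the launcher notices the new subscription right after commit
 * instead of waiting out the remainder of DEFAULT_NAPTIME_PER_CYCLE.
 */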

static void
ApplyLauncherWakeup(void)
{
	if (LogicalRepCtx->launcher_pid != 0)
		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
}

/*
 * Main loop for the apply launcher process.
 */
void
ApplyLauncherMain(Datum main_arg)
{
	TimestampTz last_start_time = 0;

	ereport(DEBUG1,
			(errmsg("logical replication launcher started")));

	before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);

	Assert(LogicalRepCtx->launcher_pid == 0);
	LogicalRepCtx->launcher_pid = MyProcPid;

	/* Establish signal handlers. */
	pqsignal(SIGHUP, logicalrep_launcher_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Establish connection to nailed catalogs (we only ever access
	 * pg_subscription).
	 */
	BackgroundWorkerInitializeConnection(NULL, NULL);

	/* Enter main loop */
	for (;;)
	{
		int			rc;
		List	   *sublist;
		ListCell   *lc;
		MemoryContext subctx;
		MemoryContext oldctx;
		TimestampTz now;
		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;

		CHECK_FOR_INTERRUPTS();

		now = GetCurrentTimestamp();

		/* Limit the start retry to once per wal_retrieve_retry_interval. */
		if (TimestampDifferenceExceeds(last_start_time, now,
									   wal_retrieve_retry_interval))
		{
			/* Use temporary context for the subscription list and worker info. */
			subctx = AllocSetContextCreate(TopMemoryContext,
										   "Logical Replication Launcher sublist",
										   ALLOCSET_DEFAULT_MINSIZE,
										   ALLOCSET_DEFAULT_INITSIZE,
										   ALLOCSET_DEFAULT_MAXSIZE);
			oldctx = MemoryContextSwitchTo(subctx);

			/* search for subscriptions to start or stop. */
			sublist = get_subscription_list();

			/* Start the missing workers for enabled subscriptions. */
			foreach(lc, sublist)
			{
				Subscription *sub = (Subscription *) lfirst(lc);
				LogicalRepWorker *w;

				if (!sub->enabled)
					continue;

				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
				LWLockRelease(LogicalRepWorkerLock);

				if (w == NULL)
				{
					last_start_time = now;
					wait_time = wal_retrieve_retry_interval;

					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
											 sub->owner, InvalidOid);
				}
			}

			/* Switch back to original memory context. */
			MemoryContextSwitchTo(oldctx);
			/* Clean the temporary memory. */
			MemoryContextDelete(subctx);
		}
		else
		{
			/*
			 * The wait in the previous cycle was interrupted in less than
			 * wal_retrieve_retry_interval since the last worker was started.
			 * This usually means the worker crashed, so retry again after
			 * wal_retrieve_retry_interval.
			 */
			wait_time = wal_retrieve_retry_interval;
		}

		/* Wait for more work. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   wait_time,
					   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	/* Not reachable */
}

/*
 * Is current process the logical replication launcher?
 */
bool
IsLogicalLauncher(void)
{
	return LogicalRepCtx->launcher_pid == MyProcPid;
}

/*
 * Returns state of the subscriptions.
 */
Datum
pg_stat_get_subscription(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_COLS	8
	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
	int			i;
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not " \
						"allowed in this context")));

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);

	/* Make sure we get a consistent view of the workers. */
	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	for (i = 0; i < max_logical_replication_workers; i++)
	{
		/* for each row */
		Datum		values[PG_STAT_GET_SUBSCRIPTION_COLS];
		bool		nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
		int			worker_pid;
		LogicalRepWorker worker;

		memcpy(&worker, &LogicalRepCtx->workers[i],
			   sizeof(LogicalRepWorker));
		if (!worker.proc || !IsBackendPid(worker.proc->pid))
			continue;

		if (OidIsValid(subid) && worker.subid != subid)
			continue;

		worker_pid = worker.proc->pid;

		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

		values[0] = ObjectIdGetDatum(worker.subid);
		if (OidIsValid(worker.relid))
			values[1] = ObjectIdGetDatum(worker.relid);
		else
			nulls[1] = true;
		values[2] = Int32GetDatum(worker_pid);
		if (XLogRecPtrIsInvalid(worker.last_lsn))
			nulls[3] = true;
		else
			values[3] = LSNGetDatum(worker.last_lsn);
		if (worker.last_send_time == 0)
			nulls[4] = true;
		else
			values[4] = TimestampTzGetDatum(worker.last_send_time);
		if (worker.last_recv_time == 0)
			nulls[5] = true;
		else
			values[5] = TimestampTzGetDatum(worker.last_recv_time);
		if (XLogRecPtrIsInvalid(worker.reply_lsn))
			nulls[6] = true;
		else
			values[6] = LSNGetDatum(worker.reply_lsn);
		if (worker.reply_time == 0)
			nulls[7] = true;
		else
			values[7] = TimestampTzGetDatum(worker.reply_time);

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);

		/*
		 * If only a single subscription was requested, and we found it,
		 * break.
		 */
		if (OidIsValid(subid))
			break;
	}

	LWLockRelease(LogicalRepWorkerLock);

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupstore);

	return (Datum) 0;
}
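
/*
 * Example use from SQL (a sketch; the canonical consumer is the
 * pg_stat_subscription system view, which wraps this function):
 *
 *		SELECT * FROM pg_stat_get_subscription(NULL);	-- all workers
 *		SELECT * FROM pg_stat_get_subscription(12345);	-- one subscription OID
 */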