1 /*-------------------------------------------------------------------------
2  * worker.c
3  *	   PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  *	  This file contains the worker which applies logical changes as they come
12  *	  from remote logical replication stream.
13  *
14  *	  The main worker (apply) is started by logical replication worker
15  *	  launcher for every enabled subscription in a database. It uses
16  *	  walsender protocol to communicate with publisher.
17  *
18  *	  This module includes server facing code and shares libpqwalreceiver
19  *	  module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "miscadmin.h"
27 #include "pgstat.h"
28 #include "funcapi.h"
29 
30 #include "access/sysattr.h"
31 #include "access/xact.h"
32 #include "access/xlog_internal.h"
33 
34 #include "catalog/namespace.h"
35 #include "catalog/pg_subscription.h"
36 #include "catalog/pg_subscription_rel.h"
37 
38 #include "commands/trigger.h"
39 
40 #include "executor/executor.h"
41 #include "executor/nodeModifyTable.h"
42 
43 #include "libpq/pqformat.h"
44 #include "libpq/pqsignal.h"
45 
46 #include "mb/pg_wchar.h"
47 
48 #include "nodes/makefuncs.h"
49 
50 #include "optimizer/planner.h"
51 
52 #include "parser/parse_relation.h"
53 
54 #include "postmaster/bgworker.h"
55 #include "postmaster/postmaster.h"
56 #include "postmaster/walwriter.h"
57 
58 #include "replication/decode.h"
59 #include "replication/logical.h"
60 #include "replication/logicalproto.h"
61 #include "replication/logicalrelation.h"
62 #include "replication/logicalworker.h"
63 #include "replication/reorderbuffer.h"
64 #include "replication/origin.h"
65 #include "replication/snapbuild.h"
66 #include "replication/walreceiver.h"
67 #include "replication/worker_internal.h"
68 
69 #include "rewrite/rewriteHandler.h"
70 
71 #include "storage/bufmgr.h"
72 #include "storage/ipc.h"
73 #include "storage/lmgr.h"
74 #include "storage/proc.h"
75 #include "storage/procarray.h"
76 
77 #include "tcop/tcopprot.h"
78 
79 #include "utils/builtins.h"
80 #include "utils/catcache.h"
81 #include "utils/datum.h"
82 #include "utils/fmgroids.h"
83 #include "utils/guc.h"
84 #include "utils/inval.h"
85 #include "utils/lsyscache.h"
86 #include "utils/memutils.h"
87 #include "utils/timeout.h"
88 #include "utils/tqual.h"
89 #include "utils/syscache.h"
90 
91 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
92 
typedef struct FlushPosition
{
	dlist_node	node;			/* list link in lsn_mapping */
	XLogRecPtr	local_end;		/* local end-of-commit LSN (XactLastCommitEnd) */
	XLogRecPtr	remote_end;		/* corresponding remote commit end LSN */
} FlushPosition;
99 
100 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
101 
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel; /* relation whose remote data is being read */
	int			remote_attnum;	/* remote column currently converted, or -1 */
} SlotErrCallbackArg;
107 
108 static MemoryContext ApplyMessageContext = NULL;
109 MemoryContext ApplyContext = NULL;
110 
111 WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
112 
113 Subscription *MySubscription = NULL;
114 bool		MySubscriptionValid = false;
115 
116 bool		in_remote_transaction = false;
117 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
118 
119 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
120 
121 static void store_flush_position(XLogRecPtr remote_lsn);
122 
123 static void maybe_reread_subscription(void);
124 
125 /* Flags set by signal handlers */
126 static volatile sig_atomic_t got_SIGHUP = false;
127 
128 /*
129  * Should this worker apply changes for given relation.
130  *
131  * This is mainly needed for initial relation data sync as that runs in
132  * separate worker process running in parallel and we need some way to skip
133  * changes coming to the main apply worker during the sync of a table.
134  *
135  * Note we need to do smaller or equals comparison for SYNCDONE state because
136  * it might hold position of end of initial slot consistent point WAL
137  * record + 1 (ie start of next record) and next record can be COMMIT of
138  * transaction we are now processing (which is what we set remote_final_lsn
139  * to in apply_handle_begin).
140  */
141 static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry * rel)142 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
143 {
144 	if (am_tablesync_worker())
145 		return MyLogicalRepWorker->relid == rel->localreloid;
146 	else
147 		return (rel->state == SUBREL_STATE_READY ||
148 				(rel->state == SUBREL_STATE_SYNCDONE &&
149 				 rel->statelsn <= remote_final_lsn));
150 }
151 
152 /*
153  * Make sure that we started local transaction.
154  *
155  * Also switches to ApplyMessageContext as necessary.
156  */
157 static bool
ensure_transaction(void)158 ensure_transaction(void)
159 {
160 	if (IsTransactionState())
161 	{
162 		SetCurrentStatementStartTimestamp();
163 
164 		if (CurrentMemoryContext != ApplyMessageContext)
165 			MemoryContextSwitchTo(ApplyMessageContext);
166 
167 		return false;
168 	}
169 
170 	SetCurrentStatementStartTimestamp();
171 	StartTransactionCommand();
172 
173 	maybe_reread_subscription();
174 
175 	MemoryContextSwitchTo(ApplyMessageContext);
176 	return true;
177 }
178 
179 
180 /*
181  * Executor state preparation for evaluation of constraint expressions,
182  * indexes and triggers.
183  *
184  * This is based on similar code in copy.c
185  */
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel)
{
	EState	   *estate;
	ResultRelInfo *resultRelInfo;
	RangeTblEntry *rte;

	estate = CreateExecutorState();

	/* A single-entry range table pointing at the local target relation. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	estate->es_range_table = list_make1(rte);

	/* Set it up as the one and only result relation. */
	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;

	estate->es_output_cid = GetCurrentCommandId(true);

	/* Triggers might need a slot */
	if (resultRelInfo->ri_TrigDesc)
		estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	return estate;
}
219 
220 /*
221  * Executes default values for columns for which we can't map to remote
222  * relation columns.
223  *
224  * This allows us to support tables which have more columns on the downstream
225  * than on the upstream.
226  */
227 static void
slot_fill_defaults(LogicalRepRelMapEntry * rel,EState * estate,TupleTableSlot * slot)228 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
229 				   TupleTableSlot *slot)
230 {
231 	TupleDesc	desc = RelationGetDescr(rel->localrel);
232 	int			num_phys_attrs = desc->natts;
233 	int			i;
234 	int			attnum,
235 				num_defaults = 0;
236 	int		   *defmap;
237 	ExprState **defexprs;
238 	ExprContext *econtext;
239 
240 	econtext = GetPerTupleExprContext(estate);
241 
242 	/* We got all the data via replication, no need to evaluate anything. */
243 	if (num_phys_attrs == rel->remoterel.natts)
244 		return;
245 
246 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
247 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
248 
249 	for (attnum = 0; attnum < num_phys_attrs; attnum++)
250 	{
251 		Expr	   *defexpr;
252 
253 		if (desc->attrs[attnum]->attisdropped)
254 			continue;
255 
256 		if (rel->attrmap[attnum] >= 0)
257 			continue;
258 
259 		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
260 
261 		if (defexpr != NULL)
262 		{
263 			/* Run the expression through planner */
264 			defexpr = expression_planner(defexpr);
265 
266 			/* Initialize executable expression in copycontext */
267 			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
268 			defmap[num_defaults] = attnum;
269 			num_defaults++;
270 		}
271 
272 	}
273 
274 	for (i = 0; i < num_defaults; i++)
275 		slot->tts_values[defmap[i]] =
276 			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
277 }
278 
279 /*
280  * Error callback to give more context info about data conversion failures
281  * while reading data from the remote server.
282  */
283 static void
slot_store_error_callback(void * arg)284 slot_store_error_callback(void *arg)
285 {
286 	SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
287 	LogicalRepRelMapEntry *rel;
288 
289 	/* Nothing to do if remote attribute number is not set */
290 	if (errarg->remote_attnum < 0)
291 		return;
292 
293 	rel = errarg->rel;
294 	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
295 			   rel->remoterel.nspname, rel->remoterel.relname,
296 			   rel->remoterel.attnames[errarg->remote_attnum]);
297 }
298 
299 /*
300  * Store data in C string form into slot.
301  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
302  * use better.
303  */
304 static void
slot_store_cstrings(TupleTableSlot * slot,LogicalRepRelMapEntry * rel,char ** values)305 slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
306 					char **values)
307 {
308 	int			natts = slot->tts_tupleDescriptor->natts;
309 	int			i;
310 	SlotErrCallbackArg errarg;
311 	ErrorContextCallback errcallback;
312 
313 	ExecClearTuple(slot);
314 
315 	/* Push callback + info on the error context stack */
316 	errarg.rel = rel;
317 	errarg.remote_attnum = -1;
318 	errcallback.callback = slot_store_error_callback;
319 	errcallback.arg = (void *) &errarg;
320 	errcallback.previous = error_context_stack;
321 	error_context_stack = &errcallback;
322 
323 	/* Call the "in" function for each non-dropped attribute */
324 	for (i = 0; i < natts; i++)
325 	{
326 		Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
327 		int			remoteattnum = rel->attrmap[i];
328 
329 		if (!att->attisdropped && remoteattnum >= 0 &&
330 			values[remoteattnum] != NULL)
331 		{
332 			Oid			typinput;
333 			Oid			typioparam;
334 
335 			errarg.remote_attnum = remoteattnum;
336 
337 			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
338 			slot->tts_values[i] =
339 				OidInputFunctionCall(typinput, values[remoteattnum],
340 									 typioparam, att->atttypmod);
341 			slot->tts_isnull[i] = false;
342 
343 			errarg.remote_attnum = -1;
344 		}
345 		else
346 		{
347 			/*
348 			 * We assign NULL to dropped attributes, NULL values, and missing
349 			 * values (missing values should be later filled using
350 			 * slot_fill_defaults).
351 			 */
352 			slot->tts_values[i] = (Datum) 0;
353 			slot->tts_isnull[i] = true;
354 		}
355 	}
356 
357 	/* Pop the error context stack */
358 	error_context_stack = errcallback.previous;
359 
360 	ExecStoreVirtualTuple(slot);
361 }
362 
363 /*
364  * Replace selected columns with user data provided as C strings.
365  * This is somewhat similar to heap_modify_tuple but also calls the type
366  * input functions on the user data.
367  * "slot" is filled with a copy of the tuple in "srcslot", with
368  * columns selected by the "replaces" array replaced with data values
369  * from "values".
370  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
371  * storage for "srcslot".  This is OK for current usage, but someday we may
372  * need to materialize "slot" at the end to make it independent of "srcslot".
373  */
static void
slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
					 LogicalRepRelMapEntry *rel,
					 char **values, bool *replaces)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
	ExecClearTuple(slot);

	/*
	 * Copy all the column data from srcslot, so that we'll have valid values
	 * for unreplaced columns.
	 */
	Assert(natts == srcslot->tts_tupleDescriptor->natts);
	slot_getallattrs(srcslot);
	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));

	/* For error reporting, push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each replaced attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
		int			remoteattnum = rel->attrmap[i];

		/* Columns with no remote counterpart keep the srcslot value. */
		if (remoteattnum < 0)
			continue;

		/* Likewise for columns the publisher did not mark as replaced. */
		if (!replaces[remoteattnum])
			continue;

		if (values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			/* Point the error callback at the column being converted. */
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.remote_attnum = -1;
		}
		else
		{
			/* Remote column was explicitly replaced with NULL. */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	/* And finally, declare that "slot" contains a valid virtual tuple */
	ExecStoreVirtualTuple(slot);
}
444 
445 /*
446  * Handle BEGIN message.
447  */
448 static void
apply_handle_begin(StringInfo s)449 apply_handle_begin(StringInfo s)
450 {
451 	LogicalRepBeginData begin_data;
452 
453 	logicalrep_read_begin(s, &begin_data);
454 
455 	remote_final_lsn = begin_data.final_lsn;
456 
457 	in_remote_transaction = true;
458 
459 	pgstat_report_activity(STATE_RUNNING, NULL);
460 }
461 
462 /*
463  * Handle COMMIT message.
464  *
465  * TODO, support tracking of multiple origins
466  */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* Sanity check: COMMIT must match the LSN announced in BEGIN. */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
								 (uint32) (commit_data.commit_lsn >> 32),
								 (uint32) commit_data.commit_lsn,
								 (uint32) (remote_final_lsn >> 32),
								 (uint32) remote_final_lsn)));

	/* The synchronization worker runs in single transaction. */
	if (IsTransactionState() && !am_tablesync_worker())
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.  This must be set before the commit so
		 * the origin advance is recorded atomically with it.
		 */
		replorigin_session_origin_lsn = commit_data.end_lsn;
		replorigin_session_origin_timestamp = commit_data.committime;

		CommitTransactionCommand();
		pgstat_report_stat(false);

		/* Remember the local/remote LSN pair for flush feedback. */
		store_flush_position(commit_data.end_lsn);
	}
	else
	{
		/* Process any invalidation messages that might have accumulated. */
		AcceptInvalidationMessages();
		maybe_reread_subscription();
	}

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
}
512 
513 /*
514  * Handle ORIGIN message.
515  *
516  * TODO, support tracking of multiple origins
517  */
518 static void
apply_handle_origin(StringInfo s)519 apply_handle_origin(StringInfo s)
520 {
521 	/*
522 	 * ORIGIN message can only come inside remote transaction and before any
523 	 * actual writes.
524 	 */
525 	if (!in_remote_transaction ||
526 		(IsTransactionState() && !am_tablesync_worker()))
527 		ereport(ERROR,
528 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
529 				 errmsg("ORIGIN message sent out of order")));
530 }
531 
532 /*
533  * Handle RELATION message.
534  *
535  * Note we don't do validation against local schema here. The validation
536  * against local schema is postponed until first change for given relation
537  * comes as we only care about it when applying changes for it anyway and we
538  * do less locking this way.
539  */
540 static void
apply_handle_relation(StringInfo s)541 apply_handle_relation(StringInfo s)
542 {
543 	LogicalRepRelation *rel;
544 
545 	rel = logicalrep_read_rel(s);
546 	logicalrep_relmap_update(rel);
547 }
548 
549 /*
550  * Handle TYPE message.
551  *
552  * This is now vestigial; we read the info and discard it.
553  */
554 static void
apply_handle_type(StringInfo s)555 apply_handle_type(StringInfo s)
556 {
557 	LogicalRepTyp typ;
558 
559 	logicalrep_read_typ(s, &typ);
560 }
561 
562 /*
563  * Get replica identity index or if it is not defined a primary key.
564  *
565  * If neither is defined, returns InvalidOid
566  */
567 static Oid
GetRelationIdentityOrPK(Relation rel)568 GetRelationIdentityOrPK(Relation rel)
569 {
570 	Oid			idxoid;
571 
572 	idxoid = RelationGetReplicaIndex(rel);
573 
574 	if (!OidIsValid(idxoid))
575 		idxoid = RelationGetPrimaryKeyIndex(rel);
576 
577 	return idxoid;
578 }
579 
580 /*
581  * Handle INSERT message.
582  */
static void
apply_handle_insert(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData newtup;
	LogicalRepRelId relid;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;

	/* Make sure we're inside a transaction and ApplyMessageContext. */
	ensure_transaction();

	relid = logicalrep_read_insert(s, &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));

	/* Input functions may need an active snapshot, so get one */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* Process and store remote tuple in the slot */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, newtup.values);
	/* Evaluate defaults for local-only columns the publisher didn't send. */
	slot_fill_defaults(rel, estate, remoteslot);
	MemoryContextSwitchTo(oldctx);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Do the insert. */
	ExecSimpleRelationInsert(estate, remoteslot);

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	/* Keep the lock until transaction end, but release the map entry. */
	logicalrep_rel_close(rel, NoLock);

	/* Make this change visible to subsequent changes in the transaction. */
	CommandCounterIncrement();
}
640 
641 /*
642  * Check if the logical replication relation is updatable and throw
643  * appropriate error if it isn't.
644  */
645 static void
check_relation_updatable(LogicalRepRelMapEntry * rel)646 check_relation_updatable(LogicalRepRelMapEntry *rel)
647 {
648 	/* Updatable, no error. */
649 	if (rel->updatable)
650 		return;
651 
652 	/*
653 	 * We are in error mode so it's fine this is somewhat slow. It's better to
654 	 * give user correct error.
655 	 */
656 	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
657 	{
658 		ereport(ERROR,
659 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
660 				 errmsg("publisher did not send replica identity column "
661 						"expected by the logical replication target relation \"%s.%s\"",
662 						rel->remoterel.nspname, rel->remoterel.relname)));
663 	}
664 
665 	ereport(ERROR,
666 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
667 			 errmsg("logical replication target relation \"%s.%s\" has "
668 					"neither REPLICA IDENTITY index nor PRIMARY "
669 					"KEY and published relation does not have "
670 					"REPLICA IDENTITY FULL",
671 					rel->remoterel.nspname, rel->remoterel.relname)));
672 }
673 
674 /*
675  * Handle UPDATE message.
676  *
677  * TODO: FDW support
678  */
679 static void
apply_handle_update(StringInfo s)680 apply_handle_update(StringInfo s)
681 {
682 	LogicalRepRelMapEntry *rel;
683 	LogicalRepRelId relid;
684 	Oid			idxoid;
685 	EState	   *estate;
686 	EPQState	epqstate;
687 	LogicalRepTupleData oldtup;
688 	LogicalRepTupleData newtup;
689 	bool		has_oldtup;
690 	TupleTableSlot *localslot;
691 	TupleTableSlot *remoteslot;
692 	RangeTblEntry *target_rte;
693 	int			i;
694 	bool		found;
695 	MemoryContext oldctx;
696 
697 	ensure_transaction();
698 
699 	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
700 								   &newtup);
701 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
702 	if (!should_apply_changes_for_rel(rel))
703 	{
704 		/*
705 		 * The relation can't become interesting in the middle of the
706 		 * transaction so it's safe to unlock it.
707 		 */
708 		logicalrep_rel_close(rel, RowExclusiveLock);
709 		return;
710 	}
711 
712 	/* Check if we can do the update. */
713 	check_relation_updatable(rel);
714 
715 	/* Initialize the executor state. */
716 	estate = create_estate_for_relation(rel);
717 	remoteslot = ExecInitExtraTupleSlot(estate);
718 	ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
719 	localslot = ExecInitExtraTupleSlot(estate);
720 	ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
721 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
722 
723 	/*
724 	 * Populate updatedCols so that per-column triggers can fire.  This could
725 	 * include more columns than were actually changed on the publisher
726 	 * because the logical replication protocol doesn't contain that
727 	 * information.  But it would for example exclude columns that only exist
728 	 * on the subscriber, since we are not touching those.
729 	 */
730 	target_rte = list_nth(estate->es_range_table, 0);
731 	for (i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
732 	{
733 		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
734 		int			remoteattnum = rel->attrmap[i];
735 
736 		if (!att->attisdropped && remoteattnum >= 0)
737 		{
738 			if (newtup.changed[remoteattnum])
739 				target_rte->updatedCols =
740 					bms_add_member(target_rte->updatedCols,
741 								   i + 1 - FirstLowInvalidHeapAttributeNumber);
742 		}
743 	}
744 
745 	PushActiveSnapshot(GetTransactionSnapshot());
746 	ExecOpenIndices(estate->es_result_relation_info, false);
747 
748 	/* Build the search tuple. */
749 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
750 	slot_store_cstrings(remoteslot, rel,
751 						has_oldtup ? oldtup.values : newtup.values);
752 	MemoryContextSwitchTo(oldctx);
753 
754 	/*
755 	 * Try to find tuple using either replica identity index, primary key or
756 	 * if needed, sequential scan.
757 	 */
758 	idxoid = GetRelationIdentityOrPK(rel->localrel);
759 	Assert(OidIsValid(idxoid) ||
760 		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
761 
762 	if (OidIsValid(idxoid))
763 		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
764 											 LockTupleExclusive,
765 											 remoteslot, localslot);
766 	else
767 		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
768 										 remoteslot, localslot);
769 
770 	ExecClearTuple(remoteslot);
771 
772 	/*
773 	 * Tuple found.
774 	 *
775 	 * Note this will fail if there are other conflicting unique indexes.
776 	 */
777 	if (found)
778 	{
779 		/* Process and store remote tuple in the slot */
780 		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
781 		slot_modify_cstrings(remoteslot, localslot, rel,
782 							 newtup.values, newtup.changed);
783 		MemoryContextSwitchTo(oldctx);
784 
785 		EvalPlanQualSetSlot(&epqstate, remoteslot);
786 
787 		/* Do the actual update. */
788 		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
789 	}
790 	else
791 	{
792 		/*
793 		 * The tuple to be updated could not be found.
794 		 *
795 		 * TODO what to do here, change the log level to LOG perhaps?
796 		 */
797 		elog(DEBUG1,
798 			 "logical replication did not find row for update "
799 			 "in replication target relation \"%s\"",
800 			 RelationGetRelationName(rel->localrel));
801 	}
802 
803 	/* Cleanup. */
804 	ExecCloseIndices(estate->es_result_relation_info);
805 	PopActiveSnapshot();
806 
807 	/* Handle queued AFTER triggers. */
808 	AfterTriggerEndQuery(estate);
809 
810 	EvalPlanQualEnd(&epqstate);
811 	ExecResetTupleTable(estate->es_tupleTable, false);
812 	FreeExecutorState(estate);
813 
814 	logicalrep_rel_close(rel, NoLock);
815 
816 	CommandCounterIncrement();
817 }
818 
819 /*
820  * Handle DELETE message.
821  *
822  * TODO: FDW support
823  */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	TupleTableSlot *remoteslot;
	TupleTableSlot *localslot;
	bool		found;
	MemoryContext oldctx;

	/* Make sure we're inside a transaction and ApplyMessageContext. */
	ensure_transaction();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
	localslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	/* Input functions may need an active snapshot, so get one. */
	PushActiveSnapshot(GetTransactionSnapshot());
	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Find the tuple using the replica identity index. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, oldtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find tuple using either replica identity index, primary key or
	 * if needed, sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);
	/* If found delete it. */
	if (found)
	{
		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		ExecSimpleRelationDelete(estate, &epqstate, localslot);
	}
	else
	{
		/* The tuple to be deleted could not be found. */
		ereport(DEBUG1,
				(errmsg("logical replication could not find row for delete "
						"in replication target relation \"%s\"",
						RelationGetRelationName(rel->localrel))));
	}

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	EvalPlanQualEnd(&epqstate);
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	/* Keep the lock until transaction end, but release the map entry. */
	logicalrep_rel_close(rel, NoLock);

	/* Make this change visible to subsequent changes in the transaction. */
	CommandCounterIncrement();
}
918 
919 
920 /*
921  * Logical replication protocol message dispatcher.
922  */
923 static void
apply_dispatch(StringInfo s)924 apply_dispatch(StringInfo s)
925 {
926 	char		action = pq_getmsgbyte(s);
927 
928 	switch (action)
929 	{
930 			/* BEGIN */
931 		case 'B':
932 			apply_handle_begin(s);
933 			break;
934 			/* COMMIT */
935 		case 'C':
936 			apply_handle_commit(s);
937 			break;
938 			/* INSERT */
939 		case 'I':
940 			apply_handle_insert(s);
941 			break;
942 			/* UPDATE */
943 		case 'U':
944 			apply_handle_update(s);
945 			break;
946 			/* DELETE */
947 		case 'D':
948 			apply_handle_delete(s);
949 			break;
950 			/* RELATION */
951 		case 'R':
952 			apply_handle_relation(s);
953 			break;
954 			/* TYPE */
955 		case 'Y':
956 			apply_handle_type(s);
957 			break;
958 			/* ORIGIN */
959 		case 'O':
960 			apply_handle_origin(s);
961 			break;
962 		default:
963 			ereport(ERROR,
964 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
965 					 errmsg("invalid logical replication message type \"%c\"", action)));
966 	}
967 }
968 
969 /*
970  * Figure out which write/flush positions to report to the walsender process.
971  *
972  * We can't simply report back the last LSN the walsender sent us because the
973  * local transaction might not yet be flushed to disk locally. Instead we
974  * build a list that associates local with remote LSNs for every commit. When
975  * reporting back the flush position to the sender we iterate that list and
976  * check which entries on it are already locally flushed. Those we can report
977  * as having been flushed.
978  *
979  * The have_pending_txes is true if there are outstanding transactions that
980  * need to be flushed.
981  */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr();

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	/* Walk the mapping in commit order, pruning fully-flushed entries. */
	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
		dlist_container(FlushPosition, node, iter.cur);

		/* Every entry on the list has been received, hence written. */
		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			/* Local commit is durable; report it flushed and drop it. */
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long. Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	/* Transactions remain pending iff any mapping entries survived. */
	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
1022 
1023 /*
1024  * Store current remote/local lsn pair in the tracking list.
1025  */
1026 static void
store_flush_position(XLogRecPtr remote_lsn)1027 store_flush_position(XLogRecPtr remote_lsn)
1028 {
1029 	FlushPosition *flushpos;
1030 
1031 	/* Need to do this in permanent context */
1032 	MemoryContextSwitchTo(ApplyContext);
1033 
1034 	/* Track commit lsn  */
1035 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1036 	flushpos->local_end = XactLastCommitEnd;
1037 	flushpos->remote_end = remote_lsn;
1038 
1039 	dlist_push_tail(&lsn_mapping, &flushpos->node);
1040 	MemoryContextSwitchTo(ApplyMessageContext);
1041 }
1042 
1043 
1044 /* Update statistics of the worker. */
1045 static void
UpdateWorkerStats(XLogRecPtr last_lsn,TimestampTz send_time,bool reply)1046 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1047 {
1048 	MyLogicalRepWorker->last_lsn = last_lsn;
1049 	MyLogicalRepWorker->last_send_time = send_time;
1050 	MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1051 	if (reply)
1052 	{
1053 		MyLogicalRepWorker->reply_lsn = last_lsn;
1054 		MyLogicalRepWorker->reply_time = send_time;
1055 	}
1056 }
1057 
1058 /*
1059  * Apply main loop.
1060  */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	/* Time we last heard from the publisher; drives timeout/ping logic. */
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					/* Negative length means the stream has terminated. */
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/* Wrap the received buffer in a StringInfo for parsing. */
					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					/* First byte identifies the message type. */
					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
						/*
						 * 'w': WAL data message.  Parse the header LSNs and
						 * send time, then hand the payload to apply_dispatch.
						 */
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						/*
						 * 'k': keepalive from the sender; it may request an
						 * immediate status reply.
						 */
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
		{
			TimeLineID	tli;

			walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
			break;
		}

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_POSTMASTER_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		/* Emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the master anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from standby has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);
		}
	}
}
1286 
1287 /*
1288  * Send a Standby Status Update message to server.
1289  *
1290  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1291  * to send a response to avoid timeouts.
1292  */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	/* Reply buffer, allocated once in ApplyContext and reused afterwards. */
	static StringInfo reply_message = NULL;
	/* When we last sent a status message. */
	static TimestampTz send_time = 0;

	/* Positions we last reported, so we can skip redundant messages. */
	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position. This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	/* Never let the reported positions move backwards. */
	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	/* First call allocates the buffer; later calls just reset it. */
	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	/* Build the Standby Status Update ('r') message. */
	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
		 force,
		 (uint32) (recvpos >> 32), (uint32) recvpos,
		 (uint32) (writepos >> 32), (uint32) writepos,
		 (uint32) (flushpos >> 32), (uint32) flushpos
		);

	walrcv_send(LogRepWorkerWalRcvConn,
				reply_message->data, reply_message->len);

	/* Remember what we reported, so the next call can skip duplicates. */
	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}
1379 
1380 /*
1381  * Reread subscription info if needed. Most changes will be exit.
1382  */
1383 static void
maybe_reread_subscription(void)1384 maybe_reread_subscription(void)
1385 {
1386 	MemoryContext oldctx;
1387 	Subscription *newsub;
1388 	bool		started_tx = false;
1389 
1390 	/* When cache state is valid there is nothing to do here. */
1391 	if (MySubscriptionValid)
1392 		return;
1393 
1394 	/* This function might be called inside or outside of transaction. */
1395 	if (!IsTransactionState())
1396 	{
1397 		StartTransactionCommand();
1398 		started_tx = true;
1399 	}
1400 
1401 	/* Ensure allocations in permanent context. */
1402 	oldctx = MemoryContextSwitchTo(ApplyContext);
1403 
1404 	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1405 
1406 	/*
1407 	 * Exit if the subscription was removed. This normally should not happen
1408 	 * as the worker gets killed during DROP SUBSCRIPTION.
1409 	 */
1410 	if (!newsub)
1411 	{
1412 		ereport(LOG,
1413 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1414 						"stop because the subscription was removed",
1415 						MySubscription->name)));
1416 
1417 		proc_exit(0);
1418 	}
1419 
1420 	/*
1421 	 * Exit if the subscription was disabled. This normally should not happen
1422 	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1423 	 */
1424 	if (!newsub->enabled)
1425 	{
1426 		ereport(LOG,
1427 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1428 						"stop because the subscription was disabled",
1429 						MySubscription->name)));
1430 
1431 		proc_exit(0);
1432 	}
1433 
1434 	/*
1435 	 * Exit if connection string was changed. The launcher will start new
1436 	 * worker.
1437 	 */
1438 	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1439 	{
1440 		ereport(LOG,
1441 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1442 						"restart because the connection information was changed",
1443 						MySubscription->name)));
1444 
1445 		proc_exit(0);
1446 	}
1447 
1448 	/*
1449 	 * Exit if subscription name was changed (it's used for
1450 	 * fallback_application_name). The launcher will start new worker.
1451 	 */
1452 	if (strcmp(newsub->name, MySubscription->name) != 0)
1453 	{
1454 		ereport(LOG,
1455 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1456 						"restart because subscription was renamed",
1457 						MySubscription->name)));
1458 
1459 		proc_exit(0);
1460 	}
1461 
1462 	/* !slotname should never happen when enabled is true. */
1463 	Assert(newsub->slotname);
1464 
1465 	/*
1466 	 * We need to make new connection to new slot if slot name has changed so
1467 	 * exit here as well if that's the case.
1468 	 */
1469 	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1470 	{
1471 		ereport(LOG,
1472 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1473 						"restart because the replication slot name was changed",
1474 						MySubscription->name)));
1475 
1476 		proc_exit(0);
1477 	}
1478 
1479 	/*
1480 	 * Exit if publication list was changed. The launcher will start new
1481 	 * worker.
1482 	 */
1483 	if (!equal(newsub->publications, MySubscription->publications))
1484 	{
1485 		ereport(LOG,
1486 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1487 						"restart because subscription's publications were changed",
1488 						MySubscription->name)));
1489 
1490 		proc_exit(0);
1491 	}
1492 
1493 	/* Check for other changes that should never happen too. */
1494 	if (newsub->dbid != MySubscription->dbid)
1495 	{
1496 		elog(ERROR, "subscription %u changed unexpectedly",
1497 			 MyLogicalRepWorker->subid);
1498 	}
1499 
1500 	/* Clean old subscription info and switch to new one. */
1501 	FreeSubscription(MySubscription);
1502 	MySubscription = newsub;
1503 
1504 	MemoryContextSwitchTo(oldctx);
1505 
1506 	/* Change synchronous commit according to the user's wishes */
1507 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
1508 					PGC_BACKEND, PGC_S_OVERRIDE);
1509 
1510 	if (started_tx)
1511 		CommitTransactionCommand();
1512 
1513 	MySubscriptionValid = true;
1514 }
1515 
1516 /*
1517  * Callback from subscription syscache invalidation.
1518  */
1519 static void
subscription_change_cb(Datum arg,int cacheid,uint32 hashvalue)1520 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1521 {
1522 	MySubscriptionValid = false;
1523 }
1524 
1525 /* SIGHUP: set flag to reload configuration at next convenient time */
1526 static void
logicalrep_worker_sighup(SIGNAL_ARGS)1527 logicalrep_worker_sighup(SIGNAL_ARGS)
1528 {
1529 	int			save_errno = errno;
1530 
1531 	got_SIGHUP = true;
1532 
1533 	/* Waken anything waiting on the process latch */
1534 	SetLatch(MyLatch);
1535 
1536 	errno = save_errno;
1537 }
1538 
1539 /* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, logicalrep_worker_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/* Initialise stats to a reasonably sane starting value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	Assert(CurrentResourceOwner == NULL);
	CurrentResourceOwner = ResourceOwnerCreate(NULL,
											   "logical replication apply");

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/* This is a table synchronization worker; run the initial sync. */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* The slot name needs to be allocated in permanent memory context. */
		oldctx = MemoryContextSwitchTo(ApplyContext);
		myslotname = pstrdup(syncslotname);
		MemoryContextSwitchTo(oldctx);

		pfree(syncslotname);
	}
	else
	{
		/* This is main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;
		int			server_version;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errmsg("subscription has no replication slot set")));

		/* Setup replication origin tracking. */
		StartTransactionCommand();
		/* The origin name is "pg_" followed by the subscription OID. */
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
		/* Resume streaming from where this origin last left off. */
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output identify_system for anything but it
		 * does some initializations on the upstream so let's still call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI,
									  &server_version);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}
1708 
1709 /*
1710  * Is current process a logical replication worker?
1711  */
1712 bool
IsLogicalWorker(void)1713 IsLogicalWorker(void)
1714 {
1715 	return MyLogicalRepWorker != NULL;
1716 }
1717