1 /*-------------------------------------------------------------------------
2  * worker.c
3  *	   PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2019, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  *	  This file contains the worker which applies logical changes as they come
12  *	  from remote logical replication stream.
13  *
14  *	  The main worker (apply) is started by logical replication worker
15  *	  launcher for every enabled subscription in a database. It uses
16  *	  walsender protocol to communicate with publisher.
17  *
18  *	  This module includes server facing code and shares libpqwalreceiver
19  *	  module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "access/table.h"
27 #include "access/tableam.h"
28 #include "access/xact.h"
29 #include "access/xlog_internal.h"
30 #include "catalog/catalog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_subscription.h"
33 #include "catalog/pg_subscription_rel.h"
34 #include "commands/tablecmds.h"
35 #include "commands/trigger.h"
36 #include "executor/executor.h"
37 #include "executor/nodeModifyTable.h"
38 #include "funcapi.h"
39 #include "libpq/pqformat.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "nodes/makefuncs.h"
44 #include "optimizer/optimizer.h"
45 #include "pgstat.h"
46 #include "postmaster/bgworker.h"
47 #include "postmaster/postmaster.h"
48 #include "postmaster/walwriter.h"
49 #include "replication/decode.h"
50 #include "replication/logical.h"
51 #include "replication/logicalproto.h"
52 #include "replication/logicalrelation.h"
53 #include "replication/logicalworker.h"
54 #include "replication/origin.h"
55 #include "replication/reorderbuffer.h"
56 #include "replication/snapbuild.h"
57 #include "replication/walreceiver.h"
58 #include "replication/worker_internal.h"
59 #include "rewrite/rewriteHandler.h"
60 #include "storage/bufmgr.h"
61 #include "storage/ipc.h"
62 #include "storage/lmgr.h"
63 #include "storage/proc.h"
64 #include "storage/procarray.h"
65 #include "tcop/tcopprot.h"
66 #include "utils/builtins.h"
67 #include "utils/catcache.h"
68 #include "utils/datum.h"
69 #include "utils/fmgroids.h"
70 #include "utils/guc.h"
71 #include "utils/inval.h"
72 #include "utils/lsyscache.h"
73 #include "utils/memutils.h"
74 #include "utils/rel.h"
75 #include "utils/syscache.h"
76 #include "utils/timeout.h"
77 
#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

/*
 * Entry in the lsn_mapping list: pairs a local commit-record end LSN with
 * the corresponding remote commit end LSN, so we can report flush progress
 * back to the walsender (see get_flush_position).
 */
typedef struct FlushPosition
{
	dlist_node	node;
	XLogRecPtr	local_end;
	XLogRecPtr	remote_end;
} FlushPosition;

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

/*
 * Info passed to slot_store_error_callback so that data-conversion errors
 * can report which relation/column was being processed.
 */
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel;
	int			remote_attnum;	/* -1 when no column is being converted */
} SlotErrCallbackArg;

/* Per-message memory context, reset between protocol messages. */
static MemoryContext ApplyMessageContext = NULL;
/* Permanent context for apply-worker-lifetime allocations. */
MemoryContext ApplyContext = NULL;

WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

Subscription *MySubscription = NULL;
bool		MySubscriptionValid = false;

bool		in_remote_transaction = false;
/* LSN of the commit record of the transaction currently being applied. */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
114 
115 /*
116  * Should this worker apply changes for given relation.
117  *
118  * This is mainly needed for initial relation data sync as that runs in
119  * separate worker process running in parallel and we need some way to skip
120  * changes coming to the main apply worker during the sync of a table.
121  *
122  * Note we need to do smaller or equals comparison for SYNCDONE state because
123  * it might hold position of end of initial slot consistent point WAL
124  * record + 1 (ie start of next record) and next record can be COMMIT of
125  * transaction we are now processing (which is what we set remote_final_lsn
126  * to in apply_handle_begin).
127  */
128 static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry * rel)129 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
130 {
131 	if (am_tablesync_worker())
132 		return MyLogicalRepWorker->relid == rel->localreloid;
133 	else
134 		return (rel->state == SUBREL_STATE_READY ||
135 				(rel->state == SUBREL_STATE_SYNCDONE &&
136 				 rel->statelsn <= remote_final_lsn));
137 }
138 
139 /*
140  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
141  *
142  * Start a transaction, if this is the first step (else we keep using the
143  * existing transaction).
144  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
145  */
146 static void
begin_replication_step(void)147 begin_replication_step(void)
148 {
149 	SetCurrentStatementStartTimestamp();
150 
151 	if (!IsTransactionState())
152 	{
153 		StartTransactionCommand();
154 		maybe_reread_subscription();
155 	}
156 
157 	PushActiveSnapshot(GetTransactionSnapshot());
158 
159 	MemoryContextSwitchTo(ApplyMessageContext);
160 }
161 
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void
end_replication_step(void)
{
	PopActiveSnapshot();

	CommandCounterIncrement();
}
176 
177 
178 /*
179  * Executor state preparation for evaluation of constraint expressions,
180  * indexes and triggers.
181  *
182  * This is based on similar code in copy.c
183  */
184 static EState *
create_estate_for_relation(LogicalRepRelMapEntry * rel)185 create_estate_for_relation(LogicalRepRelMapEntry *rel)
186 {
187 	EState	   *estate;
188 	ResultRelInfo *resultRelInfo;
189 	RangeTblEntry *rte;
190 
191 	estate = CreateExecutorState();
192 
193 	rte = makeNode(RangeTblEntry);
194 	rte->rtekind = RTE_RELATION;
195 	rte->relid = RelationGetRelid(rel->localrel);
196 	rte->relkind = rel->localrel->rd_rel->relkind;
197 	rte->rellockmode = AccessShareLock;
198 	ExecInitRangeTable(estate, list_make1(rte));
199 
200 	resultRelInfo = makeNode(ResultRelInfo);
201 	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
202 
203 	estate->es_result_relations = resultRelInfo;
204 	estate->es_num_result_relations = 1;
205 	estate->es_result_relation_info = resultRelInfo;
206 
207 	estate->es_output_cid = GetCurrentCommandId(true);
208 
209 	/* Prepare to catch AFTER triggers. */
210 	AfterTriggerBeginQuery();
211 
212 	return estate;
213 }
214 
215 /*
216  * Finish any operations related to the executor state created by
217  * create_estate_for_relation().
218  */
219 static void
finish_estate(EState * estate)220 finish_estate(EState *estate)
221 {
222 	/* Handle any queued AFTER triggers. */
223 	AfterTriggerEndQuery(estate);
224 
225 	/* Cleanup. */
226 	ExecResetTupleTable(estate->es_tupleTable, false);
227 	FreeExecutorState(estate);
228 }
229 
230 /*
231  * Executes default values for columns for which we can't map to remote
232  * relation columns.
233  *
234  * This allows us to support tables which have more columns on the downstream
235  * than on the upstream.
236  */
237 static void
slot_fill_defaults(LogicalRepRelMapEntry * rel,EState * estate,TupleTableSlot * slot)238 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
239 				   TupleTableSlot *slot)
240 {
241 	TupleDesc	desc = RelationGetDescr(rel->localrel);
242 	int			num_phys_attrs = desc->natts;
243 	int			i;
244 	int			attnum,
245 				num_defaults = 0;
246 	int		   *defmap;
247 	ExprState **defexprs;
248 	ExprContext *econtext;
249 
250 	econtext = GetPerTupleExprContext(estate);
251 
252 	/* We got all the data via replication, no need to evaluate anything. */
253 	if (num_phys_attrs == rel->remoterel.natts)
254 		return;
255 
256 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
257 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
258 
259 	for (attnum = 0; attnum < num_phys_attrs; attnum++)
260 	{
261 		Expr	   *defexpr;
262 
263 		if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
264 			continue;
265 
266 		if (rel->attrmap[attnum] >= 0)
267 			continue;
268 
269 		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
270 
271 		if (defexpr != NULL)
272 		{
273 			/* Run the expression through planner */
274 			defexpr = expression_planner(defexpr);
275 
276 			/* Initialize executable expression in copycontext */
277 			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
278 			defmap[num_defaults] = attnum;
279 			num_defaults++;
280 		}
281 
282 	}
283 
284 	for (i = 0; i < num_defaults; i++)
285 		slot->tts_values[defmap[i]] =
286 			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
287 }
288 
289 /*
290  * Error callback to give more context info about data conversion failures
291  * while reading data from the remote server.
292  */
293 static void
slot_store_error_callback(void * arg)294 slot_store_error_callback(void *arg)
295 {
296 	SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
297 	LogicalRepRelMapEntry *rel;
298 
299 	/* Nothing to do if remote attribute number is not set */
300 	if (errarg->remote_attnum < 0)
301 		return;
302 
303 	rel = errarg->rel;
304 	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
305 			   rel->remoterel.nspname, rel->remoterel.relname,
306 			   rel->remoterel.attnames[errarg->remote_attnum]);
307 }
308 
309 /*
310  * Store data in C string form into slot.
311  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
312  * use better.
313  */
314 static void
slot_store_cstrings(TupleTableSlot * slot,LogicalRepRelMapEntry * rel,char ** values)315 slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
316 					char **values)
317 {
318 	int			natts = slot->tts_tupleDescriptor->natts;
319 	int			i;
320 	SlotErrCallbackArg errarg;
321 	ErrorContextCallback errcallback;
322 
323 	ExecClearTuple(slot);
324 
325 	/* Push callback + info on the error context stack */
326 	errarg.rel = rel;
327 	errarg.remote_attnum = -1;
328 	errcallback.callback = slot_store_error_callback;
329 	errcallback.arg = (void *) &errarg;
330 	errcallback.previous = error_context_stack;
331 	error_context_stack = &errcallback;
332 
333 	/* Call the "in" function for each non-dropped attribute */
334 	for (i = 0; i < natts; i++)
335 	{
336 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
337 		int			remoteattnum = rel->attrmap[i];
338 
339 		if (!att->attisdropped && remoteattnum >= 0 &&
340 			values[remoteattnum] != NULL)
341 		{
342 			Oid			typinput;
343 			Oid			typioparam;
344 
345 			errarg.remote_attnum = remoteattnum;
346 
347 			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
348 			slot->tts_values[i] =
349 				OidInputFunctionCall(typinput, values[remoteattnum],
350 									 typioparam, att->atttypmod);
351 			slot->tts_isnull[i] = false;
352 
353 			errarg.remote_attnum = -1;
354 		}
355 		else
356 		{
357 			/*
358 			 * We assign NULL to dropped attributes, NULL values, and missing
359 			 * values (missing values should be later filled using
360 			 * slot_fill_defaults).
361 			 */
362 			slot->tts_values[i] = (Datum) 0;
363 			slot->tts_isnull[i] = true;
364 		}
365 	}
366 
367 	/* Pop the error context stack */
368 	error_context_stack = errcallback.previous;
369 
370 	ExecStoreVirtualTuple(slot);
371 }
372 
373 /*
374  * Replace selected columns with user data provided as C strings.
375  * This is somewhat similar to heap_modify_tuple but also calls the type
376  * input functions on the user data.
377  * "slot" is filled with a copy of the tuple in "srcslot", with
378  * columns selected by the "replaces" array replaced with data values
379  * from "values".
380  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
381  * storage for "srcslot".  This is OK for current usage, but someday we may
382  * need to materialize "slot" at the end to make it independent of "srcslot".
383  */
384 static void
slot_modify_cstrings(TupleTableSlot * slot,TupleTableSlot * srcslot,LogicalRepRelMapEntry * rel,char ** values,bool * replaces)385 slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
386 					 LogicalRepRelMapEntry *rel,
387 					 char **values, bool *replaces)
388 {
389 	int			natts = slot->tts_tupleDescriptor->natts;
390 	int			i;
391 	SlotErrCallbackArg errarg;
392 	ErrorContextCallback errcallback;
393 
394 	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
395 	ExecClearTuple(slot);
396 
397 	/*
398 	 * Copy all the column data from srcslot, so that we'll have valid values
399 	 * for unreplaced columns.
400 	 */
401 	Assert(natts == srcslot->tts_tupleDescriptor->natts);
402 	slot_getallattrs(srcslot);
403 	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
404 	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
405 
406 	/* For error reporting, push callback + info on the error context stack */
407 	errarg.rel = rel;
408 	errarg.remote_attnum = -1;
409 	errcallback.callback = slot_store_error_callback;
410 	errcallback.arg = (void *) &errarg;
411 	errcallback.previous = error_context_stack;
412 	error_context_stack = &errcallback;
413 
414 	/* Call the "in" function for each replaced attribute */
415 	for (i = 0; i < natts; i++)
416 	{
417 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
418 		int			remoteattnum = rel->attrmap[i];
419 
420 		if (remoteattnum < 0)
421 			continue;
422 
423 		if (!replaces[remoteattnum])
424 			continue;
425 
426 		if (values[remoteattnum] != NULL)
427 		{
428 			Oid			typinput;
429 			Oid			typioparam;
430 
431 			errarg.remote_attnum = remoteattnum;
432 
433 			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
434 			slot->tts_values[i] =
435 				OidInputFunctionCall(typinput, values[remoteattnum],
436 									 typioparam, att->atttypmod);
437 			slot->tts_isnull[i] = false;
438 
439 			errarg.remote_attnum = -1;
440 		}
441 		else
442 		{
443 			slot->tts_values[i] = (Datum) 0;
444 			slot->tts_isnull[i] = true;
445 		}
446 	}
447 
448 	/* Pop the error context stack */
449 	error_context_stack = errcallback.previous;
450 
451 	/* And finally, declare that "slot" contains a valid virtual tuple */
452 	ExecStoreVirtualTuple(slot);
453 }
454 
455 /*
456  * Handle BEGIN message.
457  */
458 static void
apply_handle_begin(StringInfo s)459 apply_handle_begin(StringInfo s)
460 {
461 	LogicalRepBeginData begin_data;
462 
463 	logicalrep_read_begin(s, &begin_data);
464 
465 	remote_final_lsn = begin_data.final_lsn;
466 
467 	in_remote_transaction = true;
468 
469 	pgstat_report_activity(STATE_RUNNING, NULL);
470 }
471 
472 /*
473  * Handle COMMIT message.
474  *
475  * TODO, support tracking of multiple origins
476  */
477 static void
apply_handle_commit(StringInfo s)478 apply_handle_commit(StringInfo s)
479 {
480 	LogicalRepCommitData commit_data;
481 
482 	logicalrep_read_commit(s, &commit_data);
483 
484 	if (commit_data.commit_lsn != remote_final_lsn)
485 		ereport(ERROR,
486 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
487 				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
488 								 (uint32) (commit_data.commit_lsn >> 32),
489 								 (uint32) commit_data.commit_lsn,
490 								 (uint32) (remote_final_lsn >> 32),
491 								 (uint32) remote_final_lsn)));
492 
493 	/* The synchronization worker runs in single transaction. */
494 	if (IsTransactionState() && !am_tablesync_worker())
495 	{
496 		/*
497 		 * Update origin state so we can restart streaming from correct
498 		 * position in case of crash.
499 		 */
500 		replorigin_session_origin_lsn = commit_data.end_lsn;
501 		replorigin_session_origin_timestamp = commit_data.committime;
502 
503 		CommitTransactionCommand();
504 		pgstat_report_stat(false);
505 
506 		store_flush_position(commit_data.end_lsn);
507 	}
508 	else
509 	{
510 		/* Process any invalidation messages that might have accumulated. */
511 		AcceptInvalidationMessages();
512 		maybe_reread_subscription();
513 	}
514 
515 	in_remote_transaction = false;
516 
517 	/* Process any tables that are being synchronized in parallel. */
518 	process_syncing_tables(commit_data.end_lsn);
519 
520 	pgstat_report_activity(STATE_IDLE, NULL);
521 }
522 
523 /*
524  * Handle ORIGIN message.
525  *
526  * TODO, support tracking of multiple origins
527  */
528 static void
apply_handle_origin(StringInfo s)529 apply_handle_origin(StringInfo s)
530 {
531 	/*
532 	 * ORIGIN message can only come inside remote transaction and before any
533 	 * actual writes.
534 	 */
535 	if (!in_remote_transaction ||
536 		(IsTransactionState() && !am_tablesync_worker()))
537 		ereport(ERROR,
538 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
539 				 errmsg("ORIGIN message sent out of order")));
540 }
541 
542 /*
543  * Handle RELATION message.
544  *
545  * Note we don't do validation against local schema here. The validation
546  * against local schema is postponed until first change for given relation
547  * comes as we only care about it when applying changes for it anyway and we
548  * do less locking this way.
549  */
550 static void
apply_handle_relation(StringInfo s)551 apply_handle_relation(StringInfo s)
552 {
553 	LogicalRepRelation *rel;
554 
555 	rel = logicalrep_read_rel(s);
556 	logicalrep_relmap_update(rel);
557 }
558 
559 /*
560  * Handle TYPE message.
561  *
562  * This is now vestigial; we read the info and discard it.
563  */
564 static void
apply_handle_type(StringInfo s)565 apply_handle_type(StringInfo s)
566 {
567 	LogicalRepTyp typ;
568 
569 	logicalrep_read_typ(s, &typ);
570 }
571 
572 /*
573  * Get replica identity index or if it is not defined a primary key.
574  *
575  * If neither is defined, returns InvalidOid
576  */
577 static Oid
GetRelationIdentityOrPK(Relation rel)578 GetRelationIdentityOrPK(Relation rel)
579 {
580 	Oid			idxoid;
581 
582 	idxoid = RelationGetReplicaIndex(rel);
583 
584 	if (!OidIsValid(idxoid))
585 		idxoid = RelationGetPrimaryKeyIndex(rel);
586 
587 	return idxoid;
588 }
589 
590 /*
591  * Handle INSERT message.
592  */
593 static void
apply_handle_insert(StringInfo s)594 apply_handle_insert(StringInfo s)
595 {
596 	LogicalRepRelMapEntry *rel;
597 	LogicalRepTupleData newtup;
598 	LogicalRepRelId relid;
599 	EState	   *estate;
600 	TupleTableSlot *remoteslot;
601 	MemoryContext oldctx;
602 
603 	begin_replication_step();
604 
605 	relid = logicalrep_read_insert(s, &newtup);
606 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
607 	if (!should_apply_changes_for_rel(rel))
608 	{
609 		/*
610 		 * The relation can't become interesting in the middle of the
611 		 * transaction so it's safe to unlock it.
612 		 */
613 		logicalrep_rel_close(rel, RowExclusiveLock);
614 		end_replication_step();
615 		return;
616 	}
617 
618 	/* Initialize the executor state. */
619 	estate = create_estate_for_relation(rel);
620 	remoteslot = ExecInitExtraTupleSlot(estate,
621 										RelationGetDescr(rel->localrel),
622 										&TTSOpsVirtual);
623 
624 	/* Process and store remote tuple in the slot */
625 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
626 	slot_store_cstrings(remoteslot, rel, newtup.values);
627 	slot_fill_defaults(rel, estate, remoteslot);
628 	MemoryContextSwitchTo(oldctx);
629 
630 	ExecOpenIndices(estate->es_result_relation_info, false);
631 
632 	/* Do the insert. */
633 	ExecSimpleRelationInsert(estate, remoteslot);
634 
635 	/* Cleanup. */
636 	ExecCloseIndices(estate->es_result_relation_info);
637 
638 	finish_estate(estate);
639 
640 	logicalrep_rel_close(rel, NoLock);
641 
642 	end_replication_step();
643 }
644 
645 /*
646  * Check if the logical replication relation is updatable and throw
647  * appropriate error if it isn't.
648  */
649 static void
check_relation_updatable(LogicalRepRelMapEntry * rel)650 check_relation_updatable(LogicalRepRelMapEntry *rel)
651 {
652 	/* Updatable, no error. */
653 	if (rel->updatable)
654 		return;
655 
656 	/*
657 	 * We are in error mode so it's fine this is somewhat slow. It's better to
658 	 * give user correct error.
659 	 */
660 	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
661 	{
662 		ereport(ERROR,
663 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
664 				 errmsg("publisher did not send replica identity column "
665 						"expected by the logical replication target relation \"%s.%s\"",
666 						rel->remoterel.nspname, rel->remoterel.relname)));
667 	}
668 
669 	ereport(ERROR,
670 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
671 			 errmsg("logical replication target relation \"%s.%s\" has "
672 					"neither REPLICA IDENTITY index nor PRIMARY "
673 					"KEY and published relation does not have "
674 					"REPLICA IDENTITY FULL",
675 					rel->remoterel.nspname, rel->remoterel.relname)));
676 }
677 
678 /*
679  * Handle UPDATE message.
680  *
681  * TODO: FDW support
682  */
683 static void
apply_handle_update(StringInfo s)684 apply_handle_update(StringInfo s)
685 {
686 	LogicalRepRelMapEntry *rel;
687 	LogicalRepRelId relid;
688 	Oid			idxoid;
689 	EState	   *estate;
690 	EPQState	epqstate;
691 	LogicalRepTupleData oldtup;
692 	LogicalRepTupleData newtup;
693 	bool		has_oldtup;
694 	TupleTableSlot *localslot;
695 	TupleTableSlot *remoteslot;
696 	RangeTblEntry *target_rte;
697 	bool		found;
698 	MemoryContext oldctx;
699 
700 	begin_replication_step();
701 
702 	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
703 								   &newtup);
704 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
705 	if (!should_apply_changes_for_rel(rel))
706 	{
707 		/*
708 		 * The relation can't become interesting in the middle of the
709 		 * transaction so it's safe to unlock it.
710 		 */
711 		logicalrep_rel_close(rel, RowExclusiveLock);
712 		end_replication_step();
713 		return;
714 	}
715 
716 	/* Check if we can do the update. */
717 	check_relation_updatable(rel);
718 
719 	/* Initialize the executor state. */
720 	estate = create_estate_for_relation(rel);
721 	remoteslot = ExecInitExtraTupleSlot(estate,
722 										RelationGetDescr(rel->localrel),
723 										&TTSOpsVirtual);
724 	localslot = table_slot_create(rel->localrel,
725 								  &estate->es_tupleTable);
726 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
727 
728 	/*
729 	 * Populate updatedCols so that per-column triggers can fire.  This could
730 	 * include more columns than were actually changed on the publisher
731 	 * because the logical replication protocol doesn't contain that
732 	 * information.  But it would for example exclude columns that only exist
733 	 * on the subscriber, since we are not touching those.
734 	 */
735 	target_rte = list_nth(estate->es_range_table, 0);
736 	for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
737 	{
738 		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
739 		int			remoteattnum = rel->attrmap[i];
740 
741 		if (!att->attisdropped && remoteattnum >= 0)
742 		{
743 			if (newtup.changed[remoteattnum])
744 				target_rte->updatedCols =
745 					bms_add_member(target_rte->updatedCols,
746 								   i + 1 - FirstLowInvalidHeapAttributeNumber);
747 		}
748 	}
749 
750 	/* Also populate extraUpdatedCols, in case we have generated columns */
751 	fill_extraUpdatedCols(target_rte, rel->localrel);
752 
753 	ExecOpenIndices(estate->es_result_relation_info, false);
754 
755 	/* Build the search tuple. */
756 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
757 	slot_store_cstrings(remoteslot, rel,
758 						has_oldtup ? oldtup.values : newtup.values);
759 	MemoryContextSwitchTo(oldctx);
760 
761 	/*
762 	 * Try to find tuple using either replica identity index, primary key or
763 	 * if needed, sequential scan.
764 	 */
765 	idxoid = GetRelationIdentityOrPK(rel->localrel);
766 	Assert(OidIsValid(idxoid) ||
767 		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
768 
769 	if (OidIsValid(idxoid))
770 		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
771 											 LockTupleExclusive,
772 											 remoteslot, localslot);
773 	else
774 		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
775 										 remoteslot, localslot);
776 
777 	ExecClearTuple(remoteslot);
778 
779 	/*
780 	 * Tuple found.
781 	 *
782 	 * Note this will fail if there are other conflicting unique indexes.
783 	 */
784 	if (found)
785 	{
786 		/* Process and store remote tuple in the slot */
787 		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
788 		slot_modify_cstrings(remoteslot, localslot, rel,
789 							 newtup.values, newtup.changed);
790 		MemoryContextSwitchTo(oldctx);
791 
792 		EvalPlanQualSetSlot(&epqstate, remoteslot);
793 
794 		/* Do the actual update. */
795 		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
796 	}
797 	else
798 	{
799 		/*
800 		 * The tuple to be updated could not be found.
801 		 *
802 		 * TODO what to do here, change the log level to LOG perhaps?
803 		 */
804 		elog(DEBUG1,
805 			 "logical replication did not find row for update "
806 			 "in replication target relation \"%s\"",
807 			 RelationGetRelationName(rel->localrel));
808 	}
809 
810 	/* Cleanup. */
811 	EvalPlanQualEnd(&epqstate);
812 	ExecCloseIndices(estate->es_result_relation_info);
813 
814 	finish_estate(estate);
815 
816 	logicalrep_rel_close(rel, NoLock);
817 
818 	end_replication_step();
819 }
820 
821 /*
822  * Handle DELETE message.
823  *
824  * TODO: FDW support
825  */
826 static void
apply_handle_delete(StringInfo s)827 apply_handle_delete(StringInfo s)
828 {
829 	LogicalRepRelMapEntry *rel;
830 	LogicalRepTupleData oldtup;
831 	LogicalRepRelId relid;
832 	Oid			idxoid;
833 	EState	   *estate;
834 	EPQState	epqstate;
835 	TupleTableSlot *remoteslot;
836 	TupleTableSlot *localslot;
837 	bool		found;
838 	MemoryContext oldctx;
839 
840 	begin_replication_step();
841 
842 	relid = logicalrep_read_delete(s, &oldtup);
843 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
844 	if (!should_apply_changes_for_rel(rel))
845 	{
846 		/*
847 		 * The relation can't become interesting in the middle of the
848 		 * transaction so it's safe to unlock it.
849 		 */
850 		logicalrep_rel_close(rel, RowExclusiveLock);
851 		end_replication_step();
852 		return;
853 	}
854 
855 	/* Check if we can do the delete. */
856 	check_relation_updatable(rel);
857 
858 	/* Initialize the executor state. */
859 	estate = create_estate_for_relation(rel);
860 	remoteslot = ExecInitExtraTupleSlot(estate,
861 										RelationGetDescr(rel->localrel),
862 										&TTSOpsVirtual);
863 	localslot = table_slot_create(rel->localrel,
864 								  &estate->es_tupleTable);
865 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
866 
867 	ExecOpenIndices(estate->es_result_relation_info, false);
868 
869 	/* Find the tuple using the replica identity index. */
870 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
871 	slot_store_cstrings(remoteslot, rel, oldtup.values);
872 	MemoryContextSwitchTo(oldctx);
873 
874 	/*
875 	 * Try to find tuple using either replica identity index, primary key or
876 	 * if needed, sequential scan.
877 	 */
878 	idxoid = GetRelationIdentityOrPK(rel->localrel);
879 	Assert(OidIsValid(idxoid) ||
880 		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
881 
882 	if (OidIsValid(idxoid))
883 		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
884 											 LockTupleExclusive,
885 											 remoteslot, localslot);
886 	else
887 		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
888 										 remoteslot, localslot);
889 	/* If found delete it. */
890 	if (found)
891 	{
892 		EvalPlanQualSetSlot(&epqstate, localslot);
893 
894 		/* Do the actual delete. */
895 		ExecSimpleRelationDelete(estate, &epqstate, localslot);
896 	}
897 	else
898 	{
899 		/* The tuple to be deleted could not be found. */
900 		elog(DEBUG1,
901 			 "logical replication could not find row for delete "
902 			 "in replication target relation \"%s\"",
903 			 RelationGetRelationName(rel->localrel));
904 	}
905 
906 	/* Cleanup. */
907 	EvalPlanQualEnd(&epqstate);
908 	ExecCloseIndices(estate->es_result_relation_info);
909 
910 	finish_estate(estate);
911 
912 	logicalrep_rel_close(rel, NoLock);
913 
914 	end_replication_step();
915 }
916 
917 /*
918  * Handle TRUNCATE message.
919  *
920  * TODO: FDW support
921  */
922 static void
apply_handle_truncate(StringInfo s)923 apply_handle_truncate(StringInfo s)
924 {
925 	bool		cascade = false;
926 	bool		restart_seqs = false;
927 	List	   *remote_relids = NIL;
928 	List	   *remote_rels = NIL;
929 	List	   *rels = NIL;
930 	List	   *relids = NIL;
931 	List	   *relids_logged = NIL;
932 	ListCell   *lc;
933 	LOCKMODE	lockmode = AccessExclusiveLock;
934 
935 	begin_replication_step();
936 
937 	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
938 
939 	foreach(lc, remote_relids)
940 	{
941 		LogicalRepRelId relid = lfirst_oid(lc);
942 		LogicalRepRelMapEntry *rel;
943 
944 		rel = logicalrep_rel_open(relid, lockmode);
945 		if (!should_apply_changes_for_rel(rel))
946 		{
947 			/*
948 			 * The relation can't become interesting in the middle of the
949 			 * transaction so it's safe to unlock it.
950 			 */
951 			logicalrep_rel_close(rel, lockmode);
952 			continue;
953 		}
954 
955 		remote_rels = lappend(remote_rels, rel);
956 		rels = lappend(rels, rel->localrel);
957 		relids = lappend_oid(relids, rel->localreloid);
958 		if (RelationIsLogicallyLogged(rel->localrel))
959 			relids_logged = lappend_oid(relids_logged, rel->localreloid);
960 	}
961 
962 	/*
963 	 * Even if we used CASCADE on the upstream master we explicitly default to
964 	 * replaying changes without further cascading. This might be later
965 	 * changeable with a user specified option.
966 	 */
967 	ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
968 
969 	foreach(lc, remote_rels)
970 	{
971 		LogicalRepRelMapEntry *rel = lfirst(lc);
972 
973 		logicalrep_rel_close(rel, NoLock);
974 	}
975 
976 	end_replication_step();
977 }
978 
979 
980 /*
981  * Logical replication protocol message dispatcher.
982  */
983 static void
apply_dispatch(StringInfo s)984 apply_dispatch(StringInfo s)
985 {
986 	char		action = pq_getmsgbyte(s);
987 
988 	switch (action)
989 	{
990 			/* BEGIN */
991 		case 'B':
992 			apply_handle_begin(s);
993 			break;
994 			/* COMMIT */
995 		case 'C':
996 			apply_handle_commit(s);
997 			break;
998 			/* INSERT */
999 		case 'I':
1000 			apply_handle_insert(s);
1001 			break;
1002 			/* UPDATE */
1003 		case 'U':
1004 			apply_handle_update(s);
1005 			break;
1006 			/* DELETE */
1007 		case 'D':
1008 			apply_handle_delete(s);
1009 			break;
1010 			/* TRUNCATE */
1011 		case 'T':
1012 			apply_handle_truncate(s);
1013 			break;
1014 			/* RELATION */
1015 		case 'R':
1016 			apply_handle_relation(s);
1017 			break;
1018 			/* TYPE */
1019 		case 'Y':
1020 			apply_handle_type(s);
1021 			break;
1022 			/* ORIGIN */
1023 		case 'O':
1024 			apply_handle_origin(s);
1025 			break;
1026 		default:
1027 			ereport(ERROR,
1028 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1029 					 errmsg("invalid logical replication message type \"%c\"", action)));
1030 	}
1031 }
1032 
1033 /*
1034  * Figure out which write/flush positions to report to the walsender process.
1035  *
1036  * We can't simply report back the last LSN the walsender sent us because the
1037  * local transaction might not yet be flushed to disk locally. Instead we
1038  * build a list that associates local with remote LSNs for every commit. When
1039  * reporting back the flush position to the sender we iterate that list and
1040  * check which entries on it are already locally flushed. Those we can report
1041  * as having been flushed.
1042  *
1043  * The have_pending_txes is true if there are outstanding transactions that
1044  * need to be flushed.
1045  */
1046 static void
get_flush_position(XLogRecPtr * write,XLogRecPtr * flush,bool * have_pending_txes)1047 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
1048 				   bool *have_pending_txes)
1049 {
1050 	dlist_mutable_iter iter;
1051 	XLogRecPtr	local_flush = GetFlushRecPtr();
1052 
1053 	*write = InvalidXLogRecPtr;
1054 	*flush = InvalidXLogRecPtr;
1055 
1056 	dlist_foreach_modify(iter, &lsn_mapping)
1057 	{
1058 		FlushPosition *pos =
1059 		dlist_container(FlushPosition, node, iter.cur);
1060 
1061 		*write = pos->remote_end;
1062 
1063 		if (pos->local_end <= local_flush)
1064 		{
1065 			*flush = pos->remote_end;
1066 			dlist_delete(iter.cur);
1067 			pfree(pos);
1068 		}
1069 		else
1070 		{
1071 			/*
1072 			 * Don't want to uselessly iterate over the rest of the list which
1073 			 * could potentially be long. Instead get the last element and
1074 			 * grab the write position from there.
1075 			 */
1076 			pos = dlist_tail_element(FlushPosition, node,
1077 									 &lsn_mapping);
1078 			*write = pos->remote_end;
1079 			*have_pending_txes = true;
1080 			return;
1081 		}
1082 	}
1083 
1084 	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
1085 }
1086 
1087 /*
1088  * Store current remote/local lsn pair in the tracking list.
1089  */
1090 static void
store_flush_position(XLogRecPtr remote_lsn)1091 store_flush_position(XLogRecPtr remote_lsn)
1092 {
1093 	FlushPosition *flushpos;
1094 
1095 	/* Need to do this in permanent context */
1096 	MemoryContextSwitchTo(ApplyContext);
1097 
1098 	/* Track commit lsn  */
1099 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1100 	flushpos->local_end = XactLastCommitEnd;
1101 	flushpos->remote_end = remote_lsn;
1102 
1103 	dlist_push_tail(&lsn_mapping, &flushpos->node);
1104 	MemoryContextSwitchTo(ApplyMessageContext);
1105 }
1106 
1107 
1108 /* Update statistics of the worker. */
1109 static void
UpdateWorkerStats(XLogRecPtr last_lsn,TimestampTz send_time,bool reply)1110 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1111 {
1112 	MyLogicalRepWorker->last_lsn = last_lsn;
1113 	MyLogicalRepWorker->last_send_time = send_time;
1114 	MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1115 	if (reply)
1116 	{
1117 		MyLogicalRepWorker->reply_lsn = last_lsn;
1118 		MyLogicalRepWorker->reply_time = send_time;
1119 	}
1120 }
1121 
/*
 * Apply main loop.
 *
 * 'last_received' is the LSN up to which we have already received data from
 * the publisher; it is advanced as new messages arrive and is used both for
 * feedback reports and for table-synchronization bookkeeping.
 *
 * Each outer iteration drains all data currently available on the
 * connection, sends feedback, handles subscription/table-sync housekeeping,
 * and then waits on the socket/latch.  The function returns only when the
 * upstream ends the stream; all error paths use ereport(ERROR)/proc_exit.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					/* Negative length means the upstream closed the stream. */
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/*
					 * Wrap the receive buffer in a StringInfo without
					 * copying; maxlen = -1 marks it as a read-only buffer.
					 */
					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
						/* XLogData ('w') message: WAL payload to apply. */
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						/* The remainder of 's' is a logical rep message. */
						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						/* Keepalive ('k') message, possibly asking for a reply. */
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
		{
			TimeLineID	tli;

			walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
			break;
		}

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Reload configuration if the SIGHUP handler set the flag. */
		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the master anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from standby has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);
		}
	}
}
1346 
/*
 * Send a Standby Status Update message to server.
 *
 * 'recvpos' is the latest LSN we've received data to, force is set if we need
 * to send a response to avoid timeouts.  'requestReply' asks the upstream to
 * answer with a keepalive.
 *
 * Uses static state so that repeated calls reuse the reply buffer and can
 * suppress redundant reports (same positions, within the status interval).
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	static StringInfo reply_message = NULL;
	static TimestampTz send_time = 0;

	/* Last positions actually reported; used to avoid going backwards. */
	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position. This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	/* Never report positions lower than previously reported. */
	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	/* Lazily allocate the reply buffer in the long-lived ApplyContext. */
	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	/*
	 * Build the 'r' (Standby Status Update) message.  Note the field
	 * mapping: recvpos goes out as the "write" position and writepos as the
	 * "apply" position, per the existing convention here.
	 */
	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
		 force,
		 (uint32) (recvpos >> 32), (uint32) recvpos,
		 (uint32) (writepos >> 32), (uint32) writepos,
		 (uint32) (flushpos >> 32), (uint32) flushpos
		);

	walrcv_send(LogRepWorkerWalRcvConn,
				reply_message->data, reply_message->len);

	/* Remember what we reported, so we never report lower values later. */
	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}
1439 
/*
 * Reread subscription info if needed.
 *
 * Called whenever the subscription syscache entry may have been invalidated
 * (MySubscriptionValid is cleared by subscription_change_cb).  Most changes
 * to the subscription cause the worker to exit; the launcher then starts a
 * fresh worker with the new settings.  Only compatible changes (e.g.
 * synchronous_commit) are applied in place.
 */
static void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/* missing_ok = true: a concurrently dropped subscription returns NULL */
	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was removed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription was disabled. This normally should not happen
	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
	 */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was disabled",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if connection string was changed. The launcher will start new
	 * worker.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the connection information was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if subscription name was changed (it's used for
	 * fallback_application_name). The launcher will start new worker.
	 */
	if (strcmp(newsub->name, MySubscription->name) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription was renamed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/*
	 * We need to make new connection to new slot if slot name has changed so
	 * exit here as well if that's the case.
	 */
	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the replication slot name was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if publication list was changed. The launcher will start new
	 * worker.
	 */
	if (!equal(newsub->publications, MySubscription->publications))
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription's publications were changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
1575 
/*
 * Callback from subscription syscache invalidation.
 *
 * Just flags the cached subscription as stale; maybe_reread_subscription()
 * does the actual refresh at the next safe point in the apply loop.
 */
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	MySubscriptionValid = false;
}
1584 
1585 /* SIGHUP: set flag to reload configuration at next convenient time */
1586 static void
logicalrep_worker_sighup(SIGNAL_ARGS)1587 logicalrep_worker_sighup(SIGNAL_ARGS)
1588 {
1589 	int			save_errno = errno;
1590 
1591 	got_SIGHUP = true;
1592 
1593 	/* Waken anything waiting on the process latch */
1594 	SetLatch(MyLatch);
1595 
1596 	errno = save_errno;
1597 }
1598 
/*
 * Logical Replication Apply worker entry point.
 *
 * 'main_arg' carries the worker slot number assigned by the launcher.
 * Performs all process setup (signals, database connection, subscription
 * load, replication-origin or table-sync setup, walreceiver connection) and
 * then runs LogicalRepApplyLoop(); never returns normally.
 */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, logicalrep_worker_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * We don't currently need any ResourceOwner in a walreceiver process, but
	 * if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/* Initialise stats to a sanish value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/* missing_ok = true: subscription may have been dropped concurrently */
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription %u will not "
						"start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));
		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/* This is a table synchronization worker, call initial sync. */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* The slot name needs to be allocated in permanent memory context. */
		oldctx = MemoryContextSwitchTo(ApplyContext);
		myslotname = pstrdup(syncslotname);
		MemoryContextSwitchTo(oldctx);

		pfree(syncslotname);
	}
	else
	{
		/* This is main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errmsg("subscription has no replication slot set")));

		/* Setup replication origin tracking. */
		StartTransactionCommand();
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
		/* Resume replication from the origin's recorded progress. */
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output identify_system for anything but it
		 * does some initializations on the upstream so let's still call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}
1778 
1779 /*
1780  * Is current process a logical replication worker?
1781  */
1782 bool
IsLogicalWorker(void)1783 IsLogicalWorker(void)
1784 {
1785 	return MyLogicalRepWorker != NULL;
1786 }
1787