1 /*-------------------------------------------------------------------------
2  * worker.c
3  *	   PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2018, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  *	  This file contains the worker which applies logical changes as they come
12  *	  from remote logical replication stream.
13  *
14  *	  The main worker (apply) is started by logical replication worker
15  *	  launcher for every enabled subscription in a database. It uses
16  *	  walsender protocol to communicate with publisher.
17  *
18  *	  This module includes server facing code and shares libpqwalreceiver
19  *	  module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "miscadmin.h"
27 #include "pgstat.h"
28 #include "funcapi.h"
29 
30 #include "access/sysattr.h"
31 #include "access/xact.h"
32 #include "access/xlog_internal.h"
33 
34 #include "catalog/catalog.h"
35 #include "catalog/namespace.h"
36 #include "catalog/pg_subscription.h"
37 #include "catalog/pg_subscription_rel.h"
38 
39 #include "commands/tablecmds.h"
40 #include "commands/trigger.h"
41 
42 #include "executor/executor.h"
43 #include "executor/nodeModifyTable.h"
44 
45 #include "libpq/pqformat.h"
46 #include "libpq/pqsignal.h"
47 
48 #include "mb/pg_wchar.h"
49 
50 #include "nodes/makefuncs.h"
51 
52 #include "optimizer/planner.h"
53 
54 #include "parser/parse_relation.h"
55 
56 #include "postmaster/bgworker.h"
57 #include "postmaster/postmaster.h"
58 #include "postmaster/walwriter.h"
59 
60 #include "replication/decode.h"
61 #include "replication/logical.h"
62 #include "replication/logicalproto.h"
63 #include "replication/logicalrelation.h"
64 #include "replication/logicalworker.h"
65 #include "replication/reorderbuffer.h"
66 #include "replication/origin.h"
67 #include "replication/snapbuild.h"
68 #include "replication/walreceiver.h"
69 #include "replication/worker_internal.h"
70 
71 #include "rewrite/rewriteHandler.h"
72 
73 #include "storage/bufmgr.h"
74 #include "storage/ipc.h"
75 #include "storage/lmgr.h"
76 #include "storage/proc.h"
77 #include "storage/procarray.h"
78 
79 #include "tcop/tcopprot.h"
80 
81 #include "utils/builtins.h"
82 #include "utils/catcache.h"
83 #include "utils/datum.h"
84 #include "utils/fmgroids.h"
85 #include "utils/guc.h"
86 #include "utils/inval.h"
87 #include "utils/lsyscache.h"
88 #include "utils/memutils.h"
89 #include "utils/rel.h"
90 #include "utils/timeout.h"
91 #include "utils/tqual.h"
92 #include "utils/syscache.h"
93 
#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

/*
 * Entry in the lsn_mapping list: pairs a local commit LSN with the remote
 * LSN of the transaction it came from, so flush feedback can be computed
 * (see get_flush_position()).
 */
typedef struct FlushPosition
{
	dlist_node	node;			/* list links */
	XLogRecPtr	local_end;		/* end of local commit record */
	XLogRecPtr	remote_end;		/* matching remote end LSN */
} FlushPosition;

/* List of FlushPosition entries, oldest first. */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

/* State passed to slot_store_error_callback via the error context stack. */
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel; /* relation whose data is being converted */
	int			remote_attnum;	/* remote column being processed, or -1 */
} SlotErrCallbackArg;

/* Per-message context, reset between protocol messages. */
static MemoryContext ApplyMessageContext = NULL;
/* Long-lived context for apply worker state. */
MemoryContext ApplyContext = NULL;

/* Connection to the publisher's walsender. */
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

/* Subscription this worker is applying changes for. */
Subscription *MySubscription = NULL;
bool		MySubscriptionValid = false;

/* True while we are between BEGIN and COMMIT of a remote transaction. */
bool		in_remote_transaction = false;
/* Commit LSN of the remote transaction being applied (set at BEGIN). */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
130 
131 /*
132  * Should this worker apply changes for given relation.
133  *
134  * This is mainly needed for initial relation data sync as that runs in
135  * separate worker process running in parallel and we need some way to skip
136  * changes coming to the main apply worker during the sync of a table.
137  *
138  * Note we need to do smaller or equals comparison for SYNCDONE state because
139  * it might hold position of end of initial slot consistent point WAL
140  * record + 1 (ie start of next record) and next record can be COMMIT of
141  * transaction we are now processing (which is what we set remote_final_lsn
142  * to in apply_handle_begin).
143  */
144 static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry * rel)145 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
146 {
147 	if (am_tablesync_worker())
148 		return MyLogicalRepWorker->relid == rel->localreloid;
149 	else
150 		return (rel->state == SUBREL_STATE_READY ||
151 				(rel->state == SUBREL_STATE_SYNCDONE &&
152 				 rel->statelsn <= remote_final_lsn));
153 }
154 
/*
 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
 *
 * Start a transaction, if this is the first step (else we keep using the
 * existing transaction).
 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
 */
static void
begin_replication_step(void)
{
	SetCurrentStatementStartTimestamp();

	if (!IsTransactionState())
	{
		StartTransactionCommand();
		/* Pick up any concurrent change to the subscription definition. */
		maybe_reread_subscription();
	}

	/* Each step needs an up-to-date snapshot of the database. */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* All per-message allocations go into the short-lived message context. */
	MemoryContextSwitchTo(ApplyMessageContext);
}
177 
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void
end_replication_step(void)
{
	/* Matches the PushActiveSnapshot() in begin_replication_step(). */
	PopActiveSnapshot();

	/* Make this step's changes visible to subsequent steps. */
	CommandCounterIncrement();
}
192 
193 
/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers.
 *
 * Builds an EState with a single-entry range table and result relation for
 * the target of the logical replication change.  Callers must pair this
 * with finish_estate().
 *
 * This is based on similar code in copy.c
 */
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel)
{
	EState	   *estate;
	ResultRelInfo *resultRelInfo;
	RangeTblEntry *rte;

	estate = CreateExecutorState();

	/* Single RTE describing the local target relation. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	estate->es_range_table = list_make1(rte);

	/* The target relation is also the (only) result relation. */
	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;

	estate->es_output_cid = GetCurrentCommandId(true);

	/* Triggers might need a slot */
	if (resultRelInfo->ri_TrigDesc)
		estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	return estate;
}
233 
/*
 * Finish any operations related to the executor state created by
 * create_estate_for_relation().
 *
 * Fires queued AFTER triggers and releases executor resources.
 */
static void
finish_estate(EState *estate)
{
	/* Handle any queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	/* Cleanup. */
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);
}
248 
249 /*
250  * Executes default values for columns for which we can't map to remote
251  * relation columns.
252  *
253  * This allows us to support tables which have more columns on the downstream
254  * than on the upstream.
255  */
256 static void
slot_fill_defaults(LogicalRepRelMapEntry * rel,EState * estate,TupleTableSlot * slot)257 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
258 				   TupleTableSlot *slot)
259 {
260 	TupleDesc	desc = RelationGetDescr(rel->localrel);
261 	int			num_phys_attrs = desc->natts;
262 	int			i;
263 	int			attnum,
264 				num_defaults = 0;
265 	int		   *defmap;
266 	ExprState **defexprs;
267 	ExprContext *econtext;
268 
269 	econtext = GetPerTupleExprContext(estate);
270 
271 	/* We got all the data via replication, no need to evaluate anything. */
272 	if (num_phys_attrs == rel->remoterel.natts)
273 		return;
274 
275 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
276 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
277 
278 	for (attnum = 0; attnum < num_phys_attrs; attnum++)
279 	{
280 		Expr	   *defexpr;
281 
282 		if (TupleDescAttr(desc, attnum)->attisdropped)
283 			continue;
284 
285 		if (rel->attrmap[attnum] >= 0)
286 			continue;
287 
288 		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
289 
290 		if (defexpr != NULL)
291 		{
292 			/* Run the expression through planner */
293 			defexpr = expression_planner(defexpr);
294 
295 			/* Initialize executable expression in copycontext */
296 			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
297 			defmap[num_defaults] = attnum;
298 			num_defaults++;
299 		}
300 
301 	}
302 
303 	for (i = 0; i < num_defaults; i++)
304 		slot->tts_values[defmap[i]] =
305 			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
306 }
307 
308 /*
309  * Error callback to give more context info about data conversion failures
310  * while reading data from the remote server.
311  */
312 static void
slot_store_error_callback(void * arg)313 slot_store_error_callback(void *arg)
314 {
315 	SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
316 	LogicalRepRelMapEntry *rel;
317 
318 	/* Nothing to do if remote attribute number is not set */
319 	if (errarg->remote_attnum < 0)
320 		return;
321 
322 	rel = errarg->rel;
323 	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
324 			   rel->remoterel.nspname, rel->remoterel.relname,
325 			   rel->remoterel.attnames[errarg->remote_attnum]);
326 }
327 
328 /*
329  * Store data in C string form into slot.
330  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
331  * use better.
332  */
333 static void
slot_store_cstrings(TupleTableSlot * slot,LogicalRepRelMapEntry * rel,char ** values)334 slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
335 					char **values)
336 {
337 	int			natts = slot->tts_tupleDescriptor->natts;
338 	int			i;
339 	SlotErrCallbackArg errarg;
340 	ErrorContextCallback errcallback;
341 
342 	ExecClearTuple(slot);
343 
344 	/* Push callback + info on the error context stack */
345 	errarg.rel = rel;
346 	errarg.remote_attnum = -1;
347 	errcallback.callback = slot_store_error_callback;
348 	errcallback.arg = (void *) &errarg;
349 	errcallback.previous = error_context_stack;
350 	error_context_stack = &errcallback;
351 
352 	/* Call the "in" function for each non-dropped attribute */
353 	for (i = 0; i < natts; i++)
354 	{
355 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
356 		int			remoteattnum = rel->attrmap[i];
357 
358 		if (!att->attisdropped && remoteattnum >= 0 &&
359 			values[remoteattnum] != NULL)
360 		{
361 			Oid			typinput;
362 			Oid			typioparam;
363 
364 			errarg.remote_attnum = remoteattnum;
365 
366 			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
367 			slot->tts_values[i] =
368 				OidInputFunctionCall(typinput, values[remoteattnum],
369 									 typioparam, att->atttypmod);
370 			slot->tts_isnull[i] = false;
371 
372 			errarg.remote_attnum = -1;
373 		}
374 		else
375 		{
376 			/*
377 			 * We assign NULL to dropped attributes, NULL values, and missing
378 			 * values (missing values should be later filled using
379 			 * slot_fill_defaults).
380 			 */
381 			slot->tts_values[i] = (Datum) 0;
382 			slot->tts_isnull[i] = true;
383 		}
384 	}
385 
386 	/* Pop the error context stack */
387 	error_context_stack = errcallback.previous;
388 
389 	ExecStoreVirtualTuple(slot);
390 }
391 
392 /*
393  * Replace selected columns with user data provided as C strings.
394  * This is somewhat similar to heap_modify_tuple but also calls the type
395  * input functions on the user data.
396  * "slot" is filled with a copy of the tuple in "srcslot", with
397  * columns selected by the "replaces" array replaced with data values
398  * from "values".
399  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
400  * storage for "srcslot".  This is OK for current usage, but someday we may
401  * need to materialize "slot" at the end to make it independent of "srcslot".
402  */
403 static void
slot_modify_cstrings(TupleTableSlot * slot,TupleTableSlot * srcslot,LogicalRepRelMapEntry * rel,char ** values,bool * replaces)404 slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
405 					 LogicalRepRelMapEntry *rel,
406 					 char **values, bool *replaces)
407 {
408 	int			natts = slot->tts_tupleDescriptor->natts;
409 	int			i;
410 	SlotErrCallbackArg errarg;
411 	ErrorContextCallback errcallback;
412 
413 	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
414 	ExecClearTuple(slot);
415 
416 	/*
417 	 * Copy all the column data from srcslot, so that we'll have valid values
418 	 * for unreplaced columns.
419 	 */
420 	Assert(natts == srcslot->tts_tupleDescriptor->natts);
421 	slot_getallattrs(srcslot);
422 	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
423 	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
424 
425 	/* For error reporting, push callback + info on the error context stack */
426 	errarg.rel = rel;
427 	errarg.remote_attnum = -1;
428 	errcallback.callback = slot_store_error_callback;
429 	errcallback.arg = (void *) &errarg;
430 	errcallback.previous = error_context_stack;
431 	error_context_stack = &errcallback;
432 
433 	/* Call the "in" function for each replaced attribute */
434 	for (i = 0; i < natts; i++)
435 	{
436 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
437 		int			remoteattnum = rel->attrmap[i];
438 
439 		if (remoteattnum < 0)
440 			continue;
441 
442 		if (!replaces[remoteattnum])
443 			continue;
444 
445 		if (values[remoteattnum] != NULL)
446 		{
447 			Oid			typinput;
448 			Oid			typioparam;
449 
450 			errarg.remote_attnum = remoteattnum;
451 
452 			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
453 			slot->tts_values[i] =
454 				OidInputFunctionCall(typinput, values[remoteattnum],
455 									 typioparam, att->atttypmod);
456 			slot->tts_isnull[i] = false;
457 
458 			errarg.remote_attnum = -1;
459 		}
460 		else
461 		{
462 			slot->tts_values[i] = (Datum) 0;
463 			slot->tts_isnull[i] = true;
464 		}
465 	}
466 
467 	/* Pop the error context stack */
468 	error_context_stack = errcallback.previous;
469 
470 	/* And finally, declare that "slot" contains a valid virtual tuple */
471 	ExecStoreVirtualTuple(slot);
472 }
473 
/*
 * Handle BEGIN message.
 *
 * Records the remote transaction's commit LSN (used later to cross-check
 * the COMMIT message and to decide whether SYNCDONE tables are applied).
 */
static void
apply_handle_begin(StringInfo s)
{
	LogicalRepBeginData begin_data;

	logicalrep_read_begin(s, &begin_data);

	/* Remember where this remote transaction will commit. */
	remote_final_lsn = begin_data.final_lsn;

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}
490 
/*
 * Handle COMMIT message.
 *
 * Commits the local transaction (if any), advances the replication origin,
 * and records the remote/local LSN pair for flush feedback.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* The commit LSN must match what the BEGIN message announced. */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
								 (uint32) (commit_data.commit_lsn >> 32),
								 (uint32) commit_data.commit_lsn,
								 (uint32) (remote_final_lsn >> 32),
								 (uint32) remote_final_lsn)));

	/* The synchronization worker runs in single transaction. */
	if (IsTransactionState() && !am_tablesync_worker())
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.
		 */
		replorigin_session_origin_lsn = commit_data.end_lsn;
		replorigin_session_origin_timestamp = commit_data.committime;

		CommitTransactionCommand();
		pgstat_report_stat(false);

		/* Remember this remote/local pair for later flush reporting. */
		store_flush_position(commit_data.end_lsn);
	}
	else
	{
		/* Process any invalidation messages that might have accumulated. */
		AcceptInvalidationMessages();
		maybe_reread_subscription();
	}

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
}
541 
542 /*
543  * Handle ORIGIN message.
544  *
545  * TODO, support tracking of multiple origins
546  */
547 static void
apply_handle_origin(StringInfo s)548 apply_handle_origin(StringInfo s)
549 {
550 	/*
551 	 * ORIGIN message can only come inside remote transaction and before any
552 	 * actual writes.
553 	 */
554 	if (!in_remote_transaction ||
555 		(IsTransactionState() && !am_tablesync_worker()))
556 		ereport(ERROR,
557 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
558 				 errmsg("ORIGIN message sent out of order")));
559 }
560 
561 /*
562  * Handle RELATION message.
563  *
564  * Note we don't do validation against local schema here. The validation
565  * against local schema is postponed until first change for given relation
566  * comes as we only care about it when applying changes for it anyway and we
567  * do less locking this way.
568  */
569 static void
apply_handle_relation(StringInfo s)570 apply_handle_relation(StringInfo s)
571 {
572 	LogicalRepRelation *rel;
573 
574 	rel = logicalrep_read_rel(s);
575 	logicalrep_relmap_update(rel);
576 }
577 
578 /*
579  * Handle TYPE message.
580  *
581  * This is now vestigial; we read the info and discard it.
582  */
583 static void
apply_handle_type(StringInfo s)584 apply_handle_type(StringInfo s)
585 {
586 	LogicalRepTyp typ;
587 
588 	logicalrep_read_typ(s, &typ);
589 }
590 
591 /*
592  * Get replica identity index or if it is not defined a primary key.
593  *
594  * If neither is defined, returns InvalidOid
595  */
596 static Oid
GetRelationIdentityOrPK(Relation rel)597 GetRelationIdentityOrPK(Relation rel)
598 {
599 	Oid			idxoid;
600 
601 	idxoid = RelationGetReplicaIndex(rel);
602 
603 	if (!OidIsValid(idxoid))
604 		idxoid = RelationGetPrimaryKeyIndex(rel);
605 
606 	return idxoid;
607 }
608 
609 /*
610  * Handle INSERT message.
611  */
612 static void
apply_handle_insert(StringInfo s)613 apply_handle_insert(StringInfo s)
614 {
615 	LogicalRepRelMapEntry *rel;
616 	LogicalRepTupleData newtup;
617 	LogicalRepRelId relid;
618 	EState	   *estate;
619 	TupleTableSlot *remoteslot;
620 	MemoryContext oldctx;
621 
622 	begin_replication_step();
623 
624 	relid = logicalrep_read_insert(s, &newtup);
625 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
626 	if (!should_apply_changes_for_rel(rel))
627 	{
628 		/*
629 		 * The relation can't become interesting in the middle of the
630 		 * transaction so it's safe to unlock it.
631 		 */
632 		logicalrep_rel_close(rel, RowExclusiveLock);
633 		end_replication_step();
634 		return;
635 	}
636 
637 	/* Initialize the executor state. */
638 	estate = create_estate_for_relation(rel);
639 	remoteslot = ExecInitExtraTupleSlot(estate,
640 										RelationGetDescr(rel->localrel));
641 
642 	/* Process and store remote tuple in the slot */
643 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
644 	slot_store_cstrings(remoteslot, rel, newtup.values);
645 	slot_fill_defaults(rel, estate, remoteslot);
646 	MemoryContextSwitchTo(oldctx);
647 
648 	ExecOpenIndices(estate->es_result_relation_info, false);
649 
650 	/* Do the insert. */
651 	ExecSimpleRelationInsert(estate, remoteslot);
652 
653 	/* Cleanup. */
654 	ExecCloseIndices(estate->es_result_relation_info);
655 
656 	finish_estate(estate);
657 
658 	logicalrep_rel_close(rel, NoLock);
659 
660 	end_replication_step();
661 }
662 
663 /*
664  * Check if the logical replication relation is updatable and throw
665  * appropriate error if it isn't.
666  */
667 static void
check_relation_updatable(LogicalRepRelMapEntry * rel)668 check_relation_updatable(LogicalRepRelMapEntry *rel)
669 {
670 	/* Updatable, no error. */
671 	if (rel->updatable)
672 		return;
673 
674 	/*
675 	 * We are in error mode so it's fine this is somewhat slow. It's better to
676 	 * give user correct error.
677 	 */
678 	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
679 	{
680 		ereport(ERROR,
681 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
682 				 errmsg("publisher did not send replica identity column "
683 						"expected by the logical replication target relation \"%s.%s\"",
684 						rel->remoterel.nspname, rel->remoterel.relname)));
685 	}
686 
687 	ereport(ERROR,
688 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
689 			 errmsg("logical replication target relation \"%s.%s\" has "
690 					"neither REPLICA IDENTITY index nor PRIMARY "
691 					"KEY and published relation does not have "
692 					"REPLICA IDENTITY FULL",
693 					rel->remoterel.nspname, rel->remoterel.relname)));
694 }
695 
/*
 * Handle UPDATE message.
 *
 * Locates the existing local row (via replica identity index, primary key,
 * or sequential scan), merges in the replaced columns, and performs the
 * update through the executor.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	LogicalRepTupleData oldtup;
	LogicalRepTupleData newtup;
	bool		has_oldtup;
	TupleTableSlot *localslot;
	TupleTableSlot *remoteslot;
	RangeTblEntry *target_rte;
	int			i;
	bool		found;
	MemoryContext oldctx;

	begin_replication_step();

	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
								   &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Check if we can do the update. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel));
	localslot = ExecInitExtraTupleSlot(estate,
									   RelationGetDescr(rel->localrel));
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	/*
	 * Populate updatedCols so that per-column triggers can fire.  This could
	 * include more columns than were actually changed on the publisher
	 * because the logical replication protocol doesn't contain that
	 * information.  But it would for example exclude columns that only exist
	 * on the subscriber, since we are not touching those.
	 */
	target_rte = list_nth(estate->es_range_table, 0);
	for (i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0)
		{
			/* updatedCols is indexed by system-attribute-offset attnums */
			if (newtup.changed[remoteattnum])
				target_rte->updatedCols =
					bms_add_member(target_rte->updatedCols,
								   i + 1 - FirstLowInvalidHeapAttributeNumber);
		}
	}

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel,
						has_oldtup ? oldtup.values : newtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find tuple using either replica identity index, primary key or
	 * if needed, sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);

	/* The search tuple is no longer needed; reuse the slot below. */
	ExecClearTuple(remoteslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/* Process and store remote tuple in the slot */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		slot_modify_cstrings(remoteslot, localslot, rel,
							 newtup.values, newtup.changed);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		/* Do the actual update. */
		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
	}
	else
	{
		/*
		 * The tuple to be updated could not be found.
		 *
		 * TODO what to do here, change the log level to LOG perhaps?
		 */
		elog(DEBUG1,
			 "logical replication did not find row for update "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	EvalPlanQualEnd(&epqstate);
	ExecCloseIndices(estate->es_result_relation_info);

	finish_estate(estate);

	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
835 
836 /*
837  * Handle DELETE message.
838  *
839  * TODO: FDW support
840  */
841 static void
apply_handle_delete(StringInfo s)842 apply_handle_delete(StringInfo s)
843 {
844 	LogicalRepRelMapEntry *rel;
845 	LogicalRepTupleData oldtup;
846 	LogicalRepRelId relid;
847 	Oid			idxoid;
848 	EState	   *estate;
849 	EPQState	epqstate;
850 	TupleTableSlot *remoteslot;
851 	TupleTableSlot *localslot;
852 	bool		found;
853 	MemoryContext oldctx;
854 
855 	begin_replication_step();
856 
857 	relid = logicalrep_read_delete(s, &oldtup);
858 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
859 	if (!should_apply_changes_for_rel(rel))
860 	{
861 		/*
862 		 * The relation can't become interesting in the middle of the
863 		 * transaction so it's safe to unlock it.
864 		 */
865 		logicalrep_rel_close(rel, RowExclusiveLock);
866 		end_replication_step();
867 		return;
868 	}
869 
870 	/* Check if we can do the delete. */
871 	check_relation_updatable(rel);
872 
873 	/* Initialize the executor state. */
874 	estate = create_estate_for_relation(rel);
875 	remoteslot = ExecInitExtraTupleSlot(estate,
876 										RelationGetDescr(rel->localrel));
877 	localslot = ExecInitExtraTupleSlot(estate,
878 									   RelationGetDescr(rel->localrel));
879 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
880 
881 	ExecOpenIndices(estate->es_result_relation_info, false);
882 
883 	/* Find the tuple using the replica identity index. */
884 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
885 	slot_store_cstrings(remoteslot, rel, oldtup.values);
886 	MemoryContextSwitchTo(oldctx);
887 
888 	/*
889 	 * Try to find tuple using either replica identity index, primary key or
890 	 * if needed, sequential scan.
891 	 */
892 	idxoid = GetRelationIdentityOrPK(rel->localrel);
893 	Assert(OidIsValid(idxoid) ||
894 		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
895 
896 	if (OidIsValid(idxoid))
897 		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
898 											 LockTupleExclusive,
899 											 remoteslot, localslot);
900 	else
901 		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
902 										 remoteslot, localslot);
903 	/* If found delete it. */
904 	if (found)
905 	{
906 		EvalPlanQualSetSlot(&epqstate, localslot);
907 
908 		/* Do the actual delete. */
909 		ExecSimpleRelationDelete(estate, &epqstate, localslot);
910 	}
911 	else
912 	{
913 		/* The tuple to be deleted could not be found. */
914 		elog(DEBUG1,
915 			 "logical replication could not find row for delete "
916 			 "in replication target relation \"%s\"",
917 			 RelationGetRelationName(rel->localrel));
918 	}
919 
920 	/* Cleanup. */
921 	EvalPlanQualEnd(&epqstate);
922 	ExecCloseIndices(estate->es_result_relation_info);
923 
924 	finish_estate(estate);
925 
926 	logicalrep_rel_close(rel, NoLock);
927 
928 	end_replication_step();
929 }
930 
931 /*
932  * Handle TRUNCATE message.
933  *
934  * TODO: FDW support
935  */
936 static void
apply_handle_truncate(StringInfo s)937 apply_handle_truncate(StringInfo s)
938 {
939 	bool		cascade = false;
940 	bool		restart_seqs = false;
941 	List	   *remote_relids = NIL;
942 	List	   *remote_rels = NIL;
943 	List	   *rels = NIL;
944 	List	   *relids = NIL;
945 	List	   *relids_logged = NIL;
946 	ListCell   *lc;
947 	LOCKMODE	lockmode = AccessExclusiveLock;
948 
949 	begin_replication_step();
950 
951 	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
952 
953 	foreach(lc, remote_relids)
954 	{
955 		LogicalRepRelId relid = lfirst_oid(lc);
956 		LogicalRepRelMapEntry *rel;
957 
958 		rel = logicalrep_rel_open(relid, lockmode);
959 		if (!should_apply_changes_for_rel(rel))
960 		{
961 			/*
962 			 * The relation can't become interesting in the middle of the
963 			 * transaction so it's safe to unlock it.
964 			 */
965 			logicalrep_rel_close(rel, lockmode);
966 			continue;
967 		}
968 
969 		remote_rels = lappend(remote_rels, rel);
970 		rels = lappend(rels, rel->localrel);
971 		relids = lappend_oid(relids, rel->localreloid);
972 		if (RelationIsLogicallyLogged(rel->localrel))
973 			relids_logged = lappend_oid(relids_logged, rel->localreloid);
974 	}
975 
976 	/*
977 	 * Even if we used CASCADE on the upstream master we explicitly default to
978 	 * replaying changes without further cascading. This might be later
979 	 * changeable with a user specified option.
980 	 */
981 	ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
982 
983 	foreach(lc, remote_rels)
984 	{
985 		LogicalRepRelMapEntry *rel = lfirst(lc);
986 
987 		logicalrep_rel_close(rel, NoLock);
988 	}
989 
990 	end_replication_step();
991 }
992 
993 
994 /*
995  * Logical replication protocol message dispatcher.
996  */
997 static void
apply_dispatch(StringInfo s)998 apply_dispatch(StringInfo s)
999 {
1000 	char		action = pq_getmsgbyte(s);
1001 
1002 	switch (action)
1003 	{
1004 			/* BEGIN */
1005 		case 'B':
1006 			apply_handle_begin(s);
1007 			break;
1008 			/* COMMIT */
1009 		case 'C':
1010 			apply_handle_commit(s);
1011 			break;
1012 			/* INSERT */
1013 		case 'I':
1014 			apply_handle_insert(s);
1015 			break;
1016 			/* UPDATE */
1017 		case 'U':
1018 			apply_handle_update(s);
1019 			break;
1020 			/* DELETE */
1021 		case 'D':
1022 			apply_handle_delete(s);
1023 			break;
1024 			/* TRUNCATE */
1025 		case 'T':
1026 			apply_handle_truncate(s);
1027 			break;
1028 			/* RELATION */
1029 		case 'R':
1030 			apply_handle_relation(s);
1031 			break;
1032 			/* TYPE */
1033 		case 'Y':
1034 			apply_handle_type(s);
1035 			break;
1036 			/* ORIGIN */
1037 		case 'O':
1038 			apply_handle_origin(s);
1039 			break;
1040 		default:
1041 			ereport(ERROR,
1042 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1043 					 errmsg("invalid logical replication message type \"%c\"", action)));
1044 	}
1045 }
1046 
/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally. Instead we
 * build a list that associates local with remote LSNs for every commit. When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed. Those we can report
 * as having been flushed.
 *
 * The have_pending_txes is true if there are outstanding transactions that
 * need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr();

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	/* Walk oldest-first; entries are appended in commit order. */
	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
		dlist_container(FlushPosition, node, iter.cur);

		/* Every entry we reach has at least been written out. */
		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			/* Locally flushed; report it and drop the entry. */
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long. Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	/* Anything still on the list has yet to be flushed locally. */
	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
1100 
1101 /*
1102  * Store current remote/local lsn pair in the tracking list.
1103  */
1104 static void
store_flush_position(XLogRecPtr remote_lsn)1105 store_flush_position(XLogRecPtr remote_lsn)
1106 {
1107 	FlushPosition *flushpos;
1108 
1109 	/* Need to do this in permanent context */
1110 	MemoryContextSwitchTo(ApplyContext);
1111 
1112 	/* Track commit lsn  */
1113 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1114 	flushpos->local_end = XactLastCommitEnd;
1115 	flushpos->remote_end = remote_lsn;
1116 
1117 	dlist_push_tail(&lsn_mapping, &flushpos->node);
1118 	MemoryContextSwitchTo(ApplyMessageContext);
1119 }
1120 
1121 
1122 /* Update statistics of the worker. */
1123 static void
UpdateWorkerStats(XLogRecPtr last_lsn,TimestampTz send_time,bool reply)1124 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1125 {
1126 	MyLogicalRepWorker->last_lsn = last_lsn;
1127 	MyLogicalRepWorker->last_send_time = send_time;
1128 	MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1129 	if (reply)
1130 	{
1131 		MyLogicalRepWorker->reply_lsn = last_lsn;
1132 		MyLogicalRepWorker->reply_time = send_time;
1133 	}
1134 }
1135 
/*
 * Apply main loop.
 *
 * Receives protocol messages from the walsender connection, dispatches
 * data messages to the apply handlers, sends periodic feedback, and
 * reacts to timeouts, latch wakeups and SIGHUP.  Returns only when the
 * upstream signals end-of-stream.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					/* Negative length signals the stream has ended. */
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/* Wrap the received buffer in a StringInfo for parsing. */
					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
						/* WAL data message: header, then a logical rep message. */
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* Advance last_received past both reported LSNs. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						/* Keepalive message; may request an immediate reply. */
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
		{
			TimeLineID	tli;

			walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
			break;
		}

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_POSTMASTER_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		/* Emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the master anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from standby has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);
		}
	}
}
1364 
/*
 * Send a Standby Status Update message to server.
 *
 * 'recvpos' is the latest LSN we've received data to, force is set if we need
 * to send a response to avoid timeouts.
 *
 * Uses static state to remember the positions already reported and the last
 * send time, so unchanged positions are not re-sent more often than
 * wal_receiver_status_interval unless 'force' is set.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	/* Reused message buffer, allocated lazily in ApplyContext. */
	static StringInfo reply_message = NULL;
	static TimestampTz send_time = 0;

	/* Highest positions reported so far; never report backwards. */
	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position. This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	/* Clamp so reported positions never move backwards. */
	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	if (!reply_message)
	{
		/* First use: allocate the buffer in the long-lived context. */
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	/* Build the 'r' (standby status update) message. */
	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
		 force,
		 (uint32) (recvpos >> 32), (uint32) recvpos,
		 (uint32) (writepos >> 32), (uint32) writepos,
		 (uint32) (flushpos >> 32), (uint32) flushpos
		);

	walrcv_send(LogRepWorkerWalRcvConn,
				reply_message->data, reply_message->len);

	/* Remember the high-water marks we just reported. */
	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}
1457 
/*
 * Reread subscription info if needed. Most changes will be exit.
 *
 * Called whenever the syscache invalidation callback has flagged the
 * cached subscription as stale.  For most kinds of changes the worker
 * simply exits and lets the launcher restart it (or not, if the
 * subscription was dropped/disabled); only compatible changes are
 * absorbed in place.
 */
static void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was removed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription was disabled. This normally should not happen
	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
	 */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was disabled",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if connection string was changed. The launcher will start new
	 * worker.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the connection information was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if subscription name was changed (it's used for
	 * fallback_application_name). The launcher will start new worker.
	 */
	if (strcmp(newsub->name, MySubscription->name) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription was renamed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/*
	 * We need to make new connection to new slot if slot name has changed so
	 * exit here as well if that's the case.
	 */
	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the replication slot name was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if publication list was changed. The launcher will start new
	 * worker.
	 */
	if (!equal(newsub->publications, MySubscription->publications))
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription's publications were changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
1593 
1594 /*
1595  * Callback from subscription syscache invalidation.
1596  */
1597 static void
subscription_change_cb(Datum arg,int cacheid,uint32 hashvalue)1598 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1599 {
1600 	MySubscriptionValid = false;
1601 }
1602 
1603 /* SIGHUP: set flag to reload configuration at next convenient time */
1604 static void
logicalrep_worker_sighup(SIGNAL_ARGS)1605 logicalrep_worker_sighup(SIGNAL_ARGS)
1606 {
1607 	int			save_errno = errno;
1608 
1609 	got_SIGHUP = true;
1610 
1611 	/* Waken anything waiting on the process latch */
1612 	SetLatch(MyLatch);
1613 
1614 	errno = save_errno;
1615 }
1616 
/*
 * Logical Replication Apply worker entry point
 *
 * Attaches to the worker slot, connects to the subscription's database,
 * loads the subscription, establishes the walreceiver connection (or runs
 * initial table sync for a tablesync worker), and then enters the apply
 * main loop.  Never returns; exits via proc_exit().
 */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, logicalrep_worker_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/* Initialise stats to a sanish value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	Assert(CurrentResourceOwner == NULL);
	CurrentResourceOwner = ResourceOwnerCreate(NULL,
											   "logical replication apply");

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/* missing_ok = true: the subscription may have been dropped meanwhile. */
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription %u will not "
						"start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));
		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/* This is table synchroniation worker, call initial sync. */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* The slot name needs to be allocated in permanent memory context. */
		oldctx = MemoryContextSwitchTo(ApplyContext);
		myslotname = pstrdup(syncslotname);
		MemoryContextSwitchTo(oldctx);

		pfree(syncslotname);
	}
	else
	{
		/* This is main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;
		int			server_version;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errmsg("subscription has no replication slot set")));

		/* Setup replication origin tracking. */
		StartTransactionCommand();
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
		/* Resume streaming from the last recorded apply position. */
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output identify_system for anything but it
		 * does some initializations on the upstream so let's still call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI,
									  &server_version);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}
1797 
1798 /*
1799  * Is current process a logical replication worker?
1800  */
1801 bool
IsLogicalWorker(void)1802 IsLogicalWorker(void)
1803 {
1804 	return MyLogicalRepWorker != NULL;
1805 }
1806