1 /*-------------------------------------------------------------------------
2  * worker.c
3  *	   PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2020, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  *	  This file contains the worker which applies logical changes as they come
12  *	  from the remote logical replication stream.
13  *
14  *	  The main worker (apply) is started by the logical replication worker
15  *	  launcher for every enabled subscription in a database. It uses the
16  *	  walsender protocol to communicate with the publisher.
17  *
18  *	  This module includes server-facing code and shares the libpqwalreceiver
19  *	  module with walreceiver to provide the libpq-specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23 
24 #include "postgres.h"
25 
26 #include "access/table.h"
27 #include "access/tableam.h"
28 #include "access/xact.h"
29 #include "access/xlog_internal.h"
30 #include "catalog/catalog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/partition.h"
33 #include "catalog/pg_inherits.h"
34 #include "catalog/pg_subscription.h"
35 #include "catalog/pg_subscription_rel.h"
36 #include "commands/tablecmds.h"
37 #include "commands/trigger.h"
38 #include "executor/executor.h"
39 #include "executor/execPartition.h"
40 #include "executor/nodeModifyTable.h"
41 #include "funcapi.h"
42 #include "libpq/pqformat.h"
43 #include "libpq/pqsignal.h"
44 #include "mb/pg_wchar.h"
45 #include "miscadmin.h"
46 #include "nodes/makefuncs.h"
47 #include "optimizer/optimizer.h"
48 #include "pgstat.h"
49 #include "postmaster/bgworker.h"
50 #include "postmaster/interrupt.h"
51 #include "postmaster/postmaster.h"
52 #include "postmaster/walwriter.h"
53 #include "replication/decode.h"
54 #include "replication/logical.h"
55 #include "replication/logicalproto.h"
56 #include "replication/logicalrelation.h"
57 #include "replication/logicalworker.h"
58 #include "replication/origin.h"
59 #include "replication/reorderbuffer.h"
60 #include "replication/snapbuild.h"
61 #include "replication/walreceiver.h"
62 #include "replication/worker_internal.h"
63 #include "rewrite/rewriteHandler.h"
64 #include "storage/bufmgr.h"
65 #include "storage/ipc.h"
66 #include "storage/lmgr.h"
67 #include "storage/proc.h"
68 #include "storage/procarray.h"
69 #include "tcop/tcopprot.h"
70 #include "utils/builtins.h"
71 #include "utils/catcache.h"
72 #include "utils/datum.h"
73 #include "utils/fmgroids.h"
74 #include "utils/guc.h"
75 #include "utils/inval.h"
76 #include "utils/lsyscache.h"
77 #include "utils/memutils.h"
78 #include "utils/rel.h"
79 #include "utils/syscache.h"
80 #include "utils/timeout.h"
81 
82 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
83 
84 typedef struct FlushPosition
85 {
86 	dlist_node	node;
87 	XLogRecPtr	local_end;
88 	XLogRecPtr	remote_end;
89 } FlushPosition;
90 
91 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
92 
93 typedef struct SlotErrCallbackArg
94 {
95 	LogicalRepRelMapEntry *rel;
96 	int			remote_attnum;
97 } SlotErrCallbackArg;
98 
99 typedef struct ApplyExecutionData
100 {
101 	EState	   *estate;			/* executor state, used to track resources */
102 
103 	LogicalRepRelMapEntry *targetRel;	/* replication target rel */
104 	ResultRelInfo *targetRelInfo;	/* ResultRelInfo for same */
105 
106 	/* These fields are used when the target relation is partitioned: */
107 	ModifyTableState *mtstate;	/* dummy ModifyTable state */
108 	PartitionTupleRouting *proute;	/* partition routing info */
109 } ApplyExecutionData;
110 
111 static MemoryContext ApplyMessageContext = NULL;
112 MemoryContext ApplyContext = NULL;
113 
114 WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
115 
116 Subscription *MySubscription = NULL;
117 bool		MySubscriptionValid = false;
118 
119 bool		in_remote_transaction = false;
120 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
121 
122 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
123 
124 static void store_flush_position(XLogRecPtr remote_lsn);
125 
126 static void maybe_reread_subscription(void);
127 
128 static void apply_handle_insert_internal(ResultRelInfo *relinfo,
129 										 EState *estate, TupleTableSlot *remoteslot);
130 static void apply_handle_update_internal(ResultRelInfo *relinfo,
131 										 EState *estate, TupleTableSlot *remoteslot,
132 										 LogicalRepTupleData *newtup,
133 										 LogicalRepRelMapEntry *relmapentry);
134 static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
135 										 TupleTableSlot *remoteslot,
136 										 LogicalRepRelation *remoterel);
137 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
138 									LogicalRepRelation *remoterel,
139 									TupleTableSlot *remoteslot,
140 									TupleTableSlot **localslot);
141 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
142 									   TupleTableSlot *remoteslot,
143 									   LogicalRepTupleData *newtup,
144 									   CmdType operation);
145 
146 /*
147  * Should this worker apply changes for the given relation?
148  *
149  * This is mainly needed for the initial relation data sync, which runs in a
150  * separate worker process in parallel, so we need some way to skip changes
151  * arriving at the main apply worker while a table is being synchronized.
152  *
153  * Note we need a less-than-or-equal comparison for the SYNCDONE state
154  * because its LSN might hold the position of the end of the initial slot
155  * consistent point WAL record + 1 (i.e. the start of the next record), and
156  * that next record can be the COMMIT of the transaction we are now
157  * processing (which is what we set remote_final_lsn to in apply_handle_begin).
158  */
159 static bool
160 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
161 {
162 	if (am_tablesync_worker())
163 		return MyLogicalRepWorker->relid == rel->localreloid;
164 	else
165 		return (rel->state == SUBREL_STATE_READY ||
166 				(rel->state == SUBREL_STATE_SYNCDONE &&
167 				 rel->statelsn <= remote_final_lsn));
168 }
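/*
 * Illustrative note (hypothetical LSNs, not from the source): if a table in
 * SYNCDONE state has statelsn = 0/16B3D40 (the position just past the sync
 * slot's consistent point record) and the transaction we are applying has
 * remote_final_lsn = 0/16B3D40, the "<=" above lets the main apply worker
 * apply that commit rather than skip it.
 */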
169 
170 /*
171  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
172  *
173  * Start a transaction, if this is the first step (else we keep using the
174  * existing transaction).
175  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
176  */
177 static void
178 begin_replication_step(void)
179 {
180 	SetCurrentStatementStartTimestamp();
181 
182 	if (!IsTransactionState())
183 	{
184 		StartTransactionCommand();
185 		maybe_reread_subscription();
186 	}
187 
188 	PushActiveSnapshot(GetTransactionSnapshot());
189 
190 	MemoryContextSwitchTo(ApplyMessageContext);
191 }
192 
193 /*
194  * Finish up one step of a replication transaction.
195  * Callers of begin_replication_step() must also call this.
196  *
197  * We don't close out the transaction here, but we should increment
198  * the command counter to make the effects of this step visible.
199  */
200 static void
201 end_replication_step(void)
202 {
203 	PopActiveSnapshot();
204 
205 	CommandCounterIncrement();
206 }
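/*
 * Sketch of how the apply_handle_* routines below typically pair these two
 * helpers (simplified, not a complete handler):
 *
 *		begin_replication_step();
 *		rel = logicalrep_rel_open(relid, RowExclusiveLock);
 *		... apply the change using an executor state ...
 *		logicalrep_rel_close(rel, NoLock);
 *		end_replication_step();
 */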
207 
208 
209 /*
210  * Executor state preparation for evaluation of constraint expressions,
211  * indexes and triggers for the specified relation.
212  *
213  * Note that the caller must open and close any indexes to be updated.
214  */
215 static ApplyExecutionData *
216 create_edata_for_relation(LogicalRepRelMapEntry *rel)
217 {
218 	ApplyExecutionData *edata;
219 	EState	   *estate;
220 	ResultRelInfo *resultRelInfo;
221 	RangeTblEntry *rte;
222 
223 	edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
224 	edata->targetRel = rel;
225 
226 	edata->estate = estate = CreateExecutorState();
227 
228 	rte = makeNode(RangeTblEntry);
229 	rte->rtekind = RTE_RELATION;
230 	rte->relid = RelationGetRelid(rel->localrel);
231 	rte->relkind = rel->localrel->rd_rel->relkind;
232 	rte->rellockmode = AccessShareLock;
233 	ExecInitRangeTable(estate, list_make1(rte));
234 
235 	edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
236 
237 	/*
238 	 * Use Relation opened by logicalrep_rel_open() instead of opening it
239 	 * again.
240 	 */
241 	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
242 
243 	estate->es_result_relations = resultRelInfo;
244 	estate->es_num_result_relations = 1;
245 	estate->es_result_relation_info = resultRelInfo;
246 
247 	estate->es_output_cid = GetCurrentCommandId(true);
248 
249 	/* Prepare to catch AFTER triggers. */
250 	AfterTriggerBeginQuery();
251 
252 	/* other fields of edata remain NULL for now */
253 
254 	return edata;
255 }
256 
257 /*
258  * Finish any operations related to the executor state created by
259  * create_edata_for_relation().
260  */
261 static void
262 finish_edata(ApplyExecutionData *edata)
263 {
264 	EState	   *estate = edata->estate;
265 
266 	/* Handle any queued AFTER triggers. */
267 	AfterTriggerEndQuery(estate);
268 
269 	/* Shut down tuple routing, if any was done. */
270 	if (edata->proute)
271 		ExecCleanupTupleRouting(edata->mtstate, edata->proute);
272 
273 	/*
274 	 * Cleanup.  It might seem that we should call ExecCloseResultRelations()
275 	 * here, but we intentionally don't.  It would close the rel we added to
276 	 * the estate above, which is wrong because we took no corresponding
277 	 * refcount.  We rely on ExecCleanupTupleRouting() to close any other
278 	 * relations opened during execution.
279 	 */
280 	ExecResetTupleTable(estate->es_tupleTable, false);
281 	FreeExecutorState(estate);
282 	pfree(edata);
283 }
284 
285 /*
286  * Evaluates default values for columns that we could not map to remote
287  * relation columns.
288  *
289  * This allows us to support tables which have more columns on the downstream
290  * (subscriber) than on the upstream (publisher).
291  */
292 static void
293 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
294 				   TupleTableSlot *slot)
295 {
296 	TupleDesc	desc = RelationGetDescr(rel->localrel);
297 	int			num_phys_attrs = desc->natts;
298 	int			i;
299 	int			attnum,
300 				num_defaults = 0;
301 	int		   *defmap;
302 	ExprState **defexprs;
303 	ExprContext *econtext;
304 
305 	econtext = GetPerTupleExprContext(estate);
306 
307 	/* We got all the data via replication, no need to evaluate anything. */
308 	if (num_phys_attrs == rel->remoterel.natts)
309 		return;
310 
311 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
312 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
313 
314 	Assert(rel->attrmap->maplen == num_phys_attrs);
315 	for (attnum = 0; attnum < num_phys_attrs; attnum++)
316 	{
317 		Expr	   *defexpr;
318 
319 		if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
320 			continue;
321 
322 		if (rel->attrmap->attnums[attnum] >= 0)
323 			continue;
324 
325 		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
326 
327 		if (defexpr != NULL)
328 		{
329 			/* Run the expression through planner */
330 			defexpr = expression_planner(defexpr);
331 
332 			/* Initialize executable expression */
333 			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
334 			defmap[num_defaults] = attnum;
335 			num_defaults++;
336 		}
337 
338 	}
339 
340 	for (i = 0; i < num_defaults; i++)
341 		slot->tts_values[defmap[i]] =
342 			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
343 }
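/*
 * Example (hypothetical schema, for illustration only): if the subscriber's
 * copy of the table has an extra "created_at timestamptz DEFAULT now()"
 * column that the publisher does not have, that attribute is unmapped
 * (attrmap entry < 0), so its default expression is planned and evaluated
 * above rather than leaving the column NULL.
 */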
344 
345 /*
346  * Error callback to give more context info about data conversion failures
347  * while reading data from the remote server.
348  */
349 static void
350 slot_store_error_callback(void *arg)
351 {
352 	SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
353 	LogicalRepRelMapEntry *rel;
354 
355 	/* Nothing to do if remote attribute number is not set */
356 	if (errarg->remote_attnum < 0)
357 		return;
358 
359 	rel = errarg->rel;
360 	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
361 			   rel->remoterel.nspname, rel->remoterel.relname,
362 			   rel->remoterel.attnames[errarg->remote_attnum]);
363 }
364 
365 /*
366  * Store data in C string form into slot.
367  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
368  * use better.
369  */
370 static void
371 slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
372 					char **values)
373 {
374 	int			natts = slot->tts_tupleDescriptor->natts;
375 	int			i;
376 	SlotErrCallbackArg errarg;
377 	ErrorContextCallback errcallback;
378 
379 	ExecClearTuple(slot);
380 
381 	/* Push callback + info on the error context stack */
382 	errarg.rel = rel;
383 	errarg.remote_attnum = -1;
384 	errcallback.callback = slot_store_error_callback;
385 	errcallback.arg = (void *) &errarg;
386 	errcallback.previous = error_context_stack;
387 	error_context_stack = &errcallback;
388 
389 	/* Call the "in" function for each non-dropped attribute */
390 	Assert(natts == rel->attrmap->maplen);
391 	for (i = 0; i < natts; i++)
392 	{
393 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
394 		int			remoteattnum = rel->attrmap->attnums[i];
395 
396 		if (!att->attisdropped && remoteattnum >= 0 &&
397 			values[remoteattnum] != NULL)
398 		{
399 			Oid			typinput;
400 			Oid			typioparam;
401 
402 			errarg.remote_attnum = remoteattnum;
403 
404 			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
405 			slot->tts_values[i] =
406 				OidInputFunctionCall(typinput, values[remoteattnum],
407 									 typioparam, att->atttypmod);
408 			slot->tts_isnull[i] = false;
409 
410 			errarg.remote_attnum = -1;
411 		}
412 		else
413 		{
414 			/*
415 			 * We assign NULL to dropped attributes, NULL values, and missing
416 			 * values (missing values should be later filled using
417 			 * slot_fill_defaults).
418 			 */
419 			slot->tts_values[i] = (Datum) 0;
420 			slot->tts_isnull[i] = true;
421 		}
422 	}
423 
424 	/* Pop the error context stack */
425 	error_context_stack = errcallback.previous;
426 
427 	ExecStoreVirtualTuple(slot);
428 }
429 
430 /*
431  * Replace selected columns with user data provided as C strings.
432  * This is somewhat similar to heap_modify_tuple but also calls the type
433  * input functions on the user data.
434  * "slot" is filled with a copy of the tuple in "srcslot", with
435  * columns selected by the "replaces" array replaced with data values
436  * from "values".
437  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
438  * storage for "srcslot".  This is OK for current usage, but someday we may
439  * need to materialize "slot" at the end to make it independent of "srcslot".
440  */
441 static void
442 slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
443 					 LogicalRepRelMapEntry *rel,
444 					 char **values, bool *replaces)
445 {
446 	int			natts = slot->tts_tupleDescriptor->natts;
447 	int			i;
448 	SlotErrCallbackArg errarg;
449 	ErrorContextCallback errcallback;
450 
451 	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
452 	ExecClearTuple(slot);
453 
454 	/*
455 	 * Copy all the column data from srcslot, so that we'll have valid values
456 	 * for unreplaced columns.
457 	 */
458 	Assert(natts == srcslot->tts_tupleDescriptor->natts);
459 	slot_getallattrs(srcslot);
460 	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
461 	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
462 
463 	/* For error reporting, push callback + info on the error context stack */
464 	errarg.rel = rel;
465 	errarg.remote_attnum = -1;
466 	errcallback.callback = slot_store_error_callback;
467 	errcallback.arg = (void *) &errarg;
468 	errcallback.previous = error_context_stack;
469 	error_context_stack = &errcallback;
470 
471 	/* Call the "in" function for each replaced attribute */
472 	Assert(natts == rel->attrmap->maplen);
473 	for (i = 0; i < natts; i++)
474 	{
475 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
476 		int			remoteattnum = rel->attrmap->attnums[i];
477 
478 		if (remoteattnum < 0)
479 			continue;
480 
481 		if (!replaces[remoteattnum])
482 			continue;
483 
484 		if (values[remoteattnum] != NULL)
485 		{
486 			Oid			typinput;
487 			Oid			typioparam;
488 
489 			errarg.remote_attnum = remoteattnum;
490 
491 			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
492 			slot->tts_values[i] =
493 				OidInputFunctionCall(typinput, values[remoteattnum],
494 									 typioparam, att->atttypmod);
495 			slot->tts_isnull[i] = false;
496 
497 			errarg.remote_attnum = -1;
498 		}
499 		else
500 		{
501 			slot->tts_values[i] = (Datum) 0;
502 			slot->tts_isnull[i] = true;
503 		}
504 	}
505 
506 	/* Pop the error context stack */
507 	error_context_stack = errcallback.previous;
508 
509 	/* And finally, declare that "slot" contains a valid virtual tuple */
510 	ExecStoreVirtualTuple(slot);
511 }
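/*
 * Example of the "replaces" semantics above (hypothetical data): if the
 * publisher's UPDATE only changed column "b", the message arrives with
 * replaces[] true only for "b"; column "a" keeps the value copied from
 * srcslot (the existing local tuple) while "b" is parsed from values[].
 */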
512 
513 /*
514  * Handle BEGIN message.
515  */
516 static void
517 apply_handle_begin(StringInfo s)
518 {
519 	LogicalRepBeginData begin_data;
520 
521 	logicalrep_read_begin(s, &begin_data);
522 
523 	remote_final_lsn = begin_data.final_lsn;
524 
525 	in_remote_transaction = true;
526 
527 	pgstat_report_activity(STATE_RUNNING, NULL);
528 }
529 
530 /*
531  * Handle COMMIT message.
532  *
533  * TODO, support tracking of multiple origins
534  */
535 static void
536 apply_handle_commit(StringInfo s)
537 {
538 	LogicalRepCommitData commit_data;
539 
540 	logicalrep_read_commit(s, &commit_data);
541 
542 	if (commit_data.commit_lsn != remote_final_lsn)
543 		ereport(ERROR,
544 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
545 				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
546 								 (uint32) (commit_data.commit_lsn >> 32),
547 								 (uint32) commit_data.commit_lsn,
548 								 (uint32) (remote_final_lsn >> 32),
549 								 (uint32) remote_final_lsn)));
550 
551 	/* The synchronization worker runs in a single transaction. */
552 	if (IsTransactionState() && !am_tablesync_worker())
553 	{
554 		/*
555 		 * Update origin state so we can restart streaming from correct
556 		 * position in case of crash.
557 		 */
558 		replorigin_session_origin_lsn = commit_data.end_lsn;
559 		replorigin_session_origin_timestamp = commit_data.committime;
560 
561 		CommitTransactionCommand();
562 		pgstat_report_stat(false);
563 
564 		store_flush_position(commit_data.end_lsn);
565 	}
566 	else
567 	{
568 		/* Process any invalidation messages that might have accumulated. */
569 		AcceptInvalidationMessages();
570 		maybe_reread_subscription();
571 	}
572 
573 	in_remote_transaction = false;
574 
575 	/* Process any tables that are being synchronized in parallel. */
576 	process_syncing_tables(commit_data.end_lsn);
577 
578 	pgstat_report_activity(STATE_IDLE, NULL);
579 }
580 
581 /*
582  * Handle ORIGIN message.
583  *
584  * TODO, support tracking of multiple origins
585  */
586 static void
587 apply_handle_origin(StringInfo s)
588 {
589 	/*
590 	 * The ORIGIN message can only come inside a remote transaction and before
591 	 * any actual writes.
592 	 */
593 	if (!in_remote_transaction ||
594 		(IsTransactionState() && !am_tablesync_worker()))
595 		ereport(ERROR,
596 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
597 				 errmsg("ORIGIN message sent out of order")));
598 }
599 
600 /*
601  * Handle RELATION message.
602  *
603  * Note we don't validate against the local schema here. That validation is
604  * postponed until the first change for the given relation arrives, since we
605  * only care about it when applying changes for the relation anyway, and we
606  * do less locking this way.
607  */
608 static void
609 apply_handle_relation(StringInfo s)
610 {
611 	LogicalRepRelation *rel;
612 
613 	rel = logicalrep_read_rel(s);
614 	logicalrep_relmap_update(rel);
615 }
616 
617 /*
618  * Handle TYPE message.
619  *
620  * This is now vestigial; we read the info and discard it.
621  */
622 static void
623 apply_handle_type(StringInfo s)
624 {
625 	LogicalRepTyp typ;
626 
627 	logicalrep_read_typ(s, &typ);
628 }
629 
630 /*
631  * Get the replica identity index, or if it is not defined, the primary key.
632  *
633  * If neither is defined, returns InvalidOid.
634  */
635 static Oid
636 GetRelationIdentityOrPK(Relation rel)
637 {
638 	Oid			idxoid;
639 
640 	idxoid = RelationGetReplicaIndex(rel);
641 
642 	if (!OidIsValid(idxoid))
643 		idxoid = RelationGetPrimaryKeyIndex(rel);
644 
645 	return idxoid;
646 }
647 
648 /*
649  * Handle INSERT message.
650  */
651 
652 static void
653 apply_handle_insert(StringInfo s)
654 {
655 	LogicalRepRelMapEntry *rel;
656 	LogicalRepTupleData newtup;
657 	LogicalRepRelId relid;
658 	ApplyExecutionData *edata;
659 	EState	   *estate;
660 	TupleTableSlot *remoteslot;
661 	MemoryContext oldctx;
662 
663 	begin_replication_step();
664 
665 	relid = logicalrep_read_insert(s, &newtup);
666 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
667 	if (!should_apply_changes_for_rel(rel))
668 	{
669 		/*
670 		 * The relation can't become interesting in the middle of the
671 		 * transaction so it's safe to unlock it.
672 		 */
673 		logicalrep_rel_close(rel, RowExclusiveLock);
674 		end_replication_step();
675 		return;
676 	}
677 
678 	/* Initialize the executor state. */
679 	edata = create_edata_for_relation(rel);
680 	estate = edata->estate;
681 	remoteslot = ExecInitExtraTupleSlot(estate,
682 										RelationGetDescr(rel->localrel),
683 										&TTSOpsVirtual);
684 
685 	/* Process and store remote tuple in the slot */
686 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
687 	slot_store_cstrings(remoteslot, rel, newtup.values);
688 	slot_fill_defaults(rel, estate, remoteslot);
689 	MemoryContextSwitchTo(oldctx);
690 
691 	/* For a partitioned table, insert the tuple into a partition. */
692 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
693 		apply_handle_tuple_routing(edata,
694 								   remoteslot, NULL, CMD_INSERT);
695 	else
696 		apply_handle_insert_internal(estate->es_result_relation_info, estate,
697 									 remoteslot);
698 
699 	finish_edata(edata);
700 
701 	logicalrep_rel_close(rel, NoLock);
702 
703 	end_replication_step();
704 }
705 
706 /* Workhorse for apply_handle_insert() */
707 static void
708 apply_handle_insert_internal(ResultRelInfo *relinfo,
709 							 EState *estate, TupleTableSlot *remoteslot)
710 {
711 	ExecOpenIndices(relinfo, false);
712 
713 	/* Do the insert. */
714 	ExecSimpleRelationInsert(estate, remoteslot);
715 
716 	/* Cleanup. */
717 	ExecCloseIndices(relinfo);
718 }
719 
720 /*
721  * Check if the logical replication relation is updatable and throw
722  * appropriate error if it isn't.
723  */
724 static void
725 check_relation_updatable(LogicalRepRelMapEntry *rel)
726 {
727 	/* Updatable, no error. */
728 	if (rel->updatable)
729 		return;
730 
731 	/*
732 	 * We are in error mode, so it's fine that this is somewhat slow. It's
733 	 * better to give the user a correct error.
734 	 */
735 	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
736 	{
737 		ereport(ERROR,
738 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
739 				 errmsg("publisher did not send replica identity column "
740 						"expected by the logical replication target relation \"%s.%s\"",
741 						rel->remoterel.nspname, rel->remoterel.relname)));
742 	}
743 
744 	ereport(ERROR,
745 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
746 			 errmsg("logical replication target relation \"%s.%s\" has "
747 					"neither REPLICA IDENTITY index nor PRIMARY "
748 					"KEY and published relation does not have "
749 					"REPLICA IDENTITY FULL",
750 					rel->remoterel.nspname, rel->remoterel.relname)));
751 }
752 
753 /*
754  * Handle UPDATE message.
755  *
756  * TODO: FDW support
757  */
758 static void
759 apply_handle_update(StringInfo s)
760 {
761 	LogicalRepRelMapEntry *rel;
762 	LogicalRepRelId relid;
763 	ApplyExecutionData *edata;
764 	EState	   *estate;
765 	LogicalRepTupleData oldtup;
766 	LogicalRepTupleData newtup;
767 	bool		has_oldtup;
768 	TupleTableSlot *remoteslot;
769 	RangeTblEntry *target_rte;
770 	MemoryContext oldctx;
771 
772 	begin_replication_step();
773 
774 	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
775 								   &newtup);
776 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
777 	if (!should_apply_changes_for_rel(rel))
778 	{
779 		/*
780 		 * The relation can't become interesting in the middle of the
781 		 * transaction so it's safe to unlock it.
782 		 */
783 		logicalrep_rel_close(rel, RowExclusiveLock);
784 		end_replication_step();
785 		return;
786 	}
787 
788 	/* Check if we can do the update. */
789 	check_relation_updatable(rel);
790 
791 	/* Initialize the executor state. */
792 	edata = create_edata_for_relation(rel);
793 	estate = edata->estate;
794 	remoteslot = ExecInitExtraTupleSlot(estate,
795 										RelationGetDescr(rel->localrel),
796 										&TTSOpsVirtual);
797 
798 	/*
799 	 * Populate updatedCols so that per-column triggers can fire.  This could
800 	 * include more columns than were actually changed on the publisher
801 	 * because the logical replication protocol doesn't contain that
802 	 * information.  But it would for example exclude columns that only exist
803 	 * on the subscriber, since we are not touching those.
804 	 */
805 	target_rte = list_nth(estate->es_range_table, 0);
806 	for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
807 	{
808 		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
809 		int			remoteattnum = rel->attrmap->attnums[i];
810 
811 		if (!att->attisdropped && remoteattnum >= 0)
812 		{
813 			if (newtup.changed[remoteattnum])
814 				target_rte->updatedCols =
815 					bms_add_member(target_rte->updatedCols,
816 								   i + 1 - FirstLowInvalidHeapAttributeNumber);
817 		}
818 	}
819 
820 	/* Also populate extraUpdatedCols, in case we have generated columns */
821 	fill_extraUpdatedCols(target_rte, rel->localrel);
822 
823 	/* Build the search tuple. */
824 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
825 	slot_store_cstrings(remoteslot, rel,
826 						has_oldtup ? oldtup.values : newtup.values);
827 	MemoryContextSwitchTo(oldctx);
828 
829 	/* For a partitioned table, apply update to correct partition. */
830 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
831 		apply_handle_tuple_routing(edata,
832 								   remoteslot, &newtup, CMD_UPDATE);
833 	else
834 		apply_handle_update_internal(estate->es_result_relation_info, estate,
835 									 remoteslot, &newtup, rel);
836 
837 	finish_edata(edata);
838 
839 	logicalrep_rel_close(rel, NoLock);
840 
841 	end_replication_step();
842 }
843 
844 /* Workhorse for apply_handle_update() */
845 static void
846 apply_handle_update_internal(ResultRelInfo *relinfo,
847 							 EState *estate, TupleTableSlot *remoteslot,
848 							 LogicalRepTupleData *newtup,
849 							 LogicalRepRelMapEntry *relmapentry)
850 {
851 	Relation	localrel = relinfo->ri_RelationDesc;
852 	EPQState	epqstate;
853 	TupleTableSlot *localslot;
854 	bool		found;
855 	MemoryContext oldctx;
856 
857 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
858 	ExecOpenIndices(relinfo, false);
859 
860 	found = FindReplTupleInLocalRel(estate, localrel,
861 									&relmapentry->remoterel,
862 									remoteslot, &localslot);
863 	ExecClearTuple(remoteslot);
864 
865 	/*
866 	 * Tuple found.
867 	 *
868 	 * Note this will fail if there are other conflicting unique indexes.
869 	 */
870 	if (found)
871 	{
872 		/* Process and store remote tuple in the slot */
873 		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
874 		slot_modify_cstrings(remoteslot, localslot, relmapentry,
875 							 newtup->values, newtup->changed);
876 		MemoryContextSwitchTo(oldctx);
877 
878 		EvalPlanQualSetSlot(&epqstate, remoteslot);
879 
880 		/* Do the actual update. */
881 		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
882 	}
883 	else
884 	{
885 		/*
886 		 * The tuple to be updated could not be found.  Do nothing except for
887 		 * emitting a log message.
888 		 *
889 		 * XXX should this be promoted to ereport(LOG) perhaps?
890 		 */
891 		elog(DEBUG1,
892 			 "logical replication did not find row to be updated "
893 			 "in replication target relation \"%s\"",
894 			 RelationGetRelationName(localrel));
895 	}
896 
897 	/* Cleanup. */
898 	ExecCloseIndices(relinfo);
899 	EvalPlanQualEnd(&epqstate);
900 }
901 
902 /*
903  * Handle DELETE message.
904  *
905  * TODO: FDW support
906  */
907 static void
908 apply_handle_delete(StringInfo s)
909 {
910 	LogicalRepRelMapEntry *rel;
911 	LogicalRepTupleData oldtup;
912 	LogicalRepRelId relid;
913 	ApplyExecutionData *edata;
914 	EState	   *estate;
915 	TupleTableSlot *remoteslot;
916 	MemoryContext oldctx;
917 
918 	begin_replication_step();
919 
920 	relid = logicalrep_read_delete(s, &oldtup);
921 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
922 	if (!should_apply_changes_for_rel(rel))
923 	{
924 		/*
925 		 * The relation can't become interesting in the middle of the
926 		 * transaction so it's safe to unlock it.
927 		 */
928 		logicalrep_rel_close(rel, RowExclusiveLock);
929 		end_replication_step();
930 		return;
931 	}
932 
933 	/* Check if we can do the delete. */
934 	check_relation_updatable(rel);
935 
936 	/* Initialize the executor state. */
937 	edata = create_edata_for_relation(rel);
938 	estate = edata->estate;
939 	remoteslot = ExecInitExtraTupleSlot(estate,
940 										RelationGetDescr(rel->localrel),
941 										&TTSOpsVirtual);
942 
943 	/* Build the search tuple. */
944 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
945 	slot_store_cstrings(remoteslot, rel, oldtup.values);
946 	MemoryContextSwitchTo(oldctx);
947 
948 	/* For a partitioned table, apply delete to correct partition. */
949 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
950 		apply_handle_tuple_routing(edata,
951 								   remoteslot, NULL, CMD_DELETE);
952 	else
953 		apply_handle_delete_internal(estate->es_result_relation_info, estate,
954 									 remoteslot, &rel->remoterel);
955 
956 	finish_edata(edata);
957 
958 	logicalrep_rel_close(rel, NoLock);
959 
960 	end_replication_step();
961 }
962 
963 /* Workhorse for apply_handle_delete() */
964 static void
965 apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
966 							 TupleTableSlot *remoteslot,
967 							 LogicalRepRelation *remoterel)
968 {
969 	Relation	localrel = relinfo->ri_RelationDesc;
970 	EPQState	epqstate;
971 	TupleTableSlot *localslot;
972 	bool		found;
973 
974 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
975 	ExecOpenIndices(relinfo, false);
976 
977 	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
978 									remoteslot, &localslot);
979 
980 	/* If found delete it. */
981 	if (found)
982 	{
983 		EvalPlanQualSetSlot(&epqstate, localslot);
984 
985 		/* Do the actual delete. */
986 		ExecSimpleRelationDelete(estate, &epqstate, localslot);
987 	}
988 	else
989 	{
990 		/*
991 		 * The tuple to be deleted could not be found.  Do nothing except for
992 		 * emitting a log message.
993 		 *
994 		 * XXX should this be promoted to ereport(LOG) perhaps?
995 		 */
996 		elog(DEBUG1,
997 			 "logical replication did not find row to be deleted "
998 			 "in replication target relation \"%s\"",
999 			 RelationGetRelationName(localrel));
1000 	}
1001 
1002 	/* Cleanup. */
1003 	ExecCloseIndices(relinfo);
1004 	EvalPlanQualEnd(&epqstate);
1005 }
1006 
1007 /*
1008  * Try to find a tuple received from the publication side (in 'remoteslot') in
1009  * the corresponding local relation using either the replica identity index,
1010  * the primary key, or, if needed, a sequential scan.
1011  *
1012  * The local tuple, if found, is returned in '*localslot'.
1013  */
1014 static bool
1015 FindReplTupleInLocalRel(EState *estate, Relation localrel,
1016 						LogicalRepRelation *remoterel,
1017 						TupleTableSlot *remoteslot,
1018 						TupleTableSlot **localslot)
1019 {
1020 	Oid			idxoid;
1021 	bool		found;
1022 
1023 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
1024 
1025 	idxoid = GetRelationIdentityOrPK(localrel);
1026 	Assert(OidIsValid(idxoid) ||
1027 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
1028 
1029 	if (OidIsValid(idxoid))
1030 		found = RelationFindReplTupleByIndex(localrel, idxoid,
1031 											 LockTupleExclusive,
1032 											 remoteslot, *localslot);
1033 	else
1034 		found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1035 										 remoteslot, *localslot);
1036 
1037 	return found;
1038 }
1039 
1040 /*
1041  * This handles insert, update, delete on a partitioned table.
1042  */
1043 static void
1044 apply_handle_tuple_routing(ApplyExecutionData *edata,
1045 						   TupleTableSlot *remoteslot,
1046 						   LogicalRepTupleData *newtup,
1047 						   CmdType operation)
1048 {
1049 	EState	   *estate = edata->estate;
1050 	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1051 	ResultRelInfo *relinfo = edata->targetRelInfo;
1052 	Relation	parentrel = relinfo->ri_RelationDesc;
1053 	ModifyTableState *mtstate;
1054 	PartitionTupleRouting *proute;
1055 	ResultRelInfo *partrelinfo;
1056 	Relation	partrel;
1057 	TupleTableSlot *remoteslot_part;
1058 	PartitionRoutingInfo *partinfo;
1059 	TupleConversionMap *map;
1060 	MemoryContext oldctx;
1061 
1062 	/* ModifyTableState is needed for ExecFindPartition(). */
1063 	edata->mtstate = mtstate = makeNode(ModifyTableState);
1064 	mtstate->ps.plan = NULL;
1065 	mtstate->ps.state = estate;
1066 	mtstate->operation = operation;
1067 	mtstate->resultRelInfo = relinfo;
1068 
1069 	/* ... as is PartitionTupleRouting. */
1070 	edata->proute = proute = ExecSetupPartitionTupleRouting(estate, mtstate,
1071 															parentrel);
1072 
1073 	/*
1074 	 * Find the partition to which the "search tuple" belongs.
1075 	 */
1076 	Assert(remoteslot != NULL);
1077 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1078 	partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
1079 									remoteslot, estate);
1080 	Assert(partrelinfo != NULL);
1081 	partrel = partrelinfo->ri_RelationDesc;
1082 
1083 	/*
1084 	 * To perform any of the operations below, the tuple must match the
1085 	 * partition's rowtype. Convert if needed or just copy, using a dedicated
1086 	 * slot to store the tuple in any case.
1087 	 */
1088 	partinfo = partrelinfo->ri_PartitionInfo;
1089 	remoteslot_part = partinfo->pi_PartitionTupleSlot;
1090 	if (remoteslot_part == NULL)
1091 		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
1092 	map = partinfo->pi_RootToPartitionMap;
1093 	if (map != NULL)
1094 		remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1095 												remoteslot_part);
1096 	else
1097 	{
1098 		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
1099 		slot_getallattrs(remoteslot_part);
1100 	}
1101 	MemoryContextSwitchTo(oldctx);
1102 
1103 	estate->es_result_relation_info = partrelinfo;
1104 	switch (operation)
1105 	{
1106 		case CMD_INSERT:
1107 			apply_handle_insert_internal(partrelinfo, estate,
1108 										 remoteslot_part);
1109 			break;
1110 
1111 		case CMD_DELETE:
1112 			apply_handle_delete_internal(partrelinfo, estate,
1113 										 remoteslot_part,
1114 										 &relmapentry->remoterel);
1115 			break;
1116 
1117 		case CMD_UPDATE:
1118 
1119 			/*
1120 			 * For UPDATE, depending on whether or not the updated tuple
1121 			 * satisfies the partition's constraint, perform a simple UPDATE
1122 			 * of the partition or move the updated tuple into a different
1123 			 * suitable partition.
1124 			 */
1125 			{
1126 				AttrMap    *attrmap = map ? map->attrMap : NULL;
1127 				LogicalRepRelMapEntry *part_entry;
1128 				TupleTableSlot *localslot;
1129 				ResultRelInfo *partrelinfo_new;
1130 				bool		found;
1131 
1132 				part_entry = logicalrep_partition_open(relmapentry, partrel,
1133 													   attrmap);
1134 
1135 				/* Get the matching local tuple from the partition. */
1136 				found = FindReplTupleInLocalRel(estate, partrel,
1137 												&part_entry->remoterel,
1138 												remoteslot_part, &localslot);
1139 				if (!found)
1140 				{
1141 					/*
1142 					 * The tuple to be updated could not be found.  Do nothing
1143 					 * except for emitting a log message.
1144 					 *
1145 					 * XXX should this be promoted to ereport(LOG) perhaps?
1146 					 */
1147 					elog(DEBUG1,
1148 						 "logical replication did not find row to be updated "
1149 						 "in replication target relation's partition \"%s\"",
1150 						 RelationGetRelationName(partrel));
1151 					return;
1152 				}
1153 
1154 				/*
1155 				 * Apply the update to the local tuple, putting the result in
1156 				 * remoteslot_part.
1157 				 */
1158 				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1159 				slot_modify_cstrings(remoteslot_part, localslot,
1160 									 part_entry,
1161 									 newtup->values, newtup->changed);
1162 				MemoryContextSwitchTo(oldctx);
1163 
1164 				/*
1165 				 * Does the updated tuple still satisfy the current
1166 				 * partition's constraint?
1167 				 */
1168 				if (partrelinfo->ri_PartitionCheck == NULL ||
1169 					ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
1170 									   false))
1171 				{
1172 					/*
1173 					 * Yes, so simply UPDATE the partition.  We don't call
1174 					 * apply_handle_update_internal() here, which would
1175 					 * normally do the following work, to avoid repeating some
1176 					 * work already done above to find the local tuple in the
1177 					 * partition.
1178 					 */
1179 					EPQState	epqstate;
1180 
1181 					EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1182 					ExecOpenIndices(partrelinfo, false);
1183 
1184 					EvalPlanQualSetSlot(&epqstate, remoteslot_part);
1185 					ExecSimpleRelationUpdate(estate, &epqstate, localslot,
1186 											 remoteslot_part);
1187 					ExecCloseIndices(partrelinfo);
1188 					EvalPlanQualEnd(&epqstate);
1189 				}
1190 				else
1191 				{
1192 					/* Move the tuple into the new partition. */
1193 
1194 					/*
1195 					 * New partition will be found using tuple routing, which
1196 					 * can only occur via the parent table.  We might need to
1197 					 * convert the tuple to the parent's rowtype.  Note that
1198 					 * this is the tuple found in the partition, not the
1199 					 * original search tuple received by this function.
1200 					 */
1201 					if (map)
1202 					{
1203 						TupleConversionMap *PartitionToRootMap =
1204 						convert_tuples_by_name(RelationGetDescr(partrel),
1205 											   RelationGetDescr(parentrel));
1206 
1207 						remoteslot =
1208 							execute_attr_map_slot(PartitionToRootMap->attrMap,
1209 												  remoteslot_part, remoteslot);
1210 					}
1211 					else
1212 					{
1213 						remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1214 						slot_getallattrs(remoteslot);
1215 					}
1216 
1217 
1218 					/* Find the new partition. */
1219 					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1220 					partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1221 														proute, remoteslot,
1222 														estate);
1223 					MemoryContextSwitchTo(oldctx);
1224 					Assert(partrelinfo_new != partrelinfo);
1225 
1226 					/* DELETE old tuple found in the old partition. */
1227 					estate->es_result_relation_info = partrelinfo;
1228 					apply_handle_delete_internal(partrelinfo, estate,
1229 												 localslot,
1230 												 &relmapentry->remoterel);
1231 
1232 					/* INSERT new tuple into the new partition. */
1233 
1234 					/*
1235 					 * Convert the replacement tuple to match the destination
1236 					 * partition rowtype.
1237 					 */
1238 					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1239 					partrel = partrelinfo_new->ri_RelationDesc;
1240 					partinfo = partrelinfo_new->ri_PartitionInfo;
1241 					remoteslot_part = partinfo->pi_PartitionTupleSlot;
1242 					if (remoteslot_part == NULL)
1243 						remoteslot_part = table_slot_create(partrel,
1244 															&estate->es_tupleTable);
1245 					map = partinfo->pi_RootToPartitionMap;
1246 					if (map != NULL)
1247 					{
1248 						remoteslot_part = execute_attr_map_slot(map->attrMap,
1249 																remoteslot,
1250 																remoteslot_part);
1251 					}
1252 					else
1253 					{
1254 						remoteslot_part = ExecCopySlot(remoteslot_part,
1255 													   remoteslot);
1256 						slot_getallattrs(remoteslot_part);
1257 					}
1258 					MemoryContextSwitchTo(oldctx);
1259 					estate->es_result_relation_info = partrelinfo_new;
1260 					apply_handle_insert_internal(partrelinfo_new, estate,
1261 												 remoteslot_part);
1262 				}
1263 			}
1264 			break;
1265 
1266 		default:
1267 			elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1268 			break;
1269 	}
1270 }
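/*
 * Note on the CMD_UPDATE branch above: when the updated tuple no longer
 * satisfies its original partition's constraint, the row is effectively
 * moved by deleting it from the old partition and then re-routing and
 * inserting the converted tuple into the newly found partition, analogous
 * to how a local cross-partition UPDATE behaves.
 */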
1271 
1272 /*
1273  * Handle TRUNCATE message.
1274  *
1275  * TODO: FDW support
1276  */
1277 static void
1278 apply_handle_truncate(StringInfo s)
1279 {
1280 	bool		cascade = false;
1281 	bool		restart_seqs = false;
1282 	List	   *remote_relids = NIL;
1283 	List	   *remote_rels = NIL;
1284 	List	   *rels = NIL;
1285 	List	   *part_rels = NIL;
1286 	List	   *relids = NIL;
1287 	List	   *relids_logged = NIL;
1288 	ListCell   *lc;
1289 	LOCKMODE	lockmode = AccessExclusiveLock;
1290 
1291 	begin_replication_step();
1292 
1293 	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
1294 
1295 	foreach(lc, remote_relids)
1296 	{
1297 		LogicalRepRelId relid = lfirst_oid(lc);
1298 		LogicalRepRelMapEntry *rel;
1299 
1300 		rel = logicalrep_rel_open(relid, lockmode);
1301 		if (!should_apply_changes_for_rel(rel))
1302 		{
1303 			/*
1304 			 * The relation can't become interesting in the middle of the
1305 			 * transaction so it's safe to unlock it.
1306 			 */
1307 			logicalrep_rel_close(rel, lockmode);
1308 			continue;
1309 		}
1310 
1311 		remote_rels = lappend(remote_rels, rel);
1312 		rels = lappend(rels, rel->localrel);
1313 		relids = lappend_oid(relids, rel->localreloid);
1314 		if (RelationIsLogicallyLogged(rel->localrel))
1315 			relids_logged = lappend_oid(relids_logged, rel->localreloid);
1316 
1317 		/*
1318 		 * Truncate partitions if we got a message to truncate a partitioned
1319 		 * table.
1320 		 */
1321 		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1322 		{
1323 			ListCell   *child;
1324 			List	   *children = find_all_inheritors(rel->localreloid,
1325 													   lockmode,
1326 													   NULL);
1327 
1328 			foreach(child, children)
1329 			{
1330 				Oid			childrelid = lfirst_oid(child);
1331 				Relation	childrel;
1332 
1333 				if (list_member_oid(relids, childrelid))
1334 					continue;
1335 
1336 				/* find_all_inheritors already got lock */
1337 				childrel = table_open(childrelid, NoLock);
1338 
1339 				/*
1340 				 * Ignore temp tables of other backends.  See similar code in
1341 				 * ExecuteTruncate().
1342 				 */
1343 				if (RELATION_IS_OTHER_TEMP(childrel))
1344 				{
1345 					table_close(childrel, lockmode);
1346 					continue;
1347 				}
1348 
1349 				rels = lappend(rels, childrel);
1350 				part_rels = lappend(part_rels, childrel);
1351 				relids = lappend_oid(relids, childrelid);
1352 				/* Log this relation only if needed for logical decoding */
1353 				if (RelationIsLogicallyLogged(childrel))
1354 					relids_logged = lappend_oid(relids_logged, childrelid);
1355 			}
1356 		}
1357 	}
1358 
1359 	/*
1360 	 * Even if we used CASCADE on the upstream master, we explicitly default to
1361 	 * replaying changes without further cascading. This might later be made
1362 	 * configurable with a user-specified option.
1363 	 */
1364 	ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
1365 
1366 	foreach(lc, remote_rels)
1367 	{
1368 		LogicalRepRelMapEntry *rel = lfirst(lc);
1369 
1370 		logicalrep_rel_close(rel, NoLock);
1371 	}
1372 	foreach(lc, part_rels)
1373 	{
1374 		Relation	rel = lfirst(lc);
1375 
1376 		table_close(rel, NoLock);
1377 	}
1378 
1379 	end_replication_step();
1380 }
1381 
1382 
1383 /*
1384  * Logical replication protocol message dispatcher.
1385  */
1386 static void
1387 apply_dispatch(StringInfo s)
1388 {
1389 	char		action = pq_getmsgbyte(s);
1390 
1391 	switch (action)
1392 	{
1393 			/* BEGIN */
1394 		case 'B':
1395 			apply_handle_begin(s);
1396 			break;
1397 			/* COMMIT */
1398 		case 'C':
1399 			apply_handle_commit(s);
1400 			break;
1401 			/* INSERT */
1402 		case 'I':
1403 			apply_handle_insert(s);
1404 			break;
1405 			/* UPDATE */
1406 		case 'U':
1407 			apply_handle_update(s);
1408 			break;
1409 			/* DELETE */
1410 		case 'D':
1411 			apply_handle_delete(s);
1412 			break;
1413 			/* TRUNCATE */
1414 		case 'T':
1415 			apply_handle_truncate(s);
1416 			break;
1417 			/* RELATION */
1418 		case 'R':
1419 			apply_handle_relation(s);
1420 			break;
1421 			/* TYPE */
1422 		case 'Y':
1423 			apply_handle_type(s);
1424 			break;
1425 			/* ORIGIN */
1426 		case 'O':
1427 			apply_handle_origin(s);
1428 			break;
1429 		default:
1430 			ereport(ERROR,
1431 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1432 					 errmsg("invalid logical replication message type \"%c\"", action)));
1433 	}
1434 }
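/*
 * For quick reference, the single-byte message types dispatched above:
 * 'B' begin, 'C' commit, 'O' origin, 'R' relation, 'Y' type, 'I' insert,
 * 'U' update, 'D' delete, 'T' truncate.  Anything else is reported as a
 * protocol violation.
 */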
1435 
1436 /*
1437  * Figure out which write/flush positions to report to the walsender process.
1438  *
1439  * We can't simply report back the last LSN the walsender sent us because the
1440  * local transaction might not yet be flushed to disk locally. Instead we
1441  * build a list that associates local with remote LSNs for every commit. When
1442  * reporting back the flush position to the sender we iterate that list and
1443  * check which entries on it are already locally flushed. Those we can report
1444  * as having been flushed.
1445  *
1446  * *have_pending_txes is set to true if there are outstanding transactions
1447  * that still need to be flushed.
1448  */
1449 static void
1450 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
1451 				   bool *have_pending_txes)
1452 {
1453 	dlist_mutable_iter iter;
1454 	XLogRecPtr	local_flush = GetFlushRecPtr();
1455 
1456 	*write = InvalidXLogRecPtr;
1457 	*flush = InvalidXLogRecPtr;
1458 
1459 	dlist_foreach_modify(iter, &lsn_mapping)
1460 	{
1461 		FlushPosition *pos =
1462 		dlist_container(FlushPosition, node, iter.cur);
1463 
1464 		*write = pos->remote_end;
1465 
1466 		if (pos->local_end <= local_flush)
1467 		{
1468 			*flush = pos->remote_end;
1469 			dlist_delete(iter.cur);
1470 			pfree(pos);
1471 		}
1472 		else
1473 		{
1474 			/*
1475 			 * Don't want to uselessly iterate over the rest of the list which
1476 			 * could potentially be long. Instead get the last element and
1477 			 * grab the write position from there.
1478 			 */
1479 			pos = dlist_tail_element(FlushPosition, node,
1480 									 &lsn_mapping);
1481 			*write = pos->remote_end;
1482 			*have_pending_txes = true;
1483 			return;
1484 		}
1485 	}
1486 
1487 	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
1488 }
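/*
 * Worked example (hypothetical LSNs): with lsn_mapping holding
 * {local 0/1500, remote 0/A000} followed by {local 0/2500, remote 0/B000}
 * and a local flush pointer of 0/2000, the first entry is locally flushed,
 * so *flush advances to 0/A000 and the entry is dropped; the second is not,
 * so *write ends up at 0/B000 and *have_pending_txes is set.
 */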
1489 
1490 /*
1491  * Store current remote/local lsn pair in the tracking list.
1492  */
1493 static void
1494 store_flush_position(XLogRecPtr remote_lsn)
1495 {
1496 	FlushPosition *flushpos;
1497 
1498 	/* Need to do this in permanent context */
1499 	MemoryContextSwitchTo(ApplyContext);
1500 
1501 	/* Track commit lsn  */
1502 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1503 	flushpos->local_end = XactLastCommitEnd;
1504 	flushpos->remote_end = remote_lsn;
1505 
1506 	dlist_push_tail(&lsn_mapping, &flushpos->node);
1507 	MemoryContextSwitchTo(ApplyMessageContext);
1508 }
1509 
1510 
1511 /* Update statistics of the worker. */
1512 static void
1513 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1514 {
1515 	MyLogicalRepWorker->last_lsn = last_lsn;
1516 	MyLogicalRepWorker->last_send_time = send_time;
1517 	MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1518 	if (reply)
1519 	{
1520 		MyLogicalRepWorker->reply_lsn = last_lsn;
1521 		MyLogicalRepWorker->reply_time = send_time;
1522 	}
1523 }
1524 
1525 /*
1526  * Apply main loop.
1527  */
1528 static void
1529 LogicalRepApplyLoop(XLogRecPtr last_received)
1530 {
1531 	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1532 	bool		ping_sent = false;
1533 
1534 	/*
1535 	 * Init the ApplyMessageContext which we clean up after each replication
1536 	 * protocol message.
1537 	 */
1538 	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1539 												"ApplyMessageContext",
1540 												ALLOCSET_DEFAULT_SIZES);
1541 
1542 	/* mark as idle, before starting to loop */
1543 	pgstat_report_activity(STATE_IDLE, NULL);
1544 
1545 	/* This outer loop iterates once per wait. */
1546 	for (;;)
1547 	{
1548 		pgsocket	fd = PGINVALID_SOCKET;
1549 		int			rc;
1550 		int			len;
1551 		char	   *buf = NULL;
1552 		bool		endofstream = false;
1553 		long		wait_time;
1554 
1555 		CHECK_FOR_INTERRUPTS();
1556 
1557 		MemoryContextSwitchTo(ApplyMessageContext);
1558 
1559 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
1560 
1561 		if (len != 0)
1562 		{
1563 			/* Loop to process all available data (without blocking). */
1564 			for (;;)
1565 			{
1566 				CHECK_FOR_INTERRUPTS();
1567 
1568 				if (len == 0)
1569 				{
1570 					break;
1571 				}
1572 				else if (len < 0)
1573 				{
1574 					ereport(LOG,
1575 							(errmsg("data stream from publisher has ended")));
1576 					endofstream = true;
1577 					break;
1578 				}
1579 				else
1580 				{
1581 					int			c;
1582 					StringInfoData s;
1583 
1584 					/* Reset timeout. */
1585 					last_recv_timestamp = GetCurrentTimestamp();
1586 					ping_sent = false;
1587 
1588 					/* Ensure we are reading the data into our memory context. */
1589 					MemoryContextSwitchTo(ApplyMessageContext);
1590 
1591 					s.data = buf;
1592 					s.len = len;
1593 					s.cursor = 0;
1594 					s.maxlen = -1;
1595 
1596 					c = pq_getmsgbyte(&s);
1597 
1598 					if (c == 'w')
1599 					{
1600 						XLogRecPtr	start_lsn;
1601 						XLogRecPtr	end_lsn;
1602 						TimestampTz send_time;
1603 
1604 						start_lsn = pq_getmsgint64(&s);
1605 						end_lsn = pq_getmsgint64(&s);
1606 						send_time = pq_getmsgint64(&s);
1607 
1608 						if (last_received < start_lsn)
1609 							last_received = start_lsn;
1610 
1611 						if (last_received < end_lsn)
1612 							last_received = end_lsn;
1613 
1614 						UpdateWorkerStats(last_received, send_time, false);
1615 
1616 						apply_dispatch(&s);
1617 					}
1618 					else if (c == 'k')
1619 					{
1620 						XLogRecPtr	end_lsn;
1621 						TimestampTz timestamp;
1622 						bool		reply_requested;
1623 
1624 						end_lsn = pq_getmsgint64(&s);
1625 						timestamp = pq_getmsgint64(&s);
1626 						reply_requested = pq_getmsgbyte(&s);
1627 
1628 						if (last_received < end_lsn)
1629 							last_received = end_lsn;
1630 
1631 						send_feedback(last_received, reply_requested, false);
1632 						UpdateWorkerStats(last_received, timestamp, true);
1633 					}
1634 					/* other message types are purposefully ignored */
1635 
1636 					MemoryContextReset(ApplyMessageContext);
1637 				}
1638 
1639 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
1640 			}
1641 		}
1642 
1643 		/* confirm all writes so far */
1644 		send_feedback(last_received, false, false);
1645 
1646 		if (!in_remote_transaction)
1647 		{
1648 			/*
1649 			 * If we didn't get any transactions for a while, there might be
1650 			 * unconsumed invalidation messages in the queue; consume them
1651 			 * now.
1652 			 */
1653 			AcceptInvalidationMessages();
1654 			maybe_reread_subscription();
1655 
1656 			/* Process any table synchronization changes. */
1657 			process_syncing_tables(last_received);
1658 		}
1659 
1660 		/* Cleanup the memory. */
1661 		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1662 		MemoryContextSwitchTo(TopMemoryContext);
1663 
1664 		/* Check if we need to exit the streaming loop. */
1665 		if (endofstream)
1666 		{
1667 			TimeLineID	tli;
1668 
1669 			walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
1670 			break;
1671 		}
1672 
1673 		/*
1674 		 * Wait for more data or latch.  If we have unflushed transactions,
1675 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
1676 		 * which case we should send a feedback message).  Otherwise, there's
1677 		 * no particular urgency about waking up unless we get data or a
1678 		 * signal.
1679 		 */
1680 		if (!dlist_is_empty(&lsn_mapping))
1681 			wait_time = WalWriterDelay;
1682 		else
1683 			wait_time = NAPTIME_PER_CYCLE;
1684 
1685 		rc = WaitLatchOrSocket(MyLatch,
1686 							   WL_SOCKET_READABLE | WL_LATCH_SET |
1687 							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1688 							   fd, wait_time,
1689 							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
1690 
1691 		if (rc & WL_LATCH_SET)
1692 		{
1693 			ResetLatch(MyLatch);
1694 			CHECK_FOR_INTERRUPTS();
1695 		}
1696 
1697 		if (ConfigReloadPending)
1698 		{
1699 			ConfigReloadPending = false;
1700 			ProcessConfigFile(PGC_SIGHUP);
1701 		}
1702 
1703 		if (rc & WL_TIMEOUT)
1704 		{
1705 			/*
1706 			 * We didn't receive anything new. If we haven't heard anything
1707 			 * from the server for more than wal_receiver_timeout / 2, ping
1708 			 * the server. Also, if it's been longer than
1709 			 * wal_receiver_status_interval since the last update we sent,
1710 			 * send a status update to the master anyway, to report any
1711 			 * progress in applying WAL.
1712 			 */
1713 			bool		requestReply = false;
1714 
1715 			/*
1716 			 * Check if time since last receive from standby has reached the
1717 			 * configured limit.
1718 			 */
1719 			if (wal_receiver_timeout > 0)
1720 			{
1721 				TimestampTz now = GetCurrentTimestamp();
1722 				TimestampTz timeout;
1723 
1724 				timeout =
1725 					TimestampTzPlusMilliseconds(last_recv_timestamp,
1726 												wal_receiver_timeout);
1727 
1728 				if (now >= timeout)
1729 					ereport(ERROR,
1730 							(errmsg("terminating logical replication worker due to timeout")));
1731 
1732 				/* Check to see if it's time for a ping. */
1733 				if (!ping_sent)
1734 				{
1735 					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1736 														  (wal_receiver_timeout / 2));
1737 					if (now >= timeout)
1738 					{
1739 						requestReply = true;
1740 						ping_sent = true;
1741 					}
1742 				}
1743 			}
1744 
1745 			send_feedback(last_received, requestReply, requestReply);
1746 		}
1747 	}
1748 }
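/*
 * Timing sketch for the WL_TIMEOUT handling above, assuming the default
 * wal_receiver_timeout of 60 seconds: after roughly 30 seconds of silence we
 * request a keepalive reply from the publisher, and if the full 60 seconds
 * pass without any traffic the worker errors out (and is subsequently
 * restarted by the launcher).
 */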
1749 
1750 /*
1751  * Send a Standby Status Update message to server.
1752  *
1753  * 'recvpos' is the latest LSN up to which we've received data; 'force' is set
1754  * if we need to send a response to avoid timeouts.
1755  */
1756 static void
1757 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1758 {
1759 	static StringInfo reply_message = NULL;
1760 	static TimestampTz send_time = 0;
1761 
1762 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1763 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1764 	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1765 
1766 	XLogRecPtr	writepos;
1767 	XLogRecPtr	flushpos;
1768 	TimestampTz now;
1769 	bool		have_pending_txes;
1770 
1771 	/*
1772 	 * If the user doesn't want status to be reported to the publisher, be
1773 	 * sure to exit before doing anything at all.
1774 	 */
1775 	if (!force && wal_receiver_status_interval <= 0)
1776 		return;
1777 
1778 	/* It's legal to not pass a recvpos */
1779 	if (recvpos < last_recvpos)
1780 		recvpos = last_recvpos;
1781 
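	/*
	 * Work out which remote LSNs have had their local commit records written
	 * and flushed, based on the lsn_mapping maintained by the apply loop.
	 */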
1782 	get_flush_position(&writepos, &flushpos, &have_pending_txes);
1783 
1784 	/*
1785 	 * If there are no outstanding transactions to flush, we can report the
1786 	 * latest received position.  This is important for synchronous replication.
1787 	 */
1788 	if (!have_pending_txes)
1789 		flushpos = writepos = recvpos;
1790 
1791 	if (writepos < last_writepos)
1792 		writepos = last_writepos;
1793 
1794 	if (flushpos < last_flushpos)
1795 		flushpos = last_flushpos;
1796 
1797 	now = GetCurrentTimestamp();
1798 
1799 	/* If we've already reported everything, we're good. */
1800 	if (!force &&
1801 		writepos == last_writepos &&
1802 		flushpos == last_flushpos &&
1803 		!TimestampDifferenceExceeds(send_time, now,
1804 									wal_receiver_status_interval * 1000))
1805 		return;
1806 	send_time = now;
1807 
1808 	if (!reply_message)
1809 	{
1810 		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1811 
1812 		reply_message = makeStringInfo();
1813 		MemoryContextSwitchTo(oldctx);
1814 	}
1815 	else
1816 		resetStringInfo(reply_message);
1817 
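	/*
	 * Build a Standby Status Update ('r') message: write, flush and apply
	 * LSNs, the current timestamp, and a flag requesting an immediate reply.
	 * Note that the latest received LSN is reported in the "write" field,
	 * while the locally flushed position goes in the "flush" field and the
	 * locally written position in the "apply" field.
	 */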
1818 	pq_sendbyte(reply_message, 'r');
1819 	pq_sendint64(reply_message, recvpos);	/* write */
1820 	pq_sendint64(reply_message, flushpos);	/* flush */
1821 	pq_sendint64(reply_message, writepos);	/* apply */
1822 	pq_sendint64(reply_message, now);	/* sendTime */
1823 	pq_sendbyte(reply_message, requestReply);	/* replyRequested */
1824 
1825 	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1826 		 force,
1827 		 (uint32) (recvpos >> 32), (uint32) recvpos,
1828 		 (uint32) (writepos >> 32), (uint32) writepos,
1829 		 (uint32) (flushpos >> 32), (uint32) flushpos
1830 		);
1831 
1832 	walrcv_send(LogRepWorkerWalRcvConn,
1833 				reply_message->data, reply_message->len);
1834 
1835 	if (recvpos > last_recvpos)
1836 		last_recvpos = recvpos;
1837 	if (writepos > last_writepos)
1838 		last_writepos = writepos;
1839 	if (flushpos > last_flushpos)
1840 		last_flushpos = flushpos;
1841 }
1842 
1843 /*
1844  * Reread subscription info if needed.  Most changes make the worker exit.
1845  */
1846 static void
1847 maybe_reread_subscription(void)
1848 {
1849 	MemoryContext oldctx;
1850 	Subscription *newsub;
1851 	bool		started_tx = false;
1852 
1853 	/* When cache state is valid there is nothing to do here. */
1854 	if (MySubscriptionValid)
1855 		return;
1856 
1857 	/* This function might be called inside or outside of transaction. */
1858 	if (!IsTransactionState())
1859 	{
1860 		StartTransactionCommand();
1861 		started_tx = true;
1862 	}
1863 
1864 	/* Ensure allocations in permanent context. */
1865 	oldctx = MemoryContextSwitchTo(ApplyContext);
1866 
1867 	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1868 
1869 	/*
1870 	 * Exit if the subscription was removed. This normally should not happen
1871 	 * as the worker gets killed during DROP SUBSCRIPTION.
1872 	 */
1873 	if (!newsub)
1874 	{
1875 		ereport(LOG,
1876 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1877 						"stop because the subscription was removed",
1878 						MySubscription->name)));
1879 
1880 		proc_exit(0);
1881 	}
1882 
1883 	/*
1884 	 * Exit if the subscription was disabled. This normally should not happen
1885 	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1886 	 */
1887 	if (!newsub->enabled)
1888 	{
1889 		ereport(LOG,
1890 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1891 						"stop because the subscription was disabled",
1892 						MySubscription->name)));
1893 
1894 		proc_exit(0);
1895 	}
1896 
1897 	/*
1898  * Exit if the connection string was changed.  The launcher will start a new
1899  * worker.
1900 	 */
1901 	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1902 	{
1903 		ereport(LOG,
1904 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1905 						"restart because the connection information was changed",
1906 						MySubscription->name)));
1907 
1908 		proc_exit(0);
1909 	}
1910 
1911 	/*
1912 	 * Exit if subscription name was changed (it's used for
1913  * fallback_application_name).  The launcher will start a new worker.
1914 	 */
1915 	if (strcmp(newsub->name, MySubscription->name) != 0)
1916 	{
1917 		ereport(LOG,
1918 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1919 						"restart because subscription was renamed",
1920 						MySubscription->name)));
1921 
1922 		proc_exit(0);
1923 	}
1924 
1925 	/* !slotname should never happen when enabled is true. */
1926 	Assert(newsub->slotname);
1927 
1928 	/*
1929  * We need to make a new connection to the new slot if the slot name has
1930  * changed, so exit here as well in that case.
1931 	 */
1932 	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1933 	{
1934 		ereport(LOG,
1935 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1936 						"restart because the replication slot name was changed",
1937 						MySubscription->name)));
1938 
1939 		proc_exit(0);
1940 	}
1941 
1942 	/*
1943  * Exit if the publication list was changed.  The launcher will start a new
1944  * worker.
1945 	 */
1946 	if (!equal(newsub->publications, MySubscription->publications))
1947 	{
1948 		ereport(LOG,
1949 				(errmsg("logical replication apply worker for subscription \"%s\" will "
1950 						"restart because subscription's publications were changed",
1951 						MySubscription->name)));
1952 
1953 		proc_exit(0);
1954 	}
1955 
1956 	/* Also check for other changes that should never happen. */
1957 	if (newsub->dbid != MySubscription->dbid)
1958 	{
1959 		elog(ERROR, "subscription %u changed unexpectedly",
1960 			 MyLogicalRepWorker->subid);
1961 	}
1962 
1963 	/* Clean old subscription info and switch to new one. */
1964 	FreeSubscription(MySubscription);
1965 	MySubscription = newsub;
1966 
1967 	MemoryContextSwitchTo(oldctx);
1968 
1969 	/* Change synchronous commit according to the user's wishes */
1970 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
1971 					PGC_BACKEND, PGC_S_OVERRIDE);
1972 
1973 	if (started_tx)
1974 		CommitTransactionCommand();
1975 
1976 	MySubscriptionValid = true;
1977 }
1978 
1979 /*
1980  * Callback from subscription syscache invalidation.
1981  */
1982 static void
1983 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1984 {
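	/*
	 * Only flag the cached subscription data as invalid here; the next call
	 * to maybe_reread_subscription() reloads it.  An invalidation callback
	 * is not a safe place to do the reload itself.
	 */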
1985 	MySubscriptionValid = false;
1986 }
1987 
1988 /* Logical Replication Apply worker entry point */
1989 void
1990 ApplyWorkerMain(Datum main_arg)
1991 {
1992 	int			worker_slot = DatumGetInt32(main_arg);
1993 	MemoryContext oldctx;
1994 	char		originname[NAMEDATALEN];
1995 	XLogRecPtr	origin_startpos;
1996 	char	   *myslotname;
1997 	WalRcvStreamOptions options;
1998 
1999 	/* Attach to slot */
2000 	logicalrep_worker_attach(worker_slot);
2001 
2002 	/* Setup signal handling */
2003 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
2004 	pqsignal(SIGTERM, die);
2005 	BackgroundWorkerUnblockSignals();
2006 
2007 	/*
2008 	 * We don't currently need any ResourceOwner in a walreceiver process, but
2009 	 * if we did, we could call CreateAuxProcessResourceOwner here.
2010 	 */
2011 
2012 	/* Initialise stats to a reasonably sane value */
2013 	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
2014 		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
2015 
2016 	/* Load the libpq-specific functions */
2017 	load_file("libpqwalreceiver", false);
2018 
2019 	/* Run as replica session replication role. */
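	/*
	 * Under the "replica" role, triggers and rules with the default ENABLE
	 * setting do not fire on the tables we apply changes to; only those
	 * marked ENABLE REPLICA or ENABLE ALWAYS do.
	 */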
2020 	SetConfigOption("session_replication_role", "replica",
2021 					PGC_SUSET, PGC_S_OVERRIDE);
2022 
2023 	/* Connect to our database. */
2024 	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
2025 											  MyLogicalRepWorker->userid,
2026 											  0);
2027 
2028 	/*
2029 	 * Set always-secure search path, so malicious users can't redirect user
2030 	 * code (e.g. pg_index.indexprs).
2031 	 */
2032 	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
2033 
2034 	/* Load the subscription into persistent memory context. */
2035 	ApplyContext = AllocSetContextCreate(TopMemoryContext,
2036 										 "ApplyContext",
2037 										 ALLOCSET_DEFAULT_SIZES);
2038 	StartTransactionCommand();
2039 	oldctx = MemoryContextSwitchTo(ApplyContext);
2040 
2041 	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
2042 	if (!MySubscription)
2043 	{
2044 		ereport(LOG,
2045 				(errmsg("logical replication apply worker for subscription %u will not "
2046 						"start because the subscription was removed during startup",
2047 						MyLogicalRepWorker->subid)));
2048 		proc_exit(0);
2049 	}
2050 
2051 	MySubscriptionValid = true;
2052 	MemoryContextSwitchTo(oldctx);
2053 
2054 	if (!MySubscription->enabled)
2055 	{
2056 		ereport(LOG,
2057 				(errmsg("logical replication apply worker for subscription \"%s\" will not "
2058 						"start because the subscription was disabled during startup",
2059 						MySubscription->name)));
2060 
2061 		proc_exit(0);
2062 	}
2063 
2064 	/* Setup synchronous commit according to the user's wishes */
2065 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
2066 					PGC_BACKEND, PGC_S_OVERRIDE);
2067 
2068 	/* Keep us informed about subscription changes. */
2069 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
2070 								  subscription_change_cb,
2071 								  (Datum) 0);
2072 
2073 	if (am_tablesync_worker())
2074 		ereport(LOG,
2075 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
2076 						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
2077 	else
2078 		ereport(LOG,
2079 				(errmsg("logical replication apply worker for subscription \"%s\" has started",
2080 						MySubscription->name)));
2081 
2082 	CommitTransactionCommand();
2083 
2084 	/* Connect to the origin and start the replication. */
2085 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
2086 		 MySubscription->conninfo);
2087 
2088 	if (am_tablesync_worker())
2089 	{
2090 		char	   *syncslotname;
2091 
2092 		/* This is a table synchronization worker; run the initial sync. */
2093 		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
2094 
2095 		/* The slot name needs to be allocated in permanent memory context. */
2096 		oldctx = MemoryContextSwitchTo(ApplyContext);
2097 		myslotname = pstrdup(syncslotname);
2098 		MemoryContextSwitchTo(oldctx);
2099 
2100 		pfree(syncslotname);
2101 	}
2102 	else
2103 	{
2104 		/* This is the main apply worker. */
2105 		RepOriginId originid;
2106 		TimeLineID	startpointTLI;
2107 		char	   *err;
2108 
2109 		myslotname = MySubscription->slotname;
2110 
2111 		/*
2112 		 * This shouldn't happen if the subscription is enabled, but guard
2113 		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
2114 		 * crash if slot is NULL.)
2115 		 */
2116 		if (!myslotname)
2117 			ereport(ERROR,
2118 					(errmsg("subscription has no replication slot set")));
2119 
2120 		/* Setup replication origin tracking. */
2121 		StartTransactionCommand();
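		/*
		 * The origin used to track this subscription's apply progress is
		 * named "pg_" followed by the subscription OID; create it if it does
		 * not exist yet.
		 */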
2122 		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
2123 		originid = replorigin_by_name(originname, true);
2124 		if (!OidIsValid(originid))
2125 			originid = replorigin_create(originname);
2126 		replorigin_session_setup(originid);
2127 		replorigin_session_origin = originid;
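		/*
		 * The origin's recorded progress is the remote LSN up to which
		 * changes have already been applied; streaming resumes from there
		 * (it becomes options.startpoint below).
		 */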
2128 		origin_startpos = replorigin_session_get_progress(false);
2129 		CommitTransactionCommand();
2130 
2131 		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
2132 												MySubscription->name, &err);
2133 		if (LogRepWorkerWalRcvConn == NULL)
2134 			ereport(ERROR,
2135 					(errmsg("could not connect to the publisher: %s", err)));
2136 
2137 		/*
2138 		 * We don't really use the output of identify_system for anything, but
2139 		 * it does some initialization on the upstream, so let's still call it.
2140 		 */
2141 		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
2142 	}
2143 
2144 	/*
2145 	 * Set up a syscache callback so that we know when something changes in
2146 	 * the subscription relation state.
2147 	 */
2148 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
2149 								  invalidate_syncing_table_states,
2150 								  (Datum) 0);
2151 
2152 	/* Build logical replication streaming options. */
2153 	options.logical = true;
2154 	options.startpoint = origin_startpos;
2155 	options.slotname = myslotname;
2156 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
2157 	options.proto.logical.publication_names = MySubscription->publications;
2158 
2159 	/* Start normal logical streaming replication. */
2160 	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
2161 
2162 	/* Run the main loop. */
2163 	LogicalRepApplyLoop(origin_startpos);
2164 
2165 	proc_exit(0);
2166 }
2167 
2168 /*
2169  * Is current process a logical replication worker?
2170  */
2171 bool
2172 IsLogicalWorker(void)
2173 {
2174 	return MyLogicalRepWorker != NULL;
2175 }
2176