1 /*-------------------------------------------------------------------------
2  * worker.c
3  *	   PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  *	  This file contains the worker which applies logical changes as they come
12  *	  from remote logical replication stream.
13  *
14  *	  The main worker (apply) is started by logical replication worker
15  *	  launcher for every enabled subscription in a database. It uses
16  *	  walsender protocol to communicate with publisher.
17  *
18  *	  This module includes server facing code and shares libpqwalreceiver
19  *	  module with walreceiver for providing the libpq specific functionality.
20  *
21  *
22  * STREAMED TRANSACTIONS
23  * ---------------------
24  * Streamed transactions (large transactions exceeding a memory limit on the
25  * upstream) are not applied immediately, but instead, the data is written
26  * to temporary files and then applied at once when the final commit arrives.
27  *
 * Unlike the regular (non-streamed) case, handling streamed transactions has
 * to handle aborts of both the toplevel transaction and subtransactions. This
 * is achieved by tracking offsets for subtransactions, which are then used
 * to truncate the file with serialized changes.
32  *
 * The files are placed in the temporary file directory by default, and the
 * filenames include both the XID of the toplevel transaction and the OID of
 * the subscription. This is necessary so that different workers processing
 * remote transactions with the same XID don't interfere with each other.
37  *
 * We use BufFiles instead of normal temporary files because the BufFile
 * infrastructure (a) supports temporary files that exceed the OS file size
 * limit, (b) cleans the files up automatically on error, and (c) lets the
 * files survive across local transactions, so that they can be opened at
 * stream start and closed at stream stop. We use the SharedFileSet
 * infrastructure because without it a file is deleted as soon as it is
 * closed, and keeping the stream files open across stream start/stop instead
 * would consume a lot of memory (more than 8K for each BufFile, and there
 * could be many such BufFiles because the subscriber can receive multiple
 * start/stop streams for different transactions before getting the commit).
 * Moreover, without SharedFileSet we would also need to invent a new way to
 * pass filenames to the BufFile APIs so that we could reopen the desired file
 * across multiple stream-open calls for the same transaction.
52  *-------------------------------------------------------------------------
53  */
54 
55 #include "postgres.h"
56 
57 #include <sys/stat.h>
58 #include <unistd.h>
59 
60 #include "access/table.h"
61 #include "access/tableam.h"
62 #include "access/xact.h"
63 #include "access/xlog_internal.h"
64 #include "catalog/catalog.h"
65 #include "catalog/namespace.h"
66 #include "catalog/partition.h"
67 #include "catalog/pg_inherits.h"
68 #include "catalog/pg_subscription.h"
69 #include "catalog/pg_subscription_rel.h"
70 #include "catalog/pg_tablespace.h"
71 #include "commands/tablecmds.h"
72 #include "commands/tablespace.h"
73 #include "commands/trigger.h"
74 #include "executor/executor.h"
75 #include "executor/execPartition.h"
76 #include "executor/nodeModifyTable.h"
77 #include "funcapi.h"
78 #include "libpq/pqformat.h"
79 #include "libpq/pqsignal.h"
80 #include "mb/pg_wchar.h"
81 #include "miscadmin.h"
82 #include "nodes/makefuncs.h"
83 #include "optimizer/optimizer.h"
84 #include "pgstat.h"
85 #include "postmaster/bgworker.h"
86 #include "postmaster/interrupt.h"
87 #include "postmaster/postmaster.h"
88 #include "postmaster/walwriter.h"
89 #include "replication/decode.h"
90 #include "replication/logical.h"
91 #include "replication/logicalproto.h"
92 #include "replication/logicalrelation.h"
93 #include "replication/logicalworker.h"
94 #include "replication/origin.h"
95 #include "replication/reorderbuffer.h"
96 #include "replication/snapbuild.h"
97 #include "replication/walreceiver.h"
98 #include "replication/worker_internal.h"
99 #include "rewrite/rewriteHandler.h"
100 #include "storage/buffile.h"
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/ipc.h"
104 #include "storage/lmgr.h"
105 #include "storage/proc.h"
106 #include "storage/procarray.h"
107 #include "tcop/tcopprot.h"
108 #include "utils/builtins.h"
109 #include "utils/catcache.h"
110 #include "utils/dynahash.h"
111 #include "utils/datum.h"
112 #include "utils/fmgroids.h"
113 #include "utils/guc.h"
114 #include "utils/inval.h"
115 #include "utils/lsyscache.h"
116 #include "utils/memutils.h"
117 #include "utils/rel.h"
118 #include "utils/syscache.h"
119 #include "utils/timeout.h"
120 
#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

/*
 * A pair of end positions: one local, one remote.  The embedded dlist_node
 * lets entries be chained into a dlist -- presumably lsn_mapping just below;
 * the code that maintains that list is outside this excerpt.
 */
typedef struct FlushPosition
{
	dlist_node	node;
	XLogRecPtr	local_end;
	XLogRecPtr	remote_end;
} FlushPosition;

static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

/*
 * Argument passed to slot_store_error_callback(): the target relation and
 * the remote column currently being converted.  remote_attnum is -1 while
 * no column conversion is in progress, in which case the callback adds no
 * context line.
 */
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel;
	int			remote_attnum;
} SlotErrCallbackArg;

/*
 * Executor state for applying one change to a target relation.  Built by
 * create_edata_for_relation() and released by finish_edata().
 */
typedef struct ApplyExecutionData
{
	EState	   *estate;			/* executor state, used to track resources */

	LogicalRepRelMapEntry *targetRel;	/* replication target rel */
	ResultRelInfo *targetRelInfo;	/* ResultRelInfo for same */

	/* These fields are used when the target relation is partitioned: */
	ModifyTableState *mtstate;	/* dummy ModifyTable state */
	PartitionTupleRouting *proute;	/* partition routing info */
} ApplyExecutionData;
149 
/*
 * Stream xid hash entry.  Whenever we see a new xid, we create this entry in
 * xidhash, create the streaming file along with it, and store the file set
 * handle here.  The subxact file is created only if there is subxact info
 * under this xid.  Later streams for the same xid look this entry up to get
 * the corresponding file set handles; keeping them in a hash makes that
 * lookup fast.
 */
typedef struct StreamXidHash
{
	TransactionId xid;			/* xid is the hash key and must be first */
	SharedFileSet *stream_fileset;	/* shared file set for stream data */
	SharedFileSet *subxact_fileset; /* shared file set for subxact info */
} StreamXidHash;

/* Context that begin_replication_step() switches into for each step. */
static MemoryContext ApplyMessageContext = NULL;

/*
 * Context that lives for the whole apply worker; xidhash is created in it
 * (see apply_handle_stream_start).
 */
MemoryContext ApplyContext = NULL;

/* per stream context for streaming transactions; reset at STREAM STOP */
static MemoryContext LogicalStreamingContext = NULL;

/* Connection this worker uses to talk to the publisher (walsender protocol). */
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

/*
 * The subscription this worker serves.  MySubscriptionValid presumably
 * tracks whether the cached copy is still current (see
 * maybe_reread_subscription) -- confirm against its definition.
 */
Subscription *MySubscription = NULL;
bool		MySubscriptionValid = false;

/* Set by BEGIN; remote_final_lsn is the LSN the matching COMMIT must carry. */
bool		in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;

/* XID of the toplevel transaction of the currently open stream. */
static TransactionId stream_xid = InvalidTransactionId;

/*
 * Hash table for storing the streaming xid information along with shared file
 * set for streaming and subxact files.
 */
static HTAB *xidhash = NULL;

/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;

/* Position in the changes file where a subtransaction's data begins. */
typedef struct SubXactInfo
{
	TransactionId xid;			/* XID of the subxact */
	int			fileno;			/* file number in the buffile */
	off_t		offset;			/* offset in the file */
} SubXactInfo;

/* Sub-transaction data for the current streaming transaction */
typedef struct ApplySubXactData
{
	uint32		nsubxacts;		/* number of sub-transactions */
	uint32		nsubxacts_max;	/* current capacity of subxacts */
	TransactionId subxact_last; /* xid of the last sub-transaction */
	SubXactInfo *subxacts;		/* sub-xact offset in changes file */
} ApplySubXactData;

static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
209 
/*
 * Build the path of the subxact / changes file for a (subscription, xid)
 * pair into "path" (presumably a MAXPGPATH buffer -- confirm at definition).
 */
static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
static inline void changes_filename(char *path, Oid subid, TransactionId xid);

/*
 * Read, write and track information about the subtransactions of a given
 * toplevel transaction.
 */
static void subxact_info_write(Oid subid, TransactionId xid);
static void subxact_info_read(Oid subid, TransactionId xid);
static void subxact_info_add(TransactionId xid);
static inline void cleanup_subxact_info(void);

/*
 * Serialize and deserialize changes for a toplevel transaction.
 */
static void stream_cleanup_files(Oid subid, TransactionId xid);
static void stream_open_file(Oid subid, TransactionId xid, bool first);
static void stream_write_change(char action, StringInfo s);
static void stream_close_file(void);

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);

static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
										 ResultRelInfo *relinfo,
										 TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ApplyExecutionData *edata,
										 ResultRelInfo *relinfo,
										 TupleTableSlot *remoteslot,
										 LogicalRepTupleData *newtup);
static void apply_handle_delete_internal(ApplyExecutionData *edata,
										 ResultRelInfo *relinfo,
										 TupleTableSlot *remoteslot);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
									LogicalRepRelation *remoterel,
									TupleTableSlot *remoteslot,
									TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
									   TupleTableSlot *remoteslot,
									   LogicalRepTupleData *newtup,
									   CmdType operation);
257 
258 /*
259  * Should this worker apply changes for given relation.
260  *
261  * This is mainly needed for initial relation data sync as that runs in
262  * separate worker process running in parallel and we need some way to skip
263  * changes coming to the main apply worker during the sync of a table.
264  *
265  * Note we need to do smaller or equals comparison for SYNCDONE state because
266  * it might hold position of end of initial slot consistent point WAL
267  * record + 1 (ie start of next record) and next record can be COMMIT of
268  * transaction we are now processing (which is what we set remote_final_lsn
269  * to in apply_handle_begin).
270  */
271 static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry * rel)272 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
273 {
274 	if (am_tablesync_worker())
275 		return MyLogicalRepWorker->relid == rel->localreloid;
276 	else
277 		return (rel->state == SUBREL_STATE_READY ||
278 				(rel->state == SUBREL_STATE_SYNCDONE &&
279 				 rel->statelsn <= remote_final_lsn));
280 }
281 
282 /*
283  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
284  *
285  * Start a transaction, if this is the first step (else we keep using the
286  * existing transaction).
287  * Also provide a global snapshot and ensure we run in ApplyMessageContext.
288  */
289 static void
begin_replication_step(void)290 begin_replication_step(void)
291 {
292 	SetCurrentStatementStartTimestamp();
293 
294 	if (!IsTransactionState())
295 	{
296 		StartTransactionCommand();
297 		maybe_reread_subscription();
298 	}
299 
300 	PushActiveSnapshot(GetTransactionSnapshot());
301 
302 	MemoryContextSwitchTo(ApplyMessageContext);
303 }
304 
/*
 * Finish one step of a replication transaction.  Every caller of
 * begin_replication_step() must call this afterwards.
 *
 * The transaction itself is deliberately left open.  We only pop the active
 * snapshot pushed by begin_replication_step(), then bump the command counter
 * so that the effects of this step are visible to later steps.
 */
static void
end_replication_step(void)
{
	PopActiveSnapshot();

	CommandCounterIncrement();
}
319 
320 /*
321  * Handle streamed transactions.
322  *
323  * If in streaming mode (receiving a block of streamed transaction), we
324  * simply redirect it to a file for the proper toplevel transaction.
325  *
326  * Returns true for streamed transactions, false otherwise (regular mode).
327  */
328 static bool
handle_streamed_transaction(LogicalRepMsgType action,StringInfo s)329 handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
330 {
331 	TransactionId xid;
332 
333 	/* not in streaming mode */
334 	if (!in_streamed_transaction)
335 		return false;
336 
337 	Assert(stream_fd != NULL);
338 	Assert(TransactionIdIsValid(stream_xid));
339 
340 	/*
341 	 * We should have received XID of the subxact as the first part of the
342 	 * message, so extract it.
343 	 */
344 	xid = pq_getmsgint(s, 4);
345 
346 	if (!TransactionIdIsValid(xid))
347 		ereport(ERROR,
348 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
349 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
350 
351 	/* Add the new subxact to the array (unless already there). */
352 	subxact_info_add(xid);
353 
354 	/* write the change to the current file */
355 	stream_write_change(action, s);
356 
357 	return true;
358 }
359 
360 /*
361  * Executor state preparation for evaluation of constraint expressions,
362  * indexes and triggers for the specified relation.
363  *
364  * Note that the caller must open and close any indexes to be updated.
365  */
366 static ApplyExecutionData *
create_edata_for_relation(LogicalRepRelMapEntry * rel)367 create_edata_for_relation(LogicalRepRelMapEntry *rel)
368 {
369 	ApplyExecutionData *edata;
370 	EState	   *estate;
371 	RangeTblEntry *rte;
372 	ResultRelInfo *resultRelInfo;
373 
374 	edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
375 	edata->targetRel = rel;
376 
377 	edata->estate = estate = CreateExecutorState();
378 
379 	rte = makeNode(RangeTblEntry);
380 	rte->rtekind = RTE_RELATION;
381 	rte->relid = RelationGetRelid(rel->localrel);
382 	rte->relkind = rel->localrel->rd_rel->relkind;
383 	rte->rellockmode = AccessShareLock;
384 	ExecInitRangeTable(estate, list_make1(rte));
385 
386 	edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
387 
388 	/*
389 	 * Use Relation opened by logicalrep_rel_open() instead of opening it
390 	 * again.
391 	 */
392 	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
393 
394 	/*
395 	 * We put the ResultRelInfo in the es_opened_result_relations list, even
396 	 * though we don't populate the es_result_relations array.  That's a bit
397 	 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
398 	 *
399 	 * ExecOpenIndices() is not called here either, each execution path doing
400 	 * an apply operation being responsible for that.
401 	 */
402 	estate->es_opened_result_relations =
403 		lappend(estate->es_opened_result_relations, resultRelInfo);
404 
405 	estate->es_output_cid = GetCurrentCommandId(true);
406 
407 	/* Prepare to catch AFTER triggers. */
408 	AfterTriggerBeginQuery();
409 
410 	/* other fields of edata remain NULL for now */
411 
412 	return edata;
413 }
414 
415 /*
416  * Finish any operations related to the executor state created by
417  * create_edata_for_relation().
418  */
419 static void
finish_edata(ApplyExecutionData * edata)420 finish_edata(ApplyExecutionData *edata)
421 {
422 	EState	   *estate = edata->estate;
423 
424 	/* Handle any queued AFTER triggers. */
425 	AfterTriggerEndQuery(estate);
426 
427 	/* Shut down tuple routing, if any was done. */
428 	if (edata->proute)
429 		ExecCleanupTupleRouting(edata->mtstate, edata->proute);
430 
431 	/*
432 	 * Cleanup.  It might seem that we should call ExecCloseResultRelations()
433 	 * here, but we intentionally don't.  It would close the rel we added to
434 	 * es_opened_result_relations above, which is wrong because we took no
435 	 * corresponding refcount.  We rely on ExecCleanupTupleRouting() to close
436 	 * any other relations opened during execution.
437 	 */
438 	ExecResetTupleTable(estate->es_tupleTable, false);
439 	FreeExecutorState(estate);
440 	pfree(edata);
441 }
442 
443 /*
444  * Executes default values for columns for which we can't map to remote
445  * relation columns.
446  *
447  * This allows us to support tables which have more columns on the downstream
448  * than on the upstream.
449  */
450 static void
slot_fill_defaults(LogicalRepRelMapEntry * rel,EState * estate,TupleTableSlot * slot)451 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
452 				   TupleTableSlot *slot)
453 {
454 	TupleDesc	desc = RelationGetDescr(rel->localrel);
455 	int			num_phys_attrs = desc->natts;
456 	int			i;
457 	int			attnum,
458 				num_defaults = 0;
459 	int		   *defmap;
460 	ExprState **defexprs;
461 	ExprContext *econtext;
462 
463 	econtext = GetPerTupleExprContext(estate);
464 
465 	/* We got all the data via replication, no need to evaluate anything. */
466 	if (num_phys_attrs == rel->remoterel.natts)
467 		return;
468 
469 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
470 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
471 
472 	Assert(rel->attrmap->maplen == num_phys_attrs);
473 	for (attnum = 0; attnum < num_phys_attrs; attnum++)
474 	{
475 		Expr	   *defexpr;
476 
477 		if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
478 			continue;
479 
480 		if (rel->attrmap->attnums[attnum] >= 0)
481 			continue;
482 
483 		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
484 
485 		if (defexpr != NULL)
486 		{
487 			/* Run the expression through planner */
488 			defexpr = expression_planner(defexpr);
489 
490 			/* Initialize executable expression in copycontext */
491 			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
492 			defmap[num_defaults] = attnum;
493 			num_defaults++;
494 		}
495 
496 	}
497 
498 	for (i = 0; i < num_defaults; i++)
499 		slot->tts_values[defmap[i]] =
500 			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
501 }
502 
503 /*
504  * Error callback to give more context info about data conversion failures
505  * while reading data from the remote server.
506  */
507 static void
slot_store_error_callback(void * arg)508 slot_store_error_callback(void *arg)
509 {
510 	SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
511 	LogicalRepRelMapEntry *rel;
512 
513 	/* Nothing to do if remote attribute number is not set */
514 	if (errarg->remote_attnum < 0)
515 		return;
516 
517 	rel = errarg->rel;
518 	errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
519 			   rel->remoterel.nspname, rel->remoterel.relname,
520 			   rel->remoterel.attnames[errarg->remote_attnum]);
521 }
522 
523 /*
524  * Store tuple data into slot.
525  *
526  * Incoming data can be either text or binary format.
527  */
528 static void
slot_store_data(TupleTableSlot * slot,LogicalRepRelMapEntry * rel,LogicalRepTupleData * tupleData)529 slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
530 				LogicalRepTupleData *tupleData)
531 {
532 	int			natts = slot->tts_tupleDescriptor->natts;
533 	int			i;
534 	SlotErrCallbackArg errarg;
535 	ErrorContextCallback errcallback;
536 
537 	ExecClearTuple(slot);
538 
539 	/* Push callback + info on the error context stack */
540 	errarg.rel = rel;
541 	errarg.remote_attnum = -1;
542 	errcallback.callback = slot_store_error_callback;
543 	errcallback.arg = (void *) &errarg;
544 	errcallback.previous = error_context_stack;
545 	error_context_stack = &errcallback;
546 
547 	/* Call the "in" function for each non-dropped, non-null attribute */
548 	Assert(natts == rel->attrmap->maplen);
549 	for (i = 0; i < natts; i++)
550 	{
551 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
552 		int			remoteattnum = rel->attrmap->attnums[i];
553 
554 		if (!att->attisdropped && remoteattnum >= 0)
555 		{
556 			StringInfo	colvalue = &tupleData->colvalues[remoteattnum];
557 
558 			Assert(remoteattnum < tupleData->ncols);
559 
560 			errarg.remote_attnum = remoteattnum;
561 
562 			if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
563 			{
564 				Oid			typinput;
565 				Oid			typioparam;
566 
567 				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
568 				slot->tts_values[i] =
569 					OidInputFunctionCall(typinput, colvalue->data,
570 										 typioparam, att->atttypmod);
571 				slot->tts_isnull[i] = false;
572 			}
573 			else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
574 			{
575 				Oid			typreceive;
576 				Oid			typioparam;
577 
578 				/*
579 				 * In some code paths we may be asked to re-parse the same
580 				 * tuple data.  Reset the StringInfo's cursor so that works.
581 				 */
582 				colvalue->cursor = 0;
583 
584 				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
585 				slot->tts_values[i] =
586 					OidReceiveFunctionCall(typreceive, colvalue,
587 										   typioparam, att->atttypmod);
588 
589 				/* Trouble if it didn't eat the whole buffer */
590 				if (colvalue->cursor != colvalue->len)
591 					ereport(ERROR,
592 							(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
593 							 errmsg("incorrect binary data format in logical replication column %d",
594 									remoteattnum + 1)));
595 				slot->tts_isnull[i] = false;
596 			}
597 			else
598 			{
599 				/*
600 				 * NULL value from remote.  (We don't expect to see
601 				 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
602 				 * NULL.)
603 				 */
604 				slot->tts_values[i] = (Datum) 0;
605 				slot->tts_isnull[i] = true;
606 			}
607 
608 			errarg.remote_attnum = -1;
609 		}
610 		else
611 		{
612 			/*
613 			 * We assign NULL to dropped attributes and missing values
614 			 * (missing values should be later filled using
615 			 * slot_fill_defaults).
616 			 */
617 			slot->tts_values[i] = (Datum) 0;
618 			slot->tts_isnull[i] = true;
619 		}
620 	}
621 
622 	/* Pop the error context stack */
623 	error_context_stack = errcallback.previous;
624 
625 	ExecStoreVirtualTuple(slot);
626 }
627 
628 /*
629  * Replace updated columns with data from the LogicalRepTupleData struct.
630  * This is somewhat similar to heap_modify_tuple but also calls the type
631  * input functions on the user data.
632  *
633  * "slot" is filled with a copy of the tuple in "srcslot", replacing
634  * columns provided in "tupleData" and leaving others as-is.
635  *
636  * Caution: unreplaced pass-by-ref columns in "slot" will point into the
637  * storage for "srcslot".  This is OK for current usage, but someday we may
638  * need to materialize "slot" at the end to make it independent of "srcslot".
639  */
640 static void
slot_modify_data(TupleTableSlot * slot,TupleTableSlot * srcslot,LogicalRepRelMapEntry * rel,LogicalRepTupleData * tupleData)641 slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
642 				 LogicalRepRelMapEntry *rel,
643 				 LogicalRepTupleData *tupleData)
644 {
645 	int			natts = slot->tts_tupleDescriptor->natts;
646 	int			i;
647 	SlotErrCallbackArg errarg;
648 	ErrorContextCallback errcallback;
649 
650 	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
651 	ExecClearTuple(slot);
652 
653 	/*
654 	 * Copy all the column data from srcslot, so that we'll have valid values
655 	 * for unreplaced columns.
656 	 */
657 	Assert(natts == srcslot->tts_tupleDescriptor->natts);
658 	slot_getallattrs(srcslot);
659 	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
660 	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
661 
662 	/* For error reporting, push callback + info on the error context stack */
663 	errarg.rel = rel;
664 	errarg.remote_attnum = -1;
665 	errcallback.callback = slot_store_error_callback;
666 	errcallback.arg = (void *) &errarg;
667 	errcallback.previous = error_context_stack;
668 	error_context_stack = &errcallback;
669 
670 	/* Call the "in" function for each replaced attribute */
671 	Assert(natts == rel->attrmap->maplen);
672 	for (i = 0; i < natts; i++)
673 	{
674 		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
675 		int			remoteattnum = rel->attrmap->attnums[i];
676 
677 		if (remoteattnum < 0)
678 			continue;
679 
680 		Assert(remoteattnum < tupleData->ncols);
681 
682 		if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
683 		{
684 			StringInfo	colvalue = &tupleData->colvalues[remoteattnum];
685 
686 			errarg.remote_attnum = remoteattnum;
687 
688 			if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
689 			{
690 				Oid			typinput;
691 				Oid			typioparam;
692 
693 				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
694 				slot->tts_values[i] =
695 					OidInputFunctionCall(typinput, colvalue->data,
696 										 typioparam, att->atttypmod);
697 				slot->tts_isnull[i] = false;
698 			}
699 			else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
700 			{
701 				Oid			typreceive;
702 				Oid			typioparam;
703 
704 				/*
705 				 * In some code paths we may be asked to re-parse the same
706 				 * tuple data.  Reset the StringInfo's cursor so that works.
707 				 */
708 				colvalue->cursor = 0;
709 
710 				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
711 				slot->tts_values[i] =
712 					OidReceiveFunctionCall(typreceive, colvalue,
713 										   typioparam, att->atttypmod);
714 
715 				/* Trouble if it didn't eat the whole buffer */
716 				if (colvalue->cursor != colvalue->len)
717 					ereport(ERROR,
718 							(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
719 							 errmsg("incorrect binary data format in logical replication column %d",
720 									remoteattnum + 1)));
721 				slot->tts_isnull[i] = false;
722 			}
723 			else
724 			{
725 				/* must be LOGICALREP_COLUMN_NULL */
726 				slot->tts_values[i] = (Datum) 0;
727 				slot->tts_isnull[i] = true;
728 			}
729 
730 			errarg.remote_attnum = -1;
731 		}
732 	}
733 
734 	/* Pop the error context stack */
735 	error_context_stack = errcallback.previous;
736 
737 	/* And finally, declare that "slot" contains a valid virtual tuple */
738 	ExecStoreVirtualTuple(slot);
739 }
740 
741 /*
742  * Handle BEGIN message.
743  */
744 static void
apply_handle_begin(StringInfo s)745 apply_handle_begin(StringInfo s)
746 {
747 	LogicalRepBeginData begin_data;
748 
749 	logicalrep_read_begin(s, &begin_data);
750 
751 	remote_final_lsn = begin_data.final_lsn;
752 
753 	in_remote_transaction = true;
754 
755 	pgstat_report_activity(STATE_RUNNING, NULL);
756 }
757 
758 /*
759  * Handle COMMIT message.
760  *
761  * TODO, support tracking of multiple origins
762  */
763 static void
apply_handle_commit(StringInfo s)764 apply_handle_commit(StringInfo s)
765 {
766 	LogicalRepCommitData commit_data;
767 
768 	logicalrep_read_commit(s, &commit_data);
769 
770 	if (commit_data.commit_lsn != remote_final_lsn)
771 		ereport(ERROR,
772 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
773 				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
774 								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
775 								 LSN_FORMAT_ARGS(remote_final_lsn))));
776 
777 	apply_handle_commit_internal(&commit_data);
778 
779 	/* Process any tables that are being synchronized in parallel. */
780 	process_syncing_tables(commit_data.end_lsn);
781 
782 	pgstat_report_activity(STATE_IDLE, NULL);
783 }
784 
785 /*
786  * Handle ORIGIN message.
787  *
788  * TODO, support tracking of multiple origins
789  */
790 static void
apply_handle_origin(StringInfo s)791 apply_handle_origin(StringInfo s)
792 {
793 	/*
794 	 * ORIGIN message can only come inside streaming transaction or inside
795 	 * remote transaction and before any actual writes.
796 	 */
797 	if (!in_streamed_transaction &&
798 		(!in_remote_transaction ||
799 		 (IsTransactionState() && !am_tablesync_worker())))
800 		ereport(ERROR,
801 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
802 				 errmsg_internal("ORIGIN message sent out of order")));
803 }
804 
805 /*
806  * Handle STREAM START message.
807  */
808 static void
apply_handle_stream_start(StringInfo s)809 apply_handle_stream_start(StringInfo s)
810 {
811 	bool		first_segment;
812 	HASHCTL		hash_ctl;
813 
814 	if (in_streamed_transaction)
815 		ereport(ERROR,
816 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
817 				 errmsg_internal("duplicate STREAM START message")));
818 
819 	/*
820 	 * Start a transaction on stream start, this transaction will be committed
821 	 * on the stream stop unless it is a tablesync worker in which case it
822 	 * will be committed after processing all the messages. We need the
823 	 * transaction for handling the buffile, used for serializing the
824 	 * streaming data and subxact info.
825 	 */
826 	begin_replication_step();
827 
828 	/* notify handle methods we're processing a remote transaction */
829 	in_streamed_transaction = true;
830 
831 	/* extract XID of the top-level transaction */
832 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
833 
834 	if (!TransactionIdIsValid(stream_xid))
835 		ereport(ERROR,
836 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
837 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
838 
839 	/*
840 	 * Initialize the xidhash table if we haven't yet. This will be used for
841 	 * the entire duration of the apply worker so create it in permanent
842 	 * context.
843 	 */
844 	if (xidhash == NULL)
845 	{
846 		hash_ctl.keysize = sizeof(TransactionId);
847 		hash_ctl.entrysize = sizeof(StreamXidHash);
848 		hash_ctl.hcxt = ApplyContext;
849 		xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
850 							  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
851 	}
852 
853 	/* open the spool file for this transaction */
854 	stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
855 
856 	/* if this is not the first segment, open existing subxact file */
857 	if (!first_segment)
858 		subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
859 
860 	pgstat_report_activity(STATE_RUNNING, NULL);
861 
862 	end_replication_step();
863 }
864 
/*
 * Handle STREAM STOP message.
 *
 * Ends the current block of a streamed transaction.  It saves the subxact
 * info and closes the changes file, commits the per-stream transaction
 * opened at STREAM START, leaves streaming mode, and resets the per-stream
 * memory context.  The order matters: the files are handled while the
 * transaction that manages them is still open.
 */
static void
apply_handle_stream_stop(StringInfo s)
{
	if (!in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("STREAM STOP message without STREAM START")));

	/*
	 * Close the file with serialized changes, and serialize information about
	 * subxacts for the toplevel transaction.
	 */
	subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
	stream_close_file();

	/* We must be in a valid transaction state */
	Assert(IsTransactionState());

	/* Commit the per-stream transaction (started in apply_handle_stream_start) */
	CommitTransactionCommand();

	in_streamed_transaction = false;

	/* Reset per-stream context */
	MemoryContextReset(LogicalStreamingContext);

	pgstat_report_activity(STATE_IDLE, NULL);
}
896 
897 /*
898  * Handle STREAM abort message.
899  */
900 static void
apply_handle_stream_abort(StringInfo s)901 apply_handle_stream_abort(StringInfo s)
902 {
903 	TransactionId xid;
904 	TransactionId subxid;
905 
906 	if (in_streamed_transaction)
907 		ereport(ERROR,
908 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
909 				 errmsg_internal("STREAM ABORT message without STREAM STOP")));
910 
911 	logicalrep_read_stream_abort(s, &xid, &subxid);
912 
913 	/*
914 	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
915 	 * just delete the files with serialized info.
916 	 */
917 	if (xid == subxid)
918 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
919 	else
920 	{
921 		/*
922 		 * OK, so it's a subxact. We need to read the subxact file for the
923 		 * toplevel transaction, determine the offset tracked for the subxact,
924 		 * and truncate the file with changes. We also remove the subxacts
925 		 * with higher offsets (or rather higher XIDs).
926 		 *
927 		 * We intentionally scan the array from the tail, because we're likely
928 		 * aborting a change for the most recent subtransactions.
929 		 *
930 		 * We can't use the binary search here as subxact XIDs won't
931 		 * necessarily arrive in sorted order, consider the case where we have
932 		 * released the savepoint for multiple subtransactions and then
933 		 * performed rollback to savepoint for one of the earlier
934 		 * sub-transaction.
935 		 */
936 		int64		i;
937 		int64		subidx;
938 		BufFile    *fd;
939 		bool		found = false;
940 		char		path[MAXPGPATH];
941 		StreamXidHash *ent;
942 
943 		subidx = -1;
944 		begin_replication_step();
945 		subxact_info_read(MyLogicalRepWorker->subid, xid);
946 
947 		for (i = subxact_data.nsubxacts; i > 0; i--)
948 		{
949 			if (subxact_data.subxacts[i - 1].xid == subxid)
950 			{
951 				subidx = (i - 1);
952 				found = true;
953 				break;
954 			}
955 		}
956 
957 		/*
958 		 * If it's an empty sub-transaction then we will not find the subxid
959 		 * here so just cleanup the subxact info and return.
960 		 */
961 		if (!found)
962 		{
963 			/* Cleanup the subxact info */
964 			cleanup_subxact_info();
965 			end_replication_step();
966 			CommitTransactionCommand();
967 			return;
968 		}
969 
970 		ent = (StreamXidHash *) hash_search(xidhash,
971 											(void *) &xid,
972 											HASH_FIND,
973 											NULL);
974 		if (!ent)
975 			ereport(ERROR,
976 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
977 					 errmsg_internal("transaction %u not found in stream XID hash table",
978 									 xid)));
979 
980 		/* open the changes file */
981 		changes_filename(path, MyLogicalRepWorker->subid, xid);
982 		fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
983 
984 		/* OK, truncate the file at the right offset */
985 		BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
986 							  subxact_data.subxacts[subidx].offset);
987 		BufFileClose(fd);
988 
989 		/* discard the subxacts added later */
990 		subxact_data.nsubxacts = subidx;
991 
992 		/* write the updated subxact list */
993 		subxact_info_write(MyLogicalRepWorker->subid, xid);
994 
995 		end_replication_step();
996 		CommitTransactionCommand();
997 	}
998 }
999 
1000 /*
1001  * Handle STREAM COMMIT message.
1002  */
1003 static void
apply_handle_stream_commit(StringInfo s)1004 apply_handle_stream_commit(StringInfo s)
1005 {
1006 	TransactionId xid;
1007 	StringInfoData s2;
1008 	int			nchanges;
1009 	char		path[MAXPGPATH];
1010 	char	   *buffer = NULL;
1011 	LogicalRepCommitData commit_data;
1012 	StreamXidHash *ent;
1013 	MemoryContext oldcxt;
1014 	BufFile    *fd;
1015 
1016 	if (in_streamed_transaction)
1017 		ereport(ERROR,
1018 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
1019 				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1020 
1021 	xid = logicalrep_read_stream_commit(s, &commit_data);
1022 
1023 	elog(DEBUG1, "received commit for streamed transaction %u", xid);
1024 
1025 	/* Make sure we have an open transaction */
1026 	begin_replication_step();
1027 
1028 	/*
1029 	 * Allocate file handle and memory required to process all the messages in
1030 	 * TopTransactionContext to avoid them getting reset after each message is
1031 	 * processed.
1032 	 */
1033 	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
1034 
1035 	/* open the spool file for the committed transaction */
1036 	changes_filename(path, MyLogicalRepWorker->subid, xid);
1037 	elog(DEBUG1, "replaying changes from file \"%s\"", path);
1038 
1039 	ent = (StreamXidHash *) hash_search(xidhash,
1040 										(void *) &xid,
1041 										HASH_FIND,
1042 										NULL);
1043 	if (!ent)
1044 		ereport(ERROR,
1045 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
1046 				 errmsg_internal("transaction %u not found in stream XID hash table",
1047 								 xid)));
1048 
1049 	fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
1050 
1051 	buffer = palloc(BLCKSZ);
1052 	initStringInfo(&s2);
1053 
1054 	MemoryContextSwitchTo(oldcxt);
1055 
1056 	remote_final_lsn = commit_data.commit_lsn;
1057 
1058 	/*
1059 	 * Make sure the handle apply_dispatch methods are aware we're in a remote
1060 	 * transaction.
1061 	 */
1062 	in_remote_transaction = true;
1063 	pgstat_report_activity(STATE_RUNNING, NULL);
1064 
1065 	end_replication_step();
1066 
1067 	/*
1068 	 * Read the entries one by one and pass them through the same logic as in
1069 	 * apply_dispatch.
1070 	 */
1071 	nchanges = 0;
1072 	while (true)
1073 	{
1074 		int			nbytes;
1075 		int			len;
1076 
1077 		CHECK_FOR_INTERRUPTS();
1078 
1079 		/* read length of the on-disk record */
1080 		nbytes = BufFileRead(fd, &len, sizeof(len));
1081 
1082 		/* have we reached end of the file? */
1083 		if (nbytes == 0)
1084 			break;
1085 
1086 		/* do we have a correct length? */
1087 		if (nbytes != sizeof(len))
1088 			ereport(ERROR,
1089 					(errcode_for_file_access(),
1090 					 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1091 							path)));
1092 
1093 		if (len <= 0)
1094 			elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1095 				 len, path);
1096 
1097 		/* make sure we have sufficiently large buffer */
1098 		buffer = repalloc(buffer, len);
1099 
1100 		/* and finally read the data into the buffer */
1101 		if (BufFileRead(fd, buffer, len) != len)
1102 			ereport(ERROR,
1103 					(errcode_for_file_access(),
1104 					 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
1105 							path)));
1106 
1107 		/* copy the buffer to the stringinfo and call apply_dispatch */
1108 		resetStringInfo(&s2);
1109 		appendBinaryStringInfo(&s2, buffer, len);
1110 
1111 		/* Ensure we are reading the data into our memory context. */
1112 		oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
1113 
1114 		apply_dispatch(&s2);
1115 
1116 		MemoryContextReset(ApplyMessageContext);
1117 
1118 		MemoryContextSwitchTo(oldcxt);
1119 
1120 		nchanges++;
1121 
1122 		if (nchanges % 1000 == 0)
1123 			elog(DEBUG1, "replayed %d changes from file \"%s\"",
1124 				 nchanges, path);
1125 	}
1126 
1127 	BufFileClose(fd);
1128 
1129 	pfree(buffer);
1130 	pfree(s2.data);
1131 
1132 	elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
1133 		 nchanges, path);
1134 
1135 	apply_handle_commit_internal(&commit_data);
1136 
1137 	/* unlink the files with serialized changes and subxact info */
1138 	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1139 
1140 	/* Process any tables that are being synchronized in parallel. */
1141 	process_syncing_tables(commit_data.end_lsn);
1142 
1143 	pgstat_report_activity(STATE_IDLE, NULL);
1144 }
1145 
1146 /*
1147  * Helper function for apply_handle_commit and apply_handle_stream_commit.
1148  */
1149 static void
apply_handle_commit_internal(LogicalRepCommitData * commit_data)1150 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
1151 {
1152 	if (IsTransactionState())
1153 	{
1154 		/*
1155 		 * Update origin state so we can restart streaming from correct
1156 		 * position in case of crash.
1157 		 */
1158 		replorigin_session_origin_lsn = commit_data->end_lsn;
1159 		replorigin_session_origin_timestamp = commit_data->committime;
1160 
1161 		CommitTransactionCommand();
1162 		pgstat_report_stat(false);
1163 
1164 		store_flush_position(commit_data->end_lsn);
1165 	}
1166 	else
1167 	{
1168 		/* Process any invalidation messages that might have accumulated. */
1169 		AcceptInvalidationMessages();
1170 		maybe_reread_subscription();
1171 	}
1172 
1173 	in_remote_transaction = false;
1174 }
1175 
1176 /*
1177  * Handle RELATION message.
1178  *
1179  * Note we don't do validation against local schema here. The validation
1180  * against local schema is postponed until first change for given relation
1181  * comes as we only care about it when applying changes for it anyway and we
1182  * do less locking this way.
1183  */
1184 static void
apply_handle_relation(StringInfo s)1185 apply_handle_relation(StringInfo s)
1186 {
1187 	LogicalRepRelation *rel;
1188 
1189 	if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
1190 		return;
1191 
1192 	rel = logicalrep_read_rel(s);
1193 	logicalrep_relmap_update(rel);
1194 }
1195 
1196 /*
1197  * Handle TYPE message.
1198  *
1199  * This is now vestigial; we read the info and discard it.
1200  */
1201 static void
apply_handle_type(StringInfo s)1202 apply_handle_type(StringInfo s)
1203 {
1204 	LogicalRepTyp typ;
1205 
1206 	if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
1207 		return;
1208 
1209 	logicalrep_read_typ(s, &typ);
1210 }
1211 
1212 /*
1213  * Get replica identity index or if it is not defined a primary key.
1214  *
1215  * If neither is defined, returns InvalidOid
1216  */
1217 static Oid
GetRelationIdentityOrPK(Relation rel)1218 GetRelationIdentityOrPK(Relation rel)
1219 {
1220 	Oid			idxoid;
1221 
1222 	idxoid = RelationGetReplicaIndex(rel);
1223 
1224 	if (!OidIsValid(idxoid))
1225 		idxoid = RelationGetPrimaryKeyIndex(rel);
1226 
1227 	return idxoid;
1228 }
1229 
1230 /*
1231  * Handle INSERT message.
1232  */
1233 
1234 static void
apply_handle_insert(StringInfo s)1235 apply_handle_insert(StringInfo s)
1236 {
1237 	LogicalRepRelMapEntry *rel;
1238 	LogicalRepTupleData newtup;
1239 	LogicalRepRelId relid;
1240 	ApplyExecutionData *edata;
1241 	EState	   *estate;
1242 	TupleTableSlot *remoteslot;
1243 	MemoryContext oldctx;
1244 
1245 	if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
1246 		return;
1247 
1248 	begin_replication_step();
1249 
1250 	relid = logicalrep_read_insert(s, &newtup);
1251 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
1252 	if (!should_apply_changes_for_rel(rel))
1253 	{
1254 		/*
1255 		 * The relation can't become interesting in the middle of the
1256 		 * transaction so it's safe to unlock it.
1257 		 */
1258 		logicalrep_rel_close(rel, RowExclusiveLock);
1259 		end_replication_step();
1260 		return;
1261 	}
1262 
1263 	/* Initialize the executor state. */
1264 	edata = create_edata_for_relation(rel);
1265 	estate = edata->estate;
1266 	remoteslot = ExecInitExtraTupleSlot(estate,
1267 										RelationGetDescr(rel->localrel),
1268 										&TTSOpsVirtual);
1269 
1270 	/* Process and store remote tuple in the slot */
1271 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1272 	slot_store_data(remoteslot, rel, &newtup);
1273 	slot_fill_defaults(rel, estate, remoteslot);
1274 	MemoryContextSwitchTo(oldctx);
1275 
1276 	/* For a partitioned table, insert the tuple into a partition. */
1277 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1278 		apply_handle_tuple_routing(edata,
1279 								   remoteslot, NULL, CMD_INSERT);
1280 	else
1281 		apply_handle_insert_internal(edata, edata->targetRelInfo,
1282 									 remoteslot);
1283 
1284 	finish_edata(edata);
1285 
1286 	logicalrep_rel_close(rel, NoLock);
1287 
1288 	end_replication_step();
1289 }
1290 
1291 /*
1292  * Workhorse for apply_handle_insert()
1293  * relinfo is for the relation we're actually inserting into
1294  * (could be a child partition of edata->targetRelInfo)
1295  */
1296 static void
apply_handle_insert_internal(ApplyExecutionData * edata,ResultRelInfo * relinfo,TupleTableSlot * remoteslot)1297 apply_handle_insert_internal(ApplyExecutionData *edata,
1298 							 ResultRelInfo *relinfo,
1299 							 TupleTableSlot *remoteslot)
1300 {
1301 	EState	   *estate = edata->estate;
1302 
1303 	/* We must open indexes here. */
1304 	ExecOpenIndices(relinfo, false);
1305 
1306 	/* Do the insert. */
1307 	ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1308 
1309 	/* Cleanup. */
1310 	ExecCloseIndices(relinfo);
1311 }
1312 
1313 /*
1314  * Check if the logical replication relation is updatable and throw
1315  * appropriate error if it isn't.
1316  */
1317 static void
check_relation_updatable(LogicalRepRelMapEntry * rel)1318 check_relation_updatable(LogicalRepRelMapEntry *rel)
1319 {
1320 	/* Updatable, no error. */
1321 	if (rel->updatable)
1322 		return;
1323 
1324 	/*
1325 	 * We are in error mode so it's fine this is somewhat slow. It's better to
1326 	 * give user correct error.
1327 	 */
1328 	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
1329 	{
1330 		ereport(ERROR,
1331 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1332 				 errmsg("publisher did not send replica identity column "
1333 						"expected by the logical replication target relation \"%s.%s\"",
1334 						rel->remoterel.nspname, rel->remoterel.relname)));
1335 	}
1336 
1337 	ereport(ERROR,
1338 			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1339 			 errmsg("logical replication target relation \"%s.%s\" has "
1340 					"neither REPLICA IDENTITY index nor PRIMARY "
1341 					"KEY and published relation does not have "
1342 					"REPLICA IDENTITY FULL",
1343 					rel->remoterel.nspname, rel->remoterel.relname)));
1344 }
1345 
1346 /*
1347  * Handle UPDATE message.
1348  *
1349  * TODO: FDW support
1350  */
1351 static void
apply_handle_update(StringInfo s)1352 apply_handle_update(StringInfo s)
1353 {
1354 	LogicalRepRelMapEntry *rel;
1355 	LogicalRepRelId relid;
1356 	ApplyExecutionData *edata;
1357 	EState	   *estate;
1358 	LogicalRepTupleData oldtup;
1359 	LogicalRepTupleData newtup;
1360 	bool		has_oldtup;
1361 	TupleTableSlot *remoteslot;
1362 	RangeTblEntry *target_rte;
1363 	MemoryContext oldctx;
1364 
1365 	if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
1366 		return;
1367 
1368 	begin_replication_step();
1369 
1370 	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1371 								   &newtup);
1372 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
1373 	if (!should_apply_changes_for_rel(rel))
1374 	{
1375 		/*
1376 		 * The relation can't become interesting in the middle of the
1377 		 * transaction so it's safe to unlock it.
1378 		 */
1379 		logicalrep_rel_close(rel, RowExclusiveLock);
1380 		end_replication_step();
1381 		return;
1382 	}
1383 
1384 	/* Check if we can do the update. */
1385 	check_relation_updatable(rel);
1386 
1387 	/* Initialize the executor state. */
1388 	edata = create_edata_for_relation(rel);
1389 	estate = edata->estate;
1390 	remoteslot = ExecInitExtraTupleSlot(estate,
1391 										RelationGetDescr(rel->localrel),
1392 										&TTSOpsVirtual);
1393 
1394 	/*
1395 	 * Populate updatedCols so that per-column triggers can fire, and so
1396 	 * executor can correctly pass down indexUnchanged hint.  This could
1397 	 * include more columns than were actually changed on the publisher
1398 	 * because the logical replication protocol doesn't contain that
1399 	 * information.  But it would for example exclude columns that only exist
1400 	 * on the subscriber, since we are not touching those.
1401 	 */
1402 	target_rte = list_nth(estate->es_range_table, 0);
1403 	for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1404 	{
1405 		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
1406 		int			remoteattnum = rel->attrmap->attnums[i];
1407 
1408 		if (!att->attisdropped && remoteattnum >= 0)
1409 		{
1410 			Assert(remoteattnum < newtup.ncols);
1411 			if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1412 				target_rte->updatedCols =
1413 					bms_add_member(target_rte->updatedCols,
1414 								   i + 1 - FirstLowInvalidHeapAttributeNumber);
1415 		}
1416 	}
1417 
1418 	/* Also populate extraUpdatedCols, in case we have generated columns */
1419 	fill_extraUpdatedCols(target_rte, rel->localrel);
1420 
1421 	/* Build the search tuple. */
1422 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1423 	slot_store_data(remoteslot, rel,
1424 					has_oldtup ? &oldtup : &newtup);
1425 	MemoryContextSwitchTo(oldctx);
1426 
1427 	/* For a partitioned table, apply update to correct partition. */
1428 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1429 		apply_handle_tuple_routing(edata,
1430 								   remoteslot, &newtup, CMD_UPDATE);
1431 	else
1432 		apply_handle_update_internal(edata, edata->targetRelInfo,
1433 									 remoteslot, &newtup);
1434 
1435 	finish_edata(edata);
1436 
1437 	logicalrep_rel_close(rel, NoLock);
1438 
1439 	end_replication_step();
1440 }
1441 
1442 /*
1443  * Workhorse for apply_handle_update()
1444  * relinfo is for the relation we're actually updating in
1445  * (could be a child partition of edata->targetRelInfo)
1446  */
1447 static void
apply_handle_update_internal(ApplyExecutionData * edata,ResultRelInfo * relinfo,TupleTableSlot * remoteslot,LogicalRepTupleData * newtup)1448 apply_handle_update_internal(ApplyExecutionData *edata,
1449 							 ResultRelInfo *relinfo,
1450 							 TupleTableSlot *remoteslot,
1451 							 LogicalRepTupleData *newtup)
1452 {
1453 	EState	   *estate = edata->estate;
1454 	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1455 	Relation	localrel = relinfo->ri_RelationDesc;
1456 	EPQState	epqstate;
1457 	TupleTableSlot *localslot;
1458 	bool		found;
1459 	MemoryContext oldctx;
1460 
1461 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1462 	ExecOpenIndices(relinfo, false);
1463 
1464 	found = FindReplTupleInLocalRel(estate, localrel,
1465 									&relmapentry->remoterel,
1466 									remoteslot, &localslot);
1467 	ExecClearTuple(remoteslot);
1468 
1469 	/*
1470 	 * Tuple found.
1471 	 *
1472 	 * Note this will fail if there are other conflicting unique indexes.
1473 	 */
1474 	if (found)
1475 	{
1476 		/* Process and store remote tuple in the slot */
1477 		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1478 		slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1479 		MemoryContextSwitchTo(oldctx);
1480 
1481 		EvalPlanQualSetSlot(&epqstate, remoteslot);
1482 
1483 		/* Do the actual update. */
1484 		ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1485 								 remoteslot);
1486 	}
1487 	else
1488 	{
1489 		/*
1490 		 * The tuple to be updated could not be found.  Do nothing except for
1491 		 * emitting a log message.
1492 		 *
1493 		 * XXX should this be promoted to ereport(LOG) perhaps?
1494 		 */
1495 		elog(DEBUG1,
1496 			 "logical replication did not find row to be updated "
1497 			 "in replication target relation \"%s\"",
1498 			 RelationGetRelationName(localrel));
1499 	}
1500 
1501 	/* Cleanup. */
1502 	ExecCloseIndices(relinfo);
1503 	EvalPlanQualEnd(&epqstate);
1504 }
1505 
1506 /*
1507  * Handle DELETE message.
1508  *
1509  * TODO: FDW support
1510  */
1511 static void
apply_handle_delete(StringInfo s)1512 apply_handle_delete(StringInfo s)
1513 {
1514 	LogicalRepRelMapEntry *rel;
1515 	LogicalRepTupleData oldtup;
1516 	LogicalRepRelId relid;
1517 	ApplyExecutionData *edata;
1518 	EState	   *estate;
1519 	TupleTableSlot *remoteslot;
1520 	MemoryContext oldctx;
1521 
1522 	if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
1523 		return;
1524 
1525 	begin_replication_step();
1526 
1527 	relid = logicalrep_read_delete(s, &oldtup);
1528 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
1529 	if (!should_apply_changes_for_rel(rel))
1530 	{
1531 		/*
1532 		 * The relation can't become interesting in the middle of the
1533 		 * transaction so it's safe to unlock it.
1534 		 */
1535 		logicalrep_rel_close(rel, RowExclusiveLock);
1536 		end_replication_step();
1537 		return;
1538 	}
1539 
1540 	/* Check if we can do the delete. */
1541 	check_relation_updatable(rel);
1542 
1543 	/* Initialize the executor state. */
1544 	edata = create_edata_for_relation(rel);
1545 	estate = edata->estate;
1546 	remoteslot = ExecInitExtraTupleSlot(estate,
1547 										RelationGetDescr(rel->localrel),
1548 										&TTSOpsVirtual);
1549 
1550 	/* Build the search tuple. */
1551 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1552 	slot_store_data(remoteslot, rel, &oldtup);
1553 	MemoryContextSwitchTo(oldctx);
1554 
1555 	/* For a partitioned table, apply delete to correct partition. */
1556 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1557 		apply_handle_tuple_routing(edata,
1558 								   remoteslot, NULL, CMD_DELETE);
1559 	else
1560 		apply_handle_delete_internal(edata, edata->targetRelInfo,
1561 									 remoteslot);
1562 
1563 	finish_edata(edata);
1564 
1565 	logicalrep_rel_close(rel, NoLock);
1566 
1567 	end_replication_step();
1568 }
1569 
1570 /*
1571  * Workhorse for apply_handle_delete()
1572  * relinfo is for the relation we're actually deleting from
1573  * (could be a child partition of edata->targetRelInfo)
1574  */
1575 static void
apply_handle_delete_internal(ApplyExecutionData * edata,ResultRelInfo * relinfo,TupleTableSlot * remoteslot)1576 apply_handle_delete_internal(ApplyExecutionData *edata,
1577 							 ResultRelInfo *relinfo,
1578 							 TupleTableSlot *remoteslot)
1579 {
1580 	EState	   *estate = edata->estate;
1581 	Relation	localrel = relinfo->ri_RelationDesc;
1582 	LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
1583 	EPQState	epqstate;
1584 	TupleTableSlot *localslot;
1585 	bool		found;
1586 
1587 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1588 	ExecOpenIndices(relinfo, false);
1589 
1590 	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1591 									remoteslot, &localslot);
1592 
1593 	/* If found delete it. */
1594 	if (found)
1595 	{
1596 		EvalPlanQualSetSlot(&epqstate, localslot);
1597 
1598 		/* Do the actual delete. */
1599 		ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1600 	}
1601 	else
1602 	{
1603 		/*
1604 		 * The tuple to be deleted could not be found.  Do nothing except for
1605 		 * emitting a log message.
1606 		 *
1607 		 * XXX should this be promoted to ereport(LOG) perhaps?
1608 		 */
1609 		elog(DEBUG1,
1610 			 "logical replication did not find row to be deleted "
1611 			 "in replication target relation \"%s\"",
1612 			 RelationGetRelationName(localrel));
1613 	}
1614 
1615 	/* Cleanup. */
1616 	ExecCloseIndices(relinfo);
1617 	EvalPlanQualEnd(&epqstate);
1618 }
1619 
1620 /*
1621  * Try to find a tuple received from the publication side (in 'remoteslot') in
1622  * the corresponding local relation using either replica identity index,
1623  * primary key or if needed, sequential scan.
1624  *
1625  * Local tuple, if found, is returned in '*localslot'.
1626  */
1627 static bool
FindReplTupleInLocalRel(EState * estate,Relation localrel,LogicalRepRelation * remoterel,TupleTableSlot * remoteslot,TupleTableSlot ** localslot)1628 FindReplTupleInLocalRel(EState *estate, Relation localrel,
1629 						LogicalRepRelation *remoterel,
1630 						TupleTableSlot *remoteslot,
1631 						TupleTableSlot **localslot)
1632 {
1633 	Oid			idxoid;
1634 	bool		found;
1635 
1636 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
1637 
1638 	idxoid = GetRelationIdentityOrPK(localrel);
1639 	Assert(OidIsValid(idxoid) ||
1640 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
1641 
1642 	if (OidIsValid(idxoid))
1643 		found = RelationFindReplTupleByIndex(localrel, idxoid,
1644 											 LockTupleExclusive,
1645 											 remoteslot, *localslot);
1646 	else
1647 		found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1648 										 remoteslot, *localslot);
1649 
1650 	return found;
1651 }
1652 
1653 /*
1654  * This handles insert, update, delete on a partitioned table.
1655  */
1656 static void
apply_handle_tuple_routing(ApplyExecutionData * edata,TupleTableSlot * remoteslot,LogicalRepTupleData * newtup,CmdType operation)1657 apply_handle_tuple_routing(ApplyExecutionData *edata,
1658 						   TupleTableSlot *remoteslot,
1659 						   LogicalRepTupleData *newtup,
1660 						   CmdType operation)
1661 {
1662 	EState	   *estate = edata->estate;
1663 	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1664 	ResultRelInfo *relinfo = edata->targetRelInfo;
1665 	Relation	parentrel = relinfo->ri_RelationDesc;
1666 	ModifyTableState *mtstate;
1667 	PartitionTupleRouting *proute;
1668 	ResultRelInfo *partrelinfo;
1669 	Relation	partrel;
1670 	TupleTableSlot *remoteslot_part;
1671 	TupleConversionMap *map;
1672 	MemoryContext oldctx;
1673 
1674 	/* ModifyTableState is needed for ExecFindPartition(). */
1675 	edata->mtstate = mtstate = makeNode(ModifyTableState);
1676 	mtstate->ps.plan = NULL;
1677 	mtstate->ps.state = estate;
1678 	mtstate->operation = operation;
1679 	mtstate->resultRelInfo = relinfo;
1680 
1681 	/* ... as is PartitionTupleRouting. */
1682 	edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
1683 
1684 	/*
1685 	 * Find the partition to which the "search tuple" belongs.
1686 	 */
1687 	Assert(remoteslot != NULL);
1688 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1689 	partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
1690 									remoteslot, estate);
1691 	Assert(partrelinfo != NULL);
1692 	partrel = partrelinfo->ri_RelationDesc;
1693 
1694 	/*
1695 	 * To perform any of the operations below, the tuple must match the
1696 	 * partition's rowtype. Convert if needed or just copy, using a dedicated
1697 	 * slot to store the tuple in any case.
1698 	 */
1699 	remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
1700 	if (remoteslot_part == NULL)
1701 		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
1702 	map = partrelinfo->ri_RootToPartitionMap;
1703 	if (map != NULL)
1704 		remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1705 												remoteslot_part);
1706 	else
1707 	{
1708 		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
1709 		slot_getallattrs(remoteslot_part);
1710 	}
1711 	MemoryContextSwitchTo(oldctx);
1712 
1713 	switch (operation)
1714 	{
1715 		case CMD_INSERT:
1716 			apply_handle_insert_internal(edata, partrelinfo,
1717 										 remoteslot_part);
1718 			break;
1719 
1720 		case CMD_DELETE:
1721 			apply_handle_delete_internal(edata, partrelinfo,
1722 										 remoteslot_part);
1723 			break;
1724 
1725 		case CMD_UPDATE:
1726 
1727 			/*
1728 			 * For UPDATE, depending on whether or not the updated tuple
1729 			 * satisfies the partition's constraint, perform a simple UPDATE
1730 			 * of the partition or move the updated tuple into a different
1731 			 * suitable partition.
1732 			 */
1733 			{
1734 				AttrMap    *attrmap = map ? map->attrMap : NULL;
1735 				LogicalRepRelMapEntry *part_entry;
1736 				TupleTableSlot *localslot;
1737 				ResultRelInfo *partrelinfo_new;
1738 				bool		found;
1739 
1740 				part_entry = logicalrep_partition_open(relmapentry, partrel,
1741 													   attrmap);
1742 
1743 				/* Get the matching local tuple from the partition. */
1744 				found = FindReplTupleInLocalRel(estate, partrel,
1745 												&part_entry->remoterel,
1746 												remoteslot_part, &localslot);
1747 				if (!found)
1748 				{
1749 					/*
1750 					 * The tuple to be updated could not be found.  Do nothing
1751 					 * except for emitting a log message.
1752 					 *
1753 					 * XXX should this be promoted to ereport(LOG) perhaps?
1754 					 */
1755 					elog(DEBUG1,
1756 						 "logical replication did not find row to be updated "
1757 						 "in replication target relation's partition \"%s\"",
1758 						 RelationGetRelationName(partrel));
1759 					return;
1760 				}
1761 
1762 				/*
1763 				 * Apply the update to the local tuple, putting the result in
1764 				 * remoteslot_part.
1765 				 */
1766 				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1767 				slot_modify_data(remoteslot_part, localslot, part_entry,
1768 								 newtup);
1769 				MemoryContextSwitchTo(oldctx);
1770 
1771 				/*
1772 				 * Does the updated tuple still satisfy the current
1773 				 * partition's constraint?
1774 				 */
1775 				if (!partrel->rd_rel->relispartition ||
1776 					ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
1777 									   false))
1778 				{
1779 					/*
1780 					 * Yes, so simply UPDATE the partition.  We don't call
1781 					 * apply_handle_update_internal() here, which would
1782 					 * normally do the following work, to avoid repeating some
1783 					 * work already done above to find the local tuple in the
1784 					 * partition.
1785 					 */
1786 					EPQState	epqstate;
1787 
1788 					EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1789 					ExecOpenIndices(partrelinfo, false);
1790 
1791 					EvalPlanQualSetSlot(&epqstate, remoteslot_part);
1792 					ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
1793 											 localslot, remoteslot_part);
1794 					ExecCloseIndices(partrelinfo);
1795 					EvalPlanQualEnd(&epqstate);
1796 				}
1797 				else
1798 				{
1799 					/* Move the tuple into the new partition. */
1800 
1801 					/*
1802 					 * New partition will be found using tuple routing, which
1803 					 * can only occur via the parent table.  We might need to
1804 					 * convert the tuple to the parent's rowtype.  Note that
1805 					 * this is the tuple found in the partition, not the
1806 					 * original search tuple received by this function.
1807 					 */
1808 					if (map)
1809 					{
1810 						TupleConversionMap *PartitionToRootMap =
1811 						convert_tuples_by_name(RelationGetDescr(partrel),
1812 											   RelationGetDescr(parentrel));
1813 
1814 						remoteslot =
1815 							execute_attr_map_slot(PartitionToRootMap->attrMap,
1816 												  remoteslot_part, remoteslot);
1817 					}
1818 					else
1819 					{
1820 						remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1821 						slot_getallattrs(remoteslot);
1822 					}
1823 
1824 
1825 					/* Find the new partition. */
1826 					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1827 					partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1828 														proute, remoteslot,
1829 														estate);
1830 					MemoryContextSwitchTo(oldctx);
1831 					Assert(partrelinfo_new != partrelinfo);
1832 
1833 					/* DELETE old tuple found in the old partition. */
1834 					apply_handle_delete_internal(edata, partrelinfo,
1835 												 localslot);
1836 
1837 					/* INSERT new tuple into the new partition. */
1838 
1839 					/*
1840 					 * Convert the replacement tuple to match the destination
1841 					 * partition rowtype.
1842 					 */
1843 					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1844 					partrel = partrelinfo_new->ri_RelationDesc;
1845 					remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
1846 					if (remoteslot_part == NULL)
1847 						remoteslot_part = table_slot_create(partrel,
1848 															&estate->es_tupleTable);
1849 					map = partrelinfo_new->ri_RootToPartitionMap;
1850 					if (map != NULL)
1851 					{
1852 						remoteslot_part = execute_attr_map_slot(map->attrMap,
1853 																remoteslot,
1854 																remoteslot_part);
1855 					}
1856 					else
1857 					{
1858 						remoteslot_part = ExecCopySlot(remoteslot_part,
1859 													   remoteslot);
1860 						slot_getallattrs(remoteslot);
1861 					}
1862 					MemoryContextSwitchTo(oldctx);
1863 					apply_handle_insert_internal(edata, partrelinfo_new,
1864 												 remoteslot_part);
1865 				}
1866 			}
1867 			break;
1868 
1869 		default:
1870 			elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1871 			break;
1872 	}
1873 }
1874 
1875 /*
1876  * Handle TRUNCATE message.
1877  *
1878  * TODO: FDW support
1879  */
1880 static void
apply_handle_truncate(StringInfo s)1881 apply_handle_truncate(StringInfo s)
1882 {
1883 	bool		cascade = false;
1884 	bool		restart_seqs = false;
1885 	List	   *remote_relids = NIL;
1886 	List	   *remote_rels = NIL;
1887 	List	   *rels = NIL;
1888 	List	   *part_rels = NIL;
1889 	List	   *relids = NIL;
1890 	List	   *relids_logged = NIL;
1891 	ListCell   *lc;
1892 	LOCKMODE	lockmode = AccessExclusiveLock;
1893 
1894 	if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
1895 		return;
1896 
1897 	begin_replication_step();
1898 
1899 	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
1900 
1901 	foreach(lc, remote_relids)
1902 	{
1903 		LogicalRepRelId relid = lfirst_oid(lc);
1904 		LogicalRepRelMapEntry *rel;
1905 
1906 		rel = logicalrep_rel_open(relid, lockmode);
1907 		if (!should_apply_changes_for_rel(rel))
1908 		{
1909 			/*
1910 			 * The relation can't become interesting in the middle of the
1911 			 * transaction so it's safe to unlock it.
1912 			 */
1913 			logicalrep_rel_close(rel, lockmode);
1914 			continue;
1915 		}
1916 
1917 		remote_rels = lappend(remote_rels, rel);
1918 		rels = lappend(rels, rel->localrel);
1919 		relids = lappend_oid(relids, rel->localreloid);
1920 		if (RelationIsLogicallyLogged(rel->localrel))
1921 			relids_logged = lappend_oid(relids_logged, rel->localreloid);
1922 
1923 		/*
1924 		 * Truncate partitions if we got a message to truncate a partitioned
1925 		 * table.
1926 		 */
1927 		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1928 		{
1929 			ListCell   *child;
1930 			List	   *children = find_all_inheritors(rel->localreloid,
1931 													   lockmode,
1932 													   NULL);
1933 
1934 			foreach(child, children)
1935 			{
1936 				Oid			childrelid = lfirst_oid(child);
1937 				Relation	childrel;
1938 
1939 				if (list_member_oid(relids, childrelid))
1940 					continue;
1941 
1942 				/* find_all_inheritors already got lock */
1943 				childrel = table_open(childrelid, NoLock);
1944 
1945 				/*
1946 				 * Ignore temp tables of other backends.  See similar code in
1947 				 * ExecuteTruncate().
1948 				 */
1949 				if (RELATION_IS_OTHER_TEMP(childrel))
1950 				{
1951 					table_close(childrel, lockmode);
1952 					continue;
1953 				}
1954 
1955 				rels = lappend(rels, childrel);
1956 				part_rels = lappend(part_rels, childrel);
1957 				relids = lappend_oid(relids, childrelid);
1958 				/* Log this relation only if needed for logical decoding */
1959 				if (RelationIsLogicallyLogged(childrel))
1960 					relids_logged = lappend_oid(relids_logged, childrelid);
1961 			}
1962 		}
1963 	}
1964 
1965 	/*
1966 	 * Even if we used CASCADE on the upstream primary we explicitly default
1967 	 * to replaying changes without further cascading. This might be later
1968 	 * changeable with a user specified option.
1969 	 */
1970 	ExecuteTruncateGuts(rels,
1971 						relids,
1972 						relids_logged,
1973 						DROP_RESTRICT,
1974 						restart_seqs);
1975 	foreach(lc, remote_rels)
1976 	{
1977 		LogicalRepRelMapEntry *rel = lfirst(lc);
1978 
1979 		logicalrep_rel_close(rel, NoLock);
1980 	}
1981 	foreach(lc, part_rels)
1982 	{
1983 		Relation	rel = lfirst(lc);
1984 
1985 		table_close(rel, NoLock);
1986 	}
1987 
1988 	end_replication_step();
1989 }
1990 
1991 
1992 /*
1993  * Logical replication protocol message dispatcher.
1994  */
1995 static void
apply_dispatch(StringInfo s)1996 apply_dispatch(StringInfo s)
1997 {
1998 	LogicalRepMsgType action = pq_getmsgbyte(s);
1999 
2000 	switch (action)
2001 	{
2002 		case LOGICAL_REP_MSG_BEGIN:
2003 			apply_handle_begin(s);
2004 			return;
2005 
2006 		case LOGICAL_REP_MSG_COMMIT:
2007 			apply_handle_commit(s);
2008 			return;
2009 
2010 		case LOGICAL_REP_MSG_INSERT:
2011 			apply_handle_insert(s);
2012 			return;
2013 
2014 		case LOGICAL_REP_MSG_UPDATE:
2015 			apply_handle_update(s);
2016 			return;
2017 
2018 		case LOGICAL_REP_MSG_DELETE:
2019 			apply_handle_delete(s);
2020 			return;
2021 
2022 		case LOGICAL_REP_MSG_TRUNCATE:
2023 			apply_handle_truncate(s);
2024 			return;
2025 
2026 		case LOGICAL_REP_MSG_RELATION:
2027 			apply_handle_relation(s);
2028 			return;
2029 
2030 		case LOGICAL_REP_MSG_TYPE:
2031 			apply_handle_type(s);
2032 			return;
2033 
2034 		case LOGICAL_REP_MSG_ORIGIN:
2035 			apply_handle_origin(s);
2036 			return;
2037 
2038 		case LOGICAL_REP_MSG_MESSAGE:
2039 
2040 			/*
2041 			 * Logical replication does not use generic logical messages yet.
2042 			 * Although, it could be used by other applications that use this
2043 			 * output plugin.
2044 			 */
2045 			return;
2046 
2047 		case LOGICAL_REP_MSG_STREAM_START:
2048 			apply_handle_stream_start(s);
2049 			return;
2050 
2051 		case LOGICAL_REP_MSG_STREAM_END:
2052 			apply_handle_stream_stop(s);
2053 			return;
2054 
2055 		case LOGICAL_REP_MSG_STREAM_ABORT:
2056 			apply_handle_stream_abort(s);
2057 			return;
2058 
2059 		case LOGICAL_REP_MSG_STREAM_COMMIT:
2060 			apply_handle_stream_commit(s);
2061 			return;
2062 	}
2063 
2064 	ereport(ERROR,
2065 			(errcode(ERRCODE_PROTOCOL_VIOLATION),
2066 			 errmsg_internal("invalid logical replication message type \"%c\"",
2067 							 action)));
2068 }
2069 
2070 /*
2071  * Figure out which write/flush positions to report to the walsender process.
2072  *
2073  * We can't simply report back the last LSN the walsender sent us because the
2074  * local transaction might not yet be flushed to disk locally. Instead we
2075  * build a list that associates local with remote LSNs for every commit. When
2076  * reporting back the flush position to the sender we iterate that list and
2077  * check which entries on it are already locally flushed. Those we can report
2078  * as having been flushed.
2079  *
2080  * The have_pending_txes is true if there are outstanding transactions that
2081  * need to be flushed.
2082  */
2083 static void
get_flush_position(XLogRecPtr * write,XLogRecPtr * flush,bool * have_pending_txes)2084 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
2085 				   bool *have_pending_txes)
2086 {
2087 	dlist_mutable_iter iter;
2088 	XLogRecPtr	local_flush = GetFlushRecPtr();
2089 
2090 	*write = InvalidXLogRecPtr;
2091 	*flush = InvalidXLogRecPtr;
2092 
2093 	dlist_foreach_modify(iter, &lsn_mapping)
2094 	{
2095 		FlushPosition *pos =
2096 		dlist_container(FlushPosition, node, iter.cur);
2097 
2098 		*write = pos->remote_end;
2099 
2100 		if (pos->local_end <= local_flush)
2101 		{
2102 			*flush = pos->remote_end;
2103 			dlist_delete(iter.cur);
2104 			pfree(pos);
2105 		}
2106 		else
2107 		{
2108 			/*
2109 			 * Don't want to uselessly iterate over the rest of the list which
2110 			 * could potentially be long. Instead get the last element and
2111 			 * grab the write position from there.
2112 			 */
2113 			pos = dlist_tail_element(FlushPosition, node,
2114 									 &lsn_mapping);
2115 			*write = pos->remote_end;
2116 			*have_pending_txes = true;
2117 			return;
2118 		}
2119 	}
2120 
2121 	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
2122 }
2123 
2124 /*
2125  * Store current remote/local lsn pair in the tracking list.
2126  */
2127 static void
store_flush_position(XLogRecPtr remote_lsn)2128 store_flush_position(XLogRecPtr remote_lsn)
2129 {
2130 	FlushPosition *flushpos;
2131 
2132 	/* Need to do this in permanent context */
2133 	MemoryContextSwitchTo(ApplyContext);
2134 
2135 	/* Track commit lsn  */
2136 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2137 	flushpos->local_end = XactLastCommitEnd;
2138 	flushpos->remote_end = remote_lsn;
2139 
2140 	dlist_push_tail(&lsn_mapping, &flushpos->node);
2141 	MemoryContextSwitchTo(ApplyMessageContext);
2142 }
2143 
2144 
2145 /* Update statistics of the worker. */
2146 static void
UpdateWorkerStats(XLogRecPtr last_lsn,TimestampTz send_time,bool reply)2147 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2148 {
2149 	MyLogicalRepWorker->last_lsn = last_lsn;
2150 	MyLogicalRepWorker->last_send_time = send_time;
2151 	MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
2152 	if (reply)
2153 	{
2154 		MyLogicalRepWorker->reply_lsn = last_lsn;
2155 		MyLogicalRepWorker->reply_time = send_time;
2156 	}
2157 }
2158 
2159 /*
2160  * Apply main loop.
2161  */
2162 static void
LogicalRepApplyLoop(XLogRecPtr last_received)2163 LogicalRepApplyLoop(XLogRecPtr last_received)
2164 {
2165 	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2166 	bool		ping_sent = false;
2167 	TimeLineID	tli;
2168 
2169 	/*
2170 	 * Init the ApplyMessageContext which we clean up after each replication
2171 	 * protocol message.
2172 	 */
2173 	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2174 												"ApplyMessageContext",
2175 												ALLOCSET_DEFAULT_SIZES);
2176 
2177 	/*
2178 	 * This memory context is used for per-stream data when the streaming mode
2179 	 * is enabled. This context is reset on each stream stop.
2180 	 */
2181 	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2182 													"LogicalStreamingContext",
2183 													ALLOCSET_DEFAULT_SIZES);
2184 
2185 	/* mark as idle, before starting to loop */
2186 	pgstat_report_activity(STATE_IDLE, NULL);
2187 
2188 	/* This outer loop iterates once per wait. */
2189 	for (;;)
2190 	{
2191 		pgsocket	fd = PGINVALID_SOCKET;
2192 		int			rc;
2193 		int			len;
2194 		char	   *buf = NULL;
2195 		bool		endofstream = false;
2196 		long		wait_time;
2197 
2198 		CHECK_FOR_INTERRUPTS();
2199 
2200 		MemoryContextSwitchTo(ApplyMessageContext);
2201 
2202 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2203 
2204 		if (len != 0)
2205 		{
2206 			/* Loop to process all available data (without blocking). */
2207 			for (;;)
2208 			{
2209 				CHECK_FOR_INTERRUPTS();
2210 
2211 				if (len == 0)
2212 				{
2213 					break;
2214 				}
2215 				else if (len < 0)
2216 				{
2217 					ereport(LOG,
2218 							(errmsg("data stream from publisher has ended")));
2219 					endofstream = true;
2220 					break;
2221 				}
2222 				else
2223 				{
2224 					int			c;
2225 					StringInfoData s;
2226 
2227 					/* Reset timeout. */
2228 					last_recv_timestamp = GetCurrentTimestamp();
2229 					ping_sent = false;
2230 
2231 					/* Ensure we are reading the data into our memory context. */
2232 					MemoryContextSwitchTo(ApplyMessageContext);
2233 
2234 					s.data = buf;
2235 					s.len = len;
2236 					s.cursor = 0;
2237 					s.maxlen = -1;
2238 
2239 					c = pq_getmsgbyte(&s);
2240 
2241 					if (c == 'w')
2242 					{
2243 						XLogRecPtr	start_lsn;
2244 						XLogRecPtr	end_lsn;
2245 						TimestampTz send_time;
2246 
2247 						start_lsn = pq_getmsgint64(&s);
2248 						end_lsn = pq_getmsgint64(&s);
2249 						send_time = pq_getmsgint64(&s);
2250 
2251 						if (last_received < start_lsn)
2252 							last_received = start_lsn;
2253 
2254 						if (last_received < end_lsn)
2255 							last_received = end_lsn;
2256 
2257 						UpdateWorkerStats(last_received, send_time, false);
2258 
2259 						apply_dispatch(&s);
2260 					}
2261 					else if (c == 'k')
2262 					{
2263 						XLogRecPtr	end_lsn;
2264 						TimestampTz timestamp;
2265 						bool		reply_requested;
2266 
2267 						end_lsn = pq_getmsgint64(&s);
2268 						timestamp = pq_getmsgint64(&s);
2269 						reply_requested = pq_getmsgbyte(&s);
2270 
2271 						if (last_received < end_lsn)
2272 							last_received = end_lsn;
2273 
2274 						send_feedback(last_received, reply_requested, false);
2275 						UpdateWorkerStats(last_received, timestamp, true);
2276 					}
2277 					/* other message types are purposefully ignored */
2278 
2279 					MemoryContextReset(ApplyMessageContext);
2280 				}
2281 
2282 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2283 			}
2284 		}
2285 
2286 		/* confirm all writes so far */
2287 		send_feedback(last_received, false, false);
2288 
2289 		if (!in_remote_transaction && !in_streamed_transaction)
2290 		{
2291 			/*
2292 			 * If we didn't get any transactions for a while there might be
2293 			 * unconsumed invalidation messages in the queue, consume them
2294 			 * now.
2295 			 */
2296 			AcceptInvalidationMessages();
2297 			maybe_reread_subscription();
2298 
2299 			/* Process any table synchronization changes. */
2300 			process_syncing_tables(last_received);
2301 		}
2302 
2303 		/* Cleanup the memory. */
2304 		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2305 		MemoryContextSwitchTo(TopMemoryContext);
2306 
2307 		/* Check if we need to exit the streaming loop. */
2308 		if (endofstream)
2309 			break;
2310 
2311 		/*
2312 		 * Wait for more data or latch.  If we have unflushed transactions,
2313 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
2314 		 * which case we should send a feedback message).  Otherwise, there's
2315 		 * no particular urgency about waking up unless we get data or a
2316 		 * signal.
2317 		 */
2318 		if (!dlist_is_empty(&lsn_mapping))
2319 			wait_time = WalWriterDelay;
2320 		else
2321 			wait_time = NAPTIME_PER_CYCLE;
2322 
2323 		rc = WaitLatchOrSocket(MyLatch,
2324 							   WL_SOCKET_READABLE | WL_LATCH_SET |
2325 							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
2326 							   fd, wait_time,
2327 							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
2328 
2329 		if (rc & WL_LATCH_SET)
2330 		{
2331 			ResetLatch(MyLatch);
2332 			CHECK_FOR_INTERRUPTS();
2333 		}
2334 
2335 		if (ConfigReloadPending)
2336 		{
2337 			ConfigReloadPending = false;
2338 			ProcessConfigFile(PGC_SIGHUP);
2339 		}
2340 
2341 		if (rc & WL_TIMEOUT)
2342 		{
2343 			/*
2344 			 * We didn't receive anything new. If we haven't heard anything
2345 			 * from the server for more than wal_receiver_timeout / 2, ping
2346 			 * the server. Also, if it's been longer than
2347 			 * wal_receiver_status_interval since the last update we sent,
2348 			 * send a status update to the primary anyway, to report any
2349 			 * progress in applying WAL.
2350 			 */
2351 			bool		requestReply = false;
2352 
2353 			/*
2354 			 * Check if time since last receive from primary has reached the
2355 			 * configured limit.
2356 			 */
2357 			if (wal_receiver_timeout > 0)
2358 			{
2359 				TimestampTz now = GetCurrentTimestamp();
2360 				TimestampTz timeout;
2361 
2362 				timeout =
2363 					TimestampTzPlusMilliseconds(last_recv_timestamp,
2364 												wal_receiver_timeout);
2365 
2366 				if (now >= timeout)
2367 					ereport(ERROR,
2368 							(errcode(ERRCODE_CONNECTION_FAILURE),
2369 							 errmsg("terminating logical replication worker due to timeout")));
2370 
2371 				/* Check to see if it's time for a ping. */
2372 				if (!ping_sent)
2373 				{
2374 					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2375 														  (wal_receiver_timeout / 2));
2376 					if (now >= timeout)
2377 					{
2378 						requestReply = true;
2379 						ping_sent = true;
2380 					}
2381 				}
2382 			}
2383 
2384 			send_feedback(last_received, requestReply, requestReply);
2385 		}
2386 	}
2387 
2388 	/* All done */
2389 	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
2390 }
2391 
2392 /*
2393  * Send a Standby Status Update message to server.
2394  *
2395  * 'recvpos' is the latest LSN we've received data to, force is set if we need
2396  * to send a response to avoid timeouts.
2397  */
2398 static void
send_feedback(XLogRecPtr recvpos,bool force,bool requestReply)2399 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2400 {
2401 	static StringInfo reply_message = NULL;
2402 	static TimestampTz send_time = 0;
2403 
2404 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2405 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2406 	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2407 
2408 	XLogRecPtr	writepos;
2409 	XLogRecPtr	flushpos;
2410 	TimestampTz now;
2411 	bool		have_pending_txes;
2412 
2413 	/*
2414 	 * If the user doesn't want status to be reported to the publisher, be
2415 	 * sure to exit before doing anything at all.
2416 	 */
2417 	if (!force && wal_receiver_status_interval <= 0)
2418 		return;
2419 
2420 	/* It's legal to not pass a recvpos */
2421 	if (recvpos < last_recvpos)
2422 		recvpos = last_recvpos;
2423 
2424 	get_flush_position(&writepos, &flushpos, &have_pending_txes);
2425 
2426 	/*
2427 	 * No outstanding transactions to flush, we can report the latest received
2428 	 * position. This is important for synchronous replication.
2429 	 */
2430 	if (!have_pending_txes)
2431 		flushpos = writepos = recvpos;
2432 
2433 	if (writepos < last_writepos)
2434 		writepos = last_writepos;
2435 
2436 	if (flushpos < last_flushpos)
2437 		flushpos = last_flushpos;
2438 
2439 	now = GetCurrentTimestamp();
2440 
2441 	/* if we've already reported everything we're good */
2442 	if (!force &&
2443 		writepos == last_writepos &&
2444 		flushpos == last_flushpos &&
2445 		!TimestampDifferenceExceeds(send_time, now,
2446 									wal_receiver_status_interval * 1000))
2447 		return;
2448 	send_time = now;
2449 
2450 	if (!reply_message)
2451 	{
2452 		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
2453 
2454 		reply_message = makeStringInfo();
2455 		MemoryContextSwitchTo(oldctx);
2456 	}
2457 	else
2458 		resetStringInfo(reply_message);
2459 
2460 	pq_sendbyte(reply_message, 'r');
2461 	pq_sendint64(reply_message, recvpos);	/* write */
2462 	pq_sendint64(reply_message, flushpos);	/* flush */
2463 	pq_sendint64(reply_message, writepos);	/* apply */
2464 	pq_sendint64(reply_message, now);	/* sendTime */
2465 	pq_sendbyte(reply_message, requestReply);	/* replyRequested */
2466 
2467 	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2468 		 force,
2469 		 LSN_FORMAT_ARGS(recvpos),
2470 		 LSN_FORMAT_ARGS(writepos),
2471 		 LSN_FORMAT_ARGS(flushpos));
2472 
2473 	walrcv_send(LogRepWorkerWalRcvConn,
2474 				reply_message->data, reply_message->len);
2475 
2476 	if (recvpos > last_recvpos)
2477 		last_recvpos = recvpos;
2478 	if (writepos > last_writepos)
2479 		last_writepos = writepos;
2480 	if (flushpos > last_flushpos)
2481 		last_flushpos = flushpos;
2482 }
2483 
2484 /*
2485  * Reread subscription info if needed. Most changes will be exit.
2486  */
2487 static void
maybe_reread_subscription(void)2488 maybe_reread_subscription(void)
2489 {
2490 	MemoryContext oldctx;
2491 	Subscription *newsub;
2492 	bool		started_tx = false;
2493 
2494 	/* When cache state is valid there is nothing to do here. */
2495 	if (MySubscriptionValid)
2496 		return;
2497 
2498 	/* This function might be called inside or outside of transaction. */
2499 	if (!IsTransactionState())
2500 	{
2501 		StartTransactionCommand();
2502 		started_tx = true;
2503 	}
2504 
2505 	/* Ensure allocations in permanent context. */
2506 	oldctx = MemoryContextSwitchTo(ApplyContext);
2507 
2508 	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2509 
2510 	/*
2511 	 * Exit if the subscription was removed. This normally should not happen
2512 	 * as the worker gets killed during DROP SUBSCRIPTION.
2513 	 */
2514 	if (!newsub)
2515 	{
2516 		ereport(LOG,
2517 				(errmsg("logical replication apply worker for subscription \"%s\" will "
2518 						"stop because the subscription was removed",
2519 						MySubscription->name)));
2520 
2521 		proc_exit(0);
2522 	}
2523 
2524 	/*
2525 	 * Exit if the subscription was disabled. This normally should not happen
2526 	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2527 	 */
2528 	if (!newsub->enabled)
2529 	{
2530 		ereport(LOG,
2531 				(errmsg("logical replication apply worker for subscription \"%s\" will "
2532 						"stop because the subscription was disabled",
2533 						MySubscription->name)));
2534 
2535 		proc_exit(0);
2536 	}
2537 
2538 	/* !slotname should never happen when enabled is true. */
2539 	Assert(newsub->slotname);
2540 
2541 	/*
2542 	 * Exit if any parameter that affects the remote connection was changed.
2543 	 * The launcher will start a new worker.
2544 	 */
2545 	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2546 		strcmp(newsub->name, MySubscription->name) != 0 ||
2547 		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2548 		newsub->binary != MySubscription->binary ||
2549 		newsub->stream != MySubscription->stream ||
2550 		!equal(newsub->publications, MySubscription->publications))
2551 	{
2552 		ereport(LOG,
2553 				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2554 						MySubscription->name)));
2555 
2556 		proc_exit(0);
2557 	}
2558 
2559 	/* Check for other changes that should never happen too. */
2560 	if (newsub->dbid != MySubscription->dbid)
2561 	{
2562 		elog(ERROR, "subscription %u changed unexpectedly",
2563 			 MyLogicalRepWorker->subid);
2564 	}
2565 
2566 	/* Clean old subscription info and switch to new one. */
2567 	FreeSubscription(MySubscription);
2568 	MySubscription = newsub;
2569 
2570 	MemoryContextSwitchTo(oldctx);
2571 
2572 	/* Change synchronous commit according to the user's wishes */
2573 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
2574 					PGC_BACKEND, PGC_S_OVERRIDE);
2575 
2576 	if (started_tx)
2577 		CommitTransactionCommand();
2578 
2579 	MySubscriptionValid = true;
2580 }
2581 
2582 /*
2583  * Callback from subscription syscache invalidation.
2584  */
2585 static void
subscription_change_cb(Datum arg,int cacheid,uint32 hashvalue)2586 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
2587 {
2588 	MySubscriptionValid = false;
2589 }
2590 
2591 /*
2592  * subxact_info_write
2593  *	  Store information about subxacts for a toplevel transaction.
2594  *
2595  * For each subxact we store offset of it's first change in the main file.
2596  * The file is always over-written as a whole.
2597  *
2598  * XXX We should only store subxacts that were not aborted yet.
2599  */
2600 static void
subxact_info_write(Oid subid,TransactionId xid)2601 subxact_info_write(Oid subid, TransactionId xid)
2602 {
2603 	char		path[MAXPGPATH];
2604 	Size		len;
2605 	StreamXidHash *ent;
2606 	BufFile    *fd;
2607 
2608 	Assert(TransactionIdIsValid(xid));
2609 
2610 	/* Find the xid entry in the xidhash */
2611 	ent = (StreamXidHash *) hash_search(xidhash,
2612 										(void *) &xid,
2613 										HASH_FIND,
2614 										NULL);
2615 	/* By this time we must have created the transaction entry */
2616 	Assert(ent);
2617 
2618 	/*
2619 	 * If there is no subtransaction then nothing to do, but if already have
2620 	 * subxact file then delete that.
2621 	 */
2622 	if (subxact_data.nsubxacts == 0)
2623 	{
2624 		if (ent->subxact_fileset)
2625 		{
2626 			cleanup_subxact_info();
2627 			SharedFileSetDeleteAll(ent->subxact_fileset);
2628 			pfree(ent->subxact_fileset);
2629 			ent->subxact_fileset = NULL;
2630 		}
2631 		return;
2632 	}
2633 
2634 	subxact_filename(path, subid, xid);
2635 
2636 	/*
2637 	 * Create the subxact file if it not already created, otherwise open the
2638 	 * existing file.
2639 	 */
2640 	if (ent->subxact_fileset == NULL)
2641 	{
2642 		MemoryContext oldctx;
2643 
2644 		/*
2645 		 * We need to maintain shared fileset across multiple stream
2646 		 * start/stop calls.  So, need to allocate it in a persistent context.
2647 		 */
2648 		oldctx = MemoryContextSwitchTo(ApplyContext);
2649 		ent->subxact_fileset = palloc(sizeof(SharedFileSet));
2650 		SharedFileSetInit(ent->subxact_fileset, NULL);
2651 		MemoryContextSwitchTo(oldctx);
2652 
2653 		fd = BufFileCreateShared(ent->subxact_fileset, path);
2654 	}
2655 	else
2656 		fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
2657 
2658 	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2659 
2660 	/* Write the subxact count and subxact info */
2661 	BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
2662 	BufFileWrite(fd, subxact_data.subxacts, len);
2663 
2664 	BufFileClose(fd);
2665 
2666 	/* free the memory allocated for subxact info */
2667 	cleanup_subxact_info();
2668 }
2669 
2670 /*
2671  * subxact_info_read
2672  *	  Restore information about subxacts of a streamed transaction.
2673  *
2674  * Read information about subxacts into the structure subxact_data that can be
2675  * used later.
2676  */
2677 static void
subxact_info_read(Oid subid,TransactionId xid)2678 subxact_info_read(Oid subid, TransactionId xid)
2679 {
2680 	char		path[MAXPGPATH];
2681 	Size		len;
2682 	BufFile    *fd;
2683 	StreamXidHash *ent;
2684 	MemoryContext oldctx;
2685 
2686 	Assert(!subxact_data.subxacts);
2687 	Assert(subxact_data.nsubxacts == 0);
2688 	Assert(subxact_data.nsubxacts_max == 0);
2689 
2690 	/* Find the stream xid entry in the xidhash */
2691 	ent = (StreamXidHash *) hash_search(xidhash,
2692 										(void *) &xid,
2693 										HASH_FIND,
2694 										NULL);
2695 	if (!ent)
2696 		ereport(ERROR,
2697 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
2698 				 errmsg_internal("transaction %u not found in stream XID hash table",
2699 								 xid)));
2700 
2701 	/*
2702 	 * If subxact_fileset is not valid that mean we don't have any subxact
2703 	 * info
2704 	 */
2705 	if (ent->subxact_fileset == NULL)
2706 		return;
2707 
2708 	subxact_filename(path, subid, xid);
2709 
2710 	fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
2711 
2712 	/* read number of subxact items */
2713 	if (BufFileRead(fd, &subxact_data.nsubxacts,
2714 					sizeof(subxact_data.nsubxacts)) !=
2715 		sizeof(subxact_data.nsubxacts))
2716 		ereport(ERROR,
2717 				(errcode_for_file_access(),
2718 				 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2719 						path)));
2720 
2721 	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2722 
2723 	/* we keep the maximum as a power of 2 */
2724 	subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
2725 
2726 	/*
2727 	 * Allocate subxact information in the logical streaming context. We need
2728 	 * this information during the complete stream so that we can add the sub
2729 	 * transaction info to this. On stream stop we will flush this information
2730 	 * to the subxact file and reset the logical streaming context.
2731 	 */
2732 	oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2733 	subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
2734 								   sizeof(SubXactInfo));
2735 	MemoryContextSwitchTo(oldctx);
2736 
2737 	if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
2738 		ereport(ERROR,
2739 				(errcode_for_file_access(),
2740 				 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2741 						path)));
2742 
2743 	BufFileClose(fd);
2744 }
2745 
2746 /*
2747  * subxact_info_add
2748  *	  Add information about a subxact (offset in the main file).
2749  */
2750 static void
subxact_info_add(TransactionId xid)2751 subxact_info_add(TransactionId xid)
2752 {
2753 	SubXactInfo *subxacts = subxact_data.subxacts;
2754 	int64		i;
2755 
2756 	/* We must have a valid top level stream xid and a stream fd. */
2757 	Assert(TransactionIdIsValid(stream_xid));
2758 	Assert(stream_fd != NULL);
2759 
2760 	/*
2761 	 * If the XID matches the toplevel transaction, we don't want to add it.
2762 	 */
2763 	if (stream_xid == xid)
2764 		return;
2765 
2766 	/*
2767 	 * In most cases we're checking the same subxact as we've already seen in
2768 	 * the last call, so make sure to ignore it (this change comes later).
2769 	 */
2770 	if (subxact_data.subxact_last == xid)
2771 		return;
2772 
2773 	/* OK, remember we're processing this XID. */
2774 	subxact_data.subxact_last = xid;
2775 
2776 	/*
2777 	 * Check if the transaction is already present in the array of subxact. We
2778 	 * intentionally scan the array from the tail, because we're likely adding
2779 	 * a change for the most recent subtransactions.
2780 	 *
2781 	 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
2782 	 * would allow us to use binary search here.
2783 	 */
2784 	for (i = subxact_data.nsubxacts; i > 0; i--)
2785 	{
2786 		/* found, so we're done */
2787 		if (subxacts[i - 1].xid == xid)
2788 			return;
2789 	}
2790 
2791 	/* This is a new subxact, so we need to add it to the array. */
2792 	if (subxact_data.nsubxacts == 0)
2793 	{
2794 		MemoryContext oldctx;
2795 
2796 		subxact_data.nsubxacts_max = 128;
2797 
2798 		/*
2799 		 * Allocate this memory for subxacts in per-stream context, see
2800 		 * subxact_info_read.
2801 		 */
2802 		oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2803 		subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2804 		MemoryContextSwitchTo(oldctx);
2805 	}
2806 	else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
2807 	{
2808 		subxact_data.nsubxacts_max *= 2;
2809 		subxacts = repalloc(subxacts,
2810 							subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2811 	}
2812 
2813 	subxacts[subxact_data.nsubxacts].xid = xid;
2814 
2815 	/*
2816 	 * Get the current offset of the stream file and store it as offset of
2817 	 * this subxact.
2818 	 */
2819 	BufFileTell(stream_fd,
2820 				&subxacts[subxact_data.nsubxacts].fileno,
2821 				&subxacts[subxact_data.nsubxacts].offset);
2822 
2823 	subxact_data.nsubxacts++;
2824 	subxact_data.subxacts = subxacts;
2825 }
2826 
2827 /* format filename for file containing the info about subxacts */
2828 static inline void
subxact_filename(char * path,Oid subid,TransactionId xid)2829 subxact_filename(char *path, Oid subid, TransactionId xid)
2830 {
2831 	snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
2832 }
2833 
2834 /* format filename for file containing serialized changes */
2835 static inline void
changes_filename(char * path,Oid subid,TransactionId xid)2836 changes_filename(char *path, Oid subid, TransactionId xid)
2837 {
2838 	snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
2839 }
2840 
2841 /*
2842  * stream_cleanup_files
2843  *	  Cleanup files for a subscription / toplevel transaction.
2844  *
2845  * Remove files with serialized changes and subxact info for a particular
2846  * toplevel transaction. Each subscription has a separate set of files.
2847  */
2848 static void
stream_cleanup_files(Oid subid,TransactionId xid)2849 stream_cleanup_files(Oid subid, TransactionId xid)
2850 {
2851 	char		path[MAXPGPATH];
2852 	StreamXidHash *ent;
2853 
2854 	/* Find the xid entry in the xidhash */
2855 	ent = (StreamXidHash *) hash_search(xidhash,
2856 										(void *) &xid,
2857 										HASH_FIND,
2858 										NULL);
2859 	if (!ent)
2860 		ereport(ERROR,
2861 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
2862 				 errmsg_internal("transaction %u not found in stream XID hash table",
2863 								 xid)));
2864 
2865 	/* Delete the change file and release the stream fileset memory */
2866 	changes_filename(path, subid, xid);
2867 	SharedFileSetDeleteAll(ent->stream_fileset);
2868 	pfree(ent->stream_fileset);
2869 	ent->stream_fileset = NULL;
2870 
2871 	/* Delete the subxact file and release the memory, if it exist */
2872 	if (ent->subxact_fileset)
2873 	{
2874 		subxact_filename(path, subid, xid);
2875 		SharedFileSetDeleteAll(ent->subxact_fileset);
2876 		pfree(ent->subxact_fileset);
2877 		ent->subxact_fileset = NULL;
2878 	}
2879 
2880 	/* Remove the xid entry from the stream xid hash */
2881 	hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
2882 }
2883 
2884 /*
2885  * stream_open_file
2886  *	  Open a file that we'll use to serialize changes for a toplevel
2887  * transaction.
2888  *
2889  * Open a file for streamed changes from a toplevel transaction identified
2890  * by stream_xid (global variable). If it's the first chunk of streamed
2891  * changes for this transaction, initialize the shared fileset and create the
2892  * buffile, otherwise open the previously created file.
2893  *
2894  * This can only be called at the beginning of a "streaming" block, i.e.
2895  * between stream_start/stream_stop messages from the upstream.
2896  */
2897 static void
stream_open_file(Oid subid,TransactionId xid,bool first_segment)2898 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
2899 {
2900 	char		path[MAXPGPATH];
2901 	bool		found;
2902 	MemoryContext oldcxt;
2903 	StreamXidHash *ent;
2904 
2905 	Assert(in_streamed_transaction);
2906 	Assert(OidIsValid(subid));
2907 	Assert(TransactionIdIsValid(xid));
2908 	Assert(stream_fd == NULL);
2909 
2910 	/* create or find the xid entry in the xidhash */
2911 	ent = (StreamXidHash *) hash_search(xidhash,
2912 										(void *) &xid,
2913 										HASH_ENTER,
2914 										&found);
2915 
2916 	changes_filename(path, subid, xid);
2917 	elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
2918 
2919 	/*
2920 	 * Create/open the buffiles under the logical streaming context so that we
2921 	 * have those files until stream stop.
2922 	 */
2923 	oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
2924 
2925 	/*
2926 	 * If this is the first streamed segment, the file must not exist, so make
2927 	 * sure we're the ones creating it. Otherwise just open the file for
2928 	 * writing, in append mode.
2929 	 */
2930 	if (first_segment)
2931 	{
2932 		MemoryContext savectx;
2933 		SharedFileSet *fileset;
2934 
2935 		if (found)
2936 			ereport(ERROR,
2937 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
2938 					 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
2939 
2940 		/*
2941 		 * We need to maintain shared fileset across multiple stream
2942 		 * start/stop calls. So, need to allocate it in a persistent context.
2943 		 */
2944 		savectx = MemoryContextSwitchTo(ApplyContext);
2945 		fileset = palloc(sizeof(SharedFileSet));
2946 
2947 		SharedFileSetInit(fileset, NULL);
2948 		MemoryContextSwitchTo(savectx);
2949 
2950 		stream_fd = BufFileCreateShared(fileset, path);
2951 
2952 		/* Remember the fileset for the next stream of the same transaction */
2953 		ent->xid = xid;
2954 		ent->stream_fileset = fileset;
2955 		ent->subxact_fileset = NULL;
2956 	}
2957 	else
2958 	{
2959 		if (!found)
2960 			ereport(ERROR,
2961 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
2962 					 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
2963 
2964 		/*
2965 		 * Open the file and seek to the end of the file because we always
2966 		 * append the changes file.
2967 		 */
2968 		stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
2969 		BufFileSeek(stream_fd, 0, 0, SEEK_END);
2970 	}
2971 
2972 	MemoryContextSwitchTo(oldcxt);
2973 }
2974 
2975 /*
2976  * stream_close_file
2977  *	  Close the currently open file with streamed changes.
2978  *
2979  * This can only be called at the end of a streaming block, i.e. at stream_stop
2980  * message from the upstream.
2981  */
2982 static void
stream_close_file(void)2983 stream_close_file(void)
2984 {
2985 	Assert(in_streamed_transaction);
2986 	Assert(TransactionIdIsValid(stream_xid));
2987 	Assert(stream_fd != NULL);
2988 
2989 	BufFileClose(stream_fd);
2990 
2991 	stream_xid = InvalidTransactionId;
2992 	stream_fd = NULL;
2993 }
2994 
2995 /*
2996  * stream_write_change
2997  *	  Serialize a change to a file for the current toplevel transaction.
2998  *
2999  * The change is serialized in a simple format, with length (not including
3000  * the length), action code (identifying the message type) and message
3001  * contents (without the subxact TransactionId value).
3002  */
3003 static void
stream_write_change(char action,StringInfo s)3004 stream_write_change(char action, StringInfo s)
3005 {
3006 	int			len;
3007 
3008 	Assert(in_streamed_transaction);
3009 	Assert(TransactionIdIsValid(stream_xid));
3010 	Assert(stream_fd != NULL);
3011 
3012 	/* total on-disk size, including the action type character */
3013 	len = (s->len - s->cursor) + sizeof(char);
3014 
3015 	/* first write the size */
3016 	BufFileWrite(stream_fd, &len, sizeof(len));
3017 
3018 	/* then the action */
3019 	BufFileWrite(stream_fd, &action, sizeof(action));
3020 
3021 	/* and finally the remaining part of the buffer (after the XID) */
3022 	len = (s->len - s->cursor);
3023 
3024 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
3025 }
3026 
3027 /*
3028  * Cleanup the memory for subxacts and reset the related variables.
3029  */
3030 static inline void
cleanup_subxact_info()3031 cleanup_subxact_info()
3032 {
3033 	if (subxact_data.subxacts)
3034 		pfree(subxact_data.subxacts);
3035 
3036 	subxact_data.subxacts = NULL;
3037 	subxact_data.subxact_last = InvalidTransactionId;
3038 	subxact_data.nsubxacts = 0;
3039 	subxact_data.nsubxacts_max = 0;
3040 }
3041 
/*
 * ApplyWorkerMain
 *	  Logical replication apply worker entry point.
 *
 * main_arg carries the number of the worker slot to attach to.  The function:
 *	  - sets up signal handling and connects to the worker's database;
 *	  - loads the subscription into ApplyContext, exiting quietly with
 *		proc_exit(0) if the subscription has been removed or disabled in
 *		the meantime;
 *	  - for a tablesync worker, runs the initial table synchronization via
 *		LogicalRepSyncTableStart();
 *	  - for the main apply worker, sets up replication origin tracking and
 *		connects to the publisher;
 *	  - in both cases, starts logical streaming from origin_startpos, runs
 *		LogicalRepApplyLoop() and finally calls proc_exit(0).
 */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];	/* only used by the main apply worker */
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to the worker slot we were launched for */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * As in walreceiver, no ResourceOwner is set up for this worker process
	 * here; if one were ever needed, CreateAuxProcessResourceOwner could be
	 * called at this point.
	 */

	/* Initialise stats to a sanish value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/*
	 * Load the subscription into persistent memory context.  The catalog
	 * lookup runs inside a transaction; the result is allocated in
	 * ApplyContext so it outlives that transaction.
	 */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/*
	 * The 'true' argument presumably lets GetSubscription() return NULL
	 * instead of erroring when the subscription is gone; that case is
	 * handled right below.
	 */
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription %u will not "
						"start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));
		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	/* A subscription disabled since launch also means we should just exit */
	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/*
		 * This is table synchronization worker, call initial sync.  It also
		 * fills in origin_startpos and returns the slot name to stream from.
		 */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* allocate slot name in long-lived context */
		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);

		pfree(syncslotname);
	}
	else
	{
		/* This is main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("subscription has no replication slot set")));

		/*
		 * Setup replication origin tracking.  The origin is named
		 * "pg_<subscription oid>" and is created on first use; its recorded
		 * progress is where streaming resumes.
		 */
		StartTransactionCommand();
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output identify_system for anything but it
		 * does some initializations on the upstream so let's still call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/*
	 * Build logical replication streaming options.  Publishers older than
	 * version 14 (server_version < 140000) get the base protocol version;
	 * newer ones get the version that supports streamed transactions.
	 */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version =
		walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
		LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;
	options.proto.logical.binary = MySubscription->binary;
	options.proto.logical.streaming = MySubscription->stream;

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}
3225 
3226 /*
3227  * Is current process a logical replication worker?
3228  */
3229 bool
IsLogicalWorker(void)3230 IsLogicalWorker(void)
3231 {
3232 	return MyLogicalRepWorker != NULL;
3233 }
3234