1 /*-------------------------------------------------------------------------
2 * worker.c
3 * PostgreSQL logical replication worker (apply)
4 *
5 * Copyright (c) 2016-2021, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/worker.c
9 *
10 * NOTES
11 * This file contains the worker which applies logical changes as they come
12 * from remote logical replication stream.
13 *
14 * The main worker (apply) is started by logical replication worker
15 * launcher for every enabled subscription in a database. It uses
16 * walsender protocol to communicate with publisher.
17 *
18 * This module includes server facing code and shares libpqwalreceiver
19 * module with walreceiver for providing the libpq specific functionality.
20 *
21 *
22 * STREAMED TRANSACTIONS
23 * ---------------------
24 * Streamed transactions (large transactions exceeding a memory limit on the
25 * upstream) are not applied immediately, but instead, the data is written
26 * to temporary files and then applied at once when the final commit arrives.
27 *
28 * Unlike the regular (non-streamed) case, handling streamed transactions has
29 * to handle aborts of both the toplevel transaction and subtransactions. This
30 * is achieved by tracking offsets for subtransactions, which is then used
31 * to truncate the file with serialized changes.
32 *
33 * The files are placed in tmp file directory by default, and the filenames
34 * include both the XID of the toplevel transaction and OID of the
35 * subscription. This is necessary so that different workers processing a
36 * remote transaction with the same XID don't interfere with each other.
37 *
38 * We use BufFiles instead of using normal temporary files because (a) the
39 * BufFile infrastructure supports temporary files that exceed the OS file size
40 * limit, (b) it provides automatic cleanup on error, and (c) it lets these
41 * files survive across local transactions, so that they can be opened at
42 * stream start and closed at stream stop. We decided to use SharedFileSet
43 * infrastructure as without that it deletes the files on the closure of the
44 * file and if we decide to keep stream files open across the start/stop stream
45 * then it will consume a lot of memory (more than 8K for each BufFile and
46 * there could be multiple such BufFiles as the subscriber could receive
47 * multiple start/stop streams for different transactions before getting the
48 * commit). Moreover, if we don't use SharedFileSet then we also need to invent
49 * a new way to pass filenames to BufFile APIs so that we are allowed to open
50 * the file we desired across multiple stream-open calls for the same
51 * transaction.
52 *-------------------------------------------------------------------------
53 */
54
55 #include "postgres.h"
56
57 #include <sys/stat.h>
58 #include <unistd.h>
59
60 #include "access/table.h"
61 #include "access/tableam.h"
62 #include "access/xact.h"
63 #include "access/xlog_internal.h"
64 #include "catalog/catalog.h"
65 #include "catalog/namespace.h"
66 #include "catalog/partition.h"
67 #include "catalog/pg_inherits.h"
68 #include "catalog/pg_subscription.h"
69 #include "catalog/pg_subscription_rel.h"
70 #include "catalog/pg_tablespace.h"
71 #include "commands/tablecmds.h"
72 #include "commands/tablespace.h"
73 #include "commands/trigger.h"
74 #include "executor/executor.h"
75 #include "executor/execPartition.h"
76 #include "executor/nodeModifyTable.h"
77 #include "funcapi.h"
78 #include "libpq/pqformat.h"
79 #include "libpq/pqsignal.h"
80 #include "mb/pg_wchar.h"
81 #include "miscadmin.h"
82 #include "nodes/makefuncs.h"
83 #include "optimizer/optimizer.h"
84 #include "pgstat.h"
85 #include "postmaster/bgworker.h"
86 #include "postmaster/interrupt.h"
87 #include "postmaster/postmaster.h"
88 #include "postmaster/walwriter.h"
89 #include "replication/decode.h"
90 #include "replication/logical.h"
91 #include "replication/logicalproto.h"
92 #include "replication/logicalrelation.h"
93 #include "replication/logicalworker.h"
94 #include "replication/origin.h"
95 #include "replication/reorderbuffer.h"
96 #include "replication/snapbuild.h"
97 #include "replication/walreceiver.h"
98 #include "replication/worker_internal.h"
99 #include "rewrite/rewriteHandler.h"
100 #include "storage/buffile.h"
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/ipc.h"
104 #include "storage/lmgr.h"
105 #include "storage/proc.h"
106 #include "storage/procarray.h"
107 #include "tcop/tcopprot.h"
108 #include "utils/builtins.h"
109 #include "utils/catcache.h"
110 #include "utils/dynahash.h"
111 #include "utils/datum.h"
112 #include "utils/fmgroids.h"
113 #include "utils/guc.h"
114 #include "utils/inval.h"
115 #include "utils/lsyscache.h"
116 #include "utils/memutils.h"
117 #include "utils/rel.h"
118 #include "utils/syscache.h"
119 #include "utils/timeout.h"
120
121 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
122
/*
 * Pairs a local LSN with a remote LSN; the embedded dlist_node lets entries
 * be chained into a doubly-linked list.
 *
 * NOTE(review): presumably these are the entries of lsn_mapping, recorded by
 * store_flush_position() and consumed when reporting flush progress to the
 * publisher -- confirm against those functions, which are outside this
 * excerpt.
 */
123 typedef struct FlushPosition
124 {
125 dlist_node node; /* list link */
126 XLogRecPtr local_end; /* local LSN of the pair */
127 XLogRecPtr remote_end; /* remote LSN of the pair */
128 } FlushPosition;
129
/* List head, statically initialized to the empty list. */
130 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
131
/*
 * Argument handed to slot_store_error_callback() through the error context
 * stack while slot_store_data() / slot_modify_data() convert remote column
 * values.
 */
132 typedef struct SlotErrCallbackArg
133 {
134 LogicalRepRelMapEntry *rel; /* relation whose remote data is being processed */
135 int remote_attnum; /* index into rel->remoterel.attnames of the column being converted; negative while no column is in progress */
136 } SlotErrCallbackArg;
137
/*
 * Per-change execution state.  It is allocated zeroed and partially filled by
 * create_edata_for_relation(), and torn down by finish_edata().
 */
138 typedef struct ApplyExecutionData
139 {
140 EState *estate; /* executor state, used to track resources */
141
142 LogicalRepRelMapEntry *targetRel; /* replication target rel */
143 ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
144
145 /*
 * These fields are used when the target relation is partitioned; they are
 * left NULL by create_edata_for_relation(), and finish_edata() only cleans
 * up tuple routing when proute is set.
 */
146 ModifyTableState *mtstate; /* dummy ModifyTable state */
147 PartitionTupleRouting *proute; /* partition routing info */
148 } ApplyExecutionData;
149
150 /*
151 * Entry of the xidhash table, keyed by the XID of a streamed transaction.
152 * Whenever we see a new xid we create this entry and, along with it, the
153 * streaming file, and store the fileset handle here. The subxact file is
154 * created iff there is any subxact info under this xid. Subsequent streams
155 * for the same xid look the entry up to get the corresponding fileset
156 * handles, so storing them in a hash makes the search faster. */
157 typedef struct StreamXidHash
158 {
159 TransactionId xid; /* xid is the hash key and must be first */
160 SharedFileSet *stream_fileset; /* shared file set for stream data */
161 SharedFileSet *subxact_fileset; /* shared file set for subxact info */
162 } StreamXidHash;
163
/* Memory context that begin_replication_step() switches into. */
164 static MemoryContext ApplyMessageContext = NULL;
/* Memory context used as the parent (hcxt) of the xidhash table. */
165 MemoryContext ApplyContext = NULL;
166
167 /* per stream context for streaming transactions; reset at every STREAM STOP */
168 static MemoryContext LogicalStreamingContext = NULL;
169
/* NOTE(review): presumably the walreceiver connection to the publisher -- confirm. */
170 WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
171
/*
 * NOTE(review): presumably the subscription this worker serves and a flag
 * telling whether the cached copy is still valid (see
 * maybe_reread_subscription()) -- confirm, neither is used in this excerpt.
 */
172 Subscription *MySubscription = NULL;
173 bool MySubscriptionValid = false;
174
/* Set to true by apply_handle_begin() when a BEGIN message is processed. */
175 bool in_remote_transaction = false;
/* final_lsn taken from the BEGIN message; the COMMIT message must match it. */
176 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
177
178 /* fields valid only when processing streamed transaction */
/* True between a STREAM START and the matching STREAM STOP message. */
179 static bool in_streamed_transaction = false;
180
/* Toplevel XID read from the most recent STREAM START message. */
181 static TransactionId stream_xid = InvalidTransactionId;
182
183 /*
184 * Hash table for storing the streaming xid information along with shared file
185 * set for streaming and subxact files. Created on the first STREAM START.
186 */
187 static HTAB *xidhash = NULL;
188
189 /* BufFile handle of the current streaming file */
190 static BufFile *stream_fd = NULL;
191
/*
 * Position recorded for one subtransaction of a streamed transaction.  When
 * the subtransaction aborts, apply_handle_stream_abort() truncates the
 * changes file at (fileno, offset).
 */
192 typedef struct SubXactInfo
193 {
194 TransactionId xid; /* XID of the subxact */
195 int fileno; /* file number in the buffile */
196 off_t offset; /* offset in the file */
197 } SubXactInfo;
198
199 /* Sub-transaction data for the current streaming transaction */
200 typedef struct ApplySubXactData
201 {
202 uint32 nsubxacts; /* number of sub-transactions */
203 uint32 nsubxacts_max; /* current capacity of subxacts */
204 TransactionId subxact_last; /* xid of the last sub-transaction */
205 SubXactInfo *subxacts; /* sub-xact offset in changes file */
206 } ApplySubXactData;
207
/* The single instance; starts out empty (no entries, no array allocated). */
208 static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
209
/*
 * Helpers that fill 'path' with a spool file name.  changes_filename() names
 * the changes file (see its use in apply_handle_stream_abort()).
 * NOTE(review): subxact_filename() presumably does the same for the subxact
 * file -- confirm against its definition.
 */
210 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
211 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
212
213 /*
214 * Maintain information about subtransactions of a given toplevel transaction.
215 */
216 static void subxact_info_write(Oid subid, TransactionId xid);
217 static void subxact_info_read(Oid subid, TransactionId xid);
218 static void subxact_info_add(TransactionId xid);
219 static inline void cleanup_subxact_info(void);
220
221 /*
222 * Serialize and deserialize changes for a toplevel transaction.
223 */
224 static void stream_cleanup_files(Oid subid, TransactionId xid);
225 static void stream_open_file(Oid subid, TransactionId xid, bool first);
226 static void stream_write_change(char action, StringInfo s);
227 static void stream_close_file(void);
228
229 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
230
231 static void store_flush_position(XLogRecPtr remote_lsn);
232
233 static void maybe_reread_subscription(void);
234
235 /* forward declaration needed because the stream commit handler uses it */
236 static void apply_dispatch(StringInfo s);
237
/* Forward declarations of the helpers behind the message handlers. */
238 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
239 static void apply_handle_insert_internal(ApplyExecutionData *edata,
240 ResultRelInfo *relinfo,
241 TupleTableSlot *remoteslot);
242 static void apply_handle_update_internal(ApplyExecutionData *edata,
243 ResultRelInfo *relinfo,
244 TupleTableSlot *remoteslot,
245 LogicalRepTupleData *newtup);
246 static void apply_handle_delete_internal(ApplyExecutionData *edata,
247 ResultRelInfo *relinfo,
248 TupleTableSlot *remoteslot);
249 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
250 LogicalRepRelation *remoterel,
251 TupleTableSlot *remoteslot,
252 TupleTableSlot **localslot);
253 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
254 TupleTableSlot *remoteslot,
255 LogicalRepTupleData *newtup,
256 CmdType operation);
257
258 /*
259 * Should this worker apply changes for given relation.
260 *
261 * This is mainly needed for initial relation data sync as that runs in
262 * separate worker process running in parallel and we need some way to skip
263 * changes coming to the main apply worker during the sync of a table.
264 *
265 * Note we need to do smaller or equals comparison for SYNCDONE state because
266 * it might hold position of end of initial slot consistent point WAL
267 * record + 1 (ie start of next record) and next record can be COMMIT of
268 * transaction we are now processing (which is what we set remote_final_lsn
269 * to in apply_handle_begin).
270 */
271 static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry * rel)272 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
273 {
274 if (am_tablesync_worker())
275 return MyLogicalRepWorker->relid == rel->localreloid;
276 else
277 return (rel->state == SUBREL_STATE_READY ||
278 (rel->state == SUBREL_STATE_SYNCDONE &&
279 rel->statelsn <= remote_final_lsn));
280 }
281
282 /*
283 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
284 *
285 * Start a transaction, if this is the first step (else we keep using the
286 * existing transaction).
287 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
288 */
289 static void
begin_replication_step(void)290 begin_replication_step(void)
291 {
292 SetCurrentStatementStartTimestamp();
293
294 if (!IsTransactionState())
295 {
296 StartTransactionCommand();
297 maybe_reread_subscription();
298 }
299
300 PushActiveSnapshot(GetTransactionSnapshot());
301
302 MemoryContextSwitchTo(ApplyMessageContext);
303 }
304
/*
 * Complete one step of a replication transaction.  Every call to
 * begin_replication_step() must be paired with a call to this function.
 *
 * The transaction itself stays open; we only drop the snapshot pushed by
 * begin_replication_step() and bump the command counter so that the
 * effects of this step become visible to the following ones.
 */
static void
end_replication_step(void)
{
	/* Undo the PushActiveSnapshot() done when the step began. */
	PopActiveSnapshot();

	/* Make this step's changes visible to subsequent steps. */
	CommandCounterIncrement();
}
319
320 /*
321 * Handle streamed transactions.
322 *
323 * If in streaming mode (receiving a block of streamed transaction), we
324 * simply redirect it to a file for the proper toplevel transaction.
325 *
326 * Returns true for streamed transactions, false otherwise (regular mode).
327 */
328 static bool
handle_streamed_transaction(LogicalRepMsgType action,StringInfo s)329 handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
330 {
331 TransactionId xid;
332
333 /* not in streaming mode */
334 if (!in_streamed_transaction)
335 return false;
336
337 Assert(stream_fd != NULL);
338 Assert(TransactionIdIsValid(stream_xid));
339
340 /*
341 * We should have received XID of the subxact as the first part of the
342 * message, so extract it.
343 */
344 xid = pq_getmsgint(s, 4);
345
346 if (!TransactionIdIsValid(xid))
347 ereport(ERROR,
348 (errcode(ERRCODE_PROTOCOL_VIOLATION),
349 errmsg_internal("invalid transaction ID in streamed replication transaction")));
350
351 /* Add the new subxact to the array (unless already there). */
352 subxact_info_add(xid);
353
354 /* write the change to the current file */
355 stream_write_change(action, s);
356
357 return true;
358 }
359
360 /*
361 * Executor state preparation for evaluation of constraint expressions,
362 * indexes and triggers for the specified relation.
363 *
364 * Note that the caller must open and close any indexes to be updated.
365 */
366 static ApplyExecutionData *
create_edata_for_relation(LogicalRepRelMapEntry * rel)367 create_edata_for_relation(LogicalRepRelMapEntry *rel)
368 {
369 ApplyExecutionData *edata;
370 EState *estate;
371 RangeTblEntry *rte;
372 ResultRelInfo *resultRelInfo;
373
374 edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
375 edata->targetRel = rel;
376
377 edata->estate = estate = CreateExecutorState();
378
379 rte = makeNode(RangeTblEntry);
380 rte->rtekind = RTE_RELATION;
381 rte->relid = RelationGetRelid(rel->localrel);
382 rte->relkind = rel->localrel->rd_rel->relkind;
383 rte->rellockmode = AccessShareLock;
384 ExecInitRangeTable(estate, list_make1(rte));
385
386 edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
387
388 /*
389 * Use Relation opened by logicalrep_rel_open() instead of opening it
390 * again.
391 */
392 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
393
394 /*
395 * We put the ResultRelInfo in the es_opened_result_relations list, even
396 * though we don't populate the es_result_relations array. That's a bit
397 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
398 *
399 * ExecOpenIndices() is not called here either, each execution path doing
400 * an apply operation being responsible for that.
401 */
402 estate->es_opened_result_relations =
403 lappend(estate->es_opened_result_relations, resultRelInfo);
404
405 estate->es_output_cid = GetCurrentCommandId(true);
406
407 /* Prepare to catch AFTER triggers. */
408 AfterTriggerBeginQuery();
409
410 /* other fields of edata remain NULL for now */
411
412 return edata;
413 }
414
415 /*
416 * Finish any operations related to the executor state created by
417 * create_edata_for_relation().
418 */
419 static void
finish_edata(ApplyExecutionData * edata)420 finish_edata(ApplyExecutionData *edata)
421 {
422 EState *estate = edata->estate;
423
424 /* Handle any queued AFTER triggers. */
425 AfterTriggerEndQuery(estate);
426
427 /* Shut down tuple routing, if any was done. */
428 if (edata->proute)
429 ExecCleanupTupleRouting(edata->mtstate, edata->proute);
430
431 /*
432 * Cleanup. It might seem that we should call ExecCloseResultRelations()
433 * here, but we intentionally don't. It would close the rel we added to
434 * es_opened_result_relations above, which is wrong because we took no
435 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
436 * any other relations opened during execution.
437 */
438 ExecResetTupleTable(estate->es_tupleTable, false);
439 FreeExecutorState(estate);
440 pfree(edata);
441 }
442
443 /*
444 * Executes default values for columns for which we can't map to remote
445 * relation columns.
446 *
447 * This allows us to support tables which have more columns on the downstream
448 * than on the upstream.
449 */
450 static void
slot_fill_defaults(LogicalRepRelMapEntry * rel,EState * estate,TupleTableSlot * slot)451 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
452 TupleTableSlot *slot)
453 {
454 TupleDesc desc = RelationGetDescr(rel->localrel);
455 int num_phys_attrs = desc->natts;
456 int i;
457 int attnum,
458 num_defaults = 0;
459 int *defmap;
460 ExprState **defexprs;
461 ExprContext *econtext;
462
463 econtext = GetPerTupleExprContext(estate);
464
465 /* We got all the data via replication, no need to evaluate anything. */
466 if (num_phys_attrs == rel->remoterel.natts)
467 return;
468
469 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
470 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
471
472 Assert(rel->attrmap->maplen == num_phys_attrs);
473 for (attnum = 0; attnum < num_phys_attrs; attnum++)
474 {
475 Expr *defexpr;
476
477 if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
478 continue;
479
480 if (rel->attrmap->attnums[attnum] >= 0)
481 continue;
482
483 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
484
485 if (defexpr != NULL)
486 {
487 /* Run the expression through planner */
488 defexpr = expression_planner(defexpr);
489
490 /* Initialize executable expression in copycontext */
491 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
492 defmap[num_defaults] = attnum;
493 num_defaults++;
494 }
495
496 }
497
498 for (i = 0; i < num_defaults; i++)
499 slot->tts_values[defmap[i]] =
500 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
501 }
502
503 /*
504 * Error callback to give more context info about data conversion failures
505 * while reading data from the remote server.
506 */
507 static void
slot_store_error_callback(void * arg)508 slot_store_error_callback(void *arg)
509 {
510 SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
511 LogicalRepRelMapEntry *rel;
512
513 /* Nothing to do if remote attribute number is not set */
514 if (errarg->remote_attnum < 0)
515 return;
516
517 rel = errarg->rel;
518 errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
519 rel->remoterel.nspname, rel->remoterel.relname,
520 rel->remoterel.attnames[errarg->remote_attnum]);
521 }
522
523 /*
524 * Store tuple data into slot.
525 *
526 * Incoming data can be either text or binary format.
527 */
528 static void
slot_store_data(TupleTableSlot * slot,LogicalRepRelMapEntry * rel,LogicalRepTupleData * tupleData)529 slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
530 LogicalRepTupleData *tupleData)
531 {
532 int natts = slot->tts_tupleDescriptor->natts;
533 int i;
534 SlotErrCallbackArg errarg;
535 ErrorContextCallback errcallback;
536
537 ExecClearTuple(slot);
538
539 /* Push callback + info on the error context stack */
540 errarg.rel = rel;
541 errarg.remote_attnum = -1;
542 errcallback.callback = slot_store_error_callback;
543 errcallback.arg = (void *) &errarg;
544 errcallback.previous = error_context_stack;
545 error_context_stack = &errcallback;
546
547 /* Call the "in" function for each non-dropped, non-null attribute */
548 Assert(natts == rel->attrmap->maplen);
549 for (i = 0; i < natts; i++)
550 {
551 Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
552 int remoteattnum = rel->attrmap->attnums[i];
553
554 if (!att->attisdropped && remoteattnum >= 0)
555 {
556 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
557
558 Assert(remoteattnum < tupleData->ncols);
559
560 errarg.remote_attnum = remoteattnum;
561
562 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
563 {
564 Oid typinput;
565 Oid typioparam;
566
567 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
568 slot->tts_values[i] =
569 OidInputFunctionCall(typinput, colvalue->data,
570 typioparam, att->atttypmod);
571 slot->tts_isnull[i] = false;
572 }
573 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
574 {
575 Oid typreceive;
576 Oid typioparam;
577
578 /*
579 * In some code paths we may be asked to re-parse the same
580 * tuple data. Reset the StringInfo's cursor so that works.
581 */
582 colvalue->cursor = 0;
583
584 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
585 slot->tts_values[i] =
586 OidReceiveFunctionCall(typreceive, colvalue,
587 typioparam, att->atttypmod);
588
589 /* Trouble if it didn't eat the whole buffer */
590 if (colvalue->cursor != colvalue->len)
591 ereport(ERROR,
592 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
593 errmsg("incorrect binary data format in logical replication column %d",
594 remoteattnum + 1)));
595 slot->tts_isnull[i] = false;
596 }
597 else
598 {
599 /*
600 * NULL value from remote. (We don't expect to see
601 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
602 * NULL.)
603 */
604 slot->tts_values[i] = (Datum) 0;
605 slot->tts_isnull[i] = true;
606 }
607
608 errarg.remote_attnum = -1;
609 }
610 else
611 {
612 /*
613 * We assign NULL to dropped attributes and missing values
614 * (missing values should be later filled using
615 * slot_fill_defaults).
616 */
617 slot->tts_values[i] = (Datum) 0;
618 slot->tts_isnull[i] = true;
619 }
620 }
621
622 /* Pop the error context stack */
623 error_context_stack = errcallback.previous;
624
625 ExecStoreVirtualTuple(slot);
626 }
627
628 /*
629 * Replace updated columns with data from the LogicalRepTupleData struct.
630 * This is somewhat similar to heap_modify_tuple but also calls the type
631 * input functions on the user data.
632 *
633 * "slot" is filled with a copy of the tuple in "srcslot", replacing
634 * columns provided in "tupleData" and leaving others as-is.
635 *
636 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
637 * storage for "srcslot". This is OK for current usage, but someday we may
638 * need to materialize "slot" at the end to make it independent of "srcslot".
639 */
640 static void
slot_modify_data(TupleTableSlot * slot,TupleTableSlot * srcslot,LogicalRepRelMapEntry * rel,LogicalRepTupleData * tupleData)641 slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
642 LogicalRepRelMapEntry *rel,
643 LogicalRepTupleData *tupleData)
644 {
645 int natts = slot->tts_tupleDescriptor->natts;
646 int i;
647 SlotErrCallbackArg errarg;
648 ErrorContextCallback errcallback;
649
650 /* We'll fill "slot" with a virtual tuple, so we must start with ... */
651 ExecClearTuple(slot);
652
653 /*
654 * Copy all the column data from srcslot, so that we'll have valid values
655 * for unreplaced columns.
656 */
657 Assert(natts == srcslot->tts_tupleDescriptor->natts);
658 slot_getallattrs(srcslot);
659 memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
660 memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
661
662 /* For error reporting, push callback + info on the error context stack */
663 errarg.rel = rel;
664 errarg.remote_attnum = -1;
665 errcallback.callback = slot_store_error_callback;
666 errcallback.arg = (void *) &errarg;
667 errcallback.previous = error_context_stack;
668 error_context_stack = &errcallback;
669
670 /* Call the "in" function for each replaced attribute */
671 Assert(natts == rel->attrmap->maplen);
672 for (i = 0; i < natts; i++)
673 {
674 Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
675 int remoteattnum = rel->attrmap->attnums[i];
676
677 if (remoteattnum < 0)
678 continue;
679
680 Assert(remoteattnum < tupleData->ncols);
681
682 if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
683 {
684 StringInfo colvalue = &tupleData->colvalues[remoteattnum];
685
686 errarg.remote_attnum = remoteattnum;
687
688 if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
689 {
690 Oid typinput;
691 Oid typioparam;
692
693 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
694 slot->tts_values[i] =
695 OidInputFunctionCall(typinput, colvalue->data,
696 typioparam, att->atttypmod);
697 slot->tts_isnull[i] = false;
698 }
699 else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
700 {
701 Oid typreceive;
702 Oid typioparam;
703
704 /*
705 * In some code paths we may be asked to re-parse the same
706 * tuple data. Reset the StringInfo's cursor so that works.
707 */
708 colvalue->cursor = 0;
709
710 getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
711 slot->tts_values[i] =
712 OidReceiveFunctionCall(typreceive, colvalue,
713 typioparam, att->atttypmod);
714
715 /* Trouble if it didn't eat the whole buffer */
716 if (colvalue->cursor != colvalue->len)
717 ereport(ERROR,
718 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
719 errmsg("incorrect binary data format in logical replication column %d",
720 remoteattnum + 1)));
721 slot->tts_isnull[i] = false;
722 }
723 else
724 {
725 /* must be LOGICALREP_COLUMN_NULL */
726 slot->tts_values[i] = (Datum) 0;
727 slot->tts_isnull[i] = true;
728 }
729
730 errarg.remote_attnum = -1;
731 }
732 }
733
734 /* Pop the error context stack */
735 error_context_stack = errcallback.previous;
736
737 /* And finally, declare that "slot" contains a valid virtual tuple */
738 ExecStoreVirtualTuple(slot);
739 }
740
741 /*
742 * Handle BEGIN message.
743 */
744 static void
apply_handle_begin(StringInfo s)745 apply_handle_begin(StringInfo s)
746 {
747 LogicalRepBeginData begin_data;
748
749 logicalrep_read_begin(s, &begin_data);
750
751 remote_final_lsn = begin_data.final_lsn;
752
753 in_remote_transaction = true;
754
755 pgstat_report_activity(STATE_RUNNING, NULL);
756 }
757
758 /*
759 * Handle COMMIT message.
760 *
761 * TODO, support tracking of multiple origins
762 */
763 static void
apply_handle_commit(StringInfo s)764 apply_handle_commit(StringInfo s)
765 {
766 LogicalRepCommitData commit_data;
767
768 logicalrep_read_commit(s, &commit_data);
769
770 if (commit_data.commit_lsn != remote_final_lsn)
771 ereport(ERROR,
772 (errcode(ERRCODE_PROTOCOL_VIOLATION),
773 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
774 LSN_FORMAT_ARGS(commit_data.commit_lsn),
775 LSN_FORMAT_ARGS(remote_final_lsn))));
776
777 apply_handle_commit_internal(&commit_data);
778
779 /* Process any tables that are being synchronized in parallel. */
780 process_syncing_tables(commit_data.end_lsn);
781
782 pgstat_report_activity(STATE_IDLE, NULL);
783 }
784
785 /*
786 * Handle ORIGIN message.
787 *
788 * TODO, support tracking of multiple origins
789 */
790 static void
apply_handle_origin(StringInfo s)791 apply_handle_origin(StringInfo s)
792 {
793 /*
794 * ORIGIN message can only come inside streaming transaction or inside
795 * remote transaction and before any actual writes.
796 */
797 if (!in_streamed_transaction &&
798 (!in_remote_transaction ||
799 (IsTransactionState() && !am_tablesync_worker())))
800 ereport(ERROR,
801 (errcode(ERRCODE_PROTOCOL_VIOLATION),
802 errmsg_internal("ORIGIN message sent out of order")));
803 }
804
805 /*
806 * Handle STREAM START message.
807 */
808 static void
apply_handle_stream_start(StringInfo s)809 apply_handle_stream_start(StringInfo s)
810 {
811 bool first_segment;
812 HASHCTL hash_ctl;
813
814 if (in_streamed_transaction)
815 ereport(ERROR,
816 (errcode(ERRCODE_PROTOCOL_VIOLATION),
817 errmsg_internal("duplicate STREAM START message")));
818
819 /*
820 * Start a transaction on stream start, this transaction will be committed
821 * on the stream stop unless it is a tablesync worker in which case it
822 * will be committed after processing all the messages. We need the
823 * transaction for handling the buffile, used for serializing the
824 * streaming data and subxact info.
825 */
826 begin_replication_step();
827
828 /* notify handle methods we're processing a remote transaction */
829 in_streamed_transaction = true;
830
831 /* extract XID of the top-level transaction */
832 stream_xid = logicalrep_read_stream_start(s, &first_segment);
833
834 if (!TransactionIdIsValid(stream_xid))
835 ereport(ERROR,
836 (errcode(ERRCODE_PROTOCOL_VIOLATION),
837 errmsg_internal("invalid transaction ID in streamed replication transaction")));
838
839 /*
840 * Initialize the xidhash table if we haven't yet. This will be used for
841 * the entire duration of the apply worker so create it in permanent
842 * context.
843 */
844 if (xidhash == NULL)
845 {
846 hash_ctl.keysize = sizeof(TransactionId);
847 hash_ctl.entrysize = sizeof(StreamXidHash);
848 hash_ctl.hcxt = ApplyContext;
849 xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
850 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
851 }
852
853 /* open the spool file for this transaction */
854 stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
855
856 /* if this is not the first segment, open existing subxact file */
857 if (!first_segment)
858 subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
859
860 pgstat_report_activity(STATE_RUNNING, NULL);
861
862 end_replication_step();
863 }
864
865 /*
866 * Handle STREAM STOP message.
867 */
868 static void
apply_handle_stream_stop(StringInfo s)869 apply_handle_stream_stop(StringInfo s)
870 {
871 if (!in_streamed_transaction)
872 ereport(ERROR,
873 (errcode(ERRCODE_PROTOCOL_VIOLATION),
874 errmsg_internal("STREAM STOP message without STREAM START")));
875
876 /*
877 * Close the file with serialized changes, and serialize information about
878 * subxacts for the toplevel transaction.
879 */
880 subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
881 stream_close_file();
882
883 /* We must be in a valid transaction state */
884 Assert(IsTransactionState());
885
886 /* Commit the per-stream transaction */
887 CommitTransactionCommand();
888
889 in_streamed_transaction = false;
890
891 /* Reset per-stream context */
892 MemoryContextReset(LogicalStreamingContext);
893
894 pgstat_report_activity(STATE_IDLE, NULL);
895 }
896
897 /*
898 * Handle STREAM abort message.
899 */
900 static void
apply_handle_stream_abort(StringInfo s)901 apply_handle_stream_abort(StringInfo s)
902 {
903 TransactionId xid;
904 TransactionId subxid;
905
906 if (in_streamed_transaction)
907 ereport(ERROR,
908 (errcode(ERRCODE_PROTOCOL_VIOLATION),
909 errmsg_internal("STREAM ABORT message without STREAM STOP")));
910
911 logicalrep_read_stream_abort(s, &xid, &subxid);
912
913 /*
914 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
915 * just delete the files with serialized info.
916 */
917 if (xid == subxid)
918 stream_cleanup_files(MyLogicalRepWorker->subid, xid);
919 else
920 {
921 /*
922 * OK, so it's a subxact. We need to read the subxact file for the
923 * toplevel transaction, determine the offset tracked for the subxact,
924 * and truncate the file with changes. We also remove the subxacts
925 * with higher offsets (or rather higher XIDs).
926 *
927 * We intentionally scan the array from the tail, because we're likely
928 * aborting a change for the most recent subtransactions.
929 *
930 * We can't use the binary search here as subxact XIDs won't
931 * necessarily arrive in sorted order, consider the case where we have
932 * released the savepoint for multiple subtransactions and then
933 * performed rollback to savepoint for one of the earlier
934 * sub-transaction.
935 */
936 int64 i;
937 int64 subidx;
938 BufFile *fd;
939 bool found = false;
940 char path[MAXPGPATH];
941 StreamXidHash *ent;
942
943 subidx = -1;
944 begin_replication_step();
945 subxact_info_read(MyLogicalRepWorker->subid, xid);
946
947 for (i = subxact_data.nsubxacts; i > 0; i--)
948 {
949 if (subxact_data.subxacts[i - 1].xid == subxid)
950 {
951 subidx = (i - 1);
952 found = true;
953 break;
954 }
955 }
956
957 /*
958 * If it's an empty sub-transaction then we will not find the subxid
959 * here so just cleanup the subxact info and return.
960 */
961 if (!found)
962 {
963 /* Cleanup the subxact info */
964 cleanup_subxact_info();
965 end_replication_step();
966 CommitTransactionCommand();
967 return;
968 }
969
970 ent = (StreamXidHash *) hash_search(xidhash,
971 (void *) &xid,
972 HASH_FIND,
973 NULL);
974 if (!ent)
975 ereport(ERROR,
976 (errcode(ERRCODE_PROTOCOL_VIOLATION),
977 errmsg_internal("transaction %u not found in stream XID hash table",
978 xid)));
979
980 /* open the changes file */
981 changes_filename(path, MyLogicalRepWorker->subid, xid);
982 fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
983
984 /* OK, truncate the file at the right offset */
985 BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
986 subxact_data.subxacts[subidx].offset);
987 BufFileClose(fd);
988
989 /* discard the subxacts added later */
990 subxact_data.nsubxacts = subidx;
991
992 /* write the updated subxact list */
993 subxact_info_write(MyLogicalRepWorker->subid, xid);
994
995 end_replication_step();
996 CommitTransactionCommand();
997 }
998 }
999
/*
 * Handle STREAM COMMIT message.
 *
 * Replays the changes spooled for the streamed transaction: the changes file
 * is read back as a sequence of length-prefixed records, each record is
 * passed to apply_dispatch(), and the transaction is then committed through
 * apply_handle_commit_internal().  Afterwards the serialized files are
 * removed and table synchronization is given a chance to progress.
 *
 * Raises a protocol-violation ERROR if the message arrives while a stream is
 * still open or if the transaction has no entry in the stream XID hash
 * table, and an ERROR if a record cannot be read completely or carries a
 * non-positive length.
 */
static void
apply_handle_stream_commit(StringInfo s)
{
	TransactionId xid;
	StringInfoData s2;			/* holds the record handed to apply_dispatch */
	int nchanges;				/* number of records replayed so far */
	char path[MAXPGPATH];
	char *buffer = NULL;		/* raw bytes of the current record */
	LogicalRepCommitData commit_data;
	StreamXidHash *ent;
	MemoryContext oldcxt;
	BufFile *fd;

	if (in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));

	xid = logicalrep_read_stream_commit(s, &commit_data);

	elog(DEBUG1, "received commit for streamed transaction %u", xid);

	/* Make sure we have an open transaction */
	begin_replication_step();

	/*
	 * Allocate file handle and memory required to process all the messages in
	 * TopTransactionContext to avoid them getting reset after each message is
	 * processed.
	 */
	oldcxt = MemoryContextSwitchTo(TopTransactionContext);

	/* open the spool file for the committed transaction */
	changes_filename(path, MyLogicalRepWorker->subid, xid);
	elog(DEBUG1, "replaying changes from file \"%s\"", path);

	/* The hash entry supplies the fileset the changes file lives in. */
	ent = (StreamXidHash *) hash_search(xidhash,
										(void *) &xid,
										HASH_FIND,
										NULL);
	if (!ent)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("transaction %u not found in stream XID hash table",
								 xid)));

	fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);

	/* Initial size only; the buffer is resized per record in the loop below. */
	buffer = palloc(BLCKSZ);
	initStringInfo(&s2);

	MemoryContextSwitchTo(oldcxt);

	remote_final_lsn = commit_data.commit_lsn;

	/*
	 * Make sure the handle apply_dispatch methods are aware we're in a remote
	 * transaction.
	 */
	in_remote_transaction = true;
	pgstat_report_activity(STATE_RUNNING, NULL);

	end_replication_step();

	/*
	 * Read the entries one by one and pass them through the same logic as in
	 * apply_dispatch.
	 */
	nchanges = 0;
	while (true)
	{
		int nbytes;
		int len;

		CHECK_FOR_INTERRUPTS();

		/* read length of the on-disk record */
		nbytes = BufFileRead(fd, &len, sizeof(len));

		/* have we reached end of the file? */
		if (nbytes == 0)
			break;

		/* do we have a correct length? */
		if (nbytes != sizeof(len))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
							path)));

		/* a record without payload cannot be valid */
		if (len <= 0)
			elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
				 len, path);

		/* make sure we have sufficiently large buffer */
		buffer = repalloc(buffer, len);

		/* and finally read the data into the buffer */
		if (BufFileRead(fd, buffer, len) != len)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from streaming transaction's changes file \"%s\": %m",
							path)));

		/* copy the buffer to the stringinfo and call apply_dispatch */
		resetStringInfo(&s2);
		appendBinaryStringInfo(&s2, buffer, len);

		/* Ensure we are reading the data into our memory context. */
		oldcxt = MemoryContextSwitchTo(ApplyMessageContext);

		apply_dispatch(&s2);

		/* Per-message allocations are no longer needed once it is applied. */
		MemoryContextReset(ApplyMessageContext);

		MemoryContextSwitchTo(oldcxt);

		nchanges++;

		/* periodic progress report, visible at DEBUG1 only */
		if (nchanges % 1000 == 0)
			elog(DEBUG1, "replayed %d changes from file \"%s\"",
				 nchanges, path);
	}

	BufFileClose(fd);

	pfree(buffer);
	pfree(s2.data);

	elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
		 nchanges, path);

	/* Commit whatever the replayed changes did locally. */
	apply_handle_commit_internal(&commit_data);

	/* unlink the files with serialized changes and subxact info */
	stream_cleanup_files(MyLogicalRepWorker->subid, xid);

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
}
1145
1146 /*
1147 * Helper function for apply_handle_commit and apply_handle_stream_commit.
1148 */
1149 static void
apply_handle_commit_internal(LogicalRepCommitData * commit_data)1150 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
1151 {
1152 if (IsTransactionState())
1153 {
1154 /*
1155 * Update origin state so we can restart streaming from correct
1156 * position in case of crash.
1157 */
1158 replorigin_session_origin_lsn = commit_data->end_lsn;
1159 replorigin_session_origin_timestamp = commit_data->committime;
1160
1161 CommitTransactionCommand();
1162 pgstat_report_stat(false);
1163
1164 store_flush_position(commit_data->end_lsn);
1165 }
1166 else
1167 {
1168 /* Process any invalidation messages that might have accumulated. */
1169 AcceptInvalidationMessages();
1170 maybe_reread_subscription();
1171 }
1172
1173 in_remote_transaction = false;
1174 }
1175
1176 /*
1177 * Handle RELATION message.
1178 *
1179 * Note we don't do validation against local schema here. The validation
1180 * against local schema is postponed until first change for given relation
1181 * comes as we only care about it when applying changes for it anyway and we
1182 * do less locking this way.
1183 */
1184 static void
apply_handle_relation(StringInfo s)1185 apply_handle_relation(StringInfo s)
1186 {
1187 LogicalRepRelation *rel;
1188
1189 if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
1190 return;
1191
1192 rel = logicalrep_read_rel(s);
1193 logicalrep_relmap_update(rel);
1194 }
1195
1196 /*
1197 * Handle TYPE message.
1198 *
1199 * This is now vestigial; we read the info and discard it.
1200 */
1201 static void
apply_handle_type(StringInfo s)1202 apply_handle_type(StringInfo s)
1203 {
1204 LogicalRepTyp typ;
1205
1206 if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
1207 return;
1208
1209 logicalrep_read_typ(s, &typ);
1210 }
1211
1212 /*
1213 * Get replica identity index or if it is not defined a primary key.
1214 *
1215 * If neither is defined, returns InvalidOid
1216 */
1217 static Oid
GetRelationIdentityOrPK(Relation rel)1218 GetRelationIdentityOrPK(Relation rel)
1219 {
1220 Oid idxoid;
1221
1222 idxoid = RelationGetReplicaIndex(rel);
1223
1224 if (!OidIsValid(idxoid))
1225 idxoid = RelationGetPrimaryKeyIndex(rel);
1226
1227 return idxoid;
1228 }
1229
1230 /*
1231 * Handle INSERT message.
1232 */
1233
1234 static void
apply_handle_insert(StringInfo s)1235 apply_handle_insert(StringInfo s)
1236 {
1237 LogicalRepRelMapEntry *rel;
1238 LogicalRepTupleData newtup;
1239 LogicalRepRelId relid;
1240 ApplyExecutionData *edata;
1241 EState *estate;
1242 TupleTableSlot *remoteslot;
1243 MemoryContext oldctx;
1244
1245 if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
1246 return;
1247
1248 begin_replication_step();
1249
1250 relid = logicalrep_read_insert(s, &newtup);
1251 rel = logicalrep_rel_open(relid, RowExclusiveLock);
1252 if (!should_apply_changes_for_rel(rel))
1253 {
1254 /*
1255 * The relation can't become interesting in the middle of the
1256 * transaction so it's safe to unlock it.
1257 */
1258 logicalrep_rel_close(rel, RowExclusiveLock);
1259 end_replication_step();
1260 return;
1261 }
1262
1263 /* Initialize the executor state. */
1264 edata = create_edata_for_relation(rel);
1265 estate = edata->estate;
1266 remoteslot = ExecInitExtraTupleSlot(estate,
1267 RelationGetDescr(rel->localrel),
1268 &TTSOpsVirtual);
1269
1270 /* Process and store remote tuple in the slot */
1271 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1272 slot_store_data(remoteslot, rel, &newtup);
1273 slot_fill_defaults(rel, estate, remoteslot);
1274 MemoryContextSwitchTo(oldctx);
1275
1276 /* For a partitioned table, insert the tuple into a partition. */
1277 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1278 apply_handle_tuple_routing(edata,
1279 remoteslot, NULL, CMD_INSERT);
1280 else
1281 apply_handle_insert_internal(edata, edata->targetRelInfo,
1282 remoteslot);
1283
1284 finish_edata(edata);
1285
1286 logicalrep_rel_close(rel, NoLock);
1287
1288 end_replication_step();
1289 }
1290
1291 /*
1292 * Workhorse for apply_handle_insert()
1293 * relinfo is for the relation we're actually inserting into
1294 * (could be a child partition of edata->targetRelInfo)
1295 */
1296 static void
apply_handle_insert_internal(ApplyExecutionData * edata,ResultRelInfo * relinfo,TupleTableSlot * remoteslot)1297 apply_handle_insert_internal(ApplyExecutionData *edata,
1298 ResultRelInfo *relinfo,
1299 TupleTableSlot *remoteslot)
1300 {
1301 EState *estate = edata->estate;
1302
1303 /* We must open indexes here. */
1304 ExecOpenIndices(relinfo, false);
1305
1306 /* Do the insert. */
1307 ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1308
1309 /* Cleanup. */
1310 ExecCloseIndices(relinfo);
1311 }
1312
1313 /*
1314 * Check if the logical replication relation is updatable and throw
1315 * appropriate error if it isn't.
1316 */
1317 static void
check_relation_updatable(LogicalRepRelMapEntry * rel)1318 check_relation_updatable(LogicalRepRelMapEntry *rel)
1319 {
1320 /* Updatable, no error. */
1321 if (rel->updatable)
1322 return;
1323
1324 /*
1325 * We are in error mode so it's fine this is somewhat slow. It's better to
1326 * give user correct error.
1327 */
1328 if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
1329 {
1330 ereport(ERROR,
1331 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1332 errmsg("publisher did not send replica identity column "
1333 "expected by the logical replication target relation \"%s.%s\"",
1334 rel->remoterel.nspname, rel->remoterel.relname)));
1335 }
1336
1337 ereport(ERROR,
1338 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1339 errmsg("logical replication target relation \"%s.%s\" has "
1340 "neither REPLICA IDENTITY index nor PRIMARY "
1341 "KEY and published relation does not have "
1342 "REPLICA IDENTITY FULL",
1343 rel->remoterel.nspname, rel->remoterel.relname)));
1344 }
1345
1346 /*
1347 * Handle UPDATE message.
1348 *
1349 * TODO: FDW support
1350 */
1351 static void
apply_handle_update(StringInfo s)1352 apply_handle_update(StringInfo s)
1353 {
1354 LogicalRepRelMapEntry *rel;
1355 LogicalRepRelId relid;
1356 ApplyExecutionData *edata;
1357 EState *estate;
1358 LogicalRepTupleData oldtup;
1359 LogicalRepTupleData newtup;
1360 bool has_oldtup;
1361 TupleTableSlot *remoteslot;
1362 RangeTblEntry *target_rte;
1363 MemoryContext oldctx;
1364
1365 if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
1366 return;
1367
1368 begin_replication_step();
1369
1370 relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
1371 &newtup);
1372 rel = logicalrep_rel_open(relid, RowExclusiveLock);
1373 if (!should_apply_changes_for_rel(rel))
1374 {
1375 /*
1376 * The relation can't become interesting in the middle of the
1377 * transaction so it's safe to unlock it.
1378 */
1379 logicalrep_rel_close(rel, RowExclusiveLock);
1380 end_replication_step();
1381 return;
1382 }
1383
1384 /* Check if we can do the update. */
1385 check_relation_updatable(rel);
1386
1387 /* Initialize the executor state. */
1388 edata = create_edata_for_relation(rel);
1389 estate = edata->estate;
1390 remoteslot = ExecInitExtraTupleSlot(estate,
1391 RelationGetDescr(rel->localrel),
1392 &TTSOpsVirtual);
1393
1394 /*
1395 * Populate updatedCols so that per-column triggers can fire, and so
1396 * executor can correctly pass down indexUnchanged hint. This could
1397 * include more columns than were actually changed on the publisher
1398 * because the logical replication protocol doesn't contain that
1399 * information. But it would for example exclude columns that only exist
1400 * on the subscriber, since we are not touching those.
1401 */
1402 target_rte = list_nth(estate->es_range_table, 0);
1403 for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
1404 {
1405 Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
1406 int remoteattnum = rel->attrmap->attnums[i];
1407
1408 if (!att->attisdropped && remoteattnum >= 0)
1409 {
1410 Assert(remoteattnum < newtup.ncols);
1411 if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1412 target_rte->updatedCols =
1413 bms_add_member(target_rte->updatedCols,
1414 i + 1 - FirstLowInvalidHeapAttributeNumber);
1415 }
1416 }
1417
1418 /* Also populate extraUpdatedCols, in case we have generated columns */
1419 fill_extraUpdatedCols(target_rte, rel->localrel);
1420
1421 /* Build the search tuple. */
1422 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1423 slot_store_data(remoteslot, rel,
1424 has_oldtup ? &oldtup : &newtup);
1425 MemoryContextSwitchTo(oldctx);
1426
1427 /* For a partitioned table, apply update to correct partition. */
1428 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1429 apply_handle_tuple_routing(edata,
1430 remoteslot, &newtup, CMD_UPDATE);
1431 else
1432 apply_handle_update_internal(edata, edata->targetRelInfo,
1433 remoteslot, &newtup);
1434
1435 finish_edata(edata);
1436
1437 logicalrep_rel_close(rel, NoLock);
1438
1439 end_replication_step();
1440 }
1441
1442 /*
1443 * Workhorse for apply_handle_update()
1444 * relinfo is for the relation we're actually updating in
1445 * (could be a child partition of edata->targetRelInfo)
1446 */
1447 static void
apply_handle_update_internal(ApplyExecutionData * edata,ResultRelInfo * relinfo,TupleTableSlot * remoteslot,LogicalRepTupleData * newtup)1448 apply_handle_update_internal(ApplyExecutionData *edata,
1449 ResultRelInfo *relinfo,
1450 TupleTableSlot *remoteslot,
1451 LogicalRepTupleData *newtup)
1452 {
1453 EState *estate = edata->estate;
1454 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1455 Relation localrel = relinfo->ri_RelationDesc;
1456 EPQState epqstate;
1457 TupleTableSlot *localslot;
1458 bool found;
1459 MemoryContext oldctx;
1460
1461 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1462 ExecOpenIndices(relinfo, false);
1463
1464 found = FindReplTupleInLocalRel(estate, localrel,
1465 &relmapentry->remoterel,
1466 remoteslot, &localslot);
1467 ExecClearTuple(remoteslot);
1468
1469 /*
1470 * Tuple found.
1471 *
1472 * Note this will fail if there are other conflicting unique indexes.
1473 */
1474 if (found)
1475 {
1476 /* Process and store remote tuple in the slot */
1477 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1478 slot_modify_data(remoteslot, localslot, relmapentry, newtup);
1479 MemoryContextSwitchTo(oldctx);
1480
1481 EvalPlanQualSetSlot(&epqstate, remoteslot);
1482
1483 /* Do the actual update. */
1484 ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
1485 remoteslot);
1486 }
1487 else
1488 {
1489 /*
1490 * The tuple to be updated could not be found. Do nothing except for
1491 * emitting a log message.
1492 *
1493 * XXX should this be promoted to ereport(LOG) perhaps?
1494 */
1495 elog(DEBUG1,
1496 "logical replication did not find row to be updated "
1497 "in replication target relation \"%s\"",
1498 RelationGetRelationName(localrel));
1499 }
1500
1501 /* Cleanup. */
1502 ExecCloseIndices(relinfo);
1503 EvalPlanQualEnd(&epqstate);
1504 }
1505
1506 /*
1507 * Handle DELETE message.
1508 *
1509 * TODO: FDW support
1510 */
1511 static void
apply_handle_delete(StringInfo s)1512 apply_handle_delete(StringInfo s)
1513 {
1514 LogicalRepRelMapEntry *rel;
1515 LogicalRepTupleData oldtup;
1516 LogicalRepRelId relid;
1517 ApplyExecutionData *edata;
1518 EState *estate;
1519 TupleTableSlot *remoteslot;
1520 MemoryContext oldctx;
1521
1522 if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
1523 return;
1524
1525 begin_replication_step();
1526
1527 relid = logicalrep_read_delete(s, &oldtup);
1528 rel = logicalrep_rel_open(relid, RowExclusiveLock);
1529 if (!should_apply_changes_for_rel(rel))
1530 {
1531 /*
1532 * The relation can't become interesting in the middle of the
1533 * transaction so it's safe to unlock it.
1534 */
1535 logicalrep_rel_close(rel, RowExclusiveLock);
1536 end_replication_step();
1537 return;
1538 }
1539
1540 /* Check if we can do the delete. */
1541 check_relation_updatable(rel);
1542
1543 /* Initialize the executor state. */
1544 edata = create_edata_for_relation(rel);
1545 estate = edata->estate;
1546 remoteslot = ExecInitExtraTupleSlot(estate,
1547 RelationGetDescr(rel->localrel),
1548 &TTSOpsVirtual);
1549
1550 /* Build the search tuple. */
1551 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1552 slot_store_data(remoteslot, rel, &oldtup);
1553 MemoryContextSwitchTo(oldctx);
1554
1555 /* For a partitioned table, apply delete to correct partition. */
1556 if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1557 apply_handle_tuple_routing(edata,
1558 remoteslot, NULL, CMD_DELETE);
1559 else
1560 apply_handle_delete_internal(edata, edata->targetRelInfo,
1561 remoteslot);
1562
1563 finish_edata(edata);
1564
1565 logicalrep_rel_close(rel, NoLock);
1566
1567 end_replication_step();
1568 }
1569
1570 /*
1571 * Workhorse for apply_handle_delete()
1572 * relinfo is for the relation we're actually deleting from
1573 * (could be a child partition of edata->targetRelInfo)
1574 */
1575 static void
apply_handle_delete_internal(ApplyExecutionData * edata,ResultRelInfo * relinfo,TupleTableSlot * remoteslot)1576 apply_handle_delete_internal(ApplyExecutionData *edata,
1577 ResultRelInfo *relinfo,
1578 TupleTableSlot *remoteslot)
1579 {
1580 EState *estate = edata->estate;
1581 Relation localrel = relinfo->ri_RelationDesc;
1582 LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
1583 EPQState epqstate;
1584 TupleTableSlot *localslot;
1585 bool found;
1586
1587 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1588 ExecOpenIndices(relinfo, false);
1589
1590 found = FindReplTupleInLocalRel(estate, localrel, remoterel,
1591 remoteslot, &localslot);
1592
1593 /* If found delete it. */
1594 if (found)
1595 {
1596 EvalPlanQualSetSlot(&epqstate, localslot);
1597
1598 /* Do the actual delete. */
1599 ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
1600 }
1601 else
1602 {
1603 /*
1604 * The tuple to be deleted could not be found. Do nothing except for
1605 * emitting a log message.
1606 *
1607 * XXX should this be promoted to ereport(LOG) perhaps?
1608 */
1609 elog(DEBUG1,
1610 "logical replication did not find row to be deleted "
1611 "in replication target relation \"%s\"",
1612 RelationGetRelationName(localrel));
1613 }
1614
1615 /* Cleanup. */
1616 ExecCloseIndices(relinfo);
1617 EvalPlanQualEnd(&epqstate);
1618 }
1619
1620 /*
1621 * Try to find a tuple received from the publication side (in 'remoteslot') in
1622 * the corresponding local relation using either replica identity index,
1623 * primary key or if needed, sequential scan.
1624 *
1625 * Local tuple, if found, is returned in '*localslot'.
1626 */
1627 static bool
FindReplTupleInLocalRel(EState * estate,Relation localrel,LogicalRepRelation * remoterel,TupleTableSlot * remoteslot,TupleTableSlot ** localslot)1628 FindReplTupleInLocalRel(EState *estate, Relation localrel,
1629 LogicalRepRelation *remoterel,
1630 TupleTableSlot *remoteslot,
1631 TupleTableSlot **localslot)
1632 {
1633 Oid idxoid;
1634 bool found;
1635
1636 *localslot = table_slot_create(localrel, &estate->es_tupleTable);
1637
1638 idxoid = GetRelationIdentityOrPK(localrel);
1639 Assert(OidIsValid(idxoid) ||
1640 (remoterel->replident == REPLICA_IDENTITY_FULL));
1641
1642 if (OidIsValid(idxoid))
1643 found = RelationFindReplTupleByIndex(localrel, idxoid,
1644 LockTupleExclusive,
1645 remoteslot, *localslot);
1646 else
1647 found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
1648 remoteslot, *localslot);
1649
1650 return found;
1651 }
1652
1653 /*
1654 * This handles insert, update, delete on a partitioned table.
1655 */
1656 static void
apply_handle_tuple_routing(ApplyExecutionData * edata,TupleTableSlot * remoteslot,LogicalRepTupleData * newtup,CmdType operation)1657 apply_handle_tuple_routing(ApplyExecutionData *edata,
1658 TupleTableSlot *remoteslot,
1659 LogicalRepTupleData *newtup,
1660 CmdType operation)
1661 {
1662 EState *estate = edata->estate;
1663 LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1664 ResultRelInfo *relinfo = edata->targetRelInfo;
1665 Relation parentrel = relinfo->ri_RelationDesc;
1666 ModifyTableState *mtstate;
1667 PartitionTupleRouting *proute;
1668 ResultRelInfo *partrelinfo;
1669 Relation partrel;
1670 TupleTableSlot *remoteslot_part;
1671 TupleConversionMap *map;
1672 MemoryContext oldctx;
1673
1674 /* ModifyTableState is needed for ExecFindPartition(). */
1675 edata->mtstate = mtstate = makeNode(ModifyTableState);
1676 mtstate->ps.plan = NULL;
1677 mtstate->ps.state = estate;
1678 mtstate->operation = operation;
1679 mtstate->resultRelInfo = relinfo;
1680
1681 /* ... as is PartitionTupleRouting. */
1682 edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
1683
1684 /*
1685 * Find the partition to which the "search tuple" belongs.
1686 */
1687 Assert(remoteslot != NULL);
1688 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1689 partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
1690 remoteslot, estate);
1691 Assert(partrelinfo != NULL);
1692 partrel = partrelinfo->ri_RelationDesc;
1693
1694 /*
1695 * To perform any of the operations below, the tuple must match the
1696 * partition's rowtype. Convert if needed or just copy, using a dedicated
1697 * slot to store the tuple in any case.
1698 */
1699 remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
1700 if (remoteslot_part == NULL)
1701 remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
1702 map = partrelinfo->ri_RootToPartitionMap;
1703 if (map != NULL)
1704 remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
1705 remoteslot_part);
1706 else
1707 {
1708 remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
1709 slot_getallattrs(remoteslot_part);
1710 }
1711 MemoryContextSwitchTo(oldctx);
1712
1713 switch (operation)
1714 {
1715 case CMD_INSERT:
1716 apply_handle_insert_internal(edata, partrelinfo,
1717 remoteslot_part);
1718 break;
1719
1720 case CMD_DELETE:
1721 apply_handle_delete_internal(edata, partrelinfo,
1722 remoteslot_part);
1723 break;
1724
1725 case CMD_UPDATE:
1726
1727 /*
1728 * For UPDATE, depending on whether or not the updated tuple
1729 * satisfies the partition's constraint, perform a simple UPDATE
1730 * of the partition or move the updated tuple into a different
1731 * suitable partition.
1732 */
1733 {
1734 AttrMap *attrmap = map ? map->attrMap : NULL;
1735 LogicalRepRelMapEntry *part_entry;
1736 TupleTableSlot *localslot;
1737 ResultRelInfo *partrelinfo_new;
1738 bool found;
1739
1740 part_entry = logicalrep_partition_open(relmapentry, partrel,
1741 attrmap);
1742
1743 /* Get the matching local tuple from the partition. */
1744 found = FindReplTupleInLocalRel(estate, partrel,
1745 &part_entry->remoterel,
1746 remoteslot_part, &localslot);
1747 if (!found)
1748 {
1749 /*
1750 * The tuple to be updated could not be found. Do nothing
1751 * except for emitting a log message.
1752 *
1753 * XXX should this be promoted to ereport(LOG) perhaps?
1754 */
1755 elog(DEBUG1,
1756 "logical replication did not find row to be updated "
1757 "in replication target relation's partition \"%s\"",
1758 RelationGetRelationName(partrel));
1759 return;
1760 }
1761
1762 /*
1763 * Apply the update to the local tuple, putting the result in
1764 * remoteslot_part.
1765 */
1766 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1767 slot_modify_data(remoteslot_part, localslot, part_entry,
1768 newtup);
1769 MemoryContextSwitchTo(oldctx);
1770
1771 /*
1772 * Does the updated tuple still satisfy the current
1773 * partition's constraint?
1774 */
1775 if (!partrel->rd_rel->relispartition ||
1776 ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
1777 false))
1778 {
1779 /*
1780 * Yes, so simply UPDATE the partition. We don't call
1781 * apply_handle_update_internal() here, which would
1782 * normally do the following work, to avoid repeating some
1783 * work already done above to find the local tuple in the
1784 * partition.
1785 */
1786 EPQState epqstate;
1787
1788 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
1789 ExecOpenIndices(partrelinfo, false);
1790
1791 EvalPlanQualSetSlot(&epqstate, remoteslot_part);
1792 ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
1793 localslot, remoteslot_part);
1794 ExecCloseIndices(partrelinfo);
1795 EvalPlanQualEnd(&epqstate);
1796 }
1797 else
1798 {
1799 /* Move the tuple into the new partition. */
1800
1801 /*
1802 * New partition will be found using tuple routing, which
1803 * can only occur via the parent table. We might need to
1804 * convert the tuple to the parent's rowtype. Note that
1805 * this is the tuple found in the partition, not the
1806 * original search tuple received by this function.
1807 */
1808 if (map)
1809 {
1810 TupleConversionMap *PartitionToRootMap =
1811 convert_tuples_by_name(RelationGetDescr(partrel),
1812 RelationGetDescr(parentrel));
1813
1814 remoteslot =
1815 execute_attr_map_slot(PartitionToRootMap->attrMap,
1816 remoteslot_part, remoteslot);
1817 }
1818 else
1819 {
1820 remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1821 slot_getallattrs(remoteslot);
1822 }
1823
1824
1825 /* Find the new partition. */
1826 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1827 partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1828 proute, remoteslot,
1829 estate);
1830 MemoryContextSwitchTo(oldctx);
1831 Assert(partrelinfo_new != partrelinfo);
1832
1833 /* DELETE old tuple found in the old partition. */
1834 apply_handle_delete_internal(edata, partrelinfo,
1835 localslot);
1836
1837 /* INSERT new tuple into the new partition. */
1838
1839 /*
1840 * Convert the replacement tuple to match the destination
1841 * partition rowtype.
1842 */
1843 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1844 partrel = partrelinfo_new->ri_RelationDesc;
1845 remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
1846 if (remoteslot_part == NULL)
1847 remoteslot_part = table_slot_create(partrel,
1848 &estate->es_tupleTable);
1849 map = partrelinfo_new->ri_RootToPartitionMap;
1850 if (map != NULL)
1851 {
1852 remoteslot_part = execute_attr_map_slot(map->attrMap,
1853 remoteslot,
1854 remoteslot_part);
1855 }
1856 else
1857 {
1858 remoteslot_part = ExecCopySlot(remoteslot_part,
1859 remoteslot);
1860 slot_getallattrs(remoteslot);
1861 }
1862 MemoryContextSwitchTo(oldctx);
1863 apply_handle_insert_internal(edata, partrelinfo_new,
1864 remoteslot_part);
1865 }
1866 }
1867 break;
1868
1869 default:
1870 elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1871 break;
1872 }
1873 }
1874
/*
 * Handle TRUNCATE message.
 *
 * Opens every relation named in the message with AccessExclusiveLock, drops
 * those that should_apply_changes_for_rel() rejects, adds the inheritors of
 * partitioned tables, and truncates the resulting set with a single
 * ExecuteTruncateGuts() call.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool cascade = false;
	bool restart_seqs = false;
	List *remote_relids = NIL;	/* relation ids read from the message */
	List *remote_rels = NIL;	/* relmap entries opened here, closed at the end */
	List *rels = NIL;			/* every Relation to truncate */
	List *part_rels = NIL;		/* child Relations opened here, closed at the end */
	List *relids = NIL;			/* OIDs matching 'rels' */
	List *relids_logged = NIL;	/* subset of 'relids' that is logically logged */
	ListCell *lc;
	LOCKMODE lockmode = AccessExclusiveLock;

	if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
		return;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);

		/*
		 * Truncate partitions if we got a message to truncate a partitioned
		 * table.
		 */
		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ListCell *child;
			List *children = find_all_inheritors(rel->localreloid,
												 lockmode,
												 NULL);

			foreach(child, children)
			{
				Oid childrelid = lfirst_oid(child);
				Relation childrel;

				/* Skip relations that are already in the truncate list. */
				if (list_member_oid(relids, childrelid))
					continue;

				/* find_all_inheritors already got lock */
				childrel = table_open(childrelid, NoLock);

				/*
				 * Ignore temp tables of other backends.  See similar code in
				 * ExecuteTruncate().
				 */
				if (RELATION_IS_OTHER_TEMP(childrel))
				{
					table_close(childrel, lockmode);
					continue;
				}

				rels = lappend(rels, childrel);
				part_rels = lappend(part_rels, childrel);
				relids = lappend_oid(relids, childrelid);
				/* Log this relation only if needed for logical decoding */
				if (RelationIsLogicallyLogged(childrel))
					relids_logged = lappend_oid(relids_logged, childrelid);
			}
		}
	}

	/*
	 * Even if we used CASCADE on the upstream primary we explicitly default
	 * to replaying changes without further cascading.  This might be later
	 * changeable with a user specified option.  (Hence 'cascade' is read from
	 * the message but not used below.)
	 */
	ExecuteTruncateGuts(rels,
						relids,
						relids_logged,
						DROP_RESTRICT,
						restart_seqs);

	/* Close everything opened above, passing NoLock in both cases. */
	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}
	foreach(lc, part_rels)
	{
		Relation rel = lfirst(lc);

		table_close(rel, NoLock);
	}

	end_replication_step();
}
1990
1991
1992 /*
1993 * Logical replication protocol message dispatcher.
1994 */
1995 static void
apply_dispatch(StringInfo s)1996 apply_dispatch(StringInfo s)
1997 {
1998 LogicalRepMsgType action = pq_getmsgbyte(s);
1999
2000 switch (action)
2001 {
2002 case LOGICAL_REP_MSG_BEGIN:
2003 apply_handle_begin(s);
2004 return;
2005
2006 case LOGICAL_REP_MSG_COMMIT:
2007 apply_handle_commit(s);
2008 return;
2009
2010 case LOGICAL_REP_MSG_INSERT:
2011 apply_handle_insert(s);
2012 return;
2013
2014 case LOGICAL_REP_MSG_UPDATE:
2015 apply_handle_update(s);
2016 return;
2017
2018 case LOGICAL_REP_MSG_DELETE:
2019 apply_handle_delete(s);
2020 return;
2021
2022 case LOGICAL_REP_MSG_TRUNCATE:
2023 apply_handle_truncate(s);
2024 return;
2025
2026 case LOGICAL_REP_MSG_RELATION:
2027 apply_handle_relation(s);
2028 return;
2029
2030 case LOGICAL_REP_MSG_TYPE:
2031 apply_handle_type(s);
2032 return;
2033
2034 case LOGICAL_REP_MSG_ORIGIN:
2035 apply_handle_origin(s);
2036 return;
2037
2038 case LOGICAL_REP_MSG_MESSAGE:
2039
2040 /*
2041 * Logical replication does not use generic logical messages yet.
2042 * Although, it could be used by other applications that use this
2043 * output plugin.
2044 */
2045 return;
2046
2047 case LOGICAL_REP_MSG_STREAM_START:
2048 apply_handle_stream_start(s);
2049 return;
2050
2051 case LOGICAL_REP_MSG_STREAM_END:
2052 apply_handle_stream_stop(s);
2053 return;
2054
2055 case LOGICAL_REP_MSG_STREAM_ABORT:
2056 apply_handle_stream_abort(s);
2057 return;
2058
2059 case LOGICAL_REP_MSG_STREAM_COMMIT:
2060 apply_handle_stream_commit(s);
2061 return;
2062 }
2063
2064 ereport(ERROR,
2065 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2066 errmsg_internal("invalid logical replication message type \"%c\"",
2067 action)));
2068 }
2069
2070 /*
2071 * Figure out which write/flush positions to report to the walsender process.
2072 *
2073 * We can't simply report back the last LSN the walsender sent us because the
2074 * local transaction might not yet be flushed to disk locally. Instead we
2075 * build a list that associates local with remote LSNs for every commit. When
2076 * reporting back the flush position to the sender we iterate that list and
2077 * check which entries on it are already locally flushed. Those we can report
2078 * as having been flushed.
2079 *
2080 * The have_pending_txes is true if there are outstanding transactions that
2081 * need to be flushed.
2082 */
2083 static void
get_flush_position(XLogRecPtr * write,XLogRecPtr * flush,bool * have_pending_txes)2084 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
2085 bool *have_pending_txes)
2086 {
2087 dlist_mutable_iter iter;
2088 XLogRecPtr local_flush = GetFlushRecPtr();
2089
2090 *write = InvalidXLogRecPtr;
2091 *flush = InvalidXLogRecPtr;
2092
2093 dlist_foreach_modify(iter, &lsn_mapping)
2094 {
2095 FlushPosition *pos =
2096 dlist_container(FlushPosition, node, iter.cur);
2097
2098 *write = pos->remote_end;
2099
2100 if (pos->local_end <= local_flush)
2101 {
2102 *flush = pos->remote_end;
2103 dlist_delete(iter.cur);
2104 pfree(pos);
2105 }
2106 else
2107 {
2108 /*
2109 * Don't want to uselessly iterate over the rest of the list which
2110 * could potentially be long. Instead get the last element and
2111 * grab the write position from there.
2112 */
2113 pos = dlist_tail_element(FlushPosition, node,
2114 &lsn_mapping);
2115 *write = pos->remote_end;
2116 *have_pending_txes = true;
2117 return;
2118 }
2119 }
2120
2121 *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2122 }
2123
2124 /*
2125 * Store current remote/local lsn pair in the tracking list.
2126 */
2127 static void
store_flush_position(XLogRecPtr remote_lsn)2128 store_flush_position(XLogRecPtr remote_lsn)
2129 {
2130 FlushPosition *flushpos;
2131
2132 /* Need to do this in permanent context */
2133 MemoryContextSwitchTo(ApplyContext);
2134
2135 /* Track commit lsn */
2136 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
2137 flushpos->local_end = XactLastCommitEnd;
2138 flushpos->remote_end = remote_lsn;
2139
2140 dlist_push_tail(&lsn_mapping, &flushpos->node);
2141 MemoryContextSwitchTo(ApplyMessageContext);
2142 }
2143
2144
2145 /* Update statistics of the worker. */
2146 static void
UpdateWorkerStats(XLogRecPtr last_lsn,TimestampTz send_time,bool reply)2147 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2148 {
2149 MyLogicalRepWorker->last_lsn = last_lsn;
2150 MyLogicalRepWorker->last_send_time = send_time;
2151 MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
2152 if (reply)
2153 {
2154 MyLogicalRepWorker->reply_lsn = last_lsn;
2155 MyLogicalRepWorker->reply_time = send_time;
2156 }
2157 }
2158
2159 /*
2160 * Apply main loop.
2161 */
2162 static void
LogicalRepApplyLoop(XLogRecPtr last_received)2163 LogicalRepApplyLoop(XLogRecPtr last_received)
2164 {
2165 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
2166 bool ping_sent = false;
2167 TimeLineID tli;
2168
2169 /*
2170 * Init the ApplyMessageContext which we clean up after each replication
2171 * protocol message.
2172 */
2173 ApplyMessageContext = AllocSetContextCreate(ApplyContext,
2174 "ApplyMessageContext",
2175 ALLOCSET_DEFAULT_SIZES);
2176
2177 /*
2178 * This memory context is used for per-stream data when the streaming mode
2179 * is enabled. This context is reset on each stream stop.
2180 */
2181 LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
2182 "LogicalStreamingContext",
2183 ALLOCSET_DEFAULT_SIZES);
2184
2185 /* mark as idle, before starting to loop */
2186 pgstat_report_activity(STATE_IDLE, NULL);
2187
2188 /* This outer loop iterates once per wait. */
2189 for (;;)
2190 {
2191 pgsocket fd = PGINVALID_SOCKET;
2192 int rc;
2193 int len;
2194 char *buf = NULL;
2195 bool endofstream = false;
2196 long wait_time;
2197
2198 CHECK_FOR_INTERRUPTS();
2199
2200 MemoryContextSwitchTo(ApplyMessageContext);
2201
2202 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2203
2204 if (len != 0)
2205 {
2206 /* Loop to process all available data (without blocking). */
2207 for (;;)
2208 {
2209 CHECK_FOR_INTERRUPTS();
2210
2211 if (len == 0)
2212 {
2213 break;
2214 }
2215 else if (len < 0)
2216 {
2217 ereport(LOG,
2218 (errmsg("data stream from publisher has ended")));
2219 endofstream = true;
2220 break;
2221 }
2222 else
2223 {
2224 int c;
2225 StringInfoData s;
2226
2227 /* Reset timeout. */
2228 last_recv_timestamp = GetCurrentTimestamp();
2229 ping_sent = false;
2230
2231 /* Ensure we are reading the data into our memory context. */
2232 MemoryContextSwitchTo(ApplyMessageContext);
2233
2234 s.data = buf;
2235 s.len = len;
2236 s.cursor = 0;
2237 s.maxlen = -1;
2238
2239 c = pq_getmsgbyte(&s);
2240
2241 if (c == 'w')
2242 {
2243 XLogRecPtr start_lsn;
2244 XLogRecPtr end_lsn;
2245 TimestampTz send_time;
2246
2247 start_lsn = pq_getmsgint64(&s);
2248 end_lsn = pq_getmsgint64(&s);
2249 send_time = pq_getmsgint64(&s);
2250
2251 if (last_received < start_lsn)
2252 last_received = start_lsn;
2253
2254 if (last_received < end_lsn)
2255 last_received = end_lsn;
2256
2257 UpdateWorkerStats(last_received, send_time, false);
2258
2259 apply_dispatch(&s);
2260 }
2261 else if (c == 'k')
2262 {
2263 XLogRecPtr end_lsn;
2264 TimestampTz timestamp;
2265 bool reply_requested;
2266
2267 end_lsn = pq_getmsgint64(&s);
2268 timestamp = pq_getmsgint64(&s);
2269 reply_requested = pq_getmsgbyte(&s);
2270
2271 if (last_received < end_lsn)
2272 last_received = end_lsn;
2273
2274 send_feedback(last_received, reply_requested, false);
2275 UpdateWorkerStats(last_received, timestamp, true);
2276 }
2277 /* other message types are purposefully ignored */
2278
2279 MemoryContextReset(ApplyMessageContext);
2280 }
2281
2282 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
2283 }
2284 }
2285
2286 /* confirm all writes so far */
2287 send_feedback(last_received, false, false);
2288
2289 if (!in_remote_transaction && !in_streamed_transaction)
2290 {
2291 /*
2292 * If we didn't get any transactions for a while there might be
2293 * unconsumed invalidation messages in the queue, consume them
2294 * now.
2295 */
2296 AcceptInvalidationMessages();
2297 maybe_reread_subscription();
2298
2299 /* Process any table synchronization changes. */
2300 process_syncing_tables(last_received);
2301 }
2302
2303 /* Cleanup the memory. */
2304 MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2305 MemoryContextSwitchTo(TopMemoryContext);
2306
2307 /* Check if we need to exit the streaming loop. */
2308 if (endofstream)
2309 break;
2310
2311 /*
2312 * Wait for more data or latch. If we have unflushed transactions,
2313 * wake up after WalWriterDelay to see if they've been flushed yet (in
2314 * which case we should send a feedback message). Otherwise, there's
2315 * no particular urgency about waking up unless we get data or a
2316 * signal.
2317 */
2318 if (!dlist_is_empty(&lsn_mapping))
2319 wait_time = WalWriterDelay;
2320 else
2321 wait_time = NAPTIME_PER_CYCLE;
2322
2323 rc = WaitLatchOrSocket(MyLatch,
2324 WL_SOCKET_READABLE | WL_LATCH_SET |
2325 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
2326 fd, wait_time,
2327 WAIT_EVENT_LOGICAL_APPLY_MAIN);
2328
2329 if (rc & WL_LATCH_SET)
2330 {
2331 ResetLatch(MyLatch);
2332 CHECK_FOR_INTERRUPTS();
2333 }
2334
2335 if (ConfigReloadPending)
2336 {
2337 ConfigReloadPending = false;
2338 ProcessConfigFile(PGC_SIGHUP);
2339 }
2340
2341 if (rc & WL_TIMEOUT)
2342 {
2343 /*
2344 * We didn't receive anything new. If we haven't heard anything
2345 * from the server for more than wal_receiver_timeout / 2, ping
2346 * the server. Also, if it's been longer than
2347 * wal_receiver_status_interval since the last update we sent,
2348 * send a status update to the primary anyway, to report any
2349 * progress in applying WAL.
2350 */
2351 bool requestReply = false;
2352
2353 /*
2354 * Check if time since last receive from primary has reached the
2355 * configured limit.
2356 */
2357 if (wal_receiver_timeout > 0)
2358 {
2359 TimestampTz now = GetCurrentTimestamp();
2360 TimestampTz timeout;
2361
2362 timeout =
2363 TimestampTzPlusMilliseconds(last_recv_timestamp,
2364 wal_receiver_timeout);
2365
2366 if (now >= timeout)
2367 ereport(ERROR,
2368 (errcode(ERRCODE_CONNECTION_FAILURE),
2369 errmsg("terminating logical replication worker due to timeout")));
2370
2371 /* Check to see if it's time for a ping. */
2372 if (!ping_sent)
2373 {
2374 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2375 (wal_receiver_timeout / 2));
2376 if (now >= timeout)
2377 {
2378 requestReply = true;
2379 ping_sent = true;
2380 }
2381 }
2382 }
2383
2384 send_feedback(last_received, requestReply, requestReply);
2385 }
2386 }
2387
2388 /* All done */
2389 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
2390 }
2391
2392 /*
2393 * Send a Standby Status Update message to server.
2394 *
2395 * 'recvpos' is the latest LSN we've received data to, force is set if we need
2396 * to send a response to avoid timeouts.
2397 */
2398 static void
send_feedback(XLogRecPtr recvpos,bool force,bool requestReply)2399 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
2400 {
2401 static StringInfo reply_message = NULL;
2402 static TimestampTz send_time = 0;
2403
2404 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2405 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
2406 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
2407
2408 XLogRecPtr writepos;
2409 XLogRecPtr flushpos;
2410 TimestampTz now;
2411 bool have_pending_txes;
2412
2413 /*
2414 * If the user doesn't want status to be reported to the publisher, be
2415 * sure to exit before doing anything at all.
2416 */
2417 if (!force && wal_receiver_status_interval <= 0)
2418 return;
2419
2420 /* It's legal to not pass a recvpos */
2421 if (recvpos < last_recvpos)
2422 recvpos = last_recvpos;
2423
2424 get_flush_position(&writepos, &flushpos, &have_pending_txes);
2425
2426 /*
2427 * No outstanding transactions to flush, we can report the latest received
2428 * position. This is important for synchronous replication.
2429 */
2430 if (!have_pending_txes)
2431 flushpos = writepos = recvpos;
2432
2433 if (writepos < last_writepos)
2434 writepos = last_writepos;
2435
2436 if (flushpos < last_flushpos)
2437 flushpos = last_flushpos;
2438
2439 now = GetCurrentTimestamp();
2440
2441 /* if we've already reported everything we're good */
2442 if (!force &&
2443 writepos == last_writepos &&
2444 flushpos == last_flushpos &&
2445 !TimestampDifferenceExceeds(send_time, now,
2446 wal_receiver_status_interval * 1000))
2447 return;
2448 send_time = now;
2449
2450 if (!reply_message)
2451 {
2452 MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
2453
2454 reply_message = makeStringInfo();
2455 MemoryContextSwitchTo(oldctx);
2456 }
2457 else
2458 resetStringInfo(reply_message);
2459
2460 pq_sendbyte(reply_message, 'r');
2461 pq_sendint64(reply_message, recvpos); /* write */
2462 pq_sendint64(reply_message, flushpos); /* flush */
2463 pq_sendint64(reply_message, writepos); /* apply */
2464 pq_sendint64(reply_message, now); /* sendTime */
2465 pq_sendbyte(reply_message, requestReply); /* replyRequested */
2466
2467 elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
2468 force,
2469 LSN_FORMAT_ARGS(recvpos),
2470 LSN_FORMAT_ARGS(writepos),
2471 LSN_FORMAT_ARGS(flushpos));
2472
2473 walrcv_send(LogRepWorkerWalRcvConn,
2474 reply_message->data, reply_message->len);
2475
2476 if (recvpos > last_recvpos)
2477 last_recvpos = recvpos;
2478 if (writepos > last_writepos)
2479 last_writepos = writepos;
2480 if (flushpos > last_flushpos)
2481 last_flushpos = flushpos;
2482 }
2483
2484 /*
2485 * Reread subscription info if needed. Most changes will be exit.
2486 */
2487 static void
maybe_reread_subscription(void)2488 maybe_reread_subscription(void)
2489 {
2490 MemoryContext oldctx;
2491 Subscription *newsub;
2492 bool started_tx = false;
2493
2494 /* When cache state is valid there is nothing to do here. */
2495 if (MySubscriptionValid)
2496 return;
2497
2498 /* This function might be called inside or outside of transaction. */
2499 if (!IsTransactionState())
2500 {
2501 StartTransactionCommand();
2502 started_tx = true;
2503 }
2504
2505 /* Ensure allocations in permanent context. */
2506 oldctx = MemoryContextSwitchTo(ApplyContext);
2507
2508 newsub = GetSubscription(MyLogicalRepWorker->subid, true);
2509
2510 /*
2511 * Exit if the subscription was removed. This normally should not happen
2512 * as the worker gets killed during DROP SUBSCRIPTION.
2513 */
2514 if (!newsub)
2515 {
2516 ereport(LOG,
2517 (errmsg("logical replication apply worker for subscription \"%s\" will "
2518 "stop because the subscription was removed",
2519 MySubscription->name)));
2520
2521 proc_exit(0);
2522 }
2523
2524 /*
2525 * Exit if the subscription was disabled. This normally should not happen
2526 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
2527 */
2528 if (!newsub->enabled)
2529 {
2530 ereport(LOG,
2531 (errmsg("logical replication apply worker for subscription \"%s\" will "
2532 "stop because the subscription was disabled",
2533 MySubscription->name)));
2534
2535 proc_exit(0);
2536 }
2537
2538 /* !slotname should never happen when enabled is true. */
2539 Assert(newsub->slotname);
2540
2541 /*
2542 * Exit if any parameter that affects the remote connection was changed.
2543 * The launcher will start a new worker.
2544 */
2545 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
2546 strcmp(newsub->name, MySubscription->name) != 0 ||
2547 strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
2548 newsub->binary != MySubscription->binary ||
2549 newsub->stream != MySubscription->stream ||
2550 !equal(newsub->publications, MySubscription->publications))
2551 {
2552 ereport(LOG,
2553 (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
2554 MySubscription->name)));
2555
2556 proc_exit(0);
2557 }
2558
2559 /* Check for other changes that should never happen too. */
2560 if (newsub->dbid != MySubscription->dbid)
2561 {
2562 elog(ERROR, "subscription %u changed unexpectedly",
2563 MyLogicalRepWorker->subid);
2564 }
2565
2566 /* Clean old subscription info and switch to new one. */
2567 FreeSubscription(MySubscription);
2568 MySubscription = newsub;
2569
2570 MemoryContextSwitchTo(oldctx);
2571
2572 /* Change synchronous commit according to the user's wishes */
2573 SetConfigOption("synchronous_commit", MySubscription->synccommit,
2574 PGC_BACKEND, PGC_S_OVERRIDE);
2575
2576 if (started_tx)
2577 CommitTransactionCommand();
2578
2579 MySubscriptionValid = true;
2580 }
2581
2582 /*
2583 * Callback from subscription syscache invalidation.
2584 */
2585 static void
subscription_change_cb(Datum arg,int cacheid,uint32 hashvalue)2586 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
2587 {
2588 MySubscriptionValid = false;
2589 }
2590
2591 /*
2592 * subxact_info_write
2593 * Store information about subxacts for a toplevel transaction.
2594 *
2595 * For each subxact we store offset of it's first change in the main file.
2596 * The file is always over-written as a whole.
2597 *
2598 * XXX We should only store subxacts that were not aborted yet.
2599 */
2600 static void
subxact_info_write(Oid subid,TransactionId xid)2601 subxact_info_write(Oid subid, TransactionId xid)
2602 {
2603 char path[MAXPGPATH];
2604 Size len;
2605 StreamXidHash *ent;
2606 BufFile *fd;
2607
2608 Assert(TransactionIdIsValid(xid));
2609
2610 /* Find the xid entry in the xidhash */
2611 ent = (StreamXidHash *) hash_search(xidhash,
2612 (void *) &xid,
2613 HASH_FIND,
2614 NULL);
2615 /* By this time we must have created the transaction entry */
2616 Assert(ent);
2617
2618 /*
2619 * If there is no subtransaction then nothing to do, but if already have
2620 * subxact file then delete that.
2621 */
2622 if (subxact_data.nsubxacts == 0)
2623 {
2624 if (ent->subxact_fileset)
2625 {
2626 cleanup_subxact_info();
2627 SharedFileSetDeleteAll(ent->subxact_fileset);
2628 pfree(ent->subxact_fileset);
2629 ent->subxact_fileset = NULL;
2630 }
2631 return;
2632 }
2633
2634 subxact_filename(path, subid, xid);
2635
2636 /*
2637 * Create the subxact file if it not already created, otherwise open the
2638 * existing file.
2639 */
2640 if (ent->subxact_fileset == NULL)
2641 {
2642 MemoryContext oldctx;
2643
2644 /*
2645 * We need to maintain shared fileset across multiple stream
2646 * start/stop calls. So, need to allocate it in a persistent context.
2647 */
2648 oldctx = MemoryContextSwitchTo(ApplyContext);
2649 ent->subxact_fileset = palloc(sizeof(SharedFileSet));
2650 SharedFileSetInit(ent->subxact_fileset, NULL);
2651 MemoryContextSwitchTo(oldctx);
2652
2653 fd = BufFileCreateShared(ent->subxact_fileset, path);
2654 }
2655 else
2656 fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
2657
2658 len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2659
2660 /* Write the subxact count and subxact info */
2661 BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
2662 BufFileWrite(fd, subxact_data.subxacts, len);
2663
2664 BufFileClose(fd);
2665
2666 /* free the memory allocated for subxact info */
2667 cleanup_subxact_info();
2668 }
2669
2670 /*
2671 * subxact_info_read
2672 * Restore information about subxacts of a streamed transaction.
2673 *
2674 * Read information about subxacts into the structure subxact_data that can be
2675 * used later.
2676 */
2677 static void
subxact_info_read(Oid subid,TransactionId xid)2678 subxact_info_read(Oid subid, TransactionId xid)
2679 {
2680 char path[MAXPGPATH];
2681 Size len;
2682 BufFile *fd;
2683 StreamXidHash *ent;
2684 MemoryContext oldctx;
2685
2686 Assert(!subxact_data.subxacts);
2687 Assert(subxact_data.nsubxacts == 0);
2688 Assert(subxact_data.nsubxacts_max == 0);
2689
2690 /* Find the stream xid entry in the xidhash */
2691 ent = (StreamXidHash *) hash_search(xidhash,
2692 (void *) &xid,
2693 HASH_FIND,
2694 NULL);
2695 if (!ent)
2696 ereport(ERROR,
2697 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2698 errmsg_internal("transaction %u not found in stream XID hash table",
2699 xid)));
2700
2701 /*
2702 * If subxact_fileset is not valid that mean we don't have any subxact
2703 * info
2704 */
2705 if (ent->subxact_fileset == NULL)
2706 return;
2707
2708 subxact_filename(path, subid, xid);
2709
2710 fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
2711
2712 /* read number of subxact items */
2713 if (BufFileRead(fd, &subxact_data.nsubxacts,
2714 sizeof(subxact_data.nsubxacts)) !=
2715 sizeof(subxact_data.nsubxacts))
2716 ereport(ERROR,
2717 (errcode_for_file_access(),
2718 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2719 path)));
2720
2721 len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
2722
2723 /* we keep the maximum as a power of 2 */
2724 subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
2725
2726 /*
2727 * Allocate subxact information in the logical streaming context. We need
2728 * this information during the complete stream so that we can add the sub
2729 * transaction info to this. On stream stop we will flush this information
2730 * to the subxact file and reset the logical streaming context.
2731 */
2732 oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2733 subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
2734 sizeof(SubXactInfo));
2735 MemoryContextSwitchTo(oldctx);
2736
2737 if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
2738 ereport(ERROR,
2739 (errcode_for_file_access(),
2740 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
2741 path)));
2742
2743 BufFileClose(fd);
2744 }
2745
2746 /*
2747 * subxact_info_add
2748 * Add information about a subxact (offset in the main file).
2749 */
2750 static void
subxact_info_add(TransactionId xid)2751 subxact_info_add(TransactionId xid)
2752 {
2753 SubXactInfo *subxacts = subxact_data.subxacts;
2754 int64 i;
2755
2756 /* We must have a valid top level stream xid and a stream fd. */
2757 Assert(TransactionIdIsValid(stream_xid));
2758 Assert(stream_fd != NULL);
2759
2760 /*
2761 * If the XID matches the toplevel transaction, we don't want to add it.
2762 */
2763 if (stream_xid == xid)
2764 return;
2765
2766 /*
2767 * In most cases we're checking the same subxact as we've already seen in
2768 * the last call, so make sure to ignore it (this change comes later).
2769 */
2770 if (subxact_data.subxact_last == xid)
2771 return;
2772
2773 /* OK, remember we're processing this XID. */
2774 subxact_data.subxact_last = xid;
2775
2776 /*
2777 * Check if the transaction is already present in the array of subxact. We
2778 * intentionally scan the array from the tail, because we're likely adding
2779 * a change for the most recent subtransactions.
2780 *
2781 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
2782 * would allow us to use binary search here.
2783 */
2784 for (i = subxact_data.nsubxacts; i > 0; i--)
2785 {
2786 /* found, so we're done */
2787 if (subxacts[i - 1].xid == xid)
2788 return;
2789 }
2790
2791 /* This is a new subxact, so we need to add it to the array. */
2792 if (subxact_data.nsubxacts == 0)
2793 {
2794 MemoryContext oldctx;
2795
2796 subxact_data.nsubxacts_max = 128;
2797
2798 /*
2799 * Allocate this memory for subxacts in per-stream context, see
2800 * subxact_info_read.
2801 */
2802 oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
2803 subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2804 MemoryContextSwitchTo(oldctx);
2805 }
2806 else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
2807 {
2808 subxact_data.nsubxacts_max *= 2;
2809 subxacts = repalloc(subxacts,
2810 subxact_data.nsubxacts_max * sizeof(SubXactInfo));
2811 }
2812
2813 subxacts[subxact_data.nsubxacts].xid = xid;
2814
2815 /*
2816 * Get the current offset of the stream file and store it as offset of
2817 * this subxact.
2818 */
2819 BufFileTell(stream_fd,
2820 &subxacts[subxact_data.nsubxacts].fileno,
2821 &subxacts[subxact_data.nsubxacts].offset);
2822
2823 subxact_data.nsubxacts++;
2824 subxact_data.subxacts = subxacts;
2825 }
2826
2827 /* format filename for file containing the info about subxacts */
2828 static inline void
subxact_filename(char * path,Oid subid,TransactionId xid)2829 subxact_filename(char *path, Oid subid, TransactionId xid)
2830 {
2831 snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
2832 }
2833
2834 /* format filename for file containing serialized changes */
2835 static inline void
changes_filename(char * path,Oid subid,TransactionId xid)2836 changes_filename(char *path, Oid subid, TransactionId xid)
2837 {
2838 snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
2839 }
2840
2841 /*
2842 * stream_cleanup_files
2843 * Cleanup files for a subscription / toplevel transaction.
2844 *
2845 * Remove files with serialized changes and subxact info for a particular
2846 * toplevel transaction. Each subscription has a separate set of files.
2847 */
2848 static void
stream_cleanup_files(Oid subid,TransactionId xid)2849 stream_cleanup_files(Oid subid, TransactionId xid)
2850 {
2851 char path[MAXPGPATH];
2852 StreamXidHash *ent;
2853
2854 /* Find the xid entry in the xidhash */
2855 ent = (StreamXidHash *) hash_search(xidhash,
2856 (void *) &xid,
2857 HASH_FIND,
2858 NULL);
2859 if (!ent)
2860 ereport(ERROR,
2861 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2862 errmsg_internal("transaction %u not found in stream XID hash table",
2863 xid)));
2864
2865 /* Delete the change file and release the stream fileset memory */
2866 changes_filename(path, subid, xid);
2867 SharedFileSetDeleteAll(ent->stream_fileset);
2868 pfree(ent->stream_fileset);
2869 ent->stream_fileset = NULL;
2870
2871 /* Delete the subxact file and release the memory, if it exist */
2872 if (ent->subxact_fileset)
2873 {
2874 subxact_filename(path, subid, xid);
2875 SharedFileSetDeleteAll(ent->subxact_fileset);
2876 pfree(ent->subxact_fileset);
2877 ent->subxact_fileset = NULL;
2878 }
2879
2880 /* Remove the xid entry from the stream xid hash */
2881 hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
2882 }
2883
2884 /*
2885 * stream_open_file
2886 * Open a file that we'll use to serialize changes for a toplevel
2887 * transaction.
2888 *
2889 * Open a file for streamed changes from a toplevel transaction identified
2890 * by stream_xid (global variable). If it's the first chunk of streamed
2891 * changes for this transaction, initialize the shared fileset and create the
2892 * buffile, otherwise open the previously created file.
2893 *
2894 * This can only be called at the beginning of a "streaming" block, i.e.
2895 * between stream_start/stream_stop messages from the upstream.
2896 */
2897 static void
stream_open_file(Oid subid,TransactionId xid,bool first_segment)2898 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
2899 {
2900 char path[MAXPGPATH];
2901 bool found;
2902 MemoryContext oldcxt;
2903 StreamXidHash *ent;
2904
2905 Assert(in_streamed_transaction);
2906 Assert(OidIsValid(subid));
2907 Assert(TransactionIdIsValid(xid));
2908 Assert(stream_fd == NULL);
2909
2910 /* create or find the xid entry in the xidhash */
2911 ent = (StreamXidHash *) hash_search(xidhash,
2912 (void *) &xid,
2913 HASH_ENTER,
2914 &found);
2915
2916 changes_filename(path, subid, xid);
2917 elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
2918
2919 /*
2920 * Create/open the buffiles under the logical streaming context so that we
2921 * have those files until stream stop.
2922 */
2923 oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
2924
2925 /*
2926 * If this is the first streamed segment, the file must not exist, so make
2927 * sure we're the ones creating it. Otherwise just open the file for
2928 * writing, in append mode.
2929 */
2930 if (first_segment)
2931 {
2932 MemoryContext savectx;
2933 SharedFileSet *fileset;
2934
2935 if (found)
2936 ereport(ERROR,
2937 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2938 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
2939
2940 /*
2941 * We need to maintain shared fileset across multiple stream
2942 * start/stop calls. So, need to allocate it in a persistent context.
2943 */
2944 savectx = MemoryContextSwitchTo(ApplyContext);
2945 fileset = palloc(sizeof(SharedFileSet));
2946
2947 SharedFileSetInit(fileset, NULL);
2948 MemoryContextSwitchTo(savectx);
2949
2950 stream_fd = BufFileCreateShared(fileset, path);
2951
2952 /* Remember the fileset for the next stream of the same transaction */
2953 ent->xid = xid;
2954 ent->stream_fileset = fileset;
2955 ent->subxact_fileset = NULL;
2956 }
2957 else
2958 {
2959 if (!found)
2960 ereport(ERROR,
2961 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2962 errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
2963
2964 /*
2965 * Open the file and seek to the end of the file because we always
2966 * append the changes file.
2967 */
2968 stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
2969 BufFileSeek(stream_fd, 0, 0, SEEK_END);
2970 }
2971
2972 MemoryContextSwitchTo(oldcxt);
2973 }
2974
2975 /*
2976 * stream_close_file
2977 * Close the currently open file with streamed changes.
2978 *
2979 * This can only be called at the end of a streaming block, i.e. at stream_stop
2980 * message from the upstream.
2981 */
2982 static void
stream_close_file(void)2983 stream_close_file(void)
2984 {
2985 Assert(in_streamed_transaction);
2986 Assert(TransactionIdIsValid(stream_xid));
2987 Assert(stream_fd != NULL);
2988
2989 BufFileClose(stream_fd);
2990
2991 stream_xid = InvalidTransactionId;
2992 stream_fd = NULL;
2993 }
2994
2995 /*
2996 * stream_write_change
2997 * Serialize a change to a file for the current toplevel transaction.
2998 *
2999 * The change is serialized in a simple format, with length (not including
3000 * the length), action code (identifying the message type) and message
3001 * contents (without the subxact TransactionId value).
3002 */
3003 static void
stream_write_change(char action,StringInfo s)3004 stream_write_change(char action, StringInfo s)
3005 {
3006 int len;
3007
3008 Assert(in_streamed_transaction);
3009 Assert(TransactionIdIsValid(stream_xid));
3010 Assert(stream_fd != NULL);
3011
3012 /* total on-disk size, including the action type character */
3013 len = (s->len - s->cursor) + sizeof(char);
3014
3015 /* first write the size */
3016 BufFileWrite(stream_fd, &len, sizeof(len));
3017
3018 /* then the action */
3019 BufFileWrite(stream_fd, &action, sizeof(action));
3020
3021 /* and finally the remaining part of the buffer (after the XID) */
3022 len = (s->len - s->cursor);
3023
3024 BufFileWrite(stream_fd, &s->data[s->cursor], len);
3025 }
3026
3027 /*
3028 * Cleanup the memory for subxacts and reset the related variables.
3029 */
3030 static inline void
cleanup_subxact_info()3031 cleanup_subxact_info()
3032 {
3033 if (subxact_data.subxacts)
3034 pfree(subxact_data.subxacts);
3035
3036 subxact_data.subxacts = NULL;
3037 subxact_data.subxact_last = InvalidTransactionId;
3038 subxact_data.nsubxacts = 0;
3039 subxact_data.nsubxacts_max = 0;
3040 }
3041
3042 /* Logical Replication Apply worker entry point */
3043 void
ApplyWorkerMain(Datum main_arg)3044 ApplyWorkerMain(Datum main_arg)
3045 {
3046 int worker_slot = DatumGetInt32(main_arg);
3047 MemoryContext oldctx;
3048 char originname[NAMEDATALEN];
3049 XLogRecPtr origin_startpos;
3050 char *myslotname;
3051 WalRcvStreamOptions options;
3052
3053 /* Attach to slot */
3054 logicalrep_worker_attach(worker_slot);
3055
3056 /* Setup signal handling */
3057 pqsignal(SIGHUP, SignalHandlerForConfigReload);
3058 pqsignal(SIGTERM, die);
3059 BackgroundWorkerUnblockSignals();
3060
3061 /*
3062 * We don't currently need any ResourceOwner in a walreceiver process, but
3063 * if we did, we could call CreateAuxProcessResourceOwner here.
3064 */
3065
3066 /* Initialise stats to a sanish value */
3067 MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
3068 MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
3069
3070 /* Load the libpq-specific functions */
3071 load_file("libpqwalreceiver", false);
3072
3073 /* Run as replica session replication role. */
3074 SetConfigOption("session_replication_role", "replica",
3075 PGC_SUSET, PGC_S_OVERRIDE);
3076
3077 /* Connect to our database. */
3078 BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
3079 MyLogicalRepWorker->userid,
3080 0);
3081
3082 /*
3083 * Set always-secure search path, so malicious users can't redirect user
3084 * code (e.g. pg_index.indexprs).
3085 */
3086 SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
3087
3088 /* Load the subscription into persistent memory context. */
3089 ApplyContext = AllocSetContextCreate(TopMemoryContext,
3090 "ApplyContext",
3091 ALLOCSET_DEFAULT_SIZES);
3092 StartTransactionCommand();
3093 oldctx = MemoryContextSwitchTo(ApplyContext);
3094
3095 MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
3096 if (!MySubscription)
3097 {
3098 ereport(LOG,
3099 (errmsg("logical replication apply worker for subscription %u will not "
3100 "start because the subscription was removed during startup",
3101 MyLogicalRepWorker->subid)));
3102 proc_exit(0);
3103 }
3104
3105 MySubscriptionValid = true;
3106 MemoryContextSwitchTo(oldctx);
3107
3108 if (!MySubscription->enabled)
3109 {
3110 ereport(LOG,
3111 (errmsg("logical replication apply worker for subscription \"%s\" will not "
3112 "start because the subscription was disabled during startup",
3113 MySubscription->name)));
3114
3115 proc_exit(0);
3116 }
3117
3118 /* Setup synchronous commit according to the user's wishes */
3119 SetConfigOption("synchronous_commit", MySubscription->synccommit,
3120 PGC_BACKEND, PGC_S_OVERRIDE);
3121
3122 /* Keep us informed about subscription changes. */
3123 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
3124 subscription_change_cb,
3125 (Datum) 0);
3126
3127 if (am_tablesync_worker())
3128 ereport(LOG,
3129 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
3130 MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
3131 else
3132 ereport(LOG,
3133 (errmsg("logical replication apply worker for subscription \"%s\" has started",
3134 MySubscription->name)));
3135
3136 CommitTransactionCommand();
3137
3138 /* Connect to the origin and start the replication. */
3139 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
3140 MySubscription->conninfo);
3141
3142 if (am_tablesync_worker())
3143 {
3144 char *syncslotname;
3145
3146 /* This is table synchronization worker, call initial sync. */
3147 syncslotname = LogicalRepSyncTableStart(&origin_startpos);
3148
3149 /* allocate slot name in long-lived context */
3150 myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
3151
3152 pfree(syncslotname);
3153 }
3154 else
3155 {
3156 /* This is main apply worker */
3157 RepOriginId originid;
3158 TimeLineID startpointTLI;
3159 char *err;
3160
3161 myslotname = MySubscription->slotname;
3162
3163 /*
3164 * This shouldn't happen if the subscription is enabled, but guard
3165 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
3166 * crash if slot is NULL.)
3167 */
3168 if (!myslotname)
3169 ereport(ERROR,
3170 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3171 errmsg("subscription has no replication slot set")));
3172
3173 /* Setup replication origin tracking. */
3174 StartTransactionCommand();
3175 snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3176 originid = replorigin_by_name(originname, true);
3177 if (!OidIsValid(originid))
3178 originid = replorigin_create(originname);
3179 replorigin_session_setup(originid);
3180 replorigin_session_origin = originid;
3181 origin_startpos = replorigin_session_get_progress(false);
3182 CommitTransactionCommand();
3183
3184 LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
3185 MySubscription->name, &err);
3186 if (LogRepWorkerWalRcvConn == NULL)
3187 ereport(ERROR,
3188 (errcode(ERRCODE_CONNECTION_FAILURE),
3189 errmsg("could not connect to the publisher: %s", err)));
3190
3191 /*
3192 * We don't really use the output identify_system for anything but it
3193 * does some initializations on the upstream so let's still call it.
3194 */
3195 (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
3196 }
3197
3198 /*
3199 * Setup callback for syscache so that we know when something changes in
3200 * the subscription relation state.
3201 */
3202 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
3203 invalidate_syncing_table_states,
3204 (Datum) 0);
3205
3206 /* Build logical replication streaming options. */
3207 options.logical = true;
3208 options.startpoint = origin_startpos;
3209 options.slotname = myslotname;
3210 options.proto.logical.proto_version =
3211 walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
3212 LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
3213 options.proto.logical.publication_names = MySubscription->publications;
3214 options.proto.logical.binary = MySubscription->binary;
3215 options.proto.logical.streaming = MySubscription->stream;
3216
3217 /* Start normal logical streaming replication. */
3218 walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
3219
3220 /* Run the main loop. */
3221 LogicalRepApplyLoop(origin_startpos);
3222
3223 proc_exit(0);
3224 }
3225
3226 /*
3227 * Is current process a logical replication worker?
3228 */
3229 bool
IsLogicalWorker(void)3230 IsLogicalWorker(void)
3231 {
3232 return MyLogicalRepWorker != NULL;
3233 }
3234