/*-------------------------------------------------------------------------
 * worker.c
 *     PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2020, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *     src/backend/replication/logical/worker.c
 *
 * NOTES
 *     This file contains the worker which applies logical changes as they
 *     come from the remote logical replication stream.
 *
 *     The main worker (apply) is started by the logical replication worker
 *     launcher for every enabled subscription in a database.  It uses the
 *     walsender protocol to communicate with the publisher.
 *
 *     This module includes server-facing code and shares the libpqwalreceiver
 *     module with the walreceiver for the libpq-specific functionality.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"

#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

typedef struct FlushPosition
{
    dlist_node  node;
    XLogRecPtr  local_end;
    XLogRecPtr  remote_end;
} FlushPosition;

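/*
 * List of FlushPosition entries, kept in commit order.  Entries are appended
 * by store_flush_position() at each local commit and removed by
 * get_flush_position() once their local end position has been flushed.
 */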
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

typedef struct SlotErrCallbackArg
{
    LogicalRepRelMapEntry *rel;
    int         remote_attnum;
} SlotErrCallbackArg;

typedef struct ApplyExecutionData
{
    EState     *estate;         /* executor state, used to track resources */

    LogicalRepRelMapEntry *targetRel;	/* replication target rel */
    ResultRelInfo *targetRelInfo;	/* ResultRelInfo for same */

    /* These fields are used when the target relation is partitioned: */
    ModifyTableState *mtstate;  /* dummy ModifyTable state */
    PartitionTupleRouting *proute;	/* partition routing info */
} ApplyExecutionData;

static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;

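/* The connection to the publisher's walsender process. */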
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

Subscription *MySubscription = NULL;
bool        MySubscriptionValid = false;

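/*
 * in_remote_transaction is true between the BEGIN and COMMIT messages of a
 * remote transaction; remote_final_lsn holds the final_lsn reported by the
 * current BEGIN message (see apply_handle_begin).
 */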
bool        in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

static void apply_handle_insert_internal(ResultRelInfo *relinfo,
                                         EState *estate, TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ResultRelInfo *relinfo,
                                         EState *estate, TupleTableSlot *remoteslot,
                                         LogicalRepTupleData *newtup,
                                         LogicalRepRelMapEntry *relmapentry);
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
                                         TupleTableSlot *remoteslot,
                                         LogicalRepRelation *remoterel);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
                                    LogicalRepRelation *remoterel,
                                    TupleTableSlot *remoteslot,
                                    TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
                                       TupleTableSlot *remoteslot,
                                       LogicalRepTupleData *newtup,
                                       CmdType operation);

/*
 * Should this worker apply changes for the given relation?
 *
 * This is mainly needed for initial relation data sync, as that runs in a
 * separate worker process running in parallel, and we need some way to skip
 * changes coming to the main apply worker during the sync of a table.
 *
 * Note we need to do a less-than-or-equal comparison for the SYNCDONE state
 * because it might hold the position of the end of the initial slot
 * consistent point WAL record + 1 (i.e. the start of the next record), and
 * the next record can be the COMMIT of the transaction we are now processing
 * (which is what we set remote_final_lsn to in apply_handle_begin).
 */
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
    if (am_tablesync_worker())
        return MyLogicalRepWorker->relid == rel->localreloid;
    else
        return (rel->state == SUBREL_STATE_READY ||
                (rel->state == SUBREL_STATE_SYNCDONE &&
                 rel->statelsn <= remote_final_lsn));
}

/*
 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
 *
 * Start a transaction, if this is the first step (else we keep using the
 * existing transaction).
 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
 */
static void
begin_replication_step(void)
{
    SetCurrentStatementStartTimestamp();

    if (!IsTransactionState())
    {
        StartTransactionCommand();
        maybe_reread_subscription();
    }

    PushActiveSnapshot(GetTransactionSnapshot());

    MemoryContextSwitchTo(ApplyMessageContext);
}

/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void
end_replication_step(void)
{
    PopActiveSnapshot();

    CommandCounterIncrement();
}


/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers for the specified relation.
 *
 * Note that the caller must open and close any indexes to be updated.
 */
static ApplyExecutionData *
create_edata_for_relation(LogicalRepRelMapEntry *rel)
{
    ApplyExecutionData *edata;
    EState     *estate;
    ResultRelInfo *resultRelInfo;
    RangeTblEntry *rte;

    edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
    edata->targetRel = rel;

    edata->estate = estate = CreateExecutorState();

    rte = makeNode(RangeTblEntry);
    rte->rtekind = RTE_RELATION;
    rte->relid = RelationGetRelid(rel->localrel);
    rte->relkind = rel->localrel->rd_rel->relkind;
    rte->rellockmode = AccessShareLock;
    ExecInitRangeTable(estate, list_make1(rte));

    edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);

    /*
     * Use Relation opened by logicalrep_rel_open() instead of opening it
     * again.
     */
    InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

    estate->es_result_relations = resultRelInfo;
    estate->es_num_result_relations = 1;
    estate->es_result_relation_info = resultRelInfo;

    estate->es_output_cid = GetCurrentCommandId(true);

    /* Prepare to catch AFTER triggers. */
    AfterTriggerBeginQuery();

    /* other fields of edata remain NULL for now */

    return edata;
}

/*
 * Finish any operations related to the executor state created by
 * create_edata_for_relation().
 */
static void
finish_edata(ApplyExecutionData *edata)
{
    EState     *estate = edata->estate;

    /* Handle any queued AFTER triggers. */
    AfterTriggerEndQuery(estate);

    /* Shut down tuple routing, if any was done. */
    if (edata->proute)
        ExecCleanupTupleRouting(edata->mtstate, edata->proute);

    /*
     * Cleanup.  It might seem that we should call ExecCloseResultRelations()
     * here, but we intentionally don't.  It would close the rel we added to
     * the estate above, which is wrong because we took no corresponding
     * refcount.  We rely on ExecCleanupTupleRouting() to close any other
     * relations opened during execution.
     */
    ExecResetTupleTable(estate->es_tupleTable, false);
    FreeExecutorState(estate);
    pfree(edata);
}

/*
 * Evaluate default values for columns for which we can't map to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 */
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
                   TupleTableSlot *slot)
{
    TupleDesc   desc = RelationGetDescr(rel->localrel);
    int         num_phys_attrs = desc->natts;
    int         i;
    int         attnum,
                num_defaults = 0;
    int        *defmap;
    ExprState **defexprs;
    ExprContext *econtext;

    econtext = GetPerTupleExprContext(estate);

    /* We got all the data via replication, no need to evaluate anything. */
    if (num_phys_attrs == rel->remoterel.natts)
        return;

    defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

    Assert(rel->attrmap->maplen == num_phys_attrs);
    for (attnum = 0; attnum < num_phys_attrs; attnum++)
    {
        Expr       *defexpr;

        if (TupleDescAttr(desc, attnum)->attisdropped ||
            TupleDescAttr(desc, attnum)->attgenerated)
            continue;

        if (rel->attrmap->attnums[attnum] >= 0)
            continue;

        defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);

        if (defexpr != NULL)
        {
            /* Run the expression through planner */
            defexpr = expression_planner(defexpr);

            /* Initialize executable expression */
            defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
            defmap[num_defaults] = attnum;
            num_defaults++;
        }
    }

    for (i = 0; i < num_defaults; i++)
        slot->tts_values[defmap[i]] =
            ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
}

/*
 * Error callback to give more context info about data conversion failures
 * while reading data from the remote server.
 */
static void
slot_store_error_callback(void *arg)
{
    SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
    LogicalRepRelMapEntry *rel;

    /* Nothing to do if remote attribute number is not set */
    if (errarg->remote_attnum < 0)
        return;

    rel = errarg->rel;
    errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
               rel->remoterel.nspname, rel->remoterel.relname,
               rel->remoterel.attnames[errarg->remote_attnum]);
}

/*
 * Store data in C string form into slot.
 * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
 * use better.
 */
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
                    char **values)
{
    int         natts = slot->tts_tupleDescriptor->natts;
    int         i;
    SlotErrCallbackArg errarg;
    ErrorContextCallback errcallback;

    ExecClearTuple(slot);

    /* Push callback + info on the error context stack */
    errarg.rel = rel;
    errarg.remote_attnum = -1;
    errcallback.callback = slot_store_error_callback;
    errcallback.arg = (void *) &errarg;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Call the "in" function for each non-dropped attribute */
    Assert(natts == rel->attrmap->maplen);
    for (i = 0; i < natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
        int         remoteattnum = rel->attrmap->attnums[i];

        if (!att->attisdropped && remoteattnum >= 0 &&
            values[remoteattnum] != NULL)
        {
            Oid         typinput;
            Oid         typioparam;

            errarg.remote_attnum = remoteattnum;

            getTypeInputInfo(att->atttypid, &typinput, &typioparam);
            slot->tts_values[i] =
                OidInputFunctionCall(typinput, values[remoteattnum],
                                     typioparam, att->atttypmod);
            slot->tts_isnull[i] = false;

            errarg.remote_attnum = -1;
        }
        else
        {
            /*
             * We assign NULL to dropped attributes, NULL values, and missing
             * values (missing values should be later filled using
             * slot_fill_defaults).
             */
            slot->tts_values[i] = (Datum) 0;
            slot->tts_isnull[i] = true;
        }
    }

    /* Pop the error context stack */
    error_context_stack = errcallback.previous;

    ExecStoreVirtualTuple(slot);
}

/*
 * Replace selected columns with user data provided as C strings.
 * This is somewhat similar to heap_modify_tuple but also calls the type
 * input functions on the user data.
 * "slot" is filled with a copy of the tuple in "srcslot", with
 * columns selected by the "replaces" array replaced with data values
 * from "values".
 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
 * storage for "srcslot".  This is OK for current usage, but someday we may
 * need to materialize "slot" at the end to make it independent of "srcslot".
 */
static void
slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
                     LogicalRepRelMapEntry *rel,
                     char **values, bool *replaces)
{
    int         natts = slot->tts_tupleDescriptor->natts;
    int         i;
    SlotErrCallbackArg errarg;
    ErrorContextCallback errcallback;

    /* We'll fill "slot" with a virtual tuple, so we must start with ... */
    ExecClearTuple(slot);

    /*
     * Copy all the column data from srcslot, so that we'll have valid values
     * for unreplaced columns.
     */
    Assert(natts == srcslot->tts_tupleDescriptor->natts);
    slot_getallattrs(srcslot);
    memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
    memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));

    /* For error reporting, push callback + info on the error context stack */
    errarg.rel = rel;
    errarg.remote_attnum = -1;
    errcallback.callback = slot_store_error_callback;
    errcallback.arg = (void *) &errarg;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* Call the "in" function for each replaced attribute */
    Assert(natts == rel->attrmap->maplen);
    for (i = 0; i < natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
        int         remoteattnum = rel->attrmap->attnums[i];

        if (remoteattnum < 0)
            continue;

        if (!replaces[remoteattnum])
            continue;

        if (values[remoteattnum] != NULL)
        {
            Oid         typinput;
            Oid         typioparam;

            errarg.remote_attnum = remoteattnum;

            getTypeInputInfo(att->atttypid, &typinput, &typioparam);
            slot->tts_values[i] =
                OidInputFunctionCall(typinput, values[remoteattnum],
                                     typioparam, att->atttypmod);
            slot->tts_isnull[i] = false;

            errarg.remote_attnum = -1;
        }
        else
        {
            slot->tts_values[i] = (Datum) 0;
            slot->tts_isnull[i] = true;
        }
    }

    /* Pop the error context stack */
    error_context_stack = errcallback.previous;

    /* And finally, declare that "slot" contains a valid virtual tuple */
    ExecStoreVirtualTuple(slot);
}

/*
 * Handle BEGIN message.
 */
static void
apply_handle_begin(StringInfo s)
{
    LogicalRepBeginData begin_data;

    logicalrep_read_begin(s, &begin_data);

    remote_final_lsn = begin_data.final_lsn;

    in_remote_transaction = true;

    pgstat_report_activity(STATE_RUNNING, NULL);
}

/*
 * Handle COMMIT message.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
    LogicalRepCommitData commit_data;

    logicalrep_read_commit(s, &commit_data);

    if (commit_data.commit_lsn != remote_final_lsn)
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
                                 (uint32) (commit_data.commit_lsn >> 32),
                                 (uint32) commit_data.commit_lsn,
                                 (uint32) (remote_final_lsn >> 32),
                                 (uint32) remote_final_lsn)));

    /* The synchronization worker runs in a single transaction. */
    if (IsTransactionState() && !am_tablesync_worker())
    {
        /*
         * Update origin state so we can restart streaming from the correct
         * position in case of crash.
         */
        replorigin_session_origin_lsn = commit_data.end_lsn;
        replorigin_session_origin_timestamp = commit_data.committime;

        CommitTransactionCommand();
        pgstat_report_stat(false);

        store_flush_position(commit_data.end_lsn);
    }
    else
    {
        /* Process any invalidation messages that might have accumulated. */
        AcceptInvalidationMessages();
        maybe_reread_subscription();
    }

    in_remote_transaction = false;

    /* Process any tables that are being synchronized in parallel. */
    process_syncing_tables(commit_data.end_lsn);

    pgstat_report_activity(STATE_IDLE, NULL);
}

/*
 * Handle ORIGIN message.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_origin(StringInfo s)
{
    /*
     * The ORIGIN message can only come inside a remote transaction and
     * before any actual writes.
     */
    if (!in_remote_transaction ||
        (IsTransactionState() && !am_tablesync_worker()))
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg("ORIGIN message sent out of order")));
}

/*
 * Handle RELATION message.
 *
 * Note we don't do validation against the local schema here.  That
 * validation is postponed until the first change for the given relation
 * comes, as we only care about it when applying changes for it anyway, and
 * we do less locking this way.
 */
static void
apply_handle_relation(StringInfo s)
{
    LogicalRepRelation *rel;

    rel = logicalrep_read_rel(s);
    logicalrep_relmap_update(rel);
}

/*
 * Handle TYPE message.
 *
 * This is now vestigial; we read the info and discard it.
 */
static void
apply_handle_type(StringInfo s)
{
    LogicalRepTyp typ;

    logicalrep_read_typ(s, &typ);
}

/*
 * Get the replica identity index, or if that is not defined, the primary
 * key index.
 *
 * If neither is defined, returns InvalidOid.
 */
static Oid
GetRelationIdentityOrPK(Relation rel)
{
    Oid         idxoid;

    idxoid = RelationGetReplicaIndex(rel);

    if (!OidIsValid(idxoid))
        idxoid = RelationGetPrimaryKeyIndex(rel);

    return idxoid;
}

/*
 * Handle INSERT message.
 */
static void
apply_handle_insert(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepTupleData newtup;
    LogicalRepRelId relid;
    ApplyExecutionData *edata;
    EState     *estate;
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;

    begin_replication_step();

    relid = logicalrep_read_insert(s, &newtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        end_replication_step();
        return;
    }

    /* Initialize the executor state. */
    edata = create_edata_for_relation(rel);
    estate = edata->estate;
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);

    /* Process and store remote tuple in the slot */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel, newtup.values);
    slot_fill_defaults(rel, estate, remoteslot);
    MemoryContextSwitchTo(oldctx);

    /* For a partitioned table, insert the tuple into a partition. */
    if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        apply_handle_tuple_routing(edata,
                                   remoteslot, NULL, CMD_INSERT);
    else
        apply_handle_insert_internal(estate->es_result_relation_info, estate,
                                     remoteslot);

    finish_edata(edata);

    logicalrep_rel_close(rel, NoLock);

    end_replication_step();
}

/* Workhorse for apply_handle_insert() */
static void
apply_handle_insert_internal(ResultRelInfo *relinfo,
                             EState *estate, TupleTableSlot *remoteslot)
{
    ExecOpenIndices(relinfo, false);

    /* Do the insert. */
    ExecSimpleRelationInsert(estate, remoteslot);

    /* Cleanup. */
    ExecCloseIndices(relinfo);
}

/*
 * Check if the logical replication relation is updatable and throw an
 * appropriate error if it isn't.
 */
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
    /* Updatable, no error. */
    if (rel->updatable)
        return;

    /*
     * We are in error mode, so it's fine that this is somewhat slow.  It's
     * better to give the user a correct error.
     */
    if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
    {
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("publisher did not send replica identity column "
                        "expected by the logical replication target relation \"%s.%s\"",
                        rel->remoterel.nspname, rel->remoterel.relname)));
    }

    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
             errmsg("logical replication target relation \"%s.%s\" has "
                    "neither REPLICA IDENTITY index nor PRIMARY "
                    "KEY and published relation does not have "
                    "REPLICA IDENTITY FULL",
                    rel->remoterel.nspname, rel->remoterel.relname)));
}

/*
 * Handle UPDATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepRelId relid;
    ApplyExecutionData *edata;
    EState     *estate;
    LogicalRepTupleData oldtup;
    LogicalRepTupleData newtup;
    bool        has_oldtup;
    TupleTableSlot *remoteslot;
    RangeTblEntry *target_rte;
    MemoryContext oldctx;

    begin_replication_step();

    relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
                                   &newtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        end_replication_step();
        return;
    }

    /* Check if we can do the update. */
    check_relation_updatable(rel);

    /* Initialize the executor state. */
    edata = create_edata_for_relation(rel);
    estate = edata->estate;
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);

    /*
     * Populate updatedCols so that per-column triggers can fire.  This could
     * include more columns than were actually changed on the publisher
     * because the logical replication protocol doesn't contain that
     * information.  But it would for example exclude columns that only exist
     * on the subscriber, since we are not touching those.
     */
    target_rte = list_nth(estate->es_range_table, 0);
    for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
        int         remoteattnum = rel->attrmap->attnums[i];

        if (!att->attisdropped && remoteattnum >= 0)
        {
            if (newtup.changed[remoteattnum])
                target_rte->updatedCols =
                    bms_add_member(target_rte->updatedCols,
                                   i + 1 - FirstLowInvalidHeapAttributeNumber);
        }
    }

    /* Also populate extraUpdatedCols, in case we have generated columns */
    fill_extraUpdatedCols(target_rte, rel->localrel);

    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel,
                        has_oldtup ? oldtup.values : newtup.values);
    MemoryContextSwitchTo(oldctx);

    /* For a partitioned table, apply update to correct partition. */
    if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        apply_handle_tuple_routing(edata,
                                   remoteslot, &newtup, CMD_UPDATE);
    else
        apply_handle_update_internal(estate->es_result_relation_info, estate,
                                     remoteslot, &newtup, rel);

    finish_edata(edata);

    logicalrep_rel_close(rel, NoLock);

    end_replication_step();
}

/* Workhorse for apply_handle_update() */
static void
apply_handle_update_internal(ResultRelInfo *relinfo,
                             EState *estate, TupleTableSlot *remoteslot,
                             LogicalRepTupleData *newtup,
                             LogicalRepRelMapEntry *relmapentry)
{
    Relation    localrel = relinfo->ri_RelationDesc;
    EPQState    epqstate;
    TupleTableSlot *localslot;
    bool        found;
    MemoryContext oldctx;

    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    ExecOpenIndices(relinfo, false);

    found = FindReplTupleInLocalRel(estate, localrel,
                                    &relmapentry->remoterel,
                                    remoteslot, &localslot);
    ExecClearTuple(remoteslot);

    /*
     * Tuple found.
     *
     * Note this will fail if there are other conflicting unique indexes.
     */
    if (found)
    {
        /* Process and store remote tuple in the slot */
        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
        slot_modify_cstrings(remoteslot, localslot, relmapentry,
                             newtup->values, newtup->changed);
        MemoryContextSwitchTo(oldctx);

        EvalPlanQualSetSlot(&epqstate, remoteslot);

        /* Do the actual update. */
        ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
    }
    else
    {
        /*
         * The tuple to be updated could not be found.  Do nothing except for
         * emitting a log message.
         *
         * XXX should this be promoted to ereport(LOG) perhaps?
         */
        elog(DEBUG1,
             "logical replication did not find row to be updated "
             "in replication target relation \"%s\"",
             RelationGetRelationName(localrel));
    }

    /* Cleanup. */
    ExecCloseIndices(relinfo);
    EvalPlanQualEnd(&epqstate);
}

/*
 * Handle DELETE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
    LogicalRepRelMapEntry *rel;
    LogicalRepTupleData oldtup;
    LogicalRepRelId relid;
    ApplyExecutionData *edata;
    EState     *estate;
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;

    begin_replication_step();

    relid = logicalrep_read_delete(s, &oldtup);
    rel = logicalrep_rel_open(relid, RowExclusiveLock);
    if (!should_apply_changes_for_rel(rel))
    {
        /*
         * The relation can't become interesting in the middle of the
         * transaction so it's safe to unlock it.
         */
        logicalrep_rel_close(rel, RowExclusiveLock);
        end_replication_step();
        return;
    }

    /* Check if we can do the delete. */
    check_relation_updatable(rel);

    /* Initialize the executor state. */
    edata = create_edata_for_relation(rel);
    estate = edata->estate;
    remoteslot = ExecInitExtraTupleSlot(estate,
                                        RelationGetDescr(rel->localrel),
                                        &TTSOpsVirtual);

    /* Build the search tuple. */
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    slot_store_cstrings(remoteslot, rel, oldtup.values);
    MemoryContextSwitchTo(oldctx);

    /* For a partitioned table, apply delete to correct partition. */
    if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        apply_handle_tuple_routing(edata,
                                   remoteslot, NULL, CMD_DELETE);
    else
        apply_handle_delete_internal(estate->es_result_relation_info, estate,
                                     remoteslot, &rel->remoterel);

    finish_edata(edata);

    logicalrep_rel_close(rel, NoLock);

    end_replication_step();
}

/* Workhorse for apply_handle_delete() */
static void
apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
                             TupleTableSlot *remoteslot,
                             LogicalRepRelation *remoterel)
{
    Relation    localrel = relinfo->ri_RelationDesc;
    EPQState    epqstate;
    TupleTableSlot *localslot;
    bool        found;

    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
    ExecOpenIndices(relinfo, false);

    found = FindReplTupleInLocalRel(estate, localrel, remoterel,
                                    remoteslot, &localslot);

    /* If found, delete it. */
    if (found)
    {
        EvalPlanQualSetSlot(&epqstate, localslot);

        /* Do the actual delete. */
        ExecSimpleRelationDelete(estate, &epqstate, localslot);
    }
    else
    {
        /*
         * The tuple to be deleted could not be found.  Do nothing except for
         * emitting a log message.
         *
         * XXX should this be promoted to ereport(LOG) perhaps?
         */
        elog(DEBUG1,
             "logical replication did not find row to be deleted "
             "in replication target relation \"%s\"",
             RelationGetRelationName(localrel));
    }

    /* Cleanup. */
    ExecCloseIndices(relinfo);
    EvalPlanQualEnd(&epqstate);
}

/*
 * Try to find a tuple received from the publication side (in 'remoteslot')
 * in the corresponding local relation using either the replica identity
 * index, the primary key, or, if needed, a sequential scan.
 *
 * The local tuple, if found, is returned in '*localslot'.
 */
static bool
FindReplTupleInLocalRel(EState *estate, Relation localrel,
                        LogicalRepRelation *remoterel,
                        TupleTableSlot *remoteslot,
                        TupleTableSlot **localslot)
{
    Oid         idxoid;
    bool        found;

    *localslot = table_slot_create(localrel, &estate->es_tupleTable);

    idxoid = GetRelationIdentityOrPK(localrel);
    Assert(OidIsValid(idxoid) ||
           (remoterel->replident == REPLICA_IDENTITY_FULL));

    if (OidIsValid(idxoid))
        found = RelationFindReplTupleByIndex(localrel, idxoid,
                                             LockTupleExclusive,
                                             remoteslot, *localslot);
    else
        found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
                                         remoteslot, *localslot);

    return found;
}

/*
 * This handles insert, update, delete on a partitioned table.
 */
static void
apply_handle_tuple_routing(ApplyExecutionData *edata,
                           TupleTableSlot *remoteslot,
                           LogicalRepTupleData *newtup,
                           CmdType operation)
{
    EState     *estate = edata->estate;
    LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    ResultRelInfo *relinfo = edata->targetRelInfo;
    Relation    parentrel = relinfo->ri_RelationDesc;
    ModifyTableState *mtstate;
    PartitionTupleRouting *proute;
    ResultRelInfo *partrelinfo;
    Relation    partrel;
    TupleTableSlot *remoteslot_part;
    PartitionRoutingInfo *partinfo;
    TupleConversionMap *map;
    MemoryContext oldctx;

    /* ModifyTableState is needed for ExecFindPartition(). */
    edata->mtstate = mtstate = makeNode(ModifyTableState);
    mtstate->ps.plan = NULL;
    mtstate->ps.state = estate;
    mtstate->operation = operation;
    mtstate->resultRelInfo = relinfo;

    /* ... as is PartitionTupleRouting. */
    edata->proute = proute = ExecSetupPartitionTupleRouting(estate, mtstate,
                                                            parentrel);

    /*
     * Find the partition to which the "search tuple" belongs.
     */
    Assert(remoteslot != NULL);
    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
    partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
                                    remoteslot, estate);
    Assert(partrelinfo != NULL);
    partrel = partrelinfo->ri_RelationDesc;

    /*
     * To perform any of the operations below, the tuple must match the
     * partition's rowtype.  Convert if needed or just copy, using a dedicated
     * slot to store the tuple in any case.
     */
    partinfo = partrelinfo->ri_PartitionInfo;
    remoteslot_part = partinfo->pi_PartitionTupleSlot;
    if (remoteslot_part == NULL)
        remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
    map = partinfo->pi_RootToPartitionMap;
    if (map != NULL)
        remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
                                                remoteslot_part);
    else
    {
        remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
        slot_getallattrs(remoteslot_part);
    }
    MemoryContextSwitchTo(oldctx);

    estate->es_result_relation_info = partrelinfo;
    switch (operation)
    {
        case CMD_INSERT:
            apply_handle_insert_internal(partrelinfo, estate,
                                         remoteslot_part);
            break;

        case CMD_DELETE:
            apply_handle_delete_internal(partrelinfo, estate,
                                         remoteslot_part,
                                         &relmapentry->remoterel);
            break;

        case CMD_UPDATE:

            /*
             * For UPDATE, depending on whether or not the updated tuple
             * satisfies the partition's constraint, perform a simple UPDATE
             * of the partition or move the updated tuple into a different
             * suitable partition.
             */
            {
                AttrMap    *attrmap = map ? map->attrMap : NULL;
                LogicalRepRelMapEntry *part_entry;
                TupleTableSlot *localslot;
                ResultRelInfo *partrelinfo_new;
                bool        found;

                part_entry = logicalrep_partition_open(relmapentry, partrel,
                                                       attrmap);

                /* Get the matching local tuple from the partition. */
                found = FindReplTupleInLocalRel(estate, partrel,
                                                &part_entry->remoterel,
                                                remoteslot_part, &localslot);
                if (!found)
                {
                    /*
                     * The tuple to be updated could not be found.  Do
                     * nothing except for emitting a log message.
                     *
                     * XXX should this be promoted to ereport(LOG) perhaps?
                     */
                    elog(DEBUG1,
                         "logical replication did not find row to be updated "
                         "in replication target relation's partition \"%s\"",
                         RelationGetRelationName(partrel));
                    return;
                }

                /*
                 * Apply the update to the local tuple, putting the result in
                 * remoteslot_part.
                 */
                oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
                slot_modify_cstrings(remoteslot_part, localslot,
                                     part_entry,
                                     newtup->values, newtup->changed);
                MemoryContextSwitchTo(oldctx);

                /*
                 * Does the updated tuple still satisfy the current
                 * partition's constraint?
                 */
                if (partrelinfo->ri_PartitionCheck == NULL ||
                    ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
                                       false))
                {
                    /*
                     * Yes, so simply UPDATE the partition.  We don't call
                     * apply_handle_update_internal() here, which would
                     * normally do the following work, to avoid repeating
                     * some work already done above to find the local tuple
                     * in the partition.
                     */
                    EPQState    epqstate;

                    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
                    ExecOpenIndices(partrelinfo, false);

                    EvalPlanQualSetSlot(&epqstate, remoteslot_part);
                    ExecSimpleRelationUpdate(estate, &epqstate, localslot,
                                             remoteslot_part);
                    ExecCloseIndices(partrelinfo);
                    EvalPlanQualEnd(&epqstate);
                }
                else
                {
                    /* Move the tuple into the new partition. */

                    /*
                     * The new partition will be found using tuple routing,
                     * which can only occur via the parent table.  We might
                     * need to convert the tuple to the parent's rowtype.
                     * Note that this is the tuple found in the partition,
                     * not the original search tuple received by this
                     * function.
                     */
                    if (map)
                    {
                        TupleConversionMap *PartitionToRootMap =
                            convert_tuples_by_name(RelationGetDescr(partrel),
                                                   RelationGetDescr(parentrel));

                        remoteslot =
                            execute_attr_map_slot(PartitionToRootMap->attrMap,
                                                  remoteslot_part, remoteslot);
                    }
                    else
                    {
                        remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
                        slot_getallattrs(remoteslot);
                    }

                    /* Find the new partition. */
                    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
                    partrelinfo_new = ExecFindPartition(mtstate, relinfo,
                                                        proute, remoteslot,
                                                        estate);
                    MemoryContextSwitchTo(oldctx);
                    Assert(partrelinfo_new != partrelinfo);

                    /* DELETE old tuple found in the old partition. */
                    estate->es_result_relation_info = partrelinfo;
                    apply_handle_delete_internal(partrelinfo, estate,
                                                 localslot,
                                                 &relmapentry->remoterel);

                    /* INSERT new tuple into the new partition. */

                    /*
                     * Convert the replacement tuple to match the destination
                     * partition rowtype.
                     */
                    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
                    partrel = partrelinfo_new->ri_RelationDesc;
                    partinfo = partrelinfo_new->ri_PartitionInfo;
                    remoteslot_part = partinfo->pi_PartitionTupleSlot;
                    if (remoteslot_part == NULL)
                        remoteslot_part = table_slot_create(partrel,
                                                            &estate->es_tupleTable);
                    map = partinfo->pi_RootToPartitionMap;
                    if (map != NULL)
                    {
                        remoteslot_part = execute_attr_map_slot(map->attrMap,
                                                                remoteslot,
                                                                remoteslot_part);
                    }
                    else
                    {
                        remoteslot_part = ExecCopySlot(remoteslot_part,
                                                       remoteslot);
                        slot_getallattrs(remoteslot_part);
                    }
                    MemoryContextSwitchTo(oldctx);
                    estate->es_result_relation_info = partrelinfo_new;
                    apply_handle_insert_internal(partrelinfo_new, estate,
                                                 remoteslot_part);
                }
            }
            break;

        default:
            elog(ERROR, "unrecognized CmdType: %d", (int) operation);
            break;
    }
}

/*
 * Handle TRUNCATE message.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
    bool        cascade = false;
    bool        restart_seqs = false;
    List       *remote_relids = NIL;
    List       *remote_rels = NIL;
    List       *rels = NIL;
    List       *part_rels = NIL;
    List       *relids = NIL;
    List       *relids_logged = NIL;
    ListCell   *lc;
    LOCKMODE    lockmode = AccessExclusiveLock;

    begin_replication_step();

    remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

    foreach(lc, remote_relids)
    {
        LogicalRepRelId relid = lfirst_oid(lc);
        LogicalRepRelMapEntry *rel;

        rel = logicalrep_rel_open(relid, lockmode);
        if (!should_apply_changes_for_rel(rel))
        {
            /*
             * The relation can't become interesting in the middle of the
             * transaction so it's safe to unlock it.
             */
            logicalrep_rel_close(rel, lockmode);
            continue;
        }

        remote_rels = lappend(remote_rels, rel);
        rels = lappend(rels, rel->localrel);
        relids = lappend_oid(relids, rel->localreloid);
        if (RelationIsLogicallyLogged(rel->localrel))
            relids_logged = lappend_oid(relids_logged, rel->localreloid);

        /*
         * Truncate partitions if we got a message to truncate a partitioned
         * table.
         */
        if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
        {
            ListCell   *child;
            List       *children = find_all_inheritors(rel->localreloid,
                                                       lockmode,
                                                       NULL);

            foreach(child, children)
            {
                Oid         childrelid = lfirst_oid(child);
                Relation    childrel;

                if (list_member_oid(relids, childrelid))
                    continue;

                /* find_all_inheritors already got lock */
                childrel = table_open(childrelid, NoLock);

                /*
                 * Ignore temp tables of other backends.  See similar code in
                 * ExecuteTruncate().
                 */
                if (RELATION_IS_OTHER_TEMP(childrel))
                {
                    table_close(childrel, lockmode);
                    continue;
                }

                rels = lappend(rels, childrel);
                part_rels = lappend(part_rels, childrel);
                relids = lappend_oid(relids, childrelid);
                /* Log this relation only if needed for logical decoding */
                if (RelationIsLogicallyLogged(childrel))
                    relids_logged = lappend_oid(relids_logged, childrelid);
            }
        }
    }

    /*
     * Even if we used CASCADE on the upstream primary, we explicitly default
     * to replaying changes without further cascading.  This might later be
     * changeable with a user-specified option.
     */
    ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);

    foreach(lc, remote_rels)
    {
        LogicalRepRelMapEntry *rel = lfirst(lc);

        logicalrep_rel_close(rel, NoLock);
    }
    foreach(lc, part_rels)
    {
        Relation    rel = lfirst(lc);

        table_close(rel, NoLock);
    }

    end_replication_step();
}


/*
 * Logical replication protocol message dispatcher.
 */
static void
apply_dispatch(StringInfo s)
{
    char        action = pq_getmsgbyte(s);

    switch (action)
    {
            /* BEGIN */
        case 'B':
            apply_handle_begin(s);
            break;
            /* COMMIT */
        case 'C':
            apply_handle_commit(s);
            break;
            /* INSERT */
        case 'I':
            apply_handle_insert(s);
            break;
            /* UPDATE */
        case 'U':
            apply_handle_update(s);
            break;
            /* DELETE */
        case 'D':
            apply_handle_delete(s);
            break;
            /* TRUNCATE */
        case 'T':
            apply_handle_truncate(s);
            break;
            /* RELATION */
        case 'R':
            apply_handle_relation(s);
            break;
            /* TYPE */
        case 'Y':
            apply_handle_type(s);
            break;
            /* ORIGIN */
        case 'O':
            apply_handle_origin(s);
            break;
        default:
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("invalid logical replication message type \"%c\"", action)));
    }
}

/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally.  Instead we
 * build a list that associates local with remote LSNs for every commit.  When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed.  Those we can report
 * as having been flushed.
 *
 * *have_pending_txes is set to true if there are outstanding transactions
 * that need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
                   bool *have_pending_txes)
{
    dlist_mutable_iter iter;
    XLogRecPtr  local_flush = GetFlushRecPtr();

    *write = InvalidXLogRecPtr;
    *flush = InvalidXLogRecPtr;

    dlist_foreach_modify(iter, &lsn_mapping)
    {
        FlushPosition *pos =
            dlist_container(FlushPosition, node, iter.cur);

        *write = pos->remote_end;

        if (pos->local_end <= local_flush)
        {
            *flush = pos->remote_end;
            dlist_delete(iter.cur);
            pfree(pos);
        }
        else
        {
            /*
             * Don't want to uselessly iterate over the rest of the list
             * which could potentially be long.  Instead get the last element
             * and grab the write position from there.
             */
            pos = dlist_tail_element(FlushPosition, node,
                                     &lsn_mapping);
            *write = pos->remote_end;
            *have_pending_txes = true;
            return;
        }
    }

    *have_pending_txes = !dlist_is_empty(&lsn_mapping);
}

/*
 * Store the current remote/local LSN pair in the tracking list.
 */
static void
store_flush_position(XLogRecPtr remote_lsn)
{
    FlushPosition *flushpos;

    /* Need to do this in permanent context */
    MemoryContextSwitchTo(ApplyContext);

    /* Track commit lsn */
    flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
    flushpos->local_end = XactLastCommitEnd;
    flushpos->remote_end = remote_lsn;

    dlist_push_tail(&lsn_mapping, &flushpos->node);
    MemoryContextSwitchTo(ApplyMessageContext);
}


/* Update statistics of the worker. */
static void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
{
    MyLogicalRepWorker->last_lsn = last_lsn;
    MyLogicalRepWorker->last_send_time = send_time;
    MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
    if (reply)
    {
        MyLogicalRepWorker->reply_lsn = last_lsn;
        MyLogicalRepWorker->reply_time = send_time;
    }
}

/*
 * Apply main loop.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
    TimestampTz last_recv_timestamp = GetCurrentTimestamp();
    bool        ping_sent = false;

    /*
     * Init the ApplyMessageContext which we clean up after each replication
     * protocol message.
     */
    ApplyMessageContext = AllocSetContextCreate(ApplyContext,
                                                "ApplyMessageContext",
                                                ALLOCSET_DEFAULT_SIZES);

    /* mark as idle, before starting to loop */
    pgstat_report_activity(STATE_IDLE, NULL);

    /* This outer loop iterates once per wait. */
    for (;;)
    {
        pgsocket    fd = PGINVALID_SOCKET;
        int         rc;
        int         len;
        char       *buf = NULL;
        bool        endofstream = false;
        long        wait_time;

        CHECK_FOR_INTERRUPTS();

        MemoryContextSwitchTo(ApplyMessageContext);

        len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

        if (len != 0)
        {
            /* Loop to process all available data (without blocking). */
            for (;;)
            {
                CHECK_FOR_INTERRUPTS();

                if (len == 0)
                {
                    break;
                }
                else if (len < 0)
                {
                    ereport(LOG,
                            (errmsg("data stream from publisher has ended")));
                    endofstream = true;
                    break;
                }
                else
                {
                    int         c;
                    StringInfoData s;

                    /* Reset timeout. */
                    last_recv_timestamp = GetCurrentTimestamp();
                    ping_sent = false;

                    /* Ensure we are reading the data into our memory context. */
                    MemoryContextSwitchTo(ApplyMessageContext);

                    s.data = buf;
                    s.len = len;
                    s.cursor = 0;
                    s.maxlen = -1;

                    c = pq_getmsgbyte(&s);

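                    /*
                     * 'w' messages carry WAL data and hence the logical
                     * replication messages dispatched below; 'k' messages
                     * are keepalives from the walsender.
                     */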
                    if (c == 'w')
                    {
                        XLogRecPtr  start_lsn;
                        XLogRecPtr  end_lsn;
                        TimestampTz send_time;

                        start_lsn = pq_getmsgint64(&s);
                        end_lsn = pq_getmsgint64(&s);
                        send_time = pq_getmsgint64(&s);

                        if (last_received < start_lsn)
                            last_received = start_lsn;

                        if (last_received < end_lsn)
                            last_received = end_lsn;

                        UpdateWorkerStats(last_received, send_time, false);

                        apply_dispatch(&s);
                    }
                    else if (c == 'k')
                    {
                        XLogRecPtr  end_lsn;
                        TimestampTz timestamp;
                        bool        reply_requested;

                        end_lsn = pq_getmsgint64(&s);
                        timestamp = pq_getmsgint64(&s);
                        reply_requested = pq_getmsgbyte(&s);

                        if (last_received < end_lsn)
                            last_received = end_lsn;

                        send_feedback(last_received, reply_requested, false);
                        UpdateWorkerStats(last_received, timestamp, true);
                    }
                    /* other message types are purposefully ignored */

                    MemoryContextReset(ApplyMessageContext);
                }

                len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
            }
        }

        /* confirm all writes so far */
        send_feedback(last_received, false, false);

        if (!in_remote_transaction)
        {
            /*
             * If we didn't get any transactions for a while there might be
             * unconsumed invalidation messages in the queue; consume them
             * now.
             */
            AcceptInvalidationMessages();
            maybe_reread_subscription();

            /* Process any table synchronization changes. */
            process_syncing_tables(last_received);
        }

        /* Cleanup the memory. */
        MemoryContextResetAndDeleteChildren(ApplyMessageContext);
        MemoryContextSwitchTo(TopMemoryContext);

        /* Check if we need to exit the streaming loop. */
        if (endofstream)
        {
            TimeLineID  tli;

            walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
            break;
        }

        /*
         * Wait for more data or latch.  If we have unflushed transactions,
         * wake up after WalWriterDelay to see if they've been flushed yet
         * (in which case we should send a feedback message).  Otherwise,
         * there's no particular urgency about waking up unless we get data
         * or a signal.
         */
        if (!dlist_is_empty(&lsn_mapping))
            wait_time = WalWriterDelay;
        else
            wait_time = NAPTIME_PER_CYCLE;

        rc = WaitLatchOrSocket(MyLatch,
                               WL_SOCKET_READABLE | WL_LATCH_SET |
                               WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               fd, wait_time,
                               WAIT_EVENT_LOGICAL_APPLY_MAIN);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }

        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
        }

        if (rc & WL_TIMEOUT)
        {
            /*
             * We didn't receive anything new.  If we haven't heard anything
             * from the server for more than wal_receiver_timeout / 2, ping
             * the server.  Also, if it's been longer than
             * wal_receiver_status_interval since the last update we sent,
             * send a status update to the publisher anyway, to report any
             * progress in applying WAL.
             */
            bool        requestReply = false;

            /*
             * Check if time since the last receive from the publisher has
             * reached the configured limit.
             */
            if (wal_receiver_timeout > 0)
            {
                TimestampTz now = GetCurrentTimestamp();
                TimestampTz timeout;

                timeout =
                    TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                wal_receiver_timeout);

                if (now >= timeout)
                    ereport(ERROR,
                            (errmsg("terminating logical replication worker due to timeout")));

                /* Check to see if it's time for a ping. */
                if (!ping_sent)
                {
                    timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                          (wal_receiver_timeout / 2));
                    if (now >= timeout)
                    {
                        requestReply = true;
                        ping_sent = true;
                    }
                }
            }

            send_feedback(last_received, requestReply, requestReply);
        }
    }
}

/*
 * Send a Standby Status Update message to the server.
 *
 * 'recvpos' is the latest LSN we've received data to; 'force' is set if we
 * need to send a response to avoid timeouts.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
    static StringInfo reply_message = NULL;
    static TimestampTz send_time = 0;

    static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    static XLogRecPtr last_writepos = InvalidXLogRecPtr;
    static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

    XLogRecPtr  writepos;
    XLogRecPtr  flushpos;
    TimestampTz now;
    bool        have_pending_txes;

    /*
     * If the user doesn't want status to be reported to the publisher, be
     * sure to exit before doing anything at all.
     */
    if (!force && wal_receiver_status_interval <= 0)
        return;

    /* It's legal to not pass a recvpos */
    if (recvpos < last_recvpos)
        recvpos = last_recvpos;

    get_flush_position(&writepos, &flushpos, &have_pending_txes);

    /*
     * No outstanding transactions to flush, so we can report the latest
     * received position.  This is important for synchronous replication.
     */
    if (!have_pending_txes)
        flushpos = writepos = recvpos;

    if (writepos < last_writepos)
        writepos = last_writepos;

    if (flushpos < last_flushpos)
        flushpos = last_flushpos;

    now = GetCurrentTimestamp();

    /* if we've already reported everything we're good */
    if (!force &&
        writepos == last_writepos &&
        flushpos == last_flushpos &&
        !TimestampDifferenceExceeds(send_time, now,
                                    wal_receiver_status_interval * 1000))
        return;
    send_time = now;

    if (!reply_message)
    {
        MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

        reply_message = makeStringInfo();
        MemoryContextSwitchTo(oldctx);
    }
    else
        resetStringInfo(reply_message);

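    /*
     * Build the Standby Status Update ('r') message, mapping our
     * received/flushed/written state onto the protocol's write, flush and
     * apply fields as commented below.
     */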
    pq_sendbyte(reply_message, 'r');
    pq_sendint64(reply_message, recvpos);	/* write */
    pq_sendint64(reply_message, flushpos);	/* flush */
    pq_sendint64(reply_message, writepos);	/* apply */
    pq_sendint64(reply_message, now);	/* sendTime */
    pq_sendbyte(reply_message, requestReply);	/* replyRequested */

    elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
         force,
         (uint32) (recvpos >> 32), (uint32) recvpos,
         (uint32) (writepos >> 32), (uint32) writepos,
         (uint32) (flushpos >> 32), (uint32) flushpos
        );

    walrcv_send(LogRepWorkerWalRcvConn,
                reply_message->data, reply_message->len);

    if (recvpos > last_recvpos)
        last_recvpos = recvpos;
    if (writepos > last_writepos)
        last_writepos = writepos;
    if (flushpos > last_flushpos)
        last_flushpos = flushpos;
}
1842
1843 /*
1844 * Reread subscription info if needed. Most changes will be exit.
1845 */
1846 static void
maybe_reread_subscription(void)1847 maybe_reread_subscription(void)
1848 {
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed.  This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was removed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription was disabled.  This normally should not
	 * happen as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
	 */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was disabled",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the connection string was changed.  The launcher will start a
	 * new worker.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the connection information was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription name was changed (it's used as the
	 * fallback_application_name).  The launcher will start a new worker.
	 */
	if (strcmp(newsub->name, MySubscription->name) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription was renamed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/*
	 * We need to make a new connection to the new slot if the slot name has
	 * changed, so exit here as well if that's the case.
	 */
	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the replication slot name was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the publication list was changed.  The launcher will start a
	 * new worker.
	 */
	if (!equal(newsub->publications, MySubscription->publications))
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription's publications were changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Also check for other changes that should never happen. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}

/*
 * Callback from subscription syscache invalidation.
 *
 * Just flag the cached state as invalid; the actual reread happens later,
 * at a safe point, in maybe_reread_subscription().
 */
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	MySubscriptionValid = false;
}

/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * We don't currently need any ResourceOwner in a walreceiver process,
	 * but if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/* Initialise stats (reported in pg_stat_subscription) to a sane value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription %u will not "
						"start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));
		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/* This is a table synchronization worker; run the initial sync. */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* The slot name needs to be allocated in permanent memory context. */
		oldctx = MemoryContextSwitchTo(ApplyContext);
		myslotname = pstrdup(syncslotname);
		MemoryContextSwitchTo(oldctx);

		pfree(syncslotname);
	}
	else
	{
		/* This is the main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver
		 * will crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errmsg("subscription has no replication slot set")));

		/* Setup replication origin tracking. */
		StartTransactionCommand();
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
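
		/*
		 * The origin name is derived from the subscription OID rather than
		 * its name, so it stays valid if the subscription is renamed.
		 * Create the origin on first use.
		 */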
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
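
		/*
		 * Pick up where we left off: the origin's progress records the
		 * remote LSN of the last transaction this subscription applied.
		 */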
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output of identify_system for anything,
		 * but it performs some initialization on the upstream, so we still
		 * call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;
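
	/*
	 * These options are passed to the publisher via START_REPLICATION: the
	 * walsender hands proto_version and publication_names to the output
	 * plugin, and streaming in effect resumes from origin_startpos (the
	 * server may silently advance an older start point to the slot's
	 * confirmed position).
	 */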

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}

/*
 * Is current process a logical replication worker?
 */
bool
IsLogicalWorker(void)
{
	return MyLogicalRepWorker != NULL;
}
