1 /*-------------------------------------------------------------------------
2 * worker.c
3 * PostgreSQL logical replication worker (apply)
4 *
5 * Copyright (c) 2016-2019, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/worker.c
9 *
10 * NOTES
11 * This file contains the worker which applies logical changes as they come
12 * from remote logical replication stream.
13 *
14 * The main worker (apply) is started by logical replication worker
15 * launcher for every enabled subscription in a database. It uses
16 * walsender protocol to communicate with publisher.
17 *
18 * This module includes server facing code and shares libpqwalreceiver
19 * module with walreceiver for providing the libpq specific functionality.
20 *
21 *-------------------------------------------------------------------------
22 */
23
24 #include "postgres.h"
25
26 #include "access/table.h"
27 #include "access/tableam.h"
28 #include "access/xact.h"
29 #include "access/xlog_internal.h"
30 #include "catalog/catalog.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_subscription.h"
33 #include "catalog/pg_subscription_rel.h"
34 #include "commands/tablecmds.h"
35 #include "commands/trigger.h"
36 #include "executor/executor.h"
37 #include "executor/nodeModifyTable.h"
38 #include "funcapi.h"
39 #include "libpq/pqformat.h"
40 #include "libpq/pqsignal.h"
41 #include "mb/pg_wchar.h"
42 #include "miscadmin.h"
43 #include "nodes/makefuncs.h"
44 #include "optimizer/optimizer.h"
45 #include "pgstat.h"
46 #include "postmaster/bgworker.h"
47 #include "postmaster/postmaster.h"
48 #include "postmaster/walwriter.h"
49 #include "replication/decode.h"
50 #include "replication/logical.h"
51 #include "replication/logicalproto.h"
52 #include "replication/logicalrelation.h"
53 #include "replication/logicalworker.h"
54 #include "replication/origin.h"
55 #include "replication/reorderbuffer.h"
56 #include "replication/snapbuild.h"
57 #include "replication/walreceiver.h"
58 #include "replication/worker_internal.h"
59 #include "rewrite/rewriteHandler.h"
60 #include "storage/bufmgr.h"
61 #include "storage/ipc.h"
62 #include "storage/lmgr.h"
63 #include "storage/proc.h"
64 #include "storage/procarray.h"
65 #include "tcop/tcopprot.h"
66 #include "utils/builtins.h"
67 #include "utils/catcache.h"
68 #include "utils/datum.h"
69 #include "utils/fmgroids.h"
70 #include "utils/guc.h"
71 #include "utils/inval.h"
72 #include "utils/lsyscache.h"
73 #include "utils/memutils.h"
74 #include "utils/rel.h"
75 #include "utils/syscache.h"
76 #include "utils/timeout.h"
77
/* Maximum sleep time between work cycles, in milliseconds (1s). */
#define NAPTIME_PER_CYCLE 1000

/*
 * Entry in the lsn_mapping list below: pairs the LSN at which a transaction
 * ended locally with the remote LSN of the same commit, so we can later
 * report remote positions as flushed once the local WAL is flushed.
 */
typedef struct FlushPosition
{
	dlist_node	node;			/* list link in lsn_mapping */
	XLogRecPtr	local_end;		/* end of the local commit record */
	XLogRecPtr	remote_end;		/* corresponding remote commit end LSN */
} FlushPosition;

/* Ordered list of FlushPosition entries, oldest first. */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);

/* State passed to slot_store_error_callback() for error context reporting. */
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel; /* relation whose data is being converted */
	int			remote_attnum;	/* remote column being converted, or -1 */
} SlotErrCallbackArg;

/* Per-protocol-message memory context, reset after each message. */
static MemoryContext ApplyMessageContext = NULL;
/* Permanent memory context for the lifetime of the apply worker. */
MemoryContext ApplyContext = NULL;

/* Connection to the publisher's walsender. */
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

/* The subscription this worker is applying changes for. */
Subscription *MySubscription = NULL;
bool		MySubscriptionValid = false;

/* True while applying a remote transaction (between BEGIN and COMMIT). */
bool		in_remote_transaction = false;
/* Commit LSN of the remote transaction currently being applied. */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
114
115 /*
116 * Should this worker apply changes for given relation.
117 *
118 * This is mainly needed for initial relation data sync as that runs in
119 * separate worker process running in parallel and we need some way to skip
120 * changes coming to the main apply worker during the sync of a table.
121 *
122 * Note we need to do smaller or equals comparison for SYNCDONE state because
123 * it might hold position of end of initial slot consistent point WAL
124 * record + 1 (ie start of next record) and next record can be COMMIT of
125 * transaction we are now processing (which is what we set remote_final_lsn
126 * to in apply_handle_begin).
127 */
128 static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry * rel)129 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
130 {
131 if (am_tablesync_worker())
132 return MyLogicalRepWorker->relid == rel->localreloid;
133 else
134 return (rel->state == SUBREL_STATE_READY ||
135 (rel->state == SUBREL_STATE_SYNCDONE &&
136 rel->statelsn <= remote_final_lsn));
137 }
138
/*
 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
 *
 * Start a transaction, if this is the first step (else we keep using the
 * existing transaction).
 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
 */
static void
begin_replication_step(void)
{
	SetCurrentStatementStartTimestamp();

	if (!IsTransactionState())
	{
		StartTransactionCommand();
		/*
		 * Re-check subscription state only at transaction start; within a
		 * transaction the state cannot change under us.
		 */
		maybe_reread_subscription();
	}

	/* Each step runs under its own active snapshot. */
	PushActiveSnapshot(GetTransactionSnapshot());

	MemoryContextSwitchTo(ApplyMessageContext);
}
161
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void
end_replication_step(void)
{
	/* Matches the PushActiveSnapshot() in begin_replication_step(). */
	PopActiveSnapshot();

	CommandCounterIncrement();
}
176
177
/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers.
 *
 * Builds an EState with a one-entry range table and result-relation info
 * for rel->localrel, and opens an AFTER-trigger query; the caller must
 * finish with finish_estate().
 *
 * This is based on similar code in copy.c
 */
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel)
{
	EState	   *estate;
	ResultRelInfo *resultRelInfo;
	RangeTblEntry *rte;

	estate = CreateExecutorState();

	/* Single-relation range table referring to the local target table. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	rte->rellockmode = AccessShareLock;
	ExecInitRangeTable(estate, list_make1(rte));

	/* rangetable index 1 corresponds to the single RTE created above. */
	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;

	estate->es_output_cid = GetCurrentCommandId(true);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	return estate;
}
214
/*
 * Finish any operations related to the executor state created by
 * create_estate_for_relation().
 */
static void
finish_estate(EState *estate)
{
	/* Handle any queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	/* Cleanup. */
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);
}
229
/*
 * Executes default values for columns for which we can't map to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 *
 * The slot is expected to already hold the replicated column values; this
 * fills in tts_values/tts_isnull for the extra local columns that have a
 * default expression.
 */
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
				   TupleTableSlot *slot)
{
	TupleDesc	desc = RelationGetDescr(rel->localrel);
	int			num_phys_attrs = desc->natts;
	int			i;
	int			attnum,
				num_defaults = 0;
	int		   *defmap;
	ExprState **defexprs;
	ExprContext *econtext;

	econtext = GetPerTupleExprContext(estate);

	/* We got all the data via replication, no need to evaluate anything. */
	if (num_phys_attrs == rel->remoterel.natts)
		return;

	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

	for (attnum = 0; attnum < num_phys_attrs; attnum++)
	{
		Expr	   *defexpr;

		/* Dropped and generated columns never get a default filled in. */
		if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
			continue;

		/* Columns mapped to a remote column already have their value. */
		if (rel->attrmap[attnum] >= 0)
			continue;

		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);

		if (defexpr != NULL)
		{
			/* Run the expression through planner */
			defexpr = expression_planner(defexpr);

			/* Initialize executable expression in copycontext */
			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
			defmap[num_defaults] = attnum;
			num_defaults++;
		}

	}

	/* Evaluate the collected defaults directly into the slot. */
	for (i = 0; i < num_defaults; i++)
		slot->tts_values[defmap[i]] =
			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
}
288
289 /*
290 * Error callback to give more context info about data conversion failures
291 * while reading data from the remote server.
292 */
293 static void
slot_store_error_callback(void * arg)294 slot_store_error_callback(void *arg)
295 {
296 SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
297 LogicalRepRelMapEntry *rel;
298
299 /* Nothing to do if remote attribute number is not set */
300 if (errarg->remote_attnum < 0)
301 return;
302
303 rel = errarg->rel;
304 errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
305 rel->remoterel.nspname, rel->remoterel.relname,
306 rel->remoterel.attnames[errarg->remote_attnum]);
307 }
308
/*
 * Store data in C string form into slot.
 * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
 * use better.
 *
 * "values" is indexed by remote attribute number; rel->attrmap translates
 * local attribute positions to remote ones (-1 when there is no remote
 * counterpart).
 */
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
					char **values)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	ExecClearTuple(slot);

	/* Push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each non-dropped attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0 &&
			values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			/* Let the error callback know which column we're converting. */
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			/* Conversion done; clear column info for the callback. */
			errarg.remote_attnum = -1;
		}
		else
		{
			/*
			 * We assign NULL to dropped attributes, NULL values, and missing
			 * values (missing values should be later filled using
			 * slot_fill_defaults).
			 */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	ExecStoreVirtualTuple(slot);
}
372
/*
 * Replace selected columns with user data provided as C strings.
 * This is somewhat similar to heap_modify_tuple but also calls the type
 * input functions on the user data.
 * "slot" is filled with a copy of the tuple in "srcslot", with
 * columns selected by the "replaces" array replaced with data values
 * from "values".
 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
 * storage for "srcslot".  This is OK for current usage, but someday we may
 * need to materialize "slot" at the end to make it independent of "srcslot".
 */
static void
slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
					 LogicalRepRelMapEntry *rel,
					 char **values, bool *replaces)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
	ExecClearTuple(slot);

	/*
	 * Copy all the column data from srcslot, so that we'll have valid values
	 * for unreplaced columns.
	 */
	Assert(natts == srcslot->tts_tupleDescriptor->natts);
	slot_getallattrs(srcslot);
	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));

	/* For error reporting, push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each replaced attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		/* Skip columns with no remote counterpart. */
		if (remoteattnum < 0)
			continue;

		/* Skip columns the remote did not mark as changed. */
		if (!replaces[remoteattnum])
			continue;

		if (values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			/* Let the error callback know which column we're converting. */
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.remote_attnum = -1;
		}
		else
		{
			/* Remote sent an explicit NULL for this column. */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	/* And finally, declare that "slot" contains a valid virtual tuple */
	ExecStoreVirtualTuple(slot);
}
454
455 /*
456 * Handle BEGIN message.
457 */
458 static void
apply_handle_begin(StringInfo s)459 apply_handle_begin(StringInfo s)
460 {
461 LogicalRepBeginData begin_data;
462
463 logicalrep_read_begin(s, &begin_data);
464
465 remote_final_lsn = begin_data.final_lsn;
466
467 in_remote_transaction = true;
468
469 pgstat_report_activity(STATE_RUNNING, NULL);
470 }
471
/*
 * Handle COMMIT message.
 *
 * Commits the local transaction (if any), records replication origin
 * progress, and stores the remote flush position for later feedback.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* The commit must match the final_lsn announced in the BEGIN message. */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
								 (uint32) (commit_data.commit_lsn >> 32),
								 (uint32) commit_data.commit_lsn,
								 (uint32) (remote_final_lsn >> 32),
								 (uint32) remote_final_lsn)));

	/* The synchronization worker runs in single transaction. */
	if (IsTransactionState() && !am_tablesync_worker())
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.  These must be set before the local
		 * commit so they are recorded atomically with it.
		 */
		replorigin_session_origin_lsn = commit_data.end_lsn;
		replorigin_session_origin_timestamp = commit_data.committime;

		CommitTransactionCommand();
		pgstat_report_stat(false);

		store_flush_position(commit_data.end_lsn);
	}
	else
	{
		/* Process any invalidation messages that might have accumulated. */
		AcceptInvalidationMessages();
		maybe_reread_subscription();
	}

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
}
522
523 /*
524 * Handle ORIGIN message.
525 *
526 * TODO, support tracking of multiple origins
527 */
528 static void
apply_handle_origin(StringInfo s)529 apply_handle_origin(StringInfo s)
530 {
531 /*
532 * ORIGIN message can only come inside remote transaction and before any
533 * actual writes.
534 */
535 if (!in_remote_transaction ||
536 (IsTransactionState() && !am_tablesync_worker()))
537 ereport(ERROR,
538 (errcode(ERRCODE_PROTOCOL_VIOLATION),
539 errmsg("ORIGIN message sent out of order")));
540 }
541
542 /*
543 * Handle RELATION message.
544 *
545 * Note we don't do validation against local schema here. The validation
546 * against local schema is postponed until first change for given relation
547 * comes as we only care about it when applying changes for it anyway and we
548 * do less locking this way.
549 */
550 static void
apply_handle_relation(StringInfo s)551 apply_handle_relation(StringInfo s)
552 {
553 LogicalRepRelation *rel;
554
555 rel = logicalrep_read_rel(s);
556 logicalrep_relmap_update(rel);
557 }
558
559 /*
560 * Handle TYPE message.
561 *
562 * This is now vestigial; we read the info and discard it.
563 */
564 static void
apply_handle_type(StringInfo s)565 apply_handle_type(StringInfo s)
566 {
567 LogicalRepTyp typ;
568
569 logicalrep_read_typ(s, &typ);
570 }
571
572 /*
573 * Get replica identity index or if it is not defined a primary key.
574 *
575 * If neither is defined, returns InvalidOid
576 */
577 static Oid
GetRelationIdentityOrPK(Relation rel)578 GetRelationIdentityOrPK(Relation rel)
579 {
580 Oid idxoid;
581
582 idxoid = RelationGetReplicaIndex(rel);
583
584 if (!OidIsValid(idxoid))
585 idxoid = RelationGetPrimaryKeyIndex(rel);
586
587 return idxoid;
588 }
589
/*
 * Handle INSERT message.
 *
 * Parses the remote tuple, fills local-only columns from defaults, and
 * inserts it through the executor so constraints, indexes and triggers all
 * fire normally.
 */
static void
apply_handle_insert(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData newtup;
	LogicalRepRelId relid;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;

	begin_replication_step();

	relid = logicalrep_read_insert(s, &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/*
	 * Process and store remote tuple in the slot; done in the per-tuple
	 * context so conversion garbage is reclaimed automatically.
	 */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, newtup.values);
	slot_fill_defaults(rel, estate, remoteslot);
	MemoryContextSwitchTo(oldctx);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Do the insert. */
	ExecSimpleRelationInsert(estate, remoteslot);

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);

	finish_estate(estate);

	/* Keep the lock until commit; only release our reference. */
	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
644
/*
 * Check if the logical replication relation is updatable and throw
 * appropriate error if it isn't.
 *
 * Does not return when the relation is not updatable: one of the two
 * ereport(ERROR) calls below is always reached.
 */
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
	/* Updatable, no error. */
	if (rel->updatable)
		return;

	/*
	 * We are in error mode so it's fine this is somewhat slow.  It's better
	 * to give user correct error.
	 */
	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
	{
		/* Local identity exists, so the publisher's tuple lacked columns. */
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("publisher did not send replica identity column "
						"expected by the logical replication target relation \"%s.%s\"",
						rel->remoterel.nspname, rel->remoterel.relname)));
	}

	/* No usable identity on the local side at all. */
	ereport(ERROR,
			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			 errmsg("logical replication target relation \"%s.%s\" has "
					"neither REPLICA IDENTITY index nor PRIMARY "
					"KEY and published relation does not have "
					"REPLICA IDENTITY FULL",
					rel->remoterel.nspname, rel->remoterel.relname)));
}
677
/*
 * Handle UPDATE message.
 *
 * Locates the existing local tuple via replica identity index, primary key,
 * or sequential scan, then applies the replicated column changes through the
 * executor.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	LogicalRepTupleData oldtup;
	LogicalRepTupleData newtup;
	bool		has_oldtup;
	TupleTableSlot *localslot;
	TupleTableSlot *remoteslot;
	RangeTblEntry *target_rte;
	bool		found;
	MemoryContext oldctx;

	begin_replication_step();

	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
								   &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Check if we can do the update. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);
	localslot = table_slot_create(rel->localrel,
								  &estate->es_tupleTable);
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	/*
	 * Populate updatedCols so that per-column triggers can fire.  This could
	 * include more columns than were actually changed on the publisher
	 * because the logical replication protocol doesn't contain that
	 * information.  But it would for example exclude columns that only exist
	 * on the subscriber, since we are not touching those.
	 */
	target_rte = list_nth(estate->es_range_table, 0);
	for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0)
		{
			if (newtup.changed[remoteattnum])
				target_rte->updatedCols =
					bms_add_member(target_rte->updatedCols,
								   i + 1 - FirstLowInvalidHeapAttributeNumber);
		}
	}

	/* Also populate extraUpdatedCols, in case we have generated columns */
	fill_extraUpdatedCols(target_rte, rel->localrel);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel,
						has_oldtup ? oldtup.values : newtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find tuple using either replica identity index, primary key or
	 * if needed, sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);

	/* The search tuple is no longer needed; reuse the slot below. */
	ExecClearTuple(remoteslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/* Process and store remote tuple in the slot */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		slot_modify_cstrings(remoteslot, localslot, rel,
							 newtup.values, newtup.changed);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		/* Do the actual update. */
		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
	}
	else
	{
		/*
		 * The tuple to be updated could not be found.
		 *
		 * TODO what to do here, change the log level to LOG perhaps?
		 */
		elog(DEBUG1,
			 "logical replication did not find row for update "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	EvalPlanQualEnd(&epqstate);
	ExecCloseIndices(estate->es_result_relation_info);

	finish_estate(estate);

	/* Keep the lock until commit; only release our reference. */
	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
820
/*
 * Handle DELETE message.
 *
 * Locates the existing local tuple via replica identity index, primary key,
 * or sequential scan, then deletes it through the executor.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	TupleTableSlot *remoteslot;
	TupleTableSlot *localslot;
	bool		found;
	MemoryContext oldctx;

	begin_replication_step();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);
	localslot = table_slot_create(rel->localrel,
								  &estate->es_tupleTable);
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Find the tuple using the replica identity index. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, oldtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find tuple using either replica identity index, primary key or
	 * if needed, sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);
	/* If found delete it. */
	if (found)
	{
		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		ExecSimpleRelationDelete(estate, &epqstate, localslot);
	}
	else
	{
		/* The tuple to be deleted could not be found. */
		elog(DEBUG1,
			 "logical replication could not find row for delete "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	EvalPlanQualEnd(&epqstate);
	ExecCloseIndices(estate->es_result_relation_info);

	finish_estate(estate);

	/* Keep the lock until commit; only release our reference. */
	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
916
/*
 * Handle TRUNCATE message.
 *
 * Collects all relations named in the message that this worker should act
 * on, then truncates them in a single ExecuteTruncateGuts() call so that
 * cross-table consistency (e.g. FK ordering) matches a local TRUNCATE.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;
	List	   *remote_rels = NIL;
	List	   *rels = NIL;
	List	   *relids = NIL;
	List	   *relids_logged = NIL;
	ListCell   *lc;
	LOCKMODE	lockmode = AccessExclusiveLock;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		/* Remember entry, Relation, and OID forms for the calls below. */
		remote_rels = lappend(remote_rels, rel);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);
	}

	/*
	 * Even if we used CASCADE on the upstream master we explicitly default to
	 * replaying changes without further cascading.  This might be later
	 * changeable with a user specified option.
	 */
	ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);

	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		/* Keep the AccessExclusiveLock until commit. */
		logicalrep_rel_close(rel, NoLock);
	}

	end_replication_step();
}
978
979
980 /*
981 * Logical replication protocol message dispatcher.
982 */
983 static void
apply_dispatch(StringInfo s)984 apply_dispatch(StringInfo s)
985 {
986 char action = pq_getmsgbyte(s);
987
988 switch (action)
989 {
990 /* BEGIN */
991 case 'B':
992 apply_handle_begin(s);
993 break;
994 /* COMMIT */
995 case 'C':
996 apply_handle_commit(s);
997 break;
998 /* INSERT */
999 case 'I':
1000 apply_handle_insert(s);
1001 break;
1002 /* UPDATE */
1003 case 'U':
1004 apply_handle_update(s);
1005 break;
1006 /* DELETE */
1007 case 'D':
1008 apply_handle_delete(s);
1009 break;
1010 /* TRUNCATE */
1011 case 'T':
1012 apply_handle_truncate(s);
1013 break;
1014 /* RELATION */
1015 case 'R':
1016 apply_handle_relation(s);
1017 break;
1018 /* TYPE */
1019 case 'Y':
1020 apply_handle_type(s);
1021 break;
1022 /* ORIGIN */
1023 case 'O':
1024 apply_handle_origin(s);
1025 break;
1026 default:
1027 ereport(ERROR,
1028 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1029 errmsg("invalid logical replication message type \"%c\"", action)));
1030 }
1031 }
1032
/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally.  Instead we
 * build a list that associates local with remote LSNs for every commit.  When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed.  Those we can report
 * as having been flushed.
 *
 * The have_pending_txes is true if there are outstanding transactions that
 * need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr();

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	/* Walk oldest-first; entries are appended at the tail on commit. */
	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
		dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			/* This remote commit is durably flushed locally; consume it. */
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long.  Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	/* Emptied the list (or it was empty): pending only if entries remain. */
	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
1086
1087 /*
1088 * Store current remote/local lsn pair in the tracking list.
1089 */
1090 static void
store_flush_position(XLogRecPtr remote_lsn)1091 store_flush_position(XLogRecPtr remote_lsn)
1092 {
1093 FlushPosition *flushpos;
1094
1095 /* Need to do this in permanent context */
1096 MemoryContextSwitchTo(ApplyContext);
1097
1098 /* Track commit lsn */
1099 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1100 flushpos->local_end = XactLastCommitEnd;
1101 flushpos->remote_end = remote_lsn;
1102
1103 dlist_push_tail(&lsn_mapping, &flushpos->node);
1104 MemoryContextSwitchTo(ApplyMessageContext);
1105 }
1106
1107
1108 /* Update statistics of the worker. */
1109 static void
UpdateWorkerStats(XLogRecPtr last_lsn,TimestampTz send_time,bool reply)1110 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1111 {
1112 MyLogicalRepWorker->last_lsn = last_lsn;
1113 MyLogicalRepWorker->last_send_time = send_time;
1114 MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1115 if (reply)
1116 {
1117 MyLogicalRepWorker->reply_lsn = last_lsn;
1118 MyLogicalRepWorker->reply_time = send_time;
1119 }
1120 }
1121
/*
 * Apply main loop.
 *
 * Receives the replication stream from the publisher starting at
 * 'last_received', dispatches each protocol message, and periodically sends
 * feedback upstream.  Also handles receiver timeouts, SIGHUP config reloads,
 * and table-synchronization bookkeeping.  Returns only when the upstream
 * ends the stream; otherwise exits via ERROR/proc_exit elsewhere.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/*
					 * Wrap the received buffer in a StringInfo without
					 * copying; maxlen = -1 marks it as not palloc'd here.
					 */
					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
						/* XLogData message: parse header, apply payload. */
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* Advance last_received monotonically. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						/* Keepalive: update position, reply if requested. */
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					/* Free per-message allocations before the next message. */
					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
		{
			TimeLineID	tli;

			walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
			break;
		}

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Reload configuration if SIGHUP arrived while we were waiting. */
		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new.  If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server.  Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the master anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from standby has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				/* Give up if the publisher has been silent too long. */
				if (now >= timeout)
					ereport(ERROR,
							(errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						/* Ask the publisher to reply so we know it's alive. */
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);
		}
	}
}
1346
/*
 * Send a Standby Status Update message to server.
 *
 * 'recvpos' is the latest LSN we've received data to; 'force' is set if we
 * need to send a response to avoid timeouts; 'requestReply' asks the
 * publisher to respond with a keepalive immediately.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	/* Reusable message buffer, lazily allocated in ApplyContext. */
	static StringInfo reply_message = NULL;
	/* Time we last sent a status update. */
	static TimestampTz send_time = 0;

	/* Highest positions reported so far; we never report backwards. */
	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position. This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	/*
	 * Build the 'r' (Standby Status Update) message.  NOTE(review): recvpos
	 * goes into the protocol's "write" field and writepos into "apply" —
	 * this mapping looks intentional here, but confirm against the
	 * walsender's interpretation before changing it.
	 */
	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
		 force,
		 (uint32) (recvpos >> 32), (uint32) recvpos,
		 (uint32) (writepos >> 32), (uint32) writepos,
		 (uint32) (flushpos >> 32), (uint32) flushpos
		);

	walrcv_send(LogRepWorkerWalRcvConn,
				reply_message->data, reply_message->len);

	/* Remember the furthest positions we have reported. */
	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}
1439
/*
 * Re-read subscription catalog info if the cached copy was invalidated.
 *
 * Most kinds of changes cause the worker to exit (the launcher restarts it
 * if the subscription is still active); only benign changes are absorbed in
 * place.  May be called inside or outside a transaction.
 */
static void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/* missing_ok = true: the subscription may be gone by now. */
	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was removed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription was disabled. This normally should not happen
	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
	 */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was disabled",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if connection string was changed. The launcher will start new
	 * worker.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the connection information was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if subscription name was changed (it's used for
	 * fallback_application_name). The launcher will start new worker.
	 */
	if (strcmp(newsub->name, MySubscription->name) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription was renamed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/*
	 * We need to make new connection to new slot if slot name has changed so
	 * exit here as well if that's the case.
	 */
	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the replication slot name was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if publication list was changed. The launcher will start new
	 * worker.
	 */
	if (!equal(newsub->publications, MySubscription->publications))
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription's publications were changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (started_tx)
		CommitTransactionCommand();

	/* Cache is now in sync with the catalogs again. */
	MySubscriptionValid = true;
}
1575
1576 /*
1577 * Callback from subscription syscache invalidation.
1578 */
1579 static void
subscription_change_cb(Datum arg,int cacheid,uint32 hashvalue)1580 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1581 {
1582 MySubscriptionValid = false;
1583 }
1584
1585 /* SIGHUP: set flag to reload configuration at next convenient time */
1586 static void
logicalrep_worker_sighup(SIGNAL_ARGS)1587 logicalrep_worker_sighup(SIGNAL_ARGS)
1588 {
1589 int save_errno = errno;
1590
1591 got_SIGHUP = true;
1592
1593 /* Waken anything waiting on the process latch */
1594 SetLatch(MyLatch);
1595
1596 errno = save_errno;
1597 }
1598
/*
 * Logical Replication Apply worker entry point.
 *
 * Runs as both the main apply worker and (when MyLogicalRepWorker->relid is
 * set) a table synchronization worker.  Sets up signals, database
 * connection, subscription cache, replication origin and the walreceiver
 * connection, then enters LogicalRepApplyLoop().
 */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, logicalrep_worker_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * We don't currently need any ResourceOwner in a walreceiver process, but
	 * if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/* Initialize worker statistics to something sane. */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/* missing_ok = true: subscription may have been dropped concurrently. */
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription %u will not "
						"start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));
		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/* This is the table synchronization worker; run the initial sync. */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* The slot name needs to be allocated in permanent memory context. */
		oldctx = MemoryContextSwitchTo(ApplyContext);
		myslotname = pstrdup(syncslotname);
		MemoryContextSwitchTo(oldctx);

		pfree(syncslotname);
	}
	else
	{
		/* This is main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errmsg("subscription has no replication slot set")));

		/* Setup replication origin tracking. */
		StartTransactionCommand();
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
		/* Resume streaming from where we previously left off. */
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output identify_system for anything but it
		 * does some initializations on the upstream so let's still call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}
1778
1779 /*
1780 * Is current process a logical replication worker?
1781 */
1782 bool
IsLogicalWorker(void)1783 IsLogicalWorker(void)
1784 {
1785 return MyLogicalRepWorker != NULL;
1786 }
1787