1 /*-------------------------------------------------------------------------
2 * worker.c
3 * PostgreSQL logical replication worker (apply)
4 *
5 * Copyright (c) 2016-2018, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/worker.c
9 *
10 * NOTES
11 * This file contains the worker which applies logical changes as they come
12 * from remote logical replication stream.
13 *
14 * The main worker (apply) is started by logical replication worker
15 * launcher for every enabled subscription in a database. It uses
16 * walsender protocol to communicate with publisher.
17 *
18 * This module includes server facing code and shares libpqwalreceiver
19 * module with walreceiver for providing the libpq specific functionality.
20 *
21 *-------------------------------------------------------------------------
22 */
23
24 #include "postgres.h"
25
26 #include "miscadmin.h"
27 #include "pgstat.h"
28 #include "funcapi.h"
29
30 #include "access/sysattr.h"
31 #include "access/xact.h"
32 #include "access/xlog_internal.h"
33
34 #include "catalog/catalog.h"
35 #include "catalog/namespace.h"
36 #include "catalog/pg_subscription.h"
37 #include "catalog/pg_subscription_rel.h"
38
39 #include "commands/tablecmds.h"
40 #include "commands/trigger.h"
41
42 #include "executor/executor.h"
43 #include "executor/nodeModifyTable.h"
44
45 #include "libpq/pqformat.h"
46 #include "libpq/pqsignal.h"
47
48 #include "mb/pg_wchar.h"
49
50 #include "nodes/makefuncs.h"
51
52 #include "optimizer/planner.h"
53
54 #include "parser/parse_relation.h"
55
56 #include "postmaster/bgworker.h"
57 #include "postmaster/postmaster.h"
58 #include "postmaster/walwriter.h"
59
60 #include "replication/decode.h"
61 #include "replication/logical.h"
62 #include "replication/logicalproto.h"
63 #include "replication/logicalrelation.h"
64 #include "replication/logicalworker.h"
65 #include "replication/reorderbuffer.h"
66 #include "replication/origin.h"
67 #include "replication/snapbuild.h"
68 #include "replication/walreceiver.h"
69 #include "replication/worker_internal.h"
70
71 #include "rewrite/rewriteHandler.h"
72
73 #include "storage/bufmgr.h"
74 #include "storage/ipc.h"
75 #include "storage/lmgr.h"
76 #include "storage/proc.h"
77 #include "storage/procarray.h"
78
79 #include "tcop/tcopprot.h"
80
81 #include "utils/builtins.h"
82 #include "utils/catcache.h"
83 #include "utils/datum.h"
84 #include "utils/fmgroids.h"
85 #include "utils/guc.h"
86 #include "utils/inval.h"
87 #include "utils/lsyscache.h"
88 #include "utils/memutils.h"
89 #include "utils/rel.h"
90 #include "utils/timeout.h"
91 #include "utils/tqual.h"
92 #include "utils/syscache.h"
93
#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

/*
 * One entry in the ordered local->remote LSN mapping list.  Each commit
 * records the local end-of-commit LSN alongside the remote commit LSN so
 * that get_flush_position() can report to the walsender which remote LSNs
 * are durably flushed locally.
 */
typedef struct FlushPosition
{
	dlist_node	node;			/* list links */
	XLogRecPtr	local_end;		/* local LSN at end of the commit record */
	XLogRecPtr	remote_end;		/* corresponding remote commit LSN */
} FlushPosition;

/* FlushPosition entries, oldest first; entries allocated in ApplyContext */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
104
/*
 * Argument for slot_store_error_callback(): identifies the relation and the
 * remote column currently being converted, so that data-conversion errors
 * carry useful context.
 */
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel; /* relation whose data is being converted */
	int			remote_attnum;	/* remote column number, or -1 if none */
} SlotErrCallbackArg;

/* Per-protocol-message memory context, reset after each message */
static MemoryContext ApplyMessageContext = NULL;
/* Permanent memory context for the lifetime of the apply worker */
MemoryContext ApplyContext = NULL;

/* Connection to the walsender on the publisher */
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

/* The subscription this worker is applying changes for */
Subscription *MySubscription = NULL;
/*
 * Presumably cleared by syscache invalidation and rechecked by
 * maybe_reread_subscription() -- TODO confirm (definition not in view).
 */
bool		MySubscriptionValid = false;

/* True while between BEGIN and COMMIT of a remote transaction */
bool		in_remote_transaction = false;
/* final_lsn of the remote transaction currently being applied */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
130
131 /*
132 * Should this worker apply changes for given relation.
133 *
134 * This is mainly needed for initial relation data sync as that runs in
135 * separate worker process running in parallel and we need some way to skip
136 * changes coming to the main apply worker during the sync of a table.
137 *
138 * Note we need to do smaller or equals comparison for SYNCDONE state because
139 * it might hold position of end of initial slot consistent point WAL
140 * record + 1 (ie start of next record) and next record can be COMMIT of
141 * transaction we are now processing (which is what we set remote_final_lsn
142 * to in apply_handle_begin).
143 */
144 static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry * rel)145 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
146 {
147 if (am_tablesync_worker())
148 return MyLogicalRepWorker->relid == rel->localreloid;
149 else
150 return (rel->state == SUBREL_STATE_READY ||
151 (rel->state == SUBREL_STATE_SYNCDONE &&
152 rel->statelsn <= remote_final_lsn));
153 }
154
/*
 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
 *
 * Start a transaction, if this is the first step (else we keep using the
 * existing transaction).
 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
 */
static void
begin_replication_step(void)
{
	SetCurrentStatementStartTimestamp();

	if (!IsTransactionState())
	{
		StartTransactionCommand();
		/* Now that we are in a transaction, pick up subscription changes */
		maybe_reread_subscription();
	}

	/* Snapshot must be pushed before any table access below */
	PushActiveSnapshot(GetTransactionSnapshot());

	MemoryContextSwitchTo(ApplyMessageContext);
}
177
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 */
static void
end_replication_step(void)
{
	/* Matches the PushActiveSnapshot in begin_replication_step() */
	PopActiveSnapshot();

	CommandCounterIncrement();
}
192
193
/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers.
 *
 * Builds an EState with a one-entry range table and result-relation info
 * for the local target relation.  Callers must pair this with
 * finish_estate() to fire queued AFTER triggers and release resources.
 *
 * This is based on similar code in copy.c
 */
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel)
{
	EState	   *estate;
	ResultRelInfo *resultRelInfo;
	RangeTblEntry *rte;

	estate = CreateExecutorState();

	/* Single-entry range table referencing the local target relation */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	estate->es_range_table = list_make1(rte);

	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;

	/* Writes by this step must be visible to later steps (true => advance) */
	estate->es_output_cid = GetCurrentCommandId(true);

	/* Triggers might need a slot */
	if (resultRelInfo->ri_TrigDesc)
		estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	return estate;
}
233
/*
 * Finish any operations related to the executor state created by
 * create_estate_for_relation().
 */
static void
finish_estate(EState *estate)
{
	/* Handle any queued AFTER triggers (pairs with AfterTriggerBeginQuery) */
	AfterTriggerEndQuery(estate);

	/* Cleanup. */
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);
}
248
/*
 * Executes default values for columns for which we can't map to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 *
 * "slot" is expected to already contain the replicated column values; the
 * unmapped columns' tts_values/tts_isnull entries are overwritten here.
 */
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
				   TupleTableSlot *slot)
{
	TupleDesc	desc = RelationGetDescr(rel->localrel);
	int			num_phys_attrs = desc->natts;
	int			i;
	int			attnum,
				num_defaults = 0;
	int		   *defmap;
	ExprState **defexprs;
	ExprContext *econtext;

	econtext = GetPerTupleExprContext(estate);

	/* We got all the data via replication, no need to evaluate anything. */
	if (num_phys_attrs == rel->remoterel.natts)
		return;

	/* Worst case: every local column needs a default */
	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

	/* Collect default expressions for local columns with no remote mapping */
	for (attnum = 0; attnum < num_phys_attrs; attnum++)
	{
		Expr	   *defexpr;

		if (TupleDescAttr(desc, attnum)->attisdropped)
			continue;

		/* attrmap >= 0 means the remote side supplies this column */
		if (rel->attrmap[attnum] >= 0)
			continue;

		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);

		if (defexpr != NULL)
		{
			/* Run the expression through planner */
			defexpr = expression_planner(defexpr);

			/* Initialize executable expression in copycontext */
			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
			defmap[num_defaults] = attnum;
			num_defaults++;
		}

	}

	/* Evaluate the collected defaults into the slot */
	for (i = 0; i < num_defaults; i++)
		slot->tts_values[defmap[i]] =
			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
}
307
308 /*
309 * Error callback to give more context info about data conversion failures
310 * while reading data from the remote server.
311 */
312 static void
slot_store_error_callback(void * arg)313 slot_store_error_callback(void *arg)
314 {
315 SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
316 LogicalRepRelMapEntry *rel;
317
318 /* Nothing to do if remote attribute number is not set */
319 if (errarg->remote_attnum < 0)
320 return;
321
322 rel = errarg->rel;
323 errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
324 rel->remoterel.nspname, rel->remoterel.relname,
325 rel->remoterel.attnames[errarg->remote_attnum]);
326 }
327
/*
 * Store data in C string form into slot.
 * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
 * use better.
 *
 * "values" is indexed by remote attribute number; rel->attrmap translates
 * local attribute positions to remote ones (-1 when there is no remote
 * counterpart).
 */
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
					char **values)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	ExecClearTuple(slot);

	/* Push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each non-dropped attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0 &&
			values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			/* Flag the column in progress so the error callback reports it */
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.remote_attnum = -1;
		}
		else
		{
			/*
			 * We assign NULL to dropped attributes, NULL values, and missing
			 * values (missing values should be later filled using
			 * slot_fill_defaults).
			 */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	ExecStoreVirtualTuple(slot);
}
391
/*
 * Replace selected columns with user data provided as C strings.
 * This is somewhat similar to heap_modify_tuple but also calls the type
 * input functions on the user data.
 * "slot" is filled with a copy of the tuple in "srcslot", with
 * columns selected by the "replaces" array replaced with data values
 * from "values".
 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
 * storage for "srcslot". This is OK for current usage, but someday we may
 * need to materialize "slot" at the end to make it independent of "srcslot".
 */
static void
slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
					 LogicalRepRelMapEntry *rel,
					 char **values, bool *replaces)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
	ExecClearTuple(slot);

	/*
	 * Copy all the column data from srcslot, so that we'll have valid values
	 * for unreplaced columns.
	 */
	Assert(natts == srcslot->tts_tupleDescriptor->natts);
	slot_getallattrs(srcslot);
	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));

	/* For error reporting, push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each replaced attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		/* Skip columns the remote side doesn't send */
		if (remoteattnum < 0)
			continue;

		/* Skip columns the remote tuple did not change */
		if (!replaces[remoteattnum])
			continue;

		if (values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			/* Flag the column in progress for the error callback */
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.remote_attnum = -1;
		}
		else
		{
			/* Remote sent an explicit NULL for this column */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	/* And finally, declare that "slot" contains a valid virtual tuple */
	ExecStoreVirtualTuple(slot);
}
473
474 /*
475 * Handle BEGIN message.
476 */
477 static void
apply_handle_begin(StringInfo s)478 apply_handle_begin(StringInfo s)
479 {
480 LogicalRepBeginData begin_data;
481
482 logicalrep_read_begin(s, &begin_data);
483
484 remote_final_lsn = begin_data.final_lsn;
485
486 in_remote_transaction = true;
487
488 pgstat_report_activity(STATE_RUNNING, NULL);
489 }
490
/*
 * Handle COMMIT message.
 *
 * Cross-checks the commit LSN against remote_final_lsn recorded at BEGIN,
 * advances the replication origin, commits the local transaction, and lets
 * parallel table-sync workers catch up.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* Sanity check: must match the final_lsn the BEGIN message announced */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
								 (uint32) (commit_data.commit_lsn >> 32),
								 (uint32) commit_data.commit_lsn,
								 (uint32) (remote_final_lsn >> 32),
								 (uint32) remote_final_lsn)));

	/* The synchronization worker runs in single transaction. */
	if (IsTransactionState() && !am_tablesync_worker())
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.
		 */
		replorigin_session_origin_lsn = commit_data.end_lsn;
		replorigin_session_origin_timestamp = commit_data.committime;

		CommitTransactionCommand();
		pgstat_report_stat(false);

		/* Remember local/remote LSN pair for flush reporting */
		store_flush_position(commit_data.end_lsn);
	}
	else
	{
		/* Process any invalidation messages that might have accumulated. */
		AcceptInvalidationMessages();
		maybe_reread_subscription();
	}

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
}
541
542 /*
543 * Handle ORIGIN message.
544 *
545 * TODO, support tracking of multiple origins
546 */
547 static void
apply_handle_origin(StringInfo s)548 apply_handle_origin(StringInfo s)
549 {
550 /*
551 * ORIGIN message can only come inside remote transaction and before any
552 * actual writes.
553 */
554 if (!in_remote_transaction ||
555 (IsTransactionState() && !am_tablesync_worker()))
556 ereport(ERROR,
557 (errcode(ERRCODE_PROTOCOL_VIOLATION),
558 errmsg("ORIGIN message sent out of order")));
559 }
560
561 /*
562 * Handle RELATION message.
563 *
564 * Note we don't do validation against local schema here. The validation
565 * against local schema is postponed until first change for given relation
566 * comes as we only care about it when applying changes for it anyway and we
567 * do less locking this way.
568 */
569 static void
apply_handle_relation(StringInfo s)570 apply_handle_relation(StringInfo s)
571 {
572 LogicalRepRelation *rel;
573
574 rel = logicalrep_read_rel(s);
575 logicalrep_relmap_update(rel);
576 }
577
578 /*
579 * Handle TYPE message.
580 *
581 * This is now vestigial; we read the info and discard it.
582 */
583 static void
apply_handle_type(StringInfo s)584 apply_handle_type(StringInfo s)
585 {
586 LogicalRepTyp typ;
587
588 logicalrep_read_typ(s, &typ);
589 }
590
591 /*
592 * Get replica identity index or if it is not defined a primary key.
593 *
594 * If neither is defined, returns InvalidOid
595 */
596 static Oid
GetRelationIdentityOrPK(Relation rel)597 GetRelationIdentityOrPK(Relation rel)
598 {
599 Oid idxoid;
600
601 idxoid = RelationGetReplicaIndex(rel);
602
603 if (!OidIsValid(idxoid))
604 idxoid = RelationGetPrimaryKeyIndex(rel);
605
606 return idxoid;
607 }
608
/*
 * Handle INSERT message.
 *
 * Reads the new tuple, converts it into a local slot (filling local-only
 * columns with their defaults), and inserts it via the executor so that
 * constraints, indexes and triggers apply.
 */
static void
apply_handle_insert(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData newtup;
	LogicalRepRelId relid;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;

	begin_replication_step();

	relid = logicalrep_read_insert(s, &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel));

	/* Process and store remote tuple in the slot */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, newtup.values);
	slot_fill_defaults(rel, estate, remoteslot);
	MemoryContextSwitchTo(oldctx);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Do the insert. */
	ExecSimpleRelationInsert(estate, remoteslot);

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);

	finish_estate(estate);

	/* Keep the lock until transaction end, but release our reference */
	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
662
663 /*
664 * Check if the logical replication relation is updatable and throw
665 * appropriate error if it isn't.
666 */
667 static void
check_relation_updatable(LogicalRepRelMapEntry * rel)668 check_relation_updatable(LogicalRepRelMapEntry *rel)
669 {
670 /* Updatable, no error. */
671 if (rel->updatable)
672 return;
673
674 /*
675 * We are in error mode so it's fine this is somewhat slow. It's better to
676 * give user correct error.
677 */
678 if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
679 {
680 ereport(ERROR,
681 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
682 errmsg("publisher did not send replica identity column "
683 "expected by the logical replication target relation \"%s.%s\"",
684 rel->remoterel.nspname, rel->remoterel.relname)));
685 }
686
687 ereport(ERROR,
688 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
689 errmsg("logical replication target relation \"%s.%s\" has "
690 "neither REPLICA IDENTITY index nor PRIMARY "
691 "KEY and published relation does not have "
692 "REPLICA IDENTITY FULL",
693 rel->remoterel.nspname, rel->remoterel.relname)));
694 }
695
/*
 * Handle UPDATE message.
 *
 * Locates the existing local row using the replica identity / primary key
 * (or a sequential scan for REPLICA IDENTITY FULL), then applies the new
 * column values via the executor.
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	LogicalRepTupleData oldtup;
	LogicalRepTupleData newtup;
	bool		has_oldtup;
	TupleTableSlot *localslot;
	TupleTableSlot *remoteslot;
	RangeTblEntry *target_rte;
	int			i;
	bool		found;
	MemoryContext oldctx;

	begin_replication_step();

	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
								   &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Check if we can do the update. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel));
	localslot = ExecInitExtraTupleSlot(estate,
									   RelationGetDescr(rel->localrel));
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	/*
	 * Populate updatedCols so that per-column triggers can fire. This could
	 * include more columns than were actually changed on the publisher
	 * because the logical replication protocol doesn't contain that
	 * information. But it would for example exclude columns that only exist
	 * on the subscriber, since we are not touching those.
	 */
	target_rte = list_nth(estate->es_range_table, 0);
	for (i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0)
		{
			if (newtup.changed[remoteattnum])
				target_rte->updatedCols =
					bms_add_member(target_rte->updatedCols,
								   i + 1 - FirstLowInvalidHeapAttributeNumber);
		}
	}

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel,
						has_oldtup ? oldtup.values : newtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find tuple using either replica identity index, primary key or
	 * if needed, sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);

	/* Done searching; remoteslot will now be rebuilt as the new tuple */
	ExecClearTuple(remoteslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/* Process and store remote tuple in the slot */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		slot_modify_cstrings(remoteslot, localslot, rel,
							 newtup.values, newtup.changed);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		/* Do the actual update. */
		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
	}
	else
	{
		/*
		 * The tuple to be updated could not be found.
		 *
		 * TODO what to do here, change the log level to LOG perhaps?
		 */
		elog(DEBUG1,
			 "logical replication did not find row for update "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	EvalPlanQualEnd(&epqstate);
	ExecCloseIndices(estate->es_result_relation_info);

	finish_estate(estate);

	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
835
/*
 * Handle DELETE message.
 *
 * Locates the existing local row using the replica identity / primary key
 * (or a sequential scan for REPLICA IDENTITY FULL), then deletes it via the
 * executor.
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	TupleTableSlot *remoteslot;
	TupleTableSlot *localslot;
	bool		found;
	MemoryContext oldctx;

	begin_replication_step();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel));
	localslot = ExecInitExtraTupleSlot(estate,
									   RelationGetDescr(rel->localrel));
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Find the tuple using the replica identity index. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, oldtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find tuple using either replica identity index, primary key or
	 * if needed, sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);
	/* If found delete it. */
	if (found)
	{
		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		ExecSimpleRelationDelete(estate, &epqstate, localslot);
	}
	else
	{
		/* The tuple to be deleted could not be found. */
		elog(DEBUG1,
			 "logical replication could not find row for delete "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	EvalPlanQualEnd(&epqstate);
	ExecCloseIndices(estate->es_result_relation_info);

	finish_estate(estate);

	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
930
/*
 * Handle TRUNCATE message.
 *
 * Collects all locally-mapped relations from the message, then truncates
 * them in a single ExecuteTruncateGuts() call so foreign-key-consistent
 * groups are handled together.
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;
	List	   *remote_rels = NIL;
	List	   *rels = NIL;
	List	   *relids = NIL;
	List	   *relids_logged = NIL;
	ListCell   *lc;
	LOCKMODE	lockmode = AccessExclusiveLock;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		/* Only WAL-log truncation of relations that are logically logged */
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);
	}

	/*
	 * Even if we used CASCADE on the upstream master we explicitly default to
	 * replaying changes without further cascading. This might be later
	 * changeable with a user specified option.
	 */
	ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);

	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		/* Keep locks until transaction end */
		logicalrep_rel_close(rel, NoLock);
	}

	end_replication_step();
}
992
993
994 /*
995 * Logical replication protocol message dispatcher.
996 */
997 static void
apply_dispatch(StringInfo s)998 apply_dispatch(StringInfo s)
999 {
1000 char action = pq_getmsgbyte(s);
1001
1002 switch (action)
1003 {
1004 /* BEGIN */
1005 case 'B':
1006 apply_handle_begin(s);
1007 break;
1008 /* COMMIT */
1009 case 'C':
1010 apply_handle_commit(s);
1011 break;
1012 /* INSERT */
1013 case 'I':
1014 apply_handle_insert(s);
1015 break;
1016 /* UPDATE */
1017 case 'U':
1018 apply_handle_update(s);
1019 break;
1020 /* DELETE */
1021 case 'D':
1022 apply_handle_delete(s);
1023 break;
1024 /* TRUNCATE */
1025 case 'T':
1026 apply_handle_truncate(s);
1027 break;
1028 /* RELATION */
1029 case 'R':
1030 apply_handle_relation(s);
1031 break;
1032 /* TYPE */
1033 case 'Y':
1034 apply_handle_type(s);
1035 break;
1036 /* ORIGIN */
1037 case 'O':
1038 apply_handle_origin(s);
1039 break;
1040 default:
1041 ereport(ERROR,
1042 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1043 errmsg("invalid logical replication message type \"%c\"", action)));
1044 }
1045 }
1046
/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally. Instead we
 * build a list that associates local with remote LSNs for every commit. When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed. Those we can report
 * as having been flushed.
 *
 * The have_pending_txes is true if there are outstanding transactions that
 * need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr();

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	/* Walk oldest-first; entries are appended in commit order */
	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
		dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			/* Locally flushed: safe to report and discard this entry */
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long. Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	/* List drained (or empty); anything left is pending */
	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
1100
1101 /*
1102 * Store current remote/local lsn pair in the tracking list.
1103 */
1104 static void
store_flush_position(XLogRecPtr remote_lsn)1105 store_flush_position(XLogRecPtr remote_lsn)
1106 {
1107 FlushPosition *flushpos;
1108
1109 /* Need to do this in permanent context */
1110 MemoryContextSwitchTo(ApplyContext);
1111
1112 /* Track commit lsn */
1113 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1114 flushpos->local_end = XactLastCommitEnd;
1115 flushpos->remote_end = remote_lsn;
1116
1117 dlist_push_tail(&lsn_mapping, &flushpos->node);
1118 MemoryContextSwitchTo(ApplyMessageContext);
1119 }
1120
1121
1122 /* Update statistics of the worker. */
1123 static void
UpdateWorkerStats(XLogRecPtr last_lsn,TimestampTz send_time,bool reply)1124 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1125 {
1126 MyLogicalRepWorker->last_lsn = last_lsn;
1127 MyLogicalRepWorker->last_send_time = send_time;
1128 MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1129 if (reply)
1130 {
1131 MyLogicalRepWorker->reply_lsn = last_lsn;
1132 MyLogicalRepWorker->reply_time = send_time;
1133 }
1134 }
1135
/*
 * Apply main loop.
 *
 * Receives logical replication messages from the walsender connection,
 * dispatches them for application, periodically reports write/flush
 * progress back to the publisher, and handles keepalives, timeouts, and
 * configuration reloads.  Returns only when the upstream ends the stream
 * (or exits the process via error/proc_exit paths in callees).
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					/* Negative length signals end of the COPY stream. */
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/* Wrap the received buffer for pq_getmsg* parsing. */
					s.data = buf;
					s.len = len;
					s.cursor = 0;
					s.maxlen = -1;

					c = pq_getmsgbyte(&s);

					if (c == 'w')
					{
						/* XLogData message: parse header, apply payload. */
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* Track the furthest LSN seen so far. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);
					}
					else if (c == 'k')
					{
						/* Keepalive: advance position, reply if requested. */
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);
						UpdateWorkerStats(last_received, timestamp, true);
					}
					/* other message types are purposefully ignored */

					/* Discard per-message allocations. */
					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		if (!in_remote_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			process_syncing_tables(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextResetAndDeleteChildren(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
		{
			TimeLineID	tli;

			walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
			break;
		}

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_POSTMASTER_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		/* Emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (got_SIGHUP)
		{
			got_SIGHUP = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the master anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since the last receive from the publisher has
			 * reached the configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);
		}
	}
}
1364
/*
 * Send a Standby Status Update message to server.
 *
 * 'recvpos' is the latest LSN we've received data to, force is set if we need
 * to send a response to avoid timeouts.  'requestReply' asks the publisher to
 * answer with a keepalive.
 *
 * Static state remembers what was last reported so that an unchanged status
 * within wal_receiver_status_interval is not re-sent.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	static StringInfo reply_message = NULL;
	static TimestampTz send_time = 0;

	/* Positions last reported; used to suppress duplicate messages. */
	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position. This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	/* Never let reported positions move backwards. */
	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	/* Lazily create the reply buffer in the long-lived ApplyContext. */
	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	/*
	 * NOTE(review): per the field comments below, recvpos is sent in the
	 * protocol's "write" field and writepos in the "apply" field —
	 * presumably because received data is considered written as soon as it
	 * arrives here; confirm against the streaming replication protocol docs.
	 */
	pq_sendbyte(reply_message, 'r');
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
		 force,
		 (uint32) (recvpos >> 32), (uint32) recvpos,
		 (uint32) (writepos >> 32), (uint32) writepos,
		 (uint32) (flushpos >> 32), (uint32) flushpos
		);

	walrcv_send(LogRepWorkerWalRcvConn,
				reply_message->data, reply_message->len);

	/* Remember what we reported so the next call can skip duplicates. */
	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}
1457
/*
 * Reread subscription info if needed.
 *
 * Most changes are handled simply by exiting: the launcher will start a
 * fresh worker which picks up the new settings.  Only synchronous_commit
 * changes are applied in place.  Called both inside and outside of a
 * transaction; starts (and commits) its own if necessary.
 */
static void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was removed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if the subscription was disabled. This normally should not happen
	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
	 */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"stop because the subscription was disabled",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if connection string was changed. The launcher will start new
	 * worker.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the connection information was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if subscription name was changed (it's used for
	 * fallback_application_name). The launcher will start new worker.
	 */
	if (strcmp(newsub->name, MySubscription->name) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription was renamed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/*
	 * We need to make new connection to new slot if slot name has changed so
	 * exit here as well if that's the case.
	 */
	if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because the replication slot name was changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/*
	 * Exit if publication list was changed. The launcher will start new
	 * worker.
	 */
	if (!equal(newsub->publications, MySubscription->publications))
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will "
						"restart because subscription's publications were changed",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
1593
/*
 * Callback from subscription syscache invalidation.
 *
 * Merely flags the cached subscription data as stale; the next call to
 * maybe_reread_subscription() does the actual reload.
 */
static void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	MySubscriptionValid = false;
}
1602
1603 /* SIGHUP: set flag to reload configuration at next convenient time */
1604 static void
logicalrep_worker_sighup(SIGNAL_ARGS)1605 logicalrep_worker_sighup(SIGNAL_ARGS)
1606 {
1607 int save_errno = errno;
1608
1609 got_SIGHUP = true;
1610
1611 /* Waken anything waiting on the process latch */
1612 SetLatch(MyLatch);
1613
1614 errno = save_errno;
1615 }
1616
/*
 * Logical Replication Apply worker entry point.
 *
 * Runs as a background worker (one per enabled subscription, plus one per
 * table being synchronized).  Sets up signals, connects to the database,
 * loads the subscription, establishes replication origin tracking and the
 * walreceiver connection, then enters LogicalRepApplyLoop().  Exits via
 * proc_exit() on any terminal condition.
 */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, logicalrep_worker_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/* Initialise stats to a reasonably sane value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	Assert(CurrentResourceOwner == NULL);
	CurrentResourceOwner = ResourceOwnerCreate(NULL,
											   "logical replication apply");

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription %u will not "
						"start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));
		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/* This is a table synchronization worker, call initial sync. */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* The slot name needs to be allocated in permanent memory context. */
		oldctx = MemoryContextSwitchTo(ApplyContext);
		myslotname = pstrdup(syncslotname);
		MemoryContextSwitchTo(oldctx);

		pfree(syncslotname);
	}
	else
	{
		/* This is main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;
		int			server_version;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errmsg("subscription has no replication slot set")));

		/* Setup replication origin tracking. */
		StartTransactionCommand();
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
		/* Resume streaming from the last recorded apply position. */
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output identify_system for anything but it
		 * does some initializations on the upstream so let's still call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI,
									  &server_version);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}
1797
1798 /*
1799 * Is current process a logical replication worker?
1800 */
1801 bool
IsLogicalWorker(void)1802 IsLogicalWorker(void)
1803 {
1804 return MyLogicalRepWorker != NULL;
1805 }
1806