1 /*-------------------------------------------------------------------------
2 * worker.c
3 * PostgreSQL logical replication worker (apply)
4 *
5 * Copyright (c) 2016-2017, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/worker.c
9 *
10 * NOTES
11 * This file contains the worker which applies logical changes as they come
12 * from remote logical replication stream.
13 *
14 * The main worker (apply) is started by logical replication worker
15 * launcher for every enabled subscription in a database. It uses
16 * walsender protocol to communicate with publisher.
17 *
18 * This module includes server facing code and shares libpqwalreceiver
19 * module with walreceiver for providing the libpq specific functionality.
20 *
21 *-------------------------------------------------------------------------
22 */
23
24 #include "postgres.h"
25
26 #include "miscadmin.h"
27 #include "pgstat.h"
28 #include "funcapi.h"
29
30 #include "access/sysattr.h"
31 #include "access/xact.h"
32 #include "access/xlog_internal.h"
33
34 #include "catalog/namespace.h"
35 #include "catalog/pg_subscription.h"
36 #include "catalog/pg_subscription_rel.h"
37
38 #include "commands/trigger.h"
39
40 #include "executor/executor.h"
41 #include "executor/nodeModifyTable.h"
42
43 #include "libpq/pqformat.h"
44 #include "libpq/pqsignal.h"
45
46 #include "mb/pg_wchar.h"
47
48 #include "nodes/makefuncs.h"
49
50 #include "optimizer/planner.h"
51
52 #include "parser/parse_relation.h"
53
54 #include "postmaster/bgworker.h"
55 #include "postmaster/postmaster.h"
56 #include "postmaster/walwriter.h"
57
58 #include "replication/decode.h"
59 #include "replication/logical.h"
60 #include "replication/logicalproto.h"
61 #include "replication/logicalrelation.h"
62 #include "replication/logicalworker.h"
63 #include "replication/reorderbuffer.h"
64 #include "replication/origin.h"
65 #include "replication/snapbuild.h"
66 #include "replication/walreceiver.h"
67 #include "replication/worker_internal.h"
68
69 #include "rewrite/rewriteHandler.h"
70
71 #include "storage/bufmgr.h"
72 #include "storage/ipc.h"
73 #include "storage/lmgr.h"
74 #include "storage/proc.h"
75 #include "storage/procarray.h"
76
77 #include "tcop/tcopprot.h"
78
79 #include "utils/builtins.h"
80 #include "utils/catcache.h"
81 #include "utils/datum.h"
82 #include "utils/fmgroids.h"
83 #include "utils/guc.h"
84 #include "utils/inval.h"
85 #include "utils/lsyscache.h"
86 #include "utils/memutils.h"
87 #include "utils/timeout.h"
88 #include "utils/tqual.h"
89 #include "utils/syscache.h"
90
91 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
92
/*
 * One entry pairing a remote commit LSN with the LSN at which the
 * corresponding local transaction's commit record ended.  Used by
 * get_flush_position() to decide what can be reported as flushed.
 */
typedef struct FlushPosition
{
	dlist_node	node;			/* links in lsn_mapping */
	XLogRecPtr	local_end;		/* local end of the commit record (XactLastCommitEnd) */
	XLogRecPtr	remote_end;		/* remote commit end LSN */
} FlushPosition;

/* Commit-ordered list of FlushPosition entries; lives in ApplyContext. */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
101
/*
 * Argument for slot_store_error_callback(): identifies the target relation
 * and the remote column currently being converted (or -1 when no column
 * conversion is in progress), for error-context reporting.
 */
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel; /* relation whose data is being processed */
	int			remote_attnum;	/* remote column index, or -1 */
} SlotErrCallbackArg;
107
/* Per-protocol-message context; reset after each message is processed. */
static MemoryContext ApplyMessageContext = NULL;

/* Long-lived context for apply-worker state (e.g. lsn_mapping entries). */
MemoryContext ApplyContext = NULL;

/* Connection to the publisher's walsender. */
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

/* The subscription this worker applies changes for, and its validity flag. */
Subscription *MySubscription = NULL;
bool		MySubscriptionValid = false;

/* True while we are between BEGIN and COMMIT of a remote transaction. */
bool		in_remote_transaction = false;

/* Commit LSN of the remote transaction being applied (set at BEGIN). */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
127
128 /*
129 * Should this worker apply changes for given relation.
130 *
131 * This is mainly needed for initial relation data sync as that runs in
132 * separate worker process running in parallel and we need some way to skip
133 * changes coming to the main apply worker during the sync of a table.
134 *
135 * Note we need to do smaller or equals comparison for SYNCDONE state because
136 * it might hold position of end of initial slot consistent point WAL
137 * record + 1 (ie start of next record) and next record can be COMMIT of
138 * transaction we are now processing (which is what we set remote_final_lsn
139 * to in apply_handle_begin).
140 */
141 static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry * rel)142 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
143 {
144 if (am_tablesync_worker())
145 return MyLogicalRepWorker->relid == rel->localreloid;
146 else
147 return (rel->state == SUBREL_STATE_READY ||
148 (rel->state == SUBREL_STATE_SYNCDONE &&
149 rel->statelsn <= remote_final_lsn));
150 }
151
152 /*
153 * Make sure that we started local transaction.
154 *
155 * Also switches to ApplyMessageContext as necessary.
156 */
157 static bool
ensure_transaction(void)158 ensure_transaction(void)
159 {
160 if (IsTransactionState())
161 {
162 SetCurrentStatementStartTimestamp();
163
164 if (CurrentMemoryContext != ApplyMessageContext)
165 MemoryContextSwitchTo(ApplyMessageContext);
166
167 return false;
168 }
169
170 SetCurrentStatementStartTimestamp();
171 StartTransactionCommand();
172
173 maybe_reread_subscription();
174
175 MemoryContextSwitchTo(ApplyMessageContext);
176 return true;
177 }
178
179
/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers.
 *
 * This is based on similar code in copy.c
 *
 * Returns a fresh EState whose single result relation is rel->localrel.
 * Caller is responsible for AfterTriggerEndQuery() and FreeExecutorState().
 */
static EState *
create_estate_for_relation(LogicalRepRelMapEntry *rel)
{
	EState	   *estate;
	ResultRelInfo *resultRelInfo;
	RangeTblEntry *rte;

	estate = CreateExecutorState();

	/* Build a one-entry range table referencing the target relation. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	estate->es_range_table = list_make1(rte);

	resultRelInfo = makeNode(ResultRelInfo);
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	estate->es_result_relations = resultRelInfo;
	estate->es_num_result_relations = 1;
	estate->es_result_relation_info = resultRelInfo;

	/* Get a CommandId for our writes (true marks it as used). */
	estate->es_output_cid = GetCurrentCommandId(true);

	/* Triggers might need a slot */
	if (resultRelInfo->ri_TrigDesc)
		estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	return estate;
}
219
/*
 * Executes default values for columns for which we can't map to remote
 * relation columns.
 *
 * This allows us to support tables which have more columns on the downstream
 * than on the upstream.
 *
 * The slot's tts_values/tts_isnull must already hold the replicated data;
 * entries for unmapped columns are overwritten with their default values.
 */
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
				   TupleTableSlot *slot)
{
	TupleDesc	desc = RelationGetDescr(rel->localrel);
	int			num_phys_attrs = desc->natts;
	int			i;
	int			attnum,
				num_defaults = 0;
	int		   *defmap;
	ExprState **defexprs;
	ExprContext *econtext;

	econtext = GetPerTupleExprContext(estate);

	/* We got all the data via replication, no need to evaluate anything. */
	if (num_phys_attrs == rel->remoterel.natts)
		return;

	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));

	/* Collect default expressions for non-dropped, locally-only columns. */
	for (attnum = 0; attnum < num_phys_attrs; attnum++)
	{
		Expr	   *defexpr;

		if (desc->attrs[attnum]->attisdropped)
			continue;

		/* Skip columns the remote side supplies data for. */
		if (rel->attrmap[attnum] >= 0)
			continue;

		defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);

		if (defexpr != NULL)
		{
			/* Run the expression through planner */
			defexpr = expression_planner(defexpr);

			/* Initialize executable expression in copycontext */
			defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
			defmap[num_defaults] = attnum;
			num_defaults++;
		}

	}

	/* Evaluate the collected defaults into the slot's arrays. */
	for (i = 0; i < num_defaults; i++)
		slot->tts_values[defmap[i]] =
			ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
}
278
279 /*
280 * Error callback to give more context info about data conversion failures
281 * while reading data from the remote server.
282 */
283 static void
slot_store_error_callback(void * arg)284 slot_store_error_callback(void *arg)
285 {
286 SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
287 LogicalRepRelMapEntry *rel;
288
289 /* Nothing to do if remote attribute number is not set */
290 if (errarg->remote_attnum < 0)
291 return;
292
293 rel = errarg->rel;
294 errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
295 rel->remoterel.nspname, rel->remoterel.relname,
296 rel->remoterel.attnames[errarg->remote_attnum]);
297 }
298
/*
 * Store data in C string form into slot.
 * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
 * use better.
 *
 * "values" is indexed by remote attribute number; a NULL entry means the
 * column's value was not supplied (or was SQL NULL).
 */
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
					char **values)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	ExecClearTuple(slot);

	/* Push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each non-dropped attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0 &&
			values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			/* Let the error callback know which column we're converting. */
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			/* Conversion done; stop attributing errors to this column. */
			errarg.remote_attnum = -1;
		}
		else
		{
			/*
			 * We assign NULL to dropped attributes, NULL values, and missing
			 * values (missing values should be later filled using
			 * slot_fill_defaults).
			 */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	ExecStoreVirtualTuple(slot);
}
362
/*
 * Replace selected columns with user data provided as C strings.
 * This is somewhat similar to heap_modify_tuple but also calls the type
 * input functions on the user data.
 * "slot" is filled with a copy of the tuple in "srcslot", with
 * columns selected by the "replaces" array replaced with data values
 * from "values".
 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
 * storage for "srcslot". This is OK for current usage, but someday we may
 * need to materialize "slot" at the end to make it independent of "srcslot".
 */
static void
slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
					 LogicalRepRelMapEntry *rel,
					 char **values, bool *replaces)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;
	SlotErrCallbackArg errarg;
	ErrorContextCallback errcallback;

	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
	ExecClearTuple(slot);

	/*
	 * Copy all the column data from srcslot, so that we'll have valid values
	 * for unreplaced columns.
	 */
	Assert(natts == srcslot->tts_tupleDescriptor->natts);
	slot_getallattrs(srcslot);
	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));

	/* For error reporting, push callback + info on the error context stack */
	errarg.rel = rel;
	errarg.remote_attnum = -1;
	errcallback.callback = slot_store_error_callback;
	errcallback.arg = (void *) &errarg;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Call the "in" function for each replaced attribute */
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
		int			remoteattnum = rel->attrmap[i];

		/* Skip columns with no remote counterpart. */
		if (remoteattnum < 0)
			continue;

		/* Skip columns the remote did not mark as changed. */
		if (!replaces[remoteattnum])
			continue;

		if (values[remoteattnum] != NULL)
		{
			Oid			typinput;
			Oid			typioparam;

			/* Let the error callback know which column we're converting. */
			errarg.remote_attnum = remoteattnum;

			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
			slot->tts_values[i] =
				OidInputFunctionCall(typinput, values[remoteattnum],
									 typioparam, att->atttypmod);
			slot->tts_isnull[i] = false;

			errarg.remote_attnum = -1;
		}
		else
		{
			/* A NULL entry in "values" replaces the column with SQL NULL. */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;

	/* And finally, declare that "slot" contains a valid virtual tuple */
	ExecStoreVirtualTuple(slot);
}
444
445 /*
446 * Handle BEGIN message.
447 */
448 static void
apply_handle_begin(StringInfo s)449 apply_handle_begin(StringInfo s)
450 {
451 LogicalRepBeginData begin_data;
452
453 logicalrep_read_begin(s, &begin_data);
454
455 remote_final_lsn = begin_data.final_lsn;
456
457 in_remote_transaction = true;
458
459 pgstat_report_activity(STATE_RUNNING, NULL);
460 }
461
/*
 * Handle COMMIT message.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* Cross-check against the LSN we were promised at BEGIN time. */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
								 (uint32) (commit_data.commit_lsn >> 32),
								 (uint32) commit_data.commit_lsn,
								 (uint32) (remote_final_lsn >> 32),
								 (uint32) remote_final_lsn)));

	/* The synchronization worker runs in single transaction. */
	if (IsTransactionState() && !am_tablesync_worker())
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.
		 */
		replorigin_session_origin_lsn = commit_data.end_lsn;
		replorigin_session_origin_timestamp = commit_data.committime;

		CommitTransactionCommand();
		pgstat_report_stat(false);

		/* Remember the pair of remote/local commit LSNs for feedback. */
		store_flush_position(commit_data.end_lsn);
	}
	else
	{
		/* Process any invalidation messages that might have accumulated. */
		AcceptInvalidationMessages();
		maybe_reread_subscription();
	}

	in_remote_transaction = false;

	/* Process any tables that are being synchronized in parallel. */
	process_syncing_tables(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
}
512
513 /*
514 * Handle ORIGIN message.
515 *
516 * TODO, support tracking of multiple origins
517 */
518 static void
apply_handle_origin(StringInfo s)519 apply_handle_origin(StringInfo s)
520 {
521 /*
522 * ORIGIN message can only come inside remote transaction and before any
523 * actual writes.
524 */
525 if (!in_remote_transaction ||
526 (IsTransactionState() && !am_tablesync_worker()))
527 ereport(ERROR,
528 (errcode(ERRCODE_PROTOCOL_VIOLATION),
529 errmsg("ORIGIN message sent out of order")));
530 }
531
532 /*
533 * Handle RELATION message.
534 *
535 * Note we don't do validation against local schema here. The validation
536 * against local schema is postponed until first change for given relation
537 * comes as we only care about it when applying changes for it anyway and we
538 * do less locking this way.
539 */
540 static void
apply_handle_relation(StringInfo s)541 apply_handle_relation(StringInfo s)
542 {
543 LogicalRepRelation *rel;
544
545 rel = logicalrep_read_rel(s);
546 logicalrep_relmap_update(rel);
547 }
548
549 /*
550 * Handle TYPE message.
551 *
552 * This is now vestigial; we read the info and discard it.
553 */
554 static void
apply_handle_type(StringInfo s)555 apply_handle_type(StringInfo s)
556 {
557 LogicalRepTyp typ;
558
559 logicalrep_read_typ(s, &typ);
560 }
561
562 /*
563 * Get replica identity index or if it is not defined a primary key.
564 *
565 * If neither is defined, returns InvalidOid
566 */
567 static Oid
GetRelationIdentityOrPK(Relation rel)568 GetRelationIdentityOrPK(Relation rel)
569 {
570 Oid idxoid;
571
572 idxoid = RelationGetReplicaIndex(rel);
573
574 if (!OidIsValid(idxoid))
575 idxoid = RelationGetPrimaryKeyIndex(rel);
576
577 return idxoid;
578 }
579
/*
 * Handle INSERT message.
 *
 * Opens the target relation, converts the remote tuple into a slot (filling
 * local-only columns from defaults) and inserts it via the executor so that
 * constraints, indexes and triggers are honored.
 */
static void
apply_handle_insert(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData newtup;
	LogicalRepRelId relid;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_insert(s, &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));

	/* Input functions may need an active snapshot, so get one */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* Process and store remote tuple in the slot */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, newtup.values);
	slot_fill_defaults(rel, estate, remoteslot);
	MemoryContextSwitchTo(oldctx);

	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Do the insert. */
	ExecSimpleRelationInsert(estate, remoteslot);

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	/* Keep the lock until transaction end; just release our reference. */
	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}
640
/*
 * Check if the logical replication relation is updatable and throw
 * appropriate error if it isn't.
 *
 * Does not return on failure; the chosen error message depends on whether
 * the local relation at least has a replica identity index or primary key.
 */
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
	/* Updatable, no error. */
	if (rel->updatable)
		return;

	/*
	 * We are in error mode so it's fine this is somewhat slow. It's better to
	 * give user correct error.
	 */
	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
	{
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("publisher did not send replica identity column "
						"expected by the logical replication target relation \"%s.%s\"",
						rel->remoterel.nspname, rel->remoterel.relname)));
	}

	ereport(ERROR,
			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			 errmsg("logical replication target relation \"%s.%s\" has "
					"neither REPLICA IDENTITY index nor PRIMARY "
					"KEY and published relation does not have "
					"REPLICA IDENTITY FULL",
					rel->remoterel.nspname, rel->remoterel.relname)));
}
673
/*
 * Handle UPDATE message.
 *
 * Locates the existing local row using the replica identity index, primary
 * key, or a sequential scan, then applies the new column values via the
 * executor.  If the row cannot be found the update is silently skipped
 * (logged at DEBUG1).
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	LogicalRepTupleData oldtup;
	LogicalRepTupleData newtup;
	bool		has_oldtup;
	TupleTableSlot *localslot;
	TupleTableSlot *remoteslot;
	RangeTblEntry *target_rte;
	int			i;
	bool		found;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
								   &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Check if we can do the update. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
	localslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	/*
	 * Populate updatedCols so that per-column triggers can fire. This could
	 * include more columns than were actually changed on the publisher
	 * because the logical replication protocol doesn't contain that
	 * information. But it would for example exclude columns that only exist
	 * on the subscriber, since we are not touching those.
	 */
	target_rte = list_nth(estate->es_range_table, 0);
	for (i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap[i];

		if (!att->attisdropped && remoteattnum >= 0)
		{
			if (newtup.changed[remoteattnum])
				target_rte->updatedCols =
					bms_add_member(target_rte->updatedCols,
								   i + 1 - FirstLowInvalidHeapAttributeNumber);
		}
	}

	PushActiveSnapshot(GetTransactionSnapshot());
	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel,
						has_oldtup ? oldtup.values : newtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find tuple using either replica identity index, primary key or
	 * if needed, sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);

	/* The search tuple is no longer needed; reuse the slot for the result. */
	ExecClearTuple(remoteslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/* Process and store remote tuple in the slot */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		slot_modify_cstrings(remoteslot, localslot, rel,
							 newtup.values, newtup.changed);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		/* Do the actual update. */
		ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
	}
	else
	{
		/*
		 * The tuple to be updated could not be found.
		 *
		 * TODO what to do here, change the log level to LOG perhaps?
		 */
		elog(DEBUG1,
			 "logical replication did not find row for update "
			 "in replication target relation \"%s\"",
			 RelationGetRelationName(rel->localrel));
	}

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	EvalPlanQualEnd(&epqstate);
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}
818
/*
 * Handle DELETE message.
 *
 * Locates the existing local row using the replica identity index, primary
 * key, or a sequential scan, then deletes it via the executor.  If the row
 * cannot be found the delete is silently skipped (logged at DEBUG1).
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	Oid			idxoid;
	EState	   *estate;
	EPQState	epqstate;
	TupleTableSlot *remoteslot;
	TupleTableSlot *localslot;
	bool		found;
	MemoryContext oldctx;

	ensure_transaction();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		return;
	}

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/* Initialize the executor state. */
	estate = create_estate_for_relation(rel);
	remoteslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
	localslot = ExecInitExtraTupleSlot(estate);
	ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);

	PushActiveSnapshot(GetTransactionSnapshot());
	ExecOpenIndices(estate->es_result_relation_info, false);

	/* Find the tuple using the replica identity index. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_cstrings(remoteslot, rel, oldtup.values);
	MemoryContextSwitchTo(oldctx);

	/*
	 * Try to find tuple using either replica identity index, primary key or
	 * if needed, sequential scan.
	 */
	idxoid = GetRelationIdentityOrPK(rel->localrel);
	Assert(OidIsValid(idxoid) ||
		   (rel->remoterel.replident == REPLICA_IDENTITY_FULL));

	if (OidIsValid(idxoid))
		found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
											 LockTupleExclusive,
											 remoteslot, localslot);
	else
		found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
										 remoteslot, localslot);
	/* If found delete it. */
	if (found)
	{
		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		ExecSimpleRelationDelete(estate, &epqstate, localslot);
	}
	else
	{
		/* The tuple to be deleted could not be found. */
		ereport(DEBUG1,
				(errmsg("logical replication could not find row for delete "
						"in replication target relation \"%s\"",
						RelationGetRelationName(rel->localrel))));
	}

	/* Cleanup. */
	ExecCloseIndices(estate->es_result_relation_info);
	PopActiveSnapshot();

	/* Handle queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	EvalPlanQualEnd(&epqstate);
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);

	logicalrep_rel_close(rel, NoLock);

	CommandCounterIncrement();
}
918
919
920 /*
921 * Logical replication protocol message dispatcher.
922 */
923 static void
apply_dispatch(StringInfo s)924 apply_dispatch(StringInfo s)
925 {
926 char action = pq_getmsgbyte(s);
927
928 switch (action)
929 {
930 /* BEGIN */
931 case 'B':
932 apply_handle_begin(s);
933 break;
934 /* COMMIT */
935 case 'C':
936 apply_handle_commit(s);
937 break;
938 /* INSERT */
939 case 'I':
940 apply_handle_insert(s);
941 break;
942 /* UPDATE */
943 case 'U':
944 apply_handle_update(s);
945 break;
946 /* DELETE */
947 case 'D':
948 apply_handle_delete(s);
949 break;
950 /* RELATION */
951 case 'R':
952 apply_handle_relation(s);
953 break;
954 /* TYPE */
955 case 'Y':
956 apply_handle_type(s);
957 break;
958 /* ORIGIN */
959 case 'O':
960 apply_handle_origin(s);
961 break;
962 default:
963 ereport(ERROR,
964 (errcode(ERRCODE_PROTOCOL_VIOLATION),
965 errmsg("invalid logical replication message type \"%c\"", action)));
966 }
967 }
968
/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally. Instead we
 * build a list that associates local with remote LSNs for every commit. When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed. Those we can report
 * as having been flushed.
 *
 * The have_pending_txes is true if there are outstanding transactions that
 * need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr();

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	/* Walk the list in commit order, consuming locally-flushed entries. */
	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
		dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			/* This commit is flushed locally; report it and drop the entry. */
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long. Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	/* List drained (or empty); pending iff nothing was ever added. */
	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
1022
/*
 * Store current remote/local lsn pair in the tracking list.
 *
 * Pairs the remote commit's end LSN with XactLastCommitEnd so that
 * get_flush_position() can report the remote LSN as flushed once local WAL
 * is flushed past the local one.
 */
static void
store_flush_position(XLogRecPtr remote_lsn)
{
	FlushPosition *flushpos;

	/* Need to do this in permanent context */
	MemoryContextSwitchTo(ApplyContext);

	/* Track commit lsn */
	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
	flushpos->local_end = XactLastCommitEnd;
	flushpos->remote_end = remote_lsn;

	dlist_push_tail(&lsn_mapping, &flushpos->node);
	/* Switch back to the per-message working context. */
	MemoryContextSwitchTo(ApplyMessageContext);
}
1042
1043
1044 /* Update statistics of the worker. */
1045 static void
UpdateWorkerStats(XLogRecPtr last_lsn,TimestampTz send_time,bool reply)1046 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1047 {
1048 MyLogicalRepWorker->last_lsn = last_lsn;
1049 MyLogicalRepWorker->last_send_time = send_time;
1050 MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1051 if (reply)
1052 {
1053 MyLogicalRepWorker->reply_lsn = last_lsn;
1054 MyLogicalRepWorker->reply_time = send_time;
1055 }
1056 }
1057
1058 /*
1059 * Apply main loop.
1060 */
1061 static void
LogicalRepApplyLoop(XLogRecPtr last_received)1062 LogicalRepApplyLoop(XLogRecPtr last_received)
1063 {
1064 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1065 bool ping_sent = false;
1066
1067 /*
1068 * Init the ApplyMessageContext which we clean up after each replication
1069 * protocol message.
1070 */
1071 ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1072 "ApplyMessageContext",
1073 ALLOCSET_DEFAULT_SIZES);
1074
1075 /* mark as idle, before starting to loop */
1076 pgstat_report_activity(STATE_IDLE, NULL);
1077
1078 /* This outer loop iterates once per wait. */
1079 for (;;)
1080 {
1081 pgsocket fd = PGINVALID_SOCKET;
1082 int rc;
1083 int len;
1084 char *buf = NULL;
1085 bool endofstream = false;
1086 long wait_time;
1087
1088 CHECK_FOR_INTERRUPTS();
1089
1090 MemoryContextSwitchTo(ApplyMessageContext);
1091
1092 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
1093
1094 if (len != 0)
1095 {
1096 /* Loop to process all available data (without blocking). */
1097 for (;;)
1098 {
1099 CHECK_FOR_INTERRUPTS();
1100
1101 if (len == 0)
1102 {
1103 break;
1104 }
1105 else if (len < 0)
1106 {
1107 ereport(LOG,
1108 (errmsg("data stream from publisher has ended")));
1109 endofstream = true;
1110 break;
1111 }
1112 else
1113 {
1114 int c;
1115 StringInfoData s;
1116
1117 /* Reset timeout. */
1118 last_recv_timestamp = GetCurrentTimestamp();
1119 ping_sent = false;
1120
1121 /* Ensure we are reading the data into our memory context. */
1122 MemoryContextSwitchTo(ApplyMessageContext);
1123
1124 s.data = buf;
1125 s.len = len;
1126 s.cursor = 0;
1127 s.maxlen = -1;
1128
1129 c = pq_getmsgbyte(&s);
1130
1131 if (c == 'w')
1132 {
1133 XLogRecPtr start_lsn;
1134 XLogRecPtr end_lsn;
1135 TimestampTz send_time;
1136
1137 start_lsn = pq_getmsgint64(&s);
1138 end_lsn = pq_getmsgint64(&s);
1139 send_time = pq_getmsgint64(&s);
1140
1141 if (last_received < start_lsn)
1142 last_received = start_lsn;
1143
1144 if (last_received < end_lsn)
1145 last_received = end_lsn;
1146
1147 UpdateWorkerStats(last_received, send_time, false);
1148
1149 apply_dispatch(&s);
1150 }
1151 else if (c == 'k')
1152 {
1153 XLogRecPtr end_lsn;
1154 TimestampTz timestamp;
1155 bool reply_requested;
1156
1157 end_lsn = pq_getmsgint64(&s);
1158 timestamp = pq_getmsgint64(&s);
1159 reply_requested = pq_getmsgbyte(&s);
1160
1161 if (last_received < end_lsn)
1162 last_received = end_lsn;
1163
1164 send_feedback(last_received, reply_requested, false);
1165 UpdateWorkerStats(last_received, timestamp, true);
1166 }
1167 /* other message types are purposefully ignored */
1168
1169 MemoryContextReset(ApplyMessageContext);
1170 }
1171
1172 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
1173 }
1174 }
1175
1176 /* confirm all writes so far */
1177 send_feedback(last_received, false, false);
1178
1179 if (!in_remote_transaction)
1180 {
1181 /*
1182 * If we didn't get any transactions for a while there might be
1183 * unconsumed invalidation messages in the queue, consume them
1184 * now.
1185 */
1186 AcceptInvalidationMessages();
1187 maybe_reread_subscription();
1188
1189 /* Process any table synchronization changes. */
1190 process_syncing_tables(last_received);
1191 }
1192
1193 /* Cleanup the memory. */
1194 MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1195 MemoryContextSwitchTo(TopMemoryContext);
1196
1197 /* Check if we need to exit the streaming loop. */
1198 if (endofstream)
1199 {
1200 TimeLineID tli;
1201
1202 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
1203 break;
1204 }
1205
1206 /*
1207 * Wait for more data or latch. If we have unflushed transactions,
1208 * wake up after WalWriterDelay to see if they've been flushed yet (in
1209 * which case we should send a feedback message). Otherwise, there's
1210 * no particular urgency about waking up unless we get data or a
1211 * signal.
1212 */
1213 if (!dlist_is_empty(&lsn_mapping))
1214 wait_time = WalWriterDelay;
1215 else
1216 wait_time = NAPTIME_PER_CYCLE;
1217
1218 rc = WaitLatchOrSocket(MyLatch,
1219 WL_SOCKET_READABLE | WL_LATCH_SET |
1220 WL_TIMEOUT | WL_POSTMASTER_DEATH,
1221 fd, wait_time,
1222 WAIT_EVENT_LOGICAL_APPLY_MAIN);
1223
1224 /* Emergency bailout if postmaster has died */
1225 if (rc & WL_POSTMASTER_DEATH)
1226 proc_exit(1);
1227
1228 if (rc & WL_LATCH_SET)
1229 {
1230 ResetLatch(MyLatch);
1231 CHECK_FOR_INTERRUPTS();
1232 }
1233
1234 if (got_SIGHUP)
1235 {
1236 got_SIGHUP = false;
1237 ProcessConfigFile(PGC_SIGHUP);
1238 }
1239
1240 if (rc & WL_TIMEOUT)
1241 {
1242 /*
1243 * We didn't receive anything new. If we haven't heard anything
1244 * from the server for more than wal_receiver_timeout / 2, ping
1245 * the server. Also, if it's been longer than
1246 * wal_receiver_status_interval since the last update we sent,
1247 * send a status update to the master anyway, to report any
1248 * progress in applying WAL.
1249 */
1250 bool requestReply = false;
1251
1252 /*
1253 * Check if time since last receive from standby has reached the
1254 * configured limit.
1255 */
1256 if (wal_receiver_timeout > 0)
1257 {
1258 TimestampTz now = GetCurrentTimestamp();
1259 TimestampTz timeout;
1260
1261 timeout =
1262 TimestampTzPlusMilliseconds(last_recv_timestamp,
1263 wal_receiver_timeout);
1264
1265 if (now >= timeout)
1266 ereport(ERROR,
1267 (errmsg("terminating logical replication worker due to timeout")));
1268
1269 /* Check to see if it's time for a ping. */
1270 if (!ping_sent)
1271 {
1272 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1273 (wal_receiver_timeout / 2));
1274 if (now >= timeout)
1275 {
1276 requestReply = true;
1277 ping_sent = true;
1278 }
1279 }
1280 }
1281
1282 send_feedback(last_received, requestReply, requestReply);
1283 }
1284 }
1285 }
1286
1287 /*
1288 * Send a Standby Status Update message to server.
1289 *
1290 * 'recvpos' is the latest LSN we've received data to, force is set if we need
1291 * to send a response to avoid timeouts.
1292 */
1293 static void
send_feedback(XLogRecPtr recvpos,bool force,bool requestReply)1294 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1295 {
1296 static StringInfo reply_message = NULL;
1297 static TimestampTz send_time = 0;
1298
1299 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1300 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1301 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1302
1303 XLogRecPtr writepos;
1304 XLogRecPtr flushpos;
1305 TimestampTz now;
1306 bool have_pending_txes;
1307
1308 /*
1309 * If the user doesn't want status to be reported to the publisher, be
1310 * sure to exit before doing anything at all.
1311 */
1312 if (!force && wal_receiver_status_interval <= 0)
1313 return;
1314
1315 /* It's legal to not pass a recvpos */
1316 if (recvpos < last_recvpos)
1317 recvpos = last_recvpos;
1318
1319 get_flush_position(&writepos, &flushpos, &have_pending_txes);
1320
1321 /*
1322 * No outstanding transactions to flush, we can report the latest received
1323 * position. This is important for synchronous replication.
1324 */
1325 if (!have_pending_txes)
1326 flushpos = writepos = recvpos;
1327
1328 if (writepos < last_writepos)
1329 writepos = last_writepos;
1330
1331 if (flushpos < last_flushpos)
1332 flushpos = last_flushpos;
1333
1334 now = GetCurrentTimestamp();
1335
1336 /* if we've already reported everything we're good */
1337 if (!force &&
1338 writepos == last_writepos &&
1339 flushpos == last_flushpos &&
1340 !TimestampDifferenceExceeds(send_time, now,
1341 wal_receiver_status_interval * 1000))
1342 return;
1343 send_time = now;
1344
1345 if (!reply_message)
1346 {
1347 MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1348
1349 reply_message = makeStringInfo();
1350 MemoryContextSwitchTo(oldctx);
1351 }
1352 else
1353 resetStringInfo(reply_message);
1354
1355 pq_sendbyte(reply_message, 'r');
1356 pq_sendint64(reply_message, recvpos); /* write */
1357 pq_sendint64(reply_message, flushpos); /* flush */
1358 pq_sendint64(reply_message, writepos); /* apply */
1359 pq_sendint64(reply_message, now); /* sendTime */
1360 pq_sendbyte(reply_message, requestReply); /* replyRequested */
1361
1362 elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1363 force,
1364 (uint32) (recvpos >> 32), (uint32) recvpos,
1365 (uint32) (writepos >> 32), (uint32) writepos,
1366 (uint32) (flushpos >> 32), (uint32) flushpos
1367 );
1368
1369 walrcv_send(LogRepWorkerWalRcvConn,
1370 reply_message->data, reply_message->len);
1371
1372 if (recvpos > last_recvpos)
1373 last_recvpos = recvpos;
1374 if (writepos > last_writepos)
1375 last_writepos = writepos;
1376 if (flushpos > last_flushpos)
1377 last_flushpos = flushpos;
1378 }
1379
1380 /*
1381 * Reread subscription info if needed. Most changes will be exit.
1382 */
1383 static void
maybe_reread_subscription(void)1384 maybe_reread_subscription(void)
1385 {
1386 MemoryContext oldctx;
1387 Subscription *newsub;
1388 bool started_tx = false;
1389
1390 /* When cache state is valid there is nothing to do here. */
1391 if (MySubscriptionValid)
1392 return;
1393
1394 /* This function might be called inside or outside of transaction. */
1395 if (!IsTransactionState())
1396 {
1397 StartTransactionCommand();
1398 started_tx = true;
1399 }
1400
1401 /* Ensure allocations in permanent context. */
1402 oldctx = MemoryContextSwitchTo(ApplyContext);
1403
1404 newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1405
1406 /*
1407 * Exit if the subscription was removed. This normally should not happen
1408 * as the worker gets killed during DROP SUBSCRIPTION.
1409 */
1410 if (!newsub)
1411 {
1412 ereport(LOG,
1413 (errmsg("logical replication apply worker for subscription \"%s\" will "
1414 "stop because the subscription was removed",
1415 MySubscription->name)));
1416
1417 proc_exit(0);
1418 }
1419
1420 /*
1421 * Exit if the subscription was disabled. This normally should not happen
1422 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1423 */
1424 if (!newsub->enabled)
1425 {
1426 ereport(LOG,
1427 (errmsg("logical replication apply worker for subscription \"%s\" will "
1428 "stop because the subscription was disabled",
1429 MySubscription->name)));
1430
1431 proc_exit(0);
1432 }
1433
1434 /*
1435 * Exit if connection string was changed. The launcher will start new
1436 * worker.
1437 */
1438 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1439 {
1440 ereport(LOG,
1441 (errmsg("logical replication apply worker for subscription \"%s\" will "
1442 "restart because the connection information was changed",
1443 MySubscription->name)));
1444
1445 proc_exit(0);
1446 }
1447
1448 /*
1449 * Exit if subscription name was changed (it's used for
1450 * fallback_application_name). The launcher will start new worker.
1451 */
1452 if (strcmp(newsub->name, MySubscription->name) != 0)
1453 {
1454 ereport(LOG,
1455 (errmsg("logical replication apply worker for subscription \"%s\" will "
1456 "restart because subscription was renamed",
1457 MySubscription->name)));
1458
1459 proc_exit(0);
1460 }
1461
1462 /* !slotname should never happen when enabled is true. */
1463 Assert(newsub->slotname);
1464
1465 /*
1466 * We need to make new connection to new slot if slot name has changed so
1467 * exit here as well if that's the case.
1468 */
1469 if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1470 {
1471 ereport(LOG,
1472 (errmsg("logical replication apply worker for subscription \"%s\" will "
1473 "restart because the replication slot name was changed",
1474 MySubscription->name)));
1475
1476 proc_exit(0);
1477 }
1478
1479 /*
1480 * Exit if publication list was changed. The launcher will start new
1481 * worker.
1482 */
1483 if (!equal(newsub->publications, MySubscription->publications))
1484 {
1485 ereport(LOG,
1486 (errmsg("logical replication apply worker for subscription \"%s\" will "
1487 "restart because subscription's publications were changed",
1488 MySubscription->name)));
1489
1490 proc_exit(0);
1491 }
1492
1493 /* Check for other changes that should never happen too. */
1494 if (newsub->dbid != MySubscription->dbid)
1495 {
1496 elog(ERROR, "subscription %u changed unexpectedly",
1497 MyLogicalRepWorker->subid);
1498 }
1499
1500 /* Clean old subscription info and switch to new one. */
1501 FreeSubscription(MySubscription);
1502 MySubscription = newsub;
1503
1504 MemoryContextSwitchTo(oldctx);
1505
1506 /* Change synchronous commit according to the user's wishes */
1507 SetConfigOption("synchronous_commit", MySubscription->synccommit,
1508 PGC_BACKEND, PGC_S_OVERRIDE);
1509
1510 if (started_tx)
1511 CommitTransactionCommand();
1512
1513 MySubscriptionValid = true;
1514 }
1515
1516 /*
1517 * Callback from subscription syscache invalidation.
1518 */
1519 static void
subscription_change_cb(Datum arg,int cacheid,uint32 hashvalue)1520 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1521 {
1522 MySubscriptionValid = false;
1523 }
1524
/* SIGHUP: set flag to reload configuration at next convenient time */
static void
logicalrep_worker_sighup(SIGNAL_ARGS)
{
	/* Save errno so the interrupted code doesn't see it clobbered. */
	int			save_errno = errno;

	got_SIGHUP = true;

	/* Waken anything waiting on the process latch */
	SetLatch(MyLatch);

	errno = save_errno;
}
1538
/*
 * Logical Replication Apply worker entry point.
 *
 * Runs as a background worker: attaches to its worker slot, connects to the
 * subscription's database, then either runs the initial table sync (for a
 * tablesync worker) or connects to the publisher and sets up replication
 * origin tracking, and finally enters LogicalRepApplyLoop().
 */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);
	MemoryContext oldctx;
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;
	char	   *myslotname;
	WalRcvStreamOptions options;

	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	/* Setup signal handling */
	pqsignal(SIGHUP, logicalrep_worker_sighup);
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/* Initialise stats to a sane-ish value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	Assert(CurrentResourceOwner == NULL);
	CurrentResourceOwner = ResourceOwnerCreate(NULL,
											   "logical replication apply");

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" will not "
						"start because the subscription was disabled during startup",
						MySubscription->name)));

		proc_exit(0);
	}

	/* Keep us informed about subscription changes. */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
	else
		ereport(LOG,
				(errmsg("logical replication apply worker for subscription \"%s\" has started",
						MySubscription->name)));

	CommitTransactionCommand();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	if (am_tablesync_worker())
	{
		char	   *syncslotname;

		/* This is a table synchronization worker; run the initial sync. */
		syncslotname = LogicalRepSyncTableStart(&origin_startpos);

		/* The slot name needs to be allocated in permanent memory context. */
		oldctx = MemoryContextSwitchTo(ApplyContext);
		myslotname = pstrdup(syncslotname);
		MemoryContextSwitchTo(oldctx);

		pfree(syncslotname);
	}
	else
	{
		/* This is the main apply worker */
		RepOriginId originid;
		TimeLineID	startpointTLI;
		char	   *err;
		int			server_version;

		myslotname = MySubscription->slotname;

		/*
		 * This shouldn't happen if the subscription is enabled, but guard
		 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
		 * crash if slot is NULL.)
		 */
		if (!myslotname)
			ereport(ERROR,
					(errmsg("subscription has no replication slot set")));

		/* Setup replication origin tracking. */
		StartTransactionCommand();
		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
		originid = replorigin_by_name(originname, true);
		if (!OidIsValid(originid))
			originid = replorigin_create(originname);
		replorigin_session_setup(originid);
		replorigin_session_origin = originid;
		/* Resume streaming from where this origin last left off. */
		origin_startpos = replorigin_session_get_progress(false);
		CommitTransactionCommand();

		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
												MySubscription->name, &err);
		if (LogRepWorkerWalRcvConn == NULL)
			ereport(ERROR,
					(errmsg("could not connect to the publisher: %s", err)));

		/*
		 * We don't really use the output identify_system for anything but it
		 * does some initializations on the upstream so let's still call it.
		 */
		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI,
									  &server_version);
	}

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  invalidate_syncing_table_states,
								  (Datum) 0);

	/* Build logical replication streaming options. */
	options.logical = true;
	options.startpoint = origin_startpos;
	options.slotname = myslotname;
	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
	options.proto.logical.publication_names = MySubscription->publications;

	/* Start normal logical streaming replication. */
	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

	/* Run the main loop. */
	LogicalRepApplyLoop(origin_startpos);

	proc_exit(0);
}
1708
1709 /*
1710 * Is current process a logical replication worker?
1711 */
1712 bool
IsLogicalWorker(void)1713 IsLogicalWorker(void)
1714 {
1715 return MyLogicalRepWorker != NULL;
1716 }
1717