1 /*-------------------------------------------------------------------------
2 * tablesync.c
3 * PostgreSQL logical replication
4 *
5 * Copyright (c) 2012-2020, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/tablesync.c
9 *
10 * NOTES
11 * This file contains code for initial table data synchronization for
12 * logical replication.
13 *
14 * The initial data synchronization is done separately for each table,
15 * in a separate apply worker that only fetches the initial snapshot data
16 * from the publisher and then synchronizes the position in the stream with
17 * the main apply worker.
18 *
19 * There are several reasons for doing the synchronization this way:
20 * - It allows us to parallelize the initial data synchronization
21 * which lowers the time needed for it to happen.
22 * - The initial synchronization does not have to hold the xid and LSN
23 * for the time it takes to copy data of all tables, causing less
24 * bloat and lower disk consumption compared to doing the
25 * synchronization in a single process for the whole database.
26 * - It allows us to synchronize any tables added after the initial
27 * synchronization has finished.
28 *
29 * The stream position synchronization works in multiple steps.
30 * - Sync finishes copy and sets worker state as SYNCWAIT and waits for
31 * state to change in a loop.
 *	  - Apply periodically checks tables that are synchronizing for SYNCWAIT.
 *		When the desired state appears, it sets the worker state to CATCHUP
 *		and starts loop-waiting until either the table state is set to
 *		SYNCDONE or the sync worker exits.
36 * - After the sync worker has seen the state change to CATCHUP, it will
37 * read the stream and apply changes (acting like an apply worker) until
38 * it catches up to the specified stream position. Then it sets the
39 * state to SYNCDONE. There might be zero changes applied between
40 * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
41 * apply worker.
 *	  - Once the state is set to SYNCDONE, the apply worker continues
 *		tracking the table until it reaches the SYNCDONE stream position, at
 *		which point it sets the state to READY and stops tracking.  Again,
 *		there might be zero changes in between.
46 *
47 * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
48 * SYNCDONE -> READY.
49 *
50 * The catalog pg_subscription_rel is used to keep information about
51 * subscribed tables and their state. Some transient state during data
52 * synchronization is kept in shared memory. The states SYNCWAIT and
53 * CATCHUP only appear in memory.
54 *
55 * Example flows look like this:
56 * - Apply is in front:
57 * sync:8
58 * -> set in memory SYNCWAIT
59 * apply:10
60 * -> set in memory CATCHUP
61 * -> enter wait-loop
62 * sync:10
63 * -> set in catalog SYNCDONE
64 * -> exit
65 * apply:10
66 * -> exit wait-loop
67 * -> continue rep
68 * apply:11
69 * -> set in catalog READY
70 * - Sync in front:
71 * sync:10
72 * -> set in memory SYNCWAIT
73 * apply:8
74 * -> set in memory CATCHUP
75 * -> continue per-table filtering
76 * sync:10
77 * -> set in catalog SYNCDONE
78 * -> exit
79 * apply:10
80 * -> set in catalog READY
81 * -> stop per-table filtering
82 * -> continue rep
83 *-------------------------------------------------------------------------
84 */
85
86 #include "postgres.h"
87
88 #include "access/table.h"
89 #include "access/xact.h"
90 #include "catalog/pg_subscription_rel.h"
91 #include "catalog/pg_type.h"
92 #include "commands/copy.h"
93 #include "miscadmin.h"
94 #include "parser/parse_relation.h"
95 #include "pgstat.h"
96 #include "replication/logicallauncher.h"
97 #include "replication/logicalrelation.h"
98 #include "replication/walreceiver.h"
99 #include "replication/worker_internal.h"
100 #include "storage/ipc.h"
101 #include "utils/builtins.h"
102 #include "utils/lsyscache.h"
103 #include "utils/memutils.h"
104 #include "utils/snapmgr.h"
105
/*
 * Whether the cached list of not-ready tables kept by
 * process_syncing_tables_for_apply() is current.  Cleared by
 * invalidate_syncing_table_states(); set again once the list is rebuilt.
 */
static bool table_states_valid = false;

/*
 * Buffer holding data received from the publisher during the initial table
 * copy.  Allocated in copy_table() and consumed by copy_read_data().
 */
StringInfo	copybuf = NULL;
109
110 /*
111 * Exit routine for synchronization worker.
112 */
113 static void
pg_attribute_noreturn()114 pg_attribute_noreturn()
115 finish_sync_worker(void)
116 {
117 /*
118 * Commit any outstanding transaction. This is the usual case, unless
119 * there was nothing to do for the table.
120 */
121 if (IsTransactionState())
122 {
123 CommitTransactionCommand();
124 pgstat_report_stat(false);
125 }
126
127 /* And flush all writes. */
128 XLogFlush(GetXLogWriteRecPtr());
129
130 StartTransactionCommand();
131 ereport(LOG,
132 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
133 MySubscription->name,
134 get_rel_name(MyLogicalRepWorker->relid))));
135 CommitTransactionCommand();
136
137 /* Find the main apply worker and signal it. */
138 logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
139
140 /* Stop gracefully */
141 proc_exit(0);
142 }
143
144 /*
145 * Wait until the relation synchronization state is set in the catalog to the
146 * expected one.
147 *
148 * Used when transitioning from CATCHUP state to SYNCDONE.
149 *
150 * Returns false if the synchronization worker has disappeared or the table state
151 * has been reset.
152 */
153 static bool
wait_for_relation_state_change(Oid relid,char expected_state)154 wait_for_relation_state_change(Oid relid, char expected_state)
155 {
156 char state;
157
158 for (;;)
159 {
160 LogicalRepWorker *worker;
161 XLogRecPtr statelsn;
162
163 CHECK_FOR_INTERRUPTS();
164
165 /* XXX use cache invalidation here to improve performance? */
166 PushActiveSnapshot(GetLatestSnapshot());
167 state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
168 relid, &statelsn, true);
169 PopActiveSnapshot();
170
171 if (state == SUBREL_STATE_UNKNOWN)
172 return false;
173
174 if (state == expected_state)
175 return true;
176
177 /* Check if the sync worker is still running and bail if not. */
178 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
179
180 /* Check if the opposite worker is still running and bail if not. */
181 worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
182 am_tablesync_worker() ? InvalidOid : relid,
183 false);
184 LWLockRelease(LogicalRepWorkerLock);
185 if (!worker)
186 return false;
187
188 (void) WaitLatch(MyLatch,
189 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
190 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
191
192 ResetLatch(MyLatch);
193 }
194
195 return false;
196 }
197
198 /*
199 * Wait until the apply worker changes the state of our synchronization
200 * worker to the expected one.
201 *
202 * Used when transitioning from SYNCWAIT state to CATCHUP.
203 *
204 * Returns false if the apply worker has disappeared.
205 */
206 static bool
wait_for_worker_state_change(char expected_state)207 wait_for_worker_state_change(char expected_state)
208 {
209 int rc;
210
211 for (;;)
212 {
213 LogicalRepWorker *worker;
214
215 CHECK_FOR_INTERRUPTS();
216
217 /*
218 * Done if already in correct state. (We assume this fetch is atomic
219 * enough to not give a misleading answer if we do it with no lock.)
220 */
221 if (MyLogicalRepWorker->relstate == expected_state)
222 return true;
223
224 /*
225 * Bail out if the apply worker has died, else signal it we're
226 * waiting.
227 */
228 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
229 worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
230 InvalidOid, false);
231 if (worker && worker->proc)
232 logicalrep_worker_wakeup_ptr(worker);
233 LWLockRelease(LogicalRepWorkerLock);
234 if (!worker)
235 break;
236
237 /*
238 * Wait. We expect to get a latch signal back from the apply worker,
239 * but use a timeout in case it dies without sending one.
240 */
241 rc = WaitLatch(MyLatch,
242 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
243 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
244
245 if (rc & WL_LATCH_SET)
246 ResetLatch(MyLatch);
247 }
248
249 return false;
250 }
251
252 /*
253 * Callback from syscache invalidation.
254 */
255 void
invalidate_syncing_table_states(Datum arg,int cacheid,uint32 hashvalue)256 invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
257 {
258 table_states_valid = false;
259 }
260
261 /*
262 * Handle table synchronization cooperation from the synchronization
263 * worker.
264 *
265 * If the sync worker is in CATCHUP state and reached (or passed) the
266 * predetermined synchronization point in the WAL stream, mark the table as
267 * SYNCDONE and finish.
268 */
269 static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)270 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
271 {
272 Assert(IsTransactionState());
273
274 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
275
276 if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
277 current_lsn >= MyLogicalRepWorker->relstate_lsn)
278 {
279 TimeLineID tli;
280
281 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
282 MyLogicalRepWorker->relstate_lsn = current_lsn;
283
284 SpinLockRelease(&MyLogicalRepWorker->relmutex);
285
286 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
287 MyLogicalRepWorker->relid,
288 MyLogicalRepWorker->relstate,
289 MyLogicalRepWorker->relstate_lsn);
290
291 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
292 finish_sync_worker();
293 }
294 else
295 SpinLockRelease(&MyLogicalRepWorker->relmutex);
296 }
297
298 /*
299 * Handle table synchronization cooperation from the apply worker.
300 *
301 * Walk over all subscription tables that are individually tracked by the
302 * apply process (currently, all that have state other than
303 * SUBREL_STATE_READY) and manage synchronization for them.
304 *
305 * If there are tables that need synchronizing and are not being synchronized
306 * yet, start sync workers for them (if there are free slots for sync
307 * workers). To prevent starting the sync worker for the same relation at a
308 * high frequency after a failure, we store its last start time with each sync
309 * state info. We start the sync worker for the same relation after waiting
310 * at least wal_retrieve_retry_interval.
311 *
312 * For tables that are being synchronized already, check if sync workers
313 * either need action from the apply worker or have finished. This is the
314 * SYNCWAIT to CATCHUP transition.
315 *
316 * If the synchronization position is reached (SYNCDONE), then the table can
317 * be marked as READY and is no longer tracked.
318 */
319 static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)320 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
321 {
322 struct tablesync_start_time_mapping
323 {
324 Oid relid;
325 TimestampTz last_start_time;
326 };
327 static List *table_states = NIL;
328 static HTAB *last_start_times = NULL;
329 ListCell *lc;
330 bool started_tx = false;
331
332 Assert(!IsTransactionState());
333
334 /* We need up-to-date sync state info for subscription tables here. */
335 if (!table_states_valid)
336 {
337 MemoryContext oldctx;
338 List *rstates;
339 ListCell *lc;
340 SubscriptionRelState *rstate;
341
342 /* Clean the old list. */
343 list_free_deep(table_states);
344 table_states = NIL;
345
346 StartTransactionCommand();
347 started_tx = true;
348
349 /* Fetch all non-ready tables. */
350 rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
351
352 /* Allocate the tracking info in a permanent memory context. */
353 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
354 foreach(lc, rstates)
355 {
356 rstate = palloc(sizeof(SubscriptionRelState));
357 memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
358 table_states = lappend(table_states, rstate);
359 }
360 MemoryContextSwitchTo(oldctx);
361
362 table_states_valid = true;
363 }
364
365 /*
366 * Prepare a hash table for tracking last start times of workers, to avoid
367 * immediate restarts. We don't need it if there are no tables that need
368 * syncing.
369 */
370 if (table_states && !last_start_times)
371 {
372 HASHCTL ctl;
373
374 memset(&ctl, 0, sizeof(ctl));
375 ctl.keysize = sizeof(Oid);
376 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
377 last_start_times = hash_create("Logical replication table sync worker start times",
378 256, &ctl, HASH_ELEM | HASH_BLOBS);
379 }
380
381 /*
382 * Clean up the hash table when we're done with all tables (just to
383 * release the bit of memory).
384 */
385 else if (!table_states && last_start_times)
386 {
387 hash_destroy(last_start_times);
388 last_start_times = NULL;
389 }
390
391 /*
392 * Process all tables that are being synchronized.
393 */
394 foreach(lc, table_states)
395 {
396 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
397
398 if (rstate->state == SUBREL_STATE_SYNCDONE)
399 {
400 /*
401 * Apply has caught up to the position where the table sync has
402 * finished. Mark the table as ready so that the apply will just
403 * continue to replicate it normally.
404 */
405 if (current_lsn >= rstate->lsn)
406 {
407 rstate->state = SUBREL_STATE_READY;
408 rstate->lsn = current_lsn;
409 if (!started_tx)
410 {
411 StartTransactionCommand();
412 started_tx = true;
413 }
414
415 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
416 rstate->relid, rstate->state,
417 rstate->lsn);
418 }
419 }
420 else
421 {
422 LogicalRepWorker *syncworker;
423
424 /*
425 * Look for a sync worker for this relation.
426 */
427 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
428
429 syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
430 rstate->relid, false);
431
432 if (syncworker)
433 {
434 /* Found one, update our copy of its state */
435 SpinLockAcquire(&syncworker->relmutex);
436 rstate->state = syncworker->relstate;
437 rstate->lsn = syncworker->relstate_lsn;
438 if (rstate->state == SUBREL_STATE_SYNCWAIT)
439 {
440 /*
441 * Sync worker is waiting for apply. Tell sync worker it
442 * can catchup now.
443 */
444 syncworker->relstate = SUBREL_STATE_CATCHUP;
445 syncworker->relstate_lsn =
446 Max(syncworker->relstate_lsn, current_lsn);
447 }
448 SpinLockRelease(&syncworker->relmutex);
449
450 /* If we told worker to catch up, wait for it. */
451 if (rstate->state == SUBREL_STATE_SYNCWAIT)
452 {
453 /* Signal the sync worker, as it may be waiting for us. */
454 if (syncworker->proc)
455 logicalrep_worker_wakeup_ptr(syncworker);
456
457 /* Now safe to release the LWLock */
458 LWLockRelease(LogicalRepWorkerLock);
459
460 /*
461 * Enter busy loop and wait for synchronization worker to
462 * reach expected state (or die trying).
463 */
464 if (!started_tx)
465 {
466 StartTransactionCommand();
467 started_tx = true;
468 }
469
470 wait_for_relation_state_change(rstate->relid,
471 SUBREL_STATE_SYNCDONE);
472 }
473 else
474 LWLockRelease(LogicalRepWorkerLock);
475 }
476 else
477 {
478 /*
479 * If there is no sync worker for this table yet, count
480 * running sync workers for this subscription, while we have
481 * the lock.
482 */
483 int nsyncworkers =
484 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
485
486 /* Now safe to release the LWLock */
487 LWLockRelease(LogicalRepWorkerLock);
488
489 /*
490 * If there are free sync worker slot(s), start a new sync
491 * worker for the table.
492 */
493 if (nsyncworkers < max_sync_workers_per_subscription)
494 {
495 TimestampTz now = GetCurrentTimestamp();
496 struct tablesync_start_time_mapping *hentry;
497 bool found;
498
499 hentry = hash_search(last_start_times, &rstate->relid,
500 HASH_ENTER, &found);
501
502 if (!found ||
503 TimestampDifferenceExceeds(hentry->last_start_time, now,
504 wal_retrieve_retry_interval))
505 {
506 logicalrep_worker_launch(MyLogicalRepWorker->dbid,
507 MySubscription->oid,
508 MySubscription->name,
509 MyLogicalRepWorker->userid,
510 rstate->relid);
511 hentry->last_start_time = now;
512 }
513 }
514 }
515 }
516 }
517
518 if (started_tx)
519 {
520 CommitTransactionCommand();
521 pgstat_report_stat(false);
522 }
523 }
524
525 /*
526 * Process possible state change(s) of tables that are being synchronized.
527 */
528 void
process_syncing_tables(XLogRecPtr current_lsn)529 process_syncing_tables(XLogRecPtr current_lsn)
530 {
531 if (am_tablesync_worker())
532 process_syncing_tables_for_sync(current_lsn);
533 else
534 process_syncing_tables_for_apply(current_lsn);
535 }
536
537 /*
538 * Create list of columns for COPY based on logical relation mapping.
539 */
540 static List *
make_copy_attnamelist(LogicalRepRelMapEntry * rel)541 make_copy_attnamelist(LogicalRepRelMapEntry *rel)
542 {
543 List *attnamelist = NIL;
544 int i;
545
546 for (i = 0; i < rel->remoterel.natts; i++)
547 {
548 attnamelist = lappend(attnamelist,
549 makeString(rel->remoterel.attnames[i]));
550 }
551
552
553 return attnamelist;
554 }
555
556 /*
557 * Data source callback for the COPY FROM, which reads from the remote
558 * connection and passes the data back to our local COPY.
559 */
560 static int
copy_read_data(void * outbuf,int minread,int maxread)561 copy_read_data(void *outbuf, int minread, int maxread)
562 {
563 int bytesread = 0;
564 int avail;
565
566 /* If there are some leftover data from previous read, use it. */
567 avail = copybuf->len - copybuf->cursor;
568 if (avail)
569 {
570 if (avail > maxread)
571 avail = maxread;
572 memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
573 copybuf->cursor += avail;
574 maxread -= avail;
575 bytesread += avail;
576 }
577
578 while (maxread > 0 && bytesread < minread)
579 {
580 pgsocket fd = PGINVALID_SOCKET;
581 int len;
582 char *buf = NULL;
583
584 for (;;)
585 {
586 /* Try read the data. */
587 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
588
589 CHECK_FOR_INTERRUPTS();
590
591 if (len == 0)
592 break;
593 else if (len < 0)
594 return bytesread;
595 else
596 {
597 /* Process the data */
598 copybuf->data = buf;
599 copybuf->len = len;
600 copybuf->cursor = 0;
601
602 avail = copybuf->len - copybuf->cursor;
603 if (avail > maxread)
604 avail = maxread;
605 memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
606 outbuf = (void *) ((char *) outbuf + avail);
607 copybuf->cursor += avail;
608 maxread -= avail;
609 bytesread += avail;
610 }
611
612 if (maxread <= 0 || bytesread >= minread)
613 return bytesread;
614 }
615
616 /*
617 * Wait for more data or latch.
618 */
619 (void) WaitLatchOrSocket(MyLatch,
620 WL_SOCKET_READABLE | WL_LATCH_SET |
621 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
622 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
623
624 ResetLatch(MyLatch);
625 }
626
627 return bytesread;
628 }
629
630
631 /*
632 * Get information about remote relation in similar fashion the RELATION
633 * message provides during replication.
634 */
635 static void
fetch_remote_table_info(char * nspname,char * relname,LogicalRepRelation * lrel)636 fetch_remote_table_info(char *nspname, char *relname,
637 LogicalRepRelation *lrel)
638 {
639 WalRcvExecResult *res;
640 StringInfoData cmd;
641 TupleTableSlot *slot;
642 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
643 Oid attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
644 bool isnull;
645 int natt;
646
647 lrel->nspname = nspname;
648 lrel->relname = relname;
649
650 /* First fetch Oid and replica identity. */
651 initStringInfo(&cmd);
652 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
653 " FROM pg_catalog.pg_class c"
654 " INNER JOIN pg_catalog.pg_namespace n"
655 " ON (c.relnamespace = n.oid)"
656 " WHERE n.nspname = %s"
657 " AND c.relname = %s",
658 quote_literal_cstr(nspname),
659 quote_literal_cstr(relname));
660 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
661 lengthof(tableRow), tableRow);
662
663 if (res->status != WALRCV_OK_TUPLES)
664 ereport(ERROR,
665 (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
666 nspname, relname, res->err)));
667
668 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
669 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
670 ereport(ERROR,
671 (errmsg("table \"%s.%s\" not found on publisher",
672 nspname, relname)));
673
674 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
675 Assert(!isnull);
676 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
677 Assert(!isnull);
678 lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
679 Assert(!isnull);
680
681 ExecDropSingleTupleTableSlot(slot);
682 walrcv_clear_result(res);
683
684 /* Now fetch columns. */
685 resetStringInfo(&cmd);
686 appendStringInfo(&cmd,
687 "SELECT a.attname,"
688 " a.atttypid,"
689 " a.atttypmod,"
690 " a.attnum = ANY(i.indkey)"
691 " FROM pg_catalog.pg_attribute a"
692 " LEFT JOIN pg_catalog.pg_index i"
693 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
694 " WHERE a.attnum > 0::pg_catalog.int2"
695 " AND NOT a.attisdropped %s"
696 " AND a.attrelid = %u"
697 " ORDER BY a.attnum",
698 lrel->remoteid,
699 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
700 "AND a.attgenerated = ''" : ""),
701 lrel->remoteid);
702 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
703 lengthof(attrRow), attrRow);
704
705 if (res->status != WALRCV_OK_TUPLES)
706 ereport(ERROR,
707 (errmsg("could not fetch table info for table \"%s.%s\": %s",
708 nspname, relname, res->err)));
709
710 /* We don't know the number of rows coming, so allocate enough space. */
711 lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
712 lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
713 lrel->attkeys = NULL;
714
715 natt = 0;
716 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
717 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
718 {
719 lrel->attnames[natt] =
720 TextDatumGetCString(slot_getattr(slot, 1, &isnull));
721 Assert(!isnull);
722 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
723 Assert(!isnull);
724 if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
725 lrel->attkeys = bms_add_member(lrel->attkeys, natt);
726
727 /* Should never happen. */
728 if (++natt >= MaxTupleAttributeNumber)
729 elog(ERROR, "too many columns in remote table \"%s.%s\"",
730 nspname, relname);
731
732 ExecClearTuple(slot);
733 }
734 ExecDropSingleTupleTableSlot(slot);
735
736 lrel->natts = natt;
737
738 walrcv_clear_result(res);
739 pfree(cmd.data);
740 }
741
742 /*
743 * Copy existing data of a table from publisher.
744 *
745 * Caller is responsible for locking the local relation.
746 */
747 static void
copy_table(Relation rel)748 copy_table(Relation rel)
749 {
750 LogicalRepRelMapEntry *relmapentry;
751 LogicalRepRelation lrel;
752 WalRcvExecResult *res;
753 StringInfoData cmd;
754 CopyState cstate;
755 List *attnamelist;
756 ParseState *pstate;
757
758 /* Get the publisher relation info. */
759 fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
760 RelationGetRelationName(rel), &lrel);
761
762 /* Put the relation into relmap. */
763 logicalrep_relmap_update(&lrel);
764
765 /* Map the publisher relation to local one. */
766 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
767 Assert(rel == relmapentry->localrel);
768
769 /* Start copy on the publisher. */
770 initStringInfo(&cmd);
771 if (lrel.relkind == RELKIND_RELATION)
772 appendStringInfo(&cmd, "COPY %s TO STDOUT",
773 quote_qualified_identifier(lrel.nspname, lrel.relname));
774 else
775 {
776 /*
777 * For non-tables, we need to do COPY (SELECT ...), but we can't just
778 * do SELECT * because we need to not copy generated columns.
779 */
780 appendStringInfo(&cmd, "COPY (SELECT ");
781 for (int i = 0; i < lrel.natts; i++)
782 {
783 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
784 if (i < lrel.natts - 1)
785 appendStringInfoString(&cmd, ", ");
786 }
787 appendStringInfo(&cmd, " FROM %s) TO STDOUT",
788 quote_qualified_identifier(lrel.nspname, lrel.relname));
789 }
790 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
791 pfree(cmd.data);
792 if (res->status != WALRCV_OK_COPY_OUT)
793 ereport(ERROR,
794 (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
795 lrel.nspname, lrel.relname, res->err)));
796 walrcv_clear_result(res);
797
798 copybuf = makeStringInfo();
799
800 pstate = make_parsestate(NULL);
801 (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
802 NULL, false, false);
803
804 attnamelist = make_copy_attnamelist(relmapentry);
805 cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
806
807 /* Do the copy */
808 (void) CopyFrom(cstate);
809
810 logicalrep_rel_close(relmapentry, NoLock);
811 }
812
813 /*
814 * Start syncing the table in the sync worker.
815 *
816 * The returned slot name is palloc'ed in current memory context.
817 */
818 char *
LogicalRepSyncTableStart(XLogRecPtr * origin_startpos)819 LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
820 {
821 char *slotname;
822 char *err;
823 char relstate;
824 XLogRecPtr relstate_lsn;
825
826 /* Check the state of the table synchronization. */
827 StartTransactionCommand();
828 relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
829 MyLogicalRepWorker->relid,
830 &relstate_lsn, true);
831 CommitTransactionCommand();
832
833 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
834 MyLogicalRepWorker->relstate = relstate;
835 MyLogicalRepWorker->relstate_lsn = relstate_lsn;
836 SpinLockRelease(&MyLogicalRepWorker->relmutex);
837
838 /*
839 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
840 * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
841 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
842 * NAMEDATALEN on the remote that matters, but this scheme will also work
843 * reasonably if that is different.)
844 */
845 StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
846 slotname = psprintf("%.*s_%u_sync_%u",
847 NAMEDATALEN - 28,
848 MySubscription->slotname,
849 MySubscription->oid,
850 MyLogicalRepWorker->relid);
851
852 /*
853 * Here we use the slot name instead of the subscription name as the
854 * application_name, so that it is different from the main apply worker,
855 * so that synchronous replication can distinguish them.
856 */
857 LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
858 slotname, &err);
859 if (LogRepWorkerWalRcvConn == NULL)
860 ereport(ERROR,
861 (errmsg("could not connect to the publisher: %s", err)));
862
863 switch (MyLogicalRepWorker->relstate)
864 {
865 case SUBREL_STATE_INIT:
866 case SUBREL_STATE_DATASYNC:
867 {
868 Relation rel;
869 WalRcvExecResult *res;
870
871 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
872 MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
873 MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
874 SpinLockRelease(&MyLogicalRepWorker->relmutex);
875
876 /* Update the state and make it visible to others. */
877 StartTransactionCommand();
878 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
879 MyLogicalRepWorker->relid,
880 MyLogicalRepWorker->relstate,
881 MyLogicalRepWorker->relstate_lsn);
882 CommitTransactionCommand();
883 pgstat_report_stat(false);
884
885 /*
886 * We want to do the table data sync in a single transaction.
887 */
888 StartTransactionCommand();
889
890 /*
891 * Use a standard write lock here. It might be better to
892 * disallow access to the table while it's being synchronized.
893 * But we don't want to block the main apply process from
894 * working and it has to open the relation in RowExclusiveLock
895 * when remapping remote relation id to local one.
896 */
897 rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
898
899 /*
900 * Create a temporary slot for the sync process. We do this
901 * inside the transaction so that we can use the snapshot made
902 * by the slot to get existing data.
903 */
904 res = walrcv_exec(LogRepWorkerWalRcvConn,
905 "BEGIN READ ONLY ISOLATION LEVEL "
906 "REPEATABLE READ", 0, NULL);
907 if (res->status != WALRCV_OK_COMMAND)
908 ereport(ERROR,
909 (errmsg("table copy could not start transaction on publisher"),
910 errdetail("The error was: %s", res->err)));
911 walrcv_clear_result(res);
912
913 /*
914 * Create new temporary logical decoding slot.
915 *
916 * We'll use slot for data copy so make sure the snapshot is
917 * used for the transaction; that way the COPY will get data
918 * that is consistent with the lsn used by the slot to start
919 * decoding.
920 */
921 walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
922 CRS_USE_SNAPSHOT, origin_startpos);
923
924 PushActiveSnapshot(GetTransactionSnapshot());
925 copy_table(rel);
926 PopActiveSnapshot();
927
928 res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
929 if (res->status != WALRCV_OK_COMMAND)
930 ereport(ERROR,
931 (errmsg("table copy could not finish transaction on publisher"),
932 errdetail("The error was: %s", res->err)));
933 walrcv_clear_result(res);
934
935 table_close(rel, NoLock);
936
937 /* Make the copy visible. */
938 CommandCounterIncrement();
939
940 /*
941 * We are done with the initial data synchronization, update
942 * the state.
943 */
944 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
945 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
946 MyLogicalRepWorker->relstate_lsn = *origin_startpos;
947 SpinLockRelease(&MyLogicalRepWorker->relmutex);
948
949 /* Wait for main apply worker to tell us to catchup. */
950 wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
951
952 /*----------
953 * There are now two possible states here:
954 * a) Sync is behind the apply. If that's the case we need to
955 * catch up with it by consuming the logical replication
956 * stream up to the relstate_lsn. For that, we exit this
957 * function and continue in ApplyWorkerMain().
958 * b) Sync is caught up with the apply. So it can just set
959 * the state to SYNCDONE and finish.
960 *----------
961 */
962 if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
963 {
964 /*
965 * Update the new state in catalog. No need to bother
966 * with the shmem state as we are exiting for good.
967 */
968 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
969 MyLogicalRepWorker->relid,
970 SUBREL_STATE_SYNCDONE,
971 *origin_startpos);
972 finish_sync_worker();
973 }
974 break;
975 }
976 case SUBREL_STATE_SYNCDONE:
977 case SUBREL_STATE_READY:
978 case SUBREL_STATE_UNKNOWN:
979
980 /*
981 * Nothing to do here but finish. (UNKNOWN means the relation was
982 * removed from pg_subscription_rel before the sync worker could
983 * start.)
984 */
985 finish_sync_worker();
986 break;
987 default:
988 elog(ERROR, "unknown relation state \"%c\"",
989 MyLogicalRepWorker->relstate);
990 }
991
992 return slotname;
993 }
994