1 /*-------------------------------------------------------------------------
2  * tablesync.c
3  *	  PostgreSQL logical replication
4  *
5  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/tablesync.c
9  *
10  * NOTES
11  *	  This file contains code for initial table data synchronization for
12  *	  logical replication.
13  *
14  *	  The initial data synchronization is done separately for each table,
15  *	  in a separate apply worker that only fetches the initial snapshot data
16  *	  from the publisher and then synchronizes the position in the stream with
17  *	  the main apply worker.
18  *
19  *	  There are several reasons for doing the synchronization this way:
20  *	   - It allows us to parallelize the initial data synchronization
21  *		 which lowers the time needed for it to happen.
22  *	   - The initial synchronization does not have to hold the xid and LSN
23  *		 for the time it takes to copy data of all tables, causing less
24  *		 bloat and lower disk consumption compared to doing the
25  *		 synchronization in a single process for the whole database.
26  *	   - It allows us to synchronize any tables added after the initial
27  *		 synchronization has finished.
28  *
29  *	  The stream position synchronization works in multiple steps.
30  *	   - Sync finishes copy and sets worker state as SYNCWAIT and waits for
31  *		 state to change in a loop.
32  *	   - Apply periodically checks tables that are synchronizing for SYNCWAIT.
33  *		 When the desired state appears, it will set the worker state to
34  *		 CATCHUP and starts loop-waiting until either the table state is set
35  *		 to SYNCDONE or the sync worker exits.
36  *	   - After the sync worker has seen the state change to CATCHUP, it will
37  *		 read the stream and apply changes (acting like an apply worker) until
38  *		 it catches up to the specified stream position.  Then it sets the
39  *		 state to SYNCDONE.  There might be zero changes applied between
40  *		 CATCHUP and SYNCDONE, because the sync worker might be ahead of the
41  *		 apply worker.
42  *	   - Once the state was set to SYNCDONE, the apply will continue tracking
43  *		 the table until it reaches the SYNCDONE stream position, at which
44  *		 point it sets state to READY and stops tracking.  Again, there might
45  *		 be zero changes in between.
46  *
47  *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
48  *	  SYNCDONE -> READY.
49  *
50  *	  The catalog pg_subscription_rel is used to keep information about
51  *	  subscribed tables and their state.  Some transient state during data
52  *	  synchronization is kept in shared memory.  The states SYNCWAIT and
53  *	  CATCHUP only appear in memory.
54  *
55  *	  Example flows look like this:
56  *	   - Apply is in front:
57  *		  sync:8
58  *			-> set in memory SYNCWAIT
59  *		  apply:10
60  *			-> set in memory CATCHUP
61  *			-> enter wait-loop
62  *		  sync:10
63  *			-> set in catalog SYNCDONE
64  *			-> exit
65  *		  apply:10
66  *			-> exit wait-loop
67  *			-> continue rep
68  *		  apply:11
69  *			-> set in catalog READY
70  *	   - Sync in front:
71  *		  sync:10
72  *			-> set in memory SYNCWAIT
73  *		  apply:8
74  *			-> set in memory CATCHUP
75  *			-> continue per-table filtering
76  *		  sync:10
77  *			-> set in catalog SYNCDONE
78  *			-> exit
79  *		  apply:10
80  *			-> set in catalog READY
81  *			-> stop per-table filtering
82  *			-> continue rep
83  *-------------------------------------------------------------------------
84  */
85 
86 #include "postgres.h"
87 
88 #include "miscadmin.h"
89 #include "pgstat.h"
90 
91 #include "access/xact.h"
92 
93 #include "catalog/pg_subscription_rel.h"
94 #include "catalog/pg_type.h"
95 
96 #include "commands/copy.h"
97 
98 #include "parser/parse_relation.h"
99 
100 #include "replication/logicallauncher.h"
101 #include "replication/logicalrelation.h"
102 #include "replication/walreceiver.h"
103 #include "replication/worker_internal.h"
104 
105 #include "utils/snapmgr.h"
106 #include "storage/ipc.h"
107 
108 #include "utils/builtins.h"
109 #include "utils/lsyscache.h"
110 #include "utils/memutils.h"
111 
/*
 * Whether the apply worker's cached list of not-yet-ready tables (kept in
 * process_syncing_tables_for_apply()) is current.  Cleared by the syscache
 * invalidation callback invalidate_syncing_table_states(); the list is
 * re-read from the catalog on the next call when this is false.
 */
static bool table_states_valid = false;

/*
 * Holds the most recently received chunk of COPY data from the publisher.
 * Created by copy_table() and consumed by copy_read_data(), which keeps a
 * cursor into it so data not yet handed to COPY is used on the next call.
 * NOTE(review): not static; presumably referenced elsewhere — confirm before
 * narrowing its linkage.
 */
StringInfo	copybuf = NULL;
115 
/*
 * Exit routine for synchronization worker.
 *
 * Commits any open transaction, flushes WAL up to the current write
 * position, logs that the table synchronization has finished, wakes the
 * main apply worker of this subscription and exits the process with status
 * 0.  Never returns.
 */
static void
pg_attribute_noreturn()
finish_sync_worker(void)
{
	/*
	 * Commit any outstanding transaction. This is the usual case, unless
	 * there was nothing to do for the table.
	 */
	if (IsTransactionState())
	{
		CommitTransactionCommand();
		pgstat_report_stat(false);
	}

	/* And flush all writes. */
	XLogFlush(GetXLogWriteRecPtr());

	/*
	 * The relation name lookup in the log message needs a transaction, so
	 * wrap the message in a short one of its own.
	 */
	StartTransactionCommand();
	ereport(LOG,
			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
					MySubscription->name,
					get_rel_name(MyLogicalRepWorker->relid))));
	CommitTransactionCommand();

	/*
	 * Find the main apply worker (relid InvalidOid) and signal it, so it
	 * notices this worker is gone without waiting for its timeout.
	 */
	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);

	/* Stop gracefully */
	proc_exit(0);
}
149 
150 /*
151  * Wait until the relation synchronization state is set in the catalog to the
152  * expected one.
153  *
154  * Used when transitioning from CATCHUP state to SYNCDONE.
155  *
156  * Returns false if the synchronization worker has disappeared or the table state
157  * has been reset.
158  */
159 static bool
160 wait_for_relation_state_change(Oid relid, char expected_state)
161 {
162 	int			rc;
163 	char		state;
164 
165 	for (;;)
166 	{
167 		LogicalRepWorker *worker;
168 		XLogRecPtr	statelsn;
169 
170 		CHECK_FOR_INTERRUPTS();
171 
172 		/* XXX use cache invalidation here to improve performance? */
173 		PushActiveSnapshot(GetLatestSnapshot());
174 		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
175 										relid, &statelsn, true);
176 		PopActiveSnapshot();
177 
178 		if (state == SUBREL_STATE_UNKNOWN)
179 			return false;
180 
181 		if (state == expected_state)
182 			return true;
183 
184 		/* Check if the sync worker is still running and bail if not. */
185 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
186 
187 		/* Check if the opposite worker is still running and bail if not. */
188 		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
189 										am_tablesync_worker() ? InvalidOid : relid,
190 										false);
191 		LWLockRelease(LogicalRepWorkerLock);
192 		if (!worker)
193 			return false;
194 
195 		rc = WaitLatch(MyLatch,
196 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
197 					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
198 
199 		/* emergency bailout if postmaster has died */
200 		if (rc & WL_POSTMASTER_DEATH)
201 			proc_exit(1);
202 
203 		ResetLatch(MyLatch);
204 	}
205 
206 	return false;
207 }
208 
209 /*
210  * Wait until the apply worker changes the state of our synchronization
211  * worker to the expected one.
212  *
213  * Used when transitioning from SYNCWAIT state to CATCHUP.
214  *
215  * Returns false if the apply worker has disappeared.
216  */
217 static bool
218 wait_for_worker_state_change(char expected_state)
219 {
220 	int			rc;
221 
222 	for (;;)
223 	{
224 		LogicalRepWorker *worker;
225 
226 		CHECK_FOR_INTERRUPTS();
227 
228 		/*
229 		 * Done if already in correct state.  (We assume this fetch is atomic
230 		 * enough to not give a misleading answer if we do it with no lock.)
231 		 */
232 		if (MyLogicalRepWorker->relstate == expected_state)
233 			return true;
234 
235 		/*
236 		 * Bail out if the apply worker has died, else signal it we're
237 		 * waiting.
238 		 */
239 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
240 		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
241 										InvalidOid, false);
242 		if (worker && worker->proc)
243 			logicalrep_worker_wakeup_ptr(worker);
244 		LWLockRelease(LogicalRepWorkerLock);
245 		if (!worker)
246 			break;
247 
248 		/*
249 		 * Wait.  We expect to get a latch signal back from the apply worker,
250 		 * but use a timeout in case it dies without sending one.
251 		 */
252 		rc = WaitLatch(MyLatch,
253 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
254 					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
255 
256 		/* emergency bailout if postmaster has died */
257 		if (rc & WL_POSTMASTER_DEATH)
258 			proc_exit(1);
259 
260 		if (rc & WL_LATCH_SET)
261 			ResetLatch(MyLatch);
262 	}
263 
264 	return false;
265 }
266 
267 /*
268  * Callback from syscache invalidation.
269  */
270 void
271 invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
272 {
273 	table_states_valid = false;
274 }
275 
276 /*
277  * Handle table synchronization cooperation from the synchronization
278  * worker.
279  *
280  * If the sync worker is in CATCHUP state and reached (or passed) the
281  * predetermined synchronization point in the WAL stream, mark the table as
282  * SYNCDONE and finish.
283  */
284 static void
285 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
286 {
287 	Assert(IsTransactionState());
288 
289 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
290 
291 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
292 		current_lsn >= MyLogicalRepWorker->relstate_lsn)
293 	{
294 		TimeLineID	tli;
295 
296 		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
297 		MyLogicalRepWorker->relstate_lsn = current_lsn;
298 
299 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
300 
301 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
302 								   MyLogicalRepWorker->relid,
303 								   MyLogicalRepWorker->relstate,
304 								   MyLogicalRepWorker->relstate_lsn);
305 
306 		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
307 		finish_sync_worker();
308 	}
309 	else
310 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
311 }
312 
313 /*
314  * Handle table synchronization cooperation from the apply worker.
315  *
316  * Walk over all subscription tables that are individually tracked by the
317  * apply process (currently, all that have state other than
318  * SUBREL_STATE_READY) and manage synchronization for them.
319  *
320  * If there are tables that need synchronizing and are not being synchronized
321  * yet, start sync workers for them (if there are free slots for sync
322  * workers).  To prevent starting the sync worker for the same relation at a
323  * high frequency after a failure, we store its last start time with each sync
324  * state info.  We start the sync worker for the same relation after waiting
325  * at least wal_retrieve_retry_interval.
326  *
327  * For tables that are being synchronized already, check if sync workers
328  * either need action from the apply worker or have finished.  This is the
329  * SYNCWAIT to CATCHUP transition.
330  *
331  * If the synchronization position is reached (SYNCDONE), then the table can
332  * be marked as READY and is no longer tracked.
333  */
334 static void
335 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
336 {
337 	struct tablesync_start_time_mapping
338 	{
339 		Oid			relid;
340 		TimestampTz last_start_time;
341 	};
342 	static List *table_states = NIL;
343 	static HTAB *last_start_times = NULL;
344 	ListCell   *lc;
345 	bool		started_tx = false;
346 
347 	Assert(!IsTransactionState());
348 
349 	/* We need up-to-date sync state info for subscription tables here. */
350 	if (!table_states_valid)
351 	{
352 		MemoryContext oldctx;
353 		List	   *rstates;
354 		ListCell   *lc;
355 		SubscriptionRelState *rstate;
356 
357 		/* Clean the old list. */
358 		list_free_deep(table_states);
359 		table_states = NIL;
360 
361 		StartTransactionCommand();
362 		started_tx = true;
363 
364 		/* Fetch all non-ready tables. */
365 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
366 
367 		/* Allocate the tracking info in a permanent memory context. */
368 		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
369 		foreach(lc, rstates)
370 		{
371 			rstate = palloc(sizeof(SubscriptionRelState));
372 			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
373 			table_states = lappend(table_states, rstate);
374 		}
375 		MemoryContextSwitchTo(oldctx);
376 
377 		table_states_valid = true;
378 	}
379 
380 	/*
381 	 * Prepare a hash table for tracking last start times of workers, to avoid
382 	 * immediate restarts.  We don't need it if there are no tables that need
383 	 * syncing.
384 	 */
385 	if (table_states && !last_start_times)
386 	{
387 		HASHCTL		ctl;
388 
389 		memset(&ctl, 0, sizeof(ctl));
390 		ctl.keysize = sizeof(Oid);
391 		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
392 		last_start_times = hash_create("Logical replication table sync worker start times",
393 									   256, &ctl, HASH_ELEM | HASH_BLOBS);
394 	}
395 
396 	/*
397 	 * Clean up the hash table when we're done with all tables (just to
398 	 * release the bit of memory).
399 	 */
400 	else if (!table_states && last_start_times)
401 	{
402 		hash_destroy(last_start_times);
403 		last_start_times = NULL;
404 	}
405 
406 	/*
407 	 * Process all tables that are being synchronized.
408 	 */
409 	foreach(lc, table_states)
410 	{
411 		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
412 
413 		if (rstate->state == SUBREL_STATE_SYNCDONE)
414 		{
415 			/*
416 			 * Apply has caught up to the position where the table sync has
417 			 * finished.  Mark the table as ready so that the apply will just
418 			 * continue to replicate it normally.
419 			 */
420 			if (current_lsn >= rstate->lsn)
421 			{
422 				rstate->state = SUBREL_STATE_READY;
423 				rstate->lsn = current_lsn;
424 				if (!started_tx)
425 				{
426 					StartTransactionCommand();
427 					started_tx = true;
428 				}
429 
430 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
431 										   rstate->relid, rstate->state,
432 										   rstate->lsn);
433 			}
434 		}
435 		else
436 		{
437 			LogicalRepWorker *syncworker;
438 
439 			/*
440 			 * Look for a sync worker for this relation.
441 			 */
442 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
443 
444 			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
445 												rstate->relid, false);
446 
447 			if (syncworker)
448 			{
449 				/* Found one, update our copy of its state */
450 				SpinLockAcquire(&syncworker->relmutex);
451 				rstate->state = syncworker->relstate;
452 				rstate->lsn = syncworker->relstate_lsn;
453 				if (rstate->state == SUBREL_STATE_SYNCWAIT)
454 				{
455 					/*
456 					 * Sync worker is waiting for apply.  Tell sync worker it
457 					 * can catchup now.
458 					 */
459 					syncworker->relstate = SUBREL_STATE_CATCHUP;
460 					syncworker->relstate_lsn =
461 						Max(syncworker->relstate_lsn, current_lsn);
462 				}
463 				SpinLockRelease(&syncworker->relmutex);
464 
465 				/* If we told worker to catch up, wait for it. */
466 				if (rstate->state == SUBREL_STATE_SYNCWAIT)
467 				{
468 					/* Signal the sync worker, as it may be waiting for us. */
469 					if (syncworker->proc)
470 						logicalrep_worker_wakeup_ptr(syncworker);
471 
472 					/* Now safe to release the LWLock */
473 					LWLockRelease(LogicalRepWorkerLock);
474 
475 					/*
476 					 * Enter busy loop and wait for synchronization worker to
477 					 * reach expected state (or die trying).
478 					 */
479 					if (!started_tx)
480 					{
481 						StartTransactionCommand();
482 						started_tx = true;
483 					}
484 
485 					wait_for_relation_state_change(rstate->relid,
486 												   SUBREL_STATE_SYNCDONE);
487 				}
488 				else
489 					LWLockRelease(LogicalRepWorkerLock);
490 			}
491 			else
492 			{
493 				/*
494 				 * If there is no sync worker for this table yet, count
495 				 * running sync workers for this subscription, while we have
496 				 * the lock.
497 				 */
498 				int			nsyncworkers =
499 				logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
500 
501 				/* Now safe to release the LWLock */
502 				LWLockRelease(LogicalRepWorkerLock);
503 
504 				/*
505 				 * If there are free sync worker slot(s), start a new sync
506 				 * worker for the table.
507 				 */
508 				if (nsyncworkers < max_sync_workers_per_subscription)
509 				{
510 					TimestampTz now = GetCurrentTimestamp();
511 					struct tablesync_start_time_mapping *hentry;
512 					bool		found;
513 
514 					hentry = hash_search(last_start_times, &rstate->relid,
515 										 HASH_ENTER, &found);
516 
517 					if (!found ||
518 						TimestampDifferenceExceeds(hentry->last_start_time, now,
519 												   wal_retrieve_retry_interval))
520 					{
521 						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
522 												 MySubscription->oid,
523 												 MySubscription->name,
524 												 MyLogicalRepWorker->userid,
525 												 rstate->relid);
526 						hentry->last_start_time = now;
527 					}
528 				}
529 			}
530 		}
531 	}
532 
533 	if (started_tx)
534 	{
535 		CommitTransactionCommand();
536 		pgstat_report_stat(false);
537 	}
538 }
539 
540 /*
541  * Process possible state change(s) of tables that are being synchronized.
542  */
543 void
544 process_syncing_tables(XLogRecPtr current_lsn)
545 {
546 	if (am_tablesync_worker())
547 		process_syncing_tables_for_sync(current_lsn);
548 	else
549 		process_syncing_tables_for_apply(current_lsn);
550 }
551 
552 /*
553  * Create list of columns for COPY based on logical relation mapping.
554  */
555 static List *
556 make_copy_attnamelist(LogicalRepRelMapEntry *rel)
557 {
558 	List	   *attnamelist = NIL;
559 	int			i;
560 
561 	for (i = 0; i < rel->remoterel.natts; i++)
562 	{
563 		attnamelist = lappend(attnamelist,
564 							  makeString(rel->remoterel.attnames[i]));
565 	}
566 
567 
568 	return attnamelist;
569 }
570 
571 /*
572  * Data source callback for the COPY FROM, which reads from the remote
573  * connection and passes the data back to our local COPY.
574  */
575 static int
576 copy_read_data(void *outbuf, int minread, int maxread)
577 {
578 	int			bytesread = 0;
579 	int			avail;
580 
581 	/* If there are some leftover data from previous read, use it. */
582 	avail = copybuf->len - copybuf->cursor;
583 	if (avail)
584 	{
585 		if (avail > maxread)
586 			avail = maxread;
587 		memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
588 		copybuf->cursor += avail;
589 		maxread -= avail;
590 		bytesread += avail;
591 	}
592 
593 	while (maxread > 0 && bytesread < minread)
594 	{
595 		pgsocket	fd = PGINVALID_SOCKET;
596 		int			rc;
597 		int			len;
598 		char	   *buf = NULL;
599 
600 		for (;;)
601 		{
602 			/* Try read the data. */
603 			len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
604 
605 			CHECK_FOR_INTERRUPTS();
606 
607 			if (len == 0)
608 				break;
609 			else if (len < 0)
610 				return bytesread;
611 			else
612 			{
613 				/* Process the data */
614 				copybuf->data = buf;
615 				copybuf->len = len;
616 				copybuf->cursor = 0;
617 
618 				avail = copybuf->len - copybuf->cursor;
619 				if (avail > maxread)
620 					avail = maxread;
621 				memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
622 				outbuf = (void *) ((char *) outbuf + avail);
623 				copybuf->cursor += avail;
624 				maxread -= avail;
625 				bytesread += avail;
626 			}
627 
628 			if (maxread <= 0 || bytesread >= minread)
629 				return bytesread;
630 		}
631 
632 		/*
633 		 * Wait for more data or latch.
634 		 */
635 		rc = WaitLatchOrSocket(MyLatch,
636 							   WL_SOCKET_READABLE | WL_LATCH_SET |
637 							   WL_TIMEOUT | WL_POSTMASTER_DEATH,
638 							   fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
639 
640 		/* Emergency bailout if postmaster has died */
641 		if (rc & WL_POSTMASTER_DEATH)
642 			proc_exit(1);
643 
644 		ResetLatch(MyLatch);
645 	}
646 
647 	return bytesread;
648 }
649 
650 
/*
 * Get information about remote relation in similar fashion the RELATION
 * message provides during replication.
 *
 * Runs two catalog queries on the publisher over LogRepWorkerWalRcvConn and
 * fills *lrel:
 *	- nspname/relname: the caller's strings, stored by pointer (not copied);
 *	- remoteid, replident: from pg_class, matching the schema and name among
 *	  relations with relkind 'r';
 *	- natts, attnames, atttyps: non-dropped columns with attnum > 0, in
 *	  attnum order;
 *	- attkeys: 0-based column positions that belong to the replica identity
 *	  index.
 *
 * Errors out if either query fails or no matching table is found.
 */
static void
fetch_remote_table_info(char *nspname, char *relname,
						LogicalRepRelation *lrel)
{
	WalRcvExecResult *res;
	StringInfoData cmd;
	TupleTableSlot *slot;
	/* Expected result column types of the two queries below. */
	Oid			tableRow[2] = {OIDOID, CHAROID};
	Oid			attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
	bool		isnull;
	int			natt;

	lrel->nspname = nspname;
	lrel->relname = relname;

	/* First fetch Oid and replica identity. */
	initStringInfo(&cmd);
	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
					 "  FROM pg_catalog.pg_class c"
					 "  INNER JOIN pg_catalog.pg_namespace n"
					 "        ON (c.relnamespace = n.oid)"
					 " WHERE n.nspname = %s"
					 "   AND c.relname = %s"
					 "   AND c.relkind = 'r'",
					 quote_literal_cstr(nspname),
					 quote_literal_cstr(relname));
	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
					  lengthof(tableRow), tableRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
						nspname, relname, res->err)));

	/* Only the first row is read. */
	slot = MakeSingleTupleTableSlot(res->tupledesc);
	if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
		ereport(ERROR,
				(errmsg("table \"%s.%s\" not found on publisher",
						nspname, relname)));

	lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
	Assert(!isnull);
	lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
	Assert(!isnull);

	ExecDropSingleTupleTableSlot(slot);
	walrcv_clear_result(res);

	/*
	 * Now fetch columns.  The fourth output column tells whether the column
	 * is part of the replica identity index.  When the LEFT JOIN finds no
	 * index, i.indkey is NULL and so is that column.  The third column
	 * (atttypmod) is fetched but not stored in lrel.
	 */
	resetStringInfo(&cmd);
	appendStringInfo(&cmd,
					 "SELECT a.attname,"
					 "       a.atttypid,"
					 "       a.atttypmod,"
					 "       a.attnum = ANY(i.indkey)"
					 "  FROM pg_catalog.pg_attribute a"
					 "  LEFT JOIN pg_catalog.pg_index i"
					 "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
					 " WHERE a.attnum > 0::pg_catalog.int2"
					 "   AND NOT a.attisdropped"
					 "   AND a.attrelid = %u"
					 " ORDER BY a.attnum",
					 lrel->remoteid, lrel->remoteid);
	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
					  lengthof(attrRow), attrRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\": %s",
						nspname, relname, res->err)));

	/* We don't know the number of rows coming, so allocate enough space. */
	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
	lrel->attkeys = NULL;

	natt = 0;
	slot = MakeSingleTupleTableSlot(res->tupledesc);
	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
	{
		lrel->attnames[natt] =
			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
		Assert(!isnull);
		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
		Assert(!isnull);
		/*
		 * A NULL key flag (no replica identity index) presumably yields a
		 * zero Datum here, i.e. false — the column is then not a key.
		 */
		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
			lrel->attkeys = bms_add_member(lrel->attkeys, natt);

		/* Should never happen. */
		if (++natt >= MaxTupleAttributeNumber)
			elog(ERROR, "too many columns in remote table \"%s.%s\"",
				 nspname, relname);

		ExecClearTuple(slot);
	}
	ExecDropSingleTupleTableSlot(slot);

	lrel->natts = natt;

	walrcv_clear_result(res);
	pfree(cmd.data);
}
757 
758 /*
759  * Copy existing data of a table from publisher.
760  *
761  * Caller is responsible for locking the local relation.
762  */
763 static void
764 copy_table(Relation rel)
765 {
766 	LogicalRepRelMapEntry *relmapentry;
767 	LogicalRepRelation lrel;
768 	WalRcvExecResult *res;
769 	StringInfoData cmd;
770 	CopyState	cstate;
771 	List	   *attnamelist;
772 	ParseState *pstate;
773 
774 	/* Get the publisher relation info. */
775 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
776 							RelationGetRelationName(rel), &lrel);
777 
778 	/* Put the relation into relmap. */
779 	logicalrep_relmap_update(&lrel);
780 
781 	/* Map the publisher relation to local one. */
782 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
783 	Assert(rel == relmapentry->localrel);
784 
785 	/* Start copy on the publisher. */
786 	initStringInfo(&cmd);
787 	appendStringInfo(&cmd, "COPY %s TO STDOUT",
788 					 quote_qualified_identifier(lrel.nspname, lrel.relname));
789 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
790 	pfree(cmd.data);
791 	if (res->status != WALRCV_OK_COPY_OUT)
792 		ereport(ERROR,
793 				(errmsg("could not start initial contents copy for table \"%s.%s\": %s",
794 						lrel.nspname, lrel.relname, res->err)));
795 	walrcv_clear_result(res);
796 
797 	copybuf = makeStringInfo();
798 
799 	pstate = make_parsestate(NULL);
800 	addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
801 
802 	attnamelist = make_copy_attnamelist(relmapentry);
803 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
804 
805 	/* Do the copy */
806 	(void) CopyFrom(cstate);
807 
808 	logicalrep_rel_close(relmapentry, NoLock);
809 }
810 
/*
 * Start syncing the table in the sync worker.
 *
 * Reads the table's synchronization state from the catalog, builds the
 * per-table slot name and connects to the publisher.  Then:
 *	- INIT/DATASYNC: marks the table DATASYNC, creates a temporary slot,
 *	  copies the table data using the slot's snapshot (which sets
 *	  *origin_startpos), switches to SYNCWAIT in shared memory and waits for
 *	  the apply worker to move us to CATCHUP.  If no catch-up is needed,
 *	  records SYNCDONE and exits; otherwise returns so the caller can stream
 *	  changes up to relstate_lsn.  On that return path the transaction
 *	  started for the copy is still open.
 *	- SYNCDONE/READY/UNKNOWN: nothing to do; exits via finish_sync_worker().
 *	- any other state: errors out.
 *
 * The returned slot name is palloc'ed in current memory context.
 */
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn, true);
	CommitTransactionCommand();

	/* Publish the catalog state in shared memory. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
	 * NAMEDATALEN on the remote that matters, but this scheme will also work
	 * reasonably if that is different.)
	 */
	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
	slotname = psprintf("%.*s_%u_sync_%u",
						NAMEDATALEN - 28,
						MySubscription->slotname,
						MySubscription->oid,
						MyLogicalRepWorker->relid);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the main apply worker,
	 * so that synchronous replication can distinguish them.
	 */
	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
											slotname, &err);
	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errmsg("could not connect to the publisher: %s", err)));

	switch (MyLogicalRepWorker->relstate)
	{
		case SUBREL_STATE_INIT:
		case SUBREL_STATE_DATASYNC:
			{
				Relation	rel;
				WalRcvExecResult *res;

				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
				MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Update the state and make it visible to others. */
				StartTransactionCommand();
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   MyLogicalRepWorker->relstate,
										   MyLogicalRepWorker->relstate_lsn);
				CommitTransactionCommand();
				pgstat_report_stat(false);

				/*
				 * We want to do the table data sync in a single transaction.
				 * It is not committed in this function.
				 */
				StartTransactionCommand();

				/*
				 * Use a standard write lock here. It might be better to
				 * disallow access to the table while it's being synchronized.
				 * But we don't want to block the main apply process from
				 * working and it has to open the relation in RowExclusiveLock
				 * when remapping remote relation id to local one.
				 */
				rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);

				/*
				 * Create a temporary slot for the sync process. We do this
				 * inside the transaction so that we can use the snapshot made
				 * by the slot to get existing data.
				 */
				res = walrcv_exec(LogRepWorkerWalRcvConn,
								  "BEGIN READ ONLY ISOLATION LEVEL "
								  "REPEATABLE READ", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not start transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				/*
				 * Create new temporary logical decoding slot.
				 *
				 * We'll use slot for data copy so make sure the snapshot is
				 * used for the transaction; that way the COPY will get data
				 * that is consistent with the lsn used by the slot to start
				 * decoding.
				 */
				walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
								   CRS_USE_SNAPSHOT, origin_startpos);

				PushActiveSnapshot(GetTransactionSnapshot());
				copy_table(rel);
				PopActiveSnapshot();

				res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not finish transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				heap_close(rel, NoLock);

				/* Make the copy visible. */
				CommandCounterIncrement();

				/*
				 * We are done with the initial data synchronization, update
				 * the state.
				 */
				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/*
				 * Wait for main apply worker to tell us to catchup.
				 *
				 * NOTE(review): the result is ignored.  If the apply worker
				 * has gone, relstate stays SYNCWAIT with relstate_lsn ==
				 * *origin_startpos, so the check below takes the SYNCDONE
				 * path without a CATCHUP handshake — confirm this is
				 * intended.
				 */
				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);

				/*----------
				 * There are now two possible states here:
				 * a) Sync is behind the apply.  If that's the case we need to
				 *	  catch up with it by consuming the logical replication
				 *	  stream up to the relstate_lsn.  For that, we exit this
				 *	  function and continue in ApplyWorkerMain().
				 * b) Sync is caught up with the apply.  So it can just set
				 *	  the state to SYNCDONE and finish.
				 *
				 * NOTE(review): relstate_lsn is written by the apply worker
				 * under relmutex but read here without it.
				 *----------
				 */
				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
				{
					/*
					 * Update the new state in catalog.  No need to bother
					 * with the shmem state as we are exiting for good.
					 */
					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
											   MyLogicalRepWorker->relid,
											   SUBREL_STATE_SYNCDONE,
											   *origin_startpos);
					finish_sync_worker();
				}
				break;
			}
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:

			/*
			 * Nothing to do here but finish.  (UNKNOWN means the relation was
			 * removed from pg_subscription_rel before the sync worker could
			 * start.)
			 */
			finish_sync_worker();
			break;
		default:
			elog(ERROR, "unknown relation state \"%c\"",
				 MyLogicalRepWorker->relstate);
	}

	return slotname;
}
992