/*-------------------------------------------------------------------------
 * tablesync.c
 *	  PostgreSQL logical replication
 *
 * Copyright (c) 2012-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/tablesync.c
 *
 * NOTES
 *	  This file contains code for initial table data synchronization for
 *	  logical replication.
 *
 *	  The initial data synchronization is done separately for each table,
 *	  in a separate apply worker that only fetches the initial snapshot data
 *	  from the publisher and then synchronizes the position in the stream with
 *	  the main apply worker.
 *
 *	  There are several reasons for doing the synchronization this way:
 *	   - It allows us to parallelize the initial data synchronization
 *		 which lowers the time needed for it to happen.
 *	   - The initial synchronization does not have to hold the xid and LSN
 *		 for the time it takes to copy data of all tables, causing less
 *		 bloat and lower disk consumption compared to doing the
 *		 synchronization in a single process for the whole database.
 *	   - It allows us to synchronize any tables added after the initial
 *		 synchronization has finished.
 *
 *	  The stream position synchronization works in multiple steps:
 *	   - The sync worker finishes the copy, sets its worker state to
 *		 SYNCWAIT, and waits in a loop for the state to change.
 *	   - The apply worker periodically checks the tables that are being
 *		 synchronized for SYNCWAIT.  When that state appears, it sets the
 *		 worker state to CATCHUP and loop-waits until either the table state
 *		 is set to SYNCDONE or the sync worker exits.
 *	   - After the sync worker has seen the state change to CATCHUP, it reads
 *		 the stream and applies changes (acting like an apply worker) until
 *		 it catches up to the specified stream position.  Then it sets the
 *		 state to SYNCDONE.  There might be zero changes applied between
 *		 CATCHUP and SYNCDONE, because the sync worker might be ahead of the
 *		 apply worker.
 *	   - Once the state is set to SYNCDONE, the apply worker will continue
 *		 tracking the table until it reaches the SYNCDONE stream position,
 *		 at which point it sets the state to READY and stops tracking.
 *		 Again, there might be zero changes in between.
 *
 *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
 *	  CATCHUP -> SYNCDONE -> READY.
 *
 *	  The catalog pg_subscription_rel is used to keep information about
 *	  subscribed tables and their state.  Some transient state during data
 *	  synchronization is kept in shared memory.  The states SYNCWAIT and
 *	  CATCHUP only appear in memory.
 *
 *	  Example flows look like this:
 *	   - Apply is in front:
 *		  sync:8
 *			-> set in memory SYNCWAIT
 *		  apply:10
 *			-> set in memory CATCHUP
 *			-> enter wait-loop
 *		  sync:10
 *			-> set in catalog SYNCDONE
 *			-> exit
 *		  apply:10
 *			-> exit wait-loop
 *			-> continue rep
 *		  apply:11
 *			-> set in catalog READY
 *	   - Sync in front:
 *		  sync:10
 *			-> set in memory SYNCWAIT
 *		  apply:8
 *			-> set in memory CATCHUP
 *			-> continue per-table filtering
 *		  sync:10
 *			-> set in catalog SYNCDONE
 *			-> exit
 *		  apply:10
 *			-> set in catalog READY
 *			-> stop per-table filtering
 *			-> continue rep
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"

#include "access/table.h"
#include "access/xact.h"

#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"

#include "commands/copy.h"

#include "parser/parse_relation.h"

#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"

#include "utils/snapmgr.h"
#include "storage/ipc.h"

#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"

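/*
 * Set to false by the syscache invalidation callback below; this tells
 * process_syncing_tables_for_apply() to rebuild its cached list of
 * per-table sync states.
 */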
static bool table_states_valid = false;

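/*
 * Buffer for data received from the publisher connection; it is filled and
 * drained by copy_read_data() while feeding the local COPY.
 */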
StringInfo	copybuf = NULL;

/*
 * Exit routine for synchronization worker.
 */
static void
pg_attribute_noreturn()
finish_sync_worker(void)
{
	/*
	 * Commit any outstanding transaction. This is the usual case, unless
	 * there was nothing to do for the table.
	 */
	if (IsTransactionState())
	{
		CommitTransactionCommand();
		pgstat_report_stat(false);
	}

	/* And flush all writes. */
	XLogFlush(GetXLogWriteRecPtr());

	StartTransactionCommand();
	ereport(LOG,
			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
					MySubscription->name,
					get_rel_name(MyLogicalRepWorker->relid))));
	CommitTransactionCommand();

	/* Find the main apply worker and signal it. */
	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);

	/* Stop gracefully */
	proc_exit(0);
}

/*
 * Wait until the relation synchronization state is set in the catalog to the
 * expected one.
 *
 * Used when transitioning from CATCHUP state to SYNCDONE.
 *
 * Returns false if the synchronization worker has disappeared or the table
 * state has been reset.
 */
static bool
wait_for_relation_state_change(Oid relid, char expected_state)
{
	char		state;

	for (;;)
	{
		LogicalRepWorker *worker;
		XLogRecPtr	statelsn;

		CHECK_FOR_INTERRUPTS();

		/* XXX use cache invalidation here to improve performance? */
		PushActiveSnapshot(GetLatestSnapshot());
		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
										relid, &statelsn, true);
		PopActiveSnapshot();

		if (state == SUBREL_STATE_UNKNOWN)
			return false;

		if (state == expected_state)
			return true;

		/* Check if the sync worker is still running and bail if not. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Check if the opposite worker is still running and bail if not. */
		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
										am_tablesync_worker() ? InvalidOid : relid,
										false);
		LWLockRelease(LogicalRepWorkerLock);
		if (!worker)
			return false;

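		/*
		 * Wait; use a timeout so that we notice reasonably promptly even if
		 * the other worker exits without setting our latch.
		 */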
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		ResetLatch(MyLatch);
	}

	return false;
}

/*
 * Wait until the apply worker changes the state of our synchronization
 * worker to the expected one.
 *
 * Used when transitioning from SYNCWAIT state to CATCHUP.
 *
 * Returns false if the apply worker has disappeared.
 */
static bool
wait_for_worker_state_change(char expected_state)
{
	int			rc;

	for (;;)
	{
		LogicalRepWorker *worker;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Done if already in correct state.  (We assume this fetch is atomic
		 * enough to not give a misleading answer if we do it with no lock.)
		 */
		if (MyLogicalRepWorker->relstate == expected_state)
			return true;

		/*
		 * Bail out if the apply worker has died, else signal it we're
		 * waiting.
		 */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
										InvalidOid, false);
		if (worker && worker->proc)
			logicalrep_worker_wakeup_ptr(worker);
		LWLockRelease(LogicalRepWorkerLock);
		if (!worker)
			break;

		/*
		 * Wait.  We expect to get a latch signal back from the apply worker,
		 * but use a timeout in case it dies without sending one.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	return false;
}

/*
 * Callback from syscache invalidation.
 */
void
invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
{
	table_states_valid = false;
}

/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
 */
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
	Assert(IsTransactionState());

	SpinLockAcquire(&MyLogicalRepWorker->relmutex);

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
		current_lsn >= MyLogicalRepWorker->relstate_lsn)
	{
		TimeLineID	tli;

		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
		MyLogicalRepWorker->relstate_lsn = current_lsn;

		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
								   MyLogicalRepWorker->relid,
								   MyLogicalRepWorker->relstate,
								   MyLogicalRepWorker->relstate_lsn);

		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
		finish_sync_worker();
	}
	else
		SpinLockRelease(&MyLogicalRepWorker->relmutex);
}

/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers).  To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each sync
 * state info.  We start the sync worker for the same relation after waiting
 * at least wal_retrieve_retry_interval.
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished.  This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 */
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
	struct tablesync_start_time_mapping
	{
		Oid			relid;
		TimestampTz last_start_time;
	};
	static List *table_states = NIL;
	static HTAB *last_start_times = NULL;
	ListCell   *lc;
	bool		started_tx = false;

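	/*
	 * Note that we are not in a transaction yet; started_tx tracks whether
	 * we had to open one below for catalog access, so that it can be
	 * committed at the end of this function.
	 */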
	Assert(!IsTransactionState());

	/* We need up-to-date sync state info for subscription tables here. */
	if (!table_states_valid)
	{
		MemoryContext oldctx;
		List	   *rstates;
		ListCell   *lc;
		SubscriptionRelState *rstate;

		/* Clean the old list. */
		list_free_deep(table_states);
		table_states = NIL;

		StartTransactionCommand();
		started_tx = true;

		/* Fetch all non-ready tables. */
		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);

		/* Allocate the tracking info in a permanent memory context. */
		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
		foreach(lc, rstates)
		{
			rstate = palloc(sizeof(SubscriptionRelState));
			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
			table_states = lappend(table_states, rstate);
		}
		MemoryContextSwitchTo(oldctx);

		table_states_valid = true;
	}

	/*
	 * Prepare a hash table for tracking last start times of workers, to avoid
	 * immediate restarts.  We don't need it if there are no tables that need
	 * syncing.
	 */
	if (table_states && !last_start_times)
	{
		HASHCTL		ctl;

		memset(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
		last_start_times = hash_create("Logical replication table sync worker start times",
									   256, &ctl, HASH_ELEM | HASH_BLOBS);
	}

	/*
	 * Clean up the hash table when we're done with all tables (just to
	 * release the bit of memory).
	 */
	else if (!table_states && last_start_times)
	{
		hash_destroy(last_start_times);
		last_start_times = NULL;
	}

	/*
	 * Process all tables that are being synchronized.
	 */
	foreach(lc, table_states)
	{
		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);

		if (rstate->state == SUBREL_STATE_SYNCDONE)
		{
			/*
			 * Apply has caught up to the position where the table sync has
			 * finished.  Mark the table as ready so that the apply will just
			 * continue to replicate it normally.
			 */
			if (current_lsn >= rstate->lsn)
			{
				rstate->state = SUBREL_STATE_READY;
				rstate->lsn = current_lsn;
				if (!started_tx)
				{
					StartTransactionCommand();
					started_tx = true;
				}

				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   rstate->relid, rstate->state,
										   rstate->lsn);
			}
		}
		else
		{
			LogicalRepWorker *syncworker;

			/*
			 * Look for a sync worker for this relation.
			 */
			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
												rstate->relid, false);

			if (syncworker)
			{
				/* Found one, update our copy of its state */
				SpinLockAcquire(&syncworker->relmutex);
				rstate->state = syncworker->relstate;
				rstate->lsn = syncworker->relstate_lsn;
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/*
					 * The sync worker is waiting for the apply worker.  Tell
					 * it that it can catch up now.
					 */
					syncworker->relstate = SUBREL_STATE_CATCHUP;
					syncworker->relstate_lsn =
						Max(syncworker->relstate_lsn, current_lsn);
				}
				SpinLockRelease(&syncworker->relmutex);

				/* If we told worker to catch up, wait for it. */
				if (rstate->state == SUBREL_STATE_SYNCWAIT)
				{
					/* Signal the sync worker, as it may be waiting for us. */
					if (syncworker->proc)
						logicalrep_worker_wakeup_ptr(syncworker);

					/* Now safe to release the LWLock */
					LWLockRelease(LogicalRepWorkerLock);

					/*
					 * Enter busy loop and wait for synchronization worker to
					 * reach expected state (or die trying).
					 */
					if (!started_tx)
					{
						StartTransactionCommand();
						started_tx = true;
					}

					wait_for_relation_state_change(rstate->relid,
												   SUBREL_STATE_SYNCDONE);
				}
				else
					LWLockRelease(LogicalRepWorkerLock);
			}
			else
			{
				/*
				 * If there is no sync worker for this table yet, count
				 * running sync workers for this subscription, while we have
				 * the lock.
				 */
				int			nsyncworkers =
				logicalrep_sync_worker_count(MyLogicalRepWorker->subid);

				/* Now safe to release the LWLock */
				LWLockRelease(LogicalRepWorkerLock);

				/*
				 * If there are free sync worker slot(s), start a new sync
				 * worker for the table.
				 */
				if (nsyncworkers < max_sync_workers_per_subscription)
				{
					TimestampTz now = GetCurrentTimestamp();
					struct tablesync_start_time_mapping *hentry;
					bool		found;

					hentry = hash_search(last_start_times, &rstate->relid,
										 HASH_ENTER, &found);

					if (!found ||
						TimestampDifferenceExceeds(hentry->last_start_time, now,
												   wal_retrieve_retry_interval))
					{
						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
												 MySubscription->oid,
												 MySubscription->name,
												 MyLogicalRepWorker->userid,
												 rstate->relid);
						hentry->last_start_time = now;
					}
				}
			}
		}
	}

	if (started_tx)
	{
		CommitTransactionCommand();
		pgstat_report_stat(false);
	}
}

/*
 * Process possible state change(s) of tables that are being synchronized.
 */
void
process_syncing_tables(XLogRecPtr current_lsn)
{
	if (am_tablesync_worker())
		process_syncing_tables_for_sync(current_lsn);
	else
		process_syncing_tables_for_apply(current_lsn);
}

/*
 * Create list of columns for COPY based on logical relation mapping.
 */
static List *
make_copy_attnamelist(LogicalRepRelMapEntry *rel)
{
	List	   *attnamelist = NIL;
	int			i;

	for (i = 0; i < rel->remoterel.natts; i++)
	{
		attnamelist = lappend(attnamelist,
							  makeString(rel->remoterel.attnames[i]));
	}

	return attnamelist;
}

/*
 * Data source callback for the COPY FROM, which reads from the remote
 * connection and passes the data back to our local COPY.
 */
static int
copy_read_data(void *outbuf, int minread, int maxread)
{
	int			bytesread = 0;
	int			avail;

	/* If there is some leftover data from a previous read, use it. */
	avail = copybuf->len - copybuf->cursor;
	if (avail)
	{
		if (avail > maxread)
			avail = maxread;
		memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
		copybuf->cursor += avail;
		maxread -= avail;
		bytesread += avail;
	}

	while (maxread > 0 && bytesread < minread)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			len;
		char	   *buf = NULL;

		for (;;)
		{
			/* Try to read the data. */
			len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

			CHECK_FOR_INTERRUPTS();

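			/*
			 * walrcv_receive() returns 0 when no data is available right
			 * now, a negative value when the COPY stream has ended, and
			 * otherwise the number of bytes pointed to by buf.
			 */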
			if (len == 0)
				break;
			else if (len < 0)
				return bytesread;
			else
			{
				/* Process the data */
				copybuf->data = buf;
				copybuf->len = len;
				copybuf->cursor = 0;

				avail = copybuf->len - copybuf->cursor;
				if (avail > maxread)
					avail = maxread;
				memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
				outbuf = (void *) ((char *) outbuf + avail);
				copybuf->cursor += avail;
				maxread -= avail;
				bytesread += avail;
			}

			if (maxread <= 0 || bytesread >= minread)
				return bytesread;
		}

		/*
		 * Wait for more data or latch.
		 */
		(void) WaitLatchOrSocket(MyLatch,
								 WL_SOCKET_READABLE | WL_LATCH_SET |
								 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
								 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);

		ResetLatch(MyLatch);
	}

	return bytesread;
}

/*
 * Get information about a remote relation, in a fashion similar to what the
 * RELATION message provides during replication.
 */
static void
fetch_remote_table_info(char *nspname, char *relname,
						LogicalRepRelation *lrel)
{
	WalRcvExecResult *res;
	StringInfoData cmd;
	TupleTableSlot *slot;
	Oid			tableRow[2] = {OIDOID, CHAROID};
	Oid			attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
	bool		isnull;
	int			natt;

	lrel->nspname = nspname;
	lrel->relname = relname;

	/* First fetch Oid and replica identity. */
	initStringInfo(&cmd);
	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
					 "  FROM pg_catalog.pg_class c"
					 "  INNER JOIN pg_catalog.pg_namespace n"
					 "        ON (c.relnamespace = n.oid)"
					 " WHERE n.nspname = %s"
					 "   AND c.relname = %s"
					 "   AND c.relkind = 'r'",
					 quote_literal_cstr(nspname),
					 quote_literal_cstr(relname));
	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
					  lengthof(tableRow), tableRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
						nspname, relname, res->err)));

	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
	if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
		ereport(ERROR,
				(errmsg("table \"%s.%s\" not found on publisher",
						nspname, relname)));

	lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
	Assert(!isnull);
	lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
	Assert(!isnull);

	ExecDropSingleTupleTableSlot(slot);
	walrcv_clear_result(res);

	/* Now fetch columns. */
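	/*
	 * Besides name, type and typmod, the fourth output column of the query
	 * below reports whether the attribute belongs to the table's replica
	 * identity index on the publisher; those attributes end up in
	 * lrel->attkeys.  On publishers of version 12 or later, generated
	 * columns are filtered out.
	 */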
	resetStringInfo(&cmd);
	appendStringInfo(&cmd,
					 "SELECT a.attname,"
					 "       a.atttypid,"
					 "       a.atttypmod,"
					 "       a.attnum = ANY(i.indkey)"
					 "  FROM pg_catalog.pg_attribute a"
					 "  LEFT JOIN pg_catalog.pg_index i"
					 "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
					 " WHERE a.attnum > 0::pg_catalog.int2"
					 "   AND NOT a.attisdropped %s"
					 "   AND a.attrelid = %u"
					 " ORDER BY a.attnum",
					 lrel->remoteid,
					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
					  "AND a.attgenerated = ''" : ""),
					 lrel->remoteid);
	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
					  lengthof(attrRow), attrRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\": %s",
						nspname, relname, res->err)));

	/* We don't know the number of rows coming, so allocate enough space. */
	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
	lrel->attkeys = NULL;

	natt = 0;
	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
	{
		lrel->attnames[natt] =
			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
		Assert(!isnull);
		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
		Assert(!isnull);
		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
			lrel->attkeys = bms_add_member(lrel->attkeys, natt);

		/* Should never happen. */
		if (++natt >= MaxTupleAttributeNumber)
			elog(ERROR, "too many columns in remote table \"%s.%s\"",
				 nspname, relname);

		ExecClearTuple(slot);
	}
	ExecDropSingleTupleTableSlot(slot);

	lrel->natts = natt;

	walrcv_clear_result(res);
	pfree(cmd.data);
}

/*
 * Copy existing data of a table from the publisher.
 *
 * Caller is responsible for locking the local relation.
 */
static void
copy_table(Relation rel)
{
	LogicalRepRelMapEntry *relmapentry;
	LogicalRepRelation lrel;
	WalRcvExecResult *res;
	StringInfoData cmd;
	CopyState	cstate;
	List	   *attnamelist;
	ParseState *pstate;

	/* Get the publisher relation info. */
	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
							RelationGetRelationName(rel), &lrel);

	/* Put the relation into relmap. */
	logicalrep_relmap_update(&lrel);

	/* Map the publisher relation to the local one. */
	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
	Assert(rel == relmapentry->localrel);

	/* Start copy on the publisher. */
	initStringInfo(&cmd);
	appendStringInfo(&cmd, "COPY %s TO STDOUT",
					 quote_qualified_identifier(lrel.nspname, lrel.relname));
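	/* e.g. "COPY public.mytab TO STDOUT" for a hypothetical table public.mytab */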
	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
	pfree(cmd.data);
	if (res->status != WALRCV_OK_COPY_OUT)
		ereport(ERROR,
				(errmsg("could not start initial contents copy for table \"%s.%s\": %s",
						lrel.nspname, lrel.relname, res->err)));
	walrcv_clear_result(res);

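	/* Set up the buffer that copy_read_data() will fill and drain. */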
	copybuf = makeStringInfo();

	pstate = make_parsestate(NULL);
	addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
								  NULL, false, false);

	attnamelist = make_copy_attnamelist(relmapentry);
	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);

	/* Do the copy */
	(void) CopyFrom(cstate);

	logicalrep_rel_close(relmapentry, NoLock);
}

/*
 * Start syncing the table in the sync worker.
 *
 * The returned slot name is palloc'ed in the current memory context.
 */
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn, true);
	CommitTransactionCommand();

	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
	 * NAMEDATALEN on the remote that matters, but this scheme will also work
	 * reasonably if that is different.)
	 */
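	/*
	 * For example, a subscription slot named "mysub" with subscription OID
	 * 16394 syncing relation OID 16385 would yield "mysub_16394_sync_16385"
	 * (hypothetical values).
	 */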
	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
	slotname = psprintf("%.*s_%u_sync_%u",
						NAMEDATALEN - 28,
						MySubscription->slotname,
						MySubscription->oid,
						MyLogicalRepWorker->relid);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the main apply worker's
	 * and synchronous replication can distinguish the two.
	 */
	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
											slotname, &err);
	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errmsg("could not connect to the publisher: %s", err)));

	switch (MyLogicalRepWorker->relstate)
	{
		case SUBREL_STATE_INIT:
		case SUBREL_STATE_DATASYNC:
			{
				Relation	rel;
				WalRcvExecResult *res;

				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
				MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Update the state and make it visible to others. */
				StartTransactionCommand();
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   MyLogicalRepWorker->relstate,
										   MyLogicalRepWorker->relstate_lsn);
				CommitTransactionCommand();
				pgstat_report_stat(false);

				/*
				 * We want to do the table data sync in a single transaction.
				 */
				StartTransactionCommand();

				/*
				 * Use a standard write lock here.  It might be better to
				 * disallow access to the table while it's being synchronized.
				 * But we don't want to block the main apply process from
				 * working, and it has to open the relation in RowExclusiveLock
				 * when remapping the remote relation id to the local one.
				 */
				rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

				/*
				 * Create a temporary slot for the sync process. We do this
				 * inside the transaction so that we can use the snapshot made
				 * by the slot to get existing data.
				 */
				res = walrcv_exec(LogRepWorkerWalRcvConn,
								  "BEGIN READ ONLY ISOLATION LEVEL "
								  "REPEATABLE READ", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not start transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				/*
				 * Create a new temporary logical decoding slot.
				 *
				 * We'll use the slot for the data copy, so make sure the
				 * snapshot is used for the transaction; that way the COPY
				 * will get data that is consistent with the LSN used by the
				 * slot to start decoding.
				 */
				walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
								   CRS_USE_SNAPSHOT, origin_startpos);

				PushActiveSnapshot(GetTransactionSnapshot());
				copy_table(rel);
				PopActiveSnapshot();

				res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not finish transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				table_close(rel, NoLock);

				/* Make the copy visible. */
				CommandCounterIncrement();

				/*
				 * We are done with the initial data synchronization, update
				 * the state.
				 */
				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Wait for the main apply worker to tell us to catch up. */
				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);

				/*----------
				 * There are now two possible states here:
				 * a) Sync is behind the apply.  If that's the case we need to
				 *	  catch up with it by consuming the logical replication
				 *	  stream up to the relstate_lsn.  For that, we exit this
				 *	  function and continue in ApplyWorkerMain().
				 * b) Sync is caught up with the apply.  So it can just set
				 *	  the state to SYNCDONE and finish.
				 *----------
				 */
				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
				{
					/*
					 * Update the new state in catalog.  No need to bother
					 * with the shmem state as we are exiting for good.
					 */
					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
											   MyLogicalRepWorker->relid,
											   SUBREL_STATE_SYNCDONE,
											   *origin_startpos);
					finish_sync_worker();
				}
				break;
			}
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:

			/*
			 * Nothing to do here but finish.  (UNKNOWN means the relation was
			 * removed from pg_subscription_rel before the sync worker could
			 * start.)
			 */
			finish_sync_worker();
			break;
		default:
			elog(ERROR, "unknown relation state \"%c\"",
				 MyLogicalRepWorker->relstate);
	}

	return slotname;
}