1 /*-------------------------------------------------------------------------
2  * tablesync.c
3  *	  PostgreSQL logical replication
4  *
5  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *	  src/backend/replication/logical/tablesync.c
9  *
10  * NOTES
11  *	  This file contains code for initial table data synchronization for
12  *	  logical replication.
13  *
14  *	  The initial data synchronization is done separately for each table,
15  *	  in a separate apply worker that only fetches the initial snapshot data
16  *	  from the publisher and then synchronizes the position in the stream with
17  *	  the main apply worker.
18  *
19  *	  There are several reasons for doing the synchronization this way:
20  *	   - It allows us to parallelize the initial data synchronization
21  *		 which lowers the time needed for it to happen.
22  *	   - The initial synchronization does not have to hold the xid and LSN
23  *		 for the time it takes to copy data of all tables, causing less
24  *		 bloat and lower disk consumption compared to doing the
25  *		 synchronization in a single process for the whole database.
26  *	   - It allows us to synchronize any tables added after the initial
27  *		 synchronization has finished.
28  *
29  *	  The stream position synchronization works in multiple steps.
30  *	   - Sync finishes copy and sets worker state as SYNCWAIT and waits for
31  *		 state to change in a loop.
32  *	   - Apply periodically checks tables that are synchronizing for SYNCWAIT.
 *		 When the desired state appears, it will set the worker state to
 *		 CATCHUP and start loop-waiting until either the table state is set
 *		 to SYNCDONE or the sync worker exits.
36  *	   - After the sync worker has seen the state change to CATCHUP, it will
37  *		 read the stream and apply changes (acting like an apply worker) until
38  *		 it catches up to the specified stream position.  Then it sets the
39  *		 state to SYNCDONE.  There might be zero changes applied between
40  *		 CATCHUP and SYNCDONE, because the sync worker might be ahead of the
41  *		 apply worker.
42  *	   - Once the state was set to SYNCDONE, the apply will continue tracking
43  *		 the table until it reaches the SYNCDONE stream position, at which
44  *		 point it sets state to READY and stops tracking.  Again, there might
45  *		 be zero changes in between.
46  *
47  *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
48  *	  SYNCDONE -> READY.
49  *
50  *	  The catalog pg_subscription_rel is used to keep information about
51  *	  subscribed tables and their state.  Some transient state during data
52  *	  synchronization is kept in shared memory.  The states SYNCWAIT and
53  *	  CATCHUP only appear in memory.
54  *
55  *	  Example flows look like this:
56  *	   - Apply is in front:
57  *		  sync:8
58  *			-> set in memory SYNCWAIT
59  *		  apply:10
60  *			-> set in memory CATCHUP
61  *			-> enter wait-loop
62  *		  sync:10
63  *			-> set in catalog SYNCDONE
64  *			-> exit
65  *		  apply:10
66  *			-> exit wait-loop
67  *			-> continue rep
68  *		  apply:11
69  *			-> set in catalog READY
70  *	   - Sync in front:
71  *		  sync:10
72  *			-> set in memory SYNCWAIT
73  *		  apply:8
74  *			-> set in memory CATCHUP
75  *			-> continue per-table filtering
76  *		  sync:10
77  *			-> set in catalog SYNCDONE
78  *			-> exit
79  *		  apply:10
80  *			-> set in catalog READY
81  *			-> stop per-table filtering
82  *			-> continue rep
83  *-------------------------------------------------------------------------
84  */
85 
86 #include "postgres.h"
87 
88 #include "access/table.h"
89 #include "access/xact.h"
90 #include "catalog/pg_subscription_rel.h"
91 #include "catalog/pg_type.h"
92 #include "commands/copy.h"
93 #include "miscadmin.h"
94 #include "parser/parse_relation.h"
95 #include "pgstat.h"
96 #include "replication/logicallauncher.h"
97 #include "replication/logicalrelation.h"
98 #include "replication/walreceiver.h"
99 #include "replication/worker_internal.h"
100 #include "storage/ipc.h"
101 #include "utils/builtins.h"
102 #include "utils/lsyscache.h"
103 #include "utils/memutils.h"
104 #include "utils/snapmgr.h"
105 
/*
 * Whether the apply worker's cached list of not-yet-ready tables is current.
 * Cleared by invalidate_syncing_table_states(); checked, and the list rebuilt
 * when it is false, in process_syncing_tables_for_apply().
 */
static bool table_states_valid = false;

/*
 * Holds the CopyData message most recently received by copy_read_data();
 * bytes past copybuf->cursor are handed out on the next call.  Created in
 * copy_table().
 */
StringInfo	copybuf = NULL;
109 
110 /*
111  * Exit routine for synchronization worker.
112  */
113 static void
pg_attribute_noreturn()114 pg_attribute_noreturn()
115 finish_sync_worker(void)
116 {
117 	/*
118 	 * Commit any outstanding transaction. This is the usual case, unless
119 	 * there was nothing to do for the table.
120 	 */
121 	if (IsTransactionState())
122 	{
123 		CommitTransactionCommand();
124 		pgstat_report_stat(false);
125 	}
126 
127 	/* And flush all writes. */
128 	XLogFlush(GetXLogWriteRecPtr());
129 
130 	StartTransactionCommand();
131 	ereport(LOG,
132 			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
133 					MySubscription->name,
134 					get_rel_name(MyLogicalRepWorker->relid))));
135 	CommitTransactionCommand();
136 
137 	/* Find the main apply worker and signal it. */
138 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
139 
140 	/* Stop gracefully */
141 	proc_exit(0);
142 }
143 
144 /*
145  * Wait until the relation synchronization state is set in the catalog to the
146  * expected one.
147  *
148  * Used when transitioning from CATCHUP state to SYNCDONE.
149  *
150  * Returns false if the synchronization worker has disappeared or the table state
151  * has been reset.
152  */
153 static bool
wait_for_relation_state_change(Oid relid,char expected_state)154 wait_for_relation_state_change(Oid relid, char expected_state)
155 {
156 	char		state;
157 
158 	for (;;)
159 	{
160 		LogicalRepWorker *worker;
161 		XLogRecPtr	statelsn;
162 
163 		CHECK_FOR_INTERRUPTS();
164 
165 		/* XXX use cache invalidation here to improve performance? */
166 		PushActiveSnapshot(GetLatestSnapshot());
167 		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
168 										relid, &statelsn, true);
169 		PopActiveSnapshot();
170 
171 		if (state == SUBREL_STATE_UNKNOWN)
172 			return false;
173 
174 		if (state == expected_state)
175 			return true;
176 
177 		/* Check if the sync worker is still running and bail if not. */
178 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
179 
180 		/* Check if the opposite worker is still running and bail if not. */
181 		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
182 										am_tablesync_worker() ? InvalidOid : relid,
183 										false);
184 		LWLockRelease(LogicalRepWorkerLock);
185 		if (!worker)
186 			return false;
187 
188 		(void) WaitLatch(MyLatch,
189 						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
190 						 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
191 
192 		ResetLatch(MyLatch);
193 	}
194 
195 	return false;
196 }
197 
198 /*
199  * Wait until the apply worker changes the state of our synchronization
200  * worker to the expected one.
201  *
202  * Used when transitioning from SYNCWAIT state to CATCHUP.
203  *
204  * Returns false if the apply worker has disappeared.
205  */
206 static bool
wait_for_worker_state_change(char expected_state)207 wait_for_worker_state_change(char expected_state)
208 {
209 	int			rc;
210 
211 	for (;;)
212 	{
213 		LogicalRepWorker *worker;
214 
215 		CHECK_FOR_INTERRUPTS();
216 
217 		/*
218 		 * Done if already in correct state.  (We assume this fetch is atomic
219 		 * enough to not give a misleading answer if we do it with no lock.)
220 		 */
221 		if (MyLogicalRepWorker->relstate == expected_state)
222 			return true;
223 
224 		/*
225 		 * Bail out if the apply worker has died, else signal it we're
226 		 * waiting.
227 		 */
228 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
229 		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
230 										InvalidOid, false);
231 		if (worker && worker->proc)
232 			logicalrep_worker_wakeup_ptr(worker);
233 		LWLockRelease(LogicalRepWorkerLock);
234 		if (!worker)
235 			break;
236 
237 		/*
238 		 * Wait.  We expect to get a latch signal back from the apply worker,
239 		 * but use a timeout in case it dies without sending one.
240 		 */
241 		rc = WaitLatch(MyLatch,
242 					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
243 					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
244 
245 		if (rc & WL_LATCH_SET)
246 			ResetLatch(MyLatch);
247 	}
248 
249 	return false;
250 }
251 
252 /*
253  * Callback from syscache invalidation.
254  */
255 void
invalidate_syncing_table_states(Datum arg,int cacheid,uint32 hashvalue)256 invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
257 {
258 	table_states_valid = false;
259 }
260 
261 /*
262  * Handle table synchronization cooperation from the synchronization
263  * worker.
264  *
265  * If the sync worker is in CATCHUP state and reached (or passed) the
266  * predetermined synchronization point in the WAL stream, mark the table as
267  * SYNCDONE and finish.
268  */
269 static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)270 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
271 {
272 	Assert(IsTransactionState());
273 
274 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
275 
276 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
277 		current_lsn >= MyLogicalRepWorker->relstate_lsn)
278 	{
279 		TimeLineID	tli;
280 
281 		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
282 		MyLogicalRepWorker->relstate_lsn = current_lsn;
283 
284 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
285 
286 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
287 								   MyLogicalRepWorker->relid,
288 								   MyLogicalRepWorker->relstate,
289 								   MyLogicalRepWorker->relstate_lsn);
290 
291 		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
292 		finish_sync_worker();
293 	}
294 	else
295 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
296 }
297 
298 /*
299  * Handle table synchronization cooperation from the apply worker.
300  *
301  * Walk over all subscription tables that are individually tracked by the
302  * apply process (currently, all that have state other than
303  * SUBREL_STATE_READY) and manage synchronization for them.
304  *
305  * If there are tables that need synchronizing and are not being synchronized
306  * yet, start sync workers for them (if there are free slots for sync
307  * workers).  To prevent starting the sync worker for the same relation at a
308  * high frequency after a failure, we store its last start time with each sync
309  * state info.  We start the sync worker for the same relation after waiting
310  * at least wal_retrieve_retry_interval.
311  *
312  * For tables that are being synchronized already, check if sync workers
313  * either need action from the apply worker or have finished.  This is the
314  * SYNCWAIT to CATCHUP transition.
315  *
316  * If the synchronization position is reached (SYNCDONE), then the table can
317  * be marked as READY and is no longer tracked.
318  */
319 static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)320 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
321 {
322 	struct tablesync_start_time_mapping
323 	{
324 		Oid			relid;
325 		TimestampTz last_start_time;
326 	};
327 	static List *table_states = NIL;
328 	static HTAB *last_start_times = NULL;
329 	ListCell   *lc;
330 	bool		started_tx = false;
331 
332 	Assert(!IsTransactionState());
333 
334 	/* We need up-to-date sync state info for subscription tables here. */
335 	if (!table_states_valid)
336 	{
337 		MemoryContext oldctx;
338 		List	   *rstates;
339 		ListCell   *lc;
340 		SubscriptionRelState *rstate;
341 
342 		/* Clean the old list. */
343 		list_free_deep(table_states);
344 		table_states = NIL;
345 
346 		StartTransactionCommand();
347 		started_tx = true;
348 
349 		/* Fetch all non-ready tables. */
350 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
351 
352 		/* Allocate the tracking info in a permanent memory context. */
353 		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
354 		foreach(lc, rstates)
355 		{
356 			rstate = palloc(sizeof(SubscriptionRelState));
357 			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
358 			table_states = lappend(table_states, rstate);
359 		}
360 		MemoryContextSwitchTo(oldctx);
361 
362 		table_states_valid = true;
363 	}
364 
365 	/*
366 	 * Prepare a hash table for tracking last start times of workers, to avoid
367 	 * immediate restarts.  We don't need it if there are no tables that need
368 	 * syncing.
369 	 */
370 	if (table_states && !last_start_times)
371 	{
372 		HASHCTL		ctl;
373 
374 		memset(&ctl, 0, sizeof(ctl));
375 		ctl.keysize = sizeof(Oid);
376 		ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
377 		last_start_times = hash_create("Logical replication table sync worker start times",
378 									   256, &ctl, HASH_ELEM | HASH_BLOBS);
379 	}
380 
381 	/*
382 	 * Clean up the hash table when we're done with all tables (just to
383 	 * release the bit of memory).
384 	 */
385 	else if (!table_states && last_start_times)
386 	{
387 		hash_destroy(last_start_times);
388 		last_start_times = NULL;
389 	}
390 
391 	/*
392 	 * Process all tables that are being synchronized.
393 	 */
394 	foreach(lc, table_states)
395 	{
396 		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
397 
398 		if (rstate->state == SUBREL_STATE_SYNCDONE)
399 		{
400 			/*
401 			 * Apply has caught up to the position where the table sync has
402 			 * finished.  Mark the table as ready so that the apply will just
403 			 * continue to replicate it normally.
404 			 */
405 			if (current_lsn >= rstate->lsn)
406 			{
407 				rstate->state = SUBREL_STATE_READY;
408 				rstate->lsn = current_lsn;
409 				if (!started_tx)
410 				{
411 					StartTransactionCommand();
412 					started_tx = true;
413 				}
414 
415 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
416 										   rstate->relid, rstate->state,
417 										   rstate->lsn);
418 			}
419 		}
420 		else
421 		{
422 			LogicalRepWorker *syncworker;
423 
424 			/*
425 			 * Look for a sync worker for this relation.
426 			 */
427 			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
428 
429 			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
430 												rstate->relid, false);
431 
432 			if (syncworker)
433 			{
434 				/* Found one, update our copy of its state */
435 				SpinLockAcquire(&syncworker->relmutex);
436 				rstate->state = syncworker->relstate;
437 				rstate->lsn = syncworker->relstate_lsn;
438 				if (rstate->state == SUBREL_STATE_SYNCWAIT)
439 				{
440 					/*
441 					 * Sync worker is waiting for apply.  Tell sync worker it
442 					 * can catchup now.
443 					 */
444 					syncworker->relstate = SUBREL_STATE_CATCHUP;
445 					syncworker->relstate_lsn =
446 						Max(syncworker->relstate_lsn, current_lsn);
447 				}
448 				SpinLockRelease(&syncworker->relmutex);
449 
450 				/* If we told worker to catch up, wait for it. */
451 				if (rstate->state == SUBREL_STATE_SYNCWAIT)
452 				{
453 					/* Signal the sync worker, as it may be waiting for us. */
454 					if (syncworker->proc)
455 						logicalrep_worker_wakeup_ptr(syncworker);
456 
457 					/* Now safe to release the LWLock */
458 					LWLockRelease(LogicalRepWorkerLock);
459 
460 					/*
461 					 * Enter busy loop and wait for synchronization worker to
462 					 * reach expected state (or die trying).
463 					 */
464 					if (!started_tx)
465 					{
466 						StartTransactionCommand();
467 						started_tx = true;
468 					}
469 
470 					wait_for_relation_state_change(rstate->relid,
471 												   SUBREL_STATE_SYNCDONE);
472 				}
473 				else
474 					LWLockRelease(LogicalRepWorkerLock);
475 			}
476 			else
477 			{
478 				/*
479 				 * If there is no sync worker for this table yet, count
480 				 * running sync workers for this subscription, while we have
481 				 * the lock.
482 				 */
483 				int			nsyncworkers =
484 				logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
485 
486 				/* Now safe to release the LWLock */
487 				LWLockRelease(LogicalRepWorkerLock);
488 
489 				/*
490 				 * If there are free sync worker slot(s), start a new sync
491 				 * worker for the table.
492 				 */
493 				if (nsyncworkers < max_sync_workers_per_subscription)
494 				{
495 					TimestampTz now = GetCurrentTimestamp();
496 					struct tablesync_start_time_mapping *hentry;
497 					bool		found;
498 
499 					hentry = hash_search(last_start_times, &rstate->relid,
500 										 HASH_ENTER, &found);
501 
502 					if (!found ||
503 						TimestampDifferenceExceeds(hentry->last_start_time, now,
504 												   wal_retrieve_retry_interval))
505 					{
506 						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
507 												 MySubscription->oid,
508 												 MySubscription->name,
509 												 MyLogicalRepWorker->userid,
510 												 rstate->relid);
511 						hentry->last_start_time = now;
512 					}
513 				}
514 			}
515 		}
516 	}
517 
518 	if (started_tx)
519 	{
520 		CommitTransactionCommand();
521 		pgstat_report_stat(false);
522 	}
523 }
524 
525 /*
526  * Process possible state change(s) of tables that are being synchronized.
527  */
528 void
process_syncing_tables(XLogRecPtr current_lsn)529 process_syncing_tables(XLogRecPtr current_lsn)
530 {
531 	if (am_tablesync_worker())
532 		process_syncing_tables_for_sync(current_lsn);
533 	else
534 		process_syncing_tables_for_apply(current_lsn);
535 }
536 
537 /*
538  * Create list of columns for COPY based on logical relation mapping.
539  */
540 static List *
make_copy_attnamelist(LogicalRepRelMapEntry * rel)541 make_copy_attnamelist(LogicalRepRelMapEntry *rel)
542 {
543 	List	   *attnamelist = NIL;
544 	int			i;
545 
546 	for (i = 0; i < rel->remoterel.natts; i++)
547 	{
548 		attnamelist = lappend(attnamelist,
549 							  makeString(rel->remoterel.attnames[i]));
550 	}
551 
552 
553 	return attnamelist;
554 }
555 
556 /*
557  * Data source callback for the COPY FROM, which reads from the remote
558  * connection and passes the data back to our local COPY.
559  */
560 static int
copy_read_data(void * outbuf,int minread,int maxread)561 copy_read_data(void *outbuf, int minread, int maxread)
562 {
563 	int			bytesread = 0;
564 	int			avail;
565 
566 	/* If there are some leftover data from previous read, use it. */
567 	avail = copybuf->len - copybuf->cursor;
568 	if (avail)
569 	{
570 		if (avail > maxread)
571 			avail = maxread;
572 		memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
573 		copybuf->cursor += avail;
574 		maxread -= avail;
575 		bytesread += avail;
576 	}
577 
578 	while (maxread > 0 && bytesread < minread)
579 	{
580 		pgsocket	fd = PGINVALID_SOCKET;
581 		int			len;
582 		char	   *buf = NULL;
583 
584 		for (;;)
585 		{
586 			/* Try read the data. */
587 			len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
588 
589 			CHECK_FOR_INTERRUPTS();
590 
591 			if (len == 0)
592 				break;
593 			else if (len < 0)
594 				return bytesread;
595 			else
596 			{
597 				/* Process the data */
598 				copybuf->data = buf;
599 				copybuf->len = len;
600 				copybuf->cursor = 0;
601 
602 				avail = copybuf->len - copybuf->cursor;
603 				if (avail > maxread)
604 					avail = maxread;
605 				memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
606 				outbuf = (void *) ((char *) outbuf + avail);
607 				copybuf->cursor += avail;
608 				maxread -= avail;
609 				bytesread += avail;
610 			}
611 
612 			if (maxread <= 0 || bytesread >= minread)
613 				return bytesread;
614 		}
615 
616 		/*
617 		 * Wait for more data or latch.
618 		 */
619 		(void) WaitLatchOrSocket(MyLatch,
620 								 WL_SOCKET_READABLE | WL_LATCH_SET |
621 								 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
622 								 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
623 
624 		ResetLatch(MyLatch);
625 	}
626 
627 	return bytesread;
628 }
629 
630 
/*
 * Get information about remote relation in similar fashion the RELATION
 * message provides during replication.
 *
 * Runs two queries on the publisher over LogRepWorkerWalRcvConn:
 *  1. the relation's OID, relreplident and relkind from pg_class, looked up
 *     by schema and relation name;
 *  2. its non-dropped user columns (attnum > 0) in attnum order, with type
 *     OID, typmod, and whether the column appears in the replica identity
 *     index.  When the publisher is version 12 or later, columns with a
 *     non-empty attgenerated are also excluded.
 *
 * nspname, relname: schema and relation name to look up.  The pointers are
 * stored into lrel as-is, not copied.
 * lrel: output.  remoteid, replident, relkind, attnames, atttyps, attkeys and
 * natts are filled in.  attnames/atttyps are palloc'd in the current memory
 * context with room for MaxTupleAttributeNumber entries.  The fetched
 * atttypmod is not stored.
 *
 * Raises ERROR if either query fails, if the table is not found on the
 * publisher, or if the column count reaches MaxTupleAttributeNumber.
 */
static void
fetch_remote_table_info(char *nspname, char *relname,
						LogicalRepRelation *lrel)
{
	WalRcvExecResult *res;
	StringInfoData cmd;
	TupleTableSlot *slot;
	/* Result column types of the two queries below, in SELECT-list order. */
	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
	Oid			attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
	bool		isnull;
	int			natt;

	lrel->nspname = nspname;
	lrel->relname = relname;

	/* First fetch Oid and replica identity. */
	initStringInfo(&cmd);
	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
					 "  FROM pg_catalog.pg_class c"
					 "  INNER JOIN pg_catalog.pg_namespace n"
					 "        ON (c.relnamespace = n.oid)"
					 " WHERE n.nspname = %s"
					 "   AND c.relname = %s",
					 quote_literal_cstr(nspname),
					 quote_literal_cstr(relname));
	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
					  lengthof(tableRow), tableRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
						nspname, relname, res->err)));

	/* Only the first returned row is examined. */
	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
	if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
		ereport(ERROR,
				(errmsg("table \"%s.%s\" not found on publisher",
						nspname, relname)));

	lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
	Assert(!isnull);
	lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
	Assert(!isnull);
	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
	Assert(!isnull);

	ExecDropSingleTupleTableSlot(slot);
	walrcv_clear_result(res);

	/* Now fetch columns. */
	resetStringInfo(&cmd);
	/*
	 * The attgenerated filter is added only for publishers >= 12, presumably
	 * because older servers have no such column.
	 */
	appendStringInfo(&cmd,
					 "SELECT a.attname,"
					 "       a.atttypid,"
					 "       a.atttypmod,"
					 "       a.attnum = ANY(i.indkey)"
					 "  FROM pg_catalog.pg_attribute a"
					 "  LEFT JOIN pg_catalog.pg_index i"
					 "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
					 " WHERE a.attnum > 0::pg_catalog.int2"
					 "   AND NOT a.attisdropped %s"
					 "   AND a.attrelid = %u"
					 " ORDER BY a.attnum",
					 lrel->remoteid,
					 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
					  "AND a.attgenerated = ''" : ""),
					 lrel->remoteid);
	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
					  lengthof(attrRow), attrRow);

	if (res->status != WALRCV_OK_TUPLES)
		ereport(ERROR,
				(errmsg("could not fetch table info for table \"%s.%s\": %s",
						nspname, relname, res->err)));

	/* We don't know the number of rows coming, so allocate enough space. */
	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
	lrel->attkeys = NULL;

	natt = 0;
	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
	{
		lrel->attnames[natt] =
			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
		Assert(!isnull);
		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
		Assert(!isnull);
		/*
		 * Column 4 is not asserted non-null: with the LEFT JOIN it can be
		 * NULL when no index row matches.  attkeys records positions in the
		 * fetched column list (natt), not remote attnums.
		 */
		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
			lrel->attkeys = bms_add_member(lrel->attkeys, natt);

		/* Should never happen. */
		if (++natt >= MaxTupleAttributeNumber)
			elog(ERROR, "too many columns in remote table \"%s.%s\"",
				 nspname, relname);

		ExecClearTuple(slot);
	}
	ExecDropSingleTupleTableSlot(slot);

	lrel->natts = natt;

	walrcv_clear_result(res);
	pfree(cmd.data);
}
741 
742 /*
743  * Copy existing data of a table from publisher.
744  *
745  * Caller is responsible for locking the local relation.
746  */
747 static void
copy_table(Relation rel)748 copy_table(Relation rel)
749 {
750 	LogicalRepRelMapEntry *relmapentry;
751 	LogicalRepRelation lrel;
752 	WalRcvExecResult *res;
753 	StringInfoData cmd;
754 	CopyState	cstate;
755 	List	   *attnamelist;
756 	ParseState *pstate;
757 
758 	/* Get the publisher relation info. */
759 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
760 							RelationGetRelationName(rel), &lrel);
761 
762 	/* Put the relation into relmap. */
763 	logicalrep_relmap_update(&lrel);
764 
765 	/* Map the publisher relation to local one. */
766 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
767 	Assert(rel == relmapentry->localrel);
768 
769 	/* Start copy on the publisher. */
770 	initStringInfo(&cmd);
771 	if (lrel.relkind == RELKIND_RELATION)
772 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
773 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
774 	else
775 	{
776 		/*
777 		 * For non-tables, we need to do COPY (SELECT ...), but we can't just
778 		 * do SELECT * because we need to not copy generated columns.
779 		 */
780 		appendStringInfo(&cmd, "COPY (SELECT ");
781 		for (int i = 0; i < lrel.natts; i++)
782 		{
783 			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
784 			if (i < lrel.natts - 1)
785 				appendStringInfoString(&cmd, ", ");
786 		}
787 		appendStringInfo(&cmd, " FROM %s) TO STDOUT",
788 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
789 	}
790 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
791 	pfree(cmd.data);
792 	if (res->status != WALRCV_OK_COPY_OUT)
793 		ereport(ERROR,
794 				(errmsg("could not start initial contents copy for table \"%s.%s\": %s",
795 						lrel.nspname, lrel.relname, res->err)));
796 	walrcv_clear_result(res);
797 
798 	copybuf = makeStringInfo();
799 
800 	pstate = make_parsestate(NULL);
801 	(void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
802 										 NULL, false, false);
803 
804 	attnamelist = make_copy_attnamelist(relmapentry);
805 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
806 
807 	/* Do the copy */
808 	(void) CopyFrom(cstate);
809 
810 	logicalrep_rel_close(relmapentry, NoLock);
811 }
812 
/*
 * Start syncing the table in the sync worker.
 *
 * Reads the table's current state from pg_subscription_rel into shared
 * memory and connects to the publisher using a per-table slot name.  What
 * happens next depends on that state.
 *
 *  - INIT / DATASYNC:
 *    1. Record DATASYNC in the catalog and shared memory.
 *    2. Create a temporary logical slot on the publisher, using its snapshot,
 *       and copy the existing data in one local transaction.
 *    3. Set SYNCWAIT in shared memory and wait for the apply worker to switch
 *       us to CATCHUP.
 *    4. If the requested catch-up position is not past our slot's start
 *       position, mark the table SYNCDONE and exit.  Otherwise return, so
 *       the caller can stream changes up to that position.
 *  - SYNCDONE / READY / UNKNOWN: nothing to do; the worker exits.
 *
 * origin_startpos: output; set by walrcv_create_slot() to the new slot's
 * start position (only on the INIT/DATASYNC path).
 *
 * The returned slot name is palloc'ed in current memory context.  When this
 * function returns, the transaction used for the copy is still open.
 *
 * Raises ERROR if the connection fails, if the transaction on the publisher
 * cannot be started or committed, or if the relation state is unrecognized.
 */
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn, true);
	CommitTransactionCommand();

	/* Publish the catalog state in shared memory for the apply worker. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
	 * NAMEDATALEN on the remote that matters, but this scheme will also work
	 * reasonably if that is different.)
	 */
	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
	slotname = psprintf("%.*s_%u_sync_%u",
						NAMEDATALEN - 28,
						MySubscription->slotname,
						MySubscription->oid,
						MyLogicalRepWorker->relid);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the main apply worker,
	 * so that synchronous replication can distinguish them.
	 */
	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
											slotname, &err);
	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errmsg("could not connect to the publisher: %s", err)));

	switch (MyLogicalRepWorker->relstate)
	{
		case SUBREL_STATE_INIT:
		case SUBREL_STATE_DATASYNC:
			{
				Relation	rel;
				WalRcvExecResult *res;

				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
				MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/* Update the state and make it visible to others. */
				StartTransactionCommand();
				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
										   MyLogicalRepWorker->relid,
										   MyLogicalRepWorker->relstate,
										   MyLogicalRepWorker->relstate_lsn);
				CommitTransactionCommand();
				pgstat_report_stat(false);

				/*
				 * We want to do the table data sync in a single transaction.
				 * This transaction is not committed here: it is still open
				 * when we break out below and return to the caller, or it is
				 * committed by finish_sync_worker().
				 */
				StartTransactionCommand();

				/*
				 * Use a standard write lock here. It might be better to
				 * disallow access to the table while it's being synchronized.
				 * But we don't want to block the main apply process from
				 * working and it has to open the relation in RowExclusiveLock
				 * when remapping remote relation id to local one.
				 */
				rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

				/*
				 * Create a temporary slot for the sync process. We do this
				 * inside the transaction so that we can use the snapshot made
				 * by the slot to get existing data.
				 */
				res = walrcv_exec(LogRepWorkerWalRcvConn,
								  "BEGIN READ ONLY ISOLATION LEVEL "
								  "REPEATABLE READ", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not start transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				/*
				 * Create new temporary logical decoding slot.
				 *
				 * We'll use slot for data copy so make sure the snapshot is
				 * used for the transaction; that way the COPY will get data
				 * that is consistent with the lsn used by the slot to start
				 * decoding.
				 */
				walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
								   CRS_USE_SNAPSHOT, origin_startpos);

				PushActiveSnapshot(GetTransactionSnapshot());
				copy_table(rel);
				PopActiveSnapshot();

				res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
				if (res->status != WALRCV_OK_COMMAND)
					ereport(ERROR,
							(errmsg("table copy could not finish transaction on publisher"),
							 errdetail("The error was: %s", res->err)));
				walrcv_clear_result(res);

				table_close(rel, NoLock);

				/* Make the copy visible. */
				CommandCounterIncrement();

				/*
				 * We are done with the initial data synchronization, update
				 * the state.
				 */
				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
				MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
				SpinLockRelease(&MyLogicalRepWorker->relmutex);

				/*
				 * Wait for main apply worker to tell us to catchup.
				 *
				 * NOTE(review): the return value is ignored.  If the apply
				 * worker has gone away, relstate_lsn still equals
				 * *origin_startpos (set just above), so the check below takes
				 * the SYNCDONE branch.
				 */
				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);

				/*----------
				 * There are now two possible states here:
				 * a) Sync is behind the apply.  If that's the case we need to
				 *	  catch up with it by consuming the logical replication
				 *	  stream up to the relstate_lsn.  For that, we exit this
				 *	  function and continue in ApplyWorkerMain().
				 * b) Sync is caught up with the apply.  So it can just set
				 *	  the state to SYNCDONE and finish.
				 *----------
				 */
				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
				{
					/*
					 * Update the new state in catalog.  No need to bother
					 * with the shmem state as we are exiting for good.
					 */
					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
											   MyLogicalRepWorker->relid,
											   SUBREL_STATE_SYNCDONE,
											   *origin_startpos);
					finish_sync_worker();
				}
				break;
			}
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:

			/*
			 * Nothing to do here but finish.  (UNKNOWN means the relation was
			 * removed from pg_subscription_rel before the sync worker could
			 * start.)
			 */
			finish_sync_worker();
			break;
		default:
			elog(ERROR, "unknown relation state \"%c\"",
				 MyLogicalRepWorker->relstate);
	}

	return slotname;
}
994