/*-------------------------------------------------------------------------
 *
 * worker_internal.h
 *	  Internal headers shared by logical replication workers.
 *
 * Portions Copyright (c) 2016-2021, PostgreSQL Global Development Group
 *
 * src/include/replication/worker_internal.h
 *
 *-------------------------------------------------------------------------
 */
12 #ifndef WORKER_INTERNAL_H
13 #define WORKER_INTERNAL_H
14 
15 #include <signal.h>
16 
17 #include "access/xlogdefs.h"
18 #include "catalog/pg_subscription.h"
19 #include "datatype/timestamp.h"
20 #include "storage/lock.h"
21 #include "storage/spin.h"
22 
23 
24 typedef struct LogicalRepWorker
25 {
26 	/* Time at which this worker was launched. */
27 	TimestampTz launch_time;
28 
29 	/* Indicates if this slot is used or free. */
30 	bool		in_use;
31 
32 	/* Increased every time the slot is taken by new worker. */
33 	uint16		generation;
34 
35 	/* Pointer to proc array. NULL if not running. */
36 	PGPROC	   *proc;
37 
38 	/* Database id to connect to. */
39 	Oid			dbid;
40 
41 	/* User to use for connection (will be same as owner of subscription). */
42 	Oid			userid;
43 
44 	/* Subscription id for the worker. */
45 	Oid			subid;
46 
47 	/* Used for initial table synchronization. */
48 	Oid			relid;
49 	char		relstate;
50 	XLogRecPtr	relstate_lsn;
51 	slock_t		relmutex;
52 
53 	/* Stats. */
54 	XLogRecPtr	last_lsn;
55 	TimestampTz last_send_time;
56 	TimestampTz last_recv_time;
57 	XLogRecPtr	reply_lsn;
58 	TimestampTz reply_time;
59 } LogicalRepWorker;
60 
61 /* Main memory context for apply worker. Permanent during worker lifetime. */
62 extern MemoryContext ApplyContext;
63 
64 /* libpqreceiver connection */
65 extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
66 
67 /* Worker and subscription objects. */
68 extern Subscription *MySubscription;
69 extern LogicalRepWorker *MyLogicalRepWorker;
70 
71 extern bool in_remote_transaction;
72 
73 extern void logicalrep_worker_attach(int slot);
74 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
75 												bool only_running);
76 extern List *logicalrep_workers_find(Oid subid, bool only_running);
77 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
78 									 Oid userid, Oid relid);
79 extern void logicalrep_worker_stop(Oid subid, Oid relid);
80 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
81 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
82 
83 extern int	logicalrep_sync_worker_count(Oid subid);
84 
85 extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
86 											  char *originname, int szorgname);
87 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
88 
89 void		process_syncing_tables(XLogRecPtr current_lsn);
90 void		invalidate_syncing_table_states(Datum arg, int cacheid,
91 											uint32 hashvalue);
92 
93 static inline bool
am_tablesync_worker(void)94 am_tablesync_worker(void)
95 {
96 	return OidIsValid(MyLogicalRepWorker->relid);
97 }
98 
99 #endif							/* WORKER_INTERNAL_H */
100