1 /*------------------------------------------------------------------------- 2 * 3 * walreceiver.h 4 * Exports from replication/walreceiverfuncs.c. 5 * 6 * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group 7 * 8 * src/include/replication/walreceiver.h 9 * 10 *------------------------------------------------------------------------- 11 */ 12 #ifndef _WALRECEIVER_H 13 #define _WALRECEIVER_H 14 15 #include "access/xlog.h" 16 #include "access/xlogdefs.h" 17 #include "fmgr.h" 18 #include "getaddrinfo.h" /* for NI_MAXHOST */ 19 #include "replication/logicalproto.h" 20 #include "replication/walsender.h" 21 #include "storage/latch.h" 22 #include "storage/spin.h" 23 #include "pgtime.h" 24 #include "utils/tuplestore.h" 25 26 /* user-settable parameters */ 27 extern int wal_receiver_status_interval; 28 extern int wal_receiver_timeout; 29 extern bool hot_standby_feedback; 30 31 /* 32 * MAXCONNINFO: maximum size of a connection string. 33 * 34 * XXX: Should this move to pg_config_manual.h? 35 */ 36 #define MAXCONNINFO 1024 37 38 /* Can we allow the standby to accept replication connection from another standby? */ 39 #define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0) 40 41 /* 42 * Values for WalRcv->walRcvState. 43 */ 44 typedef enum 45 { 46 WALRCV_STOPPED, /* stopped and mustn't start up again */ 47 WALRCV_STARTING, /* launched, but the process hasn't 48 * initialized yet */ 49 WALRCV_STREAMING, /* walreceiver is streaming */ 50 WALRCV_WAITING, /* stopped streaming, waiting for orders */ 51 WALRCV_RESTARTING, /* asked to restart streaming */ 52 WALRCV_STOPPING /* requested to stop, but still running */ 53 } WalRcvState; 54 55 /* Shared memory area for management of walreceiver process */ 56 typedef struct 57 { 58 /* 59 * PID of currently active walreceiver process, its current state and 60 * start time (actually, the time at which it was requested to be 61 * started). 62 */ 63 pid_t pid; 64 WalRcvState walRcvState; 65 pg_time_t startTime; 66 67 /* 68 * receiveStart and receiveStartTLI indicate the first byte position and 69 * timeline that will be received. When startup process starts the 70 * walreceiver, it sets these to the point where it wants the streaming to 71 * begin. 72 */ 73 XLogRecPtr receiveStart; 74 TimeLineID receiveStartTLI; 75 76 /* 77 * receivedUpto-1 is the last byte position that has already been 78 * received, and receivedTLI is the timeline it came from. At the first 79 * startup of walreceiver, these are set to receiveStart and 80 * receiveStartTLI. After that, walreceiver updates these whenever it 81 * flushes the received WAL to disk. 82 */ 83 XLogRecPtr receivedUpto; 84 TimeLineID receivedTLI; 85 86 /* 87 * latestChunkStart is the starting byte position of the current "batch" 88 * of received WAL. It's actually the same as the previous value of 89 * receivedUpto before the last flush to disk. Startup process can use 90 * this to detect whether it's keeping up or not. 91 */ 92 XLogRecPtr latestChunkStart; 93 94 /* 95 * Time of send and receive of any message received. 96 */ 97 TimestampTz lastMsgSendTime; 98 TimestampTz lastMsgReceiptTime; 99 100 /* 101 * Latest reported end of WAL on the sender 102 */ 103 XLogRecPtr latestWalEnd; 104 TimestampTz latestWalEndTime; 105 106 /* 107 * connection string; initially set to connect to the primary, and later 108 * clobbered to hide security-sensitive fields. 109 */ 110 char conninfo[MAXCONNINFO]; 111 112 /* 113 * Host name (this can be a host name, an IP address, or a directory path) 114 * and port number of the active replication connection. 115 */ 116 char sender_host[NI_MAXHOST]; 117 int sender_port; 118 119 /* 120 * replication slot name; is also used for walreceiver to connect with the 121 * primary 122 */ 123 char slotname[NAMEDATALEN]; 124 125 /* set true once conninfo is ready to display (obfuscated pwds etc) */ 126 bool ready_to_display; 127 128 /* 129 * Latch used by startup process to wake up walreceiver after telling it 130 * where to start streaming (after setting receiveStart and 131 * receiveStartTLI), and also to tell it to send apply feedback to the 132 * primary whenever specially marked commit records are applied. This is 133 * normally mapped to procLatch when walreceiver is running. 134 */ 135 Latch *latch; 136 137 slock_t mutex; /* locks shared variables shown above */ 138 139 /* 140 * force walreceiver reply? This doesn't need to be locked; memory 141 * barriers for ordering are sufficient. But we do need atomic fetch and 142 * store semantics, so use sig_atomic_t. 143 */ 144 sig_atomic_t force_reply; /* used as a bool */ 145 } WalRcvData; 146 147 extern WalRcvData *WalRcv; 148 149 typedef struct 150 { 151 bool logical; /* True if this is logical replication stream, 152 * false if physical stream. */ 153 char *slotname; /* Name of the replication slot or NULL. */ 154 XLogRecPtr startpoint; /* LSN of starting point. */ 155 156 union 157 { 158 struct 159 { 160 TimeLineID startpointTLI; /* Starting timeline */ 161 } physical; 162 struct 163 { 164 uint32 proto_version; /* Logical protocol version */ 165 List *publication_names; /* String list of publications */ 166 } logical; 167 } proto; 168 } WalRcvStreamOptions; 169 170 struct WalReceiverConn; 171 typedef struct WalReceiverConn WalReceiverConn; 172 173 /* 174 * Status of walreceiver query execution. 175 * 176 * We only define statuses that are currently used. 177 */ 178 typedef enum 179 { 180 WALRCV_ERROR, /* There was error when executing the query. */ 181 WALRCV_OK_COMMAND, /* Query executed utility or replication 182 * command. */ 183 WALRCV_OK_TUPLES, /* Query returned tuples. */ 184 WALRCV_OK_COPY_IN, /* Query started COPY FROM. */ 185 WALRCV_OK_COPY_OUT, /* Query started COPY TO. */ 186 WALRCV_OK_COPY_BOTH /* Query started COPY BOTH replication 187 * protocol. */ 188 } WalRcvExecStatus; 189 190 /* 191 * Return value for walrcv_query, returns the status of the execution and 192 * tuples if any. 193 */ 194 typedef struct WalRcvExecResult 195 { 196 WalRcvExecStatus status; 197 char *err; 198 Tuplestorestate *tuplestore; 199 TupleDesc tupledesc; 200 } WalRcvExecResult; 201 202 /* libpqwalreceiver hooks */ 203 typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical, 204 const char *appname, 205 char **err); 206 typedef void (*walrcv_check_conninfo_fn) (const char *conninfo); 207 typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn); 208 typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, 209 char **sender_host, 210 int *sender_port); 211 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, 212 TimeLineID *primary_tli, 213 int *server_version); 214 typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn, 215 TimeLineID tli, 216 char **filename, 217 char **content, int *size); 218 typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn, 219 const WalRcvStreamOptions *options); 220 typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn, 221 TimeLineID *next_tli); 222 typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer, 223 pgsocket *wait_fd); 224 typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer, 225 int nbytes); 226 typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, 227 const char *slotname, bool temporary, 228 CRSSnapshotAction snapshot_action, 229 XLogRecPtr *lsn); 230 typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn, 231 const char *query, 232 const int nRetTypes, 233 const Oid *retTypes); 234 typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); 235 236 typedef struct WalReceiverFunctionsType 237 { 238 walrcv_connect_fn walrcv_connect; 239 walrcv_check_conninfo_fn walrcv_check_conninfo; 240 walrcv_get_conninfo_fn walrcv_get_conninfo; 241 walrcv_get_senderinfo_fn walrcv_get_senderinfo; 242 walrcv_identify_system_fn walrcv_identify_system; 243 walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; 244 walrcv_startstreaming_fn walrcv_startstreaming; 245 walrcv_endstreaming_fn walrcv_endstreaming; 246 walrcv_receive_fn walrcv_receive; 247 walrcv_send_fn walrcv_send; 248 walrcv_create_slot_fn walrcv_create_slot; 249 walrcv_exec_fn walrcv_exec; 250 walrcv_disconnect_fn walrcv_disconnect; 251 } WalReceiverFunctionsType; 252 253 extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; 254 255 #define walrcv_connect(conninfo, logical, appname, err) \ 256 WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err) 257 #define walrcv_check_conninfo(conninfo) \ 258 WalReceiverFunctions->walrcv_check_conninfo(conninfo) 259 #define walrcv_get_conninfo(conn) \ 260 WalReceiverFunctions->walrcv_get_conninfo(conn) 261 #define walrcv_get_senderinfo(conn, sender_host, sender_port) \ 262 WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) 263 #define walrcv_identify_system(conn, primary_tli, server_version) \ 264 WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version) 265 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ 266 WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) 267 #define walrcv_startstreaming(conn, options) \ 268 WalReceiverFunctions->walrcv_startstreaming(conn, options) 269 #define walrcv_endstreaming(conn, next_tli) \ 270 WalReceiverFunctions->walrcv_endstreaming(conn, next_tli) 271 #define walrcv_receive(conn, buffer, wait_fd) \ 272 WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) 273 #define walrcv_send(conn, buffer, nbytes) \ 274 WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) 275 #define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \ 276 WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) 277 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ 278 WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes) 279 #define walrcv_disconnect(conn) \ 280 WalReceiverFunctions->walrcv_disconnect(conn) 281 282 static inline void 283 walrcv_clear_result(WalRcvExecResult *walres) 284 { 285 if (!walres) 286 return; 287 288 if (walres->err) 289 pfree(walres->err); 290 291 if (walres->tuplestore) 292 tuplestore_end(walres->tuplestore); 293 294 if (walres->tupledesc) 295 FreeTupleDesc(walres->tupledesc); 296 297 pfree(walres); 298 } 299 300 /* prototypes for functions in walreceiver.c */ 301 extern void WalReceiverMain(void) pg_attribute_noreturn(); 302 extern void ProcessWalRcvInterrupts(void); 303 304 /* prototypes for functions in walreceiverfuncs.c */ 305 extern Size WalRcvShmemSize(void); 306 extern void WalRcvShmemInit(void); 307 extern void ShutdownWalRcv(void); 308 extern bool WalRcvStreaming(void); 309 extern bool WalRcvRunning(void); 310 extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, 311 const char *conninfo, const char *slotname); 312 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); 313 extern int GetReplicationApplyDelay(void); 314 extern int GetReplicationTransferLatency(void); 315 extern void WalRcvForceReply(void); 316 317 #endif /* _WALRECEIVER_H */ 318