1 /*------------------------------------------------------------------------- 2 * 3 * logicalproto.h 4 * logical replication protocol 5 * 6 * Copyright (c) 2015-2021, PostgreSQL Global Development Group 7 * 8 * IDENTIFICATION 9 * src/include/replication/logicalproto.h 10 * 11 *------------------------------------------------------------------------- 12 */ 13 #ifndef LOGICAL_PROTO_H 14 #define LOGICAL_PROTO_H 15 16 #include "replication/reorderbuffer.h" 17 #include "utils/rel.h" 18 19 /* 20 * Protocol capabilities 21 * 22 * LOGICALREP_PROTO_VERSION_NUM is our native protocol. 23 * LOGICALREP_PROTO_MAX_VERSION_NUM is the greatest version we can support. 24 * LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we 25 * have backwards compatibility for. The client requests protocol version at 26 * connect time. 27 * 28 * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with 29 * support for streaming large transactions. 30 */ 31 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 32 #define LOGICALREP_PROTO_VERSION_NUM 1 33 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 34 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM 35 36 /* 37 * Logical message types 38 * 39 * Used by logical replication wire protocol. 40 * 41 * Note: though this is an enum, the values are used to identify message types 42 * in logical replication protocol, which uses a single byte to identify a 43 * message type. Hence the values should be single-byte wide and preferably 44 * human-readable characters. 45 */ 46 typedef enum LogicalRepMsgType 47 { 48 LOGICAL_REP_MSG_BEGIN = 'B', 49 LOGICAL_REP_MSG_COMMIT = 'C', 50 LOGICAL_REP_MSG_ORIGIN = 'O', 51 LOGICAL_REP_MSG_INSERT = 'I', 52 LOGICAL_REP_MSG_UPDATE = 'U', 53 LOGICAL_REP_MSG_DELETE = 'D', 54 LOGICAL_REP_MSG_TRUNCATE = 'T', 55 LOGICAL_REP_MSG_RELATION = 'R', 56 LOGICAL_REP_MSG_TYPE = 'Y', 57 LOGICAL_REP_MSG_MESSAGE = 'M', 58 LOGICAL_REP_MSG_STREAM_START = 'S', 59 LOGICAL_REP_MSG_STREAM_END = 'E', 60 LOGICAL_REP_MSG_STREAM_COMMIT = 'c', 61 LOGICAL_REP_MSG_STREAM_ABORT = 'A' 62 } LogicalRepMsgType; 63 64 /* 65 * This struct stores a tuple received via logical replication. 66 * Keep in mind that the columns correspond to the *remote* table. 67 */ 68 typedef struct LogicalRepTupleData 69 { 70 /* Array of StringInfos, one per column; some may be unused */ 71 StringInfoData *colvalues; 72 /* Array of markers for null/unchanged/text/binary, one per column */ 73 char *colstatus; 74 /* Length of above arrays */ 75 int ncols; 76 } LogicalRepTupleData; 77 78 /* Possible values for LogicalRepTupleData.colstatus[colnum] */ 79 /* These values are also used in the on-the-wire protocol */ 80 #define LOGICALREP_COLUMN_NULL 'n' 81 #define LOGICALREP_COLUMN_UNCHANGED 'u' 82 #define LOGICALREP_COLUMN_TEXT 't' 83 #define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */ 84 85 typedef uint32 LogicalRepRelId; 86 87 /* Relation information */ 88 typedef struct LogicalRepRelation 89 { 90 /* Info coming from the remote side. */ 91 LogicalRepRelId remoteid; /* unique id of the relation */ 92 char *nspname; /* schema name */ 93 char *relname; /* relation name */ 94 int natts; /* number of columns */ 95 char **attnames; /* column names */ 96 Oid *atttyps; /* column types */ 97 char replident; /* replica identity */ 98 char relkind; /* remote relation kind */ 99 Bitmapset *attkeys; /* Bitmap of key columns */ 100 } LogicalRepRelation; 101 102 /* Type mapping info */ 103 typedef struct LogicalRepTyp 104 { 105 Oid remoteid; /* unique id of the remote type */ 106 char *nspname; /* schema name of remote type */ 107 char *typname; /* name of the remote type */ 108 } LogicalRepTyp; 109 110 /* Transaction info */ 111 typedef struct LogicalRepBeginData 112 { 113 XLogRecPtr final_lsn; 114 TimestampTz committime; 115 TransactionId xid; 116 } LogicalRepBeginData; 117 118 typedef struct LogicalRepCommitData 119 { 120 XLogRecPtr commit_lsn; 121 XLogRecPtr end_lsn; 122 TimestampTz committime; 123 } LogicalRepCommitData; 124 125 extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); 126 extern void logicalrep_read_begin(StringInfo in, 127 LogicalRepBeginData *begin_data); 128 extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, 129 XLogRecPtr commit_lsn); 130 extern void logicalrep_read_commit(StringInfo in, 131 LogicalRepCommitData *commit_data); 132 extern void logicalrep_write_origin(StringInfo out, const char *origin, 133 XLogRecPtr origin_lsn); 134 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); 135 extern void logicalrep_write_insert(StringInfo out, TransactionId xid, 136 Relation rel, HeapTuple newtuple, 137 bool binary); 138 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); 139 extern void logicalrep_write_update(StringInfo out, TransactionId xid, 140 Relation rel, HeapTuple oldtuple, 141 HeapTuple newtuple, bool binary); 142 extern LogicalRepRelId logicalrep_read_update(StringInfo in, 143 bool *has_oldtuple, LogicalRepTupleData *oldtup, 144 LogicalRepTupleData *newtup); 145 extern void logicalrep_write_delete(StringInfo out, TransactionId xid, 146 Relation rel, HeapTuple oldtuple, 147 bool binary); 148 extern LogicalRepRelId logicalrep_read_delete(StringInfo in, 149 LogicalRepTupleData *oldtup); 150 extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, 151 int nrelids, Oid relids[], 152 bool cascade, bool restart_seqs); 153 extern List *logicalrep_read_truncate(StringInfo in, 154 bool *cascade, bool *restart_seqs); 155 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, 156 bool transactional, const char *prefix, Size sz, const char *message); 157 extern void logicalrep_write_rel(StringInfo out, TransactionId xid, 158 Relation rel); 159 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); 160 extern void logicalrep_write_typ(StringInfo out, TransactionId xid, 161 Oid typoid); 162 extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp); 163 extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid, 164 bool first_segment); 165 extern TransactionId logicalrep_read_stream_start(StringInfo in, 166 bool *first_segment); 167 extern void logicalrep_write_stream_stop(StringInfo out); 168 extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, 169 XLogRecPtr commit_lsn); 170 extern TransactionId logicalrep_read_stream_commit(StringInfo out, 171 LogicalRepCommitData *commit_data); 172 extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, 173 TransactionId subxid); 174 extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, 175 TransactionId *subxid); 176 177 #endif /* LOGICAL_PROTO_H */ 178