1 /*-------------------------------------------------------------------------
2  *
3  * proto.c
4  *		logical replication protocol functions
5  *
6  * Copyright (c) 2015-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		src/backend/replication/logical/proto.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
19 #include "replication/logicalproto.h"
20 #include "utils/lsyscache.h"
21 #include "utils/syscache.h"
22 
/*
 * Flag bits carried inside protocol messages.
 */

/* per-attribute flag in the attribute list: column belongs to the replica identity */
#define LOGICALREP_IS_REPLICA_IDENTITY	1

/* flag bit of the MESSAGE message */
#define MESSAGE_TRANSACTIONAL			(1 << 0)

/* flag bits of the TRUNCATE message */
#define TRUNCATE_CASCADE				(1 << 0)
#define TRUNCATE_RESTART_SEQS			(1 << 1)
31 
32 static void logicalrep_write_attrs(StringInfo out, Relation rel);
33 static void logicalrep_write_tuple(StringInfo out, Relation rel,
34 								   HeapTuple tuple, bool binary);
35 
36 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
37 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
38 
39 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
40 static const char *logicalrep_read_namespace(StringInfo in);
41 
42 /*
43  * Write BEGIN to the output stream.
44  */
45 void
logicalrep_write_begin(StringInfo out,ReorderBufferTXN * txn)46 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
47 {
48 	pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
49 
50 	/* fixed fields */
51 	pq_sendint64(out, txn->final_lsn);
52 	pq_sendint64(out, txn->commit_time);
53 	pq_sendint32(out, txn->xid);
54 }
55 
56 /*
57  * Read transaction BEGIN from the stream.
58  */
59 void
logicalrep_read_begin(StringInfo in,LogicalRepBeginData * begin_data)60 logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
61 {
62 	/* read fields */
63 	begin_data->final_lsn = pq_getmsgint64(in);
64 	if (begin_data->final_lsn == InvalidXLogRecPtr)
65 		elog(ERROR, "final_lsn not set in begin message");
66 	begin_data->committime = pq_getmsgint64(in);
67 	begin_data->xid = pq_getmsgint(in, 4);
68 }
69 
70 
71 /*
72  * Write COMMIT to the output stream.
73  */
74 void
logicalrep_write_commit(StringInfo out,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)75 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
76 						XLogRecPtr commit_lsn)
77 {
78 	uint8		flags = 0;
79 
80 	pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
81 
82 	/* send the flags field (unused for now) */
83 	pq_sendbyte(out, flags);
84 
85 	/* send fields */
86 	pq_sendint64(out, commit_lsn);
87 	pq_sendint64(out, txn->end_lsn);
88 	pq_sendint64(out, txn->commit_time);
89 }
90 
91 /*
92  * Read transaction COMMIT from the stream.
93  */
94 void
logicalrep_read_commit(StringInfo in,LogicalRepCommitData * commit_data)95 logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
96 {
97 	/* read flags (unused for now) */
98 	uint8		flags = pq_getmsgbyte(in);
99 
100 	if (flags != 0)
101 		elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103 	/* read fields */
104 	commit_data->commit_lsn = pq_getmsgint64(in);
105 	commit_data->end_lsn = pq_getmsgint64(in);
106 	commit_data->committime = pq_getmsgint64(in);
107 }
108 
109 /*
110  * Write ORIGIN to the output stream.
111  */
112 void
logicalrep_write_origin(StringInfo out,const char * origin,XLogRecPtr origin_lsn)113 logicalrep_write_origin(StringInfo out, const char *origin,
114 						XLogRecPtr origin_lsn)
115 {
116 	pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
117 
118 	/* fixed fields */
119 	pq_sendint64(out, origin_lsn);
120 
121 	/* origin string */
122 	pq_sendstring(out, origin);
123 }
124 
125 /*
126  * Read ORIGIN from the output stream.
127  */
128 char *
logicalrep_read_origin(StringInfo in,XLogRecPtr * origin_lsn)129 logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
130 {
131 	/* fixed fields */
132 	*origin_lsn = pq_getmsgint64(in);
133 
134 	/* return origin */
135 	return pstrdup(pq_getmsgstring(in));
136 }
137 
138 /*
139  * Write INSERT to the output stream.
140  */
141 void
logicalrep_write_insert(StringInfo out,TransactionId xid,Relation rel,HeapTuple newtuple,bool binary)142 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
143 						HeapTuple newtuple, bool binary)
144 {
145 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
146 
147 	/* transaction ID (if not valid, we're not streaming) */
148 	if (TransactionIdIsValid(xid))
149 		pq_sendint32(out, xid);
150 
151 	/* use Oid as relation identifier */
152 	pq_sendint32(out, RelationGetRelid(rel));
153 
154 	pq_sendbyte(out, 'N');		/* new tuple follows */
155 	logicalrep_write_tuple(out, rel, newtuple, binary);
156 }
157 
158 /*
159  * Read INSERT from stream.
160  *
161  * Fills the new tuple.
162  */
163 LogicalRepRelId
logicalrep_read_insert(StringInfo in,LogicalRepTupleData * newtup)164 logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
165 {
166 	char		action;
167 	LogicalRepRelId relid;
168 
169 	/* read the relation id */
170 	relid = pq_getmsgint(in, 4);
171 
172 	action = pq_getmsgbyte(in);
173 	if (action != 'N')
174 		elog(ERROR, "expected new tuple but got %d",
175 			 action);
176 
177 	logicalrep_read_tuple(in, newtup);
178 
179 	return relid;
180 }
181 
182 /*
183  * Write UPDATE to the output stream.
184  */
185 void
logicalrep_write_update(StringInfo out,TransactionId xid,Relation rel,HeapTuple oldtuple,HeapTuple newtuple,bool binary)186 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
187 						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
188 {
189 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
190 
191 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
192 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
193 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
194 
195 	/* transaction ID (if not valid, we're not streaming) */
196 	if (TransactionIdIsValid(xid))
197 		pq_sendint32(out, xid);
198 
199 	/* use Oid as relation identifier */
200 	pq_sendint32(out, RelationGetRelid(rel));
201 
202 	if (oldtuple != NULL)
203 	{
204 		if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
205 			pq_sendbyte(out, 'O');	/* old tuple follows */
206 		else
207 			pq_sendbyte(out, 'K');	/* old key follows */
208 		logicalrep_write_tuple(out, rel, oldtuple, binary);
209 	}
210 
211 	pq_sendbyte(out, 'N');		/* new tuple follows */
212 	logicalrep_write_tuple(out, rel, newtuple, binary);
213 }
214 
215 /*
216  * Read UPDATE from stream.
217  */
218 LogicalRepRelId
logicalrep_read_update(StringInfo in,bool * has_oldtuple,LogicalRepTupleData * oldtup,LogicalRepTupleData * newtup)219 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
220 					   LogicalRepTupleData *oldtup,
221 					   LogicalRepTupleData *newtup)
222 {
223 	char		action;
224 	LogicalRepRelId relid;
225 
226 	/* read the relation id */
227 	relid = pq_getmsgint(in, 4);
228 
229 	/* read and verify action */
230 	action = pq_getmsgbyte(in);
231 	if (action != 'K' && action != 'O' && action != 'N')
232 		elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
233 			 action);
234 
235 	/* check for old tuple */
236 	if (action == 'K' || action == 'O')
237 	{
238 		logicalrep_read_tuple(in, oldtup);
239 		*has_oldtuple = true;
240 
241 		action = pq_getmsgbyte(in);
242 	}
243 	else
244 		*has_oldtuple = false;
245 
246 	/* check for new  tuple */
247 	if (action != 'N')
248 		elog(ERROR, "expected action 'N', got %c",
249 			 action);
250 
251 	logicalrep_read_tuple(in, newtup);
252 
253 	return relid;
254 }
255 
256 /*
257  * Write DELETE to the output stream.
258  */
259 void
logicalrep_write_delete(StringInfo out,TransactionId xid,Relation rel,HeapTuple oldtuple,bool binary)260 logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
261 						HeapTuple oldtuple, bool binary)
262 {
263 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
264 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
265 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
266 
267 	pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
268 
269 	/* transaction ID (if not valid, we're not streaming) */
270 	if (TransactionIdIsValid(xid))
271 		pq_sendint32(out, xid);
272 
273 	/* use Oid as relation identifier */
274 	pq_sendint32(out, RelationGetRelid(rel));
275 
276 	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
277 		pq_sendbyte(out, 'O');	/* old tuple follows */
278 	else
279 		pq_sendbyte(out, 'K');	/* old key follows */
280 
281 	logicalrep_write_tuple(out, rel, oldtuple, binary);
282 }
283 
284 /*
285  * Read DELETE from stream.
286  *
287  * Fills the old tuple.
288  */
289 LogicalRepRelId
logicalrep_read_delete(StringInfo in,LogicalRepTupleData * oldtup)290 logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
291 {
292 	char		action;
293 	LogicalRepRelId relid;
294 
295 	/* read the relation id */
296 	relid = pq_getmsgint(in, 4);
297 
298 	/* read and verify action */
299 	action = pq_getmsgbyte(in);
300 	if (action != 'K' && action != 'O')
301 		elog(ERROR, "expected action 'O' or 'K', got %c", action);
302 
303 	logicalrep_read_tuple(in, oldtup);
304 
305 	return relid;
306 }
307 
308 /*
309  * Write TRUNCATE to the output stream.
310  */
311 void
logicalrep_write_truncate(StringInfo out,TransactionId xid,int nrelids,Oid relids[],bool cascade,bool restart_seqs)312 logicalrep_write_truncate(StringInfo out,
313 						  TransactionId xid,
314 						  int nrelids,
315 						  Oid relids[],
316 						  bool cascade, bool restart_seqs)
317 {
318 	int			i;
319 	uint8		flags = 0;
320 
321 	pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
322 
323 	/* transaction ID (if not valid, we're not streaming) */
324 	if (TransactionIdIsValid(xid))
325 		pq_sendint32(out, xid);
326 
327 	pq_sendint32(out, nrelids);
328 
329 	/* encode and send truncate flags */
330 	if (cascade)
331 		flags |= TRUNCATE_CASCADE;
332 	if (restart_seqs)
333 		flags |= TRUNCATE_RESTART_SEQS;
334 	pq_sendint8(out, flags);
335 
336 	for (i = 0; i < nrelids; i++)
337 		pq_sendint32(out, relids[i]);
338 }
339 
340 /*
341  * Read TRUNCATE from stream.
342  */
343 List *
logicalrep_read_truncate(StringInfo in,bool * cascade,bool * restart_seqs)344 logicalrep_read_truncate(StringInfo in,
345 						 bool *cascade, bool *restart_seqs)
346 {
347 	int			i;
348 	int			nrelids;
349 	List	   *relids = NIL;
350 	uint8		flags;
351 
352 	nrelids = pq_getmsgint(in, 4);
353 
354 	/* read and decode truncate flags */
355 	flags = pq_getmsgint(in, 1);
356 	*cascade = (flags & TRUNCATE_CASCADE) > 0;
357 	*restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
358 
359 	for (i = 0; i < nrelids; i++)
360 		relids = lappend_oid(relids, pq_getmsgint(in, 4));
361 
362 	return relids;
363 }
364 
365 /*
366  * Write MESSAGE to stream
367  */
368 void
logicalrep_write_message(StringInfo out,TransactionId xid,XLogRecPtr lsn,bool transactional,const char * prefix,Size sz,const char * message)369 logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
370 						 bool transactional, const char *prefix, Size sz,
371 						 const char *message)
372 {
373 	uint8		flags = 0;
374 
375 	pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
376 
377 	/* encode and send message flags */
378 	if (transactional)
379 		flags |= MESSAGE_TRANSACTIONAL;
380 
381 	/* transaction ID (if not valid, we're not streaming) */
382 	if (TransactionIdIsValid(xid))
383 		pq_sendint32(out, xid);
384 
385 	pq_sendint8(out, flags);
386 	pq_sendint64(out, lsn);
387 	pq_sendstring(out, prefix);
388 	pq_sendint32(out, sz);
389 	pq_sendbytes(out, message, sz);
390 }
391 
392 /*
393  * Write relation description to the output stream.
394  */
395 void
logicalrep_write_rel(StringInfo out,TransactionId xid,Relation rel)396 logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
397 {
398 	char	   *relname;
399 
400 	pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
401 
402 	/* transaction ID (if not valid, we're not streaming) */
403 	if (TransactionIdIsValid(xid))
404 		pq_sendint32(out, xid);
405 
406 	/* use Oid as relation identifier */
407 	pq_sendint32(out, RelationGetRelid(rel));
408 
409 	/* send qualified relation name */
410 	logicalrep_write_namespace(out, RelationGetNamespace(rel));
411 	relname = RelationGetRelationName(rel);
412 	pq_sendstring(out, relname);
413 
414 	/* send replica identity */
415 	pq_sendbyte(out, rel->rd_rel->relreplident);
416 
417 	/* send the attribute info */
418 	logicalrep_write_attrs(out, rel);
419 }
420 
421 /*
422  * Read the relation info from stream and return as LogicalRepRelation.
423  */
424 LogicalRepRelation *
logicalrep_read_rel(StringInfo in)425 logicalrep_read_rel(StringInfo in)
426 {
427 	LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
428 
429 	rel->remoteid = pq_getmsgint(in, 4);
430 
431 	/* Read relation name from stream */
432 	rel->nspname = pstrdup(logicalrep_read_namespace(in));
433 	rel->relname = pstrdup(pq_getmsgstring(in));
434 
435 	/* Read the replica identity. */
436 	rel->replident = pq_getmsgbyte(in);
437 
438 	/* Get attribute description */
439 	logicalrep_read_attrs(in, rel);
440 
441 	return rel;
442 }
443 
444 /*
445  * Write type info to the output stream.
446  *
447  * This function will always write base type info.
448  */
449 void
logicalrep_write_typ(StringInfo out,TransactionId xid,Oid typoid)450 logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
451 {
452 	Oid			basetypoid = getBaseType(typoid);
453 	HeapTuple	tup;
454 	Form_pg_type typtup;
455 
456 	pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
457 
458 	/* transaction ID (if not valid, we're not streaming) */
459 	if (TransactionIdIsValid(xid))
460 		pq_sendint32(out, xid);
461 
462 	tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
463 	if (!HeapTupleIsValid(tup))
464 		elog(ERROR, "cache lookup failed for type %u", basetypoid);
465 	typtup = (Form_pg_type) GETSTRUCT(tup);
466 
467 	/* use Oid as relation identifier */
468 	pq_sendint32(out, typoid);
469 
470 	/* send qualified type name */
471 	logicalrep_write_namespace(out, typtup->typnamespace);
472 	pq_sendstring(out, NameStr(typtup->typname));
473 
474 	ReleaseSysCache(tup);
475 }
476 
477 /*
478  * Read type info from the output stream.
479  */
480 void
logicalrep_read_typ(StringInfo in,LogicalRepTyp * ltyp)481 logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
482 {
483 	ltyp->remoteid = pq_getmsgint(in, 4);
484 
485 	/* Read type name from stream */
486 	ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
487 	ltyp->typname = pstrdup(pq_getmsgstring(in));
488 }
489 
490 /*
491  * Write a tuple to the outputstream, in the most efficient format possible.
492  */
493 static void
logicalrep_write_tuple(StringInfo out,Relation rel,HeapTuple tuple,bool binary)494 logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
495 {
496 	TupleDesc	desc;
497 	Datum		values[MaxTupleAttributeNumber];
498 	bool		isnull[MaxTupleAttributeNumber];
499 	int			i;
500 	uint16		nliveatts = 0;
501 
502 	desc = RelationGetDescr(rel);
503 
504 	for (i = 0; i < desc->natts; i++)
505 	{
506 		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
507 			continue;
508 		nliveatts++;
509 	}
510 	pq_sendint16(out, nliveatts);
511 
512 	/* try to allocate enough memory from the get-go */
513 	enlargeStringInfo(out, tuple->t_len +
514 					  nliveatts * (1 + 4));
515 
516 	heap_deform_tuple(tuple, desc, values, isnull);
517 
518 	/* Write the values */
519 	for (i = 0; i < desc->natts; i++)
520 	{
521 		HeapTuple	typtup;
522 		Form_pg_type typclass;
523 		Form_pg_attribute att = TupleDescAttr(desc, i);
524 
525 		if (att->attisdropped || att->attgenerated)
526 			continue;
527 
528 		if (isnull[i])
529 		{
530 			pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
531 			continue;
532 		}
533 
534 		if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
535 		{
536 			/*
537 			 * Unchanged toasted datum.  (Note that we don't promise to detect
538 			 * unchanged data in general; this is just a cheap check to avoid
539 			 * sending large values unnecessarily.)
540 			 */
541 			pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
542 			continue;
543 		}
544 
545 		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
546 		if (!HeapTupleIsValid(typtup))
547 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
548 		typclass = (Form_pg_type) GETSTRUCT(typtup);
549 
550 		/*
551 		 * Send in binary if requested and type has suitable send function.
552 		 */
553 		if (binary && OidIsValid(typclass->typsend))
554 		{
555 			bytea	   *outputbytes;
556 			int			len;
557 
558 			pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
559 			outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
560 			len = VARSIZE(outputbytes) - VARHDRSZ;
561 			pq_sendint(out, len, 4);	/* length */
562 			pq_sendbytes(out, VARDATA(outputbytes), len);	/* data */
563 			pfree(outputbytes);
564 		}
565 		else
566 		{
567 			char	   *outputstr;
568 
569 			pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
570 			outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
571 			pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
572 			pfree(outputstr);
573 		}
574 
575 		ReleaseSysCache(typtup);
576 	}
577 }
578 
579 /*
580  * Read tuple in logical replication format from stream.
581  */
582 static void
logicalrep_read_tuple(StringInfo in,LogicalRepTupleData * tuple)583 logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
584 {
585 	int			i;
586 	int			natts;
587 
588 	/* Get number of attributes */
589 	natts = pq_getmsgint(in, 2);
590 
591 	/* Allocate space for per-column values; zero out unused StringInfoDatas */
592 	tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
593 	tuple->colstatus = (char *) palloc(natts * sizeof(char));
594 	tuple->ncols = natts;
595 
596 	/* Read the data */
597 	for (i = 0; i < natts; i++)
598 	{
599 		char		kind;
600 		int			len;
601 		StringInfo	value = &tuple->colvalues[i];
602 
603 		kind = pq_getmsgbyte(in);
604 		tuple->colstatus[i] = kind;
605 
606 		switch (kind)
607 		{
608 			case LOGICALREP_COLUMN_NULL:
609 				/* nothing more to do */
610 				break;
611 			case LOGICALREP_COLUMN_UNCHANGED:
612 				/* we don't receive the value of an unchanged column */
613 				break;
614 			case LOGICALREP_COLUMN_TEXT:
615 				len = pq_getmsgint(in, 4);	/* read length */
616 
617 				/* and data */
618 				value->data = palloc(len + 1);
619 				pq_copymsgbytes(in, value->data, len);
620 				value->data[len] = '\0';
621 				/* make StringInfo fully valid */
622 				value->len = len;
623 				value->cursor = 0;
624 				value->maxlen = len;
625 				break;
626 			case LOGICALREP_COLUMN_BINARY:
627 				len = pq_getmsgint(in, 4);	/* read length */
628 
629 				/* and data */
630 				value->data = palloc(len + 1);
631 				pq_copymsgbytes(in, value->data, len);
632 				/* not strictly necessary but per StringInfo practice */
633 				value->data[len] = '\0';
634 				/* make StringInfo fully valid */
635 				value->len = len;
636 				value->cursor = 0;
637 				value->maxlen = len;
638 				break;
639 			default:
640 				elog(ERROR, "unrecognized data representation type '%c'", kind);
641 		}
642 	}
643 }
644 
645 /*
646  * Write relation attribute metadata to the stream.
647  */
648 static void
logicalrep_write_attrs(StringInfo out,Relation rel)649 logicalrep_write_attrs(StringInfo out, Relation rel)
650 {
651 	TupleDesc	desc;
652 	int			i;
653 	uint16		nliveatts = 0;
654 	Bitmapset  *idattrs = NULL;
655 	bool		replidentfull;
656 
657 	desc = RelationGetDescr(rel);
658 
659 	/* send number of live attributes */
660 	for (i = 0; i < desc->natts; i++)
661 	{
662 		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
663 			continue;
664 		nliveatts++;
665 	}
666 	pq_sendint16(out, nliveatts);
667 
668 	/* fetch bitmap of REPLICATION IDENTITY attributes */
669 	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
670 	if (!replidentfull)
671 		idattrs = RelationGetIdentityKeyBitmap(rel);
672 
673 	/* send the attributes */
674 	for (i = 0; i < desc->natts; i++)
675 	{
676 		Form_pg_attribute att = TupleDescAttr(desc, i);
677 		uint8		flags = 0;
678 
679 		if (att->attisdropped || att->attgenerated)
680 			continue;
681 
682 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
683 		if (replidentfull ||
684 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
685 						  idattrs))
686 			flags |= LOGICALREP_IS_REPLICA_IDENTITY;
687 
688 		pq_sendbyte(out, flags);
689 
690 		/* attribute name */
691 		pq_sendstring(out, NameStr(att->attname));
692 
693 		/* attribute type id */
694 		pq_sendint32(out, (int) att->atttypid);
695 
696 		/* attribute mode */
697 		pq_sendint32(out, att->atttypmod);
698 	}
699 
700 	bms_free(idattrs);
701 }
702 
703 /*
704  * Read relation attribute metadata from the stream.
705  */
706 static void
logicalrep_read_attrs(StringInfo in,LogicalRepRelation * rel)707 logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
708 {
709 	int			i;
710 	int			natts;
711 	char	  **attnames;
712 	Oid		   *atttyps;
713 	Bitmapset  *attkeys = NULL;
714 
715 	natts = pq_getmsgint(in, 2);
716 	attnames = palloc(natts * sizeof(char *));
717 	atttyps = palloc(natts * sizeof(Oid));
718 
719 	/* read the attributes */
720 	for (i = 0; i < natts; i++)
721 	{
722 		uint8		flags;
723 
724 		/* Check for replica identity column */
725 		flags = pq_getmsgbyte(in);
726 		if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
727 			attkeys = bms_add_member(attkeys, i);
728 
729 		/* attribute name */
730 		attnames[i] = pstrdup(pq_getmsgstring(in));
731 
732 		/* attribute type id */
733 		atttyps[i] = (Oid) pq_getmsgint(in, 4);
734 
735 		/* we ignore attribute mode for now */
736 		(void) pq_getmsgint(in, 4);
737 	}
738 
739 	rel->attnames = attnames;
740 	rel->atttyps = atttyps;
741 	rel->attkeys = attkeys;
742 	rel->natts = natts;
743 }
744 
745 /*
746  * Write the namespace name or empty string for pg_catalog (to save space).
747  */
748 static void
logicalrep_write_namespace(StringInfo out,Oid nspid)749 logicalrep_write_namespace(StringInfo out, Oid nspid)
750 {
751 	if (nspid == PG_CATALOG_NAMESPACE)
752 		pq_sendbyte(out, '\0');
753 	else
754 	{
755 		char	   *nspname = get_namespace_name(nspid);
756 
757 		if (nspname == NULL)
758 			elog(ERROR, "cache lookup failed for namespace %u",
759 				 nspid);
760 
761 		pq_sendstring(out, nspname);
762 	}
763 }
764 
765 /*
766  * Read the namespace name while treating empty string as pg_catalog.
767  */
768 static const char *
logicalrep_read_namespace(StringInfo in)769 logicalrep_read_namespace(StringInfo in)
770 {
771 	const char *nspname = pq_getmsgstring(in);
772 
773 	if (nspname[0] == '\0')
774 		nspname = "pg_catalog";
775 
776 	return nspname;
777 }
778 
779 /*
780  * Write the information for the start stream message to the output stream.
781  */
782 void
logicalrep_write_stream_start(StringInfo out,TransactionId xid,bool first_segment)783 logicalrep_write_stream_start(StringInfo out,
784 							  TransactionId xid, bool first_segment)
785 {
786 	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
787 
788 	Assert(TransactionIdIsValid(xid));
789 
790 	/* transaction ID (we're starting to stream, so must be valid) */
791 	pq_sendint32(out, xid);
792 
793 	/* 1 if this is the first streaming segment for this xid */
794 	pq_sendbyte(out, first_segment ? 1 : 0);
795 }
796 
797 /*
798  * Read the information about the start stream message from output stream.
799  */
800 TransactionId
logicalrep_read_stream_start(StringInfo in,bool * first_segment)801 logicalrep_read_stream_start(StringInfo in, bool *first_segment)
802 {
803 	TransactionId xid;
804 
805 	Assert(first_segment);
806 
807 	xid = pq_getmsgint(in, 4);
808 	*first_segment = (pq_getmsgbyte(in) == 1);
809 
810 	return xid;
811 }
812 
813 /*
814  * Write the stop stream message to the output stream.
815  */
816 void
logicalrep_write_stream_stop(StringInfo out)817 logicalrep_write_stream_stop(StringInfo out)
818 {
819 	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
820 }
821 
822 /*
823  * Write STREAM COMMIT to the output stream.
824  */
825 void
logicalrep_write_stream_commit(StringInfo out,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)826 logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
827 							   XLogRecPtr commit_lsn)
828 {
829 	uint8		flags = 0;
830 
831 	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
832 
833 	Assert(TransactionIdIsValid(txn->xid));
834 
835 	/* transaction ID */
836 	pq_sendint32(out, txn->xid);
837 
838 	/* send the flags field (unused for now) */
839 	pq_sendbyte(out, flags);
840 
841 	/* send fields */
842 	pq_sendint64(out, commit_lsn);
843 	pq_sendint64(out, txn->end_lsn);
844 	pq_sendint64(out, txn->commit_time);
845 }
846 
847 /*
848  * Read STREAM COMMIT from the output stream.
849  */
850 TransactionId
logicalrep_read_stream_commit(StringInfo in,LogicalRepCommitData * commit_data)851 logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
852 {
853 	TransactionId xid;
854 	uint8		flags;
855 
856 	xid = pq_getmsgint(in, 4);
857 
858 	/* read flags (unused for now) */
859 	flags = pq_getmsgbyte(in);
860 
861 	if (flags != 0)
862 		elog(ERROR, "unrecognized flags %u in commit message", flags);
863 
864 	/* read fields */
865 	commit_data->commit_lsn = pq_getmsgint64(in);
866 	commit_data->end_lsn = pq_getmsgint64(in);
867 	commit_data->committime = pq_getmsgint64(in);
868 
869 	return xid;
870 }
871 
872 /*
873  * Write STREAM ABORT to the output stream. Note that xid and subxid will be
874  * same for the top-level transaction abort.
875  */
876 void
logicalrep_write_stream_abort(StringInfo out,TransactionId xid,TransactionId subxid)877 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
878 							  TransactionId subxid)
879 {
880 	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
881 
882 	Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
883 
884 	/* transaction ID */
885 	pq_sendint32(out, xid);
886 	pq_sendint32(out, subxid);
887 }
888 
889 /*
890  * Read STREAM ABORT from the output stream.
891  */
892 void
logicalrep_read_stream_abort(StringInfo in,TransactionId * xid,TransactionId * subxid)893 logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
894 							 TransactionId *subxid)
895 {
896 	Assert(xid && subxid);
897 
898 	*xid = pq_getmsgint(in, 4);
899 	*subxid = pq_getmsgint(in, 4);
900 }
901