/*-------------------------------------------------------------------------
 *
 * proto.c
 *		logical replication protocol functions
 *
 * Copyright (c) 2015-2020, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		src/backend/replication/logical/proto.c
 *
 *-------------------------------------------------------------------------
 */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
19 #include "replication/logicalproto.h"
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/syscache.h"
23 
24 /*
25  * Protocol message flags.
26  */
27 #define LOGICALREP_IS_REPLICA_IDENTITY 1
28 
29 #define TRUNCATE_CASCADE		(1<<0)
30 #define TRUNCATE_RESTART_SEQS	(1<<1)
31 
32 static void logicalrep_write_attrs(StringInfo out, Relation rel);
33 static void logicalrep_write_tuple(StringInfo out, Relation rel,
34 								   HeapTuple tuple);
35 
36 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
37 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
38 
39 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
40 static const char *logicalrep_read_namespace(StringInfo in);
41 
42 /*
43  * Write BEGIN to the output stream.
44  */
45 void
logicalrep_write_begin(StringInfo out,ReorderBufferTXN * txn)46 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
47 {
48 	pq_sendbyte(out, 'B');		/* BEGIN */
49 
50 	/* fixed fields */
51 	pq_sendint64(out, txn->final_lsn);
52 	pq_sendint64(out, txn->commit_time);
53 	pq_sendint32(out, txn->xid);
54 }
55 
56 /*
57  * Read transaction BEGIN from the stream.
58  */
59 void
logicalrep_read_begin(StringInfo in,LogicalRepBeginData * begin_data)60 logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
61 {
62 	/* read fields */
63 	begin_data->final_lsn = pq_getmsgint64(in);
64 	if (begin_data->final_lsn == InvalidXLogRecPtr)
65 		elog(ERROR, "final_lsn not set in begin message");
66 	begin_data->committime = pq_getmsgint64(in);
67 	begin_data->xid = pq_getmsgint(in, 4);
68 }
69 
70 
71 /*
72  * Write COMMIT to the output stream.
73  */
74 void
logicalrep_write_commit(StringInfo out,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)75 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
76 						XLogRecPtr commit_lsn)
77 {
78 	uint8		flags = 0;
79 
80 	pq_sendbyte(out, 'C');		/* sending COMMIT */
81 
82 	/* send the flags field (unused for now) */
83 	pq_sendbyte(out, flags);
84 
85 	/* send fields */
86 	pq_sendint64(out, commit_lsn);
87 	pq_sendint64(out, txn->end_lsn);
88 	pq_sendint64(out, txn->commit_time);
89 }
90 
91 /*
92  * Read transaction COMMIT from the stream.
93  */
94 void
logicalrep_read_commit(StringInfo in,LogicalRepCommitData * commit_data)95 logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
96 {
97 	/* read flags (unused for now) */
98 	uint8		flags = pq_getmsgbyte(in);
99 
100 	if (flags != 0)
101 		elog(ERROR, "unrecognized flags %u in commit message", flags);
102 
103 	/* read fields */
104 	commit_data->commit_lsn = pq_getmsgint64(in);
105 	commit_data->end_lsn = pq_getmsgint64(in);
106 	commit_data->committime = pq_getmsgint64(in);
107 }
108 
109 /*
110  * Write ORIGIN to the output stream.
111  */
112 void
logicalrep_write_origin(StringInfo out,const char * origin,XLogRecPtr origin_lsn)113 logicalrep_write_origin(StringInfo out, const char *origin,
114 						XLogRecPtr origin_lsn)
115 {
116 	pq_sendbyte(out, 'O');		/* ORIGIN */
117 
118 	/* fixed fields */
119 	pq_sendint64(out, origin_lsn);
120 
121 	/* origin string */
122 	pq_sendstring(out, origin);
123 }
124 
125 /*
126  * Read ORIGIN from the output stream.
127  */
128 char *
logicalrep_read_origin(StringInfo in,XLogRecPtr * origin_lsn)129 logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
130 {
131 	/* fixed fields */
132 	*origin_lsn = pq_getmsgint64(in);
133 
134 	/* return origin */
135 	return pstrdup(pq_getmsgstring(in));
136 }
137 
138 /*
139  * Write INSERT to the output stream.
140  */
141 void
logicalrep_write_insert(StringInfo out,Relation rel,HeapTuple newtuple)142 logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
143 {
144 	pq_sendbyte(out, 'I');		/* action INSERT */
145 
146 	/* use Oid as relation identifier */
147 	pq_sendint32(out, RelationGetRelid(rel));
148 
149 	pq_sendbyte(out, 'N');		/* new tuple follows */
150 	logicalrep_write_tuple(out, rel, newtuple);
151 }
152 
153 /*
154  * Read INSERT from stream.
155  *
156  * Fills the new tuple.
157  */
158 LogicalRepRelId
logicalrep_read_insert(StringInfo in,LogicalRepTupleData * newtup)159 logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
160 {
161 	char		action;
162 	LogicalRepRelId relid;
163 
164 	/* read the relation id */
165 	relid = pq_getmsgint(in, 4);
166 
167 	action = pq_getmsgbyte(in);
168 	if (action != 'N')
169 		elog(ERROR, "expected new tuple but got %d",
170 			 action);
171 
172 	logicalrep_read_tuple(in, newtup);
173 
174 	return relid;
175 }
176 
177 /*
178  * Write UPDATE to the output stream.
179  */
180 void
logicalrep_write_update(StringInfo out,Relation rel,HeapTuple oldtuple,HeapTuple newtuple)181 logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
182 						HeapTuple newtuple)
183 {
184 	pq_sendbyte(out, 'U');		/* action UPDATE */
185 
186 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
187 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
188 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
189 
190 	/* use Oid as relation identifier */
191 	pq_sendint32(out, RelationGetRelid(rel));
192 
193 	if (oldtuple != NULL)
194 	{
195 		if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
196 			pq_sendbyte(out, 'O');	/* old tuple follows */
197 		else
198 			pq_sendbyte(out, 'K');	/* old key follows */
199 		logicalrep_write_tuple(out, rel, oldtuple);
200 	}
201 
202 	pq_sendbyte(out, 'N');		/* new tuple follows */
203 	logicalrep_write_tuple(out, rel, newtuple);
204 }
205 
206 /*
207  * Read UPDATE from stream.
208  */
209 LogicalRepRelId
logicalrep_read_update(StringInfo in,bool * has_oldtuple,LogicalRepTupleData * oldtup,LogicalRepTupleData * newtup)210 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
211 					   LogicalRepTupleData *oldtup,
212 					   LogicalRepTupleData *newtup)
213 {
214 	char		action;
215 	LogicalRepRelId relid;
216 
217 	/* read the relation id */
218 	relid = pq_getmsgint(in, 4);
219 
220 	/* read and verify action */
221 	action = pq_getmsgbyte(in);
222 	if (action != 'K' && action != 'O' && action != 'N')
223 		elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
224 			 action);
225 
226 	/* check for old tuple */
227 	if (action == 'K' || action == 'O')
228 	{
229 		logicalrep_read_tuple(in, oldtup);
230 		*has_oldtuple = true;
231 
232 		action = pq_getmsgbyte(in);
233 	}
234 	else
235 		*has_oldtuple = false;
236 
237 	/* check for new  tuple */
238 	if (action != 'N')
239 		elog(ERROR, "expected action 'N', got %c",
240 			 action);
241 
242 	logicalrep_read_tuple(in, newtup);
243 
244 	return relid;
245 }
246 
247 /*
248  * Write DELETE to the output stream.
249  */
250 void
logicalrep_write_delete(StringInfo out,Relation rel,HeapTuple oldtuple)251 logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
252 {
253 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
254 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
255 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
256 
257 	pq_sendbyte(out, 'D');		/* action DELETE */
258 
259 	/* use Oid as relation identifier */
260 	pq_sendint32(out, RelationGetRelid(rel));
261 
262 	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
263 		pq_sendbyte(out, 'O');	/* old tuple follows */
264 	else
265 		pq_sendbyte(out, 'K');	/* old key follows */
266 
267 	logicalrep_write_tuple(out, rel, oldtuple);
268 }
269 
270 /*
271  * Read DELETE from stream.
272  *
273  * Fills the old tuple.
274  */
275 LogicalRepRelId
logicalrep_read_delete(StringInfo in,LogicalRepTupleData * oldtup)276 logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
277 {
278 	char		action;
279 	LogicalRepRelId relid;
280 
281 	/* read the relation id */
282 	relid = pq_getmsgint(in, 4);
283 
284 	/* read and verify action */
285 	action = pq_getmsgbyte(in);
286 	if (action != 'K' && action != 'O')
287 		elog(ERROR, "expected action 'O' or 'K', got %c", action);
288 
289 	logicalrep_read_tuple(in, oldtup);
290 
291 	return relid;
292 }
293 
294 /*
295  * Write TRUNCATE to the output stream.
296  */
297 void
logicalrep_write_truncate(StringInfo out,int nrelids,Oid relids[],bool cascade,bool restart_seqs)298 logicalrep_write_truncate(StringInfo out,
299 						  int nrelids,
300 						  Oid relids[],
301 						  bool cascade, bool restart_seqs)
302 {
303 	int			i;
304 	uint8		flags = 0;
305 
306 	pq_sendbyte(out, 'T');		/* action TRUNCATE */
307 
308 	pq_sendint32(out, nrelids);
309 
310 	/* encode and send truncate flags */
311 	if (cascade)
312 		flags |= TRUNCATE_CASCADE;
313 	if (restart_seqs)
314 		flags |= TRUNCATE_RESTART_SEQS;
315 	pq_sendint8(out, flags);
316 
317 	for (i = 0; i < nrelids; i++)
318 		pq_sendint32(out, relids[i]);
319 }
320 
321 /*
322  * Read TRUNCATE from stream.
323  */
324 List *
logicalrep_read_truncate(StringInfo in,bool * cascade,bool * restart_seqs)325 logicalrep_read_truncate(StringInfo in,
326 						 bool *cascade, bool *restart_seqs)
327 {
328 	int			i;
329 	int			nrelids;
330 	List	   *relids = NIL;
331 	uint8		flags;
332 
333 	nrelids = pq_getmsgint(in, 4);
334 
335 	/* read and decode truncate flags */
336 	flags = pq_getmsgint(in, 1);
337 	*cascade = (flags & TRUNCATE_CASCADE) > 0;
338 	*restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
339 
340 	for (i = 0; i < nrelids; i++)
341 		relids = lappend_oid(relids, pq_getmsgint(in, 4));
342 
343 	return relids;
344 }
345 
346 /*
347  * Write relation description to the output stream.
348  */
349 void
logicalrep_write_rel(StringInfo out,Relation rel)350 logicalrep_write_rel(StringInfo out, Relation rel)
351 {
352 	char	   *relname;
353 
354 	pq_sendbyte(out, 'R');		/* sending RELATION */
355 
356 	/* use Oid as relation identifier */
357 	pq_sendint32(out, RelationGetRelid(rel));
358 
359 	/* send qualified relation name */
360 	logicalrep_write_namespace(out, RelationGetNamespace(rel));
361 	relname = RelationGetRelationName(rel);
362 	pq_sendstring(out, relname);
363 
364 	/* send replica identity */
365 	pq_sendbyte(out, rel->rd_rel->relreplident);
366 
367 	/* send the attribute info */
368 	logicalrep_write_attrs(out, rel);
369 }
370 
371 /*
372  * Read the relation info from stream and return as LogicalRepRelation.
373  */
374 LogicalRepRelation *
logicalrep_read_rel(StringInfo in)375 logicalrep_read_rel(StringInfo in)
376 {
377 	LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
378 
379 	rel->remoteid = pq_getmsgint(in, 4);
380 
381 	/* Read relation name from stream */
382 	rel->nspname = pstrdup(logicalrep_read_namespace(in));
383 	rel->relname = pstrdup(pq_getmsgstring(in));
384 
385 	/* Read the replica identity. */
386 	rel->replident = pq_getmsgbyte(in);
387 
388 	/* Get attribute description */
389 	logicalrep_read_attrs(in, rel);
390 
391 	return rel;
392 }
393 
394 /*
395  * Write type info to the output stream.
396  *
397  * This function will always write base type info.
398  */
399 void
logicalrep_write_typ(StringInfo out,Oid typoid)400 logicalrep_write_typ(StringInfo out, Oid typoid)
401 {
402 	Oid			basetypoid = getBaseType(typoid);
403 	HeapTuple	tup;
404 	Form_pg_type typtup;
405 
406 	pq_sendbyte(out, 'Y');		/* sending TYPE */
407 
408 	tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
409 	if (!HeapTupleIsValid(tup))
410 		elog(ERROR, "cache lookup failed for type %u", basetypoid);
411 	typtup = (Form_pg_type) GETSTRUCT(tup);
412 
413 	/* use Oid as relation identifier */
414 	pq_sendint32(out, typoid);
415 
416 	/* send qualified type name */
417 	logicalrep_write_namespace(out, typtup->typnamespace);
418 	pq_sendstring(out, NameStr(typtup->typname));
419 
420 	ReleaseSysCache(tup);
421 }
422 
423 /*
424  * Read type info from the output stream.
425  */
426 void
logicalrep_read_typ(StringInfo in,LogicalRepTyp * ltyp)427 logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
428 {
429 	ltyp->remoteid = pq_getmsgint(in, 4);
430 
431 	/* Read type name from stream */
432 	ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
433 	ltyp->typname = pstrdup(pq_getmsgstring(in));
434 }
435 
436 /*
437  * Write a tuple to the outputstream, in the most efficient format possible.
438  */
439 static void
logicalrep_write_tuple(StringInfo out,Relation rel,HeapTuple tuple)440 logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
441 {
442 	TupleDesc	desc;
443 	Datum		values[MaxTupleAttributeNumber];
444 	bool		isnull[MaxTupleAttributeNumber];
445 	int			i;
446 	uint16		nliveatts = 0;
447 
448 	desc = RelationGetDescr(rel);
449 
450 	for (i = 0; i < desc->natts; i++)
451 	{
452 		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
453 			continue;
454 		nliveatts++;
455 	}
456 	pq_sendint16(out, nliveatts);
457 
458 	/* try to allocate enough memory from the get-go */
459 	enlargeStringInfo(out, tuple->t_len +
460 					  nliveatts * (1 + 4));
461 
462 	heap_deform_tuple(tuple, desc, values, isnull);
463 
464 	/* Write the values */
465 	for (i = 0; i < desc->natts; i++)
466 	{
467 		HeapTuple	typtup;
468 		Form_pg_type typclass;
469 		Form_pg_attribute att = TupleDescAttr(desc, i);
470 		char	   *outputstr;
471 
472 		if (att->attisdropped || att->attgenerated)
473 			continue;
474 
475 		if (isnull[i])
476 		{
477 			pq_sendbyte(out, 'n');	/* null column */
478 			continue;
479 		}
480 		else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
481 		{
482 			pq_sendbyte(out, 'u');	/* unchanged toast column */
483 			continue;
484 		}
485 
486 		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
487 		if (!HeapTupleIsValid(typtup))
488 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
489 		typclass = (Form_pg_type) GETSTRUCT(typtup);
490 
491 		pq_sendbyte(out, 't');	/* 'text' data follows */
492 
493 		outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
494 		pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
495 		pfree(outputstr);
496 
497 		ReleaseSysCache(typtup);
498 	}
499 }
500 
501 /*
502  * Read tuple in remote format from stream.
503  *
504  * The returned tuple points into the input stringinfo.
505  */
506 static void
logicalrep_read_tuple(StringInfo in,LogicalRepTupleData * tuple)507 logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
508 {
509 	int			i;
510 	int			natts;
511 
512 	/* Get number of attributes */
513 	natts = pq_getmsgint(in, 2);
514 
515 	memset(tuple->changed, 0, sizeof(tuple->changed));
516 
517 	/* Read the data */
518 	for (i = 0; i < natts; i++)
519 	{
520 		char		kind;
521 
522 		kind = pq_getmsgbyte(in);
523 
524 		switch (kind)
525 		{
526 			case 'n':			/* null */
527 				tuple->values[i] = NULL;
528 				tuple->changed[i] = true;
529 				break;
530 			case 'u':			/* unchanged column */
531 				/* we don't receive the value of an unchanged column */
532 				tuple->values[i] = NULL;
533 				break;
534 			case 't':			/* text formatted value */
535 				{
536 					int			len;
537 
538 					tuple->changed[i] = true;
539 
540 					len = pq_getmsgint(in, 4);	/* read length */
541 
542 					/* and data */
543 					tuple->values[i] = palloc(len + 1);
544 					pq_copymsgbytes(in, tuple->values[i], len);
545 					tuple->values[i][len] = '\0';
546 				}
547 				break;
548 			default:
549 				elog(ERROR, "unrecognized data representation type '%c'", kind);
550 		}
551 	}
552 }
553 
554 /*
555  * Write relation attributes to the stream.
556  */
557 static void
logicalrep_write_attrs(StringInfo out,Relation rel)558 logicalrep_write_attrs(StringInfo out, Relation rel)
559 {
560 	TupleDesc	desc;
561 	int			i;
562 	uint16		nliveatts = 0;
563 	Bitmapset  *idattrs = NULL;
564 	bool		replidentfull;
565 
566 	desc = RelationGetDescr(rel);
567 
568 	/* send number of live attributes */
569 	for (i = 0; i < desc->natts; i++)
570 	{
571 		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
572 			continue;
573 		nliveatts++;
574 	}
575 	pq_sendint16(out, nliveatts);
576 
577 	/* fetch bitmap of REPLICATION IDENTITY attributes */
578 	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
579 	if (!replidentfull)
580 		idattrs = RelationGetIndexAttrBitmap(rel,
581 											 INDEX_ATTR_BITMAP_IDENTITY_KEY);
582 
583 	/* send the attributes */
584 	for (i = 0; i < desc->natts; i++)
585 	{
586 		Form_pg_attribute att = TupleDescAttr(desc, i);
587 		uint8		flags = 0;
588 
589 		if (att->attisdropped || att->attgenerated)
590 			continue;
591 
592 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
593 		if (replidentfull ||
594 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
595 						  idattrs))
596 			flags |= LOGICALREP_IS_REPLICA_IDENTITY;
597 
598 		pq_sendbyte(out, flags);
599 
600 		/* attribute name */
601 		pq_sendstring(out, NameStr(att->attname));
602 
603 		/* attribute type id */
604 		pq_sendint32(out, (int) att->atttypid);
605 
606 		/* attribute mode */
607 		pq_sendint32(out, att->atttypmod);
608 	}
609 
610 	bms_free(idattrs);
611 }
612 
613 /*
614  * Read relation attribute names from the stream.
615  */
616 static void
logicalrep_read_attrs(StringInfo in,LogicalRepRelation * rel)617 logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
618 {
619 	int			i;
620 	int			natts;
621 	char	  **attnames;
622 	Oid		   *atttyps;
623 	Bitmapset  *attkeys = NULL;
624 
625 	natts = pq_getmsgint(in, 2);
626 	attnames = palloc(natts * sizeof(char *));
627 	atttyps = palloc(natts * sizeof(Oid));
628 
629 	/* read the attributes */
630 	for (i = 0; i < natts; i++)
631 	{
632 		uint8		flags;
633 
634 		/* Check for replica identity column */
635 		flags = pq_getmsgbyte(in);
636 		if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
637 			attkeys = bms_add_member(attkeys, i);
638 
639 		/* attribute name */
640 		attnames[i] = pstrdup(pq_getmsgstring(in));
641 
642 		/* attribute type id */
643 		atttyps[i] = (Oid) pq_getmsgint(in, 4);
644 
645 		/* we ignore attribute mode for now */
646 		(void) pq_getmsgint(in, 4);
647 	}
648 
649 	rel->attnames = attnames;
650 	rel->atttyps = atttyps;
651 	rel->attkeys = attkeys;
652 	rel->natts = natts;
653 }
654 
655 /*
656  * Write the namespace name or empty string for pg_catalog (to save space).
657  */
658 static void
logicalrep_write_namespace(StringInfo out,Oid nspid)659 logicalrep_write_namespace(StringInfo out, Oid nspid)
660 {
661 	if (nspid == PG_CATALOG_NAMESPACE)
662 		pq_sendbyte(out, '\0');
663 	else
664 	{
665 		char	   *nspname = get_namespace_name(nspid);
666 
667 		if (nspname == NULL)
668 			elog(ERROR, "cache lookup failed for namespace %u",
669 				 nspid);
670 
671 		pq_sendstring(out, nspname);
672 	}
673 }
674 
675 /*
676  * Read the namespace name while treating empty string as pg_catalog.
677  */
678 static const char *
logicalrep_read_namespace(StringInfo in)679 logicalrep_read_namespace(StringInfo in)
680 {
681 	const char *nspname = pq_getmsgstring(in);
682 
683 	if (nspname[0] == '\0')
684 		nspname = "pg_catalog";
685 
686 	return nspname;
687 }
688