1 /*-------------------------------------------------------------------------
2  *
3  * pglogical_proto_native.c
4  * 		pglogical binary protocol functions
5  *
6  * Copyright (c) 2015, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  pglogical_proto_native.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 #if PG_VERSION_NUM >= 130000
17 #include "access/detoast.h"
18 #else
19 #include "access/tuptoaster.h"
20 #endif
21 #include "catalog/pg_type.h"
22 #include "libpq/pqformat.h"
23 #include "nodes/parsenodes.h"
24 #include "replication/reorderbuffer.h"
25 #include "utils/lsyscache.h"
26 #include "utils/rel.h"
27 #include "utils/syscache.h"
28 
29 #include "pglogical_output_plugin.h"
30 #include "pglogical_output_proto.h"
31 #include "pglogical_proto_native.h"
32 
33 #define IS_REPLICA_IDENTITY 1
34 
35 static void pglogical_write_attrs(StringInfo out, Relation rel,
36 								  Bitmapset *att_list);
37 static void pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
38 								  Relation rel, HeapTuple tuple,
39 								  Bitmapset *att_list);
40 static char decide_datum_transfer(Form_pg_attribute att,
41 								  Form_pg_type typclass,
42 								  bool allow_internal_basetypes,
43 								  bool allow_binary_basetypes);
44 
45 static void pglogical_read_attrs(StringInfo in, char ***attrnames,
46 								  int *nattrnames);
47 static void pglogical_read_tuple(StringInfo in, PGLogicalRelation *rel,
48 					  PGLogicalTupleData *tuple);
49 
50 /*
51  * Write functions
52  */
53 
54 /*
55  * Write relation description to the output stream.
56  */
57 void
pglogical_write_rel(StringInfo out,PGLogicalOutputData * data,Relation rel,Bitmapset * att_list)58 pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel,
59 					Bitmapset *att_list)
60 {
61 	char	   *nspname;
62 	uint8		nspnamelen;
63 	const char *relname;
64 	uint8		relnamelen;
65 	uint8		flags = 0;
66 
67 	pq_sendbyte(out, 'R');		/* sending RELATION */
68 
69 	/* send the flags field */
70 	pq_sendbyte(out, flags);
71 
72 	/* use Oid as relation identifier */
73 	pq_sendint(out, RelationGetRelid(rel), 4);
74 
75 	nspname = get_namespace_name(rel->rd_rel->relnamespace);
76 	if (nspname == NULL)
77 		elog(ERROR, "cache lookup failed for namespace %u",
78 			 rel->rd_rel->relnamespace);
79 	nspnamelen = strlen(nspname) + 1;
80 
81 	relname = NameStr(rel->rd_rel->relname);
82 	relnamelen = strlen(relname) + 1;
83 
84 	pq_sendbyte(out, nspnamelen);		/* schema name length */
85 	pq_sendbytes(out, nspname, nspnamelen);
86 
87 	pq_sendbyte(out, relnamelen);		/* table name length */
88 	pq_sendbytes(out, relname, relnamelen);
89 
90 	/* send the attribute info */
91 	pglogical_write_attrs(out, rel, att_list);
92 
93 	pfree(nspname);
94 }
95 
96 /*
97  * Write relation attributes to the outputstream.
98  */
99 static void
pglogical_write_attrs(StringInfo out,Relation rel,Bitmapset * att_list)100 pglogical_write_attrs(StringInfo out, Relation rel, Bitmapset *att_list)
101 {
102 	TupleDesc	desc;
103 	int			i;
104 	uint16		nliveatts = 0;
105 	Bitmapset  *idattrs;
106 
107 	desc = RelationGetDescr(rel);
108 
109 	pq_sendbyte(out, 'A');			/* sending ATTRS */
110 
111 	/* send number of live attributes */
112 	for (i = 0; i < desc->natts; i++)
113 	{
114 		Form_pg_attribute att = TupleDescAttr(desc,i);
115 
116 		if (att->attisdropped)
117 			continue;
118 		if (att_list &&
119 			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
120 						   att_list))
121 			continue;
122 		nliveatts++;
123 	}
124 	pq_sendint(out, nliveatts, 2);
125 
126 	/* fetch bitmap of REPLICATION IDENTITY attributes */
127 	idattrs = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
128 
129 	/* send the attributes */
130 	for (i = 0; i < desc->natts; i++)
131 	{
132 		Form_pg_attribute att = TupleDescAttr(desc,i);
133 		uint8			flags = 0;
134 		uint16			len;
135 		const char	   *attname;
136 
137 		if (att->attisdropped)
138 			continue;
139 		if (att_list &&
140 			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
141 						   att_list))
142 			continue;
143 
144 		if (bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
145 						  idattrs))
146 			flags |= IS_REPLICA_IDENTITY;
147 
148 		pq_sendbyte(out, 'C');		/* column definition follows */
149 		pq_sendbyte(out, flags);
150 
151 		pq_sendbyte(out, 'N');		/* column name block follows */
152 		attname = NameStr(att->attname);
153 		len = strlen(attname) + 1;
154 		pq_sendint(out, len, 2);
155 		pq_sendbytes(out, attname, len); /* data */
156 	}
157 
158 	bms_free(idattrs);
159 }
160 
161 /*
162  * Write BEGIN to the output stream.
163  */
164 void
pglogical_write_begin(StringInfo out,PGLogicalOutputData * data,ReorderBufferTXN * txn)165 pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
166 					  ReorderBufferTXN *txn)
167 {
168 	uint8	flags = 0;
169 
170 	pq_sendbyte(out, 'B');		/* BEGIN */
171 
172 	/* send the flags field its self */
173 	pq_sendbyte(out, flags);
174 
175 	/* fixed fields */
176 	pq_sendint64(out, txn->final_lsn);
177 	pq_sendint64(out, txn->commit_time);
178 	pq_sendint(out, txn->xid, 4);
179 }
180 
181 /*
182  * Write COMMIT to the output stream.
183  */
184 void
pglogical_write_commit(StringInfo out,PGLogicalOutputData * data,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)185 pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186 					   ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
187 {
188 	uint8 flags = 0;
189 
190 	pq_sendbyte(out, 'C');		/* sending COMMIT */
191 
192 	/* send the flags field */
193 	pq_sendbyte(out, flags);
194 
195 	/* send fixed fields */
196 	pq_sendint64(out, commit_lsn);
197 	pq_sendint64(out, txn->end_lsn);
198 	pq_sendint64(out, txn->commit_time);
199 }
200 
201 /*
202  * Write ORIGIN to the output stream.
203  */
204 void
pglogical_write_origin(StringInfo out,const char * origin,XLogRecPtr origin_lsn)205 pglogical_write_origin(StringInfo out, const char *origin,
206 						XLogRecPtr origin_lsn)
207 {
208 	uint8	flags = 0;
209 	uint8	len;
210 
211 	Assert(strlen(origin) < 255);
212 
213 	pq_sendbyte(out, 'O');		/* ORIGIN */
214 
215 	/* send the flags field its self */
216 	pq_sendbyte(out, flags);
217 
218 	/* fixed fields */
219 	pq_sendint64(out, origin_lsn);
220 
221 	/* origin */
222 	len = strlen(origin) + 1;
223 	pq_sendbyte(out, len);
224 	pq_sendbytes(out, origin, len);
225 }
226 
227 /*
228  * Write INSERT to the output stream.
229  */
230 void
pglogical_write_insert(StringInfo out,PGLogicalOutputData * data,Relation rel,HeapTuple newtuple,Bitmapset * att_list)231 pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
232 						Relation rel, HeapTuple newtuple,
233 						Bitmapset *att_list)
234 {
235 	uint8 flags = 0;
236 
237 	pq_sendbyte(out, 'I');		/* action INSERT */
238 
239 	/* send the flags field */
240 	pq_sendbyte(out, flags);
241 
242 	/* use Oid as relation identifier */
243 	pq_sendint(out, RelationGetRelid(rel), 4);
244 
245 	pq_sendbyte(out, 'N');		/* new tuple follows */
246 	pglogical_write_tuple(out, data, rel, newtuple, att_list);
247 }
248 
249 /*
250  * Write UPDATE to the output stream.
251  */
252 void
pglogical_write_update(StringInfo out,PGLogicalOutputData * data,Relation rel,HeapTuple oldtuple,HeapTuple newtuple,Bitmapset * att_list)253 pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
254 						Relation rel, HeapTuple oldtuple, HeapTuple newtuple,
255 						Bitmapset *att_list)
256 {
257 	uint8 flags = 0;
258 
259 	pq_sendbyte(out, 'U');		/* action UPDATE */
260 
261 	/* send the flags field */
262 	pq_sendbyte(out, flags);
263 
264 	/* use Oid as relation identifier */
265 	pq_sendint(out, RelationGetRelid(rel), 4);
266 
267 	/*
268 	 * TODO: support whole tuple (O tuple type)
269 	 *
270 	 * Right now we can only write the key-part since logical decoding
271 	 * doesn't know how to record the whole old tuple for us in WAL.
272 	 * We can't use REPLICA IDENTITY FULL for this, since that makes
273 	 * the key-part the whole tuple, causing issues with conflict
274 	 * resultion and index lookups. We need a separate decoding option
275 	 * to record whole tuples.
276 	 */
277 	if (oldtuple != NULL)
278 	{
279 		pq_sendbyte(out, 'K');	/* old key follows */
280 		pglogical_write_tuple(out, data, rel, oldtuple, att_list);
281 	}
282 
283 	pq_sendbyte(out, 'N');		/* new tuple follows */
284 	pglogical_write_tuple(out, data, rel, newtuple, att_list);
285 }
286 
287 /*
288  * Write DELETE to the output stream.
289  */
290 void
pglogical_write_delete(StringInfo out,PGLogicalOutputData * data,Relation rel,HeapTuple oldtuple,Bitmapset * att_list)291 pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
292 						Relation rel, HeapTuple oldtuple,
293 						Bitmapset *att_list)
294 {
295 	uint8 flags = 0;
296 
297 	pq_sendbyte(out, 'D');		/* action DELETE */
298 
299 	/* send the flags field */
300 	pq_sendbyte(out, flags);
301 
302 	/* use Oid as relation identifier */
303 	pq_sendint(out, RelationGetRelid(rel), 4);
304 
305 	/*
306 	 * TODO support whole tuple ('O' tuple type)
307 	 *
308 	 * See notes on update for details
309 	 */
310 	pq_sendbyte(out, 'K');	/* old key follows */
311 	pglogical_write_tuple(out, data, rel, oldtuple, att_list);
312 }
313 
314 /*
315  * Most of the brains for startup message creation lives in
316  * pglogical_config.c, so this presently just sends the set of key/value pairs.
317  */
318 void
write_startup_message(StringInfo out,List * msg)319 write_startup_message(StringInfo out, List *msg)
320 {
321 	ListCell *lc;
322 
323 	pq_sendbyte(out, 'S');	/* message type field */
324 	pq_sendbyte(out, PGLOGICAL_STARTUP_MSG_FORMAT_FLAT); 	/* startup message version */
325 	foreach (lc, msg)
326 	{
327 		DefElem *param = (DefElem*)lfirst(lc);
328 		Assert(IsA(param->arg, String) && strVal(param->arg) != NULL);
329 		/* null-terminated key and value pairs, in client_encoding */
330 		pq_sendstring(out, param->defname);
331 		pq_sendstring(out, strVal(param->arg));
332 	}
333 }
334 
335 /*
336  * Write a tuple to the outputstream, in the most efficient format possible.
337  */
338 static void
pglogical_write_tuple(StringInfo out,PGLogicalOutputData * data,Relation rel,HeapTuple tuple,Bitmapset * att_list)339 pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
340 					  Relation rel, HeapTuple tuple, Bitmapset *att_list)
341 {
342 	TupleDesc	desc;
343 	Datum		values[MaxTupleAttributeNumber];
344 	bool		isnull[MaxTupleAttributeNumber];
345 	int			i;
346 	uint16		nliveatts = 0;
347 
348 	desc = RelationGetDescr(rel);
349 
350 	pq_sendbyte(out, 'T');			/* sending TUPLE */
351 
352 	for (i = 0; i < desc->natts; i++)
353 	{
354 		Form_pg_attribute att = TupleDescAttr(desc,i);
355 
356 		if (att->attisdropped)
357 			continue;
358 		if (att_list &&
359 			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
360 						   att_list))
361 			continue;
362 		nliveatts++;
363 	}
364 	pq_sendint(out, nliveatts, 2);
365 
366 	/* try to allocate enough memory from the get go */
367 	enlargeStringInfo(out, tuple->t_len +
368 					  nliveatts * (1 + 4));
369 
370 	/*
371 	 * XXX: should this prove to be a relevant bottleneck, it might be
372 	 * interesting to inline heap_deform_tuple() here, we don't actually need
373 	 * the information in the form we get from it.
374 	 */
375 	heap_deform_tuple(tuple, desc, values, isnull);
376 
377 	for (i = 0; i < desc->natts; i++)
378 	{
379 		HeapTuple	typtup;
380 		Form_pg_type typclass;
381 		Form_pg_attribute att = TupleDescAttr(desc,i);
382 		char		transfer_type;
383 
384 		/* skip dropped columns */
385 		if (att->attisdropped)
386 			continue;
387 		if (att_list &&
388 			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
389 						   att_list))
390 			continue;
391 
392 		if (isnull[i])
393 		{
394 			pq_sendbyte(out, 'n');	/* null column */
395 			continue;
396 		}
397 		else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
398 		{
399 			pq_sendbyte(out, 'u');	/* unchanged toast column */
400 			continue;
401 		}
402 
403 		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
404 		if (!HeapTupleIsValid(typtup))
405 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
406 		typclass = (Form_pg_type) GETSTRUCT(typtup);
407 
408 		transfer_type = decide_datum_transfer(att, typclass,
409 											  data->allow_internal_basetypes,
410 											  data->allow_binary_basetypes);
411 
412 		switch (transfer_type)
413 		{
414 			case 'i':
415 				pq_sendbyte(out, 'i');	/* internal-format binary data follows */
416 
417 				/* pass by value */
418 				if (att->attbyval)
419 				{
420 					pq_sendint(out, att->attlen, 4); /* length */
421 
422 					enlargeStringInfo(out, att->attlen);
423 					store_att_byval(out->data + out->len, values[i],
424 									att->attlen);
425 					out->len += att->attlen;
426 					out->data[out->len] = '\0';
427 				}
428 				/* fixed length non-varlena pass-by-reference type */
429 				else if (att->attlen > 0)
430 				{
431 					pq_sendint(out, att->attlen, 4); /* length */
432 
433 					appendBinaryStringInfo(out, DatumGetPointer(values[i]),
434 										   att->attlen);
435 				}
436 				/* varlena type */
437 				else if (att->attlen == -1)
438 				{
439 					char *data = DatumGetPointer(values[i]);
440 
441 					/* send indirect datums inline */
442 					if (VARATT_IS_EXTERNAL_INDIRECT(values[i]))
443 					{
444 						struct varatt_indirect redirect;
445 						VARATT_EXTERNAL_GET_POINTER(redirect, data);
446 						data = (char *) redirect.pointer;
447 					}
448 
449 					Assert(!VARATT_IS_EXTERNAL(data));
450 
451 					pq_sendint(out, VARSIZE_ANY(data), 4); /* length */
452 
453 					appendBinaryStringInfo(out, data, VARSIZE_ANY(data));
454 				}
455 				else
456 					elog(ERROR, "unsupported tuple type");
457 
458 				break;
459 
460 			case 'b':
461 				{
462 					bytea	   *outputbytes;
463 					int			len;
464 
465 					pq_sendbyte(out, 'b');	/* binary send/recv data follows */
466 
467 					outputbytes = OidSendFunctionCall(typclass->typsend,
468 													  values[i]);
469 
470 					len = VARSIZE(outputbytes) - VARHDRSZ;
471 					pq_sendint(out, len, 4); /* length */
472 					pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
473 					pfree(outputbytes);
474 				}
475 				break;
476 
477 			default:
478 				{
479 					char   	   *outputstr;
480 					int			len;
481 
482 					pq_sendbyte(out, 't');	/* 'text' data follows */
483 
484 					outputstr =	OidOutputFunctionCall(typclass->typoutput,
485 													  values[i]);
486 					len = strlen(outputstr) + 1;
487 					pq_sendint(out, len, 4); /* length */
488 					appendBinaryStringInfo(out, outputstr, len); /* data */
489 					pfree(outputstr);
490 				}
491 		}
492 
493 		ReleaseSysCache(typtup);
494 	}
495 }
496 
497 /*
498  * Make the executive decision about which protocol to use.
499  */
500 static char
decide_datum_transfer(Form_pg_attribute att,Form_pg_type typclass,bool allow_internal_basetypes,bool allow_binary_basetypes)501 decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
502 					  bool allow_internal_basetypes,
503 					  bool allow_binary_basetypes)
504 {
505 	/*
506 	 * Use the binary protocol, if allowed, for builtin & plain datatypes.
507 	 */
508 	if (allow_internal_basetypes &&
509 		typclass->typtype == 'b' &&
510 		att->atttypid < FirstNormalObjectId &&
511 		typclass->typelem == InvalidOid)
512 	{
513 		return 'i';
514 	}
515 	/*
516 	 * Use send/recv, if allowed, if the type is plain or builtin.
517 	 *
518 	 * XXX: we can't use send/recv for array or composite types for now due to
519 	 * the embedded oids.
520 	 */
521 	else if (allow_binary_basetypes &&
522 			 OidIsValid(typclass->typreceive) &&
523 			 (att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') &&
524 			 (att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid))
525 	{
526 		return 'b';
527 	}
528 
529 	return 't';
530 }
531 
532 
533 /*
534  * Read functions.
535  */
536 
537 /*
538  * Read transaction BEGIN from the stream.
539  */
540 void
pglogical_read_begin(StringInfo in,XLogRecPtr * remote_lsn,TimestampTz * committime,TransactionId * remote_xid)541 pglogical_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
542 					  TimestampTz *committime, TransactionId *remote_xid)
543 {
544 	/* read flags */
545 	uint8	flags = pq_getmsgbyte(in);
546 	Assert(flags == 0);
547 	(void) flags; /* unused */
548 
549 	/* read fields */
550 	*remote_lsn = pq_getmsgint64(in);
551 	Assert(*remote_lsn != InvalidXLogRecPtr);
552 	*committime = pq_getmsgint64(in);
553 	*remote_xid = pq_getmsgint(in, 4);
554 }
555 
556 /*
557  * Read transaction COMMIT from the stream.
558  */
559 void
pglogical_read_commit(StringInfo in,XLogRecPtr * commit_lsn,XLogRecPtr * end_lsn,TimestampTz * committime)560 pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
561 					   XLogRecPtr *end_lsn, TimestampTz *committime)
562 {
563 	/* read flags */
564 	uint8	flags = pq_getmsgbyte(in);
565 	Assert(flags == 0);
566 	(void) flags; /* unused */
567 
568 	/* read fields */
569 	*commit_lsn = pq_getmsgint64(in);
570 	*end_lsn = pq_getmsgint64(in);
571 	*committime = pq_getmsgint64(in);
572 }
573 
574 /*
575  * Read ORIGIN from the output stream.
576  */
577 char *
pglogical_read_origin(StringInfo in,XLogRecPtr * origin_lsn)578 pglogical_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
579 {
580 	uint8	flags;
581 	uint8	len;
582 
583 	/* read the flags */
584 	flags = pq_getmsgbyte(in);
585 	Assert(flags == 0);
586 	(void) flags; /* unused */
587 
588 	/* fixed fields */
589 	*origin_lsn = pq_getmsgint64(in);
590 
591 	/* origin */
592 	len = pq_getmsgbyte(in);
593 	return pnstrdup(pq_getmsgbytes(in, len), len);
594 }
595 
596 
597 /*
598  * Read INSERT from stream.
599  *
600  * Fills the new tuple.
601  */
602 PGLogicalRelation *
pglogical_read_insert(StringInfo in,LOCKMODE lockmode,PGLogicalTupleData * newtup)603 pglogical_read_insert(StringInfo in, LOCKMODE lockmode,
604 					   PGLogicalTupleData *newtup)
605 {
606 	char		action;
607 	uint32		relid;
608 	uint8		flags;
609 	PGLogicalRelation *rel;
610 
611 	/* read the flags */
612 	flags = pq_getmsgbyte(in);
613 	Assert(flags == 0);
614 	(void) flags; /* unused */
615 
616 	/* read the relation id */
617 	relid = pq_getmsgint(in, 4);
618 
619 	action = pq_getmsgbyte(in);
620 	if (action != 'N')
621 		elog(ERROR, "expected new tuple but got %d",
622 			 action);
623 
624 	rel = pglogical_relation_open(relid, lockmode);
625 
626 	pglogical_read_tuple(in, rel, newtup);
627 
628 	return rel;
629 }
630 
631 /*
632  * Read UPDATE from stream.
633  */
634 PGLogicalRelation *
pglogical_read_update(StringInfo in,LOCKMODE lockmode,bool * hasoldtup,PGLogicalTupleData * oldtup,PGLogicalTupleData * newtup)635 pglogical_read_update(StringInfo in, LOCKMODE lockmode, bool *hasoldtup,
636 					   PGLogicalTupleData *oldtup, PGLogicalTupleData *newtup)
637 {
638 	char		action;
639 	Oid			relid;
640 	uint8		flags;
641 	PGLogicalRelation *rel;
642 
643 	/* read the flags */
644 	flags = pq_getmsgbyte(in);
645 	Assert(flags == 0);
646 	(void) flags; /* unused */
647 
648 	/* read the relation id */
649 	relid = pq_getmsgint(in, 4);
650 
651 	/* read and verify action */
652 	action = pq_getmsgbyte(in);
653 	if (action != 'K' && action != 'O' && action != 'N')
654 		elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
655 			 action);
656 
657 	rel = pglogical_relation_open(relid, lockmode);
658 
659 	/* check for old tuple */
660 	if (action == 'K' || action == 'O')
661 	{
662 		pglogical_read_tuple(in, rel, oldtup);
663 		*hasoldtup = true;
664 		action = pq_getmsgbyte(in);
665 	}
666 	else
667 		*hasoldtup = false;
668 
669 	/* check for new  tuple */
670 	if (action != 'N')
671 		elog(ERROR, "expected action 'N', got %c",
672 			 action);
673 
674 	pglogical_read_tuple(in, rel, newtup);
675 
676 	return rel;
677 }
678 
679 /*
680  * Read DELETE from stream.
681  *
682  * Fills the old tuple.
683  */
684 PGLogicalRelation *
pglogical_read_delete(StringInfo in,LOCKMODE lockmode,PGLogicalTupleData * oldtup)685 pglogical_read_delete(StringInfo in, LOCKMODE lockmode,
686 					   PGLogicalTupleData *oldtup)
687 {
688 	char		action;
689 	Oid			relid;
690 	uint8		flags;
691 	PGLogicalRelation *rel;
692 
693 	/* read the flags */
694 	flags = pq_getmsgbyte(in);
695 	Assert(flags == 0);
696 	(void) flags; /* unused */
697 
698 	/* read the relation id */
699 	relid = pq_getmsgint(in, 4);
700 
701 	/* read and verify action */
702 	action = pq_getmsgbyte(in);
703 	if (action != 'K' && action != 'O')
704 		elog(ERROR, "expected action 'O' or 'K' %c", action);
705 
706 	rel = pglogical_relation_open(relid, lockmode);
707 
708 	pglogical_read_tuple(in, rel, oldtup);
709 
710 	return rel;
711 }
712 
713 
714 /*
715  * Read tuple in remote format from stream.
716  *
717  * The returned tuple is converted to the local relation tuple format.
718  */
719 static void
pglogical_read_tuple(StringInfo in,PGLogicalRelation * rel,PGLogicalTupleData * tuple)720 pglogical_read_tuple(StringInfo in, PGLogicalRelation *rel,
721 					  PGLogicalTupleData *tuple)
722 {
723 	int			i;
724 	int			natts;
725 	char		action;
726 	TupleDesc	desc;
727 
728 	action = pq_getmsgbyte(in);
729 	if (action != 'T')
730 		elog(ERROR, "expected TUPLE, got %c", action);
731 
732 	memset(tuple->nulls, 1, sizeof(tuple->nulls));
733 	memset(tuple->changed, 0, sizeof(tuple->changed));
734 
735 	natts = pq_getmsgint(in, 2);
736 	if (rel->natts != natts)
737 		elog(ERROR, "tuple natts mismatch between remote relation metadata cache (natts=%u) and remote tuple data (natts=%u)", rel->natts, natts);
738 
739 	desc = RelationGetDescr(rel->rel);
740 
741 	/* Read the data */
742 	for (i = 0; i < natts; i++)
743 	{
744 		int			attid = rel->attmap[i];
745 		Form_pg_attribute att = TupleDescAttr(desc,attid);
746 		char		kind = pq_getmsgbyte(in);
747 		const char *data;
748 		int			len;
749 
750 		switch (kind)
751 		{
752 			case 'n': /* null */
753 				/* already marked as null */
754 				tuple->values[attid] = 0xdeadbeef;
755 				tuple->changed[attid] = true;
756 				break;
757 			case 'u': /* unchanged column */
758 				tuple->values[attid] = 0xfbadbeef; /* make bad usage more obvious */
759 				break;
760 			case 'i': /* internal binary format */
761 				tuple->nulls[attid] = false;
762 				tuple->changed[attid] = true;
763 
764 				len = pq_getmsgint(in, 4); /* read length */
765 				data = pq_getmsgbytes(in, len);
766 
767 				/* and data */
768 				if (att->attbyval)
769 					tuple->values[attid] = fetch_att(data, true, len);
770 				else
771 					tuple->values[attid] = PointerGetDatum(data);
772 				break;
773 			case 'b': /* binary send/recv format */
774 				{
775 					Oid typreceive;
776 					Oid typioparam;
777 					StringInfoData buf;
778 
779 					tuple->nulls[attid] = false;
780 					tuple->changed[attid] = true;
781 
782 					len = pq_getmsgint(in, 4); /* read length */
783 
784 					getTypeBinaryInputInfo(att->atttypid,
785 										   &typreceive, &typioparam);
786 
787 					/* create StringInfo pointing into the bigger buffer */
788 					initStringInfo(&buf);
789 					/* and data */
790 					buf.data = (char *) pq_getmsgbytes(in, len);
791 					buf.len = len;
792 					tuple->values[attid] = OidReceiveFunctionCall(
793 						typreceive, &buf, typioparam, att->atttypmod);
794 
795 					if (buf.len != buf.cursor)
796 						ereport(ERROR,
797 								(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
798 								 errmsg("incorrect binary data format")));
799 					break;
800 				}
801 			case 't': /* text format */
802 				{
803 					Oid typinput;
804 					Oid typioparam;
805 
806 					tuple->nulls[attid] = false;
807 					tuple->changed[attid] = true;
808 
809 					len = pq_getmsgint(in, 4); /* read length */
810 
811 					getTypeInputInfo(att->atttypid, &typinput, &typioparam);
812 					/* and data */
813 					data = (char *) pq_getmsgbytes(in, len);
814 					tuple->values[attid] = OidInputFunctionCall(
815 						typinput, (char *) data, typioparam, att->atttypmod);
816 				}
817 				break;
818 			default:
819 				elog(ERROR, "unknown data representation type '%c'", kind);
820 		}
821 	}
822 }
823 
824 /*
825  * Read schema.relation from stream and return as PGLogicalRelation opened in
826  * lockmode.
827  */
828 uint32
pglogical_read_rel(StringInfo in)829 pglogical_read_rel(StringInfo in)
830 {
831 	uint8		flags;
832 	uint32		relid;
833 	int			len;
834 	char	   *schemaname;
835 	char	   *relname;
836 	int			natts;
837 	char	  **attrnames;
838 
839 	/* read the flags */
840 	flags = pq_getmsgbyte(in);
841 	Assert(flags == 0);
842 	(void) flags; /* unused */
843 
844 	relid = pq_getmsgint(in, 4);
845 
846 	/* Read relation from stream */
847 	len = pq_getmsgbyte(in);
848 	schemaname = (char *) pq_getmsgbytes(in, len);
849 
850 	len = pq_getmsgbyte(in);
851 	relname = (char *) pq_getmsgbytes(in, len);
852 
853 	/* Get attribute description */
854 	pglogical_read_attrs(in, &attrnames, &natts);
855 
856 	pglogical_relation_cache_update(relid, schemaname, relname, natts, attrnames);
857 
858 	return relid;
859 }
860 
861 /*
862  * Read relation attributes from the outputstream.
863  *
864  * TODO handle flags.
865  */
866 static void
pglogical_read_attrs(StringInfo in,char *** attrnames,int * nattrnames)867 pglogical_read_attrs(StringInfo in, char ***attrnames, int *nattrnames)
868 {
869 	int			i;
870 	uint16		nattrs;
871 	char	  **attrs;
872 	char		blocktype;
873 
874 	blocktype = pq_getmsgbyte(in);
875 	if (blocktype != 'A')
876 		elog(ERROR, "expected ATTRS, got %c", blocktype);
877 
878 	nattrs = pq_getmsgint(in, 2);
879 	attrs = palloc(nattrs * sizeof(char *));
880 
881 	/* read the attributes */
882 	for (i = 0; i < nattrs; i++)
883 	{
884 		uint16			len;
885 
886 		blocktype = pq_getmsgbyte(in);		/* column definition follows */
887 		if (blocktype != 'C')
888 			elog(ERROR, "expected COLUMN, got %c", blocktype);
889 		/* read flags (we ignore them so far) */
890 		(void) pq_getmsgbyte(in);
891 
892 		blocktype = pq_getmsgbyte(in);		/* column name block follows */
893 		if (blocktype != 'N')
894 			elog(ERROR, "expected NAME, got %c", blocktype);
895 
896 		/* attribute name */
897 		len = pq_getmsgint(in, 2);
898 		/* the string is NULL terminated */
899 		attrs[i] = (char *) pq_getmsgbytes(in, len);
900 	}
901 
902 	*attrnames = attrs;
903 	*nattrnames = nattrs;
904 }
905