1 /*-------------------------------------------------------------------------
2 *
3 * pglogical_proto_native.c
4 * pglogical binary protocol functions
5 *
6 * Copyright (c) 2015, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * pglogical_proto_native.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "access/sysattr.h"
16 #if PG_VERSION_NUM >= 130000
17 #include "access/detoast.h"
18 #else
19 #include "access/tuptoaster.h"
20 #endif
21 #include "catalog/pg_type.h"
22 #include "libpq/pqformat.h"
23 #include "nodes/parsenodes.h"
24 #include "replication/reorderbuffer.h"
25 #include "utils/lsyscache.h"
26 #include "utils/rel.h"
27 #include "utils/syscache.h"
28
29 #include "pglogical_output_plugin.h"
30 #include "pglogical_output_proto.h"
31 #include "pglogical_proto_native.h"
32
33 #define IS_REPLICA_IDENTITY 1
34
35 static void pglogical_write_attrs(StringInfo out, Relation rel,
36 Bitmapset *att_list);
37 static void pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
38 Relation rel, HeapTuple tuple,
39 Bitmapset *att_list);
40 static char decide_datum_transfer(Form_pg_attribute att,
41 Form_pg_type typclass,
42 bool allow_internal_basetypes,
43 bool allow_binary_basetypes);
44
45 static void pglogical_read_attrs(StringInfo in, char ***attrnames,
46 int *nattrnames);
47 static void pglogical_read_tuple(StringInfo in, PGLogicalRelation *rel,
48 PGLogicalTupleData *tuple);
49
50 /*
51 * Write functions
52 */
53
54 /*
55 * Write relation description to the output stream.
56 */
57 void
pglogical_write_rel(StringInfo out,PGLogicalOutputData * data,Relation rel,Bitmapset * att_list)58 pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel,
59 Bitmapset *att_list)
60 {
61 char *nspname;
62 uint8 nspnamelen;
63 const char *relname;
64 uint8 relnamelen;
65 uint8 flags = 0;
66
67 pq_sendbyte(out, 'R'); /* sending RELATION */
68
69 /* send the flags field */
70 pq_sendbyte(out, flags);
71
72 /* use Oid as relation identifier */
73 pq_sendint(out, RelationGetRelid(rel), 4);
74
75 nspname = get_namespace_name(rel->rd_rel->relnamespace);
76 if (nspname == NULL)
77 elog(ERROR, "cache lookup failed for namespace %u",
78 rel->rd_rel->relnamespace);
79 nspnamelen = strlen(nspname) + 1;
80
81 relname = NameStr(rel->rd_rel->relname);
82 relnamelen = strlen(relname) + 1;
83
84 pq_sendbyte(out, nspnamelen); /* schema name length */
85 pq_sendbytes(out, nspname, nspnamelen);
86
87 pq_sendbyte(out, relnamelen); /* table name length */
88 pq_sendbytes(out, relname, relnamelen);
89
90 /* send the attribute info */
91 pglogical_write_attrs(out, rel, att_list);
92
93 pfree(nspname);
94 }
95
96 /*
97 * Write relation attributes to the outputstream.
98 */
99 static void
pglogical_write_attrs(StringInfo out,Relation rel,Bitmapset * att_list)100 pglogical_write_attrs(StringInfo out, Relation rel, Bitmapset *att_list)
101 {
102 TupleDesc desc;
103 int i;
104 uint16 nliveatts = 0;
105 Bitmapset *idattrs;
106
107 desc = RelationGetDescr(rel);
108
109 pq_sendbyte(out, 'A'); /* sending ATTRS */
110
111 /* send number of live attributes */
112 for (i = 0; i < desc->natts; i++)
113 {
114 Form_pg_attribute att = TupleDescAttr(desc,i);
115
116 if (att->attisdropped)
117 continue;
118 if (att_list &&
119 !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
120 att_list))
121 continue;
122 nliveatts++;
123 }
124 pq_sendint(out, nliveatts, 2);
125
126 /* fetch bitmap of REPLICATION IDENTITY attributes */
127 idattrs = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
128
129 /* send the attributes */
130 for (i = 0; i < desc->natts; i++)
131 {
132 Form_pg_attribute att = TupleDescAttr(desc,i);
133 uint8 flags = 0;
134 uint16 len;
135 const char *attname;
136
137 if (att->attisdropped)
138 continue;
139 if (att_list &&
140 !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
141 att_list))
142 continue;
143
144 if (bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
145 idattrs))
146 flags |= IS_REPLICA_IDENTITY;
147
148 pq_sendbyte(out, 'C'); /* column definition follows */
149 pq_sendbyte(out, flags);
150
151 pq_sendbyte(out, 'N'); /* column name block follows */
152 attname = NameStr(att->attname);
153 len = strlen(attname) + 1;
154 pq_sendint(out, len, 2);
155 pq_sendbytes(out, attname, len); /* data */
156 }
157
158 bms_free(idattrs);
159 }
160
161 /*
162 * Write BEGIN to the output stream.
163 */
164 void
pglogical_write_begin(StringInfo out,PGLogicalOutputData * data,ReorderBufferTXN * txn)165 pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
166 ReorderBufferTXN *txn)
167 {
168 uint8 flags = 0;
169
170 pq_sendbyte(out, 'B'); /* BEGIN */
171
172 /* send the flags field its self */
173 pq_sendbyte(out, flags);
174
175 /* fixed fields */
176 pq_sendint64(out, txn->final_lsn);
177 pq_sendint64(out, txn->commit_time);
178 pq_sendint(out, txn->xid, 4);
179 }
180
181 /*
182 * Write COMMIT to the output stream.
183 */
184 void
pglogical_write_commit(StringInfo out,PGLogicalOutputData * data,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)185 pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186 ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
187 {
188 uint8 flags = 0;
189
190 pq_sendbyte(out, 'C'); /* sending COMMIT */
191
192 /* send the flags field */
193 pq_sendbyte(out, flags);
194
195 /* send fixed fields */
196 pq_sendint64(out, commit_lsn);
197 pq_sendint64(out, txn->end_lsn);
198 pq_sendint64(out, txn->commit_time);
199 }
200
201 /*
202 * Write ORIGIN to the output stream.
203 */
204 void
pglogical_write_origin(StringInfo out,const char * origin,XLogRecPtr origin_lsn)205 pglogical_write_origin(StringInfo out, const char *origin,
206 XLogRecPtr origin_lsn)
207 {
208 uint8 flags = 0;
209 uint8 len;
210
211 Assert(strlen(origin) < 255);
212
213 pq_sendbyte(out, 'O'); /* ORIGIN */
214
215 /* send the flags field its self */
216 pq_sendbyte(out, flags);
217
218 /* fixed fields */
219 pq_sendint64(out, origin_lsn);
220
221 /* origin */
222 len = strlen(origin) + 1;
223 pq_sendbyte(out, len);
224 pq_sendbytes(out, origin, len);
225 }
226
227 /*
228 * Write INSERT to the output stream.
229 */
230 void
pglogical_write_insert(StringInfo out,PGLogicalOutputData * data,Relation rel,HeapTuple newtuple,Bitmapset * att_list)231 pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
232 Relation rel, HeapTuple newtuple,
233 Bitmapset *att_list)
234 {
235 uint8 flags = 0;
236
237 pq_sendbyte(out, 'I'); /* action INSERT */
238
239 /* send the flags field */
240 pq_sendbyte(out, flags);
241
242 /* use Oid as relation identifier */
243 pq_sendint(out, RelationGetRelid(rel), 4);
244
245 pq_sendbyte(out, 'N'); /* new tuple follows */
246 pglogical_write_tuple(out, data, rel, newtuple, att_list);
247 }
248
249 /*
250 * Write UPDATE to the output stream.
251 */
252 void
pglogical_write_update(StringInfo out,PGLogicalOutputData * data,Relation rel,HeapTuple oldtuple,HeapTuple newtuple,Bitmapset * att_list)253 pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
254 Relation rel, HeapTuple oldtuple, HeapTuple newtuple,
255 Bitmapset *att_list)
256 {
257 uint8 flags = 0;
258
259 pq_sendbyte(out, 'U'); /* action UPDATE */
260
261 /* send the flags field */
262 pq_sendbyte(out, flags);
263
264 /* use Oid as relation identifier */
265 pq_sendint(out, RelationGetRelid(rel), 4);
266
267 /*
268 * TODO: support whole tuple (O tuple type)
269 *
270 * Right now we can only write the key-part since logical decoding
271 * doesn't know how to record the whole old tuple for us in WAL.
272 * We can't use REPLICA IDENTITY FULL for this, since that makes
273 * the key-part the whole tuple, causing issues with conflict
274 * resultion and index lookups. We need a separate decoding option
275 * to record whole tuples.
276 */
277 if (oldtuple != NULL)
278 {
279 pq_sendbyte(out, 'K'); /* old key follows */
280 pglogical_write_tuple(out, data, rel, oldtuple, att_list);
281 }
282
283 pq_sendbyte(out, 'N'); /* new tuple follows */
284 pglogical_write_tuple(out, data, rel, newtuple, att_list);
285 }
286
287 /*
288 * Write DELETE to the output stream.
289 */
290 void
pglogical_write_delete(StringInfo out,PGLogicalOutputData * data,Relation rel,HeapTuple oldtuple,Bitmapset * att_list)291 pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
292 Relation rel, HeapTuple oldtuple,
293 Bitmapset *att_list)
294 {
295 uint8 flags = 0;
296
297 pq_sendbyte(out, 'D'); /* action DELETE */
298
299 /* send the flags field */
300 pq_sendbyte(out, flags);
301
302 /* use Oid as relation identifier */
303 pq_sendint(out, RelationGetRelid(rel), 4);
304
305 /*
306 * TODO support whole tuple ('O' tuple type)
307 *
308 * See notes on update for details
309 */
310 pq_sendbyte(out, 'K'); /* old key follows */
311 pglogical_write_tuple(out, data, rel, oldtuple, att_list);
312 }
313
314 /*
315 * Most of the brains for startup message creation lives in
316 * pglogical_config.c, so this presently just sends the set of key/value pairs.
317 */
318 void
write_startup_message(StringInfo out,List * msg)319 write_startup_message(StringInfo out, List *msg)
320 {
321 ListCell *lc;
322
323 pq_sendbyte(out, 'S'); /* message type field */
324 pq_sendbyte(out, PGLOGICAL_STARTUP_MSG_FORMAT_FLAT); /* startup message version */
325 foreach (lc, msg)
326 {
327 DefElem *param = (DefElem*)lfirst(lc);
328 Assert(IsA(param->arg, String) && strVal(param->arg) != NULL);
329 /* null-terminated key and value pairs, in client_encoding */
330 pq_sendstring(out, param->defname);
331 pq_sendstring(out, strVal(param->arg));
332 }
333 }
334
335 /*
336 * Write a tuple to the outputstream, in the most efficient format possible.
337 */
338 static void
pglogical_write_tuple(StringInfo out,PGLogicalOutputData * data,Relation rel,HeapTuple tuple,Bitmapset * att_list)339 pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
340 Relation rel, HeapTuple tuple, Bitmapset *att_list)
341 {
342 TupleDesc desc;
343 Datum values[MaxTupleAttributeNumber];
344 bool isnull[MaxTupleAttributeNumber];
345 int i;
346 uint16 nliveatts = 0;
347
348 desc = RelationGetDescr(rel);
349
350 pq_sendbyte(out, 'T'); /* sending TUPLE */
351
352 for (i = 0; i < desc->natts; i++)
353 {
354 Form_pg_attribute att = TupleDescAttr(desc,i);
355
356 if (att->attisdropped)
357 continue;
358 if (att_list &&
359 !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
360 att_list))
361 continue;
362 nliveatts++;
363 }
364 pq_sendint(out, nliveatts, 2);
365
366 /* try to allocate enough memory from the get go */
367 enlargeStringInfo(out, tuple->t_len +
368 nliveatts * (1 + 4));
369
370 /*
371 * XXX: should this prove to be a relevant bottleneck, it might be
372 * interesting to inline heap_deform_tuple() here, we don't actually need
373 * the information in the form we get from it.
374 */
375 heap_deform_tuple(tuple, desc, values, isnull);
376
377 for (i = 0; i < desc->natts; i++)
378 {
379 HeapTuple typtup;
380 Form_pg_type typclass;
381 Form_pg_attribute att = TupleDescAttr(desc,i);
382 char transfer_type;
383
384 /* skip dropped columns */
385 if (att->attisdropped)
386 continue;
387 if (att_list &&
388 !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
389 att_list))
390 continue;
391
392 if (isnull[i])
393 {
394 pq_sendbyte(out, 'n'); /* null column */
395 continue;
396 }
397 else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
398 {
399 pq_sendbyte(out, 'u'); /* unchanged toast column */
400 continue;
401 }
402
403 typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
404 if (!HeapTupleIsValid(typtup))
405 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
406 typclass = (Form_pg_type) GETSTRUCT(typtup);
407
408 transfer_type = decide_datum_transfer(att, typclass,
409 data->allow_internal_basetypes,
410 data->allow_binary_basetypes);
411
412 switch (transfer_type)
413 {
414 case 'i':
415 pq_sendbyte(out, 'i'); /* internal-format binary data follows */
416
417 /* pass by value */
418 if (att->attbyval)
419 {
420 pq_sendint(out, att->attlen, 4); /* length */
421
422 enlargeStringInfo(out, att->attlen);
423 store_att_byval(out->data + out->len, values[i],
424 att->attlen);
425 out->len += att->attlen;
426 out->data[out->len] = '\0';
427 }
428 /* fixed length non-varlena pass-by-reference type */
429 else if (att->attlen > 0)
430 {
431 pq_sendint(out, att->attlen, 4); /* length */
432
433 appendBinaryStringInfo(out, DatumGetPointer(values[i]),
434 att->attlen);
435 }
436 /* varlena type */
437 else if (att->attlen == -1)
438 {
439 char *data = DatumGetPointer(values[i]);
440
441 /* send indirect datums inline */
442 if (VARATT_IS_EXTERNAL_INDIRECT(values[i]))
443 {
444 struct varatt_indirect redirect;
445 VARATT_EXTERNAL_GET_POINTER(redirect, data);
446 data = (char *) redirect.pointer;
447 }
448
449 Assert(!VARATT_IS_EXTERNAL(data));
450
451 pq_sendint(out, VARSIZE_ANY(data), 4); /* length */
452
453 appendBinaryStringInfo(out, data, VARSIZE_ANY(data));
454 }
455 else
456 elog(ERROR, "unsupported tuple type");
457
458 break;
459
460 case 'b':
461 {
462 bytea *outputbytes;
463 int len;
464
465 pq_sendbyte(out, 'b'); /* binary send/recv data follows */
466
467 outputbytes = OidSendFunctionCall(typclass->typsend,
468 values[i]);
469
470 len = VARSIZE(outputbytes) - VARHDRSZ;
471 pq_sendint(out, len, 4); /* length */
472 pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
473 pfree(outputbytes);
474 }
475 break;
476
477 default:
478 {
479 char *outputstr;
480 int len;
481
482 pq_sendbyte(out, 't'); /* 'text' data follows */
483
484 outputstr = OidOutputFunctionCall(typclass->typoutput,
485 values[i]);
486 len = strlen(outputstr) + 1;
487 pq_sendint(out, len, 4); /* length */
488 appendBinaryStringInfo(out, outputstr, len); /* data */
489 pfree(outputstr);
490 }
491 }
492
493 ReleaseSysCache(typtup);
494 }
495 }
496
497 /*
498 * Make the executive decision about which protocol to use.
499 */
500 static char
decide_datum_transfer(Form_pg_attribute att,Form_pg_type typclass,bool allow_internal_basetypes,bool allow_binary_basetypes)501 decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
502 bool allow_internal_basetypes,
503 bool allow_binary_basetypes)
504 {
505 /*
506 * Use the binary protocol, if allowed, for builtin & plain datatypes.
507 */
508 if (allow_internal_basetypes &&
509 typclass->typtype == 'b' &&
510 att->atttypid < FirstNormalObjectId &&
511 typclass->typelem == InvalidOid)
512 {
513 return 'i';
514 }
515 /*
516 * Use send/recv, if allowed, if the type is plain or builtin.
517 *
518 * XXX: we can't use send/recv for array or composite types for now due to
519 * the embedded oids.
520 */
521 else if (allow_binary_basetypes &&
522 OidIsValid(typclass->typreceive) &&
523 (att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') &&
524 (att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid))
525 {
526 return 'b';
527 }
528
529 return 't';
530 }
531
532
533 /*
534 * Read functions.
535 */
536
537 /*
538 * Read transaction BEGIN from the stream.
539 */
540 void
pglogical_read_begin(StringInfo in,XLogRecPtr * remote_lsn,TimestampTz * committime,TransactionId * remote_xid)541 pglogical_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
542 TimestampTz *committime, TransactionId *remote_xid)
543 {
544 /* read flags */
545 uint8 flags = pq_getmsgbyte(in);
546 Assert(flags == 0);
547 (void) flags; /* unused */
548
549 /* read fields */
550 *remote_lsn = pq_getmsgint64(in);
551 Assert(*remote_lsn != InvalidXLogRecPtr);
552 *committime = pq_getmsgint64(in);
553 *remote_xid = pq_getmsgint(in, 4);
554 }
555
556 /*
557 * Read transaction COMMIT from the stream.
558 */
559 void
pglogical_read_commit(StringInfo in,XLogRecPtr * commit_lsn,XLogRecPtr * end_lsn,TimestampTz * committime)560 pglogical_read_commit(StringInfo in, XLogRecPtr *commit_lsn,
561 XLogRecPtr *end_lsn, TimestampTz *committime)
562 {
563 /* read flags */
564 uint8 flags = pq_getmsgbyte(in);
565 Assert(flags == 0);
566 (void) flags; /* unused */
567
568 /* read fields */
569 *commit_lsn = pq_getmsgint64(in);
570 *end_lsn = pq_getmsgint64(in);
571 *committime = pq_getmsgint64(in);
572 }
573
574 /*
575 * Read ORIGIN from the output stream.
576 */
577 char *
pglogical_read_origin(StringInfo in,XLogRecPtr * origin_lsn)578 pglogical_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
579 {
580 uint8 flags;
581 uint8 len;
582
583 /* read the flags */
584 flags = pq_getmsgbyte(in);
585 Assert(flags == 0);
586 (void) flags; /* unused */
587
588 /* fixed fields */
589 *origin_lsn = pq_getmsgint64(in);
590
591 /* origin */
592 len = pq_getmsgbyte(in);
593 return pnstrdup(pq_getmsgbytes(in, len), len);
594 }
595
596
597 /*
598 * Read INSERT from stream.
599 *
600 * Fills the new tuple.
601 */
602 PGLogicalRelation *
pglogical_read_insert(StringInfo in,LOCKMODE lockmode,PGLogicalTupleData * newtup)603 pglogical_read_insert(StringInfo in, LOCKMODE lockmode,
604 PGLogicalTupleData *newtup)
605 {
606 char action;
607 uint32 relid;
608 uint8 flags;
609 PGLogicalRelation *rel;
610
611 /* read the flags */
612 flags = pq_getmsgbyte(in);
613 Assert(flags == 0);
614 (void) flags; /* unused */
615
616 /* read the relation id */
617 relid = pq_getmsgint(in, 4);
618
619 action = pq_getmsgbyte(in);
620 if (action != 'N')
621 elog(ERROR, "expected new tuple but got %d",
622 action);
623
624 rel = pglogical_relation_open(relid, lockmode);
625
626 pglogical_read_tuple(in, rel, newtup);
627
628 return rel;
629 }
630
631 /*
632 * Read UPDATE from stream.
633 */
634 PGLogicalRelation *
pglogical_read_update(StringInfo in,LOCKMODE lockmode,bool * hasoldtup,PGLogicalTupleData * oldtup,PGLogicalTupleData * newtup)635 pglogical_read_update(StringInfo in, LOCKMODE lockmode, bool *hasoldtup,
636 PGLogicalTupleData *oldtup, PGLogicalTupleData *newtup)
637 {
638 char action;
639 Oid relid;
640 uint8 flags;
641 PGLogicalRelation *rel;
642
643 /* read the flags */
644 flags = pq_getmsgbyte(in);
645 Assert(flags == 0);
646 (void) flags; /* unused */
647
648 /* read the relation id */
649 relid = pq_getmsgint(in, 4);
650
651 /* read and verify action */
652 action = pq_getmsgbyte(in);
653 if (action != 'K' && action != 'O' && action != 'N')
654 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
655 action);
656
657 rel = pglogical_relation_open(relid, lockmode);
658
659 /* check for old tuple */
660 if (action == 'K' || action == 'O')
661 {
662 pglogical_read_tuple(in, rel, oldtup);
663 *hasoldtup = true;
664 action = pq_getmsgbyte(in);
665 }
666 else
667 *hasoldtup = false;
668
669 /* check for new tuple */
670 if (action != 'N')
671 elog(ERROR, "expected action 'N', got %c",
672 action);
673
674 pglogical_read_tuple(in, rel, newtup);
675
676 return rel;
677 }
678
679 /*
680 * Read DELETE from stream.
681 *
682 * Fills the old tuple.
683 */
684 PGLogicalRelation *
pglogical_read_delete(StringInfo in,LOCKMODE lockmode,PGLogicalTupleData * oldtup)685 pglogical_read_delete(StringInfo in, LOCKMODE lockmode,
686 PGLogicalTupleData *oldtup)
687 {
688 char action;
689 Oid relid;
690 uint8 flags;
691 PGLogicalRelation *rel;
692
693 /* read the flags */
694 flags = pq_getmsgbyte(in);
695 Assert(flags == 0);
696 (void) flags; /* unused */
697
698 /* read the relation id */
699 relid = pq_getmsgint(in, 4);
700
701 /* read and verify action */
702 action = pq_getmsgbyte(in);
703 if (action != 'K' && action != 'O')
704 elog(ERROR, "expected action 'O' or 'K' %c", action);
705
706 rel = pglogical_relation_open(relid, lockmode);
707
708 pglogical_read_tuple(in, rel, oldtup);
709
710 return rel;
711 }
712
713
714 /*
715 * Read tuple in remote format from stream.
716 *
717 * The returned tuple is converted to the local relation tuple format.
718 */
719 static void
pglogical_read_tuple(StringInfo in,PGLogicalRelation * rel,PGLogicalTupleData * tuple)720 pglogical_read_tuple(StringInfo in, PGLogicalRelation *rel,
721 PGLogicalTupleData *tuple)
722 {
723 int i;
724 int natts;
725 char action;
726 TupleDesc desc;
727
728 action = pq_getmsgbyte(in);
729 if (action != 'T')
730 elog(ERROR, "expected TUPLE, got %c", action);
731
732 memset(tuple->nulls, 1, sizeof(tuple->nulls));
733 memset(tuple->changed, 0, sizeof(tuple->changed));
734
735 natts = pq_getmsgint(in, 2);
736 if (rel->natts != natts)
737 elog(ERROR, "tuple natts mismatch between remote relation metadata cache (natts=%u) and remote tuple data (natts=%u)", rel->natts, natts);
738
739 desc = RelationGetDescr(rel->rel);
740
741 /* Read the data */
742 for (i = 0; i < natts; i++)
743 {
744 int attid = rel->attmap[i];
745 Form_pg_attribute att = TupleDescAttr(desc,attid);
746 char kind = pq_getmsgbyte(in);
747 const char *data;
748 int len;
749
750 switch (kind)
751 {
752 case 'n': /* null */
753 /* already marked as null */
754 tuple->values[attid] = 0xdeadbeef;
755 tuple->changed[attid] = true;
756 break;
757 case 'u': /* unchanged column */
758 tuple->values[attid] = 0xfbadbeef; /* make bad usage more obvious */
759 break;
760 case 'i': /* internal binary format */
761 tuple->nulls[attid] = false;
762 tuple->changed[attid] = true;
763
764 len = pq_getmsgint(in, 4); /* read length */
765 data = pq_getmsgbytes(in, len);
766
767 /* and data */
768 if (att->attbyval)
769 tuple->values[attid] = fetch_att(data, true, len);
770 else
771 tuple->values[attid] = PointerGetDatum(data);
772 break;
773 case 'b': /* binary send/recv format */
774 {
775 Oid typreceive;
776 Oid typioparam;
777 StringInfoData buf;
778
779 tuple->nulls[attid] = false;
780 tuple->changed[attid] = true;
781
782 len = pq_getmsgint(in, 4); /* read length */
783
784 getTypeBinaryInputInfo(att->atttypid,
785 &typreceive, &typioparam);
786
787 /* create StringInfo pointing into the bigger buffer */
788 initStringInfo(&buf);
789 /* and data */
790 buf.data = (char *) pq_getmsgbytes(in, len);
791 buf.len = len;
792 tuple->values[attid] = OidReceiveFunctionCall(
793 typreceive, &buf, typioparam, att->atttypmod);
794
795 if (buf.len != buf.cursor)
796 ereport(ERROR,
797 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
798 errmsg("incorrect binary data format")));
799 break;
800 }
801 case 't': /* text format */
802 {
803 Oid typinput;
804 Oid typioparam;
805
806 tuple->nulls[attid] = false;
807 tuple->changed[attid] = true;
808
809 len = pq_getmsgint(in, 4); /* read length */
810
811 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
812 /* and data */
813 data = (char *) pq_getmsgbytes(in, len);
814 tuple->values[attid] = OidInputFunctionCall(
815 typinput, (char *) data, typioparam, att->atttypmod);
816 }
817 break;
818 default:
819 elog(ERROR, "unknown data representation type '%c'", kind);
820 }
821 }
822 }
823
824 /*
825 * Read schema.relation from stream and return as PGLogicalRelation opened in
826 * lockmode.
827 */
828 uint32
pglogical_read_rel(StringInfo in)829 pglogical_read_rel(StringInfo in)
830 {
831 uint8 flags;
832 uint32 relid;
833 int len;
834 char *schemaname;
835 char *relname;
836 int natts;
837 char **attrnames;
838
839 /* read the flags */
840 flags = pq_getmsgbyte(in);
841 Assert(flags == 0);
842 (void) flags; /* unused */
843
844 relid = pq_getmsgint(in, 4);
845
846 /* Read relation from stream */
847 len = pq_getmsgbyte(in);
848 schemaname = (char *) pq_getmsgbytes(in, len);
849
850 len = pq_getmsgbyte(in);
851 relname = (char *) pq_getmsgbytes(in, len);
852
853 /* Get attribute description */
854 pglogical_read_attrs(in, &attrnames, &natts);
855
856 pglogical_relation_cache_update(relid, schemaname, relname, natts, attrnames);
857
858 return relid;
859 }
860
861 /*
862 * Read relation attributes from the outputstream.
863 *
864 * TODO handle flags.
865 */
866 static void
pglogical_read_attrs(StringInfo in,char *** attrnames,int * nattrnames)867 pglogical_read_attrs(StringInfo in, char ***attrnames, int *nattrnames)
868 {
869 int i;
870 uint16 nattrs;
871 char **attrs;
872 char blocktype;
873
874 blocktype = pq_getmsgbyte(in);
875 if (blocktype != 'A')
876 elog(ERROR, "expected ATTRS, got %c", blocktype);
877
878 nattrs = pq_getmsgint(in, 2);
879 attrs = palloc(nattrs * sizeof(char *));
880
881 /* read the attributes */
882 for (i = 0; i < nattrs; i++)
883 {
884 uint16 len;
885
886 blocktype = pq_getmsgbyte(in); /* column definition follows */
887 if (blocktype != 'C')
888 elog(ERROR, "expected COLUMN, got %c", blocktype);
889 /* read flags (we ignore them so far) */
890 (void) pq_getmsgbyte(in);
891
892 blocktype = pq_getmsgbyte(in); /* column name block follows */
893 if (blocktype != 'N')
894 elog(ERROR, "expected NAME, got %c", blocktype);
895
896 /* attribute name */
897 len = pq_getmsgint(in, 2);
898 /* the string is NULL terminated */
899 attrs[i] = (char *) pq_getmsgbytes(in, len);
900 }
901
902 *attrnames = attrs;
903 *nattrnames = nattrs;
904 }
905