1 /*-------------------------------------------------------------------------
2 *
3 * proto.c
4 * logical replication protocol functions
5 *
6 * Copyright (c) 2015-2021, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/proto.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
19 #include "replication/logicalproto.h"
20 #include "utils/lsyscache.h"
21 #include "utils/syscache.h"
22
/*
 * Flag bits carried inside protocol messages.
 *
 * Each group belongs to a different message type, so bit values may repeat
 * between groups.
 */

/* per-attribute flag in the relation description: column is a key column */
#define LOGICALREP_IS_REPLICA_IDENTITY	1

/* flag byte of MESSAGE */
#define MESSAGE_TRANSACTIONAL			(1<<0)

/* flag byte of TRUNCATE */
#define TRUNCATE_CASCADE				(1<<0)
#define TRUNCATE_RESTART_SEQS			(1<<1)
31
32 static void logicalrep_write_attrs(StringInfo out, Relation rel);
33 static void logicalrep_write_tuple(StringInfo out, Relation rel,
34 HeapTuple tuple, bool binary);
35
36 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
37 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
38
39 static void logicalrep_write_namespace(StringInfo out, Oid nspid);
40 static const char *logicalrep_read_namespace(StringInfo in);
41
42 /*
43 * Write BEGIN to the output stream.
44 */
45 void
logicalrep_write_begin(StringInfo out,ReorderBufferTXN * txn)46 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
47 {
48 pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
49
50 /* fixed fields */
51 pq_sendint64(out, txn->final_lsn);
52 pq_sendint64(out, txn->commit_time);
53 pq_sendint32(out, txn->xid);
54 }
55
56 /*
57 * Read transaction BEGIN from the stream.
58 */
59 void
logicalrep_read_begin(StringInfo in,LogicalRepBeginData * begin_data)60 logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
61 {
62 /* read fields */
63 begin_data->final_lsn = pq_getmsgint64(in);
64 if (begin_data->final_lsn == InvalidXLogRecPtr)
65 elog(ERROR, "final_lsn not set in begin message");
66 begin_data->committime = pq_getmsgint64(in);
67 begin_data->xid = pq_getmsgint(in, 4);
68 }
69
70
71 /*
72 * Write COMMIT to the output stream.
73 */
74 void
logicalrep_write_commit(StringInfo out,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)75 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
76 XLogRecPtr commit_lsn)
77 {
78 uint8 flags = 0;
79
80 pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
81
82 /* send the flags field (unused for now) */
83 pq_sendbyte(out, flags);
84
85 /* send fields */
86 pq_sendint64(out, commit_lsn);
87 pq_sendint64(out, txn->end_lsn);
88 pq_sendint64(out, txn->commit_time);
89 }
90
91 /*
92 * Read transaction COMMIT from the stream.
93 */
94 void
logicalrep_read_commit(StringInfo in,LogicalRepCommitData * commit_data)95 logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
96 {
97 /* read flags (unused for now) */
98 uint8 flags = pq_getmsgbyte(in);
99
100 if (flags != 0)
101 elog(ERROR, "unrecognized flags %u in commit message", flags);
102
103 /* read fields */
104 commit_data->commit_lsn = pq_getmsgint64(in);
105 commit_data->end_lsn = pq_getmsgint64(in);
106 commit_data->committime = pq_getmsgint64(in);
107 }
108
109 /*
110 * Write ORIGIN to the output stream.
111 */
112 void
logicalrep_write_origin(StringInfo out,const char * origin,XLogRecPtr origin_lsn)113 logicalrep_write_origin(StringInfo out, const char *origin,
114 XLogRecPtr origin_lsn)
115 {
116 pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
117
118 /* fixed fields */
119 pq_sendint64(out, origin_lsn);
120
121 /* origin string */
122 pq_sendstring(out, origin);
123 }
124
125 /*
126 * Read ORIGIN from the output stream.
127 */
128 char *
logicalrep_read_origin(StringInfo in,XLogRecPtr * origin_lsn)129 logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
130 {
131 /* fixed fields */
132 *origin_lsn = pq_getmsgint64(in);
133
134 /* return origin */
135 return pstrdup(pq_getmsgstring(in));
136 }
137
138 /*
139 * Write INSERT to the output stream.
140 */
141 void
logicalrep_write_insert(StringInfo out,TransactionId xid,Relation rel,HeapTuple newtuple,bool binary)142 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
143 HeapTuple newtuple, bool binary)
144 {
145 pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
146
147 /* transaction ID (if not valid, we're not streaming) */
148 if (TransactionIdIsValid(xid))
149 pq_sendint32(out, xid);
150
151 /* use Oid as relation identifier */
152 pq_sendint32(out, RelationGetRelid(rel));
153
154 pq_sendbyte(out, 'N'); /* new tuple follows */
155 logicalrep_write_tuple(out, rel, newtuple, binary);
156 }
157
158 /*
159 * Read INSERT from stream.
160 *
161 * Fills the new tuple.
162 */
163 LogicalRepRelId
logicalrep_read_insert(StringInfo in,LogicalRepTupleData * newtup)164 logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
165 {
166 char action;
167 LogicalRepRelId relid;
168
169 /* read the relation id */
170 relid = pq_getmsgint(in, 4);
171
172 action = pq_getmsgbyte(in);
173 if (action != 'N')
174 elog(ERROR, "expected new tuple but got %d",
175 action);
176
177 logicalrep_read_tuple(in, newtup);
178
179 return relid;
180 }
181
182 /*
183 * Write UPDATE to the output stream.
184 */
185 void
logicalrep_write_update(StringInfo out,TransactionId xid,Relation rel,HeapTuple oldtuple,HeapTuple newtuple,bool binary)186 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
187 HeapTuple oldtuple, HeapTuple newtuple, bool binary)
188 {
189 pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
190
191 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
192 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
193 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
194
195 /* transaction ID (if not valid, we're not streaming) */
196 if (TransactionIdIsValid(xid))
197 pq_sendint32(out, xid);
198
199 /* use Oid as relation identifier */
200 pq_sendint32(out, RelationGetRelid(rel));
201
202 if (oldtuple != NULL)
203 {
204 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
205 pq_sendbyte(out, 'O'); /* old tuple follows */
206 else
207 pq_sendbyte(out, 'K'); /* old key follows */
208 logicalrep_write_tuple(out, rel, oldtuple, binary);
209 }
210
211 pq_sendbyte(out, 'N'); /* new tuple follows */
212 logicalrep_write_tuple(out, rel, newtuple, binary);
213 }
214
215 /*
216 * Read UPDATE from stream.
217 */
218 LogicalRepRelId
logicalrep_read_update(StringInfo in,bool * has_oldtuple,LogicalRepTupleData * oldtup,LogicalRepTupleData * newtup)219 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
220 LogicalRepTupleData *oldtup,
221 LogicalRepTupleData *newtup)
222 {
223 char action;
224 LogicalRepRelId relid;
225
226 /* read the relation id */
227 relid = pq_getmsgint(in, 4);
228
229 /* read and verify action */
230 action = pq_getmsgbyte(in);
231 if (action != 'K' && action != 'O' && action != 'N')
232 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
233 action);
234
235 /* check for old tuple */
236 if (action == 'K' || action == 'O')
237 {
238 logicalrep_read_tuple(in, oldtup);
239 *has_oldtuple = true;
240
241 action = pq_getmsgbyte(in);
242 }
243 else
244 *has_oldtuple = false;
245
246 /* check for new tuple */
247 if (action != 'N')
248 elog(ERROR, "expected action 'N', got %c",
249 action);
250
251 logicalrep_read_tuple(in, newtup);
252
253 return relid;
254 }
255
256 /*
257 * Write DELETE to the output stream.
258 */
259 void
logicalrep_write_delete(StringInfo out,TransactionId xid,Relation rel,HeapTuple oldtuple,bool binary)260 logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
261 HeapTuple oldtuple, bool binary)
262 {
263 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
264 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
265 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
266
267 pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
268
269 /* transaction ID (if not valid, we're not streaming) */
270 if (TransactionIdIsValid(xid))
271 pq_sendint32(out, xid);
272
273 /* use Oid as relation identifier */
274 pq_sendint32(out, RelationGetRelid(rel));
275
276 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
277 pq_sendbyte(out, 'O'); /* old tuple follows */
278 else
279 pq_sendbyte(out, 'K'); /* old key follows */
280
281 logicalrep_write_tuple(out, rel, oldtuple, binary);
282 }
283
284 /*
285 * Read DELETE from stream.
286 *
287 * Fills the old tuple.
288 */
289 LogicalRepRelId
logicalrep_read_delete(StringInfo in,LogicalRepTupleData * oldtup)290 logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
291 {
292 char action;
293 LogicalRepRelId relid;
294
295 /* read the relation id */
296 relid = pq_getmsgint(in, 4);
297
298 /* read and verify action */
299 action = pq_getmsgbyte(in);
300 if (action != 'K' && action != 'O')
301 elog(ERROR, "expected action 'O' or 'K', got %c", action);
302
303 logicalrep_read_tuple(in, oldtup);
304
305 return relid;
306 }
307
308 /*
309 * Write TRUNCATE to the output stream.
310 */
311 void
logicalrep_write_truncate(StringInfo out,TransactionId xid,int nrelids,Oid relids[],bool cascade,bool restart_seqs)312 logicalrep_write_truncate(StringInfo out,
313 TransactionId xid,
314 int nrelids,
315 Oid relids[],
316 bool cascade, bool restart_seqs)
317 {
318 int i;
319 uint8 flags = 0;
320
321 pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
322
323 /* transaction ID (if not valid, we're not streaming) */
324 if (TransactionIdIsValid(xid))
325 pq_sendint32(out, xid);
326
327 pq_sendint32(out, nrelids);
328
329 /* encode and send truncate flags */
330 if (cascade)
331 flags |= TRUNCATE_CASCADE;
332 if (restart_seqs)
333 flags |= TRUNCATE_RESTART_SEQS;
334 pq_sendint8(out, flags);
335
336 for (i = 0; i < nrelids; i++)
337 pq_sendint32(out, relids[i]);
338 }
339
340 /*
341 * Read TRUNCATE from stream.
342 */
343 List *
logicalrep_read_truncate(StringInfo in,bool * cascade,bool * restart_seqs)344 logicalrep_read_truncate(StringInfo in,
345 bool *cascade, bool *restart_seqs)
346 {
347 int i;
348 int nrelids;
349 List *relids = NIL;
350 uint8 flags;
351
352 nrelids = pq_getmsgint(in, 4);
353
354 /* read and decode truncate flags */
355 flags = pq_getmsgint(in, 1);
356 *cascade = (flags & TRUNCATE_CASCADE) > 0;
357 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
358
359 for (i = 0; i < nrelids; i++)
360 relids = lappend_oid(relids, pq_getmsgint(in, 4));
361
362 return relids;
363 }
364
365 /*
366 * Write MESSAGE to stream
367 */
368 void
logicalrep_write_message(StringInfo out,TransactionId xid,XLogRecPtr lsn,bool transactional,const char * prefix,Size sz,const char * message)369 logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
370 bool transactional, const char *prefix, Size sz,
371 const char *message)
372 {
373 uint8 flags = 0;
374
375 pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
376
377 /* encode and send message flags */
378 if (transactional)
379 flags |= MESSAGE_TRANSACTIONAL;
380
381 /* transaction ID (if not valid, we're not streaming) */
382 if (TransactionIdIsValid(xid))
383 pq_sendint32(out, xid);
384
385 pq_sendint8(out, flags);
386 pq_sendint64(out, lsn);
387 pq_sendstring(out, prefix);
388 pq_sendint32(out, sz);
389 pq_sendbytes(out, message, sz);
390 }
391
392 /*
393 * Write relation description to the output stream.
394 */
395 void
logicalrep_write_rel(StringInfo out,TransactionId xid,Relation rel)396 logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
397 {
398 char *relname;
399
400 pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
401
402 /* transaction ID (if not valid, we're not streaming) */
403 if (TransactionIdIsValid(xid))
404 pq_sendint32(out, xid);
405
406 /* use Oid as relation identifier */
407 pq_sendint32(out, RelationGetRelid(rel));
408
409 /* send qualified relation name */
410 logicalrep_write_namespace(out, RelationGetNamespace(rel));
411 relname = RelationGetRelationName(rel);
412 pq_sendstring(out, relname);
413
414 /* send replica identity */
415 pq_sendbyte(out, rel->rd_rel->relreplident);
416
417 /* send the attribute info */
418 logicalrep_write_attrs(out, rel);
419 }
420
421 /*
422 * Read the relation info from stream and return as LogicalRepRelation.
423 */
424 LogicalRepRelation *
logicalrep_read_rel(StringInfo in)425 logicalrep_read_rel(StringInfo in)
426 {
427 LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
428
429 rel->remoteid = pq_getmsgint(in, 4);
430
431 /* Read relation name from stream */
432 rel->nspname = pstrdup(logicalrep_read_namespace(in));
433 rel->relname = pstrdup(pq_getmsgstring(in));
434
435 /* Read the replica identity. */
436 rel->replident = pq_getmsgbyte(in);
437
438 /* Get attribute description */
439 logicalrep_read_attrs(in, rel);
440
441 return rel;
442 }
443
444 /*
445 * Write type info to the output stream.
446 *
447 * This function will always write base type info.
448 */
449 void
logicalrep_write_typ(StringInfo out,TransactionId xid,Oid typoid)450 logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
451 {
452 Oid basetypoid = getBaseType(typoid);
453 HeapTuple tup;
454 Form_pg_type typtup;
455
456 pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
457
458 /* transaction ID (if not valid, we're not streaming) */
459 if (TransactionIdIsValid(xid))
460 pq_sendint32(out, xid);
461
462 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
463 if (!HeapTupleIsValid(tup))
464 elog(ERROR, "cache lookup failed for type %u", basetypoid);
465 typtup = (Form_pg_type) GETSTRUCT(tup);
466
467 /* use Oid as relation identifier */
468 pq_sendint32(out, typoid);
469
470 /* send qualified type name */
471 logicalrep_write_namespace(out, typtup->typnamespace);
472 pq_sendstring(out, NameStr(typtup->typname));
473
474 ReleaseSysCache(tup);
475 }
476
477 /*
478 * Read type info from the output stream.
479 */
480 void
logicalrep_read_typ(StringInfo in,LogicalRepTyp * ltyp)481 logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
482 {
483 ltyp->remoteid = pq_getmsgint(in, 4);
484
485 /* Read type name from stream */
486 ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
487 ltyp->typname = pstrdup(pq_getmsgstring(in));
488 }
489
490 /*
491 * Write a tuple to the outputstream, in the most efficient format possible.
492 */
493 static void
logicalrep_write_tuple(StringInfo out,Relation rel,HeapTuple tuple,bool binary)494 logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
495 {
496 TupleDesc desc;
497 Datum values[MaxTupleAttributeNumber];
498 bool isnull[MaxTupleAttributeNumber];
499 int i;
500 uint16 nliveatts = 0;
501
502 desc = RelationGetDescr(rel);
503
504 for (i = 0; i < desc->natts; i++)
505 {
506 if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
507 continue;
508 nliveatts++;
509 }
510 pq_sendint16(out, nliveatts);
511
512 /* try to allocate enough memory from the get-go */
513 enlargeStringInfo(out, tuple->t_len +
514 nliveatts * (1 + 4));
515
516 heap_deform_tuple(tuple, desc, values, isnull);
517
518 /* Write the values */
519 for (i = 0; i < desc->natts; i++)
520 {
521 HeapTuple typtup;
522 Form_pg_type typclass;
523 Form_pg_attribute att = TupleDescAttr(desc, i);
524
525 if (att->attisdropped || att->attgenerated)
526 continue;
527
528 if (isnull[i])
529 {
530 pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
531 continue;
532 }
533
534 if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
535 {
536 /*
537 * Unchanged toasted datum. (Note that we don't promise to detect
538 * unchanged data in general; this is just a cheap check to avoid
539 * sending large values unnecessarily.)
540 */
541 pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
542 continue;
543 }
544
545 typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
546 if (!HeapTupleIsValid(typtup))
547 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
548 typclass = (Form_pg_type) GETSTRUCT(typtup);
549
550 /*
551 * Send in binary if requested and type has suitable send function.
552 */
553 if (binary && OidIsValid(typclass->typsend))
554 {
555 bytea *outputbytes;
556 int len;
557
558 pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
559 outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
560 len = VARSIZE(outputbytes) - VARHDRSZ;
561 pq_sendint(out, len, 4); /* length */
562 pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
563 pfree(outputbytes);
564 }
565 else
566 {
567 char *outputstr;
568
569 pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
570 outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
571 pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
572 pfree(outputstr);
573 }
574
575 ReleaseSysCache(typtup);
576 }
577 }
578
579 /*
580 * Read tuple in logical replication format from stream.
581 */
582 static void
logicalrep_read_tuple(StringInfo in,LogicalRepTupleData * tuple)583 logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
584 {
585 int i;
586 int natts;
587
588 /* Get number of attributes */
589 natts = pq_getmsgint(in, 2);
590
591 /* Allocate space for per-column values; zero out unused StringInfoDatas */
592 tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
593 tuple->colstatus = (char *) palloc(natts * sizeof(char));
594 tuple->ncols = natts;
595
596 /* Read the data */
597 for (i = 0; i < natts; i++)
598 {
599 char kind;
600 int len;
601 StringInfo value = &tuple->colvalues[i];
602
603 kind = pq_getmsgbyte(in);
604 tuple->colstatus[i] = kind;
605
606 switch (kind)
607 {
608 case LOGICALREP_COLUMN_NULL:
609 /* nothing more to do */
610 break;
611 case LOGICALREP_COLUMN_UNCHANGED:
612 /* we don't receive the value of an unchanged column */
613 break;
614 case LOGICALREP_COLUMN_TEXT:
615 len = pq_getmsgint(in, 4); /* read length */
616
617 /* and data */
618 value->data = palloc(len + 1);
619 pq_copymsgbytes(in, value->data, len);
620 value->data[len] = '\0';
621 /* make StringInfo fully valid */
622 value->len = len;
623 value->cursor = 0;
624 value->maxlen = len;
625 break;
626 case LOGICALREP_COLUMN_BINARY:
627 len = pq_getmsgint(in, 4); /* read length */
628
629 /* and data */
630 value->data = palloc(len + 1);
631 pq_copymsgbytes(in, value->data, len);
632 /* not strictly necessary but per StringInfo practice */
633 value->data[len] = '\0';
634 /* make StringInfo fully valid */
635 value->len = len;
636 value->cursor = 0;
637 value->maxlen = len;
638 break;
639 default:
640 elog(ERROR, "unrecognized data representation type '%c'", kind);
641 }
642 }
643 }
644
645 /*
646 * Write relation attribute metadata to the stream.
647 */
648 static void
logicalrep_write_attrs(StringInfo out,Relation rel)649 logicalrep_write_attrs(StringInfo out, Relation rel)
650 {
651 TupleDesc desc;
652 int i;
653 uint16 nliveatts = 0;
654 Bitmapset *idattrs = NULL;
655 bool replidentfull;
656
657 desc = RelationGetDescr(rel);
658
659 /* send number of live attributes */
660 for (i = 0; i < desc->natts; i++)
661 {
662 if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
663 continue;
664 nliveatts++;
665 }
666 pq_sendint16(out, nliveatts);
667
668 /* fetch bitmap of REPLICATION IDENTITY attributes */
669 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
670 if (!replidentfull)
671 idattrs = RelationGetIdentityKeyBitmap(rel);
672
673 /* send the attributes */
674 for (i = 0; i < desc->natts; i++)
675 {
676 Form_pg_attribute att = TupleDescAttr(desc, i);
677 uint8 flags = 0;
678
679 if (att->attisdropped || att->attgenerated)
680 continue;
681
682 /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
683 if (replidentfull ||
684 bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
685 idattrs))
686 flags |= LOGICALREP_IS_REPLICA_IDENTITY;
687
688 pq_sendbyte(out, flags);
689
690 /* attribute name */
691 pq_sendstring(out, NameStr(att->attname));
692
693 /* attribute type id */
694 pq_sendint32(out, (int) att->atttypid);
695
696 /* attribute mode */
697 pq_sendint32(out, att->atttypmod);
698 }
699
700 bms_free(idattrs);
701 }
702
703 /*
704 * Read relation attribute metadata from the stream.
705 */
706 static void
logicalrep_read_attrs(StringInfo in,LogicalRepRelation * rel)707 logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
708 {
709 int i;
710 int natts;
711 char **attnames;
712 Oid *atttyps;
713 Bitmapset *attkeys = NULL;
714
715 natts = pq_getmsgint(in, 2);
716 attnames = palloc(natts * sizeof(char *));
717 atttyps = palloc(natts * sizeof(Oid));
718
719 /* read the attributes */
720 for (i = 0; i < natts; i++)
721 {
722 uint8 flags;
723
724 /* Check for replica identity column */
725 flags = pq_getmsgbyte(in);
726 if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
727 attkeys = bms_add_member(attkeys, i);
728
729 /* attribute name */
730 attnames[i] = pstrdup(pq_getmsgstring(in));
731
732 /* attribute type id */
733 atttyps[i] = (Oid) pq_getmsgint(in, 4);
734
735 /* we ignore attribute mode for now */
736 (void) pq_getmsgint(in, 4);
737 }
738
739 rel->attnames = attnames;
740 rel->atttyps = atttyps;
741 rel->attkeys = attkeys;
742 rel->natts = natts;
743 }
744
745 /*
746 * Write the namespace name or empty string for pg_catalog (to save space).
747 */
748 static void
logicalrep_write_namespace(StringInfo out,Oid nspid)749 logicalrep_write_namespace(StringInfo out, Oid nspid)
750 {
751 if (nspid == PG_CATALOG_NAMESPACE)
752 pq_sendbyte(out, '\0');
753 else
754 {
755 char *nspname = get_namespace_name(nspid);
756
757 if (nspname == NULL)
758 elog(ERROR, "cache lookup failed for namespace %u",
759 nspid);
760
761 pq_sendstring(out, nspname);
762 }
763 }
764
765 /*
766 * Read the namespace name while treating empty string as pg_catalog.
767 */
768 static const char *
logicalrep_read_namespace(StringInfo in)769 logicalrep_read_namespace(StringInfo in)
770 {
771 const char *nspname = pq_getmsgstring(in);
772
773 if (nspname[0] == '\0')
774 nspname = "pg_catalog";
775
776 return nspname;
777 }
778
779 /*
780 * Write the information for the start stream message to the output stream.
781 */
782 void
logicalrep_write_stream_start(StringInfo out,TransactionId xid,bool first_segment)783 logicalrep_write_stream_start(StringInfo out,
784 TransactionId xid, bool first_segment)
785 {
786 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
787
788 Assert(TransactionIdIsValid(xid));
789
790 /* transaction ID (we're starting to stream, so must be valid) */
791 pq_sendint32(out, xid);
792
793 /* 1 if this is the first streaming segment for this xid */
794 pq_sendbyte(out, first_segment ? 1 : 0);
795 }
796
797 /*
798 * Read the information about the start stream message from output stream.
799 */
800 TransactionId
logicalrep_read_stream_start(StringInfo in,bool * first_segment)801 logicalrep_read_stream_start(StringInfo in, bool *first_segment)
802 {
803 TransactionId xid;
804
805 Assert(first_segment);
806
807 xid = pq_getmsgint(in, 4);
808 *first_segment = (pq_getmsgbyte(in) == 1);
809
810 return xid;
811 }
812
813 /*
814 * Write the stop stream message to the output stream.
815 */
816 void
logicalrep_write_stream_stop(StringInfo out)817 logicalrep_write_stream_stop(StringInfo out)
818 {
819 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
820 }
821
822 /*
823 * Write STREAM COMMIT to the output stream.
824 */
825 void
logicalrep_write_stream_commit(StringInfo out,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)826 logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
827 XLogRecPtr commit_lsn)
828 {
829 uint8 flags = 0;
830
831 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
832
833 Assert(TransactionIdIsValid(txn->xid));
834
835 /* transaction ID */
836 pq_sendint32(out, txn->xid);
837
838 /* send the flags field (unused for now) */
839 pq_sendbyte(out, flags);
840
841 /* send fields */
842 pq_sendint64(out, commit_lsn);
843 pq_sendint64(out, txn->end_lsn);
844 pq_sendint64(out, txn->commit_time);
845 }
846
847 /*
848 * Read STREAM COMMIT from the output stream.
849 */
850 TransactionId
logicalrep_read_stream_commit(StringInfo in,LogicalRepCommitData * commit_data)851 logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
852 {
853 TransactionId xid;
854 uint8 flags;
855
856 xid = pq_getmsgint(in, 4);
857
858 /* read flags (unused for now) */
859 flags = pq_getmsgbyte(in);
860
861 if (flags != 0)
862 elog(ERROR, "unrecognized flags %u in commit message", flags);
863
864 /* read fields */
865 commit_data->commit_lsn = pq_getmsgint64(in);
866 commit_data->end_lsn = pq_getmsgint64(in);
867 commit_data->committime = pq_getmsgint64(in);
868
869 return xid;
870 }
871
872 /*
873 * Write STREAM ABORT to the output stream. Note that xid and subxid will be
874 * same for the top-level transaction abort.
875 */
876 void
logicalrep_write_stream_abort(StringInfo out,TransactionId xid,TransactionId subxid)877 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
878 TransactionId subxid)
879 {
880 pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
881
882 Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
883
884 /* transaction ID */
885 pq_sendint32(out, xid);
886 pq_sendint32(out, subxid);
887 }
888
889 /*
890 * Read STREAM ABORT from the output stream.
891 */
892 void
logicalrep_read_stream_abort(StringInfo in,TransactionId * xid,TransactionId * subxid)893 logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
894 TransactionId *subxid)
895 {
896 Assert(xid && subxid);
897
898 *xid = pq_getmsgint(in, 4);
899 *subxid = pq_getmsgint(in, 4);
900 }
901