1 /*-------------------------------------------------------------------------
2 *
3 * proto.c
4 * logical replication protocol functions
5 *
6 * Copyright (c) 2015-2020, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/proto.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "access/sysattr.h"
16 #include "catalog/pg_namespace.h"
17 #include "catalog/pg_type.h"
18 #include "libpq/pqformat.h"
19 #include "replication/logicalproto.h"
20 #include "utils/builtins.h"
21 #include "utils/lsyscache.h"
22 #include "utils/syscache.h"
23
/*
 * Protocol message flags.
 */

/*
 * Per-attribute flag bit in a relation description: set by
 * logicalrep_write_attrs() for columns that are part of the replica identity
 * and tested by logicalrep_read_attrs() to build the key-column set.
 */
#define LOGICALREP_IS_REPLICA_IDENTITY 1

/*
 * Option bits packed into the single flags byte of a TRUNCATE message (see
 * logicalrep_write_truncate() / logicalrep_read_truncate()).
 */
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)

/* Writers for the attribute list and tuple data embedded in other messages. */
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
								   HeapTuple tuple);

/* Readers matching the two writers above. */
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);

/* Namespace name helpers; pg_catalog is encoded as an empty string. */
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
41
42 /*
43 * Write BEGIN to the output stream.
44 */
45 void
logicalrep_write_begin(StringInfo out,ReorderBufferTXN * txn)46 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
47 {
48 pq_sendbyte(out, 'B'); /* BEGIN */
49
50 /* fixed fields */
51 pq_sendint64(out, txn->final_lsn);
52 pq_sendint64(out, txn->commit_time);
53 pq_sendint32(out, txn->xid);
54 }
55
56 /*
57 * Read transaction BEGIN from the stream.
58 */
59 void
logicalrep_read_begin(StringInfo in,LogicalRepBeginData * begin_data)60 logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
61 {
62 /* read fields */
63 begin_data->final_lsn = pq_getmsgint64(in);
64 if (begin_data->final_lsn == InvalidXLogRecPtr)
65 elog(ERROR, "final_lsn not set in begin message");
66 begin_data->committime = pq_getmsgint64(in);
67 begin_data->xid = pq_getmsgint(in, 4);
68 }
69
70
71 /*
72 * Write COMMIT to the output stream.
73 */
74 void
logicalrep_write_commit(StringInfo out,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)75 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
76 XLogRecPtr commit_lsn)
77 {
78 uint8 flags = 0;
79
80 pq_sendbyte(out, 'C'); /* sending COMMIT */
81
82 /* send the flags field (unused for now) */
83 pq_sendbyte(out, flags);
84
85 /* send fields */
86 pq_sendint64(out, commit_lsn);
87 pq_sendint64(out, txn->end_lsn);
88 pq_sendint64(out, txn->commit_time);
89 }
90
91 /*
92 * Read transaction COMMIT from the stream.
93 */
94 void
logicalrep_read_commit(StringInfo in,LogicalRepCommitData * commit_data)95 logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
96 {
97 /* read flags (unused for now) */
98 uint8 flags = pq_getmsgbyte(in);
99
100 if (flags != 0)
101 elog(ERROR, "unrecognized flags %u in commit message", flags);
102
103 /* read fields */
104 commit_data->commit_lsn = pq_getmsgint64(in);
105 commit_data->end_lsn = pq_getmsgint64(in);
106 commit_data->committime = pq_getmsgint64(in);
107 }
108
109 /*
110 * Write ORIGIN to the output stream.
111 */
112 void
logicalrep_write_origin(StringInfo out,const char * origin,XLogRecPtr origin_lsn)113 logicalrep_write_origin(StringInfo out, const char *origin,
114 XLogRecPtr origin_lsn)
115 {
116 pq_sendbyte(out, 'O'); /* ORIGIN */
117
118 /* fixed fields */
119 pq_sendint64(out, origin_lsn);
120
121 /* origin string */
122 pq_sendstring(out, origin);
123 }
124
125 /*
126 * Read ORIGIN from the output stream.
127 */
128 char *
logicalrep_read_origin(StringInfo in,XLogRecPtr * origin_lsn)129 logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
130 {
131 /* fixed fields */
132 *origin_lsn = pq_getmsgint64(in);
133
134 /* return origin */
135 return pstrdup(pq_getmsgstring(in));
136 }
137
138 /*
139 * Write INSERT to the output stream.
140 */
141 void
logicalrep_write_insert(StringInfo out,Relation rel,HeapTuple newtuple)142 logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
143 {
144 pq_sendbyte(out, 'I'); /* action INSERT */
145
146 /* use Oid as relation identifier */
147 pq_sendint32(out, RelationGetRelid(rel));
148
149 pq_sendbyte(out, 'N'); /* new tuple follows */
150 logicalrep_write_tuple(out, rel, newtuple);
151 }
152
153 /*
154 * Read INSERT from stream.
155 *
156 * Fills the new tuple.
157 */
158 LogicalRepRelId
logicalrep_read_insert(StringInfo in,LogicalRepTupleData * newtup)159 logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
160 {
161 char action;
162 LogicalRepRelId relid;
163
164 /* read the relation id */
165 relid = pq_getmsgint(in, 4);
166
167 action = pq_getmsgbyte(in);
168 if (action != 'N')
169 elog(ERROR, "expected new tuple but got %d",
170 action);
171
172 logicalrep_read_tuple(in, newtup);
173
174 return relid;
175 }
176
177 /*
178 * Write UPDATE to the output stream.
179 */
180 void
logicalrep_write_update(StringInfo out,Relation rel,HeapTuple oldtuple,HeapTuple newtuple)181 logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
182 HeapTuple newtuple)
183 {
184 pq_sendbyte(out, 'U'); /* action UPDATE */
185
186 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
187 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
188 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
189
190 /* use Oid as relation identifier */
191 pq_sendint32(out, RelationGetRelid(rel));
192
193 if (oldtuple != NULL)
194 {
195 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
196 pq_sendbyte(out, 'O'); /* old tuple follows */
197 else
198 pq_sendbyte(out, 'K'); /* old key follows */
199 logicalrep_write_tuple(out, rel, oldtuple);
200 }
201
202 pq_sendbyte(out, 'N'); /* new tuple follows */
203 logicalrep_write_tuple(out, rel, newtuple);
204 }
205
206 /*
207 * Read UPDATE from stream.
208 */
209 LogicalRepRelId
logicalrep_read_update(StringInfo in,bool * has_oldtuple,LogicalRepTupleData * oldtup,LogicalRepTupleData * newtup)210 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
211 LogicalRepTupleData *oldtup,
212 LogicalRepTupleData *newtup)
213 {
214 char action;
215 LogicalRepRelId relid;
216
217 /* read the relation id */
218 relid = pq_getmsgint(in, 4);
219
220 /* read and verify action */
221 action = pq_getmsgbyte(in);
222 if (action != 'K' && action != 'O' && action != 'N')
223 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
224 action);
225
226 /* check for old tuple */
227 if (action == 'K' || action == 'O')
228 {
229 logicalrep_read_tuple(in, oldtup);
230 *has_oldtuple = true;
231
232 action = pq_getmsgbyte(in);
233 }
234 else
235 *has_oldtuple = false;
236
237 /* check for new tuple */
238 if (action != 'N')
239 elog(ERROR, "expected action 'N', got %c",
240 action);
241
242 logicalrep_read_tuple(in, newtup);
243
244 return relid;
245 }
246
247 /*
248 * Write DELETE to the output stream.
249 */
250 void
logicalrep_write_delete(StringInfo out,Relation rel,HeapTuple oldtuple)251 logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
252 {
253 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
254 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
255 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
256
257 pq_sendbyte(out, 'D'); /* action DELETE */
258
259 /* use Oid as relation identifier */
260 pq_sendint32(out, RelationGetRelid(rel));
261
262 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
263 pq_sendbyte(out, 'O'); /* old tuple follows */
264 else
265 pq_sendbyte(out, 'K'); /* old key follows */
266
267 logicalrep_write_tuple(out, rel, oldtuple);
268 }
269
270 /*
271 * Read DELETE from stream.
272 *
273 * Fills the old tuple.
274 */
275 LogicalRepRelId
logicalrep_read_delete(StringInfo in,LogicalRepTupleData * oldtup)276 logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
277 {
278 char action;
279 LogicalRepRelId relid;
280
281 /* read the relation id */
282 relid = pq_getmsgint(in, 4);
283
284 /* read and verify action */
285 action = pq_getmsgbyte(in);
286 if (action != 'K' && action != 'O')
287 elog(ERROR, "expected action 'O' or 'K', got %c", action);
288
289 logicalrep_read_tuple(in, oldtup);
290
291 return relid;
292 }
293
294 /*
295 * Write TRUNCATE to the output stream.
296 */
297 void
logicalrep_write_truncate(StringInfo out,int nrelids,Oid relids[],bool cascade,bool restart_seqs)298 logicalrep_write_truncate(StringInfo out,
299 int nrelids,
300 Oid relids[],
301 bool cascade, bool restart_seqs)
302 {
303 int i;
304 uint8 flags = 0;
305
306 pq_sendbyte(out, 'T'); /* action TRUNCATE */
307
308 pq_sendint32(out, nrelids);
309
310 /* encode and send truncate flags */
311 if (cascade)
312 flags |= TRUNCATE_CASCADE;
313 if (restart_seqs)
314 flags |= TRUNCATE_RESTART_SEQS;
315 pq_sendint8(out, flags);
316
317 for (i = 0; i < nrelids; i++)
318 pq_sendint32(out, relids[i]);
319 }
320
321 /*
322 * Read TRUNCATE from stream.
323 */
324 List *
logicalrep_read_truncate(StringInfo in,bool * cascade,bool * restart_seqs)325 logicalrep_read_truncate(StringInfo in,
326 bool *cascade, bool *restart_seqs)
327 {
328 int i;
329 int nrelids;
330 List *relids = NIL;
331 uint8 flags;
332
333 nrelids = pq_getmsgint(in, 4);
334
335 /* read and decode truncate flags */
336 flags = pq_getmsgint(in, 1);
337 *cascade = (flags & TRUNCATE_CASCADE) > 0;
338 *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
339
340 for (i = 0; i < nrelids; i++)
341 relids = lappend_oid(relids, pq_getmsgint(in, 4));
342
343 return relids;
344 }
345
346 /*
347 * Write relation description to the output stream.
348 */
349 void
logicalrep_write_rel(StringInfo out,Relation rel)350 logicalrep_write_rel(StringInfo out, Relation rel)
351 {
352 char *relname;
353
354 pq_sendbyte(out, 'R'); /* sending RELATION */
355
356 /* use Oid as relation identifier */
357 pq_sendint32(out, RelationGetRelid(rel));
358
359 /* send qualified relation name */
360 logicalrep_write_namespace(out, RelationGetNamespace(rel));
361 relname = RelationGetRelationName(rel);
362 pq_sendstring(out, relname);
363
364 /* send replica identity */
365 pq_sendbyte(out, rel->rd_rel->relreplident);
366
367 /* send the attribute info */
368 logicalrep_write_attrs(out, rel);
369 }
370
371 /*
372 * Read the relation info from stream and return as LogicalRepRelation.
373 */
374 LogicalRepRelation *
logicalrep_read_rel(StringInfo in)375 logicalrep_read_rel(StringInfo in)
376 {
377 LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
378
379 rel->remoteid = pq_getmsgint(in, 4);
380
381 /* Read relation name from stream */
382 rel->nspname = pstrdup(logicalrep_read_namespace(in));
383 rel->relname = pstrdup(pq_getmsgstring(in));
384
385 /* Read the replica identity. */
386 rel->replident = pq_getmsgbyte(in);
387
388 /* Get attribute description */
389 logicalrep_read_attrs(in, rel);
390
391 return rel;
392 }
393
394 /*
395 * Write type info to the output stream.
396 *
397 * This function will always write base type info.
398 */
399 void
logicalrep_write_typ(StringInfo out,Oid typoid)400 logicalrep_write_typ(StringInfo out, Oid typoid)
401 {
402 Oid basetypoid = getBaseType(typoid);
403 HeapTuple tup;
404 Form_pg_type typtup;
405
406 pq_sendbyte(out, 'Y'); /* sending TYPE */
407
408 tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
409 if (!HeapTupleIsValid(tup))
410 elog(ERROR, "cache lookup failed for type %u", basetypoid);
411 typtup = (Form_pg_type) GETSTRUCT(tup);
412
413 /* use Oid as relation identifier */
414 pq_sendint32(out, typoid);
415
416 /* send qualified type name */
417 logicalrep_write_namespace(out, typtup->typnamespace);
418 pq_sendstring(out, NameStr(typtup->typname));
419
420 ReleaseSysCache(tup);
421 }
422
423 /*
424 * Read type info from the output stream.
425 */
426 void
logicalrep_read_typ(StringInfo in,LogicalRepTyp * ltyp)427 logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
428 {
429 ltyp->remoteid = pq_getmsgint(in, 4);
430
431 /* Read type name from stream */
432 ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
433 ltyp->typname = pstrdup(pq_getmsgstring(in));
434 }
435
436 /*
437 * Write a tuple to the outputstream, in the most efficient format possible.
438 */
439 static void
logicalrep_write_tuple(StringInfo out,Relation rel,HeapTuple tuple)440 logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
441 {
442 TupleDesc desc;
443 Datum values[MaxTupleAttributeNumber];
444 bool isnull[MaxTupleAttributeNumber];
445 int i;
446 uint16 nliveatts = 0;
447
448 desc = RelationGetDescr(rel);
449
450 for (i = 0; i < desc->natts; i++)
451 {
452 if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
453 continue;
454 nliveatts++;
455 }
456 pq_sendint16(out, nliveatts);
457
458 /* try to allocate enough memory from the get-go */
459 enlargeStringInfo(out, tuple->t_len +
460 nliveatts * (1 + 4));
461
462 heap_deform_tuple(tuple, desc, values, isnull);
463
464 /* Write the values */
465 for (i = 0; i < desc->natts; i++)
466 {
467 HeapTuple typtup;
468 Form_pg_type typclass;
469 Form_pg_attribute att = TupleDescAttr(desc, i);
470 char *outputstr;
471
472 if (att->attisdropped || att->attgenerated)
473 continue;
474
475 if (isnull[i])
476 {
477 pq_sendbyte(out, 'n'); /* null column */
478 continue;
479 }
480 else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
481 {
482 pq_sendbyte(out, 'u'); /* unchanged toast column */
483 continue;
484 }
485
486 typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
487 if (!HeapTupleIsValid(typtup))
488 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
489 typclass = (Form_pg_type) GETSTRUCT(typtup);
490
491 pq_sendbyte(out, 't'); /* 'text' data follows */
492
493 outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
494 pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
495 pfree(outputstr);
496
497 ReleaseSysCache(typtup);
498 }
499 }
500
501 /*
502 * Read tuple in remote format from stream.
503 *
504 * The returned tuple points into the input stringinfo.
505 */
506 static void
logicalrep_read_tuple(StringInfo in,LogicalRepTupleData * tuple)507 logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
508 {
509 int i;
510 int natts;
511
512 /* Get number of attributes */
513 natts = pq_getmsgint(in, 2);
514
515 memset(tuple->changed, 0, sizeof(tuple->changed));
516
517 /* Read the data */
518 for (i = 0; i < natts; i++)
519 {
520 char kind;
521
522 kind = pq_getmsgbyte(in);
523
524 switch (kind)
525 {
526 case 'n': /* null */
527 tuple->values[i] = NULL;
528 tuple->changed[i] = true;
529 break;
530 case 'u': /* unchanged column */
531 /* we don't receive the value of an unchanged column */
532 tuple->values[i] = NULL;
533 break;
534 case 't': /* text formatted value */
535 {
536 int len;
537
538 tuple->changed[i] = true;
539
540 len = pq_getmsgint(in, 4); /* read length */
541
542 /* and data */
543 tuple->values[i] = palloc(len + 1);
544 pq_copymsgbytes(in, tuple->values[i], len);
545 tuple->values[i][len] = '\0';
546 }
547 break;
548 default:
549 elog(ERROR, "unrecognized data representation type '%c'", kind);
550 }
551 }
552 }
553
554 /*
555 * Write relation attributes to the stream.
556 */
557 static void
logicalrep_write_attrs(StringInfo out,Relation rel)558 logicalrep_write_attrs(StringInfo out, Relation rel)
559 {
560 TupleDesc desc;
561 int i;
562 uint16 nliveatts = 0;
563 Bitmapset *idattrs = NULL;
564 bool replidentfull;
565
566 desc = RelationGetDescr(rel);
567
568 /* send number of live attributes */
569 for (i = 0; i < desc->natts; i++)
570 {
571 if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
572 continue;
573 nliveatts++;
574 }
575 pq_sendint16(out, nliveatts);
576
577 /* fetch bitmap of REPLICATION IDENTITY attributes */
578 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
579 if (!replidentfull)
580 idattrs = RelationGetIndexAttrBitmap(rel,
581 INDEX_ATTR_BITMAP_IDENTITY_KEY);
582
583 /* send the attributes */
584 for (i = 0; i < desc->natts; i++)
585 {
586 Form_pg_attribute att = TupleDescAttr(desc, i);
587 uint8 flags = 0;
588
589 if (att->attisdropped || att->attgenerated)
590 continue;
591
592 /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
593 if (replidentfull ||
594 bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
595 idattrs))
596 flags |= LOGICALREP_IS_REPLICA_IDENTITY;
597
598 pq_sendbyte(out, flags);
599
600 /* attribute name */
601 pq_sendstring(out, NameStr(att->attname));
602
603 /* attribute type id */
604 pq_sendint32(out, (int) att->atttypid);
605
606 /* attribute mode */
607 pq_sendint32(out, att->atttypmod);
608 }
609
610 bms_free(idattrs);
611 }
612
613 /*
614 * Read relation attribute names from the stream.
615 */
616 static void
logicalrep_read_attrs(StringInfo in,LogicalRepRelation * rel)617 logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
618 {
619 int i;
620 int natts;
621 char **attnames;
622 Oid *atttyps;
623 Bitmapset *attkeys = NULL;
624
625 natts = pq_getmsgint(in, 2);
626 attnames = palloc(natts * sizeof(char *));
627 atttyps = palloc(natts * sizeof(Oid));
628
629 /* read the attributes */
630 for (i = 0; i < natts; i++)
631 {
632 uint8 flags;
633
634 /* Check for replica identity column */
635 flags = pq_getmsgbyte(in);
636 if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
637 attkeys = bms_add_member(attkeys, i);
638
639 /* attribute name */
640 attnames[i] = pstrdup(pq_getmsgstring(in));
641
642 /* attribute type id */
643 atttyps[i] = (Oid) pq_getmsgint(in, 4);
644
645 /* we ignore attribute mode for now */
646 (void) pq_getmsgint(in, 4);
647 }
648
649 rel->attnames = attnames;
650 rel->atttyps = atttyps;
651 rel->attkeys = attkeys;
652 rel->natts = natts;
653 }
654
655 /*
656 * Write the namespace name or empty string for pg_catalog (to save space).
657 */
658 static void
logicalrep_write_namespace(StringInfo out,Oid nspid)659 logicalrep_write_namespace(StringInfo out, Oid nspid)
660 {
661 if (nspid == PG_CATALOG_NAMESPACE)
662 pq_sendbyte(out, '\0');
663 else
664 {
665 char *nspname = get_namespace_name(nspid);
666
667 if (nspname == NULL)
668 elog(ERROR, "cache lookup failed for namespace %u",
669 nspid);
670
671 pq_sendstring(out, nspname);
672 }
673 }
674
675 /*
676 * Read the namespace name while treating empty string as pg_catalog.
677 */
678 static const char *
logicalrep_read_namespace(StringInfo in)679 logicalrep_read_namespace(StringInfo in)
680 {
681 const char *nspname = pq_getmsgstring(in);
682
683 if (nspname[0] == '\0')
684 nspname = "pg_catalog";
685
686 return nspname;
687 }
688