1 /*-------------------------------------------------------------------------
2 *
3 * pgoutput.c
4 * Logical Replication output plugin
5 *
6 * Copyright (c) 2012-2017, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/pgoutput/pgoutput.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "catalog/pg_publication.h"
16
17 #include "replication/logical.h"
18 #include "replication/logicalproto.h"
19 #include "replication/origin.h"
20 #include "replication/pgoutput.h"
21
22 #include "utils/inval.h"
23 #include "utils/int8.h"
24 #include "utils/lsyscache.h"
25 #include "utils/memutils.h"
26 #include "utils/syscache.h"
27 #include "utils/varlena.h"
28
29 PG_MODULE_MAGIC;
30
31 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
32
33 static void pgoutput_startup(LogicalDecodingContext *ctx,
34 OutputPluginOptions *opt, bool is_init);
35 static void pgoutput_shutdown(LogicalDecodingContext *ctx);
36 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
37 ReorderBufferTXN *txn);
38 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
39 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
40 static void pgoutput_change(LogicalDecodingContext *ctx,
41 ReorderBufferTXN *txn, Relation rel,
42 ReorderBufferChange *change);
43 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
44 RepOriginId origin_id);
45
46 static bool publications_valid;
47
48 static List *LoadPublications(List *pubnames);
49 static void publication_invalidation_cb(Datum arg, int cacheid,
50 uint32 hashvalue);
51
52 /* Entry in the map used to remember which relation schemas we sent. */
53 typedef struct RelationSyncEntry
54 {
55 Oid relid; /* relation oid */
56 bool schema_sent; /* did we send the schema? */
57 bool replicate_valid;
58 PublicationActions pubactions;
59 } RelationSyncEntry;
60
61 /* Map used to remember which relation schemas we sent. */
62 static HTAB *RelationSyncCache = NULL;
63
64 static void init_rel_sync_cache(MemoryContext decoding_context);
65 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
66 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
67 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
68 uint32 hashvalue);
69
70 /*
71 * Specify output plugin callbacks
72 */
73 void
_PG_output_plugin_init(OutputPluginCallbacks * cb)74 _PG_output_plugin_init(OutputPluginCallbacks *cb)
75 {
76 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
77
78 cb->startup_cb = pgoutput_startup;
79 cb->begin_cb = pgoutput_begin_txn;
80 cb->change_cb = pgoutput_change;
81 cb->commit_cb = pgoutput_commit_txn;
82 cb->filter_by_origin_cb = pgoutput_origin_filter;
83 cb->shutdown_cb = pgoutput_shutdown;
84 }
85
86 static void
parse_output_parameters(List * options,uint32 * protocol_version,List ** publication_names)87 parse_output_parameters(List *options, uint32 *protocol_version,
88 List **publication_names)
89 {
90 ListCell *lc;
91 bool protocol_version_given = false;
92 bool publication_names_given = false;
93
94 foreach(lc, options)
95 {
96 DefElem *defel = (DefElem *) lfirst(lc);
97
98 Assert(defel->arg == NULL || IsA(defel->arg, String));
99
100 /* Check each param, whether or not we recognize it */
101 if (strcmp(defel->defname, "proto_version") == 0)
102 {
103 int64 parsed;
104
105 if (protocol_version_given)
106 ereport(ERROR,
107 (errcode(ERRCODE_SYNTAX_ERROR),
108 errmsg("conflicting or redundant options")));
109 protocol_version_given = true;
110
111 if (!scanint8(strVal(defel->arg), true, &parsed))
112 ereport(ERROR,
113 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114 errmsg("invalid proto_version")));
115
116 if (parsed > PG_UINT32_MAX || parsed < 0)
117 ereport(ERROR,
118 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119 errmsg("proto_version \"%s\" out of range",
120 strVal(defel->arg))));
121
122 *protocol_version = (uint32) parsed;
123 }
124 else if (strcmp(defel->defname, "publication_names") == 0)
125 {
126 if (publication_names_given)
127 ereport(ERROR,
128 (errcode(ERRCODE_SYNTAX_ERROR),
129 errmsg("conflicting or redundant options")));
130 publication_names_given = true;
131
132 if (!SplitIdentifierString(strVal(defel->arg), ',',
133 publication_names))
134 ereport(ERROR,
135 (errcode(ERRCODE_INVALID_NAME),
136 errmsg("invalid publication_names syntax")));
137 }
138 else
139 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
140 }
141 }
142
143 /*
144 * Initialize this plugin
145 */
146 static void
pgoutput_startup(LogicalDecodingContext * ctx,OutputPluginOptions * opt,bool is_init)147 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
148 bool is_init)
149 {
150 PGOutputData *data = palloc0(sizeof(PGOutputData));
151
152 /* Create our memory context for private allocations. */
153 data->context = AllocSetContextCreate(ctx->context,
154 "logical replication output context",
155 ALLOCSET_DEFAULT_MINSIZE,
156 ALLOCSET_DEFAULT_INITSIZE,
157 ALLOCSET_DEFAULT_MAXSIZE);
158
159 ctx->output_plugin_private = data;
160
161 /* This plugin uses binary protocol. */
162 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
163
164 /*
165 * This is replication start and not slot initialization.
166 *
167 * Parse and validate options passed by the client.
168 */
169 if (!is_init)
170 {
171 /* Parse the params and ERROR if we see any we don't recognize */
172 parse_output_parameters(ctx->output_plugin_options,
173 &data->protocol_version,
174 &data->publication_names);
175
176 /* Check if we support requested protocol */
177 if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
178 ereport(ERROR,
179 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
180 errmsg("client sent proto_version=%d but we only support protocol %d or lower",
181 data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
182
183 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
184 ereport(ERROR,
185 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
186 errmsg("client sent proto_version=%d but we only support protocol %d or higher",
187 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
188
189 if (list_length(data->publication_names) < 1)
190 ereport(ERROR,
191 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
192 errmsg("publication_names parameter missing")));
193
194 /* Init publication state. */
195 data->publications = NIL;
196 publications_valid = false;
197 CacheRegisterSyscacheCallback(PUBLICATIONOID,
198 publication_invalidation_cb,
199 (Datum) 0);
200
201 /* Initialize relation schema cache. */
202 init_rel_sync_cache(CacheMemoryContext);
203 }
204 }
205
206 /*
207 * BEGIN callback
208 */
209 static void
pgoutput_begin_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn)210 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
211 {
212 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
213
214 OutputPluginPrepareWrite(ctx, !send_replication_origin);
215 logicalrep_write_begin(ctx->out, txn);
216
217 if (send_replication_origin)
218 {
219 char *origin;
220
221 /* Message boundary */
222 OutputPluginWrite(ctx, false);
223 OutputPluginPrepareWrite(ctx, true);
224
225 /*----------
226 * XXX: which behaviour do we want here?
227 *
228 * Alternatives:
229 * - don't send origin message if origin name not found
230 * (that's what we do now)
231 * - throw error - that will break replication, not good
232 * - send some special "unknown" origin
233 *----------
234 */
235 if (replorigin_by_oid(txn->origin_id, true, &origin))
236 logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
237 }
238
239 OutputPluginWrite(ctx, true);
240 }
241
242 /*
243 * COMMIT callback
244 */
245 static void
pgoutput_commit_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)246 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
247 XLogRecPtr commit_lsn)
248 {
249 OutputPluginUpdateProgress(ctx);
250
251 OutputPluginPrepareWrite(ctx, true);
252 logicalrep_write_commit(ctx->out, txn, commit_lsn);
253 OutputPluginWrite(ctx, true);
254 }
255
256 /*
257 * Sends the decoded DML over wire.
258 */
259 static void
pgoutput_change(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)260 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
261 Relation relation, ReorderBufferChange *change)
262 {
263 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
264 MemoryContext old;
265 RelationSyncEntry *relentry;
266
267 if (!is_publishable_relation(relation))
268 return;
269
270 relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
271
272 /* First check the table filter */
273 switch (change->action)
274 {
275 case REORDER_BUFFER_CHANGE_INSERT:
276 if (!relentry->pubactions.pubinsert)
277 return;
278 break;
279 case REORDER_BUFFER_CHANGE_UPDATE:
280 if (!relentry->pubactions.pubupdate)
281 return;
282 break;
283 case REORDER_BUFFER_CHANGE_DELETE:
284 if (!relentry->pubactions.pubdelete)
285 return;
286 break;
287 default:
288 Assert(false);
289 }
290
291 /* Avoid leaking memory by using and resetting our own context */
292 old = MemoryContextSwitchTo(data->context);
293
294 /*
295 * Write the relation schema if the current schema haven't been sent yet.
296 */
297 if (!relentry->schema_sent)
298 {
299 TupleDesc desc;
300 int i;
301
302 desc = RelationGetDescr(relation);
303
304 /*
305 * Write out type info if needed. We do that only for user-created
306 * types. We use FirstBootstrapObjectId as the cutoff, so that we only
307 * consider objects with hand-assigned OIDs to be "built in", not for
308 * instance any function or type defined in the information_schema.
309 * This is important because only hand-assigned OIDs can be expected
310 * to remain stable across major versions.
311 */
312 for (i = 0; i < desc->natts; i++)
313 {
314 Form_pg_attribute att = desc->attrs[i];
315
316 if (att->attisdropped)
317 continue;
318
319 if (att->atttypid < FirstBootstrapObjectId)
320 continue;
321
322 OutputPluginPrepareWrite(ctx, false);
323 logicalrep_write_typ(ctx->out, att->atttypid);
324 OutputPluginWrite(ctx, false);
325 }
326
327 OutputPluginPrepareWrite(ctx, false);
328 logicalrep_write_rel(ctx->out, relation);
329 OutputPluginWrite(ctx, false);
330 relentry->schema_sent = true;
331 }
332
333 /* Send the data */
334 switch (change->action)
335 {
336 case REORDER_BUFFER_CHANGE_INSERT:
337 OutputPluginPrepareWrite(ctx, true);
338 logicalrep_write_insert(ctx->out, relation,
339 &change->data.tp.newtuple->tuple);
340 OutputPluginWrite(ctx, true);
341 break;
342 case REORDER_BUFFER_CHANGE_UPDATE:
343 {
344 HeapTuple oldtuple = change->data.tp.oldtuple ?
345 &change->data.tp.oldtuple->tuple : NULL;
346
347 OutputPluginPrepareWrite(ctx, true);
348 logicalrep_write_update(ctx->out, relation, oldtuple,
349 &change->data.tp.newtuple->tuple);
350 OutputPluginWrite(ctx, true);
351 break;
352 }
353 case REORDER_BUFFER_CHANGE_DELETE:
354 if (change->data.tp.oldtuple)
355 {
356 OutputPluginPrepareWrite(ctx, true);
357 logicalrep_write_delete(ctx->out, relation,
358 &change->data.tp.oldtuple->tuple);
359 OutputPluginWrite(ctx, true);
360 }
361 else
362 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
363 break;
364 default:
365 Assert(false);
366 }
367
368 /* Cleanup */
369 MemoryContextSwitchTo(old);
370 MemoryContextReset(data->context);
371 }
372
373 /*
374 * Currently we always forward.
375 */
376 static bool
pgoutput_origin_filter(LogicalDecodingContext * ctx,RepOriginId origin_id)377 pgoutput_origin_filter(LogicalDecodingContext *ctx,
378 RepOriginId origin_id)
379 {
380 return false;
381 }
382
383 /*
384 * Shutdown the output plugin.
385 *
386 * Note, we don't need to clean the data->context as it's child context
387 * of the ctx->context so it will be cleaned up by logical decoding machinery.
388 */
389 static void
pgoutput_shutdown(LogicalDecodingContext * ctx)390 pgoutput_shutdown(LogicalDecodingContext *ctx)
391 {
392 if (RelationSyncCache)
393 {
394 hash_destroy(RelationSyncCache);
395 RelationSyncCache = NULL;
396 }
397 }
398
399 /*
400 * Load publications from the list of publication names.
401 */
402 static List *
LoadPublications(List * pubnames)403 LoadPublications(List *pubnames)
404 {
405 List *result = NIL;
406 ListCell *lc;
407
408 foreach(lc, pubnames)
409 {
410 char *pubname = (char *) lfirst(lc);
411 Publication *pub = GetPublicationByName(pubname, false);
412
413 result = lappend(result, pub);
414 }
415
416 return result;
417 }
418
419 /*
420 * Publication cache invalidation callback.
421 */
422 static void
publication_invalidation_cb(Datum arg,int cacheid,uint32 hashvalue)423 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
424 {
425 publications_valid = false;
426
427 /*
428 * Also invalidate per-relation cache so that next time the filtering info
429 * is checked it will be updated with the new publication settings.
430 */
431 rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
432 }
433
434 /*
435 * Initialize the relation schema sync cache for a decoding session.
436 *
437 * The hash table is destroyed at the end of a decoding session. While
438 * relcache invalidations still exist and will still be invoked, they
439 * will just see the null hash table global and take no action.
440 */
441 static void
init_rel_sync_cache(MemoryContext cachectx)442 init_rel_sync_cache(MemoryContext cachectx)
443 {
444 HASHCTL ctl;
445 MemoryContext old_ctxt;
446
447 if (RelationSyncCache != NULL)
448 return;
449
450 /* Make a new hash table for the cache */
451 MemSet(&ctl, 0, sizeof(ctl));
452 ctl.keysize = sizeof(Oid);
453 ctl.entrysize = sizeof(RelationSyncEntry);
454 ctl.hcxt = cachectx;
455
456 old_ctxt = MemoryContextSwitchTo(cachectx);
457 RelationSyncCache = hash_create("logical replication output relation cache",
458 128, &ctl,
459 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
460 (void) MemoryContextSwitchTo(old_ctxt);
461
462 Assert(RelationSyncCache != NULL);
463
464 CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
465 CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
466 rel_sync_cache_publication_cb,
467 (Datum) 0);
468 }
469
470 /*
471 * Find or create entry in the relation schema cache.
472 */
473 static RelationSyncEntry *
get_rel_sync_entry(PGOutputData * data,Oid relid)474 get_rel_sync_entry(PGOutputData *data, Oid relid)
475 {
476 RelationSyncEntry *entry;
477 bool found;
478 MemoryContext oldctx;
479
480 Assert(RelationSyncCache != NULL);
481
482 /* Find cached function info, creating if not found */
483 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
484 entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
485 (void *) &relid,
486 HASH_ENTER, &found);
487 MemoryContextSwitchTo(oldctx);
488 Assert(entry != NULL);
489
490 /* Not found means schema wasn't sent */
491 if (!found || !entry->replicate_valid)
492 {
493 List *pubids = GetRelationPublications(relid);
494 ListCell *lc;
495
496 /* Reload publications if needed before use. */
497 if (!publications_valid)
498 {
499 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
500 if (data->publications)
501 list_free_deep(data->publications);
502
503 data->publications = LoadPublications(data->publication_names);
504 MemoryContextSwitchTo(oldctx);
505 publications_valid = true;
506 }
507
508 /*
509 * Build publication cache. We can't use one provided by relcache as
510 * relcache considers all publications given relation is in, but here
511 * we only need to consider ones that the subscriber requested.
512 */
513 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
514 entry->pubactions.pubdelete = false;
515
516 foreach(lc, data->publications)
517 {
518 Publication *pub = lfirst(lc);
519
520 /*
521 * Skip tables that look like they are from a heap rewrite (see
522 * make_new_heap()). We need to skip them because the subscriber
523 * won't have a table by that name to receive the data. That
524 * means we won't ship the new data in, say, an added column with
525 * a DEFAULT, but if the user applies the same DDL manually on the
526 * subscriber, then this will work out for them.
527 *
528 * We only need to consider the alltables case, because such a
529 * transient heap won't be an explicit member of a publication.
530 */
531 if (pub->alltables)
532 {
533 char *relname = get_rel_name(relid);
534 unsigned int u;
535 int n;
536
537 if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 &&
538 relname[n] == '\0')
539 {
540 if (get_rel_relkind(u) == RELKIND_RELATION)
541 break;
542 }
543 }
544
545 if (pub->alltables || list_member_oid(pubids, pub->oid))
546 {
547 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
548 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
549 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
550 }
551
552 if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
553 entry->pubactions.pubdelete)
554 break;
555 }
556
557 list_free(pubids);
558
559 entry->replicate_valid = true;
560 }
561
562 if (!found)
563 entry->schema_sent = false;
564
565 return entry;
566 }
567
568 /*
569 * Relcache invalidation callback
570 */
571 static void
rel_sync_cache_relation_cb(Datum arg,Oid relid)572 rel_sync_cache_relation_cb(Datum arg, Oid relid)
573 {
574 RelationSyncEntry *entry;
575
576 /*
577 * We can get here if the plugin was used in SQL interface as the
578 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
579 * is no way to unregister the relcache invalidation callback.
580 */
581 if (RelationSyncCache == NULL)
582 return;
583
584 /*
585 * Nobody keeps pointers to entries in this hash table around outside
586 * logical decoding callback calls - but invalidation events can come in
587 * *during* a callback if we access the relcache in the callback. Because
588 * of that we must mark the cache entry as invalid but not remove it from
589 * the hash while it could still be referenced, then prune it at a later
590 * safe point.
591 *
592 * Getting invalidations for relations that aren't in the table is
593 * entirely normal, since there's no way to unregister for an invalidation
594 * event. So we don't care if it's found or not.
595 */
596 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
597 HASH_FIND, NULL);
598
599 /*
600 * Reset schema sent status as the relation definition may have changed.
601 */
602 if (entry != NULL)
603 entry->schema_sent = false;
604 }
605
606 /*
607 * Publication relation map syscache invalidation callback
608 */
609 static void
rel_sync_cache_publication_cb(Datum arg,int cacheid,uint32 hashvalue)610 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
611 {
612 HASH_SEQ_STATUS status;
613 RelationSyncEntry *entry;
614
615 /*
616 * We can get here if the plugin was used in SQL interface as the
617 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
618 * is no way to unregister the relcache invalidation callback.
619 */
620 if (RelationSyncCache == NULL)
621 return;
622
623 /*
624 * There is no way to find which entry in our cache the hash belongs to so
625 * mark the whole cache as invalid.
626 */
627 hash_seq_init(&status, RelationSyncCache);
628 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
629 entry->replicate_valid = false;
630 }
631