1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  *		Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_publication.h"
16 
17 #include "replication/logical.h"
18 #include "replication/logicalproto.h"
19 #include "replication/origin.h"
20 #include "replication/pgoutput.h"
21 
22 #include "utils/inval.h"
23 #include "utils/int8.h"
24 #include "utils/lsyscache.h"
25 #include "utils/memutils.h"
26 #include "utils/syscache.h"
27 #include "utils/varlena.h"
28 
PG_MODULE_MAGIC;

/* Plugin entry point; fills in the callback table (defined below). */
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);

/* Output plugin callbacks installed by _PG_output_plugin_init(). */
static void pgoutput_startup(LogicalDecodingContext *ctx,
				 OutputPluginOptions *opt, bool is_init);
static void pgoutput_shutdown(LogicalDecodingContext *ctx);
static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
				   ReorderBufferTXN *txn);
static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
					ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pgoutput_change(LogicalDecodingContext *ctx,
				ReorderBufferTXN *txn, Relation rel,
				ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
					   RepOriginId origin_id);

/*
 * False whenever the loaded publication list has to be rebuilt before use.
 * Cleared in pgoutput_startup() and publication_invalidation_cb(); set again
 * by get_rel_sync_entry() once the publications have been reloaded.
 */
static bool publications_valid;

/* Publication loading and publication syscache invalidation handling. */
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
							uint32 hashvalue);
51 
/*
 * Entry in the map used to remember which relation schemas we sent.
 *
 * relid is the hash key (the table is created with keysize sizeof(Oid) and
 * looked up by a pointer to the relation OID), so keep it as the first
 * member.
 */
typedef struct RelationSyncEntry
{
	Oid			relid;			/* relation oid */
	bool		schema_sent;	/* did we send the schema? */
	bool		replicate_valid;	/* is pubactions below up to date? */
	PublicationActions pubactions;	/* union of the actions published for
									 * this relation by the requested
									 * publications */
} RelationSyncEntry;
60 
/*
 * Map used to remember which relation schemas we sent.
 *
 * NULL while no cache exists: it is created by init_rel_sync_cache() and
 * destroyed and reset to NULL by pgoutput_shutdown().  The invalidation
 * callbacks below check for NULL before touching it.
 */
static HTAB *RelationSyncCache = NULL;

/* Cache setup, lookup and invalidation callbacks (defined further down). */
static void init_rel_sync_cache(MemoryContext decoding_context);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
							  uint32 hashvalue);
69 
70 /*
71  * Specify output plugin callbacks
72  */
73 void
_PG_output_plugin_init(OutputPluginCallbacks * cb)74 _PG_output_plugin_init(OutputPluginCallbacks *cb)
75 {
76 	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
77 
78 	cb->startup_cb = pgoutput_startup;
79 	cb->begin_cb = pgoutput_begin_txn;
80 	cb->change_cb = pgoutput_change;
81 	cb->commit_cb = pgoutput_commit_txn;
82 	cb->filter_by_origin_cb = pgoutput_origin_filter;
83 	cb->shutdown_cb = pgoutput_shutdown;
84 }
85 
86 static void
parse_output_parameters(List * options,uint32 * protocol_version,List ** publication_names)87 parse_output_parameters(List *options, uint32 *protocol_version,
88 						List **publication_names)
89 {
90 	ListCell   *lc;
91 	bool		protocol_version_given = false;
92 	bool		publication_names_given = false;
93 
94 	foreach(lc, options)
95 	{
96 		DefElem    *defel = (DefElem *) lfirst(lc);
97 
98 		Assert(defel->arg == NULL || IsA(defel->arg, String));
99 
100 		/* Check each param, whether or not we recognize it */
101 		if (strcmp(defel->defname, "proto_version") == 0)
102 		{
103 			int64		parsed;
104 
105 			if (protocol_version_given)
106 				ereport(ERROR,
107 						(errcode(ERRCODE_SYNTAX_ERROR),
108 						 errmsg("conflicting or redundant options")));
109 			protocol_version_given = true;
110 
111 			if (!scanint8(strVal(defel->arg), true, &parsed))
112 				ereport(ERROR,
113 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114 						 errmsg("invalid proto_version")));
115 
116 			if (parsed > PG_UINT32_MAX || parsed < 0)
117 				ereport(ERROR,
118 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119 						 errmsg("proto_version \"%s\" out of range",
120 								strVal(defel->arg))));
121 
122 			*protocol_version = (uint32) parsed;
123 		}
124 		else if (strcmp(defel->defname, "publication_names") == 0)
125 		{
126 			if (publication_names_given)
127 				ereport(ERROR,
128 						(errcode(ERRCODE_SYNTAX_ERROR),
129 						 errmsg("conflicting or redundant options")));
130 			publication_names_given = true;
131 
132 			if (!SplitIdentifierString(strVal(defel->arg), ',',
133 									   publication_names))
134 				ereport(ERROR,
135 						(errcode(ERRCODE_INVALID_NAME),
136 						 errmsg("invalid publication_names syntax")));
137 		}
138 		else
139 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
140 	}
141 }
142 
143 /*
144  * Initialize this plugin
145  */
146 static void
pgoutput_startup(LogicalDecodingContext * ctx,OutputPluginOptions * opt,bool is_init)147 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
148 				 bool is_init)
149 {
150 	PGOutputData *data = palloc0(sizeof(PGOutputData));
151 
152 	/* Create our memory context for private allocations. */
153 	data->context = AllocSetContextCreate(ctx->context,
154 										  "logical replication output context",
155 										  ALLOCSET_DEFAULT_MINSIZE,
156 										  ALLOCSET_DEFAULT_INITSIZE,
157 										  ALLOCSET_DEFAULT_MAXSIZE);
158 
159 	ctx->output_plugin_private = data;
160 
161 	/* This plugin uses binary protocol. */
162 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
163 
164 	/*
165 	 * This is replication start and not slot initialization.
166 	 *
167 	 * Parse and validate options passed by the client.
168 	 */
169 	if (!is_init)
170 	{
171 		/* Parse the params and ERROR if we see any we don't recognize */
172 		parse_output_parameters(ctx->output_plugin_options,
173 								&data->protocol_version,
174 								&data->publication_names);
175 
176 		/* Check if we support requested protocol */
177 		if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
178 			ereport(ERROR,
179 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
180 					 errmsg("client sent proto_version=%d but we only support protocol %d or lower",
181 							data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
182 
183 		if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
184 			ereport(ERROR,
185 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
186 					 errmsg("client sent proto_version=%d but we only support protocol %d or higher",
187 							data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
188 
189 		if (list_length(data->publication_names) < 1)
190 			ereport(ERROR,
191 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
192 					 errmsg("publication_names parameter missing")));
193 
194 		/* Init publication state. */
195 		data->publications = NIL;
196 		publications_valid = false;
197 		CacheRegisterSyscacheCallback(PUBLICATIONOID,
198 									  publication_invalidation_cb,
199 									  (Datum) 0);
200 
201 		/* Initialize relation schema cache. */
202 		init_rel_sync_cache(CacheMemoryContext);
203 	}
204 }
205 
206 /*
207  * BEGIN callback
208  */
209 static void
pgoutput_begin_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn)210 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
211 {
212 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
213 
214 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
215 	logicalrep_write_begin(ctx->out, txn);
216 
217 	if (send_replication_origin)
218 	{
219 		char	   *origin;
220 
221 		/* Message boundary */
222 		OutputPluginWrite(ctx, false);
223 		OutputPluginPrepareWrite(ctx, true);
224 
225 		/*----------
226 		 * XXX: which behaviour do we want here?
227 		 *
228 		 * Alternatives:
229 		 *	- don't send origin message if origin name not found
230 		 *	  (that's what we do now)
231 		 *	- throw error - that will break replication, not good
232 		 *	- send some special "unknown" origin
233 		 *----------
234 		 */
235 		if (replorigin_by_oid(txn->origin_id, true, &origin))
236 			logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
237 	}
238 
239 	OutputPluginWrite(ctx, true);
240 }
241 
242 /*
243  * COMMIT callback
244  */
245 static void
pgoutput_commit_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)246 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
247 					XLogRecPtr commit_lsn)
248 {
249 	OutputPluginUpdateProgress(ctx);
250 
251 	OutputPluginPrepareWrite(ctx, true);
252 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
253 	OutputPluginWrite(ctx, true);
254 }
255 
256 /*
257  * Sends the decoded DML over wire.
258  */
259 static void
pgoutput_change(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)260 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
261 				Relation relation, ReorderBufferChange *change)
262 {
263 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
264 	MemoryContext old;
265 	RelationSyncEntry *relentry;
266 
267 	if (!is_publishable_relation(relation))
268 		return;
269 
270 	relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
271 
272 	/* First check the table filter */
273 	switch (change->action)
274 	{
275 		case REORDER_BUFFER_CHANGE_INSERT:
276 			if (!relentry->pubactions.pubinsert)
277 				return;
278 			break;
279 		case REORDER_BUFFER_CHANGE_UPDATE:
280 			if (!relentry->pubactions.pubupdate)
281 				return;
282 			break;
283 		case REORDER_BUFFER_CHANGE_DELETE:
284 			if (!relentry->pubactions.pubdelete)
285 				return;
286 			break;
287 		default:
288 			Assert(false);
289 	}
290 
291 	/* Avoid leaking memory by using and resetting our own context */
292 	old = MemoryContextSwitchTo(data->context);
293 
294 	/*
295 	 * Write the relation schema if the current schema haven't been sent yet.
296 	 */
297 	if (!relentry->schema_sent)
298 	{
299 		TupleDesc	desc;
300 		int			i;
301 
302 		desc = RelationGetDescr(relation);
303 
304 		/*
305 		 * Write out type info if needed.  We do that only for user-created
306 		 * types.  We use FirstBootstrapObjectId as the cutoff, so that we only
307 		 * consider objects with hand-assigned OIDs to be "built in", not for
308 		 * instance any function or type defined in the information_schema.
309 		 * This is important because only hand-assigned OIDs can be expected
310 		 * to remain stable across major versions.
311 		 */
312 		for (i = 0; i < desc->natts; i++)
313 		{
314 			Form_pg_attribute att = desc->attrs[i];
315 
316 			if (att->attisdropped)
317 				continue;
318 
319 			if (att->atttypid < FirstBootstrapObjectId)
320 				continue;
321 
322 			OutputPluginPrepareWrite(ctx, false);
323 			logicalrep_write_typ(ctx->out, att->atttypid);
324 			OutputPluginWrite(ctx, false);
325 		}
326 
327 		OutputPluginPrepareWrite(ctx, false);
328 		logicalrep_write_rel(ctx->out, relation);
329 		OutputPluginWrite(ctx, false);
330 		relentry->schema_sent = true;
331 	}
332 
333 	/* Send the data */
334 	switch (change->action)
335 	{
336 		case REORDER_BUFFER_CHANGE_INSERT:
337 			OutputPluginPrepareWrite(ctx, true);
338 			logicalrep_write_insert(ctx->out, relation,
339 									&change->data.tp.newtuple->tuple);
340 			OutputPluginWrite(ctx, true);
341 			break;
342 		case REORDER_BUFFER_CHANGE_UPDATE:
343 			{
344 				HeapTuple	oldtuple = change->data.tp.oldtuple ?
345 				&change->data.tp.oldtuple->tuple : NULL;
346 
347 				OutputPluginPrepareWrite(ctx, true);
348 				logicalrep_write_update(ctx->out, relation, oldtuple,
349 										&change->data.tp.newtuple->tuple);
350 				OutputPluginWrite(ctx, true);
351 				break;
352 			}
353 		case REORDER_BUFFER_CHANGE_DELETE:
354 			if (change->data.tp.oldtuple)
355 			{
356 				OutputPluginPrepareWrite(ctx, true);
357 				logicalrep_write_delete(ctx->out, relation,
358 										&change->data.tp.oldtuple->tuple);
359 				OutputPluginWrite(ctx, true);
360 			}
361 			else
362 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
363 			break;
364 		default:
365 			Assert(false);
366 	}
367 
368 	/* Cleanup */
369 	MemoryContextSwitchTo(old);
370 	MemoryContextReset(data->context);
371 }
372 
373 /*
374  * Currently we always forward.
375  */
376 static bool
pgoutput_origin_filter(LogicalDecodingContext * ctx,RepOriginId origin_id)377 pgoutput_origin_filter(LogicalDecodingContext *ctx,
378 					   RepOriginId origin_id)
379 {
380 	return false;
381 }
382 
383 /*
384  * Shutdown the output plugin.
385  *
386  * Note, we don't need to clean the data->context as it's child context
387  * of the ctx->context so it will be cleaned up by logical decoding machinery.
388  */
389 static void
pgoutput_shutdown(LogicalDecodingContext * ctx)390 pgoutput_shutdown(LogicalDecodingContext *ctx)
391 {
392 	if (RelationSyncCache)
393 	{
394 		hash_destroy(RelationSyncCache);
395 		RelationSyncCache = NULL;
396 	}
397 }
398 
399 /*
400  * Load publications from the list of publication names.
401  */
402 static List *
LoadPublications(List * pubnames)403 LoadPublications(List *pubnames)
404 {
405 	List	   *result = NIL;
406 	ListCell   *lc;
407 
408 	foreach(lc, pubnames)
409 	{
410 		char	   *pubname = (char *) lfirst(lc);
411 		Publication *pub = GetPublicationByName(pubname, false);
412 
413 		result = lappend(result, pub);
414 	}
415 
416 	return result;
417 }
418 
419 /*
420  * Publication cache invalidation callback.
421  */
422 static void
publication_invalidation_cb(Datum arg,int cacheid,uint32 hashvalue)423 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
424 {
425 	publications_valid = false;
426 
427 	/*
428 	 * Also invalidate per-relation cache so that next time the filtering info
429 	 * is checked it will be updated with the new publication settings.
430 	 */
431 	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
432 }
433 
434 /*
435  * Initialize the relation schema sync cache for a decoding session.
436  *
437  * The hash table is destroyed at the end of a decoding session. While
438  * relcache invalidations still exist and will still be invoked, they
439  * will just see the null hash table global and take no action.
440  */
441 static void
init_rel_sync_cache(MemoryContext cachectx)442 init_rel_sync_cache(MemoryContext cachectx)
443 {
444 	HASHCTL		ctl;
445 	MemoryContext old_ctxt;
446 
447 	if (RelationSyncCache != NULL)
448 		return;
449 
450 	/* Make a new hash table for the cache */
451 	MemSet(&ctl, 0, sizeof(ctl));
452 	ctl.keysize = sizeof(Oid);
453 	ctl.entrysize = sizeof(RelationSyncEntry);
454 	ctl.hcxt = cachectx;
455 
456 	old_ctxt = MemoryContextSwitchTo(cachectx);
457 	RelationSyncCache = hash_create("logical replication output relation cache",
458 									128, &ctl,
459 									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
460 	(void) MemoryContextSwitchTo(old_ctxt);
461 
462 	Assert(RelationSyncCache != NULL);
463 
464 	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
465 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
466 								  rel_sync_cache_publication_cb,
467 								  (Datum) 0);
468 }
469 
470 /*
471  * Find or create entry in the relation schema cache.
472  */
473 static RelationSyncEntry *
get_rel_sync_entry(PGOutputData * data,Oid relid)474 get_rel_sync_entry(PGOutputData *data, Oid relid)
475 {
476 	RelationSyncEntry *entry;
477 	bool		found;
478 	MemoryContext oldctx;
479 
480 	Assert(RelationSyncCache != NULL);
481 
482 	/* Find cached function info, creating if not found */
483 	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
484 	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
485 											  (void *) &relid,
486 											  HASH_ENTER, &found);
487 	MemoryContextSwitchTo(oldctx);
488 	Assert(entry != NULL);
489 
490 	/* Not found means schema wasn't sent */
491 	if (!found || !entry->replicate_valid)
492 	{
493 		List	   *pubids = GetRelationPublications(relid);
494 		ListCell   *lc;
495 
496 		/* Reload publications if needed before use. */
497 		if (!publications_valid)
498 		{
499 			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
500 			if (data->publications)
501 				list_free_deep(data->publications);
502 
503 			data->publications = LoadPublications(data->publication_names);
504 			MemoryContextSwitchTo(oldctx);
505 			publications_valid = true;
506 		}
507 
508 		/*
509 		 * Build publication cache. We can't use one provided by relcache as
510 		 * relcache considers all publications given relation is in, but here
511 		 * we only need to consider ones that the subscriber requested.
512 		 */
513 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
514 			entry->pubactions.pubdelete = false;
515 
516 		foreach(lc, data->publications)
517 		{
518 			Publication *pub = lfirst(lc);
519 
520 			/*
521 			 * Skip tables that look like they are from a heap rewrite (see
522 			 * make_new_heap()).  We need to skip them because the subscriber
523 			 * won't have a table by that name to receive the data.  That
524 			 * means we won't ship the new data in, say, an added column with
525 			 * a DEFAULT, but if the user applies the same DDL manually on the
526 			 * subscriber, then this will work out for them.
527 			 *
528 			 * We only need to consider the alltables case, because such a
529 			 * transient heap won't be an explicit member of a publication.
530 			 */
531 			if (pub->alltables)
532 			{
533 				char	   *relname = get_rel_name(relid);
534 				unsigned int u;
535 				int			n;
536 
537 				if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 &&
538 					relname[n] == '\0')
539 				{
540 					if (get_rel_relkind(u) == RELKIND_RELATION)
541 						break;
542 				}
543 			}
544 
545 			if (pub->alltables || list_member_oid(pubids, pub->oid))
546 			{
547 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
548 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
549 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
550 			}
551 
552 			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
553 				entry->pubactions.pubdelete)
554 				break;
555 		}
556 
557 		list_free(pubids);
558 
559 		entry->replicate_valid = true;
560 	}
561 
562 	if (!found)
563 		entry->schema_sent = false;
564 
565 	return entry;
566 }
567 
568 /*
569  * Relcache invalidation callback
570  */
571 static void
rel_sync_cache_relation_cb(Datum arg,Oid relid)572 rel_sync_cache_relation_cb(Datum arg, Oid relid)
573 {
574 	RelationSyncEntry *entry;
575 
576 	/*
577 	 * We can get here if the plugin was used in SQL interface as the
578 	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
579 	 * is no way to unregister the relcache invalidation callback.
580 	 */
581 	if (RelationSyncCache == NULL)
582 		return;
583 
584 	/*
585 	 * Nobody keeps pointers to entries in this hash table around outside
586 	 * logical decoding callback calls - but invalidation events can come in
587 	 * *during* a callback if we access the relcache in the callback. Because
588 	 * of that we must mark the cache entry as invalid but not remove it from
589 	 * the hash while it could still be referenced, then prune it at a later
590 	 * safe point.
591 	 *
592 	 * Getting invalidations for relations that aren't in the table is
593 	 * entirely normal, since there's no way to unregister for an invalidation
594 	 * event. So we don't care if it's found or not.
595 	 */
596 	entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
597 											  HASH_FIND, NULL);
598 
599 	/*
600 	 * Reset schema sent status as the relation definition may have changed.
601 	 */
602 	if (entry != NULL)
603 		entry->schema_sent = false;
604 }
605 
606 /*
607  * Publication relation map syscache invalidation callback
608  */
609 static void
rel_sync_cache_publication_cb(Datum arg,int cacheid,uint32 hashvalue)610 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
611 {
612 	HASH_SEQ_STATUS status;
613 	RelationSyncEntry *entry;
614 
615 	/*
616 	 * We can get here if the plugin was used in SQL interface as the
617 	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
618 	 * is no way to unregister the relcache invalidation callback.
619 	 */
620 	if (RelationSyncCache == NULL)
621 		return;
622 
623 	/*
624 	 * There is no way to find which entry in our cache the hash belongs to so
625 	 * mark the whole cache as invalid.
626 	 */
627 	hash_seq_init(&status, RelationSyncCache);
628 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
629 		entry->replicate_valid = false;
630 }
631