1 /*-------------------------------------------------------------------------
2  *
3  * test_decoding.c
4  *		  example logical decoding output plugin
5  *
6  * Copyright (c) 2012-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  contrib/test_decoding/test_decoding.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/sysattr.h"
16 
17 #include "catalog/pg_class.h"
18 #include "catalog/pg_type.h"
19 
20 #include "nodes/parsenodes.h"
21 
22 #include "replication/output_plugin.h"
23 #include "replication/logical.h"
24 #include "replication/message.h"
25 #include "replication/origin.h"
26 
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
34 
35 PG_MODULE_MAGIC;
36 
37 /* These must be available to pg_dlsym() */
38 extern void _PG_init(void);
39 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
40 
41 typedef struct
42 {
43 	MemoryContext context;
44 	bool		include_xids;
45 	bool		include_timestamp;
46 	bool		skip_empty_xacts;
47 	bool		xact_wrote_changes;
48 	bool		only_local;
49 } TestDecodingData;
50 
51 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
52 				  bool is_init);
53 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
54 static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
55 					ReorderBufferTXN *txn);
56 static void pg_output_begin(LogicalDecodingContext *ctx,
57 				TestDecodingData *data,
58 				ReorderBufferTXN *txn,
59 				bool last_write);
60 static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
61 					 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
62 static void pg_decode_change(LogicalDecodingContext *ctx,
63 				 ReorderBufferTXN *txn, Relation rel,
64 				 ReorderBufferChange *change);
65 static bool pg_decode_filter(LogicalDecodingContext *ctx,
66 				 RepOriginId origin_id);
67 static void pg_decode_message(LogicalDecodingContext *ctx,
68 				  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
69 				  bool transactional, const char *prefix,
70 				  Size sz, const char *message);
71 
72 void
_PG_init(void)73 _PG_init(void)
74 {
75 	/* other plugins can perform things here */
76 }
77 
78 /* specify output plugin callbacks */
79 void
_PG_output_plugin_init(OutputPluginCallbacks * cb)80 _PG_output_plugin_init(OutputPluginCallbacks *cb)
81 {
82 	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
83 
84 	cb->startup_cb = pg_decode_startup;
85 	cb->begin_cb = pg_decode_begin_txn;
86 	cb->change_cb = pg_decode_change;
87 	cb->commit_cb = pg_decode_commit_txn;
88 	cb->filter_by_origin_cb = pg_decode_filter;
89 	cb->shutdown_cb = pg_decode_shutdown;
90 	cb->message_cb = pg_decode_message;
91 }
92 
93 
94 /* initialize this plugin */
95 static void
pg_decode_startup(LogicalDecodingContext * ctx,OutputPluginOptions * opt,bool is_init)96 pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
97 				  bool is_init)
98 {
99 	ListCell   *option;
100 	TestDecodingData *data;
101 
102 	data = palloc0(sizeof(TestDecodingData));
103 	data->context = AllocSetContextCreate(ctx->context,
104 										  "text conversion context",
105 										  ALLOCSET_DEFAULT_SIZES);
106 	data->include_xids = true;
107 	data->include_timestamp = false;
108 	data->skip_empty_xacts = false;
109 	data->only_local = false;
110 
111 	ctx->output_plugin_private = data;
112 
113 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
114 
115 	foreach(option, ctx->output_plugin_options)
116 	{
117 		DefElem    *elem = lfirst(option);
118 
119 		Assert(elem->arg == NULL || IsA(elem->arg, String));
120 
121 		if (strcmp(elem->defname, "include-xids") == 0)
122 		{
123 			/* if option does not provide a value, it means its value is true */
124 			if (elem->arg == NULL)
125 				data->include_xids = true;
126 			else if (!parse_bool(strVal(elem->arg), &data->include_xids))
127 				ereport(ERROR,
128 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
130 						 strVal(elem->arg), elem->defname)));
131 		}
132 		else if (strcmp(elem->defname, "include-timestamp") == 0)
133 		{
134 			if (elem->arg == NULL)
135 				data->include_timestamp = true;
136 			else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
137 				ereport(ERROR,
138 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
139 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
140 						 strVal(elem->arg), elem->defname)));
141 		}
142 		else if (strcmp(elem->defname, "force-binary") == 0)
143 		{
144 			bool		force_binary;
145 
146 			if (elem->arg == NULL)
147 				continue;
148 			else if (!parse_bool(strVal(elem->arg), &force_binary))
149 				ereport(ERROR,
150 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
151 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
152 						 strVal(elem->arg), elem->defname)));
153 
154 			if (force_binary)
155 				opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
156 		}
157 		else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
158 		{
159 
160 			if (elem->arg == NULL)
161 				data->skip_empty_xacts = true;
162 			else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
163 				ereport(ERROR,
164 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
165 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
166 						 strVal(elem->arg), elem->defname)));
167 		}
168 		else if (strcmp(elem->defname, "only-local") == 0)
169 		{
170 
171 			if (elem->arg == NULL)
172 				data->only_local = true;
173 			else if (!parse_bool(strVal(elem->arg), &data->only_local))
174 				ereport(ERROR,
175 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
176 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
177 						 strVal(elem->arg), elem->defname)));
178 		}
179 		else
180 		{
181 			ereport(ERROR,
182 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
183 					 errmsg("option \"%s\" = \"%s\" is unknown",
184 							elem->defname,
185 							elem->arg ? strVal(elem->arg) : "(null)")));
186 		}
187 	}
188 }
189 
190 /* cleanup this plugin's resources */
191 static void
pg_decode_shutdown(LogicalDecodingContext * ctx)192 pg_decode_shutdown(LogicalDecodingContext *ctx)
193 {
194 	TestDecodingData *data = ctx->output_plugin_private;
195 
196 	/* cleanup our own resources via memory context reset */
197 	MemoryContextDelete(data->context);
198 }
199 
200 /* BEGIN callback */
201 static void
pg_decode_begin_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn)202 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
203 {
204 	TestDecodingData *data = ctx->output_plugin_private;
205 
206 	data->xact_wrote_changes = false;
207 	if (data->skip_empty_xacts)
208 		return;
209 
210 	pg_output_begin(ctx, data, txn, true);
211 }
212 
213 static void
pg_output_begin(LogicalDecodingContext * ctx,TestDecodingData * data,ReorderBufferTXN * txn,bool last_write)214 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
215 {
216 	OutputPluginPrepareWrite(ctx, last_write);
217 	if (data->include_xids)
218 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
219 	else
220 		appendStringInfoString(ctx->out, "BEGIN");
221 	OutputPluginWrite(ctx, last_write);
222 }
223 
224 /* COMMIT callback */
225 static void
pg_decode_commit_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)226 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
227 					 XLogRecPtr commit_lsn)
228 {
229 	TestDecodingData *data = ctx->output_plugin_private;
230 
231 	if (data->skip_empty_xacts && !data->xact_wrote_changes)
232 		return;
233 
234 	OutputPluginPrepareWrite(ctx, true);
235 	if (data->include_xids)
236 		appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
237 	else
238 		appendStringInfoString(ctx->out, "COMMIT");
239 
240 	if (data->include_timestamp)
241 		appendStringInfo(ctx->out, " (at %s)",
242 						 timestamptz_to_str(txn->commit_time));
243 
244 	OutputPluginWrite(ctx, true);
245 }
246 
247 static bool
pg_decode_filter(LogicalDecodingContext * ctx,RepOriginId origin_id)248 pg_decode_filter(LogicalDecodingContext *ctx,
249 				 RepOriginId origin_id)
250 {
251 	TestDecodingData *data = ctx->output_plugin_private;
252 
253 	if (data->only_local && origin_id != InvalidRepOriginId)
254 		return true;
255 	return false;
256 }
257 
258 /*
259  * Print literal `outputstr' already represented as string of type `typid'
260  * into stringbuf `s'.
261  *
262  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
263  * if standard_conforming_strings were enabled.
264  */
265 static void
print_literal(StringInfo s,Oid typid,char * outputstr)266 print_literal(StringInfo s, Oid typid, char *outputstr)
267 {
268 	const char *valptr;
269 
270 	switch (typid)
271 	{
272 		case INT2OID:
273 		case INT4OID:
274 		case INT8OID:
275 		case OIDOID:
276 		case FLOAT4OID:
277 		case FLOAT8OID:
278 		case NUMERICOID:
279 			/* NB: We don't care about Inf, NaN et al. */
280 			appendStringInfoString(s, outputstr);
281 			break;
282 
283 		case BITOID:
284 		case VARBITOID:
285 			appendStringInfo(s, "B'%s'", outputstr);
286 			break;
287 
288 		case BOOLOID:
289 			if (strcmp(outputstr, "t") == 0)
290 				appendStringInfoString(s, "true");
291 			else
292 				appendStringInfoString(s, "false");
293 			break;
294 
295 		default:
296 			appendStringInfoChar(s, '\'');
297 			for (valptr = outputstr; *valptr; valptr++)
298 			{
299 				char		ch = *valptr;
300 
301 				if (SQL_STR_DOUBLE(ch, false))
302 					appendStringInfoChar(s, ch);
303 				appendStringInfoChar(s, ch);
304 			}
305 			appendStringInfoChar(s, '\'');
306 			break;
307 	}
308 }
309 
310 /* print the tuple 'tuple' into the StringInfo s */
311 static void
tuple_to_stringinfo(StringInfo s,TupleDesc tupdesc,HeapTuple tuple,bool skip_nulls)312 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
313 {
314 	int			natt;
315 	Oid			oid;
316 
317 	/* print oid of tuple, it's not included in the TupleDesc */
318 	if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
319 	{
320 		appendStringInfo(s, " oid[oid]:%u", oid);
321 	}
322 
323 	/* print all columns individually */
324 	for (natt = 0; natt < tupdesc->natts; natt++)
325 	{
326 		Form_pg_attribute attr; /* the attribute itself */
327 		Oid			typid;		/* type of current attribute */
328 		Oid			typoutput;	/* output function */
329 		bool		typisvarlena;
330 		Datum		origval;	/* possibly toasted Datum */
331 		bool		isnull;		/* column is null? */
332 
333 		attr = tupdesc->attrs[natt];
334 
335 		/*
336 		 * don't print dropped columns, we can't be sure everything is
337 		 * available for them
338 		 */
339 		if (attr->attisdropped)
340 			continue;
341 
342 		/*
343 		 * Don't print system columns, oid will already have been printed if
344 		 * present.
345 		 */
346 		if (attr->attnum < 0)
347 			continue;
348 
349 		typid = attr->atttypid;
350 
351 		/* get Datum from tuple */
352 		origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
353 
354 		if (isnull && skip_nulls)
355 			continue;
356 
357 		/* print attribute name */
358 		appendStringInfoChar(s, ' ');
359 		appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
360 
361 		/* print attribute type */
362 		appendStringInfoChar(s, '[');
363 		appendStringInfoString(s, format_type_be(typid));
364 		appendStringInfoChar(s, ']');
365 
366 		/* query output function */
367 		getTypeOutputInfo(typid,
368 						  &typoutput, &typisvarlena);
369 
370 		/* print separator */
371 		appendStringInfoChar(s, ':');
372 
373 		/* print data */
374 		if (isnull)
375 			appendStringInfoString(s, "null");
376 		else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
377 			appendStringInfoString(s, "unchanged-toast-datum");
378 		else if (!typisvarlena)
379 			print_literal(s, typid,
380 						  OidOutputFunctionCall(typoutput, origval));
381 		else
382 		{
383 			Datum		val;	/* definitely detoasted Datum */
384 
385 			val = PointerGetDatum(PG_DETOAST_DATUM(origval));
386 			print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
387 		}
388 	}
389 }
390 
391 /*
392  * callback for individual changed tuples
393  */
394 static void
pg_decode_change(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)395 pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
396 				 Relation relation, ReorderBufferChange *change)
397 {
398 	TestDecodingData *data;
399 	Form_pg_class class_form;
400 	TupleDesc	tupdesc;
401 	MemoryContext old;
402 
403 	data = ctx->output_plugin_private;
404 
405 	/* output BEGIN if we haven't yet */
406 	if (data->skip_empty_xacts && !data->xact_wrote_changes)
407 	{
408 		pg_output_begin(ctx, data, txn, false);
409 	}
410 	data->xact_wrote_changes = true;
411 
412 	class_form = RelationGetForm(relation);
413 	tupdesc = RelationGetDescr(relation);
414 
415 	/* Avoid leaking memory by using and resetting our own context */
416 	old = MemoryContextSwitchTo(data->context);
417 
418 	OutputPluginPrepareWrite(ctx, true);
419 
420 	appendStringInfoString(ctx->out, "table ");
421 	appendStringInfoString(ctx->out,
422 						   quote_qualified_identifier(
423 													  get_namespace_name(
424 							  get_rel_namespace(RelationGetRelid(relation))),
425 											  NameStr(class_form->relname)));
426 	appendStringInfoChar(ctx->out, ':');
427 
428 	switch (change->action)
429 	{
430 		case REORDER_BUFFER_CHANGE_INSERT:
431 			appendStringInfoString(ctx->out, " INSERT:");
432 			if (change->data.tp.newtuple == NULL)
433 				appendStringInfoString(ctx->out, " (no-tuple-data)");
434 			else
435 				tuple_to_stringinfo(ctx->out, tupdesc,
436 									&change->data.tp.newtuple->tuple,
437 									false);
438 			break;
439 		case REORDER_BUFFER_CHANGE_UPDATE:
440 			appendStringInfoString(ctx->out, " UPDATE:");
441 			if (change->data.tp.oldtuple != NULL)
442 			{
443 				appendStringInfoString(ctx->out, " old-key:");
444 				tuple_to_stringinfo(ctx->out, tupdesc,
445 									&change->data.tp.oldtuple->tuple,
446 									true);
447 				appendStringInfoString(ctx->out, " new-tuple:");
448 			}
449 
450 			if (change->data.tp.newtuple == NULL)
451 				appendStringInfoString(ctx->out, " (no-tuple-data)");
452 			else
453 				tuple_to_stringinfo(ctx->out, tupdesc,
454 									&change->data.tp.newtuple->tuple,
455 									false);
456 			break;
457 		case REORDER_BUFFER_CHANGE_DELETE:
458 			appendStringInfoString(ctx->out, " DELETE:");
459 
460 			/* if there was no PK, we only know that a delete happened */
461 			if (change->data.tp.oldtuple == NULL)
462 				appendStringInfoString(ctx->out, " (no-tuple-data)");
463 			/* In DELETE, only the replica identity is present; display that */
464 			else
465 				tuple_to_stringinfo(ctx->out, tupdesc,
466 									&change->data.tp.oldtuple->tuple,
467 									true);
468 			break;
469 		default:
470 			Assert(false);
471 	}
472 
473 	MemoryContextSwitchTo(old);
474 	MemoryContextReset(data->context);
475 
476 	OutputPluginWrite(ctx, true);
477 }
478 
479 static void
pg_decode_message(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,XLogRecPtr lsn,bool transactional,const char * prefix,Size sz,const char * message)480 pg_decode_message(LogicalDecodingContext *ctx,
481 				  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
482 				  const char *prefix, Size sz, const char *message)
483 {
484 	OutputPluginPrepareWrite(ctx, true);
485 	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
486 					 transactional, prefix, sz);
487 	appendBinaryStringInfo(ctx->out, message, sz);
488 	OutputPluginWrite(ctx, true);
489 }
490