1 /*-------------------------------------------------------------------------
2 *
3 * test_decoding.c
4 * example logical decoding output plugin
5 *
6 * Copyright (c) 2012-2016, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/test_decoding/test_decoding.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "access/sysattr.h"
16
17 #include "catalog/pg_class.h"
18 #include "catalog/pg_type.h"
19
20 #include "nodes/parsenodes.h"
21
22 #include "replication/output_plugin.h"
23 #include "replication/logical.h"
24 #include "replication/message.h"
25 #include "replication/origin.h"
26
27 #include "utils/builtins.h"
28 #include "utils/lsyscache.h"
29 #include "utils/memutils.h"
30 #include "utils/rel.h"
31 #include "utils/relcache.h"
32 #include "utils/syscache.h"
33 #include "utils/typcache.h"
34
/* Module magic block (the macro is supplied by the PostgreSQL headers) */
PG_MODULE_MAGIC;

/*
 * These must be available to pg_dlsym(), hence they are declared extern
 * rather than static like the rest of the functions in this file.
 */
extern void _PG_init(void);
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
40
/*
 * Private state of this plugin; pg_decode_startup() allocates it and stores
 * it in ctx->output_plugin_private, where the other callbacks pick it up.
 */
typedef struct
{
	/* scratch context used while formatting a change, reset afterwards */
	MemoryContext context;
	/* "include-xids": print the xid after BEGIN and COMMIT */
	bool		include_xids;
	/* "include-timestamp": append the commit time to COMMIT */
	bool		include_timestamp;
	/* "skip-empty-xacts": no BEGIN/COMMIT for xacts without changes */
	bool		skip_empty_xacts;
	/* has pg_decode_change() run since the last begin callback? */
	bool		xact_wrote_changes;
	/* "only-local": filter out everything with a valid replication origin */
	bool		only_local;
} TestDecodingData;
50
51 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
52 bool is_init);
53 static void pg_decode_shutdown(LogicalDecodingContext *ctx);
54 static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
55 ReorderBufferTXN *txn);
56 static void pg_output_begin(LogicalDecodingContext *ctx,
57 TestDecodingData *data,
58 ReorderBufferTXN *txn,
59 bool last_write);
60 static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
61 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
62 static void pg_decode_change(LogicalDecodingContext *ctx,
63 ReorderBufferTXN *txn, Relation rel,
64 ReorderBufferChange *change);
65 static bool pg_decode_filter(LogicalDecodingContext *ctx,
66 RepOriginId origin_id);
67 static void pg_decode_message(LogicalDecodingContext *ctx,
68 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
69 bool transactional, const char *prefix,
70 Size sz, const char *message);
71
/*
 * Library initialization hook.  This plugin has nothing to set up; other
 * plugins can perform things here.
 */
void
_PG_init(void)
{
}
77
78 /* specify output plugin callbacks */
79 void
_PG_output_plugin_init(OutputPluginCallbacks * cb)80 _PG_output_plugin_init(OutputPluginCallbacks *cb)
81 {
82 AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
83
84 cb->startup_cb = pg_decode_startup;
85 cb->begin_cb = pg_decode_begin_txn;
86 cb->change_cb = pg_decode_change;
87 cb->commit_cb = pg_decode_commit_txn;
88 cb->filter_by_origin_cb = pg_decode_filter;
89 cb->shutdown_cb = pg_decode_shutdown;
90 cb->message_cb = pg_decode_message;
91 }
92
93
94 /* initialize this plugin */
95 static void
pg_decode_startup(LogicalDecodingContext * ctx,OutputPluginOptions * opt,bool is_init)96 pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
97 bool is_init)
98 {
99 ListCell *option;
100 TestDecodingData *data;
101
102 data = palloc0(sizeof(TestDecodingData));
103 data->context = AllocSetContextCreate(ctx->context,
104 "text conversion context",
105 ALLOCSET_DEFAULT_SIZES);
106 data->include_xids = true;
107 data->include_timestamp = false;
108 data->skip_empty_xacts = false;
109 data->only_local = false;
110
111 ctx->output_plugin_private = data;
112
113 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
114
115 foreach(option, ctx->output_plugin_options)
116 {
117 DefElem *elem = lfirst(option);
118
119 Assert(elem->arg == NULL || IsA(elem->arg, String));
120
121 if (strcmp(elem->defname, "include-xids") == 0)
122 {
123 /* if option does not provide a value, it means its value is true */
124 if (elem->arg == NULL)
125 data->include_xids = true;
126 else if (!parse_bool(strVal(elem->arg), &data->include_xids))
127 ereport(ERROR,
128 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129 errmsg("could not parse value \"%s\" for parameter \"%s\"",
130 strVal(elem->arg), elem->defname)));
131 }
132 else if (strcmp(elem->defname, "include-timestamp") == 0)
133 {
134 if (elem->arg == NULL)
135 data->include_timestamp = true;
136 else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
137 ereport(ERROR,
138 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
139 errmsg("could not parse value \"%s\" for parameter \"%s\"",
140 strVal(elem->arg), elem->defname)));
141 }
142 else if (strcmp(elem->defname, "force-binary") == 0)
143 {
144 bool force_binary;
145
146 if (elem->arg == NULL)
147 continue;
148 else if (!parse_bool(strVal(elem->arg), &force_binary))
149 ereport(ERROR,
150 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
151 errmsg("could not parse value \"%s\" for parameter \"%s\"",
152 strVal(elem->arg), elem->defname)));
153
154 if (force_binary)
155 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
156 }
157 else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
158 {
159
160 if (elem->arg == NULL)
161 data->skip_empty_xacts = true;
162 else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
163 ereport(ERROR,
164 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
165 errmsg("could not parse value \"%s\" for parameter \"%s\"",
166 strVal(elem->arg), elem->defname)));
167 }
168 else if (strcmp(elem->defname, "only-local") == 0)
169 {
170
171 if (elem->arg == NULL)
172 data->only_local = true;
173 else if (!parse_bool(strVal(elem->arg), &data->only_local))
174 ereport(ERROR,
175 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
176 errmsg("could not parse value \"%s\" for parameter \"%s\"",
177 strVal(elem->arg), elem->defname)));
178 }
179 else
180 {
181 ereport(ERROR,
182 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
183 errmsg("option \"%s\" = \"%s\" is unknown",
184 elem->defname,
185 elem->arg ? strVal(elem->arg) : "(null)")));
186 }
187 }
188 }
189
190 /* cleanup this plugin's resources */
191 static void
pg_decode_shutdown(LogicalDecodingContext * ctx)192 pg_decode_shutdown(LogicalDecodingContext *ctx)
193 {
194 TestDecodingData *data = ctx->output_plugin_private;
195
196 /* cleanup our own resources via memory context reset */
197 MemoryContextDelete(data->context);
198 }
199
200 /* BEGIN callback */
201 static void
pg_decode_begin_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn)202 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
203 {
204 TestDecodingData *data = ctx->output_plugin_private;
205
206 data->xact_wrote_changes = false;
207 if (data->skip_empty_xacts)
208 return;
209
210 pg_output_begin(ctx, data, txn, true);
211 }
212
213 static void
pg_output_begin(LogicalDecodingContext * ctx,TestDecodingData * data,ReorderBufferTXN * txn,bool last_write)214 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
215 {
216 OutputPluginPrepareWrite(ctx, last_write);
217 if (data->include_xids)
218 appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
219 else
220 appendStringInfoString(ctx->out, "BEGIN");
221 OutputPluginWrite(ctx, last_write);
222 }
223
224 /* COMMIT callback */
225 static void
pg_decode_commit_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)226 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
227 XLogRecPtr commit_lsn)
228 {
229 TestDecodingData *data = ctx->output_plugin_private;
230
231 if (data->skip_empty_xacts && !data->xact_wrote_changes)
232 return;
233
234 OutputPluginPrepareWrite(ctx, true);
235 if (data->include_xids)
236 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
237 else
238 appendStringInfoString(ctx->out, "COMMIT");
239
240 if (data->include_timestamp)
241 appendStringInfo(ctx->out, " (at %s)",
242 timestamptz_to_str(txn->commit_time));
243
244 OutputPluginWrite(ctx, true);
245 }
246
247 static bool
pg_decode_filter(LogicalDecodingContext * ctx,RepOriginId origin_id)248 pg_decode_filter(LogicalDecodingContext *ctx,
249 RepOriginId origin_id)
250 {
251 TestDecodingData *data = ctx->output_plugin_private;
252
253 if (data->only_local && origin_id != InvalidRepOriginId)
254 return true;
255 return false;
256 }
257
258 /*
259 * Print literal `outputstr' already represented as string of type `typid'
260 * into stringbuf `s'.
261 *
262 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
263 * if standard_conforming_strings were enabled.
264 */
265 static void
print_literal(StringInfo s,Oid typid,char * outputstr)266 print_literal(StringInfo s, Oid typid, char *outputstr)
267 {
268 const char *valptr;
269
270 switch (typid)
271 {
272 case INT2OID:
273 case INT4OID:
274 case INT8OID:
275 case OIDOID:
276 case FLOAT4OID:
277 case FLOAT8OID:
278 case NUMERICOID:
279 /* NB: We don't care about Inf, NaN et al. */
280 appendStringInfoString(s, outputstr);
281 break;
282
283 case BITOID:
284 case VARBITOID:
285 appendStringInfo(s, "B'%s'", outputstr);
286 break;
287
288 case BOOLOID:
289 if (strcmp(outputstr, "t") == 0)
290 appendStringInfoString(s, "true");
291 else
292 appendStringInfoString(s, "false");
293 break;
294
295 default:
296 appendStringInfoChar(s, '\'');
297 for (valptr = outputstr; *valptr; valptr++)
298 {
299 char ch = *valptr;
300
301 if (SQL_STR_DOUBLE(ch, false))
302 appendStringInfoChar(s, ch);
303 appendStringInfoChar(s, ch);
304 }
305 appendStringInfoChar(s, '\'');
306 break;
307 }
308 }
309
310 /* print the tuple 'tuple' into the StringInfo s */
311 static void
tuple_to_stringinfo(StringInfo s,TupleDesc tupdesc,HeapTuple tuple,bool skip_nulls)312 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
313 {
314 int natt;
315 Oid oid;
316
317 /* print oid of tuple, it's not included in the TupleDesc */
318 if ((oid = HeapTupleHeaderGetOid(tuple->t_data)) != InvalidOid)
319 {
320 appendStringInfo(s, " oid[oid]:%u", oid);
321 }
322
323 /* print all columns individually */
324 for (natt = 0; natt < tupdesc->natts; natt++)
325 {
326 Form_pg_attribute attr; /* the attribute itself */
327 Oid typid; /* type of current attribute */
328 Oid typoutput; /* output function */
329 bool typisvarlena;
330 Datum origval; /* possibly toasted Datum */
331 bool isnull; /* column is null? */
332
333 attr = tupdesc->attrs[natt];
334
335 /*
336 * don't print dropped columns, we can't be sure everything is
337 * available for them
338 */
339 if (attr->attisdropped)
340 continue;
341
342 /*
343 * Don't print system columns, oid will already have been printed if
344 * present.
345 */
346 if (attr->attnum < 0)
347 continue;
348
349 typid = attr->atttypid;
350
351 /* get Datum from tuple */
352 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
353
354 if (isnull && skip_nulls)
355 continue;
356
357 /* print attribute name */
358 appendStringInfoChar(s, ' ');
359 appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
360
361 /* print attribute type */
362 appendStringInfoChar(s, '[');
363 appendStringInfoString(s, format_type_be(typid));
364 appendStringInfoChar(s, ']');
365
366 /* query output function */
367 getTypeOutputInfo(typid,
368 &typoutput, &typisvarlena);
369
370 /* print separator */
371 appendStringInfoChar(s, ':');
372
373 /* print data */
374 if (isnull)
375 appendStringInfoString(s, "null");
376 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
377 appendStringInfoString(s, "unchanged-toast-datum");
378 else if (!typisvarlena)
379 print_literal(s, typid,
380 OidOutputFunctionCall(typoutput, origval));
381 else
382 {
383 Datum val; /* definitely detoasted Datum */
384
385 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
386 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
387 }
388 }
389 }
390
391 /*
392 * callback for individual changed tuples
393 */
394 static void
pg_decode_change(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)395 pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
396 Relation relation, ReorderBufferChange *change)
397 {
398 TestDecodingData *data;
399 Form_pg_class class_form;
400 TupleDesc tupdesc;
401 MemoryContext old;
402
403 data = ctx->output_plugin_private;
404
405 /* output BEGIN if we haven't yet */
406 if (data->skip_empty_xacts && !data->xact_wrote_changes)
407 {
408 pg_output_begin(ctx, data, txn, false);
409 }
410 data->xact_wrote_changes = true;
411
412 class_form = RelationGetForm(relation);
413 tupdesc = RelationGetDescr(relation);
414
415 /* Avoid leaking memory by using and resetting our own context */
416 old = MemoryContextSwitchTo(data->context);
417
418 OutputPluginPrepareWrite(ctx, true);
419
420 appendStringInfoString(ctx->out, "table ");
421 appendStringInfoString(ctx->out,
422 quote_qualified_identifier(
423 get_namespace_name(
424 get_rel_namespace(RelationGetRelid(relation))),
425 NameStr(class_form->relname)));
426 appendStringInfoChar(ctx->out, ':');
427
428 switch (change->action)
429 {
430 case REORDER_BUFFER_CHANGE_INSERT:
431 appendStringInfoString(ctx->out, " INSERT:");
432 if (change->data.tp.newtuple == NULL)
433 appendStringInfoString(ctx->out, " (no-tuple-data)");
434 else
435 tuple_to_stringinfo(ctx->out, tupdesc,
436 &change->data.tp.newtuple->tuple,
437 false);
438 break;
439 case REORDER_BUFFER_CHANGE_UPDATE:
440 appendStringInfoString(ctx->out, " UPDATE:");
441 if (change->data.tp.oldtuple != NULL)
442 {
443 appendStringInfoString(ctx->out, " old-key:");
444 tuple_to_stringinfo(ctx->out, tupdesc,
445 &change->data.tp.oldtuple->tuple,
446 true);
447 appendStringInfoString(ctx->out, " new-tuple:");
448 }
449
450 if (change->data.tp.newtuple == NULL)
451 appendStringInfoString(ctx->out, " (no-tuple-data)");
452 else
453 tuple_to_stringinfo(ctx->out, tupdesc,
454 &change->data.tp.newtuple->tuple,
455 false);
456 break;
457 case REORDER_BUFFER_CHANGE_DELETE:
458 appendStringInfoString(ctx->out, " DELETE:");
459
460 /* if there was no PK, we only know that a delete happened */
461 if (change->data.tp.oldtuple == NULL)
462 appendStringInfoString(ctx->out, " (no-tuple-data)");
463 /* In DELETE, only the replica identity is present; display that */
464 else
465 tuple_to_stringinfo(ctx->out, tupdesc,
466 &change->data.tp.oldtuple->tuple,
467 true);
468 break;
469 default:
470 Assert(false);
471 }
472
473 MemoryContextSwitchTo(old);
474 MemoryContextReset(data->context);
475
476 OutputPluginWrite(ctx, true);
477 }
478
479 static void
pg_decode_message(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,XLogRecPtr lsn,bool transactional,const char * prefix,Size sz,const char * message)480 pg_decode_message(LogicalDecodingContext *ctx,
481 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
482 const char *prefix, Size sz, const char *message)
483 {
484 OutputPluginPrepareWrite(ctx, true);
485 appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
486 transactional, prefix, sz);
487 appendBinaryStringInfo(ctx->out, message, sz);
488 OutputPluginWrite(ctx, true);
489 }
490