1 /*-------------------------------------------------------------------------
2 *
3 * logicalfuncs.c
4 *
5 * Support functions for using logical decoding and management of
6 * logical replication slots via SQL.
7 *
8 *
9 * Copyright (c) 2012-2021, PostgreSQL Global Development Group
10 *
11 * IDENTIFICATION
12 * src/backend/replication/logicalfuncs.c
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include <unistd.h>
19
20 #include "access/xact.h"
21 #include "access/xlog_internal.h"
22 #include "access/xlogutils.h"
23 #include "catalog/pg_type.h"
24 #include "fmgr.h"
25 #include "funcapi.h"
26 #include "mb/pg_wchar.h"
27 #include "miscadmin.h"
28 #include "nodes/makefuncs.h"
29 #include "replication/decode.h"
30 #include "replication/logical.h"
31 #include "replication/message.h"
32 #include "storage/fd.h"
33 #include "utils/array.h"
34 #include "utils/builtins.h"
35 #include "utils/inval.h"
36 #include "utils/lsyscache.h"
37 #include "utils/memutils.h"
38 #include "utils/pg_lsn.h"
39 #include "utils/regproc.h"
40 #include "utils/resowner.h"
41
42 /* private date for writing out data */
43 typedef struct DecodingOutputState
44 {
45 Tuplestorestate *tupstore;
46 TupleDesc tupdesc;
47 bool binary_output;
48 int64 returned_rows;
49 } DecodingOutputState;
50
51 /*
52 * Prepare for an output plugin write.
53 */
54 static void
LogicalOutputPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)55 LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
56 bool last_write)
57 {
58 resetStringInfo(ctx->out);
59 }
60
61 /*
62 * Perform output plugin write into tuplestore.
63 */
64 static void
LogicalOutputWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)65 LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
66 bool last_write)
67 {
68 Datum values[3];
69 bool nulls[3];
70 DecodingOutputState *p;
71
72 /* SQL Datums can only be of a limited length... */
73 if (ctx->out->len > MaxAllocSize - VARHDRSZ)
74 elog(ERROR, "too much output for sql interface");
75
76 p = (DecodingOutputState *) ctx->output_writer_private;
77
78 memset(nulls, 0, sizeof(nulls));
79 values[0] = LSNGetDatum(lsn);
80 values[1] = TransactionIdGetDatum(xid);
81
82 /*
83 * Assert ctx->out is in database encoding when we're writing textual
84 * output.
85 */
86 if (!p->binary_output)
87 Assert(pg_verify_mbstr(GetDatabaseEncoding(),
88 ctx->out->data, ctx->out->len,
89 false));
90
91 /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
92 values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
93
94 tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
95 p->returned_rows++;
96 }
97
98 static void
check_permissions(void)99 check_permissions(void)
100 {
101 if (!superuser() && !has_rolreplication(GetUserId()))
102 ereport(ERROR,
103 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
104 errmsg("must be superuser or replication role to use replication slots")));
105 }
106
107 /*
108 * Helper function for the various SQL callable logical decoding functions.
109 */
110 static Datum
pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo,bool confirm,bool binary)111 pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
112 {
113 Name name;
114 XLogRecPtr upto_lsn;
115 int32 upto_nchanges;
116 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
117 MemoryContext per_query_ctx;
118 MemoryContext oldcontext;
119 XLogRecPtr end_of_wal;
120 LogicalDecodingContext *ctx;
121 ResourceOwner old_resowner = CurrentResourceOwner;
122 ArrayType *arr;
123 Size ndim;
124 List *options = NIL;
125 DecodingOutputState *p;
126
127 check_permissions();
128
129 CheckLogicalDecodingRequirements();
130
131 if (PG_ARGISNULL(0))
132 ereport(ERROR,
133 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
134 errmsg("slot name must not be null")));
135 name = PG_GETARG_NAME(0);
136
137 if (PG_ARGISNULL(1))
138 upto_lsn = InvalidXLogRecPtr;
139 else
140 upto_lsn = PG_GETARG_LSN(1);
141
142 if (PG_ARGISNULL(2))
143 upto_nchanges = InvalidXLogRecPtr;
144 else
145 upto_nchanges = PG_GETARG_INT32(2);
146
147 if (PG_ARGISNULL(3))
148 ereport(ERROR,
149 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
150 errmsg("options array must not be null")));
151 arr = PG_GETARG_ARRAYTYPE_P(3);
152
153 /* check to see if caller supports us returning a tuplestore */
154 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
155 ereport(ERROR,
156 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
157 errmsg("set-valued function called in context that cannot accept a set")));
158 if (!(rsinfo->allowedModes & SFRM_Materialize))
159 ereport(ERROR,
160 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
161 errmsg("materialize mode required, but it is not allowed in this context")));
162
163 /* state to write output to */
164 p = palloc0(sizeof(DecodingOutputState));
165
166 p->binary_output = binary;
167
168 /* Build a tuple descriptor for our result type */
169 if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
170 elog(ERROR, "return type must be a row type");
171
172 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
173 oldcontext = MemoryContextSwitchTo(per_query_ctx);
174
175 /* Deconstruct options array */
176 ndim = ARR_NDIM(arr);
177 if (ndim > 1)
178 {
179 ereport(ERROR,
180 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
181 errmsg("array must be one-dimensional")));
182 }
183 else if (array_contains_nulls(arr))
184 {
185 ereport(ERROR,
186 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
187 errmsg("array must not contain nulls")));
188 }
189 else if (ndim == 1)
190 {
191 int nelems;
192 Datum *datum_opts;
193 int i;
194
195 Assert(ARR_ELEMTYPE(arr) == TEXTOID);
196
197 deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
198 &datum_opts, NULL, &nelems);
199
200 if (nelems % 2 != 0)
201 ereport(ERROR,
202 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
203 errmsg("array must have even number of elements")));
204
205 for (i = 0; i < nelems; i += 2)
206 {
207 char *name = TextDatumGetCString(datum_opts[i]);
208 char *opt = TextDatumGetCString(datum_opts[i + 1]);
209
210 options = lappend(options, makeDefElem(name, (Node *) makeString(opt), -1));
211 }
212 }
213
214 p->tupstore = tuplestore_begin_heap(true, false, work_mem);
215 rsinfo->returnMode = SFRM_Materialize;
216 rsinfo->setResult = p->tupstore;
217 rsinfo->setDesc = p->tupdesc;
218
219 /*
220 * Compute the current end-of-wal and maintain ThisTimeLineID.
221 * RecoveryInProgress() will update ThisTimeLineID on promotion.
222 */
223 if (!RecoveryInProgress())
224 end_of_wal = GetFlushRecPtr();
225 else
226 end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
227
228 ReplicationSlotAcquire(NameStr(*name), true);
229
230 PG_TRY();
231 {
232 /* restart at slot's confirmed_flush */
233 ctx = CreateDecodingContext(InvalidXLogRecPtr,
234 options,
235 false,
236 XL_ROUTINE(.page_read = read_local_xlog_page,
237 .segment_open = wal_segment_open,
238 .segment_close = wal_segment_close),
239 LogicalOutputPrepareWrite,
240 LogicalOutputWrite, NULL);
241
242 /*
243 * After the sanity checks in CreateDecodingContext, make sure the
244 * restart_lsn is valid. Avoid "cannot get changes" wording in this
245 * errmsg because that'd be confusingly ambiguous about no changes
246 * being available.
247 */
248 if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
249 ereport(ERROR,
250 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
251 errmsg("can no longer get changes from replication slot \"%s\"",
252 NameStr(*name)),
253 errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
254
255 MemoryContextSwitchTo(oldcontext);
256
257 /*
258 * Check whether the output plugin writes textual output if that's
259 * what we need.
260 */
261 if (!binary &&
262 ctx->options.output_type !=OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
263 ereport(ERROR,
264 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
265 errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
266 NameStr(MyReplicationSlot->data.plugin),
267 format_procedure(fcinfo->flinfo->fn_oid))));
268
269 ctx->output_writer_private = p;
270
271 /*
272 * Decoding of WAL must start at restart_lsn so that the entirety of
273 * xacts that committed after the slot's confirmed_flush can be
274 * accumulated into reorder buffers.
275 */
276 XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
277
278 /* invalidate non-timetravel entries */
279 InvalidateSystemCaches();
280
281 /* Decode until we run out of records */
282 while (ctx->reader->EndRecPtr < end_of_wal)
283 {
284 XLogRecord *record;
285 char *errm = NULL;
286
287 record = XLogReadRecord(ctx->reader, &errm);
288 if (errm)
289 elog(ERROR, "%s", errm);
290
291 /*
292 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
293 * store the description into our tuplestore.
294 */
295 if (record != NULL)
296 LogicalDecodingProcessRecord(ctx, ctx->reader);
297
298 /* check limits */
299 if (upto_lsn != InvalidXLogRecPtr &&
300 upto_lsn <= ctx->reader->EndRecPtr)
301 break;
302 if (upto_nchanges != 0 &&
303 upto_nchanges <= p->returned_rows)
304 break;
305 CHECK_FOR_INTERRUPTS();
306 }
307
308 tuplestore_donestoring(tupstore);
309
310 /*
311 * Logical decoding could have clobbered CurrentResourceOwner during
312 * transaction management, so restore the executor's value. (This is
313 * a kluge, but it's not worth cleaning up right now.)
314 */
315 CurrentResourceOwner = old_resowner;
316
317 /*
318 * Next time, start where we left off. (Hunting things, the family
319 * business..)
320 */
321 if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
322 {
323 LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
324
325 /*
326 * If only the confirmed_flush_lsn has changed the slot won't get
327 * marked as dirty by the above. Callers on the walsender
328 * interface are expected to keep track of their own progress and
329 * don't need it written out. But SQL-interface users cannot
330 * specify their own start positions and it's harder for them to
331 * keep track of their progress, so we should make more of an
332 * effort to save it for them.
333 *
334 * Dirty the slot so it's written out at the next checkpoint.
335 * We'll still lose its position on crash, as documented, but it's
336 * better than always losing the position even on clean restart.
337 */
338 ReplicationSlotMarkDirty();
339 }
340
341 /* free context, call shutdown callback */
342 FreeDecodingContext(ctx);
343
344 ReplicationSlotRelease();
345 InvalidateSystemCaches();
346 }
347 PG_CATCH();
348 {
349 /* clear all timetravel entries */
350 InvalidateSystemCaches();
351
352 PG_RE_THROW();
353 }
354 PG_END_TRY();
355
356 return (Datum) 0;
357 }
358
359 /*
360 * SQL function returning the changestream as text, consuming the data.
361 */
362 Datum
pg_logical_slot_get_changes(PG_FUNCTION_ARGS)363 pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
364 {
365 return pg_logical_slot_get_changes_guts(fcinfo, true, false);
366 }
367
368 /*
369 * SQL function returning the changestream as text, only peeking ahead.
370 */
371 Datum
pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)372 pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
373 {
374 return pg_logical_slot_get_changes_guts(fcinfo, false, false);
375 }
376
377 /*
378 * SQL function returning the changestream in binary, consuming the data.
379 */
380 Datum
pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)381 pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
382 {
383 return pg_logical_slot_get_changes_guts(fcinfo, true, true);
384 }
385
386 /*
387 * SQL function returning the changestream in binary, only peeking ahead.
388 */
389 Datum
pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)390 pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
391 {
392 return pg_logical_slot_get_changes_guts(fcinfo, false, true);
393 }
394
395
396 /*
397 * SQL function for writing logical decoding message into WAL.
398 */
399 Datum
pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)400 pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
401 {
402 bool transactional = PG_GETARG_BOOL(0);
403 char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
404 bytea *data = PG_GETARG_BYTEA_PP(2);
405 XLogRecPtr lsn;
406
407 lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
408 transactional);
409 PG_RETURN_LSN(lsn);
410 }
411
412 Datum
pg_logical_emit_message_text(PG_FUNCTION_ARGS)413 pg_logical_emit_message_text(PG_FUNCTION_ARGS)
414 {
415 /* bytea and text are compatible */
416 return pg_logical_emit_message_bytea(fcinfo);
417 }
418