1<!-- doc/src/sgml/logicaldecoding.sgml -->
2 <chapter id="logicaldecoding">
3  <title>Logical Decoding</title>
4  <indexterm zone="logicaldecoding">
5   <primary>Logical Decoding</primary>
6  </indexterm>
7  <para>
8   PostgreSQL provides infrastructure to stream the modifications performed
9   via SQL to external consumers.  This functionality can be used for a
10   variety of purposes, including replication solutions and auditing.
11  </para>
12
13  <para>
14   Changes are sent out in streams identified by logical replication slots.
15  </para>
16
17  <para>
18   The format in which those changes are streamed is determined by the output
19   plugin used.  An example plugin is provided in the PostgreSQL distribution.
20   Additional plugins can be
21   written to extend the choice of available formats without modifying any
22   core code.
23   Every output plugin has access to each individual new row produced
24   by <command>INSERT</command> and the new row version created
25   by <command>UPDATE</command>.  Availability of old row versions for
26   <command>UPDATE</command> and <command>DELETE</command> depends on
27   the configured replica identity (see <xref linkend="sql-createtable-replica-identity"/>).
28  </para>
29
30  <para>
31   Changes can be consumed either using the streaming replication protocol
32   (see <xref linkend="protocol-replication"/> and
33   <xref linkend="logicaldecoding-walsender"/>), or by calling functions
34   via SQL (see <xref linkend="logicaldecoding-sql"/>). It is also possible
35   to write additional methods of consuming the output of a replication slot
36   without modifying core code
37   (see <xref linkend="logicaldecoding-writer"/>).
38  </para>
39
40  <sect1 id="logicaldecoding-example">
41   <title>Logical Decoding Examples</title>
42
43   <para>
44    The following example demonstrates controlling logical decoding using the
45    SQL interface.
46   </para>
47
48   <para>
49    Before you can use logical decoding, you must set
50    <xref linkend="guc-wal-level"/> to <literal>logical</literal> and
51    <xref linkend="guc-max-replication-slots"/> to at least 1.  Then, you
52    should connect to the target database (in the example
53    below, <literal>postgres</literal>) as a superuser.
54   </para>
55
<programlisting>
postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    |    lsn
-----------------+-----------
 regression_slot | 0/16B1970
(1 row)

postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+-------------+---------------------
 regression_slot | test_decoding | logical   | postgres | f      | 0/16A4408   | 0/16A4440
(1 row)

postgres=# -- There are no changes to see yet
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 lsn | xid | data
-----+-----+------
(0 rows)

postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE

postgres=# -- DDL isn't replicated, so all you'll see is the transaction
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |     data
-----------+-------+--------------
 0/BA2DA58 | 10297 | BEGIN 10297
 0/BA5A5A0 | 10297 | COMMIT 10297
(2 rows)

postgres=# -- Once changes are read, they're consumed and not emitted
postgres=# -- in a subsequent call:
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 lsn | xid | data
-----+-----+------
(0 rows)

postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('1');
postgres=*# INSERT INTO data(data) VALUES('2');
postgres=*# COMMIT;

postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A688 | 10298 | BEGIN 10298
 0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 0/BA5A8A8 | 10298 | COMMIT 10298
(4 rows)

postgres=# INSERT INTO data(data) VALUES('3');

postgres=# -- You can also peek ahead in the change stream without consuming changes
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299
(3 rows)

postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299
(3 rows)

postgres=# -- options can be passed to output plugin, to influence the formatting
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
(3 rows)

postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
postgres=# -- server resources:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
--------------------------

(1 row)
</programlisting>
145
146   <para>
147    The following example shows how logical decoding is controlled over the
148    streaming replication protocol, using the
149    program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
150    distribution.  This requires that client authentication is set up to allow
151    replication connections
152    (see <xref linkend="streaming-replication-authentication"/>) and
153    that <varname>max_wal_senders</varname> is set sufficiently high to allow
154    an additional connection.
155   </para>
<programlisting>
$ pg_recvlogical -d postgres --slot=test --create-slot
$ pg_recvlogical -d postgres --slot=test --start -f -
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
$ fg
BEGIN 693
table public.data: INSERT: id[integer]:4 data[text]:'4'
COMMIT 693
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
$ pg_recvlogical -d postgres --slot=test --drop-slot
</programlisting>
168  </sect1>
169
170  <sect1 id="logicaldecoding-explanation">
171   <title>Logical Decoding Concepts</title>
172   <sect2>
173    <title>Logical Decoding</title>
174
175    <indexterm>
176     <primary>Logical Decoding</primary>
177    </indexterm>
178
    <para>
     Logical decoding is the process of extracting all persistent changes
     to a database's tables into a coherent, easy-to-understand format that
     can be interpreted without detailed knowledge of the database's internal
     state.
    </para>
185
186    <para>
187     In <productname>PostgreSQL</productname>, logical decoding is implemented
188     by decoding the contents of the <link linkend="wal">write-ahead
189     log</link>, which describe changes on a storage level, into an
190     application-specific form such as a stream of tuples or SQL statements.
191    </para>
192   </sect2>
193
194   <sect2 id="logicaldecoding-replication-slots">
195    <title>Replication Slots</title>
196
197    <indexterm>
198     <primary>replication slot</primary>
199     <secondary>logical replication</secondary>
200    </indexterm>
201
202    <para>
203     In the context of logical replication, a slot represents a stream of
204     changes that can be replayed to a client in the order they were made on
205     the origin server. Each slot streams a sequence of changes from a single
206     database.
207    </para>
208
209    <note>
210     <para><productname>PostgreSQL</productname> also has streaming replication slots
211     (see <xref linkend="streaming-replication"/>), but they are used somewhat
212     differently there.
213     </para>
214    </note>
215
216    <para>
217     A replication slot has an identifier that is unique across all databases
218     in a <productname>PostgreSQL</productname> cluster. Slots persist
219     independently of the connection using them and are crash-safe.
220    </para>
221
    <para>
     A logical slot will emit each change just once in normal operation.
     The current position of each slot is persisted only at checkpoint, so in
     the case of a crash the slot may return to an earlier LSN, which will
     then cause recent changes to be sent again when the server restarts.
     Logical decoding clients are responsible for avoiding ill effects from
     handling the same message more than once.  Clients may wish to record
     the last LSN they saw when decoding and skip over any repeated data or
     (when using the replication protocol) request that decoding start from
     that LSN rather than letting the server determine the start point.
     The Replication Progress Tracking feature is designed for this purpose;
     see <link linkend="replication-origins">replication origins</link>.
    </para>
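
    <para>
     The client-side bookkeeping described above can be sketched as follows.
     This is a minimal, stand-alone illustration and not part
     of <productname>PostgreSQL</productname>:
     <function>parse_lsn</function>, <function>should_apply</function>,
     and <structname>ConsumerState</structname> are hypothetical names, and
     the sketch assumes that the consumer tracks the LSN reported with each
     <literal>COMMIT</literal>.  It converts the textual LSN (two hexadecimal
     numbers holding the upper and lower 32 bits) into a 64-bit value and
     skips any transaction at or before the last position already applied.
    </para>
<programlisting><![CDATA[
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse an LSN in its textual form: two hexadecimal numbers separated by
 * a slash, holding the upper and lower 32 bits (for example "0/BA5A8A8").
 * Returns 1 and stores the 64-bit position in *lsn on success, else 0.
 */
static int
parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;
    char        trailing;

    assert(text != NULL && lsn != NULL);
    /* Exactly two conversions must succeed; trailing input is rejected. */
    if (sscanf(text, "%X/%X%c", &hi, &lo, &trailing) != 2)
        return 0;
    *lsn = ((uint64_t) hi << 32) | lo;
    return 1;
}

/* Hypothetical consumer state: position of the last applied transaction. */
typedef struct ConsumerState
{
    uint64_t    last_applied_lsn;
} ConsumerState;

/*
 * Decide whether the transaction whose COMMIT was reported at
 * commit_lsn_text should be applied.  Returns 1 to apply (and advances
 * the state), 0 to skip a repeated transaction, -1 for malformed input.
 */
static int
should_apply(ConsumerState *state, const char *commit_lsn_text)
{
    uint64_t    lsn;

    if (!parse_lsn(commit_lsn_text, &lsn))
        return -1;
    if (lsn <= state->last_applied_lsn)
        return 0;               /* already seen before the crash */
    state->last_applied_lsn = lsn;
    return 1;
}
]]></programlisting>
    <para>
     A real consumer would need to store the last applied position durably,
     ideally in the same transaction as the applied changes, so that the
     position and the data cannot diverge after a crash.
    </para>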
235
236    <para>
237     Multiple independent slots may exist for a single database. Each slot has
238     its own state, allowing different consumers to receive changes from
239     different points in the database change stream. For most applications, a
240     separate slot will be required for each consumer.
241    </para>
242
243    <para>
244     A logical replication slot knows nothing about the state of the
245     receiver(s).  It's even possible to have multiple different receivers using
246     the same slot at different times; they'll just get the changes following
247     on from when the last receiver stopped consuming them. Only one receiver
248     may consume changes from a slot at any given time.
249    </para>
250
251    <caution>
252     <para>
253      Replication slots persist across crashes and know nothing about the state
254      of their consumer(s). They will prevent removal of required resources
255      even when there is no connection using them. This consumes storage
256      because neither required WAL nor required rows from the system catalogs
257      can be removed by <command>VACUUM</command> as long as they are required by a replication
258      slot.  In extreme cases this could cause the database to shut down to prevent
259      transaction ID wraparound (see <xref linkend="vacuum-for-wraparound"/>).
260      So if a slot is no longer required it should be dropped.
261     </para>
262    </caution>
263   </sect2>
264
265   <sect2>
266    <title>Output Plugins</title>
267    <para>
268     Output plugins transform the data from the write-ahead log's internal
269     representation into the format the consumer of a replication slot desires.
270    </para>
271   </sect2>
272
273   <sect2>
274    <title>Exported Snapshots</title>
275    <para>
276     When a new replication slot is created using the streaming replication
277     interface (see <xref linkend="protocol-replication-create-slot"/>), a
278     snapshot is exported
279     (see <xref linkend="functions-snapshot-synchronization"/>), which will show
280     exactly the state of the database after which all changes will be
281     included in the change stream. This can be used to create a new replica by
282     using <link linkend="sql-set-transaction"><literal>SET TRANSACTION
283     SNAPSHOT</literal></link> to read the state of the database at the moment
284     the slot was created. This transaction can then be used to dump the
285     database's state at that point in time, which afterwards can be updated
286     using the slot's contents without losing any changes.
287    </para>
288    <para>
289     Creation of a snapshot is not always possible.  In particular, it will
290     fail when connected to a hot standby.  Applications that do not require
291     snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</literal>
292     option.
293    </para>
294   </sect2>
295  </sect1>
296
297  <sect1 id="logicaldecoding-walsender">
298   <title>Streaming Replication Protocol Interface</title>
299
300   <para>
301    The commands
302    <itemizedlist>
303     <listitem>
304      <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
305     </listitem>
306
307     <listitem>
308      <para><literal>DROP_REPLICATION_SLOT <replaceable>slot_name</replaceable></literal> <optional> <literal>WAIT</literal> </optional></para>
309     </listitem>
310
311     <listitem>
312      <para><literal>START_REPLICATION SLOT <replaceable>slot_name</replaceable> LOGICAL ...</literal></para>
313     </listitem>
314    </itemizedlist>
315    are used to create, drop, and stream changes from a replication
316    slot, respectively. These commands are only available over a replication
317    connection; they cannot be used via SQL.
318    See <xref linkend="protocol-replication"/> for details on these commands.
319   </para>
320
321   <para>
322    The command <xref linkend="app-pgrecvlogical"/> can be used to control
323    logical decoding over a streaming replication connection.  (It uses
324    these commands internally.)
325   </para>
326  </sect1>
327
328  <sect1 id="logicaldecoding-sql">
329   <title>Logical Decoding <acronym>SQL</acronym> Interface</title>
330
331   <para>
332     See <xref linkend="functions-replication"/> for detailed documentation on
333     the SQL-level API for interacting with logical decoding.
334   </para>
335
336   <para>
337    Synchronous replication (see <xref linkend="synchronous-replication"/>) is
338    only supported on replication slots used over the streaming replication interface. The
339    function interface and additional, non-core interfaces do not support
340    synchronous replication.
341   </para>
342  </sect1>
343
344  <sect1 id="logicaldecoding-catalogs">
345   <title>System Catalogs Related to Logical Decoding</title>
346
347   <para>
348    The <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link>
349    view and the
350    <link linkend="monitoring-pg-stat-replication-view">
351    <structname>pg_stat_replication</structname></link>
352    view provide information about the current state of replication slots and
353    streaming replication connections respectively. These views apply to both physical and
354    logical replication.
355   </para>
356  </sect1>
357
358  <sect1 id="logicaldecoding-output-plugin">
359   <title>Logical Decoding Output Plugins</title>
360   <para>
361    An example output plugin can be found in the
362    <link linkend="test-decoding">
363     <filename>contrib/test_decoding</filename>
364    </link>
365    subdirectory of the PostgreSQL source tree.
366   </para>
367   <sect2 id="logicaldecoding-output-init">
368    <title>Initialization Function</title>
369    <indexterm zone="logicaldecoding-output-init">
370     <primary>_PG_output_plugin_init</primary>
371    </indexterm>
372    <para>
373     An output plugin is loaded by dynamically loading a shared library with
374     the output plugin's name as the library base name. The normal library
375     search path is used to locate the library. To provide the required output
376     plugin callbacks and to indicate that the library is actually an output
377     plugin it needs to provide a function named
378     <function>_PG_output_plugin_init</function>. This function is passed a
379     struct that needs to be filled with the callback function pointers for
380     individual actions.
<programlisting>
typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
</programlisting>
     The <function>begin_cb</function>, <function>change_cb</function>,
     and <function>commit_cb</function> callbacks are required,
     while <function>startup_cb</function>,
     <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
     <function>message_cb</function>, and <function>shutdown_cb</function>
     are optional.
     If <function>truncate_cb</function> is not set but a
     <command>TRUNCATE</command> is to be decoded, the action will be ignored.
403    </para>
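
    <para>
     As a rough illustration of the contract described above, the following
     stand-alone sketch fills in only the required callbacks and then checks
     that they are present.  The generic callback type,
     the <function>my_*</function> functions,
     and <function>has_required_callbacks</function> are assumptions made
     for this example so that it compiles without the server headers; a real
     plugin would include <filename>replication/output_plugin.h</filename>
     and use the callback types shown above.  The sketch also assumes that
     the struct is zero-initialized before the initialization function is
     called.
    </para>
<programlisting><![CDATA[
#include <assert.h>
#include <stddef.h>

/*
 * Stand-in declarations so the sketch compiles on its own.  They are NOT
 * the server's definitions: every callback is reduced to one generic type.
 */
typedef void (*GenericCallback) (void);

typedef struct OutputPluginCallbacks
{
    GenericCallback startup_cb;
    GenericCallback begin_cb;
    GenericCallback change_cb;
    GenericCallback truncate_cb;
    GenericCallback commit_cb;
    GenericCallback message_cb;
    GenericCallback filter_by_origin_cb;
    GenericCallback shutdown_cb;
} OutputPluginCallbacks;

/* Hypothetical plugin callbacks; their bodies are omitted in this sketch. */
static void my_begin(void) {}
static void my_change(void) {}
static void my_commit(void) {}

/* Entry point: fill in the callbacks this plugin implements. */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    assert(cb != NULL);
    cb->begin_cb = my_begin;
    cb->change_cb = my_change;
    cb->commit_cb = my_commit;
    /* The optional callbacks are left unset (NULL). */
}

/* Hypothetical helper: returns 1 if every required callback is set. */
int
has_required_callbacks(const OutputPluginCallbacks *cb)
{
    return cb->begin_cb != NULL &&
        cb->change_cb != NULL &&
        cb->commit_cb != NULL;
}
]]></programlisting>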
404   </sect2>
405
406   <sect2 id="logicaldecoding-capabilities">
407    <title>Capabilities</title>
408
409    <para>
     To decode, format, and output changes, output plugins can use most of the
     backend's normal infrastructure, including calling output functions.
     Read-only access to relations is permitted as long as the only relations
     accessed are those that either have been created
     by <command>initdb</command> in the <literal>pg_catalog</literal>
     schema, or have been marked as user-provided catalog tables using
<programlisting>
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
</programlisting>
     Any actions leading to transaction ID assignment are prohibited.  That
     includes, among others, writing to tables, performing DDL changes, and
     calling <literal>pg_current_xact_id()</literal>.
423    </para>
424   </sect2>
425
426   <sect2 id="logicaldecoding-output-mode">
427    <title>Output Modes</title>
428
429    <para>
430     Output plugin callbacks can pass data to the consumer in nearly arbitrary
431     formats. For some use cases, like viewing the changes via SQL, returning
432     data in a data type that can contain arbitrary data (e.g., <type>bytea</type>) is
433     cumbersome. If the output plugin only outputs textual data in the
434     server's encoding, it can declare that by
435     setting <literal>OutputPluginOptions.output_type</literal>
436     to <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> instead
437     of <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal> in
438     the <link linkend="logicaldecoding-output-plugin-startup">startup
439     callback</link>. In that case, all the data has to be in the server's encoding
440     so that a <type>text</type> datum can contain it. This is checked in assertion-enabled
441     builds.
442    </para>
443   </sect2>
444
445   <sect2 id="logicaldecoding-output-plugin-callbacks">
446    <title>Output Plugin Callbacks</title>
447
448    <para>
449     An output plugin gets notified about changes that are happening via
450     various callbacks it needs to provide.
451    </para>
452
453    <para>
454     Concurrent transactions are decoded in commit order, and only changes
455     belonging to a specific transaction are decoded between
456     the <literal>begin</literal> and <literal>commit</literal>
457     callbacks. Transactions that were rolled back explicitly or implicitly
458     never get
459     decoded. Successful savepoints are
460     folded into the transaction containing them in the order they were
461     executed within that transaction.
462    </para>
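
    <para>
     The ordering rules above can be illustrated with a small stand-alone
     model.  This is not the server's implementation:
     <structname>WalRecord</structname>, <structname>TxnBuffer</structname>,
     and <function>decode</function> are hypothetical names, and the
     fixed-size arrays are a simplification.  The model buffers changes per
     transaction as they appear in WAL order, emits a transaction's changes
     only when its commit record is seen, and discards the changes of
     aborted transactions.
    </para>
<programlisting><![CDATA[
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_XACTS   8
#define MAX_CHANGES 16

typedef enum RecordKind
{
    REC_CHANGE,
    REC_COMMIT,
    REC_ABORT
} RecordKind;

/* One entry of a simplified, hypothetical WAL stream. */
typedef struct WalRecord
{
    unsigned int xid;
    RecordKind  kind;
    const char *change;         /* used only for REC_CHANGE */
} WalRecord;

/* Changes collected so far for one in-progress transaction. */
typedef struct TxnBuffer
{
    unsigned int xid;           /* 0 marks an unused entry */
    int         nchanges;
    const char *changes[MAX_CHANGES];
} TxnBuffer;

/* Find the buffer for xid, claiming a free entry if there is none yet. */
static TxnBuffer *
find_txn(TxnBuffer *txns, unsigned int xid)
{
    TxnBuffer  *unused = NULL;
    int         i;

    assert(xid != 0);
    for (i = 0; i < MAX_XACTS; i++)
    {
        if (txns[i].xid == xid)
            return &txns[i];
        if (txns[i].xid == 0 && unused == NULL)
            unused = &txns[i];
    }
    assert(unused != NULL);     /* the sketch has a fixed capacity */
    unused->xid = xid;
    unused->nchanges = 0;
    return unused;
}

/* Append one line to the NUL-terminated buffer out. */
static void
append_line(char *out, size_t outsize, const char *line)
{
    size_t      used = strlen(out);

    assert(used + strlen(line) + 2 <= outsize);
    snprintf(out + used, outsize - used, "%s\n", line);
}

/* Decode records given in WAL order, writing the output into out. */
static void
decode(const WalRecord *recs, int nrecs, char *out, size_t outsize)
{
    TxnBuffer   txns[MAX_XACTS];
    char        line[32];
    int         i;
    int         j;

    assert(outsize > 0);
    memset(txns, 0, sizeof(txns));
    out[0] = '\0';

    for (i = 0; i < nrecs; i++)
    {
        TxnBuffer  *txn = find_txn(txns, recs[i].xid);

        if (recs[i].kind == REC_CHANGE)
        {
            /* Not emitted yet: the transaction might still abort. */
            assert(txn->nchanges < MAX_CHANGES);
            txn->changes[txn->nchanges++] = recs[i].change;
            continue;
        }
        if (recs[i].kind == REC_COMMIT)
        {
            snprintf(line, sizeof(line), "BEGIN %u", txn->xid);
            append_line(out, outsize, line);
            for (j = 0; j < txn->nchanges; j++)
                append_line(out, outsize, txn->changes[j]);
            snprintf(line, sizeof(line), "COMMIT %u", txn->xid);
            append_line(out, outsize, line);
        }
        txn->xid = 0;           /* committed or aborted: drop the buffer */
    }
}
]]></programlisting>
    <para>
     In this model, a transaction that starts first but commits last is
     emitted last, and nothing at all is emitted for a transaction that
     aborts.
    </para>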
463
464    <note>
465     <para>
      Only transactions that have already safely been flushed to disk will be
      decoded. As a result, when <varname>synchronous_commit</varname> is set
      to <literal>off</literal>, a <command>COMMIT</command> might not be
      decoded by a <literal>pg_logical_slot_get_changes()</literal> call
      that directly follows it.
471     </para>
472    </note>
473
474    <sect3 id="logicaldecoding-output-plugin-startup">
475     <title>Startup Callback</title>
476     <para>
477      The optional <function>startup_cb</function> callback is called whenever
478      a replication slot is created or asked to stream changes, independent
479      of the number of changes that are ready to be put out.
<programlisting>
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);
</programlisting>
      The <literal>is_init</literal> parameter will be true when the
      replication slot is being created and false
      otherwise. <parameter>options</parameter> points to a struct of options
      that output plugins can set:
<programlisting>
typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;
</programlisting>
496      <literal>output_type</literal> has to either be set to
497      <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
498      or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
499      <xref linkend="logicaldecoding-output-mode"/>.
500      If <literal>receive_rewrites</literal> is true, the output plugin will
501      also be called for changes made by heap rewrites during certain DDL
502      operations.  These are of interest to plugins that handle DDL
503      replication, but they require special handling.
504     </para>
505
506     <para>
507      The startup callback should validate the options present in
508      <literal>ctx-&gt;output_plugin_options</literal>. If the output plugin
509      needs to have a state, it can
510      use <literal>ctx-&gt;output_plugin_private</literal> to store it.
511     </para>
512    </sect3>
513
514    <sect3 id="logicaldecoding-output-plugin-shutdown">
515     <title>Shutdown Callback</title>
516
517     <para>
518      The optional <function>shutdown_cb</function> callback is called
519      whenever a formerly active replication slot is not used anymore and can
520      be used to deallocate resources private to the output plugin. The slot
521      isn't necessarily being dropped, streaming is just being stopped.
<programlisting>
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
</programlisting>
525     </para>
526    </sect3>
527
528    <sect3 id="logicaldecoding-output-plugin-begin">
529     <title>Transaction Begin Callback</title>
530
531     <para>
      The required <function>begin_cb</function> callback is called whenever
      the start of a committed transaction has been decoded. Aborted transactions
      and their contents never get decoded.
<programlisting>
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);
</programlisting>
539      The <parameter>txn</parameter> parameter contains meta information about
540      the transaction, like the time stamp at which it has been committed and
541      its XID.
542     </para>
543    </sect3>
544
545    <sect3 id="logicaldecoding-output-plugin-commit">
546     <title>Transaction End Callback</title>
547
548     <para>
549      The required <function>commit_cb</function> callback is called whenever
550      a transaction commit has been
551      decoded. The <function>change_cb</function> callbacks for all modified
552      rows will have been called before this, if there have been any modified
553      rows.
<programlisting>
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);
</programlisting>
559     </para>
560    </sect3>
561
562    <sect3 id="logicaldecoding-output-plugin-change">
563     <title>Change Callback</title>
564
565     <para>
      The required <function>change_cb</function> callback is called for every
      individual row modification inside a transaction, whether it is
      an <command>INSERT</command>, <command>UPDATE</command>,
      or <command>DELETE</command>. Even if the original command modified
      several rows at once, the callback will be called individually for each
      row.
<programlisting>
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);
</programlisting>
      The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
      have the same contents as for the <function>begin_cb</function>
      and <function>commit_cb</function> callbacks.  In addition, the
      relation descriptor <parameter>relation</parameter> identifies the
      relation the row belongs to, and the struct
      <parameter>change</parameter> describes the row modification.
585     </para>
586
587     <note>
588      <para>
       Only changes in user-defined tables that are not unlogged
       (see <xref linkend="sql-createtable-unlogged"/>) and not temporary
       (see <xref linkend="sql-createtable-temporary"/>) can be extracted using
       logical decoding.
593      </para>
594     </note>
595    </sect3>
596
597    <sect3 id="logicaldecoding-output-plugin-truncate">
598     <title>Truncate Callback</title>
599
600     <para>
      The optional <function>truncate_cb</function> callback is called for a
      <command>TRUNCATE</command> command.
<programlisting>
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);
</programlisting>
610      The parameters are analogous to the <function>change_cb</function>
611      callback.  However, because <command>TRUNCATE</command> actions on
612      tables connected by foreign keys need to be executed together, this
613      callback receives an array of relations instead of just a single one.
614      See the description of the <xref linkend="sql-truncate"/> statement for
615      details.
616     </para>
617    </sect3>
618
619     <sect3 id="logicaldecoding-output-plugin-filter-origin">
620     <title>Origin Filter Callback</title>
621
622     <para>
623       The optional <function>filter_by_origin_cb</function> callback
624       is called to determine whether data that has been replayed
625       from <parameter>origin_id</parameter> is of interest to the
626       output plugin.
<programlisting>
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);
</programlisting>
631      The <parameter>ctx</parameter> parameter has the same contents
632      as for the other callbacks. No information but the origin is
633      available. To signal that changes originating on the passed in
634      node are irrelevant, return true, causing them to be filtered
635      away; false otherwise. The other callbacks will not be called
636      for transactions and changes that have been filtered away.
637     </para>
     <para>
      This is useful when implementing cascading or multidirectional
      replication solutions. Filtering by the origin makes it possible to
      prevent replicating the same changes back and forth in such
      setups.  While transactions and changes also carry information
      about the origin, filtering via this callback is noticeably
      more efficient.
     </para>
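
     <para>
      The return-value convention can be sketched in isolation as follows.
      The type and constant definitions below are stand-ins that let the
      example compile without the server headers,
      and <structname>FilterOptions</structname>,
      <literal>only_local</literal>,
      and <function>filter_by_origin</function> are hypothetical names.
      The sketch assumes that changes made locally carry no origin
      (<literal>InvalidRepOriginId</literal>), so a plugin that wants to
      forward only local changes filters away everything that has one.
     </para>
<programlisting><![CDATA[
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for server definitions, so the sketch compiles on its own. */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

/* Hypothetical plugin option, e.g. parsed in the startup callback. */
typedef struct FilterOptions
{
    bool        only_local;
} FilterOptions;

/*
 * Follows the filter_by_origin_cb convention: return true to filter the
 * data away, false to let it through to the other callbacks.
 */
static bool
filter_by_origin(const FilterOptions *opts, RepOriginId origin_id)
{
    assert(opts != NULL);
    if (opts->only_local && origin_id != InvalidRepOriginId)
        return true;            /* replayed from another node: skip */
    return false;
}
]]></programlisting>
     <para>
      In an actual callback the options would not be passed as an argument;
      they would typically be reached
      through <literal>ctx-&gt;output_plugin_private</literal>.
     </para>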
646     </sect3>
647
648    <sect3 id="logicaldecoding-output-plugin-message">
649     <title>Generic Message Callback</title>
650
651     <para>
652      The optional <function>message_cb</function> callback is called whenever
653      a logical decoding message has been decoded.
<programlisting>
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);
</programlisting>
      The <parameter>txn</parameter> parameter contains meta information about
      the transaction, like the time stamp at which it has been committed and
      its XID. Note however that it can be NULL when the message is
      non-transactional and the XID was not assigned yet in the transaction
      which logged the message. The <parameter>message_lsn</parameter>
      parameter holds the WAL location of the message.
      The <parameter>transactional</parameter> parameter indicates whether
      the message was sent as transactional or not.
      The <parameter>prefix</parameter> parameter is an arbitrary
      null-terminated prefix that can be used for identifying interesting
      messages for the current plugin. Finally,
      the <parameter>message</parameter> parameter holds the actual message,
      which is <parameter>message_size</parameter> bytes long.
     </para>
     <para>
      Extra care should be taken to ensure that the prefix the output plugin
      considers interesting is unique. Using the name of the extension or the
      output plugin itself is often a good choice.
679     </para>
680    </sect3>
681
682   </sect2>
683
684   <sect2 id="logicaldecoding-output-plugin-output">
685    <title>Functions for Producing Output</title>
686
687    <para>
688     To actually produce output, output plugins can write data to
689     the <literal>StringInfo</literal> output buffer
690     in <literal>ctx-&gt;out</literal> when inside
691     the <function>begin_cb</function>, <function>commit_cb</function>,
692     or <function>change_cb</function> callbacks. Before writing to the output
693     buffer, <function>OutputPluginPrepareWrite(ctx, last_write)</function> has
694     to be called, and after finishing writing to the
695     buffer, <function>OutputPluginWrite(ctx, last_write)</function> has to be
696     called to perform the write. The <parameter>last_write</parameter>
697     indicates whether a particular write was the callback's last write.
698    </para>
699
700    <para>
701     The following example shows how to output data to the consumer of an
702     output plugin:
<programlisting>
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx-&gt;out, "BEGIN %u", txn-&gt;xid);
OutputPluginWrite(ctx, true);
</programlisting>
708    </para>
709   </sect2>
710  </sect1>
711
712  <sect1 id="logicaldecoding-writer">
713   <title>Logical Decoding Output Writers</title>
714
715   <para>
716    It is possible to add more output methods for logical decoding.
717    For details, see
718    <filename>src/backend/replication/logical/logicalfuncs.c</filename>.
719    Essentially, three functions need to be provided: one to read WAL, one to
720    prepare writing output, and one to write the output
721    (see <xref linkend="logicaldecoding-output-plugin-output"/>).
722   </para>
723  </sect1>
724
725  <sect1 id="logicaldecoding-synchronous">
726   <title>Synchronous Replication Support for Logical Decoding</title>
727   <sect2>
728    <title>Overview</title>
729
730    <para>
731     Logical decoding can be used to build
732     <link linkend="synchronous-replication">synchronous
733     replication</link> solutions with the same user interface as synchronous
734     replication for <link linkend="streaming-replication">streaming
735     replication</link>.  To do this, the streaming replication interface
736     (see <xref linkend="logicaldecoding-walsender"/>) must be used to stream out
737     data. Clients have to send <literal>Standby status update (F)</literal>
738     (see <xref linkend="protocol-replication"/>) messages, just like streaming
739     replication clients do.
740    </para>
741
742    <note>
743     <para>
      A synchronous replica receiving changes via logical decoding will work in
      the scope of a single database. Because
      <parameter>synchronous_standby_names</parameter> is currently
      server-wide, this technique will not work properly if more
      than one database is actively used.
749     </para>
750    </note>
751   </sect2>
752
753   <sect2 id="logicaldecoding-synchronous-caveats">
754    <title>Caveats</title>
755
    <para>
     In a synchronous replication setup, a deadlock can happen if a transaction
     has locked [user] catalog tables exclusively. See
     <xref linkend="logicaldecoding-capabilities"/> for information on user
     catalog tables. This is because logical decoding of transactions can lock
     catalog tables to access them. To avoid this, users must refrain from
     taking an exclusive lock on [user] catalog tables. Such a lock can be
     taken in the following ways:

     <itemizedlist>
      <listitem>
       <para>
        Issuing an explicit <command>LOCK</command> on <structname>pg_class</structname>
        in a transaction.
       </para>
      </listitem>

      <listitem>
       <para>
        Performing <command>CLUSTER</command> on <structname>pg_class</structname> in a
        transaction.
       </para>
      </listitem>

      <listitem>
       <para>
        Executing <command>TRUNCATE</command> on a [user] catalog table in a
        transaction.
       </para>
      </listitem>
     </itemizedlist>

     Note that these deadlock-prone commands apply not only to the system
     catalog tables named explicitly above but also to any other [user]
     catalog table.
    </para>
792   </sect2>
793  </sect1>
794 </chapter>
795