<!-- doc/src/sgml/logicaldecoding.sgml -->
 <chapter id="logicaldecoding">
  <title>Logical Decoding</title>
  <indexterm zone="logicaldecoding">
   <primary>Logical Decoding</primary>
  </indexterm>
  <para>
   PostgreSQL provides infrastructure to stream the modifications performed
   via SQL to external consumers. This functionality can be used for a
   variety of purposes, including replication solutions and auditing.
  </para>

  <para>
   Changes are sent out in streams identified by logical replication slots.
  </para>

  <para>
   The format in which those changes are streamed is determined by the output
   plugin used. An example plugin is provided in the PostgreSQL distribution.
   Additional plugins can be written to extend the choice of available formats
   without modifying any core code.
   Every output plugin has access to each individual new row produced
   by <command>INSERT</command> and the new row version created
   by <command>UPDATE</command>. Availability of old row versions for
   <command>UPDATE</command> and <command>DELETE</command> depends on
   the configured replica identity (see <xref linkend="sql-createtable-replica-identity"/>).
  </para>

  <para>
   Changes can be consumed either using the streaming replication protocol
   (see <xref linkend="protocol-replication"/> and
   <xref linkend="logicaldecoding-walsender"/>), or by calling functions
   via SQL (see <xref linkend="logicaldecoding-sql"/>). It is also possible
   to write additional methods of consuming the output of a replication slot
   without modifying core code
   (see <xref linkend="logicaldecoding-writer"/>).
  </para>

  <sect1 id="logicaldecoding-example">
   <title>Logical Decoding Examples</title>

   <para>
    The following example demonstrates controlling logical decoding using the
    SQL interface.
   </para>

   <para>
    Before you can use logical decoding, you must set
    <xref linkend="guc-wal-level"/> to <literal>logical</literal> and
    <xref linkend="guc-max-replication-slots"/> to at least 1. Then, you
    should connect to the target database (in the example
    below, <literal>postgres</literal>) as a superuser.
   </para>

<programlisting>
postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    |    lsn
-----------------+-----------
 regression_slot | 0/16B1970
(1 row)

postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+-------------+---------------------
 regression_slot | test_decoding | logical   | postgres | f      | 0/16A4408   | 0/16A4440
(1 row)

postgres=# -- There are no changes to see yet
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 lsn | xid | data
-----+-----+------
(0 rows)

postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE

postgres=# -- DDL isn't replicated, so all you'll see is the transaction
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |     data
-----------+-------+--------------
 0/BA2DA58 | 10297 | BEGIN 10297
 0/BA5A5A0 | 10297 | COMMIT 10297
(2 rows)

postgres=# -- Once changes are read, they're consumed and not emitted
postgres=# -- in a subsequent call:
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 lsn | xid | data
-----+-----+------
(0 rows)

postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('1');
postgres=*# INSERT INTO data(data) VALUES('2');
postgres=*# COMMIT;

postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A688 | 10298 | BEGIN 10298
 0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 0/BA5A8A8 | 10298 | COMMIT 10298
(4 rows)

postgres=# INSERT INTO data(data) VALUES('3');

postgres=# -- You can also peek ahead in the change stream without consuming changes
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299
(3 rows)

postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299
(3 rows)

postgres=# -- Options can be passed to the output plugin to influence the formatting
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
(3 rows)

postgres=# -- Remember to drop a slot you no longer need, so that it stops consuming
postgres=# -- server resources:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
--------------------------

(1 row)
</programlisting>

   <para>
    The following example shows how logical decoding is controlled over the
    streaming replication protocol, using the
    program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
    distribution. This requires that client authentication is set up to allow
    replication connections
    (see <xref linkend="streaming-replication-authentication"/>) and
    that <varname>max_wal_senders</varname> is set sufficiently high to allow
    an additional connection.
   </para>
<programlisting>
$ pg_recvlogical -d postgres --slot=test --create-slot
$ pg_recvlogical -d postgres --slot=test --start -f -
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
$ fg
BEGIN 693
table public.data: INSERT: id[integer]:4 data[text]:'4'
COMMIT 693
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
$ pg_recvlogical -d postgres --slot=test --drop-slot
</programlisting>
  </sect1>

  <sect1 id="logicaldecoding-explanation">
   <title>Logical Decoding Concepts</title>
   <sect2>
    <title>Logical Decoding</title>

    <indexterm>
     <primary>Logical Decoding</primary>
    </indexterm>

    <para>
     Logical decoding is the process of extracting all persistent changes
     to a database's tables into a coherent, easy-to-understand format that
     can be interpreted without detailed knowledge of the database's internal
     state.
    </para>

    <para>
     In <productname>PostgreSQL</productname>, logical decoding is implemented
     by decoding the contents of the <link linkend="wal">write-ahead
     log</link>, which describe changes on a storage level, into an
     application-specific form such as a stream of tuples or SQL statements.
    </para>
   </sect2>

   <sect2 id="logicaldecoding-replication-slots">
    <title>Replication Slots</title>

    <indexterm>
     <primary>replication slot</primary>
     <secondary>logical replication</secondary>
    </indexterm>

    <para>
     In the context of logical replication, a slot represents a stream of
     changes that can be replayed to a client in the order they were made on
     the origin server. Each slot streams a sequence of changes from a single
     database.
    </para>

    <note>
     <para><productname>PostgreSQL</productname> also has streaming replication slots
      (see <xref linkend="streaming-replication"/>), but they are used somewhat
      differently there.
     </para>
    </note>

    <para>
     A replication slot has an identifier that is unique across all databases
     in a <productname>PostgreSQL</productname> cluster. Slots persist
     independently of the connection using them and are crash-safe.
    </para>

    <para>
     A logical slot will emit each change just once in normal operation.
     The current position of each slot is persisted only at checkpoint, so in
     the case of a crash the slot may return to an earlier LSN, which will
     then cause recent changes to be sent again when the server restarts.
     Logical decoding clients are responsible for avoiding ill effects from
     handling the same message more than once. Clients may wish to record
     the last LSN they saw when decoding and skip over any repeated data or
     (when using the replication protocol) request that decoding start from
     that LSN rather than letting the server determine the start point.
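     As an illustration of the skip-over approach, the following standalone C
     sketch parses LSNs in the <literal>X/Y</literal> form used in the examples
     above (the high and low 32 bits of the WAL position in hexadecimal) and
     decides whether a transaction was already applied by comparing its commit
     LSN with the last one the client recorded. Commit LSNs are compared
     because per-row LSNs can repeat; in the example output, a
     <literal>BEGIN</literal> and the first change share one. The
     <literal>demo_</literal> names are hypothetical, and the client is assumed
     to persist the last applied commit LSN itself.
<programlisting><![CDATA[
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* A WAL position; hypothetical alias used only in this sketch. */
typedef uint64_t demo_lsn;

/*
 * Parse an LSN written as "X/Y", where X and Y are the high and low
 * 32-bit halves in hexadecimal (for example "0/BA5A8A8").
 * Returns false if the text is not exactly in that form.
 */
static bool demo_parse_lsn(const char *text, demo_lsn *out)
{
    unsigned int hi;
    unsigned int lo;
    int          consumed = 0;

    if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2 || text[consumed] != '\0')
        return false;
    *out = ((demo_lsn) hi << 32) | lo;
    return true;
}

/*
 * Decide whether a decoded transaction still has to be applied, given the
 * commit LSN of the last transaction the client durably applied.  After a
 * crash the slot may rewind, and transactions at or before that LSN are
 * sent again; those are skipped here.
 */
static bool demo_txn_needs_apply(demo_lsn commit_lsn, demo_lsn last_applied)
{
    return commit_lsn > last_applied;
}
]]></programlisting>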
     The replication progress tracking feature is designed for this purpose;
     see <link linkend="replication-origins">replication origins</link>.
    </para>

    <para>
     Multiple independent slots may exist for a single database. Each slot has
     its own state, allowing different consumers to receive changes from
     different points in the database change stream. For most applications, a
     separate slot will be required for each consumer.
    </para>

    <para>
     A logical replication slot knows nothing about the state of the
     receiver(s). It's even possible to have multiple different receivers using
     the same slot at different times; they'll just get the changes following
     on from when the last receiver stopped consuming them. Only one receiver
     may consume changes from a slot at any given time.
    </para>

    <caution>
     <para>
      Replication slots persist across crashes and know nothing about the state
      of their consumer(s). They will prevent removal of required resources
      even when there is no connection using them. This consumes storage
      because, as long as a replication slot still requires them, the required
      WAL cannot be removed and the required rows in the system catalogs
      cannot be removed by <command>VACUUM</command>. In extreme cases this
      could cause the database to shut down to prevent transaction ID
      wraparound (see <xref linkend="vacuum-for-wraparound"/>).
      So if a slot is no longer required, it should be dropped.
     </para>
    </caution>
   </sect2>

   <sect2>
    <title>Output Plugins</title>
    <para>
     Output plugins transform the data from the write-ahead log's internal
     representation into the format the consumer of a replication slot desires.
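     As a rough illustration, the standalone C sketch below renders a
     hypothetical, already-decoded row change as a line in the style of the
     <literal>test_decoding</literal> output shown in the examples above. The
     <literal>demo_change</literal> and <literal>demo_column</literal> types
     are illustrative stand-ins rather than PostgreSQL structures. A real
     plugin would obtain column values through the types' output functions
     and would have to escape quotes inside values, which this sketch does not do.
<programlisting><![CDATA[
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified stand-ins for decoded row data. */
typedef struct demo_column
{
    const char *name;       /* column name, e.g. "id" */
    const char *type;       /* type name, e.g. "integer" */
    const char *value;      /* value already rendered as text */
    int         quoted;     /* nonzero if the value is shown in quotes */
} demo_column;

typedef struct demo_change
{
    const char        *schema;
    const char        *table;
    const char        *action;    /* "INSERT", "UPDATE" or "DELETE" */
    int                ncolumns;
    const demo_column *columns;
} demo_change;

/*
 * Render a change as "table S.T: ACTION: col[type]:value ..." into buf.
 * Like snprintf, returns the full length the text would have, so a
 * return value >= size signals truncation (buf stays NUL-terminated).
 */
static int demo_format_change(char *buf, size_t size, const demo_change *c)
{
    int len = snprintf(buf, size, "table %s.%s: %s:", c->schema, c->table, c->action);

    if (len < 0)
        return len;
    for (int i = 0; i < c->ncolumns; i++)
    {
        const demo_column *col = &c->columns[i];
        const char *q = col->quoted ? "'" : "";
        /* Append after what fit so far; nothing more is written once full. */
        size_t      used = (size_t) len < size ? (size_t) len : size;
        int         n = snprintf(buf + used, size - used, " %s[%s]:%s%s%s",
                                 col->name, col->type, q, col->value, q);

        if (n < 0)
            return n;
        len += n;
    }
    return len;
}
]]></programlisting>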
    </para>
   </sect2>

   <sect2>
    <title>Exported Snapshots</title>
    <para>
     When a new replication slot is created using the streaming replication
     interface (see <xref linkend="protocol-replication-create-slot"/>), a
     snapshot is exported
     (see <xref linkend="functions-snapshot-synchronization"/>), which will show
     exactly the state of the database after which all changes will be
     included in the change stream. This can be used to create a new replica by
     using <link linkend="sql-set-transaction"><literal>SET TRANSACTION
     SNAPSHOT</literal></link> to read the state of the database at the moment
     the slot was created. This transaction can then be used to dump the
     database's state at that point in time, which afterwards can be updated
     using the slot's contents without losing any changes.
    </para>
    <para>
     Creation of a snapshot is not always possible. In particular, it will
     fail when connected to a hot standby. Applications that do not require
     snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</literal>
     option.
    </para>
   </sect2>
  </sect1>

  <sect1 id="logicaldecoding-walsender">
   <title>Streaming Replication Protocol Interface</title>

   <para>
    The commands
    <itemizedlist>
     <listitem>
      <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
     </listitem>

     <listitem>
      <para><literal>DROP_REPLICATION_SLOT <replaceable>slot_name</replaceable></literal> <optional> <literal>WAIT</literal> </optional></para>
     </listitem>

     <listitem>
      <para><literal>START_REPLICATION SLOT <replaceable>slot_name</replaceable> LOGICAL ...</literal></para>
     </listitem>
    </itemizedlist>
    are used to create, drop, and stream changes from a replication
    slot, respectively.
    These commands are only available over a replication
    connection; they cannot be used via SQL.
    See <xref linkend="protocol-replication"/> for details on these commands.
   </para>

   <para>
    The command <xref linkend="app-pgrecvlogical"/> can be used to control
    logical decoding over a streaming replication connection. (It uses
    these commands internally.)
   </para>
  </sect1>

  <sect1 id="logicaldecoding-sql">
   <title>Logical Decoding <acronym>SQL</acronym> Interface</title>

   <para>
    See <xref linkend="functions-replication"/> for detailed documentation on
    the SQL-level API for interacting with logical decoding.
   </para>

   <para>
    Synchronous replication (see <xref linkend="synchronous-replication"/>) is
    only supported on replication slots used over the streaming replication interface. The
    function interface and additional, non-core interfaces do not support
    synchronous replication.
   </para>
  </sect1>

  <sect1 id="logicaldecoding-catalogs">
   <title>System Catalogs Related to Logical Decoding</title>

   <para>
    The <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link>
    view and the
    <link linkend="monitoring-pg-stat-replication-view">
    <structname>pg_stat_replication</structname></link>
    view provide information about the current state of replication slots and
    streaming replication connections, respectively. These views apply to both physical and
    logical replication.
   </para>
  </sect1>

  <sect1 id="logicaldecoding-output-plugin">
   <title>Logical Decoding Output Plugins</title>
   <para>
    An example output plugin can be found in the
    <link linkend="test-decoding">
     <filename>contrib/test_decoding</filename>
    </link>
    subdirectory of the PostgreSQL source tree.
   </para>
   <sect2 id="logicaldecoding-output-init">
    <title>Initialization Function</title>
    <indexterm zone="logicaldecoding-output-init">
     <primary>_PG_output_plugin_init</primary>
    </indexterm>
    <para>
     An output plugin is loaded by dynamically loading a shared library with
     the output plugin's name as the library base name. The normal library
     search path is used to locate the library. To provide the required output
     plugin callbacks and to indicate that the library is actually an output
     plugin, it needs to provide a function named
     <function>_PG_output_plugin_init</function>. This function is passed a
     struct that needs to be filled with the callback function pointers for
     individual actions.
<programlisting>
typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
</programlisting>
     The <function>begin_cb</function>, <function>change_cb</function>
     and <function>commit_cb</function> callbacks are required,
     while <function>startup_cb</function>,
     <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
     <function>message_cb</function>,
     and <function>shutdown_cb</function> are optional.
     If <function>truncate_cb</function> is not set but a
     <command>TRUNCATE</command> is to be decoded, the action will be ignored.
    </para>
   </sect2>

   <sect2 id="logicaldecoding-capabilities">
    <title>Capabilities</title>

    <para>
     To decode, format and output changes, output plugins can use most of the
     backend's normal infrastructure, including calling output functions.
     Read-only access to relations is permitted only for relations that
     either were created by <command>initdb</command> in
     the <literal>pg_catalog</literal> schema or have been marked as
     user-provided catalog tables using
<programlisting>
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
</programlisting>
     Any actions leading to transaction ID assignment are prohibited. Among
     other things, this includes writing to tables, performing DDL changes, and
     calling <literal>pg_current_xact_id()</literal>.
    </para>
   </sect2>

   <sect2 id="logicaldecoding-output-mode">
    <title>Output Modes</title>

    <para>
     Output plugin callbacks can pass data to the consumer in nearly arbitrary
     formats. For some use cases, like viewing the changes via SQL, returning
     data in a data type that can contain arbitrary data (e.g., <type>bytea</type>) is
     cumbersome. If the output plugin only outputs textual data in the
     server's encoding, it can declare that by
     setting <literal>OutputPluginOptions.output_type</literal>
     to <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> instead
     of <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal> in
     the <link linkend="logicaldecoding-output-plugin-startup">startup
     callback</link>. In that case, all the data has to be in the server's encoding
     so that a <type>text</type> datum can contain it. This is checked in assertion-enabled
     builds.
    </para>
   </sect2>

   <sect2 id="logicaldecoding-output-plugin-callbacks">
    <title>Output Plugin Callbacks</title>

    <para>
     An output plugin gets notified about changes that are happening via
     various callbacks it needs to provide.
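     Which of these callbacks must be provided is listed in
     <xref linkend="logicaldecoding-output-init"/>. The standalone C sketch
     below models those rules: <function>begin_cb</function>,
     <function>change_cb</function>, and <function>commit_cb</function> are
     mandatory, and a decoded <command>TRUNCATE</command> is skipped when no
     <function>truncate_cb</function> is registered. The
     <literal>demo_callbacks</literal> struct, its simplified callback
     signatures, and the helper functions are hypothetical stand-ins for
     <structname>OutputPluginCallbacks</structname> and the server's own
     checks, chosen so that the example compiles on its own.
<programlisting><![CDATA[
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for OutputPluginCallbacks; signatures simplified. */
typedef struct demo_callbacks
{
    void        (*startup_cb) (void);           /* optional */
    void        (*begin_cb) (void);             /* required */
    void        (*change_cb) (void);            /* required */
    void        (*truncate_cb) (void);          /* optional */
    void        (*commit_cb) (void);            /* required */
    void        (*message_cb) (void);           /* optional */
    bool        (*filter_by_origin_cb) (void);  /* optional */
    void        (*shutdown_cb) (void);          /* optional */
} demo_callbacks;

/* Name of the first missing required callback, or NULL if none is missing. */
static const char *demo_missing_required(const demo_callbacks *cb)
{
    if (cb->begin_cb == NULL)
        return "begin_cb";
    if (cb->change_cb == NULL)
        return "change_cb";
    if (cb->commit_cb == NULL)
        return "commit_cb";
    return NULL;
}

/* Deliver a decoded TRUNCATE; without a truncate_cb it is ignored. */
static bool demo_deliver_truncate(const demo_callbacks *cb)
{
    if (cb->truncate_cb == NULL)
        return false;
    cb->truncate_cb();
    return true;
}

/* A hypothetical plugin that only counts the TRUNCATEs it receives. */
static int demo_truncates_seen = 0;

static void demo_noop(void)
{
}

static void demo_on_truncate(void)
{
    demo_truncates_seen++;
}
]]></programlisting>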
    </para>

    <para>
     Concurrent transactions are decoded in commit order, and only changes
     belonging to a specific transaction are decoded between
     the <literal>begin</literal> and <literal>commit</literal>
     callbacks. Transactions that were rolled back explicitly or implicitly
     never get decoded. Successful savepoints are
     folded into the transaction containing them in the order they were
     executed within that transaction.
    </para>

    <note>
     <para>
      Only transactions that have already safely been flushed to disk will be
      decoded. That can lead to a <command>COMMIT</command> not immediately being decoded in a
      directly following <literal>pg_logical_slot_get_changes()</literal>
      when <varname>synchronous_commit</varname> is set
      to <literal>off</literal>.
     </para>
    </note>

    <sect3 id="logicaldecoding-output-plugin-startup">
     <title>Startup Callback</title>
     <para>
      The optional <function>startup_cb</function> callback is called whenever
      a replication slot is created or asked to stream changes, independent
      of the number of changes that are ready to be put out.
<programlisting>
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);
</programlisting>
      The <literal>is_init</literal> parameter will be true when the
      replication slot is being created and false
      otherwise. <parameter>options</parameter> points to a struct of options
      that output plugins can set:
<programlisting>
typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;
</programlisting>
      <literal>output_type</literal> has to be set to either
      <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
      or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
      <xref linkend="logicaldecoding-output-mode"/>.
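      As a hypothetical illustration, the standalone C sketch below shows how
      a plugin might let the consumer choose between the two modes through a
      plugin option and reject anything it does not recognize. The option
      name <literal>format</literal>, its values, and the
      <literal>demo_</literal> types are invented for this example. A real
      startup callback would walk the option list in
      <literal>ctx->output_plugin_options</literal> and store the result in
      <literal>options->output_type</literal>.
<programlisting><![CDATA[
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical mirror of the two output modes. */
typedef enum demo_output_type
{
    DEMO_TEXTUAL_OUTPUT,
    DEMO_BINARY_OUTPUT
} demo_output_type;

/* One consumer-supplied option as a name/value pair. */
typedef struct demo_option
{
    const char *name;
    const char *value;      /* NULL if no value was given */
} demo_option;

/*
 * Validate the options and choose the output mode, defaulting to
 * textual output.  Unknown option names, a missing value, and unknown
 * values are all rejected by returning false.
 */
static bool demo_choose_output_type(const demo_option *opts, int nopts,
                                    demo_output_type *out)
{
    *out = DEMO_TEXTUAL_OUTPUT;

    for (int i = 0; i < nopts; i++)
    {
        if (strcmp(opts[i].name, "format") != 0)
            return false;               /* unknown option */
        if (opts[i].value == NULL)
            return false;               /* "format" requires a value */
        if (strcmp(opts[i].value, "text") == 0)
            *out = DEMO_TEXTUAL_OUTPUT;
        else if (strcmp(opts[i].value, "binary") == 0)
            *out = DEMO_BINARY_OUTPUT;
        else
            return false;               /* unknown value */
    }
    return true;
}
]]></programlisting>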
      If <literal>receive_rewrites</literal> is true, the output plugin will
      also be called for changes made by heap rewrites during certain DDL
      operations. These are of interest to plugins that handle DDL
      replication, but they require special handling.
     </para>

     <para>
      The startup callback should validate the options present in
      <literal>ctx->output_plugin_options</literal>. If the output plugin
      needs to have a state, it can
      use <literal>ctx->output_plugin_private</literal> to store it.
     </para>
    </sect3>

    <sect3 id="logicaldecoding-output-plugin-shutdown">
     <title>Shutdown Callback</title>

     <para>
      The optional <function>shutdown_cb</function> callback is called
      whenever a formerly active replication slot is not used anymore and can
      be used to deallocate resources private to the output plugin. The slot
      isn't necessarily being dropped; streaming is just being stopped.
<programlisting>
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
</programlisting>
     </para>
    </sect3>

    <sect3 id="logicaldecoding-output-plugin-begin">
     <title>Transaction Begin Callback</title>

     <para>
      The required <function>begin_cb</function> callback is called whenever the
      start of a committed transaction has been decoded. Aborted transactions
      and their contents never get decoded.
<programlisting>
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);
</programlisting>
      The <parameter>txn</parameter> parameter contains meta information about
      the transaction, such as the timestamp at which it was committed and
      its XID.
     </para>
    </sect3>

    <sect3 id="logicaldecoding-output-plugin-commit">
     <title>Transaction End Callback</title>

     <para>
      The required <function>commit_cb</function> callback is called whenever
      a transaction commit has been decoded.
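      The ordering guarantees described above (changes grouped per
      transaction, transactions emitted in commit order, aborted transactions
      never emitted) can be illustrated with a deliberately simplified,
      standalone C sketch. It buffers hypothetical interleaved records per XID
      and replays a transaction as begin, changes, and commit only once its
      commit record is reached. The record and event types are invented for
      the example. The real reorder buffer additionally deals with
      subtransactions, very large transactions, historic catalog snapshots,
      and more.
<programlisting><![CDATA[
#include <stddef.h>

/* Hypothetical, simplified records as they might appear in WAL order. */
typedef enum { REC_CHANGE, REC_COMMIT, REC_ABORT } rec_kind;

typedef struct demo_record
{
    rec_kind    kind;
    unsigned    xid;
    const char *payload;        /* row change, for REC_CHANGE only */
} demo_record;

/* What the sketch "calls back" with: begin, change, or commit. */
typedef enum { EV_BEGIN, EV_CHANGE, EV_COMMIT } ev_kind;

typedef struct demo_event
{
    ev_kind     kind;
    unsigned    xid;
    const char *payload;
} demo_event;

#define MAX_TXNS    8
#define MAX_CHANGES 16

typedef struct demo_txn
{
    unsigned    xid;
    int         nchanges;
    const char *changes[MAX_CHANGES];
} demo_txn;

/*
 * Replay records in WAL order.  Changes are buffered per XID; a
 * transaction is emitted as begin / changes / commit only when its
 * commit record is reached, so output follows commit order, and an
 * aborted transaction's buffered changes are simply discarded.
 * Returns the number of events written to ev (at most max_ev).
 */
static int demo_decode_in_commit_order(const demo_record *recs, int nrecs,
                                       demo_event *ev, int max_ev)
{
    demo_txn    txns[MAX_TXNS];
    int         ntxns = 0;
    int         nev = 0;

    for (int i = 0; i < nrecs; i++)
    {
        demo_txn   *t = NULL;

        for (int j = 0; j < ntxns; j++)
            if (txns[j].xid == recs[i].xid)
                t = &txns[j];
        if (t == NULL)
        {
            if (ntxns == MAX_TXNS)
                continue;               /* sketch: ignore overflow */
            t = &txns[ntxns++];
            t->xid = recs[i].xid;
            t->nchanges = 0;
        }

        if (recs[i].kind == REC_CHANGE)
        {
            if (t->nchanges < MAX_CHANGES)
                t->changes[t->nchanges++] = recs[i].payload;
            continue;
        }

        if (recs[i].kind == REC_COMMIT)
        {
            if (nev < max_ev)
                ev[nev++] = (demo_event) { EV_BEGIN, t->xid, NULL };
            for (int k = 0; k < t->nchanges; k++)
                if (nev < max_ev)
                    ev[nev++] = (demo_event) { EV_CHANGE, t->xid, t->changes[k] };
            if (nev < max_ev)
                ev[nev++] = (demo_event) { EV_COMMIT, t->xid, NULL };
        }

        /* Committed or aborted: stop tracking this XID. */
        *t = txns[--ntxns];
    }
    return nev;
}
]]></programlisting>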
      If any rows were modified, the <function>change_cb</function> callbacks
      for all of them will have been called before
      <function>commit_cb</function>.
<programlisting>
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);
</programlisting>
     </para>
    </sect3>

    <sect3 id="logicaldecoding-output-plugin-change">
     <title>Change Callback</title>

     <para>
      The required <function>change_cb</function> callback is called for every
      individual row modification inside a transaction, whether it is
      an <command>INSERT</command>, <command>UPDATE</command>,
      or <command>DELETE</command>. Even if the original command modified
      several rows at once, the callback will be called individually for each
      row.
<programlisting>
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);
</programlisting>
      The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
      have the same contents as for the <function>begin_cb</function>
      and <function>commit_cb</function> callbacks, but additionally the
      relation descriptor <parameter>relation</parameter>, which points to the
      relation the row belongs to, and a struct
      <parameter>change</parameter> describing the row modification are passed
      in.
     </para>

     <note>
      <para>
       Only changes in user-defined tables that are not unlogged
       (see <xref linkend="sql-createtable-unlogged"/>) and not temporary
       (see <xref linkend="sql-createtable-temporary"/>) can be extracted using
       logical decoding.
      </para>
     </note>
    </sect3>

    <sect3 id="logicaldecoding-output-plugin-truncate">
     <title>Truncate Callback</title>

     <para>
      The optional <function>truncate_cb</function> callback is called for a
      <command>TRUNCATE</command> command.
<programlisting>
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);
</programlisting>
      The parameters are analogous to the <function>change_cb</function>
      callback. However, because <command>TRUNCATE</command> actions on
      tables connected by foreign keys need to be executed together, this
      callback receives an array of relations instead of just a single one.
      See the description of the <xref linkend="sql-truncate"/> statement for
      details.
     </para>
    </sect3>

    <sect3 id="logicaldecoding-output-plugin-filter-origin">
     <title>Origin Filter Callback</title>

     <para>
      The optional <function>filter_by_origin_cb</function> callback
      is called to determine whether data that has been replayed
      from <parameter>origin_id</parameter> is of interest to the
      output plugin.
<programlisting>
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);
</programlisting>
      The <parameter>ctx</parameter> parameter has the same contents
      as for the other callbacks. No information but the origin is
      available. To signal that changes originating on the passed-in
      node are irrelevant, return true, causing them to be filtered
      away; return false otherwise. The other callbacks will not be called
      for transactions and changes that have been filtered away.
     </para>
     <para>
      This is useful when implementing cascading or multidirectional
      replication solutions. Filtering by the origin makes it possible to
      prevent replicating the same changes back and forth in such
      setups. While transactions and changes also carry information
      about the origin, filtering via this callback is noticeably
      more efficient.
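      For example, a plugin that should forward only changes that originated
      on the local node could implement the filter along the lines of the
      standalone C sketch below. It assumes, as in the PostgreSQL sources,
      that <type>RepOriginId</type> is a 16-bit unsigned integer and that
      <literal>InvalidRepOriginId</literal> (0) marks changes that were not
      replayed through a replication origin. The
      <literal>demo_plugin_state</literal> struct and its
      <literal>only_local</literal> flag are hypothetical; a real callback
      would find such state through <literal>ctx->output_plugin_private</literal>.
<programlisting><![CDATA[
#include <stdbool.h>
#include <stdint.h>

/* Assumed to mirror PostgreSQL's definitions (see the text above). */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

/* Hypothetical per-plugin state, e.g. set from a consumer option. */
typedef struct demo_plugin_state
{
    bool        only_local;
} demo_plugin_state;

/*
 * Return true to filter away changes replayed from origin_id.
 * Changes not replayed through a replication origin carry
 * InvalidRepOriginId; anything else came from another node and is
 * dropped when only_local is set, so a bidirectional setup does not
 * send those changes back to where they came from.
 */
static bool demo_filter_by_origin(const demo_plugin_state *state,
                                  RepOriginId origin_id)
{
    return state->only_local && origin_id != InvalidRepOriginId;
}
]]></programlisting>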
     </para>
    </sect3>

    <sect3 id="logicaldecoding-output-plugin-message">
     <title>Generic Message Callback</title>

     <para>
      The optional <function>message_cb</function> callback is called whenever
      a logical decoding message has been decoded.
<programlisting>
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);
</programlisting>
      The <parameter>txn</parameter> parameter contains meta information about
      the transaction, such as the timestamp at which it was committed and
      its XID. Note, however, that it can be NULL when the message is
      non-transactional and no XID had yet been assigned in the transaction
      that logged the message. The <parameter>message_lsn</parameter> parameter
      holds the WAL location of the message. The <parameter>transactional</parameter>
      parameter indicates whether the message was sent as transactional.
      The <parameter>prefix</parameter> parameter is an arbitrary null-terminated
      prefix that can be used to identify messages of interest to the current
      plugin. Finally, the <parameter>message</parameter> parameter holds
      the actual message, which is <parameter>message_size</parameter> bytes long.
     </para>
     <para>
      Extra care should be taken to ensure that the prefix the output plugin
      considers interesting is unique. Using the name of the extension or the
      output plugin itself is often a good choice.
     </para>
    </sect3>

   </sect2>

   <sect2 id="logicaldecoding-output-plugin-output">
    <title>Functions for Producing Output</title>

    <para>
     To actually produce output, output plugins can write data to
     the <literal>StringInfo</literal> output buffer
     in <literal>ctx->out</literal> when inside
     the <function>begin_cb</function>, <function>commit_cb</function>,
     or <function>change_cb</function> callbacks.
     Before writing to the output
     buffer, <function>OutputPluginPrepareWrite(ctx, last_write)</function> has
     to be called, and after finishing writing to the
     buffer, <function>OutputPluginWrite(ctx, last_write)</function> has to be
     called to perform the write. The <parameter>last_write</parameter>
     parameter indicates whether a particular write was the callback's last write.
    </para>

    <para>
     The following example shows how to output data to the consumer of an
     output plugin:
<programlisting>
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);
</programlisting>
    </para>
   </sect2>
  </sect1>

  <sect1 id="logicaldecoding-writer">
   <title>Logical Decoding Output Writers</title>

   <para>
    It is possible to add more output methods for logical decoding.
    For details, see
    <filename>src/backend/replication/logical/logicalfuncs.c</filename>.
    Essentially, three functions need to be provided: one to read WAL, one to
    prepare writing output, and one to write the output
    (see <xref linkend="logicaldecoding-output-plugin-output"/>).
   </para>
  </sect1>

  <sect1 id="logicaldecoding-synchronous">
   <title>Synchronous Replication Support for Logical Decoding</title>
   <sect2>
    <title>Overview</title>

    <para>
     Logical decoding can be used to build
     <link linkend="synchronous-replication">synchronous
     replication</link> solutions with the same user interface as synchronous
     replication for <link linkend="streaming-replication">streaming
     replication</link>. To do this, the streaming replication interface
     (see <xref linkend="logicaldecoding-walsender"/>) must be used to stream out
     data. Clients have to send <literal>Standby status update (F)</literal>
     (see <xref linkend="protocol-replication"/>) messages, just like streaming
     replication clients do.
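     As a sketch of what such a client sends, the standalone C function below
     fills in the body of a <literal>Standby status update (F)</literal>
     message. The body consists of the byte <literal>r</literal>, three 64-bit
     WAL positions (written, flushed, and applied, each the last byte plus
     one), the client's clock in microseconds since midnight 2000-01-01, and a
     reply-requested flag. Integers are in network byte order. The layout
     follows the protocol description referenced above. The function names
     are hypothetical, and the result still has to be sent inside a
     <literal>CopyData</literal> message on the replication connection.
<programlisting><![CDATA[
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_STATUS_UPDATE_LEN 34   /* 1 + 4 * 8 + 1 bytes */

/* Store a 64-bit value in network (big-endian) byte order. */
static void demo_put_uint64_be(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
    {
        p[i] = (unsigned char) (v & 0xFF);
        v >>= 8;
    }
}

/*
 * Build the body of a Standby status update (F) message into buf, which
 * must hold DEMO_STATUS_UPDATE_LEN bytes.  Returns the number of bytes
 * written.  WAL positions are "last byte + 1" values; clock_us is the
 * client's clock in microseconds since 2000-01-01 00:00:00.
 */
static size_t demo_build_status_update(unsigned char *buf,
                                       uint64_t written_lsn,
                                       uint64_t flushed_lsn,
                                       uint64_t applied_lsn,
                                       int64_t clock_us,
                                       bool reply_requested)
{
    buf[0] = 'r';
    demo_put_uint64_be(buf + 1, written_lsn);
    demo_put_uint64_be(buf + 9, flushed_lsn);
    demo_put_uint64_be(buf + 17, applied_lsn);
    demo_put_uint64_be(buf + 25, (uint64_t) clock_us);
    buf[33] = reply_requested ? 1 : 0;
    return DEMO_STATUS_UPDATE_LEN;
}
]]></programlisting>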
    </para>

    <note>
     <para>
      A synchronous replica receiving changes via logical decoding will work in
      the scope of a single database. Since
      <parameter>synchronous_standby_names</parameter> is currently
      server-wide, this technique will not work properly if more
      than one database is actively used.
     </para>
    </note>
   </sect2>

   <sect2 id="logicaldecoding-synchronous-caveats">
    <title>Caveats</title>

    <para>
     In a synchronous replication setup, a deadlock can happen if the transaction
     has locked [user] catalog tables exclusively. See
     <xref linkend="logicaldecoding-capabilities"/> for information on user
     catalog tables. This is because logical decoding of transactions can lock
     catalog tables to access them. To avoid this, users must refrain from taking
     an exclusive lock on [user] catalog tables. This can happen in the following
     ways:

     <itemizedlist>
      <listitem>
       <para>
        Issuing an explicit <command>LOCK</command> on <structname>pg_class</structname>
        in a transaction.
       </para>
      </listitem>

      <listitem>
       <para>
        Performing <command>CLUSTER</command> on <structname>pg_class</structname> in a
        transaction.
       </para>
      </listitem>

      <listitem>
       <para>
        Executing <command>TRUNCATE</command> on a [user] catalog table in a
        transaction.
       </para>
      </listitem>
     </itemizedlist>

     Note that these commands can cause a deadlock not only on the system
     catalog table explicitly mentioned above but also on any other [user]
     catalog table.
    </para>
   </sect2>
  </sect1>
 </chapter>