1 /*------------------------------------------------------------------------- 2 * logical.h 3 * PostgreSQL logical decoding coordination 4 * 5 * Copyright (c) 2012-2016, PostgreSQL Global Development Group 6 * 7 *------------------------------------------------------------------------- 8 */ 9 #ifndef LOGICAL_H 10 #define LOGICAL_H 11 12 #include "replication/slot.h" 13 14 #include "access/xlog.h" 15 #include "access/xlogreader.h" 16 #include "replication/output_plugin.h" 17 18 struct LogicalDecodingContext; 19 20 typedef void (*LogicalOutputPluginWriterWrite) ( 21 struct LogicalDecodingContext *lr, 22 XLogRecPtr Ptr, 23 TransactionId xid, 24 bool last_write 25 ); 26 27 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; 28 29 typedef struct LogicalDecodingContext 30 { 31 /* memory context this is all allocated in */ 32 MemoryContext context; 33 34 /* infrastructure pieces */ 35 XLogReaderState *reader; 36 ReplicationSlot *slot; 37 struct ReorderBuffer *reorder; 38 struct SnapBuild *snapshot_builder; 39 40 OutputPluginCallbacks callbacks; 41 OutputPluginOptions options; 42 43 /* 44 * User specified options 45 */ 46 List *output_plugin_options; 47 48 /* 49 * User-Provided callback for writing/streaming out data. 50 */ 51 LogicalOutputPluginWriterPrepareWrite prepare_write; 52 LogicalOutputPluginWriterWrite write; 53 54 /* 55 * Output buffer. 56 */ 57 StringInfo out; 58 59 /* 60 * Private data pointer of the output plugin. 61 */ 62 void *output_plugin_private; 63 64 /* 65 * Private data pointer for the data writer. 66 */ 67 void *output_writer_private; 68 69 /* 70 * State for writing output. 71 */ 72 bool accept_writes; 73 bool prepared_write; 74 XLogRecPtr write_location; 75 TransactionId write_xid; 76 } LogicalDecodingContext; 77 78 extern void CheckLogicalDecodingRequirements(void); 79 80 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, 81 List *output_plugin_options, 82 bool need_full_snapshot, 83 XLogPageReadCB read_page, 84 LogicalOutputPluginWriterPrepareWrite prepare_write, 85 LogicalOutputPluginWriterWrite do_write); 86 extern LogicalDecodingContext *CreateDecodingContext( 87 XLogRecPtr start_lsn, 88 List *output_plugin_options, 89 XLogPageReadCB read_page, 90 LogicalOutputPluginWriterPrepareWrite prepare_write, 91 LogicalOutputPluginWriterWrite do_write); 92 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); 93 extern bool DecodingContextReady(LogicalDecodingContext *ctx); 94 extern void FreeDecodingContext(LogicalDecodingContext *ctx); 95 96 extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); 97 extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, 98 XLogRecPtr restart_lsn); 99 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); 100 101 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); 102 103 #endif 104