1 /*-------------------------------------------------------------------------
2  *
3  * message.c
4  *	  Generic logical messages.
5  *
6  * Copyright (c) 2013-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  src/backend/replication/logical/message.c
10  *
11  * NOTES
12  *
13  * Generic logical messages allow XLOG logging of arbitrary binary blobs that
14  * get passed to the logical decoding plugin. In normal XLOG processing they
15  * are same as NOOP.
16  *
17  * These messages can be either transactional or non-transactional.
18  * Transactional messages are part of current transaction and will be sent to
19  * decoding plugin using in a same way as DML operations.
20  * Non-transactional messages are sent to the plugin at the time when the
21  * logical decoding reads them from XLOG. This also means that transactional
22  * messages won't be delivered if the transaction was rolled back but the
23  * non-transactional one will always be delivered.
24  *
25  * Every message carries prefix to avoid conflicts between different decoding
26  * plugins. The plugin authors must take extra care to use unique prefix,
27  * good options seems to be for example to use the name of the extension.
28  *
29  * ---------------------------------------------------------------------------
30  */
31 
32 #include "postgres.h"
33 
34 #include "miscadmin.h"
35 
36 #include "access/xact.h"
37 
38 #include "catalog/indexing.h"
39 
40 #include "nodes/execnodes.h"
41 
42 #include "replication/message.h"
43 #include "replication/logical.h"
44 
45 #include "utils/memutils.h"
46 
47 /*
48  * Write logical decoding message into XLog.
49  */
50 XLogRecPtr
LogLogicalMessage(const char * prefix,const char * message,size_t size,bool transactional)51 LogLogicalMessage(const char *prefix, const char *message, size_t size,
52 				  bool transactional)
53 {
54 	xl_logical_message xlrec;
55 
56 	/*
57 	 * Force xid to be allocated if we're emitting a transactional message.
58 	 */
59 	if (transactional)
60 	{
61 		Assert(IsTransactionState());
62 		GetCurrentTransactionId();
63 	}
64 
65 	xlrec.dbId = MyDatabaseId;
66 	xlrec.transactional = transactional;
67 	xlrec.prefix_size = strlen(prefix) + 1;
68 	xlrec.message_size = size;
69 
70 	XLogBeginInsert();
71 	XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
72 	XLogRegisterData((char *) prefix, xlrec.prefix_size);
73 	XLogRegisterData((char *) message, size);
74 
75 	/* allow origin filtering */
76 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
77 
78 	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
79 }
80 
81 /*
82  * Redo is basically just noop for logical decoding messages.
83  */
84 void
logicalmsg_redo(XLogReaderState * record)85 logicalmsg_redo(XLogReaderState *record)
86 {
87 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
88 
89 	if (info != XLOG_LOGICAL_MESSAGE)
90 		elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
91 
92 	/* This is only interesting for logical decoding, see decode.c. */
93 }
94