1 /*-------------------------------------------------------------------------
2  *
3  * tcn.c
4  *	  triggered change notification support for PostgreSQL
5  *
6  * Portions Copyright (c) 2011-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  contrib/tcn/tcn.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/htup_details.h"
19 #include "executor/spi.h"
20 #include "commands/async.h"
21 #include "commands/trigger.h"
22 #include "lib/stringinfo.h"
23 #include "utils/rel.h"
24 #include "utils/syscache.h"
25 
26 PG_MODULE_MAGIC;
27 
28 /*
29  * Copy from s (for source) to r (for result), wrapping with q (quote)
30  * characters and doubling any quote characters found.
31  */
32 static void
strcpy_quoted(StringInfo r,const char * s,const char q)33 strcpy_quoted(StringInfo r, const char *s, const char q)
34 {
35 	appendStringInfoCharMacro(r, q);
36 	while (*s)
37 	{
38 		if (*s == q)
39 			appendStringInfoCharMacro(r, q);
40 		appendStringInfoCharMacro(r, *s);
41 		s++;
42 	}
43 	appendStringInfoCharMacro(r, q);
44 }
45 
46 /*
47  * triggered_change_notification
48  *
49  * This trigger function will send a notification of data modification with
50  * primary key values.  The channel will be "tcn" unless the trigger is
51  * created with a parameter, in which case that parameter will be used.
52  */
53 PG_FUNCTION_INFO_V1(triggered_change_notification);
54 
55 Datum
triggered_change_notification(PG_FUNCTION_ARGS)56 triggered_change_notification(PG_FUNCTION_ARGS)
57 {
58 	TriggerData *trigdata = (TriggerData *) fcinfo->context;
59 	Trigger    *trigger;
60 	int			nargs;
61 	HeapTuple	trigtuple;
62 	Relation	rel;
63 	TupleDesc	tupdesc;
64 	char	   *channel;
65 	char		operation;
66 	StringInfo	payload = makeStringInfo();
67 	bool		foundPK;
68 
69 	List	   *indexoidlist;
70 	ListCell   *indexoidscan;
71 
72 	/* make sure it's called as a trigger */
73 	if (!CALLED_AS_TRIGGER(fcinfo))
74 		ereport(ERROR,
75 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
76 				 errmsg("triggered_change_notification: must be called as trigger")));
77 
78 	/* and that it's called after the change */
79 	if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
80 		ereport(ERROR,
81 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
82 				 errmsg("triggered_change_notification: must be called after the change")));
83 
84 	/* and that it's called for each row */
85 	if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
86 		ereport(ERROR,
87 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
88 				 errmsg("triggered_change_notification: must be called for each row")));
89 
90 	if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
91 		operation = 'I';
92 	else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
93 		operation = 'U';
94 	else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
95 		operation = 'D';
96 	else
97 	{
98 		elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
99 		operation = 'X';		/* silence compiler warning */
100 	}
101 
102 	trigger = trigdata->tg_trigger;
103 	nargs = trigger->tgnargs;
104 	if (nargs > 1)
105 		ereport(ERROR,
106 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
107 				 errmsg("triggered_change_notification: must not be called with more than one parameter")));
108 
109 	if (nargs == 0)
110 		channel = "tcn";
111 	else
112 		channel = trigger->tgargs[0];
113 
114 	/* get tuple data */
115 	trigtuple = trigdata->tg_trigtuple;
116 	rel = trigdata->tg_relation;
117 	tupdesc = rel->rd_att;
118 
119 	foundPK = false;
120 
121 	/*
122 	 * Get the list of index OIDs for the table from the relcache, and look up
123 	 * each one in the pg_index syscache until we find one marked primary key
124 	 * (hopefully there isn't more than one such).
125 	 */
126 	indexoidlist = RelationGetIndexList(rel);
127 
128 	foreach(indexoidscan, indexoidlist)
129 	{
130 		Oid			indexoid = lfirst_oid(indexoidscan);
131 		HeapTuple	indexTuple;
132 		Form_pg_index index;
133 
134 		indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
135 		if (!HeapTupleIsValid(indexTuple))	/* should not happen */
136 			elog(ERROR, "cache lookup failed for index %u", indexoid);
137 		index = (Form_pg_index) GETSTRUCT(indexTuple);
138 		/* we're only interested if it is the primary key and valid */
139 		if (index->indisprimary && IndexIsValid(index))
140 		{
141 			int			indnkeyatts = index->indnkeyatts;
142 
143 			if (indnkeyatts > 0)
144 			{
145 				int			i;
146 
147 				foundPK = true;
148 
149 				strcpy_quoted(payload, RelationGetRelationName(rel), '"');
150 				appendStringInfoCharMacro(payload, ',');
151 				appendStringInfoCharMacro(payload, operation);
152 
153 				for (i = 0; i < indnkeyatts; i++)
154 				{
155 					int			colno = index->indkey.values[i];
156 					Form_pg_attribute attr = TupleDescAttr(tupdesc, colno - 1);
157 
158 					appendStringInfoCharMacro(payload, ',');
159 					strcpy_quoted(payload, NameStr(attr->attname), '"');
160 					appendStringInfoCharMacro(payload, '=');
161 					strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
162 				}
163 
164 				Async_Notify(channel, payload->data);
165 			}
166 			ReleaseSysCache(indexTuple);
167 			break;
168 		}
169 		ReleaseSysCache(indexTuple);
170 	}
171 
172 	list_free(indexoidlist);
173 
174 	if (!foundPK)
175 		ereport(ERROR,
176 				(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
177 				 errmsg("triggered_change_notification: must be called on a table with a primary key")));
178 
179 	return PointerGetDatum(NULL);	/* after trigger; value doesn't matter */
180 }
181