1 /*-------------------------------------------------------------------------
2 *
3 * tcn.c
4 * triggered change notification support for PostgreSQL
5 *
6 * Portions Copyright (c) 2011-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * contrib/tcn/tcn.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 #include "postgres.h"
17
18 #include "access/htup_details.h"
19 #include "executor/spi.h"
20 #include "commands/async.h"
21 #include "commands/trigger.h"
22 #include "lib/stringinfo.h"
23 #include "utils/rel.h"
24 #include "utils/syscache.h"
25
26 PG_MODULE_MAGIC;
27
28 /*
29 * Copy from s (for source) to r (for result), wrapping with q (quote)
30 * characters and doubling any quote characters found.
31 */
32 static void
strcpy_quoted(StringInfo r,const char * s,const char q)33 strcpy_quoted(StringInfo r, const char *s, const char q)
34 {
35 appendStringInfoCharMacro(r, q);
36 while (*s)
37 {
38 if (*s == q)
39 appendStringInfoCharMacro(r, q);
40 appendStringInfoCharMacro(r, *s);
41 s++;
42 }
43 appendStringInfoCharMacro(r, q);
44 }
45
46 /*
47 * triggered_change_notification
48 *
49 * This trigger function will send a notification of data modification with
50 * primary key values. The channel will be "tcn" unless the trigger is
51 * created with a parameter, in which case that parameter will be used.
52 */
53 PG_FUNCTION_INFO_V1(triggered_change_notification);
54
55 Datum
triggered_change_notification(PG_FUNCTION_ARGS)56 triggered_change_notification(PG_FUNCTION_ARGS)
57 {
58 TriggerData *trigdata = (TriggerData *) fcinfo->context;
59 Trigger *trigger;
60 int nargs;
61 HeapTuple trigtuple;
62 Relation rel;
63 TupleDesc tupdesc;
64 char *channel;
65 char operation;
66 StringInfo payload = makeStringInfo();
67 bool foundPK;
68
69 List *indexoidlist;
70 ListCell *indexoidscan;
71
72 /* make sure it's called as a trigger */
73 if (!CALLED_AS_TRIGGER(fcinfo))
74 ereport(ERROR,
75 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
76 errmsg("triggered_change_notification: must be called as trigger")));
77
78 /* and that it's called after the change */
79 if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
80 ereport(ERROR,
81 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
82 errmsg("triggered_change_notification: must be called after the change")));
83
84 /* and that it's called for each row */
85 if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
86 ereport(ERROR,
87 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
88 errmsg("triggered_change_notification: must be called for each row")));
89
90 if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
91 operation = 'I';
92 else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
93 operation = 'U';
94 else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
95 operation = 'D';
96 else
97 {
98 elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
99 operation = 'X'; /* silence compiler warning */
100 }
101
102 trigger = trigdata->tg_trigger;
103 nargs = trigger->tgnargs;
104 if (nargs > 1)
105 ereport(ERROR,
106 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
107 errmsg("triggered_change_notification: must not be called with more than one parameter")));
108
109 if (nargs == 0)
110 channel = "tcn";
111 else
112 channel = trigger->tgargs[0];
113
114 /* get tuple data */
115 trigtuple = trigdata->tg_trigtuple;
116 rel = trigdata->tg_relation;
117 tupdesc = rel->rd_att;
118
119 foundPK = false;
120
121 /*
122 * Get the list of index OIDs for the table from the relcache, and look up
123 * each one in the pg_index syscache until we find one marked primary key
124 * (hopefully there isn't more than one such).
125 */
126 indexoidlist = RelationGetIndexList(rel);
127
128 foreach(indexoidscan, indexoidlist)
129 {
130 Oid indexoid = lfirst_oid(indexoidscan);
131 HeapTuple indexTuple;
132 Form_pg_index index;
133
134 indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
135 if (!HeapTupleIsValid(indexTuple)) /* should not happen */
136 elog(ERROR, "cache lookup failed for index %u", indexoid);
137 index = (Form_pg_index) GETSTRUCT(indexTuple);
138 /* we're only interested if it is the primary key and valid */
139 if (index->indisprimary && IndexIsValid(index))
140 {
141 int indnkeyatts = index->indnkeyatts;
142
143 if (indnkeyatts > 0)
144 {
145 int i;
146
147 foundPK = true;
148
149 strcpy_quoted(payload, RelationGetRelationName(rel), '"');
150 appendStringInfoCharMacro(payload, ',');
151 appendStringInfoCharMacro(payload, operation);
152
153 for (i = 0; i < indnkeyatts; i++)
154 {
155 int colno = index->indkey.values[i];
156 Form_pg_attribute attr = TupleDescAttr(tupdesc, colno - 1);
157
158 appendStringInfoCharMacro(payload, ',');
159 strcpy_quoted(payload, NameStr(attr->attname), '"');
160 appendStringInfoCharMacro(payload, '=');
161 strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
162 }
163
164 Async_Notify(channel, payload->data);
165 }
166 ReleaseSysCache(indexTuple);
167 break;
168 }
169 ReleaseSysCache(indexTuple);
170 }
171
172 list_free(indexoidlist);
173
174 if (!foundPK)
175 ereport(ERROR,
176 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
177 errmsg("triggered_change_notification: must be called on a table with a primary key")));
178
179 return PointerGetDatum(NULL); /* after trigger; value doesn't matter */
180 }
181