1 /*-------------------------------------------------------------------------
2 *
3 * pglogical_executor.c
4 * pglogical executor related functions
5 *
6 * Copyright (c) 2015, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * pglogical_executor.c
10 *
11 *-------------------------------------------------------------------------
12 */
13 #include "postgres.h"
14
15 #include "miscadmin.h"
16
17 #include "access/hash.h"
18 #include "access/htup_details.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21
22 #include "catalog/dependency.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/pg_extension.h"
26 #include "catalog/pg_type.h"
27
28 #include "commands/extension.h"
29 #include "commands/trigger.h"
30
31 #include "executor/executor.h"
32
33 #include "nodes/nodeFuncs.h"
34
35 #if PG_VERSION_NUM >= 120000
36 #include "optimizer/optimizer.h"
37 #else
38 #include "optimizer/planner.h"
39 #endif
40
41 #include "parser/parse_coerce.h"
42
43 #include "tcop/utility.h"
44
45 #include "utils/builtins.h"
46 #include "utils/fmgroids.h"
47 #include "utils/json.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/rel.h"
51 #include "utils/snapmgr.h"
52
53 #include "pglogical_node.h"
54 #include "pglogical_executor.h"
55 #include "pglogical_repset.h"
56 #include "pglogical_queue.h"
57 #include "pglogical_dependency.h"
58 #include "pglogical.h"
59
60 List *pglogical_truncated_tables = NIL;
61
62 static DropBehavior pglogical_lastDropBehavior = DROP_RESTRICT;
63 static bool dropping_pglogical_obj = false;
64 static object_access_hook_type next_object_access_hook = NULL;
65
66 static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
67
68 EState *
create_estate_for_relation(Relation rel,bool forwrite)69 create_estate_for_relation(Relation rel, bool forwrite)
70 {
71 EState *estate;
72 RangeTblEntry *rte;
73
74
75 /* Dummy range table entry needed by executor. */
76 rte = makeNode(RangeTblEntry);
77 rte->rtekind = RTE_RELATION;
78 rte->relid = RelationGetRelid(rel);
79 rte->relkind = rel->rd_rel->relkind;
80
81 /* Initialize executor state. */
82 estate = CreateExecutorState();
83 #if PG_VERSION_NUM >= 120000
84 ExecInitRangeTable(estate, list_make1(rte));
85 #elif PG_VERSION_NUM >= 110000 && SECONDQ_VERSION_NUM >= 103
86 /* 2ndQPostgres 11 r1.3 changes executor API */
87 estate->es_range_table = alist_add(NULL, rte);
88 #else
89 estate->es_range_table = list_make1(rte);
90 #endif
91
92 #if PG_VERSION_NUM < 120000
93 if (rel->trigdesc)
94 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
95 #endif
96
97 estate->es_output_cid = GetCurrentCommandId(forwrite);
98
99 return estate;
100 }
101
102 ExprContext *
prepare_per_tuple_econtext(EState * estate,TupleDesc tupdesc)103 prepare_per_tuple_econtext(EState *estate, TupleDesc tupdesc)
104 {
105 ExprContext *econtext;
106 MemoryContext oldContext;
107
108 econtext = GetPerTupleExprContext(estate);
109
110 oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
111 econtext->ecxt_scantuple = ExecInitExtraTupleSlot(estate);
112 MemoryContextSwitchTo(oldContext);
113
114 ExecSetSlotDescriptor(econtext->ecxt_scantuple, tupdesc);
115
116 return econtext;
117 }
118
119 ExprState *
pglogical_prepare_row_filter(Node * row_filter)120 pglogical_prepare_row_filter(Node *row_filter)
121 {
122 ExprState *exprstate;
123 Expr *expr;
124 Oid exprtype;
125
126 exprtype = exprType(row_filter);
127 expr = (Expr *) coerce_to_target_type(NULL, /* no UNKNOWN params here */
128 row_filter, exprtype,
129 BOOLOID, -1,
130 COERCION_ASSIGNMENT,
131 COERCE_IMPLICIT_CAST,
132 -1);
133
134 /* This should never happen but just to be sure. */
135 if (expr == NULL)
136 ereport(ERROR,
137 (errcode(ERRCODE_DATATYPE_MISMATCH),
138 errmsg("cannot cast the row_filter to boolean"),
139 errhint("You will need to rewrite the row_filter.")));
140
141 expr = expression_planner(expr);
142 exprstate = ExecInitExpr(expr, NULL);
143
144 return exprstate;
145 }
146
147 static void
pglogical_start_truncate(void)148 pglogical_start_truncate(void)
149 {
150 pglogical_truncated_tables = NIL;
151 }
152
153 static void
pglogical_finish_truncate(void)154 pglogical_finish_truncate(void)
155 {
156 ListCell *tlc;
157 PGLogicalLocalNode *local_node;
158
159 /* If this is not pglogical node, don't do anything. */
160 local_node = get_local_node(false, true);
161 if (!local_node || !list_length(pglogical_truncated_tables))
162 return;
163
164 foreach (tlc, pglogical_truncated_tables)
165 {
166 Oid reloid = lfirst_oid(tlc);
167 char *nspname;
168 char *relname;
169 List *repsets;
170 StringInfoData json;
171
172 /* Format the query. */
173 nspname = get_namespace_name(get_rel_namespace(reloid));
174 relname = get_rel_name(reloid);
175
176 /* It's easier to construct json manually than via Jsonb API... */
177 initStringInfo(&json);
178 appendStringInfo(&json, "{\"schema_name\": ");
179 escape_json(&json, nspname);
180 appendStringInfo(&json, ",\"table_name\": ");
181 escape_json(&json, relname);
182 appendStringInfo(&json, "}");
183
184 repsets = get_table_replication_sets(local_node->node->id, reloid);
185
186 if (list_length(repsets))
187 {
188 List *repset_names = NIL;
189 ListCell *rlc;
190
191 foreach (rlc, repsets)
192 {
193 PGLogicalRepSet *repset = (PGLogicalRepSet *) lfirst(rlc);
194 repset_names = lappend(repset_names, pstrdup(repset->name));
195 }
196
197 /* Queue the truncate for replication. */
198 queue_message(repset_names, GetUserId(),
199 QUEUE_COMMAND_TYPE_TRUNCATE, json.data);
200 }
201 }
202
203 list_free(pglogical_truncated_tables);
204 pglogical_truncated_tables = NIL;
205 }
206
207 static void
pglogical_ProcessUtility(PlannedStmt * pstmt,const char * queryString,bool readOnlyTree,ProcessUtilityContext context,ParamListInfo params,QueryEnvironment * queryEnv,DestReceiver * dest,bool sentToRemote,QueryCompletion * qc)208 pglogical_ProcessUtility(
209 #if PG_VERSION_NUM >= 100000
210 PlannedStmt *pstmt,
211 #else
212 Node *pstmt,
213 #endif
214 const char *queryString,
215 #if PG_VERSION_NUM >= 140000
216 bool readOnlyTree,
217 #endif
218 ProcessUtilityContext context,
219 ParamListInfo params,
220 #if PG_VERSION_NUM >= 100000
221 QueryEnvironment *queryEnv,
222 #endif
223 DestReceiver *dest,
224 #ifdef XCP
225 bool sentToRemote,
226 #endif
227 QueryCompletion *qc)
228 {
229 #if PG_VERSION_NUM >= 100000
230 Node *parsetree = pstmt->utilityStmt;
231 #else
232 Node *parsetree = pstmt;
233 #define queryEnv NULL
234 #endif
235 #ifndef XCP
236 #define sentToRemote NULL
237 #endif
238
239 dropping_pglogical_obj = false;
240
241 if (nodeTag(parsetree) == T_TruncateStmt)
242 pglogical_start_truncate();
243
244 if (nodeTag(parsetree) == T_DropStmt)
245 pglogical_lastDropBehavior = ((DropStmt *)parsetree)->behavior;
246
247 /* There's no reason we should be in a long lived context here */
248 Assert(CurrentMemoryContext != TopMemoryContext
249 && CurrentMemoryContext != CacheMemoryContext);
250
251 if (next_ProcessUtility_hook)
252 PGLnext_ProcessUtility_hook(pstmt, queryString, readOnlyTree, context, params,
253 queryEnv, dest,
254 sentToRemote,
255 qc);
256 else
257 PGLstandard_ProcessUtility(pstmt, queryString, readOnlyTree, context, params,
258 queryEnv, dest,
259 sentToRemote,
260 qc);
261
262 if (nodeTag(parsetree) == T_TruncateStmt)
263 pglogical_finish_truncate();
264 }
265
266
267 /*
268 * Handle object drop.
269 *
270 * Calls to dependency tracking code.
271 */
272 static void
pglogical_object_access(ObjectAccessType access,Oid classId,Oid objectId,int subId,void * arg)273 pglogical_object_access(ObjectAccessType access,
274 Oid classId,
275 Oid objectId,
276 int subId,
277 void *arg)
278 {
279 if (next_object_access_hook)
280 (*next_object_access_hook) (access, classId, objectId, subId, arg);
281
282 if (access == OAT_DROP)
283 {
284 ObjectAccessDrop *drop_arg = (ObjectAccessDrop *) arg;
285 ObjectAddress object;
286 DropBehavior behavior;
287
288 /* No need to check for internal deletions. */
289 if ((drop_arg->dropflags & PERFORM_DELETION_INTERNAL) != 0)
290 return;
291
292 /* Dropping pglogical itself? */
293 if (classId == ExtensionRelationId &&
294 objectId == get_extension_oid(EXTENSION_NAME, true) &&
295 objectId != InvalidOid /* Should not happen but check anyway */)
296 dropping_pglogical_obj = true;
297
298 /* Dropping relation within pglogical? */
299 if (classId == RelationRelationId)
300 {
301 Oid relnspoid;
302 Oid pglnspoid;
303
304 pglnspoid = get_namespace_oid(EXTENSION_NAME, true);
305 relnspoid = get_rel_namespace(objectId);
306
307 if (pglnspoid == relnspoid)
308 dropping_pglogical_obj = true;
309 }
310
311 /*
312 * Don't do extra dependency checks for internal objects, those
313 * should be handled by Postgres.
314 */
315 if (dropping_pglogical_obj)
316 return;
317
318 /* No local node? */
319 if (!get_local_node(false, true))
320 return;
321
322 ObjectAddressSubSet(object, classId, objectId, subId);
323
324 if (SessionReplicationRole == SESSION_REPLICATION_ROLE_REPLICA)
325 behavior = DROP_CASCADE;
326 else
327 behavior = pglogical_lastDropBehavior;
328
329 pglogical_checkDependency(&object, behavior);
330 }
331 }
332
333 void
pglogical_executor_init(void)334 pglogical_executor_init(void)
335 {
336 next_ProcessUtility_hook = ProcessUtility_hook;
337 ProcessUtility_hook = pglogical_ProcessUtility;
338
339 /* Object access hook */
340 next_object_access_hook = object_access_hook;
341 object_access_hook = pglogical_object_access;
342 }
343