1 /*-------------------------------------------------------------------------
2  *
3  * pglogical_executor.c
4  * 		pglogical executor related functions
5  *
6  * Copyright (c) 2015, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  pglogical_executor.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "miscadmin.h"
16 
17 #include "access/hash.h"
18 #include "access/htup_details.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21 
22 #include "catalog/dependency.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/pg_extension.h"
26 #include "catalog/pg_type.h"
27 
28 #include "commands/extension.h"
29 #include "commands/trigger.h"
30 
31 #include "executor/executor.h"
32 
33 #include "nodes/nodeFuncs.h"
34 
35 #if PG_VERSION_NUM >= 120000
36 #include "optimizer/optimizer.h"
37 #else
38 #include "optimizer/planner.h"
39 #endif
40 
41 #include "parser/parse_coerce.h"
42 
43 #include "tcop/utility.h"
44 
45 #include "utils/builtins.h"
46 #include "utils/fmgroids.h"
47 #include "utils/json.h"
48 #include "utils/lsyscache.h"
49 #include "utils/memutils.h"
50 #include "utils/rel.h"
51 #include "utils/snapmgr.h"
52 
53 #include "pglogical_node.h"
54 #include "pglogical_executor.h"
55 #include "pglogical_repset.h"
56 #include "pglogical_queue.h"
57 #include "pglogical_dependency.h"
58 #include "pglogical.h"
59 
60 List *pglogical_truncated_tables = NIL;
61 
62 static DropBehavior	pglogical_lastDropBehavior = DROP_RESTRICT;
63 static bool			dropping_pglogical_obj = false;
64 static object_access_hook_type next_object_access_hook = NULL;
65 
66 static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
67 
68 EState *
create_estate_for_relation(Relation rel,bool forwrite)69 create_estate_for_relation(Relation rel, bool forwrite)
70 {
71 	EState	   *estate;
72 	RangeTblEntry *rte;
73 
74 
75 	/* Dummy range table entry needed by executor. */
76 	rte = makeNode(RangeTblEntry);
77 	rte->rtekind = RTE_RELATION;
78 	rte->relid = RelationGetRelid(rel);
79 	rte->relkind = rel->rd_rel->relkind;
80 
81 	/* Initialize executor state. */
82 	estate = CreateExecutorState();
83 #if PG_VERSION_NUM >= 120000
84 	ExecInitRangeTable(estate, list_make1(rte));
85 #elif PG_VERSION_NUM >= 110000 && SECONDQ_VERSION_NUM >= 103
86 	/* 2ndQPostgres 11 r1.3 changes executor API */
87 	estate->es_range_table = alist_add(NULL, rte);
88 #else
89 	estate->es_range_table = list_make1(rte);
90 #endif
91 
92 #if PG_VERSION_NUM < 120000
93 	if (rel->trigdesc)
94 		estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
95 #endif
96 
97 	estate->es_output_cid = GetCurrentCommandId(forwrite);
98 
99 	return estate;
100 }
101 
102 ExprContext *
prepare_per_tuple_econtext(EState * estate,TupleDesc tupdesc)103 prepare_per_tuple_econtext(EState *estate, TupleDesc tupdesc)
104 {
105 	ExprContext	   *econtext;
106 	MemoryContext	oldContext;
107 
108 	econtext = GetPerTupleExprContext(estate);
109 
110 	oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
111 	econtext->ecxt_scantuple = ExecInitExtraTupleSlot(estate);
112 	MemoryContextSwitchTo(oldContext);
113 
114 	ExecSetSlotDescriptor(econtext->ecxt_scantuple, tupdesc);
115 
116 	return econtext;
117 }
118 
119 ExprState *
pglogical_prepare_row_filter(Node * row_filter)120 pglogical_prepare_row_filter(Node *row_filter)
121 {
122 	ExprState  *exprstate;
123 	Expr	   *expr;
124 	Oid			exprtype;
125 
126 	exprtype = exprType(row_filter);
127 	expr = (Expr *) coerce_to_target_type(NULL,	/* no UNKNOWN params here */
128 										  row_filter, exprtype,
129 										  BOOLOID, -1,
130 										  COERCION_ASSIGNMENT,
131 										  COERCE_IMPLICIT_CAST,
132 										  -1);
133 
134 	/* This should never happen but just to be sure. */
135 	if (expr == NULL)
136 		ereport(ERROR,
137 				(errcode(ERRCODE_DATATYPE_MISMATCH),
138 				 errmsg("cannot cast the row_filter to boolean"),
139 			   errhint("You will need to rewrite the row_filter.")));
140 
141 	expr = expression_planner(expr);
142 	exprstate = ExecInitExpr(expr, NULL);
143 
144 	return exprstate;
145 }
146 
147 static void
pglogical_start_truncate(void)148 pglogical_start_truncate(void)
149 {
150 	pglogical_truncated_tables = NIL;
151 }
152 
153 static void
pglogical_finish_truncate(void)154 pglogical_finish_truncate(void)
155 {
156 	ListCell	   *tlc;
157 	PGLogicalLocalNode *local_node;
158 
159 	/* If this is not pglogical node, don't do anything. */
160 	local_node = get_local_node(false, true);
161 	if (!local_node || !list_length(pglogical_truncated_tables))
162 		return;
163 
164 	foreach (tlc, pglogical_truncated_tables)
165 	{
166 		Oid			reloid = lfirst_oid(tlc);
167 		char	   *nspname;
168 		char	   *relname;
169 		List	   *repsets;
170 		StringInfoData	json;
171 
172 		/* Format the query. */
173 		nspname = get_namespace_name(get_rel_namespace(reloid));
174 		relname = get_rel_name(reloid);
175 
176 		/* It's easier to construct json manually than via Jsonb API... */
177 		initStringInfo(&json);
178 		appendStringInfo(&json, "{\"schema_name\": ");
179 		escape_json(&json, nspname);
180 		appendStringInfo(&json, ",\"table_name\": ");
181 		escape_json(&json, relname);
182 		appendStringInfo(&json, "}");
183 
184 		repsets = get_table_replication_sets(local_node->node->id, reloid);
185 
186 		if (list_length(repsets))
187 		{
188 			List	   *repset_names = NIL;
189 			ListCell   *rlc;
190 
191 			foreach (rlc, repsets)
192 			{
193 				PGLogicalRepSet	    *repset = (PGLogicalRepSet *) lfirst(rlc);
194 				repset_names = lappend(repset_names, pstrdup(repset->name));
195 			}
196 
197 			/* Queue the truncate for replication. */
198 			queue_message(repset_names, GetUserId(),
199 						  QUEUE_COMMAND_TYPE_TRUNCATE, json.data);
200 		}
201 	}
202 
203 	list_free(pglogical_truncated_tables);
204 	pglogical_truncated_tables = NIL;
205 }
206 
207 static void
pglogical_ProcessUtility(PlannedStmt * pstmt,const char * queryString,bool readOnlyTree,ProcessUtilityContext context,ParamListInfo params,QueryEnvironment * queryEnv,DestReceiver * dest,bool sentToRemote,QueryCompletion * qc)208 pglogical_ProcessUtility(
209 #if PG_VERSION_NUM >= 100000
210 						 PlannedStmt *pstmt,
211 #else
212 						 Node *pstmt,
213 #endif
214 						 const char *queryString,
215 #if PG_VERSION_NUM >= 140000
216 						 bool readOnlyTree,
217 #endif
218 						 ProcessUtilityContext context,
219 						 ParamListInfo params,
220 #if PG_VERSION_NUM >= 100000
221 						 QueryEnvironment *queryEnv,
222 #endif
223 						 DestReceiver *dest,
224 #ifdef XCP
225 						 bool sentToRemote,
226 #endif
227 						 QueryCompletion *qc)
228 {
229 #if PG_VERSION_NUM >= 100000
230 	Node	   *parsetree = pstmt->utilityStmt;
231 #else
232 	Node	   *parsetree = pstmt;
233 	#define		queryEnv NULL
234 #endif
235 #ifndef XCP
236 	#define		sentToRemote NULL
237 #endif
238 
239 	dropping_pglogical_obj = false;
240 
241 	if (nodeTag(parsetree) == T_TruncateStmt)
242 		pglogical_start_truncate();
243 
244 	if (nodeTag(parsetree) == T_DropStmt)
245 		pglogical_lastDropBehavior = ((DropStmt *)parsetree)->behavior;
246 
247 	/* There's no reason we should be in a long lived context here */
248 	Assert(CurrentMemoryContext != TopMemoryContext
249 		   && CurrentMemoryContext != CacheMemoryContext);
250 
251 	if (next_ProcessUtility_hook)
252 		PGLnext_ProcessUtility_hook(pstmt, queryString, readOnlyTree, context, params,
253 									queryEnv, dest,
254 									sentToRemote,
255 									qc);
256 	else
257 		PGLstandard_ProcessUtility(pstmt, queryString, readOnlyTree, context, params,
258 								   queryEnv, dest,
259 								   sentToRemote,
260 								   qc);
261 
262 	if (nodeTag(parsetree) == T_TruncateStmt)
263 		pglogical_finish_truncate();
264 }
265 
266 
267 /*
268  * Handle object drop.
269  *
270  * Calls to dependency tracking code.
271  */
272 static void
pglogical_object_access(ObjectAccessType access,Oid classId,Oid objectId,int subId,void * arg)273 pglogical_object_access(ObjectAccessType access,
274 						Oid classId,
275 						Oid objectId,
276 						int subId,
277 						void *arg)
278 {
279 	if (next_object_access_hook)
280 		(*next_object_access_hook) (access, classId, objectId, subId, arg);
281 
282 	if (access == OAT_DROP)
283 	{
284 		ObjectAccessDrop   *drop_arg = (ObjectAccessDrop *) arg;
285 		ObjectAddress		object;
286 		DropBehavior		behavior;
287 
288 		/* No need to check for internal deletions. */
289 		if ((drop_arg->dropflags & PERFORM_DELETION_INTERNAL) != 0)
290 			return;
291 
292 		/* Dropping pglogical itself? */
293 		if (classId == ExtensionRelationId &&
294 			objectId == get_extension_oid(EXTENSION_NAME, true) &&
295 			objectId != InvalidOid /* Should not happen but check anyway */)
296 			dropping_pglogical_obj = true;
297 
298 		/* Dropping relation within pglogical? */
299 		if (classId == RelationRelationId)
300 		{
301 			Oid			relnspoid;
302 			Oid			pglnspoid;
303 
304 			pglnspoid = get_namespace_oid(EXTENSION_NAME, true);
305 			relnspoid = get_rel_namespace(objectId);
306 
307 			if (pglnspoid == relnspoid)
308 				dropping_pglogical_obj = true;
309 		}
310 
311 		/*
312 		 * Don't do extra dependency checks for internal objects, those
313 		 * should be handled by Postgres.
314 		 */
315 		if (dropping_pglogical_obj)
316 			return;
317 
318 		/* No local node? */
319 		if (!get_local_node(false, true))
320 			return;
321 
322 		ObjectAddressSubSet(object, classId, objectId, subId);
323 
324 		if (SessionReplicationRole == SESSION_REPLICATION_ROLE_REPLICA)
325 			behavior = DROP_CASCADE;
326 		else
327 			behavior = pglogical_lastDropBehavior;
328 
329 		pglogical_checkDependency(&object, behavior);
330 	}
331 }
332 
333 void
pglogical_executor_init(void)334 pglogical_executor_init(void)
335 {
336 	next_ProcessUtility_hook = ProcessUtility_hook;
337 	ProcessUtility_hook = pglogical_ProcessUtility;
338 
339 	/* Object access hook */
340 	next_object_access_hook = object_access_hook;
341 	object_access_hook = pglogical_object_access;
342 }
343