1 /*-------------------------------------------------------------------------
2  *
3  * combine_query_planner.c
4  *	  Routines for planning the combine query that runs on the coordinator
5  *    to combine results from the workers.
6  *
7  * Copyright (c) Citus Data, Inc.
8  *
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres.h"
13 
14 #include "distributed/pg_version_constants.h"
15 
16 #include "catalog/pg_type.h"
17 #include "distributed/citus_ruleutils.h"
18 #include "distributed/insert_select_planner.h"
19 #include "distributed/listutils.h"
20 #include "distributed/metadata_cache.h"
21 #include "distributed/combine_query_planner.h"
22 #include "distributed/multi_physical_planner.h"
23 #include "nodes/makefuncs.h"
24 #include "nodes/nodeFuncs.h"
25 #include "optimizer/clauses.h"
26 #include "optimizer/planner.h"
27 #include "rewrite/rewriteManip.h"
28 
/* forward declarations of the file-local planning helpers defined below */
static List * RemoteScanTargetList(List *workerTargetList);
static PlannedStmt * BuildSelectStatementViaStdPlanner(Query *combineQuery,
													   List *remoteScanTargetList,
													   CustomScan *remoteScan);
static bool FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result);

/* PlanCustomPath callback referenced by CitusCustomScanPathMethods */
static Plan * CitusCustomScanPathPlan(PlannerInfo *root, RelOptInfo *rel,
									  struct CustomPath *best_path, List *tlist,
									  List *clauses, List *custom_plans);

/*
 * Non-static state that BuildSelectStatementViaStdPlanner sets for the duration
 * of its standard planner call and resets afterwards (also on error):
 * the flag is switched to true and the pointer is set to the CustomScan that is
 * being planned. Outside of that call they hold false / NULL.
 * NOTE(review): presumably read by a planner hook defined in another file to
 * swap the citus_extradata_container RTE for the custom scan - confirm there.
 */
bool ReplaceCitusExtraDataContainer = false;
CustomScan *ReplaceCitusExtraDataContainerWithCustomScan = NULL;
41 
/*
 * CitusCustomScanPathMethods defines the methods for a custom path we insert into the
 * planner during the planning of the query part that will be executed on the node
 * coordinating the query.
 *
 * CreateCitusCustomScanPath installs this table on every path it creates. The only
 * callback provided is PlanCustomPath, which is implemented by
 * CitusCustomScanPathPlan and turns the path into its CustomScan plan node.
 */
static CustomPathMethods CitusCustomScanPathMethods = {
	.CustomName = "CitusCustomScanPath",
	.PlanCustomPath = CitusCustomScanPathPlan,
};
51 
52 /*
53  * PlanCombineQuery takes in a distributed plan and a custom scan node which
54  * wraps remote part of the plan. This function finds the combine query structure
55  * in the multi plan, and builds the final select plan to execute on the tuples
56  * returned by remote scan on the coordinator node. Note that this select
57  * plan is executed after result files are retrieved from worker nodes and
58  * filled into the tuple store inside provided custom scan.
59  */
60 PlannedStmt *
PlanCombineQuery(DistributedPlan * distributedPlan,CustomScan * remoteScan)61 PlanCombineQuery(DistributedPlan *distributedPlan, CustomScan *remoteScan)
62 {
63 	Query *combineQuery = distributedPlan->combineQuery;
64 
65 	Job *workerJob = distributedPlan->workerJob;
66 	List *workerTargetList = workerJob->jobQuery->targetList;
67 	List *remoteScanTargetList = RemoteScanTargetList(workerTargetList);
68 	return BuildSelectStatementViaStdPlanner(combineQuery, remoteScanTargetList,
69 											 remoteScan);
70 }
71 
72 
73 /*
74  * RemoteScanTargetList uses the given worker target list's expressions, and creates
75  * a target list for the remote scan on the coordinator node.
76  */
77 static List *
RemoteScanTargetList(List * workerTargetList)78 RemoteScanTargetList(List *workerTargetList)
79 {
80 	List *remoteScanTargetList = NIL;
81 	const Index tableId = 1;
82 	AttrNumber columnId = 1;
83 
84 	ListCell *workerTargetCell = NULL;
85 	foreach(workerTargetCell, workerTargetList)
86 	{
87 		TargetEntry *workerTargetEntry = (TargetEntry *) lfirst(workerTargetCell);
88 
89 		if (workerTargetEntry->resjunk)
90 		{
91 			continue;
92 		}
93 
94 		Var *remoteScanColumn = makeVarFromTargetEntry(tableId, workerTargetEntry);
95 		remoteScanColumn->varattno = columnId;
96 		remoteScanColumn->varattnosyn = columnId;
97 		columnId++;
98 
99 		if (remoteScanColumn->vartype == RECORDOID || remoteScanColumn->vartype ==
100 			RECORDARRAYOID)
101 		{
102 			remoteScanColumn->vartypmod = BlessRecordExpression(workerTargetEntry->expr);
103 		}
104 
105 		/*
106 		 * The remote scan target entry has two pieces to it. The first piece is the
107 		 * target entry's expression, which we set to the newly created column.
108 		 * The second piece is sort and group clauses that we implicitly copy
109 		 * from the worker target entry. Note that any changes to worker target
110 		 * entry's sort and group clauses will *break* us here.
111 		 */
112 		TargetEntry *remoteScanTargetEntry = flatCopyTargetEntry(workerTargetEntry);
113 		remoteScanTargetEntry->expr = (Expr *) remoteScanColumn;
114 		remoteScanTargetList = lappend(remoteScanTargetList, remoteScanTargetEntry);
115 	}
116 
117 	return remoteScanTargetList;
118 }
119 
120 
121 /*
122  * CreateCitusCustomScanPath creates a custom path node that will return the CustomScan if
123  * the path ends up in the best_path during postgres planning. We use this function during
124  * the set relation hook of postgres during the planning of the query part that will be
125  * executed on the query coordinating node.
126  */
127 Path *
CreateCitusCustomScanPath(PlannerInfo * root,RelOptInfo * relOptInfo,Index restrictionIndex,RangeTblEntry * rte,CustomScan * remoteScan)128 CreateCitusCustomScanPath(PlannerInfo *root, RelOptInfo *relOptInfo,
129 						  Index restrictionIndex, RangeTblEntry *rte,
130 						  CustomScan *remoteScan)
131 {
132 	CitusCustomScanPath *path = (CitusCustomScanPath *) newNode(
133 		sizeof(CitusCustomScanPath), T_CustomPath);
134 	path->custom_path.methods = &CitusCustomScanPathMethods;
135 	path->custom_path.path.pathtype = T_CustomScan;
136 	path->custom_path.path.pathtarget = relOptInfo->reltarget;
137 	path->custom_path.path.parent = relOptInfo;
138 
139 	/*
140 	 * The 100k rows we put on the cost of the path is kind of arbitrary and could be
141 	 * improved in accuracy to produce better plans.
142 	 *
143 	 * 100k on the row estimate causes the postgres planner to behave very much like the
144 	 * old citus planner in the plans it produces. Namely the old planner had hardcoded
145 	 * the use of Hash Aggregates for most of the operations, unless a postgres guc was
146 	 * set that would disallow hash aggregates to be used.
147 	 *
148 	 * Ideally we would be able to provide estimates close to postgres' estimates on the
149 	 * workers to let the standard planner choose an optimal solution for the combineQuery.
150 	 */
151 	path->custom_path.path.rows = 100000;
152 	path->remoteScan = remoteScan;
153 
154 	return (Path *) path;
155 }
156 
157 
158 /*
159  * CitusCustomScanPathPlan is called for the CitusCustomScanPath node in the best_path
160  * after the postgres planner has evaluated all possible paths.
161  *
162  * This function returns a Plan node, more specifically the CustomScan Plan node that has
163  * the ability to execute the distributed part of the query.
164  *
165  * When this function is called there is an extra list of clauses passed in that might not
166  * already have been applied to the plan. We add these clauses to the quals this node will
167  * execute. The quals are evaluated before returning the tuples scanned from the workers
168  * to the plan above ours to make sure they do not end up in the final result.
169  */
170 static Plan *
CitusCustomScanPathPlan(PlannerInfo * root,RelOptInfo * rel,struct CustomPath * best_path,List * tlist,List * clauses,List * custom_plans)171 CitusCustomScanPathPlan(PlannerInfo *root,
172 						RelOptInfo *rel,
173 						struct CustomPath *best_path,
174 						List *tlist,
175 						List *clauses,
176 						List *custom_plans)
177 {
178 	CitusCustomScanPath *citusPath = (CitusCustomScanPath *) best_path;
179 
180 	/*
181 	 * Columns could have been pruned from the target list by the standard planner.
182 	 * A situation in which this might happen is a CASE that is proven to be always the
183 	 * same causing the other column to become useless;
184 	 *   CASE WHEN ... <> NULL
185 	 *     THEN ...
186 	 *     ELSE ...
187 	 *   END
188 	 * Since nothing is equal to NULL it will always end up in the else branch. The final
189 	 * target list the planenr needs from our node is passed in as tlist. By placing that
190 	 * as the target list on our scan the internal rows will be projected to this one.
191 	 */
192 	citusPath->remoteScan->scan.plan.targetlist = tlist;
193 
194 	/*
195 	 * The custom_scan_tlist contains target entries for to the "output" of the call
196 	 * to citus_extradata_container, which is actually replaced by a CustomScan.
197 	 * The target entries are initialized with varno 1 (see RemoteScanTargetList), since
198 	 * it's currently the only relation in the join tree of the combineQuery.
199 	 *
200 	 * If the citus_extradata_container function call is not the first relation to
201 	 * appear in the flattened rtable for the entire plan, then varno is now pointing
202 	 * to the wrong relation and needs to be updated.
203 	 *
204 	 * Example:
205 	 * When the combineQuery field of the DistributedPlan is
206 	 * INSERT INTO local SELECT .. FROM citus_extradata_container.
207 	 * In that case the varno of citusdata_extradata_container should be 3, because
208 	 * it is preceded range table entries for "local" and the subquery.
209 	 */
210 	if (rel->relid != 1)
211 	{
212 		TargetEntry *targetEntry = NULL;
213 
214 		foreach_ptr(targetEntry, citusPath->remoteScan->custom_scan_tlist)
215 		{
216 			/* we created this list, so we know it only contains Var */
217 			Assert(IsA(targetEntry->expr, Var));
218 
219 			Var *var = (Var *) targetEntry->expr;
220 
221 			var->varno = rel->relid;
222 		}
223 	}
224 
225 	/* clauses might have been added by the planner, need to add them to our scan */
226 	RestrictInfo *restrictInfo = NULL;
227 	List **quals = &citusPath->remoteScan->scan.plan.qual;
228 	foreach_ptr(restrictInfo, clauses)
229 	{
230 		*quals = lappend(*quals, restrictInfo->clause);
231 	}
232 	return (Plan *) citusPath->remoteScan;
233 }
234 
235 
236 /*
237  * BuildSelectStatementViaStdPlanner creates a PlannedStmt where it combines the
238  * combineQuery and the remoteScan. It utilizes the standard_planner from postgres to
239  * create a plan based on the combineQuery.
240  */
241 static PlannedStmt *
BuildSelectStatementViaStdPlanner(Query * combineQuery,List * remoteScanTargetList,CustomScan * remoteScan)242 BuildSelectStatementViaStdPlanner(Query *combineQuery, List *remoteScanTargetList,
243 								  CustomScan *remoteScan)
244 {
245 	/*
246 	 * the standard planner will scribble on the target list. Since it is essential to not
247 	 * change the custom_scan_tlist we copy the target list before adding them to any.
248 	 * The remoteScanTargetList is used in the end to extract the column names to be added to
249 	 * the alias we will create for the CustomScan, (expressed as the
250 	 * citus_extradata_container function call in the combineQuery).
251 	 */
252 	remoteScan->custom_scan_tlist = copyObject(remoteScanTargetList);
253 	remoteScan->scan.plan.targetlist = copyObject(remoteScanTargetList);
254 
255 	/*
256 	 * We will overwrite the alias of the rangetable which describes the custom scan.
257 	 * Ideally we would have set the correct column names and alias on the range table in
258 	 * the combine query already when we inserted the extra data container. This could be
259 	 * improved in the future.
260 	 */
261 
262 	/* find the rangetable entry for the extradata container and overwrite its alias */
263 	RangeTblEntry *extradataContainerRTE = NULL;
264 	FindCitusExtradataContainerRTE((Node *) combineQuery, &extradataContainerRTE);
265 	if (extradataContainerRTE != NULL)
266 	{
267 		/* extract column names from the remoteScanTargetList */
268 		List *columnNameList = NIL;
269 		TargetEntry *targetEntry = NULL;
270 		foreach_ptr(targetEntry, remoteScanTargetList)
271 		{
272 			columnNameList = lappend(columnNameList, makeString(targetEntry->resname));
273 		}
274 		extradataContainerRTE->eref = makeAlias("remote_scan", columnNameList);
275 	}
276 
277 	/*
278 	 * Print the combine query at debug level 4. Since serializing the query is relatively
279 	 * cpu intensive we only perform that if we are actually logging DEBUG4.
280 	 */
281 	const int logCombineQueryLevel = DEBUG4;
282 	if (IsLoggableLevel(logCombineQueryLevel))
283 	{
284 		StringInfo queryString = makeStringInfo();
285 		pg_get_query_def(combineQuery, queryString);
286 		elog(logCombineQueryLevel, "combine query: %s", queryString->data);
287 	}
288 
289 	PlannedStmt *standardStmt = NULL;
290 	PG_TRY();
291 	{
292 		/* This code should not be re-entrant, we check via asserts below */
293 		Assert(ReplaceCitusExtraDataContainer == false);
294 		Assert(ReplaceCitusExtraDataContainerWithCustomScan == NULL);
295 		ReplaceCitusExtraDataContainer = true;
296 		ReplaceCitusExtraDataContainerWithCustomScan = remoteScan;
297 
298 		standardStmt = standard_planner_compat(combineQuery, 0, NULL);
299 
300 		ReplaceCitusExtraDataContainer = false;
301 		ReplaceCitusExtraDataContainerWithCustomScan = NULL;
302 	}
303 	PG_CATCH();
304 	{
305 		ReplaceCitusExtraDataContainer = false;
306 		ReplaceCitusExtraDataContainerWithCustomScan = NULL;
307 		PG_RE_THROW();
308 	}
309 	PG_END_TRY();
310 
311 	Assert(standardStmt != NULL);
312 	return standardStmt;
313 }
314 
315 
316 /*
317  * Finds the rangetable entry in the query that refers to the citus_extradata_container
318  * and stores the pointer in result.
319  */
320 static bool
FindCitusExtradataContainerRTE(Node * node,RangeTblEntry ** result)321 FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result)
322 {
323 	if (node == NULL)
324 	{
325 		return false;
326 	}
327 
328 	if (IsA(node, RangeTblEntry))
329 	{
330 		RangeTblEntry *rangeTblEntry = castNode(RangeTblEntry, node);
331 		if (rangeTblEntry->rtekind == RTE_FUNCTION &&
332 			list_length(rangeTblEntry->functions) == 1)
333 		{
334 			RangeTblFunction *rangeTblFunction = (RangeTblFunction *) linitial(
335 				rangeTblEntry->functions);
336 			FuncExpr *funcExpr = castNode(FuncExpr, rangeTblFunction->funcexpr);
337 			if (funcExpr->funcid == CitusExtraDataContainerFuncId())
338 			{
339 				*result = rangeTblEntry;
340 				return true;
341 			}
342 		}
343 
344 		/* query_tree_walker descends into RTEs */
345 		return false;
346 	}
347 	else if (IsA(node, Query))
348 	{
349 		const int flags = QTW_EXAMINE_RTES_BEFORE;
350 		return query_tree_walker((Query *) node, FindCitusExtradataContainerRTE, result,
351 								 flags);
352 	}
353 
354 	return expression_tree_walker(node, FindCitusExtradataContainerRTE, result);
355 }
356