1 /*-------------------------------------------------------------------------
2 *
3 * combine_query_planner.c
4 * Routines for planning the combine query that runs on the coordinator
5 * to combine results from the workers.
6 *
7 * Copyright (c) Citus Data, Inc.
8 *
9 *-------------------------------------------------------------------------
10 */
11
12 #include "postgres.h"
13
14 #include "distributed/pg_version_constants.h"
15
16 #include "catalog/pg_type.h"
17 #include "distributed/citus_ruleutils.h"
18 #include "distributed/insert_select_planner.h"
19 #include "distributed/listutils.h"
20 #include "distributed/metadata_cache.h"
21 #include "distributed/combine_query_planner.h"
22 #include "distributed/multi_physical_planner.h"
23 #include "nodes/makefuncs.h"
24 #include "nodes/nodeFuncs.h"
25 #include "optimizer/clauses.h"
26 #include "optimizer/planner.h"
27 #include "rewrite/rewriteManip.h"
28
29 static List * RemoteScanTargetList(List *workerTargetList);
30 static PlannedStmt * BuildSelectStatementViaStdPlanner(Query *combineQuery,
31 List *remoteScanTargetList,
32 CustomScan *remoteScan);
33 static bool FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result);
34
35 static Plan * CitusCustomScanPathPlan(PlannerInfo *root, RelOptInfo *rel,
36 struct CustomPath *best_path, List *tlist,
37 List *clauses, List *custom_plans);
38
39 bool ReplaceCitusExtraDataContainer = false;
40 CustomScan *ReplaceCitusExtraDataContainerWithCustomScan = NULL;
41
42 /*
43 * CitusCustomScanPathMethods defines the methods for a custom path we insert into the
44 * planner during the planning of the query part that will be executed on the node
45 * coordinating the query.
46 */
47 static CustomPathMethods CitusCustomScanPathMethods = {
48 .CustomName = "CitusCustomScanPath",
49 .PlanCustomPath = CitusCustomScanPathPlan,
50 };
51
52 /*
53 * PlanCombineQuery takes in a distributed plan and a custom scan node which
54 * wraps remote part of the plan. This function finds the combine query structure
55 * in the multi plan, and builds the final select plan to execute on the tuples
56 * returned by remote scan on the coordinator node. Note that this select
57 * plan is executed after result files are retrieved from worker nodes and
58 * filled into the tuple store inside provided custom scan.
59 */
60 PlannedStmt *
PlanCombineQuery(DistributedPlan * distributedPlan,CustomScan * remoteScan)61 PlanCombineQuery(DistributedPlan *distributedPlan, CustomScan *remoteScan)
62 {
63 Query *combineQuery = distributedPlan->combineQuery;
64
65 Job *workerJob = distributedPlan->workerJob;
66 List *workerTargetList = workerJob->jobQuery->targetList;
67 List *remoteScanTargetList = RemoteScanTargetList(workerTargetList);
68 return BuildSelectStatementViaStdPlanner(combineQuery, remoteScanTargetList,
69 remoteScan);
70 }
71
72
73 /*
74 * RemoteScanTargetList uses the given worker target list's expressions, and creates
75 * a target list for the remote scan on the coordinator node.
76 */
77 static List *
RemoteScanTargetList(List * workerTargetList)78 RemoteScanTargetList(List *workerTargetList)
79 {
80 List *remoteScanTargetList = NIL;
81 const Index tableId = 1;
82 AttrNumber columnId = 1;
83
84 ListCell *workerTargetCell = NULL;
85 foreach(workerTargetCell, workerTargetList)
86 {
87 TargetEntry *workerTargetEntry = (TargetEntry *) lfirst(workerTargetCell);
88
89 if (workerTargetEntry->resjunk)
90 {
91 continue;
92 }
93
94 Var *remoteScanColumn = makeVarFromTargetEntry(tableId, workerTargetEntry);
95 remoteScanColumn->varattno = columnId;
96 remoteScanColumn->varattnosyn = columnId;
97 columnId++;
98
99 if (remoteScanColumn->vartype == RECORDOID || remoteScanColumn->vartype ==
100 RECORDARRAYOID)
101 {
102 remoteScanColumn->vartypmod = BlessRecordExpression(workerTargetEntry->expr);
103 }
104
105 /*
106 * The remote scan target entry has two pieces to it. The first piece is the
107 * target entry's expression, which we set to the newly created column.
108 * The second piece is sort and group clauses that we implicitly copy
109 * from the worker target entry. Note that any changes to worker target
110 * entry's sort and group clauses will *break* us here.
111 */
112 TargetEntry *remoteScanTargetEntry = flatCopyTargetEntry(workerTargetEntry);
113 remoteScanTargetEntry->expr = (Expr *) remoteScanColumn;
114 remoteScanTargetList = lappend(remoteScanTargetList, remoteScanTargetEntry);
115 }
116
117 return remoteScanTargetList;
118 }
119
120
121 /*
122 * CreateCitusCustomScanPath creates a custom path node that will return the CustomScan if
123 * the path ends up in the best_path during postgres planning. We use this function during
124 * the set relation hook of postgres during the planning of the query part that will be
125 * executed on the query coordinating node.
126 */
127 Path *
CreateCitusCustomScanPath(PlannerInfo * root,RelOptInfo * relOptInfo,Index restrictionIndex,RangeTblEntry * rte,CustomScan * remoteScan)128 CreateCitusCustomScanPath(PlannerInfo *root, RelOptInfo *relOptInfo,
129 Index restrictionIndex, RangeTblEntry *rte,
130 CustomScan *remoteScan)
131 {
132 CitusCustomScanPath *path = (CitusCustomScanPath *) newNode(
133 sizeof(CitusCustomScanPath), T_CustomPath);
134 path->custom_path.methods = &CitusCustomScanPathMethods;
135 path->custom_path.path.pathtype = T_CustomScan;
136 path->custom_path.path.pathtarget = relOptInfo->reltarget;
137 path->custom_path.path.parent = relOptInfo;
138
139 /*
140 * The 100k rows we put on the cost of the path is kind of arbitrary and could be
141 * improved in accuracy to produce better plans.
142 *
143 * 100k on the row estimate causes the postgres planner to behave very much like the
144 * old citus planner in the plans it produces. Namely the old planner had hardcoded
145 * the use of Hash Aggregates for most of the operations, unless a postgres guc was
146 * set that would disallow hash aggregates to be used.
147 *
148 * Ideally we would be able to provide estimates close to postgres' estimates on the
149 * workers to let the standard planner choose an optimal solution for the combineQuery.
150 */
151 path->custom_path.path.rows = 100000;
152 path->remoteScan = remoteScan;
153
154 return (Path *) path;
155 }
156
157
158 /*
159 * CitusCustomScanPathPlan is called for the CitusCustomScanPath node in the best_path
160 * after the postgres planner has evaluated all possible paths.
161 *
162 * This function returns a Plan node, more specifically the CustomScan Plan node that has
163 * the ability to execute the distributed part of the query.
164 *
165 * When this function is called there is an extra list of clauses passed in that might not
166 * already have been applied to the plan. We add these clauses to the quals this node will
167 * execute. The quals are evaluated before returning the tuples scanned from the workers
168 * to the plan above ours to make sure they do not end up in the final result.
169 */
170 static Plan *
CitusCustomScanPathPlan(PlannerInfo * root,RelOptInfo * rel,struct CustomPath * best_path,List * tlist,List * clauses,List * custom_plans)171 CitusCustomScanPathPlan(PlannerInfo *root,
172 RelOptInfo *rel,
173 struct CustomPath *best_path,
174 List *tlist,
175 List *clauses,
176 List *custom_plans)
177 {
178 CitusCustomScanPath *citusPath = (CitusCustomScanPath *) best_path;
179
180 /*
181 * Columns could have been pruned from the target list by the standard planner.
182 * A situation in which this might happen is a CASE that is proven to be always the
183 * same causing the other column to become useless;
184 * CASE WHEN ... <> NULL
185 * THEN ...
186 * ELSE ...
187 * END
188 * Since nothing is equal to NULL it will always end up in the else branch. The final
189 * target list the planenr needs from our node is passed in as tlist. By placing that
190 * as the target list on our scan the internal rows will be projected to this one.
191 */
192 citusPath->remoteScan->scan.plan.targetlist = tlist;
193
194 /*
195 * The custom_scan_tlist contains target entries for to the "output" of the call
196 * to citus_extradata_container, which is actually replaced by a CustomScan.
197 * The target entries are initialized with varno 1 (see RemoteScanTargetList), since
198 * it's currently the only relation in the join tree of the combineQuery.
199 *
200 * If the citus_extradata_container function call is not the first relation to
201 * appear in the flattened rtable for the entire plan, then varno is now pointing
202 * to the wrong relation and needs to be updated.
203 *
204 * Example:
205 * When the combineQuery field of the DistributedPlan is
206 * INSERT INTO local SELECT .. FROM citus_extradata_container.
207 * In that case the varno of citusdata_extradata_container should be 3, because
208 * it is preceded range table entries for "local" and the subquery.
209 */
210 if (rel->relid != 1)
211 {
212 TargetEntry *targetEntry = NULL;
213
214 foreach_ptr(targetEntry, citusPath->remoteScan->custom_scan_tlist)
215 {
216 /* we created this list, so we know it only contains Var */
217 Assert(IsA(targetEntry->expr, Var));
218
219 Var *var = (Var *) targetEntry->expr;
220
221 var->varno = rel->relid;
222 }
223 }
224
225 /* clauses might have been added by the planner, need to add them to our scan */
226 RestrictInfo *restrictInfo = NULL;
227 List **quals = &citusPath->remoteScan->scan.plan.qual;
228 foreach_ptr(restrictInfo, clauses)
229 {
230 *quals = lappend(*quals, restrictInfo->clause);
231 }
232 return (Plan *) citusPath->remoteScan;
233 }
234
235
236 /*
237 * BuildSelectStatementViaStdPlanner creates a PlannedStmt where it combines the
238 * combineQuery and the remoteScan. It utilizes the standard_planner from postgres to
239 * create a plan based on the combineQuery.
240 */
241 static PlannedStmt *
BuildSelectStatementViaStdPlanner(Query * combineQuery,List * remoteScanTargetList,CustomScan * remoteScan)242 BuildSelectStatementViaStdPlanner(Query *combineQuery, List *remoteScanTargetList,
243 CustomScan *remoteScan)
244 {
245 /*
246 * the standard planner will scribble on the target list. Since it is essential to not
247 * change the custom_scan_tlist we copy the target list before adding them to any.
248 * The remoteScanTargetList is used in the end to extract the column names to be added to
249 * the alias we will create for the CustomScan, (expressed as the
250 * citus_extradata_container function call in the combineQuery).
251 */
252 remoteScan->custom_scan_tlist = copyObject(remoteScanTargetList);
253 remoteScan->scan.plan.targetlist = copyObject(remoteScanTargetList);
254
255 /*
256 * We will overwrite the alias of the rangetable which describes the custom scan.
257 * Ideally we would have set the correct column names and alias on the range table in
258 * the combine query already when we inserted the extra data container. This could be
259 * improved in the future.
260 */
261
262 /* find the rangetable entry for the extradata container and overwrite its alias */
263 RangeTblEntry *extradataContainerRTE = NULL;
264 FindCitusExtradataContainerRTE((Node *) combineQuery, &extradataContainerRTE);
265 if (extradataContainerRTE != NULL)
266 {
267 /* extract column names from the remoteScanTargetList */
268 List *columnNameList = NIL;
269 TargetEntry *targetEntry = NULL;
270 foreach_ptr(targetEntry, remoteScanTargetList)
271 {
272 columnNameList = lappend(columnNameList, makeString(targetEntry->resname));
273 }
274 extradataContainerRTE->eref = makeAlias("remote_scan", columnNameList);
275 }
276
277 /*
278 * Print the combine query at debug level 4. Since serializing the query is relatively
279 * cpu intensive we only perform that if we are actually logging DEBUG4.
280 */
281 const int logCombineQueryLevel = DEBUG4;
282 if (IsLoggableLevel(logCombineQueryLevel))
283 {
284 StringInfo queryString = makeStringInfo();
285 pg_get_query_def(combineQuery, queryString);
286 elog(logCombineQueryLevel, "combine query: %s", queryString->data);
287 }
288
289 PlannedStmt *standardStmt = NULL;
290 PG_TRY();
291 {
292 /* This code should not be re-entrant, we check via asserts below */
293 Assert(ReplaceCitusExtraDataContainer == false);
294 Assert(ReplaceCitusExtraDataContainerWithCustomScan == NULL);
295 ReplaceCitusExtraDataContainer = true;
296 ReplaceCitusExtraDataContainerWithCustomScan = remoteScan;
297
298 standardStmt = standard_planner_compat(combineQuery, 0, NULL);
299
300 ReplaceCitusExtraDataContainer = false;
301 ReplaceCitusExtraDataContainerWithCustomScan = NULL;
302 }
303 PG_CATCH();
304 {
305 ReplaceCitusExtraDataContainer = false;
306 ReplaceCitusExtraDataContainerWithCustomScan = NULL;
307 PG_RE_THROW();
308 }
309 PG_END_TRY();
310
311 Assert(standardStmt != NULL);
312 return standardStmt;
313 }
314
315
316 /*
317 * Finds the rangetable entry in the query that refers to the citus_extradata_container
318 * and stores the pointer in result.
319 */
320 static bool
FindCitusExtradataContainerRTE(Node * node,RangeTblEntry ** result)321 FindCitusExtradataContainerRTE(Node *node, RangeTblEntry **result)
322 {
323 if (node == NULL)
324 {
325 return false;
326 }
327
328 if (IsA(node, RangeTblEntry))
329 {
330 RangeTblEntry *rangeTblEntry = castNode(RangeTblEntry, node);
331 if (rangeTblEntry->rtekind == RTE_FUNCTION &&
332 list_length(rangeTblEntry->functions) == 1)
333 {
334 RangeTblFunction *rangeTblFunction = (RangeTblFunction *) linitial(
335 rangeTblEntry->functions);
336 FuncExpr *funcExpr = castNode(FuncExpr, rangeTblFunction->funcexpr);
337 if (funcExpr->funcid == CitusExtraDataContainerFuncId())
338 {
339 *result = rangeTblEntry;
340 return true;
341 }
342 }
343
344 /* query_tree_walker descends into RTEs */
345 return false;
346 }
347 else if (IsA(node, Query))
348 {
349 const int flags = QTW_EXAMINE_RTES_BEFORE;
350 return query_tree_walker((Query *) node, FindCitusExtradataContainerRTE, result,
351 flags);
352 }
353
354 return expression_tree_walker(node, FindCitusExtradataContainerRTE, result);
355 }
356