1 /*-------------------------------------------------------------------------
2  *
3  * nodeRecursiveunion.c
4  *	  routines to handle RecursiveUnion nodes.
5  *
6  * To implement UNION (without ALL), we need a hashtable that stores tuples
7  * already seen.  The hash key is computed from the grouping columns.
8  *
9  *
10  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  *
14  * IDENTIFICATION
15  *	  src/backend/executor/nodeRecursiveunion.c
16  *
17  *-------------------------------------------------------------------------
18  */
19 #include "postgres.h"
20 
21 #include "executor/execdebug.h"
22 #include "executor/nodeRecursiveunion.h"
23 #include "miscadmin.h"
24 #include "utils/memutils.h"
25 
26 
27 
28 /*
29  * Initialize the hash table to empty.
30  */
31 static void
build_hash_table(RecursiveUnionState * rustate)32 build_hash_table(RecursiveUnionState *rustate)
33 {
34 	RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
35 
36 	Assert(node->numCols > 0);
37 	Assert(node->numGroups > 0);
38 
39 	rustate->hashtable = BuildTupleHashTable(node->numCols,
40 											 node->dupColIdx,
41 											 rustate->eqfunctions,
42 											 rustate->hashfunctions,
43 											 node->numGroups,
44 											 0,
45 											 rustate->tableContext,
46 											 rustate->tempContext,
47 											 false);
48 }
49 
50 
51 /* ----------------------------------------------------------------
52  *		ExecRecursiveUnion(node)
53  *
54  *		Scans the recursive query sequentially and returns the next
55  *		qualifying tuple.
56  *
57  * 1. evaluate non recursive term and assign the result to RT
58  *
59  * 2. execute recursive terms
60  *
61  * 2.1 WT := RT
62  * 2.2 while WT is not empty repeat 2.3 to 2.6. if WT is empty returns RT
63  * 2.3 replace the name of recursive term with WT
64  * 2.4 evaluate the recursive term and store into WT
65  * 2.5 append WT to RT
66  * 2.6 go back to 2.2
67  * ----------------------------------------------------------------
68  */
69 static TupleTableSlot *
ExecRecursiveUnion(PlanState * pstate)70 ExecRecursiveUnion(PlanState *pstate)
71 {
72 	RecursiveUnionState *node = castNode(RecursiveUnionState, pstate);
73 	PlanState  *outerPlan = outerPlanState(node);
74 	PlanState  *innerPlan = innerPlanState(node);
75 	RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
76 	TupleTableSlot *slot;
77 	bool		isnew;
78 
79 	CHECK_FOR_INTERRUPTS();
80 
81 	/* 1. Evaluate non-recursive term */
82 	if (!node->recursing)
83 	{
84 		for (;;)
85 		{
86 			slot = ExecProcNode(outerPlan);
87 			if (TupIsNull(slot))
88 				break;
89 			if (plan->numCols > 0)
90 			{
91 				/* Find or build hashtable entry for this tuple's group */
92 				LookupTupleHashEntry(node->hashtable, slot, &isnew);
93 				/* Must reset temp context after each hashtable lookup */
94 				MemoryContextReset(node->tempContext);
95 				/* Ignore tuple if already seen */
96 				if (!isnew)
97 					continue;
98 			}
99 			/* Each non-duplicate tuple goes to the working table ... */
100 			tuplestore_puttupleslot(node->working_table, slot);
101 			/* ... and to the caller */
102 			return slot;
103 		}
104 		node->recursing = true;
105 	}
106 
107 	/* 2. Execute recursive term */
108 	for (;;)
109 	{
110 		slot = ExecProcNode(innerPlan);
111 		if (TupIsNull(slot))
112 		{
113 			/* Done if there's nothing in the intermediate table */
114 			if (node->intermediate_empty)
115 				break;
116 
117 			/* done with old working table ... */
118 			tuplestore_end(node->working_table);
119 
120 			/* intermediate table becomes working table */
121 			node->working_table = node->intermediate_table;
122 
123 			/* create new empty intermediate table */
124 			node->intermediate_table = tuplestore_begin_heap(false, false,
125 															 work_mem);
126 			node->intermediate_empty = true;
127 
128 			/* reset the recursive term */
129 			innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
130 												 plan->wtParam);
131 
132 			/* and continue fetching from recursive term */
133 			continue;
134 		}
135 
136 		if (plan->numCols > 0)
137 		{
138 			/* Find or build hashtable entry for this tuple's group */
139 			LookupTupleHashEntry(node->hashtable, slot, &isnew);
140 			/* Must reset temp context after each hashtable lookup */
141 			MemoryContextReset(node->tempContext);
142 			/* Ignore tuple if already seen */
143 			if (!isnew)
144 				continue;
145 		}
146 
147 		/* Else, tuple is good; stash it in intermediate table ... */
148 		node->intermediate_empty = false;
149 		tuplestore_puttupleslot(node->intermediate_table, slot);
150 		/* ... and return it */
151 		return slot;
152 	}
153 
154 	return NULL;
155 }
156 
157 /* ----------------------------------------------------------------
158  *		ExecInitRecursiveUnionScan
159  * ----------------------------------------------------------------
160  */
161 RecursiveUnionState *
ExecInitRecursiveUnion(RecursiveUnion * node,EState * estate,int eflags)162 ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
163 {
164 	RecursiveUnionState *rustate;
165 	ParamExecData *prmdata;
166 
167 	/* check for unsupported flags */
168 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
169 
170 	/*
171 	 * create state structure
172 	 */
173 	rustate = makeNode(RecursiveUnionState);
174 	rustate->ps.plan = (Plan *) node;
175 	rustate->ps.state = estate;
176 	rustate->ps.ExecProcNode = ExecRecursiveUnion;
177 
178 	rustate->eqfunctions = NULL;
179 	rustate->hashfunctions = NULL;
180 	rustate->hashtable = NULL;
181 	rustate->tempContext = NULL;
182 	rustate->tableContext = NULL;
183 
184 	/* initialize processing state */
185 	rustate->recursing = false;
186 	rustate->intermediate_empty = true;
187 	rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
188 	rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
189 
190 	/*
191 	 * If hashing, we need a per-tuple memory context for comparisons, and a
192 	 * longer-lived context to store the hash table.  The table can't just be
193 	 * kept in the per-query context because we want to be able to throw it
194 	 * away when rescanning.
195 	 */
196 	if (node->numCols > 0)
197 	{
198 		rustate->tempContext =
199 			AllocSetContextCreate(CurrentMemoryContext,
200 								  "RecursiveUnion",
201 								  ALLOCSET_DEFAULT_SIZES);
202 		rustate->tableContext =
203 			AllocSetContextCreate(CurrentMemoryContext,
204 								  "RecursiveUnion hash table",
205 								  ALLOCSET_DEFAULT_SIZES);
206 	}
207 
208 	/*
209 	 * Make the state structure available to descendant WorkTableScan nodes
210 	 * via the Param slot reserved for it.
211 	 */
212 	prmdata = &(estate->es_param_exec_vals[node->wtParam]);
213 	Assert(prmdata->execPlan == NULL);
214 	prmdata->value = PointerGetDatum(rustate);
215 	prmdata->isnull = false;
216 
217 	/*
218 	 * Miscellaneous initialization
219 	 *
220 	 * RecursiveUnion plans don't have expression contexts because they never
221 	 * call ExecQual or ExecProject.
222 	 */
223 	Assert(node->plan.qual == NIL);
224 
225 	/*
226 	 * RecursiveUnion nodes still have Result slots, which hold pointers to
227 	 * tuples, so we have to initialize them.
228 	 */
229 	ExecInitResultTupleSlot(estate, &rustate->ps);
230 
231 	/*
232 	 * Initialize result tuple type and projection info.  (Note: we have to
233 	 * set up the result type before initializing child nodes, because
234 	 * nodeWorktablescan.c expects it to be valid.)
235 	 */
236 	ExecAssignResultTypeFromTL(&rustate->ps);
237 	rustate->ps.ps_ProjInfo = NULL;
238 
239 	/*
240 	 * initialize child nodes
241 	 */
242 	outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags);
243 	innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags);
244 
245 	/*
246 	 * If hashing, precompute fmgr lookup data for inner loop, and create the
247 	 * hash table.
248 	 */
249 	if (node->numCols > 0)
250 	{
251 		execTuplesHashPrepare(node->numCols,
252 							  node->dupOperators,
253 							  &rustate->eqfunctions,
254 							  &rustate->hashfunctions);
255 		build_hash_table(rustate);
256 	}
257 
258 	return rustate;
259 }
260 
261 /* ----------------------------------------------------------------
262  *		ExecEndRecursiveUnionScan
263  *
264  *		frees any storage allocated through C routines.
265  * ----------------------------------------------------------------
266  */
267 void
ExecEndRecursiveUnion(RecursiveUnionState * node)268 ExecEndRecursiveUnion(RecursiveUnionState *node)
269 {
270 	/* Release tuplestores */
271 	tuplestore_end(node->working_table);
272 	tuplestore_end(node->intermediate_table);
273 
274 	/* free subsidiary stuff including hashtable */
275 	if (node->tempContext)
276 		MemoryContextDelete(node->tempContext);
277 	if (node->tableContext)
278 		MemoryContextDelete(node->tableContext);
279 
280 	/*
281 	 * clean out the upper tuple table
282 	 */
283 	ExecClearTuple(node->ps.ps_ResultTupleSlot);
284 
285 	/*
286 	 * close down subplans
287 	 */
288 	ExecEndNode(outerPlanState(node));
289 	ExecEndNode(innerPlanState(node));
290 }
291 
292 /* ----------------------------------------------------------------
293  *		ExecReScanRecursiveUnion
294  *
295  *		Rescans the relation.
296  * ----------------------------------------------------------------
297  */
298 void
ExecReScanRecursiveUnion(RecursiveUnionState * node)299 ExecReScanRecursiveUnion(RecursiveUnionState *node)
300 {
301 	PlanState  *outerPlan = outerPlanState(node);
302 	PlanState  *innerPlan = innerPlanState(node);
303 	RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
304 
305 	/*
306 	 * Set recursive term's chgParam to tell it that we'll modify the working
307 	 * table and therefore it has to rescan.
308 	 */
309 	innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam);
310 
311 	/*
312 	 * if chgParam of subnode is not null then plan will be re-scanned by
313 	 * first ExecProcNode.  Because of above, we only have to do this to the
314 	 * non-recursive term.
315 	 */
316 	if (outerPlan->chgParam == NULL)
317 		ExecReScan(outerPlan);
318 
319 	/* Release any hashtable storage */
320 	if (node->tableContext)
321 		MemoryContextResetAndDeleteChildren(node->tableContext);
322 
323 	/* And rebuild empty hashtable if needed */
324 	if (plan->numCols > 0)
325 		build_hash_table(node);
326 
327 	/* reset processing state */
328 	node->recursing = false;
329 	node->intermediate_empty = true;
330 	tuplestore_clear(node->working_table);
331 	tuplestore_clear(node->intermediate_table);
332 }
333