1 /*-------------------------------------------------------------------------
2  *
3  * nodeRecursiveunion.c
4  *	  routines to handle RecursiveUnion nodes.
5  *
6  * To implement UNION (without ALL), we need a hashtable that stores tuples
7  * already seen.  The hash key is computed from the grouping columns.
8  *
9  *
10  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  *
14  * IDENTIFICATION
15  *	  src/backend/executor/nodeRecursiveunion.c
16  *
17  *-------------------------------------------------------------------------
18  */
19 #include "postgres.h"
20 
21 #include "executor/execdebug.h"
22 #include "executor/nodeRecursiveunion.h"
23 #include "miscadmin.h"
24 #include "utils/memutils.h"
25 
26 
27 
28 /*
29  * Initialize the hash table to empty.
30  */
31 static void
32 build_hash_table(RecursiveUnionState *rustate)
33 {
34 	RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
35 	TupleDesc	desc = ExecGetResultType(outerPlanState(rustate));
36 
37 	Assert(node->numCols > 0);
38 	Assert(node->numGroups > 0);
39 
40 	rustate->hashtable = BuildTupleHashTableExt(&rustate->ps,
41 												desc,
42 												node->numCols,
43 												node->dupColIdx,
44 												rustate->eqfuncoids,
45 												rustate->hashfunctions,
46 												node->numGroups,
47 												0,
48 												rustate->ps.state->es_query_cxt,
49 												rustate->tableContext,
50 												rustate->tempContext,
51 												false);
52 }
53 
54 
55 /* ----------------------------------------------------------------
56  *		ExecRecursiveUnion(node)
57  *
58  *		Scans the recursive query sequentially and returns the next
59  *		qualifying tuple.
60  *
61  * 1. evaluate non recursive term and assign the result to RT
62  *
63  * 2. execute recursive terms
64  *
65  * 2.1 WT := RT
66  * 2.2 while WT is not empty repeat 2.3 to 2.6. if WT is empty returns RT
67  * 2.3 replace the name of recursive term with WT
68  * 2.4 evaluate the recursive term and store into WT
69  * 2.5 append WT to RT
70  * 2.6 go back to 2.2
71  * ----------------------------------------------------------------
72  */
73 static TupleTableSlot *
74 ExecRecursiveUnion(PlanState *pstate)
75 {
76 	RecursiveUnionState *node = castNode(RecursiveUnionState, pstate);
77 	PlanState  *outerPlan = outerPlanState(node);
78 	PlanState  *innerPlan = innerPlanState(node);
79 	RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
80 	TupleTableSlot *slot;
81 	bool		isnew;
82 
83 	CHECK_FOR_INTERRUPTS();
84 
85 	/* 1. Evaluate non-recursive term */
86 	if (!node->recursing)
87 	{
88 		for (;;)
89 		{
90 			slot = ExecProcNode(outerPlan);
91 			if (TupIsNull(slot))
92 				break;
93 			if (plan->numCols > 0)
94 			{
95 				/* Find or build hashtable entry for this tuple's group */
96 				LookupTupleHashEntry(node->hashtable, slot, &isnew);
97 				/* Must reset temp context after each hashtable lookup */
98 				MemoryContextReset(node->tempContext);
99 				/* Ignore tuple if already seen */
100 				if (!isnew)
101 					continue;
102 			}
103 			/* Each non-duplicate tuple goes to the working table ... */
104 			tuplestore_puttupleslot(node->working_table, slot);
105 			/* ... and to the caller */
106 			return slot;
107 		}
108 		node->recursing = true;
109 	}
110 
111 	/* 2. Execute recursive term */
112 	for (;;)
113 	{
114 		slot = ExecProcNode(innerPlan);
115 		if (TupIsNull(slot))
116 		{
117 			/* Done if there's nothing in the intermediate table */
118 			if (node->intermediate_empty)
119 				break;
120 
121 			/* done with old working table ... */
122 			tuplestore_end(node->working_table);
123 
124 			/* intermediate table becomes working table */
125 			node->working_table = node->intermediate_table;
126 
127 			/* create new empty intermediate table */
128 			node->intermediate_table = tuplestore_begin_heap(false, false,
129 															 work_mem);
130 			node->intermediate_empty = true;
131 
132 			/* reset the recursive term */
133 			innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
134 												 plan->wtParam);
135 
136 			/* and continue fetching from recursive term */
137 			continue;
138 		}
139 
140 		if (plan->numCols > 0)
141 		{
142 			/* Find or build hashtable entry for this tuple's group */
143 			LookupTupleHashEntry(node->hashtable, slot, &isnew);
144 			/* Must reset temp context after each hashtable lookup */
145 			MemoryContextReset(node->tempContext);
146 			/* Ignore tuple if already seen */
147 			if (!isnew)
148 				continue;
149 		}
150 
151 		/* Else, tuple is good; stash it in intermediate table ... */
152 		node->intermediate_empty = false;
153 		tuplestore_puttupleslot(node->intermediate_table, slot);
154 		/* ... and return it */
155 		return slot;
156 	}
157 
158 	return NULL;
159 }
160 
161 /* ----------------------------------------------------------------
162  *		ExecInitRecursiveUnionScan
163  * ----------------------------------------------------------------
164  */
165 RecursiveUnionState *
166 ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
167 {
168 	RecursiveUnionState *rustate;
169 	ParamExecData *prmdata;
170 
171 	/* check for unsupported flags */
172 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
173 
174 	/*
175 	 * create state structure
176 	 */
177 	rustate = makeNode(RecursiveUnionState);
178 	rustate->ps.plan = (Plan *) node;
179 	rustate->ps.state = estate;
180 	rustate->ps.ExecProcNode = ExecRecursiveUnion;
181 
182 	rustate->eqfuncoids = NULL;
183 	rustate->hashfunctions = NULL;
184 	rustate->hashtable = NULL;
185 	rustate->tempContext = NULL;
186 	rustate->tableContext = NULL;
187 
188 	/* initialize processing state */
189 	rustate->recursing = false;
190 	rustate->intermediate_empty = true;
191 	rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
192 	rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
193 
194 	/*
195 	 * If hashing, we need a per-tuple memory context for comparisons, and a
196 	 * longer-lived context to store the hash table.  The table can't just be
197 	 * kept in the per-query context because we want to be able to throw it
198 	 * away when rescanning.
199 	 */
200 	if (node->numCols > 0)
201 	{
202 		rustate->tempContext =
203 			AllocSetContextCreate(CurrentMemoryContext,
204 								  "RecursiveUnion",
205 								  ALLOCSET_DEFAULT_SIZES);
206 		rustate->tableContext =
207 			AllocSetContextCreate(CurrentMemoryContext,
208 								  "RecursiveUnion hash table",
209 								  ALLOCSET_DEFAULT_SIZES);
210 	}
211 
212 	/*
213 	 * Make the state structure available to descendant WorkTableScan nodes
214 	 * via the Param slot reserved for it.
215 	 */
216 	prmdata = &(estate->es_param_exec_vals[node->wtParam]);
217 	Assert(prmdata->execPlan == NULL);
218 	prmdata->value = PointerGetDatum(rustate);
219 	prmdata->isnull = false;
220 
221 	/*
222 	 * Miscellaneous initialization
223 	 *
224 	 * RecursiveUnion plans don't have expression contexts because they never
225 	 * call ExecQual or ExecProject.
226 	 */
227 	Assert(node->plan.qual == NIL);
228 
229 	/*
230 	 * RecursiveUnion nodes still have Result slots, which hold pointers to
231 	 * tuples, so we have to initialize them.
232 	 */
233 	ExecInitResultTupleSlotTL(estate, &rustate->ps);
234 
235 	/*
236 	 * Initialize result tuple type.  (Note: we have to set up the result type
237 	 * before initializing child nodes, because nodeWorktablescan.c expects it
238 	 * to be valid.)
239 	 */
240 	rustate->ps.ps_ProjInfo = NULL;
241 
242 	/*
243 	 * initialize child nodes
244 	 */
245 	outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags);
246 	innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags);
247 
248 	/*
249 	 * If hashing, precompute fmgr lookup data for inner loop, and create the
250 	 * hash table.
251 	 */
252 	if (node->numCols > 0)
253 	{
254 		execTuplesHashPrepare(node->numCols,
255 							  node->dupOperators,
256 							  &rustate->eqfuncoids,
257 							  &rustate->hashfunctions);
258 		build_hash_table(rustate);
259 	}
260 
261 	return rustate;
262 }
263 
264 /* ----------------------------------------------------------------
265  *		ExecEndRecursiveUnionScan
266  *
267  *		frees any storage allocated through C routines.
268  * ----------------------------------------------------------------
269  */
270 void
271 ExecEndRecursiveUnion(RecursiveUnionState *node)
272 {
273 	/* Release tuplestores */
274 	tuplestore_end(node->working_table);
275 	tuplestore_end(node->intermediate_table);
276 
277 	/* free subsidiary stuff including hashtable */
278 	if (node->tempContext)
279 		MemoryContextDelete(node->tempContext);
280 	if (node->tableContext)
281 		MemoryContextDelete(node->tableContext);
282 
283 	/*
284 	 * clean out the upper tuple table
285 	 */
286 	ExecClearTuple(node->ps.ps_ResultTupleSlot);
287 
288 	/*
289 	 * close down subplans
290 	 */
291 	ExecEndNode(outerPlanState(node));
292 	ExecEndNode(innerPlanState(node));
293 }
294 
295 /* ----------------------------------------------------------------
296  *		ExecReScanRecursiveUnion
297  *
298  *		Rescans the relation.
299  * ----------------------------------------------------------------
300  */
301 void
302 ExecReScanRecursiveUnion(RecursiveUnionState *node)
303 {
304 	PlanState  *outerPlan = outerPlanState(node);
305 	PlanState  *innerPlan = innerPlanState(node);
306 	RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
307 
308 	/*
309 	 * Set recursive term's chgParam to tell it that we'll modify the working
310 	 * table and therefore it has to rescan.
311 	 */
312 	innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam);
313 
314 	/*
315 	 * if chgParam of subnode is not null then plan will be re-scanned by
316 	 * first ExecProcNode.  Because of above, we only have to do this to the
317 	 * non-recursive term.
318 	 */
319 	if (outerPlan->chgParam == NULL)
320 		ExecReScan(outerPlan);
321 
322 	/* Release any hashtable storage */
323 	if (node->tableContext)
324 		MemoryContextResetAndDeleteChildren(node->tableContext);
325 
326 	/* Empty hashtable if needed */
327 	if (plan->numCols > 0)
328 		ResetTupleHashTable(node->hashtable);
329 
330 	/* reset processing state */
331 	node->recursing = false;
332 	node->intermediate_empty = true;
333 	tuplestore_clear(node->working_table);
334 	tuplestore_clear(node->intermediate_table);
335 }
336