1 /*------------------------------------------------------------------------- 2 * 3 * nodeRecursiveunion.c 4 * routines to handle RecursiveUnion nodes. 5 * 6 * To implement UNION (without ALL), we need a hashtable that stores tuples 7 * already seen. The hash key is computed from the grouping columns. 8 * 9 * 10 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group 11 * Portions Copyright (c) 1994, Regents of the University of California 12 * 13 * 14 * IDENTIFICATION 15 * src/backend/executor/nodeRecursiveunion.c 16 * 17 *------------------------------------------------------------------------- 18 */ 19 #include "postgres.h" 20 21 #include "executor/execdebug.h" 22 #include "executor/nodeRecursiveunion.h" 23 #include "miscadmin.h" 24 #include "utils/memutils.h" 25 26 27 28 /* 29 * Initialize the hash table to empty. 30 */ 31 static void 32 build_hash_table(RecursiveUnionState *rustate) 33 { 34 RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan; 35 TupleDesc desc = ExecGetResultType(outerPlanState(rustate)); 36 37 Assert(node->numCols > 0); 38 Assert(node->numGroups > 0); 39 40 rustate->hashtable = BuildTupleHashTableExt(&rustate->ps, 41 desc, 42 node->numCols, 43 node->dupColIdx, 44 rustate->eqfuncoids, 45 rustate->hashfunctions, 46 node->numGroups, 47 0, 48 rustate->ps.state->es_query_cxt, 49 rustate->tableContext, 50 rustate->tempContext, 51 false); 52 } 53 54 55 /* ---------------------------------------------------------------- 56 * ExecRecursiveUnion(node) 57 * 58 * Scans the recursive query sequentially and returns the next 59 * qualifying tuple. 60 * 61 * 1. evaluate non recursive term and assign the result to RT 62 * 63 * 2. execute recursive terms 64 * 65 * 2.1 WT := RT 66 * 2.2 while WT is not empty repeat 2.3 to 2.6. if WT is empty returns RT 67 * 2.3 replace the name of recursive term with WT 68 * 2.4 evaluate the recursive term and store into WT 69 * 2.5 append WT to RT 70 * 2.6 go back to 2.2 71 * ---------------------------------------------------------------- 72 */ 73 static TupleTableSlot * 74 ExecRecursiveUnion(PlanState *pstate) 75 { 76 RecursiveUnionState *node = castNode(RecursiveUnionState, pstate); 77 PlanState *outerPlan = outerPlanState(node); 78 PlanState *innerPlan = innerPlanState(node); 79 RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan; 80 TupleTableSlot *slot; 81 bool isnew; 82 83 CHECK_FOR_INTERRUPTS(); 84 85 /* 1. Evaluate non-recursive term */ 86 if (!node->recursing) 87 { 88 for (;;) 89 { 90 slot = ExecProcNode(outerPlan); 91 if (TupIsNull(slot)) 92 break; 93 if (plan->numCols > 0) 94 { 95 /* Find or build hashtable entry for this tuple's group */ 96 LookupTupleHashEntry(node->hashtable, slot, &isnew); 97 /* Must reset temp context after each hashtable lookup */ 98 MemoryContextReset(node->tempContext); 99 /* Ignore tuple if already seen */ 100 if (!isnew) 101 continue; 102 } 103 /* Each non-duplicate tuple goes to the working table ... */ 104 tuplestore_puttupleslot(node->working_table, slot); 105 /* ... and to the caller */ 106 return slot; 107 } 108 node->recursing = true; 109 } 110 111 /* 2. Execute recursive term */ 112 for (;;) 113 { 114 slot = ExecProcNode(innerPlan); 115 if (TupIsNull(slot)) 116 { 117 /* Done if there's nothing in the intermediate table */ 118 if (node->intermediate_empty) 119 break; 120 121 /* done with old working table ... */ 122 tuplestore_end(node->working_table); 123 124 /* intermediate table becomes working table */ 125 node->working_table = node->intermediate_table; 126 127 /* create new empty intermediate table */ 128 node->intermediate_table = tuplestore_begin_heap(false, false, 129 work_mem); 130 node->intermediate_empty = true; 131 132 /* reset the recursive term */ 133 innerPlan->chgParam = bms_add_member(innerPlan->chgParam, 134 plan->wtParam); 135 136 /* and continue fetching from recursive term */ 137 continue; 138 } 139 140 if (plan->numCols > 0) 141 { 142 /* Find or build hashtable entry for this tuple's group */ 143 LookupTupleHashEntry(node->hashtable, slot, &isnew); 144 /* Must reset temp context after each hashtable lookup */ 145 MemoryContextReset(node->tempContext); 146 /* Ignore tuple if already seen */ 147 if (!isnew) 148 continue; 149 } 150 151 /* Else, tuple is good; stash it in intermediate table ... */ 152 node->intermediate_empty = false; 153 tuplestore_puttupleslot(node->intermediate_table, slot); 154 /* ... and return it */ 155 return slot; 156 } 157 158 return NULL; 159 } 160 161 /* ---------------------------------------------------------------- 162 * ExecInitRecursiveUnionScan 163 * ---------------------------------------------------------------- 164 */ 165 RecursiveUnionState * 166 ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags) 167 { 168 RecursiveUnionState *rustate; 169 ParamExecData *prmdata; 170 171 /* check for unsupported flags */ 172 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); 173 174 /* 175 * create state structure 176 */ 177 rustate = makeNode(RecursiveUnionState); 178 rustate->ps.plan = (Plan *) node; 179 rustate->ps.state = estate; 180 rustate->ps.ExecProcNode = ExecRecursiveUnion; 181 182 rustate->eqfuncoids = NULL; 183 rustate->hashfunctions = NULL; 184 rustate->hashtable = NULL; 185 rustate->tempContext = NULL; 186 rustate->tableContext = NULL; 187 188 /* initialize processing state */ 189 rustate->recursing = false; 190 rustate->intermediate_empty = true; 191 rustate->working_table = tuplestore_begin_heap(false, false, work_mem); 192 rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem); 193 194 /* 195 * If hashing, we need a per-tuple memory context for comparisons, and a 196 * longer-lived context to store the hash table. The table can't just be 197 * kept in the per-query context because we want to be able to throw it 198 * away when rescanning. 199 */ 200 if (node->numCols > 0) 201 { 202 rustate->tempContext = 203 AllocSetContextCreate(CurrentMemoryContext, 204 "RecursiveUnion", 205 ALLOCSET_DEFAULT_SIZES); 206 rustate->tableContext = 207 AllocSetContextCreate(CurrentMemoryContext, 208 "RecursiveUnion hash table", 209 ALLOCSET_DEFAULT_SIZES); 210 } 211 212 /* 213 * Make the state structure available to descendant WorkTableScan nodes 214 * via the Param slot reserved for it. 215 */ 216 prmdata = &(estate->es_param_exec_vals[node->wtParam]); 217 Assert(prmdata->execPlan == NULL); 218 prmdata->value = PointerGetDatum(rustate); 219 prmdata->isnull = false; 220 221 /* 222 * Miscellaneous initialization 223 * 224 * RecursiveUnion plans don't have expression contexts because they never 225 * call ExecQual or ExecProject. 226 */ 227 Assert(node->plan.qual == NIL); 228 229 /* 230 * RecursiveUnion nodes still have Result slots, which hold pointers to 231 * tuples, so we have to initialize them. 232 */ 233 ExecInitResultTupleSlotTL(estate, &rustate->ps); 234 235 /* 236 * Initialize result tuple type. (Note: we have to set up the result type 237 * before initializing child nodes, because nodeWorktablescan.c expects it 238 * to be valid.) 239 */ 240 rustate->ps.ps_ProjInfo = NULL; 241 242 /* 243 * initialize child nodes 244 */ 245 outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags); 246 innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags); 247 248 /* 249 * If hashing, precompute fmgr lookup data for inner loop, and create the 250 * hash table. 251 */ 252 if (node->numCols > 0) 253 { 254 execTuplesHashPrepare(node->numCols, 255 node->dupOperators, 256 &rustate->eqfuncoids, 257 &rustate->hashfunctions); 258 build_hash_table(rustate); 259 } 260 261 return rustate; 262 } 263 264 /* ---------------------------------------------------------------- 265 * ExecEndRecursiveUnionScan 266 * 267 * frees any storage allocated through C routines. 268 * ---------------------------------------------------------------- 269 */ 270 void 271 ExecEndRecursiveUnion(RecursiveUnionState *node) 272 { 273 /* Release tuplestores */ 274 tuplestore_end(node->working_table); 275 tuplestore_end(node->intermediate_table); 276 277 /* free subsidiary stuff including hashtable */ 278 if (node->tempContext) 279 MemoryContextDelete(node->tempContext); 280 if (node->tableContext) 281 MemoryContextDelete(node->tableContext); 282 283 /* 284 * clean out the upper tuple table 285 */ 286 ExecClearTuple(node->ps.ps_ResultTupleSlot); 287 288 /* 289 * close down subplans 290 */ 291 ExecEndNode(outerPlanState(node)); 292 ExecEndNode(innerPlanState(node)); 293 } 294 295 /* ---------------------------------------------------------------- 296 * ExecReScanRecursiveUnion 297 * 298 * Rescans the relation. 299 * ---------------------------------------------------------------- 300 */ 301 void 302 ExecReScanRecursiveUnion(RecursiveUnionState *node) 303 { 304 PlanState *outerPlan = outerPlanState(node); 305 PlanState *innerPlan = innerPlanState(node); 306 RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan; 307 308 /* 309 * Set recursive term's chgParam to tell it that we'll modify the working 310 * table and therefore it has to rescan. 311 */ 312 innerPlan->chgParam = bms_add_member(innerPlan->chgParam, plan->wtParam); 313 314 /* 315 * if chgParam of subnode is not null then plan will be re-scanned by 316 * first ExecProcNode. Because of above, we only have to do this to the 317 * non-recursive term. 318 */ 319 if (outerPlan->chgParam == NULL) 320 ExecReScan(outerPlan); 321 322 /* Release any hashtable storage */ 323 if (node->tableContext) 324 MemoryContextResetAndDeleteChildren(node->tableContext); 325 326 /* Empty hashtable if needed */ 327 if (plan->numCols > 0) 328 ResetTupleHashTable(node->hashtable); 329 330 /* reset processing state */ 331 node->recursing = false; 332 node->intermediate_empty = true; 333 tuplestore_clear(node->working_table); 334 tuplestore_clear(node->intermediate_table); 335 } 336