/*-------------------------------------------------------------------------
 *
 * nodeMergeAppend.c
 *	  routines to handle MergeAppend nodes.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeMergeAppend.c
 *
 *-------------------------------------------------------------------------
 */
/* INTERFACE ROUTINES
 *		ExecInitMergeAppend		- initialize the MergeAppend node
 *		ExecMergeAppend			- retrieve the next tuple from the node
 *		ExecEndMergeAppend		- shut down the MergeAppend node
 *		ExecReScanMergeAppend	- rescan the MergeAppend node
 *
 *	 NOTES
 *		A MergeAppend node contains a list of one or more subplans.
 *		These are each expected to deliver tuples that are sorted according
 *		to a common sort key.  The MergeAppend node merges these streams
 *		to produce output sorted the same way.
 *
 *		MergeAppend nodes don't make use of their left and right
 *		subtrees, rather they maintain a list of subplans so
 *		a typical MergeAppend node looks like this in the plan tree:
 *
 *                 ...
 *                 /
 *              MergeAppend---+------+------+--- nil
 *              /   \         |      |      |
 *            nil   nil      ...    ...    ...
 *                               subplans
 */

#include "postgres.h"

#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeMergeAppend.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"

/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
51 */ 52 typedef int32 SlotNumber; 53 54 static TupleTableSlot *ExecMergeAppend(PlanState *pstate); 55 static int heap_compare_slots(Datum a, Datum b, void *arg); 56 57 58 /* ---------------------------------------------------------------- 59 * ExecInitMergeAppend 60 * 61 * Begin all of the subscans of the MergeAppend node. 62 * ---------------------------------------------------------------- 63 */ 64 MergeAppendState * 65 ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) 66 { 67 MergeAppendState *mergestate = makeNode(MergeAppendState); 68 PlanState **mergeplanstates; 69 Bitmapset *validsubplans; 70 int nplans; 71 int i, 72 j; 73 ListCell *lc; 74 75 /* check for unsupported flags */ 76 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); 77 78 /* 79 * create new MergeAppendState for our node 80 */ 81 mergestate->ps.plan = (Plan *) node; 82 mergestate->ps.state = estate; 83 mergestate->ps.ExecProcNode = ExecMergeAppend; 84 mergestate->ms_noopscan = false; 85 86 /* If run-time partition pruning is enabled, then set that up now */ 87 if (node->part_prune_info != NULL) 88 { 89 PartitionPruneState *prunestate; 90 91 /* We may need an expression context to evaluate partition exprs */ 92 ExecAssignExprContext(estate, &mergestate->ps); 93 94 prunestate = ExecCreatePartitionPruneState(&mergestate->ps, 95 node->part_prune_info); 96 mergestate->ms_prune_state = prunestate; 97 98 /* Perform an initial partition prune, if required. */ 99 if (prunestate->do_initial_prune) 100 { 101 /* Determine which subplans survive initial pruning */ 102 validsubplans = ExecFindInitialMatchingSubPlans(prunestate, 103 list_length(node->mergeplans)); 104 105 /* 106 * The case where no subplans survive pruning must be handled 107 * specially. The problem here is that code in explain.c requires 108 * a MergeAppend to have at least one subplan in order for it to 109 * properly determine the Vars in that subplan's targetlist. 
We 110 * sidestep this issue by just initializing the first subplan and 111 * setting ms_noopscan to true to indicate that we don't really 112 * need to scan any subnodes. 113 */ 114 if (bms_is_empty(validsubplans)) 115 { 116 mergestate->ms_noopscan = true; 117 118 /* Mark the first as valid so that it's initialized below */ 119 validsubplans = bms_make_singleton(0); 120 } 121 122 nplans = bms_num_members(validsubplans); 123 } 124 else 125 { 126 /* We'll need to initialize all subplans */ 127 nplans = list_length(node->mergeplans); 128 Assert(nplans > 0); 129 validsubplans = bms_add_range(NULL, 0, nplans - 1); 130 } 131 132 /* 133 * If no runtime pruning is required, we can fill ms_valid_subplans 134 * immediately, preventing later calls to ExecFindMatchingSubPlans. 135 */ 136 if (!prunestate->do_exec_prune) 137 { 138 Assert(nplans > 0); 139 mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1); 140 } 141 } 142 else 143 { 144 nplans = list_length(node->mergeplans); 145 146 /* 147 * When run-time partition pruning is not enabled we can just mark all 148 * subplans as valid; they must also all be initialized. 149 */ 150 Assert(nplans > 0); 151 mergestate->ms_valid_subplans = validsubplans = 152 bms_add_range(NULL, 0, nplans - 1); 153 mergestate->ms_prune_state = NULL; 154 } 155 156 mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *)); 157 mergestate->mergeplans = mergeplanstates; 158 mergestate->ms_nplans = nplans; 159 160 mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans); 161 mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, 162 mergestate); 163 164 /* 165 * Miscellaneous initialization 166 * 167 * MergeAppend nodes do have Result slots, which hold pointers to tuples, 168 * so we have to initialize them. 
FIXME 169 */ 170 ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual); 171 172 /* node returns slots from each of its subnodes, therefore not fixed */ 173 mergestate->ps.resultopsset = true; 174 mergestate->ps.resultopsfixed = false; 175 176 /* 177 * call ExecInitNode on each of the valid plans to be executed and save 178 * the results into the mergeplanstates array. 179 */ 180 j = i = 0; 181 foreach(lc, node->mergeplans) 182 { 183 if (bms_is_member(i, validsubplans)) 184 { 185 Plan *initNode = (Plan *) lfirst(lc); 186 187 mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags); 188 } 189 i++; 190 } 191 192 mergestate->ps.ps_ProjInfo = NULL; 193 194 /* 195 * initialize sort-key information 196 */ 197 mergestate->ms_nkeys = node->numCols; 198 mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols); 199 200 for (i = 0; i < node->numCols; i++) 201 { 202 SortSupport sortKey = mergestate->ms_sortkeys + i; 203 204 sortKey->ssup_cxt = CurrentMemoryContext; 205 sortKey->ssup_collation = node->collations[i]; 206 sortKey->ssup_nulls_first = node->nullsFirst[i]; 207 sortKey->ssup_attno = node->sortColIdx[i]; 208 209 /* 210 * It isn't feasible to perform abbreviated key conversion, since 211 * tuples are pulled into mergestate's binary heap as needed. It 212 * would likely be counter-productive to convert tuples into an 213 * abbreviated representation as they're pulled up, so opt out of that 214 * additional optimization entirely. 215 */ 216 sortKey->abbreviate = false; 217 218 PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey); 219 } 220 221 /* 222 * initialize to show we have not run the subplans yet 223 */ 224 mergestate->ms_initialized = false; 225 226 return mergestate; 227 } 228 229 /* ---------------------------------------------------------------- 230 * ExecMergeAppend 231 * 232 * Handles iteration over multiple subplans. 
233 * ---------------------------------------------------------------- 234 */ 235 static TupleTableSlot * 236 ExecMergeAppend(PlanState *pstate) 237 { 238 MergeAppendState *node = castNode(MergeAppendState, pstate); 239 TupleTableSlot *result; 240 SlotNumber i; 241 242 CHECK_FOR_INTERRUPTS(); 243 244 if (!node->ms_initialized) 245 { 246 /* Nothing to do if all subplans were pruned */ 247 if (node->ms_noopscan) 248 return ExecClearTuple(node->ps.ps_ResultTupleSlot); 249 250 /* 251 * If we've yet to determine the valid subplans then do so now. If 252 * run-time pruning is disabled then the valid subplans will always be 253 * set to all subplans. 254 */ 255 if (node->ms_valid_subplans == NULL) 256 node->ms_valid_subplans = 257 ExecFindMatchingSubPlans(node->ms_prune_state); 258 259 /* 260 * First time through: pull the first tuple from each valid subplan, 261 * and set up the heap. 262 */ 263 i = -1; 264 while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0) 265 { 266 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); 267 if (!TupIsNull(node->ms_slots[i])) 268 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); 269 } 270 binaryheap_build(node->ms_heap); 271 node->ms_initialized = true; 272 } 273 else 274 { 275 /* 276 * Otherwise, pull the next tuple from whichever subplan we returned 277 * from last time, and reinsert the subplan index into the heap, 278 * because it might now compare differently against the existing 279 * elements of the heap. (We could perhaps simplify the logic a bit 280 * by doing this before returning from the prior call, but it's better 281 * to not pull tuples until necessary.) 
282 */ 283 i = DatumGetInt32(binaryheap_first(node->ms_heap)); 284 node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); 285 if (!TupIsNull(node->ms_slots[i])) 286 binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); 287 else 288 (void) binaryheap_remove_first(node->ms_heap); 289 } 290 291 if (binaryheap_empty(node->ms_heap)) 292 { 293 /* All the subplans are exhausted, and so is the heap */ 294 result = ExecClearTuple(node->ps.ps_ResultTupleSlot); 295 } 296 else 297 { 298 i = DatumGetInt32(binaryheap_first(node->ms_heap)); 299 result = node->ms_slots[i]; 300 } 301 302 return result; 303 } 304 305 /* 306 * Compare the tuples in the two given slots. 307 */ 308 static int32 309 heap_compare_slots(Datum a, Datum b, void *arg) 310 { 311 MergeAppendState *node = (MergeAppendState *) arg; 312 SlotNumber slot1 = DatumGetInt32(a); 313 SlotNumber slot2 = DatumGetInt32(b); 314 315 TupleTableSlot *s1 = node->ms_slots[slot1]; 316 TupleTableSlot *s2 = node->ms_slots[slot2]; 317 int nkey; 318 319 Assert(!TupIsNull(s1)); 320 Assert(!TupIsNull(s2)); 321 322 for (nkey = 0; nkey < node->ms_nkeys; nkey++) 323 { 324 SortSupport sortKey = node->ms_sortkeys + nkey; 325 AttrNumber attno = sortKey->ssup_attno; 326 Datum datum1, 327 datum2; 328 bool isNull1, 329 isNull2; 330 int compare; 331 332 datum1 = slot_getattr(s1, attno, &isNull1); 333 datum2 = slot_getattr(s2, attno, &isNull2); 334 335 compare = ApplySortComparator(datum1, isNull1, 336 datum2, isNull2, 337 sortKey); 338 if (compare != 0) 339 { 340 INVERT_COMPARE_RESULT(compare); 341 return compare; 342 } 343 } 344 return 0; 345 } 346 347 /* ---------------------------------------------------------------- 348 * ExecEndMergeAppend 349 * 350 * Shuts down the subscans of the MergeAppend node. 351 * 352 * Returns nothing of interest. 
353 * ---------------------------------------------------------------- 354 */ 355 void 356 ExecEndMergeAppend(MergeAppendState *node) 357 { 358 PlanState **mergeplans; 359 int nplans; 360 int i; 361 362 /* 363 * get information from the node 364 */ 365 mergeplans = node->mergeplans; 366 nplans = node->ms_nplans; 367 368 /* 369 * shut down each of the subscans 370 */ 371 for (i = 0; i < nplans; i++) 372 ExecEndNode(mergeplans[i]); 373 } 374 375 void 376 ExecReScanMergeAppend(MergeAppendState *node) 377 { 378 int i; 379 380 /* 381 * If any PARAM_EXEC Params used in pruning expressions have changed, then 382 * we'd better unset the valid subplans so that they are reselected for 383 * the new parameter values. 384 */ 385 if (node->ms_prune_state && 386 bms_overlap(node->ps.chgParam, 387 node->ms_prune_state->execparamids)) 388 { 389 bms_free(node->ms_valid_subplans); 390 node->ms_valid_subplans = NULL; 391 } 392 393 for (i = 0; i < node->ms_nplans; i++) 394 { 395 PlanState *subnode = node->mergeplans[i]; 396 397 /* 398 * ExecReScan doesn't know about my subplans, so I have to do 399 * changed-parameter signaling myself. 400 */ 401 if (node->ps.chgParam != NULL) 402 UpdateChangedParamSet(subnode, node->ps.chgParam); 403 404 /* 405 * If chgParam of subnode is not null then plan will be re-scanned by 406 * first ExecProcNode. 407 */ 408 if (subnode->chgParam == NULL) 409 ExecReScan(subnode); 410 } 411 binaryheap_reset(node->ms_heap); 412 node->ms_initialized = false; 413 } 414