/*-------------------------------------------------------------------------
 *
 * nodeAgg.c
 *	  Routines to handle aggregate nodes.
 *
 *	  ExecAgg normally evaluates each aggregate in the following steps:
 *
 *		 transvalue = initcond
 *		 foreach input_tuple do
 *			transvalue = transfunc(transvalue, input_value(s))
 *		 result = finalfunc(transvalue, direct_argument(s))
 *
 *	  If a finalfunc is not supplied then the result is just the ending
 *	  value of transvalue.
 *
 *	  Other behaviors can be selected by the "aggsplit" mode, which exists
 *	  to support partial aggregation.  It is possible to:
 *	  * Skip running the finalfunc, so that the output is always the
 *	  final transvalue state.
 *	  * Substitute the combinefunc for the transfunc, so that transvalue
 *	  states (propagated up from a child partial-aggregation step) are merged
 *	  rather than processing raw input rows.  (The statements below about
 *	  the transfunc apply equally to the combinefunc, when it's selected.)
 *	  * Apply the serializefunc to the output values (this only makes sense
 *	  when skipping the finalfunc, since the serializefunc works on the
 *	  transvalue data type).
 *	  * Apply the deserializefunc to the input values (this only makes sense
 *	  when using the combinefunc, for similar reasons).
 *	  It is the planner's responsibility to connect up Agg nodes using these
 *	  alternate behaviors in a way that makes sense, with partial aggregation
 *	  results being fed to nodes that expect them.
 *
 *	  If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
 *	  input tuples and eliminate duplicates (if required) before performing
 *	  the above-depicted process.  (However, we don't do that for ordered-set
 *	  aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
 *	  so far as this module is concerned.)
 *	  Note that partial aggregation
 *	  is not supported in these cases, since we couldn't ensure global
 *	  ordering or distinctness of the inputs.
 *
 *	  If transfunc is marked "strict" in pg_proc and initcond is NULL,
 *	  then the first non-NULL input_value is assigned directly to transvalue,
 *	  and transfunc isn't applied until the second non-NULL input_value.
 *	  The agg's first input type and transtype must be the same in this case!
 *
 *	  If transfunc is marked "strict" then NULL input_values are skipped,
 *	  keeping the previous transvalue.  If transfunc is not strict then it
 *	  is called for every input tuple and must deal with NULL initcond
 *	  or NULL input_values for itself.
 *
 *	  If finalfunc is marked "strict" then it is not called when the
 *	  ending transvalue is NULL; instead a NULL result is created
 *	  automatically (this is just the usual handling of strict functions,
 *	  of course).  A non-strict finalfunc can make its own choice of
 *	  what to return for a NULL ending transvalue.
 *
 *	  Ordered-set aggregates are treated specially in one other way: we
 *	  evaluate any "direct" arguments and pass them to the finalfunc along
 *	  with the transition value.
 *
 *	  A finalfunc can have additional arguments beyond the transvalue and
 *	  any "direct" arguments, corresponding to the input arguments of the
 *	  aggregate.  These are always just passed as NULL.  Such arguments may
 *	  be needed to allow resolution of a polymorphic aggregate's result type.
 *
 *	  We compute aggregate input expressions and run the transition functions
 *	  in a temporary econtext (aggstate->tmpcontext).  This is reset at least
 *	  once per input tuple, so when the transvalue datatype is
 *	  pass-by-reference, we have to be careful to copy it into a longer-lived
 *	  memory context, and free the prior value to avoid memory leakage.
 *	  We store transvalues in another set of econtexts, aggstate->aggcontexts
 *	  (one per grouping set, see below), which are also used for the hashtable
 *	  structures in AGG_HASHED mode.  These econtexts are rescanned, not just
 *	  reset, at group boundaries so that aggregate transition functions can
 *	  register shutdown callbacks via AggRegisterCallback.
 *
 *	  The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
 *	  run finalize functions and compute the output tuple; this context can
 *	  be reset once per output tuple.
 *
 *	  The executor's AggState node is passed as the fmgr "context" value in
 *	  all transfunc and finalfunc calls.  It is not recommended that the
 *	  transition functions look at the AggState node directly, but they can
 *	  use AggCheckCallContext() to verify that they are being called by
 *	  nodeAgg.c (and not as ordinary SQL functions).  The main reason a
 *	  transition function might want to know this is so that it can avoid
 *	  palloc'ing a fixed-size pass-by-ref transition value on every call:
 *	  it can instead just scribble on and return its left input.  Ordinarily
 *	  it is completely forbidden for functions to modify pass-by-ref inputs,
 *	  but in the aggregate case we know the left input is either the initial
 *	  transition value or a previous function result, and in either case its
 *	  value need not be preserved.  See int8inc() for an example.  Notice
 *	  that the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step
 *	  when the previous transition value pointer is returned.  It is also
 *	  possible to avoid repeated data copying when the transition value is
 *	  an expanded object: to do that, the transition function must take care
 *	  to return an expanded object that is in a child context of the memory
 *	  context returned by AggCheckCallContext().
 *	  Also, some transition functions
 *	  want to store working state in addition to the nominal transition
 *	  value; they can use the memory context returned by
 *	  AggCheckCallContext() to do that.
 *
 *	  Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
 *	  AggState is available as context in earlier releases (back to 8.1),
 *	  but direct examination of the node is needed to use it before 9.0.
 *
 *	  As of 9.4, aggregate transition functions can also use AggGetAggref()
 *	  to get hold of the Aggref expression node for their aggregate call.
 *	  This is mainly intended for ordered-set aggregates, which are not
 *	  supported as window functions.  (A regular aggregate function would
 *	  need some fallback logic to use this, since there's no Aggref node
 *	  for a window function.)
 *
 *	  Grouping sets:
 *
 *	  A list of grouping sets which is structurally equivalent to a ROLLUP
 *	  clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass
 *	  over ordered data.  We do this by keeping a separate set of transition
 *	  values for each grouping set being concurrently processed; for each
 *	  input tuple we update them all, and on group boundaries we reset those
 *	  states (starting at the front of the list) whose grouping values have
 *	  changed (the list of grouping sets is ordered from most specific to
 *	  least specific).
 *
 *	  Where more complex grouping sets are used, we break them down into
 *	  "phases", where each phase has a different sort order (except phase 0
 *	  which is reserved for hashing).  During each phase but the last, the
 *	  input tuples are additionally stored in a tuplesort which is keyed to
 *	  the next phase's sort order; during each phase but the first, the
 *	  input tuples are drawn from the previously sorted data.  (The sorting
 *	  of the data for the first phase is handled by the planner, as it might
 *	  be satisfied by underlying nodes.)
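 *
 *	  As a hypothetical illustration (the exact split is the planner's
 *	  choice, not fixed by this module): GROUPING SETS ((a,b), (a), (c))
 *	  could become one sorted phase ordered by (a,b) that processes the
 *	  rollup (a,b),(a), plus a second phase ordered by (c); while the first
 *	  phase runs, its input tuples are also spooled into a tuplesort keyed
 *	  by (c) to feed the second phase.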
 *
 *	  Hashing can be mixed with sorted grouping.  To do this, we have an
 *	  AGG_MIXED strategy that populates the hashtables during the first
 *	  sorted phase, and switches to reading them out after completing all
 *	  sort phases.  We can also support AGG_HASHED with multiple hash tables
 *	  and no sorting at all.
 *
 *	  From the perspective of aggregate transition and final functions, the
 *	  only issue regarding grouping sets is this: a single call site (flinfo)
 *	  of an aggregate function may be used for updating several different
 *	  transition values in turn.  So the function must not cache in the
 *	  flinfo anything which logically belongs as part of the transition
 *	  value (most importantly, the memory context in which the transition
 *	  value exists).  The support API functions (AggCheckCallContext,
 *	  AggRegisterCallback) are sensitive to the grouping set for which the
 *	  aggregate function is currently being called.
 *
 *	  Plan structure:
 *
 *	  What we get from the planner is actually one "real" Agg node which is
 *	  part of the plan tree proper, but which optionally has an additional
 *	  list of Agg nodes hung off the side via the "chain" field.  This is
 *	  because an Agg node happens to be a convenient representation of all
 *	  the data we need for grouping sets.
 *
 *	  For many purposes, we treat the "real" node as if it were just the
 *	  first node in the chain.  The chain must be ordered such that hashed
 *	  entries come before sorted/plain entries; the real node is marked
 *	  AGG_MIXED if there are both types present (in which case the real node
 *	  describes one of the hashed groupings, other AGG_HASHED nodes may
 *	  optionally follow in the chain, followed in turn by AGG_SORTED or
 *	  (one) AGG_PLAIN node).  If the real node is marked AGG_HASHED or
 *	  AGG_SORTED, then all the chained nodes must be of the same type; if it
 *	  is AGG_PLAIN, there can be no chained nodes.
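 *
 *	  As a hypothetical example of such a chain: for GROUPING SETS ((a),
 *	  (b), ()) the planner could emit a real node marked AGG_MIXED that
 *	  describes the hashed grouping (a), with one chained AGG_HASHED node
 *	  for (b) and a final chained AGG_PLAIN node for the empty grouping set.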
 *
 *	  We collect all hashed nodes into a single "phase", numbered 0, and
 *	  create a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN
 *	  node.  Phase 0 is allocated even if there are no hashes, but remains
 *	  unused in that case.
 *
 *	  AGG_HASHED nodes actually refer to only a single grouping set each,
 *	  because for each hashed grouping we need a separate grpColIdx and
 *	  numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
 *	  grouping sets that share a sort order.  Each AGG_SORTED node other
 *	  than the first one has an associated Sort node which describes the
 *	  sort order to be used; the first sorted node takes its input from the
 *	  outer subtree, which the planner has already arranged to provide
 *	  ordered data.
 *
 *	  Memory and ExprContext usage:
 *
 *	  Because we're accumulating aggregate values across input rows, we need
 *	  to use more memory contexts than just simple input/output tuple
 *	  contexts.  In fact, for a rollup, we need a separate context for each
 *	  grouping set so that we can reset the inner (finer-grained) aggregates
 *	  on their group boundaries while continuing to accumulate values for
 *	  outer (coarser-grained) groupings.  On top of this, we might be
 *	  simultaneously populating hashtables; however, we only need one
 *	  context for all the hashtables.
 *
 *	  So we create an array, aggcontexts, with an ExprContext for each
 *	  grouping set in the largest rollup that we're going to process, and
 *	  use the per-tuple memory context of those ExprContexts to store the
 *	  aggregate transition values.  hashcontext is the single context
 *	  created to support all hash tables.
 *
 *	  Transition / Combine function invocation:
 *
 *	  For performance reasons transition functions, including combine
 *	  functions, aren't invoked one-by-one from nodeAgg.c after computing
 *	  arguments using the expression evaluation engine.
 *	  Instead,
 *	  ExecBuildAggTrans() builds one large expression that does both
 *	  argument evaluation and transition function invocation.  That avoids
 *	  performance issues due to repeated uses of expression evaluation,
 *	  complications due to filter expressions having to be evaluated early,
 *	  and allows JITing the entire expression into one native function.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeAgg.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/execExpr.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"


static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
								  AggStatePerGroup *pergroups,
								  int numReset);
static void advance_transition_function(AggState *aggstate,
										AggStatePerTrans pertrans,
										AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate);
static void
process_ordered_aggregate_single(AggState *aggstate,
								 AggStatePerTrans pertrans,
								 AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
											AggStatePerTrans pertrans,
											AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
							   AggStatePerAgg peragg,
							   AggStatePerGroup pergroupstate,
							   Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
									  AggStatePerAgg peragg,
									  AggStatePerGroup pergroupstate,
									  Datum *resultVal, bool *resultIsNull);
static void prepare_projection_slot(AggState *aggstate,
									TupleTableSlot *slot,
									int currentSet);
static void finalize_aggregates(AggState *aggstate,
								AggStatePerAgg peragg,
								AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static void build_hash_table(AggState *aggstate);
static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
									  AggState *aggstate, EState *estate,
									  Aggref *aggref, Oid aggtransfn,
									  Oid aggtranstype,
									  Oid aggserialfn, Oid aggdeserialfn,
									  Datum initValue, bool initValueIsNull,
									  Oid *inputTypes, int numArguments);
static int	find_compatible_peragg(Aggref *newagg, AggState *aggstate,
								   int lastaggno,
								   List **same_input_transnos);
static int	find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
									 bool shareable,
									 Oid aggtransfn, Oid
										 aggtranstype,
									 Oid aggserialfn, Oid aggdeserialfn,
									 Datum initValue, bool initValueIsNull,
									 List *transnos);


/*
 * Select the current grouping set; affects current_set and
 * curaggcontext.
 */
static void
select_current_set(AggState *aggstate, int setno, bool is_hash)
{
	/* when changing this, also adapt ExecInterpExpr() and friends */
	if (is_hash)
		aggstate->curaggcontext = aggstate->hashcontext;
	else
		aggstate->curaggcontext = aggstate->aggcontexts[setno];

	aggstate->current_set = setno;
}

/*
 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
 * current_phase + 1.  Juggle the tuplesorts accordingly.
 *
 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
 * case, so when entering phase 0, all we need to do is drop open sorts.
 */
static void
initialize_phase(AggState *aggstate, int newphase)
{
	Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);

	/*
	 * Whatever the previous state, we're now done with whatever input
	 * tuplesort was in use.
	 */
	if (aggstate->sort_in)
	{
		tuplesort_end(aggstate->sort_in);
		aggstate->sort_in = NULL;
	}

	if (newphase <= 1)
	{
		/*
		 * Discard any existing output tuplesort.
		 */
		if (aggstate->sort_out)
		{
			tuplesort_end(aggstate->sort_out);
			aggstate->sort_out = NULL;
		}
	}
	else
	{
		/*
		 * The old output tuplesort becomes the new input one, and this is
		 * the right time to actually sort it.
		 */
		aggstate->sort_in = aggstate->sort_out;
		aggstate->sort_out = NULL;
		Assert(aggstate->sort_in);
		tuplesort_performsort(aggstate->sort_in);
	}

	/*
	 * If this isn't the last phase, we need to sort appropriately for the
	 * next phase in sequence.
	 */
	if (newphase > 0 && newphase < aggstate->numphases - 1)
	{
		Sort	   *sortnode = aggstate->phases[newphase + 1].sortnode;
		PlanState  *outerNode = outerPlanState(aggstate);
		TupleDesc	tupDesc = ExecGetResultType(outerNode);

		aggstate->sort_out = tuplesort_begin_heap(tupDesc,
												  sortnode->numCols,
												  sortnode->sortColIdx,
												  sortnode->sortOperators,
												  sortnode->collations,
												  sortnode->nullsFirst,
												  work_mem,
												  NULL, false);
	}

	aggstate->current_phase = newphase;
	aggstate->phase = &aggstate->phases[newphase];
}

/*
 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
 * populated by the previous phase.  Copy it to the sorter for the next phase
 * if any.
 *
 * Callers cannot rely on memory for tuple in returned slot remaining valid
 * past any subsequently fetched tuple.
 */
static TupleTableSlot *
fetch_input_tuple(AggState *aggstate)
{
	TupleTableSlot *slot;

	if (aggstate->sort_in)
	{
		/* make sure we check for interrupts in either path through here */
		CHECK_FOR_INTERRUPTS();
		if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
									aggstate->sort_slot, NULL))
			return NULL;
		slot = aggstate->sort_slot;
	}
	else
		slot = ExecProcNode(outerPlanState(aggstate));

	if (!TupIsNull(slot) && aggstate->sort_out)
		tuplesort_puttupleslot(aggstate->sort_out, slot);

	return slot;
}

/*
 * (Re)Initialize an individual aggregate.
 *
 * This function handles only one grouping set, already set in
 * aggstate->current_set.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
					 AggStatePerGroup pergroupstate)
{
	/*
	 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
	 */
	if (pertrans->numSortCols > 0)
	{
		/*
		 * In case of rescan, maybe there could be an uncompleted sort
		 * operation?  Clean it up if so.
		 */
		if (pertrans->sortstates[aggstate->current_set])
			tuplesort_end(pertrans->sortstates[aggstate->current_set]);

		/*
		 * We use a plain Datum sorter when there's a single input column;
		 * otherwise sort the full tuple.  (See comments for
		 * process_ordered_aggregate_single.)
		 */
		if (pertrans->numInputs == 1)
		{
			Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);

			pertrans->sortstates[aggstate->current_set] =
				tuplesort_begin_datum(attr->atttypid,
									  pertrans->sortOperators[0],
									  pertrans->sortCollations[0],
									  pertrans->sortNullsFirst[0],
									  work_mem, NULL, false);
		}
		else
			pertrans->sortstates[aggstate->current_set] =
				tuplesort_begin_heap(pertrans->sortdesc,
									 pertrans->numSortCols,
									 pertrans->sortColIdx,
									 pertrans->sortOperators,
									 pertrans->sortCollations,
									 pertrans->sortNullsFirst,
									 work_mem, NULL, false);
	}

	/*
	 * (Re)set transValue to the initial value.
	 *
	 * Note that when the initial value is pass-by-ref, we must copy it (into
	 * the aggcontext) since we will pfree the transValue later.
	 */
	if (pertrans->initValueIsNull)
		pergroupstate->transValue = pertrans->initValue;
	else
	{
		MemoryContext oldContext;

		oldContext = MemoryContextSwitchTo(
			aggstate->curaggcontext->ecxt_per_tuple_memory);
		pergroupstate->transValue = datumCopy(pertrans->initValue,
											  pertrans->transtypeByVal,
											  pertrans->transtypeLen);
		MemoryContextSwitchTo(oldContext);
	}
	pergroupstate->transValueIsNull = pertrans->initValueIsNull;

	/*
	 * If the initial value for the transition state doesn't exist in the
	 * pg_aggregate table then we will let the first non-NULL value returned
	 * from the outer procNode become the initial value.
	 * (This is useful for
	 * aggregates like max() and min().)  The noTransValue flag signals that
	 * we still need to do this.
	 */
	pergroupstate->noTransValue = pertrans->initValueIsNull;
}

/*
 * Initialize all aggregate transition states for a new group of input
 * values.
 *
 * If there are multiple grouping sets, we initialize only the first numReset
 * of them (the grouping sets are ordered so that the most specific one,
 * which is reset most often, is first).  As a convenience, if numReset is 0,
 * we reinitialize all sets.
 *
 * NB: This cannot be used for hash aggregates, as for those the grouping set
 * number has to be specified from further up.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregates(AggState *aggstate,
					  AggStatePerGroup *pergroups,
					  int numReset)
{
	int			transno;
	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
	int			setno = 0;
	int			numTrans = aggstate->numtrans;
	AggStatePerTrans transstates = aggstate->pertrans;

	if (numReset == 0)
		numReset = numGroupingSets;

	for (setno = 0; setno < numReset; setno++)
	{
		AggStatePerGroup pergroup = pergroups[setno];

		select_current_set(aggstate, setno, false);

		for (transno = 0; transno < numTrans; transno++)
		{
			AggStatePerTrans pertrans = &transstates[transno];
			AggStatePerGroup pergroupstate = &pergroup[transno];

			initialize_aggregate(aggstate, pertrans, pergroupstate);
		}
	}
}

/*
 * Given new input value(s), advance the transition function of one aggregate
 * state within one grouping set only (already set in aggstate->current_set)
 *
 * The new values (and null flags) have been preloaded into argument positions
 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
 * pass to the transition function.
 * We also expect that the static fields of
 * the fcinfo are already initialized; that was done by ExecInitAgg().
 *
 * It doesn't matter which memory context this is called in.
 */
static void
advance_transition_function(AggState *aggstate,
							AggStatePerTrans pertrans,
							AggStatePerGroup pergroupstate)
{
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
	MemoryContext oldContext;
	Datum		newVal;

	if (pertrans->transfn.fn_strict)
	{
		/*
		 * For a strict transfn, nothing happens when there's a NULL input;
		 * we just keep the prior transValue.
		 */
		int			numTransInputs = pertrans->numTransInputs;
		int			i;

		for (i = 1; i <= numTransInputs; i++)
		{
			if (fcinfo->args[i].isnull)
				return;
		}
		if (pergroupstate->noTransValue)
		{
			/*
			 * transValue has not been initialized.  This is the first
			 * non-NULL input value.  We use it as the initial value for
			 * transValue.  (We already checked that the agg's input type is
			 * binary-compatible with its transtype, so straight copy here
			 * is OK.)
			 *
			 * We must copy the datum into aggcontext if it is pass-by-ref.
			 * We do not need to pfree the old transValue, since it's NULL.
			 */
			oldContext = MemoryContextSwitchTo(
				aggstate->curaggcontext->ecxt_per_tuple_memory);
			pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
												  pertrans->transtypeByVal,
												  pertrans->transtypeLen);
			pergroupstate->transValueIsNull = false;
			pergroupstate->noTransValue = false;
			MemoryContextSwitchTo(oldContext);
			return;
		}
		if (pergroupstate->transValueIsNull)
		{
			/*
			 * Don't call a strict function with NULL inputs.  Note it is
			 * possible to get here despite the above tests, if the transfn
			 * is strict *and* returned a NULL on a prior cycle.  If that
			 * happens we will propagate the NULL all the way to the end.
			 */
			return;
		}
	}

	/* We run the transition functions in per-input-tuple memory context */
	oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);

	/* set up aggstate->curpertrans for AggGetAggref() */
	aggstate->curpertrans = pertrans;

	/*
	 * OK to call the transition function
	 */
	fcinfo->args[0].value = pergroupstate->transValue;
	fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
	fcinfo->isnull = false;		/* just in case transfn doesn't set it */

	newVal = FunctionCallInvoke(fcinfo);

	aggstate->curpertrans = NULL;

	/*
	 * If pass-by-ref datatype, must copy the new value into aggcontext and
	 * free the prior transValue.  But if transfn returned a pointer to its
	 * first input, we don't need to do anything.  Also, if transfn returned
	 * a pointer to a R/W expanded object that is already a child of the
	 * aggcontext, assume we can adopt that value without copying it.
	 *
	 * It's safe to compare newVal with pergroup->transValue without regard
	 * for either being NULL, because ExecAggTransReparent() takes care to
	 * set transValue to 0 when NULL.  Otherwise we could end up accidentally
	 * not reparenting, when the transValue has the same numerical value as
	 * newValue, despite being NULL.  This is a somewhat hot path, making it
	 * undesirable to instead solve this with another branch for the common
	 * case of the transition function returning its (modified) input
	 * argument.
	 */
	if (!pertrans->transtypeByVal &&
		DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
		newVal = ExecAggTransReparent(aggstate, pertrans,
									  newVal, fcinfo->isnull,
									  pergroupstate->transValue,
									  pergroupstate->transValueIsNull);

	pergroupstate->transValue = newVal;
	pergroupstate->transValueIsNull = fcinfo->isnull;

	MemoryContextSwitchTo(oldContext);
}

/*
 * Advance each aggregate transition state for one input tuple.  The input
 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
 * accessible to ExecEvalExpr.
 *
 * We have two sets of transition states to handle: one for sorted
 * aggregation and one for hashed; we do them both here, to avoid multiple
 * evaluation of the inputs.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
advance_aggregates(AggState *aggstate)
{
	bool		dummynull;

	ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
							  aggstate->tmpcontext,
							  &dummynull);
}

/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with only one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * Note that the strictness of the transition function was checked when
 * entering the values into the sort, so we don't check it again here;
 * we just apply standard SQL DISTINCT logic.
 *
 * The one-input case is handled separately from the multi-input case
 * for performance reasons: for single by-value inputs, such as the
 * common case of count(distinct id), the tuplesort_getdatum code path
 * is around 300% faster.  (The speedup for by-reference types is less
 * but still noticeable.)
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_single(AggState *aggstate,
								 AggStatePerTrans pertrans,
								 AggStatePerGroup pergroupstate)
{
	Datum		oldVal = (Datum) 0;
	bool		oldIsNull = true;
	bool		haveOldVal = false;
	MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
	MemoryContext oldContext;
	bool		isDistinct = (pertrans->numDistinctCols > 0);
	Datum		newAbbrevVal = (Datum) 0;
	Datum		oldAbbrevVal = (Datum) 0;
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
	Datum	   *newVal;
	bool	   *isNull;

	Assert(pertrans->numDistinctCols < 2);

	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

	/* Load the column into argument 1 (arg 0 will be transition value) */
	newVal = &fcinfo->args[1].value;
	isNull = &fcinfo->args[1].isnull;

	/*
	 * Note: if input type is pass-by-ref, the datums returned by the sort
	 * are freshly palloc'd in the per-query context, so we must be careful
	 * to pfree them when they are no longer needed.
	 */
	while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
							  true, newVal, isNull, &newAbbrevVal))
	{
		/*
		 * Clear and select the working context for evaluation of the
		 * equality function and transition function.
		 */
		MemoryContextReset(workcontext);
		oldContext = MemoryContextSwitchTo(workcontext);

		/*
		 * If DISTINCT mode, and not distinct from prior, skip it.
		 */
		if (isDistinct &&
			haveOldVal &&
			((oldIsNull && *isNull) ||
			 (!oldIsNull && !*isNull &&
			  oldAbbrevVal == newAbbrevVal &&
			  DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
											 pertrans->aggCollation,
											 oldVal, *newVal)))))
		{
			/* equal to prior, so forget this one */
			if (!pertrans->inputtypeByVal && !*isNull)
				pfree(DatumGetPointer(*newVal));
		}
		else
		{
			advance_transition_function(aggstate, pertrans, pergroupstate);
			/* forget the old value, if any */
			if (!oldIsNull && !pertrans->inputtypeByVal)
				pfree(DatumGetPointer(oldVal));
			/* and remember the new one for subsequent equality checks */
			oldVal = *newVal;
			oldAbbrevVal = newAbbrevVal;
			oldIsNull = *isNull;
			haveOldVal = true;
		}

		MemoryContextSwitchTo(oldContext);
	}

	if (!oldIsNull && !pertrans->inputtypeByVal)
		pfree(DatumGetPointer(oldVal));

	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
	pertrans->sortstates[aggstate->current_set] = NULL;
}

/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with more than one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
787 */ 788 static void 789 process_ordered_aggregate_multi(AggState *aggstate, 790 AggStatePerTrans pertrans, 791 AggStatePerGroup pergroupstate) 792 { 793 ExprContext *tmpcontext = aggstate->tmpcontext; 794 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo; 795 TupleTableSlot *slot1 = pertrans->sortslot; 796 TupleTableSlot *slot2 = pertrans->uniqslot; 797 int numTransInputs = pertrans->numTransInputs; 798 int numDistinctCols = pertrans->numDistinctCols; 799 Datum newAbbrevVal = (Datum) 0; 800 Datum oldAbbrevVal = (Datum) 0; 801 bool haveOldValue = false; 802 TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple; 803 int i; 804 805 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]); 806 807 ExecClearTuple(slot1); 808 if (slot2) 809 ExecClearTuple(slot2); 810 811 while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set], 812 true, true, slot1, &newAbbrevVal)) 813 { 814 CHECK_FOR_INTERRUPTS(); 815 816 tmpcontext->ecxt_outertuple = slot1; 817 tmpcontext->ecxt_innertuple = slot2; 818 819 if (numDistinctCols == 0 || 820 !haveOldValue || 821 newAbbrevVal != oldAbbrevVal || 822 !ExecQual(pertrans->equalfnMulti, tmpcontext)) 823 { 824 /* 825 * Extract the first numTransInputs columns as datums to pass to 826 * the transfn. 
827 */ 828 slot_getsomeattrs(slot1, numTransInputs); 829 830 /* Load values into fcinfo */ 831 /* Start from 1, since the 0th arg will be the transition value */ 832 for (i = 0; i < numTransInputs; i++) 833 { 834 fcinfo->args[i + 1].value = slot1->tts_values[i]; 835 fcinfo->args[i + 1].isnull = slot1->tts_isnull[i]; 836 } 837 838 advance_transition_function(aggstate, pertrans, pergroupstate); 839 840 if (numDistinctCols > 0) 841 { 842 /* swap the slot pointers to retain the current tuple */ 843 TupleTableSlot *tmpslot = slot2; 844 845 slot2 = slot1; 846 slot1 = tmpslot; 847 /* avoid ExecQual() calls by reusing abbreviated keys */ 848 oldAbbrevVal = newAbbrevVal; 849 haveOldValue = true; 850 } 851 } 852 853 /* Reset context each time */ 854 ResetExprContext(tmpcontext); 855 856 ExecClearTuple(slot1); 857 } 858 859 if (slot2) 860 ExecClearTuple(slot2); 861 862 tuplesort_end(pertrans->sortstates[aggstate->current_set]); 863 pertrans->sortstates[aggstate->current_set] = NULL; 864 865 /* restore previous slot, potentially in use for grouping sets */ 866 tmpcontext->ecxt_outertuple = save; 867 } 868 869 /* 870 * Compute the final value of one aggregate. 871 * 872 * This function handles only one grouping set (already set in 873 * aggstate->current_set). 874 * 875 * The finalfunction will be run, and the result delivered, in the 876 * output-tuple context; caller's CurrentMemoryContext does not matter. 877 * 878 * The finalfn uses the state as set in the transno. This also might be 879 * being used by another aggregate function, so it's important that we do 880 * nothing destructive here. 
881 */ 882 static void 883 finalize_aggregate(AggState *aggstate, 884 AggStatePerAgg peragg, 885 AggStatePerGroup pergroupstate, 886 Datum *resultVal, bool *resultIsNull) 887 { 888 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); 889 bool anynull = false; 890 MemoryContext oldContext; 891 int i; 892 ListCell *lc; 893 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno]; 894 895 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); 896 897 /* 898 * Evaluate any direct arguments. We do this even if there's no finalfn 899 * (which is unlikely anyway), so that side-effects happen as expected. 900 * The direct arguments go into arg positions 1 and up, leaving position 0 901 * for the transition state value. 902 */ 903 i = 1; 904 foreach(lc, peragg->aggdirectargs) 905 { 906 ExprState *expr = (ExprState *) lfirst(lc); 907 908 fcinfo->args[i].value = ExecEvalExpr(expr, 909 aggstate->ss.ps.ps_ExprContext, 910 &fcinfo->args[i].isnull); 911 anynull |= fcinfo->args[i].isnull; 912 i++; 913 } 914 915 /* 916 * Apply the agg's finalfn if one is provided, else return transValue. 
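 *
 * In the notation of the file header comment, this computes
 *
 *		result = finalfunc(transvalue, direct_argument(s))
 *
 * or just "result = transvalue" when no finalfn is provided.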
917 */ 918 if (OidIsValid(peragg->finalfn_oid)) 919 { 920 int numFinalArgs = peragg->numFinalArgs; 921 922 /* set up aggstate->curperagg for AggGetAggref() */ 923 aggstate->curperagg = peragg; 924 925 InitFunctionCallInfoData(*fcinfo, &peragg->finalfn, 926 numFinalArgs, 927 pertrans->aggCollation, 928 (void *) aggstate, NULL); 929 930 /* Fill in the transition state value */ 931 fcinfo->args[0].value = 932 MakeExpandedObjectReadOnly(pergroupstate->transValue, 933 pergroupstate->transValueIsNull, 934 pertrans->transtypeLen); 935 fcinfo->args[0].isnull = pergroupstate->transValueIsNull; 936 anynull |= pergroupstate->transValueIsNull; 937 938 /* Fill any remaining argument positions with nulls */ 939 for (; i < numFinalArgs; i++) 940 { 941 fcinfo->args[i].value = (Datum) 0; 942 fcinfo->args[i].isnull = true; 943 anynull = true; 944 } 945 946 if (fcinfo->flinfo->fn_strict && anynull) 947 { 948 /* don't call a strict function with NULL inputs */ 949 *resultVal = (Datum) 0; 950 *resultIsNull = true; 951 } 952 else 953 { 954 *resultVal = FunctionCallInvoke(fcinfo); 955 *resultIsNull = fcinfo->isnull; 956 } 957 aggstate->curperagg = NULL; 958 } 959 else 960 { 961 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */ 962 *resultVal = pergroupstate->transValue; 963 *resultIsNull = pergroupstate->transValueIsNull; 964 } 965 966 /* 967 * If result is pass-by-ref, make sure it is in the right context. 968 */ 969 if (!peragg->resulttypeByVal && !*resultIsNull && 970 !MemoryContextContains(CurrentMemoryContext, 971 DatumGetPointer(*resultVal))) 972 *resultVal = datumCopy(*resultVal, 973 peragg->resulttypeByVal, 974 peragg->resulttypeLen); 975 976 MemoryContextSwitchTo(oldContext); 977 } 978 979 /* 980 * Compute the output value of one partial aggregate. 981 * 982 * The serialization function will be run, and the result delivered, in the 983 * output-tuple context; caller's CurrentMemoryContext does not matter. 
984 */ 985 static void 986 finalize_partialaggregate(AggState *aggstate, 987 AggStatePerAgg peragg, 988 AggStatePerGroup pergroupstate, 989 Datum *resultVal, bool *resultIsNull) 990 { 991 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno]; 992 MemoryContext oldContext; 993 994 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); 995 996 /* 997 * serialfn_oid will be set if we must serialize the transvalue before 998 * returning it 999 */ 1000 if (OidIsValid(pertrans->serialfn_oid)) 1001 { 1002 /* Don't call a strict serialization function with NULL input. */ 1003 if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull) 1004 { 1005 *resultVal = (Datum) 0; 1006 *resultIsNull = true; 1007 } 1008 else 1009 { 1010 FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo; 1011 1012 fcinfo->args[0].value = 1013 MakeExpandedObjectReadOnly(pergroupstate->transValue, 1014 pergroupstate->transValueIsNull, 1015 pertrans->transtypeLen); 1016 fcinfo->args[0].isnull = pergroupstate->transValueIsNull; 1017 fcinfo->isnull = false; 1018 1019 *resultVal = FunctionCallInvoke(fcinfo); 1020 *resultIsNull = fcinfo->isnull; 1021 } 1022 } 1023 else 1024 { 1025 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */ 1026 *resultVal = pergroupstate->transValue; 1027 *resultIsNull = pergroupstate->transValueIsNull; 1028 } 1029 1030 /* If result is pass-by-ref, make sure it is in the right context. */ 1031 if (!peragg->resulttypeByVal && !*resultIsNull && 1032 !MemoryContextContains(CurrentMemoryContext, 1033 DatumGetPointer(*resultVal))) 1034 *resultVal = datumCopy(*resultVal, 1035 peragg->resulttypeByVal, 1036 peragg->resulttypeLen); 1037 1038 MemoryContextSwitchTo(oldContext); 1039 } 1040 1041 /* 1042 * Prepare to finalize and project based on the specified representative tuple 1043 * slot and grouping set. 
1044 * 1045 * In the specified tuple slot, force to null all attributes that should be 1046 * read as null in the context of the current grouping set. Also stash the 1047 * current group bitmap where GroupingExpr can get at it. 1048 * 1049 * This relies on three conditions: 1050 * 1051 * 1) Nothing is ever going to try and extract the whole tuple from this slot, 1052 * only reference it in evaluations, which will only access individual 1053 * attributes. 1054 * 1055 * 2) No system columns are going to need to be nulled. (If a system column is 1056 * referenced in a group clause, it is actually projected in the outer plan 1057 * tlist.) 1058 * 1059 * 3) Within a given phase, we never need to recover the value of an attribute 1060 * once it has been set to null. 1061 * 1062 * Poking into the slot this way is a bit ugly, but the consensus is that the 1063 * alternative was worse. 1064 */ 1065 static void 1066 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet) 1067 { 1068 if (aggstate->phase->grouped_cols) 1069 { 1070 Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet]; 1071 1072 aggstate->grouped_cols = grouped_cols; 1073 1074 if (TTS_EMPTY(slot)) 1075 { 1076 /* 1077 * Force all values to be NULL if working on an empty input tuple 1078 * (i.e. an empty grouping set for which no input rows were 1079 * supplied). 1080 */ 1081 ExecStoreAllNullTuple(slot); 1082 } 1083 else if (aggstate->all_grouped_cols) 1084 { 1085 ListCell *lc; 1086 1087 /* all_grouped_cols is arranged in desc order */ 1088 slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols)); 1089 1090 foreach(lc, aggstate->all_grouped_cols) 1091 { 1092 int attnum = lfirst_int(lc); 1093 1094 if (!bms_is_member(attnum, grouped_cols)) 1095 slot->tts_isnull[attnum - 1] = true; 1096 } 1097 } 1098 } 1099 } 1100 1101 /* 1102 * Compute the final value of all aggregates for one group. 
1103 * 1104 * This function handles only one grouping set at a time, which the caller must 1105 * have selected. It's also the caller's responsibility to adjust the supplied 1106 * pergroup parameter to point to the current set's transvalues. 1107 * 1108 * Results are stored in the output econtext aggvalues/aggnulls. 1109 */ 1110 static void 1111 finalize_aggregates(AggState *aggstate, 1112 AggStatePerAgg peraggs, 1113 AggStatePerGroup pergroup) 1114 { 1115 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; 1116 Datum *aggvalues = econtext->ecxt_aggvalues; 1117 bool *aggnulls = econtext->ecxt_aggnulls; 1118 int aggno; 1119 int transno; 1120 1121 /* 1122 * If there were any DISTINCT and/or ORDER BY aggregates, sort their 1123 * inputs and run the transition functions. 1124 */ 1125 for (transno = 0; transno < aggstate->numtrans; transno++) 1126 { 1127 AggStatePerTrans pertrans = &aggstate->pertrans[transno]; 1128 AggStatePerGroup pergroupstate; 1129 1130 pergroupstate = &pergroup[transno]; 1131 1132 if (pertrans->numSortCols > 0) 1133 { 1134 Assert(aggstate->aggstrategy != AGG_HASHED && 1135 aggstate->aggstrategy != AGG_MIXED); 1136 1137 if (pertrans->numInputs == 1) 1138 process_ordered_aggregate_single(aggstate, 1139 pertrans, 1140 pergroupstate); 1141 else 1142 process_ordered_aggregate_multi(aggstate, 1143 pertrans, 1144 pergroupstate); 1145 } 1146 } 1147 1148 /* 1149 * Run the final functions. 
1150 */ 1151 for (aggno = 0; aggno < aggstate->numaggs; aggno++) 1152 { 1153 AggStatePerAgg peragg = &peraggs[aggno]; 1154 int transno = peragg->transno; 1155 AggStatePerGroup pergroupstate; 1156 1157 pergroupstate = &pergroup[transno]; 1158 1159 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) 1160 finalize_partialaggregate(aggstate, peragg, pergroupstate, 1161 &aggvalues[aggno], &aggnulls[aggno]); 1162 else 1163 finalize_aggregate(aggstate, peragg, pergroupstate, 1164 &aggvalues[aggno], &aggnulls[aggno]); 1165 } 1166 } 1167 1168 /* 1169 * Project the result of a group (whose aggs have already been calculated by 1170 * finalize_aggregates). Returns the result slot, or NULL if no row is 1171 * projected (suppressed by qual). 1172 */ 1173 static TupleTableSlot * 1174 project_aggregates(AggState *aggstate) 1175 { 1176 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; 1177 1178 /* 1179 * Check the qual (HAVING clause); if the group does not match, ignore it. 1180 */ 1181 if (ExecQual(aggstate->ss.ps.qual, econtext)) 1182 { 1183 /* 1184 * Form and return projection tuple using the aggregate results and 1185 * the representative input tuple. 
1186 */ 1187 return ExecProject(aggstate->ss.ps.ps_ProjInfo); 1188 } 1189 else 1190 InstrCountFiltered1(aggstate, 1); 1191 1192 return NULL; 1193 } 1194 1195 /* 1196 * find_unaggregated_cols 1197 * Construct a bitmapset of the column numbers of un-aggregated Vars 1198 * appearing in our targetlist and qual (HAVING clause) 1199 */ 1200 static Bitmapset * 1201 find_unaggregated_cols(AggState *aggstate) 1202 { 1203 Agg *node = (Agg *) aggstate->ss.ps.plan; 1204 Bitmapset *colnos; 1205 1206 colnos = NULL; 1207 (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist, 1208 &colnos); 1209 (void) find_unaggregated_cols_walker((Node *) node->plan.qual, 1210 &colnos); 1211 return colnos; 1212 } 1213 1214 static bool 1215 find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) 1216 { 1217 if (node == NULL) 1218 return false; 1219 if (IsA(node, Var)) 1220 { 1221 Var *var = (Var *) node; 1222 1223 /* setrefs.c should have set the varno to OUTER_VAR */ 1224 Assert(var->varno == OUTER_VAR); 1225 Assert(var->varlevelsup == 0); 1226 *colnos = bms_add_member(*colnos, var->varattno); 1227 return false; 1228 } 1229 if (IsA(node, Aggref) || IsA(node, GroupingFunc)) 1230 { 1231 /* do not descend into aggregate exprs */ 1232 return false; 1233 } 1234 return expression_tree_walker(node, find_unaggregated_cols_walker, 1235 (void *) colnos); 1236 } 1237 1238 /* 1239 * (Re-)initialize the hash table(s) to empty. 1240 * 1241 * To implement hashed aggregation, we need a hashtable that stores a 1242 * representative tuple and an array of AggStatePerGroup structs for each 1243 * distinct set of GROUP BY column values. We compute the hash key from the 1244 * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(), 1245 * for each entry. 1246 * 1247 * We have a separate hashtable and associated perhash data structure for each 1248 * grouping set for which we're doing hashing. 
1249 * 1250 * The contents of the hash tables always live in the hashcontext's per-tuple 1251 * memory context (there is only one of these for all tables together, since 1252 * they are all reset at the same time). 1253 */ 1254 static void 1255 build_hash_table(AggState *aggstate) 1256 { 1257 MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; 1258 Size additionalsize; 1259 int i; 1260 1261 Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED); 1262 1263 additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData); 1264 1265 for (i = 0; i < aggstate->num_hashes; ++i) 1266 { 1267 AggStatePerHash perhash = &aggstate->perhash[i]; 1268 1269 Assert(perhash->aggnode->numGroups > 0); 1270 1271 if (perhash->hashtable) 1272 ResetTupleHashTable(perhash->hashtable); 1273 else 1274 perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps, 1275 perhash->hashslot->tts_tupleDescriptor, 1276 perhash->numCols, 1277 perhash->hashGrpColIdxHash, 1278 perhash->eqfuncoids, 1279 perhash->hashfunctions, 1280 perhash->aggnode->grpCollations, 1281 perhash->aggnode->numGroups, 1282 additionalsize, 1283 aggstate->ss.ps.state->es_query_cxt, 1284 aggstate->hashcontext->ecxt_per_tuple_memory, 1285 tmpmem, 1286 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); 1287 } 1288 } 1289 1290 /* 1291 * Compute columns that actually need to be stored in hashtable entries. The 1292 * incoming tuples from the child plan node will contain grouping columns, 1293 * other columns referenced in our targetlist and qual, columns used to 1294 * compute the aggregate functions, and perhaps just junk columns we don't use 1295 * at all. Only columns of the first two types need to be stored in the 1296 * hashtable, and getting rid of the others can make the table entries 1297 * significantly smaller. 
The hashtable only contains the relevant columns, 1298 * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table() 1299 * into the format of the normal input descriptor. 1300 * 1301 * Additional columns, in addition to the columns grouped by, come from two 1302 * sources: Firstly functionally dependent columns that we don't need to group 1303 * by themselves, and secondly ctids for row-marks. 1304 * 1305 * To eliminate duplicates, we build a bitmapset of the needed columns, and 1306 * then build an array of the columns included in the hashtable. We might 1307 * still have duplicates if the passed-in grpColIdx has them, which can happen 1308 * in edge cases from semijoins/distinct; these can't always be removed, 1309 * because it's not certain that the duplicate cols will be using the same 1310 * hash function. 1311 * 1312 * Note that the array is preserved over ExecReScanAgg, so we allocate it in 1313 * the per-query context (unlike the hash table itself). 1314 */ 1315 static void 1316 find_hash_columns(AggState *aggstate) 1317 { 1318 Bitmapset *base_colnos; 1319 List *outerTlist = outerPlanState(aggstate)->plan->targetlist; 1320 int numHashes = aggstate->num_hashes; 1321 EState *estate = aggstate->ss.ps.state; 1322 int j; 1323 1324 /* Find Vars that will be needed in tlist and qual */ 1325 base_colnos = find_unaggregated_cols(aggstate); 1326 1327 for (j = 0; j < numHashes; ++j) 1328 { 1329 AggStatePerHash perhash = &aggstate->perhash[j]; 1330 Bitmapset *colnos = bms_copy(base_colnos); 1331 AttrNumber *grpColIdx = perhash->aggnode->grpColIdx; 1332 List *hashTlist = NIL; 1333 TupleDesc hashDesc; 1334 int maxCols; 1335 int i; 1336 1337 perhash->largestGrpColIdx = 0; 1338 1339 /* 1340 * If we're doing grouping sets, then some Vars might be referenced in 1341 * tlist/qual for the benefit of other grouping sets, but not needed 1342 * when hashing; i.e. prepare_projection_slot will null them out, so 1343 * there'd be no point storing them. 
Use prepare_projection_slot's 1344 * logic to determine which. 1345 */ 1346 if (aggstate->phases[0].grouped_cols) 1347 { 1348 Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j]; 1349 ListCell *lc; 1350 1351 foreach(lc, aggstate->all_grouped_cols) 1352 { 1353 int attnum = lfirst_int(lc); 1354 1355 if (!bms_is_member(attnum, grouped_cols)) 1356 colnos = bms_del_member(colnos, attnum); 1357 } 1358 } 1359 1360 /* 1361 * Compute maximum number of input columns accounting for possible 1362 * duplications in the grpColIdx array, which can happen in some edge 1363 * cases where HashAggregate was generated as part of a semijoin or a 1364 * DISTINCT. 1365 */ 1366 maxCols = bms_num_members(colnos) + perhash->numCols; 1367 1368 perhash->hashGrpColIdxInput = 1369 palloc(maxCols * sizeof(AttrNumber)); 1370 perhash->hashGrpColIdxHash = 1371 palloc(perhash->numCols * sizeof(AttrNumber)); 1372 1373 /* Add all the grouping columns to colnos */ 1374 for (i = 0; i < perhash->numCols; i++) 1375 colnos = bms_add_member(colnos, grpColIdx[i]); 1376 1377 /* 1378 * First build mapping for columns directly hashed. These are the 1379 * first, because they'll be accessed when computing hash values and 1380 * comparing tuples for exact matches. We also build simple mapping 1381 * for execGrouping, so it knows where to find the to-be-hashed / 1382 * compared columns in the input. 
1383 */ 1384 for (i = 0; i < perhash->numCols; i++) 1385 { 1386 perhash->hashGrpColIdxInput[i] = grpColIdx[i]; 1387 perhash->hashGrpColIdxHash[i] = i + 1; 1388 perhash->numhashGrpCols++; 1389 /* delete already mapped columns */ 1390 bms_del_member(colnos, grpColIdx[i]); 1391 } 1392 1393 /* and add the remaining columns */ 1394 while ((i = bms_first_member(colnos)) >= 0) 1395 { 1396 perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i; 1397 perhash->numhashGrpCols++; 1398 } 1399 1400 /* and build a tuple descriptor for the hashtable */ 1401 for (i = 0; i < perhash->numhashGrpCols; i++) 1402 { 1403 int varNumber = perhash->hashGrpColIdxInput[i] - 1; 1404 1405 hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber)); 1406 perhash->largestGrpColIdx = 1407 Max(varNumber + 1, perhash->largestGrpColIdx); 1408 } 1409 1410 hashDesc = ExecTypeFromTL(hashTlist); 1411 1412 execTuplesHashPrepare(perhash->numCols, 1413 perhash->aggnode->grpOperators, 1414 &perhash->eqfuncoids, 1415 &perhash->hashfunctions); 1416 perhash->hashslot = 1417 ExecAllocTableSlot(&estate->es_tupleTable, hashDesc, 1418 &TTSOpsMinimalTuple); 1419 1420 list_free(hashTlist); 1421 bms_free(colnos); 1422 } 1423 1424 bms_free(base_colnos); 1425 } 1426 1427 /* 1428 * Estimate per-hash-table-entry overhead for the planner. 1429 * 1430 * Note that the estimate does not include space for pass-by-reference 1431 * transition data values, nor for the representative tuple of each group. 1432 * Nor does this account for the target fill-factor and growth policy of the 1433 * hash table. 
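 *
 * For example, with numAggs = 2 the estimate comes out as
 *		MAXALIGN(sizeof(TupleHashEntryData) + 2 * sizeof(AggStatePerGroupData))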
1434 */ 1435 Size 1436 hash_agg_entry_size(int numAggs) 1437 { 1438 Size entrysize; 1439 1440 /* This must match build_hash_table */ 1441 entrysize = sizeof(TupleHashEntryData) + 1442 numAggs * sizeof(AggStatePerGroupData); 1443 entrysize = MAXALIGN(entrysize); 1444 1445 return entrysize; 1446 } 1447 1448 /* 1449 * Find or create a hashtable entry for the tuple group containing the current 1450 * tuple (already set in tmpcontext's outertuple slot), in the current grouping 1451 * set (which the caller must have selected - note that initialize_aggregate 1452 * depends on this). 1453 * 1454 * When called, CurrentMemoryContext should be the per-query context. 1455 */ 1456 static TupleHashEntryData * 1457 lookup_hash_entry(AggState *aggstate) 1458 { 1459 TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple; 1460 AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set]; 1461 TupleTableSlot *hashslot = perhash->hashslot; 1462 TupleHashEntryData *entry; 1463 bool isnew; 1464 int i; 1465 1466 /* transfer just the needed columns into hashslot */ 1467 slot_getsomeattrs(inputslot, perhash->largestGrpColIdx); 1468 ExecClearTuple(hashslot); 1469 1470 for (i = 0; i < perhash->numhashGrpCols; i++) 1471 { 1472 int varNumber = perhash->hashGrpColIdxInput[i] - 1; 1473 1474 hashslot->tts_values[i] = inputslot->tts_values[varNumber]; 1475 hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber]; 1476 } 1477 ExecStoreVirtualTuple(hashslot); 1478 1479 /* find or create the hashtable entry using the filtered tuple */ 1480 entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew); 1481 1482 if (isnew) 1483 { 1484 AggStatePerGroup pergroup; 1485 int transno; 1486 1487 pergroup = (AggStatePerGroup) 1488 MemoryContextAlloc(perhash->hashtable->tablecxt, 1489 sizeof(AggStatePerGroupData) * aggstate->numtrans); 1490 entry->additional = pergroup; 1491 1492 /* 1493 * Initialize aggregates for new tuple group, lookup_hash_entries() 1494 * already has selected the 
relevant grouping set. 1495 */ 1496 for (transno = 0; transno < aggstate->numtrans; transno++) 1497 { 1498 AggStatePerTrans pertrans = &aggstate->pertrans[transno]; 1499 AggStatePerGroup pergroupstate = &pergroup[transno]; 1500 1501 initialize_aggregate(aggstate, pertrans, pergroupstate); 1502 } 1503 } 1504 1505 return entry; 1506 } 1507 1508 /* 1509 * Look up hash entries for the current tuple in all hashed grouping sets, 1510 * returning an array of pergroup pointers suitable for advance_aggregates. 1511 * 1512 * Be aware that lookup_hash_entry can reset the tmpcontext. 1513 */ 1514 static void 1515 lookup_hash_entries(AggState *aggstate) 1516 { 1517 int numHashes = aggstate->num_hashes; 1518 AggStatePerGroup *pergroup = aggstate->hash_pergroup; 1519 int setno; 1520 1521 for (setno = 0; setno < numHashes; setno++) 1522 { 1523 select_current_set(aggstate, setno, true); 1524 pergroup[setno] = lookup_hash_entry(aggstate)->additional; 1525 } 1526 } 1527 1528 /* 1529 * ExecAgg - 1530 * 1531 * ExecAgg receives tuples from its outer subplan and aggregates over 1532 * the appropriate attribute for each aggregate function use (Aggref 1533 * node) appearing in the targetlist or qual of the node. The number 1534 * of tuples to aggregate over depends on whether grouped or plain 1535 * aggregation is selected. In grouped aggregation, we produce a result 1536 * row for each group; in plain aggregation there's a single result row 1537 * for the whole query. In either case, the value of each aggregate is 1538 * stored in the expression context to be used when ExecProject evaluates 1539 * the result tuple. 
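 *
 * In outline, the dispatch below is:
 *
 *		AGG_HASHED: fill the hash table on first call, then drain it
 *		AGG_MIXED: drain the (already filled) hash table(s)
 *		AGG_PLAIN, AGG_SORTED: compute groups directly from the input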
1540 */ 1541 static TupleTableSlot * 1542 ExecAgg(PlanState *pstate) 1543 { 1544 AggState *node = castNode(AggState, pstate); 1545 TupleTableSlot *result = NULL; 1546 1547 CHECK_FOR_INTERRUPTS(); 1548 1549 if (!node->agg_done) 1550 { 1551 /* Dispatch based on strategy */ 1552 switch (node->phase->aggstrategy) 1553 { 1554 case AGG_HASHED: 1555 if (!node->table_filled) 1556 agg_fill_hash_table(node); 1557 /* FALLTHROUGH */ 1558 case AGG_MIXED: 1559 result = agg_retrieve_hash_table(node); 1560 break; 1561 case AGG_PLAIN: 1562 case AGG_SORTED: 1563 result = agg_retrieve_direct(node); 1564 break; 1565 } 1566 1567 if (!TupIsNull(result)) 1568 return result; 1569 } 1570 1571 return NULL; 1572 } 1573 1574 /* 1575 * ExecAgg for non-hashed case 1576 */ 1577 static TupleTableSlot * 1578 agg_retrieve_direct(AggState *aggstate) 1579 { 1580 Agg *node = aggstate->phase->aggnode; 1581 ExprContext *econtext; 1582 ExprContext *tmpcontext; 1583 AggStatePerAgg peragg; 1584 AggStatePerGroup *pergroups; 1585 TupleTableSlot *outerslot; 1586 TupleTableSlot *firstSlot; 1587 TupleTableSlot *result; 1588 bool hasGroupingSets = aggstate->phase->numsets > 0; 1589 int numGroupingSets = Max(aggstate->phase->numsets, 1); 1590 int currentSet; 1591 int nextSetSize; 1592 int numReset; 1593 int i; 1594 1595 /* 1596 * get state info from node 1597 * 1598 * econtext is the per-output-tuple expression context 1599 * 1600 * tmpcontext is the per-input-tuple expression context 1601 */ 1602 econtext = aggstate->ss.ps.ps_ExprContext; 1603 tmpcontext = aggstate->tmpcontext; 1604 1605 peragg = aggstate->peragg; 1606 pergroups = aggstate->pergroups; 1607 firstSlot = aggstate->ss.ss_ScanTupleSlot; 1608 1609 /* 1610 * We loop retrieving groups until we find one matching 1611 * aggstate->ss.ps.qual 1612 * 1613 * For grouping sets, we have the invariant that aggstate->projected_set 1614 * is either -1 (initial call) or the index (starting from 0) in 1615 * gset_lengths for the group we just completed (either by 
projecting a 1616 * row or by discarding it in the qual). 1617 */ 1618 while (!aggstate->agg_done) 1619 { 1620 /* 1621 * Clear the per-output-tuple context for each group, as well as 1622 * aggcontext (which contains any pass-by-ref transvalues of the old 1623 * group). Some aggregate functions store working state in child 1624 * contexts; those now get reset automatically without us needing to 1625 * do anything special. 1626 * 1627 * We use ReScanExprContext not just ResetExprContext because we want 1628 * any registered shutdown callbacks to be called. That allows 1629 * aggregate functions to ensure they've cleaned up any non-memory 1630 * resources. 1631 */ 1632 ReScanExprContext(econtext); 1633 1634 /* 1635 * Determine how many grouping sets need to be reset at this boundary. 1636 */ 1637 if (aggstate->projected_set >= 0 && 1638 aggstate->projected_set < numGroupingSets) 1639 numReset = aggstate->projected_set + 1; 1640 else 1641 numReset = numGroupingSets; 1642 1643 /* 1644 * numReset can change on a phase boundary, but that's OK; we want to 1645 * reset the contexts used in _this_ phase, and later, after possibly 1646 * changing phase, initialize the right number of aggregates for the 1647 * _new_ phase. 1648 */ 1649 1650 for (i = 0; i < numReset; i++) 1651 { 1652 ReScanExprContext(aggstate->aggcontexts[i]); 1653 } 1654 1655 /* 1656 * Check if input is complete and there are no more groups to project 1657 * in this phase; move to next phase or mark as done. 
1658 */ 1659 if (aggstate->input_done == true && 1660 aggstate->projected_set >= (numGroupingSets - 1)) 1661 { 1662 if (aggstate->current_phase < aggstate->numphases - 1) 1663 { 1664 initialize_phase(aggstate, aggstate->current_phase + 1); 1665 aggstate->input_done = false; 1666 aggstate->projected_set = -1; 1667 numGroupingSets = Max(aggstate->phase->numsets, 1); 1668 node = aggstate->phase->aggnode; 1669 numReset = numGroupingSets; 1670 } 1671 else if (aggstate->aggstrategy == AGG_MIXED) 1672 { 1673 /* 1674 * Mixed mode; we've output all the grouped stuff and have 1675 * full hashtables, so switch to outputting those. 1676 */ 1677 initialize_phase(aggstate, 0); 1678 aggstate->table_filled = true; 1679 ResetTupleHashIterator(aggstate->perhash[0].hashtable, 1680 &aggstate->perhash[0].hashiter); 1681 select_current_set(aggstate, 0, true); 1682 return agg_retrieve_hash_table(aggstate); 1683 } 1684 else 1685 { 1686 aggstate->agg_done = true; 1687 break; 1688 } 1689 } 1690 1691 /* 1692 * Get the number of columns in the next grouping set after the last 1693 * projected one (if any). This is the number of columns to compare to 1694 * see if we reached the boundary of that set too. 1695 */ 1696 if (aggstate->projected_set >= 0 && 1697 aggstate->projected_set < (numGroupingSets - 1)) 1698 nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1]; 1699 else 1700 nextSetSize = 0; 1701 1702 /*---------- 1703 * If a subgroup for the current grouping set is present, project it. 
1704 * 1705 * We have a new group if: 1706 * - we're out of input but haven't projected all grouping sets 1707 * (checked above) 1708 * OR 1709 * - we already projected a row that wasn't from the last grouping 1710 * set 1711 * AND 1712 * - the next grouping set has at least one grouping column (since 1713 * empty grouping sets project only once input is exhausted) 1714 * AND 1715 * - the previous and pending rows differ on the grouping columns 1716 * of the next grouping set 1717 *---------- 1718 */ 1719 tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple; 1720 if (aggstate->input_done || 1721 (node->aggstrategy != AGG_PLAIN && 1722 aggstate->projected_set != -1 && 1723 aggstate->projected_set < (numGroupingSets - 1) && 1724 nextSetSize > 0 && 1725 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1], 1726 tmpcontext))) 1727 { 1728 aggstate->projected_set += 1; 1729 1730 Assert(aggstate->projected_set < numGroupingSets); 1731 Assert(nextSetSize > 0 || aggstate->input_done); 1732 } 1733 else 1734 { 1735 /* 1736 * We no longer care what group we just projected, the next 1737 * projection will always be the first (or only) grouping set 1738 * (unless the input proves to be empty). 1739 */ 1740 aggstate->projected_set = 0; 1741 1742 /* 1743 * If we don't already have the first tuple of the new group, 1744 * fetch it from the outer plan. 1745 */ 1746 if (aggstate->grp_firstTuple == NULL) 1747 { 1748 outerslot = fetch_input_tuple(aggstate); 1749 if (!TupIsNull(outerslot)) 1750 { 1751 /* 1752 * Make a copy of the first input tuple; we will use this 1753 * for comparisons (in group mode) and for projection. 1754 */ 1755 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot); 1756 } 1757 else 1758 { 1759 /* outer plan produced no tuples at all */ 1760 if (hasGroupingSets) 1761 { 1762 /* 1763 * If there was no input at all, we need to project 1764 * rows only if there are grouping sets of size 0. 
1765 * Note that this implies that there can't be any 1766 * references to ungrouped Vars, which would otherwise 1767 * cause issues with the empty output slot. 1768 * 1769 * XXX: This is no longer true, we currently deal with 1770 * this in finalize_aggregates(). 1771 */ 1772 aggstate->input_done = true; 1773 1774 while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0) 1775 { 1776 aggstate->projected_set += 1; 1777 if (aggstate->projected_set >= numGroupingSets) 1778 { 1779 /* 1780 * We can't set agg_done here because we might 1781 * have more phases to do, even though the 1782 * input is empty. So we need to restart the 1783 * whole outer loop. 1784 */ 1785 break; 1786 } 1787 } 1788 1789 if (aggstate->projected_set >= numGroupingSets) 1790 continue; 1791 } 1792 else 1793 { 1794 aggstate->agg_done = true; 1795 /* If we are grouping, we should produce no tuples too */ 1796 if (node->aggstrategy != AGG_PLAIN) 1797 return NULL; 1798 } 1799 } 1800 } 1801 1802 /* 1803 * Initialize working state for a new input tuple group. 1804 */ 1805 initialize_aggregates(aggstate, pergroups, numReset); 1806 1807 if (aggstate->grp_firstTuple != NULL) 1808 { 1809 /* 1810 * Store the copied first input tuple in the tuple table slot 1811 * reserved for it. The tuple will be deleted when it is 1812 * cleared from the slot. 1813 */ 1814 ExecForceStoreHeapTuple(aggstate->grp_firstTuple, 1815 firstSlot, true); 1816 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ 1817 1818 /* set up for first advance_aggregates call */ 1819 tmpcontext->ecxt_outertuple = firstSlot; 1820 1821 /* 1822 * Process each outer-plan tuple, and then fetch the next one, 1823 * until we exhaust the outer plan or cross a group boundary. 1824 */ 1825 for (;;) 1826 { 1827 /* 1828 * During phase 1 only of a mixed agg, we need to update 1829 * hashtables as well in advance_aggregates. 
                         */
                        if (aggstate->aggstrategy == AGG_MIXED &&
                            aggstate->current_phase == 1)
                        {
                            lookup_hash_entries(aggstate);
                        }

                        /* Advance the aggregates (or combine functions) */
                        advance_aggregates(aggstate);

                        /* Reset per-input-tuple context after each tuple */
                        ResetExprContext(tmpcontext);

                        outerslot = fetch_input_tuple(aggstate);
                        if (TupIsNull(outerslot))
                        {
                            /* no more outer-plan tuples available */
                            if (hasGroupingSets)
                            {
                                aggstate->input_done = true;
                                break;
                            }
                            else
                            {
                                aggstate->agg_done = true;
                                break;
                            }
                        }
                        /* set up for next advance_aggregates call */
                        tmpcontext->ecxt_outertuple = outerslot;

                        /*
                         * If we are grouping, check whether we've crossed a group
                         * boundary.
                         */
                        if (node->aggstrategy != AGG_PLAIN)
                        {
                            tmpcontext->ecxt_innertuple = firstSlot;
                            if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
                                          tmpcontext))
                            {
                                aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
                                break;
                            }
                        }
                    }
                }

                /*
                 * Use the representative input tuple for any references to
                 * non-aggregated input columns in aggregate direct args, the node
                 * qual, and the tlist.  (If we are not grouping, and there are no
                 * input rows at all, we will come here with an empty firstSlot
                 * ... but if not grouping, there can't be any references to
                 * non-aggregated input columns, so no problem.)
                 */
                econtext->ecxt_outertuple = firstSlot;
            }

            Assert(aggstate->projected_set >= 0);

            currentSet = aggstate->projected_set;

            prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);

            select_current_set(aggstate, currentSet, false);

            finalize_aggregates(aggstate,
                                peragg,
                                pergroups[currentSet]);

            /*
             * If there's no row to project right now, we must continue rather
             * than returning a null since there might be more groups.
             */
            result = project_aggregates(aggstate);
            if (result)
                return result;
        }

        /* No more groups */
        return NULL;
    }

    /*
     * ExecAgg for hashed case: read input and build hash table
     */
    static void
    agg_fill_hash_table(AggState *aggstate)
    {
        TupleTableSlot *outerslot;
        ExprContext *tmpcontext = aggstate->tmpcontext;

        /*
         * Process each outer-plan tuple, and then fetch the next one, until we
         * exhaust the outer plan.
         */
        for (;;)
        {
            outerslot = fetch_input_tuple(aggstate);
            if (TupIsNull(outerslot))
                break;

            /* set up for lookup_hash_entries and advance_aggregates */
            tmpcontext->ecxt_outertuple = outerslot;

            /* Find or build hashtable entries */
            lookup_hash_entries(aggstate);

            /* Advance the aggregates (or combine functions) */
            advance_aggregates(aggstate);

            /*
             * Reset per-input-tuple context after each tuple, but note that the
             * hash lookups do this too
             */
            ResetExprContext(aggstate->tmpcontext);
        }

        aggstate->table_filled = true;
        /* Initialize to walk the first hash table */
        select_current_set(aggstate, 0, true);
        ResetTupleHashIterator(aggstate->perhash[0].hashtable,
                               &aggstate->perhash[0].hashiter);
    }

    /*
     * ExecAgg for hashed case: retrieving groups from hash table
     */
    static TupleTableSlot *
    agg_retrieve_hash_table(AggState *aggstate)
    {
        ExprContext *econtext;
        AggStatePerAgg peragg;
        AggStatePerGroup pergroup;
        TupleHashEntryData *entry;
        TupleTableSlot *firstSlot;
        TupleTableSlot *result;
        AggStatePerHash perhash;

        /*
         * get state info from node.
         *
         * econtext is the per-output-tuple expression context.
         */
        econtext = aggstate->ss.ps.ps_ExprContext;
        peragg = aggstate->peragg;
        firstSlot = aggstate->ss.ss_ScanTupleSlot;

        /*
         * Note that perhash (and therefore anything accessed through it) can
         * change inside the loop, as we change between grouping sets.
         */
        perhash = &aggstate->perhash[aggstate->current_set];

        /*
         * We loop retrieving groups until we find one satisfying
         * aggstate->ss.ps.qual
         */
        while (!aggstate->agg_done)
        {
            TupleTableSlot *hashslot = perhash->hashslot;
            int         i;

            CHECK_FOR_INTERRUPTS();

            /*
             * Find the next entry in the hash table
             */
            entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
            if (entry == NULL)
            {
                int         nextset = aggstate->current_set + 1;

                if (nextset < aggstate->num_hashes)
                {
                    /*
                     * Switch to next grouping set, reinitialize, and restart the
                     * loop.
                     */
                    select_current_set(aggstate, nextset, true);

                    perhash = &aggstate->perhash[aggstate->current_set];

                    ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);

                    continue;
                }
                else
                {
                    /* No more hashtables, so done */
                    aggstate->agg_done = true;
                    return NULL;
                }
            }

            /*
             * Clear the per-output-tuple context for each group
             *
             * We intentionally don't use ReScanExprContext here; if any aggs have
             * registered shutdown callbacks, they mustn't be called yet, since we
             * might not be done with that agg.
             */
            ResetExprContext(econtext);

            /*
             * Transform representative tuple back into one with the right
             * columns.
             */
            ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
            slot_getallattrs(hashslot);

            ExecClearTuple(firstSlot);
            memset(firstSlot->tts_isnull, true,
                   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));

            for (i = 0; i < perhash->numhashGrpCols; i++)
            {
                int         varNumber = perhash->hashGrpColIdxInput[i] - 1;

                firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
                firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
            }
            ExecStoreVirtualTuple(firstSlot);

            pergroup = (AggStatePerGroup) entry->additional;

            /*
             * Use the representative input tuple for any references to
             * non-aggregated input columns in the qual and tlist.
             */
            econtext->ecxt_outertuple = firstSlot;

            prepare_projection_slot(aggstate,
                                    econtext->ecxt_outertuple,
                                    aggstate->current_set);

            finalize_aggregates(aggstate, peragg, pergroup);

            result = project_aggregates(aggstate);
            if (result)
                return result;
        }

        /* No more groups */
        return NULL;
    }

    /* -----------------
     * ExecInitAgg
     *
     *  Creates the run-time information for the agg node produced by the
     *  planner and initializes its outer subtree.
     *
     * -----------------
     */
    AggState *
    ExecInitAgg(Agg *node, EState *estate, int eflags)
    {
        AggState   *aggstate;
        AggStatePerAgg peraggs;
        AggStatePerTrans pertransstates;
        AggStatePerGroup *pergroups;
        Plan       *outerPlan;
        ExprContext *econtext;
        TupleDesc   scanDesc;
        int         numaggs,
                    transno,
                    aggno;
        int         phase;
        int         phaseidx;
        ListCell   *l;
        Bitmapset  *all_grouped_cols = NULL;
        int         numGroupingSets = 1;
        int         numPhases;
        int         numHashes;
        int         i = 0;
        int         j = 0;
        bool        use_hashing = (node->aggstrategy == AGG_HASHED ||
                                   node->aggstrategy == AGG_MIXED);

        /* check for unsupported flags */
        Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

        /*
         * create state structure
         */
        aggstate = makeNode(AggState);
        aggstate->ss.ps.plan = (Plan *) node;
        aggstate->ss.ps.state = estate;
        aggstate->ss.ps.ExecProcNode = ExecAgg;

        aggstate->aggs = NIL;
        aggstate->numaggs = 0;
        aggstate->numtrans = 0;
        aggstate->aggstrategy = node->aggstrategy;
        aggstate->aggsplit = node->aggsplit;
        aggstate->maxsets = 0;
        aggstate->projected_set = -1;
        aggstate->current_set = 0;
        aggstate->peragg = NULL;
        aggstate->pertrans = NULL;
        aggstate->curperagg = NULL;
        aggstate->curpertrans = NULL;
        aggstate->input_done = false;
        aggstate->agg_done = false;
        aggstate->pergroups = NULL;
        aggstate->grp_firstTuple = NULL;
        aggstate->sort_in = NULL;
        aggstate->sort_out = NULL;

        /*
         * phases[0] always exists, but is dummy in sorted/plain mode
         */
        numPhases = (use_hashing ? 1 : 2);
        numHashes = (use_hashing ? 1 : 0);

        /*
         * Calculate the maximum number of grouping sets in any phase; this
         * determines the size of some allocations.  Also calculate the number of
         * phases, since all hashed/mixed nodes contribute to only a single phase.
         */
        if (node->groupingSets)
        {
            numGroupingSets = list_length(node->groupingSets);

            foreach(l, node->chain)
            {
                Agg        *agg = lfirst(l);

                numGroupingSets = Max(numGroupingSets,
                                      list_length(agg->groupingSets));

                /*
                 * additional AGG_HASHED aggs become part of phase 0, but all
                 * others add an extra phase.
                 */
                if (agg->aggstrategy != AGG_HASHED)
                    ++numPhases;
                else
                    ++numHashes;
            }
        }

        aggstate->maxsets = numGroupingSets;
        aggstate->numphases = numPhases;

        aggstate->aggcontexts = (ExprContext **)
            palloc0(sizeof(ExprContext *) * numGroupingSets);

        /*
         * Create expression contexts.  We need three or more, one for
         * per-input-tuple processing, one for per-output-tuple processing, one
         * for all the hashtables, and one for each grouping set.  The per-tuple
         * memory context of the per-grouping-set ExprContexts (aggcontexts)
         * replaces the standalone memory context formerly used to hold transition
         * values.  We cheat a little by using ExecAssignExprContext() to build
         * all of them.
         *
         * NOTE: the details of what is stored in aggcontexts and what is stored
         * in the regular per-query memory context are driven by a simple
         * decision: we want to reset the aggcontext at group boundaries (if not
         * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
         */
        ExecAssignExprContext(estate, &aggstate->ss.ps);
        aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;

        for (i = 0; i < numGroupingSets; ++i)
        {
            ExecAssignExprContext(estate, &aggstate->ss.ps);
            aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
        }

        if (use_hashing)
        {
            ExecAssignExprContext(estate, &aggstate->ss.ps);
            aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
        }

        ExecAssignExprContext(estate, &aggstate->ss.ps);

        /*
         * Initialize child nodes.
         *
         * If we are doing a hashed aggregation then the child plan does not need
         * to handle REWIND efficiently; see ExecReScanAgg.
         */
        if (node->aggstrategy == AGG_HASHED)
            eflags &= ~EXEC_FLAG_REWIND;
        outerPlan = outerPlan(node);
        outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);

        /*
         * initialize source tuple type.
         */
        aggstate->ss.ps.outerops =
            ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
                                 &aggstate->ss.ps.outeropsfixed);
        aggstate->ss.ps.outeropsset = true;

        ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
                                        aggstate->ss.ps.outerops);
        scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;

        /*
         * If there are more than two phases (including a potential dummy phase
         * 0), input will be resorted using tuplesort.  Need a slot for that.
         */
        if (numPhases > 2)
        {
            aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
                                                         &TTSOpsMinimalTuple);

            /*
             * The output of the tuplesort, and the output from the outer child
             * might not use the same type of slot.  In most cases the child will
             * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
             * input can also be presorted due to an index, in which case it could
             * be a different type of slot.
             *
             * XXX: For efficiency it would be good to instead/additionally
             * generate expressions with corresponding settings of outerops* for
             * the individual phases - deforming is often a bottleneck for
             * aggregations with lots of rows per group.  If there are multiple
             * sorts, we know that all but the first use TTSOpsMinimalTuple (via
             * the nodeAgg.c internal tuplesort).
             */
            if (aggstate->ss.ps.outeropsfixed &&
                aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
                aggstate->ss.ps.outeropsfixed = false;
        }

        /*
         * Initialize result type, slot and projection.
         */
        ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
        ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);

        /*
         * initialize child expressions
         *
         * We expect the parser to have checked that no aggs contain other agg
         * calls in their arguments (and just to be sure, we verify it again while
         * initializing the plan node).  This would make no sense under SQL
         * semantics, and it's forbidden by the spec.  Because it is true, we
         * don't need to worry about evaluating the aggs in any particular order.
         *
         * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
         * nodes to aggstate->aggs.  Aggrefs in the qual are found here; Aggrefs
         * in the targetlist are found during ExecAssignProjectionInfo, below.
         */
        aggstate->ss.ps.qual =
            ExecInitQual(node->plan.qual, (PlanState *) aggstate);

        /*
         * We should now have found all Aggrefs in the targetlist and quals.
         */
        numaggs = aggstate->numaggs;
        Assert(numaggs == list_length(aggstate->aggs));

        /*
         * For each phase, prepare grouping set data and fmgr lookup data for
         * compare functions.  Accumulate all_grouped_cols in passing.
         */
        aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));

        aggstate->num_hashes = numHashes;
        if (numHashes)
        {
            aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
            aggstate->phases[0].numsets = 0;
            aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
            aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
        }

        phase = 0;
        for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
        {
            Agg        *aggnode;
            Sort       *sortnode;

            if (phaseidx > 0)
            {
                aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
                sortnode = castNode(Sort, aggnode->plan.lefttree);
            }
            else
            {
                aggnode = node;
                sortnode = NULL;
            }

            Assert(phase <= 1 || sortnode);

            if (aggnode->aggstrategy == AGG_HASHED
                || aggnode->aggstrategy == AGG_MIXED)
            {
                AggStatePerPhase phasedata = &aggstate->phases[0];
                AggStatePerHash perhash;
                Bitmapset  *cols = NULL;

                Assert(phase == 0);
                i = phasedata->numsets++;
                perhash = &aggstate->perhash[i];

                /* phase 0 always points to the "real" Agg in the hash case */
                phasedata->aggnode = node;
                phasedata->aggstrategy = node->aggstrategy;

                /* but the actual Agg node representing this hash is saved here */
                perhash->aggnode = aggnode;

                phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;

                for (j = 0; j < aggnode->numCols; ++j)
                    cols = bms_add_member(cols, aggnode->grpColIdx[j]);

                phasedata->grouped_cols[i] = cols;

                all_grouped_cols = bms_add_members(all_grouped_cols, cols);
                continue;
            }
            else
            {
                AggStatePerPhase phasedata = &aggstate->phases[++phase];
                int         num_sets;

                phasedata->numsets = num_sets = list_length(aggnode->groupingSets);

                if (num_sets)
                {
                    phasedata->gset_lengths = palloc(num_sets * sizeof(int));
                    phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));

                    i = 0;
                    foreach(l, aggnode->groupingSets)
                    {
                        int         current_length = list_length(lfirst(l));
                        Bitmapset  *cols = NULL;

                        /* planner forces this to be correct */
                        for (j = 0; j < current_length; ++j)
                            cols = bms_add_member(cols, aggnode->grpColIdx[j]);

                        phasedata->grouped_cols[i] = cols;
                        phasedata->gset_lengths[i] = current_length;

                        ++i;
                    }

                    all_grouped_cols = bms_add_members(all_grouped_cols,
                                                       phasedata->grouped_cols[0]);
                }
                else
                {
                    Assert(phaseidx == 0);

                    phasedata->gset_lengths = NULL;
                    phasedata->grouped_cols = NULL;
                }

                /*
                 * If we are grouping, precompute fmgr lookup data for inner loop.
                 */
                if (aggnode->aggstrategy == AGG_SORTED)
                {
                    int         i = 0;

                    Assert(aggnode->numCols > 0);

                    /*
                     * Build a separate function for each subset of columns that
                     * need to be compared.
                     */
                    phasedata->eqfunctions =
                        (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));

                    /* for each grouping set */
                    for (i = 0; i < phasedata->numsets; i++)
                    {
                        int         length = phasedata->gset_lengths[i];

                        if (phasedata->eqfunctions[length - 1] != NULL)
                            continue;

                        phasedata->eqfunctions[length - 1] =
                            execTuplesMatchPrepare(scanDesc,
                                                   length,
                                                   aggnode->grpColIdx,
                                                   aggnode->grpOperators,
                                                   aggnode->grpCollations,
                                                   (PlanState *) aggstate);
                    }

                    /* and for all grouped columns, unless already computed */
                    if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
                    {
                        phasedata->eqfunctions[aggnode->numCols - 1] =
                            execTuplesMatchPrepare(scanDesc,
                                                   aggnode->numCols,
                                                   aggnode->grpColIdx,
                                                   aggnode->grpOperators,
                                                   aggnode->grpCollations,
                                                   (PlanState *) aggstate);
                    }
                }

                phasedata->aggnode = aggnode;
                phasedata->aggstrategy = aggnode->aggstrategy;
                phasedata->sortnode = sortnode;
            }
        }

        /*
         * Convert all_grouped_cols to a descending-order list.
         */
        i = -1;
        while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
            aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);

        /*
         * Set up aggregate-result storage in the output expr context, and also
         * allocate my private per-agg working storage
         */
        econtext = aggstate->ss.ps.ps_ExprContext;
        econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
        econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);

        peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
        pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);

        aggstate->peragg = peraggs;
        aggstate->pertrans = pertransstates;

        aggstate->all_pergroups =
            (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
                                         * (numGroupingSets + numHashes));
        pergroups = aggstate->all_pergroups;

        if (node->aggstrategy != AGG_HASHED)
        {
            for (i = 0; i < numGroupingSets; i++)
            {
                pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
                                                          * numaggs);
            }

            aggstate->pergroups = pergroups;
            pergroups += numGroupingSets;
        }

        /*
         * Hashing can only appear in the initial phase.
         */
        if (use_hashing)
        {
            /* this is an array of pointers, not structures */
            aggstate->hash_pergroup = pergroups;

            find_hash_columns(aggstate);

            /* Skip massive memory allocation if we are just doing EXPLAIN */
            if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
                build_hash_table(aggstate);

            aggstate->table_filled = false;
        }

        /*
         * Initialize current phase-dependent values to initial phase.  The
         * initial phase is 1 (first sort pass) for all strategies that use
         * sorting (if hashing is being done too, then phase 0 is processed
         * last); but if only hashing is being done, then phase 0 is all there is.
         */
        if (node->aggstrategy == AGG_HASHED)
        {
            aggstate->current_phase = 0;
            initialize_phase(aggstate, 0);
            select_current_set(aggstate, 0, true);
        }
        else
        {
            aggstate->current_phase = 1;
            initialize_phase(aggstate, 1);
            select_current_set(aggstate, 0, false);
        }

        /* -----------------
         * Perform lookups of aggregate function info, and initialize the
         * unchanging fields of the per-agg and per-trans data.
         *
         * We try to optimize by detecting duplicate aggregate functions so that
         * their state and final values are re-used, rather than needlessly being
         * re-calculated independently.  We also detect aggregates that are not
         * the same, but which can share the same transition state.
         *
         * Scenarios:
         *
         * 1. Identical aggregate function calls appear in the query:
         *
         *    SELECT SUM(x) FROM ... HAVING SUM(x) > 0
         *
         *    Since these aggregates are identical, we only need to calculate
         *    the value once.  Both aggregates will share the same 'aggno' value.
         *
         * 2. Two different aggregate functions appear in the query, but the
         *    aggregates have the same arguments, transition functions and
         *    initial values (and, presumably, different final functions):
         *
         *    SELECT AVG(x), STDDEV(x) FROM ...
         *
         *    In this case we must create a new peragg for the varying aggregate,
         *    and we need to call the final functions separately, but we need
         *    only run the transition function once.  (This requires that the
         *    final functions be nondestructive of the transition state, but
         *    that's required anyway for other reasons.)
         *
         * For either of these optimizations to be valid, all aggregate properties
         * used in the transition phase must be the same, including any modifiers
         * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
         * contain any volatile functions.
         * -----------------
         */
        aggno = -1;
        transno = -1;
        foreach(l, aggstate->aggs)
        {
            AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
            Aggref     *aggref = aggrefstate->aggref;
            AggStatePerAgg peragg;
            AggStatePerTrans pertrans;
            int         existing_aggno;
            int         existing_transno;
            List       *same_input_transnos;
            Oid         inputTypes[FUNC_MAX_ARGS];
            int         numArguments;
            int         numDirectArgs;
            HeapTuple   aggTuple;
            Form_pg_aggregate aggform;
            AclResult   aclresult;
            Oid         transfn_oid,
                        finalfn_oid;
            bool        shareable;
            Oid         serialfn_oid,
                        deserialfn_oid;
            Expr       *finalfnexpr;
            Oid         aggtranstype;
            Datum       textInitVal;
            Datum       initValue;
            bool        initValueIsNull;

            /* Planner should have assigned aggregate to correct level */
            Assert(aggref->agglevelsup == 0);
            /* ... and the split mode should match */
            Assert(aggref->aggsplit == aggstate->aggsplit);

            /* 1. Check for already processed aggs which can be re-used */
            existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
                                                    &same_input_transnos);
            if (existing_aggno != -1)
            {
                /*
                 * Existing compatible agg found, so just point the Aggref to the
                 * same per-agg struct.
                 */
                aggrefstate->aggno = existing_aggno;
                continue;
            }

            /* Mark Aggref state node with assigned index in the result array */
            peragg = &peraggs[++aggno];
            peragg->aggref = aggref;
            aggrefstate->aggno = aggno;

            /* Fetch the pg_aggregate row */
            aggTuple = SearchSysCache1(AGGFNOID,
                                       ObjectIdGetDatum(aggref->aggfnoid));
            if (!HeapTupleIsValid(aggTuple))
                elog(ERROR, "cache lookup failed for aggregate %u",
                     aggref->aggfnoid);
            aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);

            /* Check permission to call aggregate function */
            aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
                                         ACL_EXECUTE);
            if (aclresult != ACLCHECK_OK)
                aclcheck_error(aclresult, OBJECT_AGGREGATE,
                               get_func_name(aggref->aggfnoid));
            InvokeFunctionExecuteHook(aggref->aggfnoid);

            /* planner recorded transition state type in the Aggref itself */
            aggtranstype = aggref->aggtranstype;
            Assert(OidIsValid(aggtranstype));

            /*
             * If this aggregation is performing state combines, then instead of
             * using the transition function, we'll use the combine function
             */
            if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
            {
                transfn_oid = aggform->aggcombinefn;

                /* If not set then the planner messed up */
                if (!OidIsValid(transfn_oid))
                    elog(ERROR, "combinefn not set for aggregate function");
            }
            else
                transfn_oid = aggform->aggtransfn;

            /* Final function only required if we're finalizing the aggregates */
            if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
                peragg->finalfn_oid = finalfn_oid = InvalidOid;
            else
                peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;

            /*
             * If finalfn is marked read-write, we can't share transition states;
             * but it is okay to share states for AGGMODIFY_SHAREABLE aggs.  Also,
             * if we're not executing the finalfn here, we can share regardless.
             */
            shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
                (finalfn_oid == InvalidOid);
            peragg->shareable = shareable;

            serialfn_oid = InvalidOid;
            deserialfn_oid = InvalidOid;

            /*
             * Check if serialization/deserialization is required.  We only do it
             * for aggregates that have transtype INTERNAL.
             */
            if (aggtranstype == INTERNALOID)
            {
                /*
                 * The planner should only have generated a serialize agg node if
                 * every aggregate with an INTERNAL state has a serialization
                 * function.  Verify that.
                 */
                if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
                {
                    /* serialization only valid when not running finalfn */
                    Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));

                    if (!OidIsValid(aggform->aggserialfn))
                        elog(ERROR, "serialfunc not provided for serialization aggregation");
                    serialfn_oid = aggform->aggserialfn;
                }

                /* Likewise for deserialization functions */
                if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
                {
                    /* deserialization only valid when combining states */
                    Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));

                    if (!OidIsValid(aggform->aggdeserialfn))
                        elog(ERROR, "deserialfunc not provided for deserialization aggregation");
                    deserialfn_oid = aggform->aggdeserialfn;
                }
            }

            /* Check that aggregate owner has permission to call component fns */
            {
                HeapTuple   procTuple;
                Oid         aggOwner;

                procTuple = SearchSysCache1(PROCOID,
                                            ObjectIdGetDatum(aggref->aggfnoid));
                if (!HeapTupleIsValid(procTuple))
                    elog(ERROR, "cache lookup failed for function %u",
                         aggref->aggfnoid);
                aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
                ReleaseSysCache(procTuple);

                aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
                                             ACL_EXECUTE);
                if (aclresult != ACLCHECK_OK)
                    aclcheck_error(aclresult, OBJECT_FUNCTION,
                                   get_func_name(transfn_oid));
                InvokeFunctionExecuteHook(transfn_oid);
                if (OidIsValid(finalfn_oid))
                {
                    aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
                                                 ACL_EXECUTE);
                    if (aclresult != ACLCHECK_OK)
                        aclcheck_error(aclresult, OBJECT_FUNCTION,
                                       get_func_name(finalfn_oid));
                    InvokeFunctionExecuteHook(finalfn_oid);
                }
                if (OidIsValid(serialfn_oid))
                {
                    aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
                                                 ACL_EXECUTE);
                    if (aclresult != ACLCHECK_OK)
                        aclcheck_error(aclresult, OBJECT_FUNCTION,
                                       get_func_name(serialfn_oid));
                    InvokeFunctionExecuteHook(serialfn_oid);
                }
                if (OidIsValid(deserialfn_oid))
                {
                    aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
                                                 ACL_EXECUTE);
                    if (aclresult != ACLCHECK_OK)
                        aclcheck_error(aclresult, OBJECT_FUNCTION,
                                       get_func_name(deserialfn_oid));
                    InvokeFunctionExecuteHook(deserialfn_oid);
                }
            }

            /*
             * Get actual datatypes of the (nominal) aggregate inputs.  These
             * could be different from the agg's declared input types, when the
             * agg accepts ANY or a polymorphic type.
             */
            numArguments = get_aggregate_argtypes(aggref, inputTypes);

            /* Count the "direct" arguments, if any */
            numDirectArgs = list_length(aggref->aggdirectargs);

            /* Detect how many arguments to pass to the finalfn */
            if (aggform->aggfinalextra)
                peragg->numFinalArgs = numArguments + 1;
            else
                peragg->numFinalArgs = numDirectArgs + 1;

            /* Initialize any direct-argument expressions */
            peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
                                                     (PlanState *) aggstate);

            /*
             * build expression trees using actual argument & result types for the
             * finalfn, if it exists and is required.
             */
            if (OidIsValid(finalfn_oid))
            {
                build_aggregate_finalfn_expr(inputTypes,
                                             peragg->numFinalArgs,
                                             aggtranstype,
                                             aggref->aggtype,
                                             aggref->inputcollid,
                                             finalfn_oid,
                                             &finalfnexpr);
                fmgr_info(finalfn_oid, &peragg->finalfn);
                fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
            }

            /* get info about the output value's datatype */
            get_typlenbyval(aggref->aggtype,
                            &peragg->resulttypeLen,
                            &peragg->resulttypeByVal);

            /*
             * initval is potentially null, so don't try to access it as a struct
             * field.  Must do it the hard way with SysCacheGetAttr.
             */
            textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
                                          Anum_pg_aggregate_agginitval,
                                          &initValueIsNull);
            if (initValueIsNull)
                initValue = (Datum) 0;
            else
                initValue = GetAggInitVal(textInitVal, aggtranstype);

            /*
             * 2. Build working state for invoking the transition function, or
             * look up previously initialized working state, if we can share it.
             *
             * find_compatible_peragg() already collected a list of shareable
             * per-Trans's with the same inputs.  Check if any of them have the
             * same transition function and initial value.
             */
            existing_transno = find_compatible_pertrans(aggstate, aggref,
                                                        shareable,
                                                        transfn_oid, aggtranstype,
                                                        serialfn_oid, deserialfn_oid,
                                                        initValue, initValueIsNull,
                                                        same_input_transnos);
            if (existing_transno != -1)
            {
                /*
                 * Existing compatible trans found, so just point the 'peragg' to
                 * the same per-trans struct, and mark the trans state as shared.
                 */
                pertrans = &pertransstates[existing_transno];
                pertrans->aggshared = true;
                peragg->transno = existing_transno;
            }
            else
            {
                pertrans = &pertransstates[++transno];
                build_pertrans_for_aggref(pertrans, aggstate, estate,
                                          aggref, transfn_oid, aggtranstype,
                                          serialfn_oid, deserialfn_oid,
                                          initValue, initValueIsNull,
                                          inputTypes, numArguments);
                peragg->transno = transno;
            }
            ReleaseSysCache(aggTuple);
        }

        /*
         * Update aggstate->numaggs to be the number of unique aggregates found.
         * Also set numstates to the number of unique transition states found.
         */
        aggstate->numaggs = aggno + 1;
        aggstate->numtrans = transno + 1;

        /*
         * Last, check whether any more aggregates got added onto the node while
         * we processed the expressions for the aggregate arguments (including not
         * only the regular arguments and FILTER expressions handled immediately
         * above, but any direct arguments we might've handled earlier).  If so,
         * we have nested aggregate functions, which is semantically nonsensical,
         * so complain.  (This should have been caught by the parser, so we don't
         * need to work hard on a helpful error message; but we defend against it
         * here anyway, just to be sure.)
         */
        if (numaggs != list_length(aggstate->aggs))
            ereport(ERROR,
                    (errcode(ERRCODE_GROUPING_ERROR),
                     errmsg("aggregate function calls cannot be nested")));

        /*
         * Build expressions doing all the transition work at once.  We build a
         * different one for each phase, as the number of transition function
         * invocations can differ between phases.  Note this'll work both for
         * transition and combination functions (although there'll only be one
         * phase in the latter case).
2858 */ 2859 for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) 2860 { 2861 AggStatePerPhase phase = &aggstate->phases[phaseidx]; 2862 bool dohash = false; 2863 bool dosort = false; 2864 2865 /* phase 0 doesn't necessarily exist */ 2866 if (!phase->aggnode) 2867 continue; 2868 2869 if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1) 2870 { 2871 /* 2872 * Phase one, and only phase one, in a mixed agg performs both 2873 * sorting and aggregation. 2874 */ 2875 dohash = true; 2876 dosort = true; 2877 } 2878 else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0) 2879 { 2880 /* 2881 * No need to compute a transition function for an AGG_MIXED phase 2882 * 0 - the contents of the hashtables will have been computed 2883 * during phase 1. 2884 */ 2885 continue; 2886 } 2887 else if (phase->aggstrategy == AGG_PLAIN || 2888 phase->aggstrategy == AGG_SORTED) 2889 { 2890 dohash = false; 2891 dosort = true; 2892 } 2893 else if (phase->aggstrategy == AGG_HASHED) 2894 { 2895 dohash = true; 2896 dosort = false; 2897 } 2898 else 2899 Assert(false); 2900 2901 phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash); 2902 2903 } 2904 2905 return aggstate; 2906 } 2907 2908 /* 2909 * Build the state needed to calculate a state value for an aggregate. 2910 * 2911 * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate 2912 * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest 2913 * of the arguments could be calculated from 'aggref', but the caller has 2914 * calculated them already, so might as well pass them. 
 */
static void
build_pertrans_for_aggref(AggStatePerTrans pertrans,
                          AggState *aggstate, EState *estate,
                          Aggref *aggref,
                          Oid aggtransfn, Oid aggtranstype,
                          Oid aggserialfn, Oid aggdeserialfn,
                          Datum initValue, bool initValueIsNull,
                          Oid *inputTypes, int numArguments)
{
    int numGroupingSets = Max(aggstate->maxsets, 1);
    Expr *serialfnexpr = NULL;
    Expr *deserialfnexpr = NULL;
    ListCell *lc;
    int numInputs;
    int numDirectArgs;
    List *sortlist;
    int numSortCols;
    int numDistinctCols;
    int i;

    /* Begin filling in the pertrans data */
    pertrans->aggref = aggref;
    pertrans->aggshared = false;
    pertrans->aggCollation = aggref->inputcollid;
    pertrans->transfn_oid = aggtransfn;
    pertrans->serialfn_oid = aggserialfn;
    pertrans->deserialfn_oid = aggdeserialfn;
    pertrans->initValue = initValue;
    pertrans->initValueIsNull = initValueIsNull;

    /* Count the "direct" arguments, if any */
    numDirectArgs = list_length(aggref->aggdirectargs);

    /* Count the number of aggregated input columns */
    pertrans->numInputs = numInputs = list_length(aggref->args);

    pertrans->aggtranstype = aggtranstype;

    /*
     * When combining states, we have no use at all for the aggregate
     * function's transfn.  Instead we use the combinefn.  In this case, the
     * transfn and transfn_oid fields of pertrans refer to the combine
     * function rather than the transition function.
     */
    if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
    {
        Expr *combinefnexpr;
        size_t numTransArgs;

        /*
         * When combining there's only one input, the to-be-combined added
         * transition value from below (this node's transition value is
         * counted separately).
         */
        pertrans->numTransInputs = 1;

        /* account for the current transition state */
        numTransArgs = pertrans->numTransInputs + 1;

        build_aggregate_combinefn_expr(aggtranstype,
                                       aggref->inputcollid,
                                       aggtransfn,
                                       &combinefnexpr);
        fmgr_info(aggtransfn, &pertrans->transfn);
        fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);

        pertrans->transfn_fcinfo =
            (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
        InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
                                 &pertrans->transfn,
                                 numTransArgs,
                                 pertrans->aggCollation,
                                 (void *) aggstate, NULL);

        /*
         * Ensure that a combine function to combine INTERNAL states is not
         * strict.  This should have been checked during CREATE AGGREGATE, but
         * the strict property could have been changed since then.
         */
        if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
                     errmsg("combine function with transition type %s must not be declared STRICT",
                            format_type_be(aggtranstype))));
    }
    else
    {
        Expr *transfnexpr;
        size_t numTransArgs;

        /* Detect how many arguments to pass to the transfn */
        if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
            pertrans->numTransInputs = numInputs;
        else
            pertrans->numTransInputs = numArguments;

        /* account for the current transition state */
        numTransArgs = pertrans->numTransInputs + 1;

        /*
         * Set up infrastructure for calling the transfn.  Note that invtrans
         * is not needed here.
         */
        build_aggregate_transfn_expr(inputTypes,
                                     numArguments,
                                     numDirectArgs,
                                     aggref->aggvariadic,
                                     aggtranstype,
                                     aggref->inputcollid,
                                     aggtransfn,
                                     InvalidOid,
                                     &transfnexpr,
                                     NULL);
        fmgr_info(aggtransfn, &pertrans->transfn);
        fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);

        pertrans->transfn_fcinfo =
            (FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
        InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
                                 &pertrans->transfn,
                                 numTransArgs,
                                 pertrans->aggCollation,
                                 (void *) aggstate, NULL);

        /*
         * If the transfn is strict and the initval is NULL, make sure input
         * type and transtype are the same (or at least binary-compatible), so
         * that it's OK to use the first aggregated input value as the initial
         * transValue.  This should have been checked at agg definition time,
         * but we must check again in case the transfn's strictness property
         * has been changed.
         */
        if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
        {
            if (numArguments <= numDirectArgs ||
                !IsBinaryCoercible(inputTypes[numDirectArgs],
                                   aggtranstype))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
                         errmsg("aggregate %u needs to have compatible input type and transition type",
                                aggref->aggfnoid)));
        }
    }

    /* get info about the state value's datatype */
    get_typlenbyval(aggtranstype,
                    &pertrans->transtypeLen,
                    &pertrans->transtypeByVal);

    if (OidIsValid(aggserialfn))
    {
        build_aggregate_serialfn_expr(aggserialfn,
                                      &serialfnexpr);
        fmgr_info(aggserialfn, &pertrans->serialfn);
        fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);

        pertrans->serialfn_fcinfo =
            (FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
        InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
                                 &pertrans->serialfn,
                                 1,
                                 InvalidOid,
                                 (void *) aggstate, NULL);
    }

    if (OidIsValid(aggdeserialfn))
    {
        build_aggregate_deserialfn_expr(aggdeserialfn,
                                        &deserialfnexpr);
        fmgr_info(aggdeserialfn, &pertrans->deserialfn);
        fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);

        pertrans->deserialfn_fcinfo =
            (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
        InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
                                 &pertrans->deserialfn,
                                 2,
                                 InvalidOid,
                                 (void *) aggstate, NULL);
    }

    /*
     * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
     * have a list of SortGroupClause nodes; fish out the data in them and
     * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
     * however; the agg's transfn and finalfn are responsible for that.
     *
     * Note that by construction, if there is a DISTINCT clause then the ORDER
     * BY clause is a prefix of it (see transformDistinctClause).
 */
    if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
    {
        sortlist = NIL;
        numSortCols = numDistinctCols = 0;
    }
    else if (aggref->aggdistinct)
    {
        sortlist = aggref->aggdistinct;
        numSortCols = numDistinctCols = list_length(sortlist);
        Assert(numSortCols >= list_length(aggref->aggorder));
    }
    else
    {
        sortlist = aggref->aggorder;
        numSortCols = list_length(sortlist);
        numDistinctCols = 0;
    }

    pertrans->numSortCols = numSortCols;
    pertrans->numDistinctCols = numDistinctCols;

    /*
     * If we have either sorting or filtering to do, create a tupledesc and
     * slot corresponding to the aggregated inputs (including sort
     * expressions) of the agg.
     */
    if (numSortCols > 0 || aggref->aggfilter)
    {
        pertrans->sortdesc = ExecTypeFromTL(aggref->args);
        pertrans->sortslot =
            ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
                                   &TTSOpsMinimalTuple);
    }

    if (numSortCols > 0)
    {
        /*
         * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
         * (yet)
         */
        Assert(aggstate->aggstrategy != AGG_HASHED &&
               aggstate->aggstrategy != AGG_MIXED);

        /* If we have only one input, we need its len/byval info. */
        if (numInputs == 1)
        {
            get_typlenbyval(inputTypes[numDirectArgs],
                            &pertrans->inputtypeLen,
                            &pertrans->inputtypeByVal);
        }
        else if (numDistinctCols > 0)
        {
            /* we will need an extra slot to store prior values */
            pertrans->uniqslot =
                ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
                                       &TTSOpsMinimalTuple);
        }

        /* Extract the sort information for use later */
        pertrans->sortColIdx =
            (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
        pertrans->sortOperators =
            (Oid *) palloc(numSortCols * sizeof(Oid));
        pertrans->sortCollations =
            (Oid *) palloc(numSortCols * sizeof(Oid));
        pertrans->sortNullsFirst =
            (bool *) palloc(numSortCols * sizeof(bool));

        i = 0;
        foreach(lc, sortlist)
        {
            SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
            TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);

            /* the parser should have made sure of this */
            Assert(OidIsValid(sortcl->sortop));

            pertrans->sortColIdx[i] = tle->resno;
            pertrans->sortOperators[i] = sortcl->sortop;
            pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
            pertrans->sortNullsFirst[i] = sortcl->nulls_first;
            i++;
        }
        Assert(i == numSortCols);
    }

    if (aggref->aggdistinct)
    {
        Oid *ops;

        Assert(numArguments > 0);
        Assert(list_length(aggref->aggdistinct) == numDistinctCols);

        ops = palloc(numDistinctCols * sizeof(Oid));

        i = 0;
        foreach(lc, aggref->aggdistinct)
            ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;

        /* lookup / build the necessary comparators */
        if (numDistinctCols == 1)
            fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
        else
            pertrans->equalfnMulti =
                execTuplesMatchPrepare(pertrans->sortdesc,
                                       numDistinctCols,
                                       pertrans->sortColIdx,
                                       ops,
                                       pertrans->sortCollations,
                                       &aggstate->ss.ps);
        pfree(ops);
    }

    pertrans->sortstates = (Tuplesortstate **)
        palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
}


static Datum
GetAggInitVal(Datum textInitVal, Oid transtype)
{
    Oid typinput,
        typioparam;
    char *strInitVal;
    Datum initVal;

    getTypeInputInfo(transtype, &typinput, &typioparam);
    strInitVal = TextDatumGetCString(textInitVal);
    initVal = OidInputFunctionCall(typinput, strInitVal,
                                   typioparam, -1);
    pfree(strInitVal);
    return initVal;
}

/*
 * find_compatible_peragg - search for a previously initialized per-Agg struct
 *
 * Searches the previously looked at aggregates to find one which is
 * compatible with this one, with the same input parameters.  If no compatible
 * aggregate can be found, returns -1.
 *
 * As a side-effect, this also collects a list of existing, shareable per-Trans
 * structs with matching inputs.  If no identical Aggref is found, the list is
 * passed later to find_compatible_pertrans, to see if we can at least reuse
 * the state value of another aggregate.
 */
static int
find_compatible_peragg(Aggref *newagg, AggState *aggstate,
                       int lastaggno, List **same_input_transnos)
{
    int aggno;
    AggStatePerAgg peraggs;

    *same_input_transnos = NIL;

    /* we mustn't reuse the aggref if it contains volatile function calls */
    if (contain_volatile_functions((Node *) newagg))
        return -1;

    peraggs = aggstate->peragg;

    /*
     * Search through the list of already seen aggregates.  If we find an
     * existing identical aggregate call, then we can re-use that one.  While
     * searching, we'll also collect a list of Aggrefs with the same input
     * parameters.  If no matching Aggref is found, the caller can potentially
     * still re-use the transition state of one of them.  (At this stage we
     * just compare the parsetrees; whether different aggregates share the
     * same transition function will be checked later.)
     */
    for (aggno = 0; aggno <= lastaggno; aggno++)
    {
        AggStatePerAgg peragg;
        Aggref *existingRef;

        peragg = &peraggs[aggno];
        existingRef = peragg->aggref;

        /* all of the following must be the same or it's no match */
        if (newagg->inputcollid != existingRef->inputcollid ||
            newagg->aggtranstype != existingRef->aggtranstype ||
            newagg->aggstar != existingRef->aggstar ||
            newagg->aggvariadic != existingRef->aggvariadic ||
            newagg->aggkind != existingRef->aggkind ||
            !equal(newagg->args, existingRef->args) ||
            !equal(newagg->aggorder, existingRef->aggorder) ||
            !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
            !equal(newagg->aggfilter, existingRef->aggfilter))
            continue;

        /* if it's the same aggregate function then report exact match */
        if (newagg->aggfnoid == existingRef->aggfnoid &&
            newagg->aggtype == existingRef->aggtype &&
            newagg->aggcollid == existingRef->aggcollid &&
            equal(newagg->aggdirectargs, existingRef->aggdirectargs))
        {
            list_free(*same_input_transnos);
            *same_input_transnos = NIL;
            return aggno;
        }

        /*
         * Not identical, but it had the same inputs.  If the final function
         * permits sharing, return its transno to the caller, in case we can
         * re-use its per-trans state.  (If there's already sharing going on,
         * we might report a transno more than once.  find_compatible_pertrans
         * is cheap enough that it's not worth spending cycles to avoid that.)
 */
        if (peragg->shareable)
            *same_input_transnos = lappend_int(*same_input_transnos,
                                               peragg->transno);
    }

    return -1;
}

/*
 * find_compatible_pertrans - search for a previously initialized per-Trans
 * struct
 *
 * Searches the list of transnos for a per-Trans struct with the same
 * transition function and initial condition.  (The inputs have already been
 * verified to match.)
 */
static int
find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
                         Oid aggtransfn, Oid aggtranstype,
                         Oid aggserialfn, Oid aggdeserialfn,
                         Datum initValue, bool initValueIsNull,
                         List *transnos)
{
    ListCell *lc;

    /* If this aggregate can't share transition states, give up */
    if (!shareable)
        return -1;

    foreach(lc, transnos)
    {
        int transno = lfirst_int(lc);
        AggStatePerTrans pertrans = &aggstate->pertrans[transno];

        /*
         * If the transfns or transition state types are not the same then the
         * state can't be shared.
         */
        if (aggtransfn != pertrans->transfn_oid ||
            aggtranstype != pertrans->aggtranstype)
            continue;

        /*
         * The serialization and deserialization functions must match, if
         * present, as we're unable to share the trans state for aggregates
         * which will serialize or deserialize into different formats.
         * Remember that these will be InvalidOid if they're not required for
         * this agg node.
         */
        if (aggserialfn != pertrans->serialfn_oid ||
            aggdeserialfn != pertrans->deserialfn_oid)
            continue;

        /*
         * Check that the initial condition matches, too.
         */
        if (initValueIsNull && pertrans->initValueIsNull)
            return transno;

        if (!initValueIsNull && !pertrans->initValueIsNull &&
            datumIsEqual(initValue, pertrans->initValue,
                         pertrans->transtypeByVal, pertrans->transtypeLen))
            return transno;
    }
    return -1;
}

void
ExecEndAgg(AggState *node)
{
    PlanState *outerPlan;
    int transno;
    int numGroupingSets = Max(node->maxsets, 1);
    int setno;

    /* Make sure we have closed any open tuplesorts */
    if (node->sort_in)
        tuplesort_end(node->sort_in);
    if (node->sort_out)
        tuplesort_end(node->sort_out);

    for (transno = 0; transno < node->numtrans; transno++)
    {
        AggStatePerTrans pertrans = &node->pertrans[transno];

        for (setno = 0; setno < numGroupingSets; setno++)
        {
            if (pertrans->sortstates[setno])
                tuplesort_end(pertrans->sortstates[setno]);
        }
    }

    /* And ensure any agg shutdown callbacks have been called */
    for (setno = 0; setno < numGroupingSets; setno++)
        ReScanExprContext(node->aggcontexts[setno]);
    if (node->hashcontext)
        ReScanExprContext(node->hashcontext);

    /*
     * We don't actually free any ExprContexts here (see comment in
     * ExecFreeExprContext); just unlinking the output one from the plan node
     * suffices.
     */
    ExecFreeExprContext(&node->ss.ps);

    /* clean up tuple table */
    ExecClearTuple(node->ss.ss_ScanTupleSlot);

    outerPlan = outerPlanState(node);
    ExecEndNode(outerPlan);
}

void
ExecReScanAgg(AggState *node)
{
    ExprContext *econtext = node->ss.ps.ps_ExprContext;
    PlanState *outerPlan = outerPlanState(node);
    Agg *aggnode = (Agg *) node->ss.ps.plan;
    int transno;
    int numGroupingSets = Max(node->maxsets, 1);
    int setno;

    node->agg_done = false;

    if (node->aggstrategy == AGG_HASHED)
    {
        /*
         * In the hashed case, if we haven't yet built the hash table then we
         * can just return; nothing done yet, so nothing to undo.  If
         * subnode's chgParam is not NULL then it will be re-scanned by
         * ExecProcNode, else no reason to re-scan it at all.
         */
        if (!node->table_filled)
            return;

        /*
         * If we do have the hash table, and the subplan does not have any
         * parameter changes, and none of our own parameter changes affect
         * input expressions of the aggregated functions, then we can just
         * rescan the existing hash table; no need to build it again.
         */
        if (outerPlan->chgParam == NULL &&
            !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
        {
            ResetTupleHashIterator(node->perhash[0].hashtable,
                                   &node->perhash[0].hashiter);
            select_current_set(node, 0, true);
            return;
        }
    }

    /* Make sure we have closed any open tuplesorts */
    for (transno = 0; transno < node->numtrans; transno++)
    {
        for (setno = 0; setno < numGroupingSets; setno++)
        {
            AggStatePerTrans pertrans = &node->pertrans[transno];

            if (pertrans->sortstates[setno])
            {
                tuplesort_end(pertrans->sortstates[setno]);
                pertrans->sortstates[setno] = NULL;
            }
        }
    }

    /*
     * We don't need to ReScanExprContext the output tuple context here;
     * ExecReScan already did it.  But we do need to reset our
     * per-grouping-set contexts, which may have transvalues stored in them.
     * (We use rescan rather than just reset because transfns may have
     * registered callbacks that need to be run now.)  For the AGG_HASHED
     * case, see below.
     */
    for (setno = 0; setno < numGroupingSets; setno++)
    {
        ReScanExprContext(node->aggcontexts[setno]);
    }

    /* Release first tuple of group, if we have made a copy */
    if (node->grp_firstTuple != NULL)
    {
        heap_freetuple(node->grp_firstTuple);
        node->grp_firstTuple = NULL;
    }
    ExecClearTuple(node->ss.ss_ScanTupleSlot);

    /* Forget current agg values */
    MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
    MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);

    /*
     * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
     * the hashcontext.  This used to be an issue, but now, resetting a
     * context automatically deletes sub-contexts too.
 */
    if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
    {
        ReScanExprContext(node->hashcontext);
        /* Rebuild an empty hash table */
        build_hash_table(node);
        node->table_filled = false;
        /* iterator will be reset when the table is filled */
    }

    if (node->aggstrategy != AGG_HASHED)
    {
        /*
         * Reset the per-group state (in particular, mark transvalues null)
         */
        for (setno = 0; setno < numGroupingSets; setno++)
        {
            MemSet(node->pergroups[setno], 0,
                   sizeof(AggStatePerGroupData) * node->numaggs);
        }

        /* reset to phase 1 */
        initialize_phase(node, 1);

        node->input_done = false;
        node->projected_set = -1;
    }

    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}


/***********************************************************************
 * API exposed to aggregate functions
 ***********************************************************************/


/*
 * AggCheckCallContext - test if a SQL function is being called as an aggregate
 *
 * The transition and/or final functions of an aggregate may want to verify
 * that they are being called as aggregates, rather than as plain SQL
 * functions.  They should use this function to do so.  The return value is
 * nonzero if being called as an aggregate, or zero if not.  (Specific nonzero
 * values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more values
 * could conceivably appear in future.)
 *
 * If aggcontext isn't NULL, the function also stores at *aggcontext the
 * identity of the memory context that aggregate transition values are being
 * stored in.  Note that the same aggregate call site (flinfo) may be called
 * interleaved on different transition values in different contexts, so it's
 * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
 * cache it in the transvalue itself (for internal-type transvalues).
 */
int
AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
{
    if (fcinfo->context && IsA(fcinfo->context, AggState))
    {
        if (aggcontext)
        {
            AggState *aggstate = ((AggState *) fcinfo->context);
            ExprContext *cxt = aggstate->curaggcontext;

            *aggcontext = cxt->ecxt_per_tuple_memory;
        }
        return AGG_CONTEXT_AGGREGATE;
    }
    if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
    {
        if (aggcontext)
            *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
        return AGG_CONTEXT_WINDOW;
    }

    /* this is just to prevent "uninitialized variable" warnings */
    if (aggcontext)
        *aggcontext = NULL;
    return 0;
}

/*
 * AggGetAggref - allow an aggregate support function to get its Aggref
 *
 * If the function is being called as an aggregate support function, return
 * the Aggref node for the aggregate call.  Otherwise, return NULL.
 *
 * Aggregates sharing the same inputs and transition functions can get merged
 * into a single transition calculation.  If the transition function calls
 * AggGetAggref, it will get one of the Aggrefs for which it is executing.  It
 * must therefore not pay attention to the Aggref fields that relate to the
 * final function, as those are indeterminate.  But if a final function calls
 * AggGetAggref, it will get a precise result.
 *
 * Note that if an aggregate is being used as a window function, this will
 * return NULL.  We could provide a similar function to return the relevant
 * WindowFunc node in such cases, but it's not needed yet.
 */
Aggref *
AggGetAggref(FunctionCallInfo fcinfo)
{
    if (fcinfo->context && IsA(fcinfo->context, AggState))
    {
        AggState *aggstate = (AggState *) fcinfo->context;
        AggStatePerAgg curperagg;
        AggStatePerTrans curpertrans;

        /* check curperagg (valid when in a final function) */
        curperagg = aggstate->curperagg;

        if (curperagg)
            return curperagg->aggref;

        /* check curpertrans (valid when in a transition function) */
        curpertrans = aggstate->curpertrans;

        if (curpertrans)
            return curpertrans->aggref;
    }
    return NULL;
}

/*
 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
 *
 * This is useful in agg final functions; the context returned is one that
 * the final function can safely reset as desired.  This isn't useful for
 * transition functions, since the context returned MAY (we don't promise)
 * be the same as the context those are called in.
 *
 * As above, this is currently not useful for aggs called as window functions.
 */
MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)
{
    if (fcinfo->context && IsA(fcinfo->context, AggState))
    {
        AggState *aggstate = (AggState *) fcinfo->context;

        return aggstate->tmpcontext->ecxt_per_tuple_memory;
    }
    return NULL;
}

/*
 * AggStateIsShared - find out whether transition state is shared
 *
 * If the function is being called as an aggregate support function, return
 * true if the aggregate's transition state is shared across multiple
 * aggregates, false if it is not.
 *
 * Returns true if not called as an aggregate support function.  This is
 * intended as a conservative answer, ie "no you'd better not scribble on
 * your input".  In particular, it will return true if the aggregate is being
 * used as a window function, which is a scenario in which changing the
 * transition state is a bad idea.  We might want to refine the behavior for
 * the window case in future.
 */
bool
AggStateIsShared(FunctionCallInfo fcinfo)
{
    if (fcinfo->context && IsA(fcinfo->context, AggState))
    {
        AggState *aggstate = (AggState *) fcinfo->context;
        AggStatePerAgg curperagg;
        AggStatePerTrans curpertrans;

        /* check curperagg (valid when in a final function) */
        curperagg = aggstate->curperagg;

        if (curperagg)
            return aggstate->pertrans[curperagg->transno].aggshared;

        /* check curpertrans (valid when in a transition function) */
        curpertrans = aggstate->curpertrans;

        if (curpertrans)
            return curpertrans->aggshared;
    }
    return true;
}

/*
 * AggRegisterCallback - register a cleanup callback for an aggregate
 *
 * This is useful for aggs to register shutdown callbacks, which will ensure
 * that non-memory resources are freed.  The callback will occur just before
 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
 * either between groups or as a result of rescanning the query.  The callback
 * will NOT be called on error paths.  The typical use-case is for freeing of
 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
 * created by the agg functions.  (The callback will not be called until after
 * the result of the finalfn is no longer needed, so it's safe for the finalfn
 * to return data that will be freed by the callback.)
 *
 * As above, this is currently not useful for aggs called as window functions.
 */
void
AggRegisterCallback(FunctionCallInfo fcinfo,
                    ExprContextCallbackFunction func,
                    Datum arg)
{
    if (fcinfo->context && IsA(fcinfo->context, AggState))
    {
        AggState *aggstate = (AggState *) fcinfo->context;
        ExprContext *cxt = aggstate->curaggcontext;

        RegisterExprContextCallback(cxt, func, arg);

        return;
    }
    elog(ERROR, "aggregate function cannot register a callback in this context");
}


/*
 * aggregate_dummy - dummy execution routine for aggregate functions
 *
 * This function is listed as the implementation (prosrc field) of pg_proc
 * entries for aggregate functions.  Its only purpose is to throw an error if
 * someone mistakenly executes such a function in the normal way.
 *
 * Perhaps someday we could assign real meaning to the prosrc field of an
 * aggregate?
 */
Datum
aggregate_dummy(PG_FUNCTION_ARGS)
{
    elog(ERROR, "aggregate function %u called as normal function",
         fcinfo->flinfo->fn_oid);
    return (Datum) 0;           /* keep compiler quiet */
}
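The "API exposed to aggregate functions" section above describes how extension code is expected to call AggCheckCallContext. As a usage illustration only (this is not part of nodeAgg.c, and the names my_accum and MyState are hypothetical), a C-language transition function for a user-defined aggregate with a pass-by-reference transition state might look like the following sketch, which assumes the standard PostgreSQL extension headers:

```c
#include "postgres.h"
#include "fmgr.h"

/* Hypothetical pass-by-reference transition state for a sum/count agg */
typedef struct MyState
{
    int64       sum;
    int64       count;
} MyState;

PG_FUNCTION_INFO_V1(my_accum);

/*
 * Hypothetical transition function.  AggCheckCallContext both verifies that
 * we are running as an aggregate and reports the memory context in which the
 * transition value must live; allocating the state anywhere shorter-lived
 * would let it be reset out from under us between calls.
 */
Datum
my_accum(PG_FUNCTION_ARGS)
{
    MemoryContext aggcontext;
    MyState    *state;

    if (!AggCheckCallContext(fcinfo, &aggcontext))
        elog(ERROR, "my_accum called in non-aggregate context");

    if (PG_ARGISNULL(0))
    {
        /* first call: allocate state in the long-lived aggregate context */
        state = (MyState *) MemoryContextAllocZero(aggcontext,
                                                   sizeof(MyState));
    }
    else
        state = (MyState *) PG_GETARG_POINTER(0);

    /* skip NULL inputs, mirroring what a strict transfn would do */
    if (!PG_ARGISNULL(1))
    {
        state->sum += PG_GETARG_INT64(1);
        state->count++;
    }

    PG_RETURN_POINTER(state);
}
```

Note the design point this illustrates: because the same call site may be invoked interleaved on different transition values, the context pointer is re-fetched on every call rather than cached in fn_extra, exactly as the AggCheckCallContext comment warns.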