1 /*-------------------------------------------------------------------------
2 *
3 * nodeAgg.c
4 * Routines to handle aggregate nodes.
5 *
6 * ExecAgg normally evaluates each aggregate in the following steps:
7 *
8 * transvalue = initcond
9 * foreach input_tuple do
10 * transvalue = transfunc(transvalue, input_value(s))
11 * result = finalfunc(transvalue, direct_argument(s))
12 *
13 * If a finalfunc is not supplied then the result is just the ending
14 * value of transvalue.
15 *
16 * Other behaviors can be selected by the "aggsplit" mode, which exists
17 * to support partial aggregation. It is possible to:
18 * * Skip running the finalfunc, so that the output is always the
19 * final transvalue state.
20 * * Substitute the combinefunc for the transfunc, so that transvalue
21 * states (propagated up from a child partial-aggregation step) are merged
22 * rather than processing raw input rows. (The statements below about
23 * the transfunc apply equally to the combinefunc, when it's selected.)
24 * * Apply the serializefunc to the output values (this only makes sense
25 * when skipping the finalfunc, since the serializefunc works on the
26 * transvalue data type).
27 * * Apply the deserializefunc to the input values (this only makes sense
28 * when using the combinefunc, for similar reasons).
29 * It is the planner's responsibility to connect up Agg nodes using these
30 * alternate behaviors in a way that makes sense, with partial aggregation
31 * results being fed to nodes that expect them.
32 *
33 * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34 * input tuples and eliminate duplicates (if required) before performing
35 * the above-depicted process. (However, we don't do that for ordered-set
36 * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37 * so far as this module is concerned.) Note that partial aggregation
38 * is not supported in these cases, since we couldn't ensure global
39 * ordering or distinctness of the inputs.
40 *
41 * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42 * then the first non-NULL input_value is assigned directly to transvalue,
43 * and transfunc isn't applied until the second non-NULL input_value.
44 * The agg's first input type and transtype must be the same in this case!
45 *
46 * If transfunc is marked "strict" then NULL input_values are skipped,
47 * keeping the previous transvalue. If transfunc is not strict then it
48 * is called for every input tuple and must deal with NULL initcond
49 * or NULL input_values for itself.
50 *
51 * If finalfunc is marked "strict" then it is not called when the
52 * ending transvalue is NULL, instead a NULL result is created
53 * automatically (this is just the usual handling of strict functions,
54 * of course). A non-strict finalfunc can make its own choice of
55 * what to return for a NULL ending transvalue.
56 *
57 * Ordered-set aggregates are treated specially in one other way: we
58 * evaluate any "direct" arguments and pass them to the finalfunc along
59 * with the transition value.
60 *
61 * A finalfunc can have additional arguments beyond the transvalue and
62 * any "direct" arguments, corresponding to the input arguments of the
63 * aggregate. These are always just passed as NULL. Such arguments may be
64 * needed to allow resolution of a polymorphic aggregate's result type.
65 *
66 * We compute aggregate input expressions and run the transition functions
67 * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68 * once per input tuple, so when the transvalue datatype is
69 * pass-by-reference, we have to be careful to copy it into a longer-lived
70 * memory context, and free the prior value to avoid memory leakage. We
71 * store transvalues in another set of econtexts, aggstate->aggcontexts
72 * (one per grouping set, see below), which are also used for the hashtable
73 * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74 * reset, at group boundaries so that aggregate transition functions can
75 * register shutdown callbacks via AggRegisterCallback.
76 *
77 * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78 * run finalize functions and compute the output tuple; this context can be
79 * reset once per output tuple.
80 *
81 * The executor's AggState node is passed as the fmgr "context" value in
82 * all transfunc and finalfunc calls. It is not recommended that the
83 * transition functions look at the AggState node directly, but they can
84 * use AggCheckCallContext() to verify that they are being called by
85 * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86 * transition function might want to know this is so that it can avoid
87 * palloc'ing a fixed-size pass-by-ref transition value on every call:
88 * it can instead just scribble on and return its left input. Ordinarily
89 * it is completely forbidden for functions to modify pass-by-ref inputs,
90 * but in the aggregate case we know the left input is either the initial
91 * transition value or a previous function result, and in either case its
92 * value need not be preserved. See int8inc() for an example. Notice that
93 * advance_transition_function() is coded to avoid a data copy step when
94 * the previous transition value pointer is returned. It is also possible
95 * to avoid repeated data copying when the transition value is an expanded
96 * object: to do that, the transition function must take care to return
97 * an expanded object that is in a child context of the memory context
98 * returned by AggCheckCallContext(). Also, some transition functions want
99 * to store working state in addition to the nominal transition value; they
100 * can use the memory context returned by AggCheckCallContext() to do that.
101 *
102 * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103 * AggState is available as context in earlier releases (back to 8.1),
104 * but direct examination of the node is needed to use it before 9.0.
105 *
106 * As of 9.4, aggregate transition functions can also use AggGetAggref()
107 * to get hold of the Aggref expression node for their aggregate call.
108 * This is mainly intended for ordered-set aggregates, which are not
109 * supported as window functions. (A regular aggregate function would
110 * need some fallback logic to use this, since there's no Aggref node
111 * for a window function.)
112 *
113 * Grouping sets:
114 *
115 * A list of grouping sets which is structurally equivalent to a ROLLUP
116 * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117 * ordered data. We do this by keeping a separate set of transition values
118 * for each grouping set being concurrently processed; for each input tuple
119 * we update them all, and on group boundaries we reset those states
120 * (starting at the front of the list) whose grouping values have changed
121 * (the list of grouping sets is ordered from most specific to least
122 * specific).
123 *
124 * Where more complex grouping sets are used, we break them down into
125 * "phases", where each phase has a different sort order (except phase 0
126 * which is reserved for hashing). During each phase but the last, the
127 * input tuples are additionally stored in a tuplesort which is keyed to the
128 * next phase's sort order; during each phase but the first, the input
129 * tuples are drawn from the previously sorted data. (The sorting of the
130 * data for the first phase is handled by the planner, as it might be
131 * satisfied by underlying nodes.)
132 *
133 * Hashing can be mixed with sorted grouping. To do this, we have an
134 * AGG_MIXED strategy that populates the hashtables during the first sorted
135 * phase, and switches to reading them out after completing all sort phases.
136 * We can also support AGG_HASHED with multiple hash tables and no sorting
137 * at all.
138 *
139 * From the perspective of aggregate transition and final functions, the
140 * only issue regarding grouping sets is this: a single call site (flinfo)
141 * of an aggregate function may be used for updating several different
142 * transition values in turn. So the function must not cache in the flinfo
143 * anything which logically belongs as part of the transition value (most
144 * importantly, the memory context in which the transition value exists).
145 * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146 * sensitive to the grouping set for which the aggregate function is
147 * currently being called.
148 *
149 * Plan structure:
150 *
151 * What we get from the planner is actually one "real" Agg node which is
152 * part of the plan tree proper, but which optionally has an additional list
153 * of Agg nodes hung off the side via the "chain" field. This is because an
154 * Agg node happens to be a convenient representation of all the data we
155 * need for grouping sets.
156 *
157 * For many purposes, we treat the "real" node as if it were just the first
158 * node in the chain. The chain must be ordered such that hashed entries
159 * come before sorted/plain entries; the real node is marked AGG_MIXED if
160 * there are both types present (in which case the real node describes one
161 * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162 * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163 * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164 * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165 * chained nodes.
166 *
167 * We collect all hashed nodes into a single "phase", numbered 0, and create
168 * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169 * Phase 0 is allocated even if there are no hashes, but remains unused in
170 * that case.
171 *
172 * AGG_HASHED nodes actually refer to only a single grouping set each,
173 * because for each hashed grouping we need a separate grpColIdx and
174 * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175 * grouping sets that share a sort order. Each AGG_SORTED node other than
176 * the first one has an associated Sort node which describes the sort order
177 * to be used; the first sorted node takes its input from the outer subtree,
178 * which the planner has already arranged to provide ordered data.
179 *
180 * Memory and ExprContext usage:
181 *
182 * Because we're accumulating aggregate values across input rows, we need to
183 * use more memory contexts than just simple input/output tuple contexts.
184 * In fact, for a rollup, we need a separate context for each grouping set
185 * so that we can reset the inner (finer-grained) aggregates on their group
186 * boundaries while continuing to accumulate values for outer
187 * (coarser-grained) groupings. On top of this, we might be simultaneously
188 * populating hashtables; however, we only need one context for all the
189 * hashtables.
190 *
191 * So we create an array, aggcontexts, with an ExprContext for each grouping
192 * set in the largest rollup that we're going to process, and use the
193 * per-tuple memory context of those ExprContexts to store the aggregate
194 * transition values. hashcontext is the single context created to support
195 * all hash tables.
196 *
197 *
198 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
199 * Portions Copyright (c) 1994, Regents of the University of California
200 *
201 * IDENTIFICATION
202 * src/backend/executor/nodeAgg.c
203 *
204 *-------------------------------------------------------------------------
205 */
206
207 #include "postgres.h"
208
209 #include "access/htup_details.h"
210 #include "catalog/objectaccess.h"
211 #include "catalog/pg_aggregate.h"
212 #include "catalog/pg_proc.h"
213 #include "catalog/pg_type.h"
214 #include "executor/executor.h"
215 #include "executor/nodeAgg.h"
216 #include "miscadmin.h"
217 #include "nodes/makefuncs.h"
218 #include "nodes/nodeFuncs.h"
219 #include "optimizer/clauses.h"
220 #include "optimizer/tlist.h"
221 #include "parser/parse_agg.h"
222 #include "parser/parse_coerce.h"
223 #include "utils/acl.h"
224 #include "utils/builtins.h"
225 #include "utils/lsyscache.h"
226 #include "utils/memutils.h"
227 #include "utils/syscache.h"
228 #include "utils/tuplesort.h"
229 #include "utils/datum.h"
230
231
/*
 * AggStatePerTransData - per aggregate state value information
 *
 * Working state for updating the aggregate's state value, by calling the
 * transition function with an input row.  This struct does not store the
 * information needed to produce the final aggregate result from the
 * transition state; that is stored in AggStatePerAggData instead, which
 * refers back to one of these through its "transno" index.  This separation
 * allows multiple aggregate results to be produced from a single state value.
 */
typedef struct AggStatePerTransData
{
	/*
	 * These values are set up during ExecInitAgg() and do not change
	 * thereafter:
	 */

	/*
	 * Link to an Aggref expr this state value is for.
	 *
	 * There can be multiple Aggref's sharing the same state value, as long as
	 * the inputs and transition function are identical.  This points to the
	 * first one of them.
	 */
	Aggref	   *aggref;

	/*
	 * Nominal number of arguments for aggregate function.  For plain aggs,
	 * this excludes any ORDER BY expressions.  For ordered-set aggs, this
	 * counts both the direct and aggregated (ORDER BY) arguments.
	 */
	int			numArguments;

	/*
	 * Number of aggregated input columns.  This includes ORDER BY expressions
	 * in both the plain-agg and ordered-set cases.  Ordered-set direct args
	 * are not counted, though.  When this is exactly 1 and sorting is needed,
	 * a Datum sorter is used instead of a tuple sorter.
	 */
	int			numInputs;

	/*
	 * Number of aggregated input columns to pass to the transfn.  This
	 * includes the ORDER BY columns for ordered-set aggs, but not for plain
	 * aggs.  (This doesn't count the transition state value!)
	 */
	int			numTransInputs;

	/*
	 * At each input row, we perform a single ExecProject call to evaluate all
	 * argument expressions that will certainly be needed at this row; that
	 * includes this aggregate's filter expression if it has one, or its
	 * regular argument expressions (including any ORDER BY columns) if it
	 * doesn't.  inputoff is the starting index of this aggregate's required
	 * expressions in the resulting tuple.
	 */
	int			inputoff;

	/* Oid of the state transition or combine function */
	Oid			transfn_oid;

	/* Oid of the serialization function or InvalidOid */
	Oid			serialfn_oid;

	/* Oid of the deserialization function or InvalidOid */
	Oid			deserialfn_oid;

	/* Oid of state value's datatype */
	Oid			aggtranstype;

	/* ExprStates for any direct-argument expressions */
	List	   *aggdirectargs;

	/*
	 * fmgr lookup data for transition function or combine function.  Note in
	 * particular that the fn_strict flag is kept here.
	 */
	FmgrInfo	transfn;

	/* fmgr lookup data for serialization function */
	FmgrInfo	serialfn;

	/* fmgr lookup data for deserialization function */
	FmgrInfo	deserialfn;

	/* Input collation derived for aggregate */
	Oid			aggCollation;

	/* number of sorting columns; > 0 means DISTINCT and/or ORDER BY is used */
	int			numSortCols;

	/* number of sorting columns to consider in DISTINCT comparisons */
	/* (this is either zero or the same as numSortCols) */
	int			numDistinctCols;

	/* deconstructed sorting information (arrays of length numSortCols) */
	AttrNumber *sortColIdx;
	Oid		   *sortOperators;
	Oid		   *sortCollations;
	bool	   *sortNullsFirst;

	/*
	 * fmgr lookup data for input columns' equality operators --- only
	 * set/used when aggregate has DISTINCT flag.  Note that these are in
	 * order of sort column index, not parameter index.
	 */
	FmgrInfo   *equalfns;		/* array of length numDistinctCols */

	/*
	 * initial value from pg_aggregate entry.  When initValueIsNull is true
	 * there is no initial value, and the first non-NULL input is adopted as
	 * the transition value instead (see noTransValue in AggStatePerGroupData).
	 */
	Datum		initValue;
	bool		initValueIsNull;

	/*
	 * We need the len and byval info for the agg's input and transition data
	 * types in order to know how to copy/delete values.
	 *
	 * Note that the info for the input type is used only when handling
	 * DISTINCT aggs with just one argument, so there is only one input type.
	 */
	int16		inputtypeLen,
				transtypeLen;
	bool		inputtypeByVal,
				transtypeByVal;

	/*
	 * Stuff for evaluation of aggregate inputs, when they must be evaluated
	 * separately because there's a FILTER expression.  In such cases we will
	 * create a sortslot and the result will be stored there, whether or not
	 * we're actually sorting.
	 */
	ProjectionInfo *evalproj;	/* projection machinery */

	/*
	 * Slots for holding the evaluated input arguments.  These are set up
	 * during ExecInitAgg() and then used for each input row requiring either
	 * FILTER or ORDER BY/DISTINCT processing.
	 */
	TupleTableSlot *sortslot;	/* current input tuple */
	TupleTableSlot *uniqslot;	/* used for multi-column DISTINCT */
	TupleDesc	sortdesc;		/* descriptor of input tuples */

	/*
	 * These values are working state that is initialized at the start of an
	 * input tuple group and updated for each input tuple.
	 *
	 * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input
	 * values straight to the transition function.  If it's DISTINCT or
	 * requires ORDER BY, we pass the input values into a Tuplesort object;
	 * then at completion of the input tuple group, we scan the sorted values,
	 * eliminate duplicates if needed, and run the transition function on the
	 * rest.
	 *
	 * We need a separate tuplesort for each grouping set, so this array is
	 * indexed by grouping-set number (aggstate->current_set).
	 */

	Tuplesortstate **sortstates;	/* sort objects, if DISTINCT or ORDER BY */

	/*
	 * This field is a pre-initialized FunctionCallInfo struct used for
	 * calling this aggregate's transfn.  We save a few cycles per row by not
	 * re-initializing the unchanging fields; which isn't much, but it seems
	 * worth the extra space consumption.
	 */
	FunctionCallInfoData transfn_fcinfo;

	/* Likewise for serialization and deserialization functions */
	FunctionCallInfoData serialfn_fcinfo;

	FunctionCallInfoData deserialfn_fcinfo;
} AggStatePerTransData;
402
/*
 * AggStatePerAggData - per-aggregate information
 *
 * This contains the information needed to call the final function, to produce
 * a final aggregate result from the state value.  If there are multiple
 * identical Aggrefs in the query, they can all share the same per-agg data.
 *
 * These values are set up during ExecInitAgg() and do not change thereafter.
 */
typedef struct AggStatePerAggData
{
	/*
	 * Link to the Aggref expr this per-agg data is for.
	 *
	 * There can be multiple identical Aggref's sharing the same per-agg.  This
	 * points to the first one of them.
	 */
	Aggref	   *aggref;

	/* index to the state value (AggStatePerTransData) which this agg uses */
	int			transno;

	/* Optional Oid of final function (may be InvalidOid) */
	Oid			finalfn_oid;

	/*
	 * fmgr lookup data for final function --- only valid when finalfn_oid is
	 * not InvalidOid.
	 */
	FmgrInfo	finalfn;

	/*
	 * Number of arguments to pass to the finalfn.  This is always at least 1
	 * (the transition state value) plus any ordered-set direct args.  If the
	 * finalfn wants extra args then we pass nulls corresponding to the
	 * aggregated input columns.
	 */
	int			numFinalArgs;

	/*
	 * We need the len and byval info for the agg's result data type in order
	 * to know how to copy/delete values.
	 */
	int16		resulttypeLen;
	bool		resulttypeByVal;

} AggStatePerAggData;
450
/*
 * AggStatePerGroupData - per-aggregate-per-group working state
 *
 * These values are working state that is initialized at the start of
 * an input tuple group and updated for each input tuple.
 *
 * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these
 * structs (pointed to by aggstate->pergroup); we re-use the array for
 * each input group, if it's AGG_SORTED mode.  In AGG_HASHED mode, the
 * hash table contains an array of these structs for each tuple group.
 * With multiple grouping sets, the entry for transition state "transno" in
 * grouping set "setno" lives at index transno + setno * numtrans (see
 * initialize_aggregates).
 *
 * Logically, the sortstate field belongs in this struct, but we do not
 * keep it here for space reasons: we don't support DISTINCT aggregates
 * in AGG_HASHED mode, so there's no reason to use up a pointer field
 * in every entry of the hashtable.
 */
typedef struct AggStatePerGroupData
{
	Datum		transValue;		/* current transition value */
	bool		transValueIsNull;	/* true if transValue is currently NULL */

	bool		noTransValue;	/* true if transValue not set yet */

	/*
	 * Note: noTransValue initially has the same value as transValueIsNull,
	 * and if true both are cleared to false at the same time.  They are not
	 * the same though: if transfn later returns a NULL, we want to keep that
	 * NULL and not auto-replace it with a later input value. Only the first
	 * non-NULL input will be auto-substituted.
	 */
} AggStatePerGroupData;
482
/*
 * AggStatePerPhaseData - per-grouping-set-phase state
 *
 * Grouping sets are divided into "phases", where a single phase can be
 * processed in one pass over the input.  If there is more than one phase, then
 * at the end of input from the current phase, state is reset and another pass
 * taken over the data which has been re-sorted in the meantime.
 *
 * Accordingly, each phase specifies a list of grouping sets and group clause
 * information, plus each phase after the first also has a sort order.  Phase 0
 * is reserved for hashing; sorted/plain phases are numbered from 1.
 */
typedef struct AggStatePerPhaseData
{
	AggStrategy aggstrategy;	/* strategy for this phase */
	int			numsets;		/* number of grouping sets (or 0) */
	int		   *gset_lengths;	/* lengths of grouping sets */
	Bitmapset **grouped_cols;	/* column groupings for rollup */
	FmgrInfo   *eqfunctions;	/* per-grouping-field equality fns */
	Agg		   *aggnode;		/* Agg node for phase data */
	Sort	   *sortnode;		/* Sort node for input ordering for phase */
} AggStatePerPhaseData;
504
/*
 * AggStatePerHashData - per-hashtable state
 *
 * When doing grouping sets with hashing, we have one of these for each
 * hashed grouping set.  (When doing hashing without grouping sets, we have
 * just one of them.)  All hash tables share a single memory context,
 * aggstate->hashcontext.
 */
typedef struct AggStatePerHashData
{
	TupleHashTable hashtable;	/* hash table with one entry per group */
	TupleHashIterator hashiter; /* for iterating through hash table */
	TupleTableSlot *hashslot;	/* slot for loading hash table */
	FmgrInfo   *hashfunctions;	/* per-grouping-field hash fns */
	FmgrInfo   *eqfunctions;	/* per-grouping-field equality fns */
	int			numCols;		/* number of hash key columns */
	int			numhashGrpCols; /* number of columns in hash table */
	int			largestGrpColIdx;	/* largest col required for hashing */
	AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */
	AttrNumber *hashGrpColIdxHash;	/* indices in hashtbl tuples */
	Agg		   *aggnode;		/* original Agg node, for numGroups etc. */
} AggStatePerHashData;
526
527
/*
 * Forward declarations of file-local helpers.  (initialize_aggregate needs
 * none, since it is defined before its only caller.)
 */
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
					  AggStatePerGroup pergroup,
					  int numReset);
static void advance_transition_function(AggState *aggstate,
							AggStatePerTrans pertrans,
							AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup,
				   AggStatePerGroup *pergroups);
static void advance_combine_function(AggState *aggstate,
						 AggStatePerTrans pertrans,
						 AggStatePerGroup pergroupstate);
static void combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
static void process_ordered_aggregate_single(AggState *aggstate,
								 AggStatePerTrans pertrans,
								 AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
								AggStatePerTrans pertrans,
								AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
				   AggStatePerAgg peragg,
				   AggStatePerGroup pergroupstate,
				   Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
						  AggStatePerAgg peragg,
						  AggStatePerGroup pergroupstate,
						  Datum *resultVal, bool *resultIsNull);
static void prepare_projection_slot(AggState *aggstate,
						TupleTableSlot *slot,
						int currentSet);
static void finalize_aggregates(AggState *aggstate,
					AggStatePerAgg peragg,
					AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static void build_hash_table(AggState *aggstate);
static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
static AggStatePerGroup *lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
						  AggState *aggstate, EState *estate,
						  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
						  Oid aggserialfn, Oid aggdeserialfn,
						  Datum initValue, bool initValueIsNull,
						  Oid *inputTypes, int numArguments);
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
					   int lastaggno, List **same_input_transnos);
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
						 Oid aggtransfn, Oid aggtranstype,
						 Oid aggserialfn, Oid aggdeserialfn,
						 Datum initValue, bool initValueIsNull,
						 List *transnos);
586
587
588 /*
589 * Select the current grouping set; affects current_set and
590 * curaggcontext.
591 */
592 static void
select_current_set(AggState * aggstate,int setno,bool is_hash)593 select_current_set(AggState *aggstate, int setno, bool is_hash)
594 {
595 if (is_hash)
596 aggstate->curaggcontext = aggstate->hashcontext;
597 else
598 aggstate->curaggcontext = aggstate->aggcontexts[setno];
599
600 aggstate->current_set = setno;
601 }
602
603 /*
604 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
605 * current_phase + 1. Juggle the tuplesorts accordingly.
606 *
607 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
608 * case, so when entering phase 0, all we need to do is drop open sorts.
609 */
610 static void
initialize_phase(AggState * aggstate,int newphase)611 initialize_phase(AggState *aggstate, int newphase)
612 {
613 Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
614
615 /*
616 * Whatever the previous state, we're now done with whatever input
617 * tuplesort was in use.
618 */
619 if (aggstate->sort_in)
620 {
621 tuplesort_end(aggstate->sort_in);
622 aggstate->sort_in = NULL;
623 }
624
625 if (newphase <= 1)
626 {
627 /*
628 * Discard any existing output tuplesort.
629 */
630 if (aggstate->sort_out)
631 {
632 tuplesort_end(aggstate->sort_out);
633 aggstate->sort_out = NULL;
634 }
635 }
636 else
637 {
638 /*
639 * The old output tuplesort becomes the new input one, and this is the
640 * right time to actually sort it.
641 */
642 aggstate->sort_in = aggstate->sort_out;
643 aggstate->sort_out = NULL;
644 Assert(aggstate->sort_in);
645 tuplesort_performsort(aggstate->sort_in);
646 }
647
648 /*
649 * If this isn't the last phase, we need to sort appropriately for the
650 * next phase in sequence.
651 */
652 if (newphase > 0 && newphase < aggstate->numphases - 1)
653 {
654 Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
655 PlanState *outerNode = outerPlanState(aggstate);
656 TupleDesc tupDesc = ExecGetResultType(outerNode);
657
658 aggstate->sort_out = tuplesort_begin_heap(tupDesc,
659 sortnode->numCols,
660 sortnode->sortColIdx,
661 sortnode->sortOperators,
662 sortnode->collations,
663 sortnode->nullsFirst,
664 work_mem,
665 false);
666 }
667
668 aggstate->current_phase = newphase;
669 aggstate->phase = &aggstate->phases[newphase];
670 }
671
672 /*
673 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
674 * populated by the previous phase. Copy it to the sorter for the next phase
675 * if any.
676 *
677 * Callers cannot rely on memory for tuple in returned slot remaining valid
678 * past any subsequently fetched tuple.
679 */
680 static TupleTableSlot *
fetch_input_tuple(AggState * aggstate)681 fetch_input_tuple(AggState *aggstate)
682 {
683 TupleTableSlot *slot;
684
685 if (aggstate->sort_in)
686 {
687 /* make sure we check for interrupts in either path through here */
688 CHECK_FOR_INTERRUPTS();
689 if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
690 aggstate->sort_slot, NULL))
691 return NULL;
692 slot = aggstate->sort_slot;
693 }
694 else
695 slot = ExecProcNode(outerPlanState(aggstate));
696
697 if (!TupIsNull(slot) && aggstate->sort_out)
698 tuplesort_puttupleslot(aggstate->sort_out, slot);
699
700 return slot;
701 }
702
703 /*
704 * (Re)Initialize an individual aggregate.
705 *
706 * This function handles only one grouping set, already set in
707 * aggstate->current_set.
708 *
709 * When called, CurrentMemoryContext should be the per-query context.
710 */
711 static void
initialize_aggregate(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)712 initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
713 AggStatePerGroup pergroupstate)
714 {
715 /*
716 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
717 */
718 if (pertrans->numSortCols > 0)
719 {
720 /*
721 * In case of rescan, maybe there could be an uncompleted sort
722 * operation? Clean it up if so.
723 */
724 if (pertrans->sortstates[aggstate->current_set])
725 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
726
727
728 /*
729 * We use a plain Datum sorter when there's a single input column;
730 * otherwise sort the full tuple. (See comments for
731 * process_ordered_aggregate_single.)
732 */
733 if (pertrans->numInputs == 1)
734 pertrans->sortstates[aggstate->current_set] =
735 tuplesort_begin_datum(pertrans->sortdesc->attrs[0]->atttypid,
736 pertrans->sortOperators[0],
737 pertrans->sortCollations[0],
738 pertrans->sortNullsFirst[0],
739 work_mem, false);
740 else
741 pertrans->sortstates[aggstate->current_set] =
742 tuplesort_begin_heap(pertrans->sortdesc,
743 pertrans->numSortCols,
744 pertrans->sortColIdx,
745 pertrans->sortOperators,
746 pertrans->sortCollations,
747 pertrans->sortNullsFirst,
748 work_mem, false);
749 }
750
751 /*
752 * (Re)set transValue to the initial value.
753 *
754 * Note that when the initial value is pass-by-ref, we must copy it (into
755 * the aggcontext) since we will pfree the transValue later.
756 */
757 if (pertrans->initValueIsNull)
758 pergroupstate->transValue = pertrans->initValue;
759 else
760 {
761 MemoryContext oldContext;
762
763 oldContext = MemoryContextSwitchTo(
764 aggstate->curaggcontext->ecxt_per_tuple_memory);
765 pergroupstate->transValue = datumCopy(pertrans->initValue,
766 pertrans->transtypeByVal,
767 pertrans->transtypeLen);
768 MemoryContextSwitchTo(oldContext);
769 }
770 pergroupstate->transValueIsNull = pertrans->initValueIsNull;
771
772 /*
773 * If the initial value for the transition state doesn't exist in the
774 * pg_aggregate table then we will let the first non-NULL value returned
775 * from the outer procNode become the initial value. (This is useful for
776 * aggregates like max() and min().) The noTransValue flag signals that we
777 * still need to do this.
778 */
779 pergroupstate->noTransValue = pertrans->initValueIsNull;
780 }
781
782 /*
783 * Initialize all aggregate transition states for a new group of input values.
784 *
785 * If there are multiple grouping sets, we initialize only the first numReset
786 * of them (the grouping sets are ordered so that the most specific one, which
787 * is reset most often, is first). As a convenience, if numReset is 0, we
788 * reinitialize all sets. numReset is -1 to initialize a hashtable entry, in
789 * which case the caller must have used select_current_set appropriately.
790 *
791 * When called, CurrentMemoryContext should be the per-query context.
792 */
793 static void
initialize_aggregates(AggState * aggstate,AggStatePerGroup pergroup,int numReset)794 initialize_aggregates(AggState *aggstate,
795 AggStatePerGroup pergroup,
796 int numReset)
797 {
798 int transno;
799 int numGroupingSets = Max(aggstate->phase->numsets, 1);
800 int setno = 0;
801 int numTrans = aggstate->numtrans;
802 AggStatePerTrans transstates = aggstate->pertrans;
803
804 if (numReset == 0)
805 numReset = numGroupingSets;
806
807 for (transno = 0; transno < numTrans; transno++)
808 {
809 AggStatePerTrans pertrans = &transstates[transno];
810
811 if (numReset < 0)
812 {
813 AggStatePerGroup pergroupstate;
814
815 pergroupstate = &pergroup[transno];
816
817 initialize_aggregate(aggstate, pertrans, pergroupstate);
818 }
819 else
820 {
821 for (setno = 0; setno < numReset; setno++)
822 {
823 AggStatePerGroup pergroupstate;
824
825 pergroupstate = &pergroup[transno + (setno * numTrans)];
826
827 select_current_set(aggstate, setno, false);
828
829 initialize_aggregate(aggstate, pertrans, pergroupstate);
830 }
831 }
832 }
833 }
834
835 /*
836 * Given new input value(s), advance the transition function of one aggregate
837 * state within one grouping set only (already set in aggstate->current_set)
838 *
839 * The new values (and null flags) have been preloaded into argument positions
840 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
841 * pass to the transition function. We also expect that the static fields of
842 * the fcinfo are already initialized; that was done by ExecInitAgg().
843 *
844 * It doesn't matter which memory context this is called in.
845 */
846 static void
advance_transition_function(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)847 advance_transition_function(AggState *aggstate,
848 AggStatePerTrans pertrans,
849 AggStatePerGroup pergroupstate)
850 {
851 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
852 MemoryContext oldContext;
853 Datum newVal;
854
855 if (pertrans->transfn.fn_strict)
856 {
857 /*
858 * For a strict transfn, nothing happens when there's a NULL input; we
859 * just keep the prior transValue.
860 */
861 int numTransInputs = pertrans->numTransInputs;
862 int i;
863
864 for (i = 1; i <= numTransInputs; i++)
865 {
866 if (fcinfo->argnull[i])
867 return;
868 }
869 if (pergroupstate->noTransValue)
870 {
871 /*
872 * transValue has not been initialized. This is the first non-NULL
873 * input value. We use it as the initial value for transValue. (We
874 * already checked that the agg's input type is binary-compatible
875 * with its transtype, so straight copy here is OK.)
876 *
877 * We must copy the datum into aggcontext if it is pass-by-ref. We
878 * do not need to pfree the old transValue, since it's NULL.
879 */
880 oldContext = MemoryContextSwitchTo(
881 aggstate->curaggcontext->ecxt_per_tuple_memory);
882 pergroupstate->transValue = datumCopy(fcinfo->arg[1],
883 pertrans->transtypeByVal,
884 pertrans->transtypeLen);
885 pergroupstate->transValueIsNull = false;
886 pergroupstate->noTransValue = false;
887 MemoryContextSwitchTo(oldContext);
888 return;
889 }
890 if (pergroupstate->transValueIsNull)
891 {
892 /*
893 * Don't call a strict function with NULL inputs. Note it is
894 * possible to get here despite the above tests, if the transfn is
895 * strict *and* returned a NULL on a prior cycle. If that happens
896 * we will propagate the NULL all the way to the end.
897 */
898 return;
899 }
900 }
901
902 /* We run the transition functions in per-input-tuple memory context */
903 oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
904
905 /* set up aggstate->curpertrans for AggGetAggref() */
906 aggstate->curpertrans = pertrans;
907
908 /*
909 * OK to call the transition function
910 */
911 fcinfo->arg[0] = pergroupstate->transValue;
912 fcinfo->argnull[0] = pergroupstate->transValueIsNull;
913 fcinfo->isnull = false; /* just in case transfn doesn't set it */
914
915 newVal = FunctionCallInvoke(fcinfo);
916
917 aggstate->curpertrans = NULL;
918
919 /*
920 * If pass-by-ref datatype, must copy the new value into aggcontext and
921 * free the prior transValue. But if transfn returned a pointer to its
922 * first input, we don't need to do anything. Also, if transfn returned a
923 * pointer to a R/W expanded object that is already a child of the
924 * aggcontext, assume we can adopt that value without copying it.
925 *
926 * It's safe to compare newVal with pergroupstate->transValue without
927 * regard for either being NULL, because we below take care to set
928 * transValue to 0 when NULL. Otherwise we could end up accidentally not
929 * reparenting, when the transValue has the same numerical value as
930 * newVal, despite being NULL. This is a somewhat hot path, making it
931 * undesirable to instead solve this with another branch for the common
932 * case of the transition function returning its (modified) input
933 * argument.
934 */
935 if (!pertrans->transtypeByVal &&
936 DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
937 {
938 if (!fcinfo->isnull)
939 {
940 MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
941 if (DatumIsReadWriteExpandedObject(newVal,
942 false,
943 pertrans->transtypeLen) &&
944 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
945 /* do nothing */ ;
946 else
947 newVal = datumCopy(newVal,
948 pertrans->transtypeByVal,
949 pertrans->transtypeLen);
950 }
951 else
952 {
953 /*
954 * Ensure that pergroupstate->transValue ends up being 0, so we
955 * can safely compare newVal/transValue without having to check
956 * their respective nullness.
957 */
958 newVal = (Datum) 0;
959 }
960
961 if (!pergroupstate->transValueIsNull)
962 {
963 if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
964 false,
965 pertrans->transtypeLen))
966 DeleteExpandedObject(pergroupstate->transValue);
967 else
968 pfree(DatumGetPointer(pergroupstate->transValue));
969 }
970 }
971
972 pergroupstate->transValue = newVal;
973 pergroupstate->transValueIsNull = fcinfo->isnull;
974
975 MemoryContextSwitchTo(oldContext);
976 }
977
978 /*
979 * Advance each aggregate transition state for one input tuple. The input
980 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
981 * accessible to ExecEvalExpr.
982 *
983 * We have two sets of transition states to handle: one for sorted aggregation
984 * and one for hashed; we do them both here, to avoid multiple evaluation of
985 * the inputs.
986 *
987 * When called, CurrentMemoryContext should be the per-query context.
988 */
989 static void
advance_aggregates(AggState * aggstate,AggStatePerGroup pergroup,AggStatePerGroup * pergroups)990 advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGroup *pergroups)
991 {
992 int transno;
993 int setno = 0;
994 int numGroupingSets = Max(aggstate->phase->numsets, 1);
995 int numHashes = aggstate->num_hashes;
996 int numTrans = aggstate->numtrans;
997 TupleTableSlot *combinedslot;
998
999 /* compute required inputs for all aggregates */
1000 combinedslot = ExecProject(aggstate->combinedproj);
1001
1002 for (transno = 0; transno < numTrans; transno++)
1003 {
1004 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1005 int numTransInputs = pertrans->numTransInputs;
1006 int inputoff = pertrans->inputoff;
1007 TupleTableSlot *slot;
1008 int i;
1009
1010 /* Skip anything FILTERed out */
1011 if (pertrans->aggref->aggfilter)
1012 {
1013 /* Check the result of the filter expression */
1014 if (combinedslot->tts_isnull[inputoff] ||
1015 !DatumGetBool(combinedslot->tts_values[inputoff]))
1016 continue;
1017
1018 /* Now it's safe to evaluate this agg's arguments */
1019 slot = ExecProject(pertrans->evalproj);
1020 /* There's no offset needed in this slot, of course */
1021 inputoff = 0;
1022 }
1023 else
1024 {
1025 /* arguments are already evaluated into combinedslot @ inputoff */
1026 slot = combinedslot;
1027 }
1028
1029 if (pertrans->numSortCols > 0)
1030 {
1031 /* DISTINCT and/or ORDER BY case */
1032 Assert(slot->tts_nvalid >= (pertrans->numInputs + inputoff));
1033 Assert(!pergroups);
1034
1035 /*
1036 * If the transfn is strict, we want to check for nullity before
1037 * storing the row in the sorter, to save space if there are a lot
1038 * of nulls. Note that we must only check numTransInputs columns,
1039 * not numInputs, since nullity in columns used only for sorting
1040 * is not relevant here.
1041 */
1042 if (pertrans->transfn.fn_strict)
1043 {
1044 for (i = 0; i < numTransInputs; i++)
1045 {
1046 if (slot->tts_isnull[i + inputoff])
1047 break;
1048 }
1049 if (i < numTransInputs)
1050 continue;
1051 }
1052
1053 for (setno = 0; setno < numGroupingSets; setno++)
1054 {
1055 /* OK, put the tuple into the tuplesort object */
1056 if (pertrans->numInputs == 1)
1057 tuplesort_putdatum(pertrans->sortstates[setno],
1058 slot->tts_values[inputoff],
1059 slot->tts_isnull[inputoff]);
1060 else if (pertrans->aggref->aggfilter)
1061 {
1062 /*
1063 * When filtering and ordering, we already have a slot
1064 * containing just the argument columns.
1065 */
1066 Assert(slot == pertrans->sortslot);
1067 tuplesort_puttupleslot(pertrans->sortstates[setno], slot);
1068 }
1069 else
1070 {
1071 /*
1072 * Copy argument columns from combined slot, starting at
1073 * inputoff, into sortslot, so that we can store just the
1074 * columns we want.
1075 */
1076 ExecClearTuple(pertrans->sortslot);
1077 memcpy(pertrans->sortslot->tts_values,
1078 &slot->tts_values[inputoff],
1079 pertrans->numInputs * sizeof(Datum));
1080 memcpy(pertrans->sortslot->tts_isnull,
1081 &slot->tts_isnull[inputoff],
1082 pertrans->numInputs * sizeof(bool));
1083 ExecStoreVirtualTuple(pertrans->sortslot);
1084 tuplesort_puttupleslot(pertrans->sortstates[setno],
1085 pertrans->sortslot);
1086 }
1087 }
1088 }
1089 else
1090 {
1091 /* We can apply the transition function immediately */
1092 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1093
1094 /* Load values into fcinfo */
1095 /* Start from 1, since the 0th arg will be the transition value */
1096 Assert(slot->tts_nvalid >= (numTransInputs + inputoff));
1097
1098 for (i = 0; i < numTransInputs; i++)
1099 {
1100 fcinfo->arg[i + 1] = slot->tts_values[i + inputoff];
1101 fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff];
1102 }
1103
1104 if (pergroup)
1105 {
1106 /* advance transition states for ordered grouping */
1107
1108 for (setno = 0; setno < numGroupingSets; setno++)
1109 {
1110 AggStatePerGroup pergroupstate;
1111
1112 select_current_set(aggstate, setno, false);
1113
1114 pergroupstate = &pergroup[transno + (setno * numTrans)];
1115
1116 advance_transition_function(aggstate, pertrans, pergroupstate);
1117 }
1118 }
1119
1120 if (pergroups)
1121 {
1122 /* advance transition states for hashed grouping */
1123
1124 for (setno = 0; setno < numHashes; setno++)
1125 {
1126 AggStatePerGroup pergroupstate;
1127
1128 select_current_set(aggstate, setno, true);
1129
1130 pergroupstate = &pergroups[setno][transno];
1131
1132 advance_transition_function(aggstate, pertrans, pergroupstate);
1133 }
1134 }
1135 }
1136 }
1137 }
1138
1139 /*
1140 * combine_aggregates replaces advance_aggregates in DO_AGGSPLIT_COMBINE
1141 * mode. The principal difference is that here we may need to apply the
1142 * deserialization function before running the transfn (which, in this mode,
1143 * is actually the aggregate's combinefn). Also, we know we don't need to
1144 * handle FILTER, DISTINCT, ORDER BY, or grouping sets.
1145 */
1146 static void
combine_aggregates(AggState * aggstate,AggStatePerGroup pergroup)1147 combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
1148 {
1149 int transno;
1150 int numTrans = aggstate->numtrans;
1151 TupleTableSlot *slot;
1152
1153 /* combine not supported with grouping sets */
1154 Assert(aggstate->phase->numsets <= 1);
1155
1156 /* compute input for all aggregates */
1157 slot = ExecProject(aggstate->combinedproj);
1158
1159 for (transno = 0; transno < numTrans; transno++)
1160 {
1161 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1162 AggStatePerGroup pergroupstate = &pergroup[transno];
1163 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1164 int inputoff = pertrans->inputoff;
1165
1166 Assert(slot->tts_nvalid > inputoff);
1167
1168 /*
1169 * deserialfn_oid will be set if we must deserialize the input state
1170 * before calling the combine function
1171 */
1172 if (OidIsValid(pertrans->deserialfn_oid))
1173 {
1174 /* Don't call a strict deserialization function with NULL input */
1175 if (pertrans->deserialfn.fn_strict && slot->tts_isnull[inputoff])
1176 {
1177 fcinfo->arg[1] = slot->tts_values[inputoff];
1178 fcinfo->argnull[1] = slot->tts_isnull[inputoff];
1179 }
1180 else
1181 {
1182 FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo;
1183 MemoryContext oldContext;
1184
1185 dsinfo->arg[0] = slot->tts_values[inputoff];
1186 dsinfo->argnull[0] = slot->tts_isnull[inputoff];
1187 /* Dummy second argument for type-safety reasons */
1188 dsinfo->arg[1] = PointerGetDatum(NULL);
1189 dsinfo->argnull[1] = false;
1190
1191 /*
1192 * We run the deserialization functions in per-input-tuple
1193 * memory context.
1194 */
1195 oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
1196
1197 fcinfo->arg[1] = FunctionCallInvoke(dsinfo);
1198 fcinfo->argnull[1] = dsinfo->isnull;
1199
1200 MemoryContextSwitchTo(oldContext);
1201 }
1202 }
1203 else
1204 {
1205 fcinfo->arg[1] = slot->tts_values[inputoff];
1206 fcinfo->argnull[1] = slot->tts_isnull[inputoff];
1207 }
1208
1209 advance_combine_function(aggstate, pertrans, pergroupstate);
1210 }
1211 }
1212
1213 /*
1214 * Perform combination of states between 2 aggregate states. Effectively this
1215 * 'adds' two states together by whichever logic is defined in the aggregate
1216 * function's combine function.
1217 *
1218 * Note that in this case transfn is set to the combination function. This
1219 * perhaps should be changed to avoid confusion, but one field is ok for now
1220 * as they'll never be needed at the same time.
1221 */
1222 static void
advance_combine_function(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)1223 advance_combine_function(AggState *aggstate,
1224 AggStatePerTrans pertrans,
1225 AggStatePerGroup pergroupstate)
1226 {
1227 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1228 MemoryContext oldContext;
1229 Datum newVal;
1230
1231 if (pertrans->transfn.fn_strict)
1232 {
1233 /* if we're asked to merge to a NULL state, then do nothing */
1234 if (fcinfo->argnull[1])
1235 return;
1236
1237 if (pergroupstate->noTransValue)
1238 {
1239 /*
1240 * transValue has not yet been initialized. If pass-by-ref
1241 * datatype we must copy the combining state value into
1242 * aggcontext.
1243 */
1244 if (!pertrans->transtypeByVal)
1245 {
1246 oldContext = MemoryContextSwitchTo(
1247 aggstate->curaggcontext->ecxt_per_tuple_memory);
1248 pergroupstate->transValue = datumCopy(fcinfo->arg[1],
1249 pertrans->transtypeByVal,
1250 pertrans->transtypeLen);
1251 MemoryContextSwitchTo(oldContext);
1252 }
1253 else
1254 pergroupstate->transValue = fcinfo->arg[1];
1255
1256 pergroupstate->transValueIsNull = false;
1257 pergroupstate->noTransValue = false;
1258 return;
1259 }
1260
1261 if (pergroupstate->transValueIsNull)
1262 {
1263 /*
1264 * Don't call a strict function with NULL inputs. Note it is
1265 * possible to get here despite the above tests, if the combinefn
1266 * is strict *and* returned a NULL on a prior cycle. If that
1267 * happens we will propagate the NULL all the way to the end.
1268 */
1269 return;
1270 }
1271 }
1272
1273 /* We run the combine functions in per-input-tuple memory context */
1274 oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
1275
1276 /* set up aggstate->curpertrans for AggGetAggref() */
1277 aggstate->curpertrans = pertrans;
1278
1279 /*
1280 * OK to call the combine function
1281 */
1282 fcinfo->arg[0] = pergroupstate->transValue;
1283 fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1284 fcinfo->isnull = false; /* just in case combine func doesn't set it */
1285
1286 newVal = FunctionCallInvoke(fcinfo);
1287
1288 aggstate->curpertrans = NULL;
1289
1290 /*
1291 * If pass-by-ref datatype, must copy the new value into aggcontext and
1292 * free the prior transValue. But if the combine function returned a
1293 * pointer to its first input, we don't need to do anything. Also, if the
1294 * combine function returned a pointer to a R/W expanded object that is
1295 * already a child of the aggcontext, assume we can adopt that value
1296 * without copying it.
1297 */
1298 if (!pertrans->transtypeByVal &&
1299 DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
1300 {
1301 if (!fcinfo->isnull)
1302 {
1303 MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
1304 if (DatumIsReadWriteExpandedObject(newVal,
1305 false,
1306 pertrans->transtypeLen) &&
1307 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
1308 /* do nothing */ ;
1309 else
1310 newVal = datumCopy(newVal,
1311 pertrans->transtypeByVal,
1312 pertrans->transtypeLen);
1313 }
1314 if (!pergroupstate->transValueIsNull)
1315 {
1316 if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
1317 false,
1318 pertrans->transtypeLen))
1319 DeleteExpandedObject(pergroupstate->transValue);
1320 else
1321 pfree(DatumGetPointer(pergroupstate->transValue));
1322 }
1323 }
1324
1325 pergroupstate->transValue = newVal;
1326 pergroupstate->transValueIsNull = fcinfo->isnull;
1327
1328 MemoryContextSwitchTo(oldContext);
1329 }
1330
1331
1332 /*
1333 * Run the transition function for a DISTINCT or ORDER BY aggregate
1334 * with only one input. This is called after we have completed
1335 * entering all the input values into the sort object. We complete the
1336 * sort, read out the values in sorted order, and run the transition
1337 * function on each value (applying DISTINCT if appropriate).
1338 *
1339 * Note that the strictness of the transition function was checked when
1340 * entering the values into the sort, so we don't check it again here;
1341 * we just apply standard SQL DISTINCT logic.
1342 *
1343 * The one-input case is handled separately from the multi-input case
1344 * for performance reasons: for single by-value inputs, such as the
1345 * common case of count(distinct id), the tuplesort_getdatum code path
1346 * is around 300% faster. (The speedup for by-reference types is less
1347 * but still noticeable.)
1348 *
1349 * This function handles only one grouping set (already set in
1350 * aggstate->current_set).
1351 *
1352 * When called, CurrentMemoryContext should be the per-query context.
1353 */
1354 static void
process_ordered_aggregate_single(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)1355 process_ordered_aggregate_single(AggState *aggstate,
1356 AggStatePerTrans pertrans,
1357 AggStatePerGroup pergroupstate)
1358 {
1359 Datum oldVal = (Datum) 0;
1360 bool oldIsNull = true;
1361 bool haveOldVal = false;
1362 MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
1363 MemoryContext oldContext;
1364 bool isDistinct = (pertrans->numDistinctCols > 0);
1365 Datum newAbbrevVal = (Datum) 0;
1366 Datum oldAbbrevVal = (Datum) 0;
1367 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1368 Datum *newVal;
1369 bool *isNull;
1370
1371 Assert(pertrans->numDistinctCols < 2);
1372
1373 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
1374
1375 /* Load the column into argument 1 (arg 0 will be transition value) */
1376 newVal = fcinfo->arg + 1;
1377 isNull = fcinfo->argnull + 1;
1378
1379 /*
1380 * Note: if input type is pass-by-ref, the datums returned by the sort are
1381 * freshly palloc'd in the per-query context, so we must be careful to
1382 * pfree them when they are no longer needed.
1383 */
1384
1385 while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
1386 true, newVal, isNull, &newAbbrevVal))
1387 {
1388 /*
1389 * Clear and select the working context for evaluation of the equality
1390 * function and transition function.
1391 */
1392 MemoryContextReset(workcontext);
1393 oldContext = MemoryContextSwitchTo(workcontext);
1394
1395 /*
1396 * If DISTINCT mode, and not distinct from prior, skip it.
1397 *
1398 * Note: we assume equality functions don't care about collation.
1399 */
1400 if (isDistinct &&
1401 haveOldVal &&
1402 ((oldIsNull && *isNull) ||
1403 (!oldIsNull && !*isNull &&
1404 oldAbbrevVal == newAbbrevVal &&
1405 DatumGetBool(FunctionCall2(&pertrans->equalfns[0],
1406 oldVal, *newVal)))))
1407 {
1408 /* equal to prior, so forget this one */
1409 if (!pertrans->inputtypeByVal && !*isNull)
1410 pfree(DatumGetPointer(*newVal));
1411 }
1412 else
1413 {
1414 advance_transition_function(aggstate, pertrans, pergroupstate);
1415 /* forget the old value, if any */
1416 if (!oldIsNull && !pertrans->inputtypeByVal)
1417 pfree(DatumGetPointer(oldVal));
1418 /* and remember the new one for subsequent equality checks */
1419 oldVal = *newVal;
1420 oldAbbrevVal = newAbbrevVal;
1421 oldIsNull = *isNull;
1422 haveOldVal = true;
1423 }
1424
1425 MemoryContextSwitchTo(oldContext);
1426 }
1427
1428 if (!oldIsNull && !pertrans->inputtypeByVal)
1429 pfree(DatumGetPointer(oldVal));
1430
1431 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1432 pertrans->sortstates[aggstate->current_set] = NULL;
1433 }
1434
1435 /*
1436 * Run the transition function for a DISTINCT or ORDER BY aggregate
1437 * with more than one input. This is called after we have completed
1438 * entering all the input values into the sort object. We complete the
1439 * sort, read out the values in sorted order, and run the transition
1440 * function on each value (applying DISTINCT if appropriate).
1441 *
1442 * This function handles only one grouping set (already set in
1443 * aggstate->current_set).
1444 *
1445 * When called, CurrentMemoryContext should be the per-query context.
1446 */
1447 static void
process_ordered_aggregate_multi(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)1448 process_ordered_aggregate_multi(AggState *aggstate,
1449 AggStatePerTrans pertrans,
1450 AggStatePerGroup pergroupstate)
1451 {
1452 MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
1453 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1454 TupleTableSlot *slot1 = pertrans->sortslot;
1455 TupleTableSlot *slot2 = pertrans->uniqslot;
1456 int numTransInputs = pertrans->numTransInputs;
1457 int numDistinctCols = pertrans->numDistinctCols;
1458 Datum newAbbrevVal = (Datum) 0;
1459 Datum oldAbbrevVal = (Datum) 0;
1460 bool haveOldValue = false;
1461 int i;
1462
1463 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
1464
1465 ExecClearTuple(slot1);
1466 if (slot2)
1467 ExecClearTuple(slot2);
1468
1469 while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
1470 true, true, slot1, &newAbbrevVal))
1471 {
1472 CHECK_FOR_INTERRUPTS();
1473
1474 /*
1475 * Extract the first numTransInputs columns as datums to pass to the
1476 * transfn. (This will help execTuplesMatch too, so we do it
1477 * immediately.)
1478 */
1479 slot_getsomeattrs(slot1, numTransInputs);
1480
1481 if (numDistinctCols == 0 ||
1482 !haveOldValue ||
1483 newAbbrevVal != oldAbbrevVal ||
1484 !execTuplesMatch(slot1, slot2,
1485 numDistinctCols,
1486 pertrans->sortColIdx,
1487 pertrans->equalfns,
1488 workcontext))
1489 {
1490 /* Load values into fcinfo */
1491 /* Start from 1, since the 0th arg will be the transition value */
1492 for (i = 0; i < numTransInputs; i++)
1493 {
1494 fcinfo->arg[i + 1] = slot1->tts_values[i];
1495 fcinfo->argnull[i + 1] = slot1->tts_isnull[i];
1496 }
1497
1498 advance_transition_function(aggstate, pertrans, pergroupstate);
1499
1500 if (numDistinctCols > 0)
1501 {
1502 /* swap the slot pointers to retain the current tuple */
1503 TupleTableSlot *tmpslot = slot2;
1504
1505 slot2 = slot1;
1506 slot1 = tmpslot;
1507 /* avoid execTuplesMatch() calls by reusing abbreviated keys */
1508 oldAbbrevVal = newAbbrevVal;
1509 haveOldValue = true;
1510 }
1511 }
1512
1513 /* Reset context each time, unless execTuplesMatch did it for us */
1514 if (numDistinctCols == 0)
1515 MemoryContextReset(workcontext);
1516
1517 ExecClearTuple(slot1);
1518 }
1519
1520 if (slot2)
1521 ExecClearTuple(slot2);
1522
1523 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1524 pertrans->sortstates[aggstate->current_set] = NULL;
1525 }
1526
1527 /*
1528 * Compute the final value of one aggregate.
1529 *
1530 * This function handles only one grouping set (already set in
1531 * aggstate->current_set).
1532 *
1533 * The finalfunction will be run, and the result delivered, in the
1534 * output-tuple context; caller's CurrentMemoryContext does not matter.
1535 *
1536 * The finalfn uses the state as set in the transno. This also might be
1537 * being used by another aggregate function, so it's important that we do
1538 * nothing destructive here.
1539 */
1540 static void
finalize_aggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)1541 finalize_aggregate(AggState *aggstate,
1542 AggStatePerAgg peragg,
1543 AggStatePerGroup pergroupstate,
1544 Datum *resultVal, bool *resultIsNull)
1545 {
1546 FunctionCallInfoData fcinfo;
1547 bool anynull = false;
1548 MemoryContext oldContext;
1549 int i;
1550 ListCell *lc;
1551 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1552
1553 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1554
1555 /*
1556 * Evaluate any direct arguments. We do this even if there's no finalfn
1557 * (which is unlikely anyway), so that side-effects happen as expected.
1558 * The direct arguments go into arg positions 1 and up, leaving position 0
1559 * for the transition state value.
1560 */
1561 i = 1;
1562 foreach(lc, pertrans->aggdirectargs)
1563 {
1564 ExprState *expr = (ExprState *) lfirst(lc);
1565
1566 fcinfo.arg[i] = ExecEvalExpr(expr,
1567 aggstate->ss.ps.ps_ExprContext,
1568 &fcinfo.argnull[i]);
1569 anynull |= fcinfo.argnull[i];
1570 i++;
1571 }
1572
1573 /*
1574 * Apply the agg's finalfn if one is provided, else return transValue.
1575 */
1576 if (OidIsValid(peragg->finalfn_oid))
1577 {
1578 int numFinalArgs = peragg->numFinalArgs;
1579
1580 /* set up aggstate->curperagg for AggGetAggref() */
1581 aggstate->curperagg = peragg;
1582
1583 InitFunctionCallInfoData(fcinfo, &peragg->finalfn,
1584 numFinalArgs,
1585 pertrans->aggCollation,
1586 (void *) aggstate, NULL);
1587
1588 /* Fill in the transition state value */
1589 fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1590 pergroupstate->transValueIsNull,
1591 pertrans->transtypeLen);
1592 fcinfo.argnull[0] = pergroupstate->transValueIsNull;
1593 anynull |= pergroupstate->transValueIsNull;
1594
1595 /* Fill any remaining argument positions with nulls */
1596 for (; i < numFinalArgs; i++)
1597 {
1598 fcinfo.arg[i] = (Datum) 0;
1599 fcinfo.argnull[i] = true;
1600 anynull = true;
1601 }
1602
1603 if (fcinfo.flinfo->fn_strict && anynull)
1604 {
1605 /* don't call a strict function with NULL inputs */
1606 *resultVal = (Datum) 0;
1607 *resultIsNull = true;
1608 }
1609 else
1610 {
1611 *resultVal = FunctionCallInvoke(&fcinfo);
1612 *resultIsNull = fcinfo.isnull;
1613 }
1614 aggstate->curperagg = NULL;
1615 }
1616 else
1617 {
1618 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1619 *resultVal = pergroupstate->transValue;
1620 *resultIsNull = pergroupstate->transValueIsNull;
1621 }
1622
1623 /*
1624 * If result is pass-by-ref, make sure it is in the right context.
1625 */
1626 if (!peragg->resulttypeByVal && !*resultIsNull &&
1627 !MemoryContextContains(CurrentMemoryContext,
1628 DatumGetPointer(*resultVal)))
1629 *resultVal = datumCopy(*resultVal,
1630 peragg->resulttypeByVal,
1631 peragg->resulttypeLen);
1632
1633 MemoryContextSwitchTo(oldContext);
1634 }
1635
1636 /*
1637 * Compute the output value of one partial aggregate.
1638 *
1639 * The serialization function will be run, and the result delivered, in the
1640 * output-tuple context; caller's CurrentMemoryContext does not matter.
1641 */
1642 static void
finalize_partialaggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)1643 finalize_partialaggregate(AggState *aggstate,
1644 AggStatePerAgg peragg,
1645 AggStatePerGroup pergroupstate,
1646 Datum *resultVal, bool *resultIsNull)
1647 {
1648 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1649 MemoryContext oldContext;
1650
1651 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1652
1653 /*
1654 * serialfn_oid will be set if we must serialize the transvalue before
1655 * returning it
1656 */
1657 if (OidIsValid(pertrans->serialfn_oid))
1658 {
1659 /* Don't call a strict serialization function with NULL input. */
1660 if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1661 {
1662 *resultVal = (Datum) 0;
1663 *resultIsNull = true;
1664 }
1665 else
1666 {
1667 FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;
1668
1669 fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1670 pergroupstate->transValueIsNull,
1671 pertrans->transtypeLen);
1672 fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1673 fcinfo->isnull = false;
1674
1675 *resultVal = FunctionCallInvoke(fcinfo);
1676 *resultIsNull = fcinfo->isnull;
1677 }
1678 }
1679 else
1680 {
1681 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1682 *resultVal = pergroupstate->transValue;
1683 *resultIsNull = pergroupstate->transValueIsNull;
1684 }
1685
1686 /* If result is pass-by-ref, make sure it is in the right context. */
1687 if (!peragg->resulttypeByVal && !*resultIsNull &&
1688 !MemoryContextContains(CurrentMemoryContext,
1689 DatumGetPointer(*resultVal)))
1690 *resultVal = datumCopy(*resultVal,
1691 peragg->resulttypeByVal,
1692 peragg->resulttypeLen);
1693
1694 MemoryContextSwitchTo(oldContext);
1695 }
1696
1697 /*
1698 * Prepare to finalize and project based on the specified representative tuple
1699 * slot and grouping set.
1700 *
1701 * In the specified tuple slot, force to null all attributes that should be
1702 * read as null in the context of the current grouping set. Also stash the
1703 * current group bitmap where GroupingExpr can get at it.
1704 *
1705 * This relies on three conditions:
1706 *
1707 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1708 * only reference it in evaluations, which will only access individual
1709 * attributes.
1710 *
1711 * 2) No system columns are going to need to be nulled. (If a system column is
1712 * referenced in a group clause, it is actually projected in the outer plan
1713 * tlist.)
1714 *
1715 * 3) Within a given phase, we never need to recover the value of an attribute
1716 * once it has been set to null.
1717 *
1718 * Poking into the slot this way is a bit ugly, but the consensus is that the
1719 * alternative was worse.
1720 */
1721 static void
prepare_projection_slot(AggState * aggstate,TupleTableSlot * slot,int currentSet)1722 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1723 {
1724 if (aggstate->phase->grouped_cols)
1725 {
1726 Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1727
1728 aggstate->grouped_cols = grouped_cols;
1729
1730 if (slot->tts_isempty)
1731 {
1732 /*
1733 * Force all values to be NULL if working on an empty input tuple
1734 * (i.e. an empty grouping set for which no input rows were
1735 * supplied).
1736 */
1737 ExecStoreAllNullTuple(slot);
1738 }
1739 else if (aggstate->all_grouped_cols)
1740 {
1741 ListCell *lc;
1742
1743 /* all_grouped_cols is arranged in desc order */
1744 slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1745
1746 foreach(lc, aggstate->all_grouped_cols)
1747 {
1748 int attnum = lfirst_int(lc);
1749
1750 if (!bms_is_member(attnum, grouped_cols))
1751 slot->tts_isnull[attnum - 1] = true;
1752 }
1753 }
1754 }
1755 }
1756
1757 /*
1758 * Compute the final value of all aggregates for one group.
1759 *
1760 * This function handles only one grouping set at a time, which the caller must
1761 * have selected. It's also the caller's responsibility to adjust the supplied
1762 * pergroup parameter to point to the current set's transvalues.
1763 *
1764 * Results are stored in the output econtext aggvalues/aggnulls.
1765 */
1766 static void
finalize_aggregates(AggState * aggstate,AggStatePerAgg peraggs,AggStatePerGroup pergroup)1767 finalize_aggregates(AggState *aggstate,
1768 AggStatePerAgg peraggs,
1769 AggStatePerGroup pergroup)
1770 {
1771 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1772 Datum *aggvalues = econtext->ecxt_aggvalues;
1773 bool *aggnulls = econtext->ecxt_aggnulls;
1774 int aggno;
1775 int transno;
1776
1777 /*
1778 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1779 * inputs and run the transition functions.
1780 */
1781 for (transno = 0; transno < aggstate->numtrans; transno++)
1782 {
1783 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1784 AggStatePerGroup pergroupstate;
1785
1786 pergroupstate = &pergroup[transno];
1787
1788 if (pertrans->numSortCols > 0)
1789 {
1790 Assert(aggstate->aggstrategy != AGG_HASHED &&
1791 aggstate->aggstrategy != AGG_MIXED);
1792
1793 if (pertrans->numInputs == 1)
1794 process_ordered_aggregate_single(aggstate,
1795 pertrans,
1796 pergroupstate);
1797 else
1798 process_ordered_aggregate_multi(aggstate,
1799 pertrans,
1800 pergroupstate);
1801 }
1802 }
1803
1804 /*
1805 * Run the final functions.
1806 */
1807 for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1808 {
1809 AggStatePerAgg peragg = &peraggs[aggno];
1810 int transno = peragg->transno;
1811 AggStatePerGroup pergroupstate;
1812
1813 pergroupstate = &pergroup[transno];
1814
1815 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1816 finalize_partialaggregate(aggstate, peragg, pergroupstate,
1817 &aggvalues[aggno], &aggnulls[aggno]);
1818 else
1819 finalize_aggregate(aggstate, peragg, pergroupstate,
1820 &aggvalues[aggno], &aggnulls[aggno]);
1821 }
1822 }
1823
1824 /*
1825 * Project the result of a group (whose aggs have already been calculated by
1826 * finalize_aggregates). Returns the result slot, or NULL if no row is
1827 * projected (suppressed by qual).
1828 */
1829 static TupleTableSlot *
project_aggregates(AggState * aggstate)1830 project_aggregates(AggState *aggstate)
1831 {
1832 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1833
1834 /*
1835 * Check the qual (HAVING clause); if the group does not match, ignore it.
1836 */
1837 if (ExecQual(aggstate->ss.ps.qual, econtext))
1838 {
1839 /*
1840 * Form and return projection tuple using the aggregate results and
1841 * the representative input tuple.
1842 */
1843 return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1844 }
1845 else
1846 InstrCountFiltered1(aggstate, 1);
1847
1848 return NULL;
1849 }
1850
1851 /*
1852 * find_unaggregated_cols
1853 * Construct a bitmapset of the column numbers of un-aggregated Vars
1854 * appearing in our targetlist and qual (HAVING clause)
1855 */
1856 static Bitmapset *
find_unaggregated_cols(AggState * aggstate)1857 find_unaggregated_cols(AggState *aggstate)
1858 {
1859 Agg *node = (Agg *) aggstate->ss.ps.plan;
1860 Bitmapset *colnos;
1861
1862 colnos = NULL;
1863 (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1864 &colnos);
1865 (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1866 &colnos);
1867 return colnos;
1868 }
1869
1870 static bool
find_unaggregated_cols_walker(Node * node,Bitmapset ** colnos)1871 find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1872 {
1873 if (node == NULL)
1874 return false;
1875 if (IsA(node, Var))
1876 {
1877 Var *var = (Var *) node;
1878
1879 /* setrefs.c should have set the varno to OUTER_VAR */
1880 Assert(var->varno == OUTER_VAR);
1881 Assert(var->varlevelsup == 0);
1882 *colnos = bms_add_member(*colnos, var->varattno);
1883 return false;
1884 }
1885 if (IsA(node, Aggref) ||IsA(node, GroupingFunc))
1886 {
1887 /* do not descend into aggregate exprs */
1888 return false;
1889 }
1890 return expression_tree_walker(node, find_unaggregated_cols_walker,
1891 (void *) colnos);
1892 }
1893
1894 /*
1895 * Initialize the hash table(s) to empty.
1896 *
1897 * To implement hashed aggregation, we need a hashtable that stores a
1898 * representative tuple and an array of AggStatePerGroup structs for each
1899 * distinct set of GROUP BY column values. We compute the hash key from the
1900 * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1901 * for each entry.
1902 *
1903 * We have a separate hashtable and associated perhash data structure for each
1904 * grouping set for which we're doing hashing.
1905 *
1906 * The hash tables always live in the hashcontext's per-tuple memory context
1907 * (there is only one of these for all tables together, since they are all
1908 * reset at the same time).
1909 */
1910 static void
build_hash_table(AggState * aggstate)1911 build_hash_table(AggState *aggstate)
1912 {
1913 MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1914 Size additionalsize;
1915 int i;
1916
1917 Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1918
1919 additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1920
1921 for (i = 0; i < aggstate->num_hashes; ++i)
1922 {
1923 AggStatePerHash perhash = &aggstate->perhash[i];
1924
1925 Assert(perhash->aggnode->numGroups > 0);
1926
1927 perhash->hashtable = BuildTupleHashTable(perhash->numCols,
1928 perhash->hashGrpColIdxHash,
1929 perhash->eqfunctions,
1930 perhash->hashfunctions,
1931 perhash->aggnode->numGroups,
1932 additionalsize,
1933 aggstate->hashcontext->ecxt_per_tuple_memory,
1934 tmpmem,
1935 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1936 }
1937 }
1938
1939 /*
1940 * Compute columns that actually need to be stored in hashtable entries. The
1941 * incoming tuples from the child plan node will contain grouping columns,
1942 * other columns referenced in our targetlist and qual, columns used to
1943 * compute the aggregate functions, and perhaps just junk columns we don't use
1944 * at all. Only columns of the first two types need to be stored in the
1945 * hashtable, and getting rid of the others can make the table entries
1946 * significantly smaller. The hashtable only contains the relevant columns,
1947 * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1948 * into the format of the normal input descriptor.
1949 *
1950 * Additional columns, in addition to the columns grouped by, come from two
1951 * sources: Firstly functionally dependent columns that we don't need to group
1952 * by themselves, and secondly ctids for row-marks.
1953 *
1954 * To eliminate duplicates, we build a bitmapset of the needed columns, and
1955 * then build an array of the columns included in the hashtable. We might
1956 * still have duplicates if the passed-in grpColIdx has them, which can happen
1957 * in edge cases from semijoins/distinct; these can't always be removed,
1958 * because it's not certain that the duplicate cols will be using the same
1959 * hash function.
1960 *
1961 * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1962 * the per-query context (unlike the hash table itself).
1963 */
1964 static void
find_hash_columns(AggState * aggstate)1965 find_hash_columns(AggState *aggstate)
1966 {
1967 Bitmapset *base_colnos;
1968 List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1969 int numHashes = aggstate->num_hashes;
1970 int j;
1971
1972 /* Find Vars that will be needed in tlist and qual */
1973 base_colnos = find_unaggregated_cols(aggstate);
1974
1975 for (j = 0; j < numHashes; ++j)
1976 {
1977 AggStatePerHash perhash = &aggstate->perhash[j];
1978 Bitmapset *colnos = bms_copy(base_colnos);
1979 AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1980 List *hashTlist = NIL;
1981 TupleDesc hashDesc;
1982 int maxCols;
1983 int i;
1984
1985 perhash->largestGrpColIdx = 0;
1986
1987 /*
1988 * If we're doing grouping sets, then some Vars might be referenced in
1989 * tlist/qual for the benefit of other grouping sets, but not needed
1990 * when hashing; i.e. prepare_projection_slot will null them out, so
1991 * there'd be no point storing them. Use prepare_projection_slot's
1992 * logic to determine which.
1993 */
1994 if (aggstate->phases[0].grouped_cols)
1995 {
1996 Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1997 ListCell *lc;
1998
1999 foreach(lc, aggstate->all_grouped_cols)
2000 {
2001 int attnum = lfirst_int(lc);
2002
2003 if (!bms_is_member(attnum, grouped_cols))
2004 colnos = bms_del_member(colnos, attnum);
2005 }
2006 }
2007
2008 /*
2009 * Compute maximum number of input columns accounting for possible
2010 * duplications in the grpColIdx array, which can happen in some edge
2011 * cases where HashAggregate was generated as part of a semijoin or a
2012 * DISTINCT.
2013 */
2014 maxCols = bms_num_members(colnos) + perhash->numCols;
2015
2016 perhash->hashGrpColIdxInput =
2017 palloc(maxCols * sizeof(AttrNumber));
2018 perhash->hashGrpColIdxHash =
2019 palloc(perhash->numCols * sizeof(AttrNumber));
2020
2021 /* Add all the grouping columns to colnos */
2022 for (i = 0; i < perhash->numCols; i++)
2023 colnos = bms_add_member(colnos, grpColIdx[i]);
2024
2025 /*
2026 * First build mapping for columns directly hashed. These are the
2027 * first, because they'll be accessed when computing hash values and
2028 * comparing tuples for exact matches. We also build simple mapping
2029 * for execGrouping, so it knows where to find the to-be-hashed /
2030 * compared columns in the input.
2031 */
2032 for (i = 0; i < perhash->numCols; i++)
2033 {
2034 perhash->hashGrpColIdxInput[i] = grpColIdx[i];
2035 perhash->hashGrpColIdxHash[i] = i + 1;
2036 perhash->numhashGrpCols++;
2037 /* delete already mapped columns */
2038 bms_del_member(colnos, grpColIdx[i]);
2039 }
2040
2041 /* and add the remaining columns */
2042 while ((i = bms_first_member(colnos)) >= 0)
2043 {
2044 perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
2045 perhash->numhashGrpCols++;
2046 }
2047
2048 /* and build a tuple descriptor for the hashtable */
2049 for (i = 0; i < perhash->numhashGrpCols; i++)
2050 {
2051 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2052
2053 hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
2054 perhash->largestGrpColIdx =
2055 Max(varNumber + 1, perhash->largestGrpColIdx);
2056 }
2057
2058 hashDesc = ExecTypeFromTL(hashTlist, false);
2059 ExecSetSlotDescriptor(perhash->hashslot, hashDesc);
2060
2061 list_free(hashTlist);
2062 bms_free(colnos);
2063 }
2064
2065 bms_free(base_colnos);
2066 }
2067
2068 /*
2069 * Estimate per-hash-table-entry overhead for the planner.
2070 *
2071 * Note that the estimate does not include space for pass-by-reference
2072 * transition data values, nor for the representative tuple of each group.
2073 * Nor does this account of the target fill-factor and growth policy of the
2074 * hash table.
2075 */
2076 Size
hash_agg_entry_size(int numAggs)2077 hash_agg_entry_size(int numAggs)
2078 {
2079 Size entrysize;
2080
2081 /* This must match build_hash_table */
2082 entrysize = sizeof(TupleHashEntryData) +
2083 numAggs * sizeof(AggStatePerGroupData);
2084 entrysize = MAXALIGN(entrysize);
2085
2086 return entrysize;
2087 }
2088
2089 /*
2090 * Find or create a hashtable entry for the tuple group containing the current
2091 * tuple (already set in tmpcontext's outertuple slot), in the current grouping
2092 * set (which the caller must have selected - note that initialize_aggregate
2093 * depends on this).
2094 *
2095 * When called, CurrentMemoryContext should be the per-query context.
2096 */
2097 static TupleHashEntryData *
lookup_hash_entry(AggState * aggstate)2098 lookup_hash_entry(AggState *aggstate)
2099 {
2100 TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
2101 AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
2102 TupleTableSlot *hashslot = perhash->hashslot;
2103 TupleHashEntryData *entry;
2104 bool isnew;
2105 int i;
2106
2107 /* transfer just the needed columns into hashslot */
2108 slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
2109 ExecClearTuple(hashslot);
2110
2111 for (i = 0; i < perhash->numhashGrpCols; i++)
2112 {
2113 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2114
2115 hashslot->tts_values[i] = inputslot->tts_values[varNumber];
2116 hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
2117 }
2118 ExecStoreVirtualTuple(hashslot);
2119
2120 /* find or create the hashtable entry using the filtered tuple */
2121 entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
2122
2123 if (isnew)
2124 {
2125 entry->additional = (AggStatePerGroup)
2126 MemoryContextAlloc(perhash->hashtable->tablecxt,
2127 sizeof(AggStatePerGroupData) * aggstate->numtrans);
2128 /* initialize aggregates for new tuple group */
2129 initialize_aggregates(aggstate, (AggStatePerGroup) entry->additional,
2130 -1);
2131 }
2132
2133 return entry;
2134 }
2135
2136 /*
2137 * Look up hash entries for the current tuple in all hashed grouping sets,
2138 * returning an array of pergroup pointers suitable for advance_aggregates.
2139 *
2140 * Be aware that lookup_hash_entry can reset the tmpcontext.
2141 */
2142 static AggStatePerGroup *
lookup_hash_entries(AggState * aggstate)2143 lookup_hash_entries(AggState *aggstate)
2144 {
2145 int numHashes = aggstate->num_hashes;
2146 AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2147 int setno;
2148
2149 for (setno = 0; setno < numHashes; setno++)
2150 {
2151 select_current_set(aggstate, setno, true);
2152 pergroup[setno] = lookup_hash_entry(aggstate)->additional;
2153 }
2154
2155 return pergroup;
2156 }
2157
2158 /*
2159 * ExecAgg -
2160 *
2161 * ExecAgg receives tuples from its outer subplan and aggregates over
2162 * the appropriate attribute for each aggregate function use (Aggref
2163 * node) appearing in the targetlist or qual of the node. The number
2164 * of tuples to aggregate over depends on whether grouped or plain
2165 * aggregation is selected. In grouped aggregation, we produce a result
2166 * row for each group; in plain aggregation there's a single result row
2167 * for the whole query. In either case, the value of each aggregate is
2168 * stored in the expression context to be used when ExecProject evaluates
2169 * the result tuple.
2170 */
2171 static TupleTableSlot *
ExecAgg(PlanState * pstate)2172 ExecAgg(PlanState *pstate)
2173 {
2174 AggState *node = castNode(AggState, pstate);
2175 TupleTableSlot *result = NULL;
2176
2177 CHECK_FOR_INTERRUPTS();
2178
2179 if (!node->agg_done)
2180 {
2181 /* Dispatch based on strategy */
2182 switch (node->phase->aggstrategy)
2183 {
2184 case AGG_HASHED:
2185 if (!node->table_filled)
2186 agg_fill_hash_table(node);
2187 /* FALLTHROUGH */
2188 case AGG_MIXED:
2189 result = agg_retrieve_hash_table(node);
2190 break;
2191 case AGG_PLAIN:
2192 case AGG_SORTED:
2193 result = agg_retrieve_direct(node);
2194 break;
2195 }
2196
2197 if (!TupIsNull(result))
2198 return result;
2199 }
2200
2201 return NULL;
2202 }
2203
/*
 * ExecAgg for non-hashed case
 *
 * Handles phases whose strategy is AGG_PLAIN or AGG_SORTED.  In a node whose
 * overall strategy is AGG_MIXED, this also covers the sorted phases; see the
 * AGG_MIXED checks below.  Each call returns the next projected output row,
 * or NULL when there are no more groups.
 *
 * Input comes from fetch_input_tuple().  The first tuple of the next group
 * is kept in aggstate->grp_firstTuple between iterations (and calls).  With
 * grouping sets, each group's input is read once.  The grouping sets are then
 * finalized and projected one per loop iteration, tracked by
 * aggstate->projected_set.
 */
static TupleTableSlot *
agg_retrieve_direct(AggState *aggstate)
{
	Agg		   *node = aggstate->phase->aggnode;
	ExprContext *econtext;
	ExprContext *tmpcontext;
	AggStatePerAgg peragg;
	AggStatePerGroup pergroup;
	AggStatePerGroup *hash_pergroups = NULL;
	TupleTableSlot *outerslot;
	TupleTableSlot *firstSlot;
	TupleTableSlot *result;
	bool		hasGroupingSets = aggstate->phase->numsets > 0;
	/* a phase without grouping sets is treated as having one set */
	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
	int			currentSet;
	int			nextSetSize;
	int			numReset;
	int			i;

	/*
	 * get state info from node
	 *
	 * econtext is the per-output-tuple expression context
	 *
	 * tmpcontext is the per-input-tuple expression context
	 */
	econtext = aggstate->ss.ps.ps_ExprContext;
	tmpcontext = aggstate->tmpcontext;

	peragg = aggstate->peragg;
	pergroup = aggstate->pergroup;
	/* firstSlot holds the representative (first) tuple of the current group */
	firstSlot = aggstate->ss.ss_ScanTupleSlot;

	/*
	 * We loop retrieving groups until we find one matching
	 * aggstate->ss.ps.qual
	 *
	 * For grouping sets, we have the invariant that aggstate->projected_set
	 * is either -1 (initial call) or the index (starting from 0) in
	 * gset_lengths for the group we just completed (either by projecting a
	 * row or by discarding it in the qual).
	 */
	while (!aggstate->agg_done)
	{
		/*
		 * Clear the per-output-tuple context for each group, as well as
		 * aggcontext (which contains any pass-by-ref transvalues of the old
		 * group).  Some aggregate functions store working state in child
		 * contexts; those now get reset automatically without us needing to
		 * do anything special.
		 *
		 * We use ReScanExprContext not just ResetExprContext because we want
		 * any registered shutdown callbacks to be called.  That allows
		 * aggregate functions to ensure they've cleaned up any non-memory
		 * resources.
		 */
		ReScanExprContext(econtext);

		/*
		 * Determine how many grouping sets need to be reset at this boundary:
		 * after projecting set N, sets 0..N are reset; otherwise all of them.
		 */
		if (aggstate->projected_set >= 0 &&
			aggstate->projected_set < numGroupingSets)
			numReset = aggstate->projected_set + 1;
		else
			numReset = numGroupingSets;

		/*
		 * numReset can change on a phase boundary, but that's OK; we want to
		 * reset the contexts used in _this_ phase, and later, after possibly
		 * changing phase, initialize the right number of aggregates for the
		 * _new_ phase.
		 */

		for (i = 0; i < numReset; i++)
		{
			ReScanExprContext(aggstate->aggcontexts[i]);
		}

		/*
		 * Check if input is complete and there are no more groups to project
		 * in this phase; move to next phase or mark as done.
		 */
		if (aggstate->input_done == true &&
			aggstate->projected_set >= (numGroupingSets - 1))
		{
			if (aggstate->current_phase < aggstate->numphases - 1)
			{
				/* Start the next sorted phase; refresh per-phase locals. */
				initialize_phase(aggstate, aggstate->current_phase + 1);
				aggstate->input_done = false;
				aggstate->projected_set = -1;
				numGroupingSets = Max(aggstate->phase->numsets, 1);
				node = aggstate->phase->aggnode;
				numReset = numGroupingSets;
			}
			else if (aggstate->aggstrategy == AGG_MIXED)
			{
				/*
				 * Mixed mode; we've output all the grouped stuff and have
				 * full hashtables, so switch to outputting those.
				 */
				initialize_phase(aggstate, 0);
				aggstate->table_filled = true;
				ResetTupleHashIterator(aggstate->perhash[0].hashtable,
									   &aggstate->perhash[0].hashiter);
				select_current_set(aggstate, 0, true);
				return agg_retrieve_hash_table(aggstate);
			}
			else
			{
				aggstate->agg_done = true;
				break;
			}
		}

		/*
		 * Get the number of columns in the next grouping set after the last
		 * projected one (if any).  This is the number of columns to compare to
		 * see if we reached the boundary of that set too.
		 */
		if (aggstate->projected_set >= 0 &&
			aggstate->projected_set < (numGroupingSets - 1))
			nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
		else
			nextSetSize = 0;

		/*----------
		 * If a subgroup for the current grouping set is present, project it.
		 *
		 * We have a new group if:
		 *	- we're out of input but haven't projected all grouping sets
		 *	  (checked above)
		 * OR
		 *	  - we already projected a row that wasn't from the last grouping
		 *		set
		 *	  AND
		 *	  - the next grouping set has at least one grouping column (since
		 *		empty grouping sets project only once input is exhausted)
		 *	  AND
		 *	  - the previous and pending rows differ on the grouping columns
		 *		of the next grouping set
		 *
		 * The "previous" row is econtext's outer tuple (set to firstSlot at
		 * the end of the last pass); the "pending" row is tmpcontext's outer
		 * tuple (the last tuple fetched, which crossed the group boundary).
		 *----------
		 */
		if (aggstate->input_done ||
			(node->aggstrategy != AGG_PLAIN &&
			 aggstate->projected_set != -1 &&
			 aggstate->projected_set < (numGroupingSets - 1) &&
			 nextSetSize > 0 &&
			 !execTuplesMatch(econtext->ecxt_outertuple,
							  tmpcontext->ecxt_outertuple,
							  nextSetSize,
							  node->grpColIdx,
							  aggstate->phase->eqfunctions,
							  tmpcontext->ecxt_per_tuple_memory)))
		{
			/* Project the next grouping set from the already-read group. */
			aggstate->projected_set += 1;

			Assert(aggstate->projected_set < numGroupingSets);
			Assert(nextSetSize > 0 || aggstate->input_done);
		}
		else
		{
			/*
			 * We no longer care what group we just projected, the next
			 * projection will always be the first (or only) grouping set
			 * (unless the input proves to be empty).
			 */
			aggstate->projected_set = 0;

			/*
			 * If we don't already have the first tuple of the new group,
			 * fetch it from the outer plan.
			 */
			if (aggstate->grp_firstTuple == NULL)
			{
				outerslot = fetch_input_tuple(aggstate);
				if (!TupIsNull(outerslot))
				{
					/*
					 * Make a copy of the first input tuple; we will use this
					 * for comparisons (in group mode) and for projection.
					 */
					aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
				}
				else
				{
					/* outer plan produced no tuples at all */
					if (hasGroupingSets)
					{
						/*
						 * If there was no input at all, we need to project
						 * rows only if there are grouping sets of size 0.
						 * Skip projected_set forward to the first such set,
						 * if any.  References to ungrouped Vars are still
						 * safe: prepare_projection_slot() stores an all-null
						 * tuple into the projection slot if it is empty.
						 */
						aggstate->input_done = true;

						while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
						{
							aggstate->projected_set += 1;
							if (aggstate->projected_set >= numGroupingSets)
							{
								/*
								 * We can't set agg_done here because we might
								 * have more phases to do, even though the
								 * input is empty.  So we need to restart the
								 * whole outer loop.
								 */
								break;
							}
						}

						if (aggstate->projected_set >= numGroupingSets)
							continue;
					}
					else
					{
						aggstate->agg_done = true;
						/* If we are grouping, we should produce no tuples too */
						if (node->aggstrategy != AGG_PLAIN)
							return NULL;

						/*
						 * Plain aggregation falls through: a single result
						 * row is still produced over empty input.
						 */
					}
				}
			}

			/*
			 * Initialize working state for a new input tuple group.
			 */
			initialize_aggregates(aggstate, pergroup, numReset);

			if (aggstate->grp_firstTuple != NULL)
			{
				/*
				 * Store the copied first input tuple in the tuple table slot
				 * reserved for it.  The tuple will be deleted when it is
				 * cleared from the slot.
				 */
				ExecStoreTuple(aggstate->grp_firstTuple,
							   firstSlot,
							   InvalidBuffer,
							   true);
				aggstate->grp_firstTuple = NULL;	/* don't keep two pointers */

				/* set up for first advance_aggregates call */
				tmpcontext->ecxt_outertuple = firstSlot;

				/*
				 * Process each outer-plan tuple, and then fetch the next one,
				 * until we exhaust the outer plan or cross a group boundary.
				 */
				for (;;)
				{
					/*
					 * During phase 1 only of a mixed agg, we need to update
					 * hashtables as well in advance_aggregates.
					 */
					if (aggstate->aggstrategy == AGG_MIXED &&
						aggstate->current_phase == 1)
					{
						hash_pergroups = lookup_hash_entries(aggstate);
					}
					else
						hash_pergroups = NULL;

					if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
						combine_aggregates(aggstate, pergroup);
					else
						advance_aggregates(aggstate, pergroup, hash_pergroups);

					/* Reset per-input-tuple context after each tuple */
					ResetExprContext(tmpcontext);

					outerslot = fetch_input_tuple(aggstate);
					if (TupIsNull(outerslot))
					{
						/*
						 * No more outer-plan tuples available.  With grouping
						 * sets only input_done is set, so the remaining sets
						 * (and phases) still get projected; otherwise this is
						 * the final group.
						 */
						if (hasGroupingSets)
						{
							aggstate->input_done = true;
							break;
						}
						else
						{
							aggstate->agg_done = true;
							break;
						}
					}
					/* set up for next advance_aggregates call */
					tmpcontext->ecxt_outertuple = outerslot;

					/*
					 * If we are grouping, check whether we've crossed a group
					 * boundary.  If so, save a copy of the new group's first
					 * tuple for the next iteration.
					 */
					if (node->aggstrategy != AGG_PLAIN)
					{
						if (!execTuplesMatch(firstSlot,
											 outerslot,
											 node->numCols,
											 node->grpColIdx,
											 aggstate->phase->eqfunctions,
											 tmpcontext->ecxt_per_tuple_memory))
						{
							aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
							break;
						}
					}
				}
			}

			/*
			 * Use the representative input tuple for any references to
			 * non-aggregated input columns in aggregate direct args, the node
			 * qual, and the tlist.  (If we are not grouping, and there are no
			 * input rows at all, we will come here with an empty firstSlot
			 * ... but if not grouping, there can't be any references to
			 * non-aggregated input columns, so no problem.)
			 */
			econtext->ecxt_outertuple = firstSlot;
		}

		Assert(aggstate->projected_set >= 0);

		currentSet = aggstate->projected_set;

		prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);

		select_current_set(aggstate, currentSet, false);

		/* each grouping set's transition states occupy numtrans slots */
		finalize_aggregates(aggstate,
							peragg,
							pergroup + (currentSet * aggstate->numtrans));

		/*
		 * If there's no row to project right now, we must continue rather
		 * than returning a null since there might be more groups.
		 */
		result = project_aggregates(aggstate);
		if (result)
			return result;
	}

	/* No more groups */
	return NULL;
}
2556
2557 /*
2558 * ExecAgg for hashed case: read input and build hash table
2559 */
2560 static void
agg_fill_hash_table(AggState * aggstate)2561 agg_fill_hash_table(AggState *aggstate)
2562 {
2563 TupleTableSlot *outerslot;
2564 ExprContext *tmpcontext = aggstate->tmpcontext;
2565
2566 /*
2567 * Process each outer-plan tuple, and then fetch the next one, until we
2568 * exhaust the outer plan.
2569 */
2570 for (;;)
2571 {
2572 AggStatePerGroup *pergroups;
2573
2574 outerslot = fetch_input_tuple(aggstate);
2575 if (TupIsNull(outerslot))
2576 break;
2577
2578 /* set up for lookup_hash_entries and advance_aggregates */
2579 tmpcontext->ecxt_outertuple = outerslot;
2580
2581 /* Find or build hashtable entries */
2582 pergroups = lookup_hash_entries(aggstate);
2583
2584 /* Advance the aggregates */
2585 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2586 combine_aggregates(aggstate, pergroups[0]);
2587 else
2588 advance_aggregates(aggstate, NULL, pergroups);
2589
2590 /*
2591 * Reset per-input-tuple context after each tuple, but note that the
2592 * hash lookups do this too
2593 */
2594 ResetExprContext(aggstate->tmpcontext);
2595 }
2596
2597 aggstate->table_filled = true;
2598 /* Initialize to walk the first hash table */
2599 select_current_set(aggstate, 0, true);
2600 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2601 &aggstate->perhash[0].hashiter);
2602 }
2603
2604 /*
2605 * ExecAgg for hashed case: retrieving groups from hash table
2606 */
2607 static TupleTableSlot *
agg_retrieve_hash_table(AggState * aggstate)2608 agg_retrieve_hash_table(AggState *aggstate)
2609 {
2610 ExprContext *econtext;
2611 AggStatePerAgg peragg;
2612 AggStatePerGroup pergroup;
2613 TupleHashEntryData *entry;
2614 TupleTableSlot *firstSlot;
2615 TupleTableSlot *result;
2616 AggStatePerHash perhash;
2617
2618 /*
2619 * get state info from node.
2620 *
2621 * econtext is the per-output-tuple expression context.
2622 */
2623 econtext = aggstate->ss.ps.ps_ExprContext;
2624 peragg = aggstate->peragg;
2625 firstSlot = aggstate->ss.ss_ScanTupleSlot;
2626
2627 /*
2628 * Note that perhash (and therefore anything accessed through it) can
2629 * change inside the loop, as we change between grouping sets.
2630 */
2631 perhash = &aggstate->perhash[aggstate->current_set];
2632
2633 /*
2634 * We loop retrieving groups until we find one satisfying
2635 * aggstate->ss.ps.qual
2636 */
2637 while (!aggstate->agg_done)
2638 {
2639 TupleTableSlot *hashslot = perhash->hashslot;
2640 int i;
2641
2642 CHECK_FOR_INTERRUPTS();
2643
2644 /*
2645 * Find the next entry in the hash table
2646 */
2647 entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2648 if (entry == NULL)
2649 {
2650 int nextset = aggstate->current_set + 1;
2651
2652 if (nextset < aggstate->num_hashes)
2653 {
2654 /*
2655 * Switch to next grouping set, reinitialize, and restart the
2656 * loop.
2657 */
2658 select_current_set(aggstate, nextset, true);
2659
2660 perhash = &aggstate->perhash[aggstate->current_set];
2661
2662 ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2663
2664 continue;
2665 }
2666 else
2667 {
2668 /* No more hashtables, so done */
2669 aggstate->agg_done = TRUE;
2670 return NULL;
2671 }
2672 }
2673
2674 /*
2675 * Clear the per-output-tuple context for each group
2676 *
2677 * We intentionally don't use ReScanExprContext here; if any aggs have
2678 * registered shutdown callbacks, they mustn't be called yet, since we
2679 * might not be done with that agg.
2680 */
2681 ResetExprContext(econtext);
2682
2683 /*
2684 * Transform representative tuple back into one with the right
2685 * columns.
2686 */
2687 ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2688 slot_getallattrs(hashslot);
2689
2690 ExecClearTuple(firstSlot);
2691 memset(firstSlot->tts_isnull, true,
2692 firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2693
2694 for (i = 0; i < perhash->numhashGrpCols; i++)
2695 {
2696 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2697
2698 firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2699 firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2700 }
2701 ExecStoreVirtualTuple(firstSlot);
2702
2703 pergroup = (AggStatePerGroup) entry->additional;
2704
2705 /*
2706 * Use the representative input tuple for any references to
2707 * non-aggregated input columns in the qual and tlist.
2708 */
2709 econtext->ecxt_outertuple = firstSlot;
2710
2711 prepare_projection_slot(aggstate,
2712 econtext->ecxt_outertuple,
2713 aggstate->current_set);
2714
2715 finalize_aggregates(aggstate, peragg, pergroup);
2716
2717 result = project_aggregates(aggstate);
2718 if (result)
2719 return result;
2720 }
2721
2722 /* No more groups */
2723 return NULL;
2724 }
2725
2726 /* -----------------
2727 * ExecInitAgg
2728 *
2729 * Creates the run-time information for the agg node produced by the
2730 * planner and initializes its outer subtree.
2731 *
2732 * -----------------
2733 */
2734 AggState *
ExecInitAgg(Agg * node,EState * estate,int eflags)2735 ExecInitAgg(Agg *node, EState *estate, int eflags)
2736 {
2737 AggState *aggstate;
2738 AggStatePerAgg peraggs;
2739 AggStatePerTrans pertransstates;
2740 Plan *outerPlan;
2741 ExprContext *econtext;
2742 int numaggs,
2743 transno,
2744 aggno;
2745 int phase;
2746 int phaseidx;
2747 List *combined_inputeval;
2748 TupleDesc combineddesc;
2749 TupleTableSlot *combinedslot;
2750 ListCell *l;
2751 Bitmapset *all_grouped_cols = NULL;
2752 int numGroupingSets = 1;
2753 int numPhases;
2754 int numHashes;
2755 int column_offset;
2756 int i = 0;
2757 int j = 0;
2758 bool use_hashing = (node->aggstrategy == AGG_HASHED ||
2759 node->aggstrategy == AGG_MIXED);
2760
2761 /* check for unsupported flags */
2762 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2763
2764 /*
2765 * create state structure
2766 */
2767 aggstate = makeNode(AggState);
2768 aggstate->ss.ps.plan = (Plan *) node;
2769 aggstate->ss.ps.state = estate;
2770 aggstate->ss.ps.ExecProcNode = ExecAgg;
2771
2772 aggstate->aggs = NIL;
2773 aggstate->numaggs = 0;
2774 aggstate->numtrans = 0;
2775 aggstate->aggstrategy = node->aggstrategy;
2776 aggstate->aggsplit = node->aggsplit;
2777 aggstate->maxsets = 0;
2778 aggstate->projected_set = -1;
2779 aggstate->current_set = 0;
2780 aggstate->peragg = NULL;
2781 aggstate->pertrans = NULL;
2782 aggstate->curperagg = NULL;
2783 aggstate->curpertrans = NULL;
2784 aggstate->input_done = false;
2785 aggstate->agg_done = false;
2786 aggstate->pergroup = NULL;
2787 aggstate->grp_firstTuple = NULL;
2788 aggstate->sort_in = NULL;
2789 aggstate->sort_out = NULL;
2790
2791 /*
2792 * phases[0] always exists, but is dummy in sorted/plain mode
2793 */
2794 numPhases = (use_hashing ? 1 : 2);
2795 numHashes = (use_hashing ? 1 : 0);
2796
2797 /*
2798 * Calculate the maximum number of grouping sets in any phase; this
2799 * determines the size of some allocations. Also calculate the number of
2800 * phases, since all hashed/mixed nodes contribute to only a single phase.
2801 */
2802 if (node->groupingSets)
2803 {
2804 numGroupingSets = list_length(node->groupingSets);
2805
2806 foreach(l, node->chain)
2807 {
2808 Agg *agg = lfirst(l);
2809
2810 numGroupingSets = Max(numGroupingSets,
2811 list_length(agg->groupingSets));
2812
2813 /*
2814 * additional AGG_HASHED aggs become part of phase 0, but all
2815 * others add an extra phase.
2816 */
2817 if (agg->aggstrategy != AGG_HASHED)
2818 ++numPhases;
2819 else
2820 ++numHashes;
2821 }
2822 }
2823
2824 aggstate->maxsets = numGroupingSets;
2825 aggstate->numphases = numPhases;
2826
2827 aggstate->aggcontexts = (ExprContext **)
2828 palloc0(sizeof(ExprContext *) * numGroupingSets);
2829
2830 /*
2831 * Create expression contexts. We need three or more, one for
2832 * per-input-tuple processing, one for per-output-tuple processing, one
2833 * for all the hashtables, and one for each grouping set. The per-tuple
2834 * memory context of the per-grouping-set ExprContexts (aggcontexts)
2835 * replaces the standalone memory context formerly used to hold transition
2836 * values. We cheat a little by using ExecAssignExprContext() to build
2837 * all of them.
2838 *
2839 * NOTE: the details of what is stored in aggcontexts and what is stored
2840 * in the regular per-query memory context are driven by a simple
2841 * decision: we want to reset the aggcontext at group boundaries (if not
2842 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
2843 */
2844 ExecAssignExprContext(estate, &aggstate->ss.ps);
2845 aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2846
2847 for (i = 0; i < numGroupingSets; ++i)
2848 {
2849 ExecAssignExprContext(estate, &aggstate->ss.ps);
2850 aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2851 }
2852
2853 if (use_hashing)
2854 {
2855 ExecAssignExprContext(estate, &aggstate->ss.ps);
2856 aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2857 }
2858
2859 ExecAssignExprContext(estate, &aggstate->ss.ps);
2860
2861 /*
2862 * tuple table initialization.
2863 *
2864 * For hashtables, we create some additional slots below.
2865 */
2866 ExecInitScanTupleSlot(estate, &aggstate->ss);
2867 ExecInitResultTupleSlot(estate, &aggstate->ss.ps);
2868 aggstate->sort_slot = ExecInitExtraTupleSlot(estate);
2869
2870 /*
2871 * initialize child expressions
2872 *
2873 * We expect the parser to have checked that no aggs contain other agg
2874 * calls in their arguments (and just to be sure, we verify it again while
2875 * initializing the plan node). This would make no sense under SQL
2876 * semantics, and it's forbidden by the spec. Because it is true, we
2877 * don't need to worry about evaluating the aggs in any particular order.
2878 *
2879 * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2880 * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
2881 * in the targetlist are found during ExecAssignProjectionInfo, below.
2882 */
2883 aggstate->ss.ps.qual =
2884 ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2885
2886 /*
2887 * Initialize child nodes.
2888 *
2889 * If we are doing a hashed aggregation then the child plan does not need
2890 * to handle REWIND efficiently; see ExecReScanAgg.
2891 */
2892 if (node->aggstrategy == AGG_HASHED)
2893 eflags &= ~EXEC_FLAG_REWIND;
2894 outerPlan = outerPlan(node);
2895 outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2896
2897 /*
2898 * initialize source tuple type.
2899 */
2900 ExecAssignScanTypeFromOuterPlan(&aggstate->ss);
2901 if (node->chain)
2902 ExecSetSlotDescriptor(aggstate->sort_slot,
2903 aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
2904
2905 /*
2906 * Initialize result tuple type and projection info.
2907 */
2908 ExecAssignResultTypeFromTL(&aggstate->ss.ps);
2909 ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2910
2911 /*
2912 * We should now have found all Aggrefs in the targetlist and quals.
2913 */
2914 numaggs = aggstate->numaggs;
2915 Assert(numaggs == list_length(aggstate->aggs));
2916
2917 /*
2918 * For each phase, prepare grouping set data and fmgr lookup data for
2919 * compare functions. Accumulate all_grouped_cols in passing.
2920 */
2921 aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2922
2923 aggstate->num_hashes = numHashes;
2924 if (numHashes)
2925 {
2926 aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2927 aggstate->phases[0].numsets = 0;
2928 aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2929 aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2930 }
2931
2932 phase = 0;
2933 for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2934 {
2935 Agg *aggnode;
2936 Sort *sortnode;
2937
2938 if (phaseidx > 0)
2939 {
2940 aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2941 sortnode = castNode(Sort, aggnode->plan.lefttree);
2942 }
2943 else
2944 {
2945 aggnode = node;
2946 sortnode = NULL;
2947 }
2948
2949 Assert(phase <= 1 || sortnode);
2950
2951 if (aggnode->aggstrategy == AGG_HASHED
2952 || aggnode->aggstrategy == AGG_MIXED)
2953 {
2954 AggStatePerPhase phasedata = &aggstate->phases[0];
2955 AggStatePerHash perhash;
2956 Bitmapset *cols = NULL;
2957
2958 Assert(phase == 0);
2959 i = phasedata->numsets++;
2960 perhash = &aggstate->perhash[i];
2961
2962 /* phase 0 always points to the "real" Agg in the hash case */
2963 phasedata->aggnode = node;
2964 phasedata->aggstrategy = node->aggstrategy;
2965
2966 /* but the actual Agg node representing this hash is saved here */
2967 perhash->aggnode = aggnode;
2968
2969 phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2970
2971 for (j = 0; j < aggnode->numCols; ++j)
2972 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2973
2974 phasedata->grouped_cols[i] = cols;
2975
2976 all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2977 continue;
2978 }
2979 else
2980 {
2981 AggStatePerPhase phasedata = &aggstate->phases[++phase];
2982 int num_sets;
2983
2984 phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2985
2986 if (num_sets)
2987 {
2988 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2989 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2990
2991 i = 0;
2992 foreach(l, aggnode->groupingSets)
2993 {
2994 int current_length = list_length(lfirst(l));
2995 Bitmapset *cols = NULL;
2996
2997 /* planner forces this to be correct */
2998 for (j = 0; j < current_length; ++j)
2999 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3000
3001 phasedata->grouped_cols[i] = cols;
3002 phasedata->gset_lengths[i] = current_length;
3003
3004 ++i;
3005 }
3006
3007 all_grouped_cols = bms_add_members(all_grouped_cols,
3008 phasedata->grouped_cols[0]);
3009 }
3010 else
3011 {
3012 Assert(phaseidx == 0);
3013
3014 phasedata->gset_lengths = NULL;
3015 phasedata->grouped_cols = NULL;
3016 }
3017
3018 /*
3019 * If we are grouping, precompute fmgr lookup data for inner loop.
3020 */
3021 if (aggnode->aggstrategy == AGG_SORTED)
3022 {
3023 Assert(aggnode->numCols > 0);
3024
3025 phasedata->eqfunctions =
3026 execTuplesMatchPrepare(aggnode->numCols,
3027 aggnode->grpOperators);
3028 }
3029
3030 phasedata->aggnode = aggnode;
3031 phasedata->aggstrategy = aggnode->aggstrategy;
3032 phasedata->sortnode = sortnode;
3033 }
3034 }
3035
3036 /*
3037 * Convert all_grouped_cols to a descending-order list.
3038 */
3039 i = -1;
3040 while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3041 aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3042
3043 /*
3044 * Set up aggregate-result storage in the output expr context, and also
3045 * allocate my private per-agg working storage
3046 */
3047 econtext = aggstate->ss.ps.ps_ExprContext;
3048 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3049 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3050
3051 peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3052 pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
3053
3054 aggstate->peragg = peraggs;
3055 aggstate->pertrans = pertransstates;
3056
3057 /*
3058 * Hashing can only appear in the initial phase.
3059 */
3060 if (use_hashing)
3061 {
3062 for (i = 0; i < numHashes; ++i)
3063 {
3064 aggstate->perhash[i].hashslot = ExecInitExtraTupleSlot(estate);
3065
3066 execTuplesHashPrepare(aggstate->perhash[i].numCols,
3067 aggstate->perhash[i].aggnode->grpOperators,
3068 &aggstate->perhash[i].eqfunctions,
3069 &aggstate->perhash[i].hashfunctions);
3070 }
3071
3072 /* this is an array of pointers, not structures */
3073 aggstate->hash_pergroup = palloc0(sizeof(AggStatePerGroup) * numHashes);
3074
3075 find_hash_columns(aggstate);
3076
3077 /* Skip massive memory allocation if we are just doing EXPLAIN */
3078 if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
3079 build_hash_table(aggstate);
3080
3081 aggstate->table_filled = false;
3082 }
3083
3084 if (node->aggstrategy != AGG_HASHED)
3085 {
3086 AggStatePerGroup pergroup;
3087
3088 pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3089 * numaggs
3090 * numGroupingSets);
3091
3092 aggstate->pergroup = pergroup;
3093 }
3094
3095 /*
3096 * Initialize current phase-dependent values to initial phase. The initial
3097 * phase is 1 (first sort pass) for all strategies that use sorting (if
3098 * hashing is being done too, then phase 0 is processed last); but if only
3099 * hashing is being done, then phase 0 is all there is.
3100 */
3101 if (node->aggstrategy == AGG_HASHED)
3102 {
3103 aggstate->current_phase = 0;
3104 initialize_phase(aggstate, 0);
3105 select_current_set(aggstate, 0, true);
3106 }
3107 else
3108 {
3109 aggstate->current_phase = 1;
3110 initialize_phase(aggstate, 1);
3111 select_current_set(aggstate, 0, false);
3112 }
3113
3114 /* -----------------
3115 * Perform lookups of aggregate function info, and initialize the
3116 * unchanging fields of the per-agg and per-trans data.
3117 *
3118 * We try to optimize by detecting duplicate aggregate functions so that
3119 * their state and final values are re-used, rather than needlessly being
3120 * re-calculated independently. We also detect aggregates that are not
3121 * the same, but which can share the same transition state.
3122 *
3123 * Scenarios:
3124 *
3125 * 1. Identical aggregate function calls appear in the query:
3126 *
3127 * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
3128 *
3129 * Since these aggregates are identical, we only need to calculate
3130 * the value once. Both aggregates will share the same 'aggno' value.
3131 *
3132 * 2. Two different aggregate functions appear in the query, but the
3133 * aggregates have the same arguments, transition functions and
3134 * initial values (and, presumably, different final functions):
3135 *
3136 * SELECT AVG(x), STDDEV(x) FROM ...
3137 *
3138 * In this case we must create a new peragg for the varying aggregate,
3139 * and we need to call the final functions separately, but we need
3140 * only run the transition function once. (This requires that the
3141 * final functions be nondestructive of the transition state, but
3142 * that's required anyway for other reasons.)
3143 *
3144 * For either of these optimizations to be valid, all aggregate properties
3145 * used in the transition phase must be the same, including any modifiers
3146 * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
3147 * contain any volatile functions.
3148 * -----------------
3149 */
3150 aggno = -1;
3151 transno = -1;
3152 foreach(l, aggstate->aggs)
3153 {
3154 AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
3155 Aggref *aggref = aggrefstate->aggref;
3156 AggStatePerAgg peragg;
3157 AggStatePerTrans pertrans;
3158 int existing_aggno;
3159 int existing_transno;
3160 List *same_input_transnos;
3161 Oid inputTypes[FUNC_MAX_ARGS];
3162 int numArguments;
3163 int numDirectArgs;
3164 HeapTuple aggTuple;
3165 Form_pg_aggregate aggform;
3166 AclResult aclresult;
3167 Oid transfn_oid,
3168 finalfn_oid;
3169 Oid serialfn_oid,
3170 deserialfn_oid;
3171 Expr *finalfnexpr;
3172 Oid aggtranstype;
3173 Datum textInitVal;
3174 Datum initValue;
3175 bool initValueIsNull;
3176
3177 /* Planner should have assigned aggregate to correct level */
3178 Assert(aggref->agglevelsup == 0);
3179 /* ... and the split mode should match */
3180 Assert(aggref->aggsplit == aggstate->aggsplit);
3181
3182 /* 1. Check for already processed aggs which can be re-used */
3183 existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
3184 &same_input_transnos);
3185 if (existing_aggno != -1)
3186 {
3187 /*
3188 * Existing compatible agg found. so just point the Aggref to the
3189 * same per-agg struct.
3190 */
3191 aggrefstate->aggno = existing_aggno;
3192 continue;
3193 }
3194
3195 /* Mark Aggref state node with assigned index in the result array */
3196 peragg = &peraggs[++aggno];
3197 peragg->aggref = aggref;
3198 aggrefstate->aggno = aggno;
3199
3200 /* Fetch the pg_aggregate row */
3201 aggTuple = SearchSysCache1(AGGFNOID,
3202 ObjectIdGetDatum(aggref->aggfnoid));
3203 if (!HeapTupleIsValid(aggTuple))
3204 elog(ERROR, "cache lookup failed for aggregate %u",
3205 aggref->aggfnoid);
3206 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3207
3208 /* Check permission to call aggregate function */
3209 aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3210 ACL_EXECUTE);
3211 if (aclresult != ACLCHECK_OK)
3212 aclcheck_error(aclresult, ACL_KIND_PROC,
3213 get_func_name(aggref->aggfnoid));
3214 InvokeFunctionExecuteHook(aggref->aggfnoid);
3215
3216 /* planner recorded transition state type in the Aggref itself */
3217 aggtranstype = aggref->aggtranstype;
3218 Assert(OidIsValid(aggtranstype));
3219
3220 /*
3221 * If this aggregation is performing state combines, then instead of
3222 * using the transition function, we'll use the combine function
3223 */
3224 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3225 {
3226 transfn_oid = aggform->aggcombinefn;
3227
3228 /* If not set then the planner messed up */
3229 if (!OidIsValid(transfn_oid))
3230 elog(ERROR, "combinefn not set for aggregate function");
3231 }
3232 else
3233 transfn_oid = aggform->aggtransfn;
3234
3235 /* Final function only required if we're finalizing the aggregates */
3236 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3237 peragg->finalfn_oid = finalfn_oid = InvalidOid;
3238 else
3239 peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3240
3241 serialfn_oid = InvalidOid;
3242 deserialfn_oid = InvalidOid;
3243
3244 /*
3245 * Check if serialization/deserialization is required. We only do it
3246 * for aggregates that have transtype INTERNAL.
3247 */
3248 if (aggtranstype == INTERNALOID)
3249 {
3250 /*
3251 * The planner should only have generated a serialize agg node if
3252 * every aggregate with an INTERNAL state has a serialization
3253 * function. Verify that.
3254 */
3255 if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3256 {
3257 /* serialization only valid when not running finalfn */
3258 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3259
3260 if (!OidIsValid(aggform->aggserialfn))
3261 elog(ERROR, "serialfunc not provided for serialization aggregation");
3262 serialfn_oid = aggform->aggserialfn;
3263 }
3264
3265 /* Likewise for deserialization functions */
3266 if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3267 {
3268 /* deserialization only valid when combining states */
3269 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3270
3271 if (!OidIsValid(aggform->aggdeserialfn))
3272 elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3273 deserialfn_oid = aggform->aggdeserialfn;
3274 }
3275 }
3276
3277 /* Check that aggregate owner has permission to call component fns */
3278 {
3279 HeapTuple procTuple;
3280 Oid aggOwner;
3281
3282 procTuple = SearchSysCache1(PROCOID,
3283 ObjectIdGetDatum(aggref->aggfnoid));
3284 if (!HeapTupleIsValid(procTuple))
3285 elog(ERROR, "cache lookup failed for function %u",
3286 aggref->aggfnoid);
3287 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3288 ReleaseSysCache(procTuple);
3289
3290 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3291 ACL_EXECUTE);
3292 if (aclresult != ACLCHECK_OK)
3293 aclcheck_error(aclresult, ACL_KIND_PROC,
3294 get_func_name(transfn_oid));
3295 InvokeFunctionExecuteHook(transfn_oid);
3296 if (OidIsValid(finalfn_oid))
3297 {
3298 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3299 ACL_EXECUTE);
3300 if (aclresult != ACLCHECK_OK)
3301 aclcheck_error(aclresult, ACL_KIND_PROC,
3302 get_func_name(finalfn_oid));
3303 InvokeFunctionExecuteHook(finalfn_oid);
3304 }
3305 if (OidIsValid(serialfn_oid))
3306 {
3307 aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3308 ACL_EXECUTE);
3309 if (aclresult != ACLCHECK_OK)
3310 aclcheck_error(aclresult, ACL_KIND_PROC,
3311 get_func_name(serialfn_oid));
3312 InvokeFunctionExecuteHook(serialfn_oid);
3313 }
3314 if (OidIsValid(deserialfn_oid))
3315 {
3316 aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3317 ACL_EXECUTE);
3318 if (aclresult != ACLCHECK_OK)
3319 aclcheck_error(aclresult, ACL_KIND_PROC,
3320 get_func_name(deserialfn_oid));
3321 InvokeFunctionExecuteHook(deserialfn_oid);
3322 }
3323 }
3324
3325 /*
3326 * Get actual datatypes of the (nominal) aggregate inputs. These
3327 * could be different from the agg's declared input types, when the
3328 * agg accepts ANY or a polymorphic type.
3329 */
3330 numArguments = get_aggregate_argtypes(aggref, inputTypes);
3331
3332 /* Count the "direct" arguments, if any */
3333 numDirectArgs = list_length(aggref->aggdirectargs);
3334
3335 /* Detect how many arguments to pass to the finalfn */
3336 if (aggform->aggfinalextra)
3337 peragg->numFinalArgs = numArguments + 1;
3338 else
3339 peragg->numFinalArgs = numDirectArgs + 1;
3340
3341 /*
3342 * build expression trees using actual argument & result types for the
3343 * finalfn, if it exists and is required.
3344 */
3345 if (OidIsValid(finalfn_oid))
3346 {
3347 build_aggregate_finalfn_expr(inputTypes,
3348 peragg->numFinalArgs,
3349 aggtranstype,
3350 aggref->aggtype,
3351 aggref->inputcollid,
3352 finalfn_oid,
3353 &finalfnexpr);
3354 fmgr_info(finalfn_oid, &peragg->finalfn);
3355 fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3356 }
3357
3358 /* get info about the output value's datatype */
3359 get_typlenbyval(aggref->aggtype,
3360 &peragg->resulttypeLen,
3361 &peragg->resulttypeByVal);
3362
3363 /*
3364 * initval is potentially null, so don't try to access it as a struct
3365 * field. Must do it the hard way with SysCacheGetAttr.
3366 */
3367 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3368 Anum_pg_aggregate_agginitval,
3369 &initValueIsNull);
3370 if (initValueIsNull)
3371 initValue = (Datum) 0;
3372 else
3373 initValue = GetAggInitVal(textInitVal, aggtranstype);
3374
3375 /*
3376 * 2. Build working state for invoking the transition function, or
3377 * look up previously initialized working state, if we can share it.
3378 *
3379 * find_compatible_peragg() already collected a list of per-Trans's
3380 * with the same inputs. Check if any of them have the same transition
3381 * function and initial value.
3382 */
3383 existing_transno = find_compatible_pertrans(aggstate, aggref,
3384 transfn_oid, aggtranstype,
3385 serialfn_oid, deserialfn_oid,
3386 initValue, initValueIsNull,
3387 same_input_transnos);
3388 if (existing_transno != -1)
3389 {
3390 /*
3391 * Existing compatible trans found, so just point the 'peragg' to
3392 * the same per-trans struct.
3393 */
3394 pertrans = &pertransstates[existing_transno];
3395 peragg->transno = existing_transno;
3396 }
3397 else
3398 {
3399 pertrans = &pertransstates[++transno];
3400 build_pertrans_for_aggref(pertrans, aggstate, estate,
3401 aggref, transfn_oid, aggtranstype,
3402 serialfn_oid, deserialfn_oid,
3403 initValue, initValueIsNull,
3404 inputTypes, numArguments);
3405 peragg->transno = transno;
3406 }
3407 ReleaseSysCache(aggTuple);
3408 }
3409
3410 /*
3411 * Update aggstate->numaggs to be the number of unique aggregates found.
3412 * Also set numstates to the number of unique transition states found.
3413 */
3414 aggstate->numaggs = aggno + 1;
3415 aggstate->numtrans = transno + 1;
3416
3417 /*
3418 * Build a single projection computing the required arguments for all
3419 * aggregates at once; if there's more than one, that's considerably
3420 * faster than doing it separately for each.
3421 *
3422 * First create a targetlist representing the values to compute.
3423 */
3424 combined_inputeval = NIL;
3425 column_offset = 0;
3426 for (transno = 0; transno < aggstate->numtrans; transno++)
3427 {
3428 AggStatePerTrans pertrans = &pertransstates[transno];
3429
3430 /*
3431 * Mark this per-trans state with its starting column in the combined
3432 * slot.
3433 */
3434 pertrans->inputoff = column_offset;
3435
3436 /*
3437 * If the aggregate has a FILTER, we can only evaluate the filter
3438 * expression, not the actual input expressions, during the combined
3439 * eval step --- unless we're ignoring the filter because this node is
3440 * running combinefns not transfns.
3441 */
3442 if (pertrans->aggref->aggfilter &&
3443 !DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3444 {
3445 TargetEntry *tle;
3446
3447 tle = makeTargetEntry(pertrans->aggref->aggfilter,
3448 column_offset + 1, NULL, false);
3449 combined_inputeval = lappend(combined_inputeval, tle);
3450 column_offset++;
3451
3452 /*
3453 * We'll need separate projection machinery for the real args.
3454 * Arrange to evaluate them into the sortslot previously created.
3455 */
3456 Assert(pertrans->sortslot);
3457 pertrans->evalproj = ExecBuildProjectionInfo(pertrans->aggref->args,
3458 aggstate->tmpcontext,
3459 pertrans->sortslot,
3460 &aggstate->ss.ps,
3461 NULL);
3462 }
3463 else
3464 {
3465 /*
3466 * Add agg's input expressions to combined_inputeval, adjusting
3467 * resnos in the copied target entries to match the combined slot.
3468 */
3469 ListCell *arg;
3470
3471 foreach(arg, pertrans->aggref->args)
3472 {
3473 TargetEntry *source_tle = lfirst_node(TargetEntry, arg);
3474 TargetEntry *tle;
3475
3476 tle = flatCopyTargetEntry(source_tle);
3477 tle->resno += column_offset;
3478
3479 combined_inputeval = lappend(combined_inputeval, tle);
3480 }
3481
3482 column_offset += list_length(pertrans->aggref->args);
3483 }
3484 }
3485
3486 /* Now create a projection for the combined targetlist */
3487 combineddesc = ExecTypeFromTL(combined_inputeval, false);
3488 combinedslot = ExecInitExtraTupleSlot(estate);
3489 ExecSetSlotDescriptor(combinedslot, combineddesc);
3490 aggstate->combinedproj = ExecBuildProjectionInfo(combined_inputeval,
3491 aggstate->tmpcontext,
3492 combinedslot,
3493 &aggstate->ss.ps,
3494 NULL);
3495
3496 /*
3497 * Last, check whether any more aggregates got added onto the node while
3498 * we processed the expressions for the aggregate arguments (including not
3499 * only the regular arguments and FILTER expressions handled immediately
3500 * above, but any direct arguments we might've handled earlier). If so,
3501 * we have nested aggregate functions, which is semantically nonsensical,
3502 * so complain. (This should have been caught by the parser, so we don't
3503 * need to work hard on a helpful error message; but we defend against it
3504 * here anyway, just to be sure.)
3505 */
3506 if (numaggs != list_length(aggstate->aggs))
3507 ereport(ERROR,
3508 (errcode(ERRCODE_GROUPING_ERROR),
3509 errmsg("aggregate function calls cannot be nested")));
3510
3511 return aggstate;
3512 }
3513
3514 /*
3515 * Build the state needed to calculate a state value for an aggregate.
3516 *
3517 * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
3518 * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
3519 * of the arguments could be calculated from 'aggref', but the caller has
3520 * calculated them already, so might as well pass them.
3521 */
3522 static void
build_pertrans_for_aggref(AggStatePerTrans pertrans,AggState * aggstate,EState * estate,Aggref * aggref,Oid aggtransfn,Oid aggtranstype,Oid aggserialfn,Oid aggdeserialfn,Datum initValue,bool initValueIsNull,Oid * inputTypes,int numArguments)3523 build_pertrans_for_aggref(AggStatePerTrans pertrans,
3524 AggState *aggstate, EState *estate,
3525 Aggref *aggref,
3526 Oid aggtransfn, Oid aggtranstype,
3527 Oid aggserialfn, Oid aggdeserialfn,
3528 Datum initValue, bool initValueIsNull,
3529 Oid *inputTypes, int numArguments)
3530 {
3531 int numGroupingSets = Max(aggstate->maxsets, 1);
3532 Expr *serialfnexpr = NULL;
3533 Expr *deserialfnexpr = NULL;
3534 ListCell *lc;
3535 int numInputs;
3536 int numDirectArgs;
3537 List *sortlist;
3538 int numSortCols;
3539 int numDistinctCols;
3540 int i;
3541
3542 /* Begin filling in the pertrans data */
3543 pertrans->aggref = aggref;
3544 pertrans->aggCollation = aggref->inputcollid;
3545 pertrans->transfn_oid = aggtransfn;
3546 pertrans->serialfn_oid = aggserialfn;
3547 pertrans->deserialfn_oid = aggdeserialfn;
3548 pertrans->initValue = initValue;
3549 pertrans->initValueIsNull = initValueIsNull;
3550
3551 /* Count the "direct" arguments, if any */
3552 numDirectArgs = list_length(aggref->aggdirectargs);
3553
3554 /* Count the number of aggregated input columns */
3555 pertrans->numInputs = numInputs = list_length(aggref->args);
3556
3557 pertrans->aggtranstype = aggtranstype;
3558
3559 /* Detect how many arguments to pass to the transfn */
3560 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3561 pertrans->numTransInputs = numInputs;
3562 else
3563 pertrans->numTransInputs = numArguments;
3564
3565 /* inputoff and evalproj will be set up later, in ExecInitAgg */
3566
3567 /*
3568 * When combining states, we have no use at all for the aggregate
3569 * function's transfn. Instead we use the combinefn. In this case, the
3570 * transfn and transfn_oid fields of pertrans refer to the combine
3571 * function rather than the transition function.
3572 */
3573 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3574 {
3575 Expr *combinefnexpr;
3576
3577 build_aggregate_combinefn_expr(aggtranstype,
3578 aggref->inputcollid,
3579 aggtransfn,
3580 &combinefnexpr);
3581 fmgr_info(aggtransfn, &pertrans->transfn);
3582 fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
3583
3584 InitFunctionCallInfoData(pertrans->transfn_fcinfo,
3585 &pertrans->transfn,
3586 2,
3587 pertrans->aggCollation,
3588 (void *) aggstate, NULL);
3589
3590 /*
3591 * Ensure that a combine function to combine INTERNAL states is not
3592 * strict. This should have been checked during CREATE AGGREGATE, but
3593 * the strict property could have been changed since then.
3594 */
3595 if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
3596 ereport(ERROR,
3597 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3598 errmsg("combine function with transition type %s must not be declared STRICT",
3599 format_type_be(aggtranstype))));
3600 }
3601 else
3602 {
3603 Expr *transfnexpr;
3604
3605 /*
3606 * Set up infrastructure for calling the transfn. Note that invtrans
3607 * is not needed here.
3608 */
3609 build_aggregate_transfn_expr(inputTypes,
3610 numArguments,
3611 numDirectArgs,
3612 aggref->aggvariadic,
3613 aggtranstype,
3614 aggref->inputcollid,
3615 aggtransfn,
3616 InvalidOid,
3617 &transfnexpr,
3618 NULL);
3619 fmgr_info(aggtransfn, &pertrans->transfn);
3620 fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
3621
3622 InitFunctionCallInfoData(pertrans->transfn_fcinfo,
3623 &pertrans->transfn,
3624 pertrans->numTransInputs + 1,
3625 pertrans->aggCollation,
3626 (void *) aggstate, NULL);
3627
3628 /*
3629 * If the transfn is strict and the initval is NULL, make sure input
3630 * type and transtype are the same (or at least binary-compatible), so
3631 * that it's OK to use the first aggregated input value as the initial
3632 * transValue. This should have been checked at agg definition time,
3633 * but we must check again in case the transfn's strictness property
3634 * has been changed.
3635 */
3636 if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
3637 {
3638 if (numArguments <= numDirectArgs ||
3639 !IsBinaryCoercible(inputTypes[numDirectArgs],
3640 aggtranstype))
3641 ereport(ERROR,
3642 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3643 errmsg("aggregate %u needs to have compatible input type and transition type",
3644 aggref->aggfnoid)));
3645 }
3646 }
3647
3648 /* get info about the state value's datatype */
3649 get_typlenbyval(aggtranstype,
3650 &pertrans->transtypeLen,
3651 &pertrans->transtypeByVal);
3652
3653 if (OidIsValid(aggserialfn))
3654 {
3655 build_aggregate_serialfn_expr(aggserialfn,
3656 &serialfnexpr);
3657 fmgr_info(aggserialfn, &pertrans->serialfn);
3658 fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
3659
3660 InitFunctionCallInfoData(pertrans->serialfn_fcinfo,
3661 &pertrans->serialfn,
3662 1,
3663 InvalidOid,
3664 (void *) aggstate, NULL);
3665 }
3666
3667 if (OidIsValid(aggdeserialfn))
3668 {
3669 build_aggregate_deserialfn_expr(aggdeserialfn,
3670 &deserialfnexpr);
3671 fmgr_info(aggdeserialfn, &pertrans->deserialfn);
3672 fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
3673
3674 InitFunctionCallInfoData(pertrans->deserialfn_fcinfo,
3675 &pertrans->deserialfn,
3676 2,
3677 InvalidOid,
3678 (void *) aggstate, NULL);
3679
3680 }
3681
3682 /* Initialize any direct-argument expressions */
3683 pertrans->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3684 (PlanState *) aggstate);
3685
3686 /*
3687 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
3688 * have a list of SortGroupClause nodes; fish out the data in them and
3689 * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
3690 * however; the agg's transfn and finalfn are responsible for that.
3691 *
3692 * Note that by construction, if there is a DISTINCT clause then the ORDER
3693 * BY clause is a prefix of it (see transformDistinctClause).
3694 */
3695 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3696 {
3697 sortlist = NIL;
3698 numSortCols = numDistinctCols = 0;
3699 }
3700 else if (aggref->aggdistinct)
3701 {
3702 sortlist = aggref->aggdistinct;
3703 numSortCols = numDistinctCols = list_length(sortlist);
3704 Assert(numSortCols >= list_length(aggref->aggorder));
3705 }
3706 else
3707 {
3708 sortlist = aggref->aggorder;
3709 numSortCols = list_length(sortlist);
3710 numDistinctCols = 0;
3711 }
3712
3713 pertrans->numSortCols = numSortCols;
3714 pertrans->numDistinctCols = numDistinctCols;
3715
3716 /*
3717 * If we have either sorting or filtering to do, create a tupledesc and
3718 * slot corresponding to the aggregated inputs (including sort
3719 * expressions) of the agg.
3720 */
3721 if (numSortCols > 0 || aggref->aggfilter)
3722 {
3723 pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
3724 pertrans->sortslot = ExecInitExtraTupleSlot(estate);
3725 ExecSetSlotDescriptor(pertrans->sortslot, pertrans->sortdesc);
3726 }
3727
3728 if (numSortCols > 0)
3729 {
3730 /*
3731 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3732 * (yet)
3733 */
3734 Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
3735
3736 /* If we have only one input, we need its len/byval info. */
3737 if (numInputs == 1)
3738 {
3739 get_typlenbyval(inputTypes[numDirectArgs],
3740 &pertrans->inputtypeLen,
3741 &pertrans->inputtypeByVal);
3742 }
3743 else if (numDistinctCols > 0)
3744 {
3745 /* we will need an extra slot to store prior values */
3746 pertrans->uniqslot = ExecInitExtraTupleSlot(estate);
3747 ExecSetSlotDescriptor(pertrans->uniqslot,
3748 pertrans->sortdesc);
3749 }
3750
3751 /* Extract the sort information for use later */
3752 pertrans->sortColIdx =
3753 (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
3754 pertrans->sortOperators =
3755 (Oid *) palloc(numSortCols * sizeof(Oid));
3756 pertrans->sortCollations =
3757 (Oid *) palloc(numSortCols * sizeof(Oid));
3758 pertrans->sortNullsFirst =
3759 (bool *) palloc(numSortCols * sizeof(bool));
3760
3761 i = 0;
3762 foreach(lc, sortlist)
3763 {
3764 SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3765 TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
3766
3767 /* the parser should have made sure of this */
3768 Assert(OidIsValid(sortcl->sortop));
3769
3770 pertrans->sortColIdx[i] = tle->resno;
3771 pertrans->sortOperators[i] = sortcl->sortop;
3772 pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
3773 pertrans->sortNullsFirst[i] = sortcl->nulls_first;
3774 i++;
3775 }
3776 Assert(i == numSortCols);
3777 }
3778
3779 if (aggref->aggdistinct)
3780 {
3781 Assert(numArguments > 0);
3782
3783 /*
3784 * We need the equal function for each DISTINCT comparison we will
3785 * make.
3786 */
3787 pertrans->equalfns =
3788 (FmgrInfo *) palloc(numDistinctCols * sizeof(FmgrInfo));
3789
3790 i = 0;
3791 foreach(lc, aggref->aggdistinct)
3792 {
3793 SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3794
3795 fmgr_info(get_opcode(sortcl->eqop), &pertrans->equalfns[i]);
3796 i++;
3797 }
3798 Assert(i == numDistinctCols);
3799 }
3800
3801 pertrans->sortstates = (Tuplesortstate **)
3802 palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
3803 }
3804
3805
3806 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)3807 GetAggInitVal(Datum textInitVal, Oid transtype)
3808 {
3809 Oid typinput,
3810 typioparam;
3811 char *strInitVal;
3812 Datum initVal;
3813
3814 getTypeInputInfo(transtype, &typinput, &typioparam);
3815 strInitVal = TextDatumGetCString(textInitVal);
3816 initVal = OidInputFunctionCall(typinput, strInitVal,
3817 typioparam, -1);
3818 pfree(strInitVal);
3819 return initVal;
3820 }
3821
3822 /*
3823 * find_compatible_peragg - search for a previously initialized per-Agg struct
3824 *
3825 * Searches the previously looked at aggregates to find one which is compatible
3826 * with this one, with the same input parameters. If no compatible aggregate
3827 * can be found, returns -1.
3828 *
3829 * As a side-effect, this also collects a list of existing per-Trans structs
3830 * with matching inputs. If no identical Aggref is found, the list is passed
3831 * later to find_compatible_pertrans, to see if we can at least reuse the
3832 * state value of another aggregate.
3833 */
3834 static int
find_compatible_peragg(Aggref * newagg,AggState * aggstate,int lastaggno,List ** same_input_transnos)3835 find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3836 int lastaggno, List **same_input_transnos)
3837 {
3838 int aggno;
3839 AggStatePerAgg peraggs;
3840
3841 *same_input_transnos = NIL;
3842
3843 /* we mustn't reuse the aggref if it contains volatile function calls */
3844 if (contain_volatile_functions((Node *) newagg))
3845 return -1;
3846
3847 peraggs = aggstate->peragg;
3848
3849 /*
3850 * Search through the list of already seen aggregates. If we find an
3851 * existing identical aggregate call, then we can re-use that one. While
3852 * searching, we'll also collect a list of Aggrefs with the same input
3853 * parameters. If no matching Aggref is found, the caller can potentially
3854 * still re-use the transition state of one of them. (At this stage we
3855 * just compare the parsetrees; whether different aggregates share the
3856 * same transition function will be checked later.)
3857 */
3858 for (aggno = 0; aggno <= lastaggno; aggno++)
3859 {
3860 AggStatePerAgg peragg;
3861 Aggref *existingRef;
3862
3863 peragg = &peraggs[aggno];
3864 existingRef = peragg->aggref;
3865
3866 /* all of the following must be the same or it's no match */
3867 if (newagg->inputcollid != existingRef->inputcollid ||
3868 newagg->aggtranstype != existingRef->aggtranstype ||
3869 newagg->aggstar != existingRef->aggstar ||
3870 newagg->aggvariadic != existingRef->aggvariadic ||
3871 newagg->aggkind != existingRef->aggkind ||
3872 !equal(newagg->aggdirectargs, existingRef->aggdirectargs) ||
3873 !equal(newagg->args, existingRef->args) ||
3874 !equal(newagg->aggorder, existingRef->aggorder) ||
3875 !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3876 !equal(newagg->aggfilter, existingRef->aggfilter))
3877 continue;
3878
3879 /* if it's the same aggregate function then report exact match */
3880 if (newagg->aggfnoid == existingRef->aggfnoid &&
3881 newagg->aggtype == existingRef->aggtype &&
3882 newagg->aggcollid == existingRef->aggcollid)
3883 {
3884 list_free(*same_input_transnos);
3885 *same_input_transnos = NIL;
3886 return aggno;
3887 }
3888
3889 /*
3890 * Not identical, but it had the same inputs. Return it to the caller,
3891 * in case we can re-use its per-trans state.
3892 */
3893 *same_input_transnos = lappend_int(*same_input_transnos,
3894 peragg->transno);
3895 }
3896
3897 return -1;
3898 }
3899
3900 /*
3901 * find_compatible_pertrans - search for a previously initialized per-Trans
3902 * struct
3903 *
3904 * Searches the list of transnos for a per-Trans struct with the same
3905 * transition function and initial condition. (The inputs have already been
3906 * verified to match.)
3907 */
3908 static int
find_compatible_pertrans(AggState * aggstate,Aggref * newagg,Oid aggtransfn,Oid aggtranstype,Oid aggserialfn,Oid aggdeserialfn,Datum initValue,bool initValueIsNull,List * transnos)3909 find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
3910 Oid aggtransfn, Oid aggtranstype,
3911 Oid aggserialfn, Oid aggdeserialfn,
3912 Datum initValue, bool initValueIsNull,
3913 List *transnos)
3914 {
3915 ListCell *lc;
3916
3917 /*
3918 * For the moment, never try to share transition states between different
3919 * ordered-set aggregates. This is necessary because the finalfns of the
3920 * built-in OSAs (see orderedsetaggs.c) are destructive of their
3921 * transition states. We should fix them so we can allow this, but not
3922 * losing performance in the normal non-shared case will take some work.
3923 */
3924 if (AGGKIND_IS_ORDERED_SET(newagg->aggkind))
3925 return -1;
3926
3927 foreach(lc, transnos)
3928 {
3929 int transno = lfirst_int(lc);
3930 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3931
3932 /*
3933 * if the transfns or transition state types are not the same then the
3934 * state can't be shared.
3935 */
3936 if (aggtransfn != pertrans->transfn_oid ||
3937 aggtranstype != pertrans->aggtranstype)
3938 continue;
3939
3940 /*
3941 * The serialization and deserialization functions must match, if
3942 * present, as we're unable to share the trans state for aggregates
3943 * which will serialize or deserialize into different formats.
3944 * Remember that these will be InvalidOid if they're not required for
3945 * this agg node.
3946 */
3947 if (aggserialfn != pertrans->serialfn_oid ||
3948 aggdeserialfn != pertrans->deserialfn_oid)
3949 continue;
3950
3951 /*
3952 * Check that the initial condition matches, too.
3953 */
3954 if (initValueIsNull && pertrans->initValueIsNull)
3955 return transno;
3956
3957 if (!initValueIsNull && !pertrans->initValueIsNull &&
3958 datumIsEqual(initValue, pertrans->initValue,
3959 pertrans->transtypeByVal, pertrans->transtypeLen))
3960 return transno;
3961 }
3962 return -1;
3963 }
3964
3965 void
ExecEndAgg(AggState * node)3966 ExecEndAgg(AggState *node)
3967 {
3968 PlanState *outerPlan;
3969 int transno;
3970 int numGroupingSets = Max(node->maxsets, 1);
3971 int setno;
3972
3973 /* Make sure we have closed any open tuplesorts */
3974
3975 if (node->sort_in)
3976 tuplesort_end(node->sort_in);
3977 if (node->sort_out)
3978 tuplesort_end(node->sort_out);
3979
3980 for (transno = 0; transno < node->numtrans; transno++)
3981 {
3982 AggStatePerTrans pertrans = &node->pertrans[transno];
3983
3984 for (setno = 0; setno < numGroupingSets; setno++)
3985 {
3986 if (pertrans->sortstates[setno])
3987 tuplesort_end(pertrans->sortstates[setno]);
3988 }
3989 }
3990
3991 /* And ensure any agg shutdown callbacks have been called */
3992 for (setno = 0; setno < numGroupingSets; setno++)
3993 ReScanExprContext(node->aggcontexts[setno]);
3994 if (node->hashcontext)
3995 ReScanExprContext(node->hashcontext);
3996
3997 /*
3998 * We don't actually free any ExprContexts here (see comment in
3999 * ExecFreeExprContext), just unlinking the output one from the plan node
4000 * suffices.
4001 */
4002 ExecFreeExprContext(&node->ss.ps);
4003
4004 /* clean up tuple table */
4005 ExecClearTuple(node->ss.ss_ScanTupleSlot);
4006
4007 outerPlan = outerPlanState(node);
4008 ExecEndNode(outerPlan);
4009 }
4010
4011 void
ExecReScanAgg(AggState * node)4012 ExecReScanAgg(AggState *node)
4013 {
4014 ExprContext *econtext = node->ss.ps.ps_ExprContext;
4015 PlanState *outerPlan = outerPlanState(node);
4016 Agg *aggnode = (Agg *) node->ss.ps.plan;
4017 int transno;
4018 int numGroupingSets = Max(node->maxsets, 1);
4019 int setno;
4020
4021 node->agg_done = false;
4022
4023 if (node->aggstrategy == AGG_HASHED)
4024 {
4025 /*
4026 * In the hashed case, if we haven't yet built the hash table then we
4027 * can just return; nothing done yet, so nothing to undo. If subnode's
4028 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4029 * else no reason to re-scan it at all.
4030 */
4031 if (!node->table_filled)
4032 return;
4033
4034 /*
4035 * If we do have the hash table, and the subplan does not have any
4036 * parameter changes, and none of our own parameter changes affect
4037 * input expressions of the aggregated functions, then we can just
4038 * rescan the existing hash table; no need to build it again.
4039 */
4040 if (outerPlan->chgParam == NULL &&
4041 !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4042 {
4043 ResetTupleHashIterator(node->perhash[0].hashtable,
4044 &node->perhash[0].hashiter);
4045 select_current_set(node, 0, true);
4046 return;
4047 }
4048 }
4049
4050 /* Make sure we have closed any open tuplesorts */
4051 for (transno = 0; transno < node->numtrans; transno++)
4052 {
4053 for (setno = 0; setno < numGroupingSets; setno++)
4054 {
4055 AggStatePerTrans pertrans = &node->pertrans[transno];
4056
4057 if (pertrans->sortstates[setno])
4058 {
4059 tuplesort_end(pertrans->sortstates[setno]);
4060 pertrans->sortstates[setno] = NULL;
4061 }
4062 }
4063 }
4064
4065 /*
4066 * We don't need to ReScanExprContext the output tuple context here;
4067 * ExecReScan already did it. But we do need to reset our per-grouping-set
4068 * contexts, which may have transvalues stored in them. (We use rescan
4069 * rather than just reset because transfns may have registered callbacks
4070 * that need to be run now.) For the AGG_HASHED case, see below.
4071 */
4072
4073 for (setno = 0; setno < numGroupingSets; setno++)
4074 {
4075 ReScanExprContext(node->aggcontexts[setno]);
4076 }
4077
4078 /* Release first tuple of group, if we have made a copy */
4079 if (node->grp_firstTuple != NULL)
4080 {
4081 heap_freetuple(node->grp_firstTuple);
4082 node->grp_firstTuple = NULL;
4083 }
4084 ExecClearTuple(node->ss.ss_ScanTupleSlot);
4085
4086 /* Forget current agg values */
4087 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4088 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4089
4090 /*
4091 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4092 * the hashcontext. This used to be an issue, but now, resetting a context
4093 * automatically deletes sub-contexts too.
4094 */
4095 if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4096 {
4097 ReScanExprContext(node->hashcontext);
4098 /* Rebuild an empty hash table */
4099 build_hash_table(node);
4100 node->table_filled = false;
4101 /* iterator will be reset when the table is filled */
4102 }
4103
4104 if (node->aggstrategy != AGG_HASHED)
4105 {
4106 /*
4107 * Reset the per-group state (in particular, mark transvalues null)
4108 */
4109 MemSet(node->pergroup, 0,
4110 sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets);
4111
4112 /* reset to phase 1 */
4113 initialize_phase(node, 1);
4114
4115 node->input_done = false;
4116 node->projected_set = -1;
4117 }
4118
4119 if (outerPlan->chgParam == NULL)
4120 ExecReScan(outerPlan);
4121 }
4122
4123
4124 /***********************************************************************
4125 * API exposed to aggregate functions
4126 ***********************************************************************/
4127
4128
4129 /*
4130 * AggCheckCallContext - test if a SQL function is being called as an aggregate
4131 *
4132 * The transition and/or final functions of an aggregate may want to verify
4133 * that they are being called as aggregates, rather than as plain SQL
4134 * functions. They should use this function to do so. The return value
4135 * is nonzero if being called as an aggregate, or zero if not. (Specific
4136 * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4137 * values could conceivably appear in future.)
4138 *
4139 * If aggcontext isn't NULL, the function also stores at *aggcontext the
4140 * identity of the memory context that aggregate transition values are being
4141 * stored in. Note that the same aggregate call site (flinfo) may be called
4142 * interleaved on different transition values in different contexts, so it's
4143 * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4144 * cache it in the transvalue itself (for internal-type transvalues).
4145 */
4146 int
AggCheckCallContext(FunctionCallInfo fcinfo,MemoryContext * aggcontext)4147 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
4148 {
4149 if (fcinfo->context && IsA(fcinfo->context, AggState))
4150 {
4151 if (aggcontext)
4152 {
4153 AggState *aggstate = ((AggState *) fcinfo->context);
4154 ExprContext *cxt = aggstate->curaggcontext;
4155
4156 *aggcontext = cxt->ecxt_per_tuple_memory;
4157 }
4158 return AGG_CONTEXT_AGGREGATE;
4159 }
4160 if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4161 {
4162 if (aggcontext)
4163 *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4164 return AGG_CONTEXT_WINDOW;
4165 }
4166
4167 /* this is just to prevent "uninitialized variable" warnings */
4168 if (aggcontext)
4169 *aggcontext = NULL;
4170 return 0;
4171 }
4172
4173 /*
4174 * AggGetAggref - allow an aggregate support function to get its Aggref
4175 *
4176 * If the function is being called as an aggregate support function,
4177 * return the Aggref node for the aggregate call. Otherwise, return NULL.
4178 *
4179 * Aggregates sharing the same inputs and transition functions can get
4180 * merged into a single transition calculation. If the transition function
4181 * calls AggGetAggref, it will get some one of the Aggrefs for which it is
4182 * executing. It must therefore not pay attention to the Aggref fields that
4183 * relate to the final function, as those are indeterminate. But if a final
4184 * function calls AggGetAggref, it will get a precise result.
4185 *
4186 * Note that if an aggregate is being used as a window function, this will
4187 * return NULL. We could provide a similar function to return the relevant
4188 * WindowFunc node in such cases, but it's not needed yet.
4189 */
4190 Aggref *
AggGetAggref(FunctionCallInfo fcinfo)4191 AggGetAggref(FunctionCallInfo fcinfo)
4192 {
4193 if (fcinfo->context && IsA(fcinfo->context, AggState))
4194 {
4195 AggStatePerAgg curperagg;
4196 AggStatePerTrans curpertrans;
4197
4198 /* check curperagg (valid when in a final function) */
4199 curperagg = ((AggState *) fcinfo->context)->curperagg;
4200
4201 if (curperagg)
4202 return curperagg->aggref;
4203
4204 /* check curpertrans (valid when in a transition function) */
4205 curpertrans = ((AggState *) fcinfo->context)->curpertrans;
4206
4207 if (curpertrans)
4208 return curpertrans->aggref;
4209 }
4210 return NULL;
4211 }
4212
4213 /*
4214 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4215 *
4216 * This is useful in agg final functions; the context returned is one that
4217 * the final function can safely reset as desired. This isn't useful for
4218 * transition functions, since the context returned MAY (we don't promise)
4219 * be the same as the context those are called in.
4220 *
4221 * As above, this is currently not useful for aggs called as window functions.
4222 */
4223 MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)4224 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
4225 {
4226 if (fcinfo->context && IsA(fcinfo->context, AggState))
4227 {
4228 AggState *aggstate = (AggState *) fcinfo->context;
4229
4230 return aggstate->tmpcontext->ecxt_per_tuple_memory;
4231 }
4232 return NULL;
4233 }
4234
4235 /*
4236 * AggRegisterCallback - register a cleanup callback for an aggregate
4237 *
4238 * This is useful for aggs to register shutdown callbacks, which will ensure
4239 * that non-memory resources are freed. The callback will occur just before
4240 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4241 * either between groups or as a result of rescanning the query. The callback
4242 * will NOT be called on error paths. The typical use-case is for freeing of
4243 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4244 * created by the agg functions. (The callback will not be called until after
4245 * the result of the finalfn is no longer needed, so it's safe for the finalfn
4246 * to return data that will be freed by the callback.)
4247 *
4248 * As above, this is currently not useful for aggs called as window functions.
4249 */
4250 void
AggRegisterCallback(FunctionCallInfo fcinfo,ExprContextCallbackFunction func,Datum arg)4251 AggRegisterCallback(FunctionCallInfo fcinfo,
4252 ExprContextCallbackFunction func,
4253 Datum arg)
4254 {
4255 if (fcinfo->context && IsA(fcinfo->context, AggState))
4256 {
4257 AggState *aggstate = (AggState *) fcinfo->context;
4258 ExprContext *cxt = aggstate->curaggcontext;
4259
4260 RegisterExprContextCallback(cxt, func, arg);
4261
4262 return;
4263 }
4264 elog(ERROR, "aggregate function cannot register a callback in this context");
4265 }
4266
4267
4268 /*
4269 * aggregate_dummy - dummy execution routine for aggregate functions
4270 *
4271 * This function is listed as the implementation (prosrc field) of pg_proc
4272 * entries for aggregate functions. Its only purpose is to throw an error
4273 * if someone mistakenly executes such a function in the normal way.
4274 *
4275 * Perhaps someday we could assign real meaning to the prosrc field of
4276 * an aggregate?
4277 */
4278 Datum
aggregate_dummy(PG_FUNCTION_ARGS)4279 aggregate_dummy(PG_FUNCTION_ARGS)
4280 {
4281 elog(ERROR, "aggregate function %u called as normal function",
4282 fcinfo->flinfo->fn_oid);
4283 return (Datum) 0; /* keep compiler quiet */
4284 }
4285