1 /*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  *	  Routines to handle aggregate nodes.
5  *
6  *	  ExecAgg normally evaluates each aggregate in the following steps:
7  *
8  *		 transvalue = initcond
9  *		 foreach input_tuple do
10  *			transvalue = transfunc(transvalue, input_value(s))
11  *		 result = finalfunc(transvalue, direct_argument(s))
12  *
13  *	  If a finalfunc is not supplied then the result is just the ending
14  *	  value of transvalue.
15  *
16  *	  Other behaviors can be selected by the "aggsplit" mode, which exists
17  *	  to support partial aggregation.  It is possible to:
18  *	  * Skip running the finalfunc, so that the output is always the
19  *	  final transvalue state.
20  *	  * Substitute the combinefunc for the transfunc, so that transvalue
21  *	  states (propagated up from a child partial-aggregation step) are merged
22  *	  rather than processing raw input rows.  (The statements below about
23  *	  the transfunc apply equally to the combinefunc, when it's selected.)
24  *	  * Apply the serializefunc to the output values (this only makes sense
25  *	  when skipping the finalfunc, since the serializefunc works on the
26  *	  transvalue data type).
27  *	  * Apply the deserializefunc to the input values (this only makes sense
28  *	  when using the combinefunc, for similar reasons).
29  *	  It is the planner's responsibility to connect up Agg nodes using these
30  *	  alternate behaviors in a way that makes sense, with partial aggregation
31  *	  results being fed to nodes that expect them.
32  *
33  *	  If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34  *	  input tuples and eliminate duplicates (if required) before performing
35  *	  the above-depicted process.  (However, we don't do that for ordered-set
36  *	  aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37  *	  so far as this module is concerned.)	Note that partial aggregation
38  *	  is not supported in these cases, since we couldn't ensure global
39  *	  ordering or distinctness of the inputs.
40  *
41  *	  If transfunc is marked "strict" in pg_proc and initcond is NULL,
42  *	  then the first non-NULL input_value is assigned directly to transvalue,
43  *	  and transfunc isn't applied until the second non-NULL input_value.
44  *	  The agg's first input type and transtype must be the same in this case!
45  *
46  *	  If transfunc is marked "strict" then NULL input_values are skipped,
47  *	  keeping the previous transvalue.  If transfunc is not strict then it
48  *	  is called for every input tuple and must deal with NULL initcond
49  *	  or NULL input_values for itself.
50  *
51  *	  If finalfunc is marked "strict" then it is not called when the
52  *	  ending transvalue is NULL, instead a NULL result is created
53  *	  automatically (this is just the usual handling of strict functions,
54  *	  of course).  A non-strict finalfunc can make its own choice of
55  *	  what to return for a NULL ending transvalue.
56  *
57  *	  Ordered-set aggregates are treated specially in one other way: we
58  *	  evaluate any "direct" arguments and pass them to the finalfunc along
59  *	  with the transition value.
60  *
61  *	  A finalfunc can have additional arguments beyond the transvalue and
62  *	  any "direct" arguments, corresponding to the input arguments of the
63  *	  aggregate.  These are always just passed as NULL.  Such arguments may be
64  *	  needed to allow resolution of a polymorphic aggregate's result type.
65  *
66  *	  We compute aggregate input expressions and run the transition functions
67  *	  in a temporary econtext (aggstate->tmpcontext).  This is reset at least
68  *	  once per input tuple, so when the transvalue datatype is
69  *	  pass-by-reference, we have to be careful to copy it into a longer-lived
70  *	  memory context, and free the prior value to avoid memory leakage.  We
71  *	  store transvalues in another set of econtexts, aggstate->aggcontexts
72  *	  (one per grouping set, see below), which are also used for the hashtable
73  *	  structures in AGG_HASHED mode.  These econtexts are rescanned, not just
74  *	  reset, at group boundaries so that aggregate transition functions can
75  *	  register shutdown callbacks via AggRegisterCallback.
76  *
77  *	  The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78  *	  run finalize functions and compute the output tuple; this context can be
79  *	  reset once per output tuple.
80  *
81  *	  The executor's AggState node is passed as the fmgr "context" value in
82  *	  all transfunc and finalfunc calls.  It is not recommended that the
83  *	  transition functions look at the AggState node directly, but they can
84  *	  use AggCheckCallContext() to verify that they are being called by
85  *	  nodeAgg.c (and not as ordinary SQL functions).  The main reason a
86  *	  transition function might want to know this is so that it can avoid
87  *	  palloc'ing a fixed-size pass-by-ref transition value on every call:
88  *	  it can instead just scribble on and return its left input.  Ordinarily
89  *	  it is completely forbidden for functions to modify pass-by-ref inputs,
90  *	  but in the aggregate case we know the left input is either the initial
91  *	  transition value or a previous function result, and in either case its
92  *	  value need not be preserved.  See int8inc() for an example.  Notice that
93  *	  advance_transition_function() is coded to avoid a data copy step when
94  *	  the previous transition value pointer is returned.  It is also possible
95  *	  to avoid repeated data copying when the transition value is an expanded
96  *	  object: to do that, the transition function must take care to return
97  *	  an expanded object that is in a child context of the memory context
98  *	  returned by AggCheckCallContext().  Also, some transition functions want
99  *	  to store working state in addition to the nominal transition value; they
100  *	  can use the memory context returned by AggCheckCallContext() to do that.
101  *
102  *	  Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
103  *	  AggState is available as context in earlier releases (back to 8.1),
104  *	  but direct examination of the node is needed to use it before 9.0.
105  *
106  *	  As of 9.4, aggregate transition functions can also use AggGetAggref()
107  *	  to get hold of the Aggref expression node for their aggregate call.
108  *	  This is mainly intended for ordered-set aggregates, which are not
109  *	  supported as window functions.  (A regular aggregate function would
110  *	  need some fallback logic to use this, since there's no Aggref node
111  *	  for a window function.)
112  *
113  *	  Grouping sets:
114  *
115  *	  A list of grouping sets which is structurally equivalent to a ROLLUP
116  *	  clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117  *	  ordered data.  We do this by keeping a separate set of transition values
118  *	  for each grouping set being concurrently processed; for each input tuple
119  *	  we update them all, and on group boundaries we reset those states
120  *	  (starting at the front of the list) whose grouping values have changed
121  *	  (the list of grouping sets is ordered from most specific to least
122  *	  specific).
123  *
124  *	  Where more complex grouping sets are used, we break them down into
125  *	  "phases", where each phase has a different sort order (except phase 0
126  *	  which is reserved for hashing).  During each phase but the last, the
127  *	  input tuples are additionally stored in a tuplesort which is keyed to the
128  *	  next phase's sort order; during each phase but the first, the input
129  *	  tuples are drawn from the previously sorted data.  (The sorting of the
130  *	  data for the first phase is handled by the planner, as it might be
131  *	  satisfied by underlying nodes.)
132  *
133  *	  Hashing can be mixed with sorted grouping.  To do this, we have an
134  *	  AGG_MIXED strategy that populates the hashtables during the first sorted
135  *	  phase, and switches to reading them out after completing all sort phases.
136  *	  We can also support AGG_HASHED with multiple hash tables and no sorting
137  *	  at all.
138  *
139  *	  From the perspective of aggregate transition and final functions, the
140  *	  only issue regarding grouping sets is this: a single call site (flinfo)
141  *	  of an aggregate function may be used for updating several different
142  *	  transition values in turn. So the function must not cache in the flinfo
143  *	  anything which logically belongs as part of the transition value (most
144  *	  importantly, the memory context in which the transition value exists).
145  *	  The support API functions (AggCheckCallContext, AggRegisterCallback) are
146  *	  sensitive to the grouping set for which the aggregate function is
147  *	  currently being called.
148  *
149  *	  Plan structure:
150  *
151  *	  What we get from the planner is actually one "real" Agg node which is
152  *	  part of the plan tree proper, but which optionally has an additional list
153  *	  of Agg nodes hung off the side via the "chain" field.  This is because an
154  *	  Agg node happens to be a convenient representation of all the data we
155  *	  need for grouping sets.
156  *
157  *	  For many purposes, we treat the "real" node as if it were just the first
158  *	  node in the chain.  The chain must be ordered such that hashed entries
159  *	  come before sorted/plain entries; the real node is marked AGG_MIXED if
160  *	  there are both types present (in which case the real node describes one
161  *	  of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162  *	  the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
163  *	  the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164  *	  nodes must be of the same type; if it is AGG_PLAIN, there can be no
165  *	  chained nodes.
166  *
167  *	  We collect all hashed nodes into a single "phase", numbered 0, and create
168  *	  a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169  *	  Phase 0 is allocated even if there are no hashes, but remains unused in
170  *	  that case.
171  *
172  *	  AGG_HASHED nodes actually refer to only a single grouping set each,
173  *	  because for each hashed grouping we need a separate grpColIdx and
174  *	  numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
175  *	  grouping sets that share a sort order.  Each AGG_SORTED node other than
176  *	  the first one has an associated Sort node which describes the sort order
177  *	  to be used; the first sorted node takes its input from the outer subtree,
178  *	  which the planner has already arranged to provide ordered data.
179  *
180  *	  Memory and ExprContext usage:
181  *
182  *	  Because we're accumulating aggregate values across input rows, we need to
183  *	  use more memory contexts than just simple input/output tuple contexts.
184  *	  In fact, for a rollup, we need a separate context for each grouping set
185  *	  so that we can reset the inner (finer-grained) aggregates on their group
186  *	  boundaries while continuing to accumulate values for outer
187  *	  (coarser-grained) groupings.  On top of this, we might be simultaneously
188  *	  populating hashtables; however, we only need one context for all the
189  *	  hashtables.
190  *
191  *	  So we create an array, aggcontexts, with an ExprContext for each grouping
192  *	  set in the largest rollup that we're going to process, and use the
193  *	  per-tuple memory context of those ExprContexts to store the aggregate
194  *	  transition values.  hashcontext is the single context created to support
195  *	  all hash tables.
196  *
197  *
198  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
199  * Portions Copyright (c) 1994, Regents of the University of California
200  *
201  * IDENTIFICATION
202  *	  src/backend/executor/nodeAgg.c
203  *
204  *-------------------------------------------------------------------------
205  */
206 
207 #include "postgres.h"
208 
209 #include "access/htup_details.h"
210 #include "catalog/objectaccess.h"
211 #include "catalog/pg_aggregate.h"
212 #include "catalog/pg_proc.h"
213 #include "catalog/pg_type.h"
214 #include "executor/executor.h"
215 #include "executor/nodeAgg.h"
216 #include "miscadmin.h"
217 #include "nodes/makefuncs.h"
218 #include "nodes/nodeFuncs.h"
219 #include "optimizer/clauses.h"
220 #include "optimizer/tlist.h"
221 #include "parser/parse_agg.h"
222 #include "parser/parse_coerce.h"
223 #include "utils/acl.h"
224 #include "utils/builtins.h"
225 #include "utils/lsyscache.h"
226 #include "utils/memutils.h"
227 #include "utils/syscache.h"
228 #include "utils/tuplesort.h"
229 #include "utils/datum.h"
230 
231 
232 /*
233  * AggStatePerTransData - per aggregate state value information
234  *
235  * Working state for updating the aggregate's state value, by calling the
236  * transition function with an input row. This struct does not store the
237  * information needed to produce the final aggregate result from the transition
238  * state, that's stored in AggStatePerAggData instead. This separation allows
239  * multiple aggregate results to be produced from a single state value.
240  */
241 typedef struct AggStatePerTransData
242 {
243 	/*
244 	 * These values are set up during ExecInitAgg() and do not change
245 	 * thereafter:
246 	 */
247 
248 	/*
249 	 * Link to an Aggref expr this state value is for.
250 	 *
251 	 * There can be multiple Aggref's sharing the same state value, as long as
252 	 * the inputs and transition function are identical. This points to the
253 	 * first one of them.
254 	 */
255 	Aggref	   *aggref;
256 
257 	/*
258 	 * Nominal number of arguments for aggregate function.  For plain aggs,
259 	 * this excludes any ORDER BY expressions.  For ordered-set aggs, this
260 	 * counts both the direct and aggregated (ORDER BY) arguments.
261 	 */
262 	int			numArguments;
263 
264 	/*
265 	 * Number of aggregated input columns.  This includes ORDER BY expressions
266 	 * in both the plain-agg and ordered-set cases.  Ordered-set direct args
267 	 * are not counted, though.
268 	 */
269 	int			numInputs;
270 
271 	/*
272 	 * Number of aggregated input columns to pass to the transfn.  This
273 	 * includes the ORDER BY columns for ordered-set aggs, but not for plain
274 	 * aggs.  (This doesn't count the transition state value!)
275 	 */
276 	int			numTransInputs;
277 
278 	/*
279 	 * At each input row, we perform a single ExecProject call to evaluate all
280 	 * argument expressions that will certainly be needed at this row; that
281 	 * includes this aggregate's filter expression if it has one, or its
282 	 * regular argument expressions (including any ORDER BY columns) if it
283 	 * doesn't.  inputoff is the starting index of this aggregate's required
284 	 * expressions in the resulting tuple.
285 	 */
286 	int			inputoff;
287 
288 	/* Oid of the state transition or combine function */
289 	Oid			transfn_oid;
290 
291 	/* Oid of the serialization function or InvalidOid */
292 	Oid			serialfn_oid;
293 
294 	/* Oid of the deserialization function or InvalidOid */
295 	Oid			deserialfn_oid;
296 
297 	/* Oid of state value's datatype */
298 	Oid			aggtranstype;
299 
300 	/* ExprStates for any direct-argument expressions */
301 	List	   *aggdirectargs;
302 
303 	/*
304 	 * fmgr lookup data for transition function or combine function.  Note in
305 	 * particular that the fn_strict flag is kept here.
306 	 */
307 	FmgrInfo	transfn;
308 
309 	/* fmgr lookup data for serialization function */
310 	FmgrInfo	serialfn;
311 
312 	/* fmgr lookup data for deserialization function */
313 	FmgrInfo	deserialfn;
314 
315 	/* Input collation derived for aggregate */
316 	Oid			aggCollation;
317 
318 	/* number of sorting columns */
319 	int			numSortCols;
320 
321 	/* number of sorting columns to consider in DISTINCT comparisons */
322 	/* (this is either zero or the same as numSortCols) */
323 	int			numDistinctCols;
324 
325 	/* deconstructed sorting information (arrays of length numSortCols) */
326 	AttrNumber *sortColIdx;
327 	Oid		   *sortOperators;
328 	Oid		   *sortCollations;
329 	bool	   *sortNullsFirst;
330 
331 	/*
332 	 * fmgr lookup data for input columns' equality operators --- only
333 	 * set/used when aggregate has DISTINCT flag.  Note that these are in
334 	 * order of sort column index, not parameter index.
335 	 */
336 	FmgrInfo   *equalfns;		/* array of length numDistinctCols */
337 
338 	/*
339 	 * initial value from pg_aggregate entry
340 	 */
341 	Datum		initValue;
342 	bool		initValueIsNull;
343 
344 	/*
345 	 * We need the len and byval info for the agg's input and transition data
346 	 * types in order to know how to copy/delete values.
347 	 *
348 	 * Note that the info for the input type is used only when handling
349 	 * DISTINCT aggs with just one argument, so there is only one input type.
350 	 */
351 	int16		inputtypeLen,
352 				transtypeLen;
353 	bool		inputtypeByVal,
354 				transtypeByVal;
355 
356 	/*
357 	 * Stuff for evaluation of aggregate inputs, when they must be evaluated
358 	 * separately because there's a FILTER expression.  In such cases we will
359 	 * create a sortslot and the result will be stored there, whether or not
360 	 * we're actually sorting.
361 	 */
362 	ProjectionInfo *evalproj;	/* projection machinery */
363 
364 	/*
365 	 * Slots for holding the evaluated input arguments.  These are set up
366 	 * during ExecInitAgg() and then used for each input row requiring either
367 	 * FILTER or ORDER BY/DISTINCT processing.
368 	 */
369 	TupleTableSlot *sortslot;	/* current input tuple */
370 	TupleTableSlot *uniqslot;	/* used for multi-column DISTINCT */
371 	TupleDesc	sortdesc;		/* descriptor of input tuples */
372 
373 	/*
374 	 * These values are working state that is initialized at the start of an
375 	 * input tuple group and updated for each input tuple.
376 	 *
377 	 * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input
378 	 * values straight to the transition function.  If it's DISTINCT or
379 	 * requires ORDER BY, we pass the input values into a Tuplesort object;
380 	 * then at completion of the input tuple group, we scan the sorted values,
381 	 * eliminate duplicates if needed, and run the transition function on the
382 	 * rest.
383 	 *
384 	 * We need a separate tuplesort for each grouping set.
385 	 */
386 
387 	Tuplesortstate **sortstates;	/* sort objects, if DISTINCT or ORDER BY */
388 
389 	/*
390 	 * This field is a pre-initialized FunctionCallInfo struct used for
391 	 * calling this aggregate's transfn.  We save a few cycles per row by not
392 	 * re-initializing the unchanging fields; which isn't much, but it seems
393 	 * worth the extra space consumption.
394 	 */
395 	FunctionCallInfoData transfn_fcinfo;
396 
397 	/* Likewise for serialization and deserialization functions */
398 	FunctionCallInfoData serialfn_fcinfo;
399 
400 	FunctionCallInfoData deserialfn_fcinfo;
401 }			AggStatePerTransData;
402 
403 /*
404  * AggStatePerAggData - per-aggregate information
405  *
406  * This contains the information needed to call the final function, to produce
407  * a final aggregate result from the state value. If there are multiple
408  * identical Aggrefs in the query, they can all share the same per-agg data.
409  *
410  * These values are set up during ExecInitAgg() and do not change thereafter.
411  */
412 typedef struct AggStatePerAggData
413 {
414 	/*
415 	 * Link to an Aggref expr this state value is for.
416 	 *
417 	 * There can be multiple identical Aggref's sharing the same per-agg. This
418 	 * points to the first one of them.
419 	 */
420 	Aggref	   *aggref;
421 
422 	/* index to the state value which this agg should use */
423 	int			transno;
424 
425 	/* Optional Oid of final function (may be InvalidOid) */
426 	Oid			finalfn_oid;
427 
428 	/*
429 	 * fmgr lookup data for final function --- only valid when finalfn_oid oid
430 	 * is not InvalidOid.
431 	 */
432 	FmgrInfo	finalfn;
433 
434 	/*
435 	 * Number of arguments to pass to the finalfn.  This is always at least 1
436 	 * (the transition state value) plus any ordered-set direct args. If the
437 	 * finalfn wants extra args then we pass nulls corresponding to the
438 	 * aggregated input columns.
439 	 */
440 	int			numFinalArgs;
441 
442 	/*
443 	 * We need the len and byval info for the agg's result data type in order
444 	 * to know how to copy/delete values.
445 	 */
446 	int16		resulttypeLen;
447 	bool		resulttypeByVal;
448 
449 }			AggStatePerAggData;
450 
451 /*
452  * AggStatePerGroupData - per-aggregate-per-group working state
453  *
454  * These values are working state that is initialized at the start of
455  * an input tuple group and updated for each input tuple.
456  *
457  * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these
458  * structs (pointed to by aggstate->pergroup); we re-use the array for
459  * each input group, if it's AGG_SORTED mode.  In AGG_HASHED mode, the
460  * hash table contains an array of these structs for each tuple group.
461  *
462  * Logically, the sortstate field belongs in this struct, but we do not
463  * keep it here for space reasons: we don't support DISTINCT aggregates
464  * in AGG_HASHED mode, so there's no reason to use up a pointer field
465  * in every entry of the hashtable.
466  */
467 typedef struct AggStatePerGroupData
468 {
469 	Datum		transValue;		/* current transition value */
470 	bool		transValueIsNull;
471 
472 	bool		noTransValue;	/* true if transValue not set yet */
473 
474 	/*
475 	 * Note: noTransValue initially has the same value as transValueIsNull,
476 	 * and if true both are cleared to false at the same time.  They are not
477 	 * the same though: if transfn later returns a NULL, we want to keep that
478 	 * NULL and not auto-replace it with a later input value. Only the first
479 	 * non-NULL input will be auto-substituted.
480 	 */
481 }			AggStatePerGroupData;
482 
483 /*
484  * AggStatePerPhaseData - per-grouping-set-phase state
485  *
486  * Grouping sets are divided into "phases", where a single phase can be
487  * processed in one pass over the input. If there is more than one phase, then
488  * at the end of input from the current phase, state is reset and another pass
489  * taken over the data which has been re-sorted in the mean time.
490  *
491  * Accordingly, each phase specifies a list of grouping sets and group clause
492  * information, plus each phase after the first also has a sort order.
493  */
494 typedef struct AggStatePerPhaseData
495 {
496 	AggStrategy aggstrategy;	/* strategy for this phase */
497 	int			numsets;		/* number of grouping sets (or 0) */
498 	int		   *gset_lengths;	/* lengths of grouping sets */
499 	Bitmapset **grouped_cols;	/* column groupings for rollup */
500 	FmgrInfo   *eqfunctions;	/* per-grouping-field equality fns */
501 	Agg		   *aggnode;		/* Agg node for phase data */
502 	Sort	   *sortnode;		/* Sort node for input ordering for phase */
503 }			AggStatePerPhaseData;
504 
505 /*
506  * AggStatePerHashData - per-hashtable state
507  *
508  * When doing grouping sets with hashing, we have one of these for each
509  * grouping set. (When doing hashing without grouping sets, we have just one of
510  * them.)
511  */
512 typedef struct AggStatePerHashData
513 {
514 	TupleHashTable hashtable;	/* hash table with one entry per group */
515 	TupleHashIterator hashiter; /* for iterating through hash table */
516 	TupleTableSlot *hashslot;	/* slot for loading hash table */
517 	FmgrInfo   *hashfunctions;	/* per-grouping-field hash fns */
518 	FmgrInfo   *eqfunctions;	/* per-grouping-field equality fns */
519 	int			numCols;		/* number of hash key columns */
520 	int			numhashGrpCols; /* number of columns in hash table */
521 	int			largestGrpColIdx;	/* largest col required for hashing */
522 	AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */
523 	AttrNumber *hashGrpColIdxHash;	/* indices in hashtbl tuples */
524 	Agg		   *aggnode;		/* original Agg node, for numGroups etc. */
525 }			AggStatePerHashData;
526 
527 
/*
 * Prototypes for static functions in this file, so that they can be called
 * ahead of their definitions.
 */
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
					  AggStatePerGroup pergroup,
					  int numReset);
static void advance_transition_function(AggState *aggstate,
							AggStatePerTrans pertrans,
							AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup,
				   AggStatePerGroup *pergroups);
static void advance_combine_function(AggState *aggstate,
						 AggStatePerTrans pertrans,
						 AggStatePerGroup pergroupstate);
static void combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
static void process_ordered_aggregate_single(AggState *aggstate,
								 AggStatePerTrans pertrans,
								 AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
								AggStatePerTrans pertrans,
								AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
				   AggStatePerAgg peragg,
				   AggStatePerGroup pergroupstate,
				   Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
						  AggStatePerAgg peragg,
						  AggStatePerGroup pergroupstate,
						  Datum *resultVal, bool *resultIsNull);
static void prepare_projection_slot(AggState *aggstate,
						TupleTableSlot *slot,
						int currentSet);
static void finalize_aggregates(AggState *aggstate,
					AggStatePerAgg peragg,
					AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static void build_hash_table(AggState *aggstate);
static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
static AggStatePerGroup *lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
						  AggState *aggstate, EState *estate,
						  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
						  Oid aggserialfn, Oid aggdeserialfn,
						  Datum initValue, bool initValueIsNull,
						  Oid *inputTypes, int numArguments);
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
					   int lastaggno, List **same_input_transnos);
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
						 Oid aggtransfn, Oid aggtranstype,
						 Oid aggserialfn, Oid aggdeserialfn,
						 Datum initValue, bool initValueIsNull,
						 List *transnos);
586 
587 
588 /*
589  * Select the current grouping set; affects current_set and
590  * curaggcontext.
591  */
592 static void
select_current_set(AggState * aggstate,int setno,bool is_hash)593 select_current_set(AggState *aggstate, int setno, bool is_hash)
594 {
595 	if (is_hash)
596 		aggstate->curaggcontext = aggstate->hashcontext;
597 	else
598 		aggstate->curaggcontext = aggstate->aggcontexts[setno];
599 
600 	aggstate->current_set = setno;
601 }
602 
603 /*
604  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
605  * current_phase + 1. Juggle the tuplesorts accordingly.
606  *
607  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
608  * case, so when entering phase 0, all we need to do is drop open sorts.
609  */
610 static void
initialize_phase(AggState * aggstate,int newphase)611 initialize_phase(AggState *aggstate, int newphase)
612 {
613 	Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
614 
615 	/*
616 	 * Whatever the previous state, we're now done with whatever input
617 	 * tuplesort was in use.
618 	 */
619 	if (aggstate->sort_in)
620 	{
621 		tuplesort_end(aggstate->sort_in);
622 		aggstate->sort_in = NULL;
623 	}
624 
625 	if (newphase <= 1)
626 	{
627 		/*
628 		 * Discard any existing output tuplesort.
629 		 */
630 		if (aggstate->sort_out)
631 		{
632 			tuplesort_end(aggstate->sort_out);
633 			aggstate->sort_out = NULL;
634 		}
635 	}
636 	else
637 	{
638 		/*
639 		 * The old output tuplesort becomes the new input one, and this is the
640 		 * right time to actually sort it.
641 		 */
642 		aggstate->sort_in = aggstate->sort_out;
643 		aggstate->sort_out = NULL;
644 		Assert(aggstate->sort_in);
645 		tuplesort_performsort(aggstate->sort_in);
646 	}
647 
648 	/*
649 	 * If this isn't the last phase, we need to sort appropriately for the
650 	 * next phase in sequence.
651 	 */
652 	if (newphase > 0 && newphase < aggstate->numphases - 1)
653 	{
654 		Sort	   *sortnode = aggstate->phases[newphase + 1].sortnode;
655 		PlanState  *outerNode = outerPlanState(aggstate);
656 		TupleDesc	tupDesc = ExecGetResultType(outerNode);
657 
658 		aggstate->sort_out = tuplesort_begin_heap(tupDesc,
659 												  sortnode->numCols,
660 												  sortnode->sortColIdx,
661 												  sortnode->sortOperators,
662 												  sortnode->collations,
663 												  sortnode->nullsFirst,
664 												  work_mem,
665 												  false);
666 	}
667 
668 	aggstate->current_phase = newphase;
669 	aggstate->phase = &aggstate->phases[newphase];
670 }
671 
672 /*
673  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
674  * populated by the previous phase.  Copy it to the sorter for the next phase
675  * if any.
676  *
677  * Callers cannot rely on memory for tuple in returned slot remaining valid
678  * past any subsequently fetched tuple.
679  */
680 static TupleTableSlot *
fetch_input_tuple(AggState * aggstate)681 fetch_input_tuple(AggState *aggstate)
682 {
683 	TupleTableSlot *slot;
684 
685 	if (aggstate->sort_in)
686 	{
687 		/* make sure we check for interrupts in either path through here */
688 		CHECK_FOR_INTERRUPTS();
689 		if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
690 									aggstate->sort_slot, NULL))
691 			return NULL;
692 		slot = aggstate->sort_slot;
693 	}
694 	else
695 		slot = ExecProcNode(outerPlanState(aggstate));
696 
697 	if (!TupIsNull(slot) && aggstate->sort_out)
698 		tuplesort_puttupleslot(aggstate->sort_out, slot);
699 
700 	return slot;
701 }
702 
703 /*
704  * (Re)Initialize an individual aggregate.
705  *
706  * This function handles only one grouping set, already set in
707  * aggstate->current_set.
708  *
709  * When called, CurrentMemoryContext should be the per-query context.
710  */
711 static void
initialize_aggregate(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)712 initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
713 					 AggStatePerGroup pergroupstate)
714 {
715 	/*
716 	 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
717 	 */
718 	if (pertrans->numSortCols > 0)
719 	{
720 		/*
721 		 * In case of rescan, maybe there could be an uncompleted sort
722 		 * operation?  Clean it up if so.
723 		 */
724 		if (pertrans->sortstates[aggstate->current_set])
725 			tuplesort_end(pertrans->sortstates[aggstate->current_set]);
726 
727 
728 		/*
729 		 * We use a plain Datum sorter when there's a single input column;
730 		 * otherwise sort the full tuple.  (See comments for
731 		 * process_ordered_aggregate_single.)
732 		 */
733 		if (pertrans->numInputs == 1)
734 			pertrans->sortstates[aggstate->current_set] =
735 				tuplesort_begin_datum(pertrans->sortdesc->attrs[0]->atttypid,
736 									  pertrans->sortOperators[0],
737 									  pertrans->sortCollations[0],
738 									  pertrans->sortNullsFirst[0],
739 									  work_mem, false);
740 		else
741 			pertrans->sortstates[aggstate->current_set] =
742 				tuplesort_begin_heap(pertrans->sortdesc,
743 									 pertrans->numSortCols,
744 									 pertrans->sortColIdx,
745 									 pertrans->sortOperators,
746 									 pertrans->sortCollations,
747 									 pertrans->sortNullsFirst,
748 									 work_mem, false);
749 	}
750 
751 	/*
752 	 * (Re)set transValue to the initial value.
753 	 *
754 	 * Note that when the initial value is pass-by-ref, we must copy it (into
755 	 * the aggcontext) since we will pfree the transValue later.
756 	 */
757 	if (pertrans->initValueIsNull)
758 		pergroupstate->transValue = pertrans->initValue;
759 	else
760 	{
761 		MemoryContext oldContext;
762 
763 		oldContext = MemoryContextSwitchTo(
764 										   aggstate->curaggcontext->ecxt_per_tuple_memory);
765 		pergroupstate->transValue = datumCopy(pertrans->initValue,
766 											  pertrans->transtypeByVal,
767 											  pertrans->transtypeLen);
768 		MemoryContextSwitchTo(oldContext);
769 	}
770 	pergroupstate->transValueIsNull = pertrans->initValueIsNull;
771 
772 	/*
773 	 * If the initial value for the transition state doesn't exist in the
774 	 * pg_aggregate table then we will let the first non-NULL value returned
775 	 * from the outer procNode become the initial value. (This is useful for
776 	 * aggregates like max() and min().) The noTransValue flag signals that we
777 	 * still need to do this.
778 	 */
779 	pergroupstate->noTransValue = pertrans->initValueIsNull;
780 }
781 
782 /*
783  * Initialize all aggregate transition states for a new group of input values.
784  *
785  * If there are multiple grouping sets, we initialize only the first numReset
786  * of them (the grouping sets are ordered so that the most specific one, which
787  * is reset most often, is first). As a convenience, if numReset is 0, we
788  * reinitialize all sets. numReset is -1 to initialize a hashtable entry, in
789  * which case the caller must have used select_current_set appropriately.
790  *
791  * When called, CurrentMemoryContext should be the per-query context.
792  */
793 static void
initialize_aggregates(AggState * aggstate,AggStatePerGroup pergroup,int numReset)794 initialize_aggregates(AggState *aggstate,
795 					  AggStatePerGroup pergroup,
796 					  int numReset)
797 {
798 	int			transno;
799 	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
800 	int			setno = 0;
801 	int			numTrans = aggstate->numtrans;
802 	AggStatePerTrans transstates = aggstate->pertrans;
803 
804 	if (numReset == 0)
805 		numReset = numGroupingSets;
806 
807 	for (transno = 0; transno < numTrans; transno++)
808 	{
809 		AggStatePerTrans pertrans = &transstates[transno];
810 
811 		if (numReset < 0)
812 		{
813 			AggStatePerGroup pergroupstate;
814 
815 			pergroupstate = &pergroup[transno];
816 
817 			initialize_aggregate(aggstate, pertrans, pergroupstate);
818 		}
819 		else
820 		{
821 			for (setno = 0; setno < numReset; setno++)
822 			{
823 				AggStatePerGroup pergroupstate;
824 
825 				pergroupstate = &pergroup[transno + (setno * numTrans)];
826 
827 				select_current_set(aggstate, setno, false);
828 
829 				initialize_aggregate(aggstate, pertrans, pergroupstate);
830 			}
831 		}
832 	}
833 }
834 
835 /*
836  * Given new input value(s), advance the transition function of one aggregate
837  * state within one grouping set only (already set in aggstate->current_set)
838  *
839  * The new values (and null flags) have been preloaded into argument positions
840  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
841  * pass to the transition function.  We also expect that the static fields of
842  * the fcinfo are already initialized; that was done by ExecInitAgg().
843  *
844  * It doesn't matter which memory context this is called in.
845  */
846 static void
advance_transition_function(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)847 advance_transition_function(AggState *aggstate,
848 							AggStatePerTrans pertrans,
849 							AggStatePerGroup pergroupstate)
850 {
851 	FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
852 	MemoryContext oldContext;
853 	Datum		newVal;
854 
855 	if (pertrans->transfn.fn_strict)
856 	{
857 		/*
858 		 * For a strict transfn, nothing happens when there's a NULL input; we
859 		 * just keep the prior transValue.
860 		 */
861 		int			numTransInputs = pertrans->numTransInputs;
862 		int			i;
863 
864 		for (i = 1; i <= numTransInputs; i++)
865 		{
866 			if (fcinfo->argnull[i])
867 				return;
868 		}
869 		if (pergroupstate->noTransValue)
870 		{
871 			/*
872 			 * transValue has not been initialized. This is the first non-NULL
873 			 * input value. We use it as the initial value for transValue. (We
874 			 * already checked that the agg's input type is binary-compatible
875 			 * with its transtype, so straight copy here is OK.)
876 			 *
877 			 * We must copy the datum into aggcontext if it is pass-by-ref. We
878 			 * do not need to pfree the old transValue, since it's NULL.
879 			 */
880 			oldContext = MemoryContextSwitchTo(
881 											   aggstate->curaggcontext->ecxt_per_tuple_memory);
882 			pergroupstate->transValue = datumCopy(fcinfo->arg[1],
883 												  pertrans->transtypeByVal,
884 												  pertrans->transtypeLen);
885 			pergroupstate->transValueIsNull = false;
886 			pergroupstate->noTransValue = false;
887 			MemoryContextSwitchTo(oldContext);
888 			return;
889 		}
890 		if (pergroupstate->transValueIsNull)
891 		{
892 			/*
893 			 * Don't call a strict function with NULL inputs.  Note it is
894 			 * possible to get here despite the above tests, if the transfn is
895 			 * strict *and* returned a NULL on a prior cycle. If that happens
896 			 * we will propagate the NULL all the way to the end.
897 			 */
898 			return;
899 		}
900 	}
901 
902 	/* We run the transition functions in per-input-tuple memory context */
903 	oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
904 
905 	/* set up aggstate->curpertrans for AggGetAggref() */
906 	aggstate->curpertrans = pertrans;
907 
908 	/*
909 	 * OK to call the transition function
910 	 */
911 	fcinfo->arg[0] = pergroupstate->transValue;
912 	fcinfo->argnull[0] = pergroupstate->transValueIsNull;
913 	fcinfo->isnull = false;		/* just in case transfn doesn't set it */
914 
915 	newVal = FunctionCallInvoke(fcinfo);
916 
917 	aggstate->curpertrans = NULL;
918 
919 	/*
920 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
921 	 * free the prior transValue.  But if transfn returned a pointer to its
922 	 * first input, we don't need to do anything.  Also, if transfn returned a
923 	 * pointer to a R/W expanded object that is already a child of the
924 	 * aggcontext, assume we can adopt that value without copying it.
925 	 *
926 	 * It's safe to compare newVal with pergroupstate->transValue without
927 	 * regard for either being NULL, because we below take care to set
928 	 * transValue to 0 when NULL. Otherwise we could end up accidentally not
929 	 * reparenting, when the transValue has the same numerical value as
930 	 * newVal, despite being NULL.  This is a somewhat hot path, making it
931 	 * undesirable to instead solve this with another branch for the common
932 	 * case of the transition function returning its (modified) input
933 	 * argument.
934 	 */
935 	if (!pertrans->transtypeByVal &&
936 		DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
937 	{
938 		if (!fcinfo->isnull)
939 		{
940 			MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
941 			if (DatumIsReadWriteExpandedObject(newVal,
942 											   false,
943 											   pertrans->transtypeLen) &&
944 				MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
945 				 /* do nothing */ ;
946 			else
947 				newVal = datumCopy(newVal,
948 								   pertrans->transtypeByVal,
949 								   pertrans->transtypeLen);
950 		}
951 		else
952 		{
953 			/*
954 			 * Ensure that pergroupstate->transValue ends up being 0, so we
955 			 * can safely compare newVal/transValue without having to check
956 			 * their respective nullness.
957 			 */
958 			newVal = (Datum) 0;
959 		}
960 
961 		if (!pergroupstate->transValueIsNull)
962 		{
963 			if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
964 											   false,
965 											   pertrans->transtypeLen))
966 				DeleteExpandedObject(pergroupstate->transValue);
967 			else
968 				pfree(DatumGetPointer(pergroupstate->transValue));
969 		}
970 	}
971 
972 	pergroupstate->transValue = newVal;
973 	pergroupstate->transValueIsNull = fcinfo->isnull;
974 
975 	MemoryContextSwitchTo(oldContext);
976 }
977 
978 /*
979  * Advance each aggregate transition state for one input tuple.  The input
980  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
981  * accessible to ExecEvalExpr.
982  *
983  * We have two sets of transition states to handle: one for sorted aggregation
984  * and one for hashed; we do them both here, to avoid multiple evaluation of
985  * the inputs.
986  *
987  * When called, CurrentMemoryContext should be the per-query context.
988  */
989 static void
advance_aggregates(AggState * aggstate,AggStatePerGroup pergroup,AggStatePerGroup * pergroups)990 advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGroup *pergroups)
991 {
992 	int			transno;
993 	int			setno = 0;
994 	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
995 	int			numHashes = aggstate->num_hashes;
996 	int			numTrans = aggstate->numtrans;
997 	TupleTableSlot *combinedslot;
998 
999 	/* compute required inputs for all aggregates */
1000 	combinedslot = ExecProject(aggstate->combinedproj);
1001 
1002 	for (transno = 0; transno < numTrans; transno++)
1003 	{
1004 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1005 		int			numTransInputs = pertrans->numTransInputs;
1006 		int			inputoff = pertrans->inputoff;
1007 		TupleTableSlot *slot;
1008 		int			i;
1009 
1010 		/* Skip anything FILTERed out */
1011 		if (pertrans->aggref->aggfilter)
1012 		{
1013 			/* Check the result of the filter expression */
1014 			if (combinedslot->tts_isnull[inputoff] ||
1015 				!DatumGetBool(combinedslot->tts_values[inputoff]))
1016 				continue;
1017 
1018 			/* Now it's safe to evaluate this agg's arguments */
1019 			slot = ExecProject(pertrans->evalproj);
1020 			/* There's no offset needed in this slot, of course */
1021 			inputoff = 0;
1022 		}
1023 		else
1024 		{
1025 			/* arguments are already evaluated into combinedslot @ inputoff */
1026 			slot = combinedslot;
1027 		}
1028 
1029 		if (pertrans->numSortCols > 0)
1030 		{
1031 			/* DISTINCT and/or ORDER BY case */
1032 			Assert(slot->tts_nvalid >= (pertrans->numInputs + inputoff));
1033 			Assert(!pergroups);
1034 
1035 			/*
1036 			 * If the transfn is strict, we want to check for nullity before
1037 			 * storing the row in the sorter, to save space if there are a lot
1038 			 * of nulls.  Note that we must only check numTransInputs columns,
1039 			 * not numInputs, since nullity in columns used only for sorting
1040 			 * is not relevant here.
1041 			 */
1042 			if (pertrans->transfn.fn_strict)
1043 			{
1044 				for (i = 0; i < numTransInputs; i++)
1045 				{
1046 					if (slot->tts_isnull[i + inputoff])
1047 						break;
1048 				}
1049 				if (i < numTransInputs)
1050 					continue;
1051 			}
1052 
1053 			for (setno = 0; setno < numGroupingSets; setno++)
1054 			{
1055 				/* OK, put the tuple into the tuplesort object */
1056 				if (pertrans->numInputs == 1)
1057 					tuplesort_putdatum(pertrans->sortstates[setno],
1058 									   slot->tts_values[inputoff],
1059 									   slot->tts_isnull[inputoff]);
1060 				else if (pertrans->aggref->aggfilter)
1061 				{
1062 					/*
1063 					 * When filtering and ordering, we already have a slot
1064 					 * containing just the argument columns.
1065 					 */
1066 					Assert(slot == pertrans->sortslot);
1067 					tuplesort_puttupleslot(pertrans->sortstates[setno], slot);
1068 				}
1069 				else
1070 				{
1071 					/*
1072 					 * Copy argument columns from combined slot, starting at
1073 					 * inputoff, into sortslot, so that we can store just the
1074 					 * columns we want.
1075 					 */
1076 					ExecClearTuple(pertrans->sortslot);
1077 					memcpy(pertrans->sortslot->tts_values,
1078 						   &slot->tts_values[inputoff],
1079 						   pertrans->numInputs * sizeof(Datum));
1080 					memcpy(pertrans->sortslot->tts_isnull,
1081 						   &slot->tts_isnull[inputoff],
1082 						   pertrans->numInputs * sizeof(bool));
1083 					ExecStoreVirtualTuple(pertrans->sortslot);
1084 					tuplesort_puttupleslot(pertrans->sortstates[setno],
1085 										   pertrans->sortslot);
1086 				}
1087 			}
1088 		}
1089 		else
1090 		{
1091 			/* We can apply the transition function immediately */
1092 			FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1093 
1094 			/* Load values into fcinfo */
1095 			/* Start from 1, since the 0th arg will be the transition value */
1096 			Assert(slot->tts_nvalid >= (numTransInputs + inputoff));
1097 
1098 			for (i = 0; i < numTransInputs; i++)
1099 			{
1100 				fcinfo->arg[i + 1] = slot->tts_values[i + inputoff];
1101 				fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff];
1102 			}
1103 
1104 			if (pergroup)
1105 			{
1106 				/* advance transition states for ordered grouping */
1107 
1108 				for (setno = 0; setno < numGroupingSets; setno++)
1109 				{
1110 					AggStatePerGroup pergroupstate;
1111 
1112 					select_current_set(aggstate, setno, false);
1113 
1114 					pergroupstate = &pergroup[transno + (setno * numTrans)];
1115 
1116 					advance_transition_function(aggstate, pertrans, pergroupstate);
1117 				}
1118 			}
1119 
1120 			if (pergroups)
1121 			{
1122 				/* advance transition states for hashed grouping */
1123 
1124 				for (setno = 0; setno < numHashes; setno++)
1125 				{
1126 					AggStatePerGroup pergroupstate;
1127 
1128 					select_current_set(aggstate, setno, true);
1129 
1130 					pergroupstate = &pergroups[setno][transno];
1131 
1132 					advance_transition_function(aggstate, pertrans, pergroupstate);
1133 				}
1134 			}
1135 		}
1136 	}
1137 }
1138 
1139 /*
1140  * combine_aggregates replaces advance_aggregates in DO_AGGSPLIT_COMBINE
1141  * mode.  The principal difference is that here we may need to apply the
1142  * deserialization function before running the transfn (which, in this mode,
1143  * is actually the aggregate's combinefn).  Also, we know we don't need to
1144  * handle FILTER, DISTINCT, ORDER BY, or grouping sets.
1145  */
1146 static void
combine_aggregates(AggState * aggstate,AggStatePerGroup pergroup)1147 combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
1148 {
1149 	int			transno;
1150 	int			numTrans = aggstate->numtrans;
1151 	TupleTableSlot *slot;
1152 
1153 	/* combine not supported with grouping sets */
1154 	Assert(aggstate->phase->numsets <= 1);
1155 
1156 	/* compute input for all aggregates */
1157 	slot = ExecProject(aggstate->combinedproj);
1158 
1159 	for (transno = 0; transno < numTrans; transno++)
1160 	{
1161 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1162 		AggStatePerGroup pergroupstate = &pergroup[transno];
1163 		FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1164 		int			inputoff = pertrans->inputoff;
1165 
1166 		Assert(slot->tts_nvalid > inputoff);
1167 
1168 		/*
1169 		 * deserialfn_oid will be set if we must deserialize the input state
1170 		 * before calling the combine function
1171 		 */
1172 		if (OidIsValid(pertrans->deserialfn_oid))
1173 		{
1174 			/* Don't call a strict deserialization function with NULL input */
1175 			if (pertrans->deserialfn.fn_strict && slot->tts_isnull[inputoff])
1176 			{
1177 				fcinfo->arg[1] = slot->tts_values[inputoff];
1178 				fcinfo->argnull[1] = slot->tts_isnull[inputoff];
1179 			}
1180 			else
1181 			{
1182 				FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo;
1183 				MemoryContext oldContext;
1184 
1185 				dsinfo->arg[0] = slot->tts_values[inputoff];
1186 				dsinfo->argnull[0] = slot->tts_isnull[inputoff];
1187 				/* Dummy second argument for type-safety reasons */
1188 				dsinfo->arg[1] = PointerGetDatum(NULL);
1189 				dsinfo->argnull[1] = false;
1190 
1191 				/*
1192 				 * We run the deserialization functions in per-input-tuple
1193 				 * memory context.
1194 				 */
1195 				oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
1196 
1197 				fcinfo->arg[1] = FunctionCallInvoke(dsinfo);
1198 				fcinfo->argnull[1] = dsinfo->isnull;
1199 
1200 				MemoryContextSwitchTo(oldContext);
1201 			}
1202 		}
1203 		else
1204 		{
1205 			fcinfo->arg[1] = slot->tts_values[inputoff];
1206 			fcinfo->argnull[1] = slot->tts_isnull[inputoff];
1207 		}
1208 
1209 		advance_combine_function(aggstate, pertrans, pergroupstate);
1210 	}
1211 }
1212 
1213 /*
1214  * Perform combination of states between 2 aggregate states. Effectively this
1215  * 'adds' two states together by whichever logic is defined in the aggregate
1216  * function's combine function.
1217  *
1218  * Note that in this case transfn is set to the combination function. This
1219  * perhaps should be changed to avoid confusion, but one field is ok for now
1220  * as they'll never be needed at the same time.
1221  */
1222 static void
advance_combine_function(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)1223 advance_combine_function(AggState *aggstate,
1224 						 AggStatePerTrans pertrans,
1225 						 AggStatePerGroup pergroupstate)
1226 {
1227 	FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1228 	MemoryContext oldContext;
1229 	Datum		newVal;
1230 
1231 	if (pertrans->transfn.fn_strict)
1232 	{
1233 		/* if we're asked to merge to a NULL state, then do nothing */
1234 		if (fcinfo->argnull[1])
1235 			return;
1236 
1237 		if (pergroupstate->noTransValue)
1238 		{
1239 			/*
1240 			 * transValue has not yet been initialized.  If pass-by-ref
1241 			 * datatype we must copy the combining state value into
1242 			 * aggcontext.
1243 			 */
1244 			if (!pertrans->transtypeByVal)
1245 			{
1246 				oldContext = MemoryContextSwitchTo(
1247 												   aggstate->curaggcontext->ecxt_per_tuple_memory);
1248 				pergroupstate->transValue = datumCopy(fcinfo->arg[1],
1249 													  pertrans->transtypeByVal,
1250 													  pertrans->transtypeLen);
1251 				MemoryContextSwitchTo(oldContext);
1252 			}
1253 			else
1254 				pergroupstate->transValue = fcinfo->arg[1];
1255 
1256 			pergroupstate->transValueIsNull = false;
1257 			pergroupstate->noTransValue = false;
1258 			return;
1259 		}
1260 
1261 		if (pergroupstate->transValueIsNull)
1262 		{
1263 			/*
1264 			 * Don't call a strict function with NULL inputs.  Note it is
1265 			 * possible to get here despite the above tests, if the combinefn
1266 			 * is strict *and* returned a NULL on a prior cycle. If that
1267 			 * happens we will propagate the NULL all the way to the end.
1268 			 */
1269 			return;
1270 		}
1271 	}
1272 
1273 	/* We run the combine functions in per-input-tuple memory context */
1274 	oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
1275 
1276 	/* set up aggstate->curpertrans for AggGetAggref() */
1277 	aggstate->curpertrans = pertrans;
1278 
1279 	/*
1280 	 * OK to call the combine function
1281 	 */
1282 	fcinfo->arg[0] = pergroupstate->transValue;
1283 	fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1284 	fcinfo->isnull = false;		/* just in case combine func doesn't set it */
1285 
1286 	newVal = FunctionCallInvoke(fcinfo);
1287 
1288 	aggstate->curpertrans = NULL;
1289 
1290 	/*
1291 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
1292 	 * free the prior transValue.  But if the combine function returned a
1293 	 * pointer to its first input, we don't need to do anything.  Also, if the
1294 	 * combine function returned a pointer to a R/W expanded object that is
1295 	 * already a child of the aggcontext, assume we can adopt that value
1296 	 * without copying it.
1297 	 */
1298 	if (!pertrans->transtypeByVal &&
1299 		DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
1300 	{
1301 		if (!fcinfo->isnull)
1302 		{
1303 			MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
1304 			if (DatumIsReadWriteExpandedObject(newVal,
1305 											   false,
1306 											   pertrans->transtypeLen) &&
1307 				MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
1308 				 /* do nothing */ ;
1309 			else
1310 				newVal = datumCopy(newVal,
1311 								   pertrans->transtypeByVal,
1312 								   pertrans->transtypeLen);
1313 		}
1314 		if (!pergroupstate->transValueIsNull)
1315 		{
1316 			if (DatumIsReadWriteExpandedObject(pergroupstate->transValue,
1317 											   false,
1318 											   pertrans->transtypeLen))
1319 				DeleteExpandedObject(pergroupstate->transValue);
1320 			else
1321 				pfree(DatumGetPointer(pergroupstate->transValue));
1322 		}
1323 	}
1324 
1325 	pergroupstate->transValue = newVal;
1326 	pergroupstate->transValueIsNull = fcinfo->isnull;
1327 
1328 	MemoryContextSwitchTo(oldContext);
1329 }
1330 
1331 
1332 /*
1333  * Run the transition function for a DISTINCT or ORDER BY aggregate
1334  * with only one input.  This is called after we have completed
1335  * entering all the input values into the sort object.  We complete the
1336  * sort, read out the values in sorted order, and run the transition
1337  * function on each value (applying DISTINCT if appropriate).
1338  *
1339  * Note that the strictness of the transition function was checked when
1340  * entering the values into the sort, so we don't check it again here;
1341  * we just apply standard SQL DISTINCT logic.
1342  *
1343  * The one-input case is handled separately from the multi-input case
1344  * for performance reasons: for single by-value inputs, such as the
1345  * common case of count(distinct id), the tuplesort_getdatum code path
1346  * is around 300% faster.  (The speedup for by-reference types is less
1347  * but still noticeable.)
1348  *
1349  * This function handles only one grouping set (already set in
1350  * aggstate->current_set).
1351  *
1352  * When called, CurrentMemoryContext should be the per-query context.
1353  */
1354 static void
process_ordered_aggregate_single(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)1355 process_ordered_aggregate_single(AggState *aggstate,
1356 								 AggStatePerTrans pertrans,
1357 								 AggStatePerGroup pergroupstate)
1358 {
1359 	Datum		oldVal = (Datum) 0;
1360 	bool		oldIsNull = true;
1361 	bool		haveOldVal = false;
1362 	MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
1363 	MemoryContext oldContext;
1364 	bool		isDistinct = (pertrans->numDistinctCols > 0);
1365 	Datum		newAbbrevVal = (Datum) 0;
1366 	Datum		oldAbbrevVal = (Datum) 0;
1367 	FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1368 	Datum	   *newVal;
1369 	bool	   *isNull;
1370 
1371 	Assert(pertrans->numDistinctCols < 2);
1372 
1373 	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
1374 
1375 	/* Load the column into argument 1 (arg 0 will be transition value) */
1376 	newVal = fcinfo->arg + 1;
1377 	isNull = fcinfo->argnull + 1;
1378 
1379 	/*
1380 	 * Note: if input type is pass-by-ref, the datums returned by the sort are
1381 	 * freshly palloc'd in the per-query context, so we must be careful to
1382 	 * pfree them when they are no longer needed.
1383 	 */
1384 
1385 	while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
1386 							  true, newVal, isNull, &newAbbrevVal))
1387 	{
1388 		/*
1389 		 * Clear and select the working context for evaluation of the equality
1390 		 * function and transition function.
1391 		 */
1392 		MemoryContextReset(workcontext);
1393 		oldContext = MemoryContextSwitchTo(workcontext);
1394 
1395 		/*
1396 		 * If DISTINCT mode, and not distinct from prior, skip it.
1397 		 *
1398 		 * Note: we assume equality functions don't care about collation.
1399 		 */
1400 		if (isDistinct &&
1401 			haveOldVal &&
1402 			((oldIsNull && *isNull) ||
1403 			 (!oldIsNull && !*isNull &&
1404 			  oldAbbrevVal == newAbbrevVal &&
1405 			  DatumGetBool(FunctionCall2(&pertrans->equalfns[0],
1406 										 oldVal, *newVal)))))
1407 		{
1408 			/* equal to prior, so forget this one */
1409 			if (!pertrans->inputtypeByVal && !*isNull)
1410 				pfree(DatumGetPointer(*newVal));
1411 		}
1412 		else
1413 		{
1414 			advance_transition_function(aggstate, pertrans, pergroupstate);
1415 			/* forget the old value, if any */
1416 			if (!oldIsNull && !pertrans->inputtypeByVal)
1417 				pfree(DatumGetPointer(oldVal));
1418 			/* and remember the new one for subsequent equality checks */
1419 			oldVal = *newVal;
1420 			oldAbbrevVal = newAbbrevVal;
1421 			oldIsNull = *isNull;
1422 			haveOldVal = true;
1423 		}
1424 
1425 		MemoryContextSwitchTo(oldContext);
1426 	}
1427 
1428 	if (!oldIsNull && !pertrans->inputtypeByVal)
1429 		pfree(DatumGetPointer(oldVal));
1430 
1431 	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1432 	pertrans->sortstates[aggstate->current_set] = NULL;
1433 }
1434 
1435 /*
1436  * Run the transition function for a DISTINCT or ORDER BY aggregate
1437  * with more than one input.  This is called after we have completed
1438  * entering all the input values into the sort object.  We complete the
1439  * sort, read out the values in sorted order, and run the transition
1440  * function on each value (applying DISTINCT if appropriate).
1441  *
1442  * This function handles only one grouping set (already set in
1443  * aggstate->current_set).
1444  *
1445  * When called, CurrentMemoryContext should be the per-query context.
1446  */
1447 static void
process_ordered_aggregate_multi(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)1448 process_ordered_aggregate_multi(AggState *aggstate,
1449 								AggStatePerTrans pertrans,
1450 								AggStatePerGroup pergroupstate)
1451 {
1452 	MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
1453 	FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
1454 	TupleTableSlot *slot1 = pertrans->sortslot;
1455 	TupleTableSlot *slot2 = pertrans->uniqslot;
1456 	int			numTransInputs = pertrans->numTransInputs;
1457 	int			numDistinctCols = pertrans->numDistinctCols;
1458 	Datum		newAbbrevVal = (Datum) 0;
1459 	Datum		oldAbbrevVal = (Datum) 0;
1460 	bool		haveOldValue = false;
1461 	int			i;
1462 
1463 	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
1464 
1465 	ExecClearTuple(slot1);
1466 	if (slot2)
1467 		ExecClearTuple(slot2);
1468 
1469 	while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
1470 								  true, true, slot1, &newAbbrevVal))
1471 	{
1472 		CHECK_FOR_INTERRUPTS();
1473 
1474 		/*
1475 		 * Extract the first numTransInputs columns as datums to pass to the
1476 		 * transfn.  (This will help execTuplesMatch too, so we do it
1477 		 * immediately.)
1478 		 */
1479 		slot_getsomeattrs(slot1, numTransInputs);
1480 
1481 		if (numDistinctCols == 0 ||
1482 			!haveOldValue ||
1483 			newAbbrevVal != oldAbbrevVal ||
1484 			!execTuplesMatch(slot1, slot2,
1485 							 numDistinctCols,
1486 							 pertrans->sortColIdx,
1487 							 pertrans->equalfns,
1488 							 workcontext))
1489 		{
1490 			/* Load values into fcinfo */
1491 			/* Start from 1, since the 0th arg will be the transition value */
1492 			for (i = 0; i < numTransInputs; i++)
1493 			{
1494 				fcinfo->arg[i + 1] = slot1->tts_values[i];
1495 				fcinfo->argnull[i + 1] = slot1->tts_isnull[i];
1496 			}
1497 
1498 			advance_transition_function(aggstate, pertrans, pergroupstate);
1499 
1500 			if (numDistinctCols > 0)
1501 			{
1502 				/* swap the slot pointers to retain the current tuple */
1503 				TupleTableSlot *tmpslot = slot2;
1504 
1505 				slot2 = slot1;
1506 				slot1 = tmpslot;
1507 				/* avoid execTuplesMatch() calls by reusing abbreviated keys */
1508 				oldAbbrevVal = newAbbrevVal;
1509 				haveOldValue = true;
1510 			}
1511 		}
1512 
1513 		/* Reset context each time, unless execTuplesMatch did it for us */
1514 		if (numDistinctCols == 0)
1515 			MemoryContextReset(workcontext);
1516 
1517 		ExecClearTuple(slot1);
1518 	}
1519 
1520 	if (slot2)
1521 		ExecClearTuple(slot2);
1522 
1523 	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1524 	pertrans->sortstates[aggstate->current_set] = NULL;
1525 }
1526 
1527 /*
1528  * Compute the final value of one aggregate.
1529  *
1530  * This function handles only one grouping set (already set in
1531  * aggstate->current_set).
1532  *
1533  * The finalfunction will be run, and the result delivered, in the
1534  * output-tuple context; caller's CurrentMemoryContext does not matter.
1535  *
1536  * The finalfn uses the state as set in the transno. This also might be
1537  * being used by another aggregate function, so it's important that we do
1538  * nothing destructive here.
1539  */
1540 static void
finalize_aggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)1541 finalize_aggregate(AggState *aggstate,
1542 				   AggStatePerAgg peragg,
1543 				   AggStatePerGroup pergroupstate,
1544 				   Datum *resultVal, bool *resultIsNull)
1545 {
1546 	FunctionCallInfoData fcinfo;
1547 	bool		anynull = false;
1548 	MemoryContext oldContext;
1549 	int			i;
1550 	ListCell   *lc;
1551 	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1552 
1553 	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1554 
1555 	/*
1556 	 * Evaluate any direct arguments.  We do this even if there's no finalfn
1557 	 * (which is unlikely anyway), so that side-effects happen as expected.
1558 	 * The direct arguments go into arg positions 1 and up, leaving position 0
1559 	 * for the transition state value.
1560 	 */
1561 	i = 1;
1562 	foreach(lc, pertrans->aggdirectargs)
1563 	{
1564 		ExprState  *expr = (ExprState *) lfirst(lc);
1565 
1566 		fcinfo.arg[i] = ExecEvalExpr(expr,
1567 									 aggstate->ss.ps.ps_ExprContext,
1568 									 &fcinfo.argnull[i]);
1569 		anynull |= fcinfo.argnull[i];
1570 		i++;
1571 	}
1572 
1573 	/*
1574 	 * Apply the agg's finalfn if one is provided, else return transValue.
1575 	 */
1576 	if (OidIsValid(peragg->finalfn_oid))
1577 	{
1578 		int			numFinalArgs = peragg->numFinalArgs;
1579 
1580 		/* set up aggstate->curperagg for AggGetAggref() */
1581 		aggstate->curperagg = peragg;
1582 
1583 		InitFunctionCallInfoData(fcinfo, &peragg->finalfn,
1584 								 numFinalArgs,
1585 								 pertrans->aggCollation,
1586 								 (void *) aggstate, NULL);
1587 
1588 		/* Fill in the transition state value */
1589 		fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1590 												   pergroupstate->transValueIsNull,
1591 												   pertrans->transtypeLen);
1592 		fcinfo.argnull[0] = pergroupstate->transValueIsNull;
1593 		anynull |= pergroupstate->transValueIsNull;
1594 
1595 		/* Fill any remaining argument positions with nulls */
1596 		for (; i < numFinalArgs; i++)
1597 		{
1598 			fcinfo.arg[i] = (Datum) 0;
1599 			fcinfo.argnull[i] = true;
1600 			anynull = true;
1601 		}
1602 
1603 		if (fcinfo.flinfo->fn_strict && anynull)
1604 		{
1605 			/* don't call a strict function with NULL inputs */
1606 			*resultVal = (Datum) 0;
1607 			*resultIsNull = true;
1608 		}
1609 		else
1610 		{
1611 			*resultVal = FunctionCallInvoke(&fcinfo);
1612 			*resultIsNull = fcinfo.isnull;
1613 		}
1614 		aggstate->curperagg = NULL;
1615 	}
1616 	else
1617 	{
1618 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1619 		*resultVal = pergroupstate->transValue;
1620 		*resultIsNull = pergroupstate->transValueIsNull;
1621 	}
1622 
1623 	/*
1624 	 * If result is pass-by-ref, make sure it is in the right context.
1625 	 */
1626 	if (!peragg->resulttypeByVal && !*resultIsNull &&
1627 		!MemoryContextContains(CurrentMemoryContext,
1628 							   DatumGetPointer(*resultVal)))
1629 		*resultVal = datumCopy(*resultVal,
1630 							   peragg->resulttypeByVal,
1631 							   peragg->resulttypeLen);
1632 
1633 	MemoryContextSwitchTo(oldContext);
1634 }
1635 
1636 /*
1637  * Compute the output value of one partial aggregate.
1638  *
1639  * The serialization function will be run, and the result delivered, in the
1640  * output-tuple context; caller's CurrentMemoryContext does not matter.
1641  */
1642 static void
finalize_partialaggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)1643 finalize_partialaggregate(AggState *aggstate,
1644 						  AggStatePerAgg peragg,
1645 						  AggStatePerGroup pergroupstate,
1646 						  Datum *resultVal, bool *resultIsNull)
1647 {
1648 	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1649 	MemoryContext oldContext;
1650 
1651 	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1652 
1653 	/*
1654 	 * serialfn_oid will be set if we must serialize the transvalue before
1655 	 * returning it
1656 	 */
1657 	if (OidIsValid(pertrans->serialfn_oid))
1658 	{
1659 		/* Don't call a strict serialization function with NULL input. */
1660 		if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1661 		{
1662 			*resultVal = (Datum) 0;
1663 			*resultIsNull = true;
1664 		}
1665 		else
1666 		{
1667 			FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;
1668 
1669 			fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1670 														pergroupstate->transValueIsNull,
1671 														pertrans->transtypeLen);
1672 			fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1673 			fcinfo->isnull = false;
1674 
1675 			*resultVal = FunctionCallInvoke(fcinfo);
1676 			*resultIsNull = fcinfo->isnull;
1677 		}
1678 	}
1679 	else
1680 	{
1681 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1682 		*resultVal = pergroupstate->transValue;
1683 		*resultIsNull = pergroupstate->transValueIsNull;
1684 	}
1685 
1686 	/* If result is pass-by-ref, make sure it is in the right context. */
1687 	if (!peragg->resulttypeByVal && !*resultIsNull &&
1688 		!MemoryContextContains(CurrentMemoryContext,
1689 							   DatumGetPointer(*resultVal)))
1690 		*resultVal = datumCopy(*resultVal,
1691 							   peragg->resulttypeByVal,
1692 							   peragg->resulttypeLen);
1693 
1694 	MemoryContextSwitchTo(oldContext);
1695 }
1696 
1697 /*
1698  * Prepare to finalize and project based on the specified representative tuple
1699  * slot and grouping set.
1700  *
1701  * In the specified tuple slot, force to null all attributes that should be
1702  * read as null in the context of the current grouping set.  Also stash the
1703  * current group bitmap where GroupingExpr can get at it.
1704  *
1705  * This relies on three conditions:
1706  *
1707  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1708  * only reference it in evaluations, which will only access individual
1709  * attributes.
1710  *
1711  * 2) No system columns are going to need to be nulled. (If a system column is
1712  * referenced in a group clause, it is actually projected in the outer plan
1713  * tlist.)
1714  *
1715  * 3) Within a given phase, we never need to recover the value of an attribute
1716  * once it has been set to null.
1717  *
1718  * Poking into the slot this way is a bit ugly, but the consensus is that the
1719  * alternative was worse.
1720  */
1721 static void
prepare_projection_slot(AggState * aggstate,TupleTableSlot * slot,int currentSet)1722 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1723 {
1724 	if (aggstate->phase->grouped_cols)
1725 	{
1726 		Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1727 
1728 		aggstate->grouped_cols = grouped_cols;
1729 
1730 		if (slot->tts_isempty)
1731 		{
1732 			/*
1733 			 * Force all values to be NULL if working on an empty input tuple
1734 			 * (i.e. an empty grouping set for which no input rows were
1735 			 * supplied).
1736 			 */
1737 			ExecStoreAllNullTuple(slot);
1738 		}
1739 		else if (aggstate->all_grouped_cols)
1740 		{
1741 			ListCell   *lc;
1742 
1743 			/* all_grouped_cols is arranged in desc order */
1744 			slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1745 
1746 			foreach(lc, aggstate->all_grouped_cols)
1747 			{
1748 				int			attnum = lfirst_int(lc);
1749 
1750 				if (!bms_is_member(attnum, grouped_cols))
1751 					slot->tts_isnull[attnum - 1] = true;
1752 			}
1753 		}
1754 	}
1755 }
1756 
1757 /*
1758  * Compute the final value of all aggregates for one group.
1759  *
1760  * This function handles only one grouping set at a time, which the caller must
1761  * have selected.  It's also the caller's responsibility to adjust the supplied
1762  * pergroup parameter to point to the current set's transvalues.
1763  *
1764  * Results are stored in the output econtext aggvalues/aggnulls.
1765  */
1766 static void
finalize_aggregates(AggState * aggstate,AggStatePerAgg peraggs,AggStatePerGroup pergroup)1767 finalize_aggregates(AggState *aggstate,
1768 					AggStatePerAgg peraggs,
1769 					AggStatePerGroup pergroup)
1770 {
1771 	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1772 	Datum	   *aggvalues = econtext->ecxt_aggvalues;
1773 	bool	   *aggnulls = econtext->ecxt_aggnulls;
1774 	int			aggno;
1775 	int			transno;
1776 
1777 	/*
1778 	 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1779 	 * inputs and run the transition functions.
1780 	 */
1781 	for (transno = 0; transno < aggstate->numtrans; transno++)
1782 	{
1783 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1784 		AggStatePerGroup pergroupstate;
1785 
1786 		pergroupstate = &pergroup[transno];
1787 
1788 		if (pertrans->numSortCols > 0)
1789 		{
1790 			Assert(aggstate->aggstrategy != AGG_HASHED &&
1791 				   aggstate->aggstrategy != AGG_MIXED);
1792 
1793 			if (pertrans->numInputs == 1)
1794 				process_ordered_aggregate_single(aggstate,
1795 												 pertrans,
1796 												 pergroupstate);
1797 			else
1798 				process_ordered_aggregate_multi(aggstate,
1799 												pertrans,
1800 												pergroupstate);
1801 		}
1802 	}
1803 
1804 	/*
1805 	 * Run the final functions.
1806 	 */
1807 	for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1808 	{
1809 		AggStatePerAgg peragg = &peraggs[aggno];
1810 		int			transno = peragg->transno;
1811 		AggStatePerGroup pergroupstate;
1812 
1813 		pergroupstate = &pergroup[transno];
1814 
1815 		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1816 			finalize_partialaggregate(aggstate, peragg, pergroupstate,
1817 									  &aggvalues[aggno], &aggnulls[aggno]);
1818 		else
1819 			finalize_aggregate(aggstate, peragg, pergroupstate,
1820 							   &aggvalues[aggno], &aggnulls[aggno]);
1821 	}
1822 }
1823 
1824 /*
1825  * Project the result of a group (whose aggs have already been calculated by
1826  * finalize_aggregates). Returns the result slot, or NULL if no row is
1827  * projected (suppressed by qual).
1828  */
1829 static TupleTableSlot *
project_aggregates(AggState * aggstate)1830 project_aggregates(AggState *aggstate)
1831 {
1832 	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1833 
1834 	/*
1835 	 * Check the qual (HAVING clause); if the group does not match, ignore it.
1836 	 */
1837 	if (ExecQual(aggstate->ss.ps.qual, econtext))
1838 	{
1839 		/*
1840 		 * Form and return projection tuple using the aggregate results and
1841 		 * the representative input tuple.
1842 		 */
1843 		return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1844 	}
1845 	else
1846 		InstrCountFiltered1(aggstate, 1);
1847 
1848 	return NULL;
1849 }
1850 
1851 /*
1852  * find_unaggregated_cols
1853  *	  Construct a bitmapset of the column numbers of un-aggregated Vars
1854  *	  appearing in our targetlist and qual (HAVING clause)
1855  */
1856 static Bitmapset *
find_unaggregated_cols(AggState * aggstate)1857 find_unaggregated_cols(AggState *aggstate)
1858 {
1859 	Agg		   *node = (Agg *) aggstate->ss.ps.plan;
1860 	Bitmapset  *colnos;
1861 
1862 	colnos = NULL;
1863 	(void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1864 										 &colnos);
1865 	(void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1866 										 &colnos);
1867 	return colnos;
1868 }
1869 
1870 static bool
find_unaggregated_cols_walker(Node * node,Bitmapset ** colnos)1871 find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1872 {
1873 	if (node == NULL)
1874 		return false;
1875 	if (IsA(node, Var))
1876 	{
1877 		Var		   *var = (Var *) node;
1878 
1879 		/* setrefs.c should have set the varno to OUTER_VAR */
1880 		Assert(var->varno == OUTER_VAR);
1881 		Assert(var->varlevelsup == 0);
1882 		*colnos = bms_add_member(*colnos, var->varattno);
1883 		return false;
1884 	}
1885 	if (IsA(node, Aggref) ||IsA(node, GroupingFunc))
1886 	{
1887 		/* do not descend into aggregate exprs */
1888 		return false;
1889 	}
1890 	return expression_tree_walker(node, find_unaggregated_cols_walker,
1891 								  (void *) colnos);
1892 }
1893 
1894 /*
1895  * Initialize the hash table(s) to empty.
1896  *
1897  * To implement hashed aggregation, we need a hashtable that stores a
1898  * representative tuple and an array of AggStatePerGroup structs for each
1899  * distinct set of GROUP BY column values.  We compute the hash key from the
1900  * GROUP BY columns.  The per-group data is allocated in lookup_hash_entry(),
1901  * for each entry.
1902  *
1903  * We have a separate hashtable and associated perhash data structure for each
1904  * grouping set for which we're doing hashing.
1905  *
1906  * The hash tables always live in the hashcontext's per-tuple memory context
1907  * (there is only one of these for all tables together, since they are all
1908  * reset at the same time).
1909  */
1910 static void
build_hash_table(AggState * aggstate)1911 build_hash_table(AggState *aggstate)
1912 {
1913 	MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1914 	Size		additionalsize;
1915 	int			i;
1916 
1917 	Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1918 
1919 	additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1920 
1921 	for (i = 0; i < aggstate->num_hashes; ++i)
1922 	{
1923 		AggStatePerHash perhash = &aggstate->perhash[i];
1924 
1925 		Assert(perhash->aggnode->numGroups > 0);
1926 
1927 		perhash->hashtable = BuildTupleHashTable(perhash->numCols,
1928 												 perhash->hashGrpColIdxHash,
1929 												 perhash->eqfunctions,
1930 												 perhash->hashfunctions,
1931 												 perhash->aggnode->numGroups,
1932 												 additionalsize,
1933 												 aggstate->hashcontext->ecxt_per_tuple_memory,
1934 												 tmpmem,
1935 												 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1936 	}
1937 }
1938 
1939 /*
1940  * Compute columns that actually need to be stored in hashtable entries.  The
1941  * incoming tuples from the child plan node will contain grouping columns,
1942  * other columns referenced in our targetlist and qual, columns used to
1943  * compute the aggregate functions, and perhaps just junk columns we don't use
1944  * at all.  Only columns of the first two types need to be stored in the
1945  * hashtable, and getting rid of the others can make the table entries
1946  * significantly smaller.  The hashtable only contains the relevant columns,
1947  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1948  * into the format of the normal input descriptor.
1949  *
1950  * Additional columns, in addition to the columns grouped by, come from two
1951  * sources: Firstly functionally dependent columns that we don't need to group
1952  * by themselves, and secondly ctids for row-marks.
1953  *
1954  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1955  * then build an array of the columns included in the hashtable. We might
1956  * still have duplicates if the passed-in grpColIdx has them, which can happen
1957  * in edge cases from semijoins/distinct; these can't always be removed,
1958  * because it's not certain that the duplicate cols will be using the same
1959  * hash function.
1960  *
1961  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1962  * the per-query context (unlike the hash table itself).
1963  */
1964 static void
find_hash_columns(AggState * aggstate)1965 find_hash_columns(AggState *aggstate)
1966 {
1967 	Bitmapset  *base_colnos;
1968 	List	   *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1969 	int			numHashes = aggstate->num_hashes;
1970 	int			j;
1971 
1972 	/* Find Vars that will be needed in tlist and qual */
1973 	base_colnos = find_unaggregated_cols(aggstate);
1974 
1975 	for (j = 0; j < numHashes; ++j)
1976 	{
1977 		AggStatePerHash perhash = &aggstate->perhash[j];
1978 		Bitmapset  *colnos = bms_copy(base_colnos);
1979 		AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1980 		List	   *hashTlist = NIL;
1981 		TupleDesc	hashDesc;
1982 		int			maxCols;
1983 		int			i;
1984 
1985 		perhash->largestGrpColIdx = 0;
1986 
1987 		/*
1988 		 * If we're doing grouping sets, then some Vars might be referenced in
1989 		 * tlist/qual for the benefit of other grouping sets, but not needed
1990 		 * when hashing; i.e. prepare_projection_slot will null them out, so
1991 		 * there'd be no point storing them.  Use prepare_projection_slot's
1992 		 * logic to determine which.
1993 		 */
1994 		if (aggstate->phases[0].grouped_cols)
1995 		{
1996 			Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
1997 			ListCell   *lc;
1998 
1999 			foreach(lc, aggstate->all_grouped_cols)
2000 			{
2001 				int			attnum = lfirst_int(lc);
2002 
2003 				if (!bms_is_member(attnum, grouped_cols))
2004 					colnos = bms_del_member(colnos, attnum);
2005 			}
2006 		}
2007 
2008 		/*
2009 		 * Compute maximum number of input columns accounting for possible
2010 		 * duplications in the grpColIdx array, which can happen in some edge
2011 		 * cases where HashAggregate was generated as part of a semijoin or a
2012 		 * DISTINCT.
2013 		 */
2014 		maxCols = bms_num_members(colnos) + perhash->numCols;
2015 
2016 		perhash->hashGrpColIdxInput =
2017 			palloc(maxCols * sizeof(AttrNumber));
2018 		perhash->hashGrpColIdxHash =
2019 			palloc(perhash->numCols * sizeof(AttrNumber));
2020 
2021 		/* Add all the grouping columns to colnos */
2022 		for (i = 0; i < perhash->numCols; i++)
2023 			colnos = bms_add_member(colnos, grpColIdx[i]);
2024 
2025 		/*
2026 		 * First build mapping for columns directly hashed. These are the
2027 		 * first, because they'll be accessed when computing hash values and
2028 		 * comparing tuples for exact matches. We also build simple mapping
2029 		 * for execGrouping, so it knows where to find the to-be-hashed /
2030 		 * compared columns in the input.
2031 		 */
2032 		for (i = 0; i < perhash->numCols; i++)
2033 		{
2034 			perhash->hashGrpColIdxInput[i] = grpColIdx[i];
2035 			perhash->hashGrpColIdxHash[i] = i + 1;
2036 			perhash->numhashGrpCols++;
2037 			/* delete already mapped columns */
2038 			bms_del_member(colnos, grpColIdx[i]);
2039 		}
2040 
2041 		/* and add the remaining columns */
2042 		while ((i = bms_first_member(colnos)) >= 0)
2043 		{
2044 			perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
2045 			perhash->numhashGrpCols++;
2046 		}
2047 
2048 		/* and build a tuple descriptor for the hashtable */
2049 		for (i = 0; i < perhash->numhashGrpCols; i++)
2050 		{
2051 			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
2052 
2053 			hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
2054 			perhash->largestGrpColIdx =
2055 				Max(varNumber + 1, perhash->largestGrpColIdx);
2056 		}
2057 
2058 		hashDesc = ExecTypeFromTL(hashTlist, false);
2059 		ExecSetSlotDescriptor(perhash->hashslot, hashDesc);
2060 
2061 		list_free(hashTlist);
2062 		bms_free(colnos);
2063 	}
2064 
2065 	bms_free(base_colnos);
2066 }
2067 
2068 /*
2069  * Estimate per-hash-table-entry overhead for the planner.
2070  *
2071  * Note that the estimate does not include space for pass-by-reference
2072  * transition data values, nor for the representative tuple of each group.
2073  * Nor does this account of the target fill-factor and growth policy of the
2074  * hash table.
2075  */
2076 Size
hash_agg_entry_size(int numAggs)2077 hash_agg_entry_size(int numAggs)
2078 {
2079 	Size		entrysize;
2080 
2081 	/* This must match build_hash_table */
2082 	entrysize = sizeof(TupleHashEntryData) +
2083 		numAggs * sizeof(AggStatePerGroupData);
2084 	entrysize = MAXALIGN(entrysize);
2085 
2086 	return entrysize;
2087 }
2088 
2089 /*
2090  * Find or create a hashtable entry for the tuple group containing the current
2091  * tuple (already set in tmpcontext's outertuple slot), in the current grouping
2092  * set (which the caller must have selected - note that initialize_aggregate
2093  * depends on this).
2094  *
2095  * When called, CurrentMemoryContext should be the per-query context.
2096  */
2097 static TupleHashEntryData *
lookup_hash_entry(AggState * aggstate)2098 lookup_hash_entry(AggState *aggstate)
2099 {
2100 	TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
2101 	AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
2102 	TupleTableSlot *hashslot = perhash->hashslot;
2103 	TupleHashEntryData *entry;
2104 	bool		isnew;
2105 	int			i;
2106 
2107 	/* transfer just the needed columns into hashslot */
2108 	slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
2109 	ExecClearTuple(hashslot);
2110 
2111 	for (i = 0; i < perhash->numhashGrpCols; i++)
2112 	{
2113 		int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
2114 
2115 		hashslot->tts_values[i] = inputslot->tts_values[varNumber];
2116 		hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
2117 	}
2118 	ExecStoreVirtualTuple(hashslot);
2119 
2120 	/* find or create the hashtable entry using the filtered tuple */
2121 	entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
2122 
2123 	if (isnew)
2124 	{
2125 		entry->additional = (AggStatePerGroup)
2126 			MemoryContextAlloc(perhash->hashtable->tablecxt,
2127 							   sizeof(AggStatePerGroupData) * aggstate->numtrans);
2128 		/* initialize aggregates for new tuple group */
2129 		initialize_aggregates(aggstate, (AggStatePerGroup) entry->additional,
2130 							  -1);
2131 	}
2132 
2133 	return entry;
2134 }
2135 
2136 /*
2137  * Look up hash entries for the current tuple in all hashed grouping sets,
2138  * returning an array of pergroup pointers suitable for advance_aggregates.
2139  *
2140  * Be aware that lookup_hash_entry can reset the tmpcontext.
2141  */
2142 static AggStatePerGroup *
lookup_hash_entries(AggState * aggstate)2143 lookup_hash_entries(AggState *aggstate)
2144 {
2145 	int			numHashes = aggstate->num_hashes;
2146 	AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2147 	int			setno;
2148 
2149 	for (setno = 0; setno < numHashes; setno++)
2150 	{
2151 		select_current_set(aggstate, setno, true);
2152 		pergroup[setno] = lookup_hash_entry(aggstate)->additional;
2153 	}
2154 
2155 	return pergroup;
2156 }
2157 
2158 /*
2159  * ExecAgg -
2160  *
2161  *	  ExecAgg receives tuples from its outer subplan and aggregates over
2162  *	  the appropriate attribute for each aggregate function use (Aggref
2163  *	  node) appearing in the targetlist or qual of the node.  The number
2164  *	  of tuples to aggregate over depends on whether grouped or plain
2165  *	  aggregation is selected.  In grouped aggregation, we produce a result
2166  *	  row for each group; in plain aggregation there's a single result row
2167  *	  for the whole query.  In either case, the value of each aggregate is
2168  *	  stored in the expression context to be used when ExecProject evaluates
2169  *	  the result tuple.
2170  */
2171 static TupleTableSlot *
ExecAgg(PlanState * pstate)2172 ExecAgg(PlanState *pstate)
2173 {
2174 	AggState   *node = castNode(AggState, pstate);
2175 	TupleTableSlot *result = NULL;
2176 
2177 	CHECK_FOR_INTERRUPTS();
2178 
2179 	if (!node->agg_done)
2180 	{
2181 		/* Dispatch based on strategy */
2182 		switch (node->phase->aggstrategy)
2183 		{
2184 			case AGG_HASHED:
2185 				if (!node->table_filled)
2186 					agg_fill_hash_table(node);
2187 				/* FALLTHROUGH */
2188 			case AGG_MIXED:
2189 				result = agg_retrieve_hash_table(node);
2190 				break;
2191 			case AGG_PLAIN:
2192 			case AGG_SORTED:
2193 				result = agg_retrieve_direct(node);
2194 				break;
2195 		}
2196 
2197 		if (!TupIsNull(result))
2198 			return result;
2199 	}
2200 
2201 	return NULL;
2202 }
2203 
2204 /*
2205  * ExecAgg for non-hashed case
2206  */
2207 static TupleTableSlot *
agg_retrieve_direct(AggState * aggstate)2208 agg_retrieve_direct(AggState *aggstate)
2209 {
2210 	Agg		   *node = aggstate->phase->aggnode;
2211 	ExprContext *econtext;
2212 	ExprContext *tmpcontext;
2213 	AggStatePerAgg peragg;
2214 	AggStatePerGroup pergroup;
2215 	AggStatePerGroup *hash_pergroups = NULL;
2216 	TupleTableSlot *outerslot;
2217 	TupleTableSlot *firstSlot;
2218 	TupleTableSlot *result;
2219 	bool		hasGroupingSets = aggstate->phase->numsets > 0;
2220 	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
2221 	int			currentSet;
2222 	int			nextSetSize;
2223 	int			numReset;
2224 	int			i;
2225 
2226 	/*
2227 	 * get state info from node
2228 	 *
2229 	 * econtext is the per-output-tuple expression context
2230 	 *
2231 	 * tmpcontext is the per-input-tuple expression context
2232 	 */
2233 	econtext = aggstate->ss.ps.ps_ExprContext;
2234 	tmpcontext = aggstate->tmpcontext;
2235 
2236 	peragg = aggstate->peragg;
2237 	pergroup = aggstate->pergroup;
2238 	firstSlot = aggstate->ss.ss_ScanTupleSlot;
2239 
2240 	/*
2241 	 * We loop retrieving groups until we find one matching
2242 	 * aggstate->ss.ps.qual
2243 	 *
2244 	 * For grouping sets, we have the invariant that aggstate->projected_set
2245 	 * is either -1 (initial call) or the index (starting from 0) in
2246 	 * gset_lengths for the group we just completed (either by projecting a
2247 	 * row or by discarding it in the qual).
2248 	 */
2249 	while (!aggstate->agg_done)
2250 	{
2251 		/*
2252 		 * Clear the per-output-tuple context for each group, as well as
2253 		 * aggcontext (which contains any pass-by-ref transvalues of the old
2254 		 * group).  Some aggregate functions store working state in child
2255 		 * contexts; those now get reset automatically without us needing to
2256 		 * do anything special.
2257 		 *
2258 		 * We use ReScanExprContext not just ResetExprContext because we want
2259 		 * any registered shutdown callbacks to be called.  That allows
2260 		 * aggregate functions to ensure they've cleaned up any non-memory
2261 		 * resources.
2262 		 */
2263 		ReScanExprContext(econtext);
2264 
2265 		/*
2266 		 * Determine how many grouping sets need to be reset at this boundary.
2267 		 */
2268 		if (aggstate->projected_set >= 0 &&
2269 			aggstate->projected_set < numGroupingSets)
2270 			numReset = aggstate->projected_set + 1;
2271 		else
2272 			numReset = numGroupingSets;
2273 
2274 		/*
2275 		 * numReset can change on a phase boundary, but that's OK; we want to
2276 		 * reset the contexts used in _this_ phase, and later, after possibly
2277 		 * changing phase, initialize the right number of aggregates for the
2278 		 * _new_ phase.
2279 		 */
2280 
2281 		for (i = 0; i < numReset; i++)
2282 		{
2283 			ReScanExprContext(aggstate->aggcontexts[i]);
2284 		}
2285 
2286 		/*
2287 		 * Check if input is complete and there are no more groups to project
2288 		 * in this phase; move to next phase or mark as done.
2289 		 */
2290 		if (aggstate->input_done == true &&
2291 			aggstate->projected_set >= (numGroupingSets - 1))
2292 		{
2293 			if (aggstate->current_phase < aggstate->numphases - 1)
2294 			{
2295 				initialize_phase(aggstate, aggstate->current_phase + 1);
2296 				aggstate->input_done = false;
2297 				aggstate->projected_set = -1;
2298 				numGroupingSets = Max(aggstate->phase->numsets, 1);
2299 				node = aggstate->phase->aggnode;
2300 				numReset = numGroupingSets;
2301 			}
2302 			else if (aggstate->aggstrategy == AGG_MIXED)
2303 			{
2304 				/*
2305 				 * Mixed mode; we've output all the grouped stuff and have
2306 				 * full hashtables, so switch to outputting those.
2307 				 */
2308 				initialize_phase(aggstate, 0);
2309 				aggstate->table_filled = true;
2310 				ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2311 									   &aggstate->perhash[0].hashiter);
2312 				select_current_set(aggstate, 0, true);
2313 				return agg_retrieve_hash_table(aggstate);
2314 			}
2315 			else
2316 			{
2317 				aggstate->agg_done = true;
2318 				break;
2319 			}
2320 		}
2321 
2322 		/*
2323 		 * Get the number of columns in the next grouping set after the last
2324 		 * projected one (if any). This is the number of columns to compare to
2325 		 * see if we reached the boundary of that set too.
2326 		 */
2327 		if (aggstate->projected_set >= 0 &&
2328 			aggstate->projected_set < (numGroupingSets - 1))
2329 			nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2330 		else
2331 			nextSetSize = 0;
2332 
2333 		/*----------
2334 		 * If a subgroup for the current grouping set is present, project it.
2335 		 *
2336 		 * We have a new group if:
2337 		 *	- we're out of input but haven't projected all grouping sets
2338 		 *	  (checked above)
2339 		 * OR
2340 		 *	  - we already projected a row that wasn't from the last grouping
2341 		 *		set
2342 		 *	  AND
2343 		 *	  - the next grouping set has at least one grouping column (since
2344 		 *		empty grouping sets project only once input is exhausted)
2345 		 *	  AND
2346 		 *	  - the previous and pending rows differ on the grouping columns
2347 		 *		of the next grouping set
2348 		 *----------
2349 		 */
2350 		if (aggstate->input_done ||
2351 			(node->aggstrategy != AGG_PLAIN &&
2352 			 aggstate->projected_set != -1 &&
2353 			 aggstate->projected_set < (numGroupingSets - 1) &&
2354 			 nextSetSize > 0 &&
2355 			 !execTuplesMatch(econtext->ecxt_outertuple,
2356 							  tmpcontext->ecxt_outertuple,
2357 							  nextSetSize,
2358 							  node->grpColIdx,
2359 							  aggstate->phase->eqfunctions,
2360 							  tmpcontext->ecxt_per_tuple_memory)))
2361 		{
2362 			aggstate->projected_set += 1;
2363 
2364 			Assert(aggstate->projected_set < numGroupingSets);
2365 			Assert(nextSetSize > 0 || aggstate->input_done);
2366 		}
2367 		else
2368 		{
2369 			/*
2370 			 * We no longer care what group we just projected, the next
2371 			 * projection will always be the first (or only) grouping set
2372 			 * (unless the input proves to be empty).
2373 			 */
2374 			aggstate->projected_set = 0;
2375 
2376 			/*
2377 			 * If we don't already have the first tuple of the new group,
2378 			 * fetch it from the outer plan.
2379 			 */
2380 			if (aggstate->grp_firstTuple == NULL)
2381 			{
2382 				outerslot = fetch_input_tuple(aggstate);
2383 				if (!TupIsNull(outerslot))
2384 				{
2385 					/*
2386 					 * Make a copy of the first input tuple; we will use this
2387 					 * for comparisons (in group mode) and for projection.
2388 					 */
2389 					aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
2390 				}
2391 				else
2392 				{
2393 					/* outer plan produced no tuples at all */
2394 					if (hasGroupingSets)
2395 					{
2396 						/*
2397 						 * If there was no input at all, we need to project
2398 						 * rows only if there are grouping sets of size 0.
2399 						 * Note that this implies that there can't be any
2400 						 * references to ungrouped Vars, which would otherwise
2401 						 * cause issues with the empty output slot.
2402 						 *
2403 						 * XXX: This is no longer true, we currently deal with
2404 						 * this in finalize_aggregates().
2405 						 */
2406 						aggstate->input_done = true;
2407 
2408 						while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2409 						{
2410 							aggstate->projected_set += 1;
2411 							if (aggstate->projected_set >= numGroupingSets)
2412 							{
2413 								/*
2414 								 * We can't set agg_done here because we might
2415 								 * have more phases to do, even though the
2416 								 * input is empty. So we need to restart the
2417 								 * whole outer loop.
2418 								 */
2419 								break;
2420 							}
2421 						}
2422 
2423 						if (aggstate->projected_set >= numGroupingSets)
2424 							continue;
2425 					}
2426 					else
2427 					{
2428 						aggstate->agg_done = true;
2429 						/* If we are grouping, we should produce no tuples too */
2430 						if (node->aggstrategy != AGG_PLAIN)
2431 							return NULL;
2432 					}
2433 				}
2434 			}
2435 
2436 			/*
2437 			 * Initialize working state for a new input tuple group.
2438 			 */
2439 			initialize_aggregates(aggstate, pergroup, numReset);
2440 
2441 			if (aggstate->grp_firstTuple != NULL)
2442 			{
2443 				/*
2444 				 * Store the copied first input tuple in the tuple table slot
2445 				 * reserved for it.  The tuple will be deleted when it is
2446 				 * cleared from the slot.
2447 				 */
2448 				ExecStoreTuple(aggstate->grp_firstTuple,
2449 							   firstSlot,
2450 							   InvalidBuffer,
2451 							   true);
2452 				aggstate->grp_firstTuple = NULL;	/* don't keep two pointers */
2453 
2454 				/* set up for first advance_aggregates call */
2455 				tmpcontext->ecxt_outertuple = firstSlot;
2456 
2457 				/*
2458 				 * Process each outer-plan tuple, and then fetch the next one,
2459 				 * until we exhaust the outer plan or cross a group boundary.
2460 				 */
2461 				for (;;)
2462 				{
2463 					/*
2464 					 * During phase 1 only of a mixed agg, we need to update
2465 					 * hashtables as well in advance_aggregates.
2466 					 */
2467 					if (aggstate->aggstrategy == AGG_MIXED &&
2468 						aggstate->current_phase == 1)
2469 					{
2470 						hash_pergroups = lookup_hash_entries(aggstate);
2471 					}
2472 					else
2473 						hash_pergroups = NULL;
2474 
2475 					if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2476 						combine_aggregates(aggstate, pergroup);
2477 					else
2478 						advance_aggregates(aggstate, pergroup, hash_pergroups);
2479 
2480 					/* Reset per-input-tuple context after each tuple */
2481 					ResetExprContext(tmpcontext);
2482 
2483 					outerslot = fetch_input_tuple(aggstate);
2484 					if (TupIsNull(outerslot))
2485 					{
2486 						/* no more outer-plan tuples available */
2487 						if (hasGroupingSets)
2488 						{
2489 							aggstate->input_done = true;
2490 							break;
2491 						}
2492 						else
2493 						{
2494 							aggstate->agg_done = true;
2495 							break;
2496 						}
2497 					}
2498 					/* set up for next advance_aggregates call */
2499 					tmpcontext->ecxt_outertuple = outerslot;
2500 
2501 					/*
2502 					 * If we are grouping, check whether we've crossed a group
2503 					 * boundary.
2504 					 */
2505 					if (node->aggstrategy != AGG_PLAIN)
2506 					{
2507 						if (!execTuplesMatch(firstSlot,
2508 											 outerslot,
2509 											 node->numCols,
2510 											 node->grpColIdx,
2511 											 aggstate->phase->eqfunctions,
2512 											 tmpcontext->ecxt_per_tuple_memory))
2513 						{
2514 							aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
2515 							break;
2516 						}
2517 					}
2518 				}
2519 			}
2520 
2521 			/*
2522 			 * Use the representative input tuple for any references to
2523 			 * non-aggregated input columns in aggregate direct args, the node
2524 			 * qual, and the tlist.  (If we are not grouping, and there are no
2525 			 * input rows at all, we will come here with an empty firstSlot
2526 			 * ... but if not grouping, there can't be any references to
2527 			 * non-aggregated input columns, so no problem.)
2528 			 */
2529 			econtext->ecxt_outertuple = firstSlot;
2530 		}
2531 
2532 		Assert(aggstate->projected_set >= 0);
2533 
2534 		currentSet = aggstate->projected_set;
2535 
2536 		prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2537 
2538 		select_current_set(aggstate, currentSet, false);
2539 
2540 		finalize_aggregates(aggstate,
2541 							peragg,
2542 							pergroup + (currentSet * aggstate->numtrans));
2543 
2544 		/*
2545 		 * If there's no row to project right now, we must continue rather
2546 		 * than returning a null since there might be more groups.
2547 		 */
2548 		result = project_aggregates(aggstate);
2549 		if (result)
2550 			return result;
2551 	}
2552 
2553 	/* No more groups */
2554 	return NULL;
2555 }
2556 
2557 /*
2558  * ExecAgg for hashed case: read input and build hash table
2559  */
2560 static void
agg_fill_hash_table(AggState * aggstate)2561 agg_fill_hash_table(AggState *aggstate)
2562 {
2563 	TupleTableSlot *outerslot;
2564 	ExprContext *tmpcontext = aggstate->tmpcontext;
2565 
2566 	/*
2567 	 * Process each outer-plan tuple, and then fetch the next one, until we
2568 	 * exhaust the outer plan.
2569 	 */
2570 	for (;;)
2571 	{
2572 		AggStatePerGroup *pergroups;
2573 
2574 		outerslot = fetch_input_tuple(aggstate);
2575 		if (TupIsNull(outerslot))
2576 			break;
2577 
2578 		/* set up for lookup_hash_entries and advance_aggregates */
2579 		tmpcontext->ecxt_outertuple = outerslot;
2580 
2581 		/* Find or build hashtable entries */
2582 		pergroups = lookup_hash_entries(aggstate);
2583 
2584 		/* Advance the aggregates */
2585 		if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2586 			combine_aggregates(aggstate, pergroups[0]);
2587 		else
2588 			advance_aggregates(aggstate, NULL, pergroups);
2589 
2590 		/*
2591 		 * Reset per-input-tuple context after each tuple, but note that the
2592 		 * hash lookups do this too
2593 		 */
2594 		ResetExprContext(aggstate->tmpcontext);
2595 	}
2596 
2597 	aggstate->table_filled = true;
2598 	/* Initialize to walk the first hash table */
2599 	select_current_set(aggstate, 0, true);
2600 	ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2601 						   &aggstate->perhash[0].hashiter);
2602 }
2603 
2604 /*
2605  * ExecAgg for hashed case: retrieving groups from hash table
2606  */
2607 static TupleTableSlot *
agg_retrieve_hash_table(AggState * aggstate)2608 agg_retrieve_hash_table(AggState *aggstate)
2609 {
2610 	ExprContext *econtext;
2611 	AggStatePerAgg peragg;
2612 	AggStatePerGroup pergroup;
2613 	TupleHashEntryData *entry;
2614 	TupleTableSlot *firstSlot;
2615 	TupleTableSlot *result;
2616 	AggStatePerHash perhash;
2617 
2618 	/*
2619 	 * get state info from node.
2620 	 *
2621 	 * econtext is the per-output-tuple expression context.
2622 	 */
2623 	econtext = aggstate->ss.ps.ps_ExprContext;
2624 	peragg = aggstate->peragg;
2625 	firstSlot = aggstate->ss.ss_ScanTupleSlot;
2626 
2627 	/*
2628 	 * Note that perhash (and therefore anything accessed through it) can
2629 	 * change inside the loop, as we change between grouping sets.
2630 	 */
2631 	perhash = &aggstate->perhash[aggstate->current_set];
2632 
2633 	/*
2634 	 * We loop retrieving groups until we find one satisfying
2635 	 * aggstate->ss.ps.qual
2636 	 */
2637 	while (!aggstate->agg_done)
2638 	{
2639 		TupleTableSlot *hashslot = perhash->hashslot;
2640 		int			i;
2641 
2642 		CHECK_FOR_INTERRUPTS();
2643 
2644 		/*
2645 		 * Find the next entry in the hash table
2646 		 */
2647 		entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2648 		if (entry == NULL)
2649 		{
2650 			int			nextset = aggstate->current_set + 1;
2651 
2652 			if (nextset < aggstate->num_hashes)
2653 			{
2654 				/*
2655 				 * Switch to next grouping set, reinitialize, and restart the
2656 				 * loop.
2657 				 */
2658 				select_current_set(aggstate, nextset, true);
2659 
2660 				perhash = &aggstate->perhash[aggstate->current_set];
2661 
2662 				ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2663 
2664 				continue;
2665 			}
2666 			else
2667 			{
2668 				/* No more hashtables, so done */
2669 				aggstate->agg_done = TRUE;
2670 				return NULL;
2671 			}
2672 		}
2673 
2674 		/*
2675 		 * Clear the per-output-tuple context for each group
2676 		 *
2677 		 * We intentionally don't use ReScanExprContext here; if any aggs have
2678 		 * registered shutdown callbacks, they mustn't be called yet, since we
2679 		 * might not be done with that agg.
2680 		 */
2681 		ResetExprContext(econtext);
2682 
2683 		/*
2684 		 * Transform representative tuple back into one with the right
2685 		 * columns.
2686 		 */
2687 		ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2688 		slot_getallattrs(hashslot);
2689 
2690 		ExecClearTuple(firstSlot);
2691 		memset(firstSlot->tts_isnull, true,
2692 			   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2693 
2694 		for (i = 0; i < perhash->numhashGrpCols; i++)
2695 		{
2696 			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
2697 
2698 			firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2699 			firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2700 		}
2701 		ExecStoreVirtualTuple(firstSlot);
2702 
2703 		pergroup = (AggStatePerGroup) entry->additional;
2704 
2705 		/*
2706 		 * Use the representative input tuple for any references to
2707 		 * non-aggregated input columns in the qual and tlist.
2708 		 */
2709 		econtext->ecxt_outertuple = firstSlot;
2710 
2711 		prepare_projection_slot(aggstate,
2712 								econtext->ecxt_outertuple,
2713 								aggstate->current_set);
2714 
2715 		finalize_aggregates(aggstate, peragg, pergroup);
2716 
2717 		result = project_aggregates(aggstate);
2718 		if (result)
2719 			return result;
2720 	}
2721 
2722 	/* No more groups */
2723 	return NULL;
2724 }
2725 
2726 /* -----------------
2727  * ExecInitAgg
2728  *
2729  *	Creates the run-time information for the agg node produced by the
2730  *	planner and initializes its outer subtree.
2731  *
2732  * -----------------
2733  */
2734 AggState *
ExecInitAgg(Agg * node,EState * estate,int eflags)2735 ExecInitAgg(Agg *node, EState *estate, int eflags)
2736 {
2737 	AggState   *aggstate;
2738 	AggStatePerAgg peraggs;
2739 	AggStatePerTrans pertransstates;
2740 	Plan	   *outerPlan;
2741 	ExprContext *econtext;
2742 	int			numaggs,
2743 				transno,
2744 				aggno;
2745 	int			phase;
2746 	int			phaseidx;
2747 	List	   *combined_inputeval;
2748 	TupleDesc	combineddesc;
2749 	TupleTableSlot *combinedslot;
2750 	ListCell   *l;
2751 	Bitmapset  *all_grouped_cols = NULL;
2752 	int			numGroupingSets = 1;
2753 	int			numPhases;
2754 	int			numHashes;
2755 	int			column_offset;
2756 	int			i = 0;
2757 	int			j = 0;
2758 	bool		use_hashing = (node->aggstrategy == AGG_HASHED ||
2759 							   node->aggstrategy == AGG_MIXED);
2760 
2761 	/* check for unsupported flags */
2762 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2763 
2764 	/*
2765 	 * create state structure
2766 	 */
2767 	aggstate = makeNode(AggState);
2768 	aggstate->ss.ps.plan = (Plan *) node;
2769 	aggstate->ss.ps.state = estate;
2770 	aggstate->ss.ps.ExecProcNode = ExecAgg;
2771 
2772 	aggstate->aggs = NIL;
2773 	aggstate->numaggs = 0;
2774 	aggstate->numtrans = 0;
2775 	aggstate->aggstrategy = node->aggstrategy;
2776 	aggstate->aggsplit = node->aggsplit;
2777 	aggstate->maxsets = 0;
2778 	aggstate->projected_set = -1;
2779 	aggstate->current_set = 0;
2780 	aggstate->peragg = NULL;
2781 	aggstate->pertrans = NULL;
2782 	aggstate->curperagg = NULL;
2783 	aggstate->curpertrans = NULL;
2784 	aggstate->input_done = false;
2785 	aggstate->agg_done = false;
2786 	aggstate->pergroup = NULL;
2787 	aggstate->grp_firstTuple = NULL;
2788 	aggstate->sort_in = NULL;
2789 	aggstate->sort_out = NULL;
2790 
2791 	/*
2792 	 * phases[0] always exists, but is dummy in sorted/plain mode
2793 	 */
2794 	numPhases = (use_hashing ? 1 : 2);
2795 	numHashes = (use_hashing ? 1 : 0);
2796 
2797 	/*
2798 	 * Calculate the maximum number of grouping sets in any phase; this
2799 	 * determines the size of some allocations.  Also calculate the number of
2800 	 * phases, since all hashed/mixed nodes contribute to only a single phase.
2801 	 */
2802 	if (node->groupingSets)
2803 	{
2804 		numGroupingSets = list_length(node->groupingSets);
2805 
2806 		foreach(l, node->chain)
2807 		{
2808 			Agg		   *agg = lfirst(l);
2809 
2810 			numGroupingSets = Max(numGroupingSets,
2811 								  list_length(agg->groupingSets));
2812 
2813 			/*
2814 			 * additional AGG_HASHED aggs become part of phase 0, but all
2815 			 * others add an extra phase.
2816 			 */
2817 			if (agg->aggstrategy != AGG_HASHED)
2818 				++numPhases;
2819 			else
2820 				++numHashes;
2821 		}
2822 	}
2823 
2824 	aggstate->maxsets = numGroupingSets;
2825 	aggstate->numphases = numPhases;
2826 
2827 	aggstate->aggcontexts = (ExprContext **)
2828 		palloc0(sizeof(ExprContext *) * numGroupingSets);
2829 
2830 	/*
2831 	 * Create expression contexts.  We need three or more, one for
2832 	 * per-input-tuple processing, one for per-output-tuple processing, one
2833 	 * for all the hashtables, and one for each grouping set.  The per-tuple
2834 	 * memory context of the per-grouping-set ExprContexts (aggcontexts)
2835 	 * replaces the standalone memory context formerly used to hold transition
2836 	 * values.  We cheat a little by using ExecAssignExprContext() to build
2837 	 * all of them.
2838 	 *
2839 	 * NOTE: the details of what is stored in aggcontexts and what is stored
2840 	 * in the regular per-query memory context are driven by a simple
2841 	 * decision: we want to reset the aggcontext at group boundaries (if not
2842 	 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
2843 	 */
2844 	ExecAssignExprContext(estate, &aggstate->ss.ps);
2845 	aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2846 
2847 	for (i = 0; i < numGroupingSets; ++i)
2848 	{
2849 		ExecAssignExprContext(estate, &aggstate->ss.ps);
2850 		aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2851 	}
2852 
2853 	if (use_hashing)
2854 	{
2855 		ExecAssignExprContext(estate, &aggstate->ss.ps);
2856 		aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2857 	}
2858 
2859 	ExecAssignExprContext(estate, &aggstate->ss.ps);
2860 
2861 	/*
2862 	 * tuple table initialization.
2863 	 *
2864 	 * For hashtables, we create some additional slots below.
2865 	 */
2866 	ExecInitScanTupleSlot(estate, &aggstate->ss);
2867 	ExecInitResultTupleSlot(estate, &aggstate->ss.ps);
2868 	aggstate->sort_slot = ExecInitExtraTupleSlot(estate);
2869 
2870 	/*
2871 	 * initialize child expressions
2872 	 *
2873 	 * We expect the parser to have checked that no aggs contain other agg
2874 	 * calls in their arguments (and just to be sure, we verify it again while
2875 	 * initializing the plan node).  This would make no sense under SQL
2876 	 * semantics, and it's forbidden by the spec.  Because it is true, we
2877 	 * don't need to worry about evaluating the aggs in any particular order.
2878 	 *
2879 	 * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2880 	 * nodes to aggstate->aggs.  Aggrefs in the qual are found here; Aggrefs
2881 	 * in the targetlist are found during ExecAssignProjectionInfo, below.
2882 	 */
2883 	aggstate->ss.ps.qual =
2884 		ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2885 
2886 	/*
2887 	 * Initialize child nodes.
2888 	 *
2889 	 * If we are doing a hashed aggregation then the child plan does not need
2890 	 * to handle REWIND efficiently; see ExecReScanAgg.
2891 	 */
2892 	if (node->aggstrategy == AGG_HASHED)
2893 		eflags &= ~EXEC_FLAG_REWIND;
2894 	outerPlan = outerPlan(node);
2895 	outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2896 
2897 	/*
2898 	 * initialize source tuple type.
2899 	 */
2900 	ExecAssignScanTypeFromOuterPlan(&aggstate->ss);
2901 	if (node->chain)
2902 		ExecSetSlotDescriptor(aggstate->sort_slot,
2903 							  aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
2904 
2905 	/*
2906 	 * Initialize result tuple type and projection info.
2907 	 */
2908 	ExecAssignResultTypeFromTL(&aggstate->ss.ps);
2909 	ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2910 
2911 	/*
2912 	 * We should now have found all Aggrefs in the targetlist and quals.
2913 	 */
2914 	numaggs = aggstate->numaggs;
2915 	Assert(numaggs == list_length(aggstate->aggs));
2916 
2917 	/*
2918 	 * For each phase, prepare grouping set data and fmgr lookup data for
2919 	 * compare functions.  Accumulate all_grouped_cols in passing.
2920 	 */
2921 	aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2922 
2923 	aggstate->num_hashes = numHashes;
2924 	if (numHashes)
2925 	{
2926 		aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2927 		aggstate->phases[0].numsets = 0;
2928 		aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2929 		aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2930 	}
2931 
2932 	phase = 0;
2933 	for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2934 	{
2935 		Agg		   *aggnode;
2936 		Sort	   *sortnode;
2937 
2938 		if (phaseidx > 0)
2939 		{
2940 			aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2941 			sortnode = castNode(Sort, aggnode->plan.lefttree);
2942 		}
2943 		else
2944 		{
2945 			aggnode = node;
2946 			sortnode = NULL;
2947 		}
2948 
2949 		Assert(phase <= 1 || sortnode);
2950 
2951 		if (aggnode->aggstrategy == AGG_HASHED
2952 			|| aggnode->aggstrategy == AGG_MIXED)
2953 		{
2954 			AggStatePerPhase phasedata = &aggstate->phases[0];
2955 			AggStatePerHash perhash;
2956 			Bitmapset  *cols = NULL;
2957 
2958 			Assert(phase == 0);
2959 			i = phasedata->numsets++;
2960 			perhash = &aggstate->perhash[i];
2961 
2962 			/* phase 0 always points to the "real" Agg in the hash case */
2963 			phasedata->aggnode = node;
2964 			phasedata->aggstrategy = node->aggstrategy;
2965 
2966 			/* but the actual Agg node representing this hash is saved here */
2967 			perhash->aggnode = aggnode;
2968 
2969 			phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2970 
2971 			for (j = 0; j < aggnode->numCols; ++j)
2972 				cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2973 
2974 			phasedata->grouped_cols[i] = cols;
2975 
2976 			all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2977 			continue;
2978 		}
2979 		else
2980 		{
2981 			AggStatePerPhase phasedata = &aggstate->phases[++phase];
2982 			int			num_sets;
2983 
2984 			phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2985 
2986 			if (num_sets)
2987 			{
2988 				phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2989 				phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2990 
2991 				i = 0;
2992 				foreach(l, aggnode->groupingSets)
2993 				{
2994 					int			current_length = list_length(lfirst(l));
2995 					Bitmapset  *cols = NULL;
2996 
2997 					/* planner forces this to be correct */
2998 					for (j = 0; j < current_length; ++j)
2999 						cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3000 
3001 					phasedata->grouped_cols[i] = cols;
3002 					phasedata->gset_lengths[i] = current_length;
3003 
3004 					++i;
3005 				}
3006 
3007 				all_grouped_cols = bms_add_members(all_grouped_cols,
3008 												   phasedata->grouped_cols[0]);
3009 			}
3010 			else
3011 			{
3012 				Assert(phaseidx == 0);
3013 
3014 				phasedata->gset_lengths = NULL;
3015 				phasedata->grouped_cols = NULL;
3016 			}
3017 
3018 			/*
3019 			 * If we are grouping, precompute fmgr lookup data for inner loop.
3020 			 */
3021 			if (aggnode->aggstrategy == AGG_SORTED)
3022 			{
3023 				Assert(aggnode->numCols > 0);
3024 
3025 				phasedata->eqfunctions =
3026 					execTuplesMatchPrepare(aggnode->numCols,
3027 										   aggnode->grpOperators);
3028 			}
3029 
3030 			phasedata->aggnode = aggnode;
3031 			phasedata->aggstrategy = aggnode->aggstrategy;
3032 			phasedata->sortnode = sortnode;
3033 		}
3034 	}
3035 
3036 	/*
3037 	 * Convert all_grouped_cols to a descending-order list.
3038 	 */
3039 	i = -1;
3040 	while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3041 		aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3042 
3043 	/*
3044 	 * Set up aggregate-result storage in the output expr context, and also
3045 	 * allocate my private per-agg working storage
3046 	 */
3047 	econtext = aggstate->ss.ps.ps_ExprContext;
3048 	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3049 	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3050 
3051 	peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3052 	pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
3053 
3054 	aggstate->peragg = peraggs;
3055 	aggstate->pertrans = pertransstates;
3056 
3057 	/*
3058 	 * Hashing can only appear in the initial phase.
3059 	 */
3060 	if (use_hashing)
3061 	{
3062 		for (i = 0; i < numHashes; ++i)
3063 		{
3064 			aggstate->perhash[i].hashslot = ExecInitExtraTupleSlot(estate);
3065 
3066 			execTuplesHashPrepare(aggstate->perhash[i].numCols,
3067 								  aggstate->perhash[i].aggnode->grpOperators,
3068 								  &aggstate->perhash[i].eqfunctions,
3069 								  &aggstate->perhash[i].hashfunctions);
3070 		}
3071 
3072 		/* this is an array of pointers, not structures */
3073 		aggstate->hash_pergroup = palloc0(sizeof(AggStatePerGroup) * numHashes);
3074 
3075 		find_hash_columns(aggstate);
3076 
3077 		/* Skip massive memory allocation if we are just doing EXPLAIN */
3078 		if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
3079 			build_hash_table(aggstate);
3080 
3081 		aggstate->table_filled = false;
3082 	}
3083 
3084 	if (node->aggstrategy != AGG_HASHED)
3085 	{
3086 		AggStatePerGroup pergroup;
3087 
3088 		pergroup = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3089 											  * numaggs
3090 											  * numGroupingSets);
3091 
3092 		aggstate->pergroup = pergroup;
3093 	}
3094 
3095 	/*
3096 	 * Initialize current phase-dependent values to initial phase. The initial
3097 	 * phase is 1 (first sort pass) for all strategies that use sorting (if
3098 	 * hashing is being done too, then phase 0 is processed last); but if only
3099 	 * hashing is being done, then phase 0 is all there is.
3100 	 */
3101 	if (node->aggstrategy == AGG_HASHED)
3102 	{
3103 		aggstate->current_phase = 0;
3104 		initialize_phase(aggstate, 0);
3105 		select_current_set(aggstate, 0, true);
3106 	}
3107 	else
3108 	{
3109 		aggstate->current_phase = 1;
3110 		initialize_phase(aggstate, 1);
3111 		select_current_set(aggstate, 0, false);
3112 	}
3113 
3114 	/* -----------------
3115 	 * Perform lookups of aggregate function info, and initialize the
3116 	 * unchanging fields of the per-agg and per-trans data.
3117 	 *
3118 	 * We try to optimize by detecting duplicate aggregate functions so that
3119 	 * their state and final values are re-used, rather than needlessly being
3120 	 * re-calculated independently. We also detect aggregates that are not
3121 	 * the same, but which can share the same transition state.
3122 	 *
3123 	 * Scenarios:
3124 	 *
3125 	 * 1. Identical aggregate function calls appear in the query:
3126 	 *
3127 	 *	  SELECT SUM(x) FROM ... HAVING SUM(x) > 0
3128 	 *
3129 	 *	  Since these aggregates are identical, we only need to calculate
3130 	 *	  the value once. Both aggregates will share the same 'aggno' value.
3131 	 *
3132 	 * 2. Two different aggregate functions appear in the query, but the
3133 	 *	  aggregates have the same arguments, transition functions and
3134 	 *	  initial values (and, presumably, different final functions):
3135 	 *
3136 	 *	  SELECT AVG(x), STDDEV(x) FROM ...
3137 	 *
3138 	 *	  In this case we must create a new peragg for the varying aggregate,
3139 	 *	  and we need to call the final functions separately, but we need
3140 	 *	  only run the transition function once.  (This requires that the
3141 	 *	  final functions be nondestructive of the transition state, but
3142 	 *	  that's required anyway for other reasons.)
3143 	 *
3144 	 * For either of these optimizations to be valid, all aggregate properties
3145 	 * used in the transition phase must be the same, including any modifiers
3146 	 * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
3147 	 * contain any volatile functions.
3148 	 * -----------------
3149 	 */
3150 	aggno = -1;
3151 	transno = -1;
3152 	foreach(l, aggstate->aggs)
3153 	{
3154 		AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
3155 		Aggref	   *aggref = aggrefstate->aggref;
3156 		AggStatePerAgg peragg;
3157 		AggStatePerTrans pertrans;
3158 		int			existing_aggno;
3159 		int			existing_transno;
3160 		List	   *same_input_transnos;
3161 		Oid			inputTypes[FUNC_MAX_ARGS];
3162 		int			numArguments;
3163 		int			numDirectArgs;
3164 		HeapTuple	aggTuple;
3165 		Form_pg_aggregate aggform;
3166 		AclResult	aclresult;
3167 		Oid			transfn_oid,
3168 					finalfn_oid;
3169 		Oid			serialfn_oid,
3170 					deserialfn_oid;
3171 		Expr	   *finalfnexpr;
3172 		Oid			aggtranstype;
3173 		Datum		textInitVal;
3174 		Datum		initValue;
3175 		bool		initValueIsNull;
3176 
3177 		/* Planner should have assigned aggregate to correct level */
3178 		Assert(aggref->agglevelsup == 0);
3179 		/* ... and the split mode should match */
3180 		Assert(aggref->aggsplit == aggstate->aggsplit);
3181 
3182 		/* 1. Check for already processed aggs which can be re-used */
3183 		existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
3184 												&same_input_transnos);
3185 		if (existing_aggno != -1)
3186 		{
3187 			/*
3188 			 * Existing compatible agg found. so just point the Aggref to the
3189 			 * same per-agg struct.
3190 			 */
3191 			aggrefstate->aggno = existing_aggno;
3192 			continue;
3193 		}
3194 
3195 		/* Mark Aggref state node with assigned index in the result array */
3196 		peragg = &peraggs[++aggno];
3197 		peragg->aggref = aggref;
3198 		aggrefstate->aggno = aggno;
3199 
3200 		/* Fetch the pg_aggregate row */
3201 		aggTuple = SearchSysCache1(AGGFNOID,
3202 								   ObjectIdGetDatum(aggref->aggfnoid));
3203 		if (!HeapTupleIsValid(aggTuple))
3204 			elog(ERROR, "cache lookup failed for aggregate %u",
3205 				 aggref->aggfnoid);
3206 		aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3207 
3208 		/* Check permission to call aggregate function */
3209 		aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3210 									 ACL_EXECUTE);
3211 		if (aclresult != ACLCHECK_OK)
3212 			aclcheck_error(aclresult, ACL_KIND_PROC,
3213 						   get_func_name(aggref->aggfnoid));
3214 		InvokeFunctionExecuteHook(aggref->aggfnoid);
3215 
3216 		/* planner recorded transition state type in the Aggref itself */
3217 		aggtranstype = aggref->aggtranstype;
3218 		Assert(OidIsValid(aggtranstype));
3219 
3220 		/*
3221 		 * If this aggregation is performing state combines, then instead of
3222 		 * using the transition function, we'll use the combine function
3223 		 */
3224 		if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3225 		{
3226 			transfn_oid = aggform->aggcombinefn;
3227 
3228 			/* If not set then the planner messed up */
3229 			if (!OidIsValid(transfn_oid))
3230 				elog(ERROR, "combinefn not set for aggregate function");
3231 		}
3232 		else
3233 			transfn_oid = aggform->aggtransfn;
3234 
3235 		/* Final function only required if we're finalizing the aggregates */
3236 		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3237 			peragg->finalfn_oid = finalfn_oid = InvalidOid;
3238 		else
3239 			peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3240 
3241 		serialfn_oid = InvalidOid;
3242 		deserialfn_oid = InvalidOid;
3243 
3244 		/*
3245 		 * Check if serialization/deserialization is required.  We only do it
3246 		 * for aggregates that have transtype INTERNAL.
3247 		 */
3248 		if (aggtranstype == INTERNALOID)
3249 		{
3250 			/*
3251 			 * The planner should only have generated a serialize agg node if
3252 			 * every aggregate with an INTERNAL state has a serialization
3253 			 * function.  Verify that.
3254 			 */
3255 			if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3256 			{
3257 				/* serialization only valid when not running finalfn */
3258 				Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3259 
3260 				if (!OidIsValid(aggform->aggserialfn))
3261 					elog(ERROR, "serialfunc not provided for serialization aggregation");
3262 				serialfn_oid = aggform->aggserialfn;
3263 			}
3264 
3265 			/* Likewise for deserialization functions */
3266 			if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3267 			{
3268 				/* deserialization only valid when combining states */
3269 				Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3270 
3271 				if (!OidIsValid(aggform->aggdeserialfn))
3272 					elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3273 				deserialfn_oid = aggform->aggdeserialfn;
3274 			}
3275 		}
3276 
3277 		/* Check that aggregate owner has permission to call component fns */
3278 		{
3279 			HeapTuple	procTuple;
3280 			Oid			aggOwner;
3281 
3282 			procTuple = SearchSysCache1(PROCOID,
3283 										ObjectIdGetDatum(aggref->aggfnoid));
3284 			if (!HeapTupleIsValid(procTuple))
3285 				elog(ERROR, "cache lookup failed for function %u",
3286 					 aggref->aggfnoid);
3287 			aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3288 			ReleaseSysCache(procTuple);
3289 
3290 			aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3291 										 ACL_EXECUTE);
3292 			if (aclresult != ACLCHECK_OK)
3293 				aclcheck_error(aclresult, ACL_KIND_PROC,
3294 							   get_func_name(transfn_oid));
3295 			InvokeFunctionExecuteHook(transfn_oid);
3296 			if (OidIsValid(finalfn_oid))
3297 			{
3298 				aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3299 											 ACL_EXECUTE);
3300 				if (aclresult != ACLCHECK_OK)
3301 					aclcheck_error(aclresult, ACL_KIND_PROC,
3302 								   get_func_name(finalfn_oid));
3303 				InvokeFunctionExecuteHook(finalfn_oid);
3304 			}
3305 			if (OidIsValid(serialfn_oid))
3306 			{
3307 				aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3308 											 ACL_EXECUTE);
3309 				if (aclresult != ACLCHECK_OK)
3310 					aclcheck_error(aclresult, ACL_KIND_PROC,
3311 								   get_func_name(serialfn_oid));
3312 				InvokeFunctionExecuteHook(serialfn_oid);
3313 			}
3314 			if (OidIsValid(deserialfn_oid))
3315 			{
3316 				aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3317 											 ACL_EXECUTE);
3318 				if (aclresult != ACLCHECK_OK)
3319 					aclcheck_error(aclresult, ACL_KIND_PROC,
3320 								   get_func_name(deserialfn_oid));
3321 				InvokeFunctionExecuteHook(deserialfn_oid);
3322 			}
3323 		}
3324 
3325 		/*
3326 		 * Get actual datatypes of the (nominal) aggregate inputs.  These
3327 		 * could be different from the agg's declared input types, when the
3328 		 * agg accepts ANY or a polymorphic type.
3329 		 */
3330 		numArguments = get_aggregate_argtypes(aggref, inputTypes);
3331 
3332 		/* Count the "direct" arguments, if any */
3333 		numDirectArgs = list_length(aggref->aggdirectargs);
3334 
3335 		/* Detect how many arguments to pass to the finalfn */
3336 		if (aggform->aggfinalextra)
3337 			peragg->numFinalArgs = numArguments + 1;
3338 		else
3339 			peragg->numFinalArgs = numDirectArgs + 1;
3340 
3341 		/*
3342 		 * build expression trees using actual argument & result types for the
3343 		 * finalfn, if it exists and is required.
3344 		 */
3345 		if (OidIsValid(finalfn_oid))
3346 		{
3347 			build_aggregate_finalfn_expr(inputTypes,
3348 										 peragg->numFinalArgs,
3349 										 aggtranstype,
3350 										 aggref->aggtype,
3351 										 aggref->inputcollid,
3352 										 finalfn_oid,
3353 										 &finalfnexpr);
3354 			fmgr_info(finalfn_oid, &peragg->finalfn);
3355 			fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3356 		}
3357 
3358 		/* get info about the output value's datatype */
3359 		get_typlenbyval(aggref->aggtype,
3360 						&peragg->resulttypeLen,
3361 						&peragg->resulttypeByVal);
3362 
3363 		/*
3364 		 * initval is potentially null, so don't try to access it as a struct
3365 		 * field. Must do it the hard way with SysCacheGetAttr.
3366 		 */
3367 		textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3368 									  Anum_pg_aggregate_agginitval,
3369 									  &initValueIsNull);
3370 		if (initValueIsNull)
3371 			initValue = (Datum) 0;
3372 		else
3373 			initValue = GetAggInitVal(textInitVal, aggtranstype);
3374 
3375 		/*
3376 		 * 2. Build working state for invoking the transition function, or
3377 		 * look up previously initialized working state, if we can share it.
3378 		 *
3379 		 * find_compatible_peragg() already collected a list of per-Trans's
3380 		 * with the same inputs. Check if any of them have the same transition
3381 		 * function and initial value.
3382 		 */
3383 		existing_transno = find_compatible_pertrans(aggstate, aggref,
3384 													transfn_oid, aggtranstype,
3385 													serialfn_oid, deserialfn_oid,
3386 													initValue, initValueIsNull,
3387 													same_input_transnos);
3388 		if (existing_transno != -1)
3389 		{
3390 			/*
3391 			 * Existing compatible trans found, so just point the 'peragg' to
3392 			 * the same per-trans struct.
3393 			 */
3394 			pertrans = &pertransstates[existing_transno];
3395 			peragg->transno = existing_transno;
3396 		}
3397 		else
3398 		{
3399 			pertrans = &pertransstates[++transno];
3400 			build_pertrans_for_aggref(pertrans, aggstate, estate,
3401 									  aggref, transfn_oid, aggtranstype,
3402 									  serialfn_oid, deserialfn_oid,
3403 									  initValue, initValueIsNull,
3404 									  inputTypes, numArguments);
3405 			peragg->transno = transno;
3406 		}
3407 		ReleaseSysCache(aggTuple);
3408 	}
3409 
3410 	/*
3411 	 * Update aggstate->numaggs to be the number of unique aggregates found.
3412 	 * Also set numstates to the number of unique transition states found.
3413 	 */
3414 	aggstate->numaggs = aggno + 1;
3415 	aggstate->numtrans = transno + 1;
3416 
3417 	/*
3418 	 * Build a single projection computing the required arguments for all
3419 	 * aggregates at once; if there's more than one, that's considerably
3420 	 * faster than doing it separately for each.
3421 	 *
3422 	 * First create a targetlist representing the values to compute.
3423 	 */
3424 	combined_inputeval = NIL;
3425 	column_offset = 0;
3426 	for (transno = 0; transno < aggstate->numtrans; transno++)
3427 	{
3428 		AggStatePerTrans pertrans = &pertransstates[transno];
3429 
3430 		/*
3431 		 * Mark this per-trans state with its starting column in the combined
3432 		 * slot.
3433 		 */
3434 		pertrans->inputoff = column_offset;
3435 
3436 		/*
3437 		 * If the aggregate has a FILTER, we can only evaluate the filter
3438 		 * expression, not the actual input expressions, during the combined
3439 		 * eval step --- unless we're ignoring the filter because this node is
3440 		 * running combinefns not transfns.
3441 		 */
3442 		if (pertrans->aggref->aggfilter &&
3443 			!DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3444 		{
3445 			TargetEntry *tle;
3446 
3447 			tle = makeTargetEntry(pertrans->aggref->aggfilter,
3448 								  column_offset + 1, NULL, false);
3449 			combined_inputeval = lappend(combined_inputeval, tle);
3450 			column_offset++;
3451 
3452 			/*
3453 			 * We'll need separate projection machinery for the real args.
3454 			 * Arrange to evaluate them into the sortslot previously created.
3455 			 */
3456 			Assert(pertrans->sortslot);
3457 			pertrans->evalproj = ExecBuildProjectionInfo(pertrans->aggref->args,
3458 														 aggstate->tmpcontext,
3459 														 pertrans->sortslot,
3460 														 &aggstate->ss.ps,
3461 														 NULL);
3462 		}
3463 		else
3464 		{
3465 			/*
3466 			 * Add agg's input expressions to combined_inputeval, adjusting
3467 			 * resnos in the copied target entries to match the combined slot.
3468 			 */
3469 			ListCell   *arg;
3470 
3471 			foreach(arg, pertrans->aggref->args)
3472 			{
3473 				TargetEntry *source_tle = lfirst_node(TargetEntry, arg);
3474 				TargetEntry *tle;
3475 
3476 				tle = flatCopyTargetEntry(source_tle);
3477 				tle->resno += column_offset;
3478 
3479 				combined_inputeval = lappend(combined_inputeval, tle);
3480 			}
3481 
3482 			column_offset += list_length(pertrans->aggref->args);
3483 		}
3484 	}
3485 
3486 	/* Now create a projection for the combined targetlist */
3487 	combineddesc = ExecTypeFromTL(combined_inputeval, false);
3488 	combinedslot = ExecInitExtraTupleSlot(estate);
3489 	ExecSetSlotDescriptor(combinedslot, combineddesc);
3490 	aggstate->combinedproj = ExecBuildProjectionInfo(combined_inputeval,
3491 													 aggstate->tmpcontext,
3492 													 combinedslot,
3493 													 &aggstate->ss.ps,
3494 													 NULL);
3495 
3496 	/*
3497 	 * Last, check whether any more aggregates got added onto the node while
3498 	 * we processed the expressions for the aggregate arguments (including not
3499 	 * only the regular arguments and FILTER expressions handled immediately
3500 	 * above, but any direct arguments we might've handled earlier).  If so,
3501 	 * we have nested aggregate functions, which is semantically nonsensical,
3502 	 * so complain.  (This should have been caught by the parser, so we don't
3503 	 * need to work hard on a helpful error message; but we defend against it
3504 	 * here anyway, just to be sure.)
3505 	 */
3506 	if (numaggs != list_length(aggstate->aggs))
3507 		ereport(ERROR,
3508 				(errcode(ERRCODE_GROUPING_ERROR),
3509 				 errmsg("aggregate function calls cannot be nested")));
3510 
3511 	return aggstate;
3512 }
3513 
/*
 * Build the state needed to calculate a state value for an aggregate.
 *
 * This initializes the fields in 'pertrans' (all except inputoff and
 * evalproj, which ExecInitAgg fills in afterwards).  'aggref' is the
 * aggregate to initialize the state for.  'aggtransfn', 'aggtranstype', and
 * the rest of the arguments could be calculated from 'aggref', but the
 * caller has calculated them already, so might as well pass them.
 *
 * Parameters worth noting:
 *	'estate'		executor state; used here only to create the extra tuple
 *					slots (sortslot, uniqslot) this per-trans struct may need.
 *	'aggtransfn'	OID of the transition function -- or, when this node runs
 *					in combine mode (DO_AGGSPLIT_COMBINE), of the combine
 *					function, which is then called with two arguments.
 *	'aggserialfn', 'aggdeserialfn'
 *					serialization / deserialization function OIDs; each one
 *					is set up only if it is a valid OID.
 *	'initValue', 'initValueIsNull'
 *					the transition state's initial value.
 *	'inputTypes', 'numArguments'
 *					the aggregate's argument types, direct arguments first;
 *					inputTypes[numDirectArgs] is the first aggregated input.
 *
 * Raises ERROR if the (combine or transition) function's strictness is
 * incompatible with the aggregate's definition; see the checks below.
 */
static void
build_pertrans_for_aggref(AggStatePerTrans pertrans,
						  AggState *aggstate, EState *estate,
						  Aggref *aggref,
						  Oid aggtransfn, Oid aggtranstype,
						  Oid aggserialfn, Oid aggdeserialfn,
						  Datum initValue, bool initValueIsNull,
						  Oid *inputTypes, int numArguments)
{
	int			numGroupingSets = Max(aggstate->maxsets, 1);
	Expr	   *serialfnexpr = NULL;
	Expr	   *deserialfnexpr = NULL;
	ListCell   *lc;
	int			numInputs;
	int			numDirectArgs;
	List	   *sortlist;
	int			numSortCols;
	int			numDistinctCols;
	int			i;

	/* Begin filling in the pertrans data */
	pertrans->aggref = aggref;
	pertrans->aggCollation = aggref->inputcollid;
	pertrans->transfn_oid = aggtransfn;
	pertrans->serialfn_oid = aggserialfn;
	pertrans->deserialfn_oid = aggdeserialfn;
	pertrans->initValue = initValue;
	pertrans->initValueIsNull = initValueIsNull;

	/* Count the "direct" arguments, if any */
	numDirectArgs = list_length(aggref->aggdirectargs);

	/* Count the number of aggregated input columns */
	pertrans->numInputs = numInputs = list_length(aggref->args);

	pertrans->aggtranstype = aggtranstype;

	/*
	 * Detect how many arguments to pass to the transfn.  For a plain agg,
	 * aggref->args may also hold sort-only expressions (the sort slot built
	 * below covers "aggregated inputs (including sort expressions)"), so the
	 * caller-supplied numArguments is used instead of numInputs.
	 */
	if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
		pertrans->numTransInputs = numInputs;
	else
		pertrans->numTransInputs = numArguments;

	/* inputoff and evalproj will be set up later, in ExecInitAgg */

	/*
	 * When combining states, we have no use at all for the aggregate
	 * function's transfn. Instead we use the combinefn.  In this case, the
	 * transfn and transfn_oid fields of pertrans refer to the combine
	 * function rather than the transition function.
	 */
	if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
	{
		Expr	   *combinefnexpr;

		build_aggregate_combinefn_expr(aggtranstype,
									   aggref->inputcollid,
									   aggtransfn,
									   &combinefnexpr);
		fmgr_info(aggtransfn, &pertrans->transfn);
		fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);

		/* a combine function always takes exactly two states */
		InitFunctionCallInfoData(pertrans->transfn_fcinfo,
								 &pertrans->transfn,
								 2,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/*
		 * Ensure that a combine function to combine INTERNAL states is not
		 * strict. This should have been checked during CREATE AGGREGATE, but
		 * the strict property could have been changed since then.
		 */
		if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
					 errmsg("combine function with transition type %s must not be declared STRICT",
							format_type_be(aggtranstype))));
	}
	else
	{
		Expr	   *transfnexpr;

		/*
		 * Set up infrastructure for calling the transfn.  Note that invtrans
		 * is not needed here.
		 */
		build_aggregate_transfn_expr(inputTypes,
									 numArguments,
									 numDirectArgs,
									 aggref->aggvariadic,
									 aggtranstype,
									 aggref->inputcollid,
									 aggtransfn,
									 InvalidOid,
									 &transfnexpr,
									 NULL);
		fmgr_info(aggtransfn, &pertrans->transfn);
		fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);

		/* the transfn receives the state value plus numTransInputs inputs */
		InitFunctionCallInfoData(pertrans->transfn_fcinfo,
								 &pertrans->transfn,
								 pertrans->numTransInputs + 1,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/*
		 * If the transfn is strict and the initval is NULL, make sure input
		 * type and transtype are the same (or at least binary-compatible), so
		 * that it's OK to use the first aggregated input value as the initial
		 * transValue.  This should have been checked at agg definition time,
		 * but we must check again in case the transfn's strictness property
		 * has been changed.
		 */
		if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
		{
			if (numArguments <= numDirectArgs ||
				!IsBinaryCoercible(inputTypes[numDirectArgs],
								   aggtranstype))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
						 errmsg("aggregate %u needs to have compatible input type and transition type",
								aggref->aggfnoid)));
		}
	}

	/* get info about the state value's datatype */
	get_typlenbyval(aggtranstype,
					&pertrans->transtypeLen,
					&pertrans->transtypeByVal);

	/* serialfn: one argument, no collation */
	if (OidIsValid(aggserialfn))
	{
		build_aggregate_serialfn_expr(aggserialfn,
									  &serialfnexpr);
		fmgr_info(aggserialfn, &pertrans->serialfn);
		fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);

		InitFunctionCallInfoData(pertrans->serialfn_fcinfo,
								 &pertrans->serialfn,
								 1,
								 InvalidOid,
								 (void *) aggstate, NULL);
	}

	/* deserialfn: two arguments, no collation */
	if (OidIsValid(aggdeserialfn))
	{
		build_aggregate_deserialfn_expr(aggdeserialfn,
										&deserialfnexpr);
		fmgr_info(aggdeserialfn, &pertrans->deserialfn);
		fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);

		InitFunctionCallInfoData(pertrans->deserialfn_fcinfo,
								 &pertrans->deserialfn,
								 2,
								 InvalidOid,
								 (void *) aggstate, NULL);

	}

	/* Initialize any direct-argument expressions */
	pertrans->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
											   (PlanState *) aggstate);

	/*
	 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
	 * have a list of SortGroupClause nodes; fish out the data in them and
	 * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
	 * however; the agg's transfn and finalfn are responsible for that.
	 *
	 * Note that by construction, if there is a DISTINCT clause then the ORDER
	 * BY clause is a prefix of it (see transformDistinctClause).
	 */
	if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
	{
		sortlist = NIL;
		numSortCols = numDistinctCols = 0;
	}
	else if (aggref->aggdistinct)
	{
		sortlist = aggref->aggdistinct;
		numSortCols = numDistinctCols = list_length(sortlist);
		Assert(numSortCols >= list_length(aggref->aggorder));
	}
	else
	{
		sortlist = aggref->aggorder;
		numSortCols = list_length(sortlist);
		numDistinctCols = 0;
	}

	pertrans->numSortCols = numSortCols;
	pertrans->numDistinctCols = numDistinctCols;

	/*
	 * If we have either sorting or filtering to do, create a tupledesc and
	 * slot corresponding to the aggregated inputs (including sort
	 * expressions) of the agg.
	 */
	if (numSortCols > 0 || aggref->aggfilter)
	{
		pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
		pertrans->sortslot = ExecInitExtraTupleSlot(estate);
		ExecSetSlotDescriptor(pertrans->sortslot, pertrans->sortdesc);
	}

	if (numSortCols > 0)
	{
		/*
		 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
		 * (yet)
		 */
		Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);

		/* If we have only one input, we need its len/byval info. */
		if (numInputs == 1)
		{
			get_typlenbyval(inputTypes[numDirectArgs],
							&pertrans->inputtypeLen,
							&pertrans->inputtypeByVal);
		}
		else if (numDistinctCols > 0)
		{
			/*
			 * we will need an extra slot to store prior values; it reuses
			 * sortdesc, which was built just above
			 */
			pertrans->uniqslot = ExecInitExtraTupleSlot(estate);
			ExecSetSlotDescriptor(pertrans->uniqslot,
								  pertrans->sortdesc);
		}

		/* Extract the sort information for use later */
		pertrans->sortColIdx =
			(AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
		pertrans->sortOperators =
			(Oid *) palloc(numSortCols * sizeof(Oid));
		pertrans->sortCollations =
			(Oid *) palloc(numSortCols * sizeof(Oid));
		pertrans->sortNullsFirst =
			(bool *) palloc(numSortCols * sizeof(bool));

		i = 0;
		foreach(lc, sortlist)
		{
			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
			TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);

			/* the parser should have made sure of this */
			Assert(OidIsValid(sortcl->sortop));

			pertrans->sortColIdx[i] = tle->resno;
			pertrans->sortOperators[i] = sortcl->sortop;
			pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
			pertrans->sortNullsFirst[i] = sortcl->nulls_first;
			i++;
		}
		Assert(i == numSortCols);
	}

	if (aggref->aggdistinct)
	{
		Assert(numArguments > 0);

		/*
		 * We need the equal function for each DISTINCT comparison we will
		 * make.
		 */
		pertrans->equalfns =
			(FmgrInfo *) palloc(numDistinctCols * sizeof(FmgrInfo));

		i = 0;
		foreach(lc, aggref->aggdistinct)
		{
			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);

			fmgr_info(get_opcode(sortcl->eqop), &pertrans->equalfns[i]);
			i++;
		}
		Assert(i == numDistinctCols);
	}

	/*
	 * One tuplesort pointer per grouping set, all initially NULL (palloc0);
	 * ExecReScanAgg and ExecEndAgg end any that are non-NULL.
	 */
	pertrans->sortstates = (Tuplesortstate **)
		palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
}
3804 
3805 
3806 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)3807 GetAggInitVal(Datum textInitVal, Oid transtype)
3808 {
3809 	Oid			typinput,
3810 				typioparam;
3811 	char	   *strInitVal;
3812 	Datum		initVal;
3813 
3814 	getTypeInputInfo(transtype, &typinput, &typioparam);
3815 	strInitVal = TextDatumGetCString(textInitVal);
3816 	initVal = OidInputFunctionCall(typinput, strInitVal,
3817 								   typioparam, -1);
3818 	pfree(strInitVal);
3819 	return initVal;
3820 }
3821 
3822 /*
3823  * find_compatible_peragg - search for a previously initialized per-Agg struct
3824  *
3825  * Searches the previously looked at aggregates to find one which is compatible
3826  * with this one, with the same input parameters. If no compatible aggregate
3827  * can be found, returns -1.
3828  *
3829  * As a side-effect, this also collects a list of existing per-Trans structs
3830  * with matching inputs. If no identical Aggref is found, the list is passed
3831  * later to find_compatible_pertrans, to see if we can at least reuse the
3832  * state value of another aggregate.
3833  */
3834 static int
find_compatible_peragg(Aggref * newagg,AggState * aggstate,int lastaggno,List ** same_input_transnos)3835 find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3836 					   int lastaggno, List **same_input_transnos)
3837 {
3838 	int			aggno;
3839 	AggStatePerAgg peraggs;
3840 
3841 	*same_input_transnos = NIL;
3842 
3843 	/* we mustn't reuse the aggref if it contains volatile function calls */
3844 	if (contain_volatile_functions((Node *) newagg))
3845 		return -1;
3846 
3847 	peraggs = aggstate->peragg;
3848 
3849 	/*
3850 	 * Search through the list of already seen aggregates. If we find an
3851 	 * existing identical aggregate call, then we can re-use that one. While
3852 	 * searching, we'll also collect a list of Aggrefs with the same input
3853 	 * parameters. If no matching Aggref is found, the caller can potentially
3854 	 * still re-use the transition state of one of them.  (At this stage we
3855 	 * just compare the parsetrees; whether different aggregates share the
3856 	 * same transition function will be checked later.)
3857 	 */
3858 	for (aggno = 0; aggno <= lastaggno; aggno++)
3859 	{
3860 		AggStatePerAgg peragg;
3861 		Aggref	   *existingRef;
3862 
3863 		peragg = &peraggs[aggno];
3864 		existingRef = peragg->aggref;
3865 
3866 		/* all of the following must be the same or it's no match */
3867 		if (newagg->inputcollid != existingRef->inputcollid ||
3868 			newagg->aggtranstype != existingRef->aggtranstype ||
3869 			newagg->aggstar != existingRef->aggstar ||
3870 			newagg->aggvariadic != existingRef->aggvariadic ||
3871 			newagg->aggkind != existingRef->aggkind ||
3872 			!equal(newagg->aggdirectargs, existingRef->aggdirectargs) ||
3873 			!equal(newagg->args, existingRef->args) ||
3874 			!equal(newagg->aggorder, existingRef->aggorder) ||
3875 			!equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3876 			!equal(newagg->aggfilter, existingRef->aggfilter))
3877 			continue;
3878 
3879 		/* if it's the same aggregate function then report exact match */
3880 		if (newagg->aggfnoid == existingRef->aggfnoid &&
3881 			newagg->aggtype == existingRef->aggtype &&
3882 			newagg->aggcollid == existingRef->aggcollid)
3883 		{
3884 			list_free(*same_input_transnos);
3885 			*same_input_transnos = NIL;
3886 			return aggno;
3887 		}
3888 
3889 		/*
3890 		 * Not identical, but it had the same inputs. Return it to the caller,
3891 		 * in case we can re-use its per-trans state.
3892 		 */
3893 		*same_input_transnos = lappend_int(*same_input_transnos,
3894 										   peragg->transno);
3895 	}
3896 
3897 	return -1;
3898 }
3899 
3900 /*
3901  * find_compatible_pertrans - search for a previously initialized per-Trans
3902  * struct
3903  *
3904  * Searches the list of transnos for a per-Trans struct with the same
3905  * transition function and initial condition. (The inputs have already been
3906  * verified to match.)
3907  */
3908 static int
find_compatible_pertrans(AggState * aggstate,Aggref * newagg,Oid aggtransfn,Oid aggtranstype,Oid aggserialfn,Oid aggdeserialfn,Datum initValue,bool initValueIsNull,List * transnos)3909 find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
3910 						 Oid aggtransfn, Oid aggtranstype,
3911 						 Oid aggserialfn, Oid aggdeserialfn,
3912 						 Datum initValue, bool initValueIsNull,
3913 						 List *transnos)
3914 {
3915 	ListCell   *lc;
3916 
3917 	/*
3918 	 * For the moment, never try to share transition states between different
3919 	 * ordered-set aggregates.  This is necessary because the finalfns of the
3920 	 * built-in OSAs (see orderedsetaggs.c) are destructive of their
3921 	 * transition states.  We should fix them so we can allow this, but not
3922 	 * losing performance in the normal non-shared case will take some work.
3923 	 */
3924 	if (AGGKIND_IS_ORDERED_SET(newagg->aggkind))
3925 		return -1;
3926 
3927 	foreach(lc, transnos)
3928 	{
3929 		int			transno = lfirst_int(lc);
3930 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3931 
3932 		/*
3933 		 * if the transfns or transition state types are not the same then the
3934 		 * state can't be shared.
3935 		 */
3936 		if (aggtransfn != pertrans->transfn_oid ||
3937 			aggtranstype != pertrans->aggtranstype)
3938 			continue;
3939 
3940 		/*
3941 		 * The serialization and deserialization functions must match, if
3942 		 * present, as we're unable to share the trans state for aggregates
3943 		 * which will serialize or deserialize into different formats.
3944 		 * Remember that these will be InvalidOid if they're not required for
3945 		 * this agg node.
3946 		 */
3947 		if (aggserialfn != pertrans->serialfn_oid ||
3948 			aggdeserialfn != pertrans->deserialfn_oid)
3949 			continue;
3950 
3951 		/*
3952 		 * Check that the initial condition matches, too.
3953 		 */
3954 		if (initValueIsNull && pertrans->initValueIsNull)
3955 			return transno;
3956 
3957 		if (!initValueIsNull && !pertrans->initValueIsNull &&
3958 			datumIsEqual(initValue, pertrans->initValue,
3959 						 pertrans->transtypeByVal, pertrans->transtypeLen))
3960 			return transno;
3961 	}
3962 	return -1;
3963 }
3964 
3965 void
ExecEndAgg(AggState * node)3966 ExecEndAgg(AggState *node)
3967 {
3968 	PlanState  *outerPlan;
3969 	int			transno;
3970 	int			numGroupingSets = Max(node->maxsets, 1);
3971 	int			setno;
3972 
3973 	/* Make sure we have closed any open tuplesorts */
3974 
3975 	if (node->sort_in)
3976 		tuplesort_end(node->sort_in);
3977 	if (node->sort_out)
3978 		tuplesort_end(node->sort_out);
3979 
3980 	for (transno = 0; transno < node->numtrans; transno++)
3981 	{
3982 		AggStatePerTrans pertrans = &node->pertrans[transno];
3983 
3984 		for (setno = 0; setno < numGroupingSets; setno++)
3985 		{
3986 			if (pertrans->sortstates[setno])
3987 				tuplesort_end(pertrans->sortstates[setno]);
3988 		}
3989 	}
3990 
3991 	/* And ensure any agg shutdown callbacks have been called */
3992 	for (setno = 0; setno < numGroupingSets; setno++)
3993 		ReScanExprContext(node->aggcontexts[setno]);
3994 	if (node->hashcontext)
3995 		ReScanExprContext(node->hashcontext);
3996 
3997 	/*
3998 	 * We don't actually free any ExprContexts here (see comment in
3999 	 * ExecFreeExprContext), just unlinking the output one from the plan node
4000 	 * suffices.
4001 	 */
4002 	ExecFreeExprContext(&node->ss.ps);
4003 
4004 	/* clean up tuple table */
4005 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
4006 
4007 	outerPlan = outerPlanState(node);
4008 	ExecEndNode(outerPlan);
4009 }
4010 
/*
 * ExecReScanAgg
 *		Prepare an Agg node to be executed again from the beginning.
 *
 * Fast path: for a pure AGG_HASHED node whose hash table has already been
 * filled, if neither the child plan nor any of our own changed parameters
 * (aggnode->aggParams) affect the aggregate inputs, the existing hash table
 * is simply iterated again.  If the table was never filled, nothing needs
 * undoing at all.
 *
 * Otherwise every per-trans tuplesort is ended, the per-grouping-set
 * contexts (and, for AGG_HASHED/AGG_MIXED, the hash context) are rescanned
 * so registered callbacks run, the hash table is rebuilt empty, the
 * sorted-mode per-group state is zeroed and phase 1 re-selected, and the
 * child is rescanned unless it has pending parameter changes.
 */
void
ExecReScanAgg(AggState *node)
{
	ExprContext *econtext = node->ss.ps.ps_ExprContext;
	PlanState  *outerPlan = outerPlanState(node);
	Agg		   *aggnode = (Agg *) node->ss.ps.plan;
	int			transno;
	int			numGroupingSets = Max(node->maxsets, 1);
	int			setno;

	node->agg_done = false;

	if (node->aggstrategy == AGG_HASHED)
	{
		/*
		 * In the hashed case, if we haven't yet built the hash table then we
		 * can just return; nothing done yet, so nothing to undo. If subnode's
		 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
		 * else no reason to re-scan it at all.
		 */
		if (!node->table_filled)
			return;

		/*
		 * If we do have the hash table, and the subplan does not have any
		 * parameter changes, and none of our own parameter changes affect
		 * input expressions of the aggregated functions, then we can just
		 * rescan the existing hash table; no need to build it again.
		 */
		if (outerPlan->chgParam == NULL &&
			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
		{
			ResetTupleHashIterator(node->perhash[0].hashtable,
								   &node->perhash[0].hashiter);
			select_current_set(node, 0, true);
			return;
		}
	}

	/* Make sure we have closed any open tuplesorts */
	for (transno = 0; transno < node->numtrans; transno++)
	{
		for (setno = 0; setno < numGroupingSets; setno++)
		{
			AggStatePerTrans pertrans = &node->pertrans[transno];

			if (pertrans->sortstates[setno])
			{
				tuplesort_end(pertrans->sortstates[setno]);
				/* cleared so a later rescan or ExecEndAgg won't end it again */
				pertrans->sortstates[setno] = NULL;
			}
		}
	}

	/*
	 * We don't need to ReScanExprContext the output tuple context here;
	 * ExecReScan already did it. But we do need to reset our per-grouping-set
	 * contexts, which may have transvalues stored in them. (We use rescan
	 * rather than just reset because transfns may have registered callbacks
	 * that need to be run now.) For the AGG_HASHED case, see below.
	 */

	for (setno = 0; setno < numGroupingSets; setno++)
	{
		ReScanExprContext(node->aggcontexts[setno]);
	}

	/* Release first tuple of group, if we have made a copy */
	if (node->grp_firstTuple != NULL)
	{
		heap_freetuple(node->grp_firstTuple);
		node->grp_firstTuple = NULL;
	}
	ExecClearTuple(node->ss.ss_ScanTupleSlot);

	/* Forget current agg values */
	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);

	/*
	 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
	 * the hashcontext. This used to be an issue, but now, resetting a context
	 * automatically deletes sub-contexts too.
	 */
	if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
	{
		ReScanExprContext(node->hashcontext);
		/* Rebuild an empty hash table */
		build_hash_table(node);
		node->table_filled = false;
		/* iterator will be reset when the table is filled */
	}

	if (node->aggstrategy != AGG_HASHED)
	{
		/*
		 * Reset the per-group state (in particular, mark transvalues null)
		 *
		 * NOTE(review): sized by node->numaggs (unique aggregates) rather
		 * than numtrans; this assumes pergroup was allocated with at least
		 * numaggs * numGroupingSets entries -- confirm against ExecInitAgg.
		 */
		MemSet(node->pergroup, 0,
			   sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets);

		/* reset to phase 1 */
		initialize_phase(node, 1);

		node->input_done = false;
		node->projected_set = -1;
	}

	/*
	 * A child with pending parameter changes will be rescanned by
	 * ExecProcNode on its next call, so only rescan it explicitly otherwise.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}
4122 
4123 
4124 /***********************************************************************
4125  * API exposed to aggregate functions
4126  ***********************************************************************/
4127 
4128 
4129 /*
4130  * AggCheckCallContext - test if a SQL function is being called as an aggregate
4131  *
4132  * The transition and/or final functions of an aggregate may want to verify
4133  * that they are being called as aggregates, rather than as plain SQL
4134  * functions.  They should use this function to do so.  The return value
4135  * is nonzero if being called as an aggregate, or zero if not.  (Specific
4136  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4137  * values could conceivably appear in future.)
4138  *
4139  * If aggcontext isn't NULL, the function also stores at *aggcontext the
4140  * identity of the memory context that aggregate transition values are being
4141  * stored in.  Note that the same aggregate call site (flinfo) may be called
4142  * interleaved on different transition values in different contexts, so it's
4143  * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
4144  * cache it in the transvalue itself (for internal-type transvalues).
4145  */
4146 int
AggCheckCallContext(FunctionCallInfo fcinfo,MemoryContext * aggcontext)4147 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
4148 {
4149 	if (fcinfo->context && IsA(fcinfo->context, AggState))
4150 	{
4151 		if (aggcontext)
4152 		{
4153 			AggState   *aggstate = ((AggState *) fcinfo->context);
4154 			ExprContext *cxt = aggstate->curaggcontext;
4155 
4156 			*aggcontext = cxt->ecxt_per_tuple_memory;
4157 		}
4158 		return AGG_CONTEXT_AGGREGATE;
4159 	}
4160 	if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4161 	{
4162 		if (aggcontext)
4163 			*aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4164 		return AGG_CONTEXT_WINDOW;
4165 	}
4166 
4167 	/* this is just to prevent "uninitialized variable" warnings */
4168 	if (aggcontext)
4169 		*aggcontext = NULL;
4170 	return 0;
4171 }
4172 
4173 /*
4174  * AggGetAggref - allow an aggregate support function to get its Aggref
4175  *
4176  * If the function is being called as an aggregate support function,
4177  * return the Aggref node for the aggregate call.  Otherwise, return NULL.
4178  *
4179  * Aggregates sharing the same inputs and transition functions can get
4180  * merged into a single transition calculation.  If the transition function
4181  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
4182  * executing.  It must therefore not pay attention to the Aggref fields that
4183  * relate to the final function, as those are indeterminate.  But if a final
4184  * function calls AggGetAggref, it will get a precise result.
4185  *
4186  * Note that if an aggregate is being used as a window function, this will
4187  * return NULL.  We could provide a similar function to return the relevant
4188  * WindowFunc node in such cases, but it's not needed yet.
4189  */
4190 Aggref *
AggGetAggref(FunctionCallInfo fcinfo)4191 AggGetAggref(FunctionCallInfo fcinfo)
4192 {
4193 	if (fcinfo->context && IsA(fcinfo->context, AggState))
4194 	{
4195 		AggStatePerAgg curperagg;
4196 		AggStatePerTrans curpertrans;
4197 
4198 		/* check curperagg (valid when in a final function) */
4199 		curperagg = ((AggState *) fcinfo->context)->curperagg;
4200 
4201 		if (curperagg)
4202 			return curperagg->aggref;
4203 
4204 		/* check curpertrans (valid when in a transition function) */
4205 		curpertrans = ((AggState *) fcinfo->context)->curpertrans;
4206 
4207 		if (curpertrans)
4208 			return curpertrans->aggref;
4209 	}
4210 	return NULL;
4211 }
4212 
4213 /*
4214  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4215  *
4216  * This is useful in agg final functions; the context returned is one that
4217  * the final function can safely reset as desired.  This isn't useful for
4218  * transition functions, since the context returned MAY (we don't promise)
4219  * be the same as the context those are called in.
4220  *
4221  * As above, this is currently not useful for aggs called as window functions.
4222  */
4223 MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)4224 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
4225 {
4226 	if (fcinfo->context && IsA(fcinfo->context, AggState))
4227 	{
4228 		AggState   *aggstate = (AggState *) fcinfo->context;
4229 
4230 		return aggstate->tmpcontext->ecxt_per_tuple_memory;
4231 	}
4232 	return NULL;
4233 }
4234 
4235 /*
4236  * AggRegisterCallback - register a cleanup callback for an aggregate
4237  *
4238  * This is useful for aggs to register shutdown callbacks, which will ensure
4239  * that non-memory resources are freed.  The callback will occur just before
4240  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4241  * either between groups or as a result of rescanning the query.  The callback
4242  * will NOT be called on error paths.  The typical use-case is for freeing of
4243  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4244  * created by the agg functions.  (The callback will not be called until after
4245  * the result of the finalfn is no longer needed, so it's safe for the finalfn
4246  * to return data that will be freed by the callback.)
4247  *
4248  * As above, this is currently not useful for aggs called as window functions.
4249  */
4250 void
AggRegisterCallback(FunctionCallInfo fcinfo,ExprContextCallbackFunction func,Datum arg)4251 AggRegisterCallback(FunctionCallInfo fcinfo,
4252 					ExprContextCallbackFunction func,
4253 					Datum arg)
4254 {
4255 	if (fcinfo->context && IsA(fcinfo->context, AggState))
4256 	{
4257 		AggState   *aggstate = (AggState *) fcinfo->context;
4258 		ExprContext *cxt = aggstate->curaggcontext;
4259 
4260 		RegisterExprContextCallback(cxt, func, arg);
4261 
4262 		return;
4263 	}
4264 	elog(ERROR, "aggregate function cannot register a callback in this context");
4265 }
4266 
4267 
4268 /*
4269  * aggregate_dummy - dummy execution routine for aggregate functions
4270  *
4271  * This function is listed as the implementation (prosrc field) of pg_proc
4272  * entries for aggregate functions.  Its only purpose is to throw an error
4273  * if someone mistakenly executes such a function in the normal way.
4274  *
4275  * Perhaps someday we could assign real meaning to the prosrc field of
4276  * an aggregate?
4277  */
4278 Datum
aggregate_dummy(PG_FUNCTION_ARGS)4279 aggregate_dummy(PG_FUNCTION_ARGS)
4280 {
4281 	elog(ERROR, "aggregate function %u called as normal function",
4282 		 fcinfo->flinfo->fn_oid);
4283 	return (Datum) 0;			/* keep compiler quiet */
4284 }
4285