1 /*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  *	  Routines to handle aggregate nodes.
5  *
6  *	  ExecAgg normally evaluates each aggregate in the following steps:
7  *
8  *		 transvalue = initcond
9  *		 foreach input_tuple do
10  *			transvalue = transfunc(transvalue, input_value(s))
11  *		 result = finalfunc(transvalue, direct_argument(s))
12  *
13  *	  If a finalfunc is not supplied then the result is just the ending
14  *	  value of transvalue.
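 *
 *	  For example (catalog facts, not anything specific to this file):
 *	  avg(float8) has transfunc float8_accum, finalfunc float8_avg, and
 *	  initcond '{0,0,0}', so the loop above accumulates the count, sum, and
 *	  sum of squares into transvalue, and the finalfunc computes the average
 *	  from that state.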
15  *
16  *	  Other behaviors can be selected by the "aggsplit" mode, which exists
17  *	  to support partial aggregation.  It is possible to:
18  *	  * Skip running the finalfunc, so that the output is always the
19  *	  final transvalue state.
20  *	  * Substitute the combinefunc for the transfunc, so that transvalue
21  *	  states (propagated up from a child partial-aggregation step) are merged
22  *	  rather than processing raw input rows.  (The statements below about
23  *	  the transfunc apply equally to the combinefunc, when it's selected.)
24  *	  * Apply the serializefunc to the output values (this only makes sense
25  *	  when skipping the finalfunc, since the serializefunc works on the
26  *	  transvalue data type).
27  *	  * Apply the deserializefunc to the input values (this only makes sense
28  *	  when using the combinefunc, for similar reasons).
29  *	  It is the planner's responsibility to connect up Agg nodes using these
30  *	  alternate behaviors in a way that makes sense, with partial aggregation
31  *	  results being fed to nodes that expect them.
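 *
 *	  For example, in parallel aggregation the planner typically runs a
 *	  Partial Aggregate in each worker (finalfunc skipped, serializefunc
 *	  applied to the transvalues) and a Finalize Aggregate above the Gather
 *	  (deserializefunc applied to the workers' outputs, combinefunc used
 *	  instead of the transfunc, finalfunc applied normally).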
32  *
33  *	  If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34  *	  input tuples and eliminate duplicates (if required) before performing
35  *	  the above-depicted process.  (However, we don't do that for ordered-set
36  *	  aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37  *	  so far as this module is concerned.)	Note that partial aggregation
38  *	  is not supported in these cases, since we couldn't ensure global
39  *	  ordering or distinctness of the inputs.
40  *
41  *	  If transfunc is marked "strict" in pg_proc and initcond is NULL,
42  *	  then the first non-NULL input_value is assigned directly to transvalue,
43  *	  and transfunc isn't applied until the second non-NULL input_value.
44  *	  The agg's first input type and transtype must be the same in this case!
45  *
46  *	  If transfunc is marked "strict" then NULL input_values are skipped,
47  *	  keeping the previous transvalue.  If transfunc is not strict then it
48  *	  is called for every input tuple and must deal with NULL initcond
49  *	  or NULL input_values for itself.
50  *
51  *	  If finalfunc is marked "strict" then it is not called when the
52  *	  ending transvalue is NULL, instead a NULL result is created
53  *	  automatically (this is just the usual handling of strict functions,
54  *	  of course).  A non-strict finalfunc can make its own choice of
55  *	  what to return for a NULL ending transvalue.
56  *
57  *	  Ordered-set aggregates are treated specially in one other way: we
58  *	  evaluate any "direct" arguments and pass them to the finalfunc along
59  *	  with the transition value.
60  *
61  *	  A finalfunc can have additional arguments beyond the transvalue and
62  *	  any "direct" arguments, corresponding to the input arguments of the
63  *	  aggregate.  These are always just passed as NULL.  Such arguments may be
64  *	  needed to allow resolution of a polymorphic aggregate's result type.
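 *
 *	  (For instance, array_agg's finalfunc is array_agg_finalfn(internal,
 *	  anynonarray) returning anyarray; the extra argument is always NULL at
 *	  runtime and exists only so the polymorphic result type can be resolved
 *	  from the aggregate's input type.)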
65  *
66  *	  We compute aggregate input expressions and run the transition functions
67  *	  in a temporary econtext (aggstate->tmpcontext).  This is reset at least
68  *	  once per input tuple, so when the transvalue datatype is
69  *	  pass-by-reference, we have to be careful to copy it into a longer-lived
70  *	  memory context, and free the prior value to avoid memory leakage.  We
71  *	  store transvalues in another set of econtexts, aggstate->aggcontexts
72  *	  (one per grouping set, see below), which are also used for the hashtable
73  *	  structures in AGG_HASHED mode.  These econtexts are rescanned, not just
74  *	  reset, at group boundaries so that aggregate transition functions can
75  *	  register shutdown callbacks via AggRegisterCallback.
76  *
77  *	  The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78  *	  run finalize functions and compute the output tuple; this context can be
79  *	  reset once per output tuple.
80  *
81  *	  The executor's AggState node is passed as the fmgr "context" value in
82  *	  all transfunc and finalfunc calls.  It is not recommended that the
83  *	  transition functions look at the AggState node directly, but they can
84  *	  use AggCheckCallContext() to verify that they are being called by
85  *	  nodeAgg.c (and not as ordinary SQL functions).  The main reason a
86  *	  transition function might want to know this is so that it can avoid
87  *	  palloc'ing a fixed-size pass-by-ref transition value on every call:
88  *	  it can instead just scribble on and return its left input.  Ordinarily
89  *	  it is completely forbidden for functions to modify pass-by-ref inputs,
90  *	  but in the aggregate case we know the left input is either the initial
91  *	  transition value or a previous function result, and in either case its
92  *	  value need not be preserved.  See int8inc() for an example.  Notice that
93  *	  the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94  *	  the previous transition value pointer is returned.  It is also possible
95  *	  to avoid repeated data copying when the transition value is an expanded
96  *	  object: to do that, the transition function must take care to return
97  *	  an expanded object that is in a child context of the memory context
98  *	  returned by AggCheckCallContext().  Also, some transition functions want
99  *	  to store working state in addition to the nominal transition value; they
100  *	  can use the memory context returned by AggCheckCallContext() to do that.
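 *
 *	  As a rough sketch of that pattern (hypothetical function and state,
 *	  not anything defined in this file; a transition function with an
 *	  "internal" transtype, declared non-strict so it sees the initial NULL
 *	  state):
 *
 *		 typedef struct MyCountState { int64 count; } MyCountState;
 *
 *		 Datum
 *		 my_count_trans(PG_FUNCTION_ARGS)
 *		 {
 *			 MemoryContext aggcontext;
 *			 MyCountState *state;
 *
 *			 if (!AggCheckCallContext(fcinfo, &aggcontext))
 *				 elog(ERROR, "my_count_trans called in non-aggregate context");
 *			 if (PG_ARGISNULL(0))
 *				 state = (MyCountState *)
 *					 MemoryContextAllocZero(aggcontext, sizeof(MyCountState));
 *			 else
 *				 state = (MyCountState *) PG_GETARG_POINTER(0);
 *			 state->count++;
 *			 PG_RETURN_POINTER(state);
 *		 }
 *
 *	  Allocating the state in aggcontext once and then scribbling on it in
 *	  later calls avoids a palloc per input row, exactly as described above.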
101  *
102  *	  Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
103  *	  AggState is available as context in earlier releases (back to 8.1),
104  *	  but direct examination of the node is needed to use it before 9.0.
105  *
106  *	  As of 9.4, aggregate transition functions can also use AggGetAggref()
107  *	  to get hold of the Aggref expression node for their aggregate call.
108  *	  This is mainly intended for ordered-set aggregates, which are not
109  *	  supported as window functions.  (A regular aggregate function would
110  *	  need some fallback logic to use this, since there's no Aggref node
111  *	  for a window function.)
112  *
113  *	  Grouping sets:
114  *
115  *	  A list of grouping sets which is structurally equivalent to a ROLLUP
116  *	  clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117  *	  ordered data.  We do this by keeping a separate set of transition values
118  *	  for each grouping set being concurrently processed; for each input tuple
119  *	  we update them all, and on group boundaries we reset those states
120  *	  (starting at the front of the list) whose grouping values have changed
121  *	  (the list of grouping sets is ordered from most specific to least
122  *	  specific).
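 *
 *	  For example, under GROUP BY ROLLUP (a, b) we maintain transvalues for
 *	  the grouping sets (a,b), (a) and () concurrently: when only b changes
 *	  we reset just the (a,b) states, when a changes we reset the (a,b) and
 *	  (a) states, and the () states are only finalized once the input is
 *	  exhausted.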
123  *
124  *	  Where more complex grouping sets are used, we break them down into
125  *	  "phases", where each phase has a different sort order (except phase 0
126  *	  which is reserved for hashing).  During each phase but the last, the
127  *	  input tuples are additionally stored in a tuplesort which is keyed to the
128  *	  next phase's sort order; during each phase but the first, the input
129  *	  tuples are drawn from the previously sorted data.  (The sorting of the
130  *	  data for the first phase is handled by the planner, as it might be
131  *	  satisfied by underlying nodes.)
132  *
133  *	  Hashing can be mixed with sorted grouping.  To do this, we have an
134  *	  AGG_MIXED strategy that populates the hashtables during the first sorted
135  *	  phase, and switches to reading them out after completing all sort phases.
136  *	  We can also support AGG_HASHED with multiple hash tables and no sorting
137  *	  at all.
138  *
139  *	  From the perspective of aggregate transition and final functions, the
140  *	  only issue regarding grouping sets is this: a single call site (flinfo)
141  *	  of an aggregate function may be used for updating several different
142  *	  transition values in turn. So the function must not cache in the flinfo
143  *	  anything which logically belongs as part of the transition value (most
144  *	  importantly, the memory context in which the transition value exists).
145  *	  The support API functions (AggCheckCallContext, AggRegisterCallback) are
146  *	  sensitive to the grouping set for which the aggregate function is
147  *	  currently being called.
148  *
149  *	  Plan structure:
150  *
151  *	  What we get from the planner is actually one "real" Agg node which is
152  *	  part of the plan tree proper, but which optionally has an additional list
153  *	  of Agg nodes hung off the side via the "chain" field.  This is because an
154  *	  Agg node happens to be a convenient representation of all the data we
155  *	  need for grouping sets.
156  *
157  *	  For many purposes, we treat the "real" node as if it were just the first
158  *	  node in the chain.  The chain must be ordered such that hashed entries
159  *	  come before sorted/plain entries; the real node is marked AGG_MIXED if
160  *	  there are both types present (in which case the real node describes one
161  *	  of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162  *	  the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
163  *	  the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164  *	  nodes must be of the same type; if it is AGG_PLAIN, there can be no
165  *	  chained nodes.
166  *
167  *	  We collect all hashed nodes into a single "phase", numbered 0, and create
168  *	  a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169  *	  Phase 0 is allocated even if there are no hashes, but remains unused in
170  *	  that case.
171  *
172  *	  AGG_HASHED nodes actually refer to only a single grouping set each,
173  *	  because for each hashed grouping we need a separate grpColIdx and
174  *	  numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
175  *	  grouping sets that share a sort order.  Each AGG_SORTED node other than
176  *	  the first one has an associated Sort node which describes the sort order
177  *	  to be used; the first sorted node takes its input from the outer subtree,
178  *	  which the planner has already arranged to provide ordered data.
179  *
180  *	  Memory and ExprContext usage:
181  *
182  *	  Because we're accumulating aggregate values across input rows, we need to
183  *	  use more memory contexts than just simple input/output tuple contexts.
184  *	  In fact, for a rollup, we need a separate context for each grouping set
185  *	  so that we can reset the inner (finer-grained) aggregates on their group
186  *	  boundaries while continuing to accumulate values for outer
187  *	  (coarser-grained) groupings.  On top of this, we might be simultaneously
188  *	  populating hashtables; however, we only need one context for all the
189  *	  hashtables.
190  *
191  *	  So we create an array, aggcontexts, with an ExprContext for each grouping
192  *	  set in the largest rollup that we're going to process, and use the
193  *	  per-tuple memory context of those ExprContexts to store the aggregate
194  *	  transition values.  hashcontext is the single context created to support
195  *	  all hash tables.
196  *
197  *    Transition / Combine function invocation:
198  *
199  *    For performance reasons transition functions, including combine
200  *    functions, aren't invoked one-by-one from nodeAgg.c after computing
201  *    arguments using the expression evaluation engine. Instead
202  *    ExecBuildAggTrans() builds one large expression that does both argument
203  *    evaluation and transition function invocation. That avoids performance
204  *    issues due to repeated uses of expression evaluation, complications due
205  *    to filter expressions having to be evaluated early, and allows the
206  *    entire expression to be JIT compiled into one native function.
207  *
208  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
209  * Portions Copyright (c) 1994, Regents of the University of California
210  *
211  * IDENTIFICATION
212  *	  src/backend/executor/nodeAgg.c
213  *
214  *-------------------------------------------------------------------------
215  */
216 
217 #include "postgres.h"
218 
219 #include "access/htup_details.h"
220 #include "catalog/objectaccess.h"
221 #include "catalog/pg_aggregate.h"
222 #include "catalog/pg_proc.h"
223 #include "catalog/pg_type.h"
224 #include "executor/execExpr.h"
225 #include "executor/executor.h"
226 #include "executor/nodeAgg.h"
227 #include "miscadmin.h"
228 #include "nodes/makefuncs.h"
229 #include "nodes/nodeFuncs.h"
230 #include "optimizer/optimizer.h"
231 #include "parser/parse_agg.h"
232 #include "parser/parse_coerce.h"
233 #include "utils/acl.h"
234 #include "utils/builtins.h"
235 #include "utils/lsyscache.h"
236 #include "utils/memutils.h"
237 #include "utils/syscache.h"
238 #include "utils/tuplesort.h"
239 #include "utils/datum.h"
240 
241 
242 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
243 static void initialize_phase(AggState *aggstate, int newphase);
244 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
245 static void initialize_aggregates(AggState *aggstate,
246 								  AggStatePerGroup *pergroups,
247 								  int numReset);
248 static void advance_transition_function(AggState *aggstate,
249 										AggStatePerTrans pertrans,
250 										AggStatePerGroup pergroupstate);
251 static void advance_aggregates(AggState *aggstate);
252 static void process_ordered_aggregate_single(AggState *aggstate,
253 											 AggStatePerTrans pertrans,
254 											 AggStatePerGroup pergroupstate);
255 static void process_ordered_aggregate_multi(AggState *aggstate,
256 											AggStatePerTrans pertrans,
257 											AggStatePerGroup pergroupstate);
258 static void finalize_aggregate(AggState *aggstate,
259 							   AggStatePerAgg peragg,
260 							   AggStatePerGroup pergroupstate,
261 							   Datum *resultVal, bool *resultIsNull);
262 static void finalize_partialaggregate(AggState *aggstate,
263 									  AggStatePerAgg peragg,
264 									  AggStatePerGroup pergroupstate,
265 									  Datum *resultVal, bool *resultIsNull);
266 static void prepare_projection_slot(AggState *aggstate,
267 									TupleTableSlot *slot,
268 									int currentSet);
269 static void finalize_aggregates(AggState *aggstate,
270 								AggStatePerAgg peragg,
271 								AggStatePerGroup pergroup);
272 static TupleTableSlot *project_aggregates(AggState *aggstate);
273 static Bitmapset *find_unaggregated_cols(AggState *aggstate);
274 static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
275 static void build_hash_table(AggState *aggstate);
276 static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
277 static void lookup_hash_entries(AggState *aggstate);
278 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
279 static void agg_fill_hash_table(AggState *aggstate);
280 static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
281 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
282 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
283 									  AggState *aggstate, EState *estate,
284 									  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
285 									  Oid aggserialfn, Oid aggdeserialfn,
286 									  Datum initValue, bool initValueIsNull,
287 									  Oid *inputTypes, int numArguments);
288 static int	find_compatible_peragg(Aggref *newagg, AggState *aggstate,
289 								   int lastaggno, List **same_input_transnos);
290 static int	find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
291 									 bool shareable,
292 									 Oid aggtransfn, Oid aggtranstype,
293 									 Oid aggserialfn, Oid aggdeserialfn,
294 									 Datum initValue, bool initValueIsNull,
295 									 List *transnos);
296 
297 
298 /*
299  * Select the current grouping set; affects current_set and
300  * curaggcontext.
301  */
302 static void
303 select_current_set(AggState *aggstate, int setno, bool is_hash)
304 {
305 	/* when changing this, also adapt ExecInterpExpr() and friends */
306 	if (is_hash)
307 		aggstate->curaggcontext = aggstate->hashcontext;
308 	else
309 		aggstate->curaggcontext = aggstate->aggcontexts[setno];
310 
311 	aggstate->current_set = setno;
312 }
313 
314 /*
315  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
316  * current_phase + 1. Juggle the tuplesorts accordingly.
317  *
318  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
319  * case, so when entering phase 0, all we need to do is drop open sorts.
320  */
321 static void
322 initialize_phase(AggState *aggstate, int newphase)
323 {
324 	Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
325 
326 	/*
327 	 * Whatever the previous state, we're now done with whatever input
328 	 * tuplesort was in use.
329 	 */
330 	if (aggstate->sort_in)
331 	{
332 		tuplesort_end(aggstate->sort_in);
333 		aggstate->sort_in = NULL;
334 	}
335 
336 	if (newphase <= 1)
337 	{
338 		/*
339 		 * Discard any existing output tuplesort.
340 		 */
341 		if (aggstate->sort_out)
342 		{
343 			tuplesort_end(aggstate->sort_out);
344 			aggstate->sort_out = NULL;
345 		}
346 	}
347 	else
348 	{
349 		/*
350 		 * The old output tuplesort becomes the new input one, and this is the
351 		 * right time to actually sort it.
352 		 */
353 		aggstate->sort_in = aggstate->sort_out;
354 		aggstate->sort_out = NULL;
355 		Assert(aggstate->sort_in);
356 		tuplesort_performsort(aggstate->sort_in);
357 	}
358 
359 	/*
360 	 * If this isn't the last phase, we need to sort appropriately for the
361 	 * next phase in sequence.
362 	 */
363 	if (newphase > 0 && newphase < aggstate->numphases - 1)
364 	{
365 		Sort	   *sortnode = aggstate->phases[newphase + 1].sortnode;
366 		PlanState  *outerNode = outerPlanState(aggstate);
367 		TupleDesc	tupDesc = ExecGetResultType(outerNode);
368 
369 		aggstate->sort_out = tuplesort_begin_heap(tupDesc,
370 												  sortnode->numCols,
371 												  sortnode->sortColIdx,
372 												  sortnode->sortOperators,
373 												  sortnode->collations,
374 												  sortnode->nullsFirst,
375 												  work_mem,
376 												  NULL, false);
377 	}
378 
379 	aggstate->current_phase = newphase;
380 	aggstate->phase = &aggstate->phases[newphase];
381 }
382 
383 /*
384  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
385  * populated by the previous phase.  Copy it to the sorter for the next phase
386  * if any.
387  *
388  * Callers cannot rely on memory for tuple in returned slot remaining valid
389  * past any subsequently fetched tuple.
390  */
391 static TupleTableSlot *
392 fetch_input_tuple(AggState *aggstate)
393 {
394 	TupleTableSlot *slot;
395 
396 	if (aggstate->sort_in)
397 	{
398 		/* make sure we check for interrupts in either path through here */
399 		CHECK_FOR_INTERRUPTS();
400 		if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
401 									aggstate->sort_slot, NULL))
402 			return NULL;
403 		slot = aggstate->sort_slot;
404 	}
405 	else
406 		slot = ExecProcNode(outerPlanState(aggstate));
407 
408 	if (!TupIsNull(slot) && aggstate->sort_out)
409 		tuplesort_puttupleslot(aggstate->sort_out, slot);
410 
411 	return slot;
412 }
413 
414 /*
415  * (Re)Initialize an individual aggregate.
416  *
417  * This function handles only one grouping set, already set in
418  * aggstate->current_set.
419  *
420  * When called, CurrentMemoryContext should be the per-query context.
421  */
422 static void
423 initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
424 					 AggStatePerGroup pergroupstate)
425 {
426 	/*
427 	 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
428 	 */
429 	if (pertrans->numSortCols > 0)
430 	{
431 		/*
432 		 * In case of rescan, there might be a leftover, uncompleted sort
433 		 * operation.  Clean it up if so.
434 		 */
435 		if (pertrans->sortstates[aggstate->current_set])
436 			tuplesort_end(pertrans->sortstates[aggstate->current_set]);
437 
438 
439 		/*
440 		 * We use a plain Datum sorter when there's a single input column;
441 		 * otherwise sort the full tuple.  (See comments for
442 		 * process_ordered_aggregate_single.)
443 		 */
444 		if (pertrans->numInputs == 1)
445 		{
446 			Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
447 
448 			pertrans->sortstates[aggstate->current_set] =
449 				tuplesort_begin_datum(attr->atttypid,
450 									  pertrans->sortOperators[0],
451 									  pertrans->sortCollations[0],
452 									  pertrans->sortNullsFirst[0],
453 									  work_mem, NULL, false);
454 		}
455 		else
456 			pertrans->sortstates[aggstate->current_set] =
457 				tuplesort_begin_heap(pertrans->sortdesc,
458 									 pertrans->numSortCols,
459 									 pertrans->sortColIdx,
460 									 pertrans->sortOperators,
461 									 pertrans->sortCollations,
462 									 pertrans->sortNullsFirst,
463 									 work_mem, NULL, false);
464 	}
465 
466 	/*
467 	 * (Re)set transValue to the initial value.
468 	 *
469 	 * Note that when the initial value is pass-by-ref, we must copy it (into
470 	 * the aggcontext) since we will pfree the transValue later.
471 	 */
472 	if (pertrans->initValueIsNull)
473 		pergroupstate->transValue = pertrans->initValue;
474 	else
475 	{
476 		MemoryContext oldContext;
477 
478 		oldContext = MemoryContextSwitchTo(
479 										   aggstate->curaggcontext->ecxt_per_tuple_memory);
480 		pergroupstate->transValue = datumCopy(pertrans->initValue,
481 											  pertrans->transtypeByVal,
482 											  pertrans->transtypeLen);
483 		MemoryContextSwitchTo(oldContext);
484 	}
485 	pergroupstate->transValueIsNull = pertrans->initValueIsNull;
486 
487 	/*
488 	 * If the initial value for the transition state doesn't exist in the
489 	 * pg_aggregate table then we will let the first non-NULL value returned
490 	 * from the outer procNode become the initial value. (This is useful for
491 	 * aggregates like max() and min().) The noTransValue flag signals that we
492 	 * still need to do this.
493 	 */
494 	pergroupstate->noTransValue = pertrans->initValueIsNull;
495 }
496 
497 /*
498  * Initialize all aggregate transition states for a new group of input values.
499  *
500  * If there are multiple grouping sets, we initialize only the first numReset
501  * of them (the grouping sets are ordered so that the most specific one, which
502  * is reset most often, is first). As a convenience, if numReset is 0, we
503  * reinitialize all sets.
504  *
505  * NB: This cannot be used for hash aggregates, as for those the grouping set
506  * number has to be specified from further up.
507  *
508  * When called, CurrentMemoryContext should be the per-query context.
509  */
510 static void
511 initialize_aggregates(AggState *aggstate,
512 					  AggStatePerGroup *pergroups,
513 					  int numReset)
514 {
515 	int			transno;
516 	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
517 	int			setno = 0;
518 	int			numTrans = aggstate->numtrans;
519 	AggStatePerTrans transstates = aggstate->pertrans;
520 
521 	if (numReset == 0)
522 		numReset = numGroupingSets;
523 
524 	for (setno = 0; setno < numReset; setno++)
525 	{
526 		AggStatePerGroup pergroup = pergroups[setno];
527 
528 		select_current_set(aggstate, setno, false);
529 
530 		for (transno = 0; transno < numTrans; transno++)
531 		{
532 			AggStatePerTrans pertrans = &transstates[transno];
533 			AggStatePerGroup pergroupstate = &pergroup[transno];
534 
535 			initialize_aggregate(aggstate, pertrans, pergroupstate);
536 		}
537 	}
538 }
539 
540 /*
541  * Given new input value(s), advance the transition function of one aggregate
542  * state within one grouping set only (already set in aggstate->current_set)
543  *
544  * The new values (and null flags) have been preloaded into argument positions
545  * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
546  * pass to the transition function.  We also expect that the static fields of
547  * the fcinfo are already initialized; that was done by ExecInitAgg().
548  *
549  * It doesn't matter which memory context this is called in.
550  */
551 static void
552 advance_transition_function(AggState *aggstate,
553 							AggStatePerTrans pertrans,
554 							AggStatePerGroup pergroupstate)
555 {
556 	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
557 	MemoryContext oldContext;
558 	Datum		newVal;
559 
560 	if (pertrans->transfn.fn_strict)
561 	{
562 		/*
563 		 * For a strict transfn, nothing happens when there's a NULL input; we
564 		 * just keep the prior transValue.
565 		 */
566 		int			numTransInputs = pertrans->numTransInputs;
567 		int			i;
568 
569 		for (i = 1; i <= numTransInputs; i++)
570 		{
571 			if (fcinfo->args[i].isnull)
572 				return;
573 		}
574 		if (pergroupstate->noTransValue)
575 		{
576 			/*
577 			 * transValue has not been initialized. This is the first non-NULL
578 			 * input value. We use it as the initial value for transValue. (We
579 			 * already checked that the agg's input type is binary-compatible
580 			 * with its transtype, so straight copy here is OK.)
581 			 *
582 			 * We must copy the datum into aggcontext if it is pass-by-ref. We
583 			 * do not need to pfree the old transValue, since it's NULL.
584 			 */
585 			oldContext = MemoryContextSwitchTo(
586 											   aggstate->curaggcontext->ecxt_per_tuple_memory);
587 			pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
588 												  pertrans->transtypeByVal,
589 												  pertrans->transtypeLen);
590 			pergroupstate->transValueIsNull = false;
591 			pergroupstate->noTransValue = false;
592 			MemoryContextSwitchTo(oldContext);
593 			return;
594 		}
595 		if (pergroupstate->transValueIsNull)
596 		{
597 			/*
598 			 * Don't call a strict function with NULL inputs.  Note it is
599 			 * possible to get here despite the above tests, if the transfn is
600 			 * strict *and* returned a NULL on a prior cycle. If that happens
601 			 * we will propagate the NULL all the way to the end.
602 			 */
603 			return;
604 		}
605 	}
606 
607 	/* We run the transition functions in per-input-tuple memory context */
608 	oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
609 
610 	/* set up aggstate->curpertrans for AggGetAggref() */
611 	aggstate->curpertrans = pertrans;
612 
613 	/*
614 	 * OK to call the transition function
615 	 */
616 	fcinfo->args[0].value = pergroupstate->transValue;
617 	fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
618 	fcinfo->isnull = false;		/* just in case transfn doesn't set it */
619 
620 	newVal = FunctionCallInvoke(fcinfo);
621 
622 	aggstate->curpertrans = NULL;
623 
624 	/*
625 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
626 	 * free the prior transValue.  But if transfn returned a pointer to its
627 	 * first input, we don't need to do anything.  Also, if transfn returned a
628 	 * pointer to a R/W expanded object that is already a child of the
629 	 * aggcontext, assume we can adopt that value without copying it.
630 	 *
631 	 * It's safe to compare newVal with pergroup->transValue without
632 	 * regard for either being NULL, because ExecAggTransReparent()
633 	 * takes care to set transValue to 0 when NULL. Otherwise we could
634 	 * end up accidentally not reparenting, when the transValue has
635 	 * the same numerical value as newValue, despite being NULL.  This
636 	 * is a somewhat hot path, making it undesirable to instead solve
637 	 * this with another branch for the common case of the transition
638 	 * function returning its (modified) input argument.
639 	 */
640 	if (!pertrans->transtypeByVal &&
641 		DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
642 		newVal = ExecAggTransReparent(aggstate, pertrans,
643 									  newVal, fcinfo->isnull,
644 									  pergroupstate->transValue,
645 									  pergroupstate->transValueIsNull);
646 
647 	pergroupstate->transValue = newVal;
648 	pergroupstate->transValueIsNull = fcinfo->isnull;
649 
650 	MemoryContextSwitchTo(oldContext);
651 }
652 
653 /*
654  * Advance each aggregate transition state for one input tuple.  The input
655  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
656  * accessible to ExecEvalExpr.
657  *
658  * We have two sets of transition states to handle: one for sorted aggregation
659  * and one for hashed; we do them both here, to avoid multiple evaluation of
660  * the inputs.
661  *
662  * When called, CurrentMemoryContext should be the per-query context.
663  */
664 static void
665 advance_aggregates(AggState *aggstate)
666 {
667 	bool		dummynull;
668 
669 	ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
670 							  aggstate->tmpcontext,
671 							  &dummynull);
672 }
673 
674 /*
675  * Run the transition function for a DISTINCT or ORDER BY aggregate
676  * with only one input.  This is called after we have completed
677  * entering all the input values into the sort object.  We complete the
678  * sort, read out the values in sorted order, and run the transition
679  * function on each value (applying DISTINCT if appropriate).
680  *
681  * Note that the strictness of the transition function was checked when
682  * entering the values into the sort, so we don't check it again here;
683  * we just apply standard SQL DISTINCT logic.
684  *
685  * The one-input case is handled separately from the multi-input case
686  * for performance reasons: for single by-value inputs, such as the
687  * common case of count(distinct id), the tuplesort_getdatum code path
688  * is around 300% faster.  (The speedup for by-reference types is less
689  * but still noticeable.)
690  *
691  * This function handles only one grouping set (already set in
692  * aggstate->current_set).
693  *
694  * When called, CurrentMemoryContext should be the per-query context.
695  */
696 static void
697 process_ordered_aggregate_single(AggState *aggstate,
698 								 AggStatePerTrans pertrans,
699 								 AggStatePerGroup pergroupstate)
700 {
701 	Datum		oldVal = (Datum) 0;
702 	bool		oldIsNull = true;
703 	bool		haveOldVal = false;
704 	MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
705 	MemoryContext oldContext;
706 	bool		isDistinct = (pertrans->numDistinctCols > 0);
707 	Datum		newAbbrevVal = (Datum) 0;
708 	Datum		oldAbbrevVal = (Datum) 0;
709 	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
710 	Datum	   *newVal;
711 	bool	   *isNull;
712 
713 	Assert(pertrans->numDistinctCols < 2);
714 
715 	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
716 
717 	/* Load the column into argument 1 (arg 0 will be transition value) */
718 	newVal = &fcinfo->args[1].value;
719 	isNull = &fcinfo->args[1].isnull;
720 
721 	/*
722 	 * Note: if input type is pass-by-ref, the datums returned by the sort are
723 	 * freshly palloc'd in the per-query context, so we must be careful to
724 	 * pfree them when they are no longer needed.
725 	 */
726 
727 	while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
728 							  true, newVal, isNull, &newAbbrevVal))
729 	{
730 		/*
731 		 * Clear and select the working context for evaluation of the equality
732 		 * function and transition function.
733 		 */
734 		MemoryContextReset(workcontext);
735 		oldContext = MemoryContextSwitchTo(workcontext);
736 
737 		/*
738 		 * If DISTINCT mode, and not distinct from prior, skip it.
739 		 */
740 		if (isDistinct &&
741 			haveOldVal &&
742 			((oldIsNull && *isNull) ||
743 			 (!oldIsNull && !*isNull &&
744 			  oldAbbrevVal == newAbbrevVal &&
745 			  DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
746 											 pertrans->aggCollation,
747 											 oldVal, *newVal)))))
748 		{
749 			/* equal to prior, so forget this one */
750 			if (!pertrans->inputtypeByVal && !*isNull)
751 				pfree(DatumGetPointer(*newVal));
752 		}
753 		else
754 		{
755 			advance_transition_function(aggstate, pertrans, pergroupstate);
756 			/* forget the old value, if any */
757 			if (!oldIsNull && !pertrans->inputtypeByVal)
758 				pfree(DatumGetPointer(oldVal));
759 			/* and remember the new one for subsequent equality checks */
760 			oldVal = *newVal;
761 			oldAbbrevVal = newAbbrevVal;
762 			oldIsNull = *isNull;
763 			haveOldVal = true;
764 		}
765 
766 		MemoryContextSwitchTo(oldContext);
767 	}
768 
769 	if (!oldIsNull && !pertrans->inputtypeByVal)
770 		pfree(DatumGetPointer(oldVal));
771 
772 	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
773 	pertrans->sortstates[aggstate->current_set] = NULL;
774 }
775 
776 /*
777  * Run the transition function for a DISTINCT or ORDER BY aggregate
778  * with more than one input.  This is called after we have completed
779  * entering all the input values into the sort object.  We complete the
780  * sort, read out the values in sorted order, and run the transition
781  * function on each value (applying DISTINCT if appropriate).
782  *
783  * This function handles only one grouping set (already set in
784  * aggstate->current_set).
785  *
786  * When called, CurrentMemoryContext should be the per-query context.
787  */
788 static void
789 process_ordered_aggregate_multi(AggState *aggstate,
790 								AggStatePerTrans pertrans,
791 								AggStatePerGroup pergroupstate)
792 {
793 	ExprContext *tmpcontext = aggstate->tmpcontext;
794 	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
795 	TupleTableSlot *slot1 = pertrans->sortslot;
796 	TupleTableSlot *slot2 = pertrans->uniqslot;
797 	int			numTransInputs = pertrans->numTransInputs;
798 	int			numDistinctCols = pertrans->numDistinctCols;
799 	Datum		newAbbrevVal = (Datum) 0;
800 	Datum		oldAbbrevVal = (Datum) 0;
801 	bool		haveOldValue = false;
802 	TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
803 	int			i;
804 
805 	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
806 
807 	ExecClearTuple(slot1);
808 	if (slot2)
809 		ExecClearTuple(slot2);
810 
811 	while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
812 								  true, true, slot1, &newAbbrevVal))
813 	{
814 		CHECK_FOR_INTERRUPTS();
815 
816 		tmpcontext->ecxt_outertuple = slot1;
817 		tmpcontext->ecxt_innertuple = slot2;
818 
819 		if (numDistinctCols == 0 ||
820 			!haveOldValue ||
821 			newAbbrevVal != oldAbbrevVal ||
822 			!ExecQual(pertrans->equalfnMulti, tmpcontext))
823 		{
824 			/*
825 			 * Extract the first numTransInputs columns as datums to pass to
826 			 * the transfn.
827 			 */
828 			slot_getsomeattrs(slot1, numTransInputs);
829 
830 			/* Load values into fcinfo */
831 			/* Start from 1, since the 0th arg will be the transition value */
832 			for (i = 0; i < numTransInputs; i++)
833 			{
834 				fcinfo->args[i + 1].value = slot1->tts_values[i];
835 				fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
836 			}
837 
838 			advance_transition_function(aggstate, pertrans, pergroupstate);
839 
840 			if (numDistinctCols > 0)
841 			{
842 				/* swap the slot pointers to retain the current tuple */
843 				TupleTableSlot *tmpslot = slot2;
844 
845 				slot2 = slot1;
846 				slot1 = tmpslot;
847 				/* avoid ExecQual() calls by reusing abbreviated keys */
848 				oldAbbrevVal = newAbbrevVal;
849 				haveOldValue = true;
850 			}
851 		}
852 
853 		/* Reset context each time */
854 		ResetExprContext(tmpcontext);
855 
856 		ExecClearTuple(slot1);
857 	}
858 
859 	if (slot2)
860 		ExecClearTuple(slot2);
861 
862 	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
863 	pertrans->sortstates[aggstate->current_set] = NULL;
864 
865 	/* restore previous slot, potentially in use for grouping sets */
866 	tmpcontext->ecxt_outertuple = save;
867 }
868 
869 /*
870  * Compute the final value of one aggregate.
871  *
872  * This function handles only one grouping set (already set in
873  * aggstate->current_set).
874  *
875  * The finalfunction will be run, and the result delivered, in the
876  * output-tuple context; caller's CurrentMemoryContext does not matter.
877  *
878  * The finalfn uses the state as set in the transno.  That state might also
879  * be in use by another aggregate function, so it's important that we do
880  * nothing destructive here.
881  */
882 static void
883 finalize_aggregate(AggState *aggstate,
884 				   AggStatePerAgg peragg,
885 				   AggStatePerGroup pergroupstate,
886 				   Datum *resultVal, bool *resultIsNull)
887 {
888 	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
889 	bool		anynull = false;
890 	MemoryContext oldContext;
891 	int			i;
892 	ListCell   *lc;
893 	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
894 
895 	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
896 
897 	/*
898 	 * Evaluate any direct arguments.  We do this even if there's no finalfn
899 	 * (which is unlikely anyway), so that side-effects happen as expected.
900 	 * The direct arguments go into arg positions 1 and up, leaving position 0
901 	 * for the transition state value.
902 	 */
903 	i = 1;
904 	foreach(lc, peragg->aggdirectargs)
905 	{
906 		ExprState  *expr = (ExprState *) lfirst(lc);
907 
908 		fcinfo->args[i].value = ExecEvalExpr(expr,
909 											 aggstate->ss.ps.ps_ExprContext,
910 											 &fcinfo->args[i].isnull);
911 		anynull |= fcinfo->args[i].isnull;
912 		i++;
913 	}
914 
915 	/*
916 	 * Apply the agg's finalfn if one is provided, else return transValue.
917 	 */
918 	if (OidIsValid(peragg->finalfn_oid))
919 	{
920 		int			numFinalArgs = peragg->numFinalArgs;
921 
922 		/* set up aggstate->curperagg for AggGetAggref() */
923 		aggstate->curperagg = peragg;
924 
925 		InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
926 								 numFinalArgs,
927 								 pertrans->aggCollation,
928 								 (void *) aggstate, NULL);
929 
930 		/* Fill in the transition state value */
931 		fcinfo->args[0].value =
932 			MakeExpandedObjectReadOnly(pergroupstate->transValue,
933 									   pergroupstate->transValueIsNull,
934 									   pertrans->transtypeLen);
935 		fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
936 		anynull |= pergroupstate->transValueIsNull;
937 
938 		/* Fill any remaining argument positions with nulls */
939 		for (; i < numFinalArgs; i++)
940 		{
941 			fcinfo->args[i].value = (Datum) 0;
942 			fcinfo->args[i].isnull = true;
943 			anynull = true;
944 		}
945 
946 		if (fcinfo->flinfo->fn_strict && anynull)
947 		{
948 			/* don't call a strict function with NULL inputs */
949 			*resultVal = (Datum) 0;
950 			*resultIsNull = true;
951 		}
952 		else
953 		{
954 			*resultVal = FunctionCallInvoke(fcinfo);
955 			*resultIsNull = fcinfo->isnull;
956 		}
957 		aggstate->curperagg = NULL;
958 	}
959 	else
960 	{
961 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
962 		*resultVal = pergroupstate->transValue;
963 		*resultIsNull = pergroupstate->transValueIsNull;
964 	}
965 
966 	/*
967 	 * If result is pass-by-ref, make sure it is in the right context.
968 	 */
969 	if (!peragg->resulttypeByVal && !*resultIsNull &&
970 		!MemoryContextContains(CurrentMemoryContext,
971 							   DatumGetPointer(*resultVal)))
972 		*resultVal = datumCopy(*resultVal,
973 							   peragg->resulttypeByVal,
974 							   peragg->resulttypeLen);
975 
976 	MemoryContextSwitchTo(oldContext);
977 }
978 
979 /*
980  * Compute the output value of one partial aggregate.
981  *
982  * The serialization function will be run, and the result delivered, in the
983  * output-tuple context; caller's CurrentMemoryContext does not matter.
984  */
985 static void
986 finalize_partialaggregate(AggState *aggstate,
987 						  AggStatePerAgg peragg,
988 						  AggStatePerGroup pergroupstate,
989 						  Datum *resultVal, bool *resultIsNull)
990 {
991 	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
992 	MemoryContext oldContext;
993 
994 	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
995 
996 	/*
997 	 * serialfn_oid will be set if we must serialize the transvalue before
998 	 * returning it
999 	 */
1000 	if (OidIsValid(pertrans->serialfn_oid))
1001 	{
1002 		/* Don't call a strict serialization function with NULL input. */
1003 		if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1004 		{
1005 			*resultVal = (Datum) 0;
1006 			*resultIsNull = true;
1007 		}
1008 		else
1009 		{
1010 			FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1011 
1012 			fcinfo->args[0].value =
1013 				MakeExpandedObjectReadOnly(pergroupstate->transValue,
1014 										   pergroupstate->transValueIsNull,
1015 										   pertrans->transtypeLen);
1016 			fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1017 			fcinfo->isnull = false;
1018 
1019 			*resultVal = FunctionCallInvoke(fcinfo);
1020 			*resultIsNull = fcinfo->isnull;
1021 		}
1022 	}
1023 	else
1024 	{
1025 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1026 		*resultVal = pergroupstate->transValue;
1027 		*resultIsNull = pergroupstate->transValueIsNull;
1028 	}
1029 
1030 	/* If result is pass-by-ref, make sure it is in the right context. */
1031 	if (!peragg->resulttypeByVal && !*resultIsNull &&
1032 		!MemoryContextContains(CurrentMemoryContext,
1033 							   DatumGetPointer(*resultVal)))
1034 		*resultVal = datumCopy(*resultVal,
1035 							   peragg->resulttypeByVal,
1036 							   peragg->resulttypeLen);
1037 
1038 	MemoryContextSwitchTo(oldContext);
1039 }
1040 
1041 /*
1042  * Prepare to finalize and project based on the specified representative tuple
1043  * slot and grouping set.
1044  *
1045  * In the specified tuple slot, force to null all attributes that should be
1046  * read as null in the context of the current grouping set.  Also stash the
1047  * current group bitmap where GroupingExpr can get at it.
1048  *
1049  * This relies on three conditions:
1050  *
1051  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1052  * only reference it in evaluations, which will only access individual
1053  * attributes.
1054  *
1055  * 2) No system columns are going to need to be nulled. (If a system column is
1056  * referenced in a group clause, it is actually projected in the outer plan
1057  * tlist.)
1058  *
1059  * 3) Within a given phase, we never need to recover the value of an attribute
1060  * once it has been set to null.
1061  *
1062  * Poking into the slot this way is a bit ugly, but the consensus is that the
1063  * alternative was worse.
1064  */
1065 static void
1066 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1067 {
1068 	if (aggstate->phase->grouped_cols)
1069 	{
1070 		Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1071 
1072 		aggstate->grouped_cols = grouped_cols;
1073 
1074 		if (TTS_EMPTY(slot))
1075 		{
1076 			/*
1077 			 * Force all values to be NULL if working on an empty input tuple
1078 			 * (i.e. an empty grouping set for which no input rows were
1079 			 * supplied).
1080 			 */
1081 			ExecStoreAllNullTuple(slot);
1082 		}
1083 		else if (aggstate->all_grouped_cols)
1084 		{
1085 			ListCell   *lc;
1086 
1087 			/* all_grouped_cols is arranged in desc order */
1088 			slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1089 
1090 			foreach(lc, aggstate->all_grouped_cols)
1091 			{
1092 				int			attnum = lfirst_int(lc);
1093 
1094 				if (!bms_is_member(attnum, grouped_cols))
1095 					slot->tts_isnull[attnum - 1] = true;
1096 			}
1097 		}
1098 	}
1099 }
1100 
1101 /*
1102  * Compute the final value of all aggregates for one group.
1103  *
1104  * This function handles only one grouping set at a time, which the caller must
1105  * have selected.  It's also the caller's responsibility to adjust the supplied
1106  * pergroup parameter to point to the current set's transvalues.
1107  *
1108  * Results are stored in the output econtext aggvalues/aggnulls.
1109  */
1110 static void
1111 finalize_aggregates(AggState *aggstate,
1112 					AggStatePerAgg peraggs,
1113 					AggStatePerGroup pergroup)
1114 {
1115 	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1116 	Datum	   *aggvalues = econtext->ecxt_aggvalues;
1117 	bool	   *aggnulls = econtext->ecxt_aggnulls;
1118 	int			aggno;
1119 	int			transno;
1120 
1121 	/*
1122 	 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1123 	 * inputs and run the transition functions.
1124 	 */
1125 	for (transno = 0; transno < aggstate->numtrans; transno++)
1126 	{
1127 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1128 		AggStatePerGroup pergroupstate;
1129 
1130 		pergroupstate = &pergroup[transno];
1131 
1132 		if (pertrans->numSortCols > 0)
1133 		{
1134 			Assert(aggstate->aggstrategy != AGG_HASHED &&
1135 				   aggstate->aggstrategy != AGG_MIXED);
1136 
1137 			if (pertrans->numInputs == 1)
1138 				process_ordered_aggregate_single(aggstate,
1139 												 pertrans,
1140 												 pergroupstate);
1141 			else
1142 				process_ordered_aggregate_multi(aggstate,
1143 												pertrans,
1144 												pergroupstate);
1145 		}
1146 	}
1147 
1148 	/*
1149 	 * Run the final functions.
1150 	 */
1151 	for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1152 	{
1153 		AggStatePerAgg peragg = &peraggs[aggno];
1154 		int			transno = peragg->transno;
1155 		AggStatePerGroup pergroupstate;
1156 
1157 		pergroupstate = &pergroup[transno];
1158 
1159 		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1160 			finalize_partialaggregate(aggstate, peragg, pergroupstate,
1161 									  &aggvalues[aggno], &aggnulls[aggno]);
1162 		else
1163 			finalize_aggregate(aggstate, peragg, pergroupstate,
1164 							   &aggvalues[aggno], &aggnulls[aggno]);
1165 	}
1166 }
1167 
1168 /*
1169  * Project the result of a group (whose aggs have already been calculated by
1170  * finalize_aggregates). Returns the result slot, or NULL if no row is
1171  * projected (suppressed by qual).
1172  */
1173 static TupleTableSlot *
1174 project_aggregates(AggState *aggstate)
1175 {
1176 	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1177 
1178 	/*
1179 	 * Check the qual (HAVING clause); if the group does not match, ignore it.
1180 	 */
1181 	if (ExecQual(aggstate->ss.ps.qual, econtext))
1182 	{
1183 		/*
1184 		 * Form and return projection tuple using the aggregate results and
1185 		 * the representative input tuple.
1186 		 */
1187 		return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1188 	}
1189 	else
1190 		InstrCountFiltered1(aggstate, 1);
1191 
1192 	return NULL;
1193 }
1194 
1195 /*
1196  * find_unaggregated_cols
1197  *	  Construct a bitmapset of the column numbers of un-aggregated Vars
1198  *	  appearing in our targetlist and qual (HAVING clause)
1199  */
1200 static Bitmapset *
1201 find_unaggregated_cols(AggState *aggstate)
1202 {
1203 	Agg		   *node = (Agg *) aggstate->ss.ps.plan;
1204 	Bitmapset  *colnos;
1205 
1206 	colnos = NULL;
1207 	(void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1208 										 &colnos);
1209 	(void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1210 										 &colnos);
1211 	return colnos;
1212 }
1213 
1214 static bool
1215 find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1216 {
1217 	if (node == NULL)
1218 		return false;
1219 	if (IsA(node, Var))
1220 	{
1221 		Var		   *var = (Var *) node;
1222 
1223 		/* setrefs.c should have set the varno to OUTER_VAR */
1224 		Assert(var->varno == OUTER_VAR);
1225 		Assert(var->varlevelsup == 0);
1226 		*colnos = bms_add_member(*colnos, var->varattno);
1227 		return false;
1228 	}
1229 	if (IsA(node, Aggref) || IsA(node, GroupingFunc))
1230 	{
1231 		/* do not descend into aggregate exprs */
1232 		return false;
1233 	}
1234 	return expression_tree_walker(node, find_unaggregated_cols_walker,
1235 								  (void *) colnos);
1236 }
1237 
1238 /*
1239  * (Re-)initialize the hash table(s) to empty.
1240  *
1241  * To implement hashed aggregation, we need a hashtable that stores a
1242  * representative tuple and an array of AggStatePerGroup structs for each
1243  * distinct set of GROUP BY column values.  We compute the hash key from the
1244  * GROUP BY columns.  The per-group data is allocated in lookup_hash_entry(),
1245  * for each entry.
1246  *
1247  * We have a separate hashtable and associated perhash data structure for each
1248  * grouping set for which we're doing hashing.
1249  *
1250  * The contents of the hash tables always live in the hashcontext's per-tuple
1251  * memory context (there is only one of these for all tables together, since
1252  * they are all reset at the same time).
1253  */
1254 static void
1255 build_hash_table(AggState *aggstate)
1256 {
1257 	MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1258 	Size		additionalsize;
1259 	int			i;
1260 
1261 	Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1262 
1263 	additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1264 
1265 	for (i = 0; i < aggstate->num_hashes; ++i)
1266 	{
1267 		AggStatePerHash perhash = &aggstate->perhash[i];
1268 
1269 		Assert(perhash->aggnode->numGroups > 0);
1270 
1271 		if (perhash->hashtable)
1272 			ResetTupleHashTable(perhash->hashtable);
1273 		else
1274 			perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1275 														perhash->hashslot->tts_tupleDescriptor,
1276 														perhash->numCols,
1277 														perhash->hashGrpColIdxHash,
1278 														perhash->eqfuncoids,
1279 														perhash->hashfunctions,
1280 														perhash->aggnode->grpCollations,
1281 														perhash->aggnode->numGroups,
1282 														additionalsize,
1283 														aggstate->ss.ps.state->es_query_cxt,
1284 														aggstate->hashcontext->ecxt_per_tuple_memory,
1285 														tmpmem,
1286 														DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1287 	}
1288 }
1289 
1290 /*
1291  * Compute columns that actually need to be stored in hashtable entries.  The
1292  * incoming tuples from the child plan node will contain grouping columns,
1293  * other columns referenced in our targetlist and qual, columns used to
1294  * compute the aggregate functions, and perhaps just junk columns we don't use
1295  * at all.  Only columns of the first two types need to be stored in the
1296  * hashtable, and getting rid of the others can make the table entries
1297  * significantly smaller.  The hashtable only contains the relevant columns,
1298  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1299  * into the format of the normal input descriptor.
1300  *
1301  * Additional columns, beyond the ones we group by, come from two sources:
1302  * first, functionally dependent columns that we don't need to group by
1303  * themselves; and second, ctids for row-marks.
1304  *
1305  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1306  * then build an array of the columns included in the hashtable. We might
1307  * still have duplicates if the passed-in grpColIdx has them, which can happen
1308  * in edge cases from semijoins/distinct; these can't always be removed,
1309  * because it's not certain that the duplicate cols will be using the same
1310  * hash function.
1311  *
1312  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1313  * the per-query context (unlike the hash table itself).
1314  */
1315 static void
1316 find_hash_columns(AggState *aggstate)
1317 {
1318 	Bitmapset  *base_colnos;
1319 	List	   *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1320 	int			numHashes = aggstate->num_hashes;
1321 	EState	   *estate = aggstate->ss.ps.state;
1322 	int			j;
1323 
1324 	/* Find Vars that will be needed in tlist and qual */
1325 	base_colnos = find_unaggregated_cols(aggstate);
1326 
1327 	for (j = 0; j < numHashes; ++j)
1328 	{
1329 		AggStatePerHash perhash = &aggstate->perhash[j];
1330 		Bitmapset  *colnos = bms_copy(base_colnos);
1331 		AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1332 		List	   *hashTlist = NIL;
1333 		TupleDesc	hashDesc;
1334 		int			maxCols;
1335 		int			i;
1336 
1337 		perhash->largestGrpColIdx = 0;
1338 
1339 		/*
1340 		 * If we're doing grouping sets, then some Vars might be referenced in
1341 		 * tlist/qual for the benefit of other grouping sets, but not needed
1342 		 * when hashing; i.e. prepare_projection_slot will null them out, so
1343 		 * there'd be no point storing them.  Use prepare_projection_slot's
1344 		 * logic to determine which.
1345 		 */
1346 		if (aggstate->phases[0].grouped_cols)
1347 		{
1348 			Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
1349 			ListCell   *lc;
1350 
1351 			foreach(lc, aggstate->all_grouped_cols)
1352 			{
1353 				int			attnum = lfirst_int(lc);
1354 
1355 				if (!bms_is_member(attnum, grouped_cols))
1356 					colnos = bms_del_member(colnos, attnum);
1357 			}
1358 		}
1359 
1360 		/*
1361 		 * Compute maximum number of input columns accounting for possible
1362 		 * duplications in the grpColIdx array, which can happen in some edge
1363 		 * cases where HashAggregate was generated as part of a semijoin or a
1364 		 * DISTINCT.
1365 		 */
1366 		maxCols = bms_num_members(colnos) + perhash->numCols;
1367 
1368 		perhash->hashGrpColIdxInput =
1369 			palloc(maxCols * sizeof(AttrNumber));
1370 		perhash->hashGrpColIdxHash =
1371 			palloc(perhash->numCols * sizeof(AttrNumber));
1372 
1373 		/* Add all the grouping columns to colnos */
1374 		for (i = 0; i < perhash->numCols; i++)
1375 			colnos = bms_add_member(colnos, grpColIdx[i]);
1376 
1377 		/*
1378 		 * First build mapping for columns directly hashed. These are the
1379 		 * first, because they'll be accessed when computing hash values and
1380 		 * comparing tuples for exact matches. We also build simple mapping
1381 		 * for execGrouping, so it knows where to find the to-be-hashed /
1382 		 * compared columns in the input.
1383 		 */
1384 		for (i = 0; i < perhash->numCols; i++)
1385 		{
1386 			perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1387 			perhash->hashGrpColIdxHash[i] = i + 1;
1388 			perhash->numhashGrpCols++;
1389 			/* delete already mapped columns */
1390 			bms_del_member(colnos, grpColIdx[i]);
1391 		}
1392 
1393 		/* and add the remaining columns */
1394 		while ((i = bms_first_member(colnos)) >= 0)
1395 		{
1396 			perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1397 			perhash->numhashGrpCols++;
1398 		}
1399 
1400 		/* and build a tuple descriptor for the hashtable */
1401 		for (i = 0; i < perhash->numhashGrpCols; i++)
1402 		{
1403 			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
1404 
1405 			hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1406 			perhash->largestGrpColIdx =
1407 				Max(varNumber + 1, perhash->largestGrpColIdx);
1408 		}
1409 
1410 		hashDesc = ExecTypeFromTL(hashTlist);
1411 
1412 		execTuplesHashPrepare(perhash->numCols,
1413 							  perhash->aggnode->grpOperators,
1414 							  &perhash->eqfuncoids,
1415 							  &perhash->hashfunctions);
1416 		perhash->hashslot =
1417 			ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1418 							   &TTSOpsMinimalTuple);
1419 
1420 		list_free(hashTlist);
1421 		bms_free(colnos);
1422 	}
1423 
1424 	bms_free(base_colnos);
1425 }
1426 
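/*
 * Illustrative sketch (hypothetical column numbers, not taken from any real
 * plan): suppose a hashed grouping set groups on outer columns 3 and 1 while
 * the tlist/qual also needs column 5.  Then hashGrpColIdxInput ends up as
 * {3, 1, 5} (grouping columns first, in grpColIdx order, then the leftover
 * needed column), hashGrpColIdxHash ends up as {1, 2} (where those grouping
 * columns live inside the hashtable tuples), and largestGrpColIdx becomes 5,
 * the highest input column that must be deformed.
 */
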
1427 /*
1428  * Estimate per-hash-table-entry overhead for the planner.
1429  *
1430  * Note that the estimate does not include space for pass-by-reference
1431  * transition data values, nor for the representative tuple of each group.
1432  * Nor does this account for the target fill-factor and growth policy of the
1433  * hash table.
1434  */
1435 Size
1436 hash_agg_entry_size(int numAggs)
1437 {
1438 	Size		entrysize;
1439 
1440 	/* This must match build_hash_table */
1441 	entrysize = sizeof(TupleHashEntryData) +
1442 		numAggs * sizeof(AggStatePerGroupData);
1443 	entrysize = MAXALIGN(entrysize);
1444 
1445 	return entrysize;
1446 }
1447 
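/*
 * For illustration only: for a hypothetical plan with two aggregates, the
 * value returned above is
 *		MAXALIGN(sizeof(TupleHashEntryData) + 2 * sizeof(AggStatePerGroupData))
 * per group, deliberately ignoring pass-by-ref transition values, the
 * representative tuple, and the table's fill factor, as noted above.
 */
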
1448 /*
1449  * Find or create a hashtable entry for the tuple group containing the current
1450  * tuple (already set in tmpcontext's outertuple slot), in the current grouping
1451  * set (which the caller must have selected - note that initialize_aggregate
1452  * depends on this).
1453  *
1454  * When called, CurrentMemoryContext should be the per-query context.
1455  */
1456 static TupleHashEntryData *
1457 lookup_hash_entry(AggState *aggstate)
1458 {
1459 	TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
1460 	AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1461 	TupleTableSlot *hashslot = perhash->hashslot;
1462 	TupleHashEntryData *entry;
1463 	bool		isnew;
1464 	int			i;
1465 
1466 	/* transfer just the needed columns into hashslot */
1467 	slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1468 	ExecClearTuple(hashslot);
1469 
1470 	for (i = 0; i < perhash->numhashGrpCols; i++)
1471 	{
1472 		int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
1473 
1474 		hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1475 		hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1476 	}
1477 	ExecStoreVirtualTuple(hashslot);
1478 
1479 	/* find or create the hashtable entry using the filtered tuple */
1480 	entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
1481 
1482 	if (isnew)
1483 	{
1484 		AggStatePerGroup pergroup;
1485 		int			transno;
1486 
1487 		pergroup = (AggStatePerGroup)
1488 			MemoryContextAlloc(perhash->hashtable->tablecxt,
1489 							   sizeof(AggStatePerGroupData) * aggstate->numtrans);
1490 		entry->additional = pergroup;
1491 
1492 		/*
1493 		 * Initialize aggregates for the new tuple group; lookup_hash_entries()
1494 		 * has already selected the relevant grouping set.
1495 		 */
1496 		for (transno = 0; transno < aggstate->numtrans; transno++)
1497 		{
1498 			AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1499 			AggStatePerGroup pergroupstate = &pergroup[transno];
1500 
1501 			initialize_aggregate(aggstate, pertrans, pergroupstate);
1502 		}
1503 	}
1504 
1505 	return entry;
1506 }
1507 
1508 /*
1509  * Look up hash entries for the current tuple in all hashed grouping sets,
1510  * returning an array of pergroup pointers suitable for advance_aggregates.
1511  *
1512  * Be aware that lookup_hash_entry can reset the tmpcontext.
1513  */
1514 static void
1515 lookup_hash_entries(AggState *aggstate)
1516 {
1517 	int			numHashes = aggstate->num_hashes;
1518 	AggStatePerGroup *pergroup = aggstate->hash_pergroup;
1519 	int			setno;
1520 
1521 	for (setno = 0; setno < numHashes; setno++)
1522 	{
1523 		select_current_set(aggstate, setno, true);
1524 		pergroup[setno] = lookup_hash_entry(aggstate)->additional;
1525 	}
1526 }
1527 
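/*
 * For instance, with two hashed grouping sets this leaves hash_pergroup[0]
 * and hash_pergroup[1] pointing at the current tuple's per-group transition
 * states in the first and second hash table respectively, ready for use by
 * advance_aggregates.
 */
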
1528 /*
1529  * ExecAgg -
1530  *
1531  *	  ExecAgg receives tuples from its outer subplan and aggregates over
1532  *	  the appropriate attribute for each aggregate function use (Aggref
1533  *	  node) appearing in the targetlist or qual of the node.  The number
1534  *	  of tuples to aggregate over depends on whether grouped or plain
1535  *	  aggregation is selected.  In grouped aggregation, we produce a result
1536  *	  row for each group; in plain aggregation there's a single result row
1537  *	  for the whole query.  In either case, the value of each aggregate is
1538  *	  stored in the expression context to be used when ExecProject evaluates
1539  *	  the result tuple.
1540  */
1541 static TupleTableSlot *
1542 ExecAgg(PlanState *pstate)
1543 {
1544 	AggState   *node = castNode(AggState, pstate);
1545 	TupleTableSlot *result = NULL;
1546 
1547 	CHECK_FOR_INTERRUPTS();
1548 
1549 	if (!node->agg_done)
1550 	{
1551 		/* Dispatch based on strategy */
1552 		switch (node->phase->aggstrategy)
1553 		{
1554 			case AGG_HASHED:
1555 				if (!node->table_filled)
1556 					agg_fill_hash_table(node);
1557 				/* FALLTHROUGH */
1558 			case AGG_MIXED:
1559 				result = agg_retrieve_hash_table(node);
1560 				break;
1561 			case AGG_PLAIN:
1562 			case AGG_SORTED:
1563 				result = agg_retrieve_direct(node);
1564 				break;
1565 		}
1566 
1567 		if (!TupIsNull(result))
1568 			return result;
1569 	}
1570 
1571 	return NULL;
1572 }
1573 
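/*
 * As a rough illustration of the dispatch above (the planner makes the actual
 * strategy choice): an ungrouped query such as "SELECT count(*) FROM tab"
 * runs as AGG_PLAIN; a GROUP BY over sorted input runs as AGG_SORTED; a GROUP
 * BY evaluated with a hash table runs as AGG_HASHED, filling the table on the
 * first call and then returning groups from it; and GROUPING SETS plans that
 * need both sorting and hashing run as AGG_MIXED.
 */
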
1574 /*
1575  * ExecAgg for non-hashed case
1576  */
1577 static TupleTableSlot *
1578 agg_retrieve_direct(AggState *aggstate)
1579 {
1580 	Agg		   *node = aggstate->phase->aggnode;
1581 	ExprContext *econtext;
1582 	ExprContext *tmpcontext;
1583 	AggStatePerAgg peragg;
1584 	AggStatePerGroup *pergroups;
1585 	TupleTableSlot *outerslot;
1586 	TupleTableSlot *firstSlot;
1587 	TupleTableSlot *result;
1588 	bool		hasGroupingSets = aggstate->phase->numsets > 0;
1589 	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
1590 	int			currentSet;
1591 	int			nextSetSize;
1592 	int			numReset;
1593 	int			i;
1594 
1595 	/*
1596 	 * get state info from node
1597 	 *
1598 	 * econtext is the per-output-tuple expression context
1599 	 *
1600 	 * tmpcontext is the per-input-tuple expression context
1601 	 */
1602 	econtext = aggstate->ss.ps.ps_ExprContext;
1603 	tmpcontext = aggstate->tmpcontext;
1604 
1605 	peragg = aggstate->peragg;
1606 	pergroups = aggstate->pergroups;
1607 	firstSlot = aggstate->ss.ss_ScanTupleSlot;
1608 
1609 	/*
1610 	 * We loop retrieving groups until we find one matching
1611 	 * aggstate->ss.ps.qual
1612 	 *
1613 	 * For grouping sets, we have the invariant that aggstate->projected_set
1614 	 * is either -1 (initial call) or the index (starting from 0) in
1615 	 * gset_lengths for the group we just completed (either by projecting a
1616 	 * row or by discarding it in the qual).
1617 	 */
1618 	while (!aggstate->agg_done)
1619 	{
1620 		/*
1621 		 * Clear the per-output-tuple context for each group, as well as
1622 		 * aggcontext (which contains any pass-by-ref transvalues of the old
1623 		 * group).  Some aggregate functions store working state in child
1624 		 * contexts; those now get reset automatically without us needing to
1625 		 * do anything special.
1626 		 *
1627 		 * We use ReScanExprContext not just ResetExprContext because we want
1628 		 * any registered shutdown callbacks to be called.  That allows
1629 		 * aggregate functions to ensure they've cleaned up any non-memory
1630 		 * resources.
1631 		 */
1632 		ReScanExprContext(econtext);
1633 
1634 		/*
1635 		 * Determine how many grouping sets need to be reset at this boundary.
1636 		 */
1637 		if (aggstate->projected_set >= 0 &&
1638 			aggstate->projected_set < numGroupingSets)
1639 			numReset = aggstate->projected_set + 1;
1640 		else
1641 			numReset = numGroupingSets;
1642 
1643 		/*
1644 		 * numReset can change on a phase boundary, but that's OK; we want to
1645 		 * reset the contexts used in _this_ phase, and later, after possibly
1646 		 * changing phase, initialize the right number of aggregates for the
1647 		 * _new_ phase.
1648 		 */
1649 
1650 		for (i = 0; i < numReset; i++)
1651 		{
1652 			ReScanExprContext(aggstate->aggcontexts[i]);
1653 		}
1654 
1655 		/*
1656 		 * Check if input is complete and there are no more groups to project
1657 		 * in this phase; move to next phase or mark as done.
1658 		 */
1659 		if (aggstate->input_done == true &&
1660 			aggstate->projected_set >= (numGroupingSets - 1))
1661 		{
1662 			if (aggstate->current_phase < aggstate->numphases - 1)
1663 			{
1664 				initialize_phase(aggstate, aggstate->current_phase + 1);
1665 				aggstate->input_done = false;
1666 				aggstate->projected_set = -1;
1667 				numGroupingSets = Max(aggstate->phase->numsets, 1);
1668 				node = aggstate->phase->aggnode;
1669 				numReset = numGroupingSets;
1670 			}
1671 			else if (aggstate->aggstrategy == AGG_MIXED)
1672 			{
1673 				/*
1674 				 * Mixed mode; we've output all the grouped stuff and have
1675 				 * full hashtables, so switch to outputting those.
1676 				 */
1677 				initialize_phase(aggstate, 0);
1678 				aggstate->table_filled = true;
1679 				ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1680 									   &aggstate->perhash[0].hashiter);
1681 				select_current_set(aggstate, 0, true);
1682 				return agg_retrieve_hash_table(aggstate);
1683 			}
1684 			else
1685 			{
1686 				aggstate->agg_done = true;
1687 				break;
1688 			}
1689 		}
1690 
1691 		/*
1692 		 * Get the number of columns in the next grouping set after the last
1693 		 * projected one (if any). This is the number of columns to compare to
1694 		 * see if we reached the boundary of that set too.
1695 		 */
1696 		if (aggstate->projected_set >= 0 &&
1697 			aggstate->projected_set < (numGroupingSets - 1))
1698 			nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
1699 		else
1700 			nextSetSize = 0;
1701 
1702 		/*----------
1703 		 * If a subgroup for the current grouping set is present, project it.
1704 		 *
1705 		 * We have a new group if:
1706 		 *	- we're out of input but haven't projected all grouping sets
1707 		 *	  (checked above)
1708 		 * OR
1709 		 *	  - we already projected a row that wasn't from the last grouping
1710 		 *		set
1711 		 *	  AND
1712 		 *	  - the next grouping set has at least one grouping column (since
1713 		 *		empty grouping sets project only once input is exhausted)
1714 		 *	  AND
1715 		 *	  - the previous and pending rows differ on the grouping columns
1716 		 *		of the next grouping set
1717 		 *----------
1718 		 */
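		/*
		 * Hypothetical example of the rule above: with ROLLUP(a,b), i.e.
		 * grouping sets (a,b), (a) and (), gset_lengths is {2, 1, 0}.  After
		 * projecting the (a,b) row of a finished group (projected_set = 0),
		 * nextSetSize is 1; if the pending row differs from the previous one
		 * on column a, the (a) subgroup has ended too and is projected next.
		 * The empty set () projects only once the input is exhausted.
		 */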
1719 		tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
1720 		if (aggstate->input_done ||
1721 			(node->aggstrategy != AGG_PLAIN &&
1722 			 aggstate->projected_set != -1 &&
1723 			 aggstate->projected_set < (numGroupingSets - 1) &&
1724 			 nextSetSize > 0 &&
1725 			 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
1726 							   tmpcontext)))
1727 		{
1728 			aggstate->projected_set += 1;
1729 
1730 			Assert(aggstate->projected_set < numGroupingSets);
1731 			Assert(nextSetSize > 0 || aggstate->input_done);
1732 		}
1733 		else
1734 		{
1735 			/*
1736 			 * We no longer care what group we just projected, the next
1737 			 * projection will always be the first (or only) grouping set
1738 			 * (unless the input proves to be empty).
1739 			 */
1740 			aggstate->projected_set = 0;
1741 
1742 			/*
1743 			 * If we don't already have the first tuple of the new group,
1744 			 * fetch it from the outer plan.
1745 			 */
1746 			if (aggstate->grp_firstTuple == NULL)
1747 			{
1748 				outerslot = fetch_input_tuple(aggstate);
1749 				if (!TupIsNull(outerslot))
1750 				{
1751 					/*
1752 					 * Make a copy of the first input tuple; we will use this
1753 					 * for comparisons (in group mode) and for projection.
1754 					 */
1755 					aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
1756 				}
1757 				else
1758 				{
1759 					/* outer plan produced no tuples at all */
1760 					if (hasGroupingSets)
1761 					{
1762 						/*
1763 						 * If there was no input at all, we need to project
1764 						 * rows only if there are grouping sets of size 0.
1765 						 * Note that this implies that there can't be any
1766 						 * references to ungrouped Vars, which would otherwise
1767 						 * cause issues with the empty output slot.
1768 						 *
1769 						 * XXX: This is no longer true, we currently deal with
1770 						 * this in finalize_aggregates().
1771 						 */
1772 						aggstate->input_done = true;
1773 
1774 						while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
1775 						{
1776 							aggstate->projected_set += 1;
1777 							if (aggstate->projected_set >= numGroupingSets)
1778 							{
1779 								/*
1780 								 * We can't set agg_done here because we might
1781 								 * have more phases to do, even though the
1782 								 * input is empty. So we need to restart the
1783 								 * whole outer loop.
1784 								 */
1785 								break;
1786 							}
1787 						}
1788 
1789 						if (aggstate->projected_set >= numGroupingSets)
1790 							continue;
1791 					}
1792 					else
1793 					{
1794 						aggstate->agg_done = true;
1795 						/* If we are grouping, we likewise produce no tuples */
1796 						if (node->aggstrategy != AGG_PLAIN)
1797 							return NULL;
1798 					}
1799 				}
1800 			}
1801 
1802 			/*
1803 			 * Initialize working state for a new input tuple group.
1804 			 */
1805 			initialize_aggregates(aggstate, pergroups, numReset);
1806 
1807 			if (aggstate->grp_firstTuple != NULL)
1808 			{
1809 				/*
1810 				 * Store the copied first input tuple in the tuple table slot
1811 				 * reserved for it.  The tuple will be deleted when it is
1812 				 * cleared from the slot.
1813 				 */
1814 				ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
1815 										firstSlot, true);
1816 				aggstate->grp_firstTuple = NULL;	/* don't keep two pointers */
1817 
1818 				/* set up for first advance_aggregates call */
1819 				tmpcontext->ecxt_outertuple = firstSlot;
1820 
1821 				/*
1822 				 * Process each outer-plan tuple, and then fetch the next one,
1823 				 * until we exhaust the outer plan or cross a group boundary.
1824 				 */
1825 				for (;;)
1826 				{
1827 					/*
1828 					 * During phase 1 only of a mixed agg, we need to update
1829 					 * hashtables as well in advance_aggregates.
1830 					 */
1831 					if (aggstate->aggstrategy == AGG_MIXED &&
1832 						aggstate->current_phase == 1)
1833 					{
1834 						lookup_hash_entries(aggstate);
1835 					}
1836 
1837 					/* Advance the aggregates (or combine functions) */
1838 					advance_aggregates(aggstate);
1839 
1840 					/* Reset per-input-tuple context after each tuple */
1841 					ResetExprContext(tmpcontext);
1842 
1843 					outerslot = fetch_input_tuple(aggstate);
1844 					if (TupIsNull(outerslot))
1845 					{
1846 						/* no more outer-plan tuples available */
1847 						if (hasGroupingSets)
1848 						{
1849 							aggstate->input_done = true;
1850 							break;
1851 						}
1852 						else
1853 						{
1854 							aggstate->agg_done = true;
1855 							break;
1856 						}
1857 					}
1858 					/* set up for next advance_aggregates call */
1859 					tmpcontext->ecxt_outertuple = outerslot;
1860 
1861 					/*
1862 					 * If we are grouping, check whether we've crossed a group
1863 					 * boundary.
1864 					 */
1865 					if (node->aggstrategy != AGG_PLAIN)
1866 					{
1867 						tmpcontext->ecxt_innertuple = firstSlot;
1868 						if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
1869 									  tmpcontext))
1870 						{
1871 							aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
1872 							break;
1873 						}
1874 					}
1875 				}
1876 			}
1877 
1878 			/*
1879 			 * Use the representative input tuple for any references to
1880 			 * non-aggregated input columns in aggregate direct args, the node
1881 			 * qual, and the tlist.  (If we are not grouping, and there are no
1882 			 * input rows at all, we will come here with an empty firstSlot
1883 			 * ... but if not grouping, there can't be any references to
1884 			 * non-aggregated input columns, so no problem.)
1885 			 */
1886 			econtext->ecxt_outertuple = firstSlot;
1887 		}
1888 
1889 		Assert(aggstate->projected_set >= 0);
1890 
1891 		currentSet = aggstate->projected_set;
1892 
1893 		prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
1894 
1895 		select_current_set(aggstate, currentSet, false);
1896 
1897 		finalize_aggregates(aggstate,
1898 							peragg,
1899 							pergroups[currentSet]);
1900 
1901 		/*
1902 		 * If there's no row to project right now, we must continue rather
1903 		 * than returning a null since there might be more groups.
1904 		 */
1905 		result = project_aggregates(aggstate);
1906 		if (result)
1907 			return result;
1908 	}
1909 
1910 	/* No more groups */
1911 	return NULL;
1912 }
1913 
1914 /*
1915  * ExecAgg for hashed case: read input and build hash table
1916  */
1917 static void
1918 agg_fill_hash_table(AggState *aggstate)
1919 {
1920 	TupleTableSlot *outerslot;
1921 	ExprContext *tmpcontext = aggstate->tmpcontext;
1922 
1923 	/*
1924 	 * Process each outer-plan tuple, and then fetch the next one, until we
1925 	 * exhaust the outer plan.
1926 	 */
1927 	for (;;)
1928 	{
1929 		outerslot = fetch_input_tuple(aggstate);
1930 		if (TupIsNull(outerslot))
1931 			break;
1932 
1933 		/* set up for lookup_hash_entries and advance_aggregates */
1934 		tmpcontext->ecxt_outertuple = outerslot;
1935 
1936 		/* Find or build hashtable entries */
1937 		lookup_hash_entries(aggstate);
1938 
1939 		/* Advance the aggregates (or combine functions) */
1940 		advance_aggregates(aggstate);
1941 
1942 		/*
1943 		 * Reset per-input-tuple context after each tuple, but note that the
1944 		 * hash lookups do this too
1945 		 */
1946 		ResetExprContext(aggstate->tmpcontext);
1947 	}
1948 
1949 	aggstate->table_filled = true;
1950 	/* Initialize to walk the first hash table */
1951 	select_current_set(aggstate, 0, true);
1952 	ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1953 						   &aggstate->perhash[0].hashiter);
1954 }
1955 
1956 /*
1957  * ExecAgg for hashed case: retrieving groups from hash table
1958  */
1959 static TupleTableSlot *
1960 agg_retrieve_hash_table(AggState *aggstate)
1961 {
1962 	ExprContext *econtext;
1963 	AggStatePerAgg peragg;
1964 	AggStatePerGroup pergroup;
1965 	TupleHashEntryData *entry;
1966 	TupleTableSlot *firstSlot;
1967 	TupleTableSlot *result;
1968 	AggStatePerHash perhash;
1969 
1970 	/*
1971 	 * get state info from node.
1972 	 *
1973 	 * econtext is the per-output-tuple expression context.
1974 	 */
1975 	econtext = aggstate->ss.ps.ps_ExprContext;
1976 	peragg = aggstate->peragg;
1977 	firstSlot = aggstate->ss.ss_ScanTupleSlot;
1978 
1979 	/*
1980 	 * Note that perhash (and therefore anything accessed through it) can
1981 	 * change inside the loop, as we change between grouping sets.
1982 	 */
1983 	perhash = &aggstate->perhash[aggstate->current_set];
1984 
1985 	/*
1986 	 * We loop retrieving groups until we find one satisfying
1987 	 * aggstate->ss.ps.qual
1988 	 */
1989 	while (!aggstate->agg_done)
1990 	{
1991 		TupleTableSlot *hashslot = perhash->hashslot;
1992 		int			i;
1993 
1994 		CHECK_FOR_INTERRUPTS();
1995 
1996 		/*
1997 		 * Find the next entry in the hash table
1998 		 */
1999 		entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2000 		if (entry == NULL)
2001 		{
2002 			int			nextset = aggstate->current_set + 1;
2003 
2004 			if (nextset < aggstate->num_hashes)
2005 			{
2006 				/*
2007 				 * Switch to next grouping set, reinitialize, and restart the
2008 				 * loop.
2009 				 */
2010 				select_current_set(aggstate, nextset, true);
2011 
2012 				perhash = &aggstate->perhash[aggstate->current_set];
2013 
2014 				ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2015 
2016 				continue;
2017 			}
2018 			else
2019 			{
2020 				/* No more hashtables, so done */
2021 				aggstate->agg_done = true;
2022 				return NULL;
2023 			}
2024 		}
2025 
2026 		/*
2027 		 * Clear the per-output-tuple context for each group
2028 		 *
2029 		 * We intentionally don't use ReScanExprContext here; if any aggs have
2030 		 * registered shutdown callbacks, they mustn't be called yet, since we
2031 		 * might not be done with that agg.
2032 		 */
2033 		ResetExprContext(econtext);
2034 
2035 		/*
2036 		 * Transform representative tuple back into one with the right
2037 		 * columns.
2038 		 */
2039 		ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2040 		slot_getallattrs(hashslot);
2041 
2042 		ExecClearTuple(firstSlot);
2043 		memset(firstSlot->tts_isnull, true,
2044 			   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2045 
2046 		for (i = 0; i < perhash->numhashGrpCols; i++)
2047 		{
2048 			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
2049 
2050 			firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2051 			firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2052 		}
2053 		ExecStoreVirtualTuple(firstSlot);
2054 
2055 		pergroup = (AggStatePerGroup) entry->additional;
2056 
2057 		/*
2058 		 * Use the representative input tuple for any references to
2059 		 * non-aggregated input columns in the qual and tlist.
2060 		 */
2061 		econtext->ecxt_outertuple = firstSlot;
2062 
2063 		prepare_projection_slot(aggstate,
2064 								econtext->ecxt_outertuple,
2065 								aggstate->current_set);
2066 
2067 		finalize_aggregates(aggstate, peragg, pergroup);
2068 
2069 		result = project_aggregates(aggstate);
2070 		if (result)
2071 			return result;
2072 	}
2073 
2074 	/* No more groups */
2075 	return NULL;
2076 }
2077 
2078 /* -----------------
2079  * ExecInitAgg
2080  *
2081  *	Creates the run-time information for the agg node produced by the
2082  *	planner and initializes its outer subtree.
2083  *
2084  * -----------------
2085  */
2086 AggState *
2087 ExecInitAgg(Agg *node, EState *estate, int eflags)
2088 {
2089 	AggState   *aggstate;
2090 	AggStatePerAgg peraggs;
2091 	AggStatePerTrans pertransstates;
2092 	AggStatePerGroup *pergroups;
2093 	Plan	   *outerPlan;
2094 	ExprContext *econtext;
2095 	TupleDesc	scanDesc;
2096 	int			numaggs,
2097 				transno,
2098 				aggno;
2099 	int			phase;
2100 	int			phaseidx;
2101 	ListCell   *l;
2102 	Bitmapset  *all_grouped_cols = NULL;
2103 	int			numGroupingSets = 1;
2104 	int			numPhases;
2105 	int			numHashes;
2106 	int			i = 0;
2107 	int			j = 0;
2108 	bool		use_hashing = (node->aggstrategy == AGG_HASHED ||
2109 							   node->aggstrategy == AGG_MIXED);
2110 
2111 	/* check for unsupported flags */
2112 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2113 
2114 	/*
2115 	 * create state structure
2116 	 */
2117 	aggstate = makeNode(AggState);
2118 	aggstate->ss.ps.plan = (Plan *) node;
2119 	aggstate->ss.ps.state = estate;
2120 	aggstate->ss.ps.ExecProcNode = ExecAgg;
2121 
2122 	aggstate->aggs = NIL;
2123 	aggstate->numaggs = 0;
2124 	aggstate->numtrans = 0;
2125 	aggstate->aggstrategy = node->aggstrategy;
2126 	aggstate->aggsplit = node->aggsplit;
2127 	aggstate->maxsets = 0;
2128 	aggstate->projected_set = -1;
2129 	aggstate->current_set = 0;
2130 	aggstate->peragg = NULL;
2131 	aggstate->pertrans = NULL;
2132 	aggstate->curperagg = NULL;
2133 	aggstate->curpertrans = NULL;
2134 	aggstate->input_done = false;
2135 	aggstate->agg_done = false;
2136 	aggstate->pergroups = NULL;
2137 	aggstate->grp_firstTuple = NULL;
2138 	aggstate->sort_in = NULL;
2139 	aggstate->sort_out = NULL;
2140 
2141 	/*
2142 	 * phases[0] always exists, but is dummy in sorted/plain mode
2143 	 */
2144 	numPhases = (use_hashing ? 1 : 2);
2145 	numHashes = (use_hashing ? 1 : 0);
2146 
2147 	/*
2148 	 * Calculate the maximum number of grouping sets in any phase; this
2149 	 * determines the size of some allocations.  Also calculate the number of
2150 	 * phases, since all hashed/mixed nodes contribute to only a single phase.
2151 	 */
2152 	if (node->groupingSets)
2153 	{
2154 		numGroupingSets = list_length(node->groupingSets);
2155 
2156 		foreach(l, node->chain)
2157 		{
2158 			Agg		   *agg = lfirst(l);
2159 
2160 			numGroupingSets = Max(numGroupingSets,
2161 								  list_length(agg->groupingSets));
2162 
2163 			/*
2164 			 * additional AGG_HASHED aggs become part of phase 0, but all
2165 			 * others add an extra phase.
2166 			 */
2167 			if (agg->aggstrategy != AGG_HASHED)
2168 				++numPhases;
2169 			else
2170 				++numHashes;
2171 		}
2172 	}
2173 
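	/*
	 * For example (hypothetical plan): a MIXED node whose chain holds two
	 * AGG_SORTED nodes and one AGG_HASHED node ends up with numPhases = 1 + 2
	 * = 3 (the shared hash phase 0 plus two sort phases) and numHashes = 1 +
	 * 1 = 2.
	 */
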
2174 	aggstate->maxsets = numGroupingSets;
2175 	aggstate->numphases = numPhases;
2176 
2177 	aggstate->aggcontexts = (ExprContext **)
2178 		palloc0(sizeof(ExprContext *) * numGroupingSets);
2179 
2180 	/*
2181 	 * Create expression contexts.  We need three or more, one for
2182 	 * per-input-tuple processing, one for per-output-tuple processing, one
2183 	 * for all the hashtables, and one for each grouping set.  The per-tuple
2184 	 * memory context of the per-grouping-set ExprContexts (aggcontexts)
2185 	 * replaces the standalone memory context formerly used to hold transition
2186 	 * values.  We cheat a little by using ExecAssignExprContext() to build
2187 	 * all of them.
2188 	 *
2189 	 * NOTE: the details of what is stored in aggcontexts and what is stored
2190 	 * in the regular per-query memory context are driven by a simple
2191 	 * decision: we want to reset the aggcontext at group boundaries (if not
2192 	 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
2193 	 */
2194 	ExecAssignExprContext(estate, &aggstate->ss.ps);
2195 	aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2196 
2197 	for (i = 0; i < numGroupingSets; ++i)
2198 	{
2199 		ExecAssignExprContext(estate, &aggstate->ss.ps);
2200 		aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2201 	}
2202 
2203 	if (use_hashing)
2204 	{
2205 		ExecAssignExprContext(estate, &aggstate->ss.ps);
2206 		aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2207 	}
2208 
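	/*
	 * And finally one more context for per-output-tuple processing; this last
	 * call leaves it in ss.ps.ps_ExprContext, where the rest of the code
	 * expects to find it.
	 */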
2209 	ExecAssignExprContext(estate, &aggstate->ss.ps);
2210 
2211 	/*
2212 	 * Initialize child nodes.
2213 	 *
2214 	 * If we are doing a hashed aggregation then the child plan does not need
2215 	 * to handle REWIND efficiently; see ExecReScanAgg.
2216 	 */
2217 	if (node->aggstrategy == AGG_HASHED)
2218 		eflags &= ~EXEC_FLAG_REWIND;
2219 	outerPlan = outerPlan(node);
2220 	outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2221 
2222 	/*
2223 	 * initialize source tuple type.
2224 	 */
2225 	aggstate->ss.ps.outerops =
2226 		ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
2227 							 &aggstate->ss.ps.outeropsfixed);
2228 	aggstate->ss.ps.outeropsset = true;
2229 
2230 	ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
2231 									aggstate->ss.ps.outerops);
2232 	scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2233 
2234 	/*
2235 	 * If there are more than two phases (including a potential dummy phase
2236 	 * 0), input will be resorted using tuplesort. Need a slot for that.
2237 	 */
2238 	if (numPhases > 2)
2239 	{
2240 		aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2241 													 &TTSOpsMinimalTuple);
2242 
2243 		/*
2244 		 * The output of the tuplesort, and the output from the outer child
2245 		 * might not use the same type of slot. In most cases the child will
2246 		 * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
2247 		 * input can also be presorted due to an index, in which case it could
2248 		 * be a different type of slot.
2249 		 *
2250 		 * XXX: For efficiency it would be good to instead/additionally
2251 		 * generate expressions with corresponding settings of outerops* for
2252 		 * the individual phases - deforming is often a bottleneck for
2253 		 * aggregations with lots of rows per group. If there are multiple
2254 		 * sorts, we know that all but the first use TTSOpsMinimalTuple (via
2255 		 * the nodeAgg.c internal tuplesort).
2256 		 */
2257 		if (aggstate->ss.ps.outeropsfixed &&
2258 			aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
2259 			aggstate->ss.ps.outeropsfixed = false;
2260 	}
2261 
2262 	/*
2263 	 * Initialize result type, slot and projection.
2264 	 */
2265 	ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
2266 	ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2267 
2268 	/*
2269 	 * initialize child expressions
2270 	 *
2271 	 * We expect the parser to have checked that no aggs contain other agg
2272 	 * calls in their arguments (and just to be sure, we verify it again while
2273 	 * initializing the plan node).  This would make no sense under SQL
2274 	 * semantics, and it's forbidden by the spec.  Because it is true, we
2275 	 * don't need to worry about evaluating the aggs in any particular order.
2276 	 *
2277 	 * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2278 	 * nodes to aggstate->aggs.  Aggrefs in the qual are found here; Aggrefs
2279 	 * in the targetlist are found during ExecAssignProjectionInfo, below.
2280 	 */
2281 	aggstate->ss.ps.qual =
2282 		ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2283 
2284 	/*
2285 	 * We should now have found all Aggrefs in the targetlist and quals.
2286 	 */
2287 	numaggs = aggstate->numaggs;
2288 	Assert(numaggs == list_length(aggstate->aggs));
2289 
2290 	/*
2291 	 * For each phase, prepare grouping set data and fmgr lookup data for
2292 	 * compare functions.  Accumulate all_grouped_cols in passing.
2293 	 */
2294 	aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2295 
2296 	aggstate->num_hashes = numHashes;
2297 	if (numHashes)
2298 	{
2299 		aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2300 		aggstate->phases[0].numsets = 0;
2301 		aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2302 		aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2303 	}
2304 
2305 	phase = 0;
2306 	for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2307 	{
2308 		Agg		   *aggnode;
2309 		Sort	   *sortnode;
2310 
2311 		if (phaseidx > 0)
2312 		{
2313 			aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2314 			sortnode = castNode(Sort, aggnode->plan.lefttree);
2315 		}
2316 		else
2317 		{
2318 			aggnode = node;
2319 			sortnode = NULL;
2320 		}
2321 
2322 		Assert(phase <= 1 || sortnode);
2323 
2324 		if (aggnode->aggstrategy == AGG_HASHED
2325 			|| aggnode->aggstrategy == AGG_MIXED)
2326 		{
2327 			AggStatePerPhase phasedata = &aggstate->phases[0];
2328 			AggStatePerHash perhash;
2329 			Bitmapset  *cols = NULL;
2330 
2331 			Assert(phase == 0);
2332 			i = phasedata->numsets++;
2333 			perhash = &aggstate->perhash[i];
2334 
2335 			/* phase 0 always points to the "real" Agg in the hash case */
2336 			phasedata->aggnode = node;
2337 			phasedata->aggstrategy = node->aggstrategy;
2338 
2339 			/* but the actual Agg node representing this hash is saved here */
2340 			perhash->aggnode = aggnode;
2341 
2342 			phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2343 
2344 			for (j = 0; j < aggnode->numCols; ++j)
2345 				cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2346 
2347 			phasedata->grouped_cols[i] = cols;
2348 
2349 			all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2350 			continue;
2351 		}
2352 		else
2353 		{
2354 			AggStatePerPhase phasedata = &aggstate->phases[++phase];
2355 			int			num_sets;
2356 
2357 			phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2358 
2359 			if (num_sets)
2360 			{
2361 				phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2362 				phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2363 
2364 				i = 0;
2365 				foreach(l, aggnode->groupingSets)
2366 				{
2367 					int			current_length = list_length(lfirst(l));
2368 					Bitmapset  *cols = NULL;
2369 
2370 					/* planner forces this to be correct */
2371 					for (j = 0; j < current_length; ++j)
2372 						cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2373 
2374 					phasedata->grouped_cols[i] = cols;
2375 					phasedata->gset_lengths[i] = current_length;
2376 
2377 					++i;
2378 				}
2379 
2380 				all_grouped_cols = bms_add_members(all_grouped_cols,
2381 												   phasedata->grouped_cols[0]);
2382 			}
2383 			else
2384 			{
2385 				Assert(phaseidx == 0);
2386 
2387 				phasedata->gset_lengths = NULL;
2388 				phasedata->grouped_cols = NULL;
2389 			}
2390 
2391 			/*
2392 			 * If we are grouping, precompute fmgr lookup data for inner loop.
2393 			 */
2394 			if (aggnode->aggstrategy == AGG_SORTED)
2395 			{
2396 				int			i = 0;
2397 
2398 				Assert(aggnode->numCols > 0);
2399 
2400 				/*
2401 				 * Build a separate function for each subset of columns that
2402 				 * need to be compared.
2403 				 */
2404 				phasedata->eqfunctions =
2405 					(ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
2406 
2407 				/* for each grouping set */
2408 				for (i = 0; i < phasedata->numsets; i++)
2409 				{
2410 					int			length = phasedata->gset_lengths[i];
2411 
2412 					if (phasedata->eqfunctions[length - 1] != NULL)
2413 						continue;
2414 
2415 					phasedata->eqfunctions[length - 1] =
2416 						execTuplesMatchPrepare(scanDesc,
2417 											   length,
2418 											   aggnode->grpColIdx,
2419 											   aggnode->grpOperators,
2420 											   aggnode->grpCollations,
2421 											   (PlanState *) aggstate);
2422 				}
2423 
2424 				/* and for all grouped columns, unless already computed */
2425 				if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
2426 				{
2427 					phasedata->eqfunctions[aggnode->numCols - 1] =
2428 						execTuplesMatchPrepare(scanDesc,
2429 											   aggnode->numCols,
2430 											   aggnode->grpColIdx,
2431 											   aggnode->grpOperators,
2432 											   aggnode->grpCollations,
2433 											   (PlanState *) aggstate);
2434 				}
2435 			}
2436 
2437 			phasedata->aggnode = aggnode;
2438 			phasedata->aggstrategy = aggnode->aggstrategy;
2439 			phasedata->sortnode = sortnode;
2440 		}
2441 	}
2442 
2443 	/*
2444 	 * Convert all_grouped_cols to a descending-order list.
2445 	 */
2446 	i = -1;
2447 	while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
2448 		aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
2449 
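	/*
	 * E.g. grouped columns {2, 5, 7} become the list (7, 5, 2): we walk the
	 * bitmapset in ascending order and prepend each member.
	 */
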
2450 	/*
2451 	 * Set up aggregate-result storage in the output expr context, and also
2452 	 * allocate my private per-agg working storage
2453 	 */
2454 	econtext = aggstate->ss.ps.ps_ExprContext;
2455 	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
2456 	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
2457 
2458 	peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
2459 	pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
2460 
2461 	aggstate->peragg = peraggs;
2462 	aggstate->pertrans = pertransstates;
2463 
2464 
2465 	aggstate->all_pergroups =
2466 		(AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
2467 									 * (numGroupingSets + numHashes));
2468 	pergroups = aggstate->all_pergroups;
2469 
2470 	if (node->aggstrategy != AGG_HASHED)
2471 	{
2472 		for (i = 0; i < numGroupingSets; i++)
2473 		{
2474 			pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
2475 													  * numaggs);
2476 		}
2477 
2478 		aggstate->pergroups = pergroups;
2479 		pergroups += numGroupingSets;
2480 	}
2481 
2482 	/*
2483 	 * Hashing can only appear in the initial phase.
2484 	 */
2485 	if (use_hashing)
2486 	{
2487 		/* this is an array of pointers, not structures */
2488 		aggstate->hash_pergroup = pergroups;
2489 
2490 		find_hash_columns(aggstate);
2491 
2492 		/* Skip massive memory allocation if we are just doing EXPLAIN */
2493 		if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
2494 			build_hash_table(aggstate);
2495 
2496 		aggstate->table_filled = false;
2497 	}
2498 
2499 	/*
2500 	 * Initialize current phase-dependent values to initial phase. The initial
2501 	 * phase is 1 (first sort pass) for all strategies that use sorting (if
2502 	 * hashing is being done too, then phase 0 is processed last); but if only
2503 	 * hashing is being done, then phase 0 is all there is.
2504 	 */
2505 	if (node->aggstrategy == AGG_HASHED)
2506 	{
2507 		aggstate->current_phase = 0;
2508 		initialize_phase(aggstate, 0);
2509 		select_current_set(aggstate, 0, true);
2510 	}
2511 	else
2512 	{
2513 		aggstate->current_phase = 1;
2514 		initialize_phase(aggstate, 1);
2515 		select_current_set(aggstate, 0, false);
2516 	}
2517 
2518 	/* -----------------
2519 	 * Perform lookups of aggregate function info, and initialize the
2520 	 * unchanging fields of the per-agg and per-trans data.
2521 	 *
2522 	 * We try to optimize by detecting duplicate aggregate functions so that
2523 	 * their state and final values are re-used, rather than needlessly being
2524 	 * re-calculated independently. We also detect aggregates that are not
2525 	 * the same, but which can share the same transition state.
2526 	 *
2527 	 * Scenarios:
2528 	 *
2529 	 * 1. Identical aggregate function calls appear in the query:
2530 	 *
2531 	 *	  SELECT SUM(x) FROM ... HAVING SUM(x) > 0
2532 	 *
2533 	 *	  Since these aggregates are identical, we only need to calculate
2534 	 *	  the value once. Both aggregates will share the same 'aggno' value.
2535 	 *
2536 	 * 2. Two different aggregate functions appear in the query, but the
2537 	 *	  aggregates have the same arguments, transition functions and
2538 	 *	  initial values (and, presumably, different final functions):
2539 	 *
2540 	 *	  SELECT AVG(x), STDDEV(x) FROM ...
2541 	 *
2542 	 *	  In this case we must create a new peragg for the varying aggregate,
2543 	 *	  and we need to call the final functions separately, but we need
2544 	 *	  only run the transition function once.  (This requires that the
2545 	 *	  final functions be nondestructive of the transition state, but
2546 	 *	  that's required anyway for other reasons.)
2547 	 *
2548 	 * For either of these optimizations to be valid, all aggregate properties
2549 	 * used in the transition phase must be the same, including any modifiers
2550 	 * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
2551 	 * contain any volatile functions.
2552 	 * -----------------
2553 	 */
2554 	aggno = -1;
2555 	transno = -1;
2556 	foreach(l, aggstate->aggs)
2557 	{
2558 		AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
2559 		Aggref	   *aggref = aggrefstate->aggref;
2560 		AggStatePerAgg peragg;
2561 		AggStatePerTrans pertrans;
2562 		int			existing_aggno;
2563 		int			existing_transno;
2564 		List	   *same_input_transnos;
2565 		Oid			inputTypes[FUNC_MAX_ARGS];
2566 		int			numArguments;
2567 		int			numDirectArgs;
2568 		HeapTuple	aggTuple;
2569 		Form_pg_aggregate aggform;
2570 		AclResult	aclresult;
2571 		Oid			transfn_oid,
2572 					finalfn_oid;
2573 		bool		shareable;
2574 		Oid			serialfn_oid,
2575 					deserialfn_oid;
2576 		Expr	   *finalfnexpr;
2577 		Oid			aggtranstype;
2578 		Datum		textInitVal;
2579 		Datum		initValue;
2580 		bool		initValueIsNull;
2581 
2582 		/* Planner should have assigned aggregate to correct level */
2583 		Assert(aggref->agglevelsup == 0);
2584 		/* ... and the split mode should match */
2585 		Assert(aggref->aggsplit == aggstate->aggsplit);
2586 
2587 		/* 1. Check for already processed aggs which can be re-used */
2588 		existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
2589 												&same_input_transnos);
2590 		if (existing_aggno != -1)
2591 		{
2592 			/*
2593 			 * Existing compatible agg found, so just point the Aggref to the
2594 			 * same per-agg struct.
2595 			 */
2596 			aggrefstate->aggno = existing_aggno;
2597 			continue;
2598 		}
2599 
2600 		/* Mark Aggref state node with assigned index in the result array */
2601 		peragg = &peraggs[++aggno];
2602 		peragg->aggref = aggref;
2603 		aggrefstate->aggno = aggno;
2604 
2605 		/* Fetch the pg_aggregate row */
2606 		aggTuple = SearchSysCache1(AGGFNOID,
2607 								   ObjectIdGetDatum(aggref->aggfnoid));
2608 		if (!HeapTupleIsValid(aggTuple))
2609 			elog(ERROR, "cache lookup failed for aggregate %u",
2610 				 aggref->aggfnoid);
2611 		aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2612 
2613 		/* Check permission to call aggregate function */
2614 		aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
2615 									 ACL_EXECUTE);
2616 		if (aclresult != ACLCHECK_OK)
2617 			aclcheck_error(aclresult, OBJECT_AGGREGATE,
2618 						   get_func_name(aggref->aggfnoid));
2619 		InvokeFunctionExecuteHook(aggref->aggfnoid);
2620 
2621 		/* planner recorded transition state type in the Aggref itself */
2622 		aggtranstype = aggref->aggtranstype;
2623 		Assert(OidIsValid(aggtranstype));
2624 
2625 		/*
2626 		 * If this aggregation is performing state combines, then instead of
2627 		 * using the transition function, we'll use the combine function
2628 		 */
2629 		if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2630 		{
2631 			transfn_oid = aggform->aggcombinefn;
2632 
2633 			/* If not set then the planner messed up */
2634 			if (!OidIsValid(transfn_oid))
2635 				elog(ERROR, "combinefn not set for aggregate function");
2636 		}
2637 		else
2638 			transfn_oid = aggform->aggtransfn;
2639 
2640 		/* Final function only required if we're finalizing the aggregates */
2641 		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
2642 			peragg->finalfn_oid = finalfn_oid = InvalidOid;
2643 		else
2644 			peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2645 
2646 		/*
2647 		 * If finalfn is marked read-write, we can't share transition states;
2648 		 * but it is okay to share states for AGGMODIFY_SHAREABLE aggs.  Also,
2649 		 * if we're not executing the finalfn here, we can share regardless.
2650 		 */
2651 		shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
2652 			(finalfn_oid == InvalidOid);
2653 		peragg->shareable = shareable;
2654 
2655 		serialfn_oid = InvalidOid;
2656 		deserialfn_oid = InvalidOid;
2657 
2658 		/*
2659 		 * Check if serialization/deserialization is required.  We only do it
2660 		 * for aggregates that have transtype INTERNAL.
2661 		 */
2662 		if (aggtranstype == INTERNALOID)
2663 		{
2664 			/*
2665 			 * The planner should only have generated a serialize agg node if
2666 			 * every aggregate with an INTERNAL state has a serialization
2667 			 * function.  Verify that.
2668 			 */
2669 			if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
2670 			{
2671 				/* serialization only valid when not running finalfn */
2672 				Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
2673 
2674 				if (!OidIsValid(aggform->aggserialfn))
2675 					elog(ERROR, "serialfunc not provided for serialization aggregation");
2676 				serialfn_oid = aggform->aggserialfn;
2677 			}
2678 
2679 			/* Likewise for deserialization functions */
2680 			if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
2681 			{
2682 				/* deserialization only valid when combining states */
2683 				Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
2684 
2685 				if (!OidIsValid(aggform->aggdeserialfn))
2686 					elog(ERROR, "deserialfunc not provided for deserialization aggregation");
2687 				deserialfn_oid = aggform->aggdeserialfn;
2688 			}
2689 		}
2690 
2691 		/* Check that aggregate owner has permission to call component fns */
2692 		{
2693 			HeapTuple	procTuple;
2694 			Oid			aggOwner;
2695 
2696 			procTuple = SearchSysCache1(PROCOID,
2697 										ObjectIdGetDatum(aggref->aggfnoid));
2698 			if (!HeapTupleIsValid(procTuple))
2699 				elog(ERROR, "cache lookup failed for function %u",
2700 					 aggref->aggfnoid);
2701 			aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2702 			ReleaseSysCache(procTuple);
2703 
2704 			aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2705 										 ACL_EXECUTE);
2706 			if (aclresult != ACLCHECK_OK)
2707 				aclcheck_error(aclresult, OBJECT_FUNCTION,
2708 							   get_func_name(transfn_oid));
2709 			InvokeFunctionExecuteHook(transfn_oid);
2710 			if (OidIsValid(finalfn_oid))
2711 			{
2712 				aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2713 											 ACL_EXECUTE);
2714 				if (aclresult != ACLCHECK_OK)
2715 					aclcheck_error(aclresult, OBJECT_FUNCTION,
2716 								   get_func_name(finalfn_oid));
2717 				InvokeFunctionExecuteHook(finalfn_oid);
2718 			}
2719 			if (OidIsValid(serialfn_oid))
2720 			{
2721 				aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
2722 											 ACL_EXECUTE);
2723 				if (aclresult != ACLCHECK_OK)
2724 					aclcheck_error(aclresult, OBJECT_FUNCTION,
2725 								   get_func_name(serialfn_oid));
2726 				InvokeFunctionExecuteHook(serialfn_oid);
2727 			}
2728 			if (OidIsValid(deserialfn_oid))
2729 			{
2730 				aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
2731 											 ACL_EXECUTE);
2732 				if (aclresult != ACLCHECK_OK)
2733 					aclcheck_error(aclresult, OBJECT_FUNCTION,
2734 								   get_func_name(deserialfn_oid));
2735 				InvokeFunctionExecuteHook(deserialfn_oid);
2736 			}
2737 		}
2738 
2739 		/*
2740 		 * Get actual datatypes of the (nominal) aggregate inputs.  These
2741 		 * could be different from the agg's declared input types, when the
2742 		 * agg accepts ANY or a polymorphic type.
2743 		 */
2744 		numArguments = get_aggregate_argtypes(aggref, inputTypes);
2745 
2746 		/* Count the "direct" arguments, if any */
2747 		numDirectArgs = list_length(aggref->aggdirectargs);
2748 
2749 		/* Detect how many arguments to pass to the finalfn */
2750 		if (aggform->aggfinalextra)
2751 			peragg->numFinalArgs = numArguments + 1;
2752 		else
2753 			peragg->numFinalArgs = numDirectArgs + 1;
2754 
2755 		/* Initialize any direct-argument expressions */
2756 		peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
2757 												 (PlanState *) aggstate);
2758 
2759 		/*
2760 		 * build expression trees using actual argument & result types for the
2761 		 * finalfn, if it exists and is required.
2762 		 */
2763 		if (OidIsValid(finalfn_oid))
2764 		{
2765 			build_aggregate_finalfn_expr(inputTypes,
2766 										 peragg->numFinalArgs,
2767 										 aggtranstype,
2768 										 aggref->aggtype,
2769 										 aggref->inputcollid,
2770 										 finalfn_oid,
2771 										 &finalfnexpr);
2772 			fmgr_info(finalfn_oid, &peragg->finalfn);
2773 			fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
2774 		}
2775 
2776 		/* get info about the output value's datatype */
2777 		get_typlenbyval(aggref->aggtype,
2778 						&peragg->resulttypeLen,
2779 						&peragg->resulttypeByVal);
2780 
2781 		/*
2782 		 * initval is potentially null, so don't try to access it as a struct
2783 		 * field. Must do it the hard way with SysCacheGetAttr.
2784 		 */
2785 		textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
2786 									  Anum_pg_aggregate_agginitval,
2787 									  &initValueIsNull);
2788 		if (initValueIsNull)
2789 			initValue = (Datum) 0;
2790 		else
2791 			initValue = GetAggInitVal(textInitVal, aggtranstype);
2792 
2793 		/*
2794 		 * 2. Build working state for invoking the transition function, or
2795 		 * look up previously initialized working state, if we can share it.
2796 		 *
2797 		 * find_compatible_peragg() already collected a list of shareable
2798 		 * per-Trans's with the same inputs. Check if any of them have the
2799 		 * same transition function and initial value.
2800 		 */
2801 		existing_transno = find_compatible_pertrans(aggstate, aggref,
2802 													shareable,
2803 													transfn_oid, aggtranstype,
2804 													serialfn_oid, deserialfn_oid,
2805 													initValue, initValueIsNull,
2806 													same_input_transnos);
2807 		if (existing_transno != -1)
2808 		{
2809 			/*
2810 			 * Existing compatible trans found, so just point the 'peragg' to
2811 			 * the same per-trans struct, and mark the trans state as shared.
2812 			 */
2813 			pertrans = &pertransstates[existing_transno];
2814 			pertrans->aggshared = true;
2815 			peragg->transno = existing_transno;
2816 		}
2817 		else
2818 		{
2819 			pertrans = &pertransstates[++transno];
2820 			build_pertrans_for_aggref(pertrans, aggstate, estate,
2821 									  aggref, transfn_oid, aggtranstype,
2822 									  serialfn_oid, deserialfn_oid,
2823 									  initValue, initValueIsNull,
2824 									  inputTypes, numArguments);
2825 			peragg->transno = transno;
2826 		}
2827 		ReleaseSysCache(aggTuple);
2828 	}
2829 
2830 	/*
2831 	 * Update aggstate->numaggs to be the number of unique aggregates found.
2832 	 * Also set numtrans to the number of unique transition states found.
2833 	 */
2834 	aggstate->numaggs = aggno + 1;
2835 	aggstate->numtrans = transno + 1;
2836 
2837 	/*
2838 	 * Last, check whether any more aggregates got added onto the node while
2839 	 * we processed the expressions for the aggregate arguments (including not
2840 	 * only the regular arguments and FILTER expressions handled immediately
2841 	 * above, but any direct arguments we might've handled earlier).  If so,
2842 	 * we have nested aggregate functions, which is semantically nonsensical,
2843 	 * so complain.  (This should have been caught by the parser, so we don't
2844 	 * need to work hard on a helpful error message; but we defend against it
2845 	 * here anyway, just to be sure.)
2846 	 */
2847 	if (numaggs != list_length(aggstate->aggs))
2848 		ereport(ERROR,
2849 				(errcode(ERRCODE_GROUPING_ERROR),
2850 				 errmsg("aggregate function calls cannot be nested")));
2851 
2852 	/*
2853 	 * Build expressions doing all the transition work at once. We build a
2854 	 * different one for each phase, as the number of transition function
2855 	 * invocations can differ between phases. Note this'll work both for
2856 	 * transition and combination functions (although there'll only be one
2857 	 * phase in the latter case).
2858 	 */
2859 	for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
2860 	{
2861 		AggStatePerPhase phase = &aggstate->phases[phaseidx];
2862 		bool		dohash = false;
2863 		bool		dosort = false;
2864 
2865 		/* phase 0 doesn't necessarily exist */
2866 		if (!phase->aggnode)
2867 			continue;
2868 
2869 		if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
2870 		{
2871 			/*
2872 			 * Phase one, and only phase one, in a mixed agg performs both
2873 			 * sorting and aggregation.
2874 			 */
2875 			dohash = true;
2876 			dosort = true;
2877 		}
2878 		else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
2879 		{
2880 			/*
2881 			 * No need to compute a transition function for an AGG_MIXED phase
2882 			 * 0 - the contents of the hashtables will have been computed
2883 			 * during phase 1.
2884 			 */
2885 			continue;
2886 		}
2887 		else if (phase->aggstrategy == AGG_PLAIN ||
2888 				 phase->aggstrategy == AGG_SORTED)
2889 		{
2890 			dohash = false;
2891 			dosort = true;
2892 		}
2893 		else if (phase->aggstrategy == AGG_HASHED)
2894 		{
2895 			dohash = true;
2896 			dosort = false;
2897 		}
2898 		else
2899 			Assert(false);
2900 
2901 		phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash);
2902 
2903 	}
2904 
2905 	return aggstate;
2906 }
2907 
2908 /*
2909  * Build the state needed to calculate a state value for an aggregate.
2910  *
2911  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
2912  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
2913  * of the arguments could be calculated from 'aggref', but the caller has
2914  * calculated them already, so might as well pass them.
2915  */
2916 static void
2917 build_pertrans_for_aggref(AggStatePerTrans pertrans,
2918 						  AggState *aggstate, EState *estate,
2919 						  Aggref *aggref,
2920 						  Oid aggtransfn, Oid aggtranstype,
2921 						  Oid aggserialfn, Oid aggdeserialfn,
2922 						  Datum initValue, bool initValueIsNull,
2923 						  Oid *inputTypes, int numArguments)
2924 {
2925 	int			numGroupingSets = Max(aggstate->maxsets, 1);
2926 	Expr	   *serialfnexpr = NULL;
2927 	Expr	   *deserialfnexpr = NULL;
2928 	ListCell   *lc;
2929 	int			numInputs;
2930 	int			numDirectArgs;
2931 	List	   *sortlist;
2932 	int			numSortCols;
2933 	int			numDistinctCols;
2934 	int			i;
2935 
2936 	/* Begin filling in the pertrans data */
2937 	pertrans->aggref = aggref;
2938 	pertrans->aggshared = false;
2939 	pertrans->aggCollation = aggref->inputcollid;
2940 	pertrans->transfn_oid = aggtransfn;
2941 	pertrans->serialfn_oid = aggserialfn;
2942 	pertrans->deserialfn_oid = aggdeserialfn;
2943 	pertrans->initValue = initValue;
2944 	pertrans->initValueIsNull = initValueIsNull;
2945 
2946 	/* Count the "direct" arguments, if any */
2947 	numDirectArgs = list_length(aggref->aggdirectargs);
2948 
2949 	/* Count the number of aggregated input columns */
2950 	pertrans->numInputs = numInputs = list_length(aggref->args);
2951 
2952 	pertrans->aggtranstype = aggtranstype;
2953 
2954 	/*
2955 	 * When combining states, we have no use at all for the aggregate
2956 	 * function's transfn. Instead we use the combinefn.  In this case, the
2957 	 * transfn and transfn_oid fields of pertrans refer to the combine
2958 	 * function rather than the transition function.
2959 	 */
2960 	if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2961 	{
2962 		Expr	   *combinefnexpr;
2963 		size_t		numTransArgs;
2964 
2965 		/*
2966 		 * When combining, there is only one aggregated input: the to-be-combined
2967 		 * transition value arriving from below (this node's own transition
2968 		 * value is counted separately).
2969 		 */
2970 		pertrans->numTransInputs = 1;
2971 
2972 		/* account for the current transition state */
2973 		numTransArgs = pertrans->numTransInputs + 1;
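		/*
		 * That is, the combinefn is always called with two arguments: the
		 * node's own transition value plus the incoming partial state.
		 */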
2974 
2975 		build_aggregate_combinefn_expr(aggtranstype,
2976 									   aggref->inputcollid,
2977 									   aggtransfn,
2978 									   &combinefnexpr);
2979 		fmgr_info(aggtransfn, &pertrans->transfn);
2980 		fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
2981 
2982 		pertrans->transfn_fcinfo =
2983 			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
2984 		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
2985 								 &pertrans->transfn,
2986 								 numTransArgs,
2987 								 pertrans->aggCollation,
2988 								 (void *) aggstate, NULL);
2989 
2990 		/*
2991 		 * Ensure that a combine function to combine INTERNAL states is not
2992 		 * strict. This should have been checked during CREATE AGGREGATE, but
2993 		 * the strict property could have been changed since then.
2994 		 */
2995 		if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
2996 			ereport(ERROR,
2997 					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2998 					 errmsg("combine function with transition type %s must not be declared STRICT",
2999 							format_type_be(aggtranstype))));
3000 	}
3001 	else
3002 	{
3003 		Expr	   *transfnexpr;
3004 		size_t		numTransArgs;
3005 
3006 		/* Detect how many arguments to pass to the transfn */
3007 		if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3008 			pertrans->numTransInputs = numInputs;
3009 		else
3010 			pertrans->numTransInputs = numArguments;
3011 
3012 		/* account for the current transition state */
3013 		numTransArgs = pertrans->numTransInputs + 1;
3014 
3015 		/*
3016 		 * Set up infrastructure for calling the transfn.  Note that invtrans
3017 		 * is not needed here.
3018 		 */
3019 		build_aggregate_transfn_expr(inputTypes,
3020 									 numArguments,
3021 									 numDirectArgs,
3022 									 aggref->aggvariadic,
3023 									 aggtranstype,
3024 									 aggref->inputcollid,
3025 									 aggtransfn,
3026 									 InvalidOid,
3027 									 &transfnexpr,
3028 									 NULL);
3029 		fmgr_info(aggtransfn, &pertrans->transfn);
3030 		fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
3031 
3032 		pertrans->transfn_fcinfo =
3033 			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
3034 		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
3035 								 &pertrans->transfn,
3036 								 numTransArgs,
3037 								 pertrans->aggCollation,
3038 								 (void *) aggstate, NULL);
3039 
3040 		/*
3041 		 * If the transfn is strict and the initval is NULL, make sure input
3042 		 * type and transtype are the same (or at least binary-compatible), so
3043 		 * that it's OK to use the first aggregated input value as the initial
3044 		 * transValue.  This should have been checked at agg definition time,
3045 		 * but we must check again in case the transfn's strictness property
3046 		 * has been changed.
3047 		 */
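		/*
		 * For instance, max(int4) relies on this behavior: its transfn
		 * (int4larger) is strict and it has no initcond, so the first
		 * aggregated value simply becomes the initial transvalue.
		 */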
3048 		if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
3049 		{
3050 			if (numArguments <= numDirectArgs ||
3051 				!IsBinaryCoercible(inputTypes[numDirectArgs],
3052 								   aggtranstype))
3053 				ereport(ERROR,
3054 						(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3055 						 errmsg("aggregate %u needs to have compatible input type and transition type",
3056 								aggref->aggfnoid)));
3057 		}
3058 	}
3059 
3060 	/* get info about the state value's datatype */
3061 	get_typlenbyval(aggtranstype,
3062 					&pertrans->transtypeLen,
3063 					&pertrans->transtypeByVal);
3064 
3065 	if (OidIsValid(aggserialfn))
3066 	{
3067 		build_aggregate_serialfn_expr(aggserialfn,
3068 									  &serialfnexpr);
3069 		fmgr_info(aggserialfn, &pertrans->serialfn);
3070 		fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
3071 
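		/*
		 * The serialization function takes just one argument, the transition
		 * value to be serialized.
		 */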
3072 		pertrans->serialfn_fcinfo =
3073 			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
3074 		InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
3075 								 &pertrans->serialfn,
3076 								 1,
3077 								 InvalidOid,
3078 								 (void *) aggstate, NULL);
3079 	}
3080 
3081 	if (OidIsValid(aggdeserialfn))
3082 	{
3083 		build_aggregate_deserialfn_expr(aggdeserialfn,
3084 										&deserialfnexpr);
3085 		fmgr_info(aggdeserialfn, &pertrans->deserialfn);
3086 		fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
3087 
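		/*
		 * The deserialization function takes two arguments: the value to be
		 * deserialized and a dummy argument of type internal (needed so that
		 * the function, which returns internal, cannot be called from SQL).
		 */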
3088 		pertrans->deserialfn_fcinfo =
3089 			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
3090 		InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
3091 								 &pertrans->deserialfn,
3092 								 2,
3093 								 InvalidOid,
3094 								 (void *) aggstate, NULL);
3095 
3096 	}
3097 
3098 	/*
3099 	 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
3100 	 * have a list of SortGroupClause nodes; fish out the data in them and
3101 	 * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
3102 	 * however; the agg's transfn and finalfn are responsible for that.
3103 	 *
3104 	 * Note that by construction, if there is a DISTINCT clause then the ORDER
3105 	 * BY clause is a prefix of it (see transformDistinctClause).
3106 	 */
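	/*
	 * For example, count(DISTINCT x) arrives here with a one-element
	 * aggdistinct list and an empty aggorder, giving numSortCols =
	 * numDistinctCols = 1 below.
	 */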
3107 	if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3108 	{
3109 		sortlist = NIL;
3110 		numSortCols = numDistinctCols = 0;
3111 	}
3112 	else if (aggref->aggdistinct)
3113 	{
3114 		sortlist = aggref->aggdistinct;
3115 		numSortCols = numDistinctCols = list_length(sortlist);
3116 		Assert(numSortCols >= list_length(aggref->aggorder));
3117 	}
3118 	else
3119 	{
3120 		sortlist = aggref->aggorder;
3121 		numSortCols = list_length(sortlist);
3122 		numDistinctCols = 0;
3123 	}
3124 
3125 	pertrans->numSortCols = numSortCols;
3126 	pertrans->numDistinctCols = numDistinctCols;
3127 
3128 	/*
3129 	 * If we have either sorting or filtering to do, create a tupledesc and
3130 	 * slot corresponding to the aggregated inputs (including sort
3131 	 * expressions) of the agg.
3132 	 */
3133 	if (numSortCols > 0 || aggref->aggfilter)
3134 	{
3135 		pertrans->sortdesc = ExecTypeFromTL(aggref->args);
3136 		pertrans->sortslot =
3137 			ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
3138 								   &TTSOpsMinimalTuple);
3139 	}
3140 
3141 	if (numSortCols > 0)
3142 	{
3143 		/*
3144 		 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3145 		 * (yet)
3146 		 */
3147 		Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
3148 
3149 		/* If we have only one input, we need its len/byval info. */
3150 		if (numInputs == 1)
3151 		{
3152 			get_typlenbyval(inputTypes[numDirectArgs],
3153 							&pertrans->inputtypeLen,
3154 							&pertrans->inputtypeByVal);
3155 		}
3156 		else if (numDistinctCols > 0)
3157 		{
3158 			/* we will need an extra slot to store prior values */
3159 			pertrans->uniqslot =
3160 				ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
3161 									   &TTSOpsMinimalTuple);
3162 		}
3163 
3164 		/* Extract the sort information for use later */
3165 		pertrans->sortColIdx =
3166 			(AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
3167 		pertrans->sortOperators =
3168 			(Oid *) palloc(numSortCols * sizeof(Oid));
3169 		pertrans->sortCollations =
3170 			(Oid *) palloc(numSortCols * sizeof(Oid));
3171 		pertrans->sortNullsFirst =
3172 			(bool *) palloc(numSortCols * sizeof(bool));
3173 
3174 		i = 0;
3175 		foreach(lc, sortlist)
3176 		{
3177 			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3178 			TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
3179 
3180 			/* the parser should have made sure of this */
3181 			Assert(OidIsValid(sortcl->sortop));
3182 
3183 			pertrans->sortColIdx[i] = tle->resno;
3184 			pertrans->sortOperators[i] = sortcl->sortop;
3185 			pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
3186 			pertrans->sortNullsFirst[i] = sortcl->nulls_first;
3187 			i++;
3188 		}
3189 		Assert(i == numSortCols);
3190 	}
3191 
3192 	if (aggref->aggdistinct)
3193 	{
3194 		Oid		   *ops;
3195 
3196 		Assert(numArguments > 0);
3197 		Assert(list_length(aggref->aggdistinct) == numDistinctCols);
3198 
3199 		ops = palloc(numDistinctCols * sizeof(Oid));
3200 
3201 		i = 0;
3202 		foreach(lc, aggref->aggdistinct)
3203 			ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
3204 
3205 		/* lookup / build the necessary comparators */
3206 		if (numDistinctCols == 1)
3207 			fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
3208 		else
3209 			pertrans->equalfnMulti =
3210 				execTuplesMatchPrepare(pertrans->sortdesc,
3211 									   numDistinctCols,
3212 									   pertrans->sortColIdx,
3213 									   ops,
3214 									   pertrans->sortCollations,
3215 									   &aggstate->ss.ps);
3216 		pfree(ops);
3217 	}
3218 
3219 	pertrans->sortstates = (Tuplesortstate **)
3220 		palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
3221 }
3222 
3223 
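/*
 * GetAggInitVal - convert an aggregate's initcond text to a Datum of the
 * transition data type.
 *
 * For example, an aggregate defined (as avg(int4) is in the standard
 * catalogs) with transtype int8[] and initcond '{0,0}' has that string run
 * through the array input function to produce its initial array Datum.
 */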
3224 static Datum
3225 GetAggInitVal(Datum textInitVal, Oid transtype)
3226 {
3227 	Oid			typinput,
3228 				typioparam;
3229 	char	   *strInitVal;
3230 	Datum		initVal;
3231 
3232 	getTypeInputInfo(transtype, &typinput, &typioparam);
3233 	strInitVal = TextDatumGetCString(textInitVal);
3234 	initVal = OidInputFunctionCall(typinput, strInitVal,
3235 								   typioparam, -1);
3236 	pfree(strInitVal);
3237 	return initVal;
3238 }
3239 
3240 /*
3241  * find_compatible_peragg - search for a previously initialized per-Agg struct
3242  *
3243  * Searches the already-initialized aggregates to find one that is compatible
3244  * with this one, i.e. has the same input parameters.  If no compatible
3245  * aggregate can be found, returns -1.
3246  *
3247  * As a side-effect, this also collects a list of existing, shareable per-Trans
3248  * structs with matching inputs. If no identical Aggref is found, the list is
3249  * passed later to find_compatible_pertrans, to see if we can at least reuse
3250  * the state value of another aggregate.
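 *
 * For example (assuming standard catalog definitions), "SELECT sum(x), avg(x)
 * FROM tab" produces two Aggrefs that are not identical, but for some input
 * types they use the same transition function, so the second aggregate can
 * share the first one's transition state.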
3251  */
3252 static int
3253 find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3254 					   int lastaggno, List **same_input_transnos)
3255 {
3256 	int			aggno;
3257 	AggStatePerAgg peraggs;
3258 
3259 	*same_input_transnos = NIL;
3260 
3261 	/* we mustn't reuse the aggref if it contains volatile function calls */
3262 	if (contain_volatile_functions((Node *) newagg))
3263 		return -1;
3264 
3265 	peraggs = aggstate->peragg;
3266 
3267 	/*
3268 	 * Search through the list of already seen aggregates. If we find an
3269 	 * existing identical aggregate call, then we can re-use that one. While
3270 	 * searching, we'll also collect a list of Aggrefs with the same input
3271 	 * parameters. If no matching Aggref is found, the caller can potentially
3272 	 * still re-use the transition state of one of them.  (At this stage we
3273 	 * just compare the parsetrees; whether different aggregates share the
3274 	 * same transition function will be checked later.)
3275 	 */
3276 	for (aggno = 0; aggno <= lastaggno; aggno++)
3277 	{
3278 		AggStatePerAgg peragg;
3279 		Aggref	   *existingRef;
3280 
3281 		peragg = &peraggs[aggno];
3282 		existingRef = peragg->aggref;
3283 
3284 		/* all of the following must be the same or it's no match */
3285 		if (newagg->inputcollid != existingRef->inputcollid ||
3286 			newagg->aggtranstype != existingRef->aggtranstype ||
3287 			newagg->aggstar != existingRef->aggstar ||
3288 			newagg->aggvariadic != existingRef->aggvariadic ||
3289 			newagg->aggkind != existingRef->aggkind ||
3290 			!equal(newagg->args, existingRef->args) ||
3291 			!equal(newagg->aggorder, existingRef->aggorder) ||
3292 			!equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3293 			!equal(newagg->aggfilter, existingRef->aggfilter))
3294 			continue;
3295 
3296 		/* if it's the same aggregate function then report exact match */
3297 		if (newagg->aggfnoid == existingRef->aggfnoid &&
3298 			newagg->aggtype == existingRef->aggtype &&
3299 			newagg->aggcollid == existingRef->aggcollid &&
3300 			equal(newagg->aggdirectargs, existingRef->aggdirectargs))
3301 		{
3302 			list_free(*same_input_transnos);
3303 			*same_input_transnos = NIL;
3304 			return aggno;
3305 		}
3306 
3307 		/*
3308 		 * Not identical, but it had the same inputs.  If the final function
3309 		 * permits sharing, return its transno to the caller, in case we can
3310 		 * re-use its per-trans state.  (If there's already sharing going on,
3311 		 * we might report a transno more than once.  find_compatible_pertrans
3312 		 * is cheap enough that it's not worth spending cycles to avoid that.)
3313 		 */
3314 		if (peragg->shareable)
3315 			*same_input_transnos = lappend_int(*same_input_transnos,
3316 											   peragg->transno);
3317 	}
3318 
3319 	return -1;
3320 }
3321 
3322 /*
3323  * find_compatible_pertrans - search for a previously initialized per-Trans
3324  * struct
3325  *
3326  * Searches the list of transnos for a per-Trans struct with the same
3327  * transition function and initial condition. (The inputs have already been
3328  * verified to match.)
3329  */
3330 static int
3331 find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
3332 						 Oid aggtransfn, Oid aggtranstype,
3333 						 Oid aggserialfn, Oid aggdeserialfn,
3334 						 Datum initValue, bool initValueIsNull,
3335 						 List *transnos)
3336 {
3337 	ListCell   *lc;
3338 
3339 	/* If this aggregate can't share transition states, give up */
3340 	if (!shareable)
3341 		return -1;
3342 
3343 	foreach(lc, transnos)
3344 	{
3345 		int			transno = lfirst_int(lc);
3346 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3347 
3348 		/*
3349 		 * If the transfns or transition state types are not the same, then
3350 		 * the state can't be shared.
3351 		 */
3352 		if (aggtransfn != pertrans->transfn_oid ||
3353 			aggtranstype != pertrans->aggtranstype)
3354 			continue;
3355 
3356 		/*
3357 		 * The serialization and deserialization functions must match, if
3358 		 * present, as we're unable to share the trans state for aggregates
3359 		 * which will serialize or deserialize into different formats.
3360 		 * Remember that these will be InvalidOid if they're not required for
3361 		 * this agg node.
3362 		 */
3363 		if (aggserialfn != pertrans->serialfn_oid ||
3364 			aggdeserialfn != pertrans->deserialfn_oid)
3365 			continue;
3366 
3367 		/*
3368 		 * Check that the initial condition matches, too.
3369 		 */
3370 		if (initValueIsNull && pertrans->initValueIsNull)
3371 			return transno;
3372 
3373 		if (!initValueIsNull && !pertrans->initValueIsNull &&
3374 			datumIsEqual(initValue, pertrans->initValue,
3375 						 pertrans->transtypeByVal, pertrans->transtypeLen))
3376 			return transno;
3377 	}
3378 	return -1;
3379 }
3380 
3381 void
3382 ExecEndAgg(AggState *node)
3383 {
3384 	PlanState  *outerPlan;
3385 	int			transno;
3386 	int			numGroupingSets = Max(node->maxsets, 1);
3387 	int			setno;
3388 
3389 	/* Make sure we have closed any open tuplesorts */
3390 
3391 	if (node->sort_in)
3392 		tuplesort_end(node->sort_in);
3393 	if (node->sort_out)
3394 		tuplesort_end(node->sort_out);
3395 
3396 	for (transno = 0; transno < node->numtrans; transno++)
3397 	{
3398 		AggStatePerTrans pertrans = &node->pertrans[transno];
3399 
3400 		for (setno = 0; setno < numGroupingSets; setno++)
3401 		{
3402 			if (pertrans->sortstates[setno])
3403 				tuplesort_end(pertrans->sortstates[setno]);
3404 		}
3405 	}
3406 
3407 	/* And ensure any agg shutdown callbacks have been called */
3408 	for (setno = 0; setno < numGroupingSets; setno++)
3409 		ReScanExprContext(node->aggcontexts[setno]);
3410 	if (node->hashcontext)
3411 		ReScanExprContext(node->hashcontext);
3412 
3413 	/*
3414 	 * We don't actually free any ExprContexts here (see comment in
3415 	 * ExecFreeExprContext); just unlinking the output one from the plan node
3416 	 * suffices.
3417 	 */
3418 	ExecFreeExprContext(&node->ss.ps);
3419 
3420 	/* clean up tuple table */
3421 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
3422 
3423 	outerPlan = outerPlanState(node);
3424 	ExecEndNode(outerPlan);
3425 }
3426 
3427 void
3428 ExecReScanAgg(AggState *node)
3429 {
3430 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
3431 	PlanState  *outerPlan = outerPlanState(node);
3432 	Agg		   *aggnode = (Agg *) node->ss.ps.plan;
3433 	int			transno;
3434 	int			numGroupingSets = Max(node->maxsets, 1);
3435 	int			setno;
3436 
3437 	node->agg_done = false;
3438 
3439 	if (node->aggstrategy == AGG_HASHED)
3440 	{
3441 		/*
3442 		 * In the hashed case, if we haven't yet built the hash table then we
3443 		 * can just return; nothing done yet, so nothing to undo. If subnode's
3444 		 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
3445 		 * else no reason to re-scan it at all.
3446 		 */
3447 		if (!node->table_filled)
3448 			return;
3449 
3450 		/*
3451 		 * If we do have the hash table, and the subplan does not have any
3452 		 * parameter changes, and none of our own parameter changes affect
3453 		 * input expressions of the aggregated functions, then we can just
3454 		 * rescan the existing hash table; no need to build it again.
3455 		 */
3456 		if (outerPlan->chgParam == NULL &&
3457 			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
3458 		{
3459 			ResetTupleHashIterator(node->perhash[0].hashtable,
3460 								   &node->perhash[0].hashiter);
3461 			select_current_set(node, 0, true);
3462 			return;
3463 		}
3464 	}
3465 
3466 	/* Make sure we have closed any open tuplesorts */
3467 	for (transno = 0; transno < node->numtrans; transno++)
3468 	{
3469 		for (setno = 0; setno < numGroupingSets; setno++)
3470 		{
3471 			AggStatePerTrans pertrans = &node->pertrans[transno];
3472 
3473 			if (pertrans->sortstates[setno])
3474 			{
3475 				tuplesort_end(pertrans->sortstates[setno]);
3476 				pertrans->sortstates[setno] = NULL;
3477 			}
3478 		}
3479 	}
3480 
3481 	/*
3482 	 * We don't need to ReScanExprContext the output tuple context here;
3483 	 * ExecReScan already did it. But we do need to reset our per-grouping-set
3484 	 * contexts, which may have transvalues stored in them. (We use rescan
3485 	 * rather than just reset because transfns may have registered callbacks
3486 	 * that need to be run now.) For the AGG_HASHED case, see below.
3487 	 */
3488 
3489 	for (setno = 0; setno < numGroupingSets; setno++)
3490 	{
3491 		ReScanExprContext(node->aggcontexts[setno]);
3492 	}
3493 
3494 	/* Release first tuple of group, if we have made a copy */
3495 	if (node->grp_firstTuple != NULL)
3496 	{
3497 		heap_freetuple(node->grp_firstTuple);
3498 		node->grp_firstTuple = NULL;
3499 	}
3500 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
3501 
3502 	/* Forget current agg values */
3503 	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
3504 	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
3505 
3506 	/*
3507 	 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
3508 	 * the hashcontext.  That used to require extra handling, but resetting a
3509 	 * context now automatically deletes its sub-contexts as well.
3510 	 */
3511 	if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
3512 	{
3513 		ReScanExprContext(node->hashcontext);
3514 		/* Rebuild an empty hash table */
3515 		build_hash_table(node);
3516 		node->table_filled = false;
3517 		/* iterator will be reset when the table is filled */
3518 	}
3519 
3520 	if (node->aggstrategy != AGG_HASHED)
3521 	{
3522 		/*
3523 		 * Reset the per-group state (in particular, mark transvalues null)
3524 		 */
3525 		for (setno = 0; setno < numGroupingSets; setno++)
3526 		{
3527 			MemSet(node->pergroups[setno], 0,
3528 				   sizeof(AggStatePerGroupData) * node->numaggs);
3529 		}
3530 
3531 		/* reset to phase 1 */
3532 		initialize_phase(node, 1);
3533 
3534 		node->input_done = false;
3535 		node->projected_set = -1;
3536 	}
3537 
3538 	if (outerPlan->chgParam == NULL)
3539 		ExecReScan(outerPlan);
3540 }
3541 
3542 
3543 /***********************************************************************
3544  * API exposed to aggregate functions
3545  ***********************************************************************/
3546 
3547 
3548 /*
3549  * AggCheckCallContext - test if a SQL function is being called as an aggregate
3550  *
3551  * The transition and/or final functions of an aggregate may want to verify
3552  * that they are being called as aggregates, rather than as plain SQL
3553  * functions.  They should use this function to do so.  The return value
3554  * is nonzero if being called as an aggregate, or zero if not.  (Specific
3555  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
3556  * values could conceivably appear in future.)
3557  *
3558  * If aggcontext isn't NULL, the function also stores at *aggcontext the
3559  * identity of the memory context that aggregate transition values are being
3560  * stored in.  Note that the same aggregate call site (flinfo) may be called
3561  * interleaved on different transition values in different contexts, so it's
3562  * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
3563  * cache it in the transvalue itself (for internal-type transvalues).
3564  */
3565 int
3566 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
3567 {
3568 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3569 	{
3570 		if (aggcontext)
3571 		{
3572 			AggState   *aggstate = ((AggState *) fcinfo->context);
3573 			ExprContext *cxt = aggstate->curaggcontext;
3574 
3575 			*aggcontext = cxt->ecxt_per_tuple_memory;
3576 		}
3577 		return AGG_CONTEXT_AGGREGATE;
3578 	}
3579 	if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
3580 	{
3581 		if (aggcontext)
3582 			*aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
3583 		return AGG_CONTEXT_WINDOW;
3584 	}
3585 
3586 	/* this is just to prevent "uninitialized variable" warnings */
3587 	if (aggcontext)
3588 		*aggcontext = NULL;
3589 	return 0;
3590 }
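
/*
 * Illustrative sketch (not part of this module): a hypothetical transition
 * function that keeps its state in the per-aggregate memory context, so that
 * the state survives from one input row to the next:
 *
 *	Datum
 *	my_transfn(PG_FUNCTION_ARGS)
 *	{
 *		MemoryContext aggcontext;
 *		MyState    *state;
 *
 *		if (!AggCheckCallContext(fcinfo, &aggcontext))
 *			elog(ERROR, "my_transfn called in non-aggregate context");
 *
 *		if (PG_ARGISNULL(0))
 *			state = (MyState *) MemoryContextAllocZero(aggcontext,
 *													   sizeof(MyState));
 *		else
 *			state = (MyState *) PG_GETARG_POINTER(0);
 *
 *		if (!PG_ARGISNULL(1))
 *			state->sum += PG_GETARG_INT64(1);
 *
 *		PG_RETURN_POINTER(state);
 *	}
 *
 * my_transfn, MyState, and the int8 input column are hypothetical; the point
 * is only that anything allocated in the returned aggcontext lives exactly
 * as long as the group's transition value does.
 */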
3591 
3592 /*
3593  * AggGetAggref - allow an aggregate support function to get its Aggref
3594  *
3595  * If the function is being called as an aggregate support function,
3596  * return the Aggref node for the aggregate call.  Otherwise, return NULL.
3597  *
3598  * Aggregates sharing the same inputs and transition functions can get
3599  * merged into a single transition calculation.  If the transition function
3600  * calls AggGetAggref, it will get an arbitrary one of the Aggrefs for which
3601  * it is executing.  It must therefore not pay attention to the Aggref fields
3602  * that relate to the final function, as those are indeterminate.  But if a
3603  * final function calls AggGetAggref, it will get a precise result.
3604  *
3605  * Note that if an aggregate is being used as a window function, this will
3606  * return NULL.  We could provide a similar function to return the relevant
3607  * WindowFunc node in such cases, but it's not needed yet.
3608  */
3609 Aggref *
3610 AggGetAggref(FunctionCallInfo fcinfo)
3611 {
3612 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3613 	{
3614 		AggState   *aggstate = (AggState *) fcinfo->context;
3615 		AggStatePerAgg curperagg;
3616 		AggStatePerTrans curpertrans;
3617 
3618 		/* check curperagg (valid when in a final function) */
3619 		curperagg = aggstate->curperagg;
3620 
3621 		if (curperagg)
3622 			return curperagg->aggref;
3623 
3624 		/* check curpertrans (valid when in a transition function) */
3625 		curpertrans = aggstate->curpertrans;
3626 
3627 		if (curpertrans)
3628 			return curpertrans->aggref;
3629 	}
3630 	return NULL;
3631 }
3632 
3633 /*
3634  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
3635  *
3636  * This is useful in agg final functions; the context returned is one that
3637  * the final function can safely reset as desired.  This isn't useful for
3638  * transition functions, since the context returned MAY (we don't promise)
3639  * be the same as the context those are called in.
3640  *
3641  * As above, this is currently not useful for aggs called as window functions.
3642  */
3643 MemoryContext
3644 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
3645 {
3646 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3647 	{
3648 		AggState   *aggstate = (AggState *) fcinfo->context;
3649 
3650 		return aggstate->tmpcontext->ecxt_per_tuple_memory;
3651 	}
3652 	return NULL;
3653 }
3654 
3655 /*
3656  * AggStateIsShared - find out whether transition state is shared
3657  *
3658  * If the function is being called as an aggregate support function,
3659  * return true if the aggregate's transition state is shared across
3660  * multiple aggregates, false if it is not.
3661  *
3662  * Returns true if not called as an aggregate support function.
3663  * This is intended as a conservative answer, ie "no you'd better not
3664  * scribble on your input".  In particular, will return true if the
3665  * aggregate is being used as a window function, which is a scenario
3666  * in which changing the transition state is a bad idea.  We might
3667  * want to refine the behavior for the window case in future.
3668  */
3669 bool
3670 AggStateIsShared(FunctionCallInfo fcinfo)
3671 {
3672 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3673 	{
3674 		AggState   *aggstate = (AggState *) fcinfo->context;
3675 		AggStatePerAgg curperagg;
3676 		AggStatePerTrans curpertrans;
3677 
3678 		/* check curperagg (valid when in a final function) */
3679 		curperagg = aggstate->curperagg;
3680 
3681 		if (curperagg)
3682 			return aggstate->pertrans[curperagg->transno].aggshared;
3683 
3684 		/* check curpertrans (valid when in a transition function) */
3685 		curpertrans = aggstate->curpertrans;
3686 
3687 		if (curpertrans)
3688 			return curpertrans->aggshared;
3689 	}
3690 	return true;
3691 }
3692 
3693 /*
3694  * AggRegisterCallback - register a cleanup callback for an aggregate
3695  *
3696  * This is useful for aggs to register shutdown callbacks, which will ensure
3697  * that non-memory resources are freed.  The callback will occur just before
3698  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
3699  * either between groups or as a result of rescanning the query.  The callback
3700  * will NOT be called on error paths.  The typical use-case is for freeing of
3701  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
3702  * created by the agg functions.  (The callback will not be called until after
3703  * the result of the finalfn is no longer needed, so it's safe for the finalfn
3704  * to return data that will be freed by the callback.)
3705  *
3706  * As above, this is currently not useful for aggs called as window functions.
3707  */
3708 void
3709 AggRegisterCallback(FunctionCallInfo fcinfo,
3710 					ExprContextCallbackFunction func,
3711 					Datum arg)
3712 {
3713 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3714 	{
3715 		AggState   *aggstate = (AggState *) fcinfo->context;
3716 		ExprContext *cxt = aggstate->curaggcontext;
3717 
3718 		RegisterExprContextCallback(cxt, func, arg);
3719 
3720 		return;
3721 	}
3722 	elog(ERROR, "aggregate function cannot register a callback in this context");
3723 }
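
/*
 * Illustrative sketch (hypothetical usage, not part of this module): a
 * transition function that creates a tuplesort could arrange for it to be
 * released when the aggcontext is reset:
 *
 *	AggRegisterCallback(fcinfo, my_shutdown_cb, PointerGetDatum(state));
 *
 * where my_shutdown_cb() is an ExprContextCallbackFunction that calls
 * tuplesort_end() on the tuplesort referenced by its Datum argument.
 */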
3724 
3725 
3726 /*
3727  * aggregate_dummy - dummy execution routine for aggregate functions
3728  *
3729  * This function is listed as the implementation (prosrc field) of pg_proc
3730  * entries for aggregate functions.  Its only purpose is to throw an error
3731  * if someone mistakenly executes such a function in the normal way.
3732  *
3733  * Perhaps someday we could assign real meaning to the prosrc field of
3734  * an aggregate?
3735  */
3736 Datum
3737 aggregate_dummy(PG_FUNCTION_ARGS)
3738 {
3739 	elog(ERROR, "aggregate function %u called as normal function",
3740 		 fcinfo->flinfo->fn_oid);
3741 	return (Datum) 0;			/* keep compiler quiet */
3742 }
3743