1 /*-------------------------------------------------------------------------
2  *
3  * nodeAgg.c
4  *	  Routines to handle aggregate nodes.
5  *
6  *	  ExecAgg normally evaluates each aggregate in the following steps:
7  *
8  *		 transvalue = initcond
9  *		 foreach input_tuple do
10  *			transvalue = transfunc(transvalue, input_value(s))
11  *		 result = finalfunc(transvalue, direct_argument(s))
12  *
13  *	  If a finalfunc is not supplied then the result is just the ending
14  *	  value of transvalue.
15  *
16  *	  Other behaviors can be selected by the "aggsplit" mode, which exists
17  *	  to support partial aggregation.  It is possible to:
18  *	  * Skip running the finalfunc, so that the output is always the
19  *	  final transvalue state.
20  *	  * Substitute the combinefunc for the transfunc, so that transvalue
21  *	  states (propagated up from a child partial-aggregation step) are merged
22  *	  rather than processing raw input rows.  (The statements below about
23  *	  the transfunc apply equally to the combinefunc, when it's selected.)
24  *	  * Apply the serializefunc to the output values (this only makes sense
25  *	  when skipping the finalfunc, since the serializefunc works on the
26  *	  transvalue data type).
27  *	  * Apply the deserializefunc to the input values (this only makes sense
28  *	  when using the combinefunc, for similar reasons).
29  *	  It is the planner's responsibility to connect up Agg nodes using these
30  *	  alternate behaviors in a way that makes sense, with partial aggregation
31  *	  results being fed to nodes that expect them.
32  *
33  *	  If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34  *	  input tuples and eliminate duplicates (if required) before performing
35  *	  the above-depicted process.  (However, we don't do that for ordered-set
36  *	  aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37  *	  so far as this module is concerned.)	Note that partial aggregation
38  *	  is not supported in these cases, since we couldn't ensure global
39  *	  ordering or distinctness of the inputs.
40  *
41  *	  If transfunc is marked "strict" in pg_proc and initcond is NULL,
42  *	  then the first non-NULL input_value is assigned directly to transvalue,
43  *	  and transfunc isn't applied until the second non-NULL input_value.
44  *	  The agg's first input type and transtype must be the same in this case!
45  *
46  *	  If transfunc is marked "strict" then NULL input_values are skipped,
47  *	  keeping the previous transvalue.  If transfunc is not strict then it
48  *	  is called for every input tuple and must deal with NULL initcond
49  *	  or NULL input_values for itself.
50  *
51  *	  If finalfunc is marked "strict" then it is not called when the
52  *	  ending transvalue is NULL, instead a NULL result is created
53  *	  automatically (this is just the usual handling of strict functions,
54  *	  of course).  A non-strict finalfunc can make its own choice of
55  *	  what to return for a NULL ending transvalue.
56  *
57  *	  Ordered-set aggregates are treated specially in one other way: we
58  *	  evaluate any "direct" arguments and pass them to the finalfunc along
59  *	  with the transition value.
60  *
61  *	  A finalfunc can have additional arguments beyond the transvalue and
62  *	  any "direct" arguments, corresponding to the input arguments of the
63  *	  aggregate.  These are always just passed as NULL.  Such arguments may be
64  *	  needed to allow resolution of a polymorphic aggregate's result type.
65  *
66  *	  We compute aggregate input expressions and run the transition functions
67  *	  in a temporary econtext (aggstate->tmpcontext).  This is reset at least
68  *	  once per input tuple, so when the transvalue datatype is
69  *	  pass-by-reference, we have to be careful to copy it into a longer-lived
70  *	  memory context, and free the prior value to avoid memory leakage.  We
71  *	  store transvalues in another set of econtexts, aggstate->aggcontexts
72  *	  (one per grouping set, see below), which are also used for the hashtable
73  *	  structures in AGG_HASHED mode.  These econtexts are rescanned, not just
74  *	  reset, at group boundaries so that aggregate transition functions can
75  *	  register shutdown callbacks via AggRegisterCallback.
76  *
77  *	  The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78  *	  run finalize functions and compute the output tuple; this context can be
79  *	  reset once per output tuple.
80  *
81  *	  The executor's AggState node is passed as the fmgr "context" value in
82  *	  all transfunc and finalfunc calls.  It is not recommended that the
83  *	  transition functions look at the AggState node directly, but they can
84  *	  use AggCheckCallContext() to verify that they are being called by
85  *	  nodeAgg.c (and not as ordinary SQL functions).  The main reason a
86  *	  transition function might want to know this is so that it can avoid
87  *	  palloc'ing a fixed-size pass-by-ref transition value on every call:
88  *	  it can instead just scribble on and return its left input.  Ordinarily
89  *	  it is completely forbidden for functions to modify pass-by-ref inputs,
90  *	  but in the aggregate case we know the left input is either the initial
91  *	  transition value or a previous function result, and in either case its
92  *	  value need not be preserved.  See int8inc() for an example.  Notice that
93  *	  the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94  *	  the previous transition value pointer is returned.  It is also possible
95  *	  to avoid repeated data copying when the transition value is an expanded
96  *	  object: to do that, the transition function must take care to return
97  *	  an expanded object that is in a child context of the memory context
98  *	  returned by AggCheckCallContext().  Also, some transition functions want
99  *	  to store working state in addition to the nominal transition value; they
100  *	  can use the memory context returned by AggCheckCallContext() to do that.
101  *
102  *	  Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
103  *	  AggState is available as context in earlier releases (back to 8.1),
104  *	  but direct examination of the node is needed to use it before 9.0.
105  *
106  *	  As of 9.4, aggregate transition functions can also use AggGetAggref()
107  *	  to get hold of the Aggref expression node for their aggregate call.
108  *	  This is mainly intended for ordered-set aggregates, which are not
109  *	  supported as window functions.  (A regular aggregate function would
110  *	  need some fallback logic to use this, since there's no Aggref node
111  *	  for a window function.)
112  *
113  *	  Grouping sets:
114  *
115  *	  A list of grouping sets which is structurally equivalent to a ROLLUP
116  *	  clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117  *	  ordered data.  We do this by keeping a separate set of transition values
118  *	  for each grouping set being concurrently processed; for each input tuple
119  *	  we update them all, and on group boundaries we reset those states
120  *	  (starting at the front of the list) whose grouping values have changed
121  *	  (the list of grouping sets is ordered from most specific to least
122  *	  specific).
123  *
124  *	  Where more complex grouping sets are used, we break them down into
125  *	  "phases", where each phase has a different sort order (except phase 0
126  *	  which is reserved for hashing).  During each phase but the last, the
127  *	  input tuples are additionally stored in a tuplesort which is keyed to the
128  *	  next phase's sort order; during each phase but the first, the input
129  *	  tuples are drawn from the previously sorted data.  (The sorting of the
130  *	  data for the first phase is handled by the planner, as it might be
131  *	  satisfied by underlying nodes.)
132  *
133  *	  Hashing can be mixed with sorted grouping.  To do this, we have an
134  *	  AGG_MIXED strategy that populates the hashtables during the first sorted
135  *	  phase, and switches to reading them out after completing all sort phases.
136  *	  We can also support AGG_HASHED with multiple hash tables and no sorting
137  *	  at all.
138  *
139  *	  From the perspective of aggregate transition and final functions, the
140  *	  only issue regarding grouping sets is this: a single call site (flinfo)
141  *	  of an aggregate function may be used for updating several different
142  *	  transition values in turn. So the function must not cache in the flinfo
143  *	  anything which logically belongs as part of the transition value (most
144  *	  importantly, the memory context in which the transition value exists).
145  *	  The support API functions (AggCheckCallContext, AggRegisterCallback) are
146  *	  sensitive to the grouping set for which the aggregate function is
147  *	  currently being called.
148  *
149  *	  Plan structure:
150  *
151  *	  What we get from the planner is actually one "real" Agg node which is
152  *	  part of the plan tree proper, but which optionally has an additional list
153  *	  of Agg nodes hung off the side via the "chain" field.  This is because an
154  *	  Agg node happens to be a convenient representation of all the data we
155  *	  need for grouping sets.
156  *
157  *	  For many purposes, we treat the "real" node as if it were just the first
158  *	  node in the chain.  The chain must be ordered such that hashed entries
159  *	  come before sorted/plain entries; the real node is marked AGG_MIXED if
160  *	  there are both types present (in which case the real node describes one
161  *	  of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162  *	  the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
163  *	  the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164  *	  nodes must be of the same type; if it is AGG_PLAIN, there can be no
165  *	  chained nodes.
166  *
167  *	  We collect all hashed nodes into a single "phase", numbered 0, and create
168  *	  a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169  *	  Phase 0 is allocated even if there are no hashes, but remains unused in
170  *	  that case.
171  *
172  *	  AGG_HASHED nodes actually refer to only a single grouping set each,
173  *	  because for each hashed grouping we need a separate grpColIdx and
174  *	  numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
175  *	  grouping sets that share a sort order.  Each AGG_SORTED node other than
176  *	  the first one has an associated Sort node which describes the sort order
177  *	  to be used; the first sorted node takes its input from the outer subtree,
178  *	  which the planner has already arranged to provide ordered data.
179  *
180  *	  Memory and ExprContext usage:
181  *
182  *	  Because we're accumulating aggregate values across input rows, we need to
183  *	  use more memory contexts than just simple input/output tuple contexts.
184  *	  In fact, for a rollup, we need a separate context for each grouping set
185  *	  so that we can reset the inner (finer-grained) aggregates on their group
186  *	  boundaries while continuing to accumulate values for outer
187  *	  (coarser-grained) groupings.  On top of this, we might be simultaneously
188  *	  populating hashtables; however, we only need one context for all the
189  *	  hashtables.
190  *
191  *	  So we create an array, aggcontexts, with an ExprContext for each grouping
192  *	  set in the largest rollup that we're going to process, and use the
193  *	  per-tuple memory context of those ExprContexts to store the aggregate
194  *	  transition values.  hashcontext is the single context created to support
195  *	  all hash tables.
196  *
197  *    Transition / Combine function invocation:
198  *
199  *    For performance reasons transition functions, including combine
200  *    functions, aren't invoked one-by-one from nodeAgg.c after computing
201  *    arguments using the expression evaluation engine. Instead
202  *    ExecBuildAggTrans() builds one large expression that does both argument
203  *    evaluation and transition function invocation. That avoids performance
204  *    issues due to repeated uses of expression evaluation, complications due
 *    to filter expressions having to be evaluated early, and allows the
 *    entire expression to be JIT-compiled into one native function.
207  *
208  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
209  * Portions Copyright (c) 1994, Regents of the University of California
210  *
211  * IDENTIFICATION
212  *	  src/backend/executor/nodeAgg.c
213  *
214  *-------------------------------------------------------------------------
215  */
216 
217 #include "postgres.h"
218 
219 #include "access/htup_details.h"
220 #include "catalog/objectaccess.h"
221 #include "catalog/pg_aggregate.h"
222 #include "catalog/pg_proc.h"
223 #include "catalog/pg_type.h"
224 #include "executor/execExpr.h"
225 #include "executor/executor.h"
226 #include "executor/nodeAgg.h"
227 #include "miscadmin.h"
228 #include "nodes/makefuncs.h"
229 #include "nodes/nodeFuncs.h"
230 #include "optimizer/clauses.h"
231 #include "optimizer/tlist.h"
232 #include "parser/parse_agg.h"
233 #include "parser/parse_coerce.h"
234 #include "utils/acl.h"
235 #include "utils/builtins.h"
236 #include "utils/lsyscache.h"
237 #include "utils/memutils.h"
238 #include "utils/syscache.h"
239 #include "utils/tuplesort.h"
240 #include "utils/datum.h"
241 
242 
/*
 * Forward declarations for the file-local (static) functions of this module.
 */
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
					  AggStatePerGroup *pergroups,
					  int numReset);
static void advance_transition_function(AggState *aggstate,
							AggStatePerTrans pertrans,
							AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate);
static void process_ordered_aggregate_single(AggState *aggstate,
								 AggStatePerTrans pertrans,
								 AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
								AggStatePerTrans pertrans,
								AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
				   AggStatePerAgg peragg,
				   AggStatePerGroup pergroupstate,
				   Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
						  AggStatePerAgg peragg,
						  AggStatePerGroup pergroupstate,
						  Datum *resultVal, bool *resultIsNull);
static void prepare_projection_slot(AggState *aggstate,
						TupleTableSlot *slot,
						int currentSet);
static void finalize_aggregates(AggState *aggstate,
					AggStatePerAgg peragg,
					AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static void build_hash_table(AggState *aggstate);
static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
						  AggState *aggstate, EState *estate,
						  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
						  Oid aggserialfn, Oid aggdeserialfn,
						  Datum initValue, bool initValueIsNull,
						  Oid *inputTypes, int numArguments);
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
					   int lastaggno, List **same_input_transnos);
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
						 bool shareable,
						 Oid aggtransfn, Oid aggtranstype,
						 Oid aggserialfn, Oid aggdeserialfn,
						 Datum initValue, bool initValueIsNull,
						 List *transnos);
297 
298 
299 /*
300  * Select the current grouping set; affects current_set and
301  * curaggcontext.
302  */
303 static void
select_current_set(AggState * aggstate,int setno,bool is_hash)304 select_current_set(AggState *aggstate, int setno, bool is_hash)
305 {
306 	/* when changing this, also adapt ExecInterpExpr() and friends */
307 	if (is_hash)
308 		aggstate->curaggcontext = aggstate->hashcontext;
309 	else
310 		aggstate->curaggcontext = aggstate->aggcontexts[setno];
311 
312 	aggstate->current_set = setno;
313 }
314 
315 /*
316  * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
317  * current_phase + 1. Juggle the tuplesorts accordingly.
318  *
319  * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
320  * case, so when entering phase 0, all we need to do is drop open sorts.
321  */
322 static void
initialize_phase(AggState * aggstate,int newphase)323 initialize_phase(AggState *aggstate, int newphase)
324 {
325 	Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
326 
327 	/*
328 	 * Whatever the previous state, we're now done with whatever input
329 	 * tuplesort was in use.
330 	 */
331 	if (aggstate->sort_in)
332 	{
333 		tuplesort_end(aggstate->sort_in);
334 		aggstate->sort_in = NULL;
335 	}
336 
337 	if (newphase <= 1)
338 	{
339 		/*
340 		 * Discard any existing output tuplesort.
341 		 */
342 		if (aggstate->sort_out)
343 		{
344 			tuplesort_end(aggstate->sort_out);
345 			aggstate->sort_out = NULL;
346 		}
347 	}
348 	else
349 	{
350 		/*
351 		 * The old output tuplesort becomes the new input one, and this is the
352 		 * right time to actually sort it.
353 		 */
354 		aggstate->sort_in = aggstate->sort_out;
355 		aggstate->sort_out = NULL;
356 		Assert(aggstate->sort_in);
357 		tuplesort_performsort(aggstate->sort_in);
358 	}
359 
360 	/*
361 	 * If this isn't the last phase, we need to sort appropriately for the
362 	 * next phase in sequence.
363 	 */
364 	if (newphase > 0 && newphase < aggstate->numphases - 1)
365 	{
366 		Sort	   *sortnode = aggstate->phases[newphase + 1].sortnode;
367 		PlanState  *outerNode = outerPlanState(aggstate);
368 		TupleDesc	tupDesc = ExecGetResultType(outerNode);
369 
370 		aggstate->sort_out = tuplesort_begin_heap(tupDesc,
371 												  sortnode->numCols,
372 												  sortnode->sortColIdx,
373 												  sortnode->sortOperators,
374 												  sortnode->collations,
375 												  sortnode->nullsFirst,
376 												  work_mem,
377 												  NULL, false);
378 	}
379 
380 	aggstate->current_phase = newphase;
381 	aggstate->phase = &aggstate->phases[newphase];
382 }
383 
384 /*
385  * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
386  * populated by the previous phase.  Copy it to the sorter for the next phase
387  * if any.
388  *
389  * Callers cannot rely on memory for tuple in returned slot remaining valid
390  * past any subsequently fetched tuple.
391  */
392 static TupleTableSlot *
fetch_input_tuple(AggState * aggstate)393 fetch_input_tuple(AggState *aggstate)
394 {
395 	TupleTableSlot *slot;
396 
397 	if (aggstate->sort_in)
398 	{
399 		/* make sure we check for interrupts in either path through here */
400 		CHECK_FOR_INTERRUPTS();
401 		if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
402 									aggstate->sort_slot, NULL))
403 			return NULL;
404 		slot = aggstate->sort_slot;
405 	}
406 	else
407 		slot = ExecProcNode(outerPlanState(aggstate));
408 
409 	if (!TupIsNull(slot) && aggstate->sort_out)
410 		tuplesort_puttupleslot(aggstate->sort_out, slot);
411 
412 	return slot;
413 }
414 
415 /*
416  * (Re)Initialize an individual aggregate.
417  *
418  * This function handles only one grouping set, already set in
419  * aggstate->current_set.
420  *
421  * When called, CurrentMemoryContext should be the per-query context.
422  */
423 static void
initialize_aggregate(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)424 initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
425 					 AggStatePerGroup pergroupstate)
426 {
427 	/*
428 	 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
429 	 */
430 	if (pertrans->numSortCols > 0)
431 	{
432 		/*
433 		 * In case of rescan, maybe there could be an uncompleted sort
434 		 * operation?  Clean it up if so.
435 		 */
436 		if (pertrans->sortstates[aggstate->current_set])
437 			tuplesort_end(pertrans->sortstates[aggstate->current_set]);
438 
439 
440 		/*
441 		 * We use a plain Datum sorter when there's a single input column;
442 		 * otherwise sort the full tuple.  (See comments for
443 		 * process_ordered_aggregate_single.)
444 		 */
445 		if (pertrans->numInputs == 1)
446 		{
447 			Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
448 
449 			pertrans->sortstates[aggstate->current_set] =
450 				tuplesort_begin_datum(attr->atttypid,
451 									  pertrans->sortOperators[0],
452 									  pertrans->sortCollations[0],
453 									  pertrans->sortNullsFirst[0],
454 									  work_mem, NULL, false);
455 		}
456 		else
457 			pertrans->sortstates[aggstate->current_set] =
458 				tuplesort_begin_heap(pertrans->sortdesc,
459 									 pertrans->numSortCols,
460 									 pertrans->sortColIdx,
461 									 pertrans->sortOperators,
462 									 pertrans->sortCollations,
463 									 pertrans->sortNullsFirst,
464 									 work_mem, NULL, false);
465 	}
466 
467 	/*
468 	 * (Re)set transValue to the initial value.
469 	 *
470 	 * Note that when the initial value is pass-by-ref, we must copy it (into
471 	 * the aggcontext) since we will pfree the transValue later.
472 	 */
473 	if (pertrans->initValueIsNull)
474 		pergroupstate->transValue = pertrans->initValue;
475 	else
476 	{
477 		MemoryContext oldContext;
478 
479 		oldContext = MemoryContextSwitchTo(
480 										   aggstate->curaggcontext->ecxt_per_tuple_memory);
481 		pergroupstate->transValue = datumCopy(pertrans->initValue,
482 											  pertrans->transtypeByVal,
483 											  pertrans->transtypeLen);
484 		MemoryContextSwitchTo(oldContext);
485 	}
486 	pergroupstate->transValueIsNull = pertrans->initValueIsNull;
487 
488 	/*
489 	 * If the initial value for the transition state doesn't exist in the
490 	 * pg_aggregate table then we will let the first non-NULL value returned
491 	 * from the outer procNode become the initial value. (This is useful for
492 	 * aggregates like max() and min().) The noTransValue flag signals that we
493 	 * still need to do this.
494 	 */
495 	pergroupstate->noTransValue = pertrans->initValueIsNull;
496 }
497 
498 /*
499  * Initialize all aggregate transition states for a new group of input values.
500  *
501  * If there are multiple grouping sets, we initialize only the first numReset
502  * of them (the grouping sets are ordered so that the most specific one, which
503  * is reset most often, is first). As a convenience, if numReset is 0, we
504  * reinitialize all sets.
505  *
506  * NB: This cannot be used for hash aggregates, as for those the grouping set
507  * number has to be specified from further up.
508  *
509  * When called, CurrentMemoryContext should be the per-query context.
510  */
511 static void
initialize_aggregates(AggState * aggstate,AggStatePerGroup * pergroups,int numReset)512 initialize_aggregates(AggState *aggstate,
513 					  AggStatePerGroup *pergroups,
514 					  int numReset)
515 {
516 	int			transno;
517 	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
518 	int			setno = 0;
519 	int			numTrans = aggstate->numtrans;
520 	AggStatePerTrans transstates = aggstate->pertrans;
521 
522 	if (numReset == 0)
523 		numReset = numGroupingSets;
524 
525 	for (setno = 0; setno < numReset; setno++)
526 	{
527 		AggStatePerGroup pergroup = pergroups[setno];
528 
529 		select_current_set(aggstate, setno, false);
530 
531 		for (transno = 0; transno < numTrans; transno++)
532 		{
533 			AggStatePerTrans pertrans = &transstates[transno];
534 			AggStatePerGroup pergroupstate = &pergroup[transno];
535 
536 			initialize_aggregate(aggstate, pertrans, pergroupstate);
537 		}
538 	}
539 }
540 
/*
 * Given new input value(s), advance the transition function of one aggregate
 * state within one grouping set only (already set in aggstate->current_set).
 *
 * The new values (and null flags) have been preloaded into argument positions
 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
 * pass to the transition function.  We also expect that the static fields of
 * the fcinfo are already initialized; that was done by ExecInitAgg().
 *
 * On return, pergroupstate->transValue / transValueIsNull hold the updated
 * transition state.  A pass-by-reference value that is newly adopted as the
 * state is placed in the current aggcontext (aggstate->curaggcontext).
 *
 * It doesn't matter which memory context this is called in; the caller's
 * CurrentMemoryContext is restored before returning.
 */
static void
advance_transition_function(AggState *aggstate,
							AggStatePerTrans pertrans,
							AggStatePerGroup pergroupstate)
{
	FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
	MemoryContext oldContext;
	Datum		newVal;

	if (pertrans->transfn.fn_strict)
	{
		/*
		 * For a strict transfn, nothing happens when there's a NULL input; we
		 * just keep the prior transValue.
		 */
		int			numTransInputs = pertrans->numTransInputs;
		int			i;

		/* Inputs occupy slots 1..numTransInputs; slot 0 is the trans value. */
		for (i = 1; i <= numTransInputs; i++)
		{
			if (fcinfo->argnull[i])
				return;
		}
		if (pergroupstate->noTransValue)
		{
			/*
			 * transValue has not been initialized. This is the first non-NULL
			 * input value. We use it as the initial value for transValue. (We
			 * already checked that the agg's input type is binary-compatible
			 * with its transtype, so straight copy here is OK.)
			 *
			 * We must copy the datum into aggcontext if it is pass-by-ref. We
			 * do not need to pfree the old transValue, since it's NULL.
			 */
			oldContext = MemoryContextSwitchTo(
											   aggstate->curaggcontext->ecxt_per_tuple_memory);
			pergroupstate->transValue = datumCopy(fcinfo->arg[1],
												  pertrans->transtypeByVal,
												  pertrans->transtypeLen);
			pergroupstate->transValueIsNull = false;
			pergroupstate->noTransValue = false;
			MemoryContextSwitchTo(oldContext);
			return;
		}
		if (pergroupstate->transValueIsNull)
		{
			/*
			 * Don't call a strict function with NULL inputs.  Note it is
			 * possible to get here despite the above tests, if the transfn is
			 * strict *and* returned a NULL on a prior cycle. If that happens
			 * we will propagate the NULL all the way to the end.
			 */
			return;
		}
	}

	/* We run the transition functions in per-input-tuple memory context */
	oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);

	/* set up aggstate->curpertrans for AggGetAggref() */
	aggstate->curpertrans = pertrans;

	/*
	 * OK to call the transition function.  Argument 0 carries the current
	 * transition state; arguments 1..n were preloaded by the caller.
	 */
	fcinfo->arg[0] = pergroupstate->transValue;
	fcinfo->argnull[0] = pergroupstate->transValueIsNull;
	fcinfo->isnull = false;		/* just in case transfn doesn't set it */

	newVal = FunctionCallInvoke(fcinfo);

	/* only valid while the transfn is running */
	aggstate->curpertrans = NULL;

	/*
	 * If pass-by-ref datatype, must copy the new value into aggcontext and
	 * free the prior transValue.  But if transfn returned a pointer to its
	 * first input, we don't need to do anything.  Also, if transfn returned a
	 * pointer to a R/W expanded object that is already a child of the
	 * aggcontext, assume we can adopt that value without copying it.
	 *
	 * It's safe to compare newVal with pergroupstate->transValue without
	 * regard for either being NULL, because ExecAggTransReparent()
	 * takes care to set transValue to 0 when NULL. Otherwise we could
	 * end up accidentally not reparenting, when the transValue has
	 * the same numerical value as newVal, despite being NULL.  This
	 * is a somewhat hot path, making it undesirable to instead solve
	 * this with another branch for the common case of the transition
	 * function returning its (modified) input argument.
	 */
	if (!pertrans->transtypeByVal &&
		DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
		newVal = ExecAggTransReparent(aggstate, pertrans,
									  newVal, fcinfo->isnull,
									  pergroupstate->transValue,
									  pergroupstate->transValueIsNull);

	pergroupstate->transValue = newVal;
	pergroupstate->transValueIsNull = fcinfo->isnull;

	MemoryContextSwitchTo(oldContext);
}
653 
654 /*
655  * Advance each aggregate transition state for one input tuple.  The input
656  * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
657  * accessible to ExecEvalExpr.
658  *
659  * We have two sets of transition states to handle: one for sorted aggregation
660  * and one for hashed; we do them both here, to avoid multiple evaluation of
661  * the inputs.
662  *
663  * When called, CurrentMemoryContext should be the per-query context.
664  */
665 static void
advance_aggregates(AggState * aggstate)666 advance_aggregates(AggState *aggstate)
667 {
668 	bool		dummynull;
669 
670 	ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
671 							  aggstate->tmpcontext,
672 							  &dummynull);
673 }
674 
/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with only one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * Note that the strictness of the transition function was checked when
 * entering the values into the sort, so we don't check it again here;
 * we just apply standard SQL DISTINCT logic.
 *
 * The one-input case is handled separately from the multi-input case
 * for performance reasons: for single by-value inputs, such as the
 * common case of count(distinct id), the tuplesort_getdatum code path
 * is around 300% faster.  (The speedup for by-reference types is less
 * but still noticeable.)
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).  On exit the set's tuplesort has been ended and
 * its sortstates[] slot cleared to NULL.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_single(AggState *aggstate,
								 AggStatePerTrans pertrans,
								 AggStatePerGroup pergroupstate)
{
	/* Most recent value passed to the transfn, kept for DISTINCT checks */
	Datum		oldVal = (Datum) 0;
	bool		oldIsNull = true;
	bool		haveOldVal = false;
	MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
	MemoryContext oldContext;
	bool		isDistinct = (pertrans->numDistinctCols > 0);
	/* Abbreviated keys from the sort; a cheap pre-check before equalfnOne */
	Datum		newAbbrevVal = (Datum) 0;
	Datum		oldAbbrevVal = (Datum) 0;
	FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
	Datum	   *newVal;
	bool	   *isNull;

	Assert(pertrans->numDistinctCols < 2);

	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

	/* Load the column into argument 1 (arg 0 will be transition value) */
	newVal = fcinfo->arg + 1;
	isNull = fcinfo->argnull + 1;

	/*
	 * Note: if input type is pass-by-ref, the datums returned by the sort are
	 * freshly palloc'd in the per-query context, so we must be careful to
	 * pfree them when they are no longer needed.
	 */

	while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
							  true, newVal, isNull, &newAbbrevVal))
	{
		/*
		 * Clear and select the working context for evaluation of the equality
		 * function and transition function.  Anything they allocate there
		 * lasts only until the next value is processed.
		 */
		MemoryContextReset(workcontext);
		oldContext = MemoryContextSwitchTo(workcontext);

		/*
		 * If DISTINCT mode, and not distinct from prior, skip it.  Two NULLs
		 * count as duplicates; otherwise the abbreviated keys must match
		 * before the (more expensive) equality function is consulted.
		 *
		 * Note: we assume equality functions don't care about collation.
		 */
		if (isDistinct &&
			haveOldVal &&
			((oldIsNull && *isNull) ||
			 (!oldIsNull && !*isNull &&
			  oldAbbrevVal == newAbbrevVal &&
			  DatumGetBool(FunctionCall2(&pertrans->equalfnOne,
										 oldVal, *newVal)))))
		{
			/* equal to prior, so forget this one */
			if (!pertrans->inputtypeByVal && !*isNull)
				pfree(DatumGetPointer(*newVal));
		}
		else
		{
			advance_transition_function(aggstate, pertrans, pergroupstate);
			/* forget the old value, if any */
			if (!oldIsNull && !pertrans->inputtypeByVal)
				pfree(DatumGetPointer(oldVal));
			/* and remember the new one for subsequent equality checks */
			oldVal = *newVal;
			oldAbbrevVal = newAbbrevVal;
			oldIsNull = *isNull;
			haveOldVal = true;
		}

		MemoryContextSwitchTo(oldContext);
	}

	/* Release the last remembered pass-by-ref value, if any */
	if (!oldIsNull && !pertrans->inputtypeByVal)
		pfree(DatumGetPointer(oldVal));

	/*
	 * The sort is finished; end it and clear the slot so that
	 * initialize_aggregate() won't try to end it a second time.
	 */
	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
	pertrans->sortstates[aggstate->current_set] = NULL;
}
777 
/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with more than one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the tuples in sorted order, and run the transition
 * function on each one (applying DISTINCT if appropriate).
 *
 * Two slots are used: slot1 receives each newly read tuple, and slot2
 * holds the most recently processed tuple for DISTINCT comparisons.  The
 * slot pointers are swapped rather than copying tuple contents.
 *
 * The sort object for the current set is ended and cleared on return.
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_multi(AggState *aggstate,
								AggStatePerTrans pertrans,
								AggStatePerGroup pergroupstate)
{
	ExprContext *tmpcontext = aggstate->tmpcontext;
	FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
	TupleTableSlot *slot1 = pertrans->sortslot;
	TupleTableSlot *slot2 = pertrans->uniqslot;
	int			numTransInputs = pertrans->numTransInputs;
	int			numDistinctCols = pertrans->numDistinctCols;
	Datum		newAbbrevVal = (Datum) 0;
	Datum		oldAbbrevVal = (Datum) 0;
	bool		haveOldValue = false;
	/* saved so it can be restored below; grouping sets may still need it */
	TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
	int			i;

	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

	ExecClearTuple(slot1);
	/* slot2 may be NULL (it is checked before every use) */
	if (slot2)
		ExecClearTuple(slot2);

	while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
								  true, true, slot1, &newAbbrevVal))
	{
		CHECK_FOR_INTERRUPTS();

		/*
		 * Expose the new tuple as the outer tuple and the previously
		 * processed tuple as the inner tuple; the DISTINCT equality qual is
		 * evaluated in tmpcontext against these.
		 */
		tmpcontext->ecxt_outertuple = slot1;
		tmpcontext->ecxt_innertuple = slot2;

		/*
		 * Process this tuple unless DISTINCT is requested and it matches the
		 * previously processed one.  A differing abbreviated key
		 * short-circuits the test, so ExecQual is only run when the
		 * abbreviated keys are equal.
		 */
		if (numDistinctCols == 0 ||
			!haveOldValue ||
			newAbbrevVal != oldAbbrevVal ||
			!ExecQual(pertrans->equalfnMulti, tmpcontext))
		{
			/*
			 * Extract the first numTransInputs columns as datums to pass to
			 * the transfn.
			 */
			slot_getsomeattrs(slot1, numTransInputs);

			/* Load values into fcinfo */
			/* Start from 1, since the 0th arg will be the transition value */
			for (i = 0; i < numTransInputs; i++)
			{
				fcinfo->arg[i + 1] = slot1->tts_values[i];
				fcinfo->argnull[i + 1] = slot1->tts_isnull[i];
			}

			advance_transition_function(aggstate, pertrans, pergroupstate);

			if (numDistinctCols > 0)
			{
				/* swap the slot pointers to retain the current tuple */
				TupleTableSlot *tmpslot = slot2;

				slot2 = slot1;
				slot1 = tmpslot;
				/* avoid ExecQual() calls by reusing abbreviated keys */
				oldAbbrevVal = newAbbrevVal;
				haveOldValue = true;
			}
		}

		/* Reset context each time */
		ResetExprContext(tmpcontext);

		/* slot1 now holds either a skipped duplicate or the stale tuple */
		ExecClearTuple(slot1);
	}

	if (slot2)
		ExecClearTuple(slot2);

	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
	pertrans->sortstates[aggstate->current_set] = NULL;

	/*
	 * restore previous slot, potentially in use for grouping sets.
	 * NOTE(review): ecxt_innertuple is not restored and is left pointing at
	 * one of pertrans's slots.
	 */
	tmpcontext->ecxt_outertuple = save;
}
870 
871 /*
872  * Compute the final value of one aggregate.
873  *
874  * This function handles only one grouping set (already set in
875  * aggstate->current_set).
876  *
877  * The finalfunction will be run, and the result delivered, in the
878  * output-tuple context; caller's CurrentMemoryContext does not matter.
879  *
880  * The finalfn uses the state as set in the transno. This also might be
881  * being used by another aggregate function, so it's important that we do
882  * nothing destructive here.
883  */
884 static void
finalize_aggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)885 finalize_aggregate(AggState *aggstate,
886 				   AggStatePerAgg peragg,
887 				   AggStatePerGroup pergroupstate,
888 				   Datum *resultVal, bool *resultIsNull)
889 {
890 	FunctionCallInfoData fcinfo;
891 	bool		anynull = false;
892 	MemoryContext oldContext;
893 	int			i;
894 	ListCell   *lc;
895 	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
896 
897 	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
898 
899 	/*
900 	 * Evaluate any direct arguments.  We do this even if there's no finalfn
901 	 * (which is unlikely anyway), so that side-effects happen as expected.
902 	 * The direct arguments go into arg positions 1 and up, leaving position 0
903 	 * for the transition state value.
904 	 */
905 	i = 1;
906 	foreach(lc, peragg->aggdirectargs)
907 	{
908 		ExprState  *expr = (ExprState *) lfirst(lc);
909 
910 		fcinfo.arg[i] = ExecEvalExpr(expr,
911 									 aggstate->ss.ps.ps_ExprContext,
912 									 &fcinfo.argnull[i]);
913 		anynull |= fcinfo.argnull[i];
914 		i++;
915 	}
916 
917 	/*
918 	 * Apply the agg's finalfn if one is provided, else return transValue.
919 	 */
920 	if (OidIsValid(peragg->finalfn_oid))
921 	{
922 		int			numFinalArgs = peragg->numFinalArgs;
923 
924 		/* set up aggstate->curperagg for AggGetAggref() */
925 		aggstate->curperagg = peragg;
926 
927 		InitFunctionCallInfoData(fcinfo, &peragg->finalfn,
928 								 numFinalArgs,
929 								 pertrans->aggCollation,
930 								 (void *) aggstate, NULL);
931 
932 		/* Fill in the transition state value */
933 		fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
934 												   pergroupstate->transValueIsNull,
935 												   pertrans->transtypeLen);
936 		fcinfo.argnull[0] = pergroupstate->transValueIsNull;
937 		anynull |= pergroupstate->transValueIsNull;
938 
939 		/* Fill any remaining argument positions with nulls */
940 		for (; i < numFinalArgs; i++)
941 		{
942 			fcinfo.arg[i] = (Datum) 0;
943 			fcinfo.argnull[i] = true;
944 			anynull = true;
945 		}
946 
947 		if (fcinfo.flinfo->fn_strict && anynull)
948 		{
949 			/* don't call a strict function with NULL inputs */
950 			*resultVal = (Datum) 0;
951 			*resultIsNull = true;
952 		}
953 		else
954 		{
955 			*resultVal = FunctionCallInvoke(&fcinfo);
956 			*resultIsNull = fcinfo.isnull;
957 		}
958 		aggstate->curperagg = NULL;
959 	}
960 	else
961 	{
962 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
963 		*resultVal = pergroupstate->transValue;
964 		*resultIsNull = pergroupstate->transValueIsNull;
965 	}
966 
967 	/*
968 	 * If result is pass-by-ref, make sure it is in the right context.
969 	 */
970 	if (!peragg->resulttypeByVal && !*resultIsNull &&
971 		!MemoryContextContains(CurrentMemoryContext,
972 							   DatumGetPointer(*resultVal)))
973 		*resultVal = datumCopy(*resultVal,
974 							   peragg->resulttypeByVal,
975 							   peragg->resulttypeLen);
976 
977 	MemoryContextSwitchTo(oldContext);
978 }
979 
980 /*
981  * Compute the output value of one partial aggregate.
982  *
983  * The serialization function will be run, and the result delivered, in the
984  * output-tuple context; caller's CurrentMemoryContext does not matter.
985  */
986 static void
finalize_partialaggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)987 finalize_partialaggregate(AggState *aggstate,
988 						  AggStatePerAgg peragg,
989 						  AggStatePerGroup pergroupstate,
990 						  Datum *resultVal, bool *resultIsNull)
991 {
992 	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
993 	MemoryContext oldContext;
994 
995 	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
996 
997 	/*
998 	 * serialfn_oid will be set if we must serialize the transvalue before
999 	 * returning it
1000 	 */
1001 	if (OidIsValid(pertrans->serialfn_oid))
1002 	{
1003 		/* Don't call a strict serialization function with NULL input. */
1004 		if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1005 		{
1006 			*resultVal = (Datum) 0;
1007 			*resultIsNull = true;
1008 		}
1009 		else
1010 		{
1011 			FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;
1012 
1013 			fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1014 														pergroupstate->transValueIsNull,
1015 														pertrans->transtypeLen);
1016 			fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1017 			fcinfo->isnull = false;
1018 
1019 			*resultVal = FunctionCallInvoke(fcinfo);
1020 			*resultIsNull = fcinfo->isnull;
1021 		}
1022 	}
1023 	else
1024 	{
1025 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1026 		*resultVal = pergroupstate->transValue;
1027 		*resultIsNull = pergroupstate->transValueIsNull;
1028 	}
1029 
1030 	/* If result is pass-by-ref, make sure it is in the right context. */
1031 	if (!peragg->resulttypeByVal && !*resultIsNull &&
1032 		!MemoryContextContains(CurrentMemoryContext,
1033 							   DatumGetPointer(*resultVal)))
1034 		*resultVal = datumCopy(*resultVal,
1035 							   peragg->resulttypeByVal,
1036 							   peragg->resulttypeLen);
1037 
1038 	MemoryContextSwitchTo(oldContext);
1039 }
1040 
1041 /*
1042  * Prepare to finalize and project based on the specified representative tuple
1043  * slot and grouping set.
1044  *
1045  * In the specified tuple slot, force to null all attributes that should be
1046  * read as null in the context of the current grouping set.  Also stash the
1047  * current group bitmap where GroupingExpr can get at it.
1048  *
1049  * This relies on three conditions:
1050  *
1051  * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1052  * only reference it in evaluations, which will only access individual
1053  * attributes.
1054  *
1055  * 2) No system columns are going to need to be nulled. (If a system column is
1056  * referenced in a group clause, it is actually projected in the outer plan
1057  * tlist.)
1058  *
1059  * 3) Within a given phase, we never need to recover the value of an attribute
1060  * once it has been set to null.
1061  *
1062  * Poking into the slot this way is a bit ugly, but the consensus is that the
1063  * alternative was worse.
1064  */
1065 static void
prepare_projection_slot(AggState * aggstate,TupleTableSlot * slot,int currentSet)1066 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1067 {
1068 	if (aggstate->phase->grouped_cols)
1069 	{
1070 		Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1071 
1072 		aggstate->grouped_cols = grouped_cols;
1073 
1074 		if (slot->tts_isempty)
1075 		{
1076 			/*
1077 			 * Force all values to be NULL if working on an empty input tuple
1078 			 * (i.e. an empty grouping set for which no input rows were
1079 			 * supplied).
1080 			 */
1081 			ExecStoreAllNullTuple(slot);
1082 		}
1083 		else if (aggstate->all_grouped_cols)
1084 		{
1085 			ListCell   *lc;
1086 
1087 			/* all_grouped_cols is arranged in desc order */
1088 			slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1089 
1090 			foreach(lc, aggstate->all_grouped_cols)
1091 			{
1092 				int			attnum = lfirst_int(lc);
1093 
1094 				if (!bms_is_member(attnum, grouped_cols))
1095 					slot->tts_isnull[attnum - 1] = true;
1096 			}
1097 		}
1098 	}
1099 }
1100 
1101 /*
1102  * Compute the final value of all aggregates for one group.
1103  *
1104  * This function handles only one grouping set at a time, which the caller must
1105  * have selected.  It's also the caller's responsibility to adjust the supplied
1106  * pergroup parameter to point to the current set's transvalues.
1107  *
1108  * Results are stored in the output econtext aggvalues/aggnulls.
1109  */
1110 static void
finalize_aggregates(AggState * aggstate,AggStatePerAgg peraggs,AggStatePerGroup pergroup)1111 finalize_aggregates(AggState *aggstate,
1112 					AggStatePerAgg peraggs,
1113 					AggStatePerGroup pergroup)
1114 {
1115 	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1116 	Datum	   *aggvalues = econtext->ecxt_aggvalues;
1117 	bool	   *aggnulls = econtext->ecxt_aggnulls;
1118 	int			aggno;
1119 	int			transno;
1120 
1121 	/*
1122 	 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1123 	 * inputs and run the transition functions.
1124 	 */
1125 	for (transno = 0; transno < aggstate->numtrans; transno++)
1126 	{
1127 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1128 		AggStatePerGroup pergroupstate;
1129 
1130 		pergroupstate = &pergroup[transno];
1131 
1132 		if (pertrans->numSortCols > 0)
1133 		{
1134 			Assert(aggstate->aggstrategy != AGG_HASHED &&
1135 				   aggstate->aggstrategy != AGG_MIXED);
1136 
1137 			if (pertrans->numInputs == 1)
1138 				process_ordered_aggregate_single(aggstate,
1139 												 pertrans,
1140 												 pergroupstate);
1141 			else
1142 				process_ordered_aggregate_multi(aggstate,
1143 												pertrans,
1144 												pergroupstate);
1145 		}
1146 	}
1147 
1148 	/*
1149 	 * Run the final functions.
1150 	 */
1151 	for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1152 	{
1153 		AggStatePerAgg peragg = &peraggs[aggno];
1154 		int			transno = peragg->transno;
1155 		AggStatePerGroup pergroupstate;
1156 
1157 		pergroupstate = &pergroup[transno];
1158 
1159 		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1160 			finalize_partialaggregate(aggstate, peragg, pergroupstate,
1161 									  &aggvalues[aggno], &aggnulls[aggno]);
1162 		else
1163 			finalize_aggregate(aggstate, peragg, pergroupstate,
1164 							   &aggvalues[aggno], &aggnulls[aggno]);
1165 	}
1166 }
1167 
1168 /*
1169  * Project the result of a group (whose aggs have already been calculated by
1170  * finalize_aggregates). Returns the result slot, or NULL if no row is
1171  * projected (suppressed by qual).
1172  */
1173 static TupleTableSlot *
project_aggregates(AggState * aggstate)1174 project_aggregates(AggState *aggstate)
1175 {
1176 	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1177 
1178 	/*
1179 	 * Check the qual (HAVING clause); if the group does not match, ignore it.
1180 	 */
1181 	if (ExecQual(aggstate->ss.ps.qual, econtext))
1182 	{
1183 		/*
1184 		 * Form and return projection tuple using the aggregate results and
1185 		 * the representative input tuple.
1186 		 */
1187 		return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1188 	}
1189 	else
1190 		InstrCountFiltered1(aggstate, 1);
1191 
1192 	return NULL;
1193 }
1194 
1195 /*
1196  * find_unaggregated_cols
1197  *	  Construct a bitmapset of the column numbers of un-aggregated Vars
1198  *	  appearing in our targetlist and qual (HAVING clause)
1199  */
1200 static Bitmapset *
find_unaggregated_cols(AggState * aggstate)1201 find_unaggregated_cols(AggState *aggstate)
1202 {
1203 	Agg		   *node = (Agg *) aggstate->ss.ps.plan;
1204 	Bitmapset  *colnos;
1205 
1206 	colnos = NULL;
1207 	(void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1208 										 &colnos);
1209 	(void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1210 										 &colnos);
1211 	return colnos;
1212 }
1213 
1214 static bool
find_unaggregated_cols_walker(Node * node,Bitmapset ** colnos)1215 find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1216 {
1217 	if (node == NULL)
1218 		return false;
1219 	if (IsA(node, Var))
1220 	{
1221 		Var		   *var = (Var *) node;
1222 
1223 		/* setrefs.c should have set the varno to OUTER_VAR */
1224 		Assert(var->varno == OUTER_VAR);
1225 		Assert(var->varlevelsup == 0);
1226 		*colnos = bms_add_member(*colnos, var->varattno);
1227 		return false;
1228 	}
1229 	if (IsA(node, Aggref) ||IsA(node, GroupingFunc))
1230 	{
1231 		/* do not descend into aggregate exprs */
1232 		return false;
1233 	}
1234 	return expression_tree_walker(node, find_unaggregated_cols_walker,
1235 								  (void *) colnos);
1236 }
1237 
1238 /*
1239  * (Re-)initialize the hash table(s) to empty.
1240  *
1241  * To implement hashed aggregation, we need a hashtable that stores a
1242  * representative tuple and an array of AggStatePerGroup structs for each
1243  * distinct set of GROUP BY column values.  We compute the hash key from the
1244  * GROUP BY columns.  The per-group data is allocated in lookup_hash_entry(),
1245  * for each entry.
1246  *
1247  * We have a separate hashtable and associated perhash data structure for each
1248  * grouping set for which we're doing hashing.
1249  *
1250  * The contents of the hash tables always live in the hashcontext's per-tuple
1251  * memory context (there is only one of these for all tables together, since
1252  * they are all reset at the same time).
1253  */
1254 static void
build_hash_table(AggState * aggstate)1255 build_hash_table(AggState *aggstate)
1256 {
1257 	MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1258 	Size		additionalsize;
1259 	int			i;
1260 
1261 	Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1262 
1263 	additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1264 
1265 	for (i = 0; i < aggstate->num_hashes; ++i)
1266 	{
1267 		AggStatePerHash perhash = &aggstate->perhash[i];
1268 
1269 		Assert(perhash->aggnode->numGroups > 0);
1270 
1271 		if (perhash->hashtable)
1272 			ResetTupleHashTable(perhash->hashtable);
1273 		else
1274 			perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1275 														perhash->hashslot->tts_tupleDescriptor,
1276 														perhash->numCols,
1277 														perhash->hashGrpColIdxHash,
1278 														perhash->eqfuncoids,
1279 														perhash->hashfunctions,
1280 														perhash->aggnode->numGroups,
1281 														additionalsize,
1282 														aggstate->ss.ps.state->es_query_cxt,
1283 														aggstate->hashcontext->ecxt_per_tuple_memory,
1284 														tmpmem,
1285 														DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1286 	}
1287 }
1288 
1289 /*
1290  * Compute columns that actually need to be stored in hashtable entries.  The
1291  * incoming tuples from the child plan node will contain grouping columns,
1292  * other columns referenced in our targetlist and qual, columns used to
1293  * compute the aggregate functions, and perhaps just junk columns we don't use
1294  * at all.  Only columns of the first two types need to be stored in the
1295  * hashtable, and getting rid of the others can make the table entries
1296  * significantly smaller.  The hashtable only contains the relevant columns,
1297  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1298  * into the format of the normal input descriptor.
1299  *
1300  * Additional columns, in addition to the columns grouped by, come from two
1301  * sources: Firstly functionally dependent columns that we don't need to group
1302  * by themselves, and secondly ctids for row-marks.
1303  *
1304  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1305  * then build an array of the columns included in the hashtable. We might
1306  * still have duplicates if the passed-in grpColIdx has them, which can happen
1307  * in edge cases from semijoins/distinct; these can't always be removed,
1308  * because it's not certain that the duplicate cols will be using the same
1309  * hash function.
1310  *
1311  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1312  * the per-query context (unlike the hash table itself).
1313  */
1314 static void
find_hash_columns(AggState * aggstate)1315 find_hash_columns(AggState *aggstate)
1316 {
1317 	Bitmapset  *base_colnos;
1318 	List	   *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1319 	int			numHashes = aggstate->num_hashes;
1320 	EState	   *estate = aggstate->ss.ps.state;
1321 	int			j;
1322 
1323 	/* Find Vars that will be needed in tlist and qual */
1324 	base_colnos = find_unaggregated_cols(aggstate);
1325 
1326 	for (j = 0; j < numHashes; ++j)
1327 	{
1328 		AggStatePerHash perhash = &aggstate->perhash[j];
1329 		Bitmapset  *colnos = bms_copy(base_colnos);
1330 		AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1331 		List	   *hashTlist = NIL;
1332 		TupleDesc	hashDesc;
1333 		int			maxCols;
1334 		int			i;
1335 
1336 		perhash->largestGrpColIdx = 0;
1337 
1338 		/*
1339 		 * If we're doing grouping sets, then some Vars might be referenced in
1340 		 * tlist/qual for the benefit of other grouping sets, but not needed
1341 		 * when hashing; i.e. prepare_projection_slot will null them out, so
1342 		 * there'd be no point storing them.  Use prepare_projection_slot's
1343 		 * logic to determine which.
1344 		 */
1345 		if (aggstate->phases[0].grouped_cols)
1346 		{
1347 			Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
1348 			ListCell   *lc;
1349 
1350 			foreach(lc, aggstate->all_grouped_cols)
1351 			{
1352 				int			attnum = lfirst_int(lc);
1353 
1354 				if (!bms_is_member(attnum, grouped_cols))
1355 					colnos = bms_del_member(colnos, attnum);
1356 			}
1357 		}
1358 
1359 		/*
1360 		 * Compute maximum number of input columns accounting for possible
1361 		 * duplications in the grpColIdx array, which can happen in some edge
1362 		 * cases where HashAggregate was generated as part of a semijoin or a
1363 		 * DISTINCT.
1364 		 */
1365 		maxCols = bms_num_members(colnos) + perhash->numCols;
1366 
1367 		perhash->hashGrpColIdxInput =
1368 			palloc(maxCols * sizeof(AttrNumber));
1369 		perhash->hashGrpColIdxHash =
1370 			palloc(perhash->numCols * sizeof(AttrNumber));
1371 
1372 		/* Add all the grouping columns to colnos */
1373 		for (i = 0; i < perhash->numCols; i++)
1374 			colnos = bms_add_member(colnos, grpColIdx[i]);
1375 
1376 		/*
1377 		 * First build mapping for columns directly hashed. These are the
1378 		 * first, because they'll be accessed when computing hash values and
1379 		 * comparing tuples for exact matches. We also build simple mapping
1380 		 * for execGrouping, so it knows where to find the to-be-hashed /
1381 		 * compared columns in the input.
1382 		 */
1383 		for (i = 0; i < perhash->numCols; i++)
1384 		{
1385 			perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1386 			perhash->hashGrpColIdxHash[i] = i + 1;
1387 			perhash->numhashGrpCols++;
1388 			/* delete already mapped columns */
1389 			bms_del_member(colnos, grpColIdx[i]);
1390 		}
1391 
1392 		/* and add the remaining columns */
1393 		while ((i = bms_first_member(colnos)) >= 0)
1394 		{
1395 			perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1396 			perhash->numhashGrpCols++;
1397 		}
1398 
1399 		/* and build a tuple descriptor for the hashtable */
1400 		for (i = 0; i < perhash->numhashGrpCols; i++)
1401 		{
1402 			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
1403 
1404 			hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1405 			perhash->largestGrpColIdx =
1406 				Max(varNumber + 1, perhash->largestGrpColIdx);
1407 		}
1408 
1409 		hashDesc = ExecTypeFromTL(hashTlist, false);
1410 
1411 		execTuplesHashPrepare(perhash->numCols,
1412 							  perhash->aggnode->grpOperators,
1413 							  &perhash->eqfuncoids,
1414 							  &perhash->hashfunctions);
1415 		perhash->hashslot =
1416 			ExecAllocTableSlot(&estate->es_tupleTable, hashDesc);
1417 
1418 		list_free(hashTlist);
1419 		bms_free(colnos);
1420 	}
1421 
1422 	bms_free(base_colnos);
1423 }
1424 
1425 /*
1426  * Estimate per-hash-table-entry overhead for the planner.
1427  *
1428  * Note that the estimate does not include space for pass-by-reference
1429  * transition data values, nor for the representative tuple of each group.
1430  * Nor does this account of the target fill-factor and growth policy of the
1431  * hash table.
1432  */
1433 Size
hash_agg_entry_size(int numAggs)1434 hash_agg_entry_size(int numAggs)
1435 {
1436 	Size		entrysize;
1437 
1438 	/* This must match build_hash_table */
1439 	entrysize = sizeof(TupleHashEntryData) +
1440 		numAggs * sizeof(AggStatePerGroupData);
1441 	entrysize = MAXALIGN(entrysize);
1442 
1443 	return entrysize;
1444 }
1445 
1446 /*
1447  * Find or create a hashtable entry for the tuple group containing the current
1448  * tuple (already set in tmpcontext's outertuple slot), in the current grouping
1449  * set (which the caller must have selected - note that initialize_aggregate
1450  * depends on this).
1451  *
1452  * When called, CurrentMemoryContext should be the per-query context.
1453  */
1454 static TupleHashEntryData *
lookup_hash_entry(AggState * aggstate)1455 lookup_hash_entry(AggState *aggstate)
1456 {
1457 	TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
1458 	AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1459 	TupleTableSlot *hashslot = perhash->hashslot;
1460 	TupleHashEntryData *entry;
1461 	bool		isnew;
1462 	int			i;
1463 
1464 	/* transfer just the needed columns into hashslot */
1465 	slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1466 	ExecClearTuple(hashslot);
1467 
1468 	for (i = 0; i < perhash->numhashGrpCols; i++)
1469 	{
1470 		int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
1471 
1472 		hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1473 		hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1474 	}
1475 	ExecStoreVirtualTuple(hashslot);
1476 
1477 	/* find or create the hashtable entry using the filtered tuple */
1478 	entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
1479 
1480 	if (isnew)
1481 	{
1482 		AggStatePerGroup pergroup;
1483 		int			transno;
1484 
1485 		pergroup = (AggStatePerGroup)
1486 			MemoryContextAlloc(perhash->hashtable->tablecxt,
1487 							   sizeof(AggStatePerGroupData) * aggstate->numtrans);
1488 		entry->additional = pergroup;
1489 
1490 		/*
1491 		 * Initialize aggregates for new tuple group, lookup_hash_entries()
1492 		 * already has selected the relevant grouping set.
1493 		 */
1494 		for (transno = 0; transno < aggstate->numtrans; transno++)
1495 		{
1496 			AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1497 			AggStatePerGroup pergroupstate = &pergroup[transno];
1498 
1499 			initialize_aggregate(aggstate, pertrans, pergroupstate);
1500 		}
1501 	}
1502 
1503 	return entry;
1504 }
1505 
1506 /*
1507  * Look up hash entries for the current tuple in all hashed grouping sets,
1508  * returning an array of pergroup pointers suitable for advance_aggregates.
1509  *
1510  * Be aware that lookup_hash_entry can reset the tmpcontext.
1511  */
1512 static void
lookup_hash_entries(AggState * aggstate)1513 lookup_hash_entries(AggState *aggstate)
1514 {
1515 	int			numHashes = aggstate->num_hashes;
1516 	AggStatePerGroup *pergroup = aggstate->hash_pergroup;
1517 	int			setno;
1518 
1519 	for (setno = 0; setno < numHashes; setno++)
1520 	{
1521 		select_current_set(aggstate, setno, true);
1522 		pergroup[setno] = lookup_hash_entry(aggstate)->additional;
1523 	}
1524 }
1525 
1526 /*
1527  * ExecAgg -
1528  *
1529  *	  ExecAgg receives tuples from its outer subplan and aggregates over
1530  *	  the appropriate attribute for each aggregate function use (Aggref
1531  *	  node) appearing in the targetlist or qual of the node.  The number
1532  *	  of tuples to aggregate over depends on whether grouped or plain
1533  *	  aggregation is selected.  In grouped aggregation, we produce a result
1534  *	  row for each group; in plain aggregation there's a single result row
1535  *	  for the whole query.  In either case, the value of each aggregate is
1536  *	  stored in the expression context to be used when ExecProject evaluates
1537  *	  the result tuple.
1538  */
1539 static TupleTableSlot *
ExecAgg(PlanState * pstate)1540 ExecAgg(PlanState *pstate)
1541 {
1542 	AggState   *node = castNode(AggState, pstate);
1543 	TupleTableSlot *result = NULL;
1544 
1545 	CHECK_FOR_INTERRUPTS();
1546 
1547 	if (!node->agg_done)
1548 	{
1549 		/* Dispatch based on strategy */
1550 		switch (node->phase->aggstrategy)
1551 		{
1552 			case AGG_HASHED:
1553 				if (!node->table_filled)
1554 					agg_fill_hash_table(node);
1555 				/* FALLTHROUGH */
1556 			case AGG_MIXED:
1557 				result = agg_retrieve_hash_table(node);
1558 				break;
1559 			case AGG_PLAIN:
1560 			case AGG_SORTED:
1561 				result = agg_retrieve_direct(node);
1562 				break;
1563 		}
1564 
1565 		if (!TupIsNull(result))
1566 			return result;
1567 	}
1568 
1569 	return NULL;
1570 }
1571 
1572 /*
1573  * ExecAgg for non-hashed case
1574  */
1575 static TupleTableSlot *
agg_retrieve_direct(AggState * aggstate)1576 agg_retrieve_direct(AggState *aggstate)
1577 {
1578 	Agg		   *node = aggstate->phase->aggnode;
1579 	ExprContext *econtext;
1580 	ExprContext *tmpcontext;
1581 	AggStatePerAgg peragg;
1582 	AggStatePerGroup *pergroups;
1583 	TupleTableSlot *outerslot;
1584 	TupleTableSlot *firstSlot;
1585 	TupleTableSlot *result;
1586 	bool		hasGroupingSets = aggstate->phase->numsets > 0;
1587 	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
1588 	int			currentSet;
1589 	int			nextSetSize;
1590 	int			numReset;
1591 	int			i;
1592 
1593 	/*
1594 	 * get state info from node
1595 	 *
1596 	 * econtext is the per-output-tuple expression context
1597 	 *
1598 	 * tmpcontext is the per-input-tuple expression context
1599 	 */
1600 	econtext = aggstate->ss.ps.ps_ExprContext;
1601 	tmpcontext = aggstate->tmpcontext;
1602 
1603 	peragg = aggstate->peragg;
1604 	pergroups = aggstate->pergroups;
1605 	firstSlot = aggstate->ss.ss_ScanTupleSlot;
1606 
1607 	/*
1608 	 * We loop retrieving groups until we find one matching
1609 	 * aggstate->ss.ps.qual
1610 	 *
1611 	 * For grouping sets, we have the invariant that aggstate->projected_set
1612 	 * is either -1 (initial call) or the index (starting from 0) in
1613 	 * gset_lengths for the group we just completed (either by projecting a
1614 	 * row or by discarding it in the qual).
1615 	 */
1616 	while (!aggstate->agg_done)
1617 	{
1618 		/*
1619 		 * Clear the per-output-tuple context for each group, as well as
1620 		 * aggcontext (which contains any pass-by-ref transvalues of the old
1621 		 * group).  Some aggregate functions store working state in child
1622 		 * contexts; those now get reset automatically without us needing to
1623 		 * do anything special.
1624 		 *
1625 		 * We use ReScanExprContext not just ResetExprContext because we want
1626 		 * any registered shutdown callbacks to be called.  That allows
1627 		 * aggregate functions to ensure they've cleaned up any non-memory
1628 		 * resources.
1629 		 */
1630 		ReScanExprContext(econtext);
1631 
1632 		/*
1633 		 * Determine how many grouping sets need to be reset at this boundary.
1634 		 */
1635 		if (aggstate->projected_set >= 0 &&
1636 			aggstate->projected_set < numGroupingSets)
1637 			numReset = aggstate->projected_set + 1;
1638 		else
1639 			numReset = numGroupingSets;
1640 
1641 		/*
1642 		 * numReset can change on a phase boundary, but that's OK; we want to
1643 		 * reset the contexts used in _this_ phase, and later, after possibly
1644 		 * changing phase, initialize the right number of aggregates for the
1645 		 * _new_ phase.
1646 		 */
1647 
1648 		for (i = 0; i < numReset; i++)
1649 		{
1650 			ReScanExprContext(aggstate->aggcontexts[i]);
1651 		}
1652 
1653 		/*
1654 		 * Check if input is complete and there are no more groups to project
1655 		 * in this phase; move to next phase or mark as done.
1656 		 */
1657 		if (aggstate->input_done == true &&
1658 			aggstate->projected_set >= (numGroupingSets - 1))
1659 		{
1660 			if (aggstate->current_phase < aggstate->numphases - 1)
1661 			{
1662 				initialize_phase(aggstate, aggstate->current_phase + 1);
1663 				aggstate->input_done = false;
1664 				aggstate->projected_set = -1;
1665 				numGroupingSets = Max(aggstate->phase->numsets, 1);
1666 				node = aggstate->phase->aggnode;
1667 				numReset = numGroupingSets;
1668 			}
1669 			else if (aggstate->aggstrategy == AGG_MIXED)
1670 			{
1671 				/*
1672 				 * Mixed mode; we've output all the grouped stuff and have
1673 				 * full hashtables, so switch to outputting those.
1674 				 */
1675 				initialize_phase(aggstate, 0);
1676 				aggstate->table_filled = true;
1677 				ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1678 									   &aggstate->perhash[0].hashiter);
1679 				select_current_set(aggstate, 0, true);
1680 				return agg_retrieve_hash_table(aggstate);
1681 			}
1682 			else
1683 			{
1684 				aggstate->agg_done = true;
1685 				break;
1686 			}
1687 		}
1688 
1689 		/*
1690 		 * Get the number of columns in the next grouping set after the last
1691 		 * projected one (if any). This is the number of columns to compare to
1692 		 * see if we reached the boundary of that set too.
1693 		 */
1694 		if (aggstate->projected_set >= 0 &&
1695 			aggstate->projected_set < (numGroupingSets - 1))
1696 			nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
1697 		else
1698 			nextSetSize = 0;
1699 
1700 		/*----------
1701 		 * If a subgroup for the current grouping set is present, project it.
1702 		 *
1703 		 * We have a new group if:
1704 		 *	- we're out of input but haven't projected all grouping sets
1705 		 *	  (checked above)
1706 		 * OR
1707 		 *	  - we already projected a row that wasn't from the last grouping
1708 		 *		set
1709 		 *	  AND
1710 		 *	  - the next grouping set has at least one grouping column (since
1711 		 *		empty grouping sets project only once input is exhausted)
1712 		 *	  AND
1713 		 *	  - the previous and pending rows differ on the grouping columns
1714 		 *		of the next grouping set
1715 		 *----------
1716 		 */
1717 		tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
1718 		if (aggstate->input_done ||
1719 			(node->aggstrategy != AGG_PLAIN &&
1720 			 aggstate->projected_set != -1 &&
1721 			 aggstate->projected_set < (numGroupingSets - 1) &&
1722 			 nextSetSize > 0 &&
1723 			 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
1724 							   tmpcontext)))
1725 		{
1726 			aggstate->projected_set += 1;
1727 
1728 			Assert(aggstate->projected_set < numGroupingSets);
1729 			Assert(nextSetSize > 0 || aggstate->input_done);
1730 		}
1731 		else
1732 		{
1733 			/*
1734 			 * We no longer care what group we just projected, the next
1735 			 * projection will always be the first (or only) grouping set
1736 			 * (unless the input proves to be empty).
1737 			 */
1738 			aggstate->projected_set = 0;
1739 
1740 			/*
1741 			 * If we don't already have the first tuple of the new group,
1742 			 * fetch it from the outer plan.
1743 			 */
1744 			if (aggstate->grp_firstTuple == NULL)
1745 			{
1746 				outerslot = fetch_input_tuple(aggstate);
1747 				if (!TupIsNull(outerslot))
1748 				{
1749 					/*
1750 					 * Make a copy of the first input tuple; we will use this
1751 					 * for comparisons (in group mode) and for projection.
1752 					 */
1753 					aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
1754 				}
1755 				else
1756 				{
1757 					/* outer plan produced no tuples at all */
1758 					if (hasGroupingSets)
1759 					{
1760 						/*
1761 						 * If there was no input at all, we need to project
1762 						 * rows only if there are grouping sets of size 0.
1763 						 * Note that this implies that there can't be any
1764 						 * references to ungrouped Vars, which would otherwise
1765 						 * cause issues with the empty output slot.
1766 						 *
1767 						 * XXX: This is no longer true, we currently deal with
1768 						 * this in finalize_aggregates().
1769 						 */
1770 						aggstate->input_done = true;
1771 
1772 						while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
1773 						{
1774 							aggstate->projected_set += 1;
1775 							if (aggstate->projected_set >= numGroupingSets)
1776 							{
1777 								/*
1778 								 * We can't set agg_done here because we might
1779 								 * have more phases to do, even though the
1780 								 * input is empty. So we need to restart the
1781 								 * whole outer loop.
1782 								 */
1783 								break;
1784 							}
1785 						}
1786 
1787 						if (aggstate->projected_set >= numGroupingSets)
1788 							continue;
1789 					}
1790 					else
1791 					{
1792 						aggstate->agg_done = true;
1793 						/* If we are grouping, we should produce no tuples too */
1794 						if (node->aggstrategy != AGG_PLAIN)
1795 							return NULL;
1796 					}
1797 				}
1798 			}
1799 
1800 			/*
1801 			 * Initialize working state for a new input tuple group.
1802 			 */
1803 			initialize_aggregates(aggstate, pergroups, numReset);
1804 
1805 			if (aggstate->grp_firstTuple != NULL)
1806 			{
1807 				/*
1808 				 * Store the copied first input tuple in the tuple table slot
1809 				 * reserved for it.  The tuple will be deleted when it is
1810 				 * cleared from the slot.
1811 				 */
1812 				ExecStoreTuple(aggstate->grp_firstTuple,
1813 							   firstSlot,
1814 							   InvalidBuffer,
1815 							   true);
1816 				aggstate->grp_firstTuple = NULL;	/* don't keep two pointers */
1817 
1818 				/* set up for first advance_aggregates call */
1819 				tmpcontext->ecxt_outertuple = firstSlot;
1820 
1821 				/*
1822 				 * Process each outer-plan tuple, and then fetch the next one,
1823 				 * until we exhaust the outer plan or cross a group boundary.
1824 				 */
1825 				for (;;)
1826 				{
1827 					/*
1828 					 * During phase 1 only of a mixed agg, we need to update
1829 					 * hashtables as well in advance_aggregates.
1830 					 */
1831 					if (aggstate->aggstrategy == AGG_MIXED &&
1832 						aggstate->current_phase == 1)
1833 					{
1834 						lookup_hash_entries(aggstate);
1835 					}
1836 
1837 					/* Advance the aggregates (or combine functions) */
1838 					advance_aggregates(aggstate);
1839 
1840 					/* Reset per-input-tuple context after each tuple */
1841 					ResetExprContext(tmpcontext);
1842 
1843 					outerslot = fetch_input_tuple(aggstate);
1844 					if (TupIsNull(outerslot))
1845 					{
1846 						/* no more outer-plan tuples available */
1847 						if (hasGroupingSets)
1848 						{
1849 							aggstate->input_done = true;
1850 							break;
1851 						}
1852 						else
1853 						{
1854 							aggstate->agg_done = true;
1855 							break;
1856 						}
1857 					}
1858 					/* set up for next advance_aggregates call */
1859 					tmpcontext->ecxt_outertuple = outerslot;
1860 
1861 					/*
1862 					 * If we are grouping, check whether we've crossed a group
1863 					 * boundary.
1864 					 */
1865 					if (node->aggstrategy != AGG_PLAIN)
1866 					{
1867 						tmpcontext->ecxt_innertuple = firstSlot;
1868 						if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
1869 									  tmpcontext))
1870 						{
1871 							aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
1872 							break;
1873 						}
1874 					}
1875 				}
1876 			}
1877 
1878 			/*
1879 			 * Use the representative input tuple for any references to
1880 			 * non-aggregated input columns in aggregate direct args, the node
1881 			 * qual, and the tlist.  (If we are not grouping, and there are no
1882 			 * input rows at all, we will come here with an empty firstSlot
1883 			 * ... but if not grouping, there can't be any references to
1884 			 * non-aggregated input columns, so no problem.)
1885 			 */
1886 			econtext->ecxt_outertuple = firstSlot;
1887 		}
1888 
1889 		Assert(aggstate->projected_set >= 0);
1890 
1891 		currentSet = aggstate->projected_set;
1892 
1893 		prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
1894 
1895 		select_current_set(aggstate, currentSet, false);
1896 
1897 		finalize_aggregates(aggstate,
1898 							peragg,
1899 							pergroups[currentSet]);
1900 
1901 		/*
1902 		 * If there's no row to project right now, we must continue rather
1903 		 * than returning a null since there might be more groups.
1904 		 */
1905 		result = project_aggregates(aggstate);
1906 		if (result)
1907 			return result;
1908 	}
1909 
1910 	/* No more groups */
1911 	return NULL;
1912 }
1913 
1914 /*
1915  * ExecAgg for hashed case: read input and build hash table
1916  */
1917 static void
agg_fill_hash_table(AggState * aggstate)1918 agg_fill_hash_table(AggState *aggstate)
1919 {
1920 	TupleTableSlot *outerslot;
1921 	ExprContext *tmpcontext = aggstate->tmpcontext;
1922 
1923 	/*
1924 	 * Process each outer-plan tuple, and then fetch the next one, until we
1925 	 * exhaust the outer plan.
1926 	 */
1927 	for (;;)
1928 	{
1929 		outerslot = fetch_input_tuple(aggstate);
1930 		if (TupIsNull(outerslot))
1931 			break;
1932 
1933 		/* set up for lookup_hash_entries and advance_aggregates */
1934 		tmpcontext->ecxt_outertuple = outerslot;
1935 
1936 		/* Find or build hashtable entries */
1937 		lookup_hash_entries(aggstate);
1938 
1939 		/* Advance the aggregates (or combine functions) */
1940 		advance_aggregates(aggstate);
1941 
1942 		/*
1943 		 * Reset per-input-tuple context after each tuple, but note that the
1944 		 * hash lookups do this too
1945 		 */
1946 		ResetExprContext(aggstate->tmpcontext);
1947 	}
1948 
1949 	aggstate->table_filled = true;
1950 	/* Initialize to walk the first hash table */
1951 	select_current_set(aggstate, 0, true);
1952 	ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1953 						   &aggstate->perhash[0].hashiter);
1954 }
1955 
1956 /*
1957  * ExecAgg for hashed case: retrieving groups from hash table
1958  */
1959 static TupleTableSlot *
agg_retrieve_hash_table(AggState * aggstate)1960 agg_retrieve_hash_table(AggState *aggstate)
1961 {
1962 	ExprContext *econtext;
1963 	AggStatePerAgg peragg;
1964 	AggStatePerGroup pergroup;
1965 	TupleHashEntryData *entry;
1966 	TupleTableSlot *firstSlot;
1967 	TupleTableSlot *result;
1968 	AggStatePerHash perhash;
1969 
1970 	/*
1971 	 * get state info from node.
1972 	 *
1973 	 * econtext is the per-output-tuple expression context.
1974 	 */
1975 	econtext = aggstate->ss.ps.ps_ExprContext;
1976 	peragg = aggstate->peragg;
1977 	firstSlot = aggstate->ss.ss_ScanTupleSlot;
1978 
1979 	/*
1980 	 * Note that perhash (and therefore anything accessed through it) can
1981 	 * change inside the loop, as we change between grouping sets.
1982 	 */
1983 	perhash = &aggstate->perhash[aggstate->current_set];
1984 
1985 	/*
1986 	 * We loop retrieving groups until we find one satisfying
1987 	 * aggstate->ss.ps.qual
1988 	 */
1989 	while (!aggstate->agg_done)
1990 	{
1991 		TupleTableSlot *hashslot = perhash->hashslot;
1992 		int			i;
1993 
1994 		CHECK_FOR_INTERRUPTS();
1995 
1996 		/*
1997 		 * Find the next entry in the hash table
1998 		 */
1999 		entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2000 		if (entry == NULL)
2001 		{
2002 			int			nextset = aggstate->current_set + 1;
2003 
2004 			if (nextset < aggstate->num_hashes)
2005 			{
2006 				/*
2007 				 * Switch to next grouping set, reinitialize, and restart the
2008 				 * loop.
2009 				 */
2010 				select_current_set(aggstate, nextset, true);
2011 
2012 				perhash = &aggstate->perhash[aggstate->current_set];
2013 
2014 				ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2015 
2016 				continue;
2017 			}
2018 			else
2019 			{
2020 				/* No more hashtables, so done */
2021 				aggstate->agg_done = true;
2022 				return NULL;
2023 			}
2024 		}
2025 
2026 		/*
2027 		 * Clear the per-output-tuple context for each group
2028 		 *
2029 		 * We intentionally don't use ReScanExprContext here; if any aggs have
2030 		 * registered shutdown callbacks, they mustn't be called yet, since we
2031 		 * might not be done with that agg.
2032 		 */
2033 		ResetExprContext(econtext);
2034 
2035 		/*
2036 		 * Transform representative tuple back into one with the right
2037 		 * columns.
2038 		 */
2039 		ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2040 		slot_getallattrs(hashslot);
2041 
2042 		ExecClearTuple(firstSlot);
2043 		memset(firstSlot->tts_isnull, true,
2044 			   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2045 
2046 		for (i = 0; i < perhash->numhashGrpCols; i++)
2047 		{
2048 			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
2049 
2050 			firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2051 			firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2052 		}
2053 		ExecStoreVirtualTuple(firstSlot);
2054 
2055 		pergroup = (AggStatePerGroup) entry->additional;
2056 
2057 		/*
2058 		 * Use the representative input tuple for any references to
2059 		 * non-aggregated input columns in the qual and tlist.
2060 		 */
2061 		econtext->ecxt_outertuple = firstSlot;
2062 
2063 		prepare_projection_slot(aggstate,
2064 								econtext->ecxt_outertuple,
2065 								aggstate->current_set);
2066 
2067 		finalize_aggregates(aggstate, peragg, pergroup);
2068 
2069 		result = project_aggregates(aggstate);
2070 		if (result)
2071 			return result;
2072 	}
2073 
2074 	/* No more groups */
2075 	return NULL;
2076 }
2077 
2078 /* -----------------
2079  * ExecInitAgg
2080  *
2081  *	Creates the run-time information for the agg node produced by the
2082  *	planner and initializes its outer subtree.
2083  *
2084  * -----------------
2085  */
2086 AggState *
ExecInitAgg(Agg * node,EState * estate,int eflags)2087 ExecInitAgg(Agg *node, EState *estate, int eflags)
2088 {
2089 	AggState   *aggstate;
2090 	AggStatePerAgg peraggs;
2091 	AggStatePerTrans pertransstates;
2092 	AggStatePerGroup *pergroups;
2093 	Plan	   *outerPlan;
2094 	ExprContext *econtext;
2095 	TupleDesc	scanDesc;
2096 	int			numaggs,
2097 				transno,
2098 				aggno;
2099 	int			phase;
2100 	int			phaseidx;
2101 	ListCell   *l;
2102 	Bitmapset  *all_grouped_cols = NULL;
2103 	int			numGroupingSets = 1;
2104 	int			numPhases;
2105 	int			numHashes;
2106 	int			i = 0;
2107 	int			j = 0;
2108 	bool		use_hashing = (node->aggstrategy == AGG_HASHED ||
2109 							   node->aggstrategy == AGG_MIXED);
2110 
2111 	/* check for unsupported flags */
2112 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2113 
2114 	/*
2115 	 * create state structure
2116 	 */
2117 	aggstate = makeNode(AggState);
2118 	aggstate->ss.ps.plan = (Plan *) node;
2119 	aggstate->ss.ps.state = estate;
2120 	aggstate->ss.ps.ExecProcNode = ExecAgg;
2121 
2122 	aggstate->aggs = NIL;
2123 	aggstate->numaggs = 0;
2124 	aggstate->numtrans = 0;
2125 	aggstate->aggstrategy = node->aggstrategy;
2126 	aggstate->aggsplit = node->aggsplit;
2127 	aggstate->maxsets = 0;
2128 	aggstate->projected_set = -1;
2129 	aggstate->current_set = 0;
2130 	aggstate->peragg = NULL;
2131 	aggstate->pertrans = NULL;
2132 	aggstate->curperagg = NULL;
2133 	aggstate->curpertrans = NULL;
2134 	aggstate->input_done = false;
2135 	aggstate->agg_done = false;
2136 	aggstate->pergroups = NULL;
2137 	aggstate->grp_firstTuple = NULL;
2138 	aggstate->sort_in = NULL;
2139 	aggstate->sort_out = NULL;
2140 
2141 	/*
2142 	 * phases[0] always exists, but is dummy in sorted/plain mode
2143 	 */
2144 	numPhases = (use_hashing ? 1 : 2);
2145 	numHashes = (use_hashing ? 1 : 0);
2146 
2147 	/*
2148 	 * Calculate the maximum number of grouping sets in any phase; this
2149 	 * determines the size of some allocations.  Also calculate the number of
2150 	 * phases, since all hashed/mixed nodes contribute to only a single phase.
2151 	 */
2152 	if (node->groupingSets)
2153 	{
2154 		numGroupingSets = list_length(node->groupingSets);
2155 
2156 		foreach(l, node->chain)
2157 		{
2158 			Agg		   *agg = lfirst(l);
2159 
2160 			numGroupingSets = Max(numGroupingSets,
2161 								  list_length(agg->groupingSets));
2162 
2163 			/*
2164 			 * additional AGG_HASHED aggs become part of phase 0, but all
2165 			 * others add an extra phase.
2166 			 */
2167 			if (agg->aggstrategy != AGG_HASHED)
2168 				++numPhases;
2169 			else
2170 				++numHashes;
2171 		}
2172 	}
2173 
2174 	aggstate->maxsets = numGroupingSets;
2175 	aggstate->numphases = numPhases;
2176 
2177 	aggstate->aggcontexts = (ExprContext **)
2178 		palloc0(sizeof(ExprContext *) * numGroupingSets);
2179 
2180 	/*
2181 	 * Create expression contexts.  We need three or more, one for
2182 	 * per-input-tuple processing, one for per-output-tuple processing, one
2183 	 * for all the hashtables, and one for each grouping set.  The per-tuple
2184 	 * memory context of the per-grouping-set ExprContexts (aggcontexts)
2185 	 * replaces the standalone memory context formerly used to hold transition
2186 	 * values.  We cheat a little by using ExecAssignExprContext() to build
2187 	 * all of them.
2188 	 *
2189 	 * NOTE: the details of what is stored in aggcontexts and what is stored
2190 	 * in the regular per-query memory context are driven by a simple
2191 	 * decision: we want to reset the aggcontext at group boundaries (if not
2192 	 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
2193 	 */
2194 	ExecAssignExprContext(estate, &aggstate->ss.ps);
2195 	aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2196 
2197 	for (i = 0; i < numGroupingSets; ++i)
2198 	{
2199 		ExecAssignExprContext(estate, &aggstate->ss.ps);
2200 		aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2201 	}
2202 
2203 	if (use_hashing)
2204 	{
2205 		ExecAssignExprContext(estate, &aggstate->ss.ps);
2206 		aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2207 	}
2208 
2209 	ExecAssignExprContext(estate, &aggstate->ss.ps);
2210 
2211 	/*
2212 	 * Initialize child nodes.
2213 	 *
2214 	 * If we are doing a hashed aggregation then the child plan does not need
2215 	 * to handle REWIND efficiently; see ExecReScanAgg.
2216 	 */
2217 	if (node->aggstrategy == AGG_HASHED)
2218 		eflags &= ~EXEC_FLAG_REWIND;
2219 	outerPlan = outerPlan(node);
2220 	outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2221 
2222 	/*
2223 	 * initialize source tuple type.
2224 	 */
2225 	ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss);
2226 	scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2227 	if (node->chain)
2228 		aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc);
2229 
2230 	/*
2231 	 * Initialize result type, slot and projection.
2232 	 */
2233 	ExecInitResultTupleSlotTL(estate, &aggstate->ss.ps);
2234 	ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2235 
2236 	/*
2237 	 * initialize child expressions
2238 	 *
2239 	 * We expect the parser to have checked that no aggs contain other agg
2240 	 * calls in their arguments (and just to be sure, we verify it again while
2241 	 * initializing the plan node).  This would make no sense under SQL
2242 	 * semantics, and it's forbidden by the spec.  Because it is true, we
2243 	 * don't need to worry about evaluating the aggs in any particular order.
2244 	 *
2245 	 * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2246 	 * nodes to aggstate->aggs.  Aggrefs in the qual are found here; Aggrefs
2247 	 * in the targetlist are found during ExecAssignProjectionInfo, below.
2248 	 */
2249 	aggstate->ss.ps.qual =
2250 		ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2251 
2252 	/*
2253 	 * We should now have found all Aggrefs in the targetlist and quals.
2254 	 */
2255 	numaggs = aggstate->numaggs;
2256 	Assert(numaggs == list_length(aggstate->aggs));
2257 
2258 	/*
2259 	 * For each phase, prepare grouping set data and fmgr lookup data for
2260 	 * compare functions.  Accumulate all_grouped_cols in passing.
2261 	 */
2262 	aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2263 
2264 	aggstate->num_hashes = numHashes;
2265 	if (numHashes)
2266 	{
2267 		aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2268 		aggstate->phases[0].numsets = 0;
2269 		aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2270 		aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2271 	}
2272 
2273 	phase = 0;
2274 	for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2275 	{
2276 		Agg		   *aggnode;
2277 		Sort	   *sortnode;
2278 
2279 		if (phaseidx > 0)
2280 		{
2281 			aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2282 			sortnode = castNode(Sort, aggnode->plan.lefttree);
2283 		}
2284 		else
2285 		{
2286 			aggnode = node;
2287 			sortnode = NULL;
2288 		}
2289 
2290 		Assert(phase <= 1 || sortnode);
2291 
2292 		if (aggnode->aggstrategy == AGG_HASHED
2293 			|| aggnode->aggstrategy == AGG_MIXED)
2294 		{
2295 			AggStatePerPhase phasedata = &aggstate->phases[0];
2296 			AggStatePerHash perhash;
2297 			Bitmapset  *cols = NULL;
2298 
2299 			Assert(phase == 0);
2300 			i = phasedata->numsets++;
2301 			perhash = &aggstate->perhash[i];
2302 
2303 			/* phase 0 always points to the "real" Agg in the hash case */
2304 			phasedata->aggnode = node;
2305 			phasedata->aggstrategy = node->aggstrategy;
2306 
2307 			/* but the actual Agg node representing this hash is saved here */
2308 			perhash->aggnode = aggnode;
2309 
2310 			phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2311 
2312 			for (j = 0; j < aggnode->numCols; ++j)
2313 				cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2314 
2315 			phasedata->grouped_cols[i] = cols;
2316 
2317 			all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2318 			continue;
2319 		}
2320 		else
2321 		{
2322 			AggStatePerPhase phasedata = &aggstate->phases[++phase];
2323 			int			num_sets;
2324 
2325 			phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2326 
2327 			if (num_sets)
2328 			{
2329 				phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2330 				phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2331 
2332 				i = 0;
2333 				foreach(l, aggnode->groupingSets)
2334 				{
2335 					int			current_length = list_length(lfirst(l));
2336 					Bitmapset  *cols = NULL;
2337 
2338 					/* planner forces this to be correct */
2339 					for (j = 0; j < current_length; ++j)
2340 						cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2341 
2342 					phasedata->grouped_cols[i] = cols;
2343 					phasedata->gset_lengths[i] = current_length;
2344 
2345 					++i;
2346 				}
2347 
2348 				all_grouped_cols = bms_add_members(all_grouped_cols,
2349 												   phasedata->grouped_cols[0]);
2350 			}
2351 			else
2352 			{
2353 				Assert(phaseidx == 0);
2354 
2355 				phasedata->gset_lengths = NULL;
2356 				phasedata->grouped_cols = NULL;
2357 			}
2358 
2359 			/*
2360 			 * If we are grouping, precompute fmgr lookup data for inner loop.
2361 			 */
2362 			if (aggnode->aggstrategy == AGG_SORTED)
2363 			{
2364 				int			i = 0;
2365 
2366 				Assert(aggnode->numCols > 0);
2367 
2368 				/*
2369 				 * Build a separate function for each subset of columns that
2370 				 * need to be compared.
2371 				 */
2372 				phasedata->eqfunctions =
2373 					(ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
2374 
2375 				/* for each grouping set */
2376 				for (i = 0; i < phasedata->numsets; i++)
2377 				{
2378 					int			length = phasedata->gset_lengths[i];
2379 
2380 					if (phasedata->eqfunctions[length - 1] != NULL)
2381 						continue;
2382 
2383 					phasedata->eqfunctions[length - 1] =
2384 						execTuplesMatchPrepare(scanDesc,
2385 											   length,
2386 											   aggnode->grpColIdx,
2387 											   aggnode->grpOperators,
2388 											   (PlanState *) aggstate);
2389 				}
2390 
2391 				/* and for all grouped columns, unless already computed */
2392 				if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
2393 				{
2394 					phasedata->eqfunctions[aggnode->numCols - 1] =
2395 						execTuplesMatchPrepare(scanDesc,
2396 											   aggnode->numCols,
2397 											   aggnode->grpColIdx,
2398 											   aggnode->grpOperators,
2399 											   (PlanState *) aggstate);
2400 				}
2401 			}
2402 
2403 			phasedata->aggnode = aggnode;
2404 			phasedata->aggstrategy = aggnode->aggstrategy;
2405 			phasedata->sortnode = sortnode;
2406 		}
2407 	}
2408 
2409 	/*
2410 	 * Convert all_grouped_cols to a descending-order list.
2411 	 */
2412 	i = -1;
2413 	while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
2414 		aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
2415 
2416 	/*
2417 	 * Set up aggregate-result storage in the output expr context, and also
2418 	 * allocate my private per-agg working storage
2419 	 */
2420 	econtext = aggstate->ss.ps.ps_ExprContext;
2421 	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
2422 	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
2423 
2424 	peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
2425 	pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
2426 
2427 	aggstate->peragg = peraggs;
2428 	aggstate->pertrans = pertransstates;
2429 
2430 
2431 	aggstate->all_pergroups =
2432 		(AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
2433 									 * (numGroupingSets + numHashes));
2434 	pergroups = aggstate->all_pergroups;
2435 
2436 	if (node->aggstrategy != AGG_HASHED)
2437 	{
2438 		for (i = 0; i < numGroupingSets; i++)
2439 		{
2440 			pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
2441 													  * numaggs);
2442 		}
2443 
2444 		aggstate->pergroups = pergroups;
2445 		pergroups += numGroupingSets;
2446 	}
2447 
2448 	/*
2449 	 * Hashing can only appear in the initial phase.
2450 	 */
2451 	if (use_hashing)
2452 	{
2453 		/* this is an array of pointers, not structures */
2454 		aggstate->hash_pergroup = pergroups;
2455 
2456 		find_hash_columns(aggstate);
2457 
2458 		/* Skip massive memory allocation if we are just doing EXPLAIN */
2459 		if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
2460 			build_hash_table(aggstate);
2461 
2462 		aggstate->table_filled = false;
2463 	}
2464 
2465 	/*
2466 	 * Initialize current phase-dependent values to initial phase. The initial
2467 	 * phase is 1 (first sort pass) for all strategies that use sorting (if
2468 	 * hashing is being done too, then phase 0 is processed last); but if only
2469 	 * hashing is being done, then phase 0 is all there is.
2470 	 */
2471 	if (node->aggstrategy == AGG_HASHED)
2472 	{
2473 		aggstate->current_phase = 0;
2474 		initialize_phase(aggstate, 0);
2475 		select_current_set(aggstate, 0, true);
2476 	}
2477 	else
2478 	{
2479 		aggstate->current_phase = 1;
2480 		initialize_phase(aggstate, 1);
2481 		select_current_set(aggstate, 0, false);
2482 	}
2483 
2484 	/* -----------------
2485 	 * Perform lookups of aggregate function info, and initialize the
2486 	 * unchanging fields of the per-agg and per-trans data.
2487 	 *
2488 	 * We try to optimize by detecting duplicate aggregate functions so that
2489 	 * their state and final values are re-used, rather than needlessly being
2490 	 * re-calculated independently. We also detect aggregates that are not
2491 	 * the same, but which can share the same transition state.
2492 	 *
2493 	 * Scenarios:
2494 	 *
2495 	 * 1. Identical aggregate function calls appear in the query:
2496 	 *
2497 	 *	  SELECT SUM(x) FROM ... HAVING SUM(x) > 0
2498 	 *
2499 	 *	  Since these aggregates are identical, we only need to calculate
2500 	 *	  the value once. Both aggregates will share the same 'aggno' value.
2501 	 *
2502 	 * 2. Two different aggregate functions appear in the query, but the
2503 	 *	  aggregates have the same arguments, transition functions and
2504 	 *	  initial values (and, presumably, different final functions):
2505 	 *
2506 	 *	  SELECT AVG(x), STDDEV(x) FROM ...
2507 	 *
2508 	 *	  In this case we must create a new peragg for the varying aggregate,
2509 	 *	  and we need to call the final functions separately, but we need
2510 	 *	  only run the transition function once.  (This requires that the
2511 	 *	  final functions be nondestructive of the transition state, but
2512 	 *	  that's required anyway for other reasons.)
2513 	 *
2514 	 * For either of these optimizations to be valid, all aggregate properties
2515 	 * used in the transition phase must be the same, including any modifiers
2516 	 * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
2517 	 * contain any volatile functions.
2518 	 * -----------------
2519 	 */
2520 	aggno = -1;
2521 	transno = -1;
2522 	foreach(l, aggstate->aggs)
2523 	{
2524 		AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
2525 		Aggref	   *aggref = aggrefstate->aggref;
2526 		AggStatePerAgg peragg;
2527 		AggStatePerTrans pertrans;
2528 		int			existing_aggno;
2529 		int			existing_transno;
2530 		List	   *same_input_transnos;
2531 		Oid			inputTypes[FUNC_MAX_ARGS];
2532 		int			numArguments;
2533 		int			numDirectArgs;
2534 		HeapTuple	aggTuple;
2535 		Form_pg_aggregate aggform;
2536 		AclResult	aclresult;
2537 		Oid			transfn_oid,
2538 					finalfn_oid;
2539 		bool		shareable;
2540 		Oid			serialfn_oid,
2541 					deserialfn_oid;
2542 		Expr	   *finalfnexpr;
2543 		Oid			aggtranstype;
2544 		Datum		textInitVal;
2545 		Datum		initValue;
2546 		bool		initValueIsNull;
2547 
2548 		/* Planner should have assigned aggregate to correct level */
2549 		Assert(aggref->agglevelsup == 0);
2550 		/* ... and the split mode should match */
2551 		Assert(aggref->aggsplit == aggstate->aggsplit);
2552 
2553 		/* 1. Check for already processed aggs which can be re-used */
2554 		existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
2555 												&same_input_transnos);
2556 		if (existing_aggno != -1)
2557 		{
2558 			/*
2559 			 * Existing compatible agg found. so just point the Aggref to the
2560 			 * same per-agg struct.
2561 			 */
2562 			aggrefstate->aggno = existing_aggno;
2563 			continue;
2564 		}
2565 
2566 		/* Mark Aggref state node with assigned index in the result array */
2567 		peragg = &peraggs[++aggno];
2568 		peragg->aggref = aggref;
2569 		aggrefstate->aggno = aggno;
2570 
2571 		/* Fetch the pg_aggregate row */
2572 		aggTuple = SearchSysCache1(AGGFNOID,
2573 								   ObjectIdGetDatum(aggref->aggfnoid));
2574 		if (!HeapTupleIsValid(aggTuple))
2575 			elog(ERROR, "cache lookup failed for aggregate %u",
2576 				 aggref->aggfnoid);
2577 		aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2578 
2579 		/* Check permission to call aggregate function */
2580 		aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
2581 									 ACL_EXECUTE);
2582 		if (aclresult != ACLCHECK_OK)
2583 			aclcheck_error(aclresult, OBJECT_AGGREGATE,
2584 						   get_func_name(aggref->aggfnoid));
2585 		InvokeFunctionExecuteHook(aggref->aggfnoid);
2586 
2587 		/* planner recorded transition state type in the Aggref itself */
2588 		aggtranstype = aggref->aggtranstype;
2589 		Assert(OidIsValid(aggtranstype));
2590 
2591 		/*
2592 		 * If this aggregation is performing state combines, then instead of
2593 		 * using the transition function, we'll use the combine function
2594 		 */
2595 		if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2596 		{
2597 			transfn_oid = aggform->aggcombinefn;
2598 
2599 			/* If not set then the planner messed up */
2600 			if (!OidIsValid(transfn_oid))
2601 				elog(ERROR, "combinefn not set for aggregate function");
2602 		}
2603 		else
2604 			transfn_oid = aggform->aggtransfn;
2605 
2606 		/* Final function only required if we're finalizing the aggregates */
2607 		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
2608 			peragg->finalfn_oid = finalfn_oid = InvalidOid;
2609 		else
2610 			peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2611 
2612 		/*
2613 		 * If finalfn is marked read-write, we can't share transition states;
2614 		 * but it is okay to share states for AGGMODIFY_SHAREABLE aggs.  Also,
2615 		 * if we're not executing the finalfn here, we can share regardless.
2616 		 */
2617 		shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
2618 			(finalfn_oid == InvalidOid);
2619 		peragg->shareable = shareable;
2620 
2621 		serialfn_oid = InvalidOid;
2622 		deserialfn_oid = InvalidOid;
2623 
2624 		/*
2625 		 * Check if serialization/deserialization is required.  We only do it
2626 		 * for aggregates that have transtype INTERNAL.
2627 		 */
2628 		if (aggtranstype == INTERNALOID)
2629 		{
2630 			/*
2631 			 * The planner should only have generated a serialize agg node if
2632 			 * every aggregate with an INTERNAL state has a serialization
2633 			 * function.  Verify that.
2634 			 */
2635 			if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
2636 			{
2637 				/* serialization only valid when not running finalfn */
2638 				Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
2639 
2640 				if (!OidIsValid(aggform->aggserialfn))
2641 					elog(ERROR, "serialfunc not provided for serialization aggregation");
2642 				serialfn_oid = aggform->aggserialfn;
2643 			}
2644 
2645 			/* Likewise for deserialization functions */
2646 			if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
2647 			{
2648 				/* deserialization only valid when combining states */
2649 				Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
2650 
2651 				if (!OidIsValid(aggform->aggdeserialfn))
2652 					elog(ERROR, "deserialfunc not provided for deserialization aggregation");
2653 				deserialfn_oid = aggform->aggdeserialfn;
2654 			}
2655 		}
2656 
2657 		/* Check that aggregate owner has permission to call component fns */
2658 		{
2659 			HeapTuple	procTuple;
2660 			Oid			aggOwner;
2661 
2662 			procTuple = SearchSysCache1(PROCOID,
2663 										ObjectIdGetDatum(aggref->aggfnoid));
2664 			if (!HeapTupleIsValid(procTuple))
2665 				elog(ERROR, "cache lookup failed for function %u",
2666 					 aggref->aggfnoid);
2667 			aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2668 			ReleaseSysCache(procTuple);
2669 
2670 			aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2671 										 ACL_EXECUTE);
2672 			if (aclresult != ACLCHECK_OK)
2673 				aclcheck_error(aclresult, OBJECT_FUNCTION,
2674 							   get_func_name(transfn_oid));
2675 			InvokeFunctionExecuteHook(transfn_oid);
2676 			if (OidIsValid(finalfn_oid))
2677 			{
2678 				aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2679 											 ACL_EXECUTE);
2680 				if (aclresult != ACLCHECK_OK)
2681 					aclcheck_error(aclresult, OBJECT_FUNCTION,
2682 								   get_func_name(finalfn_oid));
2683 				InvokeFunctionExecuteHook(finalfn_oid);
2684 			}
2685 			if (OidIsValid(serialfn_oid))
2686 			{
2687 				aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
2688 											 ACL_EXECUTE);
2689 				if (aclresult != ACLCHECK_OK)
2690 					aclcheck_error(aclresult, OBJECT_FUNCTION,
2691 								   get_func_name(serialfn_oid));
2692 				InvokeFunctionExecuteHook(serialfn_oid);
2693 			}
2694 			if (OidIsValid(deserialfn_oid))
2695 			{
2696 				aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
2697 											 ACL_EXECUTE);
2698 				if (aclresult != ACLCHECK_OK)
2699 					aclcheck_error(aclresult, OBJECT_FUNCTION,
2700 								   get_func_name(deserialfn_oid));
2701 				InvokeFunctionExecuteHook(deserialfn_oid);
2702 			}
2703 		}
2704 
2705 		/*
2706 		 * Get actual datatypes of the (nominal) aggregate inputs.  These
2707 		 * could be different from the agg's declared input types, when the
2708 		 * agg accepts ANY or a polymorphic type.
2709 		 */
2710 		numArguments = get_aggregate_argtypes(aggref, inputTypes);
2711 
2712 		/* Count the "direct" arguments, if any */
2713 		numDirectArgs = list_length(aggref->aggdirectargs);
2714 
2715 		/* Detect how many arguments to pass to the finalfn */
2716 		if (aggform->aggfinalextra)
2717 			peragg->numFinalArgs = numArguments + 1;
2718 		else
2719 			peragg->numFinalArgs = numDirectArgs + 1;
2720 
2721 		/* Initialize any direct-argument expressions */
2722 		peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
2723 												 (PlanState *) aggstate);
2724 
2725 		/*
2726 		 * build expression trees using actual argument & result types for the
2727 		 * finalfn, if it exists and is required.
2728 		 */
2729 		if (OidIsValid(finalfn_oid))
2730 		{
2731 			build_aggregate_finalfn_expr(inputTypes,
2732 										 peragg->numFinalArgs,
2733 										 aggtranstype,
2734 										 aggref->aggtype,
2735 										 aggref->inputcollid,
2736 										 finalfn_oid,
2737 										 &finalfnexpr);
2738 			fmgr_info(finalfn_oid, &peragg->finalfn);
2739 			fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
2740 		}
2741 
2742 		/* get info about the output value's datatype */
2743 		get_typlenbyval(aggref->aggtype,
2744 						&peragg->resulttypeLen,
2745 						&peragg->resulttypeByVal);
2746 
2747 		/*
2748 		 * initval is potentially null, so don't try to access it as a struct
2749 		 * field. Must do it the hard way with SysCacheGetAttr.
2750 		 */
2751 		textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
2752 									  Anum_pg_aggregate_agginitval,
2753 									  &initValueIsNull);
2754 		if (initValueIsNull)
2755 			initValue = (Datum) 0;
2756 		else
2757 			initValue = GetAggInitVal(textInitVal, aggtranstype);
2758 
2759 		/*
2760 		 * 2. Build working state for invoking the transition function, or
2761 		 * look up previously initialized working state, if we can share it.
2762 		 *
2763 		 * find_compatible_peragg() already collected a list of shareable
2764 		 * per-Trans's with the same inputs. Check if any of them have the
2765 		 * same transition function and initial value.
2766 		 */
2767 		existing_transno = find_compatible_pertrans(aggstate, aggref,
2768 													shareable,
2769 													transfn_oid, aggtranstype,
2770 													serialfn_oid, deserialfn_oid,
2771 													initValue, initValueIsNull,
2772 													same_input_transnos);
2773 		if (existing_transno != -1)
2774 		{
2775 			/*
2776 			 * Existing compatible trans found, so just point the 'peragg' to
2777 			 * the same per-trans struct, and mark the trans state as shared.
2778 			 */
2779 			pertrans = &pertransstates[existing_transno];
2780 			pertrans->aggshared = true;
2781 			peragg->transno = existing_transno;
2782 		}
2783 		else
2784 		{
2785 			pertrans = &pertransstates[++transno];
2786 			build_pertrans_for_aggref(pertrans, aggstate, estate,
2787 									  aggref, transfn_oid, aggtranstype,
2788 									  serialfn_oid, deserialfn_oid,
2789 									  initValue, initValueIsNull,
2790 									  inputTypes, numArguments);
2791 			peragg->transno = transno;
2792 		}
2793 		ReleaseSysCache(aggTuple);
2794 	}
2795 
2796 	/*
2797 	 * Update aggstate->numaggs to be the number of unique aggregates found.
2798 	 * Also set numstates to the number of unique transition states found.
2799 	 */
2800 	aggstate->numaggs = aggno + 1;
2801 	aggstate->numtrans = transno + 1;
2802 
2803 	/*
2804 	 * Last, check whether any more aggregates got added onto the node while
2805 	 * we processed the expressions for the aggregate arguments (including not
2806 	 * only the regular arguments and FILTER expressions handled immediately
2807 	 * above, but any direct arguments we might've handled earlier).  If so,
2808 	 * we have nested aggregate functions, which is semantically nonsensical,
2809 	 * so complain.  (This should have been caught by the parser, so we don't
2810 	 * need to work hard on a helpful error message; but we defend against it
2811 	 * here anyway, just to be sure.)
2812 	 */
2813 	if (numaggs != list_length(aggstate->aggs))
2814 		ereport(ERROR,
2815 				(errcode(ERRCODE_GROUPING_ERROR),
2816 				 errmsg("aggregate function calls cannot be nested")));
2817 
2818 	/*
2819 	 * Build expressions doing all the transition work at once. We build a
2820 	 * different one for each phase, as the number of transition function
2821 	 * invocation can differ between phases. Note this'll work both for
2822 	 * transition and combination functions (although there'll only be one
2823 	 * phase in the latter case).
2824 	 */
2825 	for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
2826 	{
2827 		AggStatePerPhase phase = &aggstate->phases[phaseidx];
2828 		bool		dohash = false;
2829 		bool		dosort = false;
2830 
2831 		/* phase 0 doesn't necessarily exist */
2832 		if (!phase->aggnode)
2833 			continue;
2834 
2835 		if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
2836 		{
2837 			/*
2838 			 * Phase one, and only phase one, in a mixed agg performs both
2839 			 * sorting and aggregation.
2840 			 */
2841 			dohash = true;
2842 			dosort = true;
2843 		}
2844 		else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
2845 		{
2846 			/*
2847 			 * No need to compute a transition function for an AGG_MIXED phase
2848 			 * 0 - the contents of the hashtables will have been computed
2849 			 * during phase 1.
2850 			 */
2851 			continue;
2852 		}
2853 		else if (phase->aggstrategy == AGG_PLAIN ||
2854 				 phase->aggstrategy == AGG_SORTED)
2855 		{
2856 			dohash = false;
2857 			dosort = true;
2858 		}
2859 		else if (phase->aggstrategy == AGG_HASHED)
2860 		{
2861 			dohash = true;
2862 			dosort = false;
2863 		}
2864 		else
2865 			Assert(false);
2866 
2867 		phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash);
2868 
2869 	}
2870 
2871 	return aggstate;
2872 }
2873 
2874 /*
2875  * Build the state needed to calculate a state value for an aggregate.
2876  *
2877  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
2878  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
2879  * of the arguments could be calculated from 'aggref', but the caller has
2880  * calculated them already, so might as well pass them.
2881  */
2882 static void
build_pertrans_for_aggref(AggStatePerTrans pertrans,AggState * aggstate,EState * estate,Aggref * aggref,Oid aggtransfn,Oid aggtranstype,Oid aggserialfn,Oid aggdeserialfn,Datum initValue,bool initValueIsNull,Oid * inputTypes,int numArguments)2883 build_pertrans_for_aggref(AggStatePerTrans pertrans,
2884 						  AggState *aggstate, EState *estate,
2885 						  Aggref *aggref,
2886 						  Oid aggtransfn, Oid aggtranstype,
2887 						  Oid aggserialfn, Oid aggdeserialfn,
2888 						  Datum initValue, bool initValueIsNull,
2889 						  Oid *inputTypes, int numArguments)
2890 {
2891 	int			numGroupingSets = Max(aggstate->maxsets, 1);
2892 	Expr	   *serialfnexpr = NULL;
2893 	Expr	   *deserialfnexpr = NULL;
2894 	ListCell   *lc;
2895 	int			numInputs;
2896 	int			numDirectArgs;
2897 	List	   *sortlist;
2898 	int			numSortCols;
2899 	int			numDistinctCols;
2900 	int			i;
2901 
2902 	/* Begin filling in the pertrans data */
2903 	pertrans->aggref = aggref;
2904 	pertrans->aggshared = false;
2905 	pertrans->aggCollation = aggref->inputcollid;
2906 	pertrans->transfn_oid = aggtransfn;
2907 	pertrans->serialfn_oid = aggserialfn;
2908 	pertrans->deserialfn_oid = aggdeserialfn;
2909 	pertrans->initValue = initValue;
2910 	pertrans->initValueIsNull = initValueIsNull;
2911 
2912 	/* Count the "direct" arguments, if any */
2913 	numDirectArgs = list_length(aggref->aggdirectargs);
2914 
2915 	/* Count the number of aggregated input columns */
2916 	pertrans->numInputs = numInputs = list_length(aggref->args);
2917 
2918 	pertrans->aggtranstype = aggtranstype;
2919 
2920 	/*
2921 	 * When combining states, we have no use at all for the aggregate
2922 	 * function's transfn. Instead we use the combinefn.  In this case, the
2923 	 * transfn and transfn_oid fields of pertrans refer to the combine
2924 	 * function rather than the transition function.
2925 	 */
2926 	if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2927 	{
2928 		Expr	   *combinefnexpr;
2929 		size_t		numTransArgs;
2930 
2931 		/*
2932 		 * When combining there's only one input, the to-be-combined added
2933 		 * transition value from below (this node's transition value is
2934 		 * counted separately).
2935 		 */
2936 		pertrans->numTransInputs = 1;
2937 
2938 		/* account for the current transition state */
2939 		numTransArgs = pertrans->numTransInputs + 1;
2940 
2941 		build_aggregate_combinefn_expr(aggtranstype,
2942 									   aggref->inputcollid,
2943 									   aggtransfn,
2944 									   &combinefnexpr);
2945 		fmgr_info(aggtransfn, &pertrans->transfn);
2946 		fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
2947 
2948 		InitFunctionCallInfoData(pertrans->transfn_fcinfo,
2949 								 &pertrans->transfn,
2950 								 numTransArgs,
2951 								 pertrans->aggCollation,
2952 								 (void *) aggstate, NULL);
2953 
2954 		/*
2955 		 * Ensure that a combine function to combine INTERNAL states is not
2956 		 * strict. This should have been checked during CREATE AGGREGATE, but
2957 		 * the strict property could have been changed since then.
2958 		 */
2959 		if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
2960 			ereport(ERROR,
2961 					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2962 					 errmsg("combine function with transition type %s must not be declared STRICT",
2963 							format_type_be(aggtranstype))));
2964 	}
2965 	else
2966 	{
2967 		Expr	   *transfnexpr;
2968 		size_t		numTransArgs;
2969 
2970 		/* Detect how many arguments to pass to the transfn */
2971 		if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
2972 			pertrans->numTransInputs = numInputs;
2973 		else
2974 			pertrans->numTransInputs = numArguments;
2975 
2976 		/* account for the current transition state */
2977 		numTransArgs = pertrans->numTransInputs + 1;
2978 
2979 		/*
2980 		 * Set up infrastructure for calling the transfn.  Note that invtrans
2981 		 * is not needed here.
2982 		 */
2983 		build_aggregate_transfn_expr(inputTypes,
2984 									 numArguments,
2985 									 numDirectArgs,
2986 									 aggref->aggvariadic,
2987 									 aggtranstype,
2988 									 aggref->inputcollid,
2989 									 aggtransfn,
2990 									 InvalidOid,
2991 									 &transfnexpr,
2992 									 NULL);
2993 		fmgr_info(aggtransfn, &pertrans->transfn);
2994 		fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
2995 
2996 		InitFunctionCallInfoData(pertrans->transfn_fcinfo,
2997 								 &pertrans->transfn,
2998 								 numTransArgs,
2999 								 pertrans->aggCollation,
3000 								 (void *) aggstate, NULL);
3001 
3002 		/*
3003 		 * If the transfn is strict and the initval is NULL, make sure input
3004 		 * type and transtype are the same (or at least binary-compatible), so
3005 		 * that it's OK to use the first aggregated input value as the initial
3006 		 * transValue.  This should have been checked at agg definition time,
3007 		 * but we must check again in case the transfn's strictness property
3008 		 * has been changed.
3009 		 */
3010 		if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
3011 		{
3012 			if (numArguments <= numDirectArgs ||
3013 				!IsBinaryCoercible(inputTypes[numDirectArgs],
3014 								   aggtranstype))
3015 				ereport(ERROR,
3016 						(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3017 						 errmsg("aggregate %u needs to have compatible input type and transition type",
3018 								aggref->aggfnoid)));
3019 		}
3020 	}
3021 
3022 	/* get info about the state value's datatype */
3023 	get_typlenbyval(aggtranstype,
3024 					&pertrans->transtypeLen,
3025 					&pertrans->transtypeByVal);
3026 
3027 	if (OidIsValid(aggserialfn))
3028 	{
3029 		build_aggregate_serialfn_expr(aggserialfn,
3030 									  &serialfnexpr);
3031 		fmgr_info(aggserialfn, &pertrans->serialfn);
3032 		fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
3033 
3034 		InitFunctionCallInfoData(pertrans->serialfn_fcinfo,
3035 								 &pertrans->serialfn,
3036 								 1,
3037 								 InvalidOid,
3038 								 (void *) aggstate, NULL);
3039 	}
3040 
3041 	if (OidIsValid(aggdeserialfn))
3042 	{
3043 		build_aggregate_deserialfn_expr(aggdeserialfn,
3044 										&deserialfnexpr);
3045 		fmgr_info(aggdeserialfn, &pertrans->deserialfn);
3046 		fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
3047 
3048 		InitFunctionCallInfoData(pertrans->deserialfn_fcinfo,
3049 								 &pertrans->deserialfn,
3050 								 2,
3051 								 InvalidOid,
3052 								 (void *) aggstate, NULL);
3053 
3054 	}
3055 
3056 	/*
3057 	 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
3058 	 * have a list of SortGroupClause nodes; fish out the data in them and
3059 	 * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
3060 	 * however; the agg's transfn and finalfn are responsible for that.
3061 	 *
3062 	 * Note that by construction, if there is a DISTINCT clause then the ORDER
3063 	 * BY clause is a prefix of it (see transformDistinctClause).
3064 	 */
3065 	if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3066 	{
3067 		sortlist = NIL;
3068 		numSortCols = numDistinctCols = 0;
3069 	}
3070 	else if (aggref->aggdistinct)
3071 	{
3072 		sortlist = aggref->aggdistinct;
3073 		numSortCols = numDistinctCols = list_length(sortlist);
3074 		Assert(numSortCols >= list_length(aggref->aggorder));
3075 	}
3076 	else
3077 	{
3078 		sortlist = aggref->aggorder;
3079 		numSortCols = list_length(sortlist);
3080 		numDistinctCols = 0;
3081 	}
3082 
3083 	pertrans->numSortCols = numSortCols;
3084 	pertrans->numDistinctCols = numDistinctCols;
3085 
3086 	/*
3087 	 * If we have either sorting or filtering to do, create a tupledesc and
3088 	 * slot corresponding to the aggregated inputs (including sort
3089 	 * expressions) of the agg.
3090 	 */
3091 	if (numSortCols > 0 || aggref->aggfilter)
3092 	{
3093 		pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
3094 		pertrans->sortslot =
3095 			ExecInitExtraTupleSlot(estate, pertrans->sortdesc);
3096 	}
3097 
3098 	if (numSortCols > 0)
3099 	{
3100 		/*
3101 		 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3102 		 * (yet)
3103 		 */
3104 		Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
3105 
3106 		/* If we have only one input, we need its len/byval info. */
3107 		if (numInputs == 1)
3108 		{
3109 			get_typlenbyval(inputTypes[numDirectArgs],
3110 							&pertrans->inputtypeLen,
3111 							&pertrans->inputtypeByVal);
3112 		}
3113 		else if (numDistinctCols > 0)
3114 		{
3115 			/* we will need an extra slot to store prior values */
3116 			pertrans->uniqslot =
3117 				ExecInitExtraTupleSlot(estate, pertrans->sortdesc);
3118 		}
3119 
3120 		/* Extract the sort information for use later */
3121 		pertrans->sortColIdx =
3122 			(AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
3123 		pertrans->sortOperators =
3124 			(Oid *) palloc(numSortCols * sizeof(Oid));
3125 		pertrans->sortCollations =
3126 			(Oid *) palloc(numSortCols * sizeof(Oid));
3127 		pertrans->sortNullsFirst =
3128 			(bool *) palloc(numSortCols * sizeof(bool));
3129 
3130 		i = 0;
3131 		foreach(lc, sortlist)
3132 		{
3133 			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3134 			TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
3135 
3136 			/* the parser should have made sure of this */
3137 			Assert(OidIsValid(sortcl->sortop));
3138 
3139 			pertrans->sortColIdx[i] = tle->resno;
3140 			pertrans->sortOperators[i] = sortcl->sortop;
3141 			pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
3142 			pertrans->sortNullsFirst[i] = sortcl->nulls_first;
3143 			i++;
3144 		}
3145 		Assert(i == numSortCols);
3146 	}
3147 
3148 	if (aggref->aggdistinct)
3149 	{
3150 		Oid		   *ops;
3151 
3152 		Assert(numArguments > 0);
3153 		Assert(list_length(aggref->aggdistinct) == numDistinctCols);
3154 
3155 		ops = palloc(numDistinctCols * sizeof(Oid));
3156 
3157 		i = 0;
3158 		foreach(lc, aggref->aggdistinct)
3159 			ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
3160 
3161 		/* lookup / build the necessary comparators */
3162 		if (numDistinctCols == 1)
3163 			fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
3164 		else
3165 			pertrans->equalfnMulti =
3166 				execTuplesMatchPrepare(pertrans->sortdesc,
3167 									   numDistinctCols,
3168 									   pertrans->sortColIdx,
3169 									   ops,
3170 									   &aggstate->ss.ps);
3171 		pfree(ops);
3172 	}
3173 
3174 	pertrans->sortstates = (Tuplesortstate **)
3175 		palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
3176 }
3177 
3178 
3179 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)3180 GetAggInitVal(Datum textInitVal, Oid transtype)
3181 {
3182 	Oid			typinput,
3183 				typioparam;
3184 	char	   *strInitVal;
3185 	Datum		initVal;
3186 
3187 	getTypeInputInfo(transtype, &typinput, &typioparam);
3188 	strInitVal = TextDatumGetCString(textInitVal);
3189 	initVal = OidInputFunctionCall(typinput, strInitVal,
3190 								   typioparam, -1);
3191 	pfree(strInitVal);
3192 	return initVal;
3193 }
3194 
3195 /*
3196  * find_compatible_peragg - search for a previously initialized per-Agg struct
3197  *
3198  * Searches the previously looked at aggregates to find one which is compatible
3199  * with this one, with the same input parameters. If no compatible aggregate
3200  * can be found, returns -1.
3201  *
3202  * As a side-effect, this also collects a list of existing, shareable per-Trans
3203  * structs with matching inputs. If no identical Aggref is found, the list is
3204  * passed later to find_compatible_pertrans, to see if we can at least reuse
3205  * the state value of another aggregate.
3206  */
3207 static int
find_compatible_peragg(Aggref * newagg,AggState * aggstate,int lastaggno,List ** same_input_transnos)3208 find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3209 					   int lastaggno, List **same_input_transnos)
3210 {
3211 	int			aggno;
3212 	AggStatePerAgg peraggs;
3213 
3214 	*same_input_transnos = NIL;
3215 
3216 	/* we mustn't reuse the aggref if it contains volatile function calls */
3217 	if (contain_volatile_functions((Node *) newagg))
3218 		return -1;
3219 
3220 	peraggs = aggstate->peragg;
3221 
3222 	/*
3223 	 * Search through the list of already seen aggregates. If we find an
3224 	 * existing identical aggregate call, then we can re-use that one. While
3225 	 * searching, we'll also collect a list of Aggrefs with the same input
3226 	 * parameters. If no matching Aggref is found, the caller can potentially
3227 	 * still re-use the transition state of one of them.  (At this stage we
3228 	 * just compare the parsetrees; whether different aggregates share the
3229 	 * same transition function will be checked later.)
3230 	 */
3231 	for (aggno = 0; aggno <= lastaggno; aggno++)
3232 	{
3233 		AggStatePerAgg peragg;
3234 		Aggref	   *existingRef;
3235 
3236 		peragg = &peraggs[aggno];
3237 		existingRef = peragg->aggref;
3238 
3239 		/* all of the following must be the same or it's no match */
3240 		if (newagg->inputcollid != existingRef->inputcollid ||
3241 			newagg->aggtranstype != existingRef->aggtranstype ||
3242 			newagg->aggstar != existingRef->aggstar ||
3243 			newagg->aggvariadic != existingRef->aggvariadic ||
3244 			newagg->aggkind != existingRef->aggkind ||
3245 			!equal(newagg->args, existingRef->args) ||
3246 			!equal(newagg->aggorder, existingRef->aggorder) ||
3247 			!equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3248 			!equal(newagg->aggfilter, existingRef->aggfilter))
3249 			continue;
3250 
3251 		/* if it's the same aggregate function then report exact match */
3252 		if (newagg->aggfnoid == existingRef->aggfnoid &&
3253 			newagg->aggtype == existingRef->aggtype &&
3254 			newagg->aggcollid == existingRef->aggcollid &&
3255 			equal(newagg->aggdirectargs, existingRef->aggdirectargs))
3256 		{
3257 			list_free(*same_input_transnos);
3258 			*same_input_transnos = NIL;
3259 			return aggno;
3260 		}
3261 
3262 		/*
3263 		 * Not identical, but it had the same inputs.  If the final function
3264 		 * permits sharing, return its transno to the caller, in case we can
3265 		 * re-use its per-trans state.  (If there's already sharing going on,
3266 		 * we might report a transno more than once.  find_compatible_pertrans
3267 		 * is cheap enough that it's not worth spending cycles to avoid that.)
3268 		 */
3269 		if (peragg->shareable)
3270 			*same_input_transnos = lappend_int(*same_input_transnos,
3271 											   peragg->transno);
3272 	}
3273 
3274 	return -1;
3275 }
3276 
3277 /*
3278  * find_compatible_pertrans - search for a previously initialized per-Trans
3279  * struct
3280  *
3281  * Searches the list of transnos for a per-Trans struct with the same
3282  * transition function and initial condition. (The inputs have already been
3283  * verified to match.)
3284  */
3285 static int
find_compatible_pertrans(AggState * aggstate,Aggref * newagg,bool shareable,Oid aggtransfn,Oid aggtranstype,Oid aggserialfn,Oid aggdeserialfn,Datum initValue,bool initValueIsNull,List * transnos)3286 find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
3287 						 Oid aggtransfn, Oid aggtranstype,
3288 						 Oid aggserialfn, Oid aggdeserialfn,
3289 						 Datum initValue, bool initValueIsNull,
3290 						 List *transnos)
3291 {
3292 	ListCell   *lc;
3293 
3294 	/* If this aggregate can't share transition states, give up */
3295 	if (!shareable)
3296 		return -1;
3297 
3298 	foreach(lc, transnos)
3299 	{
3300 		int			transno = lfirst_int(lc);
3301 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3302 
3303 		/*
3304 		 * if the transfns or transition state types are not the same then the
3305 		 * state can't be shared.
3306 		 */
3307 		if (aggtransfn != pertrans->transfn_oid ||
3308 			aggtranstype != pertrans->aggtranstype)
3309 			continue;
3310 
3311 		/*
3312 		 * The serialization and deserialization functions must match, if
3313 		 * present, as we're unable to share the trans state for aggregates
3314 		 * which will serialize or deserialize into different formats.
3315 		 * Remember that these will be InvalidOid if they're not required for
3316 		 * this agg node.
3317 		 */
3318 		if (aggserialfn != pertrans->serialfn_oid ||
3319 			aggdeserialfn != pertrans->deserialfn_oid)
3320 			continue;
3321 
3322 		/*
3323 		 * Check that the initial condition matches, too.
3324 		 */
3325 		if (initValueIsNull && pertrans->initValueIsNull)
3326 			return transno;
3327 
3328 		if (!initValueIsNull && !pertrans->initValueIsNull &&
3329 			datumIsEqual(initValue, pertrans->initValue,
3330 						 pertrans->transtypeByVal, pertrans->transtypeLen))
3331 			return transno;
3332 	}
3333 	return -1;
3334 }
3335 
3336 void
ExecEndAgg(AggState * node)3337 ExecEndAgg(AggState *node)
3338 {
3339 	PlanState  *outerPlan;
3340 	int			transno;
3341 	int			numGroupingSets = Max(node->maxsets, 1);
3342 	int			setno;
3343 
3344 	/* Make sure we have closed any open tuplesorts */
3345 
3346 	if (node->sort_in)
3347 		tuplesort_end(node->sort_in);
3348 	if (node->sort_out)
3349 		tuplesort_end(node->sort_out);
3350 
3351 	for (transno = 0; transno < node->numtrans; transno++)
3352 	{
3353 		AggStatePerTrans pertrans = &node->pertrans[transno];
3354 
3355 		for (setno = 0; setno < numGroupingSets; setno++)
3356 		{
3357 			if (pertrans->sortstates[setno])
3358 				tuplesort_end(pertrans->sortstates[setno]);
3359 		}
3360 	}
3361 
3362 	/* And ensure any agg shutdown callbacks have been called */
3363 	for (setno = 0; setno < numGroupingSets; setno++)
3364 		ReScanExprContext(node->aggcontexts[setno]);
3365 	if (node->hashcontext)
3366 		ReScanExprContext(node->hashcontext);
3367 
3368 	/*
3369 	 * We don't actually free any ExprContexts here (see comment in
3370 	 * ExecFreeExprContext), just unlinking the output one from the plan node
3371 	 * suffices.
3372 	 */
3373 	ExecFreeExprContext(&node->ss.ps);
3374 
3375 	/* clean up tuple table */
3376 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
3377 
3378 	outerPlan = outerPlanState(node);
3379 	ExecEndNode(outerPlan);
3380 }
3381 
3382 void
ExecReScanAgg(AggState * node)3383 ExecReScanAgg(AggState *node)
3384 {
3385 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
3386 	PlanState  *outerPlan = outerPlanState(node);
3387 	Agg		   *aggnode = (Agg *) node->ss.ps.plan;
3388 	int			transno;
3389 	int			numGroupingSets = Max(node->maxsets, 1);
3390 	int			setno;
3391 
3392 	node->agg_done = false;
3393 
3394 	if (node->aggstrategy == AGG_HASHED)
3395 	{
3396 		/*
3397 		 * In the hashed case, if we haven't yet built the hash table then we
3398 		 * can just return; nothing done yet, so nothing to undo. If subnode's
3399 		 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
3400 		 * else no reason to re-scan it at all.
3401 		 */
3402 		if (!node->table_filled)
3403 			return;
3404 
3405 		/*
3406 		 * If we do have the hash table, and the subplan does not have any
3407 		 * parameter changes, and none of our own parameter changes affect
3408 		 * input expressions of the aggregated functions, then we can just
3409 		 * rescan the existing hash table; no need to build it again.
3410 		 */
3411 		if (outerPlan->chgParam == NULL &&
3412 			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
3413 		{
3414 			ResetTupleHashIterator(node->perhash[0].hashtable,
3415 								   &node->perhash[0].hashiter);
3416 			select_current_set(node, 0, true);
3417 			return;
3418 		}
3419 	}
3420 
3421 	/* Make sure we have closed any open tuplesorts */
3422 	for (transno = 0; transno < node->numtrans; transno++)
3423 	{
3424 		for (setno = 0; setno < numGroupingSets; setno++)
3425 		{
3426 			AggStatePerTrans pertrans = &node->pertrans[transno];
3427 
3428 			if (pertrans->sortstates[setno])
3429 			{
3430 				tuplesort_end(pertrans->sortstates[setno]);
3431 				pertrans->sortstates[setno] = NULL;
3432 			}
3433 		}
3434 	}
3435 
3436 	/*
3437 	 * We don't need to ReScanExprContext the output tuple context here;
3438 	 * ExecReScan already did it. But we do need to reset our per-grouping-set
3439 	 * contexts, which may have transvalues stored in them. (We use rescan
3440 	 * rather than just reset because transfns may have registered callbacks
3441 	 * that need to be run now.) For the AGG_HASHED case, see below.
3442 	 */
3443 
3444 	for (setno = 0; setno < numGroupingSets; setno++)
3445 	{
3446 		ReScanExprContext(node->aggcontexts[setno]);
3447 	}
3448 
3449 	/* Release first tuple of group, if we have made a copy */
3450 	if (node->grp_firstTuple != NULL)
3451 	{
3452 		heap_freetuple(node->grp_firstTuple);
3453 		node->grp_firstTuple = NULL;
3454 	}
3455 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
3456 
3457 	/* Forget current agg values */
3458 	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
3459 	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
3460 
3461 	/*
3462 	 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
3463 	 * the hashcontext. This used to be an issue, but now, resetting a context
3464 	 * automatically deletes sub-contexts too.
3465 	 */
3466 	if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
3467 	{
3468 		ReScanExprContext(node->hashcontext);
3469 		/* Rebuild an empty hash table */
3470 		build_hash_table(node);
3471 		node->table_filled = false;
3472 		/* iterator will be reset when the table is filled */
3473 	}
3474 
3475 	if (node->aggstrategy != AGG_HASHED)
3476 	{
3477 		/*
3478 		 * Reset the per-group state (in particular, mark transvalues null)
3479 		 */
3480 		for (setno = 0; setno < numGroupingSets; setno++)
3481 		{
3482 			MemSet(node->pergroups[setno], 0,
3483 				   sizeof(AggStatePerGroupData) * node->numaggs);
3484 		}
3485 
3486 		/* reset to phase 1 */
3487 		initialize_phase(node, 1);
3488 
3489 		node->input_done = false;
3490 		node->projected_set = -1;
3491 	}
3492 
3493 	if (outerPlan->chgParam == NULL)
3494 		ExecReScan(outerPlan);
3495 }
3496 
3497 
3498 /***********************************************************************
3499  * API exposed to aggregate functions
3500  ***********************************************************************/
3501 
3502 
3503 /*
3504  * AggCheckCallContext - test if a SQL function is being called as an aggregate
3505  *
3506  * The transition and/or final functions of an aggregate may want to verify
3507  * that they are being called as aggregates, rather than as plain SQL
3508  * functions.  They should use this function to do so.  The return value
3509  * is nonzero if being called as an aggregate, or zero if not.  (Specific
3510  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
3511  * values could conceivably appear in future.)
3512  *
3513  * If aggcontext isn't NULL, the function also stores at *aggcontext the
3514  * identity of the memory context that aggregate transition values are being
3515  * stored in.  Note that the same aggregate call site (flinfo) may be called
3516  * interleaved on different transition values in different contexts, so it's
3517  * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
3518  * cache it in the transvalue itself (for internal-type transvalues).
3519  */
3520 int
AggCheckCallContext(FunctionCallInfo fcinfo,MemoryContext * aggcontext)3521 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
3522 {
3523 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3524 	{
3525 		if (aggcontext)
3526 		{
3527 			AggState   *aggstate = ((AggState *) fcinfo->context);
3528 			ExprContext *cxt = aggstate->curaggcontext;
3529 
3530 			*aggcontext = cxt->ecxt_per_tuple_memory;
3531 		}
3532 		return AGG_CONTEXT_AGGREGATE;
3533 	}
3534 	if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
3535 	{
3536 		if (aggcontext)
3537 			*aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
3538 		return AGG_CONTEXT_WINDOW;
3539 	}
3540 
3541 	/* this is just to prevent "uninitialized variable" warnings */
3542 	if (aggcontext)
3543 		*aggcontext = NULL;
3544 	return 0;
3545 }
3546 
3547 /*
3548  * AggGetAggref - allow an aggregate support function to get its Aggref
3549  *
3550  * If the function is being called as an aggregate support function,
3551  * return the Aggref node for the aggregate call.  Otherwise, return NULL.
3552  *
3553  * Aggregates sharing the same inputs and transition functions can get
3554  * merged into a single transition calculation.  If the transition function
3555  * calls AggGetAggref, it will get some one of the Aggrefs for which it is
3556  * executing.  It must therefore not pay attention to the Aggref fields that
3557  * relate to the final function, as those are indeterminate.  But if a final
3558  * function calls AggGetAggref, it will get a precise result.
3559  *
3560  * Note that if an aggregate is being used as a window function, this will
3561  * return NULL.  We could provide a similar function to return the relevant
3562  * WindowFunc node in such cases, but it's not needed yet.
3563  */
3564 Aggref *
AggGetAggref(FunctionCallInfo fcinfo)3565 AggGetAggref(FunctionCallInfo fcinfo)
3566 {
3567 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3568 	{
3569 		AggState   *aggstate = (AggState *) fcinfo->context;
3570 		AggStatePerAgg curperagg;
3571 		AggStatePerTrans curpertrans;
3572 
3573 		/* check curperagg (valid when in a final function) */
3574 		curperagg = aggstate->curperagg;
3575 
3576 		if (curperagg)
3577 			return curperagg->aggref;
3578 
3579 		/* check curpertrans (valid when in a transition function) */
3580 		curpertrans = aggstate->curpertrans;
3581 
3582 		if (curpertrans)
3583 			return curpertrans->aggref;
3584 	}
3585 	return NULL;
3586 }
3587 
3588 /*
3589  * AggGetTempMemoryContext - fetch short-term memory context for aggregates
3590  *
3591  * This is useful in agg final functions; the context returned is one that
3592  * the final function can safely reset as desired.  This isn't useful for
3593  * transition functions, since the context returned MAY (we don't promise)
3594  * be the same as the context those are called in.
3595  *
3596  * As above, this is currently not useful for aggs called as window functions.
3597  */
3598 MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)3599 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
3600 {
3601 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3602 	{
3603 		AggState   *aggstate = (AggState *) fcinfo->context;
3604 
3605 		return aggstate->tmpcontext->ecxt_per_tuple_memory;
3606 	}
3607 	return NULL;
3608 }
3609 
3610 /*
3611  * AggStateIsShared - find out whether transition state is shared
3612  *
3613  * If the function is being called as an aggregate support function,
3614  * return true if the aggregate's transition state is shared across
3615  * multiple aggregates, false if it is not.
3616  *
3617  * Returns true if not called as an aggregate support function.
3618  * This is intended as a conservative answer, ie "no you'd better not
3619  * scribble on your input".  In particular, will return true if the
3620  * aggregate is being used as a window function, which is a scenario
3621  * in which changing the transition state is a bad idea.  We might
3622  * want to refine the behavior for the window case in future.
3623  */
3624 bool
AggStateIsShared(FunctionCallInfo fcinfo)3625 AggStateIsShared(FunctionCallInfo fcinfo)
3626 {
3627 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3628 	{
3629 		AggState   *aggstate = (AggState *) fcinfo->context;
3630 		AggStatePerAgg curperagg;
3631 		AggStatePerTrans curpertrans;
3632 
3633 		/* check curperagg (valid when in a final function) */
3634 		curperagg = aggstate->curperagg;
3635 
3636 		if (curperagg)
3637 			return aggstate->pertrans[curperagg->transno].aggshared;
3638 
3639 		/* check curpertrans (valid when in a transition function) */
3640 		curpertrans = aggstate->curpertrans;
3641 
3642 		if (curpertrans)
3643 			return curpertrans->aggshared;
3644 	}
3645 	return true;
3646 }
3647 
3648 /*
3649  * AggRegisterCallback - register a cleanup callback for an aggregate
3650  *
3651  * This is useful for aggs to register shutdown callbacks, which will ensure
3652  * that non-memory resources are freed.  The callback will occur just before
3653  * the associated aggcontext (as returned by AggCheckCallContext) is reset,
3654  * either between groups or as a result of rescanning the query.  The callback
3655  * will NOT be called on error paths.  The typical use-case is for freeing of
3656  * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
3657  * created by the agg functions.  (The callback will not be called until after
3658  * the result of the finalfn is no longer needed, so it's safe for the finalfn
3659  * to return data that will be freed by the callback.)
3660  *
3661  * As above, this is currently not useful for aggs called as window functions.
3662  */
3663 void
AggRegisterCallback(FunctionCallInfo fcinfo,ExprContextCallbackFunction func,Datum arg)3664 AggRegisterCallback(FunctionCallInfo fcinfo,
3665 					ExprContextCallbackFunction func,
3666 					Datum arg)
3667 {
3668 	if (fcinfo->context && IsA(fcinfo->context, AggState))
3669 	{
3670 		AggState   *aggstate = (AggState *) fcinfo->context;
3671 		ExprContext *cxt = aggstate->curaggcontext;
3672 
3673 		RegisterExprContextCallback(cxt, func, arg);
3674 
3675 		return;
3676 	}
3677 	elog(ERROR, "aggregate function cannot register a callback in this context");
3678 }
3679 
3680 
3681 /*
3682  * aggregate_dummy - dummy execution routine for aggregate functions
3683  *
3684  * This function is listed as the implementation (prosrc field) of pg_proc
3685  * entries for aggregate functions.  Its only purpose is to throw an error
3686  * if someone mistakenly executes such a function in the normal way.
3687  *
3688  * Perhaps someday we could assign real meaning to the prosrc field of
3689  * an aggregate?
3690  */
3691 Datum
aggregate_dummy(PG_FUNCTION_ARGS)3692 aggregate_dummy(PG_FUNCTION_ARGS)
3693 {
3694 	elog(ERROR, "aggregate function %u called as normal function",
3695 		 fcinfo->flinfo->fn_oid);
3696 	return (Datum) 0;			/* keep compiler quiet */
3697 }
3698