/*-------------------------------------------------------------------------
 *
 * nodeAgg.c
 *	  Routines to handle aggregate nodes.
 *
 *	  ExecAgg normally evaluates each aggregate in the following steps:
 *
 *		 transvalue = initcond
 *		 foreach input_tuple do
 *			transvalue = transfunc(transvalue, input_value(s))
 *		 result = finalfunc(transvalue, direct_argument(s))
 *
 *	  If a finalfunc is not supplied then the result is just the ending
 *	  value of transvalue.
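 *
 *	  For example, the built-in avg(int4) follows this pattern: its
 *	  initcond is '{0,0}' (a count-and-sum accumulator), its transfunc
 *	  int4_avg_accum() updates the accumulator for each input value, and
 *	  its finalfunc int8_avg() divides sum by count to produce the result.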
 *
 *	  Other behaviors can be selected by the "aggsplit" mode, which exists
 *	  to support partial aggregation.  It is possible to:
 *	  * Skip running the finalfunc, so that the output is always the
 *	  final transvalue state.
 *	  * Substitute the combinefunc for the transfunc, so that transvalue
 *	  states (propagated up from a child partial-aggregation step) are merged
 *	  rather than processing raw input rows.  (The statements below about
 *	  the transfunc apply equally to the combinefunc, when it's selected.)
 *	  * Apply the serializefunc to the output values (this only makes sense
 *	  when skipping the finalfunc, since the serializefunc works on the
 *	  transvalue data type).
 *	  * Apply the deserializefunc to the input values (this only makes sense
 *	  when using the combinefunc, for similar reasons).
 *	  It is the planner's responsibility to connect up Agg nodes using these
 *	  alternate behaviors in a way that makes sense, with partial aggregation
 *	  results being fed to nodes that expect them.
 *
 *	  If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
 *	  input tuples and eliminate duplicates (if required) before performing
 *	  the above-depicted process.  (However, we don't do that for ordered-set
 *	  aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
 *	  so far as this module is concerned.)	Note that partial aggregation
 *	  is not supported in these cases, since we couldn't ensure global
 *	  ordering or distinctness of the inputs.
 *
 *	  If transfunc is marked "strict" in pg_proc and initcond is NULL,
 *	  then the first non-NULL input_value is assigned directly to transvalue,
 *	  and transfunc isn't applied until the second non-NULL input_value.
 *	  The agg's first input type and transtype must be the same in this case!
 *
 *	  If transfunc is marked "strict" then NULL input_values are skipped,
 *	  keeping the previous transvalue.  If transfunc is not strict then it
 *	  is called for every input tuple and must deal with NULL initcond
 *	  or NULL input_values for itself.
 *
 *	  If finalfunc is marked "strict" then it is not called when the
 *	  ending transvalue is NULL; instead a NULL result is produced
 *	  automatically (this is just the usual handling of strict functions,
 *	  of course).  A non-strict finalfunc can make its own choice of
 *	  what to return for a NULL ending transvalue.
 *
 *	  Ordered-set aggregates are treated specially in one other way: we
 *	  evaluate any "direct" arguments and pass them to the finalfunc along
 *	  with the transition value.
 *
 *	  A finalfunc can have additional arguments beyond the transvalue and
 *	  any "direct" arguments, corresponding to the input arguments of the
 *	  aggregate.  These are always just passed as NULL.  Such arguments may be
 *	  needed to allow resolution of a polymorphic aggregate's result type.
 *
 *	  We compute aggregate input expressions and run the transition functions
 *	  in a temporary econtext (aggstate->tmpcontext).  This is reset at least
 *	  once per input tuple, so when the transvalue datatype is
 *	  pass-by-reference, we have to be careful to copy it into a longer-lived
 *	  memory context, and free the prior value to avoid memory leakage.  We
 *	  store transvalues in another set of econtexts, aggstate->aggcontexts
 *	  (one per grouping set, see below), which are also used for the hashtable
 *	  structures in AGG_HASHED mode.  These econtexts are rescanned, not just
 *	  reset, at group boundaries so that aggregate transition functions can
 *	  register shutdown callbacks via AggRegisterCallback.
 *
 *	  The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
 *	  run finalize functions and compute the output tuple; this context can be
 *	  reset once per output tuple.
 *
 *	  The executor's AggState node is passed as the fmgr "context" value in
 *	  all transfunc and finalfunc calls.  It is not recommended that the
 *	  transition functions look at the AggState node directly, but they can
 *	  use AggCheckCallContext() to verify that they are being called by
 *	  nodeAgg.c (and not as ordinary SQL functions).  The main reason a
 *	  transition function might want to know this is so that it can avoid
 *	  palloc'ing a fixed-size pass-by-ref transition value on every call:
 *	  it can instead just scribble on and return its left input.  Ordinarily
 *	  it is completely forbidden for functions to modify pass-by-ref inputs,
 *	  but in the aggregate case we know the left input is either the initial
 *	  transition value or a previous function result, and in either case its
 *	  value need not be preserved.  See int8inc() for an example.  Notice that
 *	  the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
 *	  the previous transition value pointer is returned.  It is also possible
 *	  to avoid repeated data copying when the transition value is an expanded
 *	  object: to do that, the transition function must take care to return
 *	  an expanded object that is in a child context of the memory context
 *	  returned by AggCheckCallContext().  Also, some transition functions want
 *	  to store working state in addition to the nominal transition value; they
 *	  can use the memory context returned by AggCheckCallContext() to do that.
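 *
 *	  For illustration, the core of a transition function that exploits
 *	  this might look like the sketch below (hypothetical code, assuming a
 *	  pass-by-ref int8 state; int8inc() is a real, complete example,
 *	  including the fallback path used when not called as an aggregate):
 *
 *		if (AggCheckCallContext(fcinfo, NULL))
 *		{
 *			int64	   *state = (int64 *) PG_GETARG_POINTER(0);
 *
 *			*state += 1;
 *			PG_RETURN_POINTER(state);
 *		}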
 *
 *	  Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
 *	  AggState is available as context in earlier releases (back to 8.1),
 *	  but direct examination of the node is needed to use it before 9.0.
 *
 *	  As of 9.4, aggregate transition functions can also use AggGetAggref()
 *	  to get hold of the Aggref expression node for their aggregate call.
 *	  This is mainly intended for ordered-set aggregates, which are not
 *	  supported as window functions.  (A regular aggregate function would
 *	  need some fallback logic to use this, since there's no Aggref node
 *	  for a window function.)
 *
 *	  Grouping sets:
 *
 *	  A list of grouping sets which is structurally equivalent to a ROLLUP
 *	  clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
 *	  ordered data.  We do this by keeping a separate set of transition values
 *	  for each grouping set being concurrently processed; for each input tuple
 *	  we update them all, and on group boundaries we reset those states
 *	  (starting at the front of the list) whose grouping values have changed
 *	  (the list of grouping sets is ordered from most specific to least
 *	  specific).
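 *
 *	  For example, with grouping sets (a,b,c), (a,b), (a) and input sorted
 *	  by (a,b,c): a change in c resets only the (a,b,c) transition values;
 *	  a change in b resets those of (a,b,c) and (a,b); and a change in a
 *	  resets all three.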
 *
 *	  Where more complex grouping sets are used, we break them down into
 *	  "phases", where each phase has a different sort order (except phase 0
 *	  which is reserved for hashing).  During each phase but the last, the
 *	  input tuples are additionally stored in a tuplesort which is keyed to the
 *	  next phase's sort order; during each phase but the first, the input
 *	  tuples are drawn from the previously sorted data.  (The sorting of the
 *	  data for the first phase is handled by the planner, as it might be
 *	  satisfied by underlying nodes.)
 *
 *	  Hashing can be mixed with sorted grouping.  To do this, we have an
 *	  AGG_MIXED strategy that populates the hashtables during the first sorted
 *	  phase, and switches to reading them out after completing all sort phases.
 *	  We can also support AGG_HASHED with multiple hash tables and no sorting
 *	  at all.
 *
 *	  From the perspective of aggregate transition and final functions, the
 *	  only issue regarding grouping sets is this: a single call site (flinfo)
 *	  of an aggregate function may be used for updating several different
 *	  transition values in turn. So the function must not cache in the flinfo
 *	  anything which logically belongs as part of the transition value (most
 *	  importantly, the memory context in which the transition value exists).
 *	  The support API functions (AggCheckCallContext, AggRegisterCallback) are
 *	  sensitive to the grouping set for which the aggregate function is
 *	  currently being called.
 *
 *	  Plan structure:
 *
 *	  What we get from the planner is actually one "real" Agg node which is
 *	  part of the plan tree proper, but which optionally has an additional list
 *	  of Agg nodes hung off the side via the "chain" field.  This is because an
 *	  Agg node happens to be a convenient representation of all the data we
 *	  need for grouping sets.
 *
 *	  For many purposes, we treat the "real" node as if it were just the first
 *	  node in the chain.  The chain must be ordered such that hashed entries
 *	  come before sorted/plain entries; the real node is marked AGG_MIXED if
 *	  there are both types present (in which case the real node describes one
 *	  of the hashed groupings, other AGG_HASHED nodes may optionally follow in
 *	  the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
 *	  the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
 *	  nodes must be of the same type; if it is AGG_PLAIN, there can be no
 *	  chained nodes.
 *
 *	  We collect all hashed nodes into a single "phase", numbered 0, and create
 *	  a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
 *	  Phase 0 is allocated even if there are no hashes, but remains unused in
 *	  that case.
 *
 *	  AGG_HASHED nodes actually refer to only a single grouping set each,
 *	  because for each hashed grouping we need a separate grpColIdx and
 *	  numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
 *	  grouping sets that share a sort order.  Each AGG_SORTED node other than
 *	  the first one has an associated Sort node which describes the sort order
 *	  to be used; the first sorted node takes its input from the outer subtree,
 *	  which the planner has already arranged to provide ordered data.
 *
 *	  Memory and ExprContext usage:
 *
 *	  Because we're accumulating aggregate values across input rows, we need to
 *	  use more memory contexts than just simple input/output tuple contexts.
 *	  In fact, for a rollup, we need a separate context for each grouping set
 *	  so that we can reset the inner (finer-grained) aggregates on their group
 *	  boundaries while continuing to accumulate values for outer
 *	  (coarser-grained) groupings.  On top of this, we might be simultaneously
 *	  populating hashtables; however, we only need one context for all the
 *	  hashtables.
 *
 *	  So we create an array, aggcontexts, with an ExprContext for each grouping
 *	  set in the largest rollup that we're going to process, and use the
 *	  per-tuple memory context of those ExprContexts to store the aggregate
 *	  transition values.  hashcontext is the single context created to support
 *	  all hash tables.
 *
 *	  Spilling To Disk
 *
 *	  When performing hash aggregation, if the hash table memory exceeds the
 *	  limit (see hash_agg_check_limits()), we enter "spill mode". In spill
 *	  mode, we advance the transition states only for groups already in the
 *	  hash table. For tuples that would need to create new hash table
 *	  entries (and initialize new transition states), we instead spill them to
 *	  disk to be processed later. The tuples are spilled in a partitioned
 *	  manner, so that subsequent batches are smaller and less likely to exceed
 *	  hash_mem (if a batch does exceed hash_mem, it must be spilled
 *	  recursively).
 *
 *	  Spilled data is written to logical tapes. These provide better control
 *	  over memory usage, disk space, and the number of files than if we were
 *	  to use a BufFile for each spill.
 *
 *	  Note that it's possible for transition states to start small but then
 *	  grow very large; for instance in the case of ARRAY_AGG. In such cases,
 *	  it's still possible to significantly exceed hash_mem. We try to avoid
 *	  this situation by estimating what will fit in the available memory, and
 *	  imposing a limit on the number of groups separately from the amount of
 *	  memory consumed.
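 *
 *	  In outline (a simplified sketch of the logic implemented below):
 *
 *		 foreach input_tuple do
 *			if its group is in the hash table, or memory permits adding it,
 *				advance that group's transition state
 *			else
 *				write the tuple to the spill partition picked by its hash
 *		 finalize and emit the groups in the hash table
 *		 foreach spill partition do
 *			repeat the above with its tuples as a new, smaller batch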
 *
 *    Transition / Combine function invocation:
 *
 *    For performance reasons, transition functions, including combine
 *    functions, aren't invoked one-by-one from nodeAgg.c after computing
 *    arguments using the expression evaluation engine. Instead
 *    ExecBuildAggTrans() builds one large expression that does both argument
 *    evaluation and transition function invocation. That avoids performance
 *    issues due to repeated uses of expression evaluation, complications due
 *    to filter expressions having to be evaluated early, and allows JITing
 *    the entire expression into one native function.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeAgg.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "common/hashfn.h"
#include "executor/execExpr.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "lib/hyperloglog.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/dynahash.h"
#include "utils/expandeddatum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"

/*
 * Control how many partitions are created when spilling HashAgg to
 * disk.
 *
 * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
 * partitions needed such that each partition will fit in memory. The factor
 * is set higher than one because there's not a high cost to having a few too
 * many partitions, and it makes it less likely that a partition will need to
 * be spilled recursively. Another benefit of having more, smaller partitions
 * is that small hash tables may perform better than large ones due to memory
 * caching effects.
 *
 * We also specify a min and max number of partitions per spill. Too few might
 * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
 * many will result in lots of memory wasted buffering the spill files (which
 * could instead be spent on a larger hash table).
 */
#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024

/*
 * For reading from tapes, the buffer size must be a multiple of
 * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
 * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
 * tape always uses a buffer of size BLCKSZ.
 */
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ

/*
 * HyperLogLog is used for estimating the cardinality of the spilled tuples in
 * a given partition. 5 bits corresponds to 2^5 = 32 registers, a size of
 * about 32 bytes, and a worst-case error of around 18% (cf. the usual HLL
 * error estimate of 1.04/sqrt(nregisters)). That's effective enough to
 * choose a reasonable number of partitions when recursing.
 */
#define HASHAGG_HLL_BIT_WIDTH 5

/*
 * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
 * improved?
 */
#define CHUNKHDRSZ 16

/*
 * Track all tapes needed for a HashAgg that spills. We don't know the maximum
 * number of tapes needed at the start of the algorithm (because it can
 * recurse), so one tape set is allocated and extended as needed for new
 * tapes. When a particular tape is already read, rewind it for write mode and
 * put it in the free list.
 *
 * Tapes' buffers can take up substantial memory when many tapes are open at
 * once. We only need one tape open at a time in read mode (using a buffer
 * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
 * requiring a buffer of size BLCKSZ) for each partition.
 */
typedef struct HashTapeInfo
{
	LogicalTapeSet *tapeset;
	int			ntapes;
	int		   *freetapes;
	int			nfreetapes;
	int			freetapes_alloc;
} HashTapeInfo;

/*
 * Represents partitioned spill data for a single hashtable. Contains the
 * necessary information to route tuples to the correct partition, and to
 * transform the spilled data into new batches.
 *
 * The high bits are used for partition selection (when recursing, we ignore
 * the bits that have already been used for partition selection at an earlier
 * level).
 */
typedef struct HashAggSpill
{
	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
	int			npartitions;	/* number of partitions */
	int		   *partitions;		/* spill partition tape numbers */
	int64	   *ntuples;		/* number of tuples in each partition */
	uint32		mask;			/* mask to find partition from hash value */
	int			shift;			/* after masking, shift by this amount */
	hyperLogLogState *hll_card; /* cardinality estimate for contents */
} HashAggSpill;

/*
 * Represents work to be done for one pass of hash aggregation (with only one
 * grouping set).
 *
 * Also tracks the bits of the hash already used for partition selection by
 * earlier iterations, so that this batch can use new bits. If all bits have
 * already been used, no partitioning will be done (any spilled data will go
 * to a single output tape).
 */
typedef struct HashAggBatch
{
	int			setno;			/* grouping set */
	int			used_bits;		/* number of bits of hash already used */
	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
	int			input_tapenum;	/* input partition tape */
	int64		input_tuples;	/* number of tuples in this batch */
	double		input_card;		/* estimated group cardinality */
} HashAggBatch;

/* used to find referenced colnos */
typedef struct FindColsContext
{
	bool		is_aggref;		/* is under an aggref */
	Bitmapset  *aggregated;		/* column references under an aggref */
	Bitmapset  *unaggregated;	/* other column references */
} FindColsContext;

static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
								  AggStatePerGroup *pergroups,
								  int numReset);
static void advance_transition_function(AggState *aggstate,
										AggStatePerTrans pertrans,
										AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate);
static void process_ordered_aggregate_single(AggState *aggstate,
											 AggStatePerTrans pertrans,
											 AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
											AggStatePerTrans pertrans,
											AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
							   AggStatePerAgg peragg,
							   AggStatePerGroup pergroupstate,
							   Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
									  AggStatePerAgg peragg,
									  AggStatePerGroup pergroupstate,
									  Datum *resultVal, bool *resultIsNull);
static inline void prepare_hash_slot(AggStatePerHash perhash,
									 TupleTableSlot *inputslot,
									 TupleTableSlot *hashslot);
static void prepare_projection_slot(AggState *aggstate,
									TupleTableSlot *slot,
									int currentSet);
static void finalize_aggregates(AggState *aggstate,
								AggStatePerAgg peragg,
								AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static void find_cols(AggState *aggstate, Bitmapset **aggregated,
					  Bitmapset **unaggregated);
static bool find_cols_walker(Node *node, FindColsContext *context);
static void build_hash_tables(AggState *aggstate);
static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
										  bool nullcheck);
static long hash_choose_num_buckets(double hashentrysize,
									long estimated_nbuckets,
									Size memory);
static int	hash_choose_num_partitions(double input_groups,
									   double hashentrysize,
									   int used_bits,
									   int *log2_npartitions);
static void initialize_hash_entry(AggState *aggstate,
								  TupleHashTable hashtable,
								  TupleHashEntry entry);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static bool agg_refill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
static void hash_agg_check_limits(AggState *aggstate);
static void hash_agg_enter_spill_mode(AggState *aggstate);
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
									int npartitions);
static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate);
static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
									   int input_tapenum, int setno,
									   int64 input_tuples, double input_card,
									   int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
							   int used_bits, double input_groups,
							   double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
								TupleTableSlot *slot, uint32 hash);
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
								 int setno);
static void hashagg_tapeinfo_init(AggState *aggstate);
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
									int ndest);
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
									  AggState *aggstate, EState *estate,
									  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
									  Oid aggserialfn, Oid aggdeserialfn,
									  Datum initValue, bool initValueIsNull,
									  Oid *inputTypes, int numArguments);


/*
 * Select the current grouping set; affects current_set and
 * curaggcontext.
 */
static void
select_current_set(AggState *aggstate, int setno, bool is_hash)
{
	/*
	 * When changing this, also adapt ExecAggPlainTransByVal() and
	 * ExecAggPlainTransByRef().
	 */
	if (is_hash)
		aggstate->curaggcontext = aggstate->hashcontext;
	else
		aggstate->curaggcontext = aggstate->aggcontexts[setno];

	aggstate->current_set = setno;
}

/*
 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
 * current_phase + 1. Juggle the tuplesorts accordingly.
 *
 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
 * case, so when entering phase 0, all we need to do is drop open sorts.
 */
static void
initialize_phase(AggState *aggstate, int newphase)
{
	Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);

	/*
	 * Whatever the previous state, we're now done with whatever input
	 * tuplesort was in use.
	 */
	if (aggstate->sort_in)
	{
		tuplesort_end(aggstate->sort_in);
		aggstate->sort_in = NULL;
	}

	if (newphase <= 1)
	{
		/*
		 * Discard any existing output tuplesort.
		 */
		if (aggstate->sort_out)
		{
			tuplesort_end(aggstate->sort_out);
			aggstate->sort_out = NULL;
		}
	}
	else
	{
		/*
		 * The old output tuplesort becomes the new input one, and this is the
		 * right time to actually sort it.
		 */
		aggstate->sort_in = aggstate->sort_out;
		aggstate->sort_out = NULL;
		Assert(aggstate->sort_in);
		tuplesort_performsort(aggstate->sort_in);
	}

	/*
	 * If this isn't the last phase, we need to sort appropriately for the
	 * next phase in sequence.
	 */
	if (newphase > 0 && newphase < aggstate->numphases - 1)
	{
		Sort	   *sortnode = aggstate->phases[newphase + 1].sortnode;
		PlanState  *outerNode = outerPlanState(aggstate);
		TupleDesc	tupDesc = ExecGetResultType(outerNode);

		aggstate->sort_out = tuplesort_begin_heap(tupDesc,
												  sortnode->numCols,
												  sortnode->sortColIdx,
												  sortnode->sortOperators,
												  sortnode->collations,
												  sortnode->nullsFirst,
												  work_mem,
												  NULL, false);
	}

	aggstate->current_phase = newphase;
	aggstate->phase = &aggstate->phases[newphase];
}

/*
 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
 * populated by the previous phase.  Copy it to the sorter for the next phase
 * if any.
 *
 * Callers cannot rely on memory for tuple in returned slot remaining valid
 * past any subsequently fetched tuple.
 */
static TupleTableSlot *
fetch_input_tuple(AggState *aggstate)
{
	TupleTableSlot *slot;

	if (aggstate->sort_in)
	{
		/* make sure we check for interrupts in either path through here */
		CHECK_FOR_INTERRUPTS();
		if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
									aggstate->sort_slot, NULL))
			return NULL;
		slot = aggstate->sort_slot;
	}
	else
		slot = ExecProcNode(outerPlanState(aggstate));

	if (!TupIsNull(slot) && aggstate->sort_out)
		tuplesort_puttupleslot(aggstate->sort_out, slot);

	return slot;
}

/*
 * (Re)Initialize an individual aggregate.
 *
 * This function handles only one grouping set, already set in
 * aggstate->current_set.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
					 AggStatePerGroup pergroupstate)
{
	/*
	 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
	 */
	if (pertrans->numSortCols > 0)
	{
		/*
		 * In case of rescan, maybe there could be an uncompleted sort
		 * operation?  Clean it up if so.
		 */
		if (pertrans->sortstates[aggstate->current_set])
			tuplesort_end(pertrans->sortstates[aggstate->current_set]);

		/*
		 * We use a plain Datum sorter when there's a single input column;
		 * otherwise sort the full tuple.  (See comments for
		 * process_ordered_aggregate_single.)
		 */
		if (pertrans->numInputs == 1)
		{
			Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);

			pertrans->sortstates[aggstate->current_set] =
				tuplesort_begin_datum(attr->atttypid,
									  pertrans->sortOperators[0],
									  pertrans->sortCollations[0],
									  pertrans->sortNullsFirst[0],
									  work_mem, NULL, false);
		}
		else
			pertrans->sortstates[aggstate->current_set] =
				tuplesort_begin_heap(pertrans->sortdesc,
									 pertrans->numSortCols,
									 pertrans->sortColIdx,
									 pertrans->sortOperators,
									 pertrans->sortCollations,
									 pertrans->sortNullsFirst,
									 work_mem, NULL, false);
	}

	/*
	 * (Re)set transValue to the initial value.
	 *
	 * Note that when the initial value is pass-by-ref, we must copy it (into
	 * the aggcontext) since we will pfree the transValue later.
	 */
	if (pertrans->initValueIsNull)
		pergroupstate->transValue = pertrans->initValue;
	else
	{
		MemoryContext oldContext;

		oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
		pergroupstate->transValue = datumCopy(pertrans->initValue,
											  pertrans->transtypeByVal,
											  pertrans->transtypeLen);
		MemoryContextSwitchTo(oldContext);
	}
	pergroupstate->transValueIsNull = pertrans->initValueIsNull;

	/*
	 * If the initial value for the transition state doesn't exist in the
	 * pg_aggregate table then we will let the first non-NULL value returned
	 * from the outer procNode become the initial value. (This is useful for
	 * aggregates like max() and min().) The noTransValue flag signals that we
	 * still need to do this.
	 */
	pergroupstate->noTransValue = pertrans->initValueIsNull;
}

/*
 * Initialize all aggregate transition states for a new group of input values.
 *
 * If there are multiple grouping sets, we initialize only the first numReset
 * of them (the grouping sets are ordered so that the most specific one, which
 * is reset most often, is first). As a convenience, if numReset is 0, we
 * reinitialize all sets.
 *
 * NB: This cannot be used for hash aggregates, as for those the grouping set
 * number has to be specified from further up.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregates(AggState *aggstate,
					  AggStatePerGroup *pergroups,
					  int numReset)
{
	int			transno;
	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
	int			setno = 0;
	int			numTrans = aggstate->numtrans;
	AggStatePerTrans transstates = aggstate->pertrans;

	if (numReset == 0)
		numReset = numGroupingSets;

	for (setno = 0; setno < numReset; setno++)
	{
		AggStatePerGroup pergroup = pergroups[setno];

		select_current_set(aggstate, setno, false);

		for (transno = 0; transno < numTrans; transno++)
		{
			AggStatePerTrans pertrans = &transstates[transno];
			AggStatePerGroup pergroupstate = &pergroup[transno];

			initialize_aggregate(aggstate, pertrans, pergroupstate);
		}
	}
}

/*
 * Given new input value(s), advance the transition function of one aggregate
 * state within one grouping set only (already set in aggstate->current_set)
 *
 * The new values (and null flags) have been preloaded into argument positions
 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
 * pass to the transition function.  We also expect that the static fields of
 * the fcinfo are already initialized; that was done by ExecInitAgg().
 *
 * It doesn't matter which memory context this is called in.
 */
static void
advance_transition_function(AggState *aggstate,
							AggStatePerTrans pertrans,
							AggStatePerGroup pergroupstate)
{
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
	MemoryContext oldContext;
	Datum		newVal;

	if (pertrans->transfn.fn_strict)
	{
		/*
		 * For a strict transfn, nothing happens when there's a NULL input; we
		 * just keep the prior transValue.
		 */
		int			numTransInputs = pertrans->numTransInputs;
		int			i;

		for (i = 1; i <= numTransInputs; i++)
		{
			if (fcinfo->args[i].isnull)
				return;
		}
		if (pergroupstate->noTransValue)
		{
			/*
			 * transValue has not been initialized. This is the first non-NULL
			 * input value. We use it as the initial value for transValue. (We
			 * already checked that the agg's input type is binary-compatible
			 * with its transtype, so straight copy here is OK.)
			 *
			 * We must copy the datum into aggcontext if it is pass-by-ref. We
			 * do not need to pfree the old transValue, since it's NULL.
			 */
			oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
			pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
												  pertrans->transtypeByVal,
												  pertrans->transtypeLen);
			pergroupstate->transValueIsNull = false;
			pergroupstate->noTransValue = false;
			MemoryContextSwitchTo(oldContext);
			return;
		}
		if (pergroupstate->transValueIsNull)
		{
			/*
			 * Don't call a strict function with NULL inputs.  Note it is
			 * possible to get here despite the above tests, if the transfn is
			 * strict *and* returned a NULL on a prior cycle. If that happens
			 * we will propagate the NULL all the way to the end.
			 */
			return;
		}
	}

	/* We run the transition functions in per-input-tuple memory context */
	oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);

	/* set up aggstate->curpertrans for AggGetAggref() */
	aggstate->curpertrans = pertrans;

	/*
	 * OK to call the transition function
	 */
	fcinfo->args[0].value = pergroupstate->transValue;
	fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
	fcinfo->isnull = false;		/* just in case transfn doesn't set it */

	newVal = FunctionCallInvoke(fcinfo);

	aggstate->curpertrans = NULL;

	/*
	 * If pass-by-ref datatype, must copy the new value into aggcontext and
	 * free the prior transValue.  But if transfn returned a pointer to its
	 * first input, we don't need to do anything.  Also, if transfn returned a
	 * pointer to a R/W expanded object that is already a child of the
	 * aggcontext, assume we can adopt that value without copying it.
	 *
	 * It's safe to compare newVal with pergroup->transValue without regard
	 * for either being NULL, because ExecAggTransReparent() takes care to set
	 * transValue to 0 when NULL. Otherwise we could end up accidentally not
	 * reparenting, when the transValue has the same numerical value as
	 * newValue, despite being NULL.  This is a somewhat hot path, making it
	 * undesirable to instead solve this with another branch for the common
	 * case of the transition function returning its (modified) input
	 * argument.
	 */
	if (!pertrans->transtypeByVal &&
		DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
		newVal = ExecAggTransReparent(aggstate, pertrans,
									  newVal, fcinfo->isnull,
									  pergroupstate->transValue,
									  pergroupstate->transValueIsNull);

	pergroupstate->transValue = newVal;
	pergroupstate->transValueIsNull = fcinfo->isnull;

	MemoryContextSwitchTo(oldContext);
}

/*
 * Advance each aggregate transition state for one input tuple.  The input
 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
 * accessible to ExecEvalExpr.
 *
 * We have two sets of transition states to handle: one for sorted aggregation
 * and one for hashed; we do them both here, to avoid multiple evaluation of
 * the inputs.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
advance_aggregates(AggState *aggstate)
{
	bool		dummynull;

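	/*
	 * All the work happens as a side effect of evaluating the phase's
	 * transition expression; its result value is uninteresting, hence
	 * dummynull.
	 */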
	ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
							  aggstate->tmpcontext,
							  &dummynull);
}

/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with only one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * Note that the strictness of the transition function was checked when
 * entering the values into the sort, so we don't check it again here;
 * we just apply standard SQL DISTINCT logic.
 *
 * The one-input case is handled separately from the multi-input case
 * for performance reasons: for single by-value inputs, such as the
 * common case of count(distinct id), the tuplesort_getdatum code path
 * is around 300% faster.  (The speedup for by-reference types is less
 * but still noticeable.)
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_single(AggState *aggstate,
								 AggStatePerTrans pertrans,
								 AggStatePerGroup pergroupstate)
{
	Datum		oldVal = (Datum) 0;
	bool		oldIsNull = true;
	bool		haveOldVal = false;
	MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
	MemoryContext oldContext;
	bool		isDistinct = (pertrans->numDistinctCols > 0);
	Datum		newAbbrevVal = (Datum) 0;
	Datum		oldAbbrevVal = (Datum) 0;
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
	Datum	   *newVal;
	bool	   *isNull;

	Assert(pertrans->numDistinctCols < 2);

	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

	/* Load the column into argument 1 (arg 0 will be transition value) */
	newVal = &fcinfo->args[1].value;
	isNull = &fcinfo->args[1].isnull;

	/*
	 * Note: if input type is pass-by-ref, the datums returned by the sort are
	 * freshly palloc'd in the per-query context, so we must be careful to
	 * pfree them when they are no longer needed.
	 */

	while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
							  true, newVal, isNull, &newAbbrevVal))
	{
		/*
		 * Clear and select the working context for evaluation of the equality
		 * function and transition function.
		 */
		MemoryContextReset(workcontext);
		oldContext = MemoryContextSwitchTo(workcontext);

		/*
		 * If DISTINCT mode, and not distinct from prior, skip it.
		 */
		if (isDistinct &&
			haveOldVal &&
			((oldIsNull && *isNull) ||
			 (!oldIsNull && !*isNull &&
			  oldAbbrevVal == newAbbrevVal &&
			  DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
											 pertrans->aggCollation,
											 oldVal, *newVal)))))
		{
			/* equal to prior, so forget this one */
			if (!pertrans->inputtypeByVal && !*isNull)
				pfree(DatumGetPointer(*newVal));
		}
		else
		{
			advance_transition_function(aggstate, pertrans, pergroupstate);
			/* forget the old value, if any */
			if (!oldIsNull && !pertrans->inputtypeByVal)
				pfree(DatumGetPointer(oldVal));
			/* and remember the new one for subsequent equality checks */
			oldVal = *newVal;
			oldAbbrevVal = newAbbrevVal;
			oldIsNull = *isNull;
			haveOldVal = true;
		}

		MemoryContextSwitchTo(oldContext);
	}

	if (!oldIsNull && !pertrans->inputtypeByVal)
		pfree(DatumGetPointer(oldVal));

	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
	pertrans->sortstates[aggstate->current_set] = NULL;
}

/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with more than one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_multi(AggState *aggstate,
								AggStatePerTrans pertrans,
								AggStatePerGroup pergroupstate)
{
	ExprContext *tmpcontext = aggstate->tmpcontext;
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
	TupleTableSlot *slot1 = pertrans->sortslot;
	TupleTableSlot *slot2 = pertrans->uniqslot;
	int			numTransInputs = pertrans->numTransInputs;
	int			numDistinctCols = pertrans->numDistinctCols;
	Datum		newAbbrevVal = (Datum) 0;
	Datum		oldAbbrevVal = (Datum) 0;
	bool		haveOldValue = false;
	TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
	int			i;

	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

	ExecClearTuple(slot1);
	if (slot2)
		ExecClearTuple(slot2);

	while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
								  true, true, slot1, &newAbbrevVal))
	{
		CHECK_FOR_INTERRUPTS();

		tmpcontext->ecxt_outertuple = slot1;
		tmpcontext->ecxt_innertuple = slot2;

		if (numDistinctCols == 0 ||
			!haveOldValue ||
			newAbbrevVal != oldAbbrevVal ||
			!ExecQual(pertrans->equalfnMulti, tmpcontext))
		{
			/*
			 * Extract the first numTransInputs columns as datums to pass to
			 * the transfn.
			 */
			slot_getsomeattrs(slot1, numTransInputs);

			/* Load values into fcinfo */
			/* Start from 1, since the 0th arg will be the transition value */
			for (i = 0; i < numTransInputs; i++)
			{
				fcinfo->args[i + 1].value = slot1->tts_values[i];
				fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
			}

			advance_transition_function(aggstate, pertrans, pergroupstate);

			if (numDistinctCols > 0)
			{
				/* swap the slot pointers to retain the current tuple */
				TupleTableSlot *tmpslot = slot2;

				slot2 = slot1;
				slot1 = tmpslot;
				/* avoid ExecQual() calls by reusing abbreviated keys */
				oldAbbrevVal = newAbbrevVal;
				haveOldValue = true;
			}
		}

		/* Reset context each time */
		ResetExprContext(tmpcontext);

		ExecClearTuple(slot1);
	}

	if (slot2)
		ExecClearTuple(slot2);

	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
	pertrans->sortstates[aggstate->current_set] = NULL;

	/* restore previous slot, potentially in use for grouping sets */
	tmpcontext->ecxt_outertuple = save;
}

/*
 * Compute the final value of one aggregate.
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * The finalfn will be run, and the result delivered, in the
 * output-tuple context; caller's CurrentMemoryContext does not matter.
 *
 * The finalfn uses the state as set in the transno. This also might be
 * being used by another aggregate function, so it's important that we do
 * nothing destructive here.
 */
static void
finalize_aggregate(AggState *aggstate,
				   AggStatePerAgg peragg,
				   AggStatePerGroup pergroupstate,
				   Datum *resultVal, bool *resultIsNull)
{
	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
	bool		anynull = false;
	MemoryContext oldContext;
	int			i;
	ListCell   *lc;
	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];

	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

	/*
	 * Evaluate any direct arguments.  We do this even if there's no finalfn
	 * (which is unlikely anyway), so that side-effects happen as expected.
	 * The direct arguments go into arg positions 1 and up, leaving position 0
	 * for the transition state value.
	 */
	i = 1;
	foreach(lc, peragg->aggdirectargs)
	{
		ExprState  *expr = (ExprState *) lfirst(lc);

		fcinfo->args[i].value = ExecEvalExpr(expr,
											 aggstate->ss.ps.ps_ExprContext,
											 &fcinfo->args[i].isnull);
		anynull |= fcinfo->args[i].isnull;
		i++;
	}

	/*
	 * Apply the agg's finalfn if one is provided, else return transValue.
	 */
	if (OidIsValid(peragg->finalfn_oid))
	{
		int			numFinalArgs = peragg->numFinalArgs;

		/* set up aggstate->curperagg for AggGetAggref() */
		aggstate->curperagg = peragg;

		InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
								 numFinalArgs,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/* Fill in the transition state value */
		fcinfo->args[0].value =
			MakeExpandedObjectReadOnly(pergroupstate->transValue,
									   pergroupstate->transValueIsNull,
									   pertrans->transtypeLen);
		fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
		anynull |= pergroupstate->transValueIsNull;

		/* Fill any remaining argument positions with nulls */
		for (; i < numFinalArgs; i++)
		{
			fcinfo->args[i].value = (Datum) 0;
			fcinfo->args[i].isnull = true;
			anynull = true;
		}

		if (fcinfo->flinfo->fn_strict && anynull)
		{
			/* don't call a strict function with NULL inputs */
			*resultVal = (Datum) 0;
			*resultIsNull = true;
		}
		else
		{
			*resultVal = FunctionCallInvoke(fcinfo);
			*resultIsNull = fcinfo->isnull;
		}
		aggstate->curperagg = NULL;
	}
	else
	{
		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
		*resultVal = pergroupstate->transValue;
		*resultIsNull = pergroupstate->transValueIsNull;
	}

	/*
	 * If result is pass-by-ref, make sure it is in the right context.
	 */
	if (!peragg->resulttypeByVal && !*resultIsNull &&
		!MemoryContextContains(CurrentMemoryContext,
							   DatumGetPointer(*resultVal)))
		*resultVal = datumCopy(*resultVal,
							   peragg->resulttypeByVal,
							   peragg->resulttypeLen);

	MemoryContextSwitchTo(oldContext);
}

/*
 * Compute the output value of one partial aggregate.
 *
 * The serialization function will be run, and the result delivered, in the
 * output-tuple context; caller's CurrentMemoryContext does not matter.
 */
static void
finalize_partialaggregate(AggState *aggstate,
						  AggStatePerAgg peragg,
						  AggStatePerGroup pergroupstate,
						  Datum *resultVal, bool *resultIsNull)
{
	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
	MemoryContext oldContext;

	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

	/*
	 * serialfn_oid will be set if we must serialize the transvalue before
	 * returning it
	 */
	if (OidIsValid(pertrans->serialfn_oid))
	{
		/* Don't call a strict serialization function with NULL input. */
		if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
		{
			*resultVal = (Datum) 0;
			*resultIsNull = true;
		}
		else
		{
			FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;

			fcinfo->args[0].value =
				MakeExpandedObjectReadOnly(pergroupstate->transValue,
										   pergroupstate->transValueIsNull,
										   pertrans->transtypeLen);
			fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
			fcinfo->isnull = false;

			*resultVal = FunctionCallInvoke(fcinfo);
			*resultIsNull = fcinfo->isnull;
		}
	}
	else
	{
		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
		*resultVal = pergroupstate->transValue;
		*resultIsNull = pergroupstate->transValueIsNull;
	}

	/* If result is pass-by-ref, make sure it is in the right context. */
	if (!peragg->resulttypeByVal && !*resultIsNull &&
		!MemoryContextContains(CurrentMemoryContext,
							   DatumGetPointer(*resultVal)))
		*resultVal = datumCopy(*resultVal,
							   peragg->resulttypeByVal,
							   peragg->resulttypeLen);

	MemoryContextSwitchTo(oldContext);
}

/*
 * Extract the attributes that make up the grouping key into the
 * hashslot. This is necessary to compute the hash or perform a lookup.
 */
static inline void
prepare_hash_slot(AggStatePerHash perhash,
				  TupleTableSlot *inputslot,
				  TupleTableSlot *hashslot)
{
	int			i;

	/* transfer just the needed columns into hashslot */
	slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
	ExecClearTuple(hashslot);

	for (i = 0; i < perhash->numhashGrpCols; i++)
	{
		int			varNumber = perhash->hashGrpColIdxInput[i] - 1;

		hashslot->tts_values[i] = inputslot->tts_values[varNumber];
		hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
	}
	ExecStoreVirtualTuple(hashslot);
}

/*
 * Prepare to finalize and project based on the specified representative tuple
 * slot and grouping set.
 *
 * In the specified tuple slot, force to null all attributes that should be
 * read as null in the context of the current grouping set.  Also stash the
 * current group bitmap where GroupingExpr can get at it.
 *
 * This relies on three conditions:
 *
 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
 * only reference it in evaluations, which will only access individual
 * attributes.
 *
 * 2) No system columns are going to need to be nulled. (If a system column is
 * referenced in a group clause, it is actually projected in the outer plan
 * tlist.)
 *
 * 3) Within a given phase, we never need to recover the value of an attribute
 * once it has been set to null.
 *
 * Poking into the slot this way is a bit ugly, but the consensus is that the
 * alternative was worse.
 */
static void
prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
{
	if (aggstate->phase->grouped_cols)
	{
		Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];

		aggstate->grouped_cols = grouped_cols;

		if (TTS_EMPTY(slot))
		{
			/*
			 * Force all values to be NULL if working on an empty input tuple
			 * (i.e. an empty grouping set for which no input rows were
			 * supplied).
			 */
			ExecStoreAllNullTuple(slot);
		}
		else if (aggstate->all_grouped_cols)
		{
			ListCell   *lc;

			/* all_grouped_cols is arranged in desc order */
			slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));

			foreach(lc, aggstate->all_grouped_cols)
			{
				int			attnum = lfirst_int(lc);

				if (!bms_is_member(attnum, grouped_cols))
					slot->tts_isnull[attnum - 1] = true;
			}
		}
	}
}

/*
 * Compute the final value of all aggregates for one group.
 *
 * This function handles only one grouping set at a time, which the caller must
 * have selected.  It's also the caller's responsibility to adjust the supplied
 * pergroup parameter to point to the current set's transvalues.
 *
 * Results are stored in the output econtext aggvalues/aggnulls.
 */
static void
finalize_aggregates(AggState *aggstate,
					AggStatePerAgg peraggs,
					AggStatePerGroup pergroup)
{
	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
	Datum	   *aggvalues = econtext->ecxt_aggvalues;
	bool	   *aggnulls = econtext->ecxt_aggnulls;
	int			aggno;
	int			transno;

	/*
	 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
	 * inputs and run the transition functions.
	 */
	for (transno = 0; transno < aggstate->numtrans; transno++)
	{
		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
		AggStatePerGroup pergroupstate;

		pergroupstate = &pergroup[transno];

		if (pertrans->numSortCols > 0)
		{
			Assert(aggstate->aggstrategy != AGG_HASHED &&
				   aggstate->aggstrategy != AGG_MIXED);

			if (pertrans->numInputs == 1)
				process_ordered_aggregate_single(aggstate,
												 pertrans,
												 pergroupstate);
			else
				process_ordered_aggregate_multi(aggstate,
												pertrans,
												pergroupstate);
		}
	}

	/*
	 * Run the final functions.
	 */
	for (aggno = 0; aggno < aggstate->numaggs; aggno++)
	{
		AggStatePerAgg peragg = &peraggs[aggno];
		int			transno = peragg->transno;
		AggStatePerGroup pergroupstate;

		pergroupstate = &pergroup[transno];

		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
			finalize_partialaggregate(aggstate, peragg, pergroupstate,
									  &aggvalues[aggno], &aggnulls[aggno]);
		else
			finalize_aggregate(aggstate, peragg, pergroupstate,
							   &aggvalues[aggno], &aggnulls[aggno]);
	}
}
1365 
1366 /*
1367  * Project the result of a group (whose aggs have already been calculated by
1368  * finalize_aggregates). Returns the result slot, or NULL if no row is
1369  * projected (suppressed by qual).
1370  */
1371 static TupleTableSlot *
1372 project_aggregates(AggState *aggstate)
1373 {
1374 	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1375 
1376 	/*
1377 	 * Check the qual (HAVING clause); if the group does not match, ignore it.
1378 	 */
1379 	if (ExecQual(aggstate->ss.ps.qual, econtext))
1380 	{
1381 		/*
1382 		 * Form and return projection tuple using the aggregate results and
1383 		 * the representative input tuple.
1384 		 */
1385 		return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1386 	}
1387 	else
1388 		InstrCountFiltered1(aggstate, 1);
1389 
1390 	return NULL;
1391 }
1392 
1393 /*
1394  * Find input-tuple columns that are needed, dividing them into
1395  * aggregated and unaggregated sets.
1396  */
1397 static void
1398 find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
1399 {
1400 	Agg		   *agg = (Agg *) aggstate->ss.ps.plan;
1401 	FindColsContext context;
1402 
1403 	context.is_aggref = false;
1404 	context.aggregated = NULL;
1405 	context.unaggregated = NULL;
1406 
1407 	/* Examine tlist and quals */
1408 	(void) find_cols_walker((Node *) agg->plan.targetlist, &context);
1409 	(void) find_cols_walker((Node *) agg->plan.qual, &context);
1410 
1411 	/* In some cases, grouping columns will not appear in the tlist */
1412 	for (int i = 0; i < agg->numCols; i++)
1413 		context.unaggregated = bms_add_member(context.unaggregated,
1414 											  agg->grpColIdx[i]);
1415 
1416 	*aggregated = context.aggregated;
1417 	*unaggregated = context.unaggregated;
1418 }
1419 
1420 static bool
1421 find_cols_walker(Node *node, FindColsContext *context)
1422 {
1423 	if (node == NULL)
1424 		return false;
1425 	if (IsA(node, Var))
1426 	{
1427 		Var		   *var = (Var *) node;
1428 
1429 		/* setrefs.c should have set the varno to OUTER_VAR */
1430 		Assert(var->varno == OUTER_VAR);
1431 		Assert(var->varlevelsup == 0);
1432 		if (context->is_aggref)
1433 			context->aggregated = bms_add_member(context->aggregated,
1434 												 var->varattno);
1435 		else
1436 			context->unaggregated = bms_add_member(context->unaggregated,
1437 												   var->varattno);
1438 		return false;
1439 	}
1440 	if (IsA(node, Aggref))
1441 	{
1442 		Assert(!context->is_aggref);
1443 		context->is_aggref = true;
1444 		expression_tree_walker(node, find_cols_walker, (void *) context);
1445 		context->is_aggref = false;
1446 		return false;
1447 	}
1448 	return expression_tree_walker(node, find_cols_walker,
1449 								  (void *) context);
1450 }
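
/*
 * A hypothetical illustration of the classification: for
 * SELECT a, sum(b + c) FROM t GROUP BY a, the Vars b and c are visited
 * while context->is_aggref is true (they sit inside the Aggref), so
 * they are added to the aggregated set, whereas a is visited outside
 * any Aggref and lands in the unaggregated set.
 */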
1451 
1452 /*
1453  * (Re-)initialize the hash table(s) to empty.
1454  *
1455  * To implement hashed aggregation, we need a hashtable that stores a
1456  * representative tuple and an array of AggStatePerGroup structs for each
1457  * distinct set of GROUP BY column values.  We compute the hash key from the
1458  * GROUP BY columns.  The per-group data is allocated in lookup_hash_entry(),
1459  * for each entry.
1460  *
1461  * We have a separate hashtable and associated perhash data structure for each
1462  * grouping set for which we're doing hashing.
1463  *
1464  * The contents of the hash tables always live in the hashcontext's per-tuple
1465  * memory context (there is only one of these for all tables together, since
1466  * they are all reset at the same time).
1467  */
1468 static void
1469 build_hash_tables(AggState *aggstate)
1470 {
1471 	int			setno;
1472 
1473 	for (setno = 0; setno < aggstate->num_hashes; ++setno)
1474 	{
1475 		AggStatePerHash perhash = &aggstate->perhash[setno];
1476 		long		nbuckets;
1477 		Size		memory;
1478 
1479 		if (perhash->hashtable != NULL)
1480 		{
1481 			ResetTupleHashTable(perhash->hashtable);
1482 			continue;
1483 		}
1484 
1485 		Assert(perhash->aggnode->numGroups > 0);
1486 
1487 		memory = aggstate->hash_mem_limit / aggstate->num_hashes;
1488 
1489 		/* choose reasonable number of buckets per hashtable */
1490 		nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
1491 										   perhash->aggnode->numGroups,
1492 										   memory);
1493 
1494 		build_hash_table(aggstate, setno, nbuckets);
1495 	}
1496 
1497 	aggstate->hash_ngroups_current = 0;
1498 }
1499 
1500 /*
1501  * Build a single hashtable for this grouping set.
1502  */
1503 static void
1504 build_hash_table(AggState *aggstate, int setno, long nbuckets)
1505 {
1506 	AggStatePerHash perhash = &aggstate->perhash[setno];
1507 	MemoryContext metacxt = aggstate->hash_metacxt;
1508 	MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
1509 	MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
1510 	Size		additionalsize;
1511 
1512 	Assert(aggstate->aggstrategy == AGG_HASHED ||
1513 		   aggstate->aggstrategy == AGG_MIXED);
1514 
1515 	/*
1516 	 * Used to make sure initial hash table allocation does not exceed
1517 	 * hash_mem. Note that the estimate does not include space for
1518 	 * pass-by-reference transition data values, nor for the representative
1519 	 * tuple of each group.
1520 	 */
1521 	additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1522 
1523 	perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1524 												perhash->hashslot->tts_tupleDescriptor,
1525 												perhash->numCols,
1526 												perhash->hashGrpColIdxHash,
1527 												perhash->eqfuncoids,
1528 												perhash->hashfunctions,
1529 												perhash->aggnode->grpCollations,
1530 												nbuckets,
1531 												additionalsize,
1532 												metacxt,
1533 												hashcxt,
1534 												tmpcxt,
1535 												DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1536 }
1537 
1538 /*
1539  * Compute columns that actually need to be stored in hashtable entries.  The
1540  * incoming tuples from the child plan node will contain grouping columns,
1541  * other columns referenced in our targetlist and qual, columns used to
1542  * compute the aggregate functions, and perhaps just junk columns we don't use
1543  * at all.  Only columns of the first two types need to be stored in the
1544  * hashtable, and getting rid of the others can make the table entries
1545  * significantly smaller.  The hashtable only contains the relevant columns,
1546  * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1547  * into the format of the normal input descriptor.
1548  *
1549  * Columns beyond the ones grouped by come from two sources: first,
1550  * functionally dependent columns, which need not be grouped by explicitly;
1551  * and second, ctids for row-marks.
1552  *
1553  * To eliminate duplicates, we build a bitmapset of the needed columns, and
1554  * then build an array of the columns included in the hashtable. We might
1555  * still have duplicates if the passed-in grpColIdx has them, which can happen
1556  * in edge cases from semijoins/distinct; these can't always be removed,
1557  * because it's not certain that the duplicate cols will be using the same
1558  * hash function.
1559  *
1560  * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1561  * the per-query context (unlike the hash table itself).
1562  */
1563 static void
1564 find_hash_columns(AggState *aggstate)
1565 {
1566 	Bitmapset  *base_colnos;
1567 	Bitmapset  *aggregated_colnos;
1568 	TupleDesc	scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1569 	List	   *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1570 	int			numHashes = aggstate->num_hashes;
1571 	EState	   *estate = aggstate->ss.ps.state;
1572 	int			j;
1573 
1574 	/* Find Vars that will be needed in tlist and qual */
1575 	find_cols(aggstate, &aggregated_colnos, &base_colnos);
1576 	aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
1577 	aggstate->max_colno_needed = 0;
1578 	aggstate->all_cols_needed = true;
1579 
1580 	for (int i = 0; i < scanDesc->natts; i++)
1581 	{
1582 		int			colno = i + 1;
1583 
1584 		if (bms_is_member(colno, aggstate->colnos_needed))
1585 			aggstate->max_colno_needed = colno;
1586 		else
1587 			aggstate->all_cols_needed = false;
1588 	}
1589 
1590 	for (j = 0; j < numHashes; ++j)
1591 	{
1592 		AggStatePerHash perhash = &aggstate->perhash[j];
1593 		Bitmapset  *colnos = bms_copy(base_colnos);
1594 		AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1595 		List	   *hashTlist = NIL;
1596 		TupleDesc	hashDesc;
1597 		int			maxCols;
1598 		int			i;
1599 
1600 		perhash->largestGrpColIdx = 0;
1601 
1602 		/*
1603 		 * If we're doing grouping sets, then some Vars might be referenced in
1604 		 * tlist/qual for the benefit of other grouping sets, but not needed
1605 		 * when hashing; i.e. prepare_projection_slot will null them out, so
1606 		 * there'd be no point storing them.  Use prepare_projection_slot's
1607 		 * logic to determine which.
1608 		 */
1609 		if (aggstate->phases[0].grouped_cols)
1610 		{
1611 			Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
1612 			ListCell   *lc;
1613 
1614 			foreach(lc, aggstate->all_grouped_cols)
1615 			{
1616 				int			attnum = lfirst_int(lc);
1617 
1618 				if (!bms_is_member(attnum, grouped_cols))
1619 					colnos = bms_del_member(colnos, attnum);
1620 			}
1621 		}
1622 
1623 		/*
1624 		 * Compute maximum number of input columns accounting for possible
1625 		 * duplications in the grpColIdx array, which can happen in some edge
1626 		 * cases where HashAggregate was generated as part of a semijoin or a
1627 		 * DISTINCT.
1628 		 */
1629 		maxCols = bms_num_members(colnos) + perhash->numCols;
1630 
1631 		perhash->hashGrpColIdxInput =
1632 			palloc(maxCols * sizeof(AttrNumber));
1633 		perhash->hashGrpColIdxHash =
1634 			palloc(perhash->numCols * sizeof(AttrNumber));
1635 
1636 		/* Add all the grouping columns to colnos */
1637 		for (i = 0; i < perhash->numCols; i++)
1638 			colnos = bms_add_member(colnos, grpColIdx[i]);
1639 
1640 		/*
1641 		 * First build mapping for columns directly hashed. These are the
1642 		 * first, because they'll be accessed when computing hash values and
1643 		 * comparing tuples for exact matches. We also build simple mapping
1644 		 * for execGrouping, so it knows where to find the to-be-hashed /
1645 		 * compared columns in the input.
1646 		 */
1647 		for (i = 0; i < perhash->numCols; i++)
1648 		{
1649 			perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1650 			perhash->hashGrpColIdxHash[i] = i + 1;
1651 			perhash->numhashGrpCols++;
1652 			/* delete already mapped columns */
1653 			bms_del_member(colnos, grpColIdx[i]);
1654 		}
1655 
1656 		/* and add the remaining columns */
1657 		while ((i = bms_first_member(colnos)) >= 0)
1658 		{
1659 			perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1660 			perhash->numhashGrpCols++;
1661 		}
1662 
1663 		/* and build a tuple descriptor for the hashtable */
1664 		for (i = 0; i < perhash->numhashGrpCols; i++)
1665 		{
1666 			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
1667 
1668 			hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1669 			perhash->largestGrpColIdx =
1670 				Max(varNumber + 1, perhash->largestGrpColIdx);
1671 		}
1672 
1673 		hashDesc = ExecTypeFromTL(hashTlist);
1674 
1675 		execTuplesHashPrepare(perhash->numCols,
1676 							  perhash->aggnode->grpOperators,
1677 							  &perhash->eqfuncoids,
1678 							  &perhash->hashfunctions);
1679 		perhash->hashslot =
1680 			ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1681 							   &TTSOpsMinimalTuple);
1682 
1683 		list_free(hashTlist);
1684 		bms_free(colnos);
1685 	}
1686 
1687 	bms_free(base_colnos);
1688 }
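
/*
 * Hypothetical mapping example: if the child emits columns (a, b, c, d),
 * we group by (c, a), and b is the only other column needed, then
 * hashGrpColIdxInput ends up as [3, 1, 2] (c and a first, then b) and
 * hashGrpColIdxHash as [1, 2]: the grouping columns occupy the leading
 * attributes of the hashtable's tuple descriptor, and execGrouping is
 * told to hash/compare hashtable columns 1 and 2.
 */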
1689 
1690 /*
1691  * Estimate per-hash-table-entry overhead.
1692  */
1693 Size
1694 hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
1695 {
1696 	Size		tupleChunkSize;
1697 	Size		pergroupChunkSize;
1698 	Size		transitionChunkSize;
1699 	Size		tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
1700 							 tupleWidth);
1701 	Size		pergroupSize = numTrans * sizeof(AggStatePerGroupData);
1702 
1703 	tupleChunkSize = CHUNKHDRSZ + tupleSize;
1704 
1705 	if (pergroupSize > 0)
1706 		pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
1707 	else
1708 		pergroupChunkSize = 0;
1709 
1710 	if (transitionSpace > 0)
1711 		transitionChunkSize = CHUNKHDRSZ + transitionSpace;
1712 	else
1713 		transitionChunkSize = 0;
1714 
1715 	return
1716 		sizeof(TupleHashEntryData) +
1717 		tupleChunkSize +
1718 		pergroupChunkSize +
1719 		transitionChunkSize;
1720 }
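
/*
 * Worked example with made-up inputs: for numTrans = 2, tupleWidth = 24
 * and transitionSpace = 0 the estimate is
 *
 *		sizeof(TupleHashEntryData)
 *		+ CHUNKHDRSZ + MAXALIGN(SizeofMinimalTupleHeader) + 24
 *		+ CHUNKHDRSZ + 2 * sizeof(AggStatePerGroupData)
 *
 * i.e. the hash entry proper, one palloc chunk for the representative
 * tuple, and one chunk for the per-group transition states; the
 * transition-space term drops out because transitionSpace is zero.
 */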
1721 
1722 /*
1723  * hashagg_recompile_expressions()
1724  *
1725  * Identifies the right phase, compiles the right expression given the
1726  * arguments, and then sets phase->evaltrans to that expression.
1727  *
1728  * Different versions of the compiled expression are needed depending on
1729  * whether hash aggregation has spilled or not, and whether it's reading from
1730  * the outer plan or a tape. Before spilling to disk, the expression reads
1731  * from the outer plan and does not need to perform a NULL check. After
1732  * HashAgg begins to spill, new groups will not be created in the hash table,
1733  * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1734  * pointer check to the expression. Then, when reading spilled data from a
1735  * tape, we change the outer slot type to be a fixed minimal tuple slot.
1736  *
1737  * It would be wasteful to recompile every time, so cache the compiled
1738  * expressions in the AggStatePerPhase, and reuse when appropriate.
1739  */
1740 static void
1741 hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1742 {
1743 	AggStatePerPhase phase;
1744 	int			i = minslot ? 1 : 0;
1745 	int			j = nullcheck ? 1 : 0;
1746 
1747 	Assert(aggstate->aggstrategy == AGG_HASHED ||
1748 		   aggstate->aggstrategy == AGG_MIXED);
1749 
1750 	if (aggstate->aggstrategy == AGG_HASHED)
1751 		phase = &aggstate->phases[0];
1752 	else						/* AGG_MIXED */
1753 		phase = &aggstate->phases[1];
1754 
1755 	if (phase->evaltrans_cache[i][j] == NULL)
1756 	{
1757 		const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1758 		bool		outerfixed = aggstate->ss.ps.outeropsfixed;
1759 		bool		dohash = true;
1760 		bool		dosort = false;
1761 
1762 		/*
1763 		 * If minslot is true, that means we are processing a spilled batch
1764 		 * (inside agg_refill_hash_table()), and we must not advance the
1765 		 * sorted grouping sets.
1766 		 */
1767 		if (aggstate->aggstrategy == AGG_MIXED && !minslot)
1768 			dosort = true;
1769 
1770 		/* temporarily change the outerops while compiling the expression */
1771 		if (minslot)
1772 		{
1773 			aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1774 			aggstate->ss.ps.outeropsfixed = true;
1775 		}
1776 
1777 		phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
1778 														 dosort, dohash,
1779 														 nullcheck);
1780 
1781 		/* change back */
1782 		aggstate->ss.ps.outerops = outerops;
1783 		aggstate->ss.ps.outeropsfixed = outerfixed;
1784 	}
1785 
1786 	phase->evaltrans = phase->evaltrans_cache[i][j];
1787 }
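
/*
 * For reference, the cache is a 2x2 matrix indexed as
 * evaltrans_cache[minslot][nullcheck]:
 *
 *	[0][0]	reading from the outer plan, no NULL-pergroup check
 *	[0][1]	reading from the outer plan after spilling has begun
 *	[1][0]	reading spilled tuples (minimal tuple slot), no check
 *	[1][1]	reading spilled tuples while further spilling is possible
 *
 * so at most four variants are ever compiled per phase.
 */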
1788 
1789 /*
1790  * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
1791  * number of partitions we expect to create (if we do spill).
1792  *
1793  * There are two limits: a memory limit, and also an ngroups limit. The
1794  * ngroups limit becomes important when we expect transition values to grow
1795  * substantially larger than the initial value.
1796  */
1797 void
1798 hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
1799 					Size *mem_limit, uint64 *ngroups_limit,
1800 					int *num_partitions)
1801 {
1802 	int			npartitions;
1803 	Size		partition_mem;
1804 	Size		hash_mem_limit = get_hash_memory_limit();
1805 
1806 	/* if not expected to spill, use all of hash_mem */
1807 	if (input_groups * hashentrysize <= hash_mem_limit)
1808 	{
1809 		if (num_partitions != NULL)
1810 			*num_partitions = 0;
1811 		*mem_limit = hash_mem_limit;
1812 		*ngroups_limit = hash_mem_limit / hashentrysize;
1813 		return;
1814 	}
1815 
1816 	/*
1817 	 * Calculate expected memory requirements for spilling, which is the size
1818 	 * of the buffers needed for all the tapes that need to be open at once.
1819 	 * Then, subtract that from the memory available for holding hash tables.
1820 	 */
1821 	npartitions = hash_choose_num_partitions(input_groups,
1822 											 hashentrysize,
1823 											 used_bits,
1824 											 NULL);
1825 	if (num_partitions != NULL)
1826 		*num_partitions = npartitions;
1827 
1828 	partition_mem =
1829 		HASHAGG_READ_BUFFER_SIZE +
1830 		HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1831 
1832 	/*
1833 	 * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
1834 	 * minimum number of partitions, so we aren't going to dramatically exceed
1835 	 * work mem anyway.
1836 	 */
1837 	if (hash_mem_limit > 4 * partition_mem)
1838 		*mem_limit = hash_mem_limit - partition_mem;
1839 	else
1840 		*mem_limit = hash_mem_limit * 0.75;
1841 
1842 	if (*mem_limit > hashentrysize)
1843 		*ngroups_limit = *mem_limit / hashentrysize;
1844 	else
1845 		*ngroups_limit = 1;
1846 }
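
/*
 * Worked example (illustrative figures only): with a hash_mem limit of
 * 4MB, hashentrysize = 200 and input_groups = one million, the roughly
 * 200MB of expected groups clearly exceeds hash_mem, so we plan for
 * spilling: choose npartitions, reserve one read buffer plus
 * npartitions write buffers, and set *mem_limit to whatever remains
 * (but never below 75% of hash_mem).  *ngroups_limit then follows as
 * *mem_limit / hashentrysize.
 */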
1847 
1848 /*
1849  * hash_agg_check_limits
1850  *
1851  * After adding a new group to the hash table, check whether we need to enter
1852  * spill mode. Allocations may happen without adding new groups (for instance,
1853  * if the transition state size grows), so this check is imperfect.
1854  */
1855 static void
1856 hash_agg_check_limits(AggState *aggstate)
1857 {
1858 	uint64		ngroups = aggstate->hash_ngroups_current;
1859 	Size		meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
1860 													 true);
1861 	Size		hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
1862 														true);
1863 
1864 	/*
1865 	 * Don't spill unless there's at least one group in the hash table so we
1866 	 * can be sure to make progress even in edge cases.
1867 	 */
1868 	if (aggstate->hash_ngroups_current > 0 &&
1869 		(meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
1870 		 ngroups > aggstate->hash_ngroups_limit))
1871 	{
1872 		hash_agg_enter_spill_mode(aggstate);
1873 	}
1874 }
1875 
1876 /*
1877  * Enter "spill mode", meaning that no new groups are added to any of the hash
1878  * tables. Tuples that would create a new group are instead spilled, and
1879  * processed later.
1880  */
1881 static void
1882 hash_agg_enter_spill_mode(AggState *aggstate)
1883 {
1884 	aggstate->hash_spill_mode = true;
1885 	hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1886 
1887 	if (!aggstate->hash_ever_spilled)
1888 	{
1889 		Assert(aggstate->hash_tapeinfo == NULL);
1890 		Assert(aggstate->hash_spills == NULL);
1891 
1892 		aggstate->hash_ever_spilled = true;
1893 
1894 		hashagg_tapeinfo_init(aggstate);
1895 
1896 		aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
1897 
1898 		for (int setno = 0; setno < aggstate->num_hashes; setno++)
1899 		{
1900 			AggStatePerHash perhash = &aggstate->perhash[setno];
1901 			HashAggSpill *spill = &aggstate->hash_spills[setno];
1902 
1903 			hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
1904 							   perhash->aggnode->numGroups,
1905 							   aggstate->hashentrysize);
1906 		}
1907 	}
1908 }
1909 
1910 /*
1911  * Update metrics after filling the hash table.
1912  *
1913  * If reading from the outer plan, from_tape should be false; if reading from
1914  * another tape, from_tape should be true.
1915  */
1916 static void
1917 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1918 {
1919 	Size		meta_mem;
1920 	Size		hashkey_mem;
1921 	Size		buffer_mem;
1922 	Size		total_mem;
1923 
1924 	if (aggstate->aggstrategy != AGG_MIXED &&
1925 		aggstate->aggstrategy != AGG_HASHED)
1926 		return;
1927 
1928 	/* memory for the hash table itself */
1929 	meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1930 
1931 	/* memory for the group keys and transition states */
1932 	hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
1933 
1934 	/* memory for read/write tape buffers, if spilled */
1935 	buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1936 	if (from_tape)
1937 		buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1938 
1939 	/* update peak mem */
1940 	total_mem = meta_mem + hashkey_mem + buffer_mem;
1941 	if (total_mem > aggstate->hash_mem_peak)
1942 		aggstate->hash_mem_peak = total_mem;
1943 
1944 	/* update disk usage */
1945 	if (aggstate->hash_tapeinfo != NULL)
1946 	{
1947 		uint64		disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
1948 
1949 		if (aggstate->hash_disk_used < disk_used)
1950 			aggstate->hash_disk_used = disk_used;
1951 	}
1952 
1953 	/* update hashentrysize estimate based on contents */
1954 	if (aggstate->hash_ngroups_current > 0)
1955 	{
1956 		aggstate->hashentrysize =
1957 			sizeof(TupleHashEntryData) +
1958 			(hashkey_mem / (double) aggstate->hash_ngroups_current);
1959 	}
1960 }
1961 
1962 /*
1963  * Choose a reasonable number of buckets for the initial hash table size.
1964  */
1965 static long
1966 hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1967 {
1968 	long		max_nbuckets;
1969 	long		nbuckets = ngroups;
1970 
1971 	max_nbuckets = memory / hashentrysize;
1972 
1973 	/*
1974 	 * Underestimating is better than overestimating. Too many buckets crowd
1975 	 * out space for group keys and transition state values.
1976 	 */
1977 	max_nbuckets >>= 1;
1978 
1979 	if (nbuckets > max_nbuckets)
1980 		nbuckets = max_nbuckets;
1981 
1982 	return Max(nbuckets, 1);
1983 }
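
/*
 * Example with made-up numbers: memory = 1MB and hashentrysize = 100
 * give max_nbuckets = 10485, halved to 5242.  A planner estimate of
 * ngroups = 2000 is returned unchanged, while an estimate of one
 * million groups would be clamped to 5242 buckets.
 */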
1984 
1985 /*
1986  * Determine the number of partitions to create when spilling, which will
1987  * always be a power of two. If log2_npartitions is non-NULL, set
1988  * *log2_npartitions to the log2() of the number of partitions.
1989  */
1990 static int
1991 hash_choose_num_partitions(double input_groups, double hashentrysize,
1992 						   int used_bits, int *log2_npartitions)
1993 {
1994 	Size		hash_mem_limit = get_hash_memory_limit();
1995 	double		partition_limit;
1996 	double		mem_wanted;
1997 	double		dpartitions;
1998 	int			npartitions;
1999 	int			partition_bits;
2000 
2001 	/*
2002 	 * Avoid creating so many partitions that the memory requirements of the
2003 	 * open partition files are greater than 1/4 of hash_mem.
2004 	 */
2005 	partition_limit =
2006 		(hash_mem_limit * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
2007 		HASHAGG_WRITE_BUFFER_SIZE;
2008 
2009 	mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
2010 
2011 	/* make enough partitions so that each one is likely to fit in memory */
2012 	dpartitions = 1 + (mem_wanted / hash_mem_limit);
2013 
2014 	if (dpartitions > partition_limit)
2015 		dpartitions = partition_limit;
2016 
2017 	if (dpartitions < HASHAGG_MIN_PARTITIONS)
2018 		dpartitions = HASHAGG_MIN_PARTITIONS;
2019 	if (dpartitions > HASHAGG_MAX_PARTITIONS)
2020 		dpartitions = HASHAGG_MAX_PARTITIONS;
2021 
2022 	/* HASHAGG_MAX_PARTITIONS limit makes this safe */
2023 	npartitions = (int) dpartitions;
2024 
2025 	/* ceil(log2(npartitions)) */
2026 	partition_bits = my_log2(npartitions);
2027 
2028 	/* make sure that we don't exhaust the hash bits */
2029 	if (partition_bits + used_bits >= 32)
2030 		partition_bits = 32 - used_bits;
2031 
2032 	if (log2_npartitions != NULL)
2033 		*log2_npartitions = partition_bits;
2034 
2035 	/* number of partitions will be a power of two */
2036 	npartitions = 1 << partition_bits;
2037 
2038 	return npartitions;
2039 }
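
/*
 * Worked example (illustrative figures; assumes 64kB read and write
 * buffers): with hash_mem_limit = 4MB, partition_limit is
 * (1MB - 64kB) / 64kB = 15.  For input_groups = one million and
 * hashentrysize = 100, mem_wanted is HASHAGG_PARTITION_FACTOR * 100MB,
 * so dpartitions comes out far above partition_limit and is clamped to
 * 15; my_log2(15) = 4, and the final answer is rounded up to the power
 * of two 16 (the rounding may slightly exceed the clamp, which is
 * acceptable).
 */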
2040 
2041 /*
2042  * Initialize a freshly-created TupleHashEntry.
2043  */
2044 static void
2045 initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable,
2046 					  TupleHashEntry entry)
2047 {
2048 	AggStatePerGroup pergroup;
2049 	int			transno;
2050 
2051 	aggstate->hash_ngroups_current++;
2052 	hash_agg_check_limits(aggstate);
2053 
2054 	/* no need to allocate or initialize per-group state */
2055 	if (aggstate->numtrans == 0)
2056 		return;
2057 
2058 	pergroup = (AggStatePerGroup)
2059 		MemoryContextAlloc(hashtable->tablecxt,
2060 						   sizeof(AggStatePerGroupData) * aggstate->numtrans);
2061 
2062 	entry->additional = pergroup;
2063 
2064 	/*
2065 	 * Initialize aggregates for the new tuple group; lookup_hash_entries()
2066 	 * has already selected the relevant grouping set.
2067 	 */
2068 	for (transno = 0; transno < aggstate->numtrans; transno++)
2069 	{
2070 		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2071 		AggStatePerGroup pergroupstate = &pergroup[transno];
2072 
2073 		initialize_aggregate(aggstate, pertrans, pergroupstate);
2074 	}
2075 }
2076 
2077 /*
2078  * Look up hash entries for the current tuple in all hashed grouping sets.
2079  *
2080  * Be aware that lookup_hash_entry can reset the tmpcontext.
2081  *
2082  * Some entries may be left NULL if we are in "spill mode". The same tuple
2083  * will belong to different groups for each grouping set, so may match a group
2084  * already in memory for one set and match a group not in memory for another
2085  * set. When in "spill mode", the tuple will be spilled for each grouping set
2086  * where it doesn't match a group in memory.
2087  *
2088  * NB: It's possible to spill the same tuple for several different grouping
2089  * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2090  * the tuple multiple times for multiple grouping sets, it can be partitioned
2091  * for each grouping set, making the refilling of the hash table very
2092  * efficient.
2093  */
2094 static void
2095 lookup_hash_entries(AggState *aggstate)
2096 {
2097 	AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2098 	TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
2099 	int			setno;
2100 
2101 	for (setno = 0; setno < aggstate->num_hashes; setno++)
2102 	{
2103 		AggStatePerHash perhash = &aggstate->perhash[setno];
2104 		TupleHashTable hashtable = perhash->hashtable;
2105 		TupleTableSlot *hashslot = perhash->hashslot;
2106 		TupleHashEntry entry;
2107 		uint32		hash;
2108 		bool		isnew = false;
2109 		bool	   *p_isnew;
2110 
2111 		/* if hash table already spilled, don't create new entries */
2112 		p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2113 
2114 		select_current_set(aggstate, setno, true);
2115 		prepare_hash_slot(perhash,
2116 						  outerslot,
2117 						  hashslot);
2118 
2119 		entry = LookupTupleHashEntry(hashtable, hashslot,
2120 									 p_isnew, &hash);
2121 
2122 		if (entry != NULL)
2123 		{
2124 			if (isnew)
2125 				initialize_hash_entry(aggstate, hashtable, entry);
2126 			pergroup[setno] = entry->additional;
2127 		}
2128 		else
2129 		{
2130 			HashAggSpill *spill = &aggstate->hash_spills[setno];
2131 			TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2132 
2133 			if (spill->partitions == NULL)
2134 				hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
2135 								   perhash->aggnode->numGroups,
2136 								   aggstate->hashentrysize);
2137 
2138 			hashagg_spill_tuple(aggstate, spill, slot, hash);
2139 			pergroup[setno] = NULL;
2140 		}
2141 	}
2142 }
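
/*
 * Sketch of the spill-mode behavior described above (hypothetical
 * values): with grouping sets (a) and (b), a tuple may match an
 * existing group for (a) -- pergroup[0] is set and its transition
 * functions will run -- while no group for (b) is in memory, in which
 * case the tuple is written to set (b)'s spill partitions and
 * pergroup[1] is left NULL.
 */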
2143 
2144 /*
2145  * ExecAgg -
2146  *
2147  *	  ExecAgg receives tuples from its outer subplan and aggregates over
2148  *	  the appropriate attribute for each aggregate function use (Aggref
2149  *	  node) appearing in the targetlist or qual of the node.  The number
2150  *	  of tuples to aggregate over depends on whether grouped or plain
2151  *	  aggregation is selected.  In grouped aggregation, we produce a result
2152  *	  row for each group; in plain aggregation there's a single result row
2153  *	  for the whole query.  In either case, the value of each aggregate is
2154  *	  stored in the expression context to be used when ExecProject evaluates
2155  *	  the result tuple.
2156  */
2157 static TupleTableSlot *
2158 ExecAgg(PlanState *pstate)
2159 {
2160 	AggState   *node = castNode(AggState, pstate);
2161 	TupleTableSlot *result = NULL;
2162 
2163 	CHECK_FOR_INTERRUPTS();
2164 
2165 	if (!node->agg_done)
2166 	{
2167 		/* Dispatch based on strategy */
2168 		switch (node->phase->aggstrategy)
2169 		{
2170 			case AGG_HASHED:
2171 				if (!node->table_filled)
2172 					agg_fill_hash_table(node);
2173 				/* FALLTHROUGH */
2174 			case AGG_MIXED:
2175 				result = agg_retrieve_hash_table(node);
2176 				break;
2177 			case AGG_PLAIN:
2178 			case AGG_SORTED:
2179 				result = agg_retrieve_direct(node);
2180 				break;
2181 		}
2182 
2183 		if (!TupIsNull(result))
2184 			return result;
2185 	}
2186 
2187 	return NULL;
2188 }
2189 
2190 /*
2191  * ExecAgg for non-hashed case
2192  */
2193 static TupleTableSlot *
2194 agg_retrieve_direct(AggState *aggstate)
2195 {
2196 	Agg		   *node = aggstate->phase->aggnode;
2197 	ExprContext *econtext;
2198 	ExprContext *tmpcontext;
2199 	AggStatePerAgg peragg;
2200 	AggStatePerGroup *pergroups;
2201 	TupleTableSlot *outerslot;
2202 	TupleTableSlot *firstSlot;
2203 	TupleTableSlot *result;
2204 	bool		hasGroupingSets = aggstate->phase->numsets > 0;
2205 	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
2206 	int			currentSet;
2207 	int			nextSetSize;
2208 	int			numReset;
2209 	int			i;
2210 
2211 	/*
2212 	 * get state info from node
2213 	 *
2214 	 * econtext is the per-output-tuple expression context
2215 	 *
2216 	 * tmpcontext is the per-input-tuple expression context
2217 	 */
2218 	econtext = aggstate->ss.ps.ps_ExprContext;
2219 	tmpcontext = aggstate->tmpcontext;
2220 
2221 	peragg = aggstate->peragg;
2222 	pergroups = aggstate->pergroups;
2223 	firstSlot = aggstate->ss.ss_ScanTupleSlot;
2224 
2225 	/*
2226 	 * We loop retrieving groups until we find one matching
2227 	 * aggstate->ss.ps.qual
2228 	 *
2229 	 * For grouping sets, we have the invariant that aggstate->projected_set
2230 	 * is either -1 (initial call) or the index (starting from 0) in
2231 	 * gset_lengths for the group we just completed (either by projecting a
2232 	 * row or by discarding it in the qual).
2233 	 */
2234 	while (!aggstate->agg_done)
2235 	{
2236 		/*
2237 		 * Clear the per-output-tuple context for each group, as well as
2238 		 * aggcontext (which contains any pass-by-ref transvalues of the old
2239 		 * group).  Some aggregate functions store working state in child
2240 		 * contexts; those now get reset automatically without us needing to
2241 		 * do anything special.
2242 		 *
2243 		 * We use ReScanExprContext not just ResetExprContext because we want
2244 		 * any registered shutdown callbacks to be called.  That allows
2245 		 * aggregate functions to ensure they've cleaned up any non-memory
2246 		 * resources.
2247 		 */
2248 		ReScanExprContext(econtext);
2249 
2250 		/*
2251 		 * Determine how many grouping sets need to be reset at this boundary.
2252 		 */
2253 		if (aggstate->projected_set >= 0 &&
2254 			aggstate->projected_set < numGroupingSets)
2255 			numReset = aggstate->projected_set + 1;
2256 		else
2257 			numReset = numGroupingSets;
2258 
2259 		/*
2260 		 * numReset can change on a phase boundary, but that's OK; we want to
2261 		 * reset the contexts used in _this_ phase, and later, after possibly
2262 		 * changing phase, initialize the right number of aggregates for the
2263 		 * _new_ phase.
2264 		 */
2265 
2266 		for (i = 0; i < numReset; i++)
2267 		{
2268 			ReScanExprContext(aggstate->aggcontexts[i]);
2269 		}
2270 
2271 		/*
2272 		 * Check if input is complete and there are no more groups to project
2273 		 * in this phase; move to next phase or mark as done.
2274 		 */
2275 		if (aggstate->input_done == true &&
2276 			aggstate->projected_set >= (numGroupingSets - 1))
2277 		{
2278 			if (aggstate->current_phase < aggstate->numphases - 1)
2279 			{
2280 				initialize_phase(aggstate, aggstate->current_phase + 1);
2281 				aggstate->input_done = false;
2282 				aggstate->projected_set = -1;
2283 				numGroupingSets = Max(aggstate->phase->numsets, 1);
2284 				node = aggstate->phase->aggnode;
2285 				numReset = numGroupingSets;
2286 			}
2287 			else if (aggstate->aggstrategy == AGG_MIXED)
2288 			{
2289 				/*
2290 				 * Mixed mode; we've output all the grouped stuff and have
2291 				 * full hashtables, so switch to outputting those.
2292 				 */
2293 				initialize_phase(aggstate, 0);
2294 				aggstate->table_filled = true;
2295 				ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2296 									   &aggstate->perhash[0].hashiter);
2297 				select_current_set(aggstate, 0, true);
2298 				return agg_retrieve_hash_table(aggstate);
2299 			}
2300 			else
2301 			{
2302 				aggstate->agg_done = true;
2303 				break;
2304 			}
2305 		}
2306 
2307 		/*
2308 		 * Get the number of columns in the next grouping set after the last
2309 		 * projected one (if any). This is the number of columns to compare to
2310 		 * see if we reached the boundary of that set too.
2311 		 */
2312 		if (aggstate->projected_set >= 0 &&
2313 			aggstate->projected_set < (numGroupingSets - 1))
2314 			nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2315 		else
2316 			nextSetSize = 0;
2317 
2318 		/*----------
2319 		 * If a subgroup for the current grouping set is present, project it.
2320 		 *
2321 		 * We have a new group if:
2322 		 *	- we're out of input but haven't projected all grouping sets
2323 		 *	  (checked above)
2324 		 * OR
2325 		 *	  - we already projected a row that wasn't from the last grouping
2326 		 *		set
2327 		 *	  AND
2328 		 *	  - the next grouping set has at least one grouping column (since
2329 		 *		empty grouping sets project only once input is exhausted)
2330 		 *	  AND
2331 		 *	  - the previous and pending rows differ on the grouping columns
2332 		 *		of the next grouping set
2333 		 *----------
2334 		 */
2335 		tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2336 		if (aggstate->input_done ||
2337 			(node->aggstrategy != AGG_PLAIN &&
2338 			 aggstate->projected_set != -1 &&
2339 			 aggstate->projected_set < (numGroupingSets - 1) &&
2340 			 nextSetSize > 0 &&
2341 			 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2342 							   tmpcontext)))
2343 		{
2344 			aggstate->projected_set += 1;
2345 
2346 			Assert(aggstate->projected_set < numGroupingSets);
2347 			Assert(nextSetSize > 0 || aggstate->input_done);
2348 		}
2349 		else
2350 		{
2351 			/*
2352 			 * We no longer care what group we just projected; the next
2353 			 * projection will always be for the first (or only) grouping set
2354 			 * (unless the input proves to be empty).
2355 			 */
2356 			aggstate->projected_set = 0;
2357 
2358 			/*
2359 			 * If we don't already have the first tuple of the new group,
2360 			 * fetch it from the outer plan.
2361 			 */
2362 			if (aggstate->grp_firstTuple == NULL)
2363 			{
2364 				outerslot = fetch_input_tuple(aggstate);
2365 				if (!TupIsNull(outerslot))
2366 				{
2367 					/*
2368 					 * Make a copy of the first input tuple; we will use this
2369 					 * for comparisons (in group mode) and for projection.
2370 					 */
2371 					aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2372 				}
2373 				else
2374 				{
2375 					/* outer plan produced no tuples at all */
2376 					if (hasGroupingSets)
2377 					{
2378 						/*
2379 						 * If there was no input at all, we need to project
2380 						 * rows only if there are grouping sets of size 0.
2381 						 * Note that this implies that there can't be any
2382 						 * references to ungrouped Vars, which would otherwise
2383 						 * cause issues with the empty output slot.
2384 						 *
2385 						 * XXX: This is no longer true, we currently deal with
2386 						 * this in finalize_aggregates().
2387 						 */
2388 						aggstate->input_done = true;
2389 
2390 						while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2391 						{
2392 							aggstate->projected_set += 1;
2393 							if (aggstate->projected_set >= numGroupingSets)
2394 							{
2395 								/*
2396 								 * We can't set agg_done here because we might
2397 								 * have more phases to do, even though the
2398 								 * input is empty. So we need to restart the
2399 								 * whole outer loop.
2400 								 */
2401 								break;
2402 							}
2403 						}
2404 
2405 						if (aggstate->projected_set >= numGroupingSets)
2406 							continue;
2407 					}
2408 					else
2409 					{
2410 						aggstate->agg_done = true;
2411 						/* if we are grouping, we likewise produce no tuples */
2412 						if (node->aggstrategy != AGG_PLAIN)
2413 							return NULL;
2414 					}
2415 				}
2416 			}
2417 
2418 			/*
2419 			 * Initialize working state for a new input tuple group.
2420 			 */
2421 			initialize_aggregates(aggstate, pergroups, numReset);
2422 
2423 			if (aggstate->grp_firstTuple != NULL)
2424 			{
2425 				/*
2426 				 * Store the copied first input tuple in the tuple table slot
2427 				 * reserved for it.  The tuple will be deleted when it is
2428 				 * cleared from the slot.
2429 				 */
2430 				ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
2431 										firstSlot, true);
2432 				aggstate->grp_firstTuple = NULL;	/* don't keep two pointers */
2433 
2434 				/* set up for first advance_aggregates call */
2435 				tmpcontext->ecxt_outertuple = firstSlot;
2436 
2437 				/*
2438 				 * Process each outer-plan tuple, and then fetch the next one,
2439 				 * until we exhaust the outer plan or cross a group boundary.
2440 				 */
2441 				for (;;)
2442 				{
2443 					/*
2444 					 * During phase 1 only of a mixed agg, we need to update
2445 					 * hashtables as well in advance_aggregates.
2446 					 */
2447 					if (aggstate->aggstrategy == AGG_MIXED &&
2448 						aggstate->current_phase == 1)
2449 					{
2450 						lookup_hash_entries(aggstate);
2451 					}
2452 
2453 					/* Advance the aggregates (or combine functions) */
2454 					advance_aggregates(aggstate);
2455 
2456 					/* Reset per-input-tuple context after each tuple */
2457 					ResetExprContext(tmpcontext);
2458 
2459 					outerslot = fetch_input_tuple(aggstate);
2460 					if (TupIsNull(outerslot))
2461 					{
2462 						/* no more outer-plan tuples available */
2463 
2464 						/* if we built hash tables, finalize any spills */
2465 						if (aggstate->aggstrategy == AGG_MIXED &&
2466 							aggstate->current_phase == 1)
2467 							hashagg_finish_initial_spills(aggstate);
2468 
2469 						if (hasGroupingSets)
2470 						{
2471 							aggstate->input_done = true;
2472 							break;
2473 						}
2474 						else
2475 						{
2476 							aggstate->agg_done = true;
2477 							break;
2478 						}
2479 					}
2480 					/* set up for next advance_aggregates call */
2481 					tmpcontext->ecxt_outertuple = outerslot;
2482 
2483 					/*
2484 					 * If we are grouping, check whether we've crossed a group
2485 					 * boundary.
2486 					 */
2487 					if (node->aggstrategy != AGG_PLAIN)
2488 					{
2489 						tmpcontext->ecxt_innertuple = firstSlot;
2490 						if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2491 									  tmpcontext))
2492 						{
2493 							aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2494 							break;
2495 						}
2496 					}
2497 				}
2498 			}
2499 
2500 			/*
2501 			 * Use the representative input tuple for any references to
2502 			 * non-aggregated input columns in aggregate direct args, the node
2503 			 * qual, and the tlist.  (If we are not grouping, and there are no
2504 			 * input rows at all, we will come here with an empty firstSlot
2505 			 * ... but if not grouping, there can't be any references to
2506 			 * non-aggregated input columns, so no problem.)
2507 			 */
2508 			econtext->ecxt_outertuple = firstSlot;
2509 		}
2510 
2511 		Assert(aggstate->projected_set >= 0);
2512 
2513 		currentSet = aggstate->projected_set;
2514 
2515 		prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2516 
2517 		select_current_set(aggstate, currentSet, false);
2518 
2519 		finalize_aggregates(aggstate,
2520 							peragg,
2521 							pergroups[currentSet]);
2522 
2523 		/*
2524 		 * If there's no row to project right now, we must continue rather
2525 		 * than returning a null since there might be more groups.
2526 		 */
2527 		result = project_aggregates(aggstate);
2528 		if (result)
2529 			return result;
2530 	}
2531 
2532 	/* No more groups */
2533 	return NULL;
2534 }
2535 
2536 /*
2537  * ExecAgg for hashed case: read input and build hash table
2538  */
2539 static void
2540 agg_fill_hash_table(AggState *aggstate)
2541 {
2542 	TupleTableSlot *outerslot;
2543 	ExprContext *tmpcontext = aggstate->tmpcontext;
2544 
2545 	/*
2546 	 * Process each outer-plan tuple, and then fetch the next one, until we
2547 	 * exhaust the outer plan.
2548 	 */
2549 	for (;;)
2550 	{
2551 		outerslot = fetch_input_tuple(aggstate);
2552 		if (TupIsNull(outerslot))
2553 			break;
2554 
2555 		/* set up for lookup_hash_entries and advance_aggregates */
2556 		tmpcontext->ecxt_outertuple = outerslot;
2557 
2558 		/* Find or build hashtable entries */
2559 		lookup_hash_entries(aggstate);
2560 
2561 		/* Advance the aggregates (or combine functions) */
2562 		advance_aggregates(aggstate);
2563 
2564 		/*
2565 		 * Reset per-input-tuple context after each tuple, but note that the
2566 		 * hash lookups do this too
2567 		 */
2568 		ResetExprContext(aggstate->tmpcontext);
2569 	}
2570 
2571 	/* finalize spills, if any */
2572 	hashagg_finish_initial_spills(aggstate);
2573 
2574 	aggstate->table_filled = true;
2575 	/* Initialize to walk the first hash table */
2576 	select_current_set(aggstate, 0, true);
2577 	ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2578 						   &aggstate->perhash[0].hashiter);
2579 }
2580 
2581 /*
2582  * If any data was spilled during hash aggregation, reset the hash table and
2583  * reprocess one batch of spilled data. After reprocessing a batch, the hash
2584  * table will again contain data, ready to be consumed by
2585  * agg_retrieve_hash_table_in_memory().
2586  *
2587  * Should only be called after all in-memory hash table entries have been
2588  * finalized and emitted.
2589  *
2590  * Return false when input is exhausted and there's no more work to be done;
2591  * otherwise return true.
2592  */
2593 static bool
2594 agg_refill_hash_table(AggState *aggstate)
2595 {
2596 	HashAggBatch *batch;
2597 	AggStatePerHash perhash;
2598 	HashAggSpill spill;
2599 	HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
2600 	bool		spill_initialized = false;
2601 
2602 	if (aggstate->hash_batches == NIL)
2603 		return false;
2604 
2605 	/* hash_batches is a stack, with the top item at the end of the list */
2606 	batch = llast(aggstate->hash_batches);
2607 	aggstate->hash_batches = list_delete_last(aggstate->hash_batches);
2608 
2609 	hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
2610 						batch->used_bits, &aggstate->hash_mem_limit,
2611 						&aggstate->hash_ngroups_limit, NULL);
2612 
2613 	/*
2614 	 * Each batch only processes one grouping set; set the rest to NULL so
2615 	 * that advance_aggregates() knows to ignore them. We don't touch
2616 	 * pergroups for sorted grouping sets here, because they will be needed if
2617 	 * we rescan later. The expressions for sorted grouping sets will not be
2618 	 * evaluated after we recompile anyway.
2619 	 */
2620 	MemSet(aggstate->hash_pergroup, 0,
2621 		   sizeof(AggStatePerGroup) * aggstate->num_hashes);
2622 
2623 	/* free memory and reset hash tables */
2624 	ReScanExprContext(aggstate->hashcontext);
2625 	for (int setno = 0; setno < aggstate->num_hashes; setno++)
2626 		ResetTupleHashTable(aggstate->perhash[setno].hashtable);
2627 
2628 	aggstate->hash_ngroups_current = 0;
2629 
2630 	/*
2631 	 * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
2632 	 * happens in phase 0. So, we switch to phase 1 when processing a batch,
2633 	 * and back to phase 0 after the batch is done.
2634 	 */
2635 	Assert(aggstate->current_phase == 0);
2636 	if (aggstate->phase->aggstrategy == AGG_MIXED)
2637 	{
2638 		aggstate->current_phase = 1;
2639 		aggstate->phase = &aggstate->phases[aggstate->current_phase];
2640 	}
2641 
2642 	select_current_set(aggstate, batch->setno, true);
2643 
2644 	perhash = &aggstate->perhash[aggstate->current_set];
2645 
2646 	/*
2647 	 * Spilled tuples are always read back as MinimalTuples, which may be
2648 	 * different from the outer plan, so recompile the aggregate expressions.
2649 	 *
2650 	 * We still need the NULL check, because we are only processing one
2651 	 * grouping set at a time and the rest will be NULL.
2652 	 */
2653 	hashagg_recompile_expressions(aggstate, true, true);
2654 
2655 	for (;;)
2656 	{
2657 		TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
2658 		TupleTableSlot *hashslot = perhash->hashslot;
2659 		TupleHashEntry entry;
2660 		MinimalTuple tuple;
2661 		uint32		hash;
2662 		bool		isnew = false;
2663 		bool	   *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2664 
2665 		CHECK_FOR_INTERRUPTS();
2666 
2667 		tuple = hashagg_batch_read(batch, &hash);
2668 		if (tuple == NULL)
2669 			break;
2670 
2671 		ExecStoreMinimalTuple(tuple, spillslot, true);
2672 		aggstate->tmpcontext->ecxt_outertuple = spillslot;
2673 
2674 		prepare_hash_slot(perhash,
2675 						  aggstate->tmpcontext->ecxt_outertuple,
2676 						  hashslot);
2677 		entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot,
2678 										 p_isnew, hash);
2679 
2680 		if (entry != NULL)
2681 		{
2682 			if (isnew)
2683 				initialize_hash_entry(aggstate, perhash->hashtable, entry);
2684 			aggstate->hash_pergroup[batch->setno] = entry->additional;
2685 			advance_aggregates(aggstate);
2686 		}
2687 		else
2688 		{
2689 			if (!spill_initialized)
2690 			{
2691 				/*
2692 				 * Avoid initializing the spill until we actually need it so
2693 				 * that we don't assign tapes that will never be used.
2694 				 */
2695 				spill_initialized = true;
2696 				hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
2697 								   batch->input_card, aggstate->hashentrysize);
2698 			}
2699 			/* no memory for a new group, spill */
2700 			hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
2701 
2702 			aggstate->hash_pergroup[batch->setno] = NULL;
2703 		}
2704 
2705 		/*
2706 		 * Reset per-input-tuple context after each tuple, but note that the
2707 		 * hash lookups do this too
2708 		 */
2709 		ResetExprContext(aggstate->tmpcontext);
2710 	}
2711 
2712 	hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
2713 
2714 	/* change back to phase 0 */
2715 	aggstate->current_phase = 0;
2716 	aggstate->phase = &aggstate->phases[aggstate->current_phase];
2717 
2718 	if (spill_initialized)
2719 	{
2720 		hashagg_spill_finish(aggstate, &spill, batch->setno);
2721 		hash_agg_update_metrics(aggstate, true, spill.npartitions);
2722 	}
2723 	else
2724 		hash_agg_update_metrics(aggstate, true, 0);
2725 
2726 	aggstate->hash_spill_mode = false;
2727 
2728 	/* prepare to walk the first hash table */
2729 	select_current_set(aggstate, batch->setno, true);
2730 	ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
2731 						   &aggstate->perhash[batch->setno].hashiter);
2732 
2733 	pfree(batch);
2734 
2735 	return true;
2736 }
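
/*
 * Illustrative batch life cycle: a batch created by the initial spill
 * with used_bits = 0 and 16 partitions (4 partition bits) is read back
 * here with used_bits = 4, as recorded when the batch was created.  If
 * its groups still don't fit, the overflow is respilled partitioning on
 * the next lower hash bits, so repeated refills keep subdividing the
 * key space until every batch fits in memory.
 */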
2737 
2738 /*
2739  * ExecAgg for hashed case: retrieving groups from hash table
2740  *
2741  * After exhausting in-memory tuples, also try refilling the hash table using
2742  * previously-spilled tuples. Only returns NULL after all in-memory and
2743  * spilled tuples are exhausted.
2744  */
2745 static TupleTableSlot *
2746 agg_retrieve_hash_table(AggState *aggstate)
2747 {
2748 	TupleTableSlot *result = NULL;
2749 
2750 	while (result == NULL)
2751 	{
2752 		result = agg_retrieve_hash_table_in_memory(aggstate);
2753 		if (result == NULL)
2754 		{
2755 			if (!agg_refill_hash_table(aggstate))
2756 			{
2757 				aggstate->agg_done = true;
2758 				break;
2759 			}
2760 		}
2761 	}
2762 
2763 	return result;
2764 }
2765 
2766 /*
2767  * Retrieve the groups from the in-memory hash tables without considering any
2768  * spilled tuples.
2769  */
2770 static TupleTableSlot *
2771 agg_retrieve_hash_table_in_memory(AggState *aggstate)
2772 {
2773 	ExprContext *econtext;
2774 	AggStatePerAgg peragg;
2775 	AggStatePerGroup pergroup;
2776 	TupleHashEntryData *entry;
2777 	TupleTableSlot *firstSlot;
2778 	TupleTableSlot *result;
2779 	AggStatePerHash perhash;
2780 
2781 	/*
2782 	 * get state info from node.
2783 	 *
2784 	 * econtext is the per-output-tuple expression context.
2785 	 */
2786 	econtext = aggstate->ss.ps.ps_ExprContext;
2787 	peragg = aggstate->peragg;
2788 	firstSlot = aggstate->ss.ss_ScanTupleSlot;
2789 
2790 	/*
2791 	 * Note that perhash (and therefore anything accessed through it) can
2792 	 * change inside the loop, as we change between grouping sets.
2793 	 */
2794 	perhash = &aggstate->perhash[aggstate->current_set];
2795 
2796 	/*
2797 	 * We loop retrieving groups until we find one satisfying
2798 	 * aggstate->ss.ps.qual
2799 	 */
2800 	for (;;)
2801 	{
2802 		TupleTableSlot *hashslot = perhash->hashslot;
2803 		int			i;
2804 
2805 		CHECK_FOR_INTERRUPTS();
2806 
2807 		/*
2808 		 * Find the next entry in the hash table
2809 		 */
2810 		entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2811 		if (entry == NULL)
2812 		{
2813 			int			nextset = aggstate->current_set + 1;
2814 
2815 			if (nextset < aggstate->num_hashes)
2816 			{
2817 				/*
2818 				 * Switch to next grouping set, reinitialize, and restart the
2819 				 * loop.
2820 				 */
2821 				select_current_set(aggstate, nextset, true);
2822 
2823 				perhash = &aggstate->perhash[aggstate->current_set];
2824 
2825 				ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2826 
2827 				continue;
2828 			}
2829 			else
2830 			{
2831 				return NULL;
2832 			}
2833 		}
2834 
2835 		/*
2836 		 * Clear the per-output-tuple context for each group
2837 		 *
2838 		 * We intentionally don't use ReScanExprContext here; if any aggs have
2839 		 * registered shutdown callbacks, they mustn't be called yet, since we
2840 		 * might not be done with that agg.
2841 		 */
2842 		ResetExprContext(econtext);
2843 
2844 		/*
2845 		 * Transform representative tuple back into one with the right
2846 		 * columns.
2847 		 */
2848 		ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2849 		slot_getallattrs(hashslot);
2850 
2851 		ExecClearTuple(firstSlot);
2852 		memset(firstSlot->tts_isnull, true,
2853 			   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2854 
2855 		for (i = 0; i < perhash->numhashGrpCols; i++)
2856 		{
2857 			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;
2858 
2859 			firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2860 			firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2861 		}
2862 		ExecStoreVirtualTuple(firstSlot);
2863 
2864 		pergroup = (AggStatePerGroup) entry->additional;
2865 
2866 		/*
2867 		 * Use the representative input tuple for any references to
2868 		 * non-aggregated input columns in the qual and tlist.
2869 		 */
2870 		econtext->ecxt_outertuple = firstSlot;
2871 
2872 		prepare_projection_slot(aggstate,
2873 								econtext->ecxt_outertuple,
2874 								aggstate->current_set);
2875 
2876 		finalize_aggregates(aggstate, peragg, pergroup);
2877 
2878 		result = project_aggregates(aggstate);
2879 		if (result)
2880 			return result;
2881 	}
2882 
2883 	/* No more groups */
2884 	return NULL;
2885 }
2886 
2887 /*
2888  * Initialize HashTapeInfo
2889  */
2890 static void
2891 hashagg_tapeinfo_init(AggState *aggstate)
2892 {
2893 	HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
2894 	int			init_tapes = 16;	/* expanded dynamically */
2895 
2896 	tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
2897 	tapeinfo->ntapes = init_tapes;
2898 	tapeinfo->nfreetapes = init_tapes;
2899 	tapeinfo->freetapes_alloc = init_tapes;
2900 	tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
2901 	for (int i = 0; i < init_tapes; i++)
2902 		tapeinfo->freetapes[i] = i;
2903 
2904 	aggstate->hash_tapeinfo = tapeinfo;
2905 }
2906 
2907 /*
2908  * Assign unused tapes to spill partitions, extending the tape set if
2909  * necessary.
2910  */
2911 static void
2912 hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
2913 						int npartitions)
2914 {
2915 	int			partidx = 0;
2916 
2917 	/* use free tapes if available */
2918 	while (partidx < npartitions && tapeinfo->nfreetapes > 0)
2919 		partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
2920 
2921 	if (partidx < npartitions)
2922 	{
2923 		LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
2924 
2925 		while (partidx < npartitions)
2926 			partitions[partidx++] = tapeinfo->ntapes++;
2927 	}
2928 }
2929 
2930 /*
2931  * After a tape has already been written to and then read, this function
2932  * rewinds it for writing and adds it to the free list.
2933  */
2934 static void
2935 hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
2936 {
2937 	/* rewinding frees the buffer while not in use */
2938 	LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
2939 	if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
2940 	{
2941 		tapeinfo->freetapes_alloc <<= 1;
2942 		tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
2943 									   tapeinfo->freetapes_alloc * sizeof(int));
2944 	}
2945 	tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
2946 }
2947 
2948 /*
2949  * hashagg_spill_init
2950  *
2951  * Called after we determined that spilling is necessary. Chooses the number
2952  * of partitions to create, and initializes them.
2953  */
2954 static void
2955 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
2956 				   double input_groups, double hashentrysize)
2957 {
2958 	int			npartitions;
2959 	int			partition_bits;
2960 
2961 	npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
2962 											 used_bits, &partition_bits);
2963 
2964 	spill->partitions = palloc0(sizeof(int) * npartitions);
2965 	spill->ntuples = palloc0(sizeof(int64) * npartitions);
2966 	spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
2967 
2968 	hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
2969 
2970 	spill->tapeset = tapeinfo->tapeset;
2971 	spill->shift = 32 - used_bits - partition_bits;
2972 	spill->mask = (npartitions - 1) << spill->shift;
2973 	spill->npartitions = npartitions;
2974 
2975 	for (int i = 0; i < npartitions; i++)
2976 		initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
2977 }
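
/*
 * Example of the resulting arithmetic (made-up values): used_bits = 0
 * and partition_bits = 4 give npartitions = 16, shift = 28 and
 * mask = 0xF0000000, so a tuple's partition is simply the top four bits
 * of its hash; a recursive spill from one of those partitions would
 * start with used_bits = 4 and consume the next bits down.
 */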
2978 
2979 /*
2980  * hashagg_spill_tuple
2981  *
2982  * No room for new groups in the hash table. Save for later in the appropriate
2983  * partition.
2984  */
2985 static Size
2986 hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
2987 					TupleTableSlot *inputslot, uint32 hash)
2988 {
2989 	LogicalTapeSet *tapeset = spill->tapeset;
2990 	TupleTableSlot *spillslot;
2991 	int			partition;
2992 	MinimalTuple tuple;
2993 	int			tapenum;
2994 	Size		total_written = 0;
2995 	bool		shouldFree;
2996 
2997 	Assert(spill->partitions != NULL);
2998 
2999 	/* spill only attributes that we actually need */
3000 	if (!aggstate->all_cols_needed)
3001 	{
3002 		spillslot = aggstate->hash_spill_wslot;
3003 		slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
3004 		ExecClearTuple(spillslot);
3005 		for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
3006 		{
3007 			if (bms_is_member(i + 1, aggstate->colnos_needed))
3008 			{
3009 				spillslot->tts_values[i] = inputslot->tts_values[i];
3010 				spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
3011 			}
3012 			else
3013 				spillslot->tts_isnull[i] = true;
3014 		}
3015 		ExecStoreVirtualTuple(spillslot);
3016 	}
3017 	else
3018 		spillslot = inputslot;
3019 
3020 	tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
3021 
3022 	partition = (hash & spill->mask) >> spill->shift;
3023 	spill->ntuples[partition]++;
3024 
3025 	/*
3026 	 * All hash values destined for a given partition have some bits in
3027 	 * common, which causes bad HLL cardinality estimates. Hash the hash to
3028 	 * get a more uniform distribution.
3029 	 */
3030 	addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
3031 
3032 	tapenum = spill->partitions[partition];
3033 
3034 	LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
3035 	total_written += sizeof(uint32);
3036 
3037 	LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
3038 	total_written += tuple->t_len;
3039 
3040 	if (shouldFree)
3041 		pfree(tuple);
3042 
3043 	return total_written;
3044 }
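
/*
 * For reference, the record just written has this on-tape layout (a
 * sketch; hashagg_batch_read() below consumes it in the same order):
 *
 *		+--------------+------------------------------------------+
 *		| uint32 hash  | MinimalTuple, t_len bytes in total,      |
 *		|              | beginning with the uint32 t_len itself   |
 *		+--------------+------------------------------------------+
 */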
3045 
3046 /*
3047  * hashagg_batch_new
3048  *
3049  * Construct a HashAggBatch item, which represents one iteration of HashAgg to
3050  * be done.
3051  */
3052 static HashAggBatch *
3053 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
3054 				  int64 input_tuples, double input_card, int used_bits)
3055 {
3056 	HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
3057 
3058 	batch->setno = setno;
3059 	batch->used_bits = used_bits;
3060 	batch->tapeset = tapeset;
3061 	batch->input_tapenum = tapenum;
3062 	batch->input_tuples = input_tuples;
3063 	batch->input_card = input_card;
3064 
3065 	return batch;
3066 }
3067 
3068 /*
3069  * hashagg_batch_read
3070  * 		read the next tuple from a batch's tape.  Return NULL if no more.
3071  */
3072 static MinimalTuple
3073 hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
3074 {
3075 	LogicalTapeSet *tapeset = batch->tapeset;
3076 	int			tapenum = batch->input_tapenum;
3077 	MinimalTuple tuple;
3078 	uint32		t_len;
3079 	size_t		nread;
3080 	uint32		hash;
3081 
3082 	nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
3083 	if (nread == 0)
3084 		return NULL;
3085 	if (nread != sizeof(uint32))
3086 		ereport(ERROR,
3087 				(errcode_for_file_access(),
3088 				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3089 						tapenum, sizeof(uint32), nread)));
3090 	if (hashp != NULL)
3091 		*hashp = hash;
3092 
3093 	nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
3094 	if (nread != sizeof(uint32))
3095 		ereport(ERROR,
3096 				(errcode_for_file_access(),
3097 				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3098 						tapenum, sizeof(uint32), nread)));
3099 
3100 	tuple = (MinimalTuple) palloc(t_len);
3101 	tuple->t_len = t_len;
3102 
3103 	nread = LogicalTapeRead(tapeset, tapenum,
3104 							(void *) ((char *) tuple + sizeof(uint32)),
3105 							t_len - sizeof(uint32));
3106 	if (nread != t_len - sizeof(uint32))
3107 		ereport(ERROR,
3108 				(errcode_for_file_access(),
3109 				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3110 						tapenum, t_len - sizeof(uint32), nread)));
3111 
3112 	return tuple;
3113 }
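
/*
 * Illustrative consumer loop (a sketch, not part of this file): a batch is
 * drained by reading until the NULL return signals end-of-tape.
 *
 *		MinimalTuple tuple;
 *		uint32		hash;
 *
 *		while ((tuple = hashagg_batch_read(batch, &hash)) != NULL)
 *		{
 *			... store the tuple in a slot (which may take ownership)
 *			... and advance the aggregates or re-spill, as appropriate
 *		}
 */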
3114 
3115 /*
3116  * hashagg_finish_initial_spills
3117  *
3118  * After the initial pass over the input is complete, HashAgg may have
3119  * spilled tuples to disk. If so, turn the spilled partitions into new
3120  * batches that must later be executed.
3121  */
3122 static void
3123 hashagg_finish_initial_spills(AggState *aggstate)
3124 {
3125 	int			setno;
3126 	int			total_npartitions = 0;
3127 
3128 	if (aggstate->hash_spills != NULL)
3129 	{
3130 		for (setno = 0; setno < aggstate->num_hashes; setno++)
3131 		{
3132 			HashAggSpill *spill = &aggstate->hash_spills[setno];
3133 
3134 			total_npartitions += spill->npartitions;
3135 			hashagg_spill_finish(aggstate, spill, setno);
3136 		}
3137 
3138 		/*
3139 		 * We're not processing tuples from the outer plan any more; we're
3140 		 * only processing batches of spilled tuples.  The initial spill
3141 		 * structures are no longer needed.
3142 		 */
3143 		pfree(aggstate->hash_spills);
3144 		aggstate->hash_spills = NULL;
3145 	}
3146 
3147 	hash_agg_update_metrics(aggstate, false, total_npartitions);
3148 	aggstate->hash_spill_mode = false;
3149 }
3150 
3151 /*
3152  * hashagg_spill_finish
3153  *
3154  * Transform spill partitions into new batches.
3155  */
3156 static void
3157 hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3158 {
3159 	int			i;
3160 	int			used_bits = 32 - spill->shift;
3161 
3162 	if (spill->npartitions == 0)
3163 		return;					/* didn't spill */
3164 
3165 	for (i = 0; i < spill->npartitions; i++)
3166 	{
3167 		LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
3168 		int			tapenum = spill->partitions[i];
3169 		HashAggBatch *new_batch;
3170 		double		cardinality;
3171 
3172 		/* if the partition is empty, don't create a new batch of work */
3173 		if (spill->ntuples[i] == 0)
3174 			continue;
3175 
3176 		cardinality = estimateHyperLogLog(&spill->hll_card[i]);
3177 		freeHyperLogLog(&spill->hll_card[i]);
3178 
3179 		/* rewinding frees the buffer while not in use */
3180 		LogicalTapeRewindForRead(tapeset, tapenum,
3181 								 HASHAGG_READ_BUFFER_SIZE);
3182 
3183 		new_batch = hashagg_batch_new(tapeset, tapenum, setno,
3184 									  spill->ntuples[i], cardinality,
3185 									  used_bits);
3186 		aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch);
3187 		aggstate->hash_batches_used++;
3188 	}
3189 
3190 	pfree(spill->ntuples);
3191 	pfree(spill->hll_card);
3192 	pfree(spill->partitions);
3193 }
3194 
3195 /*
3196  * Free resources related to a spilled HashAgg.
3197  */
3198 static void
3199 hashagg_reset_spill_state(AggState *aggstate)
3200 {
3201 	/* free spills from initial pass */
3202 	if (aggstate->hash_spills != NULL)
3203 	{
3204 		int			setno;
3205 
3206 		for (setno = 0; setno < aggstate->num_hashes; setno++)
3207 		{
3208 			HashAggSpill *spill = &aggstate->hash_spills[setno];
3209 
3210 			pfree(spill->ntuples);
3211 			pfree(spill->partitions);
3212 		}
3213 		pfree(aggstate->hash_spills);
3214 		aggstate->hash_spills = NULL;
3215 	}
3216 
3217 	/* free batches */
3218 	list_free_deep(aggstate->hash_batches);
3219 	aggstate->hash_batches = NIL;
3220 
3221 	/* close tape set */
3222 	if (aggstate->hash_tapeinfo != NULL)
3223 	{
3224 		HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
3225 
3226 		LogicalTapeSetClose(tapeinfo->tapeset);
3227 		pfree(tapeinfo->freetapes);
3228 		pfree(tapeinfo);
3229 		aggstate->hash_tapeinfo = NULL;
3230 	}
3231 }
3232 
3233 
3234 /* -----------------
3235  * ExecInitAgg
3236  *
3237  *	Creates the run-time information for the agg node produced by the
3238  *	planner and initializes its outer subtree.
3239  *
3240  * -----------------
3241  */
3242 AggState *
3243 ExecInitAgg(Agg *node, EState *estate, int eflags)
3244 {
3245 	AggState   *aggstate;
3246 	AggStatePerAgg peraggs;
3247 	AggStatePerTrans pertransstates;
3248 	AggStatePerGroup *pergroups;
3249 	Plan	   *outerPlan;
3250 	ExprContext *econtext;
3251 	TupleDesc	scanDesc;
3252 	int			max_aggno;
3253 	int			max_transno;
3254 	int			numaggrefs;
3255 	int			numaggs;
3256 	int			numtrans;
3257 	int			phase;
3258 	int			phaseidx;
3259 	ListCell   *l;
3260 	Bitmapset  *all_grouped_cols = NULL;
3261 	int			numGroupingSets = 1;
3262 	int			numPhases;
3263 	int			numHashes;
3264 	int			i = 0;
3265 	int			j = 0;
3266 	bool		use_hashing = (node->aggstrategy == AGG_HASHED ||
3267 							   node->aggstrategy == AGG_MIXED);
3268 
3269 	/* check for unsupported flags */
3270 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3271 
3272 	/*
3273 	 * create state structure
3274 	 */
3275 	aggstate = makeNode(AggState);
3276 	aggstate->ss.ps.plan = (Plan *) node;
3277 	aggstate->ss.ps.state = estate;
3278 	aggstate->ss.ps.ExecProcNode = ExecAgg;
3279 
3280 	aggstate->aggs = NIL;
3281 	aggstate->numaggs = 0;
3282 	aggstate->numtrans = 0;
3283 	aggstate->aggstrategy = node->aggstrategy;
3284 	aggstate->aggsplit = node->aggsplit;
3285 	aggstate->maxsets = 0;
3286 	aggstate->projected_set = -1;
3287 	aggstate->current_set = 0;
3288 	aggstate->peragg = NULL;
3289 	aggstate->pertrans = NULL;
3290 	aggstate->curperagg = NULL;
3291 	aggstate->curpertrans = NULL;
3292 	aggstate->input_done = false;
3293 	aggstate->agg_done = false;
3294 	aggstate->pergroups = NULL;
3295 	aggstate->grp_firstTuple = NULL;
3296 	aggstate->sort_in = NULL;
3297 	aggstate->sort_out = NULL;
3298 
3299 	/*
3300 	 * phases[0] always exists, but is dummy in sorted/plain mode
3301 	 */
3302 	numPhases = (use_hashing ? 1 : 2);
3303 	numHashes = (use_hashing ? 1 : 0);
3304 
3305 	/*
3306 	 * Calculate the maximum number of grouping sets in any phase; this
3307 	 * determines the size of some allocations.  Also calculate the number of
3308 	 * phases, since all hashed/mixed nodes contribute to only a single phase.
3309 	 */
3310 	if (node->groupingSets)
3311 	{
3312 		numGroupingSets = list_length(node->groupingSets);
3313 
3314 		foreach(l, node->chain)
3315 		{
3316 			Agg		   *agg = lfirst(l);
3317 
3318 			numGroupingSets = Max(numGroupingSets,
3319 								  list_length(agg->groupingSets));
3320 
3321 			/*
3322 			 * additional AGG_HASHED aggs become part of phase 0, but all
3323 			 * others add an extra phase.
3324 			 */
3325 			if (agg->aggstrategy != AGG_HASHED)
3326 				++numPhases;
3327 			else
3328 				++numHashes;
3329 		}
3330 	}
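
	/*
	 * A worked example of the counting above (illustrative): a MIXED node
	 * whose chain holds one AGG_HASHED and two AGG_SORTED entries ends up
	 * with numHashes = 2 (this node plus the hashed chain entry) and
	 * numPhases = 3 (the shared hash phase 0, plus one phase per sorted
	 * entry).
	 */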
3331 
3332 	aggstate->maxsets = numGroupingSets;
3333 	aggstate->numphases = numPhases;
3334 
3335 	aggstate->aggcontexts = (ExprContext **)
3336 		palloc0(sizeof(ExprContext *) * numGroupingSets);
3337 
3338 	/*
3339 	 * Create expression contexts.  We need several: one for
3340 	 * per-input-tuple processing, one for per-output-tuple processing, one
3341 	 * for all the hashtables, and one for each grouping set.  The per-tuple
3342 	 * memory context of the per-grouping-set ExprContexts (aggcontexts)
3343 	 * replaces the standalone memory context formerly used to hold transition
3344 	 * values.  We cheat a little by using ExecAssignExprContext() to build
3345 	 * all of them.
3346 	 *
3347 	 * NOTE: the details of what is stored in aggcontexts and what is stored
3348 	 * in the regular per-query memory context are driven by a simple
3349 	 * decision: we want to reset the aggcontext at group boundaries (if not
3350 	 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3351 	 */
3352 	ExecAssignExprContext(estate, &aggstate->ss.ps);
3353 	aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3354 
3355 	for (i = 0; i < numGroupingSets; ++i)
3356 	{
3357 		ExecAssignExprContext(estate, &aggstate->ss.ps);
3358 		aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3359 	}
3360 
3361 	if (use_hashing)
3362 		aggstate->hashcontext = CreateWorkExprContext(estate);
3363 
3364 	ExecAssignExprContext(estate, &aggstate->ss.ps);
3365 
3366 	/*
3367 	 * Initialize child nodes.
3368 	 *
3369 	 * If we are doing a hashed aggregation then the child plan does not need
3370 	 * to handle REWIND efficiently; see ExecReScanAgg.
3371 	 */
3372 	if (node->aggstrategy == AGG_HASHED)
3373 		eflags &= ~EXEC_FLAG_REWIND;
3374 	outerPlan = outerPlan(node);
3375 	outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3376 
3377 	/*
3378 	 * initialize source tuple type.
3379 	 */
3380 	aggstate->ss.ps.outerops =
3381 		ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
3382 							 &aggstate->ss.ps.outeropsfixed);
3383 	aggstate->ss.ps.outeropsset = true;
3384 
3385 	ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3386 									aggstate->ss.ps.outerops);
3387 	scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3388 
3389 	/*
3390 	 * If there are more than two phases (including a potential dummy phase
3391 	 * 0), input will be resorted using tuplesort. Need a slot for that.
3392 	 */
3393 	if (numPhases > 2)
3394 	{
3395 		aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3396 													 &TTSOpsMinimalTuple);
3397 
3398 		/*
3399 		 * The output of the tuplesort, and the output from the outer child
3400 		 * might not use the same type of slot. In most cases the child will
3401 		 * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3402 		 * input can also be presorted due to an index, in which case it could
3403 		 * be a different type of slot.
3404 		 *
3405 		 * XXX: For efficiency it would be good to instead/additionally
3406 		 * generate expressions with corresponding settings of outerops* for
3407 		 * the individual phases - deforming is often a bottleneck for
3408 		 * aggregations with lots of rows per group. If there are multiple
3409 		 * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3410 		 * the nodeAgg.c internal tuplesort).
3411 		 */
3412 		if (aggstate->ss.ps.outeropsfixed &&
3413 			aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3414 			aggstate->ss.ps.outeropsfixed = false;
3415 	}
3416 
3417 	/*
3418 	 * Initialize result type, slot and projection.
3419 	 */
3420 	ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
3421 	ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3422 
3423 	/*
3424 	 * initialize child expressions
3425 	 *
3426 	 * We expect the parser to have checked that no aggs contain other agg
3427 	 * calls in their arguments (and just to be sure, we verify it again while
3428 	 * initializing the plan node).  This would make no sense under SQL
3429 	 * semantics, and it's forbidden by the spec.  Since that holds, we
3430 	 * don't need to worry about evaluating the aggs in any particular order.
3431 	 *
3432 	 * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
3433 	 * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
3434 	 * during ExecAssignProjectionInfo, above.
3435 	 */
3436 	aggstate->ss.ps.qual =
3437 		ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3438 
3439 	/*
3440 	 * We should now have found all Aggrefs in the targetlist and quals.
3441 	 */
3442 	numaggrefs = list_length(aggstate->aggs);
3443 	max_aggno = -1;
3444 	max_transno = -1;
3445 	foreach(l, aggstate->aggs)
3446 	{
3447 		Aggref	   *aggref = (Aggref *) lfirst(l);
3448 
3449 		max_aggno = Max(max_aggno, aggref->aggno);
3450 		max_transno = Max(max_transno, aggref->aggtransno);
3451 	}
3452 	numaggs = max_aggno + 1;
3453 	numtrans = max_transno + 1;
3454 
3455 	/*
3456 	 * For each phase, prepare grouping set data and fmgr lookup data for
3457 	 * compare functions.  Accumulate all_grouped_cols in passing.
3458 	 */
3459 	aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
3460 
3461 	aggstate->num_hashes = numHashes;
3462 	if (numHashes)
3463 	{
3464 		aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
3465 		aggstate->phases[0].numsets = 0;
3466 		aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
3467 		aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
3468 	}
3469 
3470 	phase = 0;
3471 	for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3472 	{
3473 		Agg		   *aggnode;
3474 		Sort	   *sortnode;
3475 
3476 		if (phaseidx > 0)
3477 		{
3478 			aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3479 			sortnode = castNode(Sort, aggnode->plan.lefttree);
3480 		}
3481 		else
3482 		{
3483 			aggnode = node;
3484 			sortnode = NULL;
3485 		}
3486 
3487 		Assert(phase <= 1 || sortnode);
3488 
3489 		if (aggnode->aggstrategy == AGG_HASHED
3490 			|| aggnode->aggstrategy == AGG_MIXED)
3491 		{
3492 			AggStatePerPhase phasedata = &aggstate->phases[0];
3493 			AggStatePerHash perhash;
3494 			Bitmapset  *cols = NULL;
3495 
3496 			Assert(phase == 0);
3497 			i = phasedata->numsets++;
3498 			perhash = &aggstate->perhash[i];
3499 
3500 			/* phase 0 always points to the "real" Agg in the hash case */
3501 			phasedata->aggnode = node;
3502 			phasedata->aggstrategy = node->aggstrategy;
3503 
3504 			/* but the actual Agg node representing this hash is saved here */
3505 			perhash->aggnode = aggnode;
3506 
3507 			phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3508 
3509 			for (j = 0; j < aggnode->numCols; ++j)
3510 				cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3511 
3512 			phasedata->grouped_cols[i] = cols;
3513 
3514 			all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3515 			continue;
3516 		}
3517 		else
3518 		{
3519 			AggStatePerPhase phasedata = &aggstate->phases[++phase];
3520 			int			num_sets;
3521 
3522 			phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3523 
3524 			if (num_sets)
3525 			{
3526 				phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3527 				phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3528 
3529 				i = 0;
3530 				foreach(l, aggnode->groupingSets)
3531 				{
3532 					int			current_length = list_length(lfirst(l));
3533 					Bitmapset  *cols = NULL;
3534 
3535 					/* planner forces this to be correct */
3536 					for (j = 0; j < current_length; ++j)
3537 						cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3538 
3539 					phasedata->grouped_cols[i] = cols;
3540 					phasedata->gset_lengths[i] = current_length;
3541 
3542 					++i;
3543 				}
3544 
3545 				all_grouped_cols = bms_add_members(all_grouped_cols,
3546 												   phasedata->grouped_cols[0]);
3547 			}
3548 			else
3549 			{
3550 				Assert(phaseidx == 0);
3551 
3552 				phasedata->gset_lengths = NULL;
3553 				phasedata->grouped_cols = NULL;
3554 			}
3555 
3556 			/*
3557 			 * If we are grouping, precompute fmgr lookup data for inner loop.
3558 			 */
3559 			if (aggnode->aggstrategy == AGG_SORTED)
3560 			{
3561 				int			i = 0;
3562 
3563 				Assert(aggnode->numCols > 0);
3564 
3565 				/*
3566 				 * Build a separate function for each subset of columns that
3567 				 * need to be compared.
3568 				 */
3569 				phasedata->eqfunctions =
3570 					(ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
3571 
3572 				/* for each grouping set */
3573 				for (i = 0; i < phasedata->numsets; i++)
3574 				{
3575 					int			length = phasedata->gset_lengths[i];
3576 
3577 					if (phasedata->eqfunctions[length - 1] != NULL)
3578 						continue;
3579 
3580 					phasedata->eqfunctions[length - 1] =
3581 						execTuplesMatchPrepare(scanDesc,
3582 											   length,
3583 											   aggnode->grpColIdx,
3584 											   aggnode->grpOperators,
3585 											   aggnode->grpCollations,
3586 											   (PlanState *) aggstate);
3587 				}
3588 
3589 				/* and for all grouped columns, unless already computed */
3590 				if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3591 				{
3592 					phasedata->eqfunctions[aggnode->numCols - 1] =
3593 						execTuplesMatchPrepare(scanDesc,
3594 											   aggnode->numCols,
3595 											   aggnode->grpColIdx,
3596 											   aggnode->grpOperators,
3597 											   aggnode->grpCollations,
3598 											   (PlanState *) aggstate);
3599 				}
3600 			}
3601 
3602 			phasedata->aggnode = aggnode;
3603 			phasedata->aggstrategy = aggnode->aggstrategy;
3604 			phasedata->sortnode = sortnode;
3605 		}
3606 	}
3607 
3608 	/*
3609 	 * Convert all_grouped_cols to a descending-order list.
3610 	 */
3611 	i = -1;
3612 	while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3613 		aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
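
	/*
	 * For example (illustrative): if all_grouped_cols is {2, 4, 7}, the
	 * ascending bms_next_member() scan combined with lcons_int() yields
	 * the list (7, 4, 2), i.e. descending attribute numbers.
	 */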
3614 
3615 	/*
3616 	 * Set up aggregate-result storage in the output expr context, and also
3617 	 * allocate my private per-agg working storage
3618 	 */
3619 	econtext = aggstate->ss.ps.ps_ExprContext;
3620 	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3621 	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3622 
3623 	peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3624 	pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans);
3625 
3626 	aggstate->peragg = peraggs;
3627 	aggstate->pertrans = pertransstates;
3628 
3629 
3630 	aggstate->all_pergroups =
3631 		(AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
3632 									 * (numGroupingSets + numHashes));
3633 	pergroups = aggstate->all_pergroups;
3634 
3635 	if (node->aggstrategy != AGG_HASHED)
3636 	{
3637 		for (i = 0; i < numGroupingSets; i++)
3638 		{
3639 			pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3640 													  * numaggs);
3641 		}
3642 
3643 		aggstate->pergroups = pergroups;
3644 		pergroups += numGroupingSets;
3645 	}
3646 
3647 	/*
3648 	 * Hashing can only appear in the initial phase.
3649 	 */
3650 	if (use_hashing)
3651 	{
3652 		Plan	   *outerplan = outerPlan(node);
3653 		uint64		totalGroups = 0;
3654 		int			i;
3655 
3656 		aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
3657 													   "HashAgg meta context",
3658 													   ALLOCSET_DEFAULT_SIZES);
3659 		aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
3660 															&TTSOpsMinimalTuple);
3661 		aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
3662 															&TTSOpsVirtual);
3663 
3664 		/* this is an array of pointers, not structures */
3665 		aggstate->hash_pergroup = pergroups;
3666 
3667 		aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
3668 													  outerplan->plan_width,
3669 													  node->transitionSpace);
3670 
3671 		/*
3672 		 * Consider all of the grouping sets together when setting the limits
3673 		 * and estimating the number of partitions. This can be inaccurate
3674 		 * when there is more than one grouping set, but should still be
3675 		 * reasonable.
3676 		 */
3677 		for (i = 0; i < aggstate->num_hashes; i++)
3678 			totalGroups += aggstate->perhash[i].aggnode->numGroups;
3679 
3680 		hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3681 							&aggstate->hash_mem_limit,
3682 							&aggstate->hash_ngroups_limit,
3683 							&aggstate->hash_planned_partitions);
3684 		find_hash_columns(aggstate);
3685 
3686 		/* Skip massive memory allocation if we are just doing EXPLAIN */
3687 		if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
3688 			build_hash_tables(aggstate);
3689 
3690 		aggstate->table_filled = false;
3691 
3692 		/* Initialize this to 1, meaning nothing has spilled yet */
3693 		aggstate->hash_batches_used = 1;
3694 	}
3695 
3696 	/*
3697 	 * Initialize current phase-dependent values to initial phase. The initial
3698 	 * phase is 1 (first sort pass) for all strategies that use sorting (if
3699 	 * hashing is being done too, then phase 0 is processed last); but if only
3700 	 * hashing is being done, then phase 0 is all there is.
3701 	 */
3702 	if (node->aggstrategy == AGG_HASHED)
3703 	{
3704 		aggstate->current_phase = 0;
3705 		initialize_phase(aggstate, 0);
3706 		select_current_set(aggstate, 0, true);
3707 	}
3708 	else
3709 	{
3710 		aggstate->current_phase = 1;
3711 		initialize_phase(aggstate, 1);
3712 		select_current_set(aggstate, 0, false);
3713 	}
3714 
3715 	/*
3716 	 * Perform lookups of aggregate function info, and initialize the
3717 	 * unchanging fields of the per-agg and per-trans data.
3718 	 */
3719 	foreach(l, aggstate->aggs)
3720 	{
3721 		Aggref	   *aggref = lfirst(l);
3722 		AggStatePerAgg peragg;
3723 		AggStatePerTrans pertrans;
3724 		Oid			inputTypes[FUNC_MAX_ARGS];
3725 		int			numArguments;
3726 		int			numDirectArgs;
3727 		HeapTuple	aggTuple;
3728 		Form_pg_aggregate aggform;
3729 		AclResult	aclresult;
3730 		Oid			finalfn_oid;
3731 		Oid			serialfn_oid,
3732 					deserialfn_oid;
3733 		Oid			aggOwner;
3734 		Expr	   *finalfnexpr;
3735 		Oid			aggtranstype;
3736 
3737 		/* Planner should have assigned aggregate to correct level */
3738 		Assert(aggref->agglevelsup == 0);
3739 		/* ... and the split mode should match */
3740 		Assert(aggref->aggsplit == aggstate->aggsplit);
3741 
3742 		peragg = &peraggs[aggref->aggno];
3743 
3744 		/* Check if we initialized the state for this aggregate already. */
3745 		if (peragg->aggref != NULL)
3746 			continue;
3747 
3748 		peragg->aggref = aggref;
3749 		peragg->transno = aggref->aggtransno;
3750 
3751 		/* Fetch the pg_aggregate row */
3752 		aggTuple = SearchSysCache1(AGGFNOID,
3753 								   ObjectIdGetDatum(aggref->aggfnoid));
3754 		if (!HeapTupleIsValid(aggTuple))
3755 			elog(ERROR, "cache lookup failed for aggregate %u",
3756 				 aggref->aggfnoid);
3757 		aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3758 
3759 		/* Check permission to call aggregate function */
3760 		aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3761 									 ACL_EXECUTE);
3762 		if (aclresult != ACLCHECK_OK)
3763 			aclcheck_error(aclresult, OBJECT_AGGREGATE,
3764 						   get_func_name(aggref->aggfnoid));
3765 		InvokeFunctionExecuteHook(aggref->aggfnoid);
3766 
3767 		/* planner recorded transition state type in the Aggref itself */
3768 		aggtranstype = aggref->aggtranstype;
3769 		Assert(OidIsValid(aggtranstype));
3770 
3771 		/* Final function only required if we're finalizing the aggregates */
3772 		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3773 			peragg->finalfn_oid = finalfn_oid = InvalidOid;
3774 		else
3775 			peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3776 
3777 		serialfn_oid = InvalidOid;
3778 		deserialfn_oid = InvalidOid;
3779 
3780 		/*
3781 		 * Check if serialization/deserialization is required.  We only do it
3782 		 * for aggregates that have transtype INTERNAL.
3783 		 */
3784 		if (aggtranstype == INTERNALOID)
3785 		{
3786 			/*
3787 			 * The planner should only have generated a serialize agg node if
3788 			 * every aggregate with an INTERNAL state has a serialization
3789 			 * function.  Verify that.
3790 			 */
3791 			if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3792 			{
3793 				/* serialization only valid when not running finalfn */
3794 				Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3795 
3796 				if (!OidIsValid(aggform->aggserialfn))
3797 					elog(ERROR, "serialfunc not provided for serialization aggregation");
3798 				serialfn_oid = aggform->aggserialfn;
3799 			}
3800 
3801 			/* Likewise for deserialization functions */
3802 			if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3803 			{
3804 				/* deserialization only valid when combining states */
3805 				Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3806 
3807 				if (!OidIsValid(aggform->aggdeserialfn))
3808 					elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3809 				deserialfn_oid = aggform->aggdeserialfn;
3810 			}
3811 		}
3812 
3813 		/* Check that aggregate owner has permission to call component fns */
3814 		{
3815 			HeapTuple	procTuple;
3816 
3817 			procTuple = SearchSysCache1(PROCOID,
3818 										ObjectIdGetDatum(aggref->aggfnoid));
3819 			if (!HeapTupleIsValid(procTuple))
3820 				elog(ERROR, "cache lookup failed for function %u",
3821 					 aggref->aggfnoid);
3822 			aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3823 			ReleaseSysCache(procTuple);
3824 
3825 			if (OidIsValid(finalfn_oid))
3826 			{
3827 				aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3828 											 ACL_EXECUTE);
3829 				if (aclresult != ACLCHECK_OK)
3830 					aclcheck_error(aclresult, OBJECT_FUNCTION,
3831 								   get_func_name(finalfn_oid));
3832 				InvokeFunctionExecuteHook(finalfn_oid);
3833 			}
3834 			if (OidIsValid(serialfn_oid))
3835 			{
3836 				aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3837 											 ACL_EXECUTE);
3838 				if (aclresult != ACLCHECK_OK)
3839 					aclcheck_error(aclresult, OBJECT_FUNCTION,
3840 								   get_func_name(serialfn_oid));
3841 				InvokeFunctionExecuteHook(serialfn_oid);
3842 			}
3843 			if (OidIsValid(deserialfn_oid))
3844 			{
3845 				aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3846 											 ACL_EXECUTE);
3847 				if (aclresult != ACLCHECK_OK)
3848 					aclcheck_error(aclresult, OBJECT_FUNCTION,
3849 								   get_func_name(deserialfn_oid));
3850 				InvokeFunctionExecuteHook(deserialfn_oid);
3851 			}
3852 		}
3853 
3854 		/*
3855 		 * Get actual datatypes of the (nominal) aggregate inputs.  These
3856 		 * could be different from the agg's declared input types, when the
3857 		 * agg accepts ANY or a polymorphic type.
3858 		 */
3859 		numArguments = get_aggregate_argtypes(aggref, inputTypes);
3860 
3861 		/* Count the "direct" arguments, if any */
3862 		numDirectArgs = list_length(aggref->aggdirectargs);
3863 
3864 		/* Detect how many arguments to pass to the finalfn */
3865 		if (aggform->aggfinalextra)
3866 			peragg->numFinalArgs = numArguments + 1;
3867 		else
3868 			peragg->numFinalArgs = numDirectArgs + 1;
3869 
3870 		/* Initialize any direct-argument expressions */
3871 		peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3872 												 (PlanState *) aggstate);
3873 
3874 		/*
3875 		 * build expression trees using actual argument & result types for the
3876 		 * finalfn, if it exists and is required.
3877 		 */
3878 		if (OidIsValid(finalfn_oid))
3879 		{
3880 			build_aggregate_finalfn_expr(inputTypes,
3881 										 peragg->numFinalArgs,
3882 										 aggtranstype,
3883 										 aggref->aggtype,
3884 										 aggref->inputcollid,
3885 										 finalfn_oid,
3886 										 &finalfnexpr);
3887 			fmgr_info(finalfn_oid, &peragg->finalfn);
3888 			fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3889 		}
3890 
3891 		/* get info about the output value's datatype */
3892 		get_typlenbyval(aggref->aggtype,
3893 						&peragg->resulttypeLen,
3894 						&peragg->resulttypeByVal);
3895 
3896 		/*
3897 		 * Build working state for invoking the transition function, if we
3898 		 * haven't done it already.
3899 		 */
3900 		pertrans = &pertransstates[aggref->aggtransno];
3901 		if (pertrans->aggref == NULL)
3902 		{
3903 			Datum		textInitVal;
3904 			Datum		initValue;
3905 			bool		initValueIsNull;
3906 			Oid			transfn_oid;
3907 
3908 			/*
3909 			 * If this aggregation is performing state combines, then instead
3910 			 * of using the transition function, we'll use the combine
3911 			 * function
3912 			 */
3913 			if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3914 			{
3915 				transfn_oid = aggform->aggcombinefn;
3916 
3917 				/* If not set then the planner messed up */
3918 				if (!OidIsValid(transfn_oid))
3919 					elog(ERROR, "combinefn not set for aggregate function");
3920 			}
3921 			else
3922 				transfn_oid = aggform->aggtransfn;
3923 
3924 			aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3925 										 ACL_EXECUTE);
3926 			if (aclresult != ACLCHECK_OK)
3927 				aclcheck_error(aclresult, OBJECT_FUNCTION,
3928 							   get_func_name(transfn_oid));
3929 			InvokeFunctionExecuteHook(transfn_oid);
3930 
3931 			/*
3932 			 * initval is potentially null, so don't try to access it as a
3933 			 * struct field. Must do it the hard way with SysCacheGetAttr.
3934 			 */
3935 			textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3936 										  Anum_pg_aggregate_agginitval,
3937 										  &initValueIsNull);
3938 			if (initValueIsNull)
3939 				initValue = (Datum) 0;
3940 			else
3941 				initValue = GetAggInitVal(textInitVal, aggtranstype);
3942 
3943 			build_pertrans_for_aggref(pertrans, aggstate, estate,
3944 									  aggref, transfn_oid, aggtranstype,
3945 									  serialfn_oid, deserialfn_oid,
3946 									  initValue, initValueIsNull,
3947 									  inputTypes, numArguments);
3948 		}
3949 		else
3950 			pertrans->aggshared = true;
3951 		ReleaseSysCache(aggTuple);
3952 	}
3953 
3954 	/*
3955 	 * Update aggstate->numaggs to be the number of unique aggregates found.
3956 	 * Also set numtrans to the number of unique transition states found.
3957 	 */
3958 	aggstate->numaggs = numaggs;
3959 	aggstate->numtrans = numtrans;
3960 
3961 	/*
3962 	 * Last, check whether any more aggregates got added onto the node while
3963 	 * we processed the expressions for the aggregate arguments (including not
3964 	 * only the regular arguments and FILTER expressions handled immediately
3965 	 * above, but any direct arguments we might've handled earlier).  If so,
3966 	 * we have nested aggregate functions, which is semantically nonsensical,
3967 	 * so complain.  (This should have been caught by the parser, so we don't
3968 	 * need to work hard on a helpful error message; but we defend against it
3969 	 * here anyway, just to be sure.)
3970 	 */
3971 	if (numaggrefs != list_length(aggstate->aggs))
3972 		ereport(ERROR,
3973 				(errcode(ERRCODE_GROUPING_ERROR),
3974 				 errmsg("aggregate function calls cannot be nested")));
3975 
3976 	/*
3977 	 * Build expressions doing all the transition work at once. We build a
3978 	 * different one for each phase, as the number of transition function
3979 	 * invocations can differ between phases. Note this'll work both for
3980 	 * transition and combination functions (although there'll only be one
3981 	 * phase in the latter case).
3982 	 */
3983 	for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
3984 	{
3985 		AggStatePerPhase phase = &aggstate->phases[phaseidx];
3986 		bool		dohash = false;
3987 		bool		dosort = false;
3988 
3989 		/* phase 0 doesn't necessarily exist */
3990 		if (!phase->aggnode)
3991 			continue;
3992 
3993 		if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
3994 		{
3995 			/*
3996 			 * Phase one, and only phase one, in a mixed agg performs both
3997 			 * sorting and aggregation.
3998 			 */
3999 			dohash = true;
4000 			dosort = true;
4001 		}
4002 		else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
4003 		{
4004 			/*
4005 			 * No need to compute a transition function for an AGG_MIXED phase
4006 			 * 0 - the contents of the hashtables will have been computed
4007 			 * during phase 1.
4008 			 */
4009 			continue;
4010 		}
4011 		else if (phase->aggstrategy == AGG_PLAIN ||
4012 				 phase->aggstrategy == AGG_SORTED)
4013 		{
4014 			dohash = false;
4015 			dosort = true;
4016 		}
4017 		else if (phase->aggstrategy == AGG_HASHED)
4018 		{
4019 			dohash = true;
4020 			dosort = false;
4021 		}
4022 		else
4023 			Assert(false);
4024 
4025 		phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
4026 											 false);
4027 
4028 		/* cache compiled expression for outer slot without NULL check */
4029 		phase->evaltrans_cache[0][0] = phase->evaltrans;
4030 	}
4031 
4032 	return aggstate;
4033 }
4034 
4035 /*
4036  * Build the state needed to calculate a state value for an aggregate.
4037  *
4038  * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
4039  * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
4040  * of the arguments could be calculated from 'aggref', but the caller has
4041  * calculated them already, so might as well pass them.
4042  */
4043 static void
4044 build_pertrans_for_aggref(AggStatePerTrans pertrans,
4045 						  AggState *aggstate, EState *estate,
4046 						  Aggref *aggref,
4047 						  Oid aggtransfn, Oid aggtranstype,
4048 						  Oid aggserialfn, Oid aggdeserialfn,
4049 						  Datum initValue, bool initValueIsNull,
4050 						  Oid *inputTypes, int numArguments)
4051 {
4052 	int			numGroupingSets = Max(aggstate->maxsets, 1);
4053 	Expr	   *serialfnexpr = NULL;
4054 	Expr	   *deserialfnexpr = NULL;
4055 	ListCell   *lc;
4056 	int			numInputs;
4057 	int			numDirectArgs;
4058 	List	   *sortlist;
4059 	int			numSortCols;
4060 	int			numDistinctCols;
4061 	int			i;
4062 
4063 	/* Begin filling in the pertrans data */
4064 	pertrans->aggref = aggref;
4065 	pertrans->aggshared = false;
4066 	pertrans->aggCollation = aggref->inputcollid;
4067 	pertrans->transfn_oid = aggtransfn;
4068 	pertrans->serialfn_oid = aggserialfn;
4069 	pertrans->deserialfn_oid = aggdeserialfn;
4070 	pertrans->initValue = initValue;
4071 	pertrans->initValueIsNull = initValueIsNull;
4072 
4073 	/* Count the "direct" arguments, if any */
4074 	numDirectArgs = list_length(aggref->aggdirectargs);
4075 
4076 	/* Count the number of aggregated input columns */
4077 	pertrans->numInputs = numInputs = list_length(aggref->args);
4078 
4079 	pertrans->aggtranstype = aggtranstype;
4080 
4081 	/*
4082 	 * When combining states, we have no use at all for the aggregate
4083 	 * function's transfn. Instead we use the combinefn.  In this case, the
4084 	 * transfn and transfn_oid fields of pertrans refer to the combine
4085 	 * function rather than the transition function.
4086 	 */
4087 	if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
4088 	{
4089 		Expr	   *combinefnexpr;
4090 		size_t		numTransArgs;
4091 
4092 		/*
4093 		 * When combining there's only one input, the to-be-combined
4094 		 * transition value from below (this node's transition value is
4095 		 * counted separately).
4096 		 */
4097 		pertrans->numTransInputs = 1;
4098 
4099 		/* account for the current transition state */
4100 		numTransArgs = pertrans->numTransInputs + 1;
4101 
4102 		build_aggregate_combinefn_expr(aggtranstype,
4103 									   aggref->inputcollid,
4104 									   aggtransfn,
4105 									   &combinefnexpr);
4106 		fmgr_info(aggtransfn, &pertrans->transfn);
4107 		fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
4108 
4109 		pertrans->transfn_fcinfo =
4110 			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
4111 		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
4112 								 &pertrans->transfn,
4113 								 numTransArgs,
4114 								 pertrans->aggCollation,
4115 								 (void *) aggstate, NULL);
4116 
4117 		/*
4118 		 * Ensure that a combine function to combine INTERNAL states is not
4119 		 * strict. This should have been checked during CREATE AGGREGATE, but
4120 		 * the strict property could have been changed since then.
4121 		 */
4122 		if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
4123 			ereport(ERROR,
4124 					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4125 					 errmsg("combine function with transition type %s must not be declared STRICT",
4126 							format_type_be(aggtranstype))));
4127 	}
4128 	else
4129 	{
4130 		Expr	   *transfnexpr;
4131 		size_t		numTransArgs;
4132 
4133 		/* Detect how many arguments to pass to the transfn */
4134 		if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4135 			pertrans->numTransInputs = numInputs;
4136 		else
4137 			pertrans->numTransInputs = numArguments;
4138 
4139 		/* account for the current transition state */
4140 		numTransArgs = pertrans->numTransInputs + 1;
4141 
4142 		/*
4143 		 * Set up infrastructure for calling the transfn.  Note that
4144 		 * invtransfn is not needed here.
4145 		 */
4146 		build_aggregate_transfn_expr(inputTypes,
4147 									 numArguments,
4148 									 numDirectArgs,
4149 									 aggref->aggvariadic,
4150 									 aggtranstype,
4151 									 aggref->inputcollid,
4152 									 aggtransfn,
4153 									 InvalidOid,
4154 									 &transfnexpr,
4155 									 NULL);
4156 		fmgr_info(aggtransfn, &pertrans->transfn);
4157 		fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
4158 
4159 		pertrans->transfn_fcinfo =
4160 			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
4161 		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
4162 								 &pertrans->transfn,
4163 								 numTransArgs,
4164 								 pertrans->aggCollation,
4165 								 (void *) aggstate, NULL);
4166 
4167 		/*
4168 		 * If the transfn is strict and the initval is NULL, make sure input
4169 		 * type and transtype are the same (or at least binary-compatible), so
4170 		 * that it's OK to use the first aggregated input value as the initial
4171 		 * transValue.  This should have been checked at agg definition time,
4172 		 * but we must check again in case the transfn's strictness property
4173 		 * has been changed.
4174 		 */
4175 		if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
4176 		{
4177 			if (numArguments <= numDirectArgs ||
4178 				!IsBinaryCoercible(inputTypes[numDirectArgs],
4179 								   aggtranstype))
4180 				ereport(ERROR,
4181 						(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
4182 						 errmsg("aggregate %u needs to have compatible input type and transition type",
4183 								aggref->aggfnoid)));
4184 		}
4185 	}
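
	/*
	 * For reference (a sketch; the argument layout follows from the fcinfo
	 * setup above): the transition call is made as
	 *
	 *		args[0]						= current transition value
	 *		args[1 .. numTransInputs]	= aggregated input value(s)
	 *
	 * and in the combine case numTransInputs is 1, so args[1] carries the
	 * to-be-combined transition value arriving from the child node.
	 */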
4186 
4187 	/* get info about the state value's datatype */
4188 	get_typlenbyval(aggtranstype,
4189 					&pertrans->transtypeLen,
4190 					&pertrans->transtypeByVal);
4191 
4192 	if (OidIsValid(aggserialfn))
4193 	{
4194 		build_aggregate_serialfn_expr(aggserialfn,
4195 									  &serialfnexpr);
4196 		fmgr_info(aggserialfn, &pertrans->serialfn);
4197 		fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
4198 
4199 		pertrans->serialfn_fcinfo =
4200 			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
4201 		InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
4202 								 &pertrans->serialfn,
4203 								 1,
4204 								 InvalidOid,
4205 								 (void *) aggstate, NULL);
4206 	}
4207 
4208 	if (OidIsValid(aggdeserialfn))
4209 	{
4210 		build_aggregate_deserialfn_expr(aggdeserialfn,
4211 										&deserialfnexpr);
4212 		fmgr_info(aggdeserialfn, &pertrans->deserialfn);
4213 		fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
4214 
4215 		pertrans->deserialfn_fcinfo =
4216 			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
4217 		InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
4218 								 &pertrans->deserialfn,
4219 								 2,
4220 								 InvalidOid,
4221 								 (void *) aggstate, NULL);
4222 
4223 	}
4224 
4225 	/*
4226 	 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
4227 	 * have a list of SortGroupClause nodes; fish out the data in them and
4228 	 * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
4229 	 * however; the agg's transfn and finalfn are responsible for that.
4230 	 *
4231 	 * Note that by construction, if there is a DISTINCT clause then the ORDER
4232 	 * BY clause is a prefix of it (see transformDistinctClause).
4233 	 */
4234 	if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
4235 	{
4236 		sortlist = NIL;
4237 		numSortCols = numDistinctCols = 0;
4238 	}
4239 	else if (aggref->aggdistinct)
4240 	{
4241 		sortlist = aggref->aggdistinct;
4242 		numSortCols = numDistinctCols = list_length(sortlist);
4243 		Assert(numSortCols >= list_length(aggref->aggorder));
4244 	}
4245 	else
4246 	{
4247 		sortlist = aggref->aggorder;
4248 		numSortCols = list_length(sortlist);
4249 		numDistinctCols = 0;
4250 	}
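
	/*
	 * Illustrative examples of the three cases above: an ordered-set agg
	 * such as percentile_disc(0.5) WITHIN GROUP (ORDER BY x) extracts
	 * nothing here; count(DISTINCT x) yields numSortCols = numDistinctCols
	 * = 1; array_agg(x ORDER BY y) yields numSortCols = 1 and
	 * numDistinctCols = 0.
	 */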
4251 
4252 	pertrans->numSortCols = numSortCols;
4253 	pertrans->numDistinctCols = numDistinctCols;
4254 
4255 	/*
4256 	 * If we have either sorting or filtering to do, create a tupledesc and
4257 	 * slot corresponding to the aggregated inputs (including sort
4258 	 * expressions) of the agg.
4259 	 */
4260 	if (numSortCols > 0 || aggref->aggfilter)
4261 	{
4262 		pertrans->sortdesc = ExecTypeFromTL(aggref->args);
4263 		pertrans->sortslot =
4264 			ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4265 								   &TTSOpsMinimalTuple);
4266 	}
4267 
4268 	if (numSortCols > 0)
4269 	{
4270 		/*
4271 		 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
4272 		 * (yet)
4273 		 */
4274 		Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
4275 
4276 		/* If we have only one input, we need its len/byval info. */
4277 		if (numInputs == 1)
4278 		{
4279 			get_typlenbyval(inputTypes[numDirectArgs],
4280 							&pertrans->inputtypeLen,
4281 							&pertrans->inputtypeByVal);
4282 		}
4283 		else if (numDistinctCols > 0)
4284 		{
4285 			/* we will need an extra slot to store prior values */
4286 			pertrans->uniqslot =
4287 				ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
4288 									   &TTSOpsMinimalTuple);
4289 		}
4290 
4291 		/* Extract the sort information for use later */
4292 		pertrans->sortColIdx =
4293 			(AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
4294 		pertrans->sortOperators =
4295 			(Oid *) palloc(numSortCols * sizeof(Oid));
4296 		pertrans->sortCollations =
4297 			(Oid *) palloc(numSortCols * sizeof(Oid));
4298 		pertrans->sortNullsFirst =
4299 			(bool *) palloc(numSortCols * sizeof(bool));
4300 
4301 		i = 0;
4302 		foreach(lc, sortlist)
4303 		{
4304 			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
4305 			TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
4306 
4307 			/* the parser should have made sure of this */
4308 			Assert(OidIsValid(sortcl->sortop));
4309 
4310 			pertrans->sortColIdx[i] = tle->resno;
4311 			pertrans->sortOperators[i] = sortcl->sortop;
4312 			pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
4313 			pertrans->sortNullsFirst[i] = sortcl->nulls_first;
4314 			i++;
4315 		}
4316 		Assert(i == numSortCols);
4317 	}
4318 
4319 	if (aggref->aggdistinct)
4320 	{
4321 		Oid		   *ops;
4322 
4323 		Assert(numArguments > 0);
4324 		Assert(list_length(aggref->aggdistinct) == numDistinctCols);
4325 
4326 		ops = palloc(numDistinctCols * sizeof(Oid));
4327 
4328 		i = 0;
4329 		foreach(lc, aggref->aggdistinct)
4330 			ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
4331 
4332 		/* lookup / build the necessary comparators */
4333 		if (numDistinctCols == 1)
4334 			fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
4335 		else
4336 			pertrans->equalfnMulti =
4337 				execTuplesMatchPrepare(pertrans->sortdesc,
4338 									   numDistinctCols,
4339 									   pertrans->sortColIdx,
4340 									   ops,
4341 									   pertrans->sortCollations,
4342 									   &aggstate->ss.ps);
4343 		pfree(ops);
4344 	}
4345 
4346 	pertrans->sortstates = (Tuplesortstate **)
4347 		palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
4348 }
4349 
4350 
4351 static Datum
4352 GetAggInitVal(Datum textInitVal, Oid transtype)
4353 {
4354 	Oid			typinput,
4355 				typioparam;
4356 	char	   *strInitVal;
4357 	Datum		initVal;
4358 
4359 	getTypeInputInfo(transtype, &typinput, &typioparam);
4360 	strInitVal = TextDatumGetCString(textInitVal);
4361 	initVal = OidInputFunctionCall(typinput, strInitVal,
4362 								   typioparam, -1);
4363 	pfree(strInitVal);
4364 	return initVal;
4365 }
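
/*
 * For example (illustrative): avg(int4) has transition type int8[] with
 * initcond '{0,0}', so the text Datum '{0,0}' is run through array_in()
 * here to build the initial transition value.
 */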
4366 
4367 void
4368 ExecEndAgg(AggState *node)
4369 {
4370 	PlanState  *outerPlan;
4371 	int			transno;
4372 	int			numGroupingSets = Max(node->maxsets, 1);
4373 	int			setno;
4374 
4375 	/*
4376 	 * When ending a parallel worker, copy the statistics gathered by the
4377 	 * worker back into shared memory so that it can be picked up by the main
4378 	 * process to report in EXPLAIN ANALYZE.
4379 	 */
4380 	if (node->shared_info && IsParallelWorker())
4381 	{
4382 		AggregateInstrumentation *si;
4383 
4384 		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
4385 		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
4386 		si->hash_batches_used = node->hash_batches_used;
4387 		si->hash_disk_used = node->hash_disk_used;
4388 		si->hash_mem_peak = node->hash_mem_peak;
4389 	}
4390 
4391 	/* Make sure we have closed any open tuplesorts */
4392 
4393 	if (node->sort_in)
4394 		tuplesort_end(node->sort_in);
4395 	if (node->sort_out)
4396 		tuplesort_end(node->sort_out);
4397 
4398 	hashagg_reset_spill_state(node);
4399 
4400 	if (node->hash_metacxt != NULL)
4401 	{
4402 		MemoryContextDelete(node->hash_metacxt);
4403 		node->hash_metacxt = NULL;
4404 	}
4405 
4406 	for (transno = 0; transno < node->numtrans; transno++)
4407 	{
4408 		AggStatePerTrans pertrans = &node->pertrans[transno];
4409 
4410 		for (setno = 0; setno < numGroupingSets; setno++)
4411 		{
4412 			if (pertrans->sortstates[setno])
4413 				tuplesort_end(pertrans->sortstates[setno]);
4414 		}
4415 	}
4416 
4417 	/* And ensure any agg shutdown callbacks have been called */
4418 	for (setno = 0; setno < numGroupingSets; setno++)
4419 		ReScanExprContext(node->aggcontexts[setno]);
4420 	if (node->hashcontext)
4421 		ReScanExprContext(node->hashcontext);
4422 
4423 	/*
4424 	 * We don't actually free any ExprContexts here (see comment in
4425 	 * ExecFreeExprContext), just unlinking the output one from the plan node
4426 	 * suffices.
4427 	 */
4428 	ExecFreeExprContext(&node->ss.ps);
4429 
4430 	/* clean up tuple table */
4431 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
4432 
4433 	outerPlan = outerPlanState(node);
4434 	ExecEndNode(outerPlan);
4435 }
4436 
4437 void
4438 ExecReScanAgg(AggState *node)
4439 {
4440 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
4441 	PlanState  *outerPlan = outerPlanState(node);
4442 	Agg		   *aggnode = (Agg *) node->ss.ps.plan;
4443 	int			transno;
4444 	int			numGroupingSets = Max(node->maxsets, 1);
4445 	int			setno;
4446 
4447 	node->agg_done = false;
4448 
4449 	if (node->aggstrategy == AGG_HASHED)
4450 	{
4451 		/*
4452 		 * In the hashed case, if we haven't yet built the hash table then we
4453 		 * can just return; nothing done yet, so nothing to undo. If subnode's
4454 		 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
4455 		 * else no reason to re-scan it at all.
4456 		 */
4457 		if (!node->table_filled)
4458 			return;
4459 
4460 		/*
4461 		 * If we do have the hash table, and it never spilled, and the subplan
4462 		 * does not have any parameter changes, and none of our own parameter
4463 		 * changes affect input expressions of the aggregated functions, then
4464 		 * we can just rescan the existing hash table; no need to build it
4465 		 * again.
4466 		 */
4467 		if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
4468 			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
4469 		{
4470 			ResetTupleHashIterator(node->perhash[0].hashtable,
4471 								   &node->perhash[0].hashiter);
4472 			select_current_set(node, 0, true);
4473 			return;
4474 		}
4475 	}
4476 
4477 	/* Make sure we have closed any open tuplesorts */
4478 	for (transno = 0; transno < node->numtrans; transno++)
4479 	{
4480 		for (setno = 0; setno < numGroupingSets; setno++)
4481 		{
4482 			AggStatePerTrans pertrans = &node->pertrans[transno];
4483 
4484 			if (pertrans->sortstates[setno])
4485 			{
4486 				tuplesort_end(pertrans->sortstates[setno]);
4487 				pertrans->sortstates[setno] = NULL;
4488 			}
4489 		}
4490 	}
4491 
4492 	/*
4493 	 * We don't need to ReScanExprContext the output tuple context here;
4494 	 * ExecReScan already did it. But we do need to reset our per-grouping-set
4495 	 * contexts, which may have transvalues stored in them. (We use rescan
4496 	 * rather than just reset because transfns may have registered callbacks
4497 	 * that need to be run now.) For the AGG_HASHED case, see below.
4498 	 */
4499 
4500 	for (setno = 0; setno < numGroupingSets; setno++)
4501 	{
4502 		ReScanExprContext(node->aggcontexts[setno]);
4503 	}
4504 
4505 	/* Release first tuple of group, if we have made a copy */
4506 	if (node->grp_firstTuple != NULL)
4507 	{
4508 		heap_freetuple(node->grp_firstTuple);
4509 		node->grp_firstTuple = NULL;
4510 	}
4511 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
4512 
4513 	/* Forget current agg values */
4514 	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
4515 	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
4516 
4517 	/*
4518 	 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
4519 	 * the hashcontext. This used to be an issue, but now resetting a context
4520 	 * automatically deletes its sub-contexts too.
4521 	 */
4522 	if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
4523 	{
4524 		hashagg_reset_spill_state(node);
4525 
4526 		node->hash_ever_spilled = false;
4527 		node->hash_spill_mode = false;
4528 		node->hash_ngroups_current = 0;
4529 
4530 		ReScanExprContext(node->hashcontext);
4531 		/* Rebuild an empty hash table */
4532 		build_hash_tables(node);
4533 		node->table_filled = false;
4534 		/* iterator will be reset when the table is filled */
4535 
4536 		hashagg_recompile_expressions(node, false, false);
4537 	}
4538 
4539 	if (node->aggstrategy != AGG_HASHED)
4540 	{
4541 		/*
4542 		 * Reset the per-group state (in particular, mark transvalues null)
4543 		 */
4544 		for (setno = 0; setno < numGroupingSets; setno++)
4545 		{
4546 			MemSet(node->pergroups[setno], 0,
4547 				   sizeof(AggStatePerGroupData) * node->numaggs);
4548 		}
4549 
4550 		/* reset to phase 1 */
4551 		initialize_phase(node, 1);
4552 
4553 		node->input_done = false;
4554 		node->projected_set = -1;
4555 	}
4556 
4557 	if (outerPlan->chgParam == NULL)
4558 		ExecReScan(outerPlan);
4559 }
4560 
4561 
4562 /***********************************************************************
4563  * API exposed to aggregate functions
4564  ***********************************************************************/
4565 
4566 
4567 /*
4568  * AggCheckCallContext - test if a SQL function is being called as an aggregate
4569  *
4570  * The transition and/or final functions of an aggregate may want to verify
4571  * that they are being called as aggregates, rather than as plain SQL
4572  * functions.  They should use this function to do so.  The return value
4573  * is nonzero if being called as an aggregate, or zero if not.  (Specific
4574  * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4575  * values could conceivably appear in future.)
4576  *
4577  * If aggcontext isn't NULL, the function also stores at *aggcontext the
4578  * identity of the memory context that aggregate transition values are being
4579  * stored in.  Note that the same aggregate call site (flinfo) may be called
4580  * interleaved on different transition values in different contexts, so it's
4581  * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
4582  * cache it in the transvalue itself (for internal-type transvalues).
4583  */
4584 int
4585 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
4586 {
4587 	if (fcinfo->context && IsA(fcinfo->context, AggState))
4588 	{
4589 		if (aggcontext)
4590 		{
4591 			AggState   *aggstate = ((AggState *) fcinfo->context);
4592 			ExprContext *cxt = aggstate->curaggcontext;
4593 
4594 			*aggcontext = cxt->ecxt_per_tuple_memory;
4595 		}
4596 		return AGG_CONTEXT_AGGREGATE;
4597 	}
4598 	if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4599 	{
4600 		if (aggcontext)
4601 			*aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4602 		return AGG_CONTEXT_WINDOW;
4603 	}
4604 
4605 	/* this is just to prevent "uninitialized variable" warnings */
4606 	if (aggcontext)
4607 		*aggcontext = NULL;
4608 	return 0;
4609 }
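
/*
 * Typical usage from a transition function (a sketch; my_transfn and
 * MyState are hypothetical names):
 *
 *		MemoryContext aggcontext;
 *		MyState    *state;
 *
 *		if (!AggCheckCallContext(fcinfo, &aggcontext))
 *			elog(ERROR, "my_transfn called in non-aggregate context");
 *		state = (MyState *) MemoryContextAlloc(aggcontext, sizeof(MyState));
 *
 * Allocating in *aggcontext ensures the state survives from one call to
 * the next instead of being reset with per-tuple memory.
 */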
4610 
4611 /*
4612  * AggGetAggref - allow an aggregate support function to get its Aggref
4613  *
4614  * If the function is being called as an aggregate support function,
4615  * return the Aggref node for the aggregate call.  Otherwise, return NULL.
4616  *
 * Aggregates sharing the same inputs and transition functions can get
 * merged into a single transition calculation.  If the transition function
 * calls AggGetAggref, it will get an arbitrary one of the Aggrefs for which
 * it is executing.  It must therefore not pay attention to the Aggref fields
 * that relate to the final function, as those are indeterminate.  But if a
 * final function calls AggGetAggref, it will get a precise result.
 *
 * Note that if an aggregate is being used as a window function, this will
 * return NULL.  We could provide a similar function to return the relevant
 * WindowFunc node in such cases, but it's not needed yet.
 */
Aggref *
AggGetAggref(FunctionCallInfo fcinfo)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		AggState   *aggstate = (AggState *) fcinfo->context;
		AggStatePerAgg curperagg;
		AggStatePerTrans curpertrans;

		/* check curperagg (valid when in a final function) */
		curperagg = aggstate->curperagg;

		if (curperagg)
			return curperagg->aggref;

		/* check curpertrans (valid when in a transition function) */
		curpertrans = aggstate->curpertrans;

		if (curpertrans)
			return curpertrans->aggref;
	}
	return NULL;
}
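
/*
 * Example (illustrative sketch only): a final function can trust every
 * field of the Aggref it gets back, e.g. to count the aggregated
 * arguments.  The name my_finalfn is hypothetical.
 */
#ifdef NOT_USED
Datum
my_finalfn(PG_FUNCTION_ARGS)
{
	Aggref	   *aggref = AggGetAggref(fcinfo);

	if (aggref == NULL)
		elog(ERROR, "my_finalfn called in non-aggregate context");

	/* in a finalfn, the whole Aggref is reliable, including args */
	PG_RETURN_INT32(list_length(aggref->args));
}
#endif							/* NOT_USED */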

/*
 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
 *
 * This is useful in agg final functions; the context returned is one that
 * the final function can safely reset as desired.  This isn't useful for
 * transition functions, since the context returned MAY (we don't promise)
 * be the same as the context those are called in.
 *
 * As above, this is currently not useful for aggs called as window functions.
 */
MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		AggState   *aggstate = (AggState *) fcinfo->context;

		return aggstate->tmpcontext->ecxt_per_tuple_memory;
	}
	return NULL;
}
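
/*
 * Example (illustrative sketch only): a final function can do its scratch
 * work in the temp context and reset it when done, rather than leaking
 * allocations into longer-lived contexts.  my_finalfn is hypothetical.
 */
#ifdef NOT_USED
Datum
my_finalfn(PG_FUNCTION_ARGS)
{
	MemoryContext tmpcontext = AggGetTempMemoryContext(fcinfo);
	MemoryContext oldcontext;
	Datum		result;

	if (tmpcontext == NULL)
		elog(ERROR, "my_finalfn called in non-aggregate context");

	oldcontext = MemoryContextSwitchTo(tmpcontext);
	/* ... build scratch structures that needn't outlive this call ... */
	MemoryContextSwitchTo(oldcontext);

	result = Int32GetDatum(0);	/* placeholder pass-by-value result */
	MemoryContextReset(tmpcontext); /* safe: we own this context */
	PG_RETURN_DATUM(result);
}
#endif							/* NOT_USED */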

/*
 * AggStateIsShared - find out whether transition state is shared
 *
 * If the function is being called as an aggregate support function,
 * return true if the aggregate's transition state is shared across
 * multiple aggregates, false if it is not.
 *
 * Returns true if not called as an aggregate support function.
 * This is intended as a conservative answer, ie "no you'd better not
 * scribble on your input".  In particular, will return true if the
 * aggregate is being used as a window function, which is a scenario
 * in which changing the transition state is a bad idea.  We might
 * want to refine the behavior for the window case in future.
 */
bool
AggStateIsShared(FunctionCallInfo fcinfo)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		AggState   *aggstate = (AggState *) fcinfo->context;
		AggStatePerAgg curperagg;
		AggStatePerTrans curpertrans;

		/* check curperagg (valid when in a final function) */
		curperagg = aggstate->curperagg;

		if (curperagg)
			return aggstate->pertrans[curperagg->transno].aggshared;

		/* check curpertrans (valid when in a transition function) */
		curpertrans = aggstate->curpertrans;

		if (curpertrans)
			return curpertrans->aggshared;
	}
	return true;
}
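
/*
 * Example (illustrative sketch only): a final function that wants to modify
 * its transition state in place (say, to sort an accumulated array) must
 * first check that no other aggregate shares that state.  The names
 * my_state, my_finalfn, sort_in_place, and copy_my_state are hypothetical.
 */
#ifdef NOT_USED
Datum
my_finalfn(PG_FUNCTION_ARGS)
{
	my_state   *state = (my_state *) PG_GETARG_POINTER(0);

	if (!AggStateIsShared(fcinfo))
	{
		/* sole owner of the state: destructive finalization is OK */
		sort_in_place(state);
	}
	else
	{
		/* state may be seen by another finalfn: work on a copy */
		state = copy_my_state(state);
		sort_in_place(state);
	}

	PG_RETURN_POINTER(state);
}
#endif							/* NOT_USED */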

/*
 * AggRegisterCallback - register a cleanup callback for an aggregate
 *
 * This is useful for aggs to register shutdown callbacks, which will ensure
 * that non-memory resources are freed.  The callback will occur just before
 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
 * either between groups or as a result of rescanning the query.  The callback
 * will NOT be called on error paths.  The typical use-case is freeing
 * tuplestores or tuplesorts maintained in aggcontext, or releasing pins held
 * by slots created by the agg functions.  (The callback will not be called
 * until after the result of the finalfn is no longer needed, so it's safe
 * for the finalfn to return data that will be freed by the callback.)
 *
 * As above, this is currently not useful for aggs called as window functions.
 */
void
AggRegisterCallback(FunctionCallInfo fcinfo,
					ExprContextCallbackFunction func,
					Datum arg)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		AggState   *aggstate = (AggState *) fcinfo->context;
		ExprContext *cxt = aggstate->curaggcontext;

		RegisterExprContextCallback(cxt, func, arg);

		return;
	}
	elog(ERROR, "aggregate function cannot register a callback in this context");
}
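
/*
 * Example (illustrative sketch only): a transition function that keeps a
 * tuplesort in aggcontext can register a shutdown callback so the sort's
 * resources (temp files, etc.) are released when aggcontext is reset.
 * my_shutdown is hypothetical; tuplesort_end is the real cleanup entry
 * point declared in utils/tuplesort.h.
 */
#ifdef NOT_USED
static void
my_shutdown(Datum arg)
{
	Tuplesortstate *sortstate = (Tuplesortstate *) DatumGetPointer(arg);

	tuplesort_end(sortstate);
}

/*
 * ... and in the transition function, just after creating the tuplesort:
 *
 *		AggRegisterCallback(fcinfo, my_shutdown, PointerGetDatum(sortstate));
 */
#endif							/* NOT_USED */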


/* ----------------------------------------------------------------
 *						Parallel Query Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecAggEstimate
 *
 *		Estimate space required to propagate aggregate statistics.
 * ----------------------------------------------------------------
 */
void
ExecAggEstimate(AggState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
	size = add_size(size, offsetof(SharedAggInfo, sinstrument));
	shm_toc_estimate_chunk(&pcxt->estimator, size);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 *		ExecAggInitializeDSM
 *
 *		Initialize DSM space for aggregate statistics.
 * ----------------------------------------------------------------
 */
void
ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = offsetof(SharedAggInfo, sinstrument)
		+ pcxt->nworkers * sizeof(AggregateInstrumentation);
	node->shared_info = shm_toc_allocate(pcxt->toc, size);
	/* ensure any unfilled slots will contain zeroes */
	memset(node->shared_info, 0, size);
	node->shared_info->num_workers = pcxt->nworkers;
	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
				   node->shared_info);
}

/* ----------------------------------------------------------------
 *		ExecAggInitializeWorker
 *
 *		Attach worker to DSM space for aggregate statistics.
 * ----------------------------------------------------------------
 */
void
ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
{
	node->shared_info =
		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
}

/* ----------------------------------------------------------------
 *		ExecAggRetrieveInstrumentation
 *
 *		Transfer aggregate statistics from DSM to private memory.
 * ----------------------------------------------------------------
 */
void
ExecAggRetrieveInstrumentation(AggState *node)
{
	Size		size;
	SharedAggInfo *si;

	if (node->shared_info == NULL)
		return;

	size = offsetof(SharedAggInfo, sinstrument)
		+ node->shared_info->num_workers * sizeof(AggregateInstrumentation);
	si = palloc(size);
	memcpy(si, node->shared_info, size);
	node->shared_info = si;
}