1 /*-------------------------------------------------------------------------
2 *
3 * nodeAgg.c
4 * Routines to handle aggregate nodes.
5 *
6 * ExecAgg normally evaluates each aggregate in the following steps:
7 *
8 * transvalue = initcond
9 * foreach input_tuple do
10 * transvalue = transfunc(transvalue, input_value(s))
11 * result = finalfunc(transvalue, direct_argument(s))
12 *
13 * If a finalfunc is not supplied then the result is just the ending
14 * value of transvalue.
15 *
16 * Other behaviors can be selected by the "aggsplit" mode, which exists
17 * to support partial aggregation. It is possible to:
18 * * Skip running the finalfunc, so that the output is always the
19 * final transvalue state.
20 * * Substitute the combinefunc for the transfunc, so that transvalue
21 * states (propagated up from a child partial-aggregation step) are merged
22 * rather than processing raw input rows. (The statements below about
23 * the transfunc apply equally to the combinefunc, when it's selected.)
24 * * Apply the serializefunc to the output values (this only makes sense
25 * when skipping the finalfunc, since the serializefunc works on the
26 * transvalue data type).
27 * * Apply the deserializefunc to the input values (this only makes sense
28 * when using the combinefunc, for similar reasons).
29 * It is the planner's responsibility to connect up Agg nodes using these
30 * alternate behaviors in a way that makes sense, with partial aggregation
31 * results being fed to nodes that expect them.
32 *
33 * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34 * input tuples and eliminate duplicates (if required) before performing
35 * the above-depicted process. (However, we don't do that for ordered-set
36 * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37 * so far as this module is concerned.) Note that partial aggregation
38 * is not supported in these cases, since we couldn't ensure global
39 * ordering or distinctness of the inputs.
40 *
41 * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42 * then the first non-NULL input_value is assigned directly to transvalue,
43 * and transfunc isn't applied until the second non-NULL input_value.
44 * The agg's first input type and transtype must be the same in this case!
45 *
46 * If transfunc is marked "strict" then NULL input_values are skipped,
47 * keeping the previous transvalue. If transfunc is not strict then it
48 * is called for every input tuple and must deal with NULL initcond
49 * or NULL input_values for itself.
50 *
51 * If finalfunc is marked "strict" then it is not called when the
 * ending transvalue is NULL; instead, a NULL result is created
53 * automatically (this is just the usual handling of strict functions,
54 * of course). A non-strict finalfunc can make its own choice of
55 * what to return for a NULL ending transvalue.
56 *
57 * Ordered-set aggregates are treated specially in one other way: we
58 * evaluate any "direct" arguments and pass them to the finalfunc along
59 * with the transition value.
60 *
61 * A finalfunc can have additional arguments beyond the transvalue and
62 * any "direct" arguments, corresponding to the input arguments of the
63 * aggregate. These are always just passed as NULL. Such arguments may be
64 * needed to allow resolution of a polymorphic aggregate's result type.
65 *
66 * We compute aggregate input expressions and run the transition functions
67 * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68 * once per input tuple, so when the transvalue datatype is
69 * pass-by-reference, we have to be careful to copy it into a longer-lived
70 * memory context, and free the prior value to avoid memory leakage. We
71 * store transvalues in another set of econtexts, aggstate->aggcontexts
72 * (one per grouping set, see below), which are also used for the hashtable
73 * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74 * reset, at group boundaries so that aggregate transition functions can
75 * register shutdown callbacks via AggRegisterCallback.
76 *
77 * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78 * run finalize functions and compute the output tuple; this context can be
79 * reset once per output tuple.
80 *
81 * The executor's AggState node is passed as the fmgr "context" value in
82 * all transfunc and finalfunc calls. It is not recommended that the
83 * transition functions look at the AggState node directly, but they can
84 * use AggCheckCallContext() to verify that they are being called by
85 * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86 * transition function might want to know this is so that it can avoid
87 * palloc'ing a fixed-size pass-by-ref transition value on every call:
88 * it can instead just scribble on and return its left input. Ordinarily
89 * it is completely forbidden for functions to modify pass-by-ref inputs,
90 * but in the aggregate case we know the left input is either the initial
91 * transition value or a previous function result, and in either case its
92 * value need not be preserved. See int8inc() for an example. Notice that
93 * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94 * the previous transition value pointer is returned. It is also possible
95 * to avoid repeated data copying when the transition value is an expanded
96 * object: to do that, the transition function must take care to return
97 * an expanded object that is in a child context of the memory context
98 * returned by AggCheckCallContext(). Also, some transition functions want
99 * to store working state in addition to the nominal transition value; they
100 * can use the memory context returned by AggCheckCallContext() to do that.
101 *
102 * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103 * AggState is available as context in earlier releases (back to 8.1),
104 * but direct examination of the node is needed to use it before 9.0.
105 *
106 * As of 9.4, aggregate transition functions can also use AggGetAggref()
107 * to get hold of the Aggref expression node for their aggregate call.
108 * This is mainly intended for ordered-set aggregates, which are not
109 * supported as window functions. (A regular aggregate function would
110 * need some fallback logic to use this, since there's no Aggref node
111 * for a window function.)
112 *
113 * Grouping sets:
114 *
115 * A list of grouping sets which is structurally equivalent to a ROLLUP
116 * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117 * ordered data. We do this by keeping a separate set of transition values
118 * for each grouping set being concurrently processed; for each input tuple
119 * we update them all, and on group boundaries we reset those states
120 * (starting at the front of the list) whose grouping values have changed
121 * (the list of grouping sets is ordered from most specific to least
122 * specific).
123 *
124 * Where more complex grouping sets are used, we break them down into
125 * "phases", where each phase has a different sort order (except phase 0
126 * which is reserved for hashing). During each phase but the last, the
127 * input tuples are additionally stored in a tuplesort which is keyed to the
128 * next phase's sort order; during each phase but the first, the input
129 * tuples are drawn from the previously sorted data. (The sorting of the
130 * data for the first phase is handled by the planner, as it might be
131 * satisfied by underlying nodes.)
132 *
133 * Hashing can be mixed with sorted grouping. To do this, we have an
134 * AGG_MIXED strategy that populates the hashtables during the first sorted
135 * phase, and switches to reading them out after completing all sort phases.
136 * We can also support AGG_HASHED with multiple hash tables and no sorting
137 * at all.
138 *
139 * From the perspective of aggregate transition and final functions, the
140 * only issue regarding grouping sets is this: a single call site (flinfo)
141 * of an aggregate function may be used for updating several different
142 * transition values in turn. So the function must not cache in the flinfo
143 * anything which logically belongs as part of the transition value (most
144 * importantly, the memory context in which the transition value exists).
145 * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146 * sensitive to the grouping set for which the aggregate function is
147 * currently being called.
148 *
149 * Plan structure:
150 *
151 * What we get from the planner is actually one "real" Agg node which is
152 * part of the plan tree proper, but which optionally has an additional list
153 * of Agg nodes hung off the side via the "chain" field. This is because an
154 * Agg node happens to be a convenient representation of all the data we
155 * need for grouping sets.
156 *
157 * For many purposes, we treat the "real" node as if it were just the first
158 * node in the chain. The chain must be ordered such that hashed entries
159 * come before sorted/plain entries; the real node is marked AGG_MIXED if
160 * there are both types present (in which case the real node describes one
161 * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162 * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163 * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164 * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165 * chained nodes.
166 *
167 * We collect all hashed nodes into a single "phase", numbered 0, and create
168 * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
 * Phase 0 is allocated even if there are no hashed groupings, but remains
 * unused in that case.
171 *
172 * AGG_HASHED nodes actually refer to only a single grouping set each,
173 * because for each hashed grouping we need a separate grpColIdx and
174 * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175 * grouping sets that share a sort order. Each AGG_SORTED node other than
176 * the first one has an associated Sort node which describes the sort order
177 * to be used; the first sorted node takes its input from the outer subtree,
178 * which the planner has already arranged to provide ordered data.
179 *
180 * Memory and ExprContext usage:
181 *
182 * Because we're accumulating aggregate values across input rows, we need to
183 * use more memory contexts than just simple input/output tuple contexts.
184 * In fact, for a rollup, we need a separate context for each grouping set
185 * so that we can reset the inner (finer-grained) aggregates on their group
186 * boundaries while continuing to accumulate values for outer
187 * (coarser-grained) groupings. On top of this, we might be simultaneously
188 * populating hashtables; however, we only need one context for all the
189 * hashtables.
190 *
191 * So we create an array, aggcontexts, with an ExprContext for each grouping
192 * set in the largest rollup that we're going to process, and use the
193 * per-tuple memory context of those ExprContexts to store the aggregate
194 * transition values. hashcontext is the single context created to support
195 * all hash tables.
196 *
197 * Spilling To Disk
198 *
199 * When performing hash aggregation, if the hash table memory exceeds the
200 * limit (see hash_agg_check_limits()), we enter "spill mode". In spill
201 * mode, we advance the transition states only for groups already in the
 * hash table.  For tuples that would need to create new hash table
 * entries (and initialize new transition states), we instead spill them to
204 * disk to be processed later. The tuples are spilled in a partitioned
205 * manner, so that subsequent batches are smaller and less likely to exceed
206 * hash_mem (if a batch does exceed hash_mem, it must be spilled
207 * recursively).
208 *
209 * Spilled data is written to logical tapes. These provide better control
210 * over memory usage, disk space, and the number of files than if we were
211 * to use a BufFile for each spill.
212 *
213 * Note that it's possible for transition states to start small but then
214 * grow very large; for instance in the case of ARRAY_AGG. In such cases,
215 * it's still possible to significantly exceed hash_mem. We try to avoid
216 * this situation by estimating what will fit in the available memory, and
217 * imposing a limit on the number of groups separately from the amount of
218 * memory consumed.
219 *
220 * Transition / Combine function invocation:
221 *
222 * For performance reasons transition functions, including combine
223 * functions, aren't invoked one-by-one from nodeAgg.c after computing
224 * arguments using the expression evaluation engine. Instead
225 * ExecBuildAggTrans() builds one large expression that does both argument
226 * evaluation and transition function invocation. That avoids performance
227 * issues due to repeated uses of expression evaluation, complications due
 * to filter expressions having to be evaluated early, and allows the
 * entire expression to be JIT-compiled into one native function.
230 *
231 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
232 * Portions Copyright (c) 1994, Regents of the University of California
233 *
234 * IDENTIFICATION
235 * src/backend/executor/nodeAgg.c
236 *
237 *-------------------------------------------------------------------------
238 */
239
240 #include "postgres.h"
241
242 #include "access/htup_details.h"
243 #include "access/parallel.h"
244 #include "catalog/objectaccess.h"
245 #include "catalog/pg_aggregate.h"
246 #include "catalog/pg_proc.h"
247 #include "catalog/pg_type.h"
248 #include "common/hashfn.h"
249 #include "executor/execExpr.h"
250 #include "executor/executor.h"
251 #include "executor/nodeAgg.h"
252 #include "lib/hyperloglog.h"
253 #include "miscadmin.h"
254 #include "nodes/makefuncs.h"
255 #include "nodes/nodeFuncs.h"
256 #include "optimizer/optimizer.h"
257 #include "parser/parse_agg.h"
258 #include "parser/parse_coerce.h"
259 #include "utils/acl.h"
260 #include "utils/builtins.h"
261 #include "utils/datum.h"
262 #include "utils/dynahash.h"
263 #include "utils/expandeddatum.h"
264 #include "utils/logtape.h"
265 #include "utils/lsyscache.h"
266 #include "utils/memutils.h"
267 #include "utils/syscache.h"
268 #include "utils/tuplesort.h"
269
/*
 * Partition counts used when a HashAgg spills to disk.
 *
 * HASHAGG_PARTITION_FACTOR multiplies the estimated number of partitions
 * needed for each partition to fit in memory.  It is deliberately above one:
 * a few surplus partitions cost little, and they lower the odds that some
 * partition must itself be spilled recursively.  More, smaller partitions
 * also mean smaller hash tables, which may run faster thanks to memory
 * caching effects.
 *
 * HASHAGG_MIN_PARTITIONS and HASHAGG_MAX_PARTITIONS bound the number of
 * partitions per spill.  Too few can waste I/O by spilling the same tuples
 * again and again.  Too many waste memory on spill-file buffers that could
 * otherwise go toward a larger hash table.
 */
#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024

/*
 * Tape buffer sizes.  A read buffer must be a multiple of BLCKSZ.  Larger
 * read buffers only pay off when several tapes are read concurrently, which
 * HashAgg never does, so BLCKSZ is used.  Writing to a tape always uses a
 * BLCKSZ buffer.
 */
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ

/*
 * Bit width of the HyperLogLog estimator that tracks the cardinality of the
 * tuples spilled to a partition.  Five bits is about 32 bytes of state, with
 * a worst-case error of around 18%.  That is accurate enough to pick a
 * reasonable partition count when recursing.
 */
#define HASHAGG_HLL_BIT_WIDTH 5

/*
 * Estimated per-chunk memory overhead, taken as a constant 16 bytes.
 * XXX: should this estimate be improved?
 */
#define CHUNKHDRSZ 16
313
314 /*
315 * Track all tapes needed for a HashAgg that spills. We don't know the maximum
316 * number of tapes needed at the start of the algorithm (because it can
317 * recurse), so one tape set is allocated and extended as needed for new
318 * tapes. When a particular tape is already read, rewind it for write mode and
319 * put it in the free list.
320 *
321 * Tapes' buffers can take up substantial memory when many tapes are open at
322 * once. We only need one tape open at a time in read mode (using a buffer
323 * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
324 * requiring a buffer of size BLCKSZ) for each partition.
325 */
326 typedef struct HashTapeInfo
327 {
328 LogicalTapeSet *tapeset;
329 int ntapes;
330 int *freetapes;
331 int nfreetapes;
332 int freetapes_alloc;
333 } HashTapeInfo;
334
335 /*
336 * Represents partitioned spill data for a single hashtable. Contains the
337 * necessary information to route tuples to the correct partition, and to
338 * transform the spilled data into new batches.
339 *
340 * The high bits are used for partition selection (when recursing, we ignore
341 * the bits that have already been used for partition selection at an earlier
342 * level).
343 */
344 typedef struct HashAggSpill
345 {
346 LogicalTapeSet *tapeset; /* borrowed reference to tape set */
347 int npartitions; /* number of partitions */
348 int *partitions; /* spill partition tape numbers */
349 int64 *ntuples; /* number of tuples in each partition */
350 uint32 mask; /* mask to find partition from hash value */
351 int shift; /* after masking, shift by this amount */
352 hyperLogLogState *hll_card; /* cardinality estimate for contents */
353 } HashAggSpill;
354
355 /*
356 * Represents work to be done for one pass of hash aggregation (with only one
357 * grouping set).
358 *
359 * Also tracks the bits of the hash already used for partition selection by
360 * earlier iterations, so that this batch can use new bits. If all bits have
361 * already been used, no partitioning will be done (any spilled data will go
362 * to a single output tape).
363 */
364 typedef struct HashAggBatch
365 {
366 int setno; /* grouping set */
367 int used_bits; /* number of bits of hash already used */
368 LogicalTapeSet *tapeset; /* borrowed reference to tape set */
369 int input_tapenum; /* input partition tape */
370 int64 input_tuples; /* number of tuples in this batch */
371 double input_card; /* estimated group cardinality */
372 } HashAggBatch;
373
374 /* used to find referenced colnos */
375 typedef struct FindColsContext
376 {
377 bool is_aggref; /* is under an aggref */
378 Bitmapset *aggregated; /* column references under an aggref */
379 Bitmapset *unaggregated; /* other column references */
380 } FindColsContext;
381
382 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
383 static void initialize_phase(AggState *aggstate, int newphase);
384 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
385 static void initialize_aggregates(AggState *aggstate,
386 AggStatePerGroup *pergroups,
387 int numReset);
388 static void advance_transition_function(AggState *aggstate,
389 AggStatePerTrans pertrans,
390 AggStatePerGroup pergroupstate);
391 static void advance_aggregates(AggState *aggstate);
392 static void process_ordered_aggregate_single(AggState *aggstate,
393 AggStatePerTrans pertrans,
394 AggStatePerGroup pergroupstate);
395 static void process_ordered_aggregate_multi(AggState *aggstate,
396 AggStatePerTrans pertrans,
397 AggStatePerGroup pergroupstate);
398 static void finalize_aggregate(AggState *aggstate,
399 AggStatePerAgg peragg,
400 AggStatePerGroup pergroupstate,
401 Datum *resultVal, bool *resultIsNull);
402 static void finalize_partialaggregate(AggState *aggstate,
403 AggStatePerAgg peragg,
404 AggStatePerGroup pergroupstate,
405 Datum *resultVal, bool *resultIsNull);
406 static inline void prepare_hash_slot(AggStatePerHash perhash,
407 TupleTableSlot *inputslot,
408 TupleTableSlot *hashslot);
409 static void prepare_projection_slot(AggState *aggstate,
410 TupleTableSlot *slot,
411 int currentSet);
412 static void finalize_aggregates(AggState *aggstate,
413 AggStatePerAgg peragg,
414 AggStatePerGroup pergroup);
415 static TupleTableSlot *project_aggregates(AggState *aggstate);
416 static void find_cols(AggState *aggstate, Bitmapset **aggregated,
417 Bitmapset **unaggregated);
418 static bool find_cols_walker(Node *node, FindColsContext *context);
419 static void build_hash_tables(AggState *aggstate);
420 static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
421 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
422 bool nullcheck);
423 static long hash_choose_num_buckets(double hashentrysize,
424 long estimated_nbuckets,
425 Size memory);
426 static int hash_choose_num_partitions(double input_groups,
427 double hashentrysize,
428 int used_bits,
429 int *log2_npartittions);
430 static void initialize_hash_entry(AggState *aggstate,
431 TupleHashTable hashtable,
432 TupleHashEntry entry);
433 static void lookup_hash_entries(AggState *aggstate);
434 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
435 static void agg_fill_hash_table(AggState *aggstate);
436 static bool agg_refill_hash_table(AggState *aggstate);
437 static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
438 static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
439 static void hash_agg_check_limits(AggState *aggstate);
440 static void hash_agg_enter_spill_mode(AggState *aggstate);
441 static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
442 int npartitions);
443 static void hashagg_finish_initial_spills(AggState *aggstate);
444 static void hashagg_reset_spill_state(AggState *aggstate);
445 static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
446 int input_tapenum, int setno,
447 int64 input_tuples, double input_card,
448 int used_bits);
449 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
450 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
451 int used_bits, double input_groups,
452 double hashentrysize);
453 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
454 TupleTableSlot *slot, uint32 hash);
455 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
456 int setno);
457 static void hashagg_tapeinfo_init(AggState *aggstate);
458 static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
459 int ndest);
460 static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
461 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
462 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
463 AggState *aggstate, EState *estate,
464 Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
465 Oid aggserialfn, Oid aggdeserialfn,
466 Datum initValue, bool initValueIsNull,
467 Oid *inputTypes, int numArguments);
468
469
470 /*
471 * Select the current grouping set; affects current_set and
472 * curaggcontext.
473 */
474 static void
select_current_set(AggState * aggstate,int setno,bool is_hash)475 select_current_set(AggState *aggstate, int setno, bool is_hash)
476 {
477 /*
478 * When changing this, also adapt ExecAggPlainTransByVal() and
479 * ExecAggPlainTransByRef().
480 */
481 if (is_hash)
482 aggstate->curaggcontext = aggstate->hashcontext;
483 else
484 aggstate->curaggcontext = aggstate->aggcontexts[setno];
485
486 aggstate->current_set = setno;
487 }
488
489 /*
490 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
491 * current_phase + 1. Juggle the tuplesorts accordingly.
492 *
493 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
494 * case, so when entering phase 0, all we need to do is drop open sorts.
495 */
496 static void
initialize_phase(AggState * aggstate,int newphase)497 initialize_phase(AggState *aggstate, int newphase)
498 {
499 Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
500
501 /*
502 * Whatever the previous state, we're now done with whatever input
503 * tuplesort was in use.
504 */
505 if (aggstate->sort_in)
506 {
507 tuplesort_end(aggstate->sort_in);
508 aggstate->sort_in = NULL;
509 }
510
511 if (newphase <= 1)
512 {
513 /*
514 * Discard any existing output tuplesort.
515 */
516 if (aggstate->sort_out)
517 {
518 tuplesort_end(aggstate->sort_out);
519 aggstate->sort_out = NULL;
520 }
521 }
522 else
523 {
524 /*
525 * The old output tuplesort becomes the new input one, and this is the
526 * right time to actually sort it.
527 */
528 aggstate->sort_in = aggstate->sort_out;
529 aggstate->sort_out = NULL;
530 Assert(aggstate->sort_in);
531 tuplesort_performsort(aggstate->sort_in);
532 }
533
534 /*
535 * If this isn't the last phase, we need to sort appropriately for the
536 * next phase in sequence.
537 */
538 if (newphase > 0 && newphase < aggstate->numphases - 1)
539 {
540 Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
541 PlanState *outerNode = outerPlanState(aggstate);
542 TupleDesc tupDesc = ExecGetResultType(outerNode);
543
544 aggstate->sort_out = tuplesort_begin_heap(tupDesc,
545 sortnode->numCols,
546 sortnode->sortColIdx,
547 sortnode->sortOperators,
548 sortnode->collations,
549 sortnode->nullsFirst,
550 work_mem,
551 NULL, false);
552 }
553
554 aggstate->current_phase = newphase;
555 aggstate->phase = &aggstate->phases[newphase];
556 }
557
558 /*
559 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
560 * populated by the previous phase. Copy it to the sorter for the next phase
561 * if any.
562 *
563 * Callers cannot rely on memory for tuple in returned slot remaining valid
564 * past any subsequently fetched tuple.
565 */
566 static TupleTableSlot *
fetch_input_tuple(AggState * aggstate)567 fetch_input_tuple(AggState *aggstate)
568 {
569 TupleTableSlot *slot;
570
571 if (aggstate->sort_in)
572 {
573 /* make sure we check for interrupts in either path through here */
574 CHECK_FOR_INTERRUPTS();
575 if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
576 aggstate->sort_slot, NULL))
577 return NULL;
578 slot = aggstate->sort_slot;
579 }
580 else
581 slot = ExecProcNode(outerPlanState(aggstate));
582
583 if (!TupIsNull(slot) && aggstate->sort_out)
584 tuplesort_puttupleslot(aggstate->sort_out, slot);
585
586 return slot;
587 }
588
589 /*
590 * (Re)Initialize an individual aggregate.
591 *
592 * This function handles only one grouping set, already set in
593 * aggstate->current_set.
594 *
595 * When called, CurrentMemoryContext should be the per-query context.
596 */
597 static void
initialize_aggregate(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)598 initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
599 AggStatePerGroup pergroupstate)
600 {
601 /*
602 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
603 */
604 if (pertrans->numSortCols > 0)
605 {
606 /*
607 * In case of rescan, maybe there could be an uncompleted sort
608 * operation? Clean it up if so.
609 */
610 if (pertrans->sortstates[aggstate->current_set])
611 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
612
613
614 /*
615 * We use a plain Datum sorter when there's a single input column;
616 * otherwise sort the full tuple. (See comments for
617 * process_ordered_aggregate_single.)
618 */
619 if (pertrans->numInputs == 1)
620 {
621 Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
622
623 pertrans->sortstates[aggstate->current_set] =
624 tuplesort_begin_datum(attr->atttypid,
625 pertrans->sortOperators[0],
626 pertrans->sortCollations[0],
627 pertrans->sortNullsFirst[0],
628 work_mem, NULL, false);
629 }
630 else
631 pertrans->sortstates[aggstate->current_set] =
632 tuplesort_begin_heap(pertrans->sortdesc,
633 pertrans->numSortCols,
634 pertrans->sortColIdx,
635 pertrans->sortOperators,
636 pertrans->sortCollations,
637 pertrans->sortNullsFirst,
638 work_mem, NULL, false);
639 }
640
641 /*
642 * (Re)set transValue to the initial value.
643 *
644 * Note that when the initial value is pass-by-ref, we must copy it (into
645 * the aggcontext) since we will pfree the transValue later.
646 */
647 if (pertrans->initValueIsNull)
648 pergroupstate->transValue = pertrans->initValue;
649 else
650 {
651 MemoryContext oldContext;
652
653 oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
654 pergroupstate->transValue = datumCopy(pertrans->initValue,
655 pertrans->transtypeByVal,
656 pertrans->transtypeLen);
657 MemoryContextSwitchTo(oldContext);
658 }
659 pergroupstate->transValueIsNull = pertrans->initValueIsNull;
660
661 /*
662 * If the initial value for the transition state doesn't exist in the
663 * pg_aggregate table then we will let the first non-NULL value returned
664 * from the outer procNode become the initial value. (This is useful for
665 * aggregates like max() and min().) The noTransValue flag signals that we
666 * still need to do this.
667 */
668 pergroupstate->noTransValue = pertrans->initValueIsNull;
669 }
670
671 /*
672 * Initialize all aggregate transition states for a new group of input values.
673 *
674 * If there are multiple grouping sets, we initialize only the first numReset
675 * of them (the grouping sets are ordered so that the most specific one, which
676 * is reset most often, is first). As a convenience, if numReset is 0, we
677 * reinitialize all sets.
678 *
679 * NB: This cannot be used for hash aggregates, as for those the grouping set
680 * number has to be specified from further up.
681 *
682 * When called, CurrentMemoryContext should be the per-query context.
683 */
684 static void
initialize_aggregates(AggState * aggstate,AggStatePerGroup * pergroups,int numReset)685 initialize_aggregates(AggState *aggstate,
686 AggStatePerGroup *pergroups,
687 int numReset)
688 {
689 int transno;
690 int numGroupingSets = Max(aggstate->phase->numsets, 1);
691 int setno = 0;
692 int numTrans = aggstate->numtrans;
693 AggStatePerTrans transstates = aggstate->pertrans;
694
695 if (numReset == 0)
696 numReset = numGroupingSets;
697
698 for (setno = 0; setno < numReset; setno++)
699 {
700 AggStatePerGroup pergroup = pergroups[setno];
701
702 select_current_set(aggstate, setno, false);
703
704 for (transno = 0; transno < numTrans; transno++)
705 {
706 AggStatePerTrans pertrans = &transstates[transno];
707 AggStatePerGroup pergroupstate = &pergroup[transno];
708
709 initialize_aggregate(aggstate, pertrans, pergroupstate);
710 }
711 }
712 }
713
714 /*
715 * Given new input value(s), advance the transition function of one aggregate
716 * state within one grouping set only (already set in aggstate->current_set)
717 *
718 * The new values (and null flags) have been preloaded into argument positions
719 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
720 * pass to the transition function. We also expect that the static fields of
721 * the fcinfo are already initialized; that was done by ExecInitAgg().
722 *
723 * It doesn't matter which memory context this is called in.
724 */
725 static void
advance_transition_function(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)726 advance_transition_function(AggState *aggstate,
727 AggStatePerTrans pertrans,
728 AggStatePerGroup pergroupstate)
729 {
730 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
731 MemoryContext oldContext;
732 Datum newVal;
733
734 if (pertrans->transfn.fn_strict)
735 {
736 /*
737 * For a strict transfn, nothing happens when there's a NULL input; we
738 * just keep the prior transValue.
739 */
740 int numTransInputs = pertrans->numTransInputs;
741 int i;
742
743 for (i = 1; i <= numTransInputs; i++)
744 {
745 if (fcinfo->args[i].isnull)
746 return;
747 }
748 if (pergroupstate->noTransValue)
749 {
750 /*
751 * transValue has not been initialized. This is the first non-NULL
752 * input value. We use it as the initial value for transValue. (We
753 * already checked that the agg's input type is binary-compatible
754 * with its transtype, so straight copy here is OK.)
755 *
756 * We must copy the datum into aggcontext if it is pass-by-ref. We
757 * do not need to pfree the old transValue, since it's NULL.
758 */
759 oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
760 pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
761 pertrans->transtypeByVal,
762 pertrans->transtypeLen);
763 pergroupstate->transValueIsNull = false;
764 pergroupstate->noTransValue = false;
765 MemoryContextSwitchTo(oldContext);
766 return;
767 }
768 if (pergroupstate->transValueIsNull)
769 {
770 /*
771 * Don't call a strict function with NULL inputs. Note it is
772 * possible to get here despite the above tests, if the transfn is
773 * strict *and* returned a NULL on a prior cycle. If that happens
774 * we will propagate the NULL all the way to the end.
775 */
776 return;
777 }
778 }
779
780 /* We run the transition functions in per-input-tuple memory context */
781 oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
782
783 /* set up aggstate->curpertrans for AggGetAggref() */
784 aggstate->curpertrans = pertrans;
785
786 /*
787 * OK to call the transition function
788 */
789 fcinfo->args[0].value = pergroupstate->transValue;
790 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
791 fcinfo->isnull = false; /* just in case transfn doesn't set it */
792
793 newVal = FunctionCallInvoke(fcinfo);
794
795 aggstate->curpertrans = NULL;
796
797 /*
798 * If pass-by-ref datatype, must copy the new value into aggcontext and
799 * free the prior transValue. But if transfn returned a pointer to its
800 * first input, we don't need to do anything. Also, if transfn returned a
801 * pointer to a R/W expanded object that is already a child of the
802 * aggcontext, assume we can adopt that value without copying it.
803 *
804 * It's safe to compare newVal with pergroup->transValue without regard
805 * for either being NULL, because ExecAggTransReparent() takes care to set
806 * transValue to 0 when NULL. Otherwise we could end up accidentally not
807 * reparenting, when the transValue has the same numerical value as
808 * newValue, despite being NULL. This is a somewhat hot path, making it
809 * undesirable to instead solve this with another branch for the common
810 * case of the transition function returning its (modified) input
811 * argument.
812 */
813 if (!pertrans->transtypeByVal &&
814 DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
815 newVal = ExecAggTransReparent(aggstate, pertrans,
816 newVal, fcinfo->isnull,
817 pergroupstate->transValue,
818 pergroupstate->transValueIsNull);
819
820 pergroupstate->transValue = newVal;
821 pergroupstate->transValueIsNull = fcinfo->isnull;
822
823 MemoryContextSwitchTo(oldContext);
824 }
825
826 /*
827 * Advance each aggregate transition state for one input tuple. The input
828 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
829 * accessible to ExecEvalExpr.
830 *
831 * We have two sets of transition states to handle: one for sorted aggregation
832 * and one for hashed; we do them both here, to avoid multiple evaluation of
833 * the inputs.
834 *
835 * When called, CurrentMemoryContext should be the per-query context.
836 */
837 static void
advance_aggregates(AggState * aggstate)838 advance_aggregates(AggState *aggstate)
839 {
840 bool dummynull;
841
842 ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
843 aggstate->tmpcontext,
844 &dummynull);
845 }
846
847 /*
848 * Run the transition function for a DISTINCT or ORDER BY aggregate
849 * with only one input. This is called after we have completed
850 * entering all the input values into the sort object. We complete the
851 * sort, read out the values in sorted order, and run the transition
852 * function on each value (applying DISTINCT if appropriate).
853 *
854 * Note that the strictness of the transition function was checked when
855 * entering the values into the sort, so we don't check it again here;
856 * we just apply standard SQL DISTINCT logic.
857 *
858 * The one-input case is handled separately from the multi-input case
859 * for performance reasons: for single by-value inputs, such as the
860 * common case of count(distinct id), the tuplesort_getdatum code path
861 * is around 300% faster. (The speedup for by-reference types is less
862 * but still noticeable.)
863 *
864 * This function handles only one grouping set (already set in
865 * aggstate->current_set).
866 *
867 * When called, CurrentMemoryContext should be the per-query context.
868 */
869 static void
process_ordered_aggregate_single(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)870 process_ordered_aggregate_single(AggState *aggstate,
871 AggStatePerTrans pertrans,
872 AggStatePerGroup pergroupstate)
873 {
874 Datum oldVal = (Datum) 0;
875 bool oldIsNull = true;
876 bool haveOldVal = false;
877 MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
878 MemoryContext oldContext;
879 bool isDistinct = (pertrans->numDistinctCols > 0);
880 Datum newAbbrevVal = (Datum) 0;
881 Datum oldAbbrevVal = (Datum) 0;
882 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
883 Datum *newVal;
884 bool *isNull;
885
886 Assert(pertrans->numDistinctCols < 2);
887
888 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
889
890 /* Load the column into argument 1 (arg 0 will be transition value) */
891 newVal = &fcinfo->args[1].value;
892 isNull = &fcinfo->args[1].isnull;
893
894 /*
895 * Note: if input type is pass-by-ref, the datums returned by the sort are
896 * freshly palloc'd in the per-query context, so we must be careful to
897 * pfree them when they are no longer needed.
898 */
899
900 while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
901 true, newVal, isNull, &newAbbrevVal))
902 {
903 /*
904 * Clear and select the working context for evaluation of the equality
905 * function and transition function.
906 */
907 MemoryContextReset(workcontext);
908 oldContext = MemoryContextSwitchTo(workcontext);
909
910 /*
911 * If DISTINCT mode, and not distinct from prior, skip it.
912 */
913 if (isDistinct &&
914 haveOldVal &&
915 ((oldIsNull && *isNull) ||
916 (!oldIsNull && !*isNull &&
917 oldAbbrevVal == newAbbrevVal &&
918 DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
919 pertrans->aggCollation,
920 oldVal, *newVal)))))
921 {
922 /* equal to prior, so forget this one */
923 if (!pertrans->inputtypeByVal && !*isNull)
924 pfree(DatumGetPointer(*newVal));
925 }
926 else
927 {
928 advance_transition_function(aggstate, pertrans, pergroupstate);
929 /* forget the old value, if any */
930 if (!oldIsNull && !pertrans->inputtypeByVal)
931 pfree(DatumGetPointer(oldVal));
932 /* and remember the new one for subsequent equality checks */
933 oldVal = *newVal;
934 oldAbbrevVal = newAbbrevVal;
935 oldIsNull = *isNull;
936 haveOldVal = true;
937 }
938
939 MemoryContextSwitchTo(oldContext);
940 }
941
942 if (!oldIsNull && !pertrans->inputtypeByVal)
943 pfree(DatumGetPointer(oldVal));
944
945 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
946 pertrans->sortstates[aggstate->current_set] = NULL;
947 }
948
949 /*
950 * Run the transition function for a DISTINCT or ORDER BY aggregate
951 * with more than one input. This is called after we have completed
952 * entering all the input values into the sort object. We complete the
953 * sort, read out the values in sorted order, and run the transition
954 * function on each value (applying DISTINCT if appropriate).
955 *
956 * This function handles only one grouping set (already set in
957 * aggstate->current_set).
958 *
959 * When called, CurrentMemoryContext should be the per-query context.
960 */
961 static void
process_ordered_aggregate_multi(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)962 process_ordered_aggregate_multi(AggState *aggstate,
963 AggStatePerTrans pertrans,
964 AggStatePerGroup pergroupstate)
965 {
966 ExprContext *tmpcontext = aggstate->tmpcontext;
967 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
968 TupleTableSlot *slot1 = pertrans->sortslot;
969 TupleTableSlot *slot2 = pertrans->uniqslot;
970 int numTransInputs = pertrans->numTransInputs;
971 int numDistinctCols = pertrans->numDistinctCols;
972 Datum newAbbrevVal = (Datum) 0;
973 Datum oldAbbrevVal = (Datum) 0;
974 bool haveOldValue = false;
975 TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
976 int i;
977
978 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
979
980 ExecClearTuple(slot1);
981 if (slot2)
982 ExecClearTuple(slot2);
983
984 while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
985 true, true, slot1, &newAbbrevVal))
986 {
987 CHECK_FOR_INTERRUPTS();
988
989 tmpcontext->ecxt_outertuple = slot1;
990 tmpcontext->ecxt_innertuple = slot2;
991
992 if (numDistinctCols == 0 ||
993 !haveOldValue ||
994 newAbbrevVal != oldAbbrevVal ||
995 !ExecQual(pertrans->equalfnMulti, tmpcontext))
996 {
997 /*
998 * Extract the first numTransInputs columns as datums to pass to
999 * the transfn.
1000 */
1001 slot_getsomeattrs(slot1, numTransInputs);
1002
1003 /* Load values into fcinfo */
1004 /* Start from 1, since the 0th arg will be the transition value */
1005 for (i = 0; i < numTransInputs; i++)
1006 {
1007 fcinfo->args[i + 1].value = slot1->tts_values[i];
1008 fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
1009 }
1010
1011 advance_transition_function(aggstate, pertrans, pergroupstate);
1012
1013 if (numDistinctCols > 0)
1014 {
1015 /* swap the slot pointers to retain the current tuple */
1016 TupleTableSlot *tmpslot = slot2;
1017
1018 slot2 = slot1;
1019 slot1 = tmpslot;
1020 /* avoid ExecQual() calls by reusing abbreviated keys */
1021 oldAbbrevVal = newAbbrevVal;
1022 haveOldValue = true;
1023 }
1024 }
1025
1026 /* Reset context each time */
1027 ResetExprContext(tmpcontext);
1028
1029 ExecClearTuple(slot1);
1030 }
1031
1032 if (slot2)
1033 ExecClearTuple(slot2);
1034
1035 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
1036 pertrans->sortstates[aggstate->current_set] = NULL;
1037
1038 /* restore previous slot, potentially in use for grouping sets */
1039 tmpcontext->ecxt_outertuple = save;
1040 }
1041
1042 /*
1043 * Compute the final value of one aggregate.
1044 *
1045 * This function handles only one grouping set (already set in
1046 * aggstate->current_set).
1047 *
1048 * The finalfn will be run, and the result delivered, in the
1049 * output-tuple context; caller's CurrentMemoryContext does not matter.
1050 *
1051 * The finalfn uses the state as set in the transno. This also might be
1052 * being used by another aggregate function, so it's important that we do
1053 * nothing destructive here.
1054 */
1055 static void
finalize_aggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)1056 finalize_aggregate(AggState *aggstate,
1057 AggStatePerAgg peragg,
1058 AggStatePerGroup pergroupstate,
1059 Datum *resultVal, bool *resultIsNull)
1060 {
1061 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1062 bool anynull = false;
1063 MemoryContext oldContext;
1064 int i;
1065 ListCell *lc;
1066 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1067
1068 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1069
1070 /*
1071 * Evaluate any direct arguments. We do this even if there's no finalfn
1072 * (which is unlikely anyway), so that side-effects happen as expected.
1073 * The direct arguments go into arg positions 1 and up, leaving position 0
1074 * for the transition state value.
1075 */
1076 i = 1;
1077 foreach(lc, peragg->aggdirectargs)
1078 {
1079 ExprState *expr = (ExprState *) lfirst(lc);
1080
1081 fcinfo->args[i].value = ExecEvalExpr(expr,
1082 aggstate->ss.ps.ps_ExprContext,
1083 &fcinfo->args[i].isnull);
1084 anynull |= fcinfo->args[i].isnull;
1085 i++;
1086 }
1087
1088 /*
1089 * Apply the agg's finalfn if one is provided, else return transValue.
1090 */
1091 if (OidIsValid(peragg->finalfn_oid))
1092 {
1093 int numFinalArgs = peragg->numFinalArgs;
1094
1095 /* set up aggstate->curperagg for AggGetAggref() */
1096 aggstate->curperagg = peragg;
1097
1098 InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
1099 numFinalArgs,
1100 pertrans->aggCollation,
1101 (void *) aggstate, NULL);
1102
1103 /* Fill in the transition state value */
1104 fcinfo->args[0].value =
1105 MakeExpandedObjectReadOnly(pergroupstate->transValue,
1106 pergroupstate->transValueIsNull,
1107 pertrans->transtypeLen);
1108 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1109 anynull |= pergroupstate->transValueIsNull;
1110
1111 /* Fill any remaining argument positions with nulls */
1112 for (; i < numFinalArgs; i++)
1113 {
1114 fcinfo->args[i].value = (Datum) 0;
1115 fcinfo->args[i].isnull = true;
1116 anynull = true;
1117 }
1118
1119 if (fcinfo->flinfo->fn_strict && anynull)
1120 {
1121 /* don't call a strict function with NULL inputs */
1122 *resultVal = (Datum) 0;
1123 *resultIsNull = true;
1124 }
1125 else
1126 {
1127 *resultVal = FunctionCallInvoke(fcinfo);
1128 *resultIsNull = fcinfo->isnull;
1129 }
1130 aggstate->curperagg = NULL;
1131 }
1132 else
1133 {
1134 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1135 *resultVal = pergroupstate->transValue;
1136 *resultIsNull = pergroupstate->transValueIsNull;
1137 }
1138
1139 /*
1140 * If result is pass-by-ref, make sure it is in the right context.
1141 */
1142 if (!peragg->resulttypeByVal && !*resultIsNull &&
1143 !MemoryContextContains(CurrentMemoryContext,
1144 DatumGetPointer(*resultVal)))
1145 *resultVal = datumCopy(*resultVal,
1146 peragg->resulttypeByVal,
1147 peragg->resulttypeLen);
1148
1149 MemoryContextSwitchTo(oldContext);
1150 }
1151
1152 /*
1153 * Compute the output value of one partial aggregate.
1154 *
1155 * The serialization function will be run, and the result delivered, in the
1156 * output-tuple context; caller's CurrentMemoryContext does not matter.
1157 */
1158 static void
finalize_partialaggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)1159 finalize_partialaggregate(AggState *aggstate,
1160 AggStatePerAgg peragg,
1161 AggStatePerGroup pergroupstate,
1162 Datum *resultVal, bool *resultIsNull)
1163 {
1164 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
1165 MemoryContext oldContext;
1166
1167 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1168
1169 /*
1170 * serialfn_oid will be set if we must serialize the transvalue before
1171 * returning it
1172 */
1173 if (OidIsValid(pertrans->serialfn_oid))
1174 {
1175 /* Don't call a strict serialization function with NULL input. */
1176 if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1177 {
1178 *resultVal = (Datum) 0;
1179 *resultIsNull = true;
1180 }
1181 else
1182 {
1183 FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1184
1185 fcinfo->args[0].value =
1186 MakeExpandedObjectReadOnly(pergroupstate->transValue,
1187 pergroupstate->transValueIsNull,
1188 pertrans->transtypeLen);
1189 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1190 fcinfo->isnull = false;
1191
1192 *resultVal = FunctionCallInvoke(fcinfo);
1193 *resultIsNull = fcinfo->isnull;
1194 }
1195 }
1196 else
1197 {
1198 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1199 *resultVal = pergroupstate->transValue;
1200 *resultIsNull = pergroupstate->transValueIsNull;
1201 }
1202
1203 /* If result is pass-by-ref, make sure it is in the right context. */
1204 if (!peragg->resulttypeByVal && !*resultIsNull &&
1205 !MemoryContextContains(CurrentMemoryContext,
1206 DatumGetPointer(*resultVal)))
1207 *resultVal = datumCopy(*resultVal,
1208 peragg->resulttypeByVal,
1209 peragg->resulttypeLen);
1210
1211 MemoryContextSwitchTo(oldContext);
1212 }
1213
1214 /*
1215 * Extract the attributes that make up the grouping key into the
1216 * hashslot. This is necessary to compute the hash or perform a lookup.
1217 */
1218 static inline void
prepare_hash_slot(AggStatePerHash perhash,TupleTableSlot * inputslot,TupleTableSlot * hashslot)1219 prepare_hash_slot(AggStatePerHash perhash,
1220 TupleTableSlot *inputslot,
1221 TupleTableSlot *hashslot)
1222 {
1223 int i;
1224
1225 /* transfer just the needed columns into hashslot */
1226 slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1227 ExecClearTuple(hashslot);
1228
1229 for (i = 0; i < perhash->numhashGrpCols; i++)
1230 {
1231 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1232
1233 hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1234 hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1235 }
1236 ExecStoreVirtualTuple(hashslot);
1237 }
1238
1239 /*
1240 * Prepare to finalize and project based on the specified representative tuple
1241 * slot and grouping set.
1242 *
1243 * In the specified tuple slot, force to null all attributes that should be
1244 * read as null in the context of the current grouping set. Also stash the
1245 * current group bitmap where GroupingExpr can get at it.
1246 *
1247 * This relies on three conditions:
1248 *
1249 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1250 * only reference it in evaluations, which will only access individual
1251 * attributes.
1252 *
1253 * 2) No system columns are going to need to be nulled. (If a system column is
1254 * referenced in a group clause, it is actually projected in the outer plan
1255 * tlist.)
1256 *
1257 * 3) Within a given phase, we never need to recover the value of an attribute
1258 * once it has been set to null.
1259 *
1260 * Poking into the slot this way is a bit ugly, but the consensus is that the
1261 * alternative was worse.
1262 */
1263 static void
prepare_projection_slot(AggState * aggstate,TupleTableSlot * slot,int currentSet)1264 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1265 {
1266 if (aggstate->phase->grouped_cols)
1267 {
1268 Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1269
1270 aggstate->grouped_cols = grouped_cols;
1271
1272 if (TTS_EMPTY(slot))
1273 {
1274 /*
1275 * Force all values to be NULL if working on an empty input tuple
1276 * (i.e. an empty grouping set for which no input rows were
1277 * supplied).
1278 */
1279 ExecStoreAllNullTuple(slot);
1280 }
1281 else if (aggstate->all_grouped_cols)
1282 {
1283 ListCell *lc;
1284
1285 /* all_grouped_cols is arranged in desc order */
1286 slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1287
1288 foreach(lc, aggstate->all_grouped_cols)
1289 {
1290 int attnum = lfirst_int(lc);
1291
1292 if (!bms_is_member(attnum, grouped_cols))
1293 slot->tts_isnull[attnum - 1] = true;
1294 }
1295 }
1296 }
1297 }
1298
1299 /*
1300 * Compute the final value of all aggregates for one group.
1301 *
1302 * This function handles only one grouping set at a time, which the caller must
1303 * have selected. It's also the caller's responsibility to adjust the supplied
1304 * pergroup parameter to point to the current set's transvalues.
1305 *
1306 * Results are stored in the output econtext aggvalues/aggnulls.
1307 */
1308 static void
finalize_aggregates(AggState * aggstate,AggStatePerAgg peraggs,AggStatePerGroup pergroup)1309 finalize_aggregates(AggState *aggstate,
1310 AggStatePerAgg peraggs,
1311 AggStatePerGroup pergroup)
1312 {
1313 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1314 Datum *aggvalues = econtext->ecxt_aggvalues;
1315 bool *aggnulls = econtext->ecxt_aggnulls;
1316 int aggno;
1317 int transno;
1318
1319 /*
1320 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1321 * inputs and run the transition functions.
1322 */
1323 for (transno = 0; transno < aggstate->numtrans; transno++)
1324 {
1325 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1326 AggStatePerGroup pergroupstate;
1327
1328 pergroupstate = &pergroup[transno];
1329
1330 if (pertrans->numSortCols > 0)
1331 {
1332 Assert(aggstate->aggstrategy != AGG_HASHED &&
1333 aggstate->aggstrategy != AGG_MIXED);
1334
1335 if (pertrans->numInputs == 1)
1336 process_ordered_aggregate_single(aggstate,
1337 pertrans,
1338 pergroupstate);
1339 else
1340 process_ordered_aggregate_multi(aggstate,
1341 pertrans,
1342 pergroupstate);
1343 }
1344 }
1345
1346 /*
1347 * Run the final functions.
1348 */
1349 for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1350 {
1351 AggStatePerAgg peragg = &peraggs[aggno];
1352 int transno = peragg->transno;
1353 AggStatePerGroup pergroupstate;
1354
1355 pergroupstate = &pergroup[transno];
1356
1357 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1358 finalize_partialaggregate(aggstate, peragg, pergroupstate,
1359 &aggvalues[aggno], &aggnulls[aggno]);
1360 else
1361 finalize_aggregate(aggstate, peragg, pergroupstate,
1362 &aggvalues[aggno], &aggnulls[aggno]);
1363 }
1364 }
1365
1366 /*
1367 * Project the result of a group (whose aggs have already been calculated by
1368 * finalize_aggregates). Returns the result slot, or NULL if no row is
1369 * projected (suppressed by qual).
1370 */
1371 static TupleTableSlot *
project_aggregates(AggState * aggstate)1372 project_aggregates(AggState *aggstate)
1373 {
1374 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1375
1376 /*
1377 * Check the qual (HAVING clause); if the group does not match, ignore it.
1378 */
1379 if (ExecQual(aggstate->ss.ps.qual, econtext))
1380 {
1381 /*
1382 * Form and return projection tuple using the aggregate results and
1383 * the representative input tuple.
1384 */
1385 return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1386 }
1387 else
1388 InstrCountFiltered1(aggstate, 1);
1389
1390 return NULL;
1391 }
1392
1393 /*
1394 * Find input-tuple columns that are needed, dividing them into
1395 * aggregated and unaggregated sets.
1396 */
1397 static void
find_cols(AggState * aggstate,Bitmapset ** aggregated,Bitmapset ** unaggregated)1398 find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
1399 {
1400 Agg *agg = (Agg *) aggstate->ss.ps.plan;
1401 FindColsContext context;
1402
1403 context.is_aggref = false;
1404 context.aggregated = NULL;
1405 context.unaggregated = NULL;
1406
1407 /* Examine tlist and quals */
1408 (void) find_cols_walker((Node *) agg->plan.targetlist, &context);
1409 (void) find_cols_walker((Node *) agg->plan.qual, &context);
1410
1411 /* In some cases, grouping columns will not appear in the tlist */
1412 for (int i = 0; i < agg->numCols; i++)
1413 context.unaggregated = bms_add_member(context.unaggregated,
1414 agg->grpColIdx[i]);
1415
1416 *aggregated = context.aggregated;
1417 *unaggregated = context.unaggregated;
1418 }
1419
1420 static bool
find_cols_walker(Node * node,FindColsContext * context)1421 find_cols_walker(Node *node, FindColsContext *context)
1422 {
1423 if (node == NULL)
1424 return false;
1425 if (IsA(node, Var))
1426 {
1427 Var *var = (Var *) node;
1428
1429 /* setrefs.c should have set the varno to OUTER_VAR */
1430 Assert(var->varno == OUTER_VAR);
1431 Assert(var->varlevelsup == 0);
1432 if (context->is_aggref)
1433 context->aggregated = bms_add_member(context->aggregated,
1434 var->varattno);
1435 else
1436 context->unaggregated = bms_add_member(context->unaggregated,
1437 var->varattno);
1438 return false;
1439 }
1440 if (IsA(node, Aggref))
1441 {
1442 Assert(!context->is_aggref);
1443 context->is_aggref = true;
1444 expression_tree_walker(node, find_cols_walker, (void *) context);
1445 context->is_aggref = false;
1446 return false;
1447 }
1448 return expression_tree_walker(node, find_cols_walker,
1449 (void *) context);
1450 }
1451
1452 /*
1453 * (Re-)initialize the hash table(s) to empty.
1454 *
1455 * To implement hashed aggregation, we need a hashtable that stores a
1456 * representative tuple and an array of AggStatePerGroup structs for each
1457 * distinct set of GROUP BY column values. We compute the hash key from the
1458 * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1459 * for each entry.
1460 *
1461 * We have a separate hashtable and associated perhash data structure for each
1462 * grouping set for which we're doing hashing.
1463 *
1464 * The contents of the hash tables always live in the hashcontext's per-tuple
1465 * memory context (there is only one of these for all tables together, since
1466 * they are all reset at the same time).
1467 */
1468 static void
build_hash_tables(AggState * aggstate)1469 build_hash_tables(AggState *aggstate)
1470 {
1471 int setno;
1472
1473 for (setno = 0; setno < aggstate->num_hashes; ++setno)
1474 {
1475 AggStatePerHash perhash = &aggstate->perhash[setno];
1476 long nbuckets;
1477 Size memory;
1478
1479 if (perhash->hashtable != NULL)
1480 {
1481 ResetTupleHashTable(perhash->hashtable);
1482 continue;
1483 }
1484
1485 Assert(perhash->aggnode->numGroups > 0);
1486
1487 memory = aggstate->hash_mem_limit / aggstate->num_hashes;
1488
1489 /* choose reasonable number of buckets per hashtable */
1490 nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
1491 perhash->aggnode->numGroups,
1492 memory);
1493
1494 build_hash_table(aggstate, setno, nbuckets);
1495 }
1496
1497 aggstate->hash_ngroups_current = 0;
1498 }
1499
1500 /*
1501 * Build a single hashtable for this grouping set.
1502 */
1503 static void
build_hash_table(AggState * aggstate,int setno,long nbuckets)1504 build_hash_table(AggState *aggstate, int setno, long nbuckets)
1505 {
1506 AggStatePerHash perhash = &aggstate->perhash[setno];
1507 MemoryContext metacxt = aggstate->hash_metacxt;
1508 MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
1509 MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
1510 Size additionalsize;
1511
1512 Assert(aggstate->aggstrategy == AGG_HASHED ||
1513 aggstate->aggstrategy == AGG_MIXED);
1514
1515 /*
1516 * Used to make sure initial hash table allocation does not exceed
1517 * hash_mem. Note that the estimate does not include space for
1518 * pass-by-reference transition data values, nor for the representative
1519 * tuple of each group.
1520 */
1521 additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1522
1523 perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1524 perhash->hashslot->tts_tupleDescriptor,
1525 perhash->numCols,
1526 perhash->hashGrpColIdxHash,
1527 perhash->eqfuncoids,
1528 perhash->hashfunctions,
1529 perhash->aggnode->grpCollations,
1530 nbuckets,
1531 additionalsize,
1532 metacxt,
1533 hashcxt,
1534 tmpcxt,
1535 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1536 }
1537
1538 /*
1539 * Compute columns that actually need to be stored in hashtable entries. The
1540 * incoming tuples from the child plan node will contain grouping columns,
1541 * other columns referenced in our targetlist and qual, columns used to
1542 * compute the aggregate functions, and perhaps just junk columns we don't use
1543 * at all. Only columns of the first two types need to be stored in the
1544 * hashtable, and getting rid of the others can make the table entries
1545 * significantly smaller. The hashtable only contains the relevant columns,
1546 * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1547 * into the format of the normal input descriptor.
1548 *
1549 * Additional columns, in addition to the columns grouped by, come from two
1550 * sources: Firstly functionally dependent columns that we don't need to group
1551 * by themselves, and secondly ctids for row-marks.
1552 *
1553 * To eliminate duplicates, we build a bitmapset of the needed columns, and
1554 * then build an array of the columns included in the hashtable. We might
1555 * still have duplicates if the passed-in grpColIdx has them, which can happen
1556 * in edge cases from semijoins/distinct; these can't always be removed,
1557 * because it's not certain that the duplicate cols will be using the same
1558 * hash function.
1559 *
1560 * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1561 * the per-query context (unlike the hash table itself).
1562 */
1563 static void
find_hash_columns(AggState * aggstate)1564 find_hash_columns(AggState *aggstate)
1565 {
1566 Bitmapset *base_colnos;
1567 Bitmapset *aggregated_colnos;
1568 TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
1569 List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1570 int numHashes = aggstate->num_hashes;
1571 EState *estate = aggstate->ss.ps.state;
1572 int j;
1573
1574 /* Find Vars that will be needed in tlist and qual */
1575 find_cols(aggstate, &aggregated_colnos, &base_colnos);
1576 aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
1577 aggstate->max_colno_needed = 0;
1578 aggstate->all_cols_needed = true;
1579
1580 for (int i = 0; i < scanDesc->natts; i++)
1581 {
1582 int colno = i + 1;
1583
1584 if (bms_is_member(colno, aggstate->colnos_needed))
1585 aggstate->max_colno_needed = colno;
1586 else
1587 aggstate->all_cols_needed = false;
1588 }
1589
1590 for (j = 0; j < numHashes; ++j)
1591 {
1592 AggStatePerHash perhash = &aggstate->perhash[j];
1593 Bitmapset *colnos = bms_copy(base_colnos);
1594 AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1595 List *hashTlist = NIL;
1596 TupleDesc hashDesc;
1597 int maxCols;
1598 int i;
1599
1600 perhash->largestGrpColIdx = 0;
1601
1602 /*
1603 * If we're doing grouping sets, then some Vars might be referenced in
1604 * tlist/qual for the benefit of other grouping sets, but not needed
1605 * when hashing; i.e. prepare_projection_slot will null them out, so
1606 * there'd be no point storing them. Use prepare_projection_slot's
1607 * logic to determine which.
1608 */
1609 if (aggstate->phases[0].grouped_cols)
1610 {
1611 Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1612 ListCell *lc;
1613
1614 foreach(lc, aggstate->all_grouped_cols)
1615 {
1616 int attnum = lfirst_int(lc);
1617
1618 if (!bms_is_member(attnum, grouped_cols))
1619 colnos = bms_del_member(colnos, attnum);
1620 }
1621 }
1622
1623 /*
1624 * Compute maximum number of input columns accounting for possible
1625 * duplications in the grpColIdx array, which can happen in some edge
1626 * cases where HashAggregate was generated as part of a semijoin or a
1627 * DISTINCT.
1628 */
1629 maxCols = bms_num_members(colnos) + perhash->numCols;
1630
1631 perhash->hashGrpColIdxInput =
1632 palloc(maxCols * sizeof(AttrNumber));
1633 perhash->hashGrpColIdxHash =
1634 palloc(perhash->numCols * sizeof(AttrNumber));
1635
1636 /* Add all the grouping columns to colnos */
1637 for (i = 0; i < perhash->numCols; i++)
1638 colnos = bms_add_member(colnos, grpColIdx[i]);
1639
1640 /*
1641 * First build mapping for columns directly hashed. These are the
1642 * first, because they'll be accessed when computing hash values and
1643 * comparing tuples for exact matches. We also build simple mapping
1644 * for execGrouping, so it knows where to find the to-be-hashed /
1645 * compared columns in the input.
1646 */
1647 for (i = 0; i < perhash->numCols; i++)
1648 {
1649 perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1650 perhash->hashGrpColIdxHash[i] = i + 1;
1651 perhash->numhashGrpCols++;
1652 /* delete already mapped columns */
1653 bms_del_member(colnos, grpColIdx[i]);
1654 }
1655
1656 /* and add the remaining columns */
1657 while ((i = bms_first_member(colnos)) >= 0)
1658 {
1659 perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1660 perhash->numhashGrpCols++;
1661 }
1662
1663 /* and build a tuple descriptor for the hashtable */
1664 for (i = 0; i < perhash->numhashGrpCols; i++)
1665 {
1666 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1667
1668 hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1669 perhash->largestGrpColIdx =
1670 Max(varNumber + 1, perhash->largestGrpColIdx);
1671 }
1672
1673 hashDesc = ExecTypeFromTL(hashTlist);
1674
1675 execTuplesHashPrepare(perhash->numCols,
1676 perhash->aggnode->grpOperators,
1677 &perhash->eqfuncoids,
1678 &perhash->hashfunctions);
1679 perhash->hashslot =
1680 ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1681 &TTSOpsMinimalTuple);
1682
1683 list_free(hashTlist);
1684 bms_free(colnos);
1685 }
1686
1687 bms_free(base_colnos);
1688 }
1689
1690 /*
1691 * Estimate per-hash-table-entry overhead.
1692 */
1693 Size
hash_agg_entry_size(int numTrans,Size tupleWidth,Size transitionSpace)1694 hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
1695 {
1696 Size tupleChunkSize;
1697 Size pergroupChunkSize;
1698 Size transitionChunkSize;
1699 Size tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
1700 tupleWidth);
1701 Size pergroupSize = numTrans * sizeof(AggStatePerGroupData);
1702
1703 tupleChunkSize = CHUNKHDRSZ + tupleSize;
1704
1705 if (pergroupSize > 0)
1706 pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
1707 else
1708 pergroupChunkSize = 0;
1709
1710 if (transitionSpace > 0)
1711 transitionChunkSize = CHUNKHDRSZ + transitionSpace;
1712 else
1713 transitionChunkSize = 0;
1714
1715 return
1716 sizeof(TupleHashEntryData) +
1717 tupleChunkSize +
1718 pergroupChunkSize +
1719 transitionChunkSize;
1720 }
1721
1722 /*
1723 * hashagg_recompile_expressions()
1724 *
1725 * Identifies the right phase, compiles the right expression given the
1726 * arguments, and then sets phase->evalfunc to that expression.
1727 *
1728 * Different versions of the compiled expression are needed depending on
1729 * whether hash aggregation has spilled or not, and whether it's reading from
1730 * the outer plan or a tape. Before spilling to disk, the expression reads
1731 * from the outer plan and does not need to perform a NULL check. After
1732 * HashAgg begins to spill, new groups will not be created in the hash table,
1733 * and the AggStatePerGroup array may be NULL; therefore we need to add a null
1734 * pointer check to the expression. Then, when reading spilled data from a
1735 * tape, we change the outer slot type to be a fixed minimal tuple slot.
1736 *
1737 * It would be wasteful to recompile every time, so cache the compiled
1738 * expressions in the AggStatePerPhase, and reuse when appropriate.
1739 */
1740 static void
hashagg_recompile_expressions(AggState * aggstate,bool minslot,bool nullcheck)1741 hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
1742 {
1743 AggStatePerPhase phase;
1744 int i = minslot ? 1 : 0;
1745 int j = nullcheck ? 1 : 0;
1746
1747 Assert(aggstate->aggstrategy == AGG_HASHED ||
1748 aggstate->aggstrategy == AGG_MIXED);
1749
1750 if (aggstate->aggstrategy == AGG_HASHED)
1751 phase = &aggstate->phases[0];
1752 else /* AGG_MIXED */
1753 phase = &aggstate->phases[1];
1754
1755 if (phase->evaltrans_cache[i][j] == NULL)
1756 {
1757 const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
1758 bool outerfixed = aggstate->ss.ps.outeropsfixed;
1759 bool dohash = true;
1760 bool dosort = false;
1761
1762 /*
1763 * If minslot is true, that means we are processing a spilled batch
1764 * (inside agg_refill_hash_table()), and we must not advance the
1765 * sorted grouping sets.
1766 */
1767 if (aggstate->aggstrategy == AGG_MIXED && !minslot)
1768 dosort = true;
1769
1770 /* temporarily change the outerops while compiling the expression */
1771 if (minslot)
1772 {
1773 aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
1774 aggstate->ss.ps.outeropsfixed = true;
1775 }
1776
1777 phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
1778 dosort, dohash,
1779 nullcheck);
1780
1781 /* change back */
1782 aggstate->ss.ps.outerops = outerops;
1783 aggstate->ss.ps.outeropsfixed = outerfixed;
1784 }
1785
1786 phase->evaltrans = phase->evaltrans_cache[i][j];
1787 }
1788
1789 /*
1790 * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
1791 * number of partitions we expect to create (if we do spill).
1792 *
1793 * There are two limits: a memory limit, and also an ngroups limit. The
1794 * ngroups limit becomes important when we expect transition values to grow
1795 * substantially larger than the initial value.
1796 */
1797 void
hash_agg_set_limits(double hashentrysize,double input_groups,int used_bits,Size * mem_limit,uint64 * ngroups_limit,int * num_partitions)1798 hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
1799 Size *mem_limit, uint64 *ngroups_limit,
1800 int *num_partitions)
1801 {
1802 int npartitions;
1803 Size partition_mem;
1804 Size hash_mem_limit = get_hash_memory_limit();
1805
1806 /* if not expected to spill, use all of hash_mem */
1807 if (input_groups * hashentrysize <= hash_mem_limit)
1808 {
1809 if (num_partitions != NULL)
1810 *num_partitions = 0;
1811 *mem_limit = hash_mem_limit;
1812 *ngroups_limit = hash_mem_limit / hashentrysize;
1813 return;
1814 }
1815
1816 /*
1817 * Calculate expected memory requirements for spilling, which is the size
1818 * of the buffers needed for all the tapes that need to be open at once.
1819 * Then, subtract that from the memory available for holding hash tables.
1820 */
1821 npartitions = hash_choose_num_partitions(input_groups,
1822 hashentrysize,
1823 used_bits,
1824 NULL);
1825 if (num_partitions != NULL)
1826 *num_partitions = npartitions;
1827
1828 partition_mem =
1829 HASHAGG_READ_BUFFER_SIZE +
1830 HASHAGG_WRITE_BUFFER_SIZE * npartitions;
1831
1832 /*
1833 * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
1834 * minimum number of partitions, so we aren't going to dramatically exceed
1835 * work mem anyway.
1836 */
1837 if (hash_mem_limit > 4 * partition_mem)
1838 *mem_limit = hash_mem_limit - partition_mem;
1839 else
1840 *mem_limit = hash_mem_limit * 0.75;
1841
1842 if (*mem_limit > hashentrysize)
1843 *ngroups_limit = *mem_limit / hashentrysize;
1844 else
1845 *ngroups_limit = 1;
1846 }
1847
1848 /*
1849 * hash_agg_check_limits
1850 *
1851 * After adding a new group to the hash table, check whether we need to enter
1852 * spill mode. Allocations may happen without adding new groups (for instance,
1853 * if the transition state size grows), so this check is imperfect.
1854 */
1855 static void
hash_agg_check_limits(AggState * aggstate)1856 hash_agg_check_limits(AggState *aggstate)
1857 {
1858 uint64 ngroups = aggstate->hash_ngroups_current;
1859 Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
1860 true);
1861 Size hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
1862 true);
1863
1864 /*
1865 * Don't spill unless there's at least one group in the hash table so we
1866 * can be sure to make progress even in edge cases.
1867 */
1868 if (aggstate->hash_ngroups_current > 0 &&
1869 (meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
1870 ngroups > aggstate->hash_ngroups_limit))
1871 {
1872 hash_agg_enter_spill_mode(aggstate);
1873 }
1874 }
1875
1876 /*
1877 * Enter "spill mode", meaning that no new groups are added to any of the hash
1878 * tables. Tuples that would create a new group are instead spilled, and
1879 * processed later.
1880 */
1881 static void
hash_agg_enter_spill_mode(AggState * aggstate)1882 hash_agg_enter_spill_mode(AggState *aggstate)
1883 {
1884 aggstate->hash_spill_mode = true;
1885 hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
1886
1887 if (!aggstate->hash_ever_spilled)
1888 {
1889 Assert(aggstate->hash_tapeinfo == NULL);
1890 Assert(aggstate->hash_spills == NULL);
1891
1892 aggstate->hash_ever_spilled = true;
1893
1894 hashagg_tapeinfo_init(aggstate);
1895
1896 aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
1897
1898 for (int setno = 0; setno < aggstate->num_hashes; setno++)
1899 {
1900 AggStatePerHash perhash = &aggstate->perhash[setno];
1901 HashAggSpill *spill = &aggstate->hash_spills[setno];
1902
1903 hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
1904 perhash->aggnode->numGroups,
1905 aggstate->hashentrysize);
1906 }
1907 }
1908 }
1909
1910 /*
1911 * Update metrics after filling the hash table.
1912 *
1913 * If reading from the outer plan, from_tape should be false; if reading from
1914 * another tape, from_tape should be true.
1915 */
1916 static void
hash_agg_update_metrics(AggState * aggstate,bool from_tape,int npartitions)1917 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
1918 {
1919 Size meta_mem;
1920 Size hashkey_mem;
1921 Size buffer_mem;
1922 Size total_mem;
1923
1924 if (aggstate->aggstrategy != AGG_MIXED &&
1925 aggstate->aggstrategy != AGG_HASHED)
1926 return;
1927
1928 /* memory for the hash table itself */
1929 meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
1930
1931 /* memory for the group keys and transition states */
1932 hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);
1933
1934 /* memory for read/write tape buffers, if spilled */
1935 buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
1936 if (from_tape)
1937 buffer_mem += HASHAGG_READ_BUFFER_SIZE;
1938
1939 /* update peak mem */
1940 total_mem = meta_mem + hashkey_mem + buffer_mem;
1941 if (total_mem > aggstate->hash_mem_peak)
1942 aggstate->hash_mem_peak = total_mem;
1943
1944 /* update disk usage */
1945 if (aggstate->hash_tapeinfo != NULL)
1946 {
1947 uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
1948
1949 if (aggstate->hash_disk_used < disk_used)
1950 aggstate->hash_disk_used = disk_used;
1951 }
1952
1953 /* update hashentrysize estimate based on contents */
1954 if (aggstate->hash_ngroups_current > 0)
1955 {
1956 aggstate->hashentrysize =
1957 sizeof(TupleHashEntryData) +
1958 (hashkey_mem / (double) aggstate->hash_ngroups_current);
1959 }
1960 }
1961
1962 /*
1963 * Choose a reasonable number of buckets for the initial hash table size.
1964 */
1965 static long
hash_choose_num_buckets(double hashentrysize,long ngroups,Size memory)1966 hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
1967 {
1968 long max_nbuckets;
1969 long nbuckets = ngroups;
1970
1971 max_nbuckets = memory / hashentrysize;
1972
1973 /*
1974 * Underestimating is better than overestimating. Too many buckets crowd
1975 * out space for group keys and transition state values.
1976 */
1977 max_nbuckets >>= 1;
1978
1979 if (nbuckets > max_nbuckets)
1980 nbuckets = max_nbuckets;
1981
1982 return Max(nbuckets, 1);
1983 }
1984
1985 /*
1986 * Determine the number of partitions to create when spilling, which will
1987 * always be a power of two. If log2_npartitions is non-NULL, set
1988 * *log2_npartitions to the log2() of the number of partitions.
1989 */
1990 static int
hash_choose_num_partitions(double input_groups,double hashentrysize,int used_bits,int * log2_npartitions)1991 hash_choose_num_partitions(double input_groups, double hashentrysize,
1992 int used_bits, int *log2_npartitions)
1993 {
1994 Size hash_mem_limit = get_hash_memory_limit();
1995 double partition_limit;
1996 double mem_wanted;
1997 double dpartitions;
1998 int npartitions;
1999 int partition_bits;
2000
2001 /*
2002 * Avoid creating so many partitions that the memory requirements of the
2003 * open partition files are greater than 1/4 of hash_mem.
2004 */
2005 partition_limit =
2006 (hash_mem_limit * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
2007 HASHAGG_WRITE_BUFFER_SIZE;
2008
2009 mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
2010
2011 /* make enough partitions so that each one is likely to fit in memory */
2012 dpartitions = 1 + (mem_wanted / hash_mem_limit);
2013
2014 if (dpartitions > partition_limit)
2015 dpartitions = partition_limit;
2016
2017 if (dpartitions < HASHAGG_MIN_PARTITIONS)
2018 dpartitions = HASHAGG_MIN_PARTITIONS;
2019 if (dpartitions > HASHAGG_MAX_PARTITIONS)
2020 dpartitions = HASHAGG_MAX_PARTITIONS;
2021
2022 /* HASHAGG_MAX_PARTITIONS limit makes this safe */
2023 npartitions = (int) dpartitions;
2024
2025 /* ceil(log2(npartitions)) */
2026 partition_bits = my_log2(npartitions);
2027
2028 /* make sure that we don't exhaust the hash bits */
2029 if (partition_bits + used_bits >= 32)
2030 partition_bits = 32 - used_bits;
2031
2032 if (log2_npartitions != NULL)
2033 *log2_npartitions = partition_bits;
2034
2035 /* number of partitions will be a power of two */
2036 npartitions = 1 << partition_bits;
2037
2038 return npartitions;
2039 }
2040
2041 /*
2042 * Initialize a freshly-created TupleHashEntry.
2043 */
2044 static void
initialize_hash_entry(AggState * aggstate,TupleHashTable hashtable,TupleHashEntry entry)2045 initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable,
2046 TupleHashEntry entry)
2047 {
2048 AggStatePerGroup pergroup;
2049 int transno;
2050
2051 aggstate->hash_ngroups_current++;
2052 hash_agg_check_limits(aggstate);
2053
2054 /* no need to allocate or initialize per-group state */
2055 if (aggstate->numtrans == 0)
2056 return;
2057
2058 pergroup = (AggStatePerGroup)
2059 MemoryContextAlloc(hashtable->tablecxt,
2060 sizeof(AggStatePerGroupData) * aggstate->numtrans);
2061
2062 entry->additional = pergroup;
2063
2064 /*
2065 * Initialize aggregates for new tuple group, lookup_hash_entries()
2066 * already has selected the relevant grouping set.
2067 */
2068 for (transno = 0; transno < aggstate->numtrans; transno++)
2069 {
2070 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
2071 AggStatePerGroup pergroupstate = &pergroup[transno];
2072
2073 initialize_aggregate(aggstate, pertrans, pergroupstate);
2074 }
2075 }
2076
2077 /*
2078 * Look up hash entries for the current tuple in all hashed grouping sets.
2079 *
2080 * Be aware that lookup_hash_entry can reset the tmpcontext.
2081 *
2082 * Some entries may be left NULL if we are in "spill mode". The same tuple
2083 * will belong to different groups for each grouping set, so may match a group
2084 * already in memory for one set and match a group not in memory for another
2085 * set. When in "spill mode", the tuple will be spilled for each grouping set
2086 * where it doesn't match a group in memory.
2087 *
2088 * NB: It's possible to spill the same tuple for several different grouping
2089 * sets. This may seem wasteful, but it's actually a trade-off: if we spill
2090 * the tuple multiple times for multiple grouping sets, it can be partitioned
2091 * for each grouping set, making the refilling of the hash table very
2092 * efficient.
2093 */
2094 static void
lookup_hash_entries(AggState * aggstate)2095 lookup_hash_entries(AggState *aggstate)
2096 {
2097 AggStatePerGroup *pergroup = aggstate->hash_pergroup;
2098 TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
2099 int setno;
2100
2101 for (setno = 0; setno < aggstate->num_hashes; setno++)
2102 {
2103 AggStatePerHash perhash = &aggstate->perhash[setno];
2104 TupleHashTable hashtable = perhash->hashtable;
2105 TupleTableSlot *hashslot = perhash->hashslot;
2106 TupleHashEntry entry;
2107 uint32 hash;
2108 bool isnew = false;
2109 bool *p_isnew;
2110
2111 /* if hash table already spilled, don't create new entries */
2112 p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
2113
2114 select_current_set(aggstate, setno, true);
2115 prepare_hash_slot(perhash,
2116 outerslot,
2117 hashslot);
2118
2119 entry = LookupTupleHashEntry(hashtable, hashslot,
2120 p_isnew, &hash);
2121
2122 if (entry != NULL)
2123 {
2124 if (isnew)
2125 initialize_hash_entry(aggstate, hashtable, entry);
2126 pergroup[setno] = entry->additional;
2127 }
2128 else
2129 {
2130 HashAggSpill *spill = &aggstate->hash_spills[setno];
2131 TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
2132
2133 if (spill->partitions == NULL)
2134 hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
2135 perhash->aggnode->numGroups,
2136 aggstate->hashentrysize);
2137
2138 hashagg_spill_tuple(aggstate, spill, slot, hash);
2139 pergroup[setno] = NULL;
2140 }
2141 }
2142 }
2143
2144 /*
2145 * ExecAgg -
2146 *
2147 * ExecAgg receives tuples from its outer subplan and aggregates over
2148 * the appropriate attribute for each aggregate function use (Aggref
2149 * node) appearing in the targetlist or qual of the node. The number
2150 * of tuples to aggregate over depends on whether grouped or plain
2151 * aggregation is selected. In grouped aggregation, we produce a result
2152 * row for each group; in plain aggregation there's a single result row
2153 * for the whole query. In either case, the value of each aggregate is
2154 * stored in the expression context to be used when ExecProject evaluates
2155 * the result tuple.
2156 */
2157 static TupleTableSlot *
ExecAgg(PlanState * pstate)2158 ExecAgg(PlanState *pstate)
2159 {
2160 AggState *node = castNode(AggState, pstate);
2161 TupleTableSlot *result = NULL;
2162
2163 CHECK_FOR_INTERRUPTS();
2164
2165 if (!node->agg_done)
2166 {
2167 /* Dispatch based on strategy */
2168 switch (node->phase->aggstrategy)
2169 {
2170 case AGG_HASHED:
2171 if (!node->table_filled)
2172 agg_fill_hash_table(node);
2173 /* FALLTHROUGH */
2174 case AGG_MIXED:
2175 result = agg_retrieve_hash_table(node);
2176 break;
2177 case AGG_PLAIN:
2178 case AGG_SORTED:
2179 result = agg_retrieve_direct(node);
2180 break;
2181 }
2182
2183 if (!TupIsNull(result))
2184 return result;
2185 }
2186
2187 return NULL;
2188 }
2189
2190 /*
2191 * ExecAgg for non-hashed case
2192 */
2193 static TupleTableSlot *
agg_retrieve_direct(AggState * aggstate)2194 agg_retrieve_direct(AggState *aggstate)
2195 {
2196 Agg *node = aggstate->phase->aggnode;
2197 ExprContext *econtext;
2198 ExprContext *tmpcontext;
2199 AggStatePerAgg peragg;
2200 AggStatePerGroup *pergroups;
2201 TupleTableSlot *outerslot;
2202 TupleTableSlot *firstSlot;
2203 TupleTableSlot *result;
2204 bool hasGroupingSets = aggstate->phase->numsets > 0;
2205 int numGroupingSets = Max(aggstate->phase->numsets, 1);
2206 int currentSet;
2207 int nextSetSize;
2208 int numReset;
2209 int i;
2210
2211 /*
2212 * get state info from node
2213 *
2214 * econtext is the per-output-tuple expression context
2215 *
2216 * tmpcontext is the per-input-tuple expression context
2217 */
2218 econtext = aggstate->ss.ps.ps_ExprContext;
2219 tmpcontext = aggstate->tmpcontext;
2220
2221 peragg = aggstate->peragg;
2222 pergroups = aggstate->pergroups;
2223 firstSlot = aggstate->ss.ss_ScanTupleSlot;
2224
2225 /*
2226 * We loop retrieving groups until we find one matching
2227 * aggstate->ss.ps.qual
2228 *
2229 * For grouping sets, we have the invariant that aggstate->projected_set
2230 * is either -1 (initial call) or the index (starting from 0) in
2231 * gset_lengths for the group we just completed (either by projecting a
2232 * row or by discarding it in the qual).
2233 */
2234 while (!aggstate->agg_done)
2235 {
2236 /*
2237 * Clear the per-output-tuple context for each group, as well as
2238 * aggcontext (which contains any pass-by-ref transvalues of the old
2239 * group). Some aggregate functions store working state in child
2240 * contexts; those now get reset automatically without us needing to
2241 * do anything special.
2242 *
2243 * We use ReScanExprContext not just ResetExprContext because we want
2244 * any registered shutdown callbacks to be called. That allows
2245 * aggregate functions to ensure they've cleaned up any non-memory
2246 * resources.
2247 */
2248 ReScanExprContext(econtext);
2249
2250 /*
2251 * Determine how many grouping sets need to be reset at this boundary.
2252 */
2253 if (aggstate->projected_set >= 0 &&
2254 aggstate->projected_set < numGroupingSets)
2255 numReset = aggstate->projected_set + 1;
2256 else
2257 numReset = numGroupingSets;
2258
2259 /*
2260 * numReset can change on a phase boundary, but that's OK; we want to
2261 * reset the contexts used in _this_ phase, and later, after possibly
2262 * changing phase, initialize the right number of aggregates for the
2263 * _new_ phase.
2264 */
2265
2266 for (i = 0; i < numReset; i++)
2267 {
2268 ReScanExprContext(aggstate->aggcontexts[i]);
2269 }
2270
2271 /*
2272 * Check if input is complete and there are no more groups to project
2273 * in this phase; move to next phase or mark as done.
2274 */
2275 if (aggstate->input_done == true &&
2276 aggstate->projected_set >= (numGroupingSets - 1))
2277 {
2278 if (aggstate->current_phase < aggstate->numphases - 1)
2279 {
2280 initialize_phase(aggstate, aggstate->current_phase + 1);
2281 aggstate->input_done = false;
2282 aggstate->projected_set = -1;
2283 numGroupingSets = Max(aggstate->phase->numsets, 1);
2284 node = aggstate->phase->aggnode;
2285 numReset = numGroupingSets;
2286 }
2287 else if (aggstate->aggstrategy == AGG_MIXED)
2288 {
2289 /*
2290 * Mixed mode; we've output all the grouped stuff and have
2291 * full hashtables, so switch to outputting those.
2292 */
2293 initialize_phase(aggstate, 0);
2294 aggstate->table_filled = true;
2295 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2296 &aggstate->perhash[0].hashiter);
2297 select_current_set(aggstate, 0, true);
2298 return agg_retrieve_hash_table(aggstate);
2299 }
2300 else
2301 {
2302 aggstate->agg_done = true;
2303 break;
2304 }
2305 }
2306
2307 /*
2308 * Get the number of columns in the next grouping set after the last
2309 * projected one (if any). This is the number of columns to compare to
2310 * see if we reached the boundary of that set too.
2311 */
2312 if (aggstate->projected_set >= 0 &&
2313 aggstate->projected_set < (numGroupingSets - 1))
2314 nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
2315 else
2316 nextSetSize = 0;
2317
2318 /*----------
2319 * If a subgroup for the current grouping set is present, project it.
2320 *
2321 * We have a new group if:
2322 * - we're out of input but haven't projected all grouping sets
2323 * (checked above)
2324 * OR
2325 * - we already projected a row that wasn't from the last grouping
2326 * set
2327 * AND
2328 * - the next grouping set has at least one grouping column (since
2329 * empty grouping sets project only once input is exhausted)
2330 * AND
2331 * - the previous and pending rows differ on the grouping columns
2332 * of the next grouping set
2333 *----------
2334 */
2335 tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
2336 if (aggstate->input_done ||
2337 (node->aggstrategy != AGG_PLAIN &&
2338 aggstate->projected_set != -1 &&
2339 aggstate->projected_set < (numGroupingSets - 1) &&
2340 nextSetSize > 0 &&
2341 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
2342 tmpcontext)))
2343 {
2344 aggstate->projected_set += 1;
2345
2346 Assert(aggstate->projected_set < numGroupingSets);
2347 Assert(nextSetSize > 0 || aggstate->input_done);
2348 }
2349 else
2350 {
2351 /*
2352 * We no longer care what group we just projected, the next
2353 * projection will always be the first (or only) grouping set
2354 * (unless the input proves to be empty).
2355 */
2356 aggstate->projected_set = 0;
2357
2358 /*
2359 * If we don't already have the first tuple of the new group,
2360 * fetch it from the outer plan.
2361 */
2362 if (aggstate->grp_firstTuple == NULL)
2363 {
2364 outerslot = fetch_input_tuple(aggstate);
2365 if (!TupIsNull(outerslot))
2366 {
2367 /*
2368 * Make a copy of the first input tuple; we will use this
2369 * for comparisons (in group mode) and for projection.
2370 */
2371 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2372 }
2373 else
2374 {
2375 /* outer plan produced no tuples at all */
2376 if (hasGroupingSets)
2377 {
2378 /*
2379 * If there was no input at all, we need to project
2380 * rows only if there are grouping sets of size 0.
2381 * Note that this implies that there can't be any
2382 * references to ungrouped Vars, which would otherwise
2383 * cause issues with the empty output slot.
2384 *
2385 * XXX: This is no longer true, we currently deal with
2386 * this in finalize_aggregates().
2387 */
2388 aggstate->input_done = true;
2389
2390 while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
2391 {
2392 aggstate->projected_set += 1;
2393 if (aggstate->projected_set >= numGroupingSets)
2394 {
2395 /*
2396 * We can't set agg_done here because we might
2397 * have more phases to do, even though the
2398 * input is empty. So we need to restart the
2399 * whole outer loop.
2400 */
2401 break;
2402 }
2403 }
2404
2405 if (aggstate->projected_set >= numGroupingSets)
2406 continue;
2407 }
2408 else
2409 {
2410 aggstate->agg_done = true;
2411 /* If we are grouping, we should produce no tuples too */
2412 if (node->aggstrategy != AGG_PLAIN)
2413 return NULL;
2414 }
2415 }
2416 }
2417
2418 /*
2419 * Initialize working state for a new input tuple group.
2420 */
2421 initialize_aggregates(aggstate, pergroups, numReset);
2422
2423 if (aggstate->grp_firstTuple != NULL)
2424 {
2425 /*
2426 * Store the copied first input tuple in the tuple table slot
2427 * reserved for it. The tuple will be deleted when it is
2428 * cleared from the slot.
2429 */
2430 ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
2431 firstSlot, true);
2432 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
2433
2434 /* set up for first advance_aggregates call */
2435 tmpcontext->ecxt_outertuple = firstSlot;
2436
2437 /*
2438 * Process each outer-plan tuple, and then fetch the next one,
2439 * until we exhaust the outer plan or cross a group boundary.
2440 */
2441 for (;;)
2442 {
2443 /*
2444 * During phase 1 only of a mixed agg, we need to update
2445 * hashtables as well in advance_aggregates.
2446 */
2447 if (aggstate->aggstrategy == AGG_MIXED &&
2448 aggstate->current_phase == 1)
2449 {
2450 lookup_hash_entries(aggstate);
2451 }
2452
2453 /* Advance the aggregates (or combine functions) */
2454 advance_aggregates(aggstate);
2455
2456 /* Reset per-input-tuple context after each tuple */
2457 ResetExprContext(tmpcontext);
2458
2459 outerslot = fetch_input_tuple(aggstate);
2460 if (TupIsNull(outerslot))
2461 {
2462 /* no more outer-plan tuples available */
2463
2464 /* if we built hash tables, finalize any spills */
2465 if (aggstate->aggstrategy == AGG_MIXED &&
2466 aggstate->current_phase == 1)
2467 hashagg_finish_initial_spills(aggstate);
2468
2469 if (hasGroupingSets)
2470 {
2471 aggstate->input_done = true;
2472 break;
2473 }
2474 else
2475 {
2476 aggstate->agg_done = true;
2477 break;
2478 }
2479 }
2480 /* set up for next advance_aggregates call */
2481 tmpcontext->ecxt_outertuple = outerslot;
2482
2483 /*
2484 * If we are grouping, check whether we've crossed a group
2485 * boundary.
2486 */
2487 if (node->aggstrategy != AGG_PLAIN)
2488 {
2489 tmpcontext->ecxt_innertuple = firstSlot;
2490 if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
2491 tmpcontext))
2492 {
2493 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
2494 break;
2495 }
2496 }
2497 }
2498 }
2499
2500 /*
2501 * Use the representative input tuple for any references to
2502 * non-aggregated input columns in aggregate direct args, the node
2503 * qual, and the tlist. (If we are not grouping, and there are no
2504 * input rows at all, we will come here with an empty firstSlot
2505 * ... but if not grouping, there can't be any references to
2506 * non-aggregated input columns, so no problem.)
2507 */
2508 econtext->ecxt_outertuple = firstSlot;
2509 }
2510
2511 Assert(aggstate->projected_set >= 0);
2512
2513 currentSet = aggstate->projected_set;
2514
2515 prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
2516
2517 select_current_set(aggstate, currentSet, false);
2518
2519 finalize_aggregates(aggstate,
2520 peragg,
2521 pergroups[currentSet]);
2522
2523 /*
2524 * If there's no row to project right now, we must continue rather
2525 * than returning a null since there might be more groups.
2526 */
2527 result = project_aggregates(aggstate);
2528 if (result)
2529 return result;
2530 }
2531
2532 /* No more groups */
2533 return NULL;
2534 }
2535
2536 /*
2537 * ExecAgg for hashed case: read input and build hash table
2538 */
2539 static void
agg_fill_hash_table(AggState * aggstate)2540 agg_fill_hash_table(AggState *aggstate)
2541 {
2542 TupleTableSlot *outerslot;
2543 ExprContext *tmpcontext = aggstate->tmpcontext;
2544
2545 /*
2546 * Process each outer-plan tuple, and then fetch the next one, until we
2547 * exhaust the outer plan.
2548 */
2549 for (;;)
2550 {
2551 outerslot = fetch_input_tuple(aggstate);
2552 if (TupIsNull(outerslot))
2553 break;
2554
2555 /* set up for lookup_hash_entries and advance_aggregates */
2556 tmpcontext->ecxt_outertuple = outerslot;
2557
2558 /* Find or build hashtable entries */
2559 lookup_hash_entries(aggstate);
2560
2561 /* Advance the aggregates (or combine functions) */
2562 advance_aggregates(aggstate);
2563
2564 /*
2565 * Reset per-input-tuple context after each tuple, but note that the
2566 * hash lookups do this too
2567 */
2568 ResetExprContext(aggstate->tmpcontext);
2569 }
2570
2571 /* finalize spills, if any */
2572 hashagg_finish_initial_spills(aggstate);
2573
2574 aggstate->table_filled = true;
2575 /* Initialize to walk the first hash table */
2576 select_current_set(aggstate, 0, true);
2577 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
2578 &aggstate->perhash[0].hashiter);
2579 }
2580
2581 /*
2582 * If any data was spilled during hash aggregation, reset the hash table and
2583 * reprocess one batch of spilled data. After reprocessing a batch, the hash
2584 * table will again contain data, ready to be consumed by
2585 * agg_retrieve_hash_table_in_memory().
2586 *
2587 * Should only be called after all in memory hash table entries have been
2588 * finalized and emitted.
2589 *
2590 * Return false when input is exhausted and there's no more work to be done;
2591 * otherwise return true.
2592 */
static bool
agg_refill_hash_table(AggState *aggstate)
{
	HashAggBatch *batch;
	AggStatePerHash perhash;
	HashAggSpill spill;			/* set up lazily; valid only once
								 * spill_initialized is true */
	HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
	bool		spill_initialized = false;

	/* Nothing left to reload: every spilled batch has been processed. */
	if (aggstate->hash_batches == NIL)
		return false;

	/* hash_batches is a stack, with the top item at the end of the list */
	batch = llast(aggstate->hash_batches);
	aggstate->hash_batches = list_delete_last(aggstate->hash_batches);

	/*
	 * Size the memory and group-count limits for this batch from its own
	 * cardinality estimate (input_card) and the number of hash bits already
	 * consumed by earlier partitioning passes (used_bits).
	 */
	hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
						batch->used_bits, &aggstate->hash_mem_limit,
						&aggstate->hash_ngroups_limit, NULL);

	/*
	 * Each batch only processes one grouping set; set the rest to NULL so
	 * that advance_aggregates() knows to ignore them. We don't touch
	 * pergroups for sorted grouping sets here, because they will be needed if
	 * we rescan later. The expressions for sorted grouping sets will not be
	 * evaluated after we recompile anyway.
	 */
	MemSet(aggstate->hash_pergroup, 0,
		   sizeof(AggStatePerGroup) * aggstate->num_hashes);

	/* free memory and reset hash tables */
	ReScanExprContext(aggstate->hashcontext);
	for (int setno = 0; setno < aggstate->num_hashes; setno++)
		ResetTupleHashTable(aggstate->perhash[setno].hashtable);

	aggstate->hash_ngroups_current = 0;

	/*
	 * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
	 * happens in phase 0. So, we switch to phase 1 when processing a batch,
	 * and back to phase 0 after the batch is done.
	 */
	Assert(aggstate->current_phase == 0);
	if (aggstate->phase->aggstrategy == AGG_MIXED)
	{
		aggstate->current_phase = 1;
		aggstate->phase = &aggstate->phases[aggstate->current_phase];
	}

	select_current_set(aggstate, batch->setno, true);

	perhash = &aggstate->perhash[aggstate->current_set];

	/*
	 * Spilled tuples are always read back as MinimalTuples, which may be
	 * different from the outer plan, so recompile the aggregate expressions.
	 *
	 * We still need the NULL check, because we are only processing one
	 * grouping set at a time and the rest will be NULL.
	 */
	hashagg_recompile_expressions(aggstate, true, true);

	/* Read every spilled tuple of this batch back from its tape. */
	for (;;)
	{
		TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
		TupleTableSlot *hashslot = perhash->hashslot;
		TupleHashEntry entry;
		MinimalTuple tuple;
		uint32		hash;
		bool		isnew = false;

		/*
		 * Once in spill mode, pass NULL for isnew. Presumably (per
		 * LookupTupleHashEntryHash's contract) this means no new group can be
		 * created, so lookups of unseen groups come back NULL and are spilled
		 * below.
		 */
		bool	   *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;

		CHECK_FOR_INTERRUPTS();

		/* the hash computed before spilling is stored with each tuple */
		tuple = hashagg_batch_read(batch, &hash);
		if (tuple == NULL)
			break;

		/* shouldFree = true: the slot owns the palloc'd tuple from now on */
		ExecStoreMinimalTuple(tuple, spillslot, true);
		aggstate->tmpcontext->ecxt_outertuple = spillslot;

		prepare_hash_slot(perhash,
						  aggstate->tmpcontext->ecxt_outertuple,
						  hashslot);
		/* reuse the stored hash rather than recomputing it */
		entry = LookupTupleHashEntryHash(
										 perhash->hashtable, hashslot, p_isnew, hash);

		if (entry != NULL)
		{
			/* group is (now) in memory: advance its transition states */
			if (isnew)
				initialize_hash_entry(aggstate, perhash->hashtable, entry);
			aggstate->hash_pergroup[batch->setno] = entry->additional;
			advance_aggregates(aggstate);
		}
		else
		{
			if (!spill_initialized)
			{
				/*
				 * Avoid initializing the spill until we actually need it so
				 * that we don't assign tapes that will never be used.
				 */
				spill_initialized = true;
				hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
								   batch->input_card, aggstate->hashentrysize);
			}
			/* no memory for a new group, spill */
			hashagg_spill_tuple(aggstate, &spill, spillslot, hash);

			aggstate->hash_pergroup[batch->setno] = NULL;
		}

		/*
		 * Reset per-input-tuple context after each tuple, but note that the
		 * hash lookups do this too
		 */
		ResetExprContext(aggstate->tmpcontext);
	}

	/* the batch's input tape is fully consumed; put it on the free list */
	hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);

	/* change back to phase 0 */
	aggstate->current_phase = 0;
	aggstate->phase = &aggstate->phases[aggstate->current_phase];

	/* turn any re-spilled partitions into new batches, then record metrics */
	if (spill_initialized)
	{
		hashagg_spill_finish(aggstate, &spill, batch->setno);
		hash_agg_update_metrics(aggstate, true, spill.npartitions);
	}
	else
		hash_agg_update_metrics(aggstate, true, 0);

	aggstate->hash_spill_mode = false;

	/* prepare to walk the hash table just filled for this batch's set */
	select_current_set(aggstate, batch->setno, true);
	ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
						   &aggstate->perhash[batch->setno].hashiter);

	pfree(batch);

	return true;
}
2737
2738 /*
2739 * ExecAgg for hashed case: retrieving groups from hash table
2740 *
2741 * After exhausting in-memory tuples, also try refilling the hash table using
2742 * previously-spilled tuples. Only returns NULL after all in-memory and
2743 * spilled tuples are exhausted.
2744 */
2745 static TupleTableSlot *
agg_retrieve_hash_table(AggState * aggstate)2746 agg_retrieve_hash_table(AggState *aggstate)
2747 {
2748 TupleTableSlot *result = NULL;
2749
2750 while (result == NULL)
2751 {
2752 result = agg_retrieve_hash_table_in_memory(aggstate);
2753 if (result == NULL)
2754 {
2755 if (!agg_refill_hash_table(aggstate))
2756 {
2757 aggstate->agg_done = true;
2758 break;
2759 }
2760 }
2761 }
2762
2763 return result;
2764 }
2765
/*
 * Retrieve the groups from the in-memory hash tables without considering any
 * spilled tuples.
 *
 * Scans the current grouping set's hash table with its saved iterator,
 * advancing to the next hashed grouping set when one is exhausted.  For each
 * entry, the grouping columns are copied into the scan slot to form a
 * representative input tuple, the aggregates are finalized, and the result
 * is projected.  The first non-NULL result from project_aggregates() is
 * returned.  Returns NULL once every remaining hashed grouping set's table
 * has been fully scanned.
 */
static TupleTableSlot *
agg_retrieve_hash_table_in_memory(AggState *aggstate)
{
	ExprContext *econtext;
	AggStatePerAgg peragg;
	AggStatePerGroup pergroup;
	TupleHashEntryData *entry;
	TupleTableSlot *firstSlot;
	TupleTableSlot *result;
	AggStatePerHash perhash;

	/*
	 * get state info from node.
	 *
	 * econtext is the per-output-tuple expression context.
	 */
	econtext = aggstate->ss.ps.ps_ExprContext;
	peragg = aggstate->peragg;
	firstSlot = aggstate->ss.ss_ScanTupleSlot;

	/*
	 * Note that perhash (and therefore anything accessed through it) can
	 * change inside the loop, as we change between grouping sets.
	 */
	perhash = &aggstate->perhash[aggstate->current_set];

	/*
	 * We loop retrieving groups until we find one satisfying
	 * aggstate->ss.ps.qual
	 */
	for (;;)
	{
		TupleTableSlot *hashslot = perhash->hashslot;
		int			i;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Find the next entry in the hash table
		 */
		entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
		if (entry == NULL)
		{
			int			nextset = aggstate->current_set + 1;

			if (nextset < aggstate->num_hashes)
			{
				/*
				 * Switch to next grouping set, reinitialize, and restart the
				 * loop.
				 */
				select_current_set(aggstate, nextset, true);

				perhash = &aggstate->perhash[aggstate->current_set];

				ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);

				continue;
			}
			else
			{
				/* last hashed grouping set exhausted */
				return NULL;
			}
		}

		/*
		 * Clear the per-output-tuple context for each group
		 *
		 * We intentionally don't use ReScanExprContext here; if any aggs have
		 * registered shutdown callbacks, they mustn't be called yet, since we
		 * might not be done with that agg.
		 */
		ResetExprContext(econtext);

		/*
		 * Transform representative tuple back into one with the right
		 * columns.
		 */
		ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
		slot_getallattrs(hashslot);

		/* start from an all-NULL scan tuple ... */
		ExecClearTuple(firstSlot);
		memset(firstSlot->tts_isnull, true,
			   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));

		/*
		 * ... then fill in the grouping columns.  hashslot holds only the
		 * numhashGrpCols grouping columns; hashGrpColIdxInput maps each one
		 * to its 1-based attribute number in the input tuple (hence the - 1).
		 */
		for (i = 0; i < perhash->numhashGrpCols; i++)
		{
			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;

			firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
			firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
		}
		ExecStoreVirtualTuple(firstSlot);

		/* the per-group transition states live in the entry's additional data */
		pergroup = (AggStatePerGroup) entry->additional;

		/*
		 * Use the representative input tuple for any references to
		 * non-aggregated input columns in the qual and tlist.
		 */
		econtext->ecxt_outertuple = firstSlot;

		prepare_projection_slot(aggstate,
								econtext->ecxt_outertuple,
								aggstate->current_set);

		finalize_aggregates(aggstate, peragg, pergroup);

		/* a NULL result means this group was rejected; try the next one */
		result = project_aggregates(aggstate);
		if (result)
			return result;
	}

	/*
	 * No more groups.  Not actually reached: the loop above only exits via
	 * return.
	 */
	return NULL;
}
2886
2887 /*
2888 * Initialize HashTapeInfo
2889 */
2890 static void
hashagg_tapeinfo_init(AggState * aggstate)2891 hashagg_tapeinfo_init(AggState *aggstate)
2892 {
2893 HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
2894 int init_tapes = 16; /* expanded dynamically */
2895
2896 tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
2897 tapeinfo->ntapes = init_tapes;
2898 tapeinfo->nfreetapes = init_tapes;
2899 tapeinfo->freetapes_alloc = init_tapes;
2900 tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
2901 for (int i = 0; i < init_tapes; i++)
2902 tapeinfo->freetapes[i] = i;
2903
2904 aggstate->hash_tapeinfo = tapeinfo;
2905 }
2906
2907 /*
2908 * Assign unused tapes to spill partitions, extending the tape set if
2909 * necessary.
2910 */
2911 static void
hashagg_tapeinfo_assign(HashTapeInfo * tapeinfo,int * partitions,int npartitions)2912 hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
2913 int npartitions)
2914 {
2915 int partidx = 0;
2916
2917 /* use free tapes if available */
2918 while (partidx < npartitions && tapeinfo->nfreetapes > 0)
2919 partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
2920
2921 if (partidx < npartitions)
2922 {
2923 LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
2924
2925 while (partidx < npartitions)
2926 partitions[partidx++] = tapeinfo->ntapes++;
2927 }
2928 }
2929
2930 /*
2931 * After a tape has already been written to and then read, this function
2932 * rewinds it for writing and adds it to the free list.
2933 */
2934 static void
hashagg_tapeinfo_release(HashTapeInfo * tapeinfo,int tapenum)2935 hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
2936 {
2937 /* rewinding frees the buffer while not in use */
2938 LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
2939 if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
2940 {
2941 tapeinfo->freetapes_alloc <<= 1;
2942 tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
2943 tapeinfo->freetapes_alloc * sizeof(int));
2944 }
2945 tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
2946 }
2947
2948 /*
2949 * hashagg_spill_init
2950 *
2951 * Called after we determined that spilling is necessary. Chooses the number
2952 * of partitions to create, and initializes them.
2953 */
2954 static void
hashagg_spill_init(HashAggSpill * spill,HashTapeInfo * tapeinfo,int used_bits,double input_groups,double hashentrysize)2955 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
2956 double input_groups, double hashentrysize)
2957 {
2958 int npartitions;
2959 int partition_bits;
2960
2961 npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
2962 used_bits, &partition_bits);
2963
2964 spill->partitions = palloc0(sizeof(int) * npartitions);
2965 spill->ntuples = palloc0(sizeof(int64) * npartitions);
2966 spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
2967
2968 hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
2969
2970 spill->tapeset = tapeinfo->tapeset;
2971 spill->shift = 32 - used_bits - partition_bits;
2972 spill->mask = (npartitions - 1) << spill->shift;
2973 spill->npartitions = npartitions;
2974
2975 for (int i = 0; i < npartitions; i++)
2976 initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
2977 }
2978
2979 /*
2980 * hashagg_spill_tuple
2981 *
2982 * No room for new groups in the hash table. Save for later in the appropriate
2983 * partition.
2984 */
2985 static Size
hashagg_spill_tuple(AggState * aggstate,HashAggSpill * spill,TupleTableSlot * inputslot,uint32 hash)2986 hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
2987 TupleTableSlot *inputslot, uint32 hash)
2988 {
2989 LogicalTapeSet *tapeset = spill->tapeset;
2990 TupleTableSlot *spillslot;
2991 int partition;
2992 MinimalTuple tuple;
2993 int tapenum;
2994 int total_written = 0;
2995 bool shouldFree;
2996
2997 Assert(spill->partitions != NULL);
2998
2999 /* spill only attributes that we actually need */
3000 if (!aggstate->all_cols_needed)
3001 {
3002 spillslot = aggstate->hash_spill_wslot;
3003 slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
3004 ExecClearTuple(spillslot);
3005 for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
3006 {
3007 if (bms_is_member(i + 1, aggstate->colnos_needed))
3008 {
3009 spillslot->tts_values[i] = inputslot->tts_values[i];
3010 spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
3011 }
3012 else
3013 spillslot->tts_isnull[i] = true;
3014 }
3015 ExecStoreVirtualTuple(spillslot);
3016 }
3017 else
3018 spillslot = inputslot;
3019
3020 tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
3021
3022 partition = (hash & spill->mask) >> spill->shift;
3023 spill->ntuples[partition]++;
3024
3025 /*
3026 * All hash values destined for a given partition have some bits in
3027 * common, which causes bad HLL cardinality estimates. Hash the hash to
3028 * get a more uniform distribution.
3029 */
3030 addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
3031
3032 tapenum = spill->partitions[partition];
3033
3034 LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
3035 total_written += sizeof(uint32);
3036
3037 LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
3038 total_written += tuple->t_len;
3039
3040 if (shouldFree)
3041 pfree(tuple);
3042
3043 return total_written;
3044 }
3045
3046 /*
3047 * hashagg_batch_new
3048 *
3049 * Construct a HashAggBatch item, which represents one iteration of HashAgg to
3050 * be done.
3051 */
3052 static HashAggBatch *
hashagg_batch_new(LogicalTapeSet * tapeset,int tapenum,int setno,int64 input_tuples,double input_card,int used_bits)3053 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
3054 int64 input_tuples, double input_card, int used_bits)
3055 {
3056 HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
3057
3058 batch->setno = setno;
3059 batch->used_bits = used_bits;
3060 batch->tapeset = tapeset;
3061 batch->input_tapenum = tapenum;
3062 batch->input_tuples = input_tuples;
3063 batch->input_card = input_card;
3064
3065 return batch;
3066 }
3067
3068 /*
3069 * read_spilled_tuple
3070 * read the next tuple from a batch's tape. Return NULL if no more.
3071 */
3072 static MinimalTuple
hashagg_batch_read(HashAggBatch * batch,uint32 * hashp)3073 hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
3074 {
3075 LogicalTapeSet *tapeset = batch->tapeset;
3076 int tapenum = batch->input_tapenum;
3077 MinimalTuple tuple;
3078 uint32 t_len;
3079 size_t nread;
3080 uint32 hash;
3081
3082 nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
3083 if (nread == 0)
3084 return NULL;
3085 if (nread != sizeof(uint32))
3086 ereport(ERROR,
3087 (errcode_for_file_access(),
3088 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3089 tapenum, sizeof(uint32), nread)));
3090 if (hashp != NULL)
3091 *hashp = hash;
3092
3093 nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
3094 if (nread != sizeof(uint32))
3095 ereport(ERROR,
3096 (errcode_for_file_access(),
3097 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3098 tapenum, sizeof(uint32), nread)));
3099
3100 tuple = (MinimalTuple) palloc(t_len);
3101 tuple->t_len = t_len;
3102
3103 nread = LogicalTapeRead(tapeset, tapenum,
3104 (void *) ((char *) tuple + sizeof(uint32)),
3105 t_len - sizeof(uint32));
3106 if (nread != t_len - sizeof(uint32))
3107 ereport(ERROR,
3108 (errcode_for_file_access(),
3109 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
3110 tapenum, t_len - sizeof(uint32), nread)));
3111
3112 return tuple;
3113 }
3114
3115 /*
3116 * hashagg_finish_initial_spills
3117 *
3118 * After a HashAggBatch has been processed, it may have spilled tuples to
3119 * disk. If so, turn the spilled partitions into new batches that must later
3120 * be executed.
3121 */
3122 static void
hashagg_finish_initial_spills(AggState * aggstate)3123 hashagg_finish_initial_spills(AggState *aggstate)
3124 {
3125 int setno;
3126 int total_npartitions = 0;
3127
3128 if (aggstate->hash_spills != NULL)
3129 {
3130 for (setno = 0; setno < aggstate->num_hashes; setno++)
3131 {
3132 HashAggSpill *spill = &aggstate->hash_spills[setno];
3133
3134 total_npartitions += spill->npartitions;
3135 hashagg_spill_finish(aggstate, spill, setno);
3136 }
3137
3138 /*
3139 * We're not processing tuples from outer plan any more; only
3140 * processing batches of spilled tuples. The initial spill structures
3141 * are no longer needed.
3142 */
3143 pfree(aggstate->hash_spills);
3144 aggstate->hash_spills = NULL;
3145 }
3146
3147 hash_agg_update_metrics(aggstate, false, total_npartitions);
3148 aggstate->hash_spill_mode = false;
3149 }
3150
3151 /*
3152 * hashagg_spill_finish
3153 *
3154 * Transform spill partitions into new batches.
3155 */
3156 static void
hashagg_spill_finish(AggState * aggstate,HashAggSpill * spill,int setno)3157 hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
3158 {
3159 int i;
3160 int used_bits = 32 - spill->shift;
3161
3162 if (spill->npartitions == 0)
3163 return; /* didn't spill */
3164
3165 for (i = 0; i < spill->npartitions; i++)
3166 {
3167 LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
3168 int tapenum = spill->partitions[i];
3169 HashAggBatch *new_batch;
3170 double cardinality;
3171
3172 /* if the partition is empty, don't create a new batch of work */
3173 if (spill->ntuples[i] == 0)
3174 continue;
3175
3176 cardinality = estimateHyperLogLog(&spill->hll_card[i]);
3177 freeHyperLogLog(&spill->hll_card[i]);
3178
3179 /* rewinding frees the buffer while not in use */
3180 LogicalTapeRewindForRead(tapeset, tapenum,
3181 HASHAGG_READ_BUFFER_SIZE);
3182
3183 new_batch = hashagg_batch_new(tapeset, tapenum, setno,
3184 spill->ntuples[i], cardinality,
3185 used_bits);
3186 aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch);
3187 aggstate->hash_batches_used++;
3188 }
3189
3190 pfree(spill->ntuples);
3191 pfree(spill->hll_card);
3192 pfree(spill->partitions);
3193 }
3194
3195 /*
3196 * Free resources related to a spilled HashAgg.
3197 */
3198 static void
hashagg_reset_spill_state(AggState * aggstate)3199 hashagg_reset_spill_state(AggState *aggstate)
3200 {
3201 /* free spills from initial pass */
3202 if (aggstate->hash_spills != NULL)
3203 {
3204 int setno;
3205
3206 for (setno = 0; setno < aggstate->num_hashes; setno++)
3207 {
3208 HashAggSpill *spill = &aggstate->hash_spills[setno];
3209
3210 pfree(spill->ntuples);
3211 pfree(spill->partitions);
3212 }
3213 pfree(aggstate->hash_spills);
3214 aggstate->hash_spills = NULL;
3215 }
3216
3217 /* free batches */
3218 list_free_deep(aggstate->hash_batches);
3219 aggstate->hash_batches = NIL;
3220
3221 /* close tape set */
3222 if (aggstate->hash_tapeinfo != NULL)
3223 {
3224 HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
3225
3226 LogicalTapeSetClose(tapeinfo->tapeset);
3227 pfree(tapeinfo->freetapes);
3228 pfree(tapeinfo);
3229 aggstate->hash_tapeinfo = NULL;
3230 }
3231 }
3232
3233
3234 /* -----------------
3235 * ExecInitAgg
3236 *
3237 * Creates the run-time information for the agg node produced by the
3238 * planner and initializes its outer subtree.
3239 *
3240 * -----------------
3241 */
3242 AggState *
ExecInitAgg(Agg * node,EState * estate,int eflags)3243 ExecInitAgg(Agg *node, EState *estate, int eflags)
3244 {
3245 AggState *aggstate;
3246 AggStatePerAgg peraggs;
3247 AggStatePerTrans pertransstates;
3248 AggStatePerGroup *pergroups;
3249 Plan *outerPlan;
3250 ExprContext *econtext;
3251 TupleDesc scanDesc;
3252 int max_aggno;
3253 int max_transno;
3254 int numaggrefs;
3255 int numaggs;
3256 int numtrans;
3257 int phase;
3258 int phaseidx;
3259 ListCell *l;
3260 Bitmapset *all_grouped_cols = NULL;
3261 int numGroupingSets = 1;
3262 int numPhases;
3263 int numHashes;
3264 int i = 0;
3265 int j = 0;
3266 bool use_hashing = (node->aggstrategy == AGG_HASHED ||
3267 node->aggstrategy == AGG_MIXED);
3268
3269 /* check for unsupported flags */
3270 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
3271
3272 /*
3273 * create state structure
3274 */
3275 aggstate = makeNode(AggState);
3276 aggstate->ss.ps.plan = (Plan *) node;
3277 aggstate->ss.ps.state = estate;
3278 aggstate->ss.ps.ExecProcNode = ExecAgg;
3279
3280 aggstate->aggs = NIL;
3281 aggstate->numaggs = 0;
3282 aggstate->numtrans = 0;
3283 aggstate->aggstrategy = node->aggstrategy;
3284 aggstate->aggsplit = node->aggsplit;
3285 aggstate->maxsets = 0;
3286 aggstate->projected_set = -1;
3287 aggstate->current_set = 0;
3288 aggstate->peragg = NULL;
3289 aggstate->pertrans = NULL;
3290 aggstate->curperagg = NULL;
3291 aggstate->curpertrans = NULL;
3292 aggstate->input_done = false;
3293 aggstate->agg_done = false;
3294 aggstate->pergroups = NULL;
3295 aggstate->grp_firstTuple = NULL;
3296 aggstate->sort_in = NULL;
3297 aggstate->sort_out = NULL;
3298
3299 /*
3300 * phases[0] always exists, but is dummy in sorted/plain mode
3301 */
3302 numPhases = (use_hashing ? 1 : 2);
3303 numHashes = (use_hashing ? 1 : 0);
3304
3305 /*
3306 * Calculate the maximum number of grouping sets in any phase; this
3307 * determines the size of some allocations. Also calculate the number of
3308 * phases, since all hashed/mixed nodes contribute to only a single phase.
3309 */
3310 if (node->groupingSets)
3311 {
3312 numGroupingSets = list_length(node->groupingSets);
3313
3314 foreach(l, node->chain)
3315 {
3316 Agg *agg = lfirst(l);
3317
3318 numGroupingSets = Max(numGroupingSets,
3319 list_length(agg->groupingSets));
3320
3321 /*
3322 * additional AGG_HASHED aggs become part of phase 0, but all
3323 * others add an extra phase.
3324 */
3325 if (agg->aggstrategy != AGG_HASHED)
3326 ++numPhases;
3327 else
3328 ++numHashes;
3329 }
3330 }
3331
3332 aggstate->maxsets = numGroupingSets;
3333 aggstate->numphases = numPhases;
3334
3335 aggstate->aggcontexts = (ExprContext **)
3336 palloc0(sizeof(ExprContext *) * numGroupingSets);
3337
3338 /*
3339 * Create expression contexts. We need three or more, one for
3340 * per-input-tuple processing, one for per-output-tuple processing, one
3341 * for all the hashtables, and one for each grouping set. The per-tuple
3342 * memory context of the per-grouping-set ExprContexts (aggcontexts)
3343 * replaces the standalone memory context formerly used to hold transition
3344 * values. We cheat a little by using ExecAssignExprContext() to build
3345 * all of them.
3346 *
3347 * NOTE: the details of what is stored in aggcontexts and what is stored
3348 * in the regular per-query memory context are driven by a simple
3349 * decision: we want to reset the aggcontext at group boundaries (if not
3350 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
3351 */
3352 ExecAssignExprContext(estate, &aggstate->ss.ps);
3353 aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
3354
3355 for (i = 0; i < numGroupingSets; ++i)
3356 {
3357 ExecAssignExprContext(estate, &aggstate->ss.ps);
3358 aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
3359 }
3360
3361 if (use_hashing)
3362 aggstate->hashcontext = CreateWorkExprContext(estate);
3363
3364 ExecAssignExprContext(estate, &aggstate->ss.ps);
3365
3366 /*
3367 * Initialize child nodes.
3368 *
3369 * If we are doing a hashed aggregation then the child plan does not need
3370 * to handle REWIND efficiently; see ExecReScanAgg.
3371 */
3372 if (node->aggstrategy == AGG_HASHED)
3373 eflags &= ~EXEC_FLAG_REWIND;
3374 outerPlan = outerPlan(node);
3375 outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
3376
3377 /*
3378 * initialize source tuple type.
3379 */
3380 aggstate->ss.ps.outerops =
3381 ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
3382 &aggstate->ss.ps.outeropsfixed);
3383 aggstate->ss.ps.outeropsset = true;
3384
3385 ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
3386 aggstate->ss.ps.outerops);
3387 scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
3388
3389 /*
3390 * If there are more than two phases (including a potential dummy phase
3391 * 0), input will be resorted using tuplesort. Need a slot for that.
3392 */
3393 if (numPhases > 2)
3394 {
3395 aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
3396 &TTSOpsMinimalTuple);
3397
3398 /*
3399 * The output of the tuplesort, and the output from the outer child
3400 * might not use the same type of slot. In most cases the child will
3401 * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
3402 * input can also be presorted due an index, in which case it could be
3403 * a different type of slot.
3404 *
3405 * XXX: For efficiency it would be good to instead/additionally
3406 * generate expressions with corresponding settings of outerops* for
3407 * the individual phases - deforming is often a bottleneck for
3408 * aggregations with lots of rows per group. If there's multiple
3409 * sorts, we know that all but the first use TTSOpsMinimalTuple (via
3410 * the nodeAgg.c internal tuplesort).
3411 */
3412 if (aggstate->ss.ps.outeropsfixed &&
3413 aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
3414 aggstate->ss.ps.outeropsfixed = false;
3415 }
3416
3417 /*
3418 * Initialize result type, slot and projection.
3419 */
3420 ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
3421 ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
3422
3423 /*
3424 * initialize child expressions
3425 *
3426 * We expect the parser to have checked that no aggs contain other agg
3427 * calls in their arguments (and just to be sure, we verify it again while
3428 * initializing the plan node). This would make no sense under SQL
3429 * semantics, and it's forbidden by the spec. Because it is true, we
3430 * don't need to worry about evaluating the aggs in any particular order.
3431 *
3432 * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
3433 * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
3434 * during ExecAssignProjectionInfo, above.
3435 */
3436 aggstate->ss.ps.qual =
3437 ExecInitQual(node->plan.qual, (PlanState *) aggstate);
3438
3439 /*
3440 * We should now have found all Aggrefs in the targetlist and quals.
3441 */
3442 numaggrefs = list_length(aggstate->aggs);
3443 max_aggno = -1;
3444 max_transno = -1;
3445 foreach(l, aggstate->aggs)
3446 {
3447 Aggref *aggref = (Aggref *) lfirst(l);
3448
3449 max_aggno = Max(max_aggno, aggref->aggno);
3450 max_transno = Max(max_transno, aggref->aggtransno);
3451 }
3452 numaggs = max_aggno + 1;
3453 numtrans = max_transno + 1;
3454
3455 /*
3456 * For each phase, prepare grouping set data and fmgr lookup data for
3457 * compare functions. Accumulate all_grouped_cols in passing.
3458 */
3459 aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
3460
3461 aggstate->num_hashes = numHashes;
3462 if (numHashes)
3463 {
3464 aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
3465 aggstate->phases[0].numsets = 0;
3466 aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
3467 aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
3468 }
3469
3470 phase = 0;
3471 for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
3472 {
3473 Agg *aggnode;
3474 Sort *sortnode;
3475
3476 if (phaseidx > 0)
3477 {
3478 aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
3479 sortnode = castNode(Sort, aggnode->plan.lefttree);
3480 }
3481 else
3482 {
3483 aggnode = node;
3484 sortnode = NULL;
3485 }
3486
3487 Assert(phase <= 1 || sortnode);
3488
3489 if (aggnode->aggstrategy == AGG_HASHED
3490 || aggnode->aggstrategy == AGG_MIXED)
3491 {
3492 AggStatePerPhase phasedata = &aggstate->phases[0];
3493 AggStatePerHash perhash;
3494 Bitmapset *cols = NULL;
3495
3496 Assert(phase == 0);
3497 i = phasedata->numsets++;
3498 perhash = &aggstate->perhash[i];
3499
3500 /* phase 0 always points to the "real" Agg in the hash case */
3501 phasedata->aggnode = node;
3502 phasedata->aggstrategy = node->aggstrategy;
3503
3504 /* but the actual Agg node representing this hash is saved here */
3505 perhash->aggnode = aggnode;
3506
3507 phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
3508
3509 for (j = 0; j < aggnode->numCols; ++j)
3510 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3511
3512 phasedata->grouped_cols[i] = cols;
3513
3514 all_grouped_cols = bms_add_members(all_grouped_cols, cols);
3515 continue;
3516 }
3517 else
3518 {
3519 AggStatePerPhase phasedata = &aggstate->phases[++phase];
3520 int num_sets;
3521
3522 phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
3523
3524 if (num_sets)
3525 {
3526 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
3527 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
3528
3529 i = 0;
3530 foreach(l, aggnode->groupingSets)
3531 {
3532 int current_length = list_length(lfirst(l));
3533 Bitmapset *cols = NULL;
3534
3535 /* planner forces this to be correct */
3536 for (j = 0; j < current_length; ++j)
3537 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
3538
3539 phasedata->grouped_cols[i] = cols;
3540 phasedata->gset_lengths[i] = current_length;
3541
3542 ++i;
3543 }
3544
3545 all_grouped_cols = bms_add_members(all_grouped_cols,
3546 phasedata->grouped_cols[0]);
3547 }
3548 else
3549 {
3550 Assert(phaseidx == 0);
3551
3552 phasedata->gset_lengths = NULL;
3553 phasedata->grouped_cols = NULL;
3554 }
3555
3556 /*
3557 * If we are grouping, precompute fmgr lookup data for inner loop.
3558 */
3559 if (aggnode->aggstrategy == AGG_SORTED)
3560 {
3561 int i = 0;
3562
3563 Assert(aggnode->numCols > 0);
3564
3565 /*
3566 * Build a separate function for each subset of columns that
3567 * need to be compared.
3568 */
3569 phasedata->eqfunctions =
3570 (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
3571
3572 /* for each grouping set */
3573 for (i = 0; i < phasedata->numsets; i++)
3574 {
3575 int length = phasedata->gset_lengths[i];
3576
3577 if (phasedata->eqfunctions[length - 1] != NULL)
3578 continue;
3579
3580 phasedata->eqfunctions[length - 1] =
3581 execTuplesMatchPrepare(scanDesc,
3582 length,
3583 aggnode->grpColIdx,
3584 aggnode->grpOperators,
3585 aggnode->grpCollations,
3586 (PlanState *) aggstate);
3587 }
3588
3589 /* and for all grouped columns, unless already computed */
3590 if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
3591 {
3592 phasedata->eqfunctions[aggnode->numCols - 1] =
3593 execTuplesMatchPrepare(scanDesc,
3594 aggnode->numCols,
3595 aggnode->grpColIdx,
3596 aggnode->grpOperators,
3597 aggnode->grpCollations,
3598 (PlanState *) aggstate);
3599 }
3600 }
3601
3602 phasedata->aggnode = aggnode;
3603 phasedata->aggstrategy = aggnode->aggstrategy;
3604 phasedata->sortnode = sortnode;
3605 }
3606 }
3607
3608 /*
3609 * Convert all_grouped_cols to a descending-order list.
3610 */
3611 i = -1;
3612 while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
3613 aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
3614
3615 /*
3616 * Set up aggregate-result storage in the output expr context, and also
3617 * allocate my private per-agg working storage
3618 */
3619 econtext = aggstate->ss.ps.ps_ExprContext;
3620 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
3621 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
3622
3623 peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
3624 pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans);
3625
3626 aggstate->peragg = peraggs;
3627 aggstate->pertrans = pertransstates;
3628
3629
3630 aggstate->all_pergroups =
3631 (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
3632 * (numGroupingSets + numHashes));
3633 pergroups = aggstate->all_pergroups;
3634
3635 if (node->aggstrategy != AGG_HASHED)
3636 {
3637 for (i = 0; i < numGroupingSets; i++)
3638 {
3639 pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
3640 * numaggs);
3641 }
3642
3643 aggstate->pergroups = pergroups;
3644 pergroups += numGroupingSets;
3645 }
3646
3647 /*
3648 * Hashing can only appear in the initial phase.
3649 */
3650 if (use_hashing)
3651 {
3652 Plan *outerplan = outerPlan(node);
3653 uint64 totalGroups = 0;
3654 int i;
3655
3656 aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
3657 "HashAgg meta context",
3658 ALLOCSET_DEFAULT_SIZES);
3659 aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
3660 &TTSOpsMinimalTuple);
3661 aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
3662 &TTSOpsVirtual);
3663
3664 /* this is an array of pointers, not structures */
3665 aggstate->hash_pergroup = pergroups;
3666
3667 aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
3668 outerplan->plan_width,
3669 node->transitionSpace);
3670
3671 /*
3672 * Consider all of the grouping sets together when setting the limits
3673 * and estimating the number of partitions. This can be inaccurate
3674 * when there is more than one grouping set, but should still be
3675 * reasonable.
3676 */
3677 for (i = 0; i < aggstate->num_hashes; i++)
3678 totalGroups += aggstate->perhash[i].aggnode->numGroups;
3679
3680 hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
3681 &aggstate->hash_mem_limit,
3682 &aggstate->hash_ngroups_limit,
3683 &aggstate->hash_planned_partitions);
3684 find_hash_columns(aggstate);
3685
3686 /* Skip massive memory allocation if we are just doing EXPLAIN */
3687 if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
3688 build_hash_tables(aggstate);
3689
3690 aggstate->table_filled = false;
3691
3692 /* Initialize this to 1, meaning nothing spilled, yet */
3693 aggstate->hash_batches_used = 1;
3694 }
3695
3696 /*
3697 * Initialize current phase-dependent values to initial phase. The initial
3698 * phase is 1 (first sort pass) for all strategies that use sorting (if
3699 * hashing is being done too, then phase 0 is processed last); but if only
3700 * hashing is being done, then phase 0 is all there is.
3701 */
3702 if (node->aggstrategy == AGG_HASHED)
3703 {
3704 aggstate->current_phase = 0;
3705 initialize_phase(aggstate, 0);
3706 select_current_set(aggstate, 0, true);
3707 }
3708 else
3709 {
3710 aggstate->current_phase = 1;
3711 initialize_phase(aggstate, 1);
3712 select_current_set(aggstate, 0, false);
3713 }
3714
3715 /*
3716 * Perform lookups of aggregate function info, and initialize the
3717 * unchanging fields of the per-agg and per-trans data.
3718 */
3719 foreach(l, aggstate->aggs)
3720 {
3721 Aggref *aggref = lfirst(l);
3722 AggStatePerAgg peragg;
3723 AggStatePerTrans pertrans;
3724 Oid inputTypes[FUNC_MAX_ARGS];
3725 int numArguments;
3726 int numDirectArgs;
3727 HeapTuple aggTuple;
3728 Form_pg_aggregate aggform;
3729 AclResult aclresult;
3730 Oid finalfn_oid;
3731 Oid serialfn_oid,
3732 deserialfn_oid;
3733 Oid aggOwner;
3734 Expr *finalfnexpr;
3735 Oid aggtranstype;
3736
3737 /* Planner should have assigned aggregate to correct level */
3738 Assert(aggref->agglevelsup == 0);
3739 /* ... and the split mode should match */
3740 Assert(aggref->aggsplit == aggstate->aggsplit);
3741
3742 peragg = &peraggs[aggref->aggno];
3743
3744 /* Check if we initialized the state for this aggregate already. */
3745 if (peragg->aggref != NULL)
3746 continue;
3747
3748 peragg->aggref = aggref;
3749 peragg->transno = aggref->aggtransno;
3750
3751 /* Fetch the pg_aggregate row */
3752 aggTuple = SearchSysCache1(AGGFNOID,
3753 ObjectIdGetDatum(aggref->aggfnoid));
3754 if (!HeapTupleIsValid(aggTuple))
3755 elog(ERROR, "cache lookup failed for aggregate %u",
3756 aggref->aggfnoid);
3757 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
3758
3759 /* Check permission to call aggregate function */
3760 aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
3761 ACL_EXECUTE);
3762 if (aclresult != ACLCHECK_OK)
3763 aclcheck_error(aclresult, OBJECT_AGGREGATE,
3764 get_func_name(aggref->aggfnoid));
3765 InvokeFunctionExecuteHook(aggref->aggfnoid);
3766
3767 /* planner recorded transition state type in the Aggref itself */
3768 aggtranstype = aggref->aggtranstype;
3769 Assert(OidIsValid(aggtranstype));
3770
3771 /* Final function only required if we're finalizing the aggregates */
3772 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
3773 peragg->finalfn_oid = finalfn_oid = InvalidOid;
3774 else
3775 peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
3776
3777 serialfn_oid = InvalidOid;
3778 deserialfn_oid = InvalidOid;
3779
3780 /*
3781 * Check if serialization/deserialization is required. We only do it
3782 * for aggregates that have transtype INTERNAL.
3783 */
3784 if (aggtranstype == INTERNALOID)
3785 {
3786 /*
3787 * The planner should only have generated a serialize agg node if
3788 * every aggregate with an INTERNAL state has a serialization
3789 * function. Verify that.
3790 */
3791 if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
3792 {
3793 /* serialization only valid when not running finalfn */
3794 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
3795
3796 if (!OidIsValid(aggform->aggserialfn))
3797 elog(ERROR, "serialfunc not provided for serialization aggregation");
3798 serialfn_oid = aggform->aggserialfn;
3799 }
3800
3801 /* Likewise for deserialization functions */
3802 if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
3803 {
3804 /* deserialization only valid when combining states */
3805 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
3806
3807 if (!OidIsValid(aggform->aggdeserialfn))
3808 elog(ERROR, "deserialfunc not provided for deserialization aggregation");
3809 deserialfn_oid = aggform->aggdeserialfn;
3810 }
3811 }
3812
3813 /* Check that aggregate owner has permission to call component fns */
3814 {
3815 HeapTuple procTuple;
3816
3817 procTuple = SearchSysCache1(PROCOID,
3818 ObjectIdGetDatum(aggref->aggfnoid));
3819 if (!HeapTupleIsValid(procTuple))
3820 elog(ERROR, "cache lookup failed for function %u",
3821 aggref->aggfnoid);
3822 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
3823 ReleaseSysCache(procTuple);
3824
3825 if (OidIsValid(finalfn_oid))
3826 {
3827 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
3828 ACL_EXECUTE);
3829 if (aclresult != ACLCHECK_OK)
3830 aclcheck_error(aclresult, OBJECT_FUNCTION,
3831 get_func_name(finalfn_oid));
3832 InvokeFunctionExecuteHook(finalfn_oid);
3833 }
3834 if (OidIsValid(serialfn_oid))
3835 {
3836 aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
3837 ACL_EXECUTE);
3838 if (aclresult != ACLCHECK_OK)
3839 aclcheck_error(aclresult, OBJECT_FUNCTION,
3840 get_func_name(serialfn_oid));
3841 InvokeFunctionExecuteHook(serialfn_oid);
3842 }
3843 if (OidIsValid(deserialfn_oid))
3844 {
3845 aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
3846 ACL_EXECUTE);
3847 if (aclresult != ACLCHECK_OK)
3848 aclcheck_error(aclresult, OBJECT_FUNCTION,
3849 get_func_name(deserialfn_oid));
3850 InvokeFunctionExecuteHook(deserialfn_oid);
3851 }
3852 }
3853
3854 /*
3855 * Get actual datatypes of the (nominal) aggregate inputs. These
3856 * could be different from the agg's declared input types, when the
3857 * agg accepts ANY or a polymorphic type.
3858 */
3859 numArguments = get_aggregate_argtypes(aggref, inputTypes);
3860
3861 /* Count the "direct" arguments, if any */
3862 numDirectArgs = list_length(aggref->aggdirectargs);
3863
3864 /* Detect how many arguments to pass to the finalfn */
3865 if (aggform->aggfinalextra)
3866 peragg->numFinalArgs = numArguments + 1;
3867 else
3868 peragg->numFinalArgs = numDirectArgs + 1;
3869
3870 /* Initialize any direct-argument expressions */
3871 peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
3872 (PlanState *) aggstate);
3873
3874 /*
3875 * build expression trees using actual argument & result types for the
3876 * finalfn, if it exists and is required.
3877 */
3878 if (OidIsValid(finalfn_oid))
3879 {
3880 build_aggregate_finalfn_expr(inputTypes,
3881 peragg->numFinalArgs,
3882 aggtranstype,
3883 aggref->aggtype,
3884 aggref->inputcollid,
3885 finalfn_oid,
3886 &finalfnexpr);
3887 fmgr_info(finalfn_oid, &peragg->finalfn);
3888 fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
3889 }
3890
3891 /* get info about the output value's datatype */
3892 get_typlenbyval(aggref->aggtype,
3893 &peragg->resulttypeLen,
3894 &peragg->resulttypeByVal);
3895
3896 /*
3897 * Build working state for invoking the transition function, if we
3898 * haven't done it already.
3899 */
3900 pertrans = &pertransstates[aggref->aggtransno];
3901 if (pertrans->aggref == NULL)
3902 {
3903 Datum textInitVal;
3904 Datum initValue;
3905 bool initValueIsNull;
3906 Oid transfn_oid;
3907
3908 /*
3909 * If this aggregation is performing state combines, then instead
3910 * of using the transition function, we'll use the combine
3911 * function
3912 */
3913 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
3914 {
3915 transfn_oid = aggform->aggcombinefn;
3916
3917 /* If not set then the planner messed up */
3918 if (!OidIsValid(transfn_oid))
3919 elog(ERROR, "combinefn not set for aggregate function");
3920 }
3921 else
3922 transfn_oid = aggform->aggtransfn;
3923
3924 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
3925 ACL_EXECUTE);
3926 if (aclresult != ACLCHECK_OK)
3927 aclcheck_error(aclresult, OBJECT_FUNCTION,
3928 get_func_name(transfn_oid));
3929 InvokeFunctionExecuteHook(transfn_oid);
3930
3931 /*
3932 * initval is potentially null, so don't try to access it as a
3933 * struct field. Must do it the hard way with SysCacheGetAttr.
3934 */
3935 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
3936 Anum_pg_aggregate_agginitval,
3937 &initValueIsNull);
3938 if (initValueIsNull)
3939 initValue = (Datum) 0;
3940 else
3941 initValue = GetAggInitVal(textInitVal, aggtranstype);
3942
3943 build_pertrans_for_aggref(pertrans, aggstate, estate,
3944 aggref, transfn_oid, aggtranstype,
3945 serialfn_oid, deserialfn_oid,
3946 initValue, initValueIsNull,
3947 inputTypes, numArguments);
3948 }
3949 else
3950 pertrans->aggshared = true;
3951 ReleaseSysCache(aggTuple);
3952 }
3953
3954 /*
3955 * Update aggstate->numaggs to be the number of unique aggregates found.
3956 * Also set numstates to the number of unique transition states found.
3957 */
3958 aggstate->numaggs = numaggs;
3959 aggstate->numtrans = numtrans;
3960
3961 /*
3962 * Last, check whether any more aggregates got added onto the node while
3963 * we processed the expressions for the aggregate arguments (including not
3964 * only the regular arguments and FILTER expressions handled immediately
3965 * above, but any direct arguments we might've handled earlier). If so,
3966 * we have nested aggregate functions, which is semantically nonsensical,
3967 * so complain. (This should have been caught by the parser, so we don't
3968 * need to work hard on a helpful error message; but we defend against it
3969 * here anyway, just to be sure.)
3970 */
3971 if (numaggrefs != list_length(aggstate->aggs))
3972 ereport(ERROR,
3973 (errcode(ERRCODE_GROUPING_ERROR),
3974 errmsg("aggregate function calls cannot be nested")));
3975
3976 /*
3977 * Build expressions doing all the transition work at once. We build a
3978 * different one for each phase, as the number of transition function
3979 * invocation can differ between phases. Note this'll work both for
3980 * transition and combination functions (although there'll only be one
3981 * phase in the latter case).
3982 */
3983 for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
3984 {
3985 AggStatePerPhase phase = &aggstate->phases[phaseidx];
3986 bool dohash = false;
3987 bool dosort = false;
3988
3989 /* phase 0 doesn't necessarily exist */
3990 if (!phase->aggnode)
3991 continue;
3992
3993 if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
3994 {
3995 /*
3996 * Phase one, and only phase one, in a mixed agg performs both
3997 * sorting and aggregation.
3998 */
3999 dohash = true;
4000 dosort = true;
4001 }
4002 else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
4003 {
4004 /*
4005 * No need to compute a transition function for an AGG_MIXED phase
4006 * 0 - the contents of the hashtables will have been computed
4007 * during phase 1.
4008 */
4009 continue;
4010 }
4011 else if (phase->aggstrategy == AGG_PLAIN ||
4012 phase->aggstrategy == AGG_SORTED)
4013 {
4014 dohash = false;
4015 dosort = true;
4016 }
4017 else if (phase->aggstrategy == AGG_HASHED)
4018 {
4019 dohash = true;
4020 dosort = false;
4021 }
4022 else
4023 Assert(false);
4024
4025 phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
4026 false);
4027
4028 /* cache compiled expression for outer slot without NULL check */
4029 phase->evaltrans_cache[0][0] = phase->evaltrans;
4030 }
4031
4032 return aggstate;
4033 }
4034
/*
 * Build the state needed to calculate a state value for an aggregate.
 *
 * This fills in 'pertrans', the transition-state descriptor used by
 * 'aggref'.  'aggtransfn', 'aggtranstype', and the rest of the arguments
 * could be calculated from 'aggref', but the caller has calculated them
 * already, so might as well pass them.
 *
 * Parameters:
 *	pertrans		- per-transition-state struct to fill in
 *	aggstate		- owning Agg node; it is passed as the fcinfo "context" of
 *					  every support-function call set up here
 *	estate			- executor state, used to create any extra tuple slots
 *	aggref			- the aggregate call this transition state is built for
 *	aggtransfn		- OID of the transition function, or of the combine
 *					  function when aggstate->aggsplit requests combining
 *	aggtranstype	- data type of the transition state
 *	aggserialfn		- serialization function OID, or InvalidOid if unused
 *	aggdeserialfn	- deserialization function OID, or InvalidOid if unused
 *	initValue, initValueIsNull - initial transition value
 *	inputTypes		- actual argument data types; direct arguments (if any)
 *					  come first, so inputTypes[numDirectArgs] is the first
 *					  aggregated input
 *	numArguments	- number of entries in inputTypes
 *
 * All auxiliary arrays and FunctionCallInfo structs are palloc'd in the
 * current memory context.
 */
static void
build_pertrans_for_aggref(AggStatePerTrans pertrans,
						  AggState *aggstate, EState *estate,
						  Aggref *aggref,
						  Oid aggtransfn, Oid aggtranstype,
						  Oid aggserialfn, Oid aggdeserialfn,
						  Datum initValue, bool initValueIsNull,
						  Oid *inputTypes, int numArguments)
{
	/* one tuplesort slot per grouping set (at least one) */
	int			numGroupingSets = Max(aggstate->maxsets, 1);
	Expr	   *serialfnexpr = NULL;
	Expr	   *deserialfnexpr = NULL;
	ListCell   *lc;
	int			numInputs;
	int			numDirectArgs;
	List	   *sortlist;
	int			numSortCols;
	int			numDistinctCols;
	int			i;

	/* Begin filling in the pertrans data */
	pertrans->aggref = aggref;
	/* the caller flags the state as shared if another Aggref reuses it */
	pertrans->aggshared = false;
	pertrans->aggCollation = aggref->inputcollid;
	pertrans->transfn_oid = aggtransfn;
	pertrans->serialfn_oid = aggserialfn;
	pertrans->deserialfn_oid = aggdeserialfn;
	pertrans->initValue = initValue;
	pertrans->initValueIsNull = initValueIsNull;

	/* Count the "direct" arguments, if any */
	numDirectArgs = list_length(aggref->aggdirectargs);

	/* Count the number of aggregated input columns */
	pertrans->numInputs = numInputs = list_length(aggref->args);

	pertrans->aggtranstype = aggtranstype;

	/*
	 * When combining states, we have no use at all for the aggregate
	 * function's transfn. Instead we use the combinefn. In this case, the
	 * transfn and transfn_oid fields of pertrans refer to the combine
	 * function rather than the transition function.
	 */
	if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
	{
		Expr	   *combinefnexpr;
		size_t		numTransArgs;

		/*
		 * When combining there's only one input, the to-be-combined added
		 * transition value from below (this node's transition value is
		 * counted separately).
		 */
		pertrans->numTransInputs = 1;

		/* account for the current transition state */
		numTransArgs = pertrans->numTransInputs + 1;

		build_aggregate_combinefn_expr(aggtranstype,
									   aggref->inputcollid,
									   aggtransfn,
									   &combinefnexpr);
		fmgr_info(aggtransfn, &pertrans->transfn);
		fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);

		/*
		 * numTransArgs is necessarily 2 here (current state + incoming
		 * state), which is why the fcinfo is sized for exactly 2 arguments.
		 */
		pertrans->transfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
								 &pertrans->transfn,
								 numTransArgs,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/*
		 * Ensure that a combine function to combine INTERNAL states is not
		 * strict. This should have been checked during CREATE AGGREGATE, but
		 * the strict property could have been changed since then.
		 */
		if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
					 errmsg("combine function with transition type %s must not be declared STRICT",
							format_type_be(aggtranstype))));
	}
	else
	{
		Expr	   *transfnexpr;
		size_t		numTransArgs;

		/*
		 * Detect how many arguments to pass to the transfn.  Ordered-set
		 * aggregates receive only the aggregated inputs; other aggregates
		 * receive numArguments values.
		 */
		if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
			pertrans->numTransInputs = numInputs;
		else
			pertrans->numTransInputs = numArguments;

		/* account for the current transition state */
		numTransArgs = pertrans->numTransInputs + 1;

		/*
		 * Set up infrastructure for calling the transfn. Note that
		 * invtransfn is not needed here.
		 */
		build_aggregate_transfn_expr(inputTypes,
									 numArguments,
									 numDirectArgs,
									 aggref->aggvariadic,
									 aggtranstype,
									 aggref->inputcollid,
									 aggtransfn,
									 InvalidOid,
									 &transfnexpr,
									 NULL);
		fmgr_info(aggtransfn, &pertrans->transfn);
		fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);

		pertrans->transfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
								 &pertrans->transfn,
								 numTransArgs,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/*
		 * If the transfn is strict and the initval is NULL, make sure input
		 * type and transtype are the same (or at least binary-compatible), so
		 * that it's OK to use the first aggregated input value as the initial
		 * transValue. This should have been checked at agg definition time,
		 * but we must check again in case the transfn's strictness property
		 * has been changed.
		 */
		if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
		{
			/* numArguments <= numDirectArgs: no aggregated input exists */
			if (numArguments <= numDirectArgs ||
				!IsBinaryCoercible(inputTypes[numDirectArgs],
								   aggtranstype))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
						 errmsg("aggregate %u needs to have compatible input type and transition type",
								aggref->aggfnoid)));
		}
	}

	/* get info about the state value's datatype */
	get_typlenbyval(aggtranstype,
					&pertrans->transtypeLen,
					&pertrans->transtypeByVal);

	/* serialization function takes the single transition-state argument */
	if (OidIsValid(aggserialfn))
	{
		build_aggregate_serialfn_expr(aggserialfn,
									  &serialfnexpr);
		fmgr_info(aggserialfn, &pertrans->serialfn);
		fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);

		pertrans->serialfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
		InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
								 &pertrans->serialfn,
								 1,
								 InvalidOid,
								 (void *) aggstate, NULL);
	}

	/* deserialization function is set up with two argument slots */
	if (OidIsValid(aggdeserialfn))
	{
		build_aggregate_deserialfn_expr(aggdeserialfn,
										&deserialfnexpr);
		fmgr_info(aggdeserialfn, &pertrans->deserialfn);
		fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);

		pertrans->deserialfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
		InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
								 &pertrans->deserialfn,
								 2,
								 InvalidOid,
								 (void *) aggstate, NULL);

	}

	/*
	 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
	 * have a list of SortGroupClause nodes; fish out the data in them and
	 * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
	 * however; the agg's transfn and finalfn are responsible for that.
	 *
	 * Note that by construction, if there is a DISTINCT clause then the ORDER
	 * BY clause is a prefix of it (see transformDistinctClause).
	 */
	if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
	{
		sortlist = NIL;
		numSortCols = numDistinctCols = 0;
	}
	else if (aggref->aggdistinct)
	{
		sortlist = aggref->aggdistinct;
		numSortCols = numDistinctCols = list_length(sortlist);
		Assert(numSortCols >= list_length(aggref->aggorder));
	}
	else
	{
		sortlist = aggref->aggorder;
		numSortCols = list_length(sortlist);
		numDistinctCols = 0;
	}

	pertrans->numSortCols = numSortCols;
	pertrans->numDistinctCols = numDistinctCols;

	/*
	 * If we have either sorting or filtering to do, create a tupledesc and
	 * slot corresponding to the aggregated inputs (including sort
	 * expressions) of the agg.
	 */
	if (numSortCols > 0 || aggref->aggfilter)
	{
		pertrans->sortdesc = ExecTypeFromTL(aggref->args);
		pertrans->sortslot =
			ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
								   &TTSOpsMinimalTuple);
	}

	if (numSortCols > 0)
	{
		/*
		 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
		 * (yet)
		 */
		Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);

		/* If we have only one input, we need its len/byval info. */
		if (numInputs == 1)
		{
			get_typlenbyval(inputTypes[numDirectArgs],
							&pertrans->inputtypeLen,
							&pertrans->inputtypeByVal);
		}
		else if (numDistinctCols > 0)
		{
			/* we will need an extra slot to store prior values */
			pertrans->uniqslot =
				ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
									   &TTSOpsMinimalTuple);
		}

		/* Extract the sort information for use later */
		pertrans->sortColIdx =
			(AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
		pertrans->sortOperators =
			(Oid *) palloc(numSortCols * sizeof(Oid));
		pertrans->sortCollations =
			(Oid *) palloc(numSortCols * sizeof(Oid));
		pertrans->sortNullsFirst =
			(bool *) palloc(numSortCols * sizeof(bool));

		/* one array entry per SortGroupClause, in sortlist order */
		i = 0;
		foreach(lc, sortlist)
		{
			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
			TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);

			/* the parser should have made sure of this */
			Assert(OidIsValid(sortcl->sortop));

			pertrans->sortColIdx[i] = tle->resno;
			pertrans->sortOperators[i] = sortcl->sortop;
			pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
			pertrans->sortNullsFirst[i] = sortcl->nulls_first;
			i++;
		}
		Assert(i == numSortCols);
	}

	/*
	 * For DISTINCT, prepare equality comparison: a single FmgrInfo for one
	 * column, or a compiled tuple-match expression for several columns.
	 */
	if (aggref->aggdistinct)
	{
		Oid		   *ops;

		Assert(numArguments > 0);
		Assert(list_length(aggref->aggdistinct) == numDistinctCols);

		ops = palloc(numDistinctCols * sizeof(Oid));

		i = 0;
		foreach(lc, aggref->aggdistinct)
			ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;

		/* lookup / build the necessary comparators */
		if (numDistinctCols == 1)
			fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
		else
			pertrans->equalfnMulti =
				execTuplesMatchPrepare(pertrans->sortdesc,
									   numDistinctCols,
									   pertrans->sortColIdx,
									   ops,
									   pertrans->sortCollations,
									   &aggstate->ss.ps);
		pfree(ops);
	}

	/* zeroed so that unused grouping-set slots read as NULL */
	pertrans->sortstates = (Tuplesortstate **)
		palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
}
4349
4350
4351 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)4352 GetAggInitVal(Datum textInitVal, Oid transtype)
4353 {
4354 Oid typinput,
4355 typioparam;
4356 char *strInitVal;
4357 Datum initVal;
4358
4359 getTypeInputInfo(transtype, &typinput, &typioparam);
4360 strInitVal = TextDatumGetCString(textInitVal);
4361 initVal = OidInputFunctionCall(typinput, strInitVal,
4362 typioparam, -1);
4363 pfree(strInitVal);
4364 return initVal;
4365 }
4366
4367 void
ExecEndAgg(AggState * node)4368 ExecEndAgg(AggState *node)
4369 {
4370 PlanState *outerPlan;
4371 int transno;
4372 int numGroupingSets = Max(node->maxsets, 1);
4373 int setno;
4374
4375 /*
4376 * When ending a parallel worker, copy the statistics gathered by the
4377 * worker back into shared memory so that it can be picked up by the main
4378 * process to report in EXPLAIN ANALYZE.
4379 */
4380 if (node->shared_info && IsParallelWorker())
4381 {
4382 AggregateInstrumentation *si;
4383
4384 Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
4385 si = &node->shared_info->sinstrument[ParallelWorkerNumber];
4386 si->hash_batches_used = node->hash_batches_used;
4387 si->hash_disk_used = node->hash_disk_used;
4388 si->hash_mem_peak = node->hash_mem_peak;
4389 }
4390
4391 /* Make sure we have closed any open tuplesorts */
4392
4393 if (node->sort_in)
4394 tuplesort_end(node->sort_in);
4395 if (node->sort_out)
4396 tuplesort_end(node->sort_out);
4397
4398 hashagg_reset_spill_state(node);
4399
4400 if (node->hash_metacxt != NULL)
4401 {
4402 MemoryContextDelete(node->hash_metacxt);
4403 node->hash_metacxt = NULL;
4404 }
4405
4406 for (transno = 0; transno < node->numtrans; transno++)
4407 {
4408 AggStatePerTrans pertrans = &node->pertrans[transno];
4409
4410 for (setno = 0; setno < numGroupingSets; setno++)
4411 {
4412 if (pertrans->sortstates[setno])
4413 tuplesort_end(pertrans->sortstates[setno]);
4414 }
4415 }
4416
4417 /* And ensure any agg shutdown callbacks have been called */
4418 for (setno = 0; setno < numGroupingSets; setno++)
4419 ReScanExprContext(node->aggcontexts[setno]);
4420 if (node->hashcontext)
4421 ReScanExprContext(node->hashcontext);
4422
4423 /*
4424 * We don't actually free any ExprContexts here (see comment in
4425 * ExecFreeExprContext), just unlinking the output one from the plan node
4426 * suffices.
4427 */
4428 ExecFreeExprContext(&node->ss.ps);
4429
4430 /* clean up tuple table */
4431 ExecClearTuple(node->ss.ss_ScanTupleSlot);
4432
4433 outerPlan = outerPlanState(node);
4434 ExecEndNode(outerPlan);
4435 }
4436
/*
 * ExecReScanAgg
 *		Reset an Agg node so that its output can be produced again from the
 *		beginning.
 *
 * For a pure AGG_HASHED node, there are two fast exits:
 *	- if the hash table was never filled, there is nothing to undo;
 *	- if the table is filled and can be reused, only the hash iterator is
 *	  reset.  Reuse requires that the table never spilled, the child plan has
 *	  no changed parameters, and none of this node's changed parameters
 *	  appear in aggParams.
 *
 * Otherwise all per-trans tuplesorts, per-grouping-set transition contexts,
 * saved group tuples, current aggregate results and (for hashed/mixed
 * strategies) hash tables and spill state are discarded.  Sorted/plain
 * state is reset to phase 1.  The child plan is rescanned here only when
 * its chgParam is NULL; otherwise ExecProcNode will rescan it.
 */
void
ExecReScanAgg(AggState *node)
{
	ExprContext *econtext = node->ss.ps.ps_ExprContext;
	PlanState  *outerPlan = outerPlanState(node);
	Agg		   *aggnode = (Agg *) node->ss.ps.plan;
	int			transno;
	int			numGroupingSets = Max(node->maxsets, 1);
	int			setno;

	node->agg_done = false;

	if (node->aggstrategy == AGG_HASHED)
	{
		/*
		 * In the hashed case, if we haven't yet built the hash table then we
		 * can just return; nothing done yet, so nothing to undo. If subnode's
		 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
		 * else no reason to re-scan it at all.
		 */
		if (!node->table_filled)
			return;

		/*
		 * If we do have the hash table, and it never spilled, and the subplan
		 * does not have any parameter changes, and none of our own parameter
		 * changes affect input expressions of the aggregated functions, then
		 * we can just rescan the existing hash table; no need to build it
		 * again.
		 */
		if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
		{
			ResetTupleHashIterator(node->perhash[0].hashtable,
								   &node->perhash[0].hashiter);
			select_current_set(node, 0, true);
			return;
		}
	}

	/*
	 * Make sure we have closed any open tuplesorts.  The slots are set to
	 * NULL so that a later rescan or ExecEndAgg does not end them twice.
	 */
	for (transno = 0; transno < node->numtrans; transno++)
	{
		for (setno = 0; setno < numGroupingSets; setno++)
		{
			AggStatePerTrans pertrans = &node->pertrans[transno];

			if (pertrans->sortstates[setno])
			{
				tuplesort_end(pertrans->sortstates[setno]);
				pertrans->sortstates[setno] = NULL;
			}
		}
	}

	/*
	 * We don't need to ReScanExprContext the output tuple context here;
	 * ExecReScan already did it. But we do need to reset our per-grouping-set
	 * contexts, which may have transvalues stored in them. (We use rescan
	 * rather than just reset because transfns may have registered callbacks
	 * that need to be run now.) For the AGG_HASHED case, see below.
	 */

	for (setno = 0; setno < numGroupingSets; setno++)
	{
		ReScanExprContext(node->aggcontexts[setno]);
	}

	/* Release first tuple of group, if we have made a copy */
	if (node->grp_firstTuple != NULL)
	{
		heap_freetuple(node->grp_firstTuple);
		node->grp_firstTuple = NULL;
	}
	ExecClearTuple(node->ss.ss_ScanTupleSlot);

	/* Forget current agg values */
	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);

	/*
	 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
	 * the hashcontext. This used to be an issue, but now, resetting a context
	 * automatically deletes sub-contexts too.
	 */
	if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
	{
		/* discard spill state and return to in-memory (non-spill) mode */
		hashagg_reset_spill_state(node);

		node->hash_ever_spilled = false;
		node->hash_spill_mode = false;
		node->hash_ngroups_current = 0;

		ReScanExprContext(node->hashcontext);
		/* Rebuild an empty hash table */
		build_hash_tables(node);
		node->table_filled = false;
		/* iterator will be reset when the table is filled */

		/* expressions may have been recompiled for spill mode; restore */
		hashagg_recompile_expressions(node, false, false);
	}

	if (node->aggstrategy != AGG_HASHED)
	{
		/*
		 * Reset the per-group state (in particular, mark transvalues null)
		 */
		for (setno = 0; setno < numGroupingSets; setno++)
		{
			MemSet(node->pergroups[setno], 0,
				   sizeof(AggStatePerGroupData) * node->numaggs);
		}

		/* reset to phase 1 */
		initialize_phase(node, 1);

		node->input_done = false;
		node->projected_set = -1;
	}

	/* a child with changed params will be rescanned by ExecProcNode */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}
4560
4561
4562 /***********************************************************************
4563 * API exposed to aggregate functions
4564 ***********************************************************************/
4565
4566
4567 /*
4568 * AggCheckCallContext - test if a SQL function is being called as an aggregate
4569 *
4570 * The transition and/or final functions of an aggregate may want to verify
4571 * that they are being called as aggregates, rather than as plain SQL
4572 * functions. They should use this function to do so. The return value
4573 * is nonzero if being called as an aggregate, or zero if not. (Specific
4574 * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
4575 * values could conceivably appear in future.)
4576 *
4577 * If aggcontext isn't NULL, the function also stores at *aggcontext the
4578 * identity of the memory context that aggregate transition values are being
4579 * stored in. Note that the same aggregate call site (flinfo) may be called
4580 * interleaved on different transition values in different contexts, so it's
4581 * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
4582 * cache it in the transvalue itself (for internal-type transvalues).
4583 */
4584 int
AggCheckCallContext(FunctionCallInfo fcinfo,MemoryContext * aggcontext)4585 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
4586 {
4587 if (fcinfo->context && IsA(fcinfo->context, AggState))
4588 {
4589 if (aggcontext)
4590 {
4591 AggState *aggstate = ((AggState *) fcinfo->context);
4592 ExprContext *cxt = aggstate->curaggcontext;
4593
4594 *aggcontext = cxt->ecxt_per_tuple_memory;
4595 }
4596 return AGG_CONTEXT_AGGREGATE;
4597 }
4598 if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
4599 {
4600 if (aggcontext)
4601 *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
4602 return AGG_CONTEXT_WINDOW;
4603 }
4604
4605 /* this is just to prevent "uninitialized variable" warnings */
4606 if (aggcontext)
4607 *aggcontext = NULL;
4608 return 0;
4609 }
4610
4611 /*
4612 * AggGetAggref - allow an aggregate support function to get its Aggref
4613 *
4614 * If the function is being called as an aggregate support function,
4615 * return the Aggref node for the aggregate call. Otherwise, return NULL.
4616 *
4617 * Aggregates sharing the same inputs and transition functions can get
4618 * merged into a single transition calculation. If the transition function
4619 * calls AggGetAggref, it will get some one of the Aggrefs for which it is
4620 * executing. It must therefore not pay attention to the Aggref fields that
4621 * relate to the final function, as those are indeterminate. But if a final
4622 * function calls AggGetAggref, it will get a precise result.
4623 *
4624 * Note that if an aggregate is being used as a window function, this will
4625 * return NULL. We could provide a similar function to return the relevant
4626 * WindowFunc node in such cases, but it's not needed yet.
4627 */
4628 Aggref *
AggGetAggref(FunctionCallInfo fcinfo)4629 AggGetAggref(FunctionCallInfo fcinfo)
4630 {
4631 if (fcinfo->context && IsA(fcinfo->context, AggState))
4632 {
4633 AggState *aggstate = (AggState *) fcinfo->context;
4634 AggStatePerAgg curperagg;
4635 AggStatePerTrans curpertrans;
4636
4637 /* check curperagg (valid when in a final function) */
4638 curperagg = aggstate->curperagg;
4639
4640 if (curperagg)
4641 return curperagg->aggref;
4642
4643 /* check curpertrans (valid when in a transition function) */
4644 curpertrans = aggstate->curpertrans;
4645
4646 if (curpertrans)
4647 return curpertrans->aggref;
4648 }
4649 return NULL;
4650 }
4651
4652 /*
4653 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
4654 *
4655 * This is useful in agg final functions; the context returned is one that
4656 * the final function can safely reset as desired. This isn't useful for
4657 * transition functions, since the context returned MAY (we don't promise)
4658 * be the same as the context those are called in.
4659 *
4660 * As above, this is currently not useful for aggs called as window functions.
4661 */
4662 MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)4663 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
4664 {
4665 if (fcinfo->context && IsA(fcinfo->context, AggState))
4666 {
4667 AggState *aggstate = (AggState *) fcinfo->context;
4668
4669 return aggstate->tmpcontext->ecxt_per_tuple_memory;
4670 }
4671 return NULL;
4672 }
4673
4674 /*
4675 * AggStateIsShared - find out whether transition state is shared
4676 *
4677 * If the function is being called as an aggregate support function,
4678 * return true if the aggregate's transition state is shared across
4679 * multiple aggregates, false if it is not.
4680 *
4681 * Returns true if not called as an aggregate support function.
4682 * This is intended as a conservative answer, ie "no you'd better not
4683 * scribble on your input". In particular, will return true if the
4684 * aggregate is being used as a window function, which is a scenario
4685 * in which changing the transition state is a bad idea. We might
4686 * want to refine the behavior for the window case in future.
4687 */
4688 bool
AggStateIsShared(FunctionCallInfo fcinfo)4689 AggStateIsShared(FunctionCallInfo fcinfo)
4690 {
4691 if (fcinfo->context && IsA(fcinfo->context, AggState))
4692 {
4693 AggState *aggstate = (AggState *) fcinfo->context;
4694 AggStatePerAgg curperagg;
4695 AggStatePerTrans curpertrans;
4696
4697 /* check curperagg (valid when in a final function) */
4698 curperagg = aggstate->curperagg;
4699
4700 if (curperagg)
4701 return aggstate->pertrans[curperagg->transno].aggshared;
4702
4703 /* check curpertrans (valid when in a transition function) */
4704 curpertrans = aggstate->curpertrans;
4705
4706 if (curpertrans)
4707 return curpertrans->aggshared;
4708 }
4709 return true;
4710 }
4711
4712 /*
4713 * AggRegisterCallback - register a cleanup callback for an aggregate
4714 *
4715 * This is useful for aggs to register shutdown callbacks, which will ensure
4716 * that non-memory resources are freed. The callback will occur just before
4717 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
4718 * either between groups or as a result of rescanning the query. The callback
4719 * will NOT be called on error paths. The typical use-case is for freeing of
4720 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
4721 * created by the agg functions. (The callback will not be called until after
4722 * the result of the finalfn is no longer needed, so it's safe for the finalfn
4723 * to return data that will be freed by the callback.)
4724 *
4725 * As above, this is currently not useful for aggs called as window functions.
4726 */
4727 void
AggRegisterCallback(FunctionCallInfo fcinfo,ExprContextCallbackFunction func,Datum arg)4728 AggRegisterCallback(FunctionCallInfo fcinfo,
4729 ExprContextCallbackFunction func,
4730 Datum arg)
4731 {
4732 if (fcinfo->context && IsA(fcinfo->context, AggState))
4733 {
4734 AggState *aggstate = (AggState *) fcinfo->context;
4735 ExprContext *cxt = aggstate->curaggcontext;
4736
4737 RegisterExprContextCallback(cxt, func, arg);
4738
4739 return;
4740 }
4741 elog(ERROR, "aggregate function cannot register a callback in this context");
4742 }
4743
4744
4745 /* ----------------------------------------------------------------
4746 * Parallel Query Support
4747 * ----------------------------------------------------------------
4748 */
4749
4750 /* ----------------------------------------------------------------
4751 * ExecAggEstimate
4752 *
4753 * Estimate space required to propagate aggregate statistics.
4754 * ----------------------------------------------------------------
4755 */
4756 void
ExecAggEstimate(AggState * node,ParallelContext * pcxt)4757 ExecAggEstimate(AggState *node, ParallelContext *pcxt)
4758 {
4759 Size size;
4760
4761 /* don't need this if not instrumenting or no workers */
4762 if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4763 return;
4764
4765 size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
4766 size = add_size(size, offsetof(SharedAggInfo, sinstrument));
4767 shm_toc_estimate_chunk(&pcxt->estimator, size);
4768 shm_toc_estimate_keys(&pcxt->estimator, 1);
4769 }
4770
4771 /* ----------------------------------------------------------------
4772 * ExecAggInitializeDSM
4773 *
4774 * Initialize DSM space for aggregate statistics.
4775 * ----------------------------------------------------------------
4776 */
4777 void
ExecAggInitializeDSM(AggState * node,ParallelContext * pcxt)4778 ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
4779 {
4780 Size size;
4781
4782 /* don't need this if not instrumenting or no workers */
4783 if (!node->ss.ps.instrument || pcxt->nworkers == 0)
4784 return;
4785
4786 size = offsetof(SharedAggInfo, sinstrument)
4787 + pcxt->nworkers * sizeof(AggregateInstrumentation);
4788 node->shared_info = shm_toc_allocate(pcxt->toc, size);
4789 /* ensure any unfilled slots will contain zeroes */
4790 memset(node->shared_info, 0, size);
4791 node->shared_info->num_workers = pcxt->nworkers;
4792 shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
4793 node->shared_info);
4794 }
4795
4796 /* ----------------------------------------------------------------
4797 * ExecAggInitializeWorker
4798 *
4799 * Attach worker to DSM space for aggregate statistics.
4800 * ----------------------------------------------------------------
4801 */
4802 void
ExecAggInitializeWorker(AggState * node,ParallelWorkerContext * pwcxt)4803 ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
4804 {
4805 node->shared_info =
4806 shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
4807 }
4808
4809 /* ----------------------------------------------------------------
4810 * ExecAggRetrieveInstrumentation
4811 *
4812 * Transfer aggregate statistics from DSM to private memory.
4813 * ----------------------------------------------------------------
4814 */
4815 void
ExecAggRetrieveInstrumentation(AggState * node)4816 ExecAggRetrieveInstrumentation(AggState *node)
4817 {
4818 Size size;
4819 SharedAggInfo *si;
4820
4821 if (node->shared_info == NULL)
4822 return;
4823
4824 size = offsetof(SharedAggInfo, sinstrument)
4825 + node->shared_info->num_workers * sizeof(AggregateInstrumentation);
4826 si = palloc(size);
4827 memcpy(si, node->shared_info, size);
4828 node->shared_info = si;
4829 }
4830