1 /*-------------------------------------------------------------------------
2 *
3 * nodeAgg.c
4 * Routines to handle aggregate nodes.
5 *
6 * ExecAgg normally evaluates each aggregate in the following steps:
7 *
8 * transvalue = initcond
9 * foreach input_tuple do
10 * transvalue = transfunc(transvalue, input_value(s))
11 * result = finalfunc(transvalue, direct_argument(s))
12 *
13 * If a finalfunc is not supplied then the result is just the ending
14 * value of transvalue.
15 *
16 * Other behaviors can be selected by the "aggsplit" mode, which exists
17 * to support partial aggregation. It is possible to:
18 * * Skip running the finalfunc, so that the output is always the
19 * final transvalue state.
20 * * Substitute the combinefunc for the transfunc, so that transvalue
21 * states (propagated up from a child partial-aggregation step) are merged
22 * rather than processing raw input rows. (The statements below about
23 * the transfunc apply equally to the combinefunc, when it's selected.)
24 * * Apply the serializefunc to the output values (this only makes sense
25 * when skipping the finalfunc, since the serializefunc works on the
26 * transvalue data type).
27 * * Apply the deserializefunc to the input values (this only makes sense
28 * when using the combinefunc, for similar reasons).
29 * It is the planner's responsibility to connect up Agg nodes using these
30 * alternate behaviors in a way that makes sense, with partial aggregation
31 * results being fed to nodes that expect them.
32 *
33 * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34 * input tuples and eliminate duplicates (if required) before performing
35 * the above-depicted process. (However, we don't do that for ordered-set
36 * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37 * so far as this module is concerned.) Note that partial aggregation
38 * is not supported in these cases, since we couldn't ensure global
39 * ordering or distinctness of the inputs.
40 *
41 * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42 * then the first non-NULL input_value is assigned directly to transvalue,
43 * and transfunc isn't applied until the second non-NULL input_value.
44 * The agg's first input type and transtype must be the same in this case!
45 *
46 * If transfunc is marked "strict" then NULL input_values are skipped,
47 * keeping the previous transvalue. If transfunc is not strict then it
48 * is called for every input tuple and must deal with NULL initcond
49 * or NULL input_values for itself.
50 *
51 * If finalfunc is marked "strict" then it is not called when the
52 * ending transvalue is NULL, instead a NULL result is created
53 * automatically (this is just the usual handling of strict functions,
54 * of course). A non-strict finalfunc can make its own choice of
55 * what to return for a NULL ending transvalue.
56 *
57 * Ordered-set aggregates are treated specially in one other way: we
58 * evaluate any "direct" arguments and pass them to the finalfunc along
59 * with the transition value.
60 *
61 * A finalfunc can have additional arguments beyond the transvalue and
62 * any "direct" arguments, corresponding to the input arguments of the
63 * aggregate. These are always just passed as NULL. Such arguments may be
64 * needed to allow resolution of a polymorphic aggregate's result type.
65 *
66 * We compute aggregate input expressions and run the transition functions
67 * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68 * once per input tuple, so when the transvalue datatype is
69 * pass-by-reference, we have to be careful to copy it into a longer-lived
70 * memory context, and free the prior value to avoid memory leakage. We
71 * store transvalues in another set of econtexts, aggstate->aggcontexts
72 * (one per grouping set, see below), which are also used for the hashtable
73 * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74 * reset, at group boundaries so that aggregate transition functions can
75 * register shutdown callbacks via AggRegisterCallback.
76 *
77 * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78 * run finalize functions and compute the output tuple; this context can be
79 * reset once per output tuple.
80 *
81 * The executor's AggState node is passed as the fmgr "context" value in
82 * all transfunc and finalfunc calls. It is not recommended that the
83 * transition functions look at the AggState node directly, but they can
84 * use AggCheckCallContext() to verify that they are being called by
85 * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86 * transition function might want to know this is so that it can avoid
87 * palloc'ing a fixed-size pass-by-ref transition value on every call:
88 * it can instead just scribble on and return its left input. Ordinarily
89 * it is completely forbidden for functions to modify pass-by-ref inputs,
90 * but in the aggregate case we know the left input is either the initial
91 * transition value or a previous function result, and in either case its
92 * value need not be preserved. See int8inc() for an example. Notice that
93 * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94 * the previous transition value pointer is returned. It is also possible
95 * to avoid repeated data copying when the transition value is an expanded
96 * object: to do that, the transition function must take care to return
97 * an expanded object that is in a child context of the memory context
98 * returned by AggCheckCallContext(). Also, some transition functions want
99 * to store working state in addition to the nominal transition value; they
100 * can use the memory context returned by AggCheckCallContext() to do that.
101 *
102 * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103 * AggState is available as context in earlier releases (back to 8.1),
104 * but direct examination of the node is needed to use it before 9.0.
105 *
106 * As of 9.4, aggregate transition functions can also use AggGetAggref()
107 * to get hold of the Aggref expression node for their aggregate call.
108 * This is mainly intended for ordered-set aggregates, which are not
109 * supported as window functions. (A regular aggregate function would
110 * need some fallback logic to use this, since there's no Aggref node
111 * for a window function.)
112 *
113 * Grouping sets:
114 *
115 * A list of grouping sets which is structurally equivalent to a ROLLUP
116 * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117 * ordered data. We do this by keeping a separate set of transition values
118 * for each grouping set being concurrently processed; for each input tuple
119 * we update them all, and on group boundaries we reset those states
120 * (starting at the front of the list) whose grouping values have changed
121 * (the list of grouping sets is ordered from most specific to least
122 * specific).
123 *
124 * Where more complex grouping sets are used, we break them down into
125 * "phases", where each phase has a different sort order (except phase 0
126 * which is reserved for hashing). During each phase but the last, the
127 * input tuples are additionally stored in a tuplesort which is keyed to the
128 * next phase's sort order; during each phase but the first, the input
129 * tuples are drawn from the previously sorted data. (The sorting of the
130 * data for the first phase is handled by the planner, as it might be
131 * satisfied by underlying nodes.)
132 *
133 * Hashing can be mixed with sorted grouping. To do this, we have an
134 * AGG_MIXED strategy that populates the hashtables during the first sorted
135 * phase, and switches to reading them out after completing all sort phases.
136 * We can also support AGG_HASHED with multiple hash tables and no sorting
137 * at all.
138 *
139 * From the perspective of aggregate transition and final functions, the
140 * only issue regarding grouping sets is this: a single call site (flinfo)
141 * of an aggregate function may be used for updating several different
142 * transition values in turn. So the function must not cache in the flinfo
143 * anything which logically belongs as part of the transition value (most
144 * importantly, the memory context in which the transition value exists).
145 * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146 * sensitive to the grouping set for which the aggregate function is
147 * currently being called.
148 *
149 * Plan structure:
150 *
151 * What we get from the planner is actually one "real" Agg node which is
152 * part of the plan tree proper, but which optionally has an additional list
153 * of Agg nodes hung off the side via the "chain" field. This is because an
154 * Agg node happens to be a convenient representation of all the data we
155 * need for grouping sets.
156 *
157 * For many purposes, we treat the "real" node as if it were just the first
158 * node in the chain. The chain must be ordered such that hashed entries
159 * come before sorted/plain entries; the real node is marked AGG_MIXED if
160 * there are both types present (in which case the real node describes one
161 * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162 * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163 * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164 * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165 * chained nodes.
166 *
167 * We collect all hashed nodes into a single "phase", numbered 0, and create
168 * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169 * Phase 0 is allocated even if there are no hashes, but remains unused in
170 * that case.
171 *
172 * AGG_HASHED nodes actually refer to only a single grouping set each,
173 * because for each hashed grouping we need a separate grpColIdx and
174 * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175 * grouping sets that share a sort order. Each AGG_SORTED node other than
176 * the first one has an associated Sort node which describes the sort order
177 * to be used; the first sorted node takes its input from the outer subtree,
178 * which the planner has already arranged to provide ordered data.
179 *
180 * Memory and ExprContext usage:
181 *
182 * Because we're accumulating aggregate values across input rows, we need to
183 * use more memory contexts than just simple input/output tuple contexts.
184 * In fact, for a rollup, we need a separate context for each grouping set
185 * so that we can reset the inner (finer-grained) aggregates on their group
186 * boundaries while continuing to accumulate values for outer
187 * (coarser-grained) groupings. On top of this, we might be simultaneously
188 * populating hashtables; however, we only need one context for all the
189 * hashtables.
190 *
191 * So we create an array, aggcontexts, with an ExprContext for each grouping
192 * set in the largest rollup that we're going to process, and use the
193 * per-tuple memory context of those ExprContexts to store the aggregate
194 * transition values. hashcontext is the single context created to support
195 * all hash tables.
196 *
197 * Transition / Combine function invocation:
198 *
199 * For performance reasons transition functions, including combine
200 * functions, aren't invoked one-by-one from nodeAgg.c after computing
201 * arguments using the expression evaluation engine. Instead
202 * ExecBuildAggTrans() builds one large expression that does both argument
203 * evaluation and transition function invocation. That avoids performance
204 * issues due to repeated uses of expression evaluation, complications due
205 * to filter expressions having to be evaluated early, and allows the
206 * entire expression to be JIT-compiled into one native function.
207 *
208 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
209 * Portions Copyright (c) 1994, Regents of the University of California
210 *
211 * IDENTIFICATION
212 * src/backend/executor/nodeAgg.c
213 *
214 *-------------------------------------------------------------------------
215 */
216
217 #include "postgres.h"
218
219 #include "access/htup_details.h"
220 #include "catalog/objectaccess.h"
221 #include "catalog/pg_aggregate.h"
222 #include "catalog/pg_proc.h"
223 #include "catalog/pg_type.h"
224 #include "executor/execExpr.h"
225 #include "executor/executor.h"
226 #include "executor/nodeAgg.h"
227 #include "miscadmin.h"
228 #include "nodes/makefuncs.h"
229 #include "nodes/nodeFuncs.h"
230 #include "optimizer/optimizer.h"
231 #include "parser/parse_agg.h"
232 #include "parser/parse_coerce.h"
233 #include "utils/acl.h"
234 #include "utils/builtins.h"
235 #include "utils/lsyscache.h"
236 #include "utils/memutils.h"
237 #include "utils/syscache.h"
238 #include "utils/tuplesort.h"
239 #include "utils/datum.h"
240
241
242 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
243 static void initialize_phase(AggState *aggstate, int newphase);
244 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
245 static void initialize_aggregates(AggState *aggstate,
246 AggStatePerGroup *pergroups,
247 int numReset);
248 static void advance_transition_function(AggState *aggstate,
249 AggStatePerTrans pertrans,
250 AggStatePerGroup pergroupstate);
251 static void advance_aggregates(AggState *aggstate);
252 static void process_ordered_aggregate_single(AggState *aggstate,
253 AggStatePerTrans pertrans,
254 AggStatePerGroup pergroupstate);
255 static void process_ordered_aggregate_multi(AggState *aggstate,
256 AggStatePerTrans pertrans,
257 AggStatePerGroup pergroupstate);
258 static void finalize_aggregate(AggState *aggstate,
259 AggStatePerAgg peragg,
260 AggStatePerGroup pergroupstate,
261 Datum *resultVal, bool *resultIsNull);
262 static void finalize_partialaggregate(AggState *aggstate,
263 AggStatePerAgg peragg,
264 AggStatePerGroup pergroupstate,
265 Datum *resultVal, bool *resultIsNull);
266 static void prepare_projection_slot(AggState *aggstate,
267 TupleTableSlot *slot,
268 int currentSet);
269 static void finalize_aggregates(AggState *aggstate,
270 AggStatePerAgg peragg,
271 AggStatePerGroup pergroup);
272 static TupleTableSlot *project_aggregates(AggState *aggstate);
273 static Bitmapset *find_unaggregated_cols(AggState *aggstate);
274 static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
275 static void build_hash_table(AggState *aggstate);
276 static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
277 static void lookup_hash_entries(AggState *aggstate);
278 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
279 static void agg_fill_hash_table(AggState *aggstate);
280 static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
281 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
282 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
283 AggState *aggstate, EState *estate,
284 Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
285 Oid aggserialfn, Oid aggdeserialfn,
286 Datum initValue, bool initValueIsNull,
287 Oid *inputTypes, int numArguments);
288 static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
289 int lastaggno, List **same_input_transnos);
290 static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
291 bool shareable,
292 Oid aggtransfn, Oid aggtranstype,
293 Oid aggserialfn, Oid aggdeserialfn,
294 Datum initValue, bool initValueIsNull,
295 List *transnos);
296
297
298 /*
299 * Select the current grouping set; affects current_set and
300 * curaggcontext.
301 */
302 static void
select_current_set(AggState * aggstate,int setno,bool is_hash)303 select_current_set(AggState *aggstate, int setno, bool is_hash)
304 {
305 /* when changing this, also adapt ExecInterpExpr() and friends */
306 if (is_hash)
307 aggstate->curaggcontext = aggstate->hashcontext;
308 else
309 aggstate->curaggcontext = aggstate->aggcontexts[setno];
310
311 aggstate->current_set = setno;
312 }
313
314 /*
315 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
316 * current_phase + 1. Juggle the tuplesorts accordingly.
317 *
318 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
319 * case, so when entering phase 0, all we need to do is drop open sorts.
320 */
321 static void
initialize_phase(AggState * aggstate,int newphase)322 initialize_phase(AggState *aggstate, int newphase)
323 {
324 Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
325
326 /*
327 * Whatever the previous state, we're now done with whatever input
328 * tuplesort was in use.
329 */
330 if (aggstate->sort_in)
331 {
332 tuplesort_end(aggstate->sort_in);
333 aggstate->sort_in = NULL;
334 }
335
336 if (newphase <= 1)
337 {
338 /*
339 * Discard any existing output tuplesort.
340 */
341 if (aggstate->sort_out)
342 {
343 tuplesort_end(aggstate->sort_out);
344 aggstate->sort_out = NULL;
345 }
346 }
347 else
348 {
349 /*
350 * The old output tuplesort becomes the new input one, and this is the
351 * right time to actually sort it.
352 */
353 aggstate->sort_in = aggstate->sort_out;
354 aggstate->sort_out = NULL;
355 Assert(aggstate->sort_in);
356 tuplesort_performsort(aggstate->sort_in);
357 }
358
359 /*
360 * If this isn't the last phase, we need to sort appropriately for the
361 * next phase in sequence.
362 */
363 if (newphase > 0 && newphase < aggstate->numphases - 1)
364 {
365 Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
366 PlanState *outerNode = outerPlanState(aggstate);
367 TupleDesc tupDesc = ExecGetResultType(outerNode);
368
369 aggstate->sort_out = tuplesort_begin_heap(tupDesc,
370 sortnode->numCols,
371 sortnode->sortColIdx,
372 sortnode->sortOperators,
373 sortnode->collations,
374 sortnode->nullsFirst,
375 work_mem,
376 NULL, false);
377 }
378
379 aggstate->current_phase = newphase;
380 aggstate->phase = &aggstate->phases[newphase];
381 }
382
383 /*
384 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
385 * populated by the previous phase. Copy it to the sorter for the next phase
386 * if any.
387 *
388 * Callers cannot rely on memory for tuple in returned slot remaining valid
389 * past any subsequently fetched tuple.
390 */
391 static TupleTableSlot *
fetch_input_tuple(AggState * aggstate)392 fetch_input_tuple(AggState *aggstate)
393 {
394 TupleTableSlot *slot;
395
396 if (aggstate->sort_in)
397 {
398 /* make sure we check for interrupts in either path through here */
399 CHECK_FOR_INTERRUPTS();
400 if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
401 aggstate->sort_slot, NULL))
402 return NULL;
403 slot = aggstate->sort_slot;
404 }
405 else
406 slot = ExecProcNode(outerPlanState(aggstate));
407
408 if (!TupIsNull(slot) && aggstate->sort_out)
409 tuplesort_puttupleslot(aggstate->sort_out, slot);
410
411 return slot;
412 }
413
414 /*
415 * (Re)Initialize an individual aggregate.
416 *
417 * This function handles only one grouping set, already set in
418 * aggstate->current_set.
419 *
420 * When called, CurrentMemoryContext should be the per-query context.
421 */
422 static void
initialize_aggregate(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)423 initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
424 AggStatePerGroup pergroupstate)
425 {
426 /*
427 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
428 */
429 if (pertrans->numSortCols > 0)
430 {
431 /*
432 * In case of rescan, maybe there could be an uncompleted sort
433 * operation? Clean it up if so.
434 */
435 if (pertrans->sortstates[aggstate->current_set])
436 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
437
438
439 /*
440 * We use a plain Datum sorter when there's a single input column;
441 * otherwise sort the full tuple. (See comments for
442 * process_ordered_aggregate_single.)
443 */
444 if (pertrans->numInputs == 1)
445 {
446 Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
447
448 pertrans->sortstates[aggstate->current_set] =
449 tuplesort_begin_datum(attr->atttypid,
450 pertrans->sortOperators[0],
451 pertrans->sortCollations[0],
452 pertrans->sortNullsFirst[0],
453 work_mem, NULL, false);
454 }
455 else
456 pertrans->sortstates[aggstate->current_set] =
457 tuplesort_begin_heap(pertrans->sortdesc,
458 pertrans->numSortCols,
459 pertrans->sortColIdx,
460 pertrans->sortOperators,
461 pertrans->sortCollations,
462 pertrans->sortNullsFirst,
463 work_mem, NULL, false);
464 }
465
466 /*
467 * (Re)set transValue to the initial value.
468 *
469 * Note that when the initial value is pass-by-ref, we must copy it (into
470 * the aggcontext) since we will pfree the transValue later.
471 */
472 if (pertrans->initValueIsNull)
473 pergroupstate->transValue = pertrans->initValue;
474 else
475 {
476 MemoryContext oldContext;
477
478 oldContext = MemoryContextSwitchTo(
479 aggstate->curaggcontext->ecxt_per_tuple_memory);
480 pergroupstate->transValue = datumCopy(pertrans->initValue,
481 pertrans->transtypeByVal,
482 pertrans->transtypeLen);
483 MemoryContextSwitchTo(oldContext);
484 }
485 pergroupstate->transValueIsNull = pertrans->initValueIsNull;
486
487 /*
488 * If the initial value for the transition state doesn't exist in the
489 * pg_aggregate table then we will let the first non-NULL value returned
490 * from the outer procNode become the initial value. (This is useful for
491 * aggregates like max() and min().) The noTransValue flag signals that we
492 * still need to do this.
493 */
494 pergroupstate->noTransValue = pertrans->initValueIsNull;
495 }
496
497 /*
498 * Initialize all aggregate transition states for a new group of input values.
499 *
500 * If there are multiple grouping sets, we initialize only the first numReset
501 * of them (the grouping sets are ordered so that the most specific one, which
502 * is reset most often, is first). As a convenience, if numReset is 0, we
503 * reinitialize all sets.
504 *
505 * NB: This cannot be used for hash aggregates, as for those the grouping set
506 * number has to be specified from further up.
507 *
508 * When called, CurrentMemoryContext should be the per-query context.
509 */
510 static void
initialize_aggregates(AggState * aggstate,AggStatePerGroup * pergroups,int numReset)511 initialize_aggregates(AggState *aggstate,
512 AggStatePerGroup *pergroups,
513 int numReset)
514 {
515 int transno;
516 int numGroupingSets = Max(aggstate->phase->numsets, 1);
517 int setno = 0;
518 int numTrans = aggstate->numtrans;
519 AggStatePerTrans transstates = aggstate->pertrans;
520
521 if (numReset == 0)
522 numReset = numGroupingSets;
523
524 for (setno = 0; setno < numReset; setno++)
525 {
526 AggStatePerGroup pergroup = pergroups[setno];
527
528 select_current_set(aggstate, setno, false);
529
530 for (transno = 0; transno < numTrans; transno++)
531 {
532 AggStatePerTrans pertrans = &transstates[transno];
533 AggStatePerGroup pergroupstate = &pergroup[transno];
534
535 initialize_aggregate(aggstate, pertrans, pergroupstate);
536 }
537 }
538 }
539
540 /*
541 * Given new input value(s), advance the transition function of one aggregate
542 * state within one grouping set only (already set in aggstate->current_set)
543 *
544 * The new values (and null flags) have been preloaded into argument positions
545 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
546 * pass to the transition function. We also expect that the static fields of
547 * the fcinfo are already initialized; that was done by ExecInitAgg().
548 *
549 * It doesn't matter which memory context this is called in.
550 */
551 static void
advance_transition_function(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)552 advance_transition_function(AggState *aggstate,
553 AggStatePerTrans pertrans,
554 AggStatePerGroup pergroupstate)
555 {
556 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
557 MemoryContext oldContext;
558 Datum newVal;
559
560 if (pertrans->transfn.fn_strict)
561 {
562 /*
563 * For a strict transfn, nothing happens when there's a NULL input; we
564 * just keep the prior transValue.
565 */
566 int numTransInputs = pertrans->numTransInputs;
567 int i;
568
569 for (i = 1; i <= numTransInputs; i++)
570 {
571 if (fcinfo->args[i].isnull)
572 return;
573 }
574 if (pergroupstate->noTransValue)
575 {
576 /*
577 * transValue has not been initialized. This is the first non-NULL
578 * input value. We use it as the initial value for transValue. (We
579 * already checked that the agg's input type is binary-compatible
580 * with its transtype, so straight copy here is OK.)
581 *
582 * We must copy the datum into aggcontext if it is pass-by-ref. We
583 * do not need to pfree the old transValue, since it's NULL.
584 */
585 oldContext = MemoryContextSwitchTo(
586 aggstate->curaggcontext->ecxt_per_tuple_memory);
587 pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
588 pertrans->transtypeByVal,
589 pertrans->transtypeLen);
590 pergroupstate->transValueIsNull = false;
591 pergroupstate->noTransValue = false;
592 MemoryContextSwitchTo(oldContext);
593 return;
594 }
595 if (pergroupstate->transValueIsNull)
596 {
597 /*
598 * Don't call a strict function with NULL inputs. Note it is
599 * possible to get here despite the above tests, if the transfn is
600 * strict *and* returned a NULL on a prior cycle. If that happens
601 * we will propagate the NULL all the way to the end.
602 */
603 return;
604 }
605 }
606
607 /* We run the transition functions in per-input-tuple memory context */
608 oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
609
610 /* set up aggstate->curpertrans for AggGetAggref() */
611 aggstate->curpertrans = pertrans;
612
613 /*
614 * OK to call the transition function
615 */
616 fcinfo->args[0].value = pergroupstate->transValue;
617 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
618 fcinfo->isnull = false; /* just in case transfn doesn't set it */
619
620 newVal = FunctionCallInvoke(fcinfo);
621
622 aggstate->curpertrans = NULL;
623
624 /*
625 * If pass-by-ref datatype, must copy the new value into aggcontext and
626 * free the prior transValue. But if transfn returned a pointer to its
627 * first input, we don't need to do anything. Also, if transfn returned a
628 * pointer to a R/W expanded object that is already a child of the
629 * aggcontext, assume we can adopt that value without copying it.
630 *
631 * It's safe to compare newVal with pergroup->transValue without
632 * regard for either being NULL, because ExecAggTransReparent()
633 * takes care to set transValue to 0 when NULL. Otherwise we could
634 * end up accidentally not reparenting, when the transValue has
635 * the same numerical value as newValue, despite being NULL. This
636 * is a somewhat hot path, making it undesirable to instead solve
637 * this with another branch for the common case of the transition
638 * function returning its (modified) input argument.
639 */
640 if (!pertrans->transtypeByVal &&
641 DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
642 newVal = ExecAggTransReparent(aggstate, pertrans,
643 newVal, fcinfo->isnull,
644 pergroupstate->transValue,
645 pergroupstate->transValueIsNull);
646
647 pergroupstate->transValue = newVal;
648 pergroupstate->transValueIsNull = fcinfo->isnull;
649
650 MemoryContextSwitchTo(oldContext);
651 }
652
653 /*
654 * Advance each aggregate transition state for one input tuple. The input
655 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
656 * accessible to ExecEvalExpr.
657 *
658 * We have two sets of transition states to handle: one for sorted aggregation
659 * and one for hashed; we do them both here, to avoid multiple evaluation of
660 * the inputs.
661 *
662 * When called, CurrentMemoryContext should be the per-query context.
663 */
664 static void
advance_aggregates(AggState * aggstate)665 advance_aggregates(AggState *aggstate)
666 {
667 bool dummynull;
668
669 ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
670 aggstate->tmpcontext,
671 &dummynull);
672 }
673
674 /*
675 * Run the transition function for a DISTINCT or ORDER BY aggregate
676 * with only one input. This is called after we have completed
677 * entering all the input values into the sort object. We complete the
678 * sort, read out the values in sorted order, and run the transition
679 * function on each value (applying DISTINCT if appropriate).
680 *
681 * Note that the strictness of the transition function was checked when
682 * entering the values into the sort, so we don't check it again here;
683 * we just apply standard SQL DISTINCT logic.
684 *
685 * The one-input case is handled separately from the multi-input case
686 * for performance reasons: for single by-value inputs, such as the
687 * common case of count(distinct id), the tuplesort_getdatum code path
688 * is around 300% faster. (The speedup for by-reference types is less
689 * but still noticeable.)
690 *
691 * This function handles only one grouping set (already set in
692 * aggstate->current_set).
693 *
694 * When called, CurrentMemoryContext should be the per-query context.
695 */
696 static void
process_ordered_aggregate_single(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)697 process_ordered_aggregate_single(AggState *aggstate,
698 AggStatePerTrans pertrans,
699 AggStatePerGroup pergroupstate)
700 {
701 Datum oldVal = (Datum) 0;
702 bool oldIsNull = true;
703 bool haveOldVal = false;
704 MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
705 MemoryContext oldContext;
706 bool isDistinct = (pertrans->numDistinctCols > 0);
707 Datum newAbbrevVal = (Datum) 0;
708 Datum oldAbbrevVal = (Datum) 0;
709 FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
710 Datum *newVal;
711 bool *isNull;
712
713 Assert(pertrans->numDistinctCols < 2);
714
715 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
716
717 /* Load the column into argument 1 (arg 0 will be transition value) */
718 newVal = &fcinfo->args[1].value;
719 isNull = &fcinfo->args[1].isnull;
720
721 /*
722 * Note: if input type is pass-by-ref, the datums returned by the sort are
723 * freshly palloc'd in the per-query context, so we must be careful to
724 * pfree them when they are no longer needed.
725 */
726
727 while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
728 true, newVal, isNull, &newAbbrevVal))
729 {
730 /*
731 * Clear and select the working context for evaluation of the equality
732 * function and transition function.
733 */
734 MemoryContextReset(workcontext);
735 oldContext = MemoryContextSwitchTo(workcontext);
736
737 /*
738 * If DISTINCT mode, and not distinct from prior, skip it.
739 */
740 if (isDistinct &&
741 haveOldVal &&
742 ((oldIsNull && *isNull) ||
743 (!oldIsNull && !*isNull &&
744 oldAbbrevVal == newAbbrevVal &&
745 DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
746 pertrans->aggCollation,
747 oldVal, *newVal)))))
748 {
749 /* equal to prior, so forget this one */
750 if (!pertrans->inputtypeByVal && !*isNull)
751 pfree(DatumGetPointer(*newVal));
752 }
753 else
754 {
755 advance_transition_function(aggstate, pertrans, pergroupstate);
756 /* forget the old value, if any */
757 if (!oldIsNull && !pertrans->inputtypeByVal)
758 pfree(DatumGetPointer(oldVal));
759 /* and remember the new one for subsequent equality checks */
760 oldVal = *newVal;
761 oldAbbrevVal = newAbbrevVal;
762 oldIsNull = *isNull;
763 haveOldVal = true;
764 }
765
766 MemoryContextSwitchTo(oldContext);
767 }
768
769 if (!oldIsNull && !pertrans->inputtypeByVal)
770 pfree(DatumGetPointer(oldVal));
771
772 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
773 pertrans->sortstates[aggstate->current_set] = NULL;
774 }
775
/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with more than one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 *
 * On return, the sort state for the current set has been ended and its
 * pointer in pertrans->sortstates cleared, and tmpcontext's outer-tuple slot
 * has been restored to the value it had on entry (its inner-tuple slot is
 * left pointing at one of this transition's slots).
 */
static void
process_ordered_aggregate_multi(AggState *aggstate,
								AggStatePerTrans pertrans,
								AggStatePerGroup pergroupstate)
{
	ExprContext *tmpcontext = aggstate->tmpcontext;
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;

	/*
	 * slot1 receives each tuple read from the sort.  slot2 (may be NULL, see
	 * the NULL checks below) holds the most recent tuple that was passed to
	 * the transfn, for DISTINCT comparisons; the two pointers are swapped
	 * after each accepted tuple.
	 */
	TupleTableSlot *slot1 = pertrans->sortslot;
	TupleTableSlot *slot2 = pertrans->uniqslot;
	int			numTransInputs = pertrans->numTransInputs;
	int			numDistinctCols = pertrans->numDistinctCols;
	Datum		newAbbrevVal = (Datum) 0;
	Datum		oldAbbrevVal = (Datum) 0;
	bool		haveOldValue = false;

	/* the outer tuple is overwritten in the loop; remember it to restore */
	TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
	int			i;

	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

	ExecClearTuple(slot1);
	if (slot2)
		ExecClearTuple(slot2);

	while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
								  true, true, slot1, &newAbbrevVal))
	{
		CHECK_FOR_INTERRUPTS();

		/*
		 * Expose the new tuple as OUTER and the previously accepted tuple as
		 * INNER, so that equalfnMulti can presumably compare the two on the
		 * DISTINCT columns (NOTE(review): confirm against the expression's
		 * construction).
		 */
		tmpcontext->ecxt_outertuple = slot1;
		tmpcontext->ecxt_innertuple = slot2;

		/*
		 * Feed the tuple to the transfn unless DISTINCT applies and it equals
		 * the previously accepted one.  Differing abbreviated keys are taken
		 * as proof of inequality, which skips the full ExecQual comparison.
		 */
		if (numDistinctCols == 0 ||
			!haveOldValue ||
			newAbbrevVal != oldAbbrevVal ||
			!ExecQual(pertrans->equalfnMulti, tmpcontext))
		{
			/*
			 * Extract the first numTransInputs columns as datums to pass to
			 * the transfn.
			 */
			slot_getsomeattrs(slot1, numTransInputs);

			/* Load values into fcinfo */
			/* Start from 1, since the 0th arg will be the transition value */
			for (i = 0; i < numTransInputs; i++)
			{
				fcinfo->args[i + 1].value = slot1->tts_values[i];
				fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
			}

			advance_transition_function(aggstate, pertrans, pergroupstate);

			if (numDistinctCols > 0)
			{
				/*
				 * Swap the slot pointers to retain the current tuple: slot2
				 * now holds it for the next comparison, and the old slot2
				 * becomes slot1 and is cleared at the bottom of the loop
				 * before receiving the next sorted tuple.
				 */
				TupleTableSlot *tmpslot = slot2;

				slot2 = slot1;
				slot1 = tmpslot;
				/* avoid ExecQual() calls by reusing abbreviated keys */
				oldAbbrevVal = newAbbrevVal;
				haveOldValue = true;
			}
		}

		/* Reset context each time */
		ResetExprContext(tmpcontext);

		ExecClearTuple(slot1);
	}

	if (slot2)
		ExecClearTuple(slot2);

	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
	pertrans->sortstates[aggstate->current_set] = NULL;

	/* restore previous slot, potentially in use for grouping sets */
	tmpcontext->ecxt_outertuple = save;
}
868
869 /*
870 * Compute the final value of one aggregate.
871 *
872 * This function handles only one grouping set (already set in
873 * aggstate->current_set).
874 *
875 * The finalfunction will be run, and the result delivered, in the
876 * output-tuple context; caller's CurrentMemoryContext does not matter.
877 *
878 * The finalfn uses the state as set in the transno. This also might be
879 * being used by another aggregate function, so it's important that we do
880 * nothing destructive here.
881 */
882 static void
finalize_aggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)883 finalize_aggregate(AggState *aggstate,
884 AggStatePerAgg peragg,
885 AggStatePerGroup pergroupstate,
886 Datum *resultVal, bool *resultIsNull)
887 {
888 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
889 bool anynull = false;
890 MemoryContext oldContext;
891 int i;
892 ListCell *lc;
893 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
894
895 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
896
897 /*
898 * Evaluate any direct arguments. We do this even if there's no finalfn
899 * (which is unlikely anyway), so that side-effects happen as expected.
900 * The direct arguments go into arg positions 1 and up, leaving position 0
901 * for the transition state value.
902 */
903 i = 1;
904 foreach(lc, peragg->aggdirectargs)
905 {
906 ExprState *expr = (ExprState *) lfirst(lc);
907
908 fcinfo->args[i].value = ExecEvalExpr(expr,
909 aggstate->ss.ps.ps_ExprContext,
910 &fcinfo->args[i].isnull);
911 anynull |= fcinfo->args[i].isnull;
912 i++;
913 }
914
915 /*
916 * Apply the agg's finalfn if one is provided, else return transValue.
917 */
918 if (OidIsValid(peragg->finalfn_oid))
919 {
920 int numFinalArgs = peragg->numFinalArgs;
921
922 /* set up aggstate->curperagg for AggGetAggref() */
923 aggstate->curperagg = peragg;
924
925 InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
926 numFinalArgs,
927 pertrans->aggCollation,
928 (void *) aggstate, NULL);
929
930 /* Fill in the transition state value */
931 fcinfo->args[0].value =
932 MakeExpandedObjectReadOnly(pergroupstate->transValue,
933 pergroupstate->transValueIsNull,
934 pertrans->transtypeLen);
935 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
936 anynull |= pergroupstate->transValueIsNull;
937
938 /* Fill any remaining argument positions with nulls */
939 for (; i < numFinalArgs; i++)
940 {
941 fcinfo->args[i].value = (Datum) 0;
942 fcinfo->args[i].isnull = true;
943 anynull = true;
944 }
945
946 if (fcinfo->flinfo->fn_strict && anynull)
947 {
948 /* don't call a strict function with NULL inputs */
949 *resultVal = (Datum) 0;
950 *resultIsNull = true;
951 }
952 else
953 {
954 *resultVal = FunctionCallInvoke(fcinfo);
955 *resultIsNull = fcinfo->isnull;
956 }
957 aggstate->curperagg = NULL;
958 }
959 else
960 {
961 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
962 *resultVal = pergroupstate->transValue;
963 *resultIsNull = pergroupstate->transValueIsNull;
964 }
965
966 /*
967 * If result is pass-by-ref, make sure it is in the right context.
968 */
969 if (!peragg->resulttypeByVal && !*resultIsNull &&
970 !MemoryContextContains(CurrentMemoryContext,
971 DatumGetPointer(*resultVal)))
972 *resultVal = datumCopy(*resultVal,
973 peragg->resulttypeByVal,
974 peragg->resulttypeLen);
975
976 MemoryContextSwitchTo(oldContext);
977 }
978
979 /*
980 * Compute the output value of one partial aggregate.
981 *
982 * The serialization function will be run, and the result delivered, in the
983 * output-tuple context; caller's CurrentMemoryContext does not matter.
984 */
985 static void
finalize_partialaggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)986 finalize_partialaggregate(AggState *aggstate,
987 AggStatePerAgg peragg,
988 AggStatePerGroup pergroupstate,
989 Datum *resultVal, bool *resultIsNull)
990 {
991 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
992 MemoryContext oldContext;
993
994 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
995
996 /*
997 * serialfn_oid will be set if we must serialize the transvalue before
998 * returning it
999 */
1000 if (OidIsValid(pertrans->serialfn_oid))
1001 {
1002 /* Don't call a strict serialization function with NULL input. */
1003 if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1004 {
1005 *resultVal = (Datum) 0;
1006 *resultIsNull = true;
1007 }
1008 else
1009 {
1010 FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;
1011
1012 fcinfo->args[0].value =
1013 MakeExpandedObjectReadOnly(pergroupstate->transValue,
1014 pergroupstate->transValueIsNull,
1015 pertrans->transtypeLen);
1016 fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
1017 fcinfo->isnull = false;
1018
1019 *resultVal = FunctionCallInvoke(fcinfo);
1020 *resultIsNull = fcinfo->isnull;
1021 }
1022 }
1023 else
1024 {
1025 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1026 *resultVal = pergroupstate->transValue;
1027 *resultIsNull = pergroupstate->transValueIsNull;
1028 }
1029
1030 /* If result is pass-by-ref, make sure it is in the right context. */
1031 if (!peragg->resulttypeByVal && !*resultIsNull &&
1032 !MemoryContextContains(CurrentMemoryContext,
1033 DatumGetPointer(*resultVal)))
1034 *resultVal = datumCopy(*resultVal,
1035 peragg->resulttypeByVal,
1036 peragg->resulttypeLen);
1037
1038 MemoryContextSwitchTo(oldContext);
1039 }
1040
1041 /*
1042 * Prepare to finalize and project based on the specified representative tuple
1043 * slot and grouping set.
1044 *
1045 * In the specified tuple slot, force to null all attributes that should be
1046 * read as null in the context of the current grouping set. Also stash the
1047 * current group bitmap where GroupingExpr can get at it.
1048 *
1049 * This relies on three conditions:
1050 *
1051 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1052 * only reference it in evaluations, which will only access individual
1053 * attributes.
1054 *
1055 * 2) No system columns are going to need to be nulled. (If a system column is
1056 * referenced in a group clause, it is actually projected in the outer plan
1057 * tlist.)
1058 *
1059 * 3) Within a given phase, we never need to recover the value of an attribute
1060 * once it has been set to null.
1061 *
1062 * Poking into the slot this way is a bit ugly, but the consensus is that the
1063 * alternative was worse.
1064 */
1065 static void
prepare_projection_slot(AggState * aggstate,TupleTableSlot * slot,int currentSet)1066 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1067 {
1068 if (aggstate->phase->grouped_cols)
1069 {
1070 Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1071
1072 aggstate->grouped_cols = grouped_cols;
1073
1074 if (TTS_EMPTY(slot))
1075 {
1076 /*
1077 * Force all values to be NULL if working on an empty input tuple
1078 * (i.e. an empty grouping set for which no input rows were
1079 * supplied).
1080 */
1081 ExecStoreAllNullTuple(slot);
1082 }
1083 else if (aggstate->all_grouped_cols)
1084 {
1085 ListCell *lc;
1086
1087 /* all_grouped_cols is arranged in desc order */
1088 slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1089
1090 foreach(lc, aggstate->all_grouped_cols)
1091 {
1092 int attnum = lfirst_int(lc);
1093
1094 if (!bms_is_member(attnum, grouped_cols))
1095 slot->tts_isnull[attnum - 1] = true;
1096 }
1097 }
1098 }
1099 }
1100
1101 /*
1102 * Compute the final value of all aggregates for one group.
1103 *
1104 * This function handles only one grouping set at a time, which the caller must
1105 * have selected. It's also the caller's responsibility to adjust the supplied
1106 * pergroup parameter to point to the current set's transvalues.
1107 *
1108 * Results are stored in the output econtext aggvalues/aggnulls.
1109 */
1110 static void
finalize_aggregates(AggState * aggstate,AggStatePerAgg peraggs,AggStatePerGroup pergroup)1111 finalize_aggregates(AggState *aggstate,
1112 AggStatePerAgg peraggs,
1113 AggStatePerGroup pergroup)
1114 {
1115 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1116 Datum *aggvalues = econtext->ecxt_aggvalues;
1117 bool *aggnulls = econtext->ecxt_aggnulls;
1118 int aggno;
1119 int transno;
1120
1121 /*
1122 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1123 * inputs and run the transition functions.
1124 */
1125 for (transno = 0; transno < aggstate->numtrans; transno++)
1126 {
1127 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1128 AggStatePerGroup pergroupstate;
1129
1130 pergroupstate = &pergroup[transno];
1131
1132 if (pertrans->numSortCols > 0)
1133 {
1134 Assert(aggstate->aggstrategy != AGG_HASHED &&
1135 aggstate->aggstrategy != AGG_MIXED);
1136
1137 if (pertrans->numInputs == 1)
1138 process_ordered_aggregate_single(aggstate,
1139 pertrans,
1140 pergroupstate);
1141 else
1142 process_ordered_aggregate_multi(aggstate,
1143 pertrans,
1144 pergroupstate);
1145 }
1146 }
1147
1148 /*
1149 * Run the final functions.
1150 */
1151 for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1152 {
1153 AggStatePerAgg peragg = &peraggs[aggno];
1154 int transno = peragg->transno;
1155 AggStatePerGroup pergroupstate;
1156
1157 pergroupstate = &pergroup[transno];
1158
1159 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1160 finalize_partialaggregate(aggstate, peragg, pergroupstate,
1161 &aggvalues[aggno], &aggnulls[aggno]);
1162 else
1163 finalize_aggregate(aggstate, peragg, pergroupstate,
1164 &aggvalues[aggno], &aggnulls[aggno]);
1165 }
1166 }
1167
1168 /*
1169 * Project the result of a group (whose aggs have already been calculated by
1170 * finalize_aggregates). Returns the result slot, or NULL if no row is
1171 * projected (suppressed by qual).
1172 */
1173 static TupleTableSlot *
project_aggregates(AggState * aggstate)1174 project_aggregates(AggState *aggstate)
1175 {
1176 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1177
1178 /*
1179 * Check the qual (HAVING clause); if the group does not match, ignore it.
1180 */
1181 if (ExecQual(aggstate->ss.ps.qual, econtext))
1182 {
1183 /*
1184 * Form and return projection tuple using the aggregate results and
1185 * the representative input tuple.
1186 */
1187 return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1188 }
1189 else
1190 InstrCountFiltered1(aggstate, 1);
1191
1192 return NULL;
1193 }
1194
1195 /*
1196 * find_unaggregated_cols
1197 * Construct a bitmapset of the column numbers of un-aggregated Vars
1198 * appearing in our targetlist and qual (HAVING clause)
1199 */
1200 static Bitmapset *
find_unaggregated_cols(AggState * aggstate)1201 find_unaggregated_cols(AggState *aggstate)
1202 {
1203 Agg *node = (Agg *) aggstate->ss.ps.plan;
1204 Bitmapset *colnos;
1205
1206 colnos = NULL;
1207 (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1208 &colnos);
1209 (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1210 &colnos);
1211 return colnos;
1212 }
1213
1214 static bool
find_unaggregated_cols_walker(Node * node,Bitmapset ** colnos)1215 find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1216 {
1217 if (node == NULL)
1218 return false;
1219 if (IsA(node, Var))
1220 {
1221 Var *var = (Var *) node;
1222
1223 /* setrefs.c should have set the varno to OUTER_VAR */
1224 Assert(var->varno == OUTER_VAR);
1225 Assert(var->varlevelsup == 0);
1226 *colnos = bms_add_member(*colnos, var->varattno);
1227 return false;
1228 }
1229 if (IsA(node, Aggref) ||IsA(node, GroupingFunc))
1230 {
1231 /* do not descend into aggregate exprs */
1232 return false;
1233 }
1234 return expression_tree_walker(node, find_unaggregated_cols_walker,
1235 (void *) colnos);
1236 }
1237
1238 /*
1239 * (Re-)initialize the hash table(s) to empty.
1240 *
1241 * To implement hashed aggregation, we need a hashtable that stores a
1242 * representative tuple and an array of AggStatePerGroup structs for each
1243 * distinct set of GROUP BY column values. We compute the hash key from the
1244 * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1245 * for each entry.
1246 *
1247 * We have a separate hashtable and associated perhash data structure for each
1248 * grouping set for which we're doing hashing.
1249 *
1250 * The contents of the hash tables always live in the hashcontext's per-tuple
1251 * memory context (there is only one of these for all tables together, since
1252 * they are all reset at the same time).
1253 */
1254 static void
build_hash_table(AggState * aggstate)1255 build_hash_table(AggState *aggstate)
1256 {
1257 MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1258 Size additionalsize;
1259 int i;
1260
1261 Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1262
1263 additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1264
1265 for (i = 0; i < aggstate->num_hashes; ++i)
1266 {
1267 AggStatePerHash perhash = &aggstate->perhash[i];
1268
1269 Assert(perhash->aggnode->numGroups > 0);
1270
1271 if (perhash->hashtable)
1272 ResetTupleHashTable(perhash->hashtable);
1273 else
1274 perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1275 perhash->hashslot->tts_tupleDescriptor,
1276 perhash->numCols,
1277 perhash->hashGrpColIdxHash,
1278 perhash->eqfuncoids,
1279 perhash->hashfunctions,
1280 perhash->aggnode->grpCollations,
1281 perhash->aggnode->numGroups,
1282 additionalsize,
1283 aggstate->ss.ps.state->es_query_cxt,
1284 aggstate->hashcontext->ecxt_per_tuple_memory,
1285 tmpmem,
1286 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1287 }
1288 }
1289
1290 /*
1291 * Compute columns that actually need to be stored in hashtable entries. The
1292 * incoming tuples from the child plan node will contain grouping columns,
1293 * other columns referenced in our targetlist and qual, columns used to
1294 * compute the aggregate functions, and perhaps just junk columns we don't use
1295 * at all. Only columns of the first two types need to be stored in the
1296 * hashtable, and getting rid of the others can make the table entries
1297 * significantly smaller. The hashtable only contains the relevant columns,
1298 * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1299 * into the format of the normal input descriptor.
1300 *
1301 * Additional columns, in addition to the columns grouped by, come from two
1302 * sources: Firstly functionally dependent columns that we don't need to group
1303 * by themselves, and secondly ctids for row-marks.
1304 *
1305 * To eliminate duplicates, we build a bitmapset of the needed columns, and
1306 * then build an array of the columns included in the hashtable. We might
1307 * still have duplicates if the passed-in grpColIdx has them, which can happen
1308 * in edge cases from semijoins/distinct; these can't always be removed,
1309 * because it's not certain that the duplicate cols will be using the same
1310 * hash function.
1311 *
1312 * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1313 * the per-query context (unlike the hash table itself).
1314 */
1315 static void
find_hash_columns(AggState * aggstate)1316 find_hash_columns(AggState *aggstate)
1317 {
1318 Bitmapset *base_colnos;
1319 List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1320 int numHashes = aggstate->num_hashes;
1321 EState *estate = aggstate->ss.ps.state;
1322 int j;
1323
1324 /* Find Vars that will be needed in tlist and qual */
1325 base_colnos = find_unaggregated_cols(aggstate);
1326
1327 for (j = 0; j < numHashes; ++j)
1328 {
1329 AggStatePerHash perhash = &aggstate->perhash[j];
1330 Bitmapset *colnos = bms_copy(base_colnos);
1331 AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1332 List *hashTlist = NIL;
1333 TupleDesc hashDesc;
1334 int maxCols;
1335 int i;
1336
1337 perhash->largestGrpColIdx = 0;
1338
1339 /*
1340 * If we're doing grouping sets, then some Vars might be referenced in
1341 * tlist/qual for the benefit of other grouping sets, but not needed
1342 * when hashing; i.e. prepare_projection_slot will null them out, so
1343 * there'd be no point storing them. Use prepare_projection_slot's
1344 * logic to determine which.
1345 */
1346 if (aggstate->phases[0].grouped_cols)
1347 {
1348 Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1349 ListCell *lc;
1350
1351 foreach(lc, aggstate->all_grouped_cols)
1352 {
1353 int attnum = lfirst_int(lc);
1354
1355 if (!bms_is_member(attnum, grouped_cols))
1356 colnos = bms_del_member(colnos, attnum);
1357 }
1358 }
1359
1360 /*
1361 * Compute maximum number of input columns accounting for possible
1362 * duplications in the grpColIdx array, which can happen in some edge
1363 * cases where HashAggregate was generated as part of a semijoin or a
1364 * DISTINCT.
1365 */
1366 maxCols = bms_num_members(colnos) + perhash->numCols;
1367
1368 perhash->hashGrpColIdxInput =
1369 palloc(maxCols * sizeof(AttrNumber));
1370 perhash->hashGrpColIdxHash =
1371 palloc(perhash->numCols * sizeof(AttrNumber));
1372
1373 /* Add all the grouping columns to colnos */
1374 for (i = 0; i < perhash->numCols; i++)
1375 colnos = bms_add_member(colnos, grpColIdx[i]);
1376
1377 /*
1378 * First build mapping for columns directly hashed. These are the
1379 * first, because they'll be accessed when computing hash values and
1380 * comparing tuples for exact matches. We also build simple mapping
1381 * for execGrouping, so it knows where to find the to-be-hashed /
1382 * compared columns in the input.
1383 */
1384 for (i = 0; i < perhash->numCols; i++)
1385 {
1386 perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1387 perhash->hashGrpColIdxHash[i] = i + 1;
1388 perhash->numhashGrpCols++;
1389 /* delete already mapped columns */
1390 bms_del_member(colnos, grpColIdx[i]);
1391 }
1392
1393 /* and add the remaining columns */
1394 while ((i = bms_first_member(colnos)) >= 0)
1395 {
1396 perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1397 perhash->numhashGrpCols++;
1398 }
1399
1400 /* and build a tuple descriptor for the hashtable */
1401 for (i = 0; i < perhash->numhashGrpCols; i++)
1402 {
1403 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1404
1405 hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1406 perhash->largestGrpColIdx =
1407 Max(varNumber + 1, perhash->largestGrpColIdx);
1408 }
1409
1410 hashDesc = ExecTypeFromTL(hashTlist);
1411
1412 execTuplesHashPrepare(perhash->numCols,
1413 perhash->aggnode->grpOperators,
1414 &perhash->eqfuncoids,
1415 &perhash->hashfunctions);
1416 perhash->hashslot =
1417 ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
1418 &TTSOpsMinimalTuple);
1419
1420 list_free(hashTlist);
1421 bms_free(colnos);
1422 }
1423
1424 bms_free(base_colnos);
1425 }
1426
1427 /*
1428 * Estimate per-hash-table-entry overhead for the planner.
1429 *
1430 * Note that the estimate does not include space for pass-by-reference
1431 * transition data values, nor for the representative tuple of each group.
1432 * Nor does this account of the target fill-factor and growth policy of the
1433 * hash table.
1434 */
1435 Size
hash_agg_entry_size(int numAggs)1436 hash_agg_entry_size(int numAggs)
1437 {
1438 Size entrysize;
1439
1440 /* This must match build_hash_table */
1441 entrysize = sizeof(TupleHashEntryData) +
1442 numAggs * sizeof(AggStatePerGroupData);
1443 entrysize = MAXALIGN(entrysize);
1444
1445 return entrysize;
1446 }
1447
1448 /*
1449 * Find or create a hashtable entry for the tuple group containing the current
1450 * tuple (already set in tmpcontext's outertuple slot), in the current grouping
1451 * set (which the caller must have selected - note that initialize_aggregate
1452 * depends on this).
1453 *
1454 * When called, CurrentMemoryContext should be the per-query context.
1455 */
1456 static TupleHashEntryData *
lookup_hash_entry(AggState * aggstate)1457 lookup_hash_entry(AggState *aggstate)
1458 {
1459 TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
1460 AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1461 TupleTableSlot *hashslot = perhash->hashslot;
1462 TupleHashEntryData *entry;
1463 bool isnew;
1464 int i;
1465
1466 /* transfer just the needed columns into hashslot */
1467 slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1468 ExecClearTuple(hashslot);
1469
1470 for (i = 0; i < perhash->numhashGrpCols; i++)
1471 {
1472 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1473
1474 hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1475 hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1476 }
1477 ExecStoreVirtualTuple(hashslot);
1478
1479 /* find or create the hashtable entry using the filtered tuple */
1480 entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
1481
1482 if (isnew)
1483 {
1484 AggStatePerGroup pergroup;
1485 int transno;
1486
1487 pergroup = (AggStatePerGroup)
1488 MemoryContextAlloc(perhash->hashtable->tablecxt,
1489 sizeof(AggStatePerGroupData) * aggstate->numtrans);
1490 entry->additional = pergroup;
1491
1492 /*
1493 * Initialize aggregates for new tuple group, lookup_hash_entries()
1494 * already has selected the relevant grouping set.
1495 */
1496 for (transno = 0; transno < aggstate->numtrans; transno++)
1497 {
1498 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1499 AggStatePerGroup pergroupstate = &pergroup[transno];
1500
1501 initialize_aggregate(aggstate, pertrans, pergroupstate);
1502 }
1503 }
1504
1505 return entry;
1506 }
1507
1508 /*
1509 * Look up hash entries for the current tuple in all hashed grouping sets,
1510 * returning an array of pergroup pointers suitable for advance_aggregates.
1511 *
1512 * Be aware that lookup_hash_entry can reset the tmpcontext.
1513 */
1514 static void
lookup_hash_entries(AggState * aggstate)1515 lookup_hash_entries(AggState *aggstate)
1516 {
1517 int numHashes = aggstate->num_hashes;
1518 AggStatePerGroup *pergroup = aggstate->hash_pergroup;
1519 int setno;
1520
1521 for (setno = 0; setno < numHashes; setno++)
1522 {
1523 select_current_set(aggstate, setno, true);
1524 pergroup[setno] = lookup_hash_entry(aggstate)->additional;
1525 }
1526 }
1527
1528 /*
1529 * ExecAgg -
1530 *
1531 * ExecAgg receives tuples from its outer subplan and aggregates over
1532 * the appropriate attribute for each aggregate function use (Aggref
1533 * node) appearing in the targetlist or qual of the node. The number
1534 * of tuples to aggregate over depends on whether grouped or plain
1535 * aggregation is selected. In grouped aggregation, we produce a result
1536 * row for each group; in plain aggregation there's a single result row
1537 * for the whole query. In either case, the value of each aggregate is
1538 * stored in the expression context to be used when ExecProject evaluates
1539 * the result tuple.
1540 */
1541 static TupleTableSlot *
ExecAgg(PlanState * pstate)1542 ExecAgg(PlanState *pstate)
1543 {
1544 AggState *node = castNode(AggState, pstate);
1545 TupleTableSlot *result = NULL;
1546
1547 CHECK_FOR_INTERRUPTS();
1548
1549 if (!node->agg_done)
1550 {
1551 /* Dispatch based on strategy */
1552 switch (node->phase->aggstrategy)
1553 {
1554 case AGG_HASHED:
1555 if (!node->table_filled)
1556 agg_fill_hash_table(node);
1557 /* FALLTHROUGH */
1558 case AGG_MIXED:
1559 result = agg_retrieve_hash_table(node);
1560 break;
1561 case AGG_PLAIN:
1562 case AGG_SORTED:
1563 result = agg_retrieve_direct(node);
1564 break;
1565 }
1566
1567 if (!TupIsNull(result))
1568 return result;
1569 }
1570
1571 return NULL;
1572 }
1573
1574 /*
1575 * ExecAgg for non-hashed case
1576 */
1577 static TupleTableSlot *
agg_retrieve_direct(AggState * aggstate)1578 agg_retrieve_direct(AggState *aggstate)
1579 {
1580 Agg *node = aggstate->phase->aggnode;
1581 ExprContext *econtext;
1582 ExprContext *tmpcontext;
1583 AggStatePerAgg peragg;
1584 AggStatePerGroup *pergroups;
1585 TupleTableSlot *outerslot;
1586 TupleTableSlot *firstSlot;
1587 TupleTableSlot *result;
1588 bool hasGroupingSets = aggstate->phase->numsets > 0;
1589 int numGroupingSets = Max(aggstate->phase->numsets, 1);
1590 int currentSet;
1591 int nextSetSize;
1592 int numReset;
1593 int i;
1594
1595 /*
1596 * get state info from node
1597 *
1598 * econtext is the per-output-tuple expression context
1599 *
1600 * tmpcontext is the per-input-tuple expression context
1601 */
1602 econtext = aggstate->ss.ps.ps_ExprContext;
1603 tmpcontext = aggstate->tmpcontext;
1604
1605 peragg = aggstate->peragg;
1606 pergroups = aggstate->pergroups;
1607 firstSlot = aggstate->ss.ss_ScanTupleSlot;
1608
1609 /*
1610 * We loop retrieving groups until we find one matching
1611 * aggstate->ss.ps.qual
1612 *
1613 * For grouping sets, we have the invariant that aggstate->projected_set
1614 * is either -1 (initial call) or the index (starting from 0) in
1615 * gset_lengths for the group we just completed (either by projecting a
1616 * row or by discarding it in the qual).
1617 */
1618 while (!aggstate->agg_done)
1619 {
1620 /*
1621 * Clear the per-output-tuple context for each group, as well as
1622 * aggcontext (which contains any pass-by-ref transvalues of the old
1623 * group). Some aggregate functions store working state in child
1624 * contexts; those now get reset automatically without us needing to
1625 * do anything special.
1626 *
1627 * We use ReScanExprContext not just ResetExprContext because we want
1628 * any registered shutdown callbacks to be called. That allows
1629 * aggregate functions to ensure they've cleaned up any non-memory
1630 * resources.
1631 */
1632 ReScanExprContext(econtext);
1633
1634 /*
1635 * Determine how many grouping sets need to be reset at this boundary.
1636 */
1637 if (aggstate->projected_set >= 0 &&
1638 aggstate->projected_set < numGroupingSets)
1639 numReset = aggstate->projected_set + 1;
1640 else
1641 numReset = numGroupingSets;
1642
1643 /*
1644 * numReset can change on a phase boundary, but that's OK; we want to
1645 * reset the contexts used in _this_ phase, and later, after possibly
1646 * changing phase, initialize the right number of aggregates for the
1647 * _new_ phase.
1648 */
1649
1650 for (i = 0; i < numReset; i++)
1651 {
1652 ReScanExprContext(aggstate->aggcontexts[i]);
1653 }
1654
1655 /*
1656 * Check if input is complete and there are no more groups to project
1657 * in this phase; move to next phase or mark as done.
1658 */
1659 if (aggstate->input_done == true &&
1660 aggstate->projected_set >= (numGroupingSets - 1))
1661 {
1662 if (aggstate->current_phase < aggstate->numphases - 1)
1663 {
1664 initialize_phase(aggstate, aggstate->current_phase + 1);
1665 aggstate->input_done = false;
1666 aggstate->projected_set = -1;
1667 numGroupingSets = Max(aggstate->phase->numsets, 1);
1668 node = aggstate->phase->aggnode;
1669 numReset = numGroupingSets;
1670 }
1671 else if (aggstate->aggstrategy == AGG_MIXED)
1672 {
1673 /*
1674 * Mixed mode; we've output all the grouped stuff and have
1675 * full hashtables, so switch to outputting those.
1676 */
1677 initialize_phase(aggstate, 0);
1678 aggstate->table_filled = true;
1679 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1680 &aggstate->perhash[0].hashiter);
1681 select_current_set(aggstate, 0, true);
1682 return agg_retrieve_hash_table(aggstate);
1683 }
1684 else
1685 {
1686 aggstate->agg_done = true;
1687 break;
1688 }
1689 }
1690
1691 /*
1692 * Get the number of columns in the next grouping set after the last
1693 * projected one (if any). This is the number of columns to compare to
1694 * see if we reached the boundary of that set too.
1695 */
1696 if (aggstate->projected_set >= 0 &&
1697 aggstate->projected_set < (numGroupingSets - 1))
1698 nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
1699 else
1700 nextSetSize = 0;
1701
1702 /*----------
1703 * If a subgroup for the current grouping set is present, project it.
1704 *
1705 * We have a new group if:
1706 * - we're out of input but haven't projected all grouping sets
1707 * (checked above)
1708 * OR
1709 * - we already projected a row that wasn't from the last grouping
1710 * set
1711 * AND
1712 * - the next grouping set has at least one grouping column (since
1713 * empty grouping sets project only once input is exhausted)
1714 * AND
1715 * - the previous and pending rows differ on the grouping columns
1716 * of the next grouping set
1717 *----------
1718 */
1719 tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
1720 if (aggstate->input_done ||
1721 (node->aggstrategy != AGG_PLAIN &&
1722 aggstate->projected_set != -1 &&
1723 aggstate->projected_set < (numGroupingSets - 1) &&
1724 nextSetSize > 0 &&
1725 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
1726 tmpcontext)))
1727 {
1728 aggstate->projected_set += 1;
1729
1730 Assert(aggstate->projected_set < numGroupingSets);
1731 Assert(nextSetSize > 0 || aggstate->input_done);
1732 }
1733 else
1734 {
1735 /*
1736 * We no longer care what group we just projected, the next
1737 * projection will always be the first (or only) grouping set
1738 * (unless the input proves to be empty).
1739 */
1740 aggstate->projected_set = 0;
1741
1742 /*
1743 * If we don't already have the first tuple of the new group,
1744 * fetch it from the outer plan.
1745 */
1746 if (aggstate->grp_firstTuple == NULL)
1747 {
1748 outerslot = fetch_input_tuple(aggstate);
1749 if (!TupIsNull(outerslot))
1750 {
1751 /*
1752 * Make a copy of the first input tuple; we will use this
1753 * for comparisons (in group mode) and for projection.
1754 */
1755 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
1756 }
1757 else
1758 {
1759 /* outer plan produced no tuples at all */
1760 if (hasGroupingSets)
1761 {
1762 /*
1763 * If there was no input at all, we need to project
1764 * rows only if there are grouping sets of size 0.
1765 * Note that this implies that there can't be any
1766 * references to ungrouped Vars, which would otherwise
1767 * cause issues with the empty output slot.
1768 *
1769 * XXX: This is no longer true, we currently deal with
1770 * this in finalize_aggregates().
1771 */
1772 aggstate->input_done = true;
1773
1774 while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
1775 {
1776 aggstate->projected_set += 1;
1777 if (aggstate->projected_set >= numGroupingSets)
1778 {
1779 /*
1780 * We can't set agg_done here because we might
1781 * have more phases to do, even though the
1782 * input is empty. So we need to restart the
1783 * whole outer loop.
1784 */
1785 break;
1786 }
1787 }
1788
1789 if (aggstate->projected_set >= numGroupingSets)
1790 continue;
1791 }
1792 else
1793 {
1794 aggstate->agg_done = true;
1795 /* If we are grouping, we should produce no tuples too */
1796 if (node->aggstrategy != AGG_PLAIN)
1797 return NULL;
1798 }
1799 }
1800 }
1801
1802 /*
1803 * Initialize working state for a new input tuple group.
1804 */
1805 initialize_aggregates(aggstate, pergroups, numReset);
1806
1807 if (aggstate->grp_firstTuple != NULL)
1808 {
1809 /*
1810 * Store the copied first input tuple in the tuple table slot
1811 * reserved for it. The tuple will be deleted when it is
1812 * cleared from the slot.
1813 */
1814 ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
1815 firstSlot, true);
1816 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
1817
1818 /* set up for first advance_aggregates call */
1819 tmpcontext->ecxt_outertuple = firstSlot;
1820
1821 /*
1822 * Process each outer-plan tuple, and then fetch the next one,
1823 * until we exhaust the outer plan or cross a group boundary.
1824 */
1825 for (;;)
1826 {
1827 /*
1828 * During phase 1 only of a mixed agg, we need to update
1829 * hashtables as well in advance_aggregates.
1830 */
1831 if (aggstate->aggstrategy == AGG_MIXED &&
1832 aggstate->current_phase == 1)
1833 {
1834 lookup_hash_entries(aggstate);
1835 }
1836
1837 /* Advance the aggregates (or combine functions) */
1838 advance_aggregates(aggstate);
1839
1840 /* Reset per-input-tuple context after each tuple */
1841 ResetExprContext(tmpcontext);
1842
1843 outerslot = fetch_input_tuple(aggstate);
1844 if (TupIsNull(outerslot))
1845 {
1846 /* no more outer-plan tuples available */
1847 if (hasGroupingSets)
1848 {
1849 aggstate->input_done = true;
1850 break;
1851 }
1852 else
1853 {
1854 aggstate->agg_done = true;
1855 break;
1856 }
1857 }
1858 /* set up for next advance_aggregates call */
1859 tmpcontext->ecxt_outertuple = outerslot;
1860
1861 /*
1862 * If we are grouping, check whether we've crossed a group
1863 * boundary.
1864 */
1865 if (node->aggstrategy != AGG_PLAIN)
1866 {
1867 tmpcontext->ecxt_innertuple = firstSlot;
1868 if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
1869 tmpcontext))
1870 {
1871 aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
1872 break;
1873 }
1874 }
1875 }
1876 }
1877
1878 /*
1879 * Use the representative input tuple for any references to
1880 * non-aggregated input columns in aggregate direct args, the node
1881 * qual, and the tlist. (If we are not grouping, and there are no
1882 * input rows at all, we will come here with an empty firstSlot
1883 * ... but if not grouping, there can't be any references to
1884 * non-aggregated input columns, so no problem.)
1885 */
1886 econtext->ecxt_outertuple = firstSlot;
1887 }
1888
1889 Assert(aggstate->projected_set >= 0);
1890
1891 currentSet = aggstate->projected_set;
1892
1893 prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
1894
1895 select_current_set(aggstate, currentSet, false);
1896
1897 finalize_aggregates(aggstate,
1898 peragg,
1899 pergroups[currentSet]);
1900
1901 /*
1902 * If there's no row to project right now, we must continue rather
1903 * than returning a null since there might be more groups.
1904 */
1905 result = project_aggregates(aggstate);
1906 if (result)
1907 return result;
1908 }
1909
1910 /* No more groups */
1911 return NULL;
1912 }
1913
1914 /*
1915 * ExecAgg for hashed case: read input and build hash table
1916 */
1917 static void
agg_fill_hash_table(AggState * aggstate)1918 agg_fill_hash_table(AggState *aggstate)
1919 {
1920 TupleTableSlot *outerslot;
1921 ExprContext *tmpcontext = aggstate->tmpcontext;
1922
1923 /*
1924 * Process each outer-plan tuple, and then fetch the next one, until we
1925 * exhaust the outer plan.
1926 */
1927 for (;;)
1928 {
1929 outerslot = fetch_input_tuple(aggstate);
1930 if (TupIsNull(outerslot))
1931 break;
1932
1933 /* set up for lookup_hash_entries and advance_aggregates */
1934 tmpcontext->ecxt_outertuple = outerslot;
1935
1936 /* Find or build hashtable entries */
1937 lookup_hash_entries(aggstate);
1938
1939 /* Advance the aggregates (or combine functions) */
1940 advance_aggregates(aggstate);
1941
1942 /*
1943 * Reset per-input-tuple context after each tuple, but note that the
1944 * hash lookups do this too
1945 */
1946 ResetExprContext(aggstate->tmpcontext);
1947 }
1948
1949 aggstate->table_filled = true;
1950 /* Initialize to walk the first hash table */
1951 select_current_set(aggstate, 0, true);
1952 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1953 &aggstate->perhash[0].hashiter);
1954 }
1955
1956 /*
1957 * ExecAgg for hashed case: retrieving groups from hash table
1958 */
1959 static TupleTableSlot *
agg_retrieve_hash_table(AggState * aggstate)1960 agg_retrieve_hash_table(AggState *aggstate)
1961 {
1962 ExprContext *econtext;
1963 AggStatePerAgg peragg;
1964 AggStatePerGroup pergroup;
1965 TupleHashEntryData *entry;
1966 TupleTableSlot *firstSlot;
1967 TupleTableSlot *result;
1968 AggStatePerHash perhash;
1969
1970 /*
1971 * get state info from node.
1972 *
1973 * econtext is the per-output-tuple expression context.
1974 */
1975 econtext = aggstate->ss.ps.ps_ExprContext;
1976 peragg = aggstate->peragg;
1977 firstSlot = aggstate->ss.ss_ScanTupleSlot;
1978
1979 /*
1980 * Note that perhash (and therefore anything accessed through it) can
1981 * change inside the loop, as we change between grouping sets.
1982 */
1983 perhash = &aggstate->perhash[aggstate->current_set];
1984
1985 /*
1986 * We loop retrieving groups until we find one satisfying
1987 * aggstate->ss.ps.qual
1988 */
1989 while (!aggstate->agg_done)
1990 {
1991 TupleTableSlot *hashslot = perhash->hashslot;
1992 int i;
1993
1994 CHECK_FOR_INTERRUPTS();
1995
1996 /*
1997 * Find the next entry in the hash table
1998 */
1999 entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2000 if (entry == NULL)
2001 {
2002 int nextset = aggstate->current_set + 1;
2003
2004 if (nextset < aggstate->num_hashes)
2005 {
2006 /*
2007 * Switch to next grouping set, reinitialize, and restart the
2008 * loop.
2009 */
2010 select_current_set(aggstate, nextset, true);
2011
2012 perhash = &aggstate->perhash[aggstate->current_set];
2013
2014 ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2015
2016 continue;
2017 }
2018 else
2019 {
2020 /* No more hashtables, so done */
2021 aggstate->agg_done = true;
2022 return NULL;
2023 }
2024 }
2025
2026 /*
2027 * Clear the per-output-tuple context for each group
2028 *
2029 * We intentionally don't use ReScanExprContext here; if any aggs have
2030 * registered shutdown callbacks, they mustn't be called yet, since we
2031 * might not be done with that agg.
2032 */
2033 ResetExprContext(econtext);
2034
2035 /*
2036 * Transform representative tuple back into one with the right
2037 * columns.
2038 */
2039 ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2040 slot_getallattrs(hashslot);
2041
2042 ExecClearTuple(firstSlot);
2043 memset(firstSlot->tts_isnull, true,
2044 firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2045
2046 for (i = 0; i < perhash->numhashGrpCols; i++)
2047 {
2048 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2049
2050 firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2051 firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2052 }
2053 ExecStoreVirtualTuple(firstSlot);
2054
2055 pergroup = (AggStatePerGroup) entry->additional;
2056
2057 /*
2058 * Use the representative input tuple for any references to
2059 * non-aggregated input columns in the qual and tlist.
2060 */
2061 econtext->ecxt_outertuple = firstSlot;
2062
2063 prepare_projection_slot(aggstate,
2064 econtext->ecxt_outertuple,
2065 aggstate->current_set);
2066
2067 finalize_aggregates(aggstate, peragg, pergroup);
2068
2069 result = project_aggregates(aggstate);
2070 if (result)
2071 return result;
2072 }
2073
2074 /* No more groups */
2075 return NULL;
2076 }
2077
2078 /* -----------------
2079 * ExecInitAgg
2080 *
2081 * Creates the run-time information for the agg node produced by the
2082 * planner and initializes its outer subtree.
2083 *
2084 * -----------------
2085 */
2086 AggState *
ExecInitAgg(Agg * node,EState * estate,int eflags)2087 ExecInitAgg(Agg *node, EState *estate, int eflags)
2088 {
2089 AggState *aggstate;
2090 AggStatePerAgg peraggs;
2091 AggStatePerTrans pertransstates;
2092 AggStatePerGroup *pergroups;
2093 Plan *outerPlan;
2094 ExprContext *econtext;
2095 TupleDesc scanDesc;
2096 int numaggs,
2097 transno,
2098 aggno;
2099 int phase;
2100 int phaseidx;
2101 ListCell *l;
2102 Bitmapset *all_grouped_cols = NULL;
2103 int numGroupingSets = 1;
2104 int numPhases;
2105 int numHashes;
2106 int i = 0;
2107 int j = 0;
2108 bool use_hashing = (node->aggstrategy == AGG_HASHED ||
2109 node->aggstrategy == AGG_MIXED);
2110
2111 /* check for unsupported flags */
2112 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2113
2114 /*
2115 * create state structure
2116 */
2117 aggstate = makeNode(AggState);
2118 aggstate->ss.ps.plan = (Plan *) node;
2119 aggstate->ss.ps.state = estate;
2120 aggstate->ss.ps.ExecProcNode = ExecAgg;
2121
2122 aggstate->aggs = NIL;
2123 aggstate->numaggs = 0;
2124 aggstate->numtrans = 0;
2125 aggstate->aggstrategy = node->aggstrategy;
2126 aggstate->aggsplit = node->aggsplit;
2127 aggstate->maxsets = 0;
2128 aggstate->projected_set = -1;
2129 aggstate->current_set = 0;
2130 aggstate->peragg = NULL;
2131 aggstate->pertrans = NULL;
2132 aggstate->curperagg = NULL;
2133 aggstate->curpertrans = NULL;
2134 aggstate->input_done = false;
2135 aggstate->agg_done = false;
2136 aggstate->pergroups = NULL;
2137 aggstate->grp_firstTuple = NULL;
2138 aggstate->sort_in = NULL;
2139 aggstate->sort_out = NULL;
2140
2141 /*
2142 * phases[0] always exists, but is dummy in sorted/plain mode
2143 */
2144 numPhases = (use_hashing ? 1 : 2);
2145 numHashes = (use_hashing ? 1 : 0);
2146
2147 /*
2148 * Calculate the maximum number of grouping sets in any phase; this
2149 * determines the size of some allocations. Also calculate the number of
2150 * phases, since all hashed/mixed nodes contribute to only a single phase.
2151 */
2152 if (node->groupingSets)
2153 {
2154 numGroupingSets = list_length(node->groupingSets);
2155
2156 foreach(l, node->chain)
2157 {
2158 Agg *agg = lfirst(l);
2159
2160 numGroupingSets = Max(numGroupingSets,
2161 list_length(agg->groupingSets));
2162
2163 /*
2164 * additional AGG_HASHED aggs become part of phase 0, but all
2165 * others add an extra phase.
2166 */
2167 if (agg->aggstrategy != AGG_HASHED)
2168 ++numPhases;
2169 else
2170 ++numHashes;
2171 }
2172 }
2173
2174 aggstate->maxsets = numGroupingSets;
2175 aggstate->numphases = numPhases;
2176
2177 aggstate->aggcontexts = (ExprContext **)
2178 palloc0(sizeof(ExprContext *) * numGroupingSets);
2179
2180 /*
2181 * Create expression contexts. We need three or more, one for
2182 * per-input-tuple processing, one for per-output-tuple processing, one
2183 * for all the hashtables, and one for each grouping set. The per-tuple
2184 * memory context of the per-grouping-set ExprContexts (aggcontexts)
2185 * replaces the standalone memory context formerly used to hold transition
2186 * values. We cheat a little by using ExecAssignExprContext() to build
2187 * all of them.
2188 *
2189 * NOTE: the details of what is stored in aggcontexts and what is stored
2190 * in the regular per-query memory context are driven by a simple
2191 * decision: we want to reset the aggcontext at group boundaries (if not
2192 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
2193 */
2194 ExecAssignExprContext(estate, &aggstate->ss.ps);
2195 aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2196
2197 for (i = 0; i < numGroupingSets; ++i)
2198 {
2199 ExecAssignExprContext(estate, &aggstate->ss.ps);
2200 aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2201 }
2202
2203 if (use_hashing)
2204 {
2205 ExecAssignExprContext(estate, &aggstate->ss.ps);
2206 aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2207 }
2208
2209 ExecAssignExprContext(estate, &aggstate->ss.ps);
2210
2211 /*
2212 * Initialize child nodes.
2213 *
2214 * If we are doing a hashed aggregation then the child plan does not need
2215 * to handle REWIND efficiently; see ExecReScanAgg.
2216 */
2217 if (node->aggstrategy == AGG_HASHED)
2218 eflags &= ~EXEC_FLAG_REWIND;
2219 outerPlan = outerPlan(node);
2220 outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2221
2222 /*
2223 * initialize source tuple type.
2224 */
2225 aggstate->ss.ps.outerops =
2226 ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
2227 &aggstate->ss.ps.outeropsfixed);
2228 aggstate->ss.ps.outeropsset = true;
2229
2230 ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
2231 aggstate->ss.ps.outerops);
2232 scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2233
2234 /*
2235 * If there are more than two phases (including a potential dummy phase
2236 * 0), input will be resorted using tuplesort. Need a slot for that.
2237 */
2238 if (numPhases > 2)
2239 {
2240 aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2241 &TTSOpsMinimalTuple);
2242
2243 /*
2244 * The output of the tuplesort, and the output from the outer child
2245 * might not use the same type of slot. In most cases the child will
2246 * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
		 * input can also be presorted due to an index, in which case it could
2248 * be a different type of slot.
2249 *
2250 * XXX: For efficiency it would be good to instead/additionally
2251 * generate expressions with corresponding settings of outerops* for
2252 * the individual phases - deforming is often a bottleneck for
2253 * aggregations with lots of rows per group. If there's multiple
2254 * sorts, we know that all but the first use TTSOpsMinimalTuple (via
2255 * the nodeAgg.c internal tuplesort).
2256 */
2257 if (aggstate->ss.ps.outeropsfixed &&
2258 aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
2259 aggstate->ss.ps.outeropsfixed = false;
2260 }
2261
2262 /*
2263 * Initialize result type, slot and projection.
2264 */
2265 ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
2266 ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2267
2268 /*
2269 * initialize child expressions
2270 *
2271 * We expect the parser to have checked that no aggs contain other agg
2272 * calls in their arguments (and just to be sure, we verify it again while
2273 * initializing the plan node). This would make no sense under SQL
2274 * semantics, and it's forbidden by the spec. Because it is true, we
2275 * don't need to worry about evaluating the aggs in any particular order.
2276 *
2277 * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2278 * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
2279 * in the targetlist are found during ExecAssignProjectionInfo, below.
2280 */
2281 aggstate->ss.ps.qual =
2282 ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2283
2284 /*
2285 * We should now have found all Aggrefs in the targetlist and quals.
2286 */
2287 numaggs = aggstate->numaggs;
2288 Assert(numaggs == list_length(aggstate->aggs));
2289
2290 /*
2291 * For each phase, prepare grouping set data and fmgr lookup data for
2292 * compare functions. Accumulate all_grouped_cols in passing.
2293 */
2294 aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2295
2296 aggstate->num_hashes = numHashes;
2297 if (numHashes)
2298 {
2299 aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2300 aggstate->phases[0].numsets = 0;
2301 aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2302 aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2303 }
2304
2305 phase = 0;
2306 for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2307 {
2308 Agg *aggnode;
2309 Sort *sortnode;
2310
2311 if (phaseidx > 0)
2312 {
2313 aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2314 sortnode = castNode(Sort, aggnode->plan.lefttree);
2315 }
2316 else
2317 {
2318 aggnode = node;
2319 sortnode = NULL;
2320 }
2321
2322 Assert(phase <= 1 || sortnode);
2323
2324 if (aggnode->aggstrategy == AGG_HASHED
2325 || aggnode->aggstrategy == AGG_MIXED)
2326 {
2327 AggStatePerPhase phasedata = &aggstate->phases[0];
2328 AggStatePerHash perhash;
2329 Bitmapset *cols = NULL;
2330
2331 Assert(phase == 0);
2332 i = phasedata->numsets++;
2333 perhash = &aggstate->perhash[i];
2334
2335 /* phase 0 always points to the "real" Agg in the hash case */
2336 phasedata->aggnode = node;
2337 phasedata->aggstrategy = node->aggstrategy;
2338
2339 /* but the actual Agg node representing this hash is saved here */
2340 perhash->aggnode = aggnode;
2341
2342 phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2343
2344 for (j = 0; j < aggnode->numCols; ++j)
2345 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2346
2347 phasedata->grouped_cols[i] = cols;
2348
2349 all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2350 continue;
2351 }
2352 else
2353 {
2354 AggStatePerPhase phasedata = &aggstate->phases[++phase];
2355 int num_sets;
2356
2357 phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2358
2359 if (num_sets)
2360 {
2361 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2362 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2363
2364 i = 0;
2365 foreach(l, aggnode->groupingSets)
2366 {
2367 int current_length = list_length(lfirst(l));
2368 Bitmapset *cols = NULL;
2369
2370 /* planner forces this to be correct */
2371 for (j = 0; j < current_length; ++j)
2372 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2373
2374 phasedata->grouped_cols[i] = cols;
2375 phasedata->gset_lengths[i] = current_length;
2376
2377 ++i;
2378 }
2379
2380 all_grouped_cols = bms_add_members(all_grouped_cols,
2381 phasedata->grouped_cols[0]);
2382 }
2383 else
2384 {
2385 Assert(phaseidx == 0);
2386
2387 phasedata->gset_lengths = NULL;
2388 phasedata->grouped_cols = NULL;
2389 }
2390
2391 /*
2392 * If we are grouping, precompute fmgr lookup data for inner loop.
2393 */
2394 if (aggnode->aggstrategy == AGG_SORTED)
2395 {
2396 int i = 0;
2397
2398 Assert(aggnode->numCols > 0);
2399
2400 /*
2401 * Build a separate function for each subset of columns that
2402 * need to be compared.
2403 */
2404 phasedata->eqfunctions =
2405 (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
2406
2407 /* for each grouping set */
2408 for (i = 0; i < phasedata->numsets; i++)
2409 {
2410 int length = phasedata->gset_lengths[i];
2411
2412 if (phasedata->eqfunctions[length - 1] != NULL)
2413 continue;
2414
2415 phasedata->eqfunctions[length - 1] =
2416 execTuplesMatchPrepare(scanDesc,
2417 length,
2418 aggnode->grpColIdx,
2419 aggnode->grpOperators,
2420 aggnode->grpCollations,
2421 (PlanState *) aggstate);
2422 }
2423
2424 /* and for all grouped columns, unless already computed */
2425 if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
2426 {
2427 phasedata->eqfunctions[aggnode->numCols - 1] =
2428 execTuplesMatchPrepare(scanDesc,
2429 aggnode->numCols,
2430 aggnode->grpColIdx,
2431 aggnode->grpOperators,
2432 aggnode->grpCollations,
2433 (PlanState *) aggstate);
2434 }
2435 }
2436
2437 phasedata->aggnode = aggnode;
2438 phasedata->aggstrategy = aggnode->aggstrategy;
2439 phasedata->sortnode = sortnode;
2440 }
2441 }
2442
2443 /*
2444 * Convert all_grouped_cols to a descending-order list.
2445 */
2446 i = -1;
2447 while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
2448 aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
2449
2450 /*
2451 * Set up aggregate-result storage in the output expr context, and also
2452 * allocate my private per-agg working storage
2453 */
2454 econtext = aggstate->ss.ps.ps_ExprContext;
2455 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
2456 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
2457
2458 peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
2459 pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
2460
2461 aggstate->peragg = peraggs;
2462 aggstate->pertrans = pertransstates;
2463
2464
2465 aggstate->all_pergroups =
2466 (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
2467 * (numGroupingSets + numHashes));
2468 pergroups = aggstate->all_pergroups;
2469
2470 if (node->aggstrategy != AGG_HASHED)
2471 {
2472 for (i = 0; i < numGroupingSets; i++)
2473 {
2474 pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
2475 * numaggs);
2476 }
2477
2478 aggstate->pergroups = pergroups;
2479 pergroups += numGroupingSets;
2480 }
2481
2482 /*
2483 * Hashing can only appear in the initial phase.
2484 */
2485 if (use_hashing)
2486 {
2487 /* this is an array of pointers, not structures */
2488 aggstate->hash_pergroup = pergroups;
2489
2490 find_hash_columns(aggstate);
2491
2492 /* Skip massive memory allocation if we are just doing EXPLAIN */
2493 if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
2494 build_hash_table(aggstate);
2495
2496 aggstate->table_filled = false;
2497 }
2498
2499 /*
2500 * Initialize current phase-dependent values to initial phase. The initial
2501 * phase is 1 (first sort pass) for all strategies that use sorting (if
2502 * hashing is being done too, then phase 0 is processed last); but if only
2503 * hashing is being done, then phase 0 is all there is.
2504 */
2505 if (node->aggstrategy == AGG_HASHED)
2506 {
2507 aggstate->current_phase = 0;
2508 initialize_phase(aggstate, 0);
2509 select_current_set(aggstate, 0, true);
2510 }
2511 else
2512 {
2513 aggstate->current_phase = 1;
2514 initialize_phase(aggstate, 1);
2515 select_current_set(aggstate, 0, false);
2516 }
2517
2518 /* -----------------
2519 * Perform lookups of aggregate function info, and initialize the
2520 * unchanging fields of the per-agg and per-trans data.
2521 *
2522 * We try to optimize by detecting duplicate aggregate functions so that
2523 * their state and final values are re-used, rather than needlessly being
2524 * re-calculated independently. We also detect aggregates that are not
2525 * the same, but which can share the same transition state.
2526 *
2527 * Scenarios:
2528 *
2529 * 1. Identical aggregate function calls appear in the query:
2530 *
2531 * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
2532 *
2533 * Since these aggregates are identical, we only need to calculate
2534 * the value once. Both aggregates will share the same 'aggno' value.
2535 *
2536 * 2. Two different aggregate functions appear in the query, but the
2537 * aggregates have the same arguments, transition functions and
2538 * initial values (and, presumably, different final functions):
2539 *
2540 * SELECT AVG(x), STDDEV(x) FROM ...
2541 *
2542 * In this case we must create a new peragg for the varying aggregate,
2543 * and we need to call the final functions separately, but we need
2544 * only run the transition function once. (This requires that the
2545 * final functions be nondestructive of the transition state, but
2546 * that's required anyway for other reasons.)
2547 *
2548 * For either of these optimizations to be valid, all aggregate properties
2549 * used in the transition phase must be the same, including any modifiers
2550 * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
2551 * contain any volatile functions.
2552 * -----------------
2553 */
2554 aggno = -1;
2555 transno = -1;
2556 foreach(l, aggstate->aggs)
2557 {
2558 AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
2559 Aggref *aggref = aggrefstate->aggref;
2560 AggStatePerAgg peragg;
2561 AggStatePerTrans pertrans;
2562 int existing_aggno;
2563 int existing_transno;
2564 List *same_input_transnos;
2565 Oid inputTypes[FUNC_MAX_ARGS];
2566 int numArguments;
2567 int numDirectArgs;
2568 HeapTuple aggTuple;
2569 Form_pg_aggregate aggform;
2570 AclResult aclresult;
2571 Oid transfn_oid,
2572 finalfn_oid;
2573 bool shareable;
2574 Oid serialfn_oid,
2575 deserialfn_oid;
2576 Expr *finalfnexpr;
2577 Oid aggtranstype;
2578 Datum textInitVal;
2579 Datum initValue;
2580 bool initValueIsNull;
2581
2582 /* Planner should have assigned aggregate to correct level */
2583 Assert(aggref->agglevelsup == 0);
2584 /* ... and the split mode should match */
2585 Assert(aggref->aggsplit == aggstate->aggsplit);
2586
2587 /* 1. Check for already processed aggs which can be re-used */
2588 existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
2589 &same_input_transnos);
2590 if (existing_aggno != -1)
2591 {
2592 /*
2593 * Existing compatible agg found. so just point the Aggref to the
2594 * same per-agg struct.
2595 */
2596 aggrefstate->aggno = existing_aggno;
2597 continue;
2598 }
2599
2600 /* Mark Aggref state node with assigned index in the result array */
2601 peragg = &peraggs[++aggno];
2602 peragg->aggref = aggref;
2603 aggrefstate->aggno = aggno;
2604
2605 /* Fetch the pg_aggregate row */
2606 aggTuple = SearchSysCache1(AGGFNOID,
2607 ObjectIdGetDatum(aggref->aggfnoid));
2608 if (!HeapTupleIsValid(aggTuple))
2609 elog(ERROR, "cache lookup failed for aggregate %u",
2610 aggref->aggfnoid);
2611 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2612
2613 /* Check permission to call aggregate function */
2614 aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
2615 ACL_EXECUTE);
2616 if (aclresult != ACLCHECK_OK)
2617 aclcheck_error(aclresult, OBJECT_AGGREGATE,
2618 get_func_name(aggref->aggfnoid));
2619 InvokeFunctionExecuteHook(aggref->aggfnoid);
2620
2621 /* planner recorded transition state type in the Aggref itself */
2622 aggtranstype = aggref->aggtranstype;
2623 Assert(OidIsValid(aggtranstype));
2624
2625 /*
2626 * If this aggregation is performing state combines, then instead of
2627 * using the transition function, we'll use the combine function
2628 */
2629 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2630 {
2631 transfn_oid = aggform->aggcombinefn;
2632
2633 /* If not set then the planner messed up */
2634 if (!OidIsValid(transfn_oid))
2635 elog(ERROR, "combinefn not set for aggregate function");
2636 }
2637 else
2638 transfn_oid = aggform->aggtransfn;
2639
2640 /* Final function only required if we're finalizing the aggregates */
2641 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
2642 peragg->finalfn_oid = finalfn_oid = InvalidOid;
2643 else
2644 peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2645
2646 /*
2647 * If finalfn is marked read-write, we can't share transition states;
2648 * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also,
2649 * if we're not executing the finalfn here, we can share regardless.
2650 */
2651 shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
2652 (finalfn_oid == InvalidOid);
2653 peragg->shareable = shareable;
2654
2655 serialfn_oid = InvalidOid;
2656 deserialfn_oid = InvalidOid;
2657
2658 /*
2659 * Check if serialization/deserialization is required. We only do it
2660 * for aggregates that have transtype INTERNAL.
2661 */
2662 if (aggtranstype == INTERNALOID)
2663 {
2664 /*
2665 * The planner should only have generated a serialize agg node if
2666 * every aggregate with an INTERNAL state has a serialization
2667 * function. Verify that.
2668 */
2669 if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
2670 {
2671 /* serialization only valid when not running finalfn */
2672 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
2673
2674 if (!OidIsValid(aggform->aggserialfn))
2675 elog(ERROR, "serialfunc not provided for serialization aggregation");
2676 serialfn_oid = aggform->aggserialfn;
2677 }
2678
2679 /* Likewise for deserialization functions */
2680 if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
2681 {
2682 /* deserialization only valid when combining states */
2683 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
2684
2685 if (!OidIsValid(aggform->aggdeserialfn))
2686 elog(ERROR, "deserialfunc not provided for deserialization aggregation");
2687 deserialfn_oid = aggform->aggdeserialfn;
2688 }
2689 }
2690
2691 /* Check that aggregate owner has permission to call component fns */
2692 {
2693 HeapTuple procTuple;
2694 Oid aggOwner;
2695
2696 procTuple = SearchSysCache1(PROCOID,
2697 ObjectIdGetDatum(aggref->aggfnoid));
2698 if (!HeapTupleIsValid(procTuple))
2699 elog(ERROR, "cache lookup failed for function %u",
2700 aggref->aggfnoid);
2701 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2702 ReleaseSysCache(procTuple);
2703
2704 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2705 ACL_EXECUTE);
2706 if (aclresult != ACLCHECK_OK)
2707 aclcheck_error(aclresult, OBJECT_FUNCTION,
2708 get_func_name(transfn_oid));
2709 InvokeFunctionExecuteHook(transfn_oid);
2710 if (OidIsValid(finalfn_oid))
2711 {
2712 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2713 ACL_EXECUTE);
2714 if (aclresult != ACLCHECK_OK)
2715 aclcheck_error(aclresult, OBJECT_FUNCTION,
2716 get_func_name(finalfn_oid));
2717 InvokeFunctionExecuteHook(finalfn_oid);
2718 }
2719 if (OidIsValid(serialfn_oid))
2720 {
2721 aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
2722 ACL_EXECUTE);
2723 if (aclresult != ACLCHECK_OK)
2724 aclcheck_error(aclresult, OBJECT_FUNCTION,
2725 get_func_name(serialfn_oid));
2726 InvokeFunctionExecuteHook(serialfn_oid);
2727 }
2728 if (OidIsValid(deserialfn_oid))
2729 {
2730 aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
2731 ACL_EXECUTE);
2732 if (aclresult != ACLCHECK_OK)
2733 aclcheck_error(aclresult, OBJECT_FUNCTION,
2734 get_func_name(deserialfn_oid));
2735 InvokeFunctionExecuteHook(deserialfn_oid);
2736 }
2737 }
2738
2739 /*
2740 * Get actual datatypes of the (nominal) aggregate inputs. These
2741 * could be different from the agg's declared input types, when the
2742 * agg accepts ANY or a polymorphic type.
2743 */
2744 numArguments = get_aggregate_argtypes(aggref, inputTypes);
2745
2746 /* Count the "direct" arguments, if any */
2747 numDirectArgs = list_length(aggref->aggdirectargs);
2748
2749 /* Detect how many arguments to pass to the finalfn */
2750 if (aggform->aggfinalextra)
2751 peragg->numFinalArgs = numArguments + 1;
2752 else
2753 peragg->numFinalArgs = numDirectArgs + 1;
2754
2755 /* Initialize any direct-argument expressions */
2756 peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
2757 (PlanState *) aggstate);
2758
2759 /*
2760 * build expression trees using actual argument & result types for the
2761 * finalfn, if it exists and is required.
2762 */
2763 if (OidIsValid(finalfn_oid))
2764 {
2765 build_aggregate_finalfn_expr(inputTypes,
2766 peragg->numFinalArgs,
2767 aggtranstype,
2768 aggref->aggtype,
2769 aggref->inputcollid,
2770 finalfn_oid,
2771 &finalfnexpr);
2772 fmgr_info(finalfn_oid, &peragg->finalfn);
2773 fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
2774 }
2775
2776 /* get info about the output value's datatype */
2777 get_typlenbyval(aggref->aggtype,
2778 &peragg->resulttypeLen,
2779 &peragg->resulttypeByVal);
2780
2781 /*
2782 * initval is potentially null, so don't try to access it as a struct
2783 * field. Must do it the hard way with SysCacheGetAttr.
2784 */
2785 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
2786 Anum_pg_aggregate_agginitval,
2787 &initValueIsNull);
2788 if (initValueIsNull)
2789 initValue = (Datum) 0;
2790 else
2791 initValue = GetAggInitVal(textInitVal, aggtranstype);
2792
2793 /*
2794 * 2. Build working state for invoking the transition function, or
2795 * look up previously initialized working state, if we can share it.
2796 *
2797 * find_compatible_peragg() already collected a list of shareable
2798 * per-Trans's with the same inputs. Check if any of them have the
2799 * same transition function and initial value.
2800 */
2801 existing_transno = find_compatible_pertrans(aggstate, aggref,
2802 shareable,
2803 transfn_oid, aggtranstype,
2804 serialfn_oid, deserialfn_oid,
2805 initValue, initValueIsNull,
2806 same_input_transnos);
2807 if (existing_transno != -1)
2808 {
2809 /*
2810 * Existing compatible trans found, so just point the 'peragg' to
2811 * the same per-trans struct, and mark the trans state as shared.
2812 */
2813 pertrans = &pertransstates[existing_transno];
2814 pertrans->aggshared = true;
2815 peragg->transno = existing_transno;
2816 }
2817 else
2818 {
2819 pertrans = &pertransstates[++transno];
2820 build_pertrans_for_aggref(pertrans, aggstate, estate,
2821 aggref, transfn_oid, aggtranstype,
2822 serialfn_oid, deserialfn_oid,
2823 initValue, initValueIsNull,
2824 inputTypes, numArguments);
2825 peragg->transno = transno;
2826 }
2827 ReleaseSysCache(aggTuple);
2828 }
2829
2830 /*
2831 * Update aggstate->numaggs to be the number of unique aggregates found.
2832 * Also set numstates to the number of unique transition states found.
2833 */
2834 aggstate->numaggs = aggno + 1;
2835 aggstate->numtrans = transno + 1;
2836
2837 /*
2838 * Last, check whether any more aggregates got added onto the node while
2839 * we processed the expressions for the aggregate arguments (including not
2840 * only the regular arguments and FILTER expressions handled immediately
2841 * above, but any direct arguments we might've handled earlier). If so,
2842 * we have nested aggregate functions, which is semantically nonsensical,
2843 * so complain. (This should have been caught by the parser, so we don't
2844 * need to work hard on a helpful error message; but we defend against it
2845 * here anyway, just to be sure.)
2846 */
2847 if (numaggs != list_length(aggstate->aggs))
2848 ereport(ERROR,
2849 (errcode(ERRCODE_GROUPING_ERROR),
2850 errmsg("aggregate function calls cannot be nested")));
2851
2852 /*
2853 * Build expressions doing all the transition work at once. We build a
2854 * different one for each phase, as the number of transition function
2855 * invocation can differ between phases. Note this'll work both for
2856 * transition and combination functions (although there'll only be one
2857 * phase in the latter case).
2858 */
2859 for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
2860 {
2861 AggStatePerPhase phase = &aggstate->phases[phaseidx];
2862 bool dohash = false;
2863 bool dosort = false;
2864
2865 /* phase 0 doesn't necessarily exist */
2866 if (!phase->aggnode)
2867 continue;
2868
2869 if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
2870 {
2871 /*
2872 * Phase one, and only phase one, in a mixed agg performs both
2873 * sorting and aggregation.
2874 */
2875 dohash = true;
2876 dosort = true;
2877 }
2878 else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
2879 {
2880 /*
2881 * No need to compute a transition function for an AGG_MIXED phase
2882 * 0 - the contents of the hashtables will have been computed
2883 * during phase 1.
2884 */
2885 continue;
2886 }
2887 else if (phase->aggstrategy == AGG_PLAIN ||
2888 phase->aggstrategy == AGG_SORTED)
2889 {
2890 dohash = false;
2891 dosort = true;
2892 }
2893 else if (phase->aggstrategy == AGG_HASHED)
2894 {
2895 dohash = true;
2896 dosort = false;
2897 }
2898 else
2899 Assert(false);
2900
2901 phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash);
2902
2903 }
2904
2905 return aggstate;
2906 }
2907
/*
 * build_pertrans_for_aggref - initialize the per-transition state for one
 * aggregate
 *
 * Fills in the fields of 'pertrans' needed to run the transition function
 * (or, in combine mode, the combine function) for 'aggref' inside
 * 'aggstate'.  'aggtransfn', 'aggtranstype' and the rest of the arguments
 * could be calculated from 'aggref', but the caller has calculated them
 * already, so they are passed in rather than looked up again.
 *
 * pertrans:        struct to fill in
 * aggstate:        owning Agg node; also handed to the support functions as
 *                  the fcinfo context
 * estate:          executor state in which extra tuple slots are created
 * aggref:          the aggregate call this transition state serves
 * aggtransfn:      OID of the transition function, or of the combine
 *                  function when DO_AGGSPLIT_COMBINE(aggstate->aggsplit)
 * aggtranstype:    transition state datatype
 * aggserialfn,
 * aggdeserialfn:   serialization / deserialization functions; InvalidOid
 *                  when this node does not use them
 * initValue,
 * initValueIsNull: initial transition value
 * inputTypes:      actual input datatypes; inputTypes[numDirectArgs] is
 *                  treated below as the type of the first aggregated input
 * numArguments:    number of valid entries in inputTypes
 *
 * Raises ERROR if a combine function for an INTERNAL-typed state is STRICT,
 * or if the transfn is STRICT, the initial value is NULL, and there is no
 * aggregated input that is binary-coercible to the transition type.
 */
static void
build_pertrans_for_aggref(AggStatePerTrans pertrans,
						  AggState *aggstate, EState *estate,
						  Aggref *aggref,
						  Oid aggtransfn, Oid aggtranstype,
						  Oid aggserialfn, Oid aggdeserialfn,
						  Datum initValue, bool initValueIsNull,
						  Oid *inputTypes, int numArguments)
{
	/* sortstates gets one slot per grouping set (at least one) */
	int			numGroupingSets = Max(aggstate->maxsets, 1);
	Expr	   *serialfnexpr = NULL;
	Expr	   *deserialfnexpr = NULL;
	ListCell   *lc;
	int			numInputs;
	int			numDirectArgs;
	List	   *sortlist;
	int			numSortCols;
	int			numDistinctCols;
	int			i;

	/* Begin filling in the pertrans data */
	pertrans->aggref = aggref;
	pertrans->aggshared = false;
	pertrans->aggCollation = aggref->inputcollid;
	pertrans->transfn_oid = aggtransfn;
	pertrans->serialfn_oid = aggserialfn;
	pertrans->deserialfn_oid = aggdeserialfn;
	pertrans->initValue = initValue;
	pertrans->initValueIsNull = initValueIsNull;

	/* Count the "direct" arguments, if any */
	numDirectArgs = list_length(aggref->aggdirectargs);

	/* Count the number of aggregated input columns */
	pertrans->numInputs = numInputs = list_length(aggref->args);

	pertrans->aggtranstype = aggtranstype;

	/*
	 * When combining states, we have no use at all for the aggregate
	 * function's transfn. Instead we use the combinefn. In this case, the
	 * transfn and transfn_oid fields of pertrans refer to the combine
	 * function rather than the transition function.
	 */
	if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
	{
		Expr	   *combinefnexpr;
		size_t		numTransArgs;

		/*
		 * When combining there's only one input: the partial transition value
		 * being merged in from below (this node's own transition value is
		 * counted separately).
		 */
		pertrans->numTransInputs = 1;

		/* account for the current transition state */
		numTransArgs = pertrans->numTransInputs + 1;

		build_aggregate_combinefn_expr(aggtranstype,
									   aggref->inputcollid,
									   aggtransfn,
									   &combinefnexpr);
		fmgr_info(aggtransfn, &pertrans->transfn);
		fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);

		/* numTransArgs is always 2 here, matching the allocation size */
		pertrans->transfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
								 &pertrans->transfn,
								 numTransArgs,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/*
		 * Ensure that a combine function to combine INTERNAL states is not
		 * strict. This should have been checked during CREATE AGGREGATE, but
		 * the strict property could have been changed since then.
		 */
		if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
					 errmsg("combine function with transition type %s must not be declared STRICT",
							format_type_be(aggtranstype))));
	}
	else
	{
		Expr	   *transfnexpr;
		size_t		numTransArgs;

		/*
		 * Detect how many arguments to pass to the transfn: ordered-set aggs
		 * pass only the aggregated inputs, other aggs pass all arguments.
		 */
		if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
			pertrans->numTransInputs = numInputs;
		else
			pertrans->numTransInputs = numArguments;

		/* account for the current transition state */
		numTransArgs = pertrans->numTransInputs + 1;

		/*
		 * Set up infrastructure for calling the transfn. Note that invtrans
		 * is not needed here.
		 */
		build_aggregate_transfn_expr(inputTypes,
									 numArguments,
									 numDirectArgs,
									 aggref->aggvariadic,
									 aggtranstype,
									 aggref->inputcollid,
									 aggtransfn,
									 InvalidOid,
									 &transfnexpr,
									 NULL);
		fmgr_info(aggtransfn, &pertrans->transfn);
		fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);

		pertrans->transfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
								 &pertrans->transfn,
								 numTransArgs,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/*
		 * If the transfn is strict and the initval is NULL, make sure input
		 * type and transtype are the same (or at least binary-compatible), so
		 * that it's OK to use the first aggregated input value as the initial
		 * transValue. This should have been checked at agg definition time,
		 * but we must check again in case the transfn's strictness property
		 * has been changed.
		 */
		if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
		{
			/* there must be at least one aggregated (non-direct) input */
			if (numArguments <= numDirectArgs ||
				!IsBinaryCoercible(inputTypes[numDirectArgs],
								   aggtranstype))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
						 errmsg("aggregate %u needs to have compatible input type and transition type",
								aggref->aggfnoid)));
		}
	}

	/* get info about the state value's datatype */
	get_typlenbyval(aggtranstype,
					&pertrans->transtypeLen,
					&pertrans->transtypeByVal);

	/* serialfn is called with exactly one argument: the transition value */
	if (OidIsValid(aggserialfn))
	{
		build_aggregate_serialfn_expr(aggserialfn,
									  &serialfnexpr);
		fmgr_info(aggserialfn, &pertrans->serialfn);
		fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);

		pertrans->serialfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
		InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
								 &pertrans->serialfn,
								 1,
								 InvalidOid,
								 (void *) aggstate, NULL);
	}

	/* deserialfn is set up for a two-argument call */
	if (OidIsValid(aggdeserialfn))
	{
		build_aggregate_deserialfn_expr(aggdeserialfn,
										&deserialfnexpr);
		fmgr_info(aggdeserialfn, &pertrans->deserialfn);
		fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);

		pertrans->deserialfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
		InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
								 &pertrans->deserialfn,
								 2,
								 InvalidOid,
								 (void *) aggstate, NULL);

	}

	/*
	 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
	 * have a list of SortGroupClause nodes; fish out the data in them and
	 * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
	 * however; the agg's transfn and finalfn are responsible for that.
	 *
	 * Note that by construction, if there is a DISTINCT clause then the ORDER
	 * BY clause is a prefix of it (see transformDistinctClause).
	 */
	if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
	{
		sortlist = NIL;
		numSortCols = numDistinctCols = 0;
	}
	else if (aggref->aggdistinct)
	{
		sortlist = aggref->aggdistinct;
		numSortCols = numDistinctCols = list_length(sortlist);
		Assert(numSortCols >= list_length(aggref->aggorder));
	}
	else
	{
		sortlist = aggref->aggorder;
		numSortCols = list_length(sortlist);
		numDistinctCols = 0;
	}

	pertrans->numSortCols = numSortCols;
	pertrans->numDistinctCols = numDistinctCols;

	/*
	 * If we have either sorting or filtering to do, create a tupledesc and
	 * slot corresponding to the aggregated inputs (including sort
	 * expressions) of the agg.
	 */
	if (numSortCols > 0 || aggref->aggfilter)
	{
		pertrans->sortdesc = ExecTypeFromTL(aggref->args);
		pertrans->sortslot =
			ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
								   &TTSOpsMinimalTuple);
	}

	if (numSortCols > 0)
	{
		/*
		 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
		 * (yet)
		 */
		Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);

		/*
		 * If we have only one input, we need its len/byval info.  With
		 * several inputs and DISTINCT, a second slot is needed instead to
		 * hold the previous row for duplicate detection.
		 */
		if (numInputs == 1)
		{
			get_typlenbyval(inputTypes[numDirectArgs],
							&pertrans->inputtypeLen,
							&pertrans->inputtypeByVal);
		}
		else if (numDistinctCols > 0)
		{
			/* we will need an extra slot to store prior values */
			pertrans->uniqslot =
				ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
									   &TTSOpsMinimalTuple);
		}

		/* Extract the sort information for use later */
		pertrans->sortColIdx =
			(AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
		pertrans->sortOperators =
			(Oid *) palloc(numSortCols * sizeof(Oid));
		pertrans->sortCollations =
			(Oid *) palloc(numSortCols * sizeof(Oid));
		pertrans->sortNullsFirst =
			(bool *) palloc(numSortCols * sizeof(bool));

		i = 0;
		foreach(lc, sortlist)
		{
			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
			TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);

			/* the parser should have made sure of this */
			Assert(OidIsValid(sortcl->sortop));

			pertrans->sortColIdx[i] = tle->resno;
			pertrans->sortOperators[i] = sortcl->sortop;
			pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
			pertrans->sortNullsFirst[i] = sortcl->nulls_first;
			i++;
		}
		Assert(i == numSortCols);
	}

	/*
	 * For DISTINCT, prepare the equality comparator(s): a plain fmgr lookup
	 * for a single column, otherwise a multi-column tuple matcher built over
	 * the sort columns extracted above.
	 */
	if (aggref->aggdistinct)
	{
		Oid		   *ops;

		Assert(numArguments > 0);
		Assert(list_length(aggref->aggdistinct) == numDistinctCols);

		ops = palloc(numDistinctCols * sizeof(Oid));

		i = 0;
		foreach(lc, aggref->aggdistinct)
			ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;

		/* lookup / build the necessary comparators */
		if (numDistinctCols == 1)
			fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
		else
			pertrans->equalfnMulti =
				execTuplesMatchPrepare(pertrans->sortdesc,
									   numDistinctCols,
									   pertrans->sortColIdx,
									   ops,
									   pertrans->sortCollations,
									   &aggstate->ss.ps);
		pfree(ops);
	}

	/* zeroed: no tuplesort is open yet for any grouping set */
	pertrans->sortstates = (Tuplesortstate **)
		palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
}
3222
3223
3224 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)3225 GetAggInitVal(Datum textInitVal, Oid transtype)
3226 {
3227 Oid typinput,
3228 typioparam;
3229 char *strInitVal;
3230 Datum initVal;
3231
3232 getTypeInputInfo(transtype, &typinput, &typioparam);
3233 strInitVal = TextDatumGetCString(textInitVal);
3234 initVal = OidInputFunctionCall(typinput, strInitVal,
3235 typioparam, -1);
3236 pfree(strInitVal);
3237 return initVal;
3238 }
3239
3240 /*
3241 * find_compatible_peragg - search for a previously initialized per-Agg struct
3242 *
3243 * Searches the previously looked at aggregates to find one which is compatible
3244 * with this one, with the same input parameters. If no compatible aggregate
3245 * can be found, returns -1.
3246 *
3247 * As a side-effect, this also collects a list of existing, shareable per-Trans
3248 * structs with matching inputs. If no identical Aggref is found, the list is
3249 * passed later to find_compatible_pertrans, to see if we can at least reuse
3250 * the state value of another aggregate.
3251 */
3252 static int
find_compatible_peragg(Aggref * newagg,AggState * aggstate,int lastaggno,List ** same_input_transnos)3253 find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3254 int lastaggno, List **same_input_transnos)
3255 {
3256 int aggno;
3257 AggStatePerAgg peraggs;
3258
3259 *same_input_transnos = NIL;
3260
3261 /* we mustn't reuse the aggref if it contains volatile function calls */
3262 if (contain_volatile_functions((Node *) newagg))
3263 return -1;
3264
3265 peraggs = aggstate->peragg;
3266
3267 /*
3268 * Search through the list of already seen aggregates. If we find an
3269 * existing identical aggregate call, then we can re-use that one. While
3270 * searching, we'll also collect a list of Aggrefs with the same input
3271 * parameters. If no matching Aggref is found, the caller can potentially
3272 * still re-use the transition state of one of them. (At this stage we
3273 * just compare the parsetrees; whether different aggregates share the
3274 * same transition function will be checked later.)
3275 */
3276 for (aggno = 0; aggno <= lastaggno; aggno++)
3277 {
3278 AggStatePerAgg peragg;
3279 Aggref *existingRef;
3280
3281 peragg = &peraggs[aggno];
3282 existingRef = peragg->aggref;
3283
3284 /* all of the following must be the same or it's no match */
3285 if (newagg->inputcollid != existingRef->inputcollid ||
3286 newagg->aggtranstype != existingRef->aggtranstype ||
3287 newagg->aggstar != existingRef->aggstar ||
3288 newagg->aggvariadic != existingRef->aggvariadic ||
3289 newagg->aggkind != existingRef->aggkind ||
3290 !equal(newagg->args, existingRef->args) ||
3291 !equal(newagg->aggorder, existingRef->aggorder) ||
3292 !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3293 !equal(newagg->aggfilter, existingRef->aggfilter))
3294 continue;
3295
3296 /* if it's the same aggregate function then report exact match */
3297 if (newagg->aggfnoid == existingRef->aggfnoid &&
3298 newagg->aggtype == existingRef->aggtype &&
3299 newagg->aggcollid == existingRef->aggcollid &&
3300 equal(newagg->aggdirectargs, existingRef->aggdirectargs))
3301 {
3302 list_free(*same_input_transnos);
3303 *same_input_transnos = NIL;
3304 return aggno;
3305 }
3306
3307 /*
3308 * Not identical, but it had the same inputs. If the final function
3309 * permits sharing, return its transno to the caller, in case we can
3310 * re-use its per-trans state. (If there's already sharing going on,
3311 * we might report a transno more than once. find_compatible_pertrans
3312 * is cheap enough that it's not worth spending cycles to avoid that.)
3313 */
3314 if (peragg->shareable)
3315 *same_input_transnos = lappend_int(*same_input_transnos,
3316 peragg->transno);
3317 }
3318
3319 return -1;
3320 }
3321
3322 /*
3323 * find_compatible_pertrans - search for a previously initialized per-Trans
3324 * struct
3325 *
3326 * Searches the list of transnos for a per-Trans struct with the same
3327 * transition function and initial condition. (The inputs have already been
3328 * verified to match.)
3329 */
3330 static int
find_compatible_pertrans(AggState * aggstate,Aggref * newagg,bool shareable,Oid aggtransfn,Oid aggtranstype,Oid aggserialfn,Oid aggdeserialfn,Datum initValue,bool initValueIsNull,List * transnos)3331 find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
3332 Oid aggtransfn, Oid aggtranstype,
3333 Oid aggserialfn, Oid aggdeserialfn,
3334 Datum initValue, bool initValueIsNull,
3335 List *transnos)
3336 {
3337 ListCell *lc;
3338
3339 /* If this aggregate can't share transition states, give up */
3340 if (!shareable)
3341 return -1;
3342
3343 foreach(lc, transnos)
3344 {
3345 int transno = lfirst_int(lc);
3346 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3347
3348 /*
3349 * if the transfns or transition state types are not the same then the
3350 * state can't be shared.
3351 */
3352 if (aggtransfn != pertrans->transfn_oid ||
3353 aggtranstype != pertrans->aggtranstype)
3354 continue;
3355
3356 /*
3357 * The serialization and deserialization functions must match, if
3358 * present, as we're unable to share the trans state for aggregates
3359 * which will serialize or deserialize into different formats.
3360 * Remember that these will be InvalidOid if they're not required for
3361 * this agg node.
3362 */
3363 if (aggserialfn != pertrans->serialfn_oid ||
3364 aggdeserialfn != pertrans->deserialfn_oid)
3365 continue;
3366
3367 /*
3368 * Check that the initial condition matches, too.
3369 */
3370 if (initValueIsNull && pertrans->initValueIsNull)
3371 return transno;
3372
3373 if (!initValueIsNull && !pertrans->initValueIsNull &&
3374 datumIsEqual(initValue, pertrans->initValue,
3375 pertrans->transtypeByVal, pertrans->transtypeLen))
3376 return transno;
3377 }
3378 return -1;
3379 }
3380
3381 void
ExecEndAgg(AggState * node)3382 ExecEndAgg(AggState *node)
3383 {
3384 PlanState *outerPlan;
3385 int transno;
3386 int numGroupingSets = Max(node->maxsets, 1);
3387 int setno;
3388
3389 /* Make sure we have closed any open tuplesorts */
3390
3391 if (node->sort_in)
3392 tuplesort_end(node->sort_in);
3393 if (node->sort_out)
3394 tuplesort_end(node->sort_out);
3395
3396 for (transno = 0; transno < node->numtrans; transno++)
3397 {
3398 AggStatePerTrans pertrans = &node->pertrans[transno];
3399
3400 for (setno = 0; setno < numGroupingSets; setno++)
3401 {
3402 if (pertrans->sortstates[setno])
3403 tuplesort_end(pertrans->sortstates[setno]);
3404 }
3405 }
3406
3407 /* And ensure any agg shutdown callbacks have been called */
3408 for (setno = 0; setno < numGroupingSets; setno++)
3409 ReScanExprContext(node->aggcontexts[setno]);
3410 if (node->hashcontext)
3411 ReScanExprContext(node->hashcontext);
3412
3413 /*
3414 * We don't actually free any ExprContexts here (see comment in
3415 * ExecFreeExprContext), just unlinking the output one from the plan node
3416 * suffices.
3417 */
3418 ExecFreeExprContext(&node->ss.ps);
3419
3420 /* clean up tuple table */
3421 ExecClearTuple(node->ss.ss_ScanTupleSlot);
3422
3423 outerPlan = outerPlanState(node);
3424 ExecEndNode(outerPlan);
3425 }
3426
3427 void
ExecReScanAgg(AggState * node)3428 ExecReScanAgg(AggState *node)
3429 {
3430 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3431 PlanState *outerPlan = outerPlanState(node);
3432 Agg *aggnode = (Agg *) node->ss.ps.plan;
3433 int transno;
3434 int numGroupingSets = Max(node->maxsets, 1);
3435 int setno;
3436
3437 node->agg_done = false;
3438
3439 if (node->aggstrategy == AGG_HASHED)
3440 {
3441 /*
3442 * In the hashed case, if we haven't yet built the hash table then we
3443 * can just return; nothing done yet, so nothing to undo. If subnode's
3444 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
3445 * else no reason to re-scan it at all.
3446 */
3447 if (!node->table_filled)
3448 return;
3449
3450 /*
3451 * If we do have the hash table, and the subplan does not have any
3452 * parameter changes, and none of our own parameter changes affect
3453 * input expressions of the aggregated functions, then we can just
3454 * rescan the existing hash table; no need to build it again.
3455 */
3456 if (outerPlan->chgParam == NULL &&
3457 !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
3458 {
3459 ResetTupleHashIterator(node->perhash[0].hashtable,
3460 &node->perhash[0].hashiter);
3461 select_current_set(node, 0, true);
3462 return;
3463 }
3464 }
3465
3466 /* Make sure we have closed any open tuplesorts */
3467 for (transno = 0; transno < node->numtrans; transno++)
3468 {
3469 for (setno = 0; setno < numGroupingSets; setno++)
3470 {
3471 AggStatePerTrans pertrans = &node->pertrans[transno];
3472
3473 if (pertrans->sortstates[setno])
3474 {
3475 tuplesort_end(pertrans->sortstates[setno]);
3476 pertrans->sortstates[setno] = NULL;
3477 }
3478 }
3479 }
3480
3481 /*
3482 * We don't need to ReScanExprContext the output tuple context here;
3483 * ExecReScan already did it. But we do need to reset our per-grouping-set
3484 * contexts, which may have transvalues stored in them. (We use rescan
3485 * rather than just reset because transfns may have registered callbacks
3486 * that need to be run now.) For the AGG_HASHED case, see below.
3487 */
3488
3489 for (setno = 0; setno < numGroupingSets; setno++)
3490 {
3491 ReScanExprContext(node->aggcontexts[setno]);
3492 }
3493
3494 /* Release first tuple of group, if we have made a copy */
3495 if (node->grp_firstTuple != NULL)
3496 {
3497 heap_freetuple(node->grp_firstTuple);
3498 node->grp_firstTuple = NULL;
3499 }
3500 ExecClearTuple(node->ss.ss_ScanTupleSlot);
3501
3502 /* Forget current agg values */
3503 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
3504 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
3505
3506 /*
3507 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
3508 * the hashcontext. This used to be an issue, but now, resetting a context
3509 * automatically deletes sub-contexts too.
3510 */
3511 if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
3512 {
3513 ReScanExprContext(node->hashcontext);
3514 /* Rebuild an empty hash table */
3515 build_hash_table(node);
3516 node->table_filled = false;
3517 /* iterator will be reset when the table is filled */
3518 }
3519
3520 if (node->aggstrategy != AGG_HASHED)
3521 {
3522 /*
3523 * Reset the per-group state (in particular, mark transvalues null)
3524 */
3525 for (setno = 0; setno < numGroupingSets; setno++)
3526 {
3527 MemSet(node->pergroups[setno], 0,
3528 sizeof(AggStatePerGroupData) * node->numaggs);
3529 }
3530
3531 /* reset to phase 1 */
3532 initialize_phase(node, 1);
3533
3534 node->input_done = false;
3535 node->projected_set = -1;
3536 }
3537
3538 if (outerPlan->chgParam == NULL)
3539 ExecReScan(outerPlan);
3540 }
3541
3542
3543 /***********************************************************************
3544 * API exposed to aggregate functions
3545 ***********************************************************************/
3546
3547
3548 /*
3549 * AggCheckCallContext - test if a SQL function is being called as an aggregate
3550 *
3551 * The transition and/or final functions of an aggregate may want to verify
3552 * that they are being called as aggregates, rather than as plain SQL
3553 * functions. They should use this function to do so. The return value
3554 * is nonzero if being called as an aggregate, or zero if not. (Specific
3555 * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
3556 * values could conceivably appear in future.)
3557 *
3558 * If aggcontext isn't NULL, the function also stores at *aggcontext the
3559 * identity of the memory context that aggregate transition values are being
3560 * stored in. Note that the same aggregate call site (flinfo) may be called
3561 * interleaved on different transition values in different contexts, so it's
3562 * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
3563 * cache it in the transvalue itself (for internal-type transvalues).
3564 */
3565 int
AggCheckCallContext(FunctionCallInfo fcinfo,MemoryContext * aggcontext)3566 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
3567 {
3568 if (fcinfo->context && IsA(fcinfo->context, AggState))
3569 {
3570 if (aggcontext)
3571 {
3572 AggState *aggstate = ((AggState *) fcinfo->context);
3573 ExprContext *cxt = aggstate->curaggcontext;
3574
3575 *aggcontext = cxt->ecxt_per_tuple_memory;
3576 }
3577 return AGG_CONTEXT_AGGREGATE;
3578 }
3579 if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
3580 {
3581 if (aggcontext)
3582 *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
3583 return AGG_CONTEXT_WINDOW;
3584 }
3585
3586 /* this is just to prevent "uninitialized variable" warnings */
3587 if (aggcontext)
3588 *aggcontext = NULL;
3589 return 0;
3590 }
3591
3592 /*
3593 * AggGetAggref - allow an aggregate support function to get its Aggref
3594 *
3595 * If the function is being called as an aggregate support function,
3596 * return the Aggref node for the aggregate call. Otherwise, return NULL.
3597 *
3598 * Aggregates sharing the same inputs and transition functions can get
3599 * merged into a single transition calculation. If the transition function
3600 * calls AggGetAggref, it will get some one of the Aggrefs for which it is
3601 * executing. It must therefore not pay attention to the Aggref fields that
3602 * relate to the final function, as those are indeterminate. But if a final
3603 * function calls AggGetAggref, it will get a precise result.
3604 *
3605 * Note that if an aggregate is being used as a window function, this will
3606 * return NULL. We could provide a similar function to return the relevant
3607 * WindowFunc node in such cases, but it's not needed yet.
3608 */
3609 Aggref *
AggGetAggref(FunctionCallInfo fcinfo)3610 AggGetAggref(FunctionCallInfo fcinfo)
3611 {
3612 if (fcinfo->context && IsA(fcinfo->context, AggState))
3613 {
3614 AggState *aggstate = (AggState *) fcinfo->context;
3615 AggStatePerAgg curperagg;
3616 AggStatePerTrans curpertrans;
3617
3618 /* check curperagg (valid when in a final function) */
3619 curperagg = aggstate->curperagg;
3620
3621 if (curperagg)
3622 return curperagg->aggref;
3623
3624 /* check curpertrans (valid when in a transition function) */
3625 curpertrans = aggstate->curpertrans;
3626
3627 if (curpertrans)
3628 return curpertrans->aggref;
3629 }
3630 return NULL;
3631 }
3632
3633 /*
3634 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
3635 *
3636 * This is useful in agg final functions; the context returned is one that
3637 * the final function can safely reset as desired. This isn't useful for
3638 * transition functions, since the context returned MAY (we don't promise)
3639 * be the same as the context those are called in.
3640 *
3641 * As above, this is currently not useful for aggs called as window functions.
3642 */
3643 MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)3644 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
3645 {
3646 if (fcinfo->context && IsA(fcinfo->context, AggState))
3647 {
3648 AggState *aggstate = (AggState *) fcinfo->context;
3649
3650 return aggstate->tmpcontext->ecxt_per_tuple_memory;
3651 }
3652 return NULL;
3653 }
3654
3655 /*
3656 * AggStateIsShared - find out whether transition state is shared
3657 *
3658 * If the function is being called as an aggregate support function,
3659 * return true if the aggregate's transition state is shared across
3660 * multiple aggregates, false if it is not.
3661 *
3662 * Returns true if not called as an aggregate support function.
3663 * This is intended as a conservative answer, ie "no you'd better not
3664 * scribble on your input". In particular, will return true if the
3665 * aggregate is being used as a window function, which is a scenario
3666 * in which changing the transition state is a bad idea. We might
3667 * want to refine the behavior for the window case in future.
3668 */
3669 bool
AggStateIsShared(FunctionCallInfo fcinfo)3670 AggStateIsShared(FunctionCallInfo fcinfo)
3671 {
3672 if (fcinfo->context && IsA(fcinfo->context, AggState))
3673 {
3674 AggState *aggstate = (AggState *) fcinfo->context;
3675 AggStatePerAgg curperagg;
3676 AggStatePerTrans curpertrans;
3677
3678 /* check curperagg (valid when in a final function) */
3679 curperagg = aggstate->curperagg;
3680
3681 if (curperagg)
3682 return aggstate->pertrans[curperagg->transno].aggshared;
3683
3684 /* check curpertrans (valid when in a transition function) */
3685 curpertrans = aggstate->curpertrans;
3686
3687 if (curpertrans)
3688 return curpertrans->aggshared;
3689 }
3690 return true;
3691 }
3692
3693 /*
3694 * AggRegisterCallback - register a cleanup callback for an aggregate
3695 *
3696 * This is useful for aggs to register shutdown callbacks, which will ensure
3697 * that non-memory resources are freed. The callback will occur just before
3698 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
3699 * either between groups or as a result of rescanning the query. The callback
3700 * will NOT be called on error paths. The typical use-case is for freeing of
3701 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
3702 * created by the agg functions. (The callback will not be called until after
3703 * the result of the finalfn is no longer needed, so it's safe for the finalfn
3704 * to return data that will be freed by the callback.)
3705 *
3706 * As above, this is currently not useful for aggs called as window functions.
3707 */
3708 void
AggRegisterCallback(FunctionCallInfo fcinfo,ExprContextCallbackFunction func,Datum arg)3709 AggRegisterCallback(FunctionCallInfo fcinfo,
3710 ExprContextCallbackFunction func,
3711 Datum arg)
3712 {
3713 if (fcinfo->context && IsA(fcinfo->context, AggState))
3714 {
3715 AggState *aggstate = (AggState *) fcinfo->context;
3716 ExprContext *cxt = aggstate->curaggcontext;
3717
3718 RegisterExprContextCallback(cxt, func, arg);
3719
3720 return;
3721 }
3722 elog(ERROR, "aggregate function cannot register a callback in this context");
3723 }
3724
3725
3726 /*
3727 * aggregate_dummy - dummy execution routine for aggregate functions
3728 *
3729 * This function is listed as the implementation (prosrc field) of pg_proc
3730 * entries for aggregate functions. Its only purpose is to throw an error
3731 * if someone mistakenly executes such a function in the normal way.
3732 *
3733 * Perhaps someday we could assign real meaning to the prosrc field of
3734 * an aggregate?
3735 */
3736 Datum
aggregate_dummy(PG_FUNCTION_ARGS)3737 aggregate_dummy(PG_FUNCTION_ARGS)
3738 {
3739 elog(ERROR, "aggregate function %u called as normal function",
3740 fcinfo->flinfo->fn_oid);
3741 return (Datum) 0; /* keep compiler quiet */
3742 }
3743