1 /*-------------------------------------------------------------------------
2 *
3 * nodeAgg.c
4 * Routines to handle aggregate nodes.
5 *
6 * ExecAgg normally evaluates each aggregate in the following steps:
7 *
8 * transvalue = initcond
9 * foreach input_tuple do
10 * transvalue = transfunc(transvalue, input_value(s))
11 * result = finalfunc(transvalue, direct_argument(s))
12 *
13 * If a finalfunc is not supplied then the result is just the ending
14 * value of transvalue.
15 *
16 * Other behaviors can be selected by the "aggsplit" mode, which exists
17 * to support partial aggregation. It is possible to:
18 * * Skip running the finalfunc, so that the output is always the
19 * final transvalue state.
20 * * Substitute the combinefunc for the transfunc, so that transvalue
21 * states (propagated up from a child partial-aggregation step) are merged
22 * rather than processing raw input rows. (The statements below about
23 * the transfunc apply equally to the combinefunc, when it's selected.)
24 * * Apply the serializefunc to the output values (this only makes sense
25 * when skipping the finalfunc, since the serializefunc works on the
26 * transvalue data type).
27 * * Apply the deserializefunc to the input values (this only makes sense
28 * when using the combinefunc, for similar reasons).
29 * It is the planner's responsibility to connect up Agg nodes using these
30 * alternate behaviors in a way that makes sense, with partial aggregation
31 * results being fed to nodes that expect them.
32 *
33 * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
34 * input tuples and eliminate duplicates (if required) before performing
35 * the above-depicted process. (However, we don't do that for ordered-set
36 * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
37 * so far as this module is concerned.) Note that partial aggregation
38 * is not supported in these cases, since we couldn't ensure global
39 * ordering or distinctness of the inputs.
40 *
41 * If transfunc is marked "strict" in pg_proc and initcond is NULL,
42 * then the first non-NULL input_value is assigned directly to transvalue,
43 * and transfunc isn't applied until the second non-NULL input_value.
44 * The agg's first input type and transtype must be the same in this case!
45 *
46 * If transfunc is marked "strict" then NULL input_values are skipped,
47 * keeping the previous transvalue. If transfunc is not strict then it
48 * is called for every input tuple and must deal with NULL initcond
49 * or NULL input_values for itself.
50 *
51 * If finalfunc is marked "strict" then it is not called when the
52 * ending transvalue is NULL, instead a NULL result is created
53 * automatically (this is just the usual handling of strict functions,
54 * of course). A non-strict finalfunc can make its own choice of
55 * what to return for a NULL ending transvalue.
56 *
57 * Ordered-set aggregates are treated specially in one other way: we
58 * evaluate any "direct" arguments and pass them to the finalfunc along
59 * with the transition value.
60 *
61 * A finalfunc can have additional arguments beyond the transvalue and
62 * any "direct" arguments, corresponding to the input arguments of the
63 * aggregate. These are always just passed as NULL. Such arguments may be
64 * needed to allow resolution of a polymorphic aggregate's result type.
65 *
66 * We compute aggregate input expressions and run the transition functions
67 * in a temporary econtext (aggstate->tmpcontext). This is reset at least
68 * once per input tuple, so when the transvalue datatype is
69 * pass-by-reference, we have to be careful to copy it into a longer-lived
70 * memory context, and free the prior value to avoid memory leakage. We
71 * store transvalues in another set of econtexts, aggstate->aggcontexts
72 * (one per grouping set, see below), which are also used for the hashtable
73 * structures in AGG_HASHED mode. These econtexts are rescanned, not just
74 * reset, at group boundaries so that aggregate transition functions can
75 * register shutdown callbacks via AggRegisterCallback.
76 *
77 * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
78 * run finalize functions and compute the output tuple; this context can be
79 * reset once per output tuple.
80 *
81 * The executor's AggState node is passed as the fmgr "context" value in
82 * all transfunc and finalfunc calls. It is not recommended that the
83 * transition functions look at the AggState node directly, but they can
84 * use AggCheckCallContext() to verify that they are being called by
85 * nodeAgg.c (and not as ordinary SQL functions). The main reason a
86 * transition function might want to know this is so that it can avoid
87 * palloc'ing a fixed-size pass-by-ref transition value on every call:
88 * it can instead just scribble on and return its left input. Ordinarily
89 * it is completely forbidden for functions to modify pass-by-ref inputs,
90 * but in the aggregate case we know the left input is either the initial
91 * transition value or a previous function result, and in either case its
92 * value need not be preserved. See int8inc() for an example. Notice that
93 * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
94 * the previous transition value pointer is returned. It is also possible
95 * to avoid repeated data copying when the transition value is an expanded
96 * object: to do that, the transition function must take care to return
97 * an expanded object that is in a child context of the memory context
98 * returned by AggCheckCallContext(). Also, some transition functions want
99 * to store working state in addition to the nominal transition value; they
100 * can use the memory context returned by AggCheckCallContext() to do that.
101 *
102 * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
103 * AggState is available as context in earlier releases (back to 8.1),
104 * but direct examination of the node is needed to use it before 9.0.
105 *
106 * As of 9.4, aggregate transition functions can also use AggGetAggref()
107 * to get hold of the Aggref expression node for their aggregate call.
108 * This is mainly intended for ordered-set aggregates, which are not
109 * supported as window functions. (A regular aggregate function would
110 * need some fallback logic to use this, since there's no Aggref node
111 * for a window function.)
112 *
113 * Grouping sets:
114 *
115 * A list of grouping sets which is structurally equivalent to a ROLLUP
116 * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
117 * ordered data. We do this by keeping a separate set of transition values
118 * for each grouping set being concurrently processed; for each input tuple
119 * we update them all, and on group boundaries we reset those states
120 * (starting at the front of the list) whose grouping values have changed
121 * (the list of grouping sets is ordered from most specific to least
122 * specific).
123 *
124 * Where more complex grouping sets are used, we break them down into
125 * "phases", where each phase has a different sort order (except phase 0
126 * which is reserved for hashing). During each phase but the last, the
127 * input tuples are additionally stored in a tuplesort which is keyed to the
128 * next phase's sort order; during each phase but the first, the input
129 * tuples are drawn from the previously sorted data. (The sorting of the
130 * data for the first phase is handled by the planner, as it might be
131 * satisfied by underlying nodes.)
132 *
133 * Hashing can be mixed with sorted grouping. To do this, we have an
134 * AGG_MIXED strategy that populates the hashtables during the first sorted
135 * phase, and switches to reading them out after completing all sort phases.
136 * We can also support AGG_HASHED with multiple hash tables and no sorting
137 * at all.
138 *
139 * From the perspective of aggregate transition and final functions, the
140 * only issue regarding grouping sets is this: a single call site (flinfo)
141 * of an aggregate function may be used for updating several different
142 * transition values in turn. So the function must not cache in the flinfo
143 * anything which logically belongs as part of the transition value (most
144 * importantly, the memory context in which the transition value exists).
145 * The support API functions (AggCheckCallContext, AggRegisterCallback) are
146 * sensitive to the grouping set for which the aggregate function is
147 * currently being called.
148 *
149 * Plan structure:
150 *
151 * What we get from the planner is actually one "real" Agg node which is
152 * part of the plan tree proper, but which optionally has an additional list
153 * of Agg nodes hung off the side via the "chain" field. This is because an
154 * Agg node happens to be a convenient representation of all the data we
155 * need for grouping sets.
156 *
157 * For many purposes, we treat the "real" node as if it were just the first
158 * node in the chain. The chain must be ordered such that hashed entries
159 * come before sorted/plain entries; the real node is marked AGG_MIXED if
160 * there are both types present (in which case the real node describes one
161 * of the hashed groupings, other AGG_HASHED nodes may optionally follow in
162 * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If
163 * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
164 * nodes must be of the same type; if it is AGG_PLAIN, there can be no
165 * chained nodes.
166 *
167 * We collect all hashed nodes into a single "phase", numbered 0, and create
168 * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
169 * Phase 0 is allocated even if there are no hashes, but remains unused in
170 * that case.
171 *
172 * AGG_HASHED nodes actually refer to only a single grouping set each,
173 * because for each hashed grouping we need a separate grpColIdx and
174 * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of
175 * grouping sets that share a sort order. Each AGG_SORTED node other than
176 * the first one has an associated Sort node which describes the sort order
177 * to be used; the first sorted node takes its input from the outer subtree,
178 * which the planner has already arranged to provide ordered data.
179 *
180 * Memory and ExprContext usage:
181 *
182 * Because we're accumulating aggregate values across input rows, we need to
183 * use more memory contexts than just simple input/output tuple contexts.
184 * In fact, for a rollup, we need a separate context for each grouping set
185 * so that we can reset the inner (finer-grained) aggregates on their group
186 * boundaries while continuing to accumulate values for outer
187 * (coarser-grained) groupings. On top of this, we might be simultaneously
188 * populating hashtables; however, we only need one context for all the
189 * hashtables.
190 *
191 * So we create an array, aggcontexts, with an ExprContext for each grouping
192 * set in the largest rollup that we're going to process, and use the
193 * per-tuple memory context of those ExprContexts to store the aggregate
194 * transition values. hashcontext is the single context created to support
195 * all hash tables.
196 *
197 * Transition / Combine function invocation:
198 *
199 * For performance reasons transition functions, including combine
200 * functions, aren't invoked one-by-one from nodeAgg.c after computing
201 * arguments using the expression evaluation engine. Instead
202 * ExecBuildAggTrans() builds one large expression that does both argument
203 * evaluation and transition function invocation. That avoids performance
204 * issues due to repeated uses of expression evaluation, complications due
 * to filter expressions having to be evaluated early, and allows the
 * entire expression to be JIT-compiled into one native function.
207 *
208 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
209 * Portions Copyright (c) 1994, Regents of the University of California
210 *
211 * IDENTIFICATION
212 * src/backend/executor/nodeAgg.c
213 *
214 *-------------------------------------------------------------------------
215 */
216
217 #include "postgres.h"
218
219 #include "access/htup_details.h"
220 #include "catalog/objectaccess.h"
221 #include "catalog/pg_aggregate.h"
222 #include "catalog/pg_proc.h"
223 #include "catalog/pg_type.h"
224 #include "executor/execExpr.h"
225 #include "executor/executor.h"
226 #include "executor/nodeAgg.h"
227 #include "miscadmin.h"
228 #include "nodes/makefuncs.h"
229 #include "nodes/nodeFuncs.h"
230 #include "optimizer/clauses.h"
231 #include "optimizer/tlist.h"
232 #include "parser/parse_agg.h"
233 #include "parser/parse_coerce.h"
234 #include "utils/acl.h"
235 #include "utils/builtins.h"
236 #include "utils/lsyscache.h"
237 #include "utils/memutils.h"
238 #include "utils/syscache.h"
239 #include "utils/tuplesort.h"
240 #include "utils/datum.h"
241
242
243 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
244 static void initialize_phase(AggState *aggstate, int newphase);
245 static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
246 static void initialize_aggregates(AggState *aggstate,
247 AggStatePerGroup *pergroups,
248 int numReset);
249 static void advance_transition_function(AggState *aggstate,
250 AggStatePerTrans pertrans,
251 AggStatePerGroup pergroupstate);
252 static void advance_aggregates(AggState *aggstate);
253 static void process_ordered_aggregate_single(AggState *aggstate,
254 AggStatePerTrans pertrans,
255 AggStatePerGroup pergroupstate);
256 static void process_ordered_aggregate_multi(AggState *aggstate,
257 AggStatePerTrans pertrans,
258 AggStatePerGroup pergroupstate);
259 static void finalize_aggregate(AggState *aggstate,
260 AggStatePerAgg peragg,
261 AggStatePerGroup pergroupstate,
262 Datum *resultVal, bool *resultIsNull);
263 static void finalize_partialaggregate(AggState *aggstate,
264 AggStatePerAgg peragg,
265 AggStatePerGroup pergroupstate,
266 Datum *resultVal, bool *resultIsNull);
267 static void prepare_projection_slot(AggState *aggstate,
268 TupleTableSlot *slot,
269 int currentSet);
270 static void finalize_aggregates(AggState *aggstate,
271 AggStatePerAgg peragg,
272 AggStatePerGroup pergroup);
273 static TupleTableSlot *project_aggregates(AggState *aggstate);
274 static Bitmapset *find_unaggregated_cols(AggState *aggstate);
275 static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
276 static void build_hash_table(AggState *aggstate);
277 static TupleHashEntryData *lookup_hash_entry(AggState *aggstate);
278 static void lookup_hash_entries(AggState *aggstate);
279 static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
280 static void agg_fill_hash_table(AggState *aggstate);
281 static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
282 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
283 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
284 AggState *aggstate, EState *estate,
285 Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
286 Oid aggserialfn, Oid aggdeserialfn,
287 Datum initValue, bool initValueIsNull,
288 Oid *inputTypes, int numArguments);
289 static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
290 int lastaggno, List **same_input_transnos);
291 static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
292 bool shareable,
293 Oid aggtransfn, Oid aggtranstype,
294 Oid aggserialfn, Oid aggdeserialfn,
295 Datum initValue, bool initValueIsNull,
296 List *transnos);
297
298
299 /*
300 * Select the current grouping set; affects current_set and
301 * curaggcontext.
302 */
303 static void
select_current_set(AggState * aggstate,int setno,bool is_hash)304 select_current_set(AggState *aggstate, int setno, bool is_hash)
305 {
306 /* when changing this, also adapt ExecInterpExpr() and friends */
307 if (is_hash)
308 aggstate->curaggcontext = aggstate->hashcontext;
309 else
310 aggstate->curaggcontext = aggstate->aggcontexts[setno];
311
312 aggstate->current_set = setno;
313 }
314
315 /*
316 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
317 * current_phase + 1. Juggle the tuplesorts accordingly.
318 *
319 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
320 * case, so when entering phase 0, all we need to do is drop open sorts.
321 */
322 static void
initialize_phase(AggState * aggstate,int newphase)323 initialize_phase(AggState *aggstate, int newphase)
324 {
325 Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);
326
327 /*
328 * Whatever the previous state, we're now done with whatever input
329 * tuplesort was in use.
330 */
331 if (aggstate->sort_in)
332 {
333 tuplesort_end(aggstate->sort_in);
334 aggstate->sort_in = NULL;
335 }
336
337 if (newphase <= 1)
338 {
339 /*
340 * Discard any existing output tuplesort.
341 */
342 if (aggstate->sort_out)
343 {
344 tuplesort_end(aggstate->sort_out);
345 aggstate->sort_out = NULL;
346 }
347 }
348 else
349 {
350 /*
351 * The old output tuplesort becomes the new input one, and this is the
352 * right time to actually sort it.
353 */
354 aggstate->sort_in = aggstate->sort_out;
355 aggstate->sort_out = NULL;
356 Assert(aggstate->sort_in);
357 tuplesort_performsort(aggstate->sort_in);
358 }
359
360 /*
361 * If this isn't the last phase, we need to sort appropriately for the
362 * next phase in sequence.
363 */
364 if (newphase > 0 && newphase < aggstate->numphases - 1)
365 {
366 Sort *sortnode = aggstate->phases[newphase + 1].sortnode;
367 PlanState *outerNode = outerPlanState(aggstate);
368 TupleDesc tupDesc = ExecGetResultType(outerNode);
369
370 aggstate->sort_out = tuplesort_begin_heap(tupDesc,
371 sortnode->numCols,
372 sortnode->sortColIdx,
373 sortnode->sortOperators,
374 sortnode->collations,
375 sortnode->nullsFirst,
376 work_mem,
377 NULL, false);
378 }
379
380 aggstate->current_phase = newphase;
381 aggstate->phase = &aggstate->phases[newphase];
382 }
383
384 /*
385 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
386 * populated by the previous phase. Copy it to the sorter for the next phase
387 * if any.
388 *
389 * Callers cannot rely on memory for tuple in returned slot remaining valid
390 * past any subsequently fetched tuple.
391 */
392 static TupleTableSlot *
fetch_input_tuple(AggState * aggstate)393 fetch_input_tuple(AggState *aggstate)
394 {
395 TupleTableSlot *slot;
396
397 if (aggstate->sort_in)
398 {
399 /* make sure we check for interrupts in either path through here */
400 CHECK_FOR_INTERRUPTS();
401 if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
402 aggstate->sort_slot, NULL))
403 return NULL;
404 slot = aggstate->sort_slot;
405 }
406 else
407 slot = ExecProcNode(outerPlanState(aggstate));
408
409 if (!TupIsNull(slot) && aggstate->sort_out)
410 tuplesort_puttupleslot(aggstate->sort_out, slot);
411
412 return slot;
413 }
414
415 /*
416 * (Re)Initialize an individual aggregate.
417 *
418 * This function handles only one grouping set, already set in
419 * aggstate->current_set.
420 *
421 * When called, CurrentMemoryContext should be the per-query context.
422 */
423 static void
initialize_aggregate(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)424 initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
425 AggStatePerGroup pergroupstate)
426 {
427 /*
428 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
429 */
430 if (pertrans->numSortCols > 0)
431 {
432 /*
433 * In case of rescan, maybe there could be an uncompleted sort
434 * operation? Clean it up if so.
435 */
436 if (pertrans->sortstates[aggstate->current_set])
437 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
438
439
440 /*
441 * We use a plain Datum sorter when there's a single input column;
442 * otherwise sort the full tuple. (See comments for
443 * process_ordered_aggregate_single.)
444 */
445 if (pertrans->numInputs == 1)
446 {
447 Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);
448
449 pertrans->sortstates[aggstate->current_set] =
450 tuplesort_begin_datum(attr->atttypid,
451 pertrans->sortOperators[0],
452 pertrans->sortCollations[0],
453 pertrans->sortNullsFirst[0],
454 work_mem, NULL, false);
455 }
456 else
457 pertrans->sortstates[aggstate->current_set] =
458 tuplesort_begin_heap(pertrans->sortdesc,
459 pertrans->numSortCols,
460 pertrans->sortColIdx,
461 pertrans->sortOperators,
462 pertrans->sortCollations,
463 pertrans->sortNullsFirst,
464 work_mem, NULL, false);
465 }
466
467 /*
468 * (Re)set transValue to the initial value.
469 *
470 * Note that when the initial value is pass-by-ref, we must copy it (into
471 * the aggcontext) since we will pfree the transValue later.
472 */
473 if (pertrans->initValueIsNull)
474 pergroupstate->transValue = pertrans->initValue;
475 else
476 {
477 MemoryContext oldContext;
478
479 oldContext = MemoryContextSwitchTo(
480 aggstate->curaggcontext->ecxt_per_tuple_memory);
481 pergroupstate->transValue = datumCopy(pertrans->initValue,
482 pertrans->transtypeByVal,
483 pertrans->transtypeLen);
484 MemoryContextSwitchTo(oldContext);
485 }
486 pergroupstate->transValueIsNull = pertrans->initValueIsNull;
487
488 /*
489 * If the initial value for the transition state doesn't exist in the
490 * pg_aggregate table then we will let the first non-NULL value returned
491 * from the outer procNode become the initial value. (This is useful for
492 * aggregates like max() and min().) The noTransValue flag signals that we
493 * still need to do this.
494 */
495 pergroupstate->noTransValue = pertrans->initValueIsNull;
496 }
497
498 /*
499 * Initialize all aggregate transition states for a new group of input values.
500 *
501 * If there are multiple grouping sets, we initialize only the first numReset
502 * of them (the grouping sets are ordered so that the most specific one, which
503 * is reset most often, is first). As a convenience, if numReset is 0, we
504 * reinitialize all sets.
505 *
506 * NB: This cannot be used for hash aggregates, as for those the grouping set
507 * number has to be specified from further up.
508 *
509 * When called, CurrentMemoryContext should be the per-query context.
510 */
511 static void
initialize_aggregates(AggState * aggstate,AggStatePerGroup * pergroups,int numReset)512 initialize_aggregates(AggState *aggstate,
513 AggStatePerGroup *pergroups,
514 int numReset)
515 {
516 int transno;
517 int numGroupingSets = Max(aggstate->phase->numsets, 1);
518 int setno = 0;
519 int numTrans = aggstate->numtrans;
520 AggStatePerTrans transstates = aggstate->pertrans;
521
522 if (numReset == 0)
523 numReset = numGroupingSets;
524
525 for (setno = 0; setno < numReset; setno++)
526 {
527 AggStatePerGroup pergroup = pergroups[setno];
528
529 select_current_set(aggstate, setno, false);
530
531 for (transno = 0; transno < numTrans; transno++)
532 {
533 AggStatePerTrans pertrans = &transstates[transno];
534 AggStatePerGroup pergroupstate = &pergroup[transno];
535
536 initialize_aggregate(aggstate, pertrans, pergroupstate);
537 }
538 }
539 }
540
541 /*
542 * Given new input value(s), advance the transition function of one aggregate
543 * state within one grouping set only (already set in aggstate->current_set)
544 *
545 * The new values (and null flags) have been preloaded into argument positions
546 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
547 * pass to the transition function. We also expect that the static fields of
548 * the fcinfo are already initialized; that was done by ExecInitAgg().
549 *
550 * It doesn't matter which memory context this is called in.
551 */
552 static void
advance_transition_function(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)553 advance_transition_function(AggState *aggstate,
554 AggStatePerTrans pertrans,
555 AggStatePerGroup pergroupstate)
556 {
557 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
558 MemoryContext oldContext;
559 Datum newVal;
560
561 if (pertrans->transfn.fn_strict)
562 {
563 /*
564 * For a strict transfn, nothing happens when there's a NULL input; we
565 * just keep the prior transValue.
566 */
567 int numTransInputs = pertrans->numTransInputs;
568 int i;
569
570 for (i = 1; i <= numTransInputs; i++)
571 {
572 if (fcinfo->argnull[i])
573 return;
574 }
575 if (pergroupstate->noTransValue)
576 {
577 /*
578 * transValue has not been initialized. This is the first non-NULL
579 * input value. We use it as the initial value for transValue. (We
580 * already checked that the agg's input type is binary-compatible
581 * with its transtype, so straight copy here is OK.)
582 *
583 * We must copy the datum into aggcontext if it is pass-by-ref. We
584 * do not need to pfree the old transValue, since it's NULL.
585 */
586 oldContext = MemoryContextSwitchTo(
587 aggstate->curaggcontext->ecxt_per_tuple_memory);
588 pergroupstate->transValue = datumCopy(fcinfo->arg[1],
589 pertrans->transtypeByVal,
590 pertrans->transtypeLen);
591 pergroupstate->transValueIsNull = false;
592 pergroupstate->noTransValue = false;
593 MemoryContextSwitchTo(oldContext);
594 return;
595 }
596 if (pergroupstate->transValueIsNull)
597 {
598 /*
599 * Don't call a strict function with NULL inputs. Note it is
600 * possible to get here despite the above tests, if the transfn is
601 * strict *and* returned a NULL on a prior cycle. If that happens
602 * we will propagate the NULL all the way to the end.
603 */
604 return;
605 }
606 }
607
608 /* We run the transition functions in per-input-tuple memory context */
609 oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
610
611 /* set up aggstate->curpertrans for AggGetAggref() */
612 aggstate->curpertrans = pertrans;
613
614 /*
615 * OK to call the transition function
616 */
617 fcinfo->arg[0] = pergroupstate->transValue;
618 fcinfo->argnull[0] = pergroupstate->transValueIsNull;
619 fcinfo->isnull = false; /* just in case transfn doesn't set it */
620
621 newVal = FunctionCallInvoke(fcinfo);
622
623 aggstate->curpertrans = NULL;
624
625 /*
626 * If pass-by-ref datatype, must copy the new value into aggcontext and
627 * free the prior transValue. But if transfn returned a pointer to its
628 * first input, we don't need to do anything. Also, if transfn returned a
629 * pointer to a R/W expanded object that is already a child of the
630 * aggcontext, assume we can adopt that value without copying it.
631 *
632 * It's safe to compare newVal with pergroup->transValue without
633 * regard for either being NULL, because ExecAggTransReparent()
634 * takes care to set transValue to 0 when NULL. Otherwise we could
635 * end up accidentally not reparenting, when the transValue has
636 * the same numerical value as newValue, despite being NULL. This
637 * is a somewhat hot path, making it undesirable to instead solve
638 * this with another branch for the common case of the transition
639 * function returning its (modified) input argument.
640 */
641 if (!pertrans->transtypeByVal &&
642 DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
643 newVal = ExecAggTransReparent(aggstate, pertrans,
644 newVal, fcinfo->isnull,
645 pergroupstate->transValue,
646 pergroupstate->transValueIsNull);
647
648 pergroupstate->transValue = newVal;
649 pergroupstate->transValueIsNull = fcinfo->isnull;
650
651 MemoryContextSwitchTo(oldContext);
652 }
653
654 /*
655 * Advance each aggregate transition state for one input tuple. The input
656 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
657 * accessible to ExecEvalExpr.
658 *
659 * We have two sets of transition states to handle: one for sorted aggregation
660 * and one for hashed; we do them both here, to avoid multiple evaluation of
661 * the inputs.
662 *
663 * When called, CurrentMemoryContext should be the per-query context.
664 */
665 static void
advance_aggregates(AggState * aggstate)666 advance_aggregates(AggState *aggstate)
667 {
668 bool dummynull;
669
670 ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
671 aggstate->tmpcontext,
672 &dummynull);
673 }
674
675 /*
676 * Run the transition function for a DISTINCT or ORDER BY aggregate
677 * with only one input. This is called after we have completed
678 * entering all the input values into the sort object. We complete the
679 * sort, read out the values in sorted order, and run the transition
680 * function on each value (applying DISTINCT if appropriate).
681 *
682 * Note that the strictness of the transition function was checked when
683 * entering the values into the sort, so we don't check it again here;
684 * we just apply standard SQL DISTINCT logic.
685 *
686 * The one-input case is handled separately from the multi-input case
687 * for performance reasons: for single by-value inputs, such as the
688 * common case of count(distinct id), the tuplesort_getdatum code path
689 * is around 300% faster. (The speedup for by-reference types is less
690 * but still noticeable.)
691 *
692 * This function handles only one grouping set (already set in
693 * aggstate->current_set).
694 *
695 * When called, CurrentMemoryContext should be the per-query context.
696 */
697 static void
process_ordered_aggregate_single(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)698 process_ordered_aggregate_single(AggState *aggstate,
699 AggStatePerTrans pertrans,
700 AggStatePerGroup pergroupstate)
701 {
702 Datum oldVal = (Datum) 0;
703 bool oldIsNull = true;
704 bool haveOldVal = false;
705 MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
706 MemoryContext oldContext;
707 bool isDistinct = (pertrans->numDistinctCols > 0);
708 Datum newAbbrevVal = (Datum) 0;
709 Datum oldAbbrevVal = (Datum) 0;
710 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
711 Datum *newVal;
712 bool *isNull;
713
714 Assert(pertrans->numDistinctCols < 2);
715
716 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
717
718 /* Load the column into argument 1 (arg 0 will be transition value) */
719 newVal = fcinfo->arg + 1;
720 isNull = fcinfo->argnull + 1;
721
722 /*
723 * Note: if input type is pass-by-ref, the datums returned by the sort are
724 * freshly palloc'd in the per-query context, so we must be careful to
725 * pfree them when they are no longer needed.
726 */
727
728 while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
729 true, newVal, isNull, &newAbbrevVal))
730 {
731 /*
732 * Clear and select the working context for evaluation of the equality
733 * function and transition function.
734 */
735 MemoryContextReset(workcontext);
736 oldContext = MemoryContextSwitchTo(workcontext);
737
738 /*
739 * If DISTINCT mode, and not distinct from prior, skip it.
740 *
741 * Note: we assume equality functions don't care about collation.
742 */
743 if (isDistinct &&
744 haveOldVal &&
745 ((oldIsNull && *isNull) ||
746 (!oldIsNull && !*isNull &&
747 oldAbbrevVal == newAbbrevVal &&
748 DatumGetBool(FunctionCall2(&pertrans->equalfnOne,
749 oldVal, *newVal)))))
750 {
751 /* equal to prior, so forget this one */
752 if (!pertrans->inputtypeByVal && !*isNull)
753 pfree(DatumGetPointer(*newVal));
754 }
755 else
756 {
757 advance_transition_function(aggstate, pertrans, pergroupstate);
758 /* forget the old value, if any */
759 if (!oldIsNull && !pertrans->inputtypeByVal)
760 pfree(DatumGetPointer(oldVal));
761 /* and remember the new one for subsequent equality checks */
762 oldVal = *newVal;
763 oldAbbrevVal = newAbbrevVal;
764 oldIsNull = *isNull;
765 haveOldVal = true;
766 }
767
768 MemoryContextSwitchTo(oldContext);
769 }
770
771 if (!oldIsNull && !pertrans->inputtypeByVal)
772 pfree(DatumGetPointer(oldVal));
773
774 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
775 pertrans->sortstates[aggstate->current_set] = NULL;
776 }
777
778 /*
779 * Run the transition function for a DISTINCT or ORDER BY aggregate
780 * with more than one input. This is called after we have completed
781 * entering all the input values into the sort object. We complete the
782 * sort, read out the values in sorted order, and run the transition
783 * function on each value (applying DISTINCT if appropriate).
784 *
785 * This function handles only one grouping set (already set in
786 * aggstate->current_set).
787 *
788 * When called, CurrentMemoryContext should be the per-query context.
789 */
790 static void
process_ordered_aggregate_multi(AggState * aggstate,AggStatePerTrans pertrans,AggStatePerGroup pergroupstate)791 process_ordered_aggregate_multi(AggState *aggstate,
792 AggStatePerTrans pertrans,
793 AggStatePerGroup pergroupstate)
794 {
795 ExprContext *tmpcontext = aggstate->tmpcontext;
796 FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
797 TupleTableSlot *slot1 = pertrans->sortslot;
798 TupleTableSlot *slot2 = pertrans->uniqslot;
799 int numTransInputs = pertrans->numTransInputs;
800 int numDistinctCols = pertrans->numDistinctCols;
801 Datum newAbbrevVal = (Datum) 0;
802 Datum oldAbbrevVal = (Datum) 0;
803 bool haveOldValue = false;
804 TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
805 int i;
806
807 tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);
808
809 ExecClearTuple(slot1);
810 if (slot2)
811 ExecClearTuple(slot2);
812
813 while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
814 true, true, slot1, &newAbbrevVal))
815 {
816 CHECK_FOR_INTERRUPTS();
817
818 tmpcontext->ecxt_outertuple = slot1;
819 tmpcontext->ecxt_innertuple = slot2;
820
821 if (numDistinctCols == 0 ||
822 !haveOldValue ||
823 newAbbrevVal != oldAbbrevVal ||
824 !ExecQual(pertrans->equalfnMulti, tmpcontext))
825 {
826 /*
827 * Extract the first numTransInputs columns as datums to pass to
828 * the transfn.
829 */
830 slot_getsomeattrs(slot1, numTransInputs);
831
832 /* Load values into fcinfo */
833 /* Start from 1, since the 0th arg will be the transition value */
834 for (i = 0; i < numTransInputs; i++)
835 {
836 fcinfo->arg[i + 1] = slot1->tts_values[i];
837 fcinfo->argnull[i + 1] = slot1->tts_isnull[i];
838 }
839
840 advance_transition_function(aggstate, pertrans, pergroupstate);
841
842 if (numDistinctCols > 0)
843 {
844 /* swap the slot pointers to retain the current tuple */
845 TupleTableSlot *tmpslot = slot2;
846
847 slot2 = slot1;
848 slot1 = tmpslot;
849 /* avoid ExecQual() calls by reusing abbreviated keys */
850 oldAbbrevVal = newAbbrevVal;
851 haveOldValue = true;
852 }
853 }
854
855 /* Reset context each time */
856 ResetExprContext(tmpcontext);
857
858 ExecClearTuple(slot1);
859 }
860
861 if (slot2)
862 ExecClearTuple(slot2);
863
864 tuplesort_end(pertrans->sortstates[aggstate->current_set]);
865 pertrans->sortstates[aggstate->current_set] = NULL;
866
867 /* restore previous slot, potentially in use for grouping sets */
868 tmpcontext->ecxt_outertuple = save;
869 }
870
871 /*
872 * Compute the final value of one aggregate.
873 *
874 * This function handles only one grouping set (already set in
875 * aggstate->current_set).
876 *
877 * The finalfunction will be run, and the result delivered, in the
878 * output-tuple context; caller's CurrentMemoryContext does not matter.
879 *
880 * The finalfn uses the state as set in the transno. This also might be
881 * being used by another aggregate function, so it's important that we do
882 * nothing destructive here.
883 */
884 static void
finalize_aggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)885 finalize_aggregate(AggState *aggstate,
886 AggStatePerAgg peragg,
887 AggStatePerGroup pergroupstate,
888 Datum *resultVal, bool *resultIsNull)
889 {
890 FunctionCallInfoData fcinfo;
891 bool anynull = false;
892 MemoryContext oldContext;
893 int i;
894 ListCell *lc;
895 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
896
897 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
898
899 /*
900 * Evaluate any direct arguments. We do this even if there's no finalfn
901 * (which is unlikely anyway), so that side-effects happen as expected.
902 * The direct arguments go into arg positions 1 and up, leaving position 0
903 * for the transition state value.
904 */
905 i = 1;
906 foreach(lc, peragg->aggdirectargs)
907 {
908 ExprState *expr = (ExprState *) lfirst(lc);
909
910 fcinfo.arg[i] = ExecEvalExpr(expr,
911 aggstate->ss.ps.ps_ExprContext,
912 &fcinfo.argnull[i]);
913 anynull |= fcinfo.argnull[i];
914 i++;
915 }
916
917 /*
918 * Apply the agg's finalfn if one is provided, else return transValue.
919 */
920 if (OidIsValid(peragg->finalfn_oid))
921 {
922 int numFinalArgs = peragg->numFinalArgs;
923
924 /* set up aggstate->curperagg for AggGetAggref() */
925 aggstate->curperagg = peragg;
926
927 InitFunctionCallInfoData(fcinfo, &peragg->finalfn,
928 numFinalArgs,
929 pertrans->aggCollation,
930 (void *) aggstate, NULL);
931
932 /* Fill in the transition state value */
933 fcinfo.arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
934 pergroupstate->transValueIsNull,
935 pertrans->transtypeLen);
936 fcinfo.argnull[0] = pergroupstate->transValueIsNull;
937 anynull |= pergroupstate->transValueIsNull;
938
939 /* Fill any remaining argument positions with nulls */
940 for (; i < numFinalArgs; i++)
941 {
942 fcinfo.arg[i] = (Datum) 0;
943 fcinfo.argnull[i] = true;
944 anynull = true;
945 }
946
947 if (fcinfo.flinfo->fn_strict && anynull)
948 {
949 /* don't call a strict function with NULL inputs */
950 *resultVal = (Datum) 0;
951 *resultIsNull = true;
952 }
953 else
954 {
955 *resultVal = FunctionCallInvoke(&fcinfo);
956 *resultIsNull = fcinfo.isnull;
957 }
958 aggstate->curperagg = NULL;
959 }
960 else
961 {
962 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
963 *resultVal = pergroupstate->transValue;
964 *resultIsNull = pergroupstate->transValueIsNull;
965 }
966
967 /*
968 * If result is pass-by-ref, make sure it is in the right context.
969 */
970 if (!peragg->resulttypeByVal && !*resultIsNull &&
971 !MemoryContextContains(CurrentMemoryContext,
972 DatumGetPointer(*resultVal)))
973 *resultVal = datumCopy(*resultVal,
974 peragg->resulttypeByVal,
975 peragg->resulttypeLen);
976
977 MemoryContextSwitchTo(oldContext);
978 }
979
980 /*
981 * Compute the output value of one partial aggregate.
982 *
983 * The serialization function will be run, and the result delivered, in the
984 * output-tuple context; caller's CurrentMemoryContext does not matter.
985 */
986 static void
finalize_partialaggregate(AggState * aggstate,AggStatePerAgg peragg,AggStatePerGroup pergroupstate,Datum * resultVal,bool * resultIsNull)987 finalize_partialaggregate(AggState *aggstate,
988 AggStatePerAgg peragg,
989 AggStatePerGroup pergroupstate,
990 Datum *resultVal, bool *resultIsNull)
991 {
992 AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
993 MemoryContext oldContext;
994
995 oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
996
997 /*
998 * serialfn_oid will be set if we must serialize the transvalue before
999 * returning it
1000 */
1001 if (OidIsValid(pertrans->serialfn_oid))
1002 {
1003 /* Don't call a strict serialization function with NULL input. */
1004 if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
1005 {
1006 *resultVal = (Datum) 0;
1007 *resultIsNull = true;
1008 }
1009 else
1010 {
1011 FunctionCallInfo fcinfo = &pertrans->serialfn_fcinfo;
1012
1013 fcinfo->arg[0] = MakeExpandedObjectReadOnly(pergroupstate->transValue,
1014 pergroupstate->transValueIsNull,
1015 pertrans->transtypeLen);
1016 fcinfo->argnull[0] = pergroupstate->transValueIsNull;
1017 fcinfo->isnull = false;
1018
1019 *resultVal = FunctionCallInvoke(fcinfo);
1020 *resultIsNull = fcinfo->isnull;
1021 }
1022 }
1023 else
1024 {
1025 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
1026 *resultVal = pergroupstate->transValue;
1027 *resultIsNull = pergroupstate->transValueIsNull;
1028 }
1029
1030 /* If result is pass-by-ref, make sure it is in the right context. */
1031 if (!peragg->resulttypeByVal && !*resultIsNull &&
1032 !MemoryContextContains(CurrentMemoryContext,
1033 DatumGetPointer(*resultVal)))
1034 *resultVal = datumCopy(*resultVal,
1035 peragg->resulttypeByVal,
1036 peragg->resulttypeLen);
1037
1038 MemoryContextSwitchTo(oldContext);
1039 }
1040
1041 /*
1042 * Prepare to finalize and project based on the specified representative tuple
1043 * slot and grouping set.
1044 *
1045 * In the specified tuple slot, force to null all attributes that should be
1046 * read as null in the context of the current grouping set. Also stash the
1047 * current group bitmap where GroupingExpr can get at it.
1048 *
1049 * This relies on three conditions:
1050 *
1051 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
1052 * only reference it in evaluations, which will only access individual
1053 * attributes.
1054 *
1055 * 2) No system columns are going to need to be nulled. (If a system column is
1056 * referenced in a group clause, it is actually projected in the outer plan
1057 * tlist.)
1058 *
1059 * 3) Within a given phase, we never need to recover the value of an attribute
1060 * once it has been set to null.
1061 *
1062 * Poking into the slot this way is a bit ugly, but the consensus is that the
1063 * alternative was worse.
1064 */
1065 static void
prepare_projection_slot(AggState * aggstate,TupleTableSlot * slot,int currentSet)1066 prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
1067 {
1068 if (aggstate->phase->grouped_cols)
1069 {
1070 Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet];
1071
1072 aggstate->grouped_cols = grouped_cols;
1073
1074 if (slot->tts_isempty)
1075 {
1076 /*
1077 * Force all values to be NULL if working on an empty input tuple
1078 * (i.e. an empty grouping set for which no input rows were
1079 * supplied).
1080 */
1081 ExecStoreAllNullTuple(slot);
1082 }
1083 else if (aggstate->all_grouped_cols)
1084 {
1085 ListCell *lc;
1086
1087 /* all_grouped_cols is arranged in desc order */
1088 slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));
1089
1090 foreach(lc, aggstate->all_grouped_cols)
1091 {
1092 int attnum = lfirst_int(lc);
1093
1094 if (!bms_is_member(attnum, grouped_cols))
1095 slot->tts_isnull[attnum - 1] = true;
1096 }
1097 }
1098 }
1099 }
1100
1101 /*
1102 * Compute the final value of all aggregates for one group.
1103 *
1104 * This function handles only one grouping set at a time, which the caller must
1105 * have selected. It's also the caller's responsibility to adjust the supplied
1106 * pergroup parameter to point to the current set's transvalues.
1107 *
1108 * Results are stored in the output econtext aggvalues/aggnulls.
1109 */
1110 static void
finalize_aggregates(AggState * aggstate,AggStatePerAgg peraggs,AggStatePerGroup pergroup)1111 finalize_aggregates(AggState *aggstate,
1112 AggStatePerAgg peraggs,
1113 AggStatePerGroup pergroup)
1114 {
1115 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1116 Datum *aggvalues = econtext->ecxt_aggvalues;
1117 bool *aggnulls = econtext->ecxt_aggnulls;
1118 int aggno;
1119 int transno;
1120
1121 /*
1122 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
1123 * inputs and run the transition functions.
1124 */
1125 for (transno = 0; transno < aggstate->numtrans; transno++)
1126 {
1127 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1128 AggStatePerGroup pergroupstate;
1129
1130 pergroupstate = &pergroup[transno];
1131
1132 if (pertrans->numSortCols > 0)
1133 {
1134 Assert(aggstate->aggstrategy != AGG_HASHED &&
1135 aggstate->aggstrategy != AGG_MIXED);
1136
1137 if (pertrans->numInputs == 1)
1138 process_ordered_aggregate_single(aggstate,
1139 pertrans,
1140 pergroupstate);
1141 else
1142 process_ordered_aggregate_multi(aggstate,
1143 pertrans,
1144 pergroupstate);
1145 }
1146 }
1147
1148 /*
1149 * Run the final functions.
1150 */
1151 for (aggno = 0; aggno < aggstate->numaggs; aggno++)
1152 {
1153 AggStatePerAgg peragg = &peraggs[aggno];
1154 int transno = peragg->transno;
1155 AggStatePerGroup pergroupstate;
1156
1157 pergroupstate = &pergroup[transno];
1158
1159 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
1160 finalize_partialaggregate(aggstate, peragg, pergroupstate,
1161 &aggvalues[aggno], &aggnulls[aggno]);
1162 else
1163 finalize_aggregate(aggstate, peragg, pergroupstate,
1164 &aggvalues[aggno], &aggnulls[aggno]);
1165 }
1166 }
1167
1168 /*
1169 * Project the result of a group (whose aggs have already been calculated by
1170 * finalize_aggregates). Returns the result slot, or NULL if no row is
1171 * projected (suppressed by qual).
1172 */
1173 static TupleTableSlot *
project_aggregates(AggState * aggstate)1174 project_aggregates(AggState *aggstate)
1175 {
1176 ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
1177
1178 /*
1179 * Check the qual (HAVING clause); if the group does not match, ignore it.
1180 */
1181 if (ExecQual(aggstate->ss.ps.qual, econtext))
1182 {
1183 /*
1184 * Form and return projection tuple using the aggregate results and
1185 * the representative input tuple.
1186 */
1187 return ExecProject(aggstate->ss.ps.ps_ProjInfo);
1188 }
1189 else
1190 InstrCountFiltered1(aggstate, 1);
1191
1192 return NULL;
1193 }
1194
1195 /*
1196 * find_unaggregated_cols
1197 * Construct a bitmapset of the column numbers of un-aggregated Vars
1198 * appearing in our targetlist and qual (HAVING clause)
1199 */
1200 static Bitmapset *
find_unaggregated_cols(AggState * aggstate)1201 find_unaggregated_cols(AggState *aggstate)
1202 {
1203 Agg *node = (Agg *) aggstate->ss.ps.plan;
1204 Bitmapset *colnos;
1205
1206 colnos = NULL;
1207 (void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
1208 &colnos);
1209 (void) find_unaggregated_cols_walker((Node *) node->plan.qual,
1210 &colnos);
1211 return colnos;
1212 }
1213
1214 static bool
find_unaggregated_cols_walker(Node * node,Bitmapset ** colnos)1215 find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
1216 {
1217 if (node == NULL)
1218 return false;
1219 if (IsA(node, Var))
1220 {
1221 Var *var = (Var *) node;
1222
1223 /* setrefs.c should have set the varno to OUTER_VAR */
1224 Assert(var->varno == OUTER_VAR);
1225 Assert(var->varlevelsup == 0);
1226 *colnos = bms_add_member(*colnos, var->varattno);
1227 return false;
1228 }
1229 if (IsA(node, Aggref) ||IsA(node, GroupingFunc))
1230 {
1231 /* do not descend into aggregate exprs */
1232 return false;
1233 }
1234 return expression_tree_walker(node, find_unaggregated_cols_walker,
1235 (void *) colnos);
1236 }
1237
1238 /*
1239 * (Re-)initialize the hash table(s) to empty.
1240 *
1241 * To implement hashed aggregation, we need a hashtable that stores a
1242 * representative tuple and an array of AggStatePerGroup structs for each
1243 * distinct set of GROUP BY column values. We compute the hash key from the
1244 * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(),
1245 * for each entry.
1246 *
1247 * We have a separate hashtable and associated perhash data structure for each
1248 * grouping set for which we're doing hashing.
1249 *
1250 * The contents of the hash tables always live in the hashcontext's per-tuple
1251 * memory context (there is only one of these for all tables together, since
1252 * they are all reset at the same time).
1253 */
1254 static void
build_hash_table(AggState * aggstate)1255 build_hash_table(AggState *aggstate)
1256 {
1257 MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
1258 Size additionalsize;
1259 int i;
1260
1261 Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED);
1262
1263 additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
1264
1265 for (i = 0; i < aggstate->num_hashes; ++i)
1266 {
1267 AggStatePerHash perhash = &aggstate->perhash[i];
1268
1269 Assert(perhash->aggnode->numGroups > 0);
1270
1271 if (perhash->hashtable)
1272 ResetTupleHashTable(perhash->hashtable);
1273 else
1274 perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
1275 perhash->hashslot->tts_tupleDescriptor,
1276 perhash->numCols,
1277 perhash->hashGrpColIdxHash,
1278 perhash->eqfuncoids,
1279 perhash->hashfunctions,
1280 perhash->aggnode->numGroups,
1281 additionalsize,
1282 aggstate->ss.ps.state->es_query_cxt,
1283 aggstate->hashcontext->ecxt_per_tuple_memory,
1284 tmpmem,
1285 DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
1286 }
1287 }
1288
1289 /*
1290 * Compute columns that actually need to be stored in hashtable entries. The
1291 * incoming tuples from the child plan node will contain grouping columns,
1292 * other columns referenced in our targetlist and qual, columns used to
1293 * compute the aggregate functions, and perhaps just junk columns we don't use
1294 * at all. Only columns of the first two types need to be stored in the
1295 * hashtable, and getting rid of the others can make the table entries
1296 * significantly smaller. The hashtable only contains the relevant columns,
1297 * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table()
1298 * into the format of the normal input descriptor.
1299 *
1300 * Additional columns, in addition to the columns grouped by, come from two
1301 * sources: Firstly functionally dependent columns that we don't need to group
1302 * by themselves, and secondly ctids for row-marks.
1303 *
1304 * To eliminate duplicates, we build a bitmapset of the needed columns, and
1305 * then build an array of the columns included in the hashtable. We might
1306 * still have duplicates if the passed-in grpColIdx has them, which can happen
1307 * in edge cases from semijoins/distinct; these can't always be removed,
1308 * because it's not certain that the duplicate cols will be using the same
1309 * hash function.
1310 *
1311 * Note that the array is preserved over ExecReScanAgg, so we allocate it in
1312 * the per-query context (unlike the hash table itself).
1313 */
1314 static void
find_hash_columns(AggState * aggstate)1315 find_hash_columns(AggState *aggstate)
1316 {
1317 Bitmapset *base_colnos;
1318 List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
1319 int numHashes = aggstate->num_hashes;
1320 EState *estate = aggstate->ss.ps.state;
1321 int j;
1322
1323 /* Find Vars that will be needed in tlist and qual */
1324 base_colnos = find_unaggregated_cols(aggstate);
1325
1326 for (j = 0; j < numHashes; ++j)
1327 {
1328 AggStatePerHash perhash = &aggstate->perhash[j];
1329 Bitmapset *colnos = bms_copy(base_colnos);
1330 AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
1331 List *hashTlist = NIL;
1332 TupleDesc hashDesc;
1333 int maxCols;
1334 int i;
1335
1336 perhash->largestGrpColIdx = 0;
1337
1338 /*
1339 * If we're doing grouping sets, then some Vars might be referenced in
1340 * tlist/qual for the benefit of other grouping sets, but not needed
1341 * when hashing; i.e. prepare_projection_slot will null them out, so
1342 * there'd be no point storing them. Use prepare_projection_slot's
1343 * logic to determine which.
1344 */
1345 if (aggstate->phases[0].grouped_cols)
1346 {
1347 Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j];
1348 ListCell *lc;
1349
1350 foreach(lc, aggstate->all_grouped_cols)
1351 {
1352 int attnum = lfirst_int(lc);
1353
1354 if (!bms_is_member(attnum, grouped_cols))
1355 colnos = bms_del_member(colnos, attnum);
1356 }
1357 }
1358
1359 /*
1360 * Compute maximum number of input columns accounting for possible
1361 * duplications in the grpColIdx array, which can happen in some edge
1362 * cases where HashAggregate was generated as part of a semijoin or a
1363 * DISTINCT.
1364 */
1365 maxCols = bms_num_members(colnos) + perhash->numCols;
1366
1367 perhash->hashGrpColIdxInput =
1368 palloc(maxCols * sizeof(AttrNumber));
1369 perhash->hashGrpColIdxHash =
1370 palloc(perhash->numCols * sizeof(AttrNumber));
1371
1372 /* Add all the grouping columns to colnos */
1373 for (i = 0; i < perhash->numCols; i++)
1374 colnos = bms_add_member(colnos, grpColIdx[i]);
1375
1376 /*
1377 * First build mapping for columns directly hashed. These are the
1378 * first, because they'll be accessed when computing hash values and
1379 * comparing tuples for exact matches. We also build simple mapping
1380 * for execGrouping, so it knows where to find the to-be-hashed /
1381 * compared columns in the input.
1382 */
1383 for (i = 0; i < perhash->numCols; i++)
1384 {
1385 perhash->hashGrpColIdxInput[i] = grpColIdx[i];
1386 perhash->hashGrpColIdxHash[i] = i + 1;
1387 perhash->numhashGrpCols++;
1388 /* delete already mapped columns */
1389 bms_del_member(colnos, grpColIdx[i]);
1390 }
1391
1392 /* and add the remaining columns */
1393 while ((i = bms_first_member(colnos)) >= 0)
1394 {
1395 perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
1396 perhash->numhashGrpCols++;
1397 }
1398
1399 /* and build a tuple descriptor for the hashtable */
1400 for (i = 0; i < perhash->numhashGrpCols; i++)
1401 {
1402 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1403
1404 hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
1405 perhash->largestGrpColIdx =
1406 Max(varNumber + 1, perhash->largestGrpColIdx);
1407 }
1408
1409 hashDesc = ExecTypeFromTL(hashTlist, false);
1410
1411 execTuplesHashPrepare(perhash->numCols,
1412 perhash->aggnode->grpOperators,
1413 &perhash->eqfuncoids,
1414 &perhash->hashfunctions);
1415 perhash->hashslot =
1416 ExecAllocTableSlot(&estate->es_tupleTable, hashDesc);
1417
1418 list_free(hashTlist);
1419 bms_free(colnos);
1420 }
1421
1422 bms_free(base_colnos);
1423 }
1424
1425 /*
1426 * Estimate per-hash-table-entry overhead for the planner.
1427 *
1428 * Note that the estimate does not include space for pass-by-reference
1429 * transition data values, nor for the representative tuple of each group.
1430 * Nor does this account of the target fill-factor and growth policy of the
1431 * hash table.
1432 */
1433 Size
hash_agg_entry_size(int numAggs)1434 hash_agg_entry_size(int numAggs)
1435 {
1436 Size entrysize;
1437
1438 /* This must match build_hash_table */
1439 entrysize = sizeof(TupleHashEntryData) +
1440 numAggs * sizeof(AggStatePerGroupData);
1441 entrysize = MAXALIGN(entrysize);
1442
1443 return entrysize;
1444 }
1445
1446 /*
1447 * Find or create a hashtable entry for the tuple group containing the current
1448 * tuple (already set in tmpcontext's outertuple slot), in the current grouping
1449 * set (which the caller must have selected - note that initialize_aggregate
1450 * depends on this).
1451 *
1452 * When called, CurrentMemoryContext should be the per-query context.
1453 */
1454 static TupleHashEntryData *
lookup_hash_entry(AggState * aggstate)1455 lookup_hash_entry(AggState *aggstate)
1456 {
1457 TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
1458 AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
1459 TupleTableSlot *hashslot = perhash->hashslot;
1460 TupleHashEntryData *entry;
1461 bool isnew;
1462 int i;
1463
1464 /* transfer just the needed columns into hashslot */
1465 slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
1466 ExecClearTuple(hashslot);
1467
1468 for (i = 0; i < perhash->numhashGrpCols; i++)
1469 {
1470 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
1471
1472 hashslot->tts_values[i] = inputslot->tts_values[varNumber];
1473 hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
1474 }
1475 ExecStoreVirtualTuple(hashslot);
1476
1477 /* find or create the hashtable entry using the filtered tuple */
1478 entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew);
1479
1480 if (isnew)
1481 {
1482 AggStatePerGroup pergroup;
1483 int transno;
1484
1485 pergroup = (AggStatePerGroup)
1486 MemoryContextAlloc(perhash->hashtable->tablecxt,
1487 sizeof(AggStatePerGroupData) * aggstate->numtrans);
1488 entry->additional = pergroup;
1489
1490 /*
1491 * Initialize aggregates for new tuple group, lookup_hash_entries()
1492 * already has selected the relevant grouping set.
1493 */
1494 for (transno = 0; transno < aggstate->numtrans; transno++)
1495 {
1496 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
1497 AggStatePerGroup pergroupstate = &pergroup[transno];
1498
1499 initialize_aggregate(aggstate, pertrans, pergroupstate);
1500 }
1501 }
1502
1503 return entry;
1504 }
1505
1506 /*
1507 * Look up hash entries for the current tuple in all hashed grouping sets,
1508 * returning an array of pergroup pointers suitable for advance_aggregates.
1509 *
1510 * Be aware that lookup_hash_entry can reset the tmpcontext.
1511 */
1512 static void
lookup_hash_entries(AggState * aggstate)1513 lookup_hash_entries(AggState *aggstate)
1514 {
1515 int numHashes = aggstate->num_hashes;
1516 AggStatePerGroup *pergroup = aggstate->hash_pergroup;
1517 int setno;
1518
1519 for (setno = 0; setno < numHashes; setno++)
1520 {
1521 select_current_set(aggstate, setno, true);
1522 pergroup[setno] = lookup_hash_entry(aggstate)->additional;
1523 }
1524 }
1525
1526 /*
1527 * ExecAgg -
1528 *
1529 * ExecAgg receives tuples from its outer subplan and aggregates over
1530 * the appropriate attribute for each aggregate function use (Aggref
1531 * node) appearing in the targetlist or qual of the node. The number
1532 * of tuples to aggregate over depends on whether grouped or plain
1533 * aggregation is selected. In grouped aggregation, we produce a result
1534 * row for each group; in plain aggregation there's a single result row
1535 * for the whole query. In either case, the value of each aggregate is
1536 * stored in the expression context to be used when ExecProject evaluates
1537 * the result tuple.
1538 */
1539 static TupleTableSlot *
ExecAgg(PlanState * pstate)1540 ExecAgg(PlanState *pstate)
1541 {
1542 AggState *node = castNode(AggState, pstate);
1543 TupleTableSlot *result = NULL;
1544
1545 CHECK_FOR_INTERRUPTS();
1546
1547 if (!node->agg_done)
1548 {
1549 /* Dispatch based on strategy */
1550 switch (node->phase->aggstrategy)
1551 {
1552 case AGG_HASHED:
1553 if (!node->table_filled)
1554 agg_fill_hash_table(node);
1555 /* FALLTHROUGH */
1556 case AGG_MIXED:
1557 result = agg_retrieve_hash_table(node);
1558 break;
1559 case AGG_PLAIN:
1560 case AGG_SORTED:
1561 result = agg_retrieve_direct(node);
1562 break;
1563 }
1564
1565 if (!TupIsNull(result))
1566 return result;
1567 }
1568
1569 return NULL;
1570 }
1571
1572 /*
1573 * ExecAgg for non-hashed case
1574 */
1575 static TupleTableSlot *
agg_retrieve_direct(AggState * aggstate)1576 agg_retrieve_direct(AggState *aggstate)
1577 {
1578 Agg *node = aggstate->phase->aggnode;
1579 ExprContext *econtext;
1580 ExprContext *tmpcontext;
1581 AggStatePerAgg peragg;
1582 AggStatePerGroup *pergroups;
1583 TupleTableSlot *outerslot;
1584 TupleTableSlot *firstSlot;
1585 TupleTableSlot *result;
1586 bool hasGroupingSets = aggstate->phase->numsets > 0;
1587 int numGroupingSets = Max(aggstate->phase->numsets, 1);
1588 int currentSet;
1589 int nextSetSize;
1590 int numReset;
1591 int i;
1592
1593 /*
1594 * get state info from node
1595 *
1596 * econtext is the per-output-tuple expression context
1597 *
1598 * tmpcontext is the per-input-tuple expression context
1599 */
1600 econtext = aggstate->ss.ps.ps_ExprContext;
1601 tmpcontext = aggstate->tmpcontext;
1602
1603 peragg = aggstate->peragg;
1604 pergroups = aggstate->pergroups;
1605 firstSlot = aggstate->ss.ss_ScanTupleSlot;
1606
1607 /*
1608 * We loop retrieving groups until we find one matching
1609 * aggstate->ss.ps.qual
1610 *
1611 * For grouping sets, we have the invariant that aggstate->projected_set
1612 * is either -1 (initial call) or the index (starting from 0) in
1613 * gset_lengths for the group we just completed (either by projecting a
1614 * row or by discarding it in the qual).
1615 */
1616 while (!aggstate->agg_done)
1617 {
1618 /*
1619 * Clear the per-output-tuple context for each group, as well as
1620 * aggcontext (which contains any pass-by-ref transvalues of the old
1621 * group). Some aggregate functions store working state in child
1622 * contexts; those now get reset automatically without us needing to
1623 * do anything special.
1624 *
1625 * We use ReScanExprContext not just ResetExprContext because we want
1626 * any registered shutdown callbacks to be called. That allows
1627 * aggregate functions to ensure they've cleaned up any non-memory
1628 * resources.
1629 */
1630 ReScanExprContext(econtext);
1631
1632 /*
1633 * Determine how many grouping sets need to be reset at this boundary.
1634 */
1635 if (aggstate->projected_set >= 0 &&
1636 aggstate->projected_set < numGroupingSets)
1637 numReset = aggstate->projected_set + 1;
1638 else
1639 numReset = numGroupingSets;
1640
1641 /*
1642 * numReset can change on a phase boundary, but that's OK; we want to
1643 * reset the contexts used in _this_ phase, and later, after possibly
1644 * changing phase, initialize the right number of aggregates for the
1645 * _new_ phase.
1646 */
1647
1648 for (i = 0; i < numReset; i++)
1649 {
1650 ReScanExprContext(aggstate->aggcontexts[i]);
1651 }
1652
1653 /*
1654 * Check if input is complete and there are no more groups to project
1655 * in this phase; move to next phase or mark as done.
1656 */
1657 if (aggstate->input_done == true &&
1658 aggstate->projected_set >= (numGroupingSets - 1))
1659 {
1660 if (aggstate->current_phase < aggstate->numphases - 1)
1661 {
1662 initialize_phase(aggstate, aggstate->current_phase + 1);
1663 aggstate->input_done = false;
1664 aggstate->projected_set = -1;
1665 numGroupingSets = Max(aggstate->phase->numsets, 1);
1666 node = aggstate->phase->aggnode;
1667 numReset = numGroupingSets;
1668 }
1669 else if (aggstate->aggstrategy == AGG_MIXED)
1670 {
1671 /*
1672 * Mixed mode; we've output all the grouped stuff and have
1673 * full hashtables, so switch to outputting those.
1674 */
1675 initialize_phase(aggstate, 0);
1676 aggstate->table_filled = true;
1677 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1678 &aggstate->perhash[0].hashiter);
1679 select_current_set(aggstate, 0, true);
1680 return agg_retrieve_hash_table(aggstate);
1681 }
1682 else
1683 {
1684 aggstate->agg_done = true;
1685 break;
1686 }
1687 }
1688
1689 /*
1690 * Get the number of columns in the next grouping set after the last
1691 * projected one (if any). This is the number of columns to compare to
1692 * see if we reached the boundary of that set too.
1693 */
1694 if (aggstate->projected_set >= 0 &&
1695 aggstate->projected_set < (numGroupingSets - 1))
1696 nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
1697 else
1698 nextSetSize = 0;
1699
1700 /*----------
1701 * If a subgroup for the current grouping set is present, project it.
1702 *
1703 * We have a new group if:
1704 * - we're out of input but haven't projected all grouping sets
1705 * (checked above)
1706 * OR
1707 * - we already projected a row that wasn't from the last grouping
1708 * set
1709 * AND
1710 * - the next grouping set has at least one grouping column (since
1711 * empty grouping sets project only once input is exhausted)
1712 * AND
1713 * - the previous and pending rows differ on the grouping columns
1714 * of the next grouping set
1715 *----------
1716 */
1717 tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
1718 if (aggstate->input_done ||
1719 (node->aggstrategy != AGG_PLAIN &&
1720 aggstate->projected_set != -1 &&
1721 aggstate->projected_set < (numGroupingSets - 1) &&
1722 nextSetSize > 0 &&
1723 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
1724 tmpcontext)))
1725 {
1726 aggstate->projected_set += 1;
1727
1728 Assert(aggstate->projected_set < numGroupingSets);
1729 Assert(nextSetSize > 0 || aggstate->input_done);
1730 }
1731 else
1732 {
1733 /*
1734 * We no longer care what group we just projected, the next
1735 * projection will always be the first (or only) grouping set
1736 * (unless the input proves to be empty).
1737 */
1738 aggstate->projected_set = 0;
1739
1740 /*
1741 * If we don't already have the first tuple of the new group,
1742 * fetch it from the outer plan.
1743 */
1744 if (aggstate->grp_firstTuple == NULL)
1745 {
1746 outerslot = fetch_input_tuple(aggstate);
1747 if (!TupIsNull(outerslot))
1748 {
1749 /*
1750 * Make a copy of the first input tuple; we will use this
1751 * for comparisons (in group mode) and for projection.
1752 */
1753 aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
1754 }
1755 else
1756 {
1757 /* outer plan produced no tuples at all */
1758 if (hasGroupingSets)
1759 {
1760 /*
1761 * If there was no input at all, we need to project
1762 * rows only if there are grouping sets of size 0.
1763 * Note that this implies that there can't be any
1764 * references to ungrouped Vars, which would otherwise
1765 * cause issues with the empty output slot.
1766 *
1767 * XXX: This is no longer true, we currently deal with
1768 * this in finalize_aggregates().
1769 */
1770 aggstate->input_done = true;
1771
1772 while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
1773 {
1774 aggstate->projected_set += 1;
1775 if (aggstate->projected_set >= numGroupingSets)
1776 {
1777 /*
1778 * We can't set agg_done here because we might
1779 * have more phases to do, even though the
1780 * input is empty. So we need to restart the
1781 * whole outer loop.
1782 */
1783 break;
1784 }
1785 }
1786
1787 if (aggstate->projected_set >= numGroupingSets)
1788 continue;
1789 }
1790 else
1791 {
1792 aggstate->agg_done = true;
1793 /* If we are grouping, we should produce no tuples too */
1794 if (node->aggstrategy != AGG_PLAIN)
1795 return NULL;
1796 }
1797 }
1798 }
1799
1800 /*
1801 * Initialize working state for a new input tuple group.
1802 */
1803 initialize_aggregates(aggstate, pergroups, numReset);
1804
1805 if (aggstate->grp_firstTuple != NULL)
1806 {
1807 /*
1808 * Store the copied first input tuple in the tuple table slot
1809 * reserved for it. The tuple will be deleted when it is
1810 * cleared from the slot.
1811 */
1812 ExecStoreTuple(aggstate->grp_firstTuple,
1813 firstSlot,
1814 InvalidBuffer,
1815 true);
1816 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
1817
1818 /* set up for first advance_aggregates call */
1819 tmpcontext->ecxt_outertuple = firstSlot;
1820
1821 /*
1822 * Process each outer-plan tuple, and then fetch the next one,
1823 * until we exhaust the outer plan or cross a group boundary.
1824 */
1825 for (;;)
1826 {
1827 /*
1828 * During phase 1 only of a mixed agg, we need to update
1829 * hashtables as well in advance_aggregates.
1830 */
1831 if (aggstate->aggstrategy == AGG_MIXED &&
1832 aggstate->current_phase == 1)
1833 {
1834 lookup_hash_entries(aggstate);
1835 }
1836
1837 /* Advance the aggregates (or combine functions) */
1838 advance_aggregates(aggstate);
1839
1840 /* Reset per-input-tuple context after each tuple */
1841 ResetExprContext(tmpcontext);
1842
1843 outerslot = fetch_input_tuple(aggstate);
1844 if (TupIsNull(outerslot))
1845 {
1846 /* no more outer-plan tuples available */
1847 if (hasGroupingSets)
1848 {
1849 aggstate->input_done = true;
1850 break;
1851 }
1852 else
1853 {
1854 aggstate->agg_done = true;
1855 break;
1856 }
1857 }
1858 /* set up for next advance_aggregates call */
1859 tmpcontext->ecxt_outertuple = outerslot;
1860
1861 /*
1862 * If we are grouping, check whether we've crossed a group
1863 * boundary.
1864 */
1865 if (node->aggstrategy != AGG_PLAIN)
1866 {
1867 tmpcontext->ecxt_innertuple = firstSlot;
1868 if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
1869 tmpcontext))
1870 {
1871 aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
1872 break;
1873 }
1874 }
1875 }
1876 }
1877
1878 /*
1879 * Use the representative input tuple for any references to
1880 * non-aggregated input columns in aggregate direct args, the node
1881 * qual, and the tlist. (If we are not grouping, and there are no
1882 * input rows at all, we will come here with an empty firstSlot
1883 * ... but if not grouping, there can't be any references to
1884 * non-aggregated input columns, so no problem.)
1885 */
1886 econtext->ecxt_outertuple = firstSlot;
1887 }
1888
1889 Assert(aggstate->projected_set >= 0);
1890
1891 currentSet = aggstate->projected_set;
1892
1893 prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
1894
1895 select_current_set(aggstate, currentSet, false);
1896
1897 finalize_aggregates(aggstate,
1898 peragg,
1899 pergroups[currentSet]);
1900
1901 /*
1902 * If there's no row to project right now, we must continue rather
1903 * than returning a null since there might be more groups.
1904 */
1905 result = project_aggregates(aggstate);
1906 if (result)
1907 return result;
1908 }
1909
1910 /* No more groups */
1911 return NULL;
1912 }
1913
1914 /*
1915 * ExecAgg for hashed case: read input and build hash table
1916 */
1917 static void
agg_fill_hash_table(AggState * aggstate)1918 agg_fill_hash_table(AggState *aggstate)
1919 {
1920 TupleTableSlot *outerslot;
1921 ExprContext *tmpcontext = aggstate->tmpcontext;
1922
1923 /*
1924 * Process each outer-plan tuple, and then fetch the next one, until we
1925 * exhaust the outer plan.
1926 */
1927 for (;;)
1928 {
1929 outerslot = fetch_input_tuple(aggstate);
1930 if (TupIsNull(outerslot))
1931 break;
1932
1933 /* set up for lookup_hash_entries and advance_aggregates */
1934 tmpcontext->ecxt_outertuple = outerslot;
1935
1936 /* Find or build hashtable entries */
1937 lookup_hash_entries(aggstate);
1938
1939 /* Advance the aggregates (or combine functions) */
1940 advance_aggregates(aggstate);
1941
1942 /*
1943 * Reset per-input-tuple context after each tuple, but note that the
1944 * hash lookups do this too
1945 */
1946 ResetExprContext(aggstate->tmpcontext);
1947 }
1948
1949 aggstate->table_filled = true;
1950 /* Initialize to walk the first hash table */
1951 select_current_set(aggstate, 0, true);
1952 ResetTupleHashIterator(aggstate->perhash[0].hashtable,
1953 &aggstate->perhash[0].hashiter);
1954 }
1955
1956 /*
1957 * ExecAgg for hashed case: retrieving groups from hash table
1958 */
1959 static TupleTableSlot *
agg_retrieve_hash_table(AggState * aggstate)1960 agg_retrieve_hash_table(AggState *aggstate)
1961 {
1962 ExprContext *econtext;
1963 AggStatePerAgg peragg;
1964 AggStatePerGroup pergroup;
1965 TupleHashEntryData *entry;
1966 TupleTableSlot *firstSlot;
1967 TupleTableSlot *result;
1968 AggStatePerHash perhash;
1969
1970 /*
1971 * get state info from node.
1972 *
1973 * econtext is the per-output-tuple expression context.
1974 */
1975 econtext = aggstate->ss.ps.ps_ExprContext;
1976 peragg = aggstate->peragg;
1977 firstSlot = aggstate->ss.ss_ScanTupleSlot;
1978
1979 /*
1980 * Note that perhash (and therefore anything accessed through it) can
1981 * change inside the loop, as we change between grouping sets.
1982 */
1983 perhash = &aggstate->perhash[aggstate->current_set];
1984
1985 /*
1986 * We loop retrieving groups until we find one satisfying
1987 * aggstate->ss.ps.qual
1988 */
1989 while (!aggstate->agg_done)
1990 {
1991 TupleTableSlot *hashslot = perhash->hashslot;
1992 int i;
1993
1994 CHECK_FOR_INTERRUPTS();
1995
1996 /*
1997 * Find the next entry in the hash table
1998 */
1999 entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
2000 if (entry == NULL)
2001 {
2002 int nextset = aggstate->current_set + 1;
2003
2004 if (nextset < aggstate->num_hashes)
2005 {
2006 /*
2007 * Switch to next grouping set, reinitialize, and restart the
2008 * loop.
2009 */
2010 select_current_set(aggstate, nextset, true);
2011
2012 perhash = &aggstate->perhash[aggstate->current_set];
2013
2014 ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
2015
2016 continue;
2017 }
2018 else
2019 {
2020 /* No more hashtables, so done */
2021 aggstate->agg_done = true;
2022 return NULL;
2023 }
2024 }
2025
2026 /*
2027 * Clear the per-output-tuple context for each group
2028 *
2029 * We intentionally don't use ReScanExprContext here; if any aggs have
2030 * registered shutdown callbacks, they mustn't be called yet, since we
2031 * might not be done with that agg.
2032 */
2033 ResetExprContext(econtext);
2034
2035 /*
2036 * Transform representative tuple back into one with the right
2037 * columns.
2038 */
2039 ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
2040 slot_getallattrs(hashslot);
2041
2042 ExecClearTuple(firstSlot);
2043 memset(firstSlot->tts_isnull, true,
2044 firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
2045
2046 for (i = 0; i < perhash->numhashGrpCols; i++)
2047 {
2048 int varNumber = perhash->hashGrpColIdxInput[i] - 1;
2049
2050 firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
2051 firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
2052 }
2053 ExecStoreVirtualTuple(firstSlot);
2054
2055 pergroup = (AggStatePerGroup) entry->additional;
2056
2057 /*
2058 * Use the representative input tuple for any references to
2059 * non-aggregated input columns in the qual and tlist.
2060 */
2061 econtext->ecxt_outertuple = firstSlot;
2062
2063 prepare_projection_slot(aggstate,
2064 econtext->ecxt_outertuple,
2065 aggstate->current_set);
2066
2067 finalize_aggregates(aggstate, peragg, pergroup);
2068
2069 result = project_aggregates(aggstate);
2070 if (result)
2071 return result;
2072 }
2073
2074 /* No more groups */
2075 return NULL;
2076 }
2077
2078 /* -----------------
2079 * ExecInitAgg
2080 *
2081 * Creates the run-time information for the agg node produced by the
2082 * planner and initializes its outer subtree.
2083 *
2084 * -----------------
2085 */
2086 AggState *
ExecInitAgg(Agg * node,EState * estate,int eflags)2087 ExecInitAgg(Agg *node, EState *estate, int eflags)
2088 {
2089 AggState *aggstate;
2090 AggStatePerAgg peraggs;
2091 AggStatePerTrans pertransstates;
2092 AggStatePerGroup *pergroups;
2093 Plan *outerPlan;
2094 ExprContext *econtext;
2095 TupleDesc scanDesc;
2096 int numaggs,
2097 transno,
2098 aggno;
2099 int phase;
2100 int phaseidx;
2101 ListCell *l;
2102 Bitmapset *all_grouped_cols = NULL;
2103 int numGroupingSets = 1;
2104 int numPhases;
2105 int numHashes;
2106 int i = 0;
2107 int j = 0;
2108 bool use_hashing = (node->aggstrategy == AGG_HASHED ||
2109 node->aggstrategy == AGG_MIXED);
2110
2111 /* check for unsupported flags */
2112 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2113
2114 /*
2115 * create state structure
2116 */
2117 aggstate = makeNode(AggState);
2118 aggstate->ss.ps.plan = (Plan *) node;
2119 aggstate->ss.ps.state = estate;
2120 aggstate->ss.ps.ExecProcNode = ExecAgg;
2121
2122 aggstate->aggs = NIL;
2123 aggstate->numaggs = 0;
2124 aggstate->numtrans = 0;
2125 aggstate->aggstrategy = node->aggstrategy;
2126 aggstate->aggsplit = node->aggsplit;
2127 aggstate->maxsets = 0;
2128 aggstate->projected_set = -1;
2129 aggstate->current_set = 0;
2130 aggstate->peragg = NULL;
2131 aggstate->pertrans = NULL;
2132 aggstate->curperagg = NULL;
2133 aggstate->curpertrans = NULL;
2134 aggstate->input_done = false;
2135 aggstate->agg_done = false;
2136 aggstate->pergroups = NULL;
2137 aggstate->grp_firstTuple = NULL;
2138 aggstate->sort_in = NULL;
2139 aggstate->sort_out = NULL;
2140
2141 /*
2142 * phases[0] always exists, but is dummy in sorted/plain mode
2143 */
2144 numPhases = (use_hashing ? 1 : 2);
2145 numHashes = (use_hashing ? 1 : 0);
2146
2147 /*
2148 * Calculate the maximum number of grouping sets in any phase; this
2149 * determines the size of some allocations. Also calculate the number of
2150 * phases, since all hashed/mixed nodes contribute to only a single phase.
2151 */
2152 if (node->groupingSets)
2153 {
2154 numGroupingSets = list_length(node->groupingSets);
2155
2156 foreach(l, node->chain)
2157 {
2158 Agg *agg = lfirst(l);
2159
2160 numGroupingSets = Max(numGroupingSets,
2161 list_length(agg->groupingSets));
2162
2163 /*
2164 * additional AGG_HASHED aggs become part of phase 0, but all
2165 * others add an extra phase.
2166 */
2167 if (agg->aggstrategy != AGG_HASHED)
2168 ++numPhases;
2169 else
2170 ++numHashes;
2171 }
2172 }
2173
2174 aggstate->maxsets = numGroupingSets;
2175 aggstate->numphases = numPhases;
2176
2177 aggstate->aggcontexts = (ExprContext **)
2178 palloc0(sizeof(ExprContext *) * numGroupingSets);
2179
2180 /*
2181 * Create expression contexts. We need three or more, one for
2182 * per-input-tuple processing, one for per-output-tuple processing, one
2183 * for all the hashtables, and one for each grouping set. The per-tuple
2184 * memory context of the per-grouping-set ExprContexts (aggcontexts)
2185 * replaces the standalone memory context formerly used to hold transition
2186 * values. We cheat a little by using ExecAssignExprContext() to build
2187 * all of them.
2188 *
2189 * NOTE: the details of what is stored in aggcontexts and what is stored
2190 * in the regular per-query memory context are driven by a simple
2191 * decision: we want to reset the aggcontext at group boundaries (if not
2192 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
2193 */
2194 ExecAssignExprContext(estate, &aggstate->ss.ps);
2195 aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;
2196
2197 for (i = 0; i < numGroupingSets; ++i)
2198 {
2199 ExecAssignExprContext(estate, &aggstate->ss.ps);
2200 aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
2201 }
2202
2203 if (use_hashing)
2204 {
2205 ExecAssignExprContext(estate, &aggstate->ss.ps);
2206 aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
2207 }
2208
2209 ExecAssignExprContext(estate, &aggstate->ss.ps);
2210
2211 /*
2212 * Initialize child nodes.
2213 *
2214 * If we are doing a hashed aggregation then the child plan does not need
2215 * to handle REWIND efficiently; see ExecReScanAgg.
2216 */
2217 if (node->aggstrategy == AGG_HASHED)
2218 eflags &= ~EXEC_FLAG_REWIND;
2219 outerPlan = outerPlan(node);
2220 outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);
2221
2222 /*
2223 * initialize source tuple type.
2224 */
2225 ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss);
2226 scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2227 if (node->chain)
2228 aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc);
2229
2230 /*
2231 * Initialize result type, slot and projection.
2232 */
2233 ExecInitResultTupleSlotTL(estate, &aggstate->ss.ps);
2234 ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);
2235
2236 /*
2237 * initialize child expressions
2238 *
2239 * We expect the parser to have checked that no aggs contain other agg
2240 * calls in their arguments (and just to be sure, we verify it again while
2241 * initializing the plan node). This would make no sense under SQL
2242 * semantics, and it's forbidden by the spec. Because it is true, we
2243 * don't need to worry about evaluating the aggs in any particular order.
2244 *
2245 * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
2246 * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
2247 * in the targetlist are found during ExecAssignProjectionInfo, below.
2248 */
2249 aggstate->ss.ps.qual =
2250 ExecInitQual(node->plan.qual, (PlanState *) aggstate);
2251
2252 /*
2253 * We should now have found all Aggrefs in the targetlist and quals.
2254 */
2255 numaggs = aggstate->numaggs;
2256 Assert(numaggs == list_length(aggstate->aggs));
2257
2258 /*
2259 * For each phase, prepare grouping set data and fmgr lookup data for
2260 * compare functions. Accumulate all_grouped_cols in passing.
2261 */
2262 aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
2263
2264 aggstate->num_hashes = numHashes;
2265 if (numHashes)
2266 {
2267 aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
2268 aggstate->phases[0].numsets = 0;
2269 aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
2270 aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
2271 }
2272
2273 phase = 0;
2274 for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
2275 {
2276 Agg *aggnode;
2277 Sort *sortnode;
2278
2279 if (phaseidx > 0)
2280 {
2281 aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
2282 sortnode = castNode(Sort, aggnode->plan.lefttree);
2283 }
2284 else
2285 {
2286 aggnode = node;
2287 sortnode = NULL;
2288 }
2289
2290 Assert(phase <= 1 || sortnode);
2291
2292 if (aggnode->aggstrategy == AGG_HASHED
2293 || aggnode->aggstrategy == AGG_MIXED)
2294 {
2295 AggStatePerPhase phasedata = &aggstate->phases[0];
2296 AggStatePerHash perhash;
2297 Bitmapset *cols = NULL;
2298
2299 Assert(phase == 0);
2300 i = phasedata->numsets++;
2301 perhash = &aggstate->perhash[i];
2302
2303 /* phase 0 always points to the "real" Agg in the hash case */
2304 phasedata->aggnode = node;
2305 phasedata->aggstrategy = node->aggstrategy;
2306
2307 /* but the actual Agg node representing this hash is saved here */
2308 perhash->aggnode = aggnode;
2309
2310 phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;
2311
2312 for (j = 0; j < aggnode->numCols; ++j)
2313 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2314
2315 phasedata->grouped_cols[i] = cols;
2316
2317 all_grouped_cols = bms_add_members(all_grouped_cols, cols);
2318 continue;
2319 }
2320 else
2321 {
2322 AggStatePerPhase phasedata = &aggstate->phases[++phase];
2323 int num_sets;
2324
2325 phasedata->numsets = num_sets = list_length(aggnode->groupingSets);
2326
2327 if (num_sets)
2328 {
2329 phasedata->gset_lengths = palloc(num_sets * sizeof(int));
2330 phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));
2331
2332 i = 0;
2333 foreach(l, aggnode->groupingSets)
2334 {
2335 int current_length = list_length(lfirst(l));
2336 Bitmapset *cols = NULL;
2337
2338 /* planner forces this to be correct */
2339 for (j = 0; j < current_length; ++j)
2340 cols = bms_add_member(cols, aggnode->grpColIdx[j]);
2341
2342 phasedata->grouped_cols[i] = cols;
2343 phasedata->gset_lengths[i] = current_length;
2344
2345 ++i;
2346 }
2347
2348 all_grouped_cols = bms_add_members(all_grouped_cols,
2349 phasedata->grouped_cols[0]);
2350 }
2351 else
2352 {
2353 Assert(phaseidx == 0);
2354
2355 phasedata->gset_lengths = NULL;
2356 phasedata->grouped_cols = NULL;
2357 }
2358
2359 /*
2360 * If we are grouping, precompute fmgr lookup data for inner loop.
2361 */
2362 if (aggnode->aggstrategy == AGG_SORTED)
2363 {
2364 int i = 0;
2365
2366 Assert(aggnode->numCols > 0);
2367
2368 /*
2369 * Build a separate function for each subset of columns that
2370 * need to be compared.
2371 */
2372 phasedata->eqfunctions =
2373 (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));
2374
2375 /* for each grouping set */
2376 for (i = 0; i < phasedata->numsets; i++)
2377 {
2378 int length = phasedata->gset_lengths[i];
2379
2380 if (phasedata->eqfunctions[length - 1] != NULL)
2381 continue;
2382
2383 phasedata->eqfunctions[length - 1] =
2384 execTuplesMatchPrepare(scanDesc,
2385 length,
2386 aggnode->grpColIdx,
2387 aggnode->grpOperators,
2388 (PlanState *) aggstate);
2389 }
2390
2391 /* and for all grouped columns, unless already computed */
2392 if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
2393 {
2394 phasedata->eqfunctions[aggnode->numCols - 1] =
2395 execTuplesMatchPrepare(scanDesc,
2396 aggnode->numCols,
2397 aggnode->grpColIdx,
2398 aggnode->grpOperators,
2399 (PlanState *) aggstate);
2400 }
2401 }
2402
2403 phasedata->aggnode = aggnode;
2404 phasedata->aggstrategy = aggnode->aggstrategy;
2405 phasedata->sortnode = sortnode;
2406 }
2407 }
2408
2409 /*
2410 * Convert all_grouped_cols to a descending-order list.
2411 */
2412 i = -1;
2413 while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
2414 aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);
2415
2416 /*
2417 * Set up aggregate-result storage in the output expr context, and also
2418 * allocate my private per-agg working storage
2419 */
2420 econtext = aggstate->ss.ps.ps_ExprContext;
2421 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
2422 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
2423
2424 peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
2425 pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
2426
2427 aggstate->peragg = peraggs;
2428 aggstate->pertrans = pertransstates;
2429
2430
2431 aggstate->all_pergroups =
2432 (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
2433 * (numGroupingSets + numHashes));
2434 pergroups = aggstate->all_pergroups;
2435
2436 if (node->aggstrategy != AGG_HASHED)
2437 {
2438 for (i = 0; i < numGroupingSets; i++)
2439 {
2440 pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
2441 * numaggs);
2442 }
2443
2444 aggstate->pergroups = pergroups;
2445 pergroups += numGroupingSets;
2446 }
2447
2448 /*
2449 * Hashing can only appear in the initial phase.
2450 */
2451 if (use_hashing)
2452 {
2453 /* this is an array of pointers, not structures */
2454 aggstate->hash_pergroup = pergroups;
2455
2456 find_hash_columns(aggstate);
2457
2458 /* Skip massive memory allocation if we are just doing EXPLAIN */
2459 if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
2460 build_hash_table(aggstate);
2461
2462 aggstate->table_filled = false;
2463 }
2464
2465 /*
2466 * Initialize current phase-dependent values to initial phase. The initial
2467 * phase is 1 (first sort pass) for all strategies that use sorting (if
2468 * hashing is being done too, then phase 0 is processed last); but if only
2469 * hashing is being done, then phase 0 is all there is.
2470 */
2471 if (node->aggstrategy == AGG_HASHED)
2472 {
2473 aggstate->current_phase = 0;
2474 initialize_phase(aggstate, 0);
2475 select_current_set(aggstate, 0, true);
2476 }
2477 else
2478 {
2479 aggstate->current_phase = 1;
2480 initialize_phase(aggstate, 1);
2481 select_current_set(aggstate, 0, false);
2482 }
2483
2484 /* -----------------
2485 * Perform lookups of aggregate function info, and initialize the
2486 * unchanging fields of the per-agg and per-trans data.
2487 *
2488 * We try to optimize by detecting duplicate aggregate functions so that
2489 * their state and final values are re-used, rather than needlessly being
2490 * re-calculated independently. We also detect aggregates that are not
2491 * the same, but which can share the same transition state.
2492 *
2493 * Scenarios:
2494 *
2495 * 1. Identical aggregate function calls appear in the query:
2496 *
2497 * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
2498 *
2499 * Since these aggregates are identical, we only need to calculate
2500 * the value once. Both aggregates will share the same 'aggno' value.
2501 *
2502 * 2. Two different aggregate functions appear in the query, but the
2503 * aggregates have the same arguments, transition functions and
2504 * initial values (and, presumably, different final functions):
2505 *
2506 * SELECT AVG(x), STDDEV(x) FROM ...
2507 *
2508 * In this case we must create a new peragg for the varying aggregate,
2509 * and we need to call the final functions separately, but we need
2510 * only run the transition function once. (This requires that the
2511 * final functions be nondestructive of the transition state, but
2512 * that's required anyway for other reasons.)
2513 *
2514 * For either of these optimizations to be valid, all aggregate properties
2515 * used in the transition phase must be the same, including any modifiers
2516 * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
2517 * contain any volatile functions.
2518 * -----------------
2519 */
2520 aggno = -1;
2521 transno = -1;
2522 foreach(l, aggstate->aggs)
2523 {
2524 AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
2525 Aggref *aggref = aggrefstate->aggref;
2526 AggStatePerAgg peragg;
2527 AggStatePerTrans pertrans;
2528 int existing_aggno;
2529 int existing_transno;
2530 List *same_input_transnos;
2531 Oid inputTypes[FUNC_MAX_ARGS];
2532 int numArguments;
2533 int numDirectArgs;
2534 HeapTuple aggTuple;
2535 Form_pg_aggregate aggform;
2536 AclResult aclresult;
2537 Oid transfn_oid,
2538 finalfn_oid;
2539 bool shareable;
2540 Oid serialfn_oid,
2541 deserialfn_oid;
2542 Expr *finalfnexpr;
2543 Oid aggtranstype;
2544 Datum textInitVal;
2545 Datum initValue;
2546 bool initValueIsNull;
2547
2548 /* Planner should have assigned aggregate to correct level */
2549 Assert(aggref->agglevelsup == 0);
2550 /* ... and the split mode should match */
2551 Assert(aggref->aggsplit == aggstate->aggsplit);
2552
2553 /* 1. Check for already processed aggs which can be re-used */
2554 existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
2555 &same_input_transnos);
2556 if (existing_aggno != -1)
2557 {
2558 /*
2559 * Existing compatible agg found. so just point the Aggref to the
2560 * same per-agg struct.
2561 */
2562 aggrefstate->aggno = existing_aggno;
2563 continue;
2564 }
2565
2566 /* Mark Aggref state node with assigned index in the result array */
2567 peragg = &peraggs[++aggno];
2568 peragg->aggref = aggref;
2569 aggrefstate->aggno = aggno;
2570
2571 /* Fetch the pg_aggregate row */
2572 aggTuple = SearchSysCache1(AGGFNOID,
2573 ObjectIdGetDatum(aggref->aggfnoid));
2574 if (!HeapTupleIsValid(aggTuple))
2575 elog(ERROR, "cache lookup failed for aggregate %u",
2576 aggref->aggfnoid);
2577 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2578
2579 /* Check permission to call aggregate function */
2580 aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
2581 ACL_EXECUTE);
2582 if (aclresult != ACLCHECK_OK)
2583 aclcheck_error(aclresult, OBJECT_AGGREGATE,
2584 get_func_name(aggref->aggfnoid));
2585 InvokeFunctionExecuteHook(aggref->aggfnoid);
2586
2587 /* planner recorded transition state type in the Aggref itself */
2588 aggtranstype = aggref->aggtranstype;
2589 Assert(OidIsValid(aggtranstype));
2590
2591 /*
2592 * If this aggregation is performing state combines, then instead of
2593 * using the transition function, we'll use the combine function
2594 */
2595 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2596 {
2597 transfn_oid = aggform->aggcombinefn;
2598
2599 /* If not set then the planner messed up */
2600 if (!OidIsValid(transfn_oid))
2601 elog(ERROR, "combinefn not set for aggregate function");
2602 }
2603 else
2604 transfn_oid = aggform->aggtransfn;
2605
2606 /* Final function only required if we're finalizing the aggregates */
2607 if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
2608 peragg->finalfn_oid = finalfn_oid = InvalidOid;
2609 else
2610 peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2611
2612 /*
2613 * If finalfn is marked read-write, we can't share transition states;
2614 * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also,
2615 * if we're not executing the finalfn here, we can share regardless.
2616 */
2617 shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
2618 (finalfn_oid == InvalidOid);
2619 peragg->shareable = shareable;
2620
2621 serialfn_oid = InvalidOid;
2622 deserialfn_oid = InvalidOid;
2623
2624 /*
2625 * Check if serialization/deserialization is required. We only do it
2626 * for aggregates that have transtype INTERNAL.
2627 */
2628 if (aggtranstype == INTERNALOID)
2629 {
2630 /*
2631 * The planner should only have generated a serialize agg node if
2632 * every aggregate with an INTERNAL state has a serialization
2633 * function. Verify that.
2634 */
2635 if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
2636 {
2637 /* serialization only valid when not running finalfn */
2638 Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
2639
2640 if (!OidIsValid(aggform->aggserialfn))
2641 elog(ERROR, "serialfunc not provided for serialization aggregation");
2642 serialfn_oid = aggform->aggserialfn;
2643 }
2644
2645 /* Likewise for deserialization functions */
2646 if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
2647 {
2648 /* deserialization only valid when combining states */
2649 Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
2650
2651 if (!OidIsValid(aggform->aggdeserialfn))
2652 elog(ERROR, "deserialfunc not provided for deserialization aggregation");
2653 deserialfn_oid = aggform->aggdeserialfn;
2654 }
2655 }
2656
2657 /* Check that aggregate owner has permission to call component fns */
2658 {
2659 HeapTuple procTuple;
2660 Oid aggOwner;
2661
2662 procTuple = SearchSysCache1(PROCOID,
2663 ObjectIdGetDatum(aggref->aggfnoid));
2664 if (!HeapTupleIsValid(procTuple))
2665 elog(ERROR, "cache lookup failed for function %u",
2666 aggref->aggfnoid);
2667 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2668 ReleaseSysCache(procTuple);
2669
2670 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2671 ACL_EXECUTE);
2672 if (aclresult != ACLCHECK_OK)
2673 aclcheck_error(aclresult, OBJECT_FUNCTION,
2674 get_func_name(transfn_oid));
2675 InvokeFunctionExecuteHook(transfn_oid);
2676 if (OidIsValid(finalfn_oid))
2677 {
2678 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2679 ACL_EXECUTE);
2680 if (aclresult != ACLCHECK_OK)
2681 aclcheck_error(aclresult, OBJECT_FUNCTION,
2682 get_func_name(finalfn_oid));
2683 InvokeFunctionExecuteHook(finalfn_oid);
2684 }
2685 if (OidIsValid(serialfn_oid))
2686 {
2687 aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
2688 ACL_EXECUTE);
2689 if (aclresult != ACLCHECK_OK)
2690 aclcheck_error(aclresult, OBJECT_FUNCTION,
2691 get_func_name(serialfn_oid));
2692 InvokeFunctionExecuteHook(serialfn_oid);
2693 }
2694 if (OidIsValid(deserialfn_oid))
2695 {
2696 aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
2697 ACL_EXECUTE);
2698 if (aclresult != ACLCHECK_OK)
2699 aclcheck_error(aclresult, OBJECT_FUNCTION,
2700 get_func_name(deserialfn_oid));
2701 InvokeFunctionExecuteHook(deserialfn_oid);
2702 }
2703 }
2704
2705 /*
2706 * Get actual datatypes of the (nominal) aggregate inputs. These
2707 * could be different from the agg's declared input types, when the
2708 * agg accepts ANY or a polymorphic type.
2709 */
2710 numArguments = get_aggregate_argtypes(aggref, inputTypes);
2711
2712 /* Count the "direct" arguments, if any */
2713 numDirectArgs = list_length(aggref->aggdirectargs);
2714
2715 /* Detect how many arguments to pass to the finalfn */
2716 if (aggform->aggfinalextra)
2717 peragg->numFinalArgs = numArguments + 1;
2718 else
2719 peragg->numFinalArgs = numDirectArgs + 1;
2720
2721 /* Initialize any direct-argument expressions */
2722 peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
2723 (PlanState *) aggstate);
2724
2725 /*
2726 * build expression trees using actual argument & result types for the
2727 * finalfn, if it exists and is required.
2728 */
2729 if (OidIsValid(finalfn_oid))
2730 {
2731 build_aggregate_finalfn_expr(inputTypes,
2732 peragg->numFinalArgs,
2733 aggtranstype,
2734 aggref->aggtype,
2735 aggref->inputcollid,
2736 finalfn_oid,
2737 &finalfnexpr);
2738 fmgr_info(finalfn_oid, &peragg->finalfn);
2739 fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
2740 }
2741
2742 /* get info about the output value's datatype */
2743 get_typlenbyval(aggref->aggtype,
2744 &peragg->resulttypeLen,
2745 &peragg->resulttypeByVal);
2746
2747 /*
2748 * initval is potentially null, so don't try to access it as a struct
2749 * field. Must do it the hard way with SysCacheGetAttr.
2750 */
2751 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
2752 Anum_pg_aggregate_agginitval,
2753 &initValueIsNull);
2754 if (initValueIsNull)
2755 initValue = (Datum) 0;
2756 else
2757 initValue = GetAggInitVal(textInitVal, aggtranstype);
2758
2759 /*
2760 * 2. Build working state for invoking the transition function, or
2761 * look up previously initialized working state, if we can share it.
2762 *
2763 * find_compatible_peragg() already collected a list of shareable
2764 * per-Trans's with the same inputs. Check if any of them have the
2765 * same transition function and initial value.
2766 */
2767 existing_transno = find_compatible_pertrans(aggstate, aggref,
2768 shareable,
2769 transfn_oid, aggtranstype,
2770 serialfn_oid, deserialfn_oid,
2771 initValue, initValueIsNull,
2772 same_input_transnos);
2773 if (existing_transno != -1)
2774 {
2775 /*
2776 * Existing compatible trans found, so just point the 'peragg' to
2777 * the same per-trans struct, and mark the trans state as shared.
2778 */
2779 pertrans = &pertransstates[existing_transno];
2780 pertrans->aggshared = true;
2781 peragg->transno = existing_transno;
2782 }
2783 else
2784 {
2785 pertrans = &pertransstates[++transno];
2786 build_pertrans_for_aggref(pertrans, aggstate, estate,
2787 aggref, transfn_oid, aggtranstype,
2788 serialfn_oid, deserialfn_oid,
2789 initValue, initValueIsNull,
2790 inputTypes, numArguments);
2791 peragg->transno = transno;
2792 }
2793 ReleaseSysCache(aggTuple);
2794 }
2795
2796 /*
2797 * Update aggstate->numaggs to be the number of unique aggregates found.
2798 * Also set numstates to the number of unique transition states found.
2799 */
2800 aggstate->numaggs = aggno + 1;
2801 aggstate->numtrans = transno + 1;
2802
2803 /*
2804 * Last, check whether any more aggregates got added onto the node while
2805 * we processed the expressions for the aggregate arguments (including not
2806 * only the regular arguments and FILTER expressions handled immediately
2807 * above, but any direct arguments we might've handled earlier). If so,
2808 * we have nested aggregate functions, which is semantically nonsensical,
2809 * so complain. (This should have been caught by the parser, so we don't
2810 * need to work hard on a helpful error message; but we defend against it
2811 * here anyway, just to be sure.)
2812 */
2813 if (numaggs != list_length(aggstate->aggs))
2814 ereport(ERROR,
2815 (errcode(ERRCODE_GROUPING_ERROR),
2816 errmsg("aggregate function calls cannot be nested")));
2817
2818 /*
2819 * Build expressions doing all the transition work at once. We build a
2820 * different one for each phase, as the number of transition function
2821 * invocation can differ between phases. Note this'll work both for
2822 * transition and combination functions (although there'll only be one
2823 * phase in the latter case).
2824 */
2825 for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
2826 {
2827 AggStatePerPhase phase = &aggstate->phases[phaseidx];
2828 bool dohash = false;
2829 bool dosort = false;
2830
2831 /* phase 0 doesn't necessarily exist */
2832 if (!phase->aggnode)
2833 continue;
2834
2835 if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
2836 {
2837 /*
2838 * Phase one, and only phase one, in a mixed agg performs both
2839 * sorting and aggregation.
2840 */
2841 dohash = true;
2842 dosort = true;
2843 }
2844 else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
2845 {
2846 /*
2847 * No need to compute a transition function for an AGG_MIXED phase
2848 * 0 - the contents of the hashtables will have been computed
2849 * during phase 1.
2850 */
2851 continue;
2852 }
2853 else if (phase->aggstrategy == AGG_PLAIN ||
2854 phase->aggstrategy == AGG_SORTED)
2855 {
2856 dohash = false;
2857 dosort = true;
2858 }
2859 else if (phase->aggstrategy == AGG_HASHED)
2860 {
2861 dohash = true;
2862 dosort = false;
2863 }
2864 else
2865 Assert(false);
2866
2867 phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash);
2868
2869 }
2870
2871 return aggstate;
2872 }
2873
2874 /*
2875 * Build the state needed to calculate a state value for an aggregate.
2876 *
2877 * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
2878 * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
2879 * of the arguments could be calculated from 'aggref', but the caller has
2880 * calculated them already, so might as well pass them.
2881 */
2882 static void
build_pertrans_for_aggref(AggStatePerTrans pertrans,AggState * aggstate,EState * estate,Aggref * aggref,Oid aggtransfn,Oid aggtranstype,Oid aggserialfn,Oid aggdeserialfn,Datum initValue,bool initValueIsNull,Oid * inputTypes,int numArguments)2883 build_pertrans_for_aggref(AggStatePerTrans pertrans,
2884 AggState *aggstate, EState *estate,
2885 Aggref *aggref,
2886 Oid aggtransfn, Oid aggtranstype,
2887 Oid aggserialfn, Oid aggdeserialfn,
2888 Datum initValue, bool initValueIsNull,
2889 Oid *inputTypes, int numArguments)
2890 {
2891 int numGroupingSets = Max(aggstate->maxsets, 1);
2892 Expr *serialfnexpr = NULL;
2893 Expr *deserialfnexpr = NULL;
2894 ListCell *lc;
2895 int numInputs;
2896 int numDirectArgs;
2897 List *sortlist;
2898 int numSortCols;
2899 int numDistinctCols;
2900 int i;
2901
2902 /* Begin filling in the pertrans data */
2903 pertrans->aggref = aggref;
2904 pertrans->aggshared = false;
2905 pertrans->aggCollation = aggref->inputcollid;
2906 pertrans->transfn_oid = aggtransfn;
2907 pertrans->serialfn_oid = aggserialfn;
2908 pertrans->deserialfn_oid = aggdeserialfn;
2909 pertrans->initValue = initValue;
2910 pertrans->initValueIsNull = initValueIsNull;
2911
2912 /* Count the "direct" arguments, if any */
2913 numDirectArgs = list_length(aggref->aggdirectargs);
2914
2915 /* Count the number of aggregated input columns */
2916 pertrans->numInputs = numInputs = list_length(aggref->args);
2917
2918 pertrans->aggtranstype = aggtranstype;
2919
2920 /*
2921 * When combining states, we have no use at all for the aggregate
2922 * function's transfn. Instead we use the combinefn. In this case, the
2923 * transfn and transfn_oid fields of pertrans refer to the combine
2924 * function rather than the transition function.
2925 */
2926 if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
2927 {
2928 Expr *combinefnexpr;
2929 size_t numTransArgs;
2930
2931 /*
2932 * When combining there's only one input, the to-be-combined added
2933 * transition value from below (this node's transition value is
2934 * counted separately).
2935 */
2936 pertrans->numTransInputs = 1;
2937
2938 /* account for the current transition state */
2939 numTransArgs = pertrans->numTransInputs + 1;
2940
2941 build_aggregate_combinefn_expr(aggtranstype,
2942 aggref->inputcollid,
2943 aggtransfn,
2944 &combinefnexpr);
2945 fmgr_info(aggtransfn, &pertrans->transfn);
2946 fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);
2947
2948 InitFunctionCallInfoData(pertrans->transfn_fcinfo,
2949 &pertrans->transfn,
2950 numTransArgs,
2951 pertrans->aggCollation,
2952 (void *) aggstate, NULL);
2953
2954 /*
2955 * Ensure that a combine function to combine INTERNAL states is not
2956 * strict. This should have been checked during CREATE AGGREGATE, but
2957 * the strict property could have been changed since then.
2958 */
2959 if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
2960 ereport(ERROR,
2961 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2962 errmsg("combine function with transition type %s must not be declared STRICT",
2963 format_type_be(aggtranstype))));
2964 }
2965 else
2966 {
2967 Expr *transfnexpr;
2968 size_t numTransArgs;
2969
2970 /* Detect how many arguments to pass to the transfn */
2971 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
2972 pertrans->numTransInputs = numInputs;
2973 else
2974 pertrans->numTransInputs = numArguments;
2975
2976 /* account for the current transition state */
2977 numTransArgs = pertrans->numTransInputs + 1;
2978
2979 /*
2980 * Set up infrastructure for calling the transfn. Note that invtrans
2981 * is not needed here.
2982 */
2983 build_aggregate_transfn_expr(inputTypes,
2984 numArguments,
2985 numDirectArgs,
2986 aggref->aggvariadic,
2987 aggtranstype,
2988 aggref->inputcollid,
2989 aggtransfn,
2990 InvalidOid,
2991 &transfnexpr,
2992 NULL);
2993 fmgr_info(aggtransfn, &pertrans->transfn);
2994 fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);
2995
2996 InitFunctionCallInfoData(pertrans->transfn_fcinfo,
2997 &pertrans->transfn,
2998 numTransArgs,
2999 pertrans->aggCollation,
3000 (void *) aggstate, NULL);
3001
3002 /*
3003 * If the transfn is strict and the initval is NULL, make sure input
3004 * type and transtype are the same (or at least binary-compatible), so
3005 * that it's OK to use the first aggregated input value as the initial
3006 * transValue. This should have been checked at agg definition time,
3007 * but we must check again in case the transfn's strictness property
3008 * has been changed.
3009 */
3010 if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
3011 {
3012 if (numArguments <= numDirectArgs ||
3013 !IsBinaryCoercible(inputTypes[numDirectArgs],
3014 aggtranstype))
3015 ereport(ERROR,
3016 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
3017 errmsg("aggregate %u needs to have compatible input type and transition type",
3018 aggref->aggfnoid)));
3019 }
3020 }
3021
3022 /* get info about the state value's datatype */
3023 get_typlenbyval(aggtranstype,
3024 &pertrans->transtypeLen,
3025 &pertrans->transtypeByVal);
3026
3027 if (OidIsValid(aggserialfn))
3028 {
3029 build_aggregate_serialfn_expr(aggserialfn,
3030 &serialfnexpr);
3031 fmgr_info(aggserialfn, &pertrans->serialfn);
3032 fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);
3033
3034 InitFunctionCallInfoData(pertrans->serialfn_fcinfo,
3035 &pertrans->serialfn,
3036 1,
3037 InvalidOid,
3038 (void *) aggstate, NULL);
3039 }
3040
3041 if (OidIsValid(aggdeserialfn))
3042 {
3043 build_aggregate_deserialfn_expr(aggdeserialfn,
3044 &deserialfnexpr);
3045 fmgr_info(aggdeserialfn, &pertrans->deserialfn);
3046 fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);
3047
3048 InitFunctionCallInfoData(pertrans->deserialfn_fcinfo,
3049 &pertrans->deserialfn,
3050 2,
3051 InvalidOid,
3052 (void *) aggstate, NULL);
3053
3054 }
3055
3056 /*
3057 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
3058 * have a list of SortGroupClause nodes; fish out the data in them and
3059 * stick them into arrays. We ignore ORDER BY for an ordered-set agg,
3060 * however; the agg's transfn and finalfn are responsible for that.
3061 *
3062 * Note that by construction, if there is a DISTINCT clause then the ORDER
3063 * BY clause is a prefix of it (see transformDistinctClause).
3064 */
3065 if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
3066 {
3067 sortlist = NIL;
3068 numSortCols = numDistinctCols = 0;
3069 }
3070 else if (aggref->aggdistinct)
3071 {
3072 sortlist = aggref->aggdistinct;
3073 numSortCols = numDistinctCols = list_length(sortlist);
3074 Assert(numSortCols >= list_length(aggref->aggorder));
3075 }
3076 else
3077 {
3078 sortlist = aggref->aggorder;
3079 numSortCols = list_length(sortlist);
3080 numDistinctCols = 0;
3081 }
3082
3083 pertrans->numSortCols = numSortCols;
3084 pertrans->numDistinctCols = numDistinctCols;
3085
3086 /*
3087 * If we have either sorting or filtering to do, create a tupledesc and
3088 * slot corresponding to the aggregated inputs (including sort
3089 * expressions) of the agg.
3090 */
3091 if (numSortCols > 0 || aggref->aggfilter)
3092 {
3093 pertrans->sortdesc = ExecTypeFromTL(aggref->args, false);
3094 pertrans->sortslot =
3095 ExecInitExtraTupleSlot(estate, pertrans->sortdesc);
3096 }
3097
3098 if (numSortCols > 0)
3099 {
3100 /*
3101 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
3102 * (yet)
3103 */
3104 Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);
3105
3106 /* If we have only one input, we need its len/byval info. */
3107 if (numInputs == 1)
3108 {
3109 get_typlenbyval(inputTypes[numDirectArgs],
3110 &pertrans->inputtypeLen,
3111 &pertrans->inputtypeByVal);
3112 }
3113 else if (numDistinctCols > 0)
3114 {
3115 /* we will need an extra slot to store prior values */
3116 pertrans->uniqslot =
3117 ExecInitExtraTupleSlot(estate, pertrans->sortdesc);
3118 }
3119
3120 /* Extract the sort information for use later */
3121 pertrans->sortColIdx =
3122 (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
3123 pertrans->sortOperators =
3124 (Oid *) palloc(numSortCols * sizeof(Oid));
3125 pertrans->sortCollations =
3126 (Oid *) palloc(numSortCols * sizeof(Oid));
3127 pertrans->sortNullsFirst =
3128 (bool *) palloc(numSortCols * sizeof(bool));
3129
3130 i = 0;
3131 foreach(lc, sortlist)
3132 {
3133 SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
3134 TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);
3135
3136 /* the parser should have made sure of this */
3137 Assert(OidIsValid(sortcl->sortop));
3138
3139 pertrans->sortColIdx[i] = tle->resno;
3140 pertrans->sortOperators[i] = sortcl->sortop;
3141 pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
3142 pertrans->sortNullsFirst[i] = sortcl->nulls_first;
3143 i++;
3144 }
3145 Assert(i == numSortCols);
3146 }
3147
3148 if (aggref->aggdistinct)
3149 {
3150 Oid *ops;
3151
3152 Assert(numArguments > 0);
3153 Assert(list_length(aggref->aggdistinct) == numDistinctCols);
3154
3155 ops = palloc(numDistinctCols * sizeof(Oid));
3156
3157 i = 0;
3158 foreach(lc, aggref->aggdistinct)
3159 ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;
3160
3161 /* lookup / build the necessary comparators */
3162 if (numDistinctCols == 1)
3163 fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
3164 else
3165 pertrans->equalfnMulti =
3166 execTuplesMatchPrepare(pertrans->sortdesc,
3167 numDistinctCols,
3168 pertrans->sortColIdx,
3169 ops,
3170 &aggstate->ss.ps);
3171 pfree(ops);
3172 }
3173
3174 pertrans->sortstates = (Tuplesortstate **)
3175 palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
3176 }
3177
3178
3179 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)3180 GetAggInitVal(Datum textInitVal, Oid transtype)
3181 {
3182 Oid typinput,
3183 typioparam;
3184 char *strInitVal;
3185 Datum initVal;
3186
3187 getTypeInputInfo(transtype, &typinput, &typioparam);
3188 strInitVal = TextDatumGetCString(textInitVal);
3189 initVal = OidInputFunctionCall(typinput, strInitVal,
3190 typioparam, -1);
3191 pfree(strInitVal);
3192 return initVal;
3193 }
3194
3195 /*
3196 * find_compatible_peragg - search for a previously initialized per-Agg struct
3197 *
3198 * Searches the previously looked at aggregates to find one which is compatible
3199 * with this one, with the same input parameters. If no compatible aggregate
3200 * can be found, returns -1.
3201 *
3202 * As a side-effect, this also collects a list of existing, shareable per-Trans
3203 * structs with matching inputs. If no identical Aggref is found, the list is
3204 * passed later to find_compatible_pertrans, to see if we can at least reuse
3205 * the state value of another aggregate.
3206 */
3207 static int
find_compatible_peragg(Aggref * newagg,AggState * aggstate,int lastaggno,List ** same_input_transnos)3208 find_compatible_peragg(Aggref *newagg, AggState *aggstate,
3209 int lastaggno, List **same_input_transnos)
3210 {
3211 int aggno;
3212 AggStatePerAgg peraggs;
3213
3214 *same_input_transnos = NIL;
3215
3216 /* we mustn't reuse the aggref if it contains volatile function calls */
3217 if (contain_volatile_functions((Node *) newagg))
3218 return -1;
3219
3220 peraggs = aggstate->peragg;
3221
3222 /*
3223 * Search through the list of already seen aggregates. If we find an
3224 * existing identical aggregate call, then we can re-use that one. While
3225 * searching, we'll also collect a list of Aggrefs with the same input
3226 * parameters. If no matching Aggref is found, the caller can potentially
3227 * still re-use the transition state of one of them. (At this stage we
3228 * just compare the parsetrees; whether different aggregates share the
3229 * same transition function will be checked later.)
3230 */
3231 for (aggno = 0; aggno <= lastaggno; aggno++)
3232 {
3233 AggStatePerAgg peragg;
3234 Aggref *existingRef;
3235
3236 peragg = &peraggs[aggno];
3237 existingRef = peragg->aggref;
3238
3239 /* all of the following must be the same or it's no match */
3240 if (newagg->inputcollid != existingRef->inputcollid ||
3241 newagg->aggtranstype != existingRef->aggtranstype ||
3242 newagg->aggstar != existingRef->aggstar ||
3243 newagg->aggvariadic != existingRef->aggvariadic ||
3244 newagg->aggkind != existingRef->aggkind ||
3245 !equal(newagg->args, existingRef->args) ||
3246 !equal(newagg->aggorder, existingRef->aggorder) ||
3247 !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
3248 !equal(newagg->aggfilter, existingRef->aggfilter))
3249 continue;
3250
3251 /* if it's the same aggregate function then report exact match */
3252 if (newagg->aggfnoid == existingRef->aggfnoid &&
3253 newagg->aggtype == existingRef->aggtype &&
3254 newagg->aggcollid == existingRef->aggcollid &&
3255 equal(newagg->aggdirectargs, existingRef->aggdirectargs))
3256 {
3257 list_free(*same_input_transnos);
3258 *same_input_transnos = NIL;
3259 return aggno;
3260 }
3261
3262 /*
3263 * Not identical, but it had the same inputs. If the final function
3264 * permits sharing, return its transno to the caller, in case we can
3265 * re-use its per-trans state. (If there's already sharing going on,
3266 * we might report a transno more than once. find_compatible_pertrans
3267 * is cheap enough that it's not worth spending cycles to avoid that.)
3268 */
3269 if (peragg->shareable)
3270 *same_input_transnos = lappend_int(*same_input_transnos,
3271 peragg->transno);
3272 }
3273
3274 return -1;
3275 }
3276
3277 /*
3278 * find_compatible_pertrans - search for a previously initialized per-Trans
3279 * struct
3280 *
3281 * Searches the list of transnos for a per-Trans struct with the same
3282 * transition function and initial condition. (The inputs have already been
3283 * verified to match.)
3284 */
3285 static int
find_compatible_pertrans(AggState * aggstate,Aggref * newagg,bool shareable,Oid aggtransfn,Oid aggtranstype,Oid aggserialfn,Oid aggdeserialfn,Datum initValue,bool initValueIsNull,List * transnos)3286 find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
3287 Oid aggtransfn, Oid aggtranstype,
3288 Oid aggserialfn, Oid aggdeserialfn,
3289 Datum initValue, bool initValueIsNull,
3290 List *transnos)
3291 {
3292 ListCell *lc;
3293
3294 /* If this aggregate can't share transition states, give up */
3295 if (!shareable)
3296 return -1;
3297
3298 foreach(lc, transnos)
3299 {
3300 int transno = lfirst_int(lc);
3301 AggStatePerTrans pertrans = &aggstate->pertrans[transno];
3302
3303 /*
3304 * if the transfns or transition state types are not the same then the
3305 * state can't be shared.
3306 */
3307 if (aggtransfn != pertrans->transfn_oid ||
3308 aggtranstype != pertrans->aggtranstype)
3309 continue;
3310
3311 /*
3312 * The serialization and deserialization functions must match, if
3313 * present, as we're unable to share the trans state for aggregates
3314 * which will serialize or deserialize into different formats.
3315 * Remember that these will be InvalidOid if they're not required for
3316 * this agg node.
3317 */
3318 if (aggserialfn != pertrans->serialfn_oid ||
3319 aggdeserialfn != pertrans->deserialfn_oid)
3320 continue;
3321
3322 /*
3323 * Check that the initial condition matches, too.
3324 */
3325 if (initValueIsNull && pertrans->initValueIsNull)
3326 return transno;
3327
3328 if (!initValueIsNull && !pertrans->initValueIsNull &&
3329 datumIsEqual(initValue, pertrans->initValue,
3330 pertrans->transtypeByVal, pertrans->transtypeLen))
3331 return transno;
3332 }
3333 return -1;
3334 }
3335
3336 void
ExecEndAgg(AggState * node)3337 ExecEndAgg(AggState *node)
3338 {
3339 PlanState *outerPlan;
3340 int transno;
3341 int numGroupingSets = Max(node->maxsets, 1);
3342 int setno;
3343
3344 /* Make sure we have closed any open tuplesorts */
3345
3346 if (node->sort_in)
3347 tuplesort_end(node->sort_in);
3348 if (node->sort_out)
3349 tuplesort_end(node->sort_out);
3350
3351 for (transno = 0; transno < node->numtrans; transno++)
3352 {
3353 AggStatePerTrans pertrans = &node->pertrans[transno];
3354
3355 for (setno = 0; setno < numGroupingSets; setno++)
3356 {
3357 if (pertrans->sortstates[setno])
3358 tuplesort_end(pertrans->sortstates[setno]);
3359 }
3360 }
3361
3362 /* And ensure any agg shutdown callbacks have been called */
3363 for (setno = 0; setno < numGroupingSets; setno++)
3364 ReScanExprContext(node->aggcontexts[setno]);
3365 if (node->hashcontext)
3366 ReScanExprContext(node->hashcontext);
3367
3368 /*
3369 * We don't actually free any ExprContexts here (see comment in
3370 * ExecFreeExprContext), just unlinking the output one from the plan node
3371 * suffices.
3372 */
3373 ExecFreeExprContext(&node->ss.ps);
3374
3375 /* clean up tuple table */
3376 ExecClearTuple(node->ss.ss_ScanTupleSlot);
3377
3378 outerPlan = outerPlanState(node);
3379 ExecEndNode(outerPlan);
3380 }
3381
3382 void
ExecReScanAgg(AggState * node)3383 ExecReScanAgg(AggState *node)
3384 {
3385 ExprContext *econtext = node->ss.ps.ps_ExprContext;
3386 PlanState *outerPlan = outerPlanState(node);
3387 Agg *aggnode = (Agg *) node->ss.ps.plan;
3388 int transno;
3389 int numGroupingSets = Max(node->maxsets, 1);
3390 int setno;
3391
3392 node->agg_done = false;
3393
3394 if (node->aggstrategy == AGG_HASHED)
3395 {
3396 /*
3397 * In the hashed case, if we haven't yet built the hash table then we
3398 * can just return; nothing done yet, so nothing to undo. If subnode's
3399 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
3400 * else no reason to re-scan it at all.
3401 */
3402 if (!node->table_filled)
3403 return;
3404
3405 /*
3406 * If we do have the hash table, and the subplan does not have any
3407 * parameter changes, and none of our own parameter changes affect
3408 * input expressions of the aggregated functions, then we can just
3409 * rescan the existing hash table; no need to build it again.
3410 */
3411 if (outerPlan->chgParam == NULL &&
3412 !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
3413 {
3414 ResetTupleHashIterator(node->perhash[0].hashtable,
3415 &node->perhash[0].hashiter);
3416 select_current_set(node, 0, true);
3417 return;
3418 }
3419 }
3420
3421 /* Make sure we have closed any open tuplesorts */
3422 for (transno = 0; transno < node->numtrans; transno++)
3423 {
3424 for (setno = 0; setno < numGroupingSets; setno++)
3425 {
3426 AggStatePerTrans pertrans = &node->pertrans[transno];
3427
3428 if (pertrans->sortstates[setno])
3429 {
3430 tuplesort_end(pertrans->sortstates[setno]);
3431 pertrans->sortstates[setno] = NULL;
3432 }
3433 }
3434 }
3435
3436 /*
3437 * We don't need to ReScanExprContext the output tuple context here;
3438 * ExecReScan already did it. But we do need to reset our per-grouping-set
3439 * contexts, which may have transvalues stored in them. (We use rescan
3440 * rather than just reset because transfns may have registered callbacks
3441 * that need to be run now.) For the AGG_HASHED case, see below.
3442 */
3443
3444 for (setno = 0; setno < numGroupingSets; setno++)
3445 {
3446 ReScanExprContext(node->aggcontexts[setno]);
3447 }
3448
3449 /* Release first tuple of group, if we have made a copy */
3450 if (node->grp_firstTuple != NULL)
3451 {
3452 heap_freetuple(node->grp_firstTuple);
3453 node->grp_firstTuple = NULL;
3454 }
3455 ExecClearTuple(node->ss.ss_ScanTupleSlot);
3456
3457 /* Forget current agg values */
3458 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
3459 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);
3460
3461 /*
3462 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
3463 * the hashcontext. This used to be an issue, but now, resetting a context
3464 * automatically deletes sub-contexts too.
3465 */
3466 if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
3467 {
3468 ReScanExprContext(node->hashcontext);
3469 /* Rebuild an empty hash table */
3470 build_hash_table(node);
3471 node->table_filled = false;
3472 /* iterator will be reset when the table is filled */
3473 }
3474
3475 if (node->aggstrategy != AGG_HASHED)
3476 {
3477 /*
3478 * Reset the per-group state (in particular, mark transvalues null)
3479 */
3480 for (setno = 0; setno < numGroupingSets; setno++)
3481 {
3482 MemSet(node->pergroups[setno], 0,
3483 sizeof(AggStatePerGroupData) * node->numaggs);
3484 }
3485
3486 /* reset to phase 1 */
3487 initialize_phase(node, 1);
3488
3489 node->input_done = false;
3490 node->projected_set = -1;
3491 }
3492
3493 if (outerPlan->chgParam == NULL)
3494 ExecReScan(outerPlan);
3495 }
3496
3497
3498 /***********************************************************************
3499 * API exposed to aggregate functions
3500 ***********************************************************************/
3501
3502
3503 /*
3504 * AggCheckCallContext - test if a SQL function is being called as an aggregate
3505 *
3506 * The transition and/or final functions of an aggregate may want to verify
3507 * that they are being called as aggregates, rather than as plain SQL
3508 * functions. They should use this function to do so. The return value
3509 * is nonzero if being called as an aggregate, or zero if not. (Specific
3510 * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
3511 * values could conceivably appear in future.)
3512 *
3513 * If aggcontext isn't NULL, the function also stores at *aggcontext the
3514 * identity of the memory context that aggregate transition values are being
3515 * stored in. Note that the same aggregate call site (flinfo) may be called
3516 * interleaved on different transition values in different contexts, so it's
3517 * not kosher to cache aggcontext under fn_extra. It is, however, kosher to
3518 * cache it in the transvalue itself (for internal-type transvalues).
3519 */
3520 int
AggCheckCallContext(FunctionCallInfo fcinfo,MemoryContext * aggcontext)3521 AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
3522 {
3523 if (fcinfo->context && IsA(fcinfo->context, AggState))
3524 {
3525 if (aggcontext)
3526 {
3527 AggState *aggstate = ((AggState *) fcinfo->context);
3528 ExprContext *cxt = aggstate->curaggcontext;
3529
3530 *aggcontext = cxt->ecxt_per_tuple_memory;
3531 }
3532 return AGG_CONTEXT_AGGREGATE;
3533 }
3534 if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
3535 {
3536 if (aggcontext)
3537 *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
3538 return AGG_CONTEXT_WINDOW;
3539 }
3540
3541 /* this is just to prevent "uninitialized variable" warnings */
3542 if (aggcontext)
3543 *aggcontext = NULL;
3544 return 0;
3545 }
3546
3547 /*
3548 * AggGetAggref - allow an aggregate support function to get its Aggref
3549 *
3550 * If the function is being called as an aggregate support function,
3551 * return the Aggref node for the aggregate call. Otherwise, return NULL.
3552 *
3553 * Aggregates sharing the same inputs and transition functions can get
3554 * merged into a single transition calculation. If the transition function
3555 * calls AggGetAggref, it will get some one of the Aggrefs for which it is
3556 * executing. It must therefore not pay attention to the Aggref fields that
3557 * relate to the final function, as those are indeterminate. But if a final
3558 * function calls AggGetAggref, it will get a precise result.
3559 *
3560 * Note that if an aggregate is being used as a window function, this will
3561 * return NULL. We could provide a similar function to return the relevant
3562 * WindowFunc node in such cases, but it's not needed yet.
3563 */
3564 Aggref *
AggGetAggref(FunctionCallInfo fcinfo)3565 AggGetAggref(FunctionCallInfo fcinfo)
3566 {
3567 if (fcinfo->context && IsA(fcinfo->context, AggState))
3568 {
3569 AggState *aggstate = (AggState *) fcinfo->context;
3570 AggStatePerAgg curperagg;
3571 AggStatePerTrans curpertrans;
3572
3573 /* check curperagg (valid when in a final function) */
3574 curperagg = aggstate->curperagg;
3575
3576 if (curperagg)
3577 return curperagg->aggref;
3578
3579 /* check curpertrans (valid when in a transition function) */
3580 curpertrans = aggstate->curpertrans;
3581
3582 if (curpertrans)
3583 return curpertrans->aggref;
3584 }
3585 return NULL;
3586 }
3587
3588 /*
3589 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
3590 *
3591 * This is useful in agg final functions; the context returned is one that
3592 * the final function can safely reset as desired. This isn't useful for
3593 * transition functions, since the context returned MAY (we don't promise)
3594 * be the same as the context those are called in.
3595 *
3596 * As above, this is currently not useful for aggs called as window functions.
3597 */
3598 MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)3599 AggGetTempMemoryContext(FunctionCallInfo fcinfo)
3600 {
3601 if (fcinfo->context && IsA(fcinfo->context, AggState))
3602 {
3603 AggState *aggstate = (AggState *) fcinfo->context;
3604
3605 return aggstate->tmpcontext->ecxt_per_tuple_memory;
3606 }
3607 return NULL;
3608 }
3609
3610 /*
3611 * AggStateIsShared - find out whether transition state is shared
3612 *
3613 * If the function is being called as an aggregate support function,
3614 * return true if the aggregate's transition state is shared across
3615 * multiple aggregates, false if it is not.
3616 *
3617 * Returns true if not called as an aggregate support function.
3618 * This is intended as a conservative answer, ie "no you'd better not
3619 * scribble on your input". In particular, will return true if the
3620 * aggregate is being used as a window function, which is a scenario
3621 * in which changing the transition state is a bad idea. We might
3622 * want to refine the behavior for the window case in future.
3623 */
3624 bool
AggStateIsShared(FunctionCallInfo fcinfo)3625 AggStateIsShared(FunctionCallInfo fcinfo)
3626 {
3627 if (fcinfo->context && IsA(fcinfo->context, AggState))
3628 {
3629 AggState *aggstate = (AggState *) fcinfo->context;
3630 AggStatePerAgg curperagg;
3631 AggStatePerTrans curpertrans;
3632
3633 /* check curperagg (valid when in a final function) */
3634 curperagg = aggstate->curperagg;
3635
3636 if (curperagg)
3637 return aggstate->pertrans[curperagg->transno].aggshared;
3638
3639 /* check curpertrans (valid when in a transition function) */
3640 curpertrans = aggstate->curpertrans;
3641
3642 if (curpertrans)
3643 return curpertrans->aggshared;
3644 }
3645 return true;
3646 }
3647
3648 /*
3649 * AggRegisterCallback - register a cleanup callback for an aggregate
3650 *
3651 * This is useful for aggs to register shutdown callbacks, which will ensure
3652 * that non-memory resources are freed. The callback will occur just before
3653 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
3654 * either between groups or as a result of rescanning the query. The callback
3655 * will NOT be called on error paths. The typical use-case is for freeing of
3656 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
3657 * created by the agg functions. (The callback will not be called until after
3658 * the result of the finalfn is no longer needed, so it's safe for the finalfn
3659 * to return data that will be freed by the callback.)
3660 *
3661 * As above, this is currently not useful for aggs called as window functions.
3662 */
3663 void
AggRegisterCallback(FunctionCallInfo fcinfo,ExprContextCallbackFunction func,Datum arg)3664 AggRegisterCallback(FunctionCallInfo fcinfo,
3665 ExprContextCallbackFunction func,
3666 Datum arg)
3667 {
3668 if (fcinfo->context && IsA(fcinfo->context, AggState))
3669 {
3670 AggState *aggstate = (AggState *) fcinfo->context;
3671 ExprContext *cxt = aggstate->curaggcontext;
3672
3673 RegisterExprContextCallback(cxt, func, arg);
3674
3675 return;
3676 }
3677 elog(ERROR, "aggregate function cannot register a callback in this context");
3678 }
3679
3680
3681 /*
3682 * aggregate_dummy - dummy execution routine for aggregate functions
3683 *
3684 * This function is listed as the implementation (prosrc field) of pg_proc
3685 * entries for aggregate functions. Its only purpose is to throw an error
3686 * if someone mistakenly executes such a function in the normal way.
3687 *
3688 * Perhaps someday we could assign real meaning to the prosrc field of
3689 * an aggregate?
3690 */
3691 Datum
aggregate_dummy(PG_FUNCTION_ARGS)3692 aggregate_dummy(PG_FUNCTION_ARGS)
3693 {
3694 elog(ERROR, "aggregate function %u called as normal function",
3695 fcinfo->flinfo->fn_oid);
3696 return (Datum) 0; /* keep compiler quiet */
3697 }
3698