1 /*-------------------------------------------------------------------------
2 *
3 * nodeWindowAgg.c
4 * routines to handle WindowAgg nodes.
5 *
6 * A WindowAgg node evaluates "window functions" across suitable partitions
7 * of the input tuple set. Any one WindowAgg works for just a single window
8 * specification, though it can evaluate multiple window functions sharing
9 * identical window specifications. The input tuples are required to be
10 * delivered in sorted order, with the PARTITION BY columns (if any) as
11 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12 * (The planner generates a stack of WindowAggs with intervening Sort nodes
13 * as needed, if a query involves more than one window specification.)
14 *
15 * Since window functions can require access to any or all of the rows in
16 * the current partition, we accumulate rows of the partition into a
17 * tuplestore. The window functions are called using the WindowObject API
18 * so that they can access those rows as needed.
19 *
20 * We also support using plain aggregate functions as window functions.
21 * For these, the regular Agg-node environment is emulated for each partition.
22 * As required by the SQL spec, the output represents the value of the
23 * aggregate function over all rows in the current row's window frame.
24 *
25 *
26 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
27 * Portions Copyright (c) 1994, Regents of the University of California
28 *
29 * IDENTIFICATION
30 * src/backend/executor/nodeWindowAgg.c
31 *
32 *-------------------------------------------------------------------------
33 */
34 #include "postgres.h"
35
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/optimizer.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/expandeddatum.h"
51 #include "utils/lsyscache.h"
52 #include "utils/memutils.h"
53 #include "utils/regproc.h"
54 #include "utils/syscache.h"
55 #include "windowapi.h"
56
57 /*
58 * All the window function APIs are called with this object, which is passed
59 * to window functions as fcinfo->context.
60 */
61 typedef struct WindowObjectData
62 {
63 NodeTag type;
64 WindowAggState *winstate; /* parent WindowAggState */
65 List *argstates; /* ExprState trees for fn's arguments */
66 void *localmem; /* WinGetPartitionLocalMemory's chunk */
67 int markptr; /* tuplestore mark pointer for this fn */
68 int readptr; /* tuplestore read pointer for this fn */
69 int64 markpos; /* row that markptr is positioned on */
70 int64 seekpos; /* row that readptr is positioned on */
71 } WindowObjectData;
72
73 /*
74 * We have one WindowStatePerFunc struct for each window function and
75 * window aggregate handled by this node.
76 */
77 typedef struct WindowStatePerFuncData
78 {
79 /* Links to WindowFunc expr and state nodes this working state is for */
80 WindowFuncExprState *wfuncstate;
81 WindowFunc *wfunc;
82
83 int numArguments; /* number of arguments */
84
85 FmgrInfo flinfo; /* fmgr lookup data for window function */
86
87 Oid winCollation; /* collation derived for window function */
88
89 /*
90 * We need the len and byval info for the result of each function in order
91 * to know how to copy/delete values.
92 */
93 int16 resulttypeLen;
94 bool resulttypeByVal;
95
96 bool plain_agg; /* is it just a plain aggregate function? */
97 int aggno; /* if so, index of its WindowStatePerAggData */
98
99 WindowObject winobj; /* object used in window function API */
100 } WindowStatePerFuncData;
101
102 /*
103 * For plain aggregate window functions, we also have one of these.
104 */
105 typedef struct WindowStatePerAggData
106 {
107 /* Oids of transition functions */
108 Oid transfn_oid;
109 Oid invtransfn_oid; /* may be InvalidOid */
110 Oid finalfn_oid; /* may be InvalidOid */
111
112 /*
113 * fmgr lookup data for transition functions --- only valid when
114 * corresponding oid is not InvalidOid. Note in particular that fn_strict
115 * flags are kept here.
116 */
117 FmgrInfo transfn;
118 FmgrInfo invtransfn;
119 FmgrInfo finalfn;
120
121 int numFinalArgs; /* number of arguments to pass to finalfn */
122
123 /*
124 * initial value from pg_aggregate entry
125 */
126 Datum initValue;
127 bool initValueIsNull;
128
129 /*
130 * cached value for current frame boundaries
131 */
132 Datum resultValue;
133 bool resultValueIsNull;
134
135 /*
136 * We need the len and byval info for the agg's input, result, and
137 * transition data types in order to know how to copy/delete values.
138 */
139 int16 inputtypeLen,
140 resulttypeLen,
141 transtypeLen;
142 bool inputtypeByVal,
143 resulttypeByVal,
144 transtypeByVal;
145
146 int wfuncno; /* index of associated WindowStatePerFuncData */
147
148 /* Context holding transition value and possibly other subsidiary data */
149 MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
150
151 /* Current transition value */
152 Datum transValue; /* current transition value */
153 bool transValueIsNull;
154
155 int64 transValueCount; /* number of currently-aggregated rows */
156
157 /* Data local to eval_windowaggregates() */
158 bool restart; /* need to restart this agg in this cycle? */
159 } WindowStatePerAggData;
160
161 static void initialize_windowaggregate(WindowAggState *winstate,
162 WindowStatePerFunc perfuncstate,
163 WindowStatePerAgg peraggstate);
164 static void advance_windowaggregate(WindowAggState *winstate,
165 WindowStatePerFunc perfuncstate,
166 WindowStatePerAgg peraggstate);
167 static bool advance_windowaggregate_base(WindowAggState *winstate,
168 WindowStatePerFunc perfuncstate,
169 WindowStatePerAgg peraggstate);
170 static void finalize_windowaggregate(WindowAggState *winstate,
171 WindowStatePerFunc perfuncstate,
172 WindowStatePerAgg peraggstate,
173 Datum *result, bool *isnull);
174
175 static void eval_windowaggregates(WindowAggState *winstate);
176 static void eval_windowfunction(WindowAggState *winstate,
177 WindowStatePerFunc perfuncstate,
178 Datum *result, bool *isnull);
179
180 static void begin_partition(WindowAggState *winstate);
181 static void spool_tuples(WindowAggState *winstate, int64 pos);
182 static void release_partition(WindowAggState *winstate);
183
184 static int row_is_in_frame(WindowAggState *winstate, int64 pos,
185 TupleTableSlot *slot);
186 static void update_frameheadpos(WindowAggState *winstate);
187 static void update_frametailpos(WindowAggState *winstate);
188 static void update_grouptailpos(WindowAggState *winstate);
189
190 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
191 WindowFunc *wfunc,
192 WindowStatePerAgg peraggstate);
193 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
194
195 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
196 TupleTableSlot *slot2);
197 static bool window_gettupleslot(WindowObject winobj, int64 pos,
198 TupleTableSlot *slot);
199
200
201 /*
202 * initialize_windowaggregate
203 * parallel to initialize_aggregates in nodeAgg.c
204 */
205 static void
initialize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)206 initialize_windowaggregate(WindowAggState *winstate,
207 WindowStatePerFunc perfuncstate,
208 WindowStatePerAgg peraggstate)
209 {
210 MemoryContext oldContext;
211
212 /*
213 * If we're using a private aggcontext, we may reset it here. But if the
214 * context is shared, we don't know which other aggregates may still need
215 * it, so we must leave it to the caller to reset at an appropriate time.
216 */
217 if (peraggstate->aggcontext != winstate->aggcontext)
218 MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
219
220 if (peraggstate->initValueIsNull)
221 peraggstate->transValue = peraggstate->initValue;
222 else
223 {
224 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
225 peraggstate->transValue = datumCopy(peraggstate->initValue,
226 peraggstate->transtypeByVal,
227 peraggstate->transtypeLen);
228 MemoryContextSwitchTo(oldContext);
229 }
230 peraggstate->transValueIsNull = peraggstate->initValueIsNull;
231 peraggstate->transValueCount = 0;
232 peraggstate->resultValue = (Datum) 0;
233 peraggstate->resultValueIsNull = true;
234 }
235
236 /*
237 * advance_windowaggregate
238 * parallel to advance_aggregates in nodeAgg.c
239 */
240 static void
advance_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)241 advance_windowaggregate(WindowAggState *winstate,
242 WindowStatePerFunc perfuncstate,
243 WindowStatePerAgg peraggstate)
244 {
245 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
246 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
247 int numArguments = perfuncstate->numArguments;
248 Datum newVal;
249 ListCell *arg;
250 int i;
251 MemoryContext oldContext;
252 ExprContext *econtext = winstate->tmpcontext;
253 ExprState *filter = wfuncstate->aggfilter;
254
255 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
256
257 /* Skip anything FILTERed out */
258 if (filter)
259 {
260 bool isnull;
261 Datum res = ExecEvalExpr(filter, econtext, &isnull);
262
263 if (isnull || !DatumGetBool(res))
264 {
265 MemoryContextSwitchTo(oldContext);
266 return;
267 }
268 }
269
270 /* We start from 1, since the 0th arg will be the transition value */
271 i = 1;
272 foreach(arg, wfuncstate->args)
273 {
274 ExprState *argstate = (ExprState *) lfirst(arg);
275
276 fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
277 &fcinfo->args[i].isnull);
278 i++;
279 }
280
281 if (peraggstate->transfn.fn_strict)
282 {
283 /*
284 * For a strict transfn, nothing happens when there's a NULL input; we
285 * just keep the prior transValue. Note transValueCount doesn't
286 * change either.
287 */
288 for (i = 1; i <= numArguments; i++)
289 {
290 if (fcinfo->args[i].isnull)
291 {
292 MemoryContextSwitchTo(oldContext);
293 return;
294 }
295 }
296
297 /*
298 * For strict transition functions with initial value NULL we use the
299 * first non-NULL input as the initial state. (We already checked
300 * that the agg's input type is binary-compatible with its transtype,
301 * so straight copy here is OK.)
302 *
303 * We must copy the datum into aggcontext if it is pass-by-ref. We do
304 * not need to pfree the old transValue, since it's NULL.
305 */
306 if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
307 {
308 MemoryContextSwitchTo(peraggstate->aggcontext);
309 peraggstate->transValue = datumCopy(fcinfo->args[1].value,
310 peraggstate->transtypeByVal,
311 peraggstate->transtypeLen);
312 peraggstate->transValueIsNull = false;
313 peraggstate->transValueCount = 1;
314 MemoryContextSwitchTo(oldContext);
315 return;
316 }
317
318 if (peraggstate->transValueIsNull)
319 {
320 /*
321 * Don't call a strict function with NULL inputs. Note it is
322 * possible to get here despite the above tests, if the transfn is
323 * strict *and* returned a NULL on a prior cycle. If that happens
324 * we will propagate the NULL all the way to the end. That can
325 * only happen if there's no inverse transition function, though,
326 * since we disallow transitions back to NULL when there is one.
327 */
328 MemoryContextSwitchTo(oldContext);
329 Assert(!OidIsValid(peraggstate->invtransfn_oid));
330 return;
331 }
332 }
333
334 /*
335 * OK to call the transition function. Set winstate->curaggcontext while
336 * calling it, for possible use by AggCheckCallContext.
337 */
338 InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
339 numArguments + 1,
340 perfuncstate->winCollation,
341 (void *) winstate, NULL);
342 fcinfo->args[0].value = peraggstate->transValue;
343 fcinfo->args[0].isnull = peraggstate->transValueIsNull;
344 winstate->curaggcontext = peraggstate->aggcontext;
345 newVal = FunctionCallInvoke(fcinfo);
346 winstate->curaggcontext = NULL;
347
348 /*
349 * Moving-aggregate transition functions must not return null, see
350 * advance_windowaggregate_base().
351 */
352 if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
353 ereport(ERROR,
354 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
355 errmsg("moving-aggregate transition function must not return null")));
356
357 /*
358 * We must track the number of rows included in transValue, since to
359 * remove the last input, advance_windowaggregate_base() mustn't call the
360 * inverse transition function, but simply reset transValue back to its
361 * initial value.
362 */
363 peraggstate->transValueCount++;
364
365 /*
366 * If pass-by-ref datatype, must copy the new value into aggcontext and
367 * free the prior transValue. But if transfn returned a pointer to its
368 * first input, we don't need to do anything. Also, if transfn returned a
369 * pointer to a R/W expanded object that is already a child of the
370 * aggcontext, assume we can adopt that value without copying it.
371 */
372 if (!peraggstate->transtypeByVal &&
373 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
374 {
375 if (!fcinfo->isnull)
376 {
377 MemoryContextSwitchTo(peraggstate->aggcontext);
378 if (DatumIsReadWriteExpandedObject(newVal,
379 false,
380 peraggstate->transtypeLen) &&
381 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
382 /* do nothing */ ;
383 else
384 newVal = datumCopy(newVal,
385 peraggstate->transtypeByVal,
386 peraggstate->transtypeLen);
387 }
388 if (!peraggstate->transValueIsNull)
389 {
390 if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
391 false,
392 peraggstate->transtypeLen))
393 DeleteExpandedObject(peraggstate->transValue);
394 else
395 pfree(DatumGetPointer(peraggstate->transValue));
396 }
397 }
398
399 MemoryContextSwitchTo(oldContext);
400 peraggstate->transValue = newVal;
401 peraggstate->transValueIsNull = fcinfo->isnull;
402 }
403
404 /*
405 * advance_windowaggregate_base
406 * Remove the oldest tuple from an aggregation.
407 *
408 * This is very much like advance_windowaggregate, except that we will call
409 * the inverse transition function (which caller must have checked is
410 * available).
411 *
412 * Returns true if we successfully removed the current row from this
413 * aggregate, false if not (in the latter case, caller is responsible
414 * for cleaning up by restarting the aggregation).
415 */
416 static bool
advance_windowaggregate_base(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)417 advance_windowaggregate_base(WindowAggState *winstate,
418 WindowStatePerFunc perfuncstate,
419 WindowStatePerAgg peraggstate)
420 {
421 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
422 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
423 int numArguments = perfuncstate->numArguments;
424 Datum newVal;
425 ListCell *arg;
426 int i;
427 MemoryContext oldContext;
428 ExprContext *econtext = winstate->tmpcontext;
429 ExprState *filter = wfuncstate->aggfilter;
430
431 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
432
433 /* Skip anything FILTERed out */
434 if (filter)
435 {
436 bool isnull;
437 Datum res = ExecEvalExpr(filter, econtext, &isnull);
438
439 if (isnull || !DatumGetBool(res))
440 {
441 MemoryContextSwitchTo(oldContext);
442 return true;
443 }
444 }
445
446 /* We start from 1, since the 0th arg will be the transition value */
447 i = 1;
448 foreach(arg, wfuncstate->args)
449 {
450 ExprState *argstate = (ExprState *) lfirst(arg);
451
452 fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
453 &fcinfo->args[i].isnull);
454 i++;
455 }
456
457 if (peraggstate->invtransfn.fn_strict)
458 {
459 /*
460 * For a strict (inv)transfn, nothing happens when there's a NULL
461 * input; we just keep the prior transValue. Note transValueCount
462 * doesn't change either.
463 */
464 for (i = 1; i <= numArguments; i++)
465 {
466 if (fcinfo->args[i].isnull)
467 {
468 MemoryContextSwitchTo(oldContext);
469 return true;
470 }
471 }
472 }
473
474 /* There should still be an added but not yet removed value */
475 Assert(peraggstate->transValueCount > 0);
476
477 /*
478 * In moving-aggregate mode, the state must never be NULL, except possibly
479 * before any rows have been aggregated (which is surely not the case at
480 * this point). This restriction allows us to interpret a NULL result
481 * from the inverse function as meaning "sorry, can't do an inverse
482 * transition in this case". We already checked this in
483 * advance_windowaggregate, but just for safety, check again.
484 */
485 if (peraggstate->transValueIsNull)
486 elog(ERROR, "aggregate transition value is NULL before inverse transition");
487
488 /*
489 * We mustn't use the inverse transition function to remove the last
490 * input. Doing so would yield a non-NULL state, whereas we should be in
491 * the initial state afterwards which may very well be NULL. So instead,
492 * we simply re-initialize the aggregate in this case.
493 */
494 if (peraggstate->transValueCount == 1)
495 {
496 MemoryContextSwitchTo(oldContext);
497 initialize_windowaggregate(winstate,
498 &winstate->perfunc[peraggstate->wfuncno],
499 peraggstate);
500 return true;
501 }
502
503 /*
504 * OK to call the inverse transition function. Set
505 * winstate->curaggcontext while calling it, for possible use by
506 * AggCheckCallContext.
507 */
508 InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
509 numArguments + 1,
510 perfuncstate->winCollation,
511 (void *) winstate, NULL);
512 fcinfo->args[0].value = peraggstate->transValue;
513 fcinfo->args[0].isnull = peraggstate->transValueIsNull;
514 winstate->curaggcontext = peraggstate->aggcontext;
515 newVal = FunctionCallInvoke(fcinfo);
516 winstate->curaggcontext = NULL;
517
518 /*
519 * If the function returns NULL, report failure, forcing a restart.
520 */
521 if (fcinfo->isnull)
522 {
523 MemoryContextSwitchTo(oldContext);
524 return false;
525 }
526
527 /* Update number of rows included in transValue */
528 peraggstate->transValueCount--;
529
530 /*
531 * If pass-by-ref datatype, must copy the new value into aggcontext and
532 * free the prior transValue. But if invtransfn returned a pointer to its
533 * first input, we don't need to do anything. Also, if invtransfn
534 * returned a pointer to a R/W expanded object that is already a child of
535 * the aggcontext, assume we can adopt that value without copying it.
536 *
537 * Note: the checks for null values here will never fire, but it seems
538 * best to have this stanza look just like advance_windowaggregate.
539 */
540 if (!peraggstate->transtypeByVal &&
541 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
542 {
543 if (!fcinfo->isnull)
544 {
545 MemoryContextSwitchTo(peraggstate->aggcontext);
546 if (DatumIsReadWriteExpandedObject(newVal,
547 false,
548 peraggstate->transtypeLen) &&
549 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
550 /* do nothing */ ;
551 else
552 newVal = datumCopy(newVal,
553 peraggstate->transtypeByVal,
554 peraggstate->transtypeLen);
555 }
556 if (!peraggstate->transValueIsNull)
557 {
558 if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
559 false,
560 peraggstate->transtypeLen))
561 DeleteExpandedObject(peraggstate->transValue);
562 else
563 pfree(DatumGetPointer(peraggstate->transValue));
564 }
565 }
566
567 MemoryContextSwitchTo(oldContext);
568 peraggstate->transValue = newVal;
569 peraggstate->transValueIsNull = fcinfo->isnull;
570
571 return true;
572 }
573
574 /*
575 * finalize_windowaggregate
576 * parallel to finalize_aggregate in nodeAgg.c
577 */
578 static void
finalize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate,Datum * result,bool * isnull)579 finalize_windowaggregate(WindowAggState *winstate,
580 WindowStatePerFunc perfuncstate,
581 WindowStatePerAgg peraggstate,
582 Datum *result, bool *isnull)
583 {
584 MemoryContext oldContext;
585
586 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
587
588 /*
589 * Apply the agg's finalfn if one is provided, else return transValue.
590 */
591 if (OidIsValid(peraggstate->finalfn_oid))
592 {
593 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
594 int numFinalArgs = peraggstate->numFinalArgs;
595 bool anynull;
596 int i;
597
598 InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn),
599 numFinalArgs,
600 perfuncstate->winCollation,
601 (void *) winstate, NULL);
602 fcinfo->args[0].value =
603 MakeExpandedObjectReadOnly(peraggstate->transValue,
604 peraggstate->transValueIsNull,
605 peraggstate->transtypeLen);
606 fcinfo->args[0].isnull = peraggstate->transValueIsNull;
607 anynull = peraggstate->transValueIsNull;
608
609 /* Fill any remaining argument positions with nulls */
610 for (i = 1; i < numFinalArgs; i++)
611 {
612 fcinfo->args[i].value = (Datum) 0;
613 fcinfo->args[i].isnull = true;
614 anynull = true;
615 }
616
617 if (fcinfo->flinfo->fn_strict && anynull)
618 {
619 /* don't call a strict function with NULL inputs */
620 *result = (Datum) 0;
621 *isnull = true;
622 }
623 else
624 {
625 winstate->curaggcontext = peraggstate->aggcontext;
626 *result = FunctionCallInvoke(fcinfo);
627 winstate->curaggcontext = NULL;
628 *isnull = fcinfo->isnull;
629 }
630 }
631 else
632 {
633 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
634 *result = peraggstate->transValue;
635 *isnull = peraggstate->transValueIsNull;
636 }
637
638 /*
639 * If result is pass-by-ref, make sure it is in the right context.
640 */
641 if (!peraggstate->resulttypeByVal && !*isnull &&
642 !MemoryContextContains(CurrentMemoryContext,
643 DatumGetPointer(*result)))
644 *result = datumCopy(*result,
645 peraggstate->resulttypeByVal,
646 peraggstate->resulttypeLen);
647 MemoryContextSwitchTo(oldContext);
648 }
649
650 /*
651 * eval_windowaggregates
652 * evaluate plain aggregates being used as window functions
653 *
654 * This differs from nodeAgg.c in two ways. First, if the window's frame
655 * start position moves, we use the inverse transition function (if it exists)
656 * to remove rows from the transition value. And second, we expect to be
657 * able to call aggregate final functions repeatedly after aggregating more
658 * data onto the same transition value. This is not a behavior required by
659 * nodeAgg.c.
660 */
661 static void
eval_windowaggregates(WindowAggState * winstate)662 eval_windowaggregates(WindowAggState *winstate)
663 {
664 WindowStatePerAgg peraggstate;
665 int wfuncno,
666 numaggs,
667 numaggs_restart,
668 i;
669 int64 aggregatedupto_nonrestarted;
670 MemoryContext oldContext;
671 ExprContext *econtext;
672 WindowObject agg_winobj;
673 TupleTableSlot *agg_row_slot;
674 TupleTableSlot *temp_slot;
675
676 numaggs = winstate->numaggs;
677 if (numaggs == 0)
678 return; /* nothing to do */
679
680 /* final output execution is in ps_ExprContext */
681 econtext = winstate->ss.ps.ps_ExprContext;
682 agg_winobj = winstate->agg_winobj;
683 agg_row_slot = winstate->agg_row_slot;
684 temp_slot = winstate->temp_slot_1;
685
686 /*
687 * If the window's frame start clause is UNBOUNDED_PRECEDING and no
688 * exclusion clause is specified, then the window frame consists of a
689 * contiguous group of rows extending forward from the start of the
690 * partition, and rows only enter the frame, never exit it, as the current
691 * row advances forward. This makes it possible to use an incremental
692 * strategy for evaluating aggregates: we run the transition function for
693 * each row added to the frame, and run the final function whenever we
694 * need the current aggregate value. This is considerably more efficient
695 * than the naive approach of re-running the entire aggregate calculation
696 * for each current row. It does assume that the final function doesn't
697 * damage the running transition value, but we have the same assumption in
698 * nodeAgg.c too (when it rescans an existing hash table).
699 *
700 * If the frame start does sometimes move, we can still optimize as above
701 * whenever successive rows share the same frame head, but if the frame
702 * head moves beyond the previous head we try to remove those rows using
703 * the aggregate's inverse transition function. This function restores
704 * the aggregate's current state to what it would be if the removed row
705 * had never been aggregated in the first place. Inverse transition
706 * functions may optionally return NULL, indicating that the function was
707 * unable to remove the tuple from aggregation. If this happens, or if
708 * the aggregate doesn't have an inverse transition function at all, we
709 * must perform the aggregation all over again for all tuples within the
710 * new frame boundaries.
711 *
712 * If there's any exclusion clause, then we may have to aggregate over a
713 * non-contiguous set of rows, so we punt and recalculate for every row.
714 * (For some frame end choices, it might be that the frame is always
715 * contiguous anyway, but that's an optimization to investigate later.)
716 *
717 * In many common cases, multiple rows share the same frame and hence the
718 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
719 * window, then all rows are peers and so they all have window frame equal
720 * to the whole partition.) We optimize such cases by calculating the
721 * aggregate value once when we reach the first row of a peer group, and
722 * then returning the saved value for all subsequent rows.
723 *
724 * 'aggregatedupto' keeps track of the first row that has not yet been
725 * accumulated into the aggregate transition values. Whenever we start a
726 * new peer group, we accumulate forward to the end of the peer group.
727 */
728
729 /*
730 * First, update the frame head position.
731 *
732 * The frame head should never move backwards, and the code below wouldn't
733 * cope if it did, so for safety we complain if it does.
734 */
735 update_frameheadpos(winstate);
736 if (winstate->frameheadpos < winstate->aggregatedbase)
737 elog(ERROR, "window frame head moved backward");
738
739 /*
740 * If the frame didn't change compared to the previous row, we can re-use
741 * the result values that were previously saved at the bottom of this
742 * function. Since we don't know the current frame's end yet, this is not
743 * possible to check for fully. But if the frame end mode is UNBOUNDED
744 * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the
745 * current row lies within the previous row's frame, then the two frames'
746 * ends must coincide. Note that on the first row aggregatedbase ==
747 * aggregatedupto, meaning this test must fail, so we don't need to check
748 * the "there was no previous row" case explicitly here.
749 */
750 if (winstate->aggregatedbase == winstate->frameheadpos &&
751 (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
752 FRAMEOPTION_END_CURRENT_ROW)) &&
753 !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
754 winstate->aggregatedbase <= winstate->currentpos &&
755 winstate->aggregatedupto > winstate->currentpos)
756 {
757 for (i = 0; i < numaggs; i++)
758 {
759 peraggstate = &winstate->peragg[i];
760 wfuncno = peraggstate->wfuncno;
761 econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
762 econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
763 }
764 return;
765 }
766
767 /*----------
768 * Initialize restart flags.
769 *
770 * We restart the aggregation:
771 * - if we're processing the first row in the partition, or
772 * - if the frame's head moved and we cannot use an inverse
773 * transition function, or
774 * - we have an EXCLUSION clause, or
775 * - if the new frame doesn't overlap the old one
776 *
777 * Note that we don't strictly need to restart in the last case, but if
778 * we're going to remove all rows from the aggregation anyway, a restart
779 * surely is faster.
780 *----------
781 */
782 numaggs_restart = 0;
783 for (i = 0; i < numaggs; i++)
784 {
785 peraggstate = &winstate->peragg[i];
786 if (winstate->currentpos == 0 ||
787 (winstate->aggregatedbase != winstate->frameheadpos &&
788 !OidIsValid(peraggstate->invtransfn_oid)) ||
789 (winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
790 winstate->aggregatedupto <= winstate->frameheadpos)
791 {
792 peraggstate->restart = true;
793 numaggs_restart++;
794 }
795 else
796 peraggstate->restart = false;
797 }
798
799 /*
800 * If we have any possibly-moving aggregates, attempt to advance
801 * aggregatedbase to match the frame's head by removing input rows that
802 * fell off the top of the frame from the aggregations. This can fail,
803 * i.e. advance_windowaggregate_base() can return false, in which case
804 * we'll restart that aggregate below.
805 */
806 while (numaggs_restart < numaggs &&
807 winstate->aggregatedbase < winstate->frameheadpos)
808 {
809 /*
810 * Fetch the next tuple of those being removed. This should never fail
811 * as we should have been here before.
812 */
813 if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
814 temp_slot))
815 elog(ERROR, "could not re-fetch previously fetched frame row");
816
817 /* Set tuple context for evaluation of aggregate arguments */
818 winstate->tmpcontext->ecxt_outertuple = temp_slot;
819
820 /*
821 * Perform the inverse transition for each aggregate function in the
822 * window, unless it has already been marked as needing a restart.
823 */
824 for (i = 0; i < numaggs; i++)
825 {
826 bool ok;
827
828 peraggstate = &winstate->peragg[i];
829 if (peraggstate->restart)
830 continue;
831
832 wfuncno = peraggstate->wfuncno;
833 ok = advance_windowaggregate_base(winstate,
834 &winstate->perfunc[wfuncno],
835 peraggstate);
836 if (!ok)
837 {
838 /* Inverse transition function has failed, must restart */
839 peraggstate->restart = true;
840 numaggs_restart++;
841 }
842 }
843
844 /* Reset per-input-tuple context after each tuple */
845 ResetExprContext(winstate->tmpcontext);
846
847 /* And advance the aggregated-row state */
848 winstate->aggregatedbase++;
849 ExecClearTuple(temp_slot);
850 }
851
852 /*
853 * If we successfully advanced the base rows of all the aggregates,
854 * aggregatedbase now equals frameheadpos; but if we failed for any, we
855 * must forcibly update aggregatedbase.
856 */
857 winstate->aggregatedbase = winstate->frameheadpos;
858
859 /*
860 * If we created a mark pointer for aggregates, keep it pushed up to frame
861 * head, so that tuplestore can discard unnecessary rows.
862 */
863 if (agg_winobj->markptr >= 0)
864 WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
865
866 /*
867 * Now restart the aggregates that require it.
868 *
869 * We assume that aggregates using the shared context always restart if
870 * *any* aggregate restarts, and we may thus clean up the shared
871 * aggcontext if that is the case. Private aggcontexts are reset by
872 * initialize_windowaggregate() if their owning aggregate restarts. If we
873 * aren't restarting an aggregate, we need to free any previously saved
874 * result for it, else we'll leak memory.
875 */
876 if (numaggs_restart > 0)
877 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
878 for (i = 0; i < numaggs; i++)
879 {
880 peraggstate = &winstate->peragg[i];
881
882 /* Aggregates using the shared ctx must restart if *any* agg does */
883 Assert(peraggstate->aggcontext != winstate->aggcontext ||
884 numaggs_restart == 0 ||
885 peraggstate->restart);
886
887 if (peraggstate->restart)
888 {
889 wfuncno = peraggstate->wfuncno;
890 initialize_windowaggregate(winstate,
891 &winstate->perfunc[wfuncno],
892 peraggstate);
893 }
894 else if (!peraggstate->resultValueIsNull)
895 {
896 if (!peraggstate->resulttypeByVal)
897 pfree(DatumGetPointer(peraggstate->resultValue));
898 peraggstate->resultValue = (Datum) 0;
899 peraggstate->resultValueIsNull = true;
900 }
901 }
902
903 /*
904 * Non-restarted aggregates now contain the rows between aggregatedbase
905 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
906 * contain no rows. If there are any restarted aggregates, we must thus
907 * begin aggregating anew at frameheadpos, otherwise we may simply
908 * continue at aggregatedupto. We must remember the old value of
909 * aggregatedupto to know how long to skip advancing non-restarted
910 * aggregates. If we modify aggregatedupto, we must also clear
911 * agg_row_slot, per the loop invariant below.
912 */
913 aggregatedupto_nonrestarted = winstate->aggregatedupto;
914 if (numaggs_restart > 0 &&
915 winstate->aggregatedupto != winstate->frameheadpos)
916 {
917 winstate->aggregatedupto = winstate->frameheadpos;
918 ExecClearTuple(agg_row_slot);
919 }
920
921 /*
922 * Advance until we reach a row not in frame (or end of partition).
923 *
924 * Note the loop invariant: agg_row_slot is either empty or holds the row
925 * at position aggregatedupto. We advance aggregatedupto after processing
926 * a row.
927 */
928 for (;;)
929 {
930 int ret;
931
932 /* Fetch next row if we didn't already */
933 if (TupIsNull(agg_row_slot))
934 {
935 if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
936 agg_row_slot))
937 break; /* must be end of partition */
938 }
939
940 /*
941 * Exit loop if no more rows can be in frame. Skip aggregation if
942 * current row is not in frame but there might be more in the frame.
943 */
944 ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
945 if (ret < 0)
946 break;
947 if (ret == 0)
948 goto next_tuple;
949
950 /* Set tuple context for evaluation of aggregate arguments */
951 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
952
953 /* Accumulate row into the aggregates */
954 for (i = 0; i < numaggs; i++)
955 {
956 peraggstate = &winstate->peragg[i];
957
958 /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
959 if (!peraggstate->restart &&
960 winstate->aggregatedupto < aggregatedupto_nonrestarted)
961 continue;
962
963 wfuncno = peraggstate->wfuncno;
964 advance_windowaggregate(winstate,
965 &winstate->perfunc[wfuncno],
966 peraggstate);
967 }
968
969 next_tuple:
970 /* Reset per-input-tuple context after each tuple */
971 ResetExprContext(winstate->tmpcontext);
972
973 /* And advance the aggregated-row state */
974 winstate->aggregatedupto++;
975 ExecClearTuple(agg_row_slot);
976 }
977
978 /* The frame's end is not supposed to move backwards, ever */
979 Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
980
981 /*
982 * finalize aggregates and fill result/isnull fields.
983 */
984 for (i = 0; i < numaggs; i++)
985 {
986 Datum *result;
987 bool *isnull;
988
989 peraggstate = &winstate->peragg[i];
990 wfuncno = peraggstate->wfuncno;
991 result = &econtext->ecxt_aggvalues[wfuncno];
992 isnull = &econtext->ecxt_aggnulls[wfuncno];
993 finalize_windowaggregate(winstate,
994 &winstate->perfunc[wfuncno],
995 peraggstate,
996 result, isnull);
997
998 /*
999 * save the result in case next row shares the same frame.
1000 *
1001 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
1002 * advance that the next row can't possibly share the same frame. Is
1003 * it worth detecting that and skipping this code?
1004 */
1005 if (!peraggstate->resulttypeByVal && !*isnull)
1006 {
1007 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
1008 peraggstate->resultValue =
1009 datumCopy(*result,
1010 peraggstate->resulttypeByVal,
1011 peraggstate->resulttypeLen);
1012 MemoryContextSwitchTo(oldContext);
1013 }
1014 else
1015 {
1016 peraggstate->resultValue = *result;
1017 }
1018 peraggstate->resultValueIsNull = *isnull;
1019 }
1020 }
1021
1022 /*
1023 * eval_windowfunction
1024 *
1025 * Arguments of window functions are not evaluated here, because a window
1026 * function can need random access to arbitrary rows in the partition.
1027 * The window function uses the special WinGetFuncArgInPartition and
1028 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1029 * it wants.
1030 */
1031 static void
eval_windowfunction(WindowAggState * winstate,WindowStatePerFunc perfuncstate,Datum * result,bool * isnull)1032 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1033 Datum *result, bool *isnull)
1034 {
1035 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1036 MemoryContext oldContext;
1037
1038 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1039
1040 /*
1041 * We don't pass any normal arguments to a window function, but we do pass
1042 * it the number of arguments, in order to permit window function
1043 * implementations to support varying numbers of arguments. The real info
1044 * goes through the WindowObject, which is passed via fcinfo->context.
1045 */
1046 InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo),
1047 perfuncstate->numArguments,
1048 perfuncstate->winCollation,
1049 (void *) perfuncstate->winobj, NULL);
1050 /* Just in case, make all the regular argument slots be null */
1051 for (int argno = 0; argno < perfuncstate->numArguments; argno++)
1052 fcinfo->args[argno].isnull = true;
1053 /* Window functions don't have a current aggregate context, either */
1054 winstate->curaggcontext = NULL;
1055
1056 *result = FunctionCallInvoke(fcinfo);
1057 *isnull = fcinfo->isnull;
1058
1059 /*
1060 * Make sure pass-by-ref data is allocated in the appropriate context. (We
1061 * need this in case the function returns a pointer into some short-lived
1062 * tuple, as is entirely possible.)
1063 */
1064 if (!perfuncstate->resulttypeByVal && !fcinfo->isnull &&
1065 !MemoryContextContains(CurrentMemoryContext,
1066 DatumGetPointer(*result)))
1067 *result = datumCopy(*result,
1068 perfuncstate->resulttypeByVal,
1069 perfuncstate->resulttypeLen);
1070
1071 MemoryContextSwitchTo(oldContext);
1072 }
1073
1074 /*
1075 * begin_partition
1076 * Start buffering rows of the next partition.
1077 */
1078 static void
begin_partition(WindowAggState * winstate)1079 begin_partition(WindowAggState *winstate)
1080 {
1081 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1082 PlanState *outerPlan = outerPlanState(winstate);
1083 int frameOptions = winstate->frameOptions;
1084 int numfuncs = winstate->numfuncs;
1085 int i;
1086
1087 winstate->partition_spooled = false;
1088 winstate->framehead_valid = false;
1089 winstate->frametail_valid = false;
1090 winstate->grouptail_valid = false;
1091 winstate->spooled_rows = 0;
1092 winstate->currentpos = 0;
1093 winstate->frameheadpos = 0;
1094 winstate->frametailpos = 0;
1095 winstate->currentgroup = 0;
1096 winstate->frameheadgroup = 0;
1097 winstate->frametailgroup = 0;
1098 winstate->groupheadpos = 0;
1099 winstate->grouptailpos = -1; /* see update_grouptailpos */
1100 ExecClearTuple(winstate->agg_row_slot);
1101 if (winstate->framehead_slot)
1102 ExecClearTuple(winstate->framehead_slot);
1103 if (winstate->frametail_slot)
1104 ExecClearTuple(winstate->frametail_slot);
1105
1106 /*
1107 * If this is the very first partition, we need to fetch the first input
1108 * row to store in first_part_slot.
1109 */
1110 if (TupIsNull(winstate->first_part_slot))
1111 {
1112 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1113
1114 if (!TupIsNull(outerslot))
1115 ExecCopySlot(winstate->first_part_slot, outerslot);
1116 else
1117 {
1118 /* outer plan is empty, so we have nothing to do */
1119 winstate->partition_spooled = true;
1120 winstate->more_partitions = false;
1121 return;
1122 }
1123 }
1124
1125 /* Create new tuplestore for this partition */
1126 winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1127
1128 /*
1129 * Set up read pointers for the tuplestore. The current pointer doesn't
1130 * need BACKWARD capability, but the per-window-function read pointers do,
1131 * and the aggregate pointer does if we might need to restart aggregation.
1132 */
1133 winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1134
1135 /* reset default REWIND capability bit for current ptr */
1136 tuplestore_set_eflags(winstate->buffer, 0);
1137
1138 /* create read pointers for aggregates, if needed */
1139 if (winstate->numaggs > 0)
1140 {
1141 WindowObject agg_winobj = winstate->agg_winobj;
1142 int readptr_flags = 0;
1143
1144 /*
1145 * If the frame head is potentially movable, or we have an EXCLUSION
1146 * clause, we might need to restart aggregation ...
1147 */
1148 if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) ||
1149 (frameOptions & FRAMEOPTION_EXCLUSION))
1150 {
1151 /* ... so create a mark pointer to track the frame head */
1152 agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1153 /* and the read pointer will need BACKWARD capability */
1154 readptr_flags |= EXEC_FLAG_BACKWARD;
1155 }
1156
1157 agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1158 readptr_flags);
1159 agg_winobj->markpos = -1;
1160 agg_winobj->seekpos = -1;
1161
1162 /* Also reset the row counters for aggregates */
1163 winstate->aggregatedbase = 0;
1164 winstate->aggregatedupto = 0;
1165 }
1166
1167 /* create mark and read pointers for each real window function */
1168 for (i = 0; i < numfuncs; i++)
1169 {
1170 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1171
1172 if (!perfuncstate->plain_agg)
1173 {
1174 WindowObject winobj = perfuncstate->winobj;
1175
1176 winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1177 0);
1178 winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1179 EXEC_FLAG_BACKWARD);
1180 winobj->markpos = -1;
1181 winobj->seekpos = -1;
1182 }
1183 }
1184
1185 /*
1186 * If we are in RANGE or GROUPS mode, then determining frame boundaries
1187 * requires physical access to the frame endpoint rows, except in certain
1188 * degenerate cases. We create read pointers to point to those rows, to
1189 * simplify access and ensure that the tuplestore doesn't discard the
1190 * endpoint rows prematurely. (Must create pointers in exactly the same
1191 * cases that update_frameheadpos and update_frametailpos need them.)
1192 */
1193 winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */
1194
1195 if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1196 {
1197 if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
1198 node->ordNumCols != 0) ||
1199 (frameOptions & FRAMEOPTION_START_OFFSET))
1200 winstate->framehead_ptr =
1201 tuplestore_alloc_read_pointer(winstate->buffer, 0);
1202 if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
1203 node->ordNumCols != 0) ||
1204 (frameOptions & FRAMEOPTION_END_OFFSET))
1205 winstate->frametail_ptr =
1206 tuplestore_alloc_read_pointer(winstate->buffer, 0);
1207 }
1208
1209 /*
1210 * If we have an exclusion clause that requires knowing the boundaries of
1211 * the current row's peer group, we create a read pointer to track the
1212 * tail position of the peer group (i.e., first row of the next peer
1213 * group). The head position does not require its own pointer because we
1214 * maintain that as a side effect of advancing the current row.
1215 */
1216 winstate->grouptail_ptr = -1;
1217
1218 if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP |
1219 FRAMEOPTION_EXCLUDE_TIES)) &&
1220 node->ordNumCols != 0)
1221 {
1222 winstate->grouptail_ptr =
1223 tuplestore_alloc_read_pointer(winstate->buffer, 0);
1224 }
1225
1226 /*
1227 * Store the first tuple into the tuplestore (it's always available now;
1228 * we either read it above, or saved it at the end of previous partition)
1229 */
1230 tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1231 winstate->spooled_rows++;
1232 }
1233
1234 /*
1235 * Read tuples from the outer node, up to and including position 'pos', and
1236 * store them into the tuplestore. If pos is -1, reads the whole partition.
1237 */
1238 static void
spool_tuples(WindowAggState * winstate,int64 pos)1239 spool_tuples(WindowAggState *winstate, int64 pos)
1240 {
1241 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1242 PlanState *outerPlan;
1243 TupleTableSlot *outerslot;
1244 MemoryContext oldcontext;
1245
1246 if (!winstate->buffer)
1247 return; /* just a safety check */
1248 if (winstate->partition_spooled)
1249 return; /* whole partition done already */
1250
1251 /*
1252 * If the tuplestore has spilled to disk, alternate reading and writing
1253 * becomes quite expensive due to frequent buffer flushes. It's cheaper
1254 * to force the entire partition to get spooled in one go.
1255 *
1256 * XXX this is a horrid kluge --- it'd be better to fix the performance
1257 * problem inside tuplestore. FIXME
1258 */
1259 if (!tuplestore_in_memory(winstate->buffer))
1260 pos = -1;
1261
1262 outerPlan = outerPlanState(winstate);
1263
1264 /* Must be in query context to call outerplan */
1265 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1266
1267 while (winstate->spooled_rows <= pos || pos == -1)
1268 {
1269 outerslot = ExecProcNode(outerPlan);
1270 if (TupIsNull(outerslot))
1271 {
1272 /* reached the end of the last partition */
1273 winstate->partition_spooled = true;
1274 winstate->more_partitions = false;
1275 break;
1276 }
1277
1278 if (node->partNumCols > 0)
1279 {
1280 ExprContext *econtext = winstate->tmpcontext;
1281
1282 econtext->ecxt_innertuple = winstate->first_part_slot;
1283 econtext->ecxt_outertuple = outerslot;
1284
1285 /* Check if this tuple still belongs to the current partition */
1286 if (!ExecQualAndReset(winstate->partEqfunction, econtext))
1287 {
1288 /*
1289 * end of partition; copy the tuple for the next cycle.
1290 */
1291 ExecCopySlot(winstate->first_part_slot, outerslot);
1292 winstate->partition_spooled = true;
1293 winstate->more_partitions = true;
1294 break;
1295 }
1296 }
1297
1298 /* Still in partition, so save it into the tuplestore */
1299 tuplestore_puttupleslot(winstate->buffer, outerslot);
1300 winstate->spooled_rows++;
1301 }
1302
1303 MemoryContextSwitchTo(oldcontext);
1304 }
1305
1306 /*
1307 * release_partition
1308 * clear information kept within a partition, including
1309 * tuplestore and aggregate results.
1310 */
1311 static void
release_partition(WindowAggState * winstate)1312 release_partition(WindowAggState *winstate)
1313 {
1314 int i;
1315
1316 for (i = 0; i < winstate->numfuncs; i++)
1317 {
1318 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1319
1320 /* Release any partition-local state of this window function */
1321 if (perfuncstate->winobj)
1322 perfuncstate->winobj->localmem = NULL;
1323 }
1324
1325 /*
1326 * Release all partition-local memory (in particular, any partition-local
1327 * state that we might have trashed our pointers to in the above loop, and
1328 * any aggregate temp data). We don't rely on retail pfree because some
1329 * aggregates might have allocated data we don't have direct pointers to.
1330 */
1331 MemoryContextResetAndDeleteChildren(winstate->partcontext);
1332 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
1333 for (i = 0; i < winstate->numaggs; i++)
1334 {
1335 if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1336 MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
1337 }
1338
1339 if (winstate->buffer)
1340 tuplestore_end(winstate->buffer);
1341 winstate->buffer = NULL;
1342 winstate->partition_spooled = false;
1343 }
1344
1345 /*
1346 * row_is_in_frame
1347 * Determine whether a row is in the current row's window frame according
1348 * to our window framing rule
1349 *
1350 * The caller must have already determined that the row is in the partition
1351 * and fetched it into a slot. This function just encapsulates the framing
1352 * rules.
1353 *
1354 * Returns:
1355 * -1, if the row is out of frame and no succeeding rows can be in frame
1356 * 0, if the row is out of frame but succeeding rows might be in frame
1357 * 1, if the row is in frame
1358 *
1359 * May clobber winstate->temp_slot_2.
1360 */
1361 static int
row_is_in_frame(WindowAggState * winstate,int64 pos,TupleTableSlot * slot)1362 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1363 {
1364 int frameOptions = winstate->frameOptions;
1365
1366 Assert(pos >= 0); /* else caller error */
1367
1368 /*
1369 * First, check frame starting conditions. We might as well delegate this
1370 * to update_frameheadpos always; it doesn't add any notable cost.
1371 */
1372 update_frameheadpos(winstate);
1373 if (pos < winstate->frameheadpos)
1374 return 0;
1375
1376 /*
1377 * Okay so far, now check frame ending conditions. Here, we avoid calling
1378 * update_frametailpos in simple cases, so as not to spool tuples further
1379 * ahead than necessary.
1380 */
1381 if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1382 {
1383 if (frameOptions & FRAMEOPTION_ROWS)
1384 {
1385 /* rows after current row are out of frame */
1386 if (pos > winstate->currentpos)
1387 return -1;
1388 }
1389 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1390 {
1391 /* following row that is not peer is out of frame */
1392 if (pos > winstate->currentpos &&
1393 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1394 return -1;
1395 }
1396 else
1397 Assert(false);
1398 }
1399 else if (frameOptions & FRAMEOPTION_END_OFFSET)
1400 {
1401 if (frameOptions & FRAMEOPTION_ROWS)
1402 {
1403 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1404
1405 /* rows after current row + offset are out of frame */
1406 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1407 offset = -offset;
1408
1409 if (pos > winstate->currentpos + offset)
1410 return -1;
1411 }
1412 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1413 {
1414 /* hard cases, so delegate to update_frametailpos */
1415 update_frametailpos(winstate);
1416 if (pos >= winstate->frametailpos)
1417 return -1;
1418 }
1419 else
1420 Assert(false);
1421 }
1422
1423 /* Check exclusion clause */
1424 if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
1425 {
1426 if (pos == winstate->currentpos)
1427 return 0;
1428 }
1429 else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
1430 ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
1431 pos != winstate->currentpos))
1432 {
1433 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1434
1435 /* If no ORDER BY, all rows are peers with each other */
1436 if (node->ordNumCols == 0)
1437 return 0;
1438 /* Otherwise, check the group boundaries */
1439 if (pos >= winstate->groupheadpos)
1440 {
1441 update_grouptailpos(winstate);
1442 if (pos < winstate->grouptailpos)
1443 return 0;
1444 }
1445 }
1446
1447 /* If we get here, it's in frame */
1448 return 1;
1449 }
1450
1451 /*
1452 * update_frameheadpos
1453 * make frameheadpos valid for the current row
1454 *
1455 * Note that frameheadpos is computed without regard for any window exclusion
1456 * clause; the current row and/or its peers are considered part of the frame
1457 * for this purpose even if they must be excluded later.
1458 *
1459 * May clobber winstate->temp_slot_2.
1460 */
1461 static void
update_frameheadpos(WindowAggState * winstate)1462 update_frameheadpos(WindowAggState *winstate)
1463 {
1464 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1465 int frameOptions = winstate->frameOptions;
1466 MemoryContext oldcontext;
1467
1468 if (winstate->framehead_valid)
1469 return; /* already known for current row */
1470
1471 /* We may be called in a short-lived context */
1472 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1473
1474 if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1475 {
1476 /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1477 winstate->frameheadpos = 0;
1478 winstate->framehead_valid = true;
1479 }
1480 else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1481 {
1482 if (frameOptions & FRAMEOPTION_ROWS)
1483 {
1484 /* In ROWS mode, frame head is the same as current */
1485 winstate->frameheadpos = winstate->currentpos;
1486 winstate->framehead_valid = true;
1487 }
1488 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1489 {
1490 /* If no ORDER BY, all rows are peers with each other */
1491 if (node->ordNumCols == 0)
1492 {
1493 winstate->frameheadpos = 0;
1494 winstate->framehead_valid = true;
1495 MemoryContextSwitchTo(oldcontext);
1496 return;
1497 }
1498
1499 /*
1500 * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the
1501 * first row that is a peer of current row. We keep a copy of the
1502 * last-known frame head row in framehead_slot, and advance as
1503 * necessary. Note that if we reach end of partition, we will
1504 * leave frameheadpos = end+1 and framehead_slot empty.
1505 */
1506 tuplestore_select_read_pointer(winstate->buffer,
1507 winstate->framehead_ptr);
1508 if (winstate->frameheadpos == 0 &&
1509 TupIsNull(winstate->framehead_slot))
1510 {
1511 /* fetch first row into framehead_slot, if we didn't already */
1512 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1513 winstate->framehead_slot))
1514 elog(ERROR, "unexpected end of tuplestore");
1515 }
1516
1517 while (!TupIsNull(winstate->framehead_slot))
1518 {
1519 if (are_peers(winstate, winstate->framehead_slot,
1520 winstate->ss.ss_ScanTupleSlot))
1521 break; /* this row is the correct frame head */
1522 /* Note we advance frameheadpos even if the fetch fails */
1523 winstate->frameheadpos++;
1524 spool_tuples(winstate, winstate->frameheadpos);
1525 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1526 winstate->framehead_slot))
1527 break; /* end of partition */
1528 }
1529 winstate->framehead_valid = true;
1530 }
1531 else
1532 Assert(false);
1533 }
1534 else if (frameOptions & FRAMEOPTION_START_OFFSET)
1535 {
1536 if (frameOptions & FRAMEOPTION_ROWS)
1537 {
1538 /* In ROWS mode, bound is physically n before/after current */
1539 int64 offset = DatumGetInt64(winstate->startOffsetValue);
1540
1541 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1542 offset = -offset;
1543
1544 winstate->frameheadpos = winstate->currentpos + offset;
1545 /* frame head can't go before first row */
1546 if (winstate->frameheadpos < 0)
1547 winstate->frameheadpos = 0;
1548 else if (winstate->frameheadpos > winstate->currentpos + 1)
1549 {
1550 /* make sure frameheadpos is not past end of partition */
1551 spool_tuples(winstate, winstate->frameheadpos - 1);
1552 if (winstate->frameheadpos > winstate->spooled_rows)
1553 winstate->frameheadpos = winstate->spooled_rows;
1554 }
1555 winstate->framehead_valid = true;
1556 }
1557 else if (frameOptions & FRAMEOPTION_RANGE)
1558 {
1559 /*
1560 * In RANGE START_OFFSET mode, frame head is the first row that
1561 * satisfies the in_range constraint relative to the current row.
1562 * We keep a copy of the last-known frame head row in
1563 * framehead_slot, and advance as necessary. Note that if we
1564 * reach end of partition, we will leave frameheadpos = end+1 and
1565 * framehead_slot empty.
1566 */
1567 int sortCol = node->ordColIdx[0];
1568 bool sub,
1569 less;
1570
1571 /* We must have an ordering column */
1572 Assert(node->ordNumCols == 1);
1573
1574 /* Precompute flags for in_range checks */
1575 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1576 sub = true; /* subtract startOffset from current row */
1577 else
1578 sub = false; /* add it */
1579 less = false; /* normally, we want frame head >= sum */
1580 /* If sort order is descending, flip both flags */
1581 if (!winstate->inRangeAsc)
1582 {
1583 sub = !sub;
1584 less = true;
1585 }
1586
1587 tuplestore_select_read_pointer(winstate->buffer,
1588 winstate->framehead_ptr);
1589 if (winstate->frameheadpos == 0 &&
1590 TupIsNull(winstate->framehead_slot))
1591 {
1592 /* fetch first row into framehead_slot, if we didn't already */
1593 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1594 winstate->framehead_slot))
1595 elog(ERROR, "unexpected end of tuplestore");
1596 }
1597
1598 while (!TupIsNull(winstate->framehead_slot))
1599 {
1600 Datum headval,
1601 currval;
1602 bool headisnull,
1603 currisnull;
1604
1605 headval = slot_getattr(winstate->framehead_slot, sortCol,
1606 &headisnull);
1607 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1608 &currisnull);
1609 if (headisnull || currisnull)
1610 {
1611 /* order of the rows depends only on nulls_first */
1612 if (winstate->inRangeNullsFirst)
1613 {
1614 /* advance head if head is null and curr is not */
1615 if (!headisnull || currisnull)
1616 break;
1617 }
1618 else
1619 {
1620 /* advance head if head is not null and curr is null */
1621 if (headisnull || !currisnull)
1622 break;
1623 }
1624 }
1625 else
1626 {
1627 if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
1628 winstate->inRangeColl,
1629 headval,
1630 currval,
1631 winstate->startOffsetValue,
1632 BoolGetDatum(sub),
1633 BoolGetDatum(less))))
1634 break; /* this row is the correct frame head */
1635 }
1636 /* Note we advance frameheadpos even if the fetch fails */
1637 winstate->frameheadpos++;
1638 spool_tuples(winstate, winstate->frameheadpos);
1639 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1640 winstate->framehead_slot))
1641 break; /* end of partition */
1642 }
1643 winstate->framehead_valid = true;
1644 }
1645 else if (frameOptions & FRAMEOPTION_GROUPS)
1646 {
1647 /*
1648 * In GROUPS START_OFFSET mode, frame head is the first row of the
1649 * first peer group whose number satisfies the offset constraint.
1650 * We keep a copy of the last-known frame head row in
1651 * framehead_slot, and advance as necessary. Note that if we
1652 * reach end of partition, we will leave frameheadpos = end+1 and
1653 * framehead_slot empty.
1654 */
1655 int64 offset = DatumGetInt64(winstate->startOffsetValue);
1656 int64 minheadgroup;
1657
1658 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1659 minheadgroup = winstate->currentgroup - offset;
1660 else
1661 minheadgroup = winstate->currentgroup + offset;
1662
1663 tuplestore_select_read_pointer(winstate->buffer,
1664 winstate->framehead_ptr);
1665 if (winstate->frameheadpos == 0 &&
1666 TupIsNull(winstate->framehead_slot))
1667 {
1668 /* fetch first row into framehead_slot, if we didn't already */
1669 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1670 winstate->framehead_slot))
1671 elog(ERROR, "unexpected end of tuplestore");
1672 }
1673
1674 while (!TupIsNull(winstate->framehead_slot))
1675 {
1676 if (winstate->frameheadgroup >= minheadgroup)
1677 break; /* this row is the correct frame head */
1678 ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
1679 /* Note we advance frameheadpos even if the fetch fails */
1680 winstate->frameheadpos++;
1681 spool_tuples(winstate, winstate->frameheadpos);
1682 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1683 winstate->framehead_slot))
1684 break; /* end of partition */
1685 if (!are_peers(winstate, winstate->temp_slot_2,
1686 winstate->framehead_slot))
1687 winstate->frameheadgroup++;
1688 }
1689 ExecClearTuple(winstate->temp_slot_2);
1690 winstate->framehead_valid = true;
1691 }
1692 else
1693 Assert(false);
1694 }
1695 else
1696 Assert(false);
1697
1698 MemoryContextSwitchTo(oldcontext);
1699 }
1700
1701 /*
1702 * update_frametailpos
1703 * make frametailpos valid for the current row
1704 *
1705 * Note that frametailpos is computed without regard for any window exclusion
1706 * clause; the current row and/or its peers are considered part of the frame
1707 * for this purpose even if they must be excluded later.
1708 *
1709 * May clobber winstate->temp_slot_2.
1710 */
1711 static void
update_frametailpos(WindowAggState * winstate)1712 update_frametailpos(WindowAggState *winstate)
1713 {
1714 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1715 int frameOptions = winstate->frameOptions;
1716 MemoryContext oldcontext;
1717
1718 if (winstate->frametail_valid)
1719 return; /* already known for current row */
1720
1721 /* We may be called in a short-lived context */
1722 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1723
1724 if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1725 {
1726 /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1727 spool_tuples(winstate, -1);
1728 winstate->frametailpos = winstate->spooled_rows;
1729 winstate->frametail_valid = true;
1730 }
1731 else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1732 {
1733 if (frameOptions & FRAMEOPTION_ROWS)
1734 {
1735 /* In ROWS mode, exactly the rows up to current are in frame */
1736 winstate->frametailpos = winstate->currentpos + 1;
1737 winstate->frametail_valid = true;
1738 }
1739 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1740 {
1741 /* If no ORDER BY, all rows are peers with each other */
1742 if (node->ordNumCols == 0)
1743 {
1744 spool_tuples(winstate, -1);
1745 winstate->frametailpos = winstate->spooled_rows;
1746 winstate->frametail_valid = true;
1747 MemoryContextSwitchTo(oldcontext);
1748 return;
1749 }
1750
1751 /*
1752 * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last
1753 * row that is a peer of current row, frame tail is the row after
1754 * that (if any). We keep a copy of the last-known frame tail row
1755 * in frametail_slot, and advance as necessary. Note that if we
1756 * reach end of partition, we will leave frametailpos = end+1 and
1757 * frametail_slot empty.
1758 */
1759 tuplestore_select_read_pointer(winstate->buffer,
1760 winstate->frametail_ptr);
1761 if (winstate->frametailpos == 0 &&
1762 TupIsNull(winstate->frametail_slot))
1763 {
1764 /* fetch first row into frametail_slot, if we didn't already */
1765 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1766 winstate->frametail_slot))
1767 elog(ERROR, "unexpected end of tuplestore");
1768 }
1769
1770 while (!TupIsNull(winstate->frametail_slot))
1771 {
1772 if (winstate->frametailpos > winstate->currentpos &&
1773 !are_peers(winstate, winstate->frametail_slot,
1774 winstate->ss.ss_ScanTupleSlot))
1775 break; /* this row is the frame tail */
1776 /* Note we advance frametailpos even if the fetch fails */
1777 winstate->frametailpos++;
1778 spool_tuples(winstate, winstate->frametailpos);
1779 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1780 winstate->frametail_slot))
1781 break; /* end of partition */
1782 }
1783 winstate->frametail_valid = true;
1784 }
1785 else
1786 Assert(false);
1787 }
1788 else if (frameOptions & FRAMEOPTION_END_OFFSET)
1789 {
1790 if (frameOptions & FRAMEOPTION_ROWS)
1791 {
1792 /* In ROWS mode, bound is physically n before/after current */
1793 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1794
1795 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1796 offset = -offset;
1797
1798 winstate->frametailpos = winstate->currentpos + offset + 1;
1799 /* smallest allowable value of frametailpos is 0 */
1800 if (winstate->frametailpos < 0)
1801 winstate->frametailpos = 0;
1802 else if (winstate->frametailpos > winstate->currentpos + 1)
1803 {
1804 /* make sure frametailpos is not past end of partition */
1805 spool_tuples(winstate, winstate->frametailpos - 1);
1806 if (winstate->frametailpos > winstate->spooled_rows)
1807 winstate->frametailpos = winstate->spooled_rows;
1808 }
1809 winstate->frametail_valid = true;
1810 }
1811 else if (frameOptions & FRAMEOPTION_RANGE)
1812 {
1813 /*
1814 * In RANGE END_OFFSET mode, frame end is the last row that
1815 * satisfies the in_range constraint relative to the current row,
1816 * frame tail is the row after that (if any). We keep a copy of
1817 * the last-known frame tail row in frametail_slot, and advance as
1818 * necessary. Note that if we reach end of partition, we will
1819 * leave frametailpos = end+1 and frametail_slot empty.
1820 */
1821 int sortCol = node->ordColIdx[0];
1822 bool sub,
1823 less;
1824
1825 /* We must have an ordering column */
1826 Assert(node->ordNumCols == 1);
1827
1828 /* Precompute flags for in_range checks */
1829 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1830 sub = true; /* subtract endOffset from current row */
1831 else
1832 sub = false; /* add it */
1833 less = true; /* normally, we want frame tail <= sum */
1834 /* If sort order is descending, flip both flags */
1835 if (!winstate->inRangeAsc)
1836 {
1837 sub = !sub;
1838 less = false;
1839 }
1840
1841 tuplestore_select_read_pointer(winstate->buffer,
1842 winstate->frametail_ptr);
1843 if (winstate->frametailpos == 0 &&
1844 TupIsNull(winstate->frametail_slot))
1845 {
1846 /* fetch first row into frametail_slot, if we didn't already */
1847 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1848 winstate->frametail_slot))
1849 elog(ERROR, "unexpected end of tuplestore");
1850 }
1851
1852 while (!TupIsNull(winstate->frametail_slot))
1853 {
1854 Datum tailval,
1855 currval;
1856 bool tailisnull,
1857 currisnull;
1858
1859 tailval = slot_getattr(winstate->frametail_slot, sortCol,
1860 &tailisnull);
1861 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1862 &currisnull);
1863 if (tailisnull || currisnull)
1864 {
1865 /* order of the rows depends only on nulls_first */
1866 if (winstate->inRangeNullsFirst)
1867 {
1868 /* advance tail if tail is null or curr is not */
1869 if (!tailisnull)
1870 break;
1871 }
1872 else
1873 {
1874 /* advance tail if tail is not null or curr is null */
1875 if (!currisnull)
1876 break;
1877 }
1878 }
1879 else
1880 {
1881 if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc,
1882 winstate->inRangeColl,
1883 tailval,
1884 currval,
1885 winstate->endOffsetValue,
1886 BoolGetDatum(sub),
1887 BoolGetDatum(less))))
1888 break; /* this row is the correct frame tail */
1889 }
1890 /* Note we advance frametailpos even if the fetch fails */
1891 winstate->frametailpos++;
1892 spool_tuples(winstate, winstate->frametailpos);
1893 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1894 winstate->frametail_slot))
1895 break; /* end of partition */
1896 }
1897 winstate->frametail_valid = true;
1898 }
1899 else if (frameOptions & FRAMEOPTION_GROUPS)
1900 {
1901 /*
1902 * In GROUPS END_OFFSET mode, frame end is the last row of the
1903 * last peer group whose number satisfies the offset constraint,
1904 * and frame tail is the row after that (if any). We keep a copy
1905 * of the last-known frame tail row in frametail_slot, and advance
1906 * as necessary. Note that if we reach end of partition, we will
1907 * leave frametailpos = end+1 and frametail_slot empty.
1908 */
1909 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1910 int64 maxtailgroup;
1911
1912 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1913 maxtailgroup = winstate->currentgroup - offset;
1914 else
1915 maxtailgroup = winstate->currentgroup + offset;
1916
1917 tuplestore_select_read_pointer(winstate->buffer,
1918 winstate->frametail_ptr);
1919 if (winstate->frametailpos == 0 &&
1920 TupIsNull(winstate->frametail_slot))
1921 {
1922 /* fetch first row into frametail_slot, if we didn't already */
1923 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1924 winstate->frametail_slot))
1925 elog(ERROR, "unexpected end of tuplestore");
1926 }
1927
1928 while (!TupIsNull(winstate->frametail_slot))
1929 {
1930 if (winstate->frametailgroup > maxtailgroup)
1931 break; /* this row is the correct frame tail */
1932 ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot);
1933 /* Note we advance frametailpos even if the fetch fails */
1934 winstate->frametailpos++;
1935 spool_tuples(winstate, winstate->frametailpos);
1936 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1937 winstate->frametail_slot))
1938 break; /* end of partition */
1939 if (!are_peers(winstate, winstate->temp_slot_2,
1940 winstate->frametail_slot))
1941 winstate->frametailgroup++;
1942 }
1943 ExecClearTuple(winstate->temp_slot_2);
1944 winstate->frametail_valid = true;
1945 }
1946 else
1947 Assert(false);
1948 }
1949 else
1950 Assert(false);
1951
1952 MemoryContextSwitchTo(oldcontext);
1953 }
1954
1955 /*
1956 * update_grouptailpos
1957 * make grouptailpos valid for the current row
1958 *
1959 * May clobber winstate->temp_slot_2.
1960 */
1961 static void
update_grouptailpos(WindowAggState * winstate)1962 update_grouptailpos(WindowAggState *winstate)
1963 {
1964 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1965 MemoryContext oldcontext;
1966
1967 if (winstate->grouptail_valid)
1968 return; /* already known for current row */
1969
1970 /* We may be called in a short-lived context */
1971 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1972
1973 /* If no ORDER BY, all rows are peers with each other */
1974 if (node->ordNumCols == 0)
1975 {
1976 spool_tuples(winstate, -1);
1977 winstate->grouptailpos = winstate->spooled_rows;
1978 winstate->grouptail_valid = true;
1979 MemoryContextSwitchTo(oldcontext);
1980 return;
1981 }
1982
1983 /*
1984 * Because grouptail_valid is reset only when current row advances into a
1985 * new peer group, we always reach here knowing that grouptailpos needs to
1986 * be advanced by at least one row. Hence, unlike the otherwise similar
1987 * case for frame tail tracking, we do not need persistent storage of the
1988 * group tail row.
1989 */
1990 Assert(winstate->grouptailpos <= winstate->currentpos);
1991 tuplestore_select_read_pointer(winstate->buffer,
1992 winstate->grouptail_ptr);
1993 for (;;)
1994 {
1995 /* Note we advance grouptailpos even if the fetch fails */
1996 winstate->grouptailpos++;
1997 spool_tuples(winstate, winstate->grouptailpos);
1998 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1999 winstate->temp_slot_2))
2000 break; /* end of partition */
2001 if (winstate->grouptailpos > winstate->currentpos &&
2002 !are_peers(winstate, winstate->temp_slot_2,
2003 winstate->ss.ss_ScanTupleSlot))
2004 break; /* this row is the group tail */
2005 }
2006 ExecClearTuple(winstate->temp_slot_2);
2007 winstate->grouptail_valid = true;
2008
2009 MemoryContextSwitchTo(oldcontext);
2010 }
2011
2012
2013 /* -----------------
2014 * ExecWindowAgg
2015 *
2016 * ExecWindowAgg receives tuples from its outer subplan and
2017 * stores them into a tuplestore, then processes window functions.
2018 * This node doesn't reduce nor qualify any row so the number of
2019 * returned rows is exactly the same as its outer subplan's result.
2020 * -----------------
2021 */
2022 static TupleTableSlot *
ExecWindowAgg(PlanState * pstate)2023 ExecWindowAgg(PlanState *pstate)
2024 {
2025 WindowAggState *winstate = castNode(WindowAggState, pstate);
2026 ExprContext *econtext;
2027 int i;
2028 int numfuncs;
2029
2030 CHECK_FOR_INTERRUPTS();
2031
2032 if (winstate->all_done)
2033 return NULL;
2034
2035 /*
2036 * Compute frame offset values, if any, during first call (or after a
2037 * rescan). These are assumed to hold constant throughout the scan; if
2038 * user gives us a volatile expression, we'll only use its initial value.
2039 */
2040 if (winstate->all_first)
2041 {
2042 int frameOptions = winstate->frameOptions;
2043 ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
2044 Datum value;
2045 bool isnull;
2046 int16 len;
2047 bool byval;
2048
2049 if (frameOptions & FRAMEOPTION_START_OFFSET)
2050 {
2051 Assert(winstate->startOffset != NULL);
2052 value = ExecEvalExprSwitchContext(winstate->startOffset,
2053 econtext,
2054 &isnull);
2055 if (isnull)
2056 ereport(ERROR,
2057 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2058 errmsg("frame starting offset must not be null")));
2059 /* copy value into query-lifespan context */
2060 get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
2061 &len, &byval);
2062 winstate->startOffsetValue = datumCopy(value, byval, len);
2063 if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2064 {
2065 /* value is known to be int8 */
2066 int64 offset = DatumGetInt64(value);
2067
2068 if (offset < 0)
2069 ereport(ERROR,
2070 (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2071 errmsg("frame starting offset must not be negative")));
2072 }
2073 }
2074 if (frameOptions & FRAMEOPTION_END_OFFSET)
2075 {
2076 Assert(winstate->endOffset != NULL);
2077 value = ExecEvalExprSwitchContext(winstate->endOffset,
2078 econtext,
2079 &isnull);
2080 if (isnull)
2081 ereport(ERROR,
2082 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2083 errmsg("frame ending offset must not be null")));
2084 /* copy value into query-lifespan context */
2085 get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
2086 &len, &byval);
2087 winstate->endOffsetValue = datumCopy(value, byval, len);
2088 if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2089 {
2090 /* value is known to be int8 */
2091 int64 offset = DatumGetInt64(value);
2092
2093 if (offset < 0)
2094 ereport(ERROR,
2095 (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2096 errmsg("frame ending offset must not be negative")));
2097 }
2098 }
2099 winstate->all_first = false;
2100 }
2101
2102 if (winstate->buffer == NULL)
2103 {
2104 /* Initialize for first partition and set current row = 0 */
2105 begin_partition(winstate);
2106 /* If there are no input rows, we'll detect that and exit below */
2107 }
2108 else
2109 {
2110 /* Advance current row within partition */
2111 winstate->currentpos++;
2112 /* This might mean that the frame moves, too */
2113 winstate->framehead_valid = false;
2114 winstate->frametail_valid = false;
2115 /* we don't need to invalidate grouptail here; see below */
2116 }
2117
2118 /*
2119 * Spool all tuples up to and including the current row, if we haven't
2120 * already
2121 */
2122 spool_tuples(winstate, winstate->currentpos);
2123
2124 /* Move to the next partition if we reached the end of this partition */
2125 if (winstate->partition_spooled &&
2126 winstate->currentpos >= winstate->spooled_rows)
2127 {
2128 release_partition(winstate);
2129
2130 if (winstate->more_partitions)
2131 {
2132 begin_partition(winstate);
2133 Assert(winstate->spooled_rows > 0);
2134 }
2135 else
2136 {
2137 winstate->all_done = true;
2138 return NULL;
2139 }
2140 }
2141
2142 /* final output execution is in ps_ExprContext */
2143 econtext = winstate->ss.ps.ps_ExprContext;
2144
2145 /* Clear the per-output-tuple context for current row */
2146 ResetExprContext(econtext);
2147
2148 /*
2149 * Read the current row from the tuplestore, and save in ScanTupleSlot.
2150 * (We can't rely on the outerplan's output slot because we may have to
2151 * read beyond the current row. Also, we have to actually copy the row
2152 * out of the tuplestore, since window function evaluation might cause the
2153 * tuplestore to dump its state to disk.)
2154 *
2155 * In GROUPS mode, or when tracking a group-oriented exclusion clause, we
2156 * must also detect entering a new peer group and update associated state
2157 * when that happens. We use temp_slot_2 to temporarily hold the previous
2158 * row for this purpose.
2159 *
2160 * Current row must be in the tuplestore, since we spooled it above.
2161 */
2162 tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
2163 if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
2164 FRAMEOPTION_EXCLUDE_GROUP |
2165 FRAMEOPTION_EXCLUDE_TIES)) &&
2166 winstate->currentpos > 0)
2167 {
2168 ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
2169 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2170 winstate->ss.ss_ScanTupleSlot))
2171 elog(ERROR, "unexpected end of tuplestore");
2172 if (!are_peers(winstate, winstate->temp_slot_2,
2173 winstate->ss.ss_ScanTupleSlot))
2174 {
2175 winstate->currentgroup++;
2176 winstate->groupheadpos = winstate->currentpos;
2177 winstate->grouptail_valid = false;
2178 }
2179 ExecClearTuple(winstate->temp_slot_2);
2180 }
2181 else
2182 {
2183 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2184 winstate->ss.ss_ScanTupleSlot))
2185 elog(ERROR, "unexpected end of tuplestore");
2186 }
2187
2188 /*
2189 * Evaluate true window functions
2190 */
2191 numfuncs = winstate->numfuncs;
2192 for (i = 0; i < numfuncs; i++)
2193 {
2194 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
2195
2196 if (perfuncstate->plain_agg)
2197 continue;
2198 eval_windowfunction(winstate, perfuncstate,
2199 &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
2200 &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
2201 }
2202
2203 /*
2204 * Evaluate aggregates
2205 */
2206 if (winstate->numaggs > 0)
2207 eval_windowaggregates(winstate);
2208
2209 /*
2210 * If we have created auxiliary read pointers for the frame or group
2211 * boundaries, force them to be kept up-to-date, because we don't know
2212 * whether the window function(s) will do anything that requires that.
2213 * Failing to advance the pointers would result in being unable to trim
2214 * data from the tuplestore, which is bad. (If we could know in advance
2215 * whether the window functions will use frame boundary info, we could
2216 * skip creating these pointers in the first place ... but unfortunately
2217 * the window function API doesn't require that.)
2218 */
2219 if (winstate->framehead_ptr >= 0)
2220 update_frameheadpos(winstate);
2221 if (winstate->frametail_ptr >= 0)
2222 update_frametailpos(winstate);
2223 if (winstate->grouptail_ptr >= 0)
2224 update_grouptailpos(winstate);
2225
2226 /*
2227 * Truncate any no-longer-needed rows from the tuplestore.
2228 */
2229 tuplestore_trim(winstate->buffer);
2230
2231 /*
2232 * Form and return a projection tuple using the windowfunc results and the
2233 * current row. Setting ecxt_outertuple arranges that any Vars will be
2234 * evaluated with respect to that row.
2235 */
2236 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2237
2238 return ExecProject(winstate->ss.ps.ps_ProjInfo);
2239 }
2240
2241 /* -----------------
2242 * ExecInitWindowAgg
2243 *
2244 * Creates the run-time information for the WindowAgg node produced by the
2245 * planner and initializes its outer subtree
2246 * -----------------
2247 */
2248 WindowAggState *
ExecInitWindowAgg(WindowAgg * node,EState * estate,int eflags)2249 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
2250 {
2251 WindowAggState *winstate;
2252 Plan *outerPlan;
2253 ExprContext *econtext;
2254 ExprContext *tmpcontext;
2255 WindowStatePerFunc perfunc;
2256 WindowStatePerAgg peragg;
2257 int frameOptions = node->frameOptions;
2258 int numfuncs,
2259 wfuncno,
2260 numaggs,
2261 aggno;
2262 TupleDesc scanDesc;
2263 ListCell *l;
2264
2265 /* check for unsupported flags */
2266 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2267
2268 /*
2269 * create state structure
2270 */
2271 winstate = makeNode(WindowAggState);
2272 winstate->ss.ps.plan = (Plan *) node;
2273 winstate->ss.ps.state = estate;
2274 winstate->ss.ps.ExecProcNode = ExecWindowAgg;
2275
2276 /*
2277 * Create expression contexts. We need two, one for per-input-tuple
2278 * processing and one for per-output-tuple processing. We cheat a little
2279 * by using ExecAssignExprContext() to build both.
2280 */
2281 ExecAssignExprContext(estate, &winstate->ss.ps);
2282 tmpcontext = winstate->ss.ps.ps_ExprContext;
2283 winstate->tmpcontext = tmpcontext;
2284 ExecAssignExprContext(estate, &winstate->ss.ps);
2285
2286 /* Create long-lived context for storage of partition-local memory etc */
2287 winstate->partcontext =
2288 AllocSetContextCreate(CurrentMemoryContext,
2289 "WindowAgg Partition",
2290 ALLOCSET_DEFAULT_SIZES);
2291
2292 /*
2293 * Create mid-lived context for aggregate trans values etc.
2294 *
2295 * Note that moving aggregates each use their own private context, not
2296 * this one.
2297 */
2298 winstate->aggcontext =
2299 AllocSetContextCreate(CurrentMemoryContext,
2300 "WindowAgg Aggregates",
2301 ALLOCSET_DEFAULT_SIZES);
2302
2303 /*
2304 * WindowAgg nodes never have quals, since they can only occur at the
2305 * logical top level of a query (ie, after any WHERE or HAVING filters)
2306 */
2307 Assert(node->plan.qual == NIL);
2308 winstate->ss.ps.qual = NULL;
2309
2310 /*
2311 * initialize child nodes
2312 */
2313 outerPlan = outerPlan(node);
2314 outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
2315
2316 /*
2317 * initialize source tuple type (which is also the tuple type that we'll
2318 * store in the tuplestore and use in all our working slots).
2319 */
2320 ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple);
2321 scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2322
2323 /* the outer tuple isn't the child's tuple, but always a minimal tuple */
2324 winstate->ss.ps.outeropsset = true;
2325 winstate->ss.ps.outerops = &TTSOpsMinimalTuple;
2326 winstate->ss.ps.outeropsfixed = true;
2327
2328 /*
2329 * tuple table initialization
2330 */
2331 winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2332 &TTSOpsMinimalTuple);
2333 winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2334 &TTSOpsMinimalTuple);
2335 winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc,
2336 &TTSOpsMinimalTuple);
2337 winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc,
2338 &TTSOpsMinimalTuple);
2339
2340 /*
2341 * create frame head and tail slots only if needed (must create slots in
2342 * exactly the same cases that update_frameheadpos and update_frametailpos
2343 * need them)
2344 */
2345 winstate->framehead_slot = winstate->frametail_slot = NULL;
2346
2347 if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
2348 {
2349 if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
2350 node->ordNumCols != 0) ||
2351 (frameOptions & FRAMEOPTION_START_OFFSET))
2352 winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2353 &TTSOpsMinimalTuple);
2354 if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
2355 node->ordNumCols != 0) ||
2356 (frameOptions & FRAMEOPTION_END_OFFSET))
2357 winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2358 &TTSOpsMinimalTuple);
2359 }
2360
2361 /*
2362 * Initialize result slot, type and projection.
2363 */
2364 ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual);
2365 ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
2366
2367 /* Set up data for comparing tuples */
2368 if (node->partNumCols > 0)
2369 winstate->partEqfunction =
2370 execTuplesMatchPrepare(scanDesc,
2371 node->partNumCols,
2372 node->partColIdx,
2373 node->partOperators,
2374 node->partCollations,
2375 &winstate->ss.ps);
2376
2377 if (node->ordNumCols > 0)
2378 winstate->ordEqfunction =
2379 execTuplesMatchPrepare(scanDesc,
2380 node->ordNumCols,
2381 node->ordColIdx,
2382 node->ordOperators,
2383 node->ordCollations,
2384 &winstate->ss.ps);
2385
2386 /*
2387 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
2388 */
2389 numfuncs = winstate->numfuncs;
2390 numaggs = winstate->numaggs;
2391 econtext = winstate->ss.ps.ps_ExprContext;
2392 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
2393 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
2394
2395 /*
2396 * allocate per-wfunc/per-agg state information.
2397 */
2398 perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
2399 peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
2400 winstate->perfunc = perfunc;
2401 winstate->peragg = peragg;
2402
2403 wfuncno = -1;
2404 aggno = -1;
2405 foreach(l, winstate->funcs)
2406 {
2407 WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
2408 WindowFunc *wfunc = wfuncstate->wfunc;
2409 WindowStatePerFunc perfuncstate;
2410 AclResult aclresult;
2411 int i;
2412
2413 if (wfunc->winref != node->winref) /* planner screwed up? */
2414 elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
2415 wfunc->winref, node->winref);
2416
2417 /* Look for a previous duplicate window function */
2418 for (i = 0; i <= wfuncno; i++)
2419 {
2420 if (equal(wfunc, perfunc[i].wfunc) &&
2421 !contain_volatile_functions((Node *) wfunc))
2422 break;
2423 }
2424 if (i <= wfuncno)
2425 {
2426 /* Found a match to an existing entry, so just mark it */
2427 wfuncstate->wfuncno = i;
2428 continue;
2429 }
2430
2431 /* Nope, so assign a new PerAgg record */
2432 perfuncstate = &perfunc[++wfuncno];
2433
2434 /* Mark WindowFunc state node with assigned index in the result array */
2435 wfuncstate->wfuncno = wfuncno;
2436
2437 /* Check permission to call window function */
2438 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
2439 ACL_EXECUTE);
2440 if (aclresult != ACLCHECK_OK)
2441 aclcheck_error(aclresult, OBJECT_FUNCTION,
2442 get_func_name(wfunc->winfnoid));
2443 InvokeFunctionExecuteHook(wfunc->winfnoid);
2444
2445 /* Fill in the perfuncstate data */
2446 perfuncstate->wfuncstate = wfuncstate;
2447 perfuncstate->wfunc = wfunc;
2448 perfuncstate->numArguments = list_length(wfuncstate->args);
2449 perfuncstate->winCollation = wfunc->inputcollid;
2450
2451 get_typlenbyval(wfunc->wintype,
2452 &perfuncstate->resulttypeLen,
2453 &perfuncstate->resulttypeByVal);
2454
2455 /*
2456 * If it's really just a plain aggregate function, we'll emulate the
2457 * Agg environment for it.
2458 */
2459 perfuncstate->plain_agg = wfunc->winagg;
2460 if (wfunc->winagg)
2461 {
2462 WindowStatePerAgg peraggstate;
2463
2464 perfuncstate->aggno = ++aggno;
2465 peraggstate = &winstate->peragg[aggno];
2466 initialize_peragg(winstate, wfunc, peraggstate);
2467 peraggstate->wfuncno = wfuncno;
2468 }
2469 else
2470 {
2471 WindowObject winobj = makeNode(WindowObjectData);
2472
2473 winobj->winstate = winstate;
2474 winobj->argstates = wfuncstate->args;
2475 winobj->localmem = NULL;
2476 perfuncstate->winobj = winobj;
2477
2478 /* It's a real window function, so set up to call it. */
2479 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
2480 econtext->ecxt_per_query_memory);
2481 fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
2482 }
2483 }
2484
2485 /* Update numfuncs, numaggs to match number of unique functions found */
2486 winstate->numfuncs = wfuncno + 1;
2487 winstate->numaggs = aggno + 1;
2488
2489 /* Set up WindowObject for aggregates, if needed */
2490 if (winstate->numaggs > 0)
2491 {
2492 WindowObject agg_winobj = makeNode(WindowObjectData);
2493
2494 agg_winobj->winstate = winstate;
2495 agg_winobj->argstates = NIL;
2496 agg_winobj->localmem = NULL;
2497 /* make sure markptr = -1 to invalidate. It may not get used */
2498 agg_winobj->markptr = -1;
2499 agg_winobj->readptr = -1;
2500 winstate->agg_winobj = agg_winobj;
2501 }
2502
2503 /* copy frame options to state node for easy access */
2504 winstate->frameOptions = frameOptions;
2505
2506 /* initialize frame bound offset expressions */
2507 winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2508 (PlanState *) winstate);
2509 winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2510 (PlanState *) winstate);
2511
2512 /* Lookup in_range support functions if needed */
2513 if (OidIsValid(node->startInRangeFunc))
2514 fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc);
2515 if (OidIsValid(node->endInRangeFunc))
2516 fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc);
2517 winstate->inRangeColl = node->inRangeColl;
2518 winstate->inRangeAsc = node->inRangeAsc;
2519 winstate->inRangeNullsFirst = node->inRangeNullsFirst;
2520
2521 winstate->all_first = true;
2522 winstate->partition_spooled = false;
2523 winstate->more_partitions = false;
2524
2525 return winstate;
2526 }
2527
2528 /* -----------------
2529 * ExecEndWindowAgg
2530 * -----------------
2531 */
2532 void
ExecEndWindowAgg(WindowAggState * node)2533 ExecEndWindowAgg(WindowAggState *node)
2534 {
2535 PlanState *outerPlan;
2536 int i;
2537
2538 release_partition(node);
2539
2540 ExecClearTuple(node->ss.ss_ScanTupleSlot);
2541 ExecClearTuple(node->first_part_slot);
2542 ExecClearTuple(node->agg_row_slot);
2543 ExecClearTuple(node->temp_slot_1);
2544 ExecClearTuple(node->temp_slot_2);
2545 if (node->framehead_slot)
2546 ExecClearTuple(node->framehead_slot);
2547 if (node->frametail_slot)
2548 ExecClearTuple(node->frametail_slot);
2549
2550 /*
2551 * Free both the expr contexts.
2552 */
2553 ExecFreeExprContext(&node->ss.ps);
2554 node->ss.ps.ps_ExprContext = node->tmpcontext;
2555 ExecFreeExprContext(&node->ss.ps);
2556
2557 for (i = 0; i < node->numaggs; i++)
2558 {
2559 if (node->peragg[i].aggcontext != node->aggcontext)
2560 MemoryContextDelete(node->peragg[i].aggcontext);
2561 }
2562 MemoryContextDelete(node->partcontext);
2563 MemoryContextDelete(node->aggcontext);
2564
2565 pfree(node->perfunc);
2566 pfree(node->peragg);
2567
2568 outerPlan = outerPlanState(node);
2569 ExecEndNode(outerPlan);
2570 }
2571
2572 /* -----------------
2573 * ExecReScanWindowAgg
2574 * -----------------
2575 */
2576 void
ExecReScanWindowAgg(WindowAggState * node)2577 ExecReScanWindowAgg(WindowAggState *node)
2578 {
2579 PlanState *outerPlan = outerPlanState(node);
2580 ExprContext *econtext = node->ss.ps.ps_ExprContext;
2581
2582 node->all_done = false;
2583 node->all_first = true;
2584
2585 /* release tuplestore et al */
2586 release_partition(node);
2587
2588 /* release all temp tuples, but especially first_part_slot */
2589 ExecClearTuple(node->ss.ss_ScanTupleSlot);
2590 ExecClearTuple(node->first_part_slot);
2591 ExecClearTuple(node->agg_row_slot);
2592 ExecClearTuple(node->temp_slot_1);
2593 ExecClearTuple(node->temp_slot_2);
2594 if (node->framehead_slot)
2595 ExecClearTuple(node->framehead_slot);
2596 if (node->frametail_slot)
2597 ExecClearTuple(node->frametail_slot);
2598
2599 /* Forget current wfunc values */
2600 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2601 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2602
2603 /*
2604 * if chgParam of subnode is not null then plan will be re-scanned by
2605 * first ExecProcNode.
2606 */
2607 if (outerPlan->chgParam == NULL)
2608 ExecReScan(outerPlan);
2609 }
2610
2611 /*
2612 * initialize_peragg
2613 *
2614 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2615 */
2616 static WindowStatePerAggData *
initialize_peragg(WindowAggState * winstate,WindowFunc * wfunc,WindowStatePerAgg peraggstate)2617 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2618 WindowStatePerAgg peraggstate)
2619 {
2620 Oid inputTypes[FUNC_MAX_ARGS];
2621 int numArguments;
2622 HeapTuple aggTuple;
2623 Form_pg_aggregate aggform;
2624 Oid aggtranstype;
2625 AttrNumber initvalAttNo;
2626 AclResult aclresult;
2627 bool use_ma_code;
2628 Oid transfn_oid,
2629 invtransfn_oid,
2630 finalfn_oid;
2631 bool finalextra;
2632 char finalmodify;
2633 Expr *transfnexpr,
2634 *invtransfnexpr,
2635 *finalfnexpr;
2636 Datum textInitVal;
2637 int i;
2638 ListCell *lc;
2639
2640 numArguments = list_length(wfunc->args);
2641
2642 i = 0;
2643 foreach(lc, wfunc->args)
2644 {
2645 inputTypes[i++] = exprType((Node *) lfirst(lc));
2646 }
2647
2648 aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2649 if (!HeapTupleIsValid(aggTuple))
2650 elog(ERROR, "cache lookup failed for aggregate %u",
2651 wfunc->winfnoid);
2652 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2653
2654 /*
2655 * Figure out whether we want to use the moving-aggregate implementation,
2656 * and collect the right set of fields from the pg_aggregate entry.
2657 *
2658 * It's possible that an aggregate would supply a safe moving-aggregate
2659 * implementation and an unsafe normal one, in which case our hand is
2660 * forced. Otherwise, if the frame head can't move, we don't need
2661 * moving-aggregate code. Even if we'd like to use it, don't do so if the
2662 * aggregate's arguments (and FILTER clause if any) contain any calls to
2663 * volatile functions. Otherwise, the difference between restarting and
2664 * not restarting the aggregation would be user-visible.
2665 */
2666 if (!OidIsValid(aggform->aggminvtransfn))
2667 use_ma_code = false; /* sine qua non */
2668 else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2669 aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2670 use_ma_code = true; /* decision forced by safety */
2671 else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
2672 use_ma_code = false; /* non-moving frame head */
2673 else if (contain_volatile_functions((Node *) wfunc))
2674 use_ma_code = false; /* avoid possible behavioral change */
2675 else
2676 use_ma_code = true; /* yes, let's use it */
2677 if (use_ma_code)
2678 {
2679 peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2680 peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2681 peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2682 finalextra = aggform->aggmfinalextra;
2683 finalmodify = aggform->aggmfinalmodify;
2684 aggtranstype = aggform->aggmtranstype;
2685 initvalAttNo = Anum_pg_aggregate_aggminitval;
2686 }
2687 else
2688 {
2689 peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2690 peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2691 peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2692 finalextra = aggform->aggfinalextra;
2693 finalmodify = aggform->aggfinalmodify;
2694 aggtranstype = aggform->aggtranstype;
2695 initvalAttNo = Anum_pg_aggregate_agginitval;
2696 }
2697
2698 /*
2699 * ExecInitWindowAgg already checked permission to call aggregate function
2700 * ... but we still need to check the component functions
2701 */
2702
2703 /* Check that aggregate owner has permission to call component fns */
2704 {
2705 HeapTuple procTuple;
2706 Oid aggOwner;
2707
2708 procTuple = SearchSysCache1(PROCOID,
2709 ObjectIdGetDatum(wfunc->winfnoid));
2710 if (!HeapTupleIsValid(procTuple))
2711 elog(ERROR, "cache lookup failed for function %u",
2712 wfunc->winfnoid);
2713 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2714 ReleaseSysCache(procTuple);
2715
2716 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2717 ACL_EXECUTE);
2718 if (aclresult != ACLCHECK_OK)
2719 aclcheck_error(aclresult, OBJECT_FUNCTION,
2720 get_func_name(transfn_oid));
2721 InvokeFunctionExecuteHook(transfn_oid);
2722
2723 if (OidIsValid(invtransfn_oid))
2724 {
2725 aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2726 ACL_EXECUTE);
2727 if (aclresult != ACLCHECK_OK)
2728 aclcheck_error(aclresult, OBJECT_FUNCTION,
2729 get_func_name(invtransfn_oid));
2730 InvokeFunctionExecuteHook(invtransfn_oid);
2731 }
2732
2733 if (OidIsValid(finalfn_oid))
2734 {
2735 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2736 ACL_EXECUTE);
2737 if (aclresult != ACLCHECK_OK)
2738 aclcheck_error(aclresult, OBJECT_FUNCTION,
2739 get_func_name(finalfn_oid));
2740 InvokeFunctionExecuteHook(finalfn_oid);
2741 }
2742 }
2743
2744 /*
2745 * If the selected finalfn isn't read-only, we can't run this aggregate as
2746 * a window function. This is a user-facing error, so we take a bit more
2747 * care with the error message than elsewhere in this function.
2748 */
2749 if (finalmodify != AGGMODIFY_READ_ONLY)
2750 ereport(ERROR,
2751 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2752 errmsg("aggregate function %s does not support use as a window function",
2753 format_procedure(wfunc->winfnoid))));
2754
2755 /* Detect how many arguments to pass to the finalfn */
2756 if (finalextra)
2757 peraggstate->numFinalArgs = numArguments + 1;
2758 else
2759 peraggstate->numFinalArgs = 1;
2760
2761 /* resolve actual type of transition state, if polymorphic */
2762 aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2763 aggtranstype,
2764 inputTypes,
2765 numArguments);
2766
2767 /* build expression trees using actual argument & result types */
2768 build_aggregate_transfn_expr(inputTypes,
2769 numArguments,
2770 0, /* no ordered-set window functions yet */
2771 false, /* no variadic window functions yet */
2772 aggtranstype,
2773 wfunc->inputcollid,
2774 transfn_oid,
2775 invtransfn_oid,
2776 &transfnexpr,
2777 &invtransfnexpr);
2778
2779 /* set up infrastructure for calling the transfn(s) and finalfn */
2780 fmgr_info(transfn_oid, &peraggstate->transfn);
2781 fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2782
2783 if (OidIsValid(invtransfn_oid))
2784 {
2785 fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2786 fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2787 }
2788
2789 if (OidIsValid(finalfn_oid))
2790 {
2791 build_aggregate_finalfn_expr(inputTypes,
2792 peraggstate->numFinalArgs,
2793 aggtranstype,
2794 wfunc->wintype,
2795 wfunc->inputcollid,
2796 finalfn_oid,
2797 &finalfnexpr);
2798 fmgr_info(finalfn_oid, &peraggstate->finalfn);
2799 fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2800 }
2801
2802 /* get info about relevant datatypes */
2803 get_typlenbyval(wfunc->wintype,
2804 &peraggstate->resulttypeLen,
2805 &peraggstate->resulttypeByVal);
2806 get_typlenbyval(aggtranstype,
2807 &peraggstate->transtypeLen,
2808 &peraggstate->transtypeByVal);
2809
2810 /*
2811 * initval is potentially null, so don't try to access it as a struct
2812 * field. Must do it the hard way with SysCacheGetAttr.
2813 */
2814 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2815 &peraggstate->initValueIsNull);
2816
2817 if (peraggstate->initValueIsNull)
2818 peraggstate->initValue = (Datum) 0;
2819 else
2820 peraggstate->initValue = GetAggInitVal(textInitVal,
2821 aggtranstype);
2822
2823 /*
2824 * If the transfn is strict and the initval is NULL, make sure input type
2825 * and transtype are the same (or at least binary-compatible), so that
2826 * it's OK to use the first input value as the initial transValue. This
2827 * should have been checked at agg definition time, but we must check
2828 * again in case the transfn's strictness property has been changed.
2829 */
2830 if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2831 {
2832 if (numArguments < 1 ||
2833 !IsBinaryCoercible(inputTypes[0], aggtranstype))
2834 ereport(ERROR,
2835 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2836 errmsg("aggregate %u needs to have compatible input type and transition type",
2837 wfunc->winfnoid)));
2838 }
2839
2840 /*
2841 * Insist that forward and inverse transition functions have the same
2842 * strictness setting. Allowing them to differ would require handling
2843 * more special cases in advance_windowaggregate and
2844 * advance_windowaggregate_base, for no discernible benefit. This should
2845 * have been checked at agg definition time, but we must check again in
2846 * case either function's strictness property has been changed.
2847 */
2848 if (OidIsValid(invtransfn_oid) &&
2849 peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2850 ereport(ERROR,
2851 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2852 errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2853
2854 /*
2855 * Moving aggregates use their own aggcontext.
2856 *
2857 * This is necessary because they might restart at different times, so we
2858 * might never be able to reset the shared context otherwise. We can't
2859 * make it the aggregates' responsibility to clean up after themselves,
2860 * because strict aggregates must be restarted whenever we remove their
2861 * last non-NULL input, which the aggregate won't be aware is happening.
2862 * Also, just pfree()ing the transValue upon restarting wouldn't help,
2863 * since we'd miss any indirectly referenced data. We could, in theory,
2864 * make the memory allocation rules for moving aggregates different than
2865 * they have historically been for plain aggregates, but that seems grotty
2866 * and likely to lead to memory leaks.
2867 */
2868 if (OidIsValid(invtransfn_oid))
2869 peraggstate->aggcontext =
2870 AllocSetContextCreate(CurrentMemoryContext,
2871 "WindowAgg Per Aggregate",
2872 ALLOCSET_DEFAULT_SIZES);
2873 else
2874 peraggstate->aggcontext = winstate->aggcontext;
2875
2876 ReleaseSysCache(aggTuple);
2877
2878 return peraggstate;
2879 }
2880
2881 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)2882 GetAggInitVal(Datum textInitVal, Oid transtype)
2883 {
2884 Oid typinput,
2885 typioparam;
2886 char *strInitVal;
2887 Datum initVal;
2888
2889 getTypeInputInfo(transtype, &typinput, &typioparam);
2890 strInitVal = TextDatumGetCString(textInitVal);
2891 initVal = OidInputFunctionCall(typinput, strInitVal,
2892 typioparam, -1);
2893 pfree(strInitVal);
2894 return initVal;
2895 }
2896
2897 /*
2898 * are_peers
2899 * compare two rows to see if they are equal according to the ORDER BY clause
2900 *
2901 * NB: this does not consider the window frame mode.
2902 */
2903 static bool
are_peers(WindowAggState * winstate,TupleTableSlot * slot1,TupleTableSlot * slot2)2904 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
2905 TupleTableSlot *slot2)
2906 {
2907 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2908 ExprContext *econtext = winstate->tmpcontext;
2909
2910 /* If no ORDER BY, all rows are peers with each other */
2911 if (node->ordNumCols == 0)
2912 return true;
2913
2914 econtext->ecxt_outertuple = slot1;
2915 econtext->ecxt_innertuple = slot2;
2916 return ExecQualAndReset(winstate->ordEqfunction, econtext);
2917 }
2918
2919 /*
2920 * window_gettupleslot
2921 * Fetch the pos'th tuple of the current partition into the slot,
2922 * using the winobj's read pointer
2923 *
2924 * Returns true if successful, false if no such row
2925 */
2926 static bool
window_gettupleslot(WindowObject winobj,int64 pos,TupleTableSlot * slot)2927 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
2928 {
2929 WindowAggState *winstate = winobj->winstate;
2930 MemoryContext oldcontext;
2931
2932 /* often called repeatedly in a row */
2933 CHECK_FOR_INTERRUPTS();
2934
2935 /* Don't allow passing -1 to spool_tuples here */
2936 if (pos < 0)
2937 return false;
2938
2939 /* If necessary, fetch the tuple into the spool */
2940 spool_tuples(winstate, pos);
2941
2942 if (pos >= winstate->spooled_rows)
2943 return false;
2944
2945 if (pos < winobj->markpos)
2946 elog(ERROR, "cannot fetch row before WindowObject's mark position");
2947
2948 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2949
2950 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2951
2952 /*
2953 * Advance or rewind until we are within one tuple of the one we want.
2954 */
2955 if (winobj->seekpos < pos - 1)
2956 {
2957 if (!tuplestore_skiptuples(winstate->buffer,
2958 pos - 1 - winobj->seekpos,
2959 true))
2960 elog(ERROR, "unexpected end of tuplestore");
2961 winobj->seekpos = pos - 1;
2962 }
2963 else if (winobj->seekpos > pos + 1)
2964 {
2965 if (!tuplestore_skiptuples(winstate->buffer,
2966 winobj->seekpos - (pos + 1),
2967 false))
2968 elog(ERROR, "unexpected end of tuplestore");
2969 winobj->seekpos = pos + 1;
2970 }
2971 else if (winobj->seekpos == pos)
2972 {
2973 /*
2974 * There's no API to refetch the tuple at the current position. We
2975 * have to move one tuple forward, and then one backward. (We don't
2976 * do it the other way because we might try to fetch the row before
2977 * our mark, which isn't allowed.) XXX this case could stand to be
2978 * optimized.
2979 */
2980 tuplestore_advance(winstate->buffer, true);
2981 winobj->seekpos++;
2982 }
2983
2984 /*
2985 * Now we should be on the tuple immediately before or after the one we
2986 * want, so just fetch forwards or backwards as appropriate.
2987 */
2988 if (winobj->seekpos > pos)
2989 {
2990 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2991 elog(ERROR, "unexpected end of tuplestore");
2992 winobj->seekpos--;
2993 }
2994 else
2995 {
2996 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2997 elog(ERROR, "unexpected end of tuplestore");
2998 winobj->seekpos++;
2999 }
3000
3001 Assert(winobj->seekpos == pos);
3002
3003 MemoryContextSwitchTo(oldcontext);
3004
3005 return true;
3006 }
3007
3008
3009 /***********************************************************************
3010 * API exposed to window functions
3011 ***********************************************************************/
3012
3013
3014 /*
3015 * WinGetPartitionLocalMemory
3016 * Get working memory that lives till end of partition processing
3017 *
3018 * On first call within a given partition, this allocates and zeroes the
3019 * requested amount of space. Subsequent calls just return the same chunk.
3020 *
3021 * Memory obtained this way is normally used to hold state that should be
3022 * automatically reset for each new partition. If a window function wants
3023 * to hold state across the whole query, fcinfo->fn_extra can be used in the
3024 * usual way for that.
3025 */
3026 void *
WinGetPartitionLocalMemory(WindowObject winobj,Size sz)3027 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
3028 {
3029 Assert(WindowObjectIsValid(winobj));
3030 if (winobj->localmem == NULL)
3031 winobj->localmem =
3032 MemoryContextAllocZero(winobj->winstate->partcontext, sz);
3033 return winobj->localmem;
3034 }
3035
3036 /*
3037 * WinGetCurrentPosition
3038 * Return the current row's position (counting from 0) within the current
3039 * partition.
3040 */
3041 int64
WinGetCurrentPosition(WindowObject winobj)3042 WinGetCurrentPosition(WindowObject winobj)
3043 {
3044 Assert(WindowObjectIsValid(winobj));
3045 return winobj->winstate->currentpos;
3046 }
3047
3048 /*
3049 * WinGetPartitionRowCount
3050 * Return total number of rows contained in the current partition.
3051 *
3052 * Note: this is a relatively expensive operation because it forces the
3053 * whole partition to be "spooled" into the tuplestore at once. Once
3054 * executed, however, additional calls within the same partition are cheap.
3055 */
3056 int64
WinGetPartitionRowCount(WindowObject winobj)3057 WinGetPartitionRowCount(WindowObject winobj)
3058 {
3059 Assert(WindowObjectIsValid(winobj));
3060 spool_tuples(winobj->winstate, -1);
3061 return winobj->winstate->spooled_rows;
3062 }
3063
3064 /*
3065 * WinSetMarkPosition
3066 * Set the "mark" position for the window object, which is the oldest row
3067 * number (counting from 0) it is allowed to fetch during all subsequent
3068 * operations within the current partition.
3069 *
3070 * Window functions do not have to call this, but are encouraged to move the
3071 * mark forward when possible to keep the tuplestore size down and prevent
3072 * having to spill rows to disk.
3073 */
3074 void
WinSetMarkPosition(WindowObject winobj,int64 markpos)3075 WinSetMarkPosition(WindowObject winobj, int64 markpos)
3076 {
3077 WindowAggState *winstate;
3078
3079 Assert(WindowObjectIsValid(winobj));
3080 winstate = winobj->winstate;
3081
3082 if (markpos < winobj->markpos)
3083 elog(ERROR, "cannot move WindowObject's mark position backward");
3084 tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
3085 if (markpos > winobj->markpos)
3086 {
3087 tuplestore_skiptuples(winstate->buffer,
3088 markpos - winobj->markpos,
3089 true);
3090 winobj->markpos = markpos;
3091 }
3092 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3093 if (markpos > winobj->seekpos)
3094 {
3095 tuplestore_skiptuples(winstate->buffer,
3096 markpos - winobj->seekpos,
3097 true);
3098 winobj->seekpos = markpos;
3099 }
3100 }
3101
3102 /*
3103 * WinRowsArePeers
3104 * Compare two rows (specified by absolute position in partition) to see
3105 * if they are equal according to the ORDER BY clause.
3106 *
3107 * NB: this does not consider the window frame mode.
3108 */
3109 bool
WinRowsArePeers(WindowObject winobj,int64 pos1,int64 pos2)3110 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
3111 {
3112 WindowAggState *winstate;
3113 WindowAgg *node;
3114 TupleTableSlot *slot1;
3115 TupleTableSlot *slot2;
3116 bool res;
3117
3118 Assert(WindowObjectIsValid(winobj));
3119 winstate = winobj->winstate;
3120 node = (WindowAgg *) winstate->ss.ps.plan;
3121
3122 /* If no ORDER BY, all rows are peers; don't bother to fetch them */
3123 if (node->ordNumCols == 0)
3124 return true;
3125
3126 /*
3127 * Note: OK to use temp_slot_2 here because we aren't calling any
3128 * frame-related functions (those tend to clobber temp_slot_2).
3129 */
3130 slot1 = winstate->temp_slot_1;
3131 slot2 = winstate->temp_slot_2;
3132
3133 if (!window_gettupleslot(winobj, pos1, slot1))
3134 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3135 pos1);
3136 if (!window_gettupleslot(winobj, pos2, slot2))
3137 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3138 pos2);
3139
3140 res = are_peers(winstate, slot1, slot2);
3141
3142 ExecClearTuple(slot1);
3143 ExecClearTuple(slot2);
3144
3145 return res;
3146 }
3147
3148 /*
3149 * WinGetFuncArgInPartition
3150 * Evaluate a window function's argument expression on a specified
3151 * row of the partition. The row is identified in lseek(2) style,
3152 * i.e. relative to the current, first, or last row.
3153 *
3154 * argno: argument number to evaluate (counted from 0)
3155 * relpos: signed rowcount offset from the seek position
3156 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
3157 * set_mark: If the row is found and set_mark is true, the mark is moved to
3158 * the row as a side-effect.
3159 * isnull: output argument, receives isnull status of result
3160 * isout: output argument, set to indicate whether target row position
3161 * is out of partition (can pass NULL if caller doesn't care about this)
3162 *
3163 * Specifying a nonexistent row is not an error, it just causes a null result
3164 * (plus setting *isout true, if isout isn't NULL).
3165 */
3166 Datum
WinGetFuncArgInPartition(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)3167 WinGetFuncArgInPartition(WindowObject winobj, int argno,
3168 int relpos, int seektype, bool set_mark,
3169 bool *isnull, bool *isout)
3170 {
3171 WindowAggState *winstate;
3172 ExprContext *econtext;
3173 TupleTableSlot *slot;
3174 bool gottuple;
3175 int64 abs_pos;
3176
3177 Assert(WindowObjectIsValid(winobj));
3178 winstate = winobj->winstate;
3179 econtext = winstate->ss.ps.ps_ExprContext;
3180 slot = winstate->temp_slot_1;
3181
3182 switch (seektype)
3183 {
3184 case WINDOW_SEEK_CURRENT:
3185 abs_pos = winstate->currentpos + relpos;
3186 break;
3187 case WINDOW_SEEK_HEAD:
3188 abs_pos = relpos;
3189 break;
3190 case WINDOW_SEEK_TAIL:
3191 spool_tuples(winstate, -1);
3192 abs_pos = winstate->spooled_rows - 1 + relpos;
3193 break;
3194 default:
3195 elog(ERROR, "unrecognized window seek type: %d", seektype);
3196 abs_pos = 0; /* keep compiler quiet */
3197 break;
3198 }
3199
3200 gottuple = window_gettupleslot(winobj, abs_pos, slot);
3201
3202 if (!gottuple)
3203 {
3204 if (isout)
3205 *isout = true;
3206 *isnull = true;
3207 return (Datum) 0;
3208 }
3209 else
3210 {
3211 if (isout)
3212 *isout = false;
3213 if (set_mark)
3214 WinSetMarkPosition(winobj, abs_pos);
3215 econtext->ecxt_outertuple = slot;
3216 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3217 econtext, isnull);
3218 }
3219 }
3220
3221 /*
3222 * WinGetFuncArgInFrame
3223 * Evaluate a window function's argument expression on a specified
3224 * row of the window frame. The row is identified in lseek(2) style,
3225 * i.e. relative to the first or last row of the frame. (We do not
3226 * support WINDOW_SEEK_CURRENT here, because it's not very clear what
3227 * that should mean if the current row isn't part of the frame.)
3228 *
3229 * argno: argument number to evaluate (counted from 0)
3230 * relpos: signed rowcount offset from the seek position
3231 * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL
3232 * set_mark: If the row is found/in frame and set_mark is true, the mark is
3233 * moved to the row as a side-effect.
3234 * isnull: output argument, receives isnull status of result
3235 * isout: output argument, set to indicate whether target row position
3236 * is out of frame (can pass NULL if caller doesn't care about this)
3237 *
3238 * Specifying a nonexistent or not-in-frame row is not an error, it just
3239 * causes a null result (plus setting *isout true, if isout isn't NULL).
3240 *
3241 * Note that some exclusion-clause options lead to situations where the
3242 * rows that are in-frame are not consecutive in the partition. But we
3243 * count only in-frame rows when measuring relpos.
3244 *
3245 * The set_mark flag is interpreted as meaning that the caller will specify
3246 * a constant (or, perhaps, monotonically increasing) relpos in successive
3247 * calls, so that *if there is no exclusion clause* there will be no need
3248 * to fetch a row before the previously fetched row. But we do not expect
3249 * the caller to know how to account for exclusion clauses. Therefore,
3250 * if there is an exclusion clause we take responsibility for adjusting the
3251 * mark request to something that will be safe given the above assumption
3252 * about relpos.
3253 */
3254 Datum
WinGetFuncArgInFrame(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)3255 WinGetFuncArgInFrame(WindowObject winobj, int argno,
3256 int relpos, int seektype, bool set_mark,
3257 bool *isnull, bool *isout)
3258 {
3259 WindowAggState *winstate;
3260 ExprContext *econtext;
3261 TupleTableSlot *slot;
3262 int64 abs_pos;
3263 int64 mark_pos;
3264
3265 Assert(WindowObjectIsValid(winobj));
3266 winstate = winobj->winstate;
3267 econtext = winstate->ss.ps.ps_ExprContext;
3268 slot = winstate->temp_slot_1;
3269
3270 switch (seektype)
3271 {
3272 case WINDOW_SEEK_CURRENT:
3273 elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3274 abs_pos = mark_pos = 0; /* keep compiler quiet */
3275 break;
3276 case WINDOW_SEEK_HEAD:
3277 /* rejecting relpos < 0 is easy and simplifies code below */
3278 if (relpos < 0)
3279 goto out_of_frame;
3280 update_frameheadpos(winstate);
3281 abs_pos = winstate->frameheadpos + relpos;
3282 mark_pos = abs_pos;
3283
3284 /*
3285 * Account for exclusion option if one is active, but advance only
3286 * abs_pos not mark_pos. This prevents changes of the current
3287 * row's peer group from resulting in trying to fetch a row before
3288 * some previous mark position.
3289 *
3290 * Note that in some corner cases such as current row being
3291 * outside frame, these calculations are theoretically too simple,
3292 * but it doesn't matter because we'll end up deciding the row is
3293 * out of frame. We do not attempt to avoid fetching rows past
3294 * end of frame; that would happen in some cases anyway.
3295 */
3296 switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3297 {
3298 case 0:
3299 /* no adjustment needed */
3300 break;
3301 case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3302 if (abs_pos >= winstate->currentpos &&
3303 winstate->currentpos >= winstate->frameheadpos)
3304 abs_pos++;
3305 break;
3306 case FRAMEOPTION_EXCLUDE_GROUP:
3307 update_grouptailpos(winstate);
3308 if (abs_pos >= winstate->groupheadpos &&
3309 winstate->grouptailpos > winstate->frameheadpos)
3310 {
3311 int64 overlapstart = Max(winstate->groupheadpos,
3312 winstate->frameheadpos);
3313
3314 abs_pos += winstate->grouptailpos - overlapstart;
3315 }
3316 break;
3317 case FRAMEOPTION_EXCLUDE_TIES:
3318 update_grouptailpos(winstate);
3319 if (abs_pos >= winstate->groupheadpos &&
3320 winstate->grouptailpos > winstate->frameheadpos)
3321 {
3322 int64 overlapstart = Max(winstate->groupheadpos,
3323 winstate->frameheadpos);
3324
3325 if (abs_pos == overlapstart)
3326 abs_pos = winstate->currentpos;
3327 else
3328 abs_pos += winstate->grouptailpos - overlapstart - 1;
3329 }
3330 break;
3331 default:
3332 elog(ERROR, "unrecognized frame option state: 0x%x",
3333 winstate->frameOptions);
3334 break;
3335 }
3336 break;
3337 case WINDOW_SEEK_TAIL:
3338 /* rejecting relpos > 0 is easy and simplifies code below */
3339 if (relpos > 0)
3340 goto out_of_frame;
3341 update_frametailpos(winstate);
3342 abs_pos = winstate->frametailpos - 1 + relpos;
3343
3344 /*
3345 * Account for exclusion option if one is active. If there is no
3346 * exclusion, we can safely set the mark at the accessed row. But
3347 * if there is, we can only mark the frame start, because we can't
3348 * be sure how far back in the frame the exclusion might cause us
3349 * to fetch in future. Furthermore, we have to actually check
3350 * against frameheadpos here, since it's unsafe to try to fetch a
3351 * row before frame start if the mark might be there already.
3352 */
3353 switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3354 {
3355 case 0:
3356 /* no adjustment needed */
3357 mark_pos = abs_pos;
3358 break;
3359 case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3360 if (abs_pos <= winstate->currentpos &&
3361 winstate->currentpos < winstate->frametailpos)
3362 abs_pos--;
3363 update_frameheadpos(winstate);
3364 if (abs_pos < winstate->frameheadpos)
3365 goto out_of_frame;
3366 mark_pos = winstate->frameheadpos;
3367 break;
3368 case FRAMEOPTION_EXCLUDE_GROUP:
3369 update_grouptailpos(winstate);
3370 if (abs_pos < winstate->grouptailpos &&
3371 winstate->groupheadpos < winstate->frametailpos)
3372 {
3373 int64 overlapend = Min(winstate->grouptailpos,
3374 winstate->frametailpos);
3375
3376 abs_pos -= overlapend - winstate->groupheadpos;
3377 }
3378 update_frameheadpos(winstate);
3379 if (abs_pos < winstate->frameheadpos)
3380 goto out_of_frame;
3381 mark_pos = winstate->frameheadpos;
3382 break;
3383 case FRAMEOPTION_EXCLUDE_TIES:
3384 update_grouptailpos(winstate);
3385 if (abs_pos < winstate->grouptailpos &&
3386 winstate->groupheadpos < winstate->frametailpos)
3387 {
3388 int64 overlapend = Min(winstate->grouptailpos,
3389 winstate->frametailpos);
3390
3391 if (abs_pos == overlapend - 1)
3392 abs_pos = winstate->currentpos;
3393 else
3394 abs_pos -= overlapend - 1 - winstate->groupheadpos;
3395 }
3396 update_frameheadpos(winstate);
3397 if (abs_pos < winstate->frameheadpos)
3398 goto out_of_frame;
3399 mark_pos = winstate->frameheadpos;
3400 break;
3401 default:
3402 elog(ERROR, "unrecognized frame option state: 0x%x",
3403 winstate->frameOptions);
3404 mark_pos = 0; /* keep compiler quiet */
3405 break;
3406 }
3407 break;
3408 default:
3409 elog(ERROR, "unrecognized window seek type: %d", seektype);
3410 abs_pos = mark_pos = 0; /* keep compiler quiet */
3411 break;
3412 }
3413
3414 if (!window_gettupleslot(winobj, abs_pos, slot))
3415 goto out_of_frame;
3416
3417 /* The code above does not detect all out-of-frame cases, so check */
3418 if (row_is_in_frame(winstate, abs_pos, slot) <= 0)
3419 goto out_of_frame;
3420
3421 if (isout)
3422 *isout = false;
3423 if (set_mark)
3424 WinSetMarkPosition(winobj, mark_pos);
3425 econtext->ecxt_outertuple = slot;
3426 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3427 econtext, isnull);
3428
3429 out_of_frame:
3430 if (isout)
3431 *isout = true;
3432 *isnull = true;
3433 return (Datum) 0;
3434 }
3435
3436 /*
3437 * WinGetFuncArgCurrent
3438 * Evaluate a window function's argument expression on the current row.
3439 *
3440 * argno: argument number to evaluate (counted from 0)
3441 * isnull: output argument, receives isnull status of result
3442 *
3443 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
3444 * WinGetFuncArgInFrame targeting the current row, because it will succeed
3445 * even if the WindowObject's mark has been set beyond the current row.
3446 * This should generally be used for "ordinary" arguments of a window
3447 * function, such as the offset argument of lead() or lag().
3448 */
3449 Datum
WinGetFuncArgCurrent(WindowObject winobj,int argno,bool * isnull)3450 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
3451 {
3452 WindowAggState *winstate;
3453 ExprContext *econtext;
3454
3455 Assert(WindowObjectIsValid(winobj));
3456 winstate = winobj->winstate;
3457
3458 econtext = winstate->ss.ps.ps_ExprContext;
3459
3460 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
3461 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3462 econtext, isnull);
3463 }
3464