1 /*-------------------------------------------------------------------------
2 *
3 * nodeWindowAgg.c
4 * routines to handle WindowAgg nodes.
5 *
6 * A WindowAgg node evaluates "window functions" across suitable partitions
7 * of the input tuple set. Any one WindowAgg works for just a single window
8 * specification, though it can evaluate multiple window functions sharing
9 * identical window specifications. The input tuples are required to be
10 * delivered in sorted order, with the PARTITION BY columns (if any) as
11 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12 * (The planner generates a stack of WindowAggs with intervening Sort nodes
13 * as needed, if a query involves more than one window specification.)
14 *
15 * Since window functions can require access to any or all of the rows in
16 * the current partition, we accumulate rows of the partition into a
17 * tuplestore. The window functions are called using the WindowObject API
18 * so that they can access those rows as needed.
19 *
20 * We also support using plain aggregate functions as window functions.
21 * For these, the regular Agg-node environment is emulated for each partition.
22 * As required by the SQL spec, the output represents the value of the
23 * aggregate function over all rows in the current row's window frame.
24 *
25 *
26 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
27 * Portions Copyright (c) 1994, Regents of the University of California
28 *
29 * IDENTIFICATION
30 * src/backend/executor/nodeWindowAgg.c
31 *
32 *-------------------------------------------------------------------------
33 */
34 #include "postgres.h"
35
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/regproc.h"
53 #include "utils/syscache.h"
54 #include "windowapi.h"
55
56 /*
57 * All the window function APIs are called with this object, which is passed
58 * to window functions as fcinfo->context.
59 */
60 typedef struct WindowObjectData
61 {
62 NodeTag type;
63 WindowAggState *winstate; /* parent WindowAggState */
64 List *argstates; /* ExprState trees for fn's arguments */
65 void *localmem; /* WinGetPartitionLocalMemory's chunk */
66 int markptr; /* tuplestore mark pointer for this fn */
67 int readptr; /* tuplestore read pointer for this fn */
68 int64 markpos; /* row that markptr is positioned on */
69 int64 seekpos; /* row that readptr is positioned on */
70 } WindowObjectData;
71
72 /*
73 * We have one WindowStatePerFunc struct for each window function and
74 * window aggregate handled by this node.
75 */
76 typedef struct WindowStatePerFuncData
77 {
78 /* Links to WindowFunc expr and state nodes this working state is for */
79 WindowFuncExprState *wfuncstate;
80 WindowFunc *wfunc;
81
82 int numArguments; /* number of arguments */
83
84 FmgrInfo flinfo; /* fmgr lookup data for window function */
85
86 Oid winCollation; /* collation derived for window function */
87
88 /*
89 * We need the len and byval info for the result of each function in order
90 * to know how to copy/delete values.
91 */
92 int16 resulttypeLen;
93 bool resulttypeByVal;
94
95 bool plain_agg; /* is it just a plain aggregate function? */
96 int aggno; /* if so, index of its PerAggData */
97
98 WindowObject winobj; /* object used in window function API */
99 } WindowStatePerFuncData;
100
101 /*
102 * For plain aggregate window functions, we also have one of these.
103 */
104 typedef struct WindowStatePerAggData
105 {
106 /* Oids of transition functions */
107 Oid transfn_oid;
108 Oid invtransfn_oid; /* may be InvalidOid */
109 Oid finalfn_oid; /* may be InvalidOid */
110
111 /*
112 * fmgr lookup data for transition functions --- only valid when
113 * corresponding oid is not InvalidOid. Note in particular that fn_strict
114 * flags are kept here.
115 */
116 FmgrInfo transfn;
117 FmgrInfo invtransfn;
118 FmgrInfo finalfn;
119
120 int numFinalArgs; /* number of arguments to pass to finalfn */
121
122 /*
123 * initial value from pg_aggregate entry
124 */
125 Datum initValue;
126 bool initValueIsNull;
127
128 /*
129 * cached value for current frame boundaries
130 */
131 Datum resultValue;
132 bool resultValueIsNull;
133
134 /*
135 * We need the len and byval info for the agg's input, result, and
136 * transition data types in order to know how to copy/delete values.
137 */
138 int16 inputtypeLen,
139 resulttypeLen,
140 transtypeLen;
141 bool inputtypeByVal,
142 resulttypeByVal,
143 transtypeByVal;
144
145 int wfuncno; /* index of associated PerFuncData */
146
147 /* Context holding transition value and possibly other subsidiary data */
148 MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
149
150 /* Current transition value */
151 Datum transValue; /* current transition value */
152 bool transValueIsNull;
153
154 int64 transValueCount; /* number of currently-aggregated rows */
155
156 /* Data local to eval_windowaggregates() */
157 bool restart; /* need to restart this agg in this cycle? */
158 } WindowStatePerAggData;
159
160 static void initialize_windowaggregate(WindowAggState *winstate,
161 WindowStatePerFunc perfuncstate,
162 WindowStatePerAgg peraggstate);
163 static void advance_windowaggregate(WindowAggState *winstate,
164 WindowStatePerFunc perfuncstate,
165 WindowStatePerAgg peraggstate);
166 static bool advance_windowaggregate_base(WindowAggState *winstate,
167 WindowStatePerFunc perfuncstate,
168 WindowStatePerAgg peraggstate);
169 static void finalize_windowaggregate(WindowAggState *winstate,
170 WindowStatePerFunc perfuncstate,
171 WindowStatePerAgg peraggstate,
172 Datum *result, bool *isnull);
173
174 static void eval_windowaggregates(WindowAggState *winstate);
175 static void eval_windowfunction(WindowAggState *winstate,
176 WindowStatePerFunc perfuncstate,
177 Datum *result, bool *isnull);
178
179 static void begin_partition(WindowAggState *winstate);
180 static void spool_tuples(WindowAggState *winstate, int64 pos);
181 static void release_partition(WindowAggState *winstate);
182
183 static int row_is_in_frame(WindowAggState *winstate, int64 pos,
184 TupleTableSlot *slot);
185 static void update_frameheadpos(WindowAggState *winstate);
186 static void update_frametailpos(WindowAggState *winstate);
187 static void update_grouptailpos(WindowAggState *winstate);
188
189 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
190 WindowFunc *wfunc,
191 WindowStatePerAgg peraggstate);
192 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
193
194 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
195 TupleTableSlot *slot2);
196 static bool window_gettupleslot(WindowObject winobj, int64 pos,
197 TupleTableSlot *slot);
198
199
200 /*
201 * initialize_windowaggregate
202 * parallel to initialize_aggregates in nodeAgg.c
203 */
204 static void
initialize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)205 initialize_windowaggregate(WindowAggState *winstate,
206 WindowStatePerFunc perfuncstate,
207 WindowStatePerAgg peraggstate)
208 {
209 MemoryContext oldContext;
210
211 /*
212 * If we're using a private aggcontext, we may reset it here. But if the
213 * context is shared, we don't know which other aggregates may still need
214 * it, so we must leave it to the caller to reset at an appropriate time.
215 */
216 if (peraggstate->aggcontext != winstate->aggcontext)
217 MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
218
219 if (peraggstate->initValueIsNull)
220 peraggstate->transValue = peraggstate->initValue;
221 else
222 {
223 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
224 peraggstate->transValue = datumCopy(peraggstate->initValue,
225 peraggstate->transtypeByVal,
226 peraggstate->transtypeLen);
227 MemoryContextSwitchTo(oldContext);
228 }
229 peraggstate->transValueIsNull = peraggstate->initValueIsNull;
230 peraggstate->transValueCount = 0;
231 peraggstate->resultValue = (Datum) 0;
232 peraggstate->resultValueIsNull = true;
233 }
234
235 /*
236 * advance_windowaggregate
237 * parallel to advance_aggregates in nodeAgg.c
238 */
239 static void
advance_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)240 advance_windowaggregate(WindowAggState *winstate,
241 WindowStatePerFunc perfuncstate,
242 WindowStatePerAgg peraggstate)
243 {
244 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
245 int numArguments = perfuncstate->numArguments;
246 FunctionCallInfoData fcinfodata;
247 FunctionCallInfo fcinfo = &fcinfodata;
248 Datum newVal;
249 ListCell *arg;
250 int i;
251 MemoryContext oldContext;
252 ExprContext *econtext = winstate->tmpcontext;
253 ExprState *filter = wfuncstate->aggfilter;
254
255 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
256
257 /* Skip anything FILTERed out */
258 if (filter)
259 {
260 bool isnull;
261 Datum res = ExecEvalExpr(filter, econtext, &isnull);
262
263 if (isnull || !DatumGetBool(res))
264 {
265 MemoryContextSwitchTo(oldContext);
266 return;
267 }
268 }
269
270 /* We start from 1, since the 0th arg will be the transition value */
271 i = 1;
272 foreach(arg, wfuncstate->args)
273 {
274 ExprState *argstate = (ExprState *) lfirst(arg);
275
276 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
277 &fcinfo->argnull[i]);
278 i++;
279 }
280
281 if (peraggstate->transfn.fn_strict)
282 {
283 /*
284 * For a strict transfn, nothing happens when there's a NULL input; we
285 * just keep the prior transValue. Note transValueCount doesn't
286 * change either.
287 */
288 for (i = 1; i <= numArguments; i++)
289 {
290 if (fcinfo->argnull[i])
291 {
292 MemoryContextSwitchTo(oldContext);
293 return;
294 }
295 }
296
297 /*
298 * For strict transition functions with initial value NULL we use the
299 * first non-NULL input as the initial state. (We already checked
300 * that the agg's input type is binary-compatible with its transtype,
301 * so straight copy here is OK.)
302 *
303 * We must copy the datum into aggcontext if it is pass-by-ref. We do
304 * not need to pfree the old transValue, since it's NULL.
305 */
306 if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
307 {
308 MemoryContextSwitchTo(peraggstate->aggcontext);
309 peraggstate->transValue = datumCopy(fcinfo->arg[1],
310 peraggstate->transtypeByVal,
311 peraggstate->transtypeLen);
312 peraggstate->transValueIsNull = false;
313 peraggstate->transValueCount = 1;
314 MemoryContextSwitchTo(oldContext);
315 return;
316 }
317
318 if (peraggstate->transValueIsNull)
319 {
320 /*
321 * Don't call a strict function with NULL inputs. Note it is
322 * possible to get here despite the above tests, if the transfn is
323 * strict *and* returned a NULL on a prior cycle. If that happens
324 * we will propagate the NULL all the way to the end. That can
325 * only happen if there's no inverse transition function, though,
326 * since we disallow transitions back to NULL when there is one.
327 */
328 MemoryContextSwitchTo(oldContext);
329 Assert(!OidIsValid(peraggstate->invtransfn_oid));
330 return;
331 }
332 }
333
334 /*
335 * OK to call the transition function. Set winstate->curaggcontext while
336 * calling it, for possible use by AggCheckCallContext.
337 */
338 InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
339 numArguments + 1,
340 perfuncstate->winCollation,
341 (void *) winstate, NULL);
342 fcinfo->arg[0] = peraggstate->transValue;
343 fcinfo->argnull[0] = peraggstate->transValueIsNull;
344 winstate->curaggcontext = peraggstate->aggcontext;
345 newVal = FunctionCallInvoke(fcinfo);
346 winstate->curaggcontext = NULL;
347
348 /*
349 * Moving-aggregate transition functions must not return null, see
350 * advance_windowaggregate_base().
351 */
352 if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
353 ereport(ERROR,
354 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
355 errmsg("moving-aggregate transition function must not return null")));
356
357 /*
358 * We must track the number of rows included in transValue, since to
359 * remove the last input, advance_windowaggregate_base() mustn't call the
360 * inverse transition function, but simply reset transValue back to its
361 * initial value.
362 */
363 peraggstate->transValueCount++;
364
365 /*
366 * If pass-by-ref datatype, must copy the new value into aggcontext and
367 * free the prior transValue. But if transfn returned a pointer to its
368 * first input, we don't need to do anything. Also, if transfn returned a
369 * pointer to a R/W expanded object that is already a child of the
370 * aggcontext, assume we can adopt that value without copying it.
371 */
372 if (!peraggstate->transtypeByVal &&
373 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
374 {
375 if (!fcinfo->isnull)
376 {
377 MemoryContextSwitchTo(peraggstate->aggcontext);
378 if (DatumIsReadWriteExpandedObject(newVal,
379 false,
380 peraggstate->transtypeLen) &&
381 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
382 /* do nothing */ ;
383 else
384 newVal = datumCopy(newVal,
385 peraggstate->transtypeByVal,
386 peraggstate->transtypeLen);
387 }
388 if (!peraggstate->transValueIsNull)
389 {
390 if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
391 false,
392 peraggstate->transtypeLen))
393 DeleteExpandedObject(peraggstate->transValue);
394 else
395 pfree(DatumGetPointer(peraggstate->transValue));
396 }
397 }
398
399 MemoryContextSwitchTo(oldContext);
400 peraggstate->transValue = newVal;
401 peraggstate->transValueIsNull = fcinfo->isnull;
402 }
403
404 /*
405 * advance_windowaggregate_base
406 * Remove the oldest tuple from an aggregation.
407 *
408 * This is very much like advance_windowaggregate, except that we will call
409 * the inverse transition function (which caller must have checked is
410 * available).
411 *
412 * Returns true if we successfully removed the current row from this
413 * aggregate, false if not (in the latter case, caller is responsible
414 * for cleaning up by restarting the aggregation).
415 */
416 static bool
advance_windowaggregate_base(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)417 advance_windowaggregate_base(WindowAggState *winstate,
418 WindowStatePerFunc perfuncstate,
419 WindowStatePerAgg peraggstate)
420 {
421 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
422 int numArguments = perfuncstate->numArguments;
423 FunctionCallInfoData fcinfodata;
424 FunctionCallInfo fcinfo = &fcinfodata;
425 Datum newVal;
426 ListCell *arg;
427 int i;
428 MemoryContext oldContext;
429 ExprContext *econtext = winstate->tmpcontext;
430 ExprState *filter = wfuncstate->aggfilter;
431
432 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
433
434 /* Skip anything FILTERed out */
435 if (filter)
436 {
437 bool isnull;
438 Datum res = ExecEvalExpr(filter, econtext, &isnull);
439
440 if (isnull || !DatumGetBool(res))
441 {
442 MemoryContextSwitchTo(oldContext);
443 return true;
444 }
445 }
446
447 /* We start from 1, since the 0th arg will be the transition value */
448 i = 1;
449 foreach(arg, wfuncstate->args)
450 {
451 ExprState *argstate = (ExprState *) lfirst(arg);
452
453 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
454 &fcinfo->argnull[i]);
455 i++;
456 }
457
458 if (peraggstate->invtransfn.fn_strict)
459 {
460 /*
461 * For a strict (inv)transfn, nothing happens when there's a NULL
462 * input; we just keep the prior transValue. Note transValueCount
463 * doesn't change either.
464 */
465 for (i = 1; i <= numArguments; i++)
466 {
467 if (fcinfo->argnull[i])
468 {
469 MemoryContextSwitchTo(oldContext);
470 return true;
471 }
472 }
473 }
474
475 /* There should still be an added but not yet removed value */
476 Assert(peraggstate->transValueCount > 0);
477
478 /*
479 * In moving-aggregate mode, the state must never be NULL, except possibly
480 * before any rows have been aggregated (which is surely not the case at
481 * this point). This restriction allows us to interpret a NULL result
482 * from the inverse function as meaning "sorry, can't do an inverse
483 * transition in this case". We already checked this in
484 * advance_windowaggregate, but just for safety, check again.
485 */
486 if (peraggstate->transValueIsNull)
487 elog(ERROR, "aggregate transition value is NULL before inverse transition");
488
489 /*
490 * We mustn't use the inverse transition function to remove the last
491 * input. Doing so would yield a non-NULL state, whereas we should be in
492 * the initial state afterwards which may very well be NULL. So instead,
493 * we simply re-initialize the aggregate in this case.
494 */
495 if (peraggstate->transValueCount == 1)
496 {
497 MemoryContextSwitchTo(oldContext);
498 initialize_windowaggregate(winstate,
499 &winstate->perfunc[peraggstate->wfuncno],
500 peraggstate);
501 return true;
502 }
503
504 /*
505 * OK to call the inverse transition function. Set
506 * winstate->curaggcontext while calling it, for possible use by
507 * AggCheckCallContext.
508 */
509 InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
510 numArguments + 1,
511 perfuncstate->winCollation,
512 (void *) winstate, NULL);
513 fcinfo->arg[0] = peraggstate->transValue;
514 fcinfo->argnull[0] = peraggstate->transValueIsNull;
515 winstate->curaggcontext = peraggstate->aggcontext;
516 newVal = FunctionCallInvoke(fcinfo);
517 winstate->curaggcontext = NULL;
518
519 /*
520 * If the function returns NULL, report failure, forcing a restart.
521 */
522 if (fcinfo->isnull)
523 {
524 MemoryContextSwitchTo(oldContext);
525 return false;
526 }
527
528 /* Update number of rows included in transValue */
529 peraggstate->transValueCount--;
530
531 /*
532 * If pass-by-ref datatype, must copy the new value into aggcontext and
533 * free the prior transValue. But if invtransfn returned a pointer to its
534 * first input, we don't need to do anything. Also, if invtransfn
535 * returned a pointer to a R/W expanded object that is already a child of
536 * the aggcontext, assume we can adopt that value without copying it.
537 *
538 * Note: the checks for null values here will never fire, but it seems
539 * best to have this stanza look just like advance_windowaggregate.
540 */
541 if (!peraggstate->transtypeByVal &&
542 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
543 {
544 if (!fcinfo->isnull)
545 {
546 MemoryContextSwitchTo(peraggstate->aggcontext);
547 if (DatumIsReadWriteExpandedObject(newVal,
548 false,
549 peraggstate->transtypeLen) &&
550 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
551 /* do nothing */ ;
552 else
553 newVal = datumCopy(newVal,
554 peraggstate->transtypeByVal,
555 peraggstate->transtypeLen);
556 }
557 if (!peraggstate->transValueIsNull)
558 {
559 if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
560 false,
561 peraggstate->transtypeLen))
562 DeleteExpandedObject(peraggstate->transValue);
563 else
564 pfree(DatumGetPointer(peraggstate->transValue));
565 }
566 }
567
568 MemoryContextSwitchTo(oldContext);
569 peraggstate->transValue = newVal;
570 peraggstate->transValueIsNull = fcinfo->isnull;
571
572 return true;
573 }
574
575 /*
576 * finalize_windowaggregate
577 * parallel to finalize_aggregate in nodeAgg.c
578 */
579 static void
finalize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate,Datum * result,bool * isnull)580 finalize_windowaggregate(WindowAggState *winstate,
581 WindowStatePerFunc perfuncstate,
582 WindowStatePerAgg peraggstate,
583 Datum *result, bool *isnull)
584 {
585 MemoryContext oldContext;
586
587 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
588
589 /*
590 * Apply the agg's finalfn if one is provided, else return transValue.
591 */
592 if (OidIsValid(peraggstate->finalfn_oid))
593 {
594 int numFinalArgs = peraggstate->numFinalArgs;
595 FunctionCallInfoData fcinfo;
596 bool anynull;
597 int i;
598
599 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn),
600 numFinalArgs,
601 perfuncstate->winCollation,
602 (void *) winstate, NULL);
603 fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
604 peraggstate->transValueIsNull,
605 peraggstate->transtypeLen);
606 fcinfo.argnull[0] = peraggstate->transValueIsNull;
607 anynull = peraggstate->transValueIsNull;
608
609 /* Fill any remaining argument positions with nulls */
610 for (i = 1; i < numFinalArgs; i++)
611 {
612 fcinfo.arg[i] = (Datum) 0;
613 fcinfo.argnull[i] = true;
614 anynull = true;
615 }
616
617 if (fcinfo.flinfo->fn_strict && anynull)
618 {
619 /* don't call a strict function with NULL inputs */
620 *result = (Datum) 0;
621 *isnull = true;
622 }
623 else
624 {
625 winstate->curaggcontext = peraggstate->aggcontext;
626 *result = FunctionCallInvoke(&fcinfo);
627 winstate->curaggcontext = NULL;
628 *isnull = fcinfo.isnull;
629 }
630 }
631 else
632 {
633 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
634 *result = peraggstate->transValue;
635 *isnull = peraggstate->transValueIsNull;
636 }
637
638 /*
639 * If result is pass-by-ref, make sure it is in the right context.
640 */
641 if (!peraggstate->resulttypeByVal && !*isnull &&
642 !MemoryContextContains(CurrentMemoryContext,
643 DatumGetPointer(*result)))
644 *result = datumCopy(*result,
645 peraggstate->resulttypeByVal,
646 peraggstate->resulttypeLen);
647 MemoryContextSwitchTo(oldContext);
648 }
649
650 /*
651 * eval_windowaggregates
652 * evaluate plain aggregates being used as window functions
653 *
654 * This differs from nodeAgg.c in two ways. First, if the window's frame
655 * start position moves, we use the inverse transition function (if it exists)
656 * to remove rows from the transition value. And second, we expect to be
657 * able to call aggregate final functions repeatedly after aggregating more
658 * data onto the same transition value. This is not a behavior required by
659 * nodeAgg.c.
660 */
661 static void
eval_windowaggregates(WindowAggState * winstate)662 eval_windowaggregates(WindowAggState *winstate)
663 {
664 WindowStatePerAgg peraggstate;
665 int wfuncno,
666 numaggs,
667 numaggs_restart,
668 i;
669 int64 aggregatedupto_nonrestarted;
670 MemoryContext oldContext;
671 ExprContext *econtext;
672 WindowObject agg_winobj;
673 TupleTableSlot *agg_row_slot;
674 TupleTableSlot *temp_slot;
675
676 numaggs = winstate->numaggs;
677 if (numaggs == 0)
678 return; /* nothing to do */
679
680 /* final output execution is in ps_ExprContext */
681 econtext = winstate->ss.ps.ps_ExprContext;
682 agg_winobj = winstate->agg_winobj;
683 agg_row_slot = winstate->agg_row_slot;
684 temp_slot = winstate->temp_slot_1;
685
686 /*
687 * If the window's frame start clause is UNBOUNDED_PRECEDING and no
688 * exclusion clause is specified, then the window frame consists of a
689 * contiguous group of rows extending forward from the start of the
690 * partition, and rows only enter the frame, never exit it, as the current
691 * row advances forward. This makes it possible to use an incremental
692 * strategy for evaluating aggregates: we run the transition function for
693 * each row added to the frame, and run the final function whenever we
694 * need the current aggregate value. This is considerably more efficient
695 * than the naive approach of re-running the entire aggregate calculation
696 * for each current row. It does assume that the final function doesn't
697 * damage the running transition value, but we have the same assumption in
698 * nodeAgg.c too (when it rescans an existing hash table).
699 *
700 * If the frame start does sometimes move, we can still optimize as above
701 * whenever successive rows share the same frame head, but if the frame
702 * head moves beyond the previous head we try to remove those rows using
703 * the aggregate's inverse transition function. This function restores
704 * the aggregate's current state to what it would be if the removed row
705 * had never been aggregated in the first place. Inverse transition
706 * functions may optionally return NULL, indicating that the function was
707 * unable to remove the tuple from aggregation. If this happens, or if
708 * the aggregate doesn't have an inverse transition function at all, we
709 * must perform the aggregation all over again for all tuples within the
710 * new frame boundaries.
711 *
712 * If there's any exclusion clause, then we may have to aggregate over a
713 * non-contiguous set of rows, so we punt and recalculate for every row.
714 * (For some frame end choices, it might be that the frame is always
715 * contiguous anyway, but that's an optimization to investigate later.)
716 *
717 * In many common cases, multiple rows share the same frame and hence the
718 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
719 * window, then all rows are peers and so they all have window frame equal
720 * to the whole partition.) We optimize such cases by calculating the
721 * aggregate value once when we reach the first row of a peer group, and
722 * then returning the saved value for all subsequent rows.
723 *
724 * 'aggregatedupto' keeps track of the first row that has not yet been
725 * accumulated into the aggregate transition values. Whenever we start a
726 * new peer group, we accumulate forward to the end of the peer group.
727 */
728
729 /*
730 * First, update the frame head position.
731 *
732 * The frame head should never move backwards, and the code below wouldn't
733 * cope if it did, so for safety we complain if it does.
734 */
735 update_frameheadpos(winstate);
736 if (winstate->frameheadpos < winstate->aggregatedbase)
737 elog(ERROR, "window frame head moved backward");
738
739 /*
740 * If the frame didn't change compared to the previous row, we can re-use
741 * the result values that were previously saved at the bottom of this
742 * function. Since we don't know the current frame's end yet, this is not
743 * possible to check for fully. But if the frame end mode is UNBOUNDED
744 * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the
745 * current row lies within the previous row's frame, then the two frames'
746 * ends must coincide. Note that on the first row aggregatedbase ==
747 * aggregatedupto, meaning this test must fail, so we don't need to check
748 * the "there was no previous row" case explicitly here.
749 */
750 if (winstate->aggregatedbase == winstate->frameheadpos &&
751 (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
752 FRAMEOPTION_END_CURRENT_ROW)) &&
753 !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
754 winstate->aggregatedbase <= winstate->currentpos &&
755 winstate->aggregatedupto > winstate->currentpos)
756 {
757 for (i = 0; i < numaggs; i++)
758 {
759 peraggstate = &winstate->peragg[i];
760 wfuncno = peraggstate->wfuncno;
761 econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
762 econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
763 }
764 return;
765 }
766
767 /*----------
768 * Initialize restart flags.
769 *
770 * We restart the aggregation:
771 * - if we're processing the first row in the partition, or
772 * - if the frame's head moved and we cannot use an inverse
773 * transition function, or
774 * - we have an EXCLUSION clause, or
775 * - if the new frame doesn't overlap the old one
776 *
777 * Note that we don't strictly need to restart in the last case, but if
778 * we're going to remove all rows from the aggregation anyway, a restart
779 * surely is faster.
780 *----------
781 */
782 numaggs_restart = 0;
783 for (i = 0; i < numaggs; i++)
784 {
785 peraggstate = &winstate->peragg[i];
786 if (winstate->currentpos == 0 ||
787 (winstate->aggregatedbase != winstate->frameheadpos &&
788 !OidIsValid(peraggstate->invtransfn_oid)) ||
789 (winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
790 winstate->aggregatedupto <= winstate->frameheadpos)
791 {
792 peraggstate->restart = true;
793 numaggs_restart++;
794 }
795 else
796 peraggstate->restart = false;
797 }
798
799 /*
800 * If we have any possibly-moving aggregates, attempt to advance
801 * aggregatedbase to match the frame's head by removing input rows that
802 * fell off the top of the frame from the aggregations. This can fail,
803 * i.e. advance_windowaggregate_base() can return false, in which case
804 * we'll restart that aggregate below.
805 */
806 while (numaggs_restart < numaggs &&
807 winstate->aggregatedbase < winstate->frameheadpos)
808 {
809 /*
810 * Fetch the next tuple of those being removed. This should never fail
811 * as we should have been here before.
812 */
813 if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
814 temp_slot))
815 elog(ERROR, "could not re-fetch previously fetched frame row");
816
817 /* Set tuple context for evaluation of aggregate arguments */
818 winstate->tmpcontext->ecxt_outertuple = temp_slot;
819
820 /*
821 * Perform the inverse transition for each aggregate function in the
822 * window, unless it has already been marked as needing a restart.
823 */
824 for (i = 0; i < numaggs; i++)
825 {
826 bool ok;
827
828 peraggstate = &winstate->peragg[i];
829 if (peraggstate->restart)
830 continue;
831
832 wfuncno = peraggstate->wfuncno;
833 ok = advance_windowaggregate_base(winstate,
834 &winstate->perfunc[wfuncno],
835 peraggstate);
836 if (!ok)
837 {
838 /* Inverse transition function has failed, must restart */
839 peraggstate->restart = true;
840 numaggs_restart++;
841 }
842 }
843
844 /* Reset per-input-tuple context after each tuple */
845 ResetExprContext(winstate->tmpcontext);
846
847 /* And advance the aggregated-row state */
848 winstate->aggregatedbase++;
849 ExecClearTuple(temp_slot);
850 }
851
852 /*
853 * If we successfully advanced the base rows of all the aggregates,
854 * aggregatedbase now equals frameheadpos; but if we failed for any, we
855 * must forcibly update aggregatedbase.
856 */
857 winstate->aggregatedbase = winstate->frameheadpos;
858
859 /*
860 * If we created a mark pointer for aggregates, keep it pushed up to frame
861 * head, so that tuplestore can discard unnecessary rows.
862 */
863 if (agg_winobj->markptr >= 0)
864 WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
865
866 /*
867 * Now restart the aggregates that require it.
868 *
869 * We assume that aggregates using the shared context always restart if
870 * *any* aggregate restarts, and we may thus clean up the shared
871 * aggcontext if that is the case. Private aggcontexts are reset by
872 * initialize_windowaggregate() if their owning aggregate restarts. If we
873 * aren't restarting an aggregate, we need to free any previously saved
874 * result for it, else we'll leak memory.
875 */
876 if (numaggs_restart > 0)
877 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
878 for (i = 0; i < numaggs; i++)
879 {
880 peraggstate = &winstate->peragg[i];
881
882 /* Aggregates using the shared ctx must restart if *any* agg does */
883 Assert(peraggstate->aggcontext != winstate->aggcontext ||
884 numaggs_restart == 0 ||
885 peraggstate->restart);
886
887 if (peraggstate->restart)
888 {
889 wfuncno = peraggstate->wfuncno;
890 initialize_windowaggregate(winstate,
891 &winstate->perfunc[wfuncno],
892 peraggstate);
893 }
894 else if (!peraggstate->resultValueIsNull)
895 {
896 if (!peraggstate->resulttypeByVal)
897 pfree(DatumGetPointer(peraggstate->resultValue));
898 peraggstate->resultValue = (Datum) 0;
899 peraggstate->resultValueIsNull = true;
900 }
901 }
902
903 /*
904 * Non-restarted aggregates now contain the rows between aggregatedbase
905 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
906 * contain no rows. If there are any restarted aggregates, we must thus
907 * begin aggregating anew at frameheadpos, otherwise we may simply
908 * continue at aggregatedupto. We must remember the old value of
909 * aggregatedupto to know how long to skip advancing non-restarted
910 * aggregates. If we modify aggregatedupto, we must also clear
911 * agg_row_slot, per the loop invariant below.
912 */
913 aggregatedupto_nonrestarted = winstate->aggregatedupto;
914 if (numaggs_restart > 0 &&
915 winstate->aggregatedupto != winstate->frameheadpos)
916 {
917 winstate->aggregatedupto = winstate->frameheadpos;
918 ExecClearTuple(agg_row_slot);
919 }
920
921 /*
922 * Advance until we reach a row not in frame (or end of partition).
923 *
924 * Note the loop invariant: agg_row_slot is either empty or holds the row
925 * at position aggregatedupto. We advance aggregatedupto after processing
926 * a row.
927 */
928 for (;;)
929 {
930 int ret;
931
932 /* Fetch next row if we didn't already */
933 if (TupIsNull(agg_row_slot))
934 {
935 if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
936 agg_row_slot))
937 break; /* must be end of partition */
938 }
939
940 /*
941 * Exit loop if no more rows can be in frame. Skip aggregation if
942 * current row is not in frame but there might be more in the frame.
943 */
944 ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
945 if (ret < 0)
946 break;
947 if (ret == 0)
948 goto next_tuple;
949
950 /* Set tuple context for evaluation of aggregate arguments */
951 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
952
953 /* Accumulate row into the aggregates */
954 for (i = 0; i < numaggs; i++)
955 {
956 peraggstate = &winstate->peragg[i];
957
958 /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
959 if (!peraggstate->restart &&
960 winstate->aggregatedupto < aggregatedupto_nonrestarted)
961 continue;
962
963 wfuncno = peraggstate->wfuncno;
964 advance_windowaggregate(winstate,
965 &winstate->perfunc[wfuncno],
966 peraggstate);
967 }
968
969 next_tuple:
970 /* Reset per-input-tuple context after each tuple */
971 ResetExprContext(winstate->tmpcontext);
972
973 /* And advance the aggregated-row state */
974 winstate->aggregatedupto++;
975 ExecClearTuple(agg_row_slot);
976 }
977
978 /* The frame's end is not supposed to move backwards, ever */
979 Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
980
981 /*
982 * finalize aggregates and fill result/isnull fields.
983 */
984 for (i = 0; i < numaggs; i++)
985 {
986 Datum *result;
987 bool *isnull;
988
989 peraggstate = &winstate->peragg[i];
990 wfuncno = peraggstate->wfuncno;
991 result = &econtext->ecxt_aggvalues[wfuncno];
992 isnull = &econtext->ecxt_aggnulls[wfuncno];
993 finalize_windowaggregate(winstate,
994 &winstate->perfunc[wfuncno],
995 peraggstate,
996 result, isnull);
997
998 /*
999 * save the result in case next row shares the same frame.
1000 *
1001 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
1002 * advance that the next row can't possibly share the same frame. Is
1003 * it worth detecting that and skipping this code?
1004 */
1005 if (!peraggstate->resulttypeByVal && !*isnull)
1006 {
1007 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
1008 peraggstate->resultValue =
1009 datumCopy(*result,
1010 peraggstate->resulttypeByVal,
1011 peraggstate->resulttypeLen);
1012 MemoryContextSwitchTo(oldContext);
1013 }
1014 else
1015 {
1016 peraggstate->resultValue = *result;
1017 }
1018 peraggstate->resultValueIsNull = *isnull;
1019 }
1020 }
1021
1022 /*
1023 * eval_windowfunction
1024 *
1025 * Arguments of window functions are not evaluated here, because a window
1026 * function can need random access to arbitrary rows in the partition.
1027 * The window function uses the special WinGetFuncArgInPartition and
1028 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1029 * it wants.
1030 */
1031 static void
eval_windowfunction(WindowAggState * winstate,WindowStatePerFunc perfuncstate,Datum * result,bool * isnull)1032 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1033 Datum *result, bool *isnull)
1034 {
1035 FunctionCallInfoData fcinfo;
1036 MemoryContext oldContext;
1037
1038 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1039
1040 /*
1041 * We don't pass any normal arguments to a window function, but we do pass
1042 * it the number of arguments, in order to permit window function
1043 * implementations to support varying numbers of arguments. The real info
1044 * goes through the WindowObject, which is passed via fcinfo->context.
1045 */
1046 InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
1047 perfuncstate->numArguments,
1048 perfuncstate->winCollation,
1049 (void *) perfuncstate->winobj, NULL);
1050 /* Just in case, make all the regular argument slots be null */
1051 memset(fcinfo.argnull, true, perfuncstate->numArguments);
1052 /* Window functions don't have a current aggregate context, either */
1053 winstate->curaggcontext = NULL;
1054
1055 *result = FunctionCallInvoke(&fcinfo);
1056 *isnull = fcinfo.isnull;
1057
1058 /*
1059 * Make sure pass-by-ref data is allocated in the appropriate context. (We
1060 * need this in case the function returns a pointer into some short-lived
1061 * tuple, as is entirely possible.)
1062 */
1063 if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
1064 !MemoryContextContains(CurrentMemoryContext,
1065 DatumGetPointer(*result)))
1066 *result = datumCopy(*result,
1067 perfuncstate->resulttypeByVal,
1068 perfuncstate->resulttypeLen);
1069
1070 MemoryContextSwitchTo(oldContext);
1071 }
1072
1073 /*
1074 * begin_partition
1075 * Start buffering rows of the next partition.
1076 */
1077 static void
begin_partition(WindowAggState * winstate)1078 begin_partition(WindowAggState *winstate)
1079 {
1080 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1081 PlanState *outerPlan = outerPlanState(winstate);
1082 int frameOptions = winstate->frameOptions;
1083 int numfuncs = winstate->numfuncs;
1084 int i;
1085
1086 winstate->partition_spooled = false;
1087 winstate->framehead_valid = false;
1088 winstate->frametail_valid = false;
1089 winstate->grouptail_valid = false;
1090 winstate->spooled_rows = 0;
1091 winstate->currentpos = 0;
1092 winstate->frameheadpos = 0;
1093 winstate->frametailpos = 0;
1094 winstate->currentgroup = 0;
1095 winstate->frameheadgroup = 0;
1096 winstate->frametailgroup = 0;
1097 winstate->groupheadpos = 0;
1098 winstate->grouptailpos = -1; /* see update_grouptailpos */
1099 ExecClearTuple(winstate->agg_row_slot);
1100 if (winstate->framehead_slot)
1101 ExecClearTuple(winstate->framehead_slot);
1102 if (winstate->frametail_slot)
1103 ExecClearTuple(winstate->frametail_slot);
1104
1105 /*
1106 * If this is the very first partition, we need to fetch the first input
1107 * row to store in first_part_slot.
1108 */
1109 if (TupIsNull(winstate->first_part_slot))
1110 {
1111 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1112
1113 if (!TupIsNull(outerslot))
1114 ExecCopySlot(winstate->first_part_slot, outerslot);
1115 else
1116 {
1117 /* outer plan is empty, so we have nothing to do */
1118 winstate->partition_spooled = true;
1119 winstate->more_partitions = false;
1120 return;
1121 }
1122 }
1123
1124 /* Create new tuplestore for this partition */
1125 winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1126
1127 /*
1128 * Set up read pointers for the tuplestore. The current pointer doesn't
1129 * need BACKWARD capability, but the per-window-function read pointers do,
1130 * and the aggregate pointer does if we might need to restart aggregation.
1131 */
1132 winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1133
1134 /* reset default REWIND capability bit for current ptr */
1135 tuplestore_set_eflags(winstate->buffer, 0);
1136
1137 /* create read pointers for aggregates, if needed */
1138 if (winstate->numaggs > 0)
1139 {
1140 WindowObject agg_winobj = winstate->agg_winobj;
1141 int readptr_flags = 0;
1142
1143 /*
1144 * If the frame head is potentially movable, or we have an EXCLUSION
1145 * clause, we might need to restart aggregation ...
1146 */
1147 if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) ||
1148 (frameOptions & FRAMEOPTION_EXCLUSION))
1149 {
1150 /* ... so create a mark pointer to track the frame head */
1151 agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1152 /* and the read pointer will need BACKWARD capability */
1153 readptr_flags |= EXEC_FLAG_BACKWARD;
1154 }
1155
1156 agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1157 readptr_flags);
1158 agg_winobj->markpos = -1;
1159 agg_winobj->seekpos = -1;
1160
1161 /* Also reset the row counters for aggregates */
1162 winstate->aggregatedbase = 0;
1163 winstate->aggregatedupto = 0;
1164 }
1165
1166 /* create mark and read pointers for each real window function */
1167 for (i = 0; i < numfuncs; i++)
1168 {
1169 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1170
1171 if (!perfuncstate->plain_agg)
1172 {
1173 WindowObject winobj = perfuncstate->winobj;
1174
1175 winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1176 0);
1177 winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1178 EXEC_FLAG_BACKWARD);
1179 winobj->markpos = -1;
1180 winobj->seekpos = -1;
1181 }
1182 }
1183
1184 /*
1185 * If we are in RANGE or GROUPS mode, then determining frame boundaries
1186 * requires physical access to the frame endpoint rows, except in certain
1187 * degenerate cases. We create read pointers to point to those rows, to
1188 * simplify access and ensure that the tuplestore doesn't discard the
1189 * endpoint rows prematurely. (Must create pointers in exactly the same
1190 * cases that update_frameheadpos and update_frametailpos need them.)
1191 */
1192 winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */
1193
1194 if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1195 {
1196 if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
1197 node->ordNumCols != 0) ||
1198 (frameOptions & FRAMEOPTION_START_OFFSET))
1199 winstate->framehead_ptr =
1200 tuplestore_alloc_read_pointer(winstate->buffer, 0);
1201 if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
1202 node->ordNumCols != 0) ||
1203 (frameOptions & FRAMEOPTION_END_OFFSET))
1204 winstate->frametail_ptr =
1205 tuplestore_alloc_read_pointer(winstate->buffer, 0);
1206 }
1207
1208 /*
1209 * If we have an exclusion clause that requires knowing the boundaries of
1210 * the current row's peer group, we create a read pointer to track the
1211 * tail position of the peer group (i.e., first row of the next peer
1212 * group). The head position does not require its own pointer because we
1213 * maintain that as a side effect of advancing the current row.
1214 */
1215 winstate->grouptail_ptr = -1;
1216
1217 if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP |
1218 FRAMEOPTION_EXCLUDE_TIES)) &&
1219 node->ordNumCols != 0)
1220 {
1221 winstate->grouptail_ptr =
1222 tuplestore_alloc_read_pointer(winstate->buffer, 0);
1223 }
1224
1225 /*
1226 * Store the first tuple into the tuplestore (it's always available now;
1227 * we either read it above, or saved it at the end of previous partition)
1228 */
1229 tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1230 winstate->spooled_rows++;
1231 }
1232
1233 /*
1234 * Read tuples from the outer node, up to and including position 'pos', and
1235 * store them into the tuplestore. If pos is -1, reads the whole partition.
1236 */
1237 static void
spool_tuples(WindowAggState * winstate,int64 pos)1238 spool_tuples(WindowAggState *winstate, int64 pos)
1239 {
1240 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1241 PlanState *outerPlan;
1242 TupleTableSlot *outerslot;
1243 MemoryContext oldcontext;
1244
1245 if (!winstate->buffer)
1246 return; /* just a safety check */
1247 if (winstate->partition_spooled)
1248 return; /* whole partition done already */
1249
1250 /*
1251 * If the tuplestore has spilled to disk, alternate reading and writing
1252 * becomes quite expensive due to frequent buffer flushes. It's cheaper
1253 * to force the entire partition to get spooled in one go.
1254 *
1255 * XXX this is a horrid kluge --- it'd be better to fix the performance
1256 * problem inside tuplestore. FIXME
1257 */
1258 if (!tuplestore_in_memory(winstate->buffer))
1259 pos = -1;
1260
1261 outerPlan = outerPlanState(winstate);
1262
1263 /* Must be in query context to call outerplan */
1264 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1265
1266 while (winstate->spooled_rows <= pos || pos == -1)
1267 {
1268 outerslot = ExecProcNode(outerPlan);
1269 if (TupIsNull(outerslot))
1270 {
1271 /* reached the end of the last partition */
1272 winstate->partition_spooled = true;
1273 winstate->more_partitions = false;
1274 break;
1275 }
1276
1277 if (node->partNumCols > 0)
1278 {
1279 ExprContext *econtext = winstate->tmpcontext;
1280
1281 econtext->ecxt_innertuple = winstate->first_part_slot;
1282 econtext->ecxt_outertuple = outerslot;
1283
1284 /* Check if this tuple still belongs to the current partition */
1285 if (!ExecQualAndReset(winstate->partEqfunction, econtext))
1286 {
1287 /*
1288 * end of partition; copy the tuple for the next cycle.
1289 */
1290 ExecCopySlot(winstate->first_part_slot, outerslot);
1291 winstate->partition_spooled = true;
1292 winstate->more_partitions = true;
1293 break;
1294 }
1295 }
1296
1297 /* Still in partition, so save it into the tuplestore */
1298 tuplestore_puttupleslot(winstate->buffer, outerslot);
1299 winstate->spooled_rows++;
1300 }
1301
1302 MemoryContextSwitchTo(oldcontext);
1303 }
1304
1305 /*
1306 * release_partition
1307 * clear information kept within a partition, including
1308 * tuplestore and aggregate results.
1309 */
1310 static void
release_partition(WindowAggState * winstate)1311 release_partition(WindowAggState *winstate)
1312 {
1313 int i;
1314
1315 for (i = 0; i < winstate->numfuncs; i++)
1316 {
1317 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1318
1319 /* Release any partition-local state of this window function */
1320 if (perfuncstate->winobj)
1321 perfuncstate->winobj->localmem = NULL;
1322 }
1323
1324 /*
1325 * Release all partition-local memory (in particular, any partition-local
1326 * state that we might have trashed our pointers to in the above loop, and
1327 * any aggregate temp data). We don't rely on retail pfree because some
1328 * aggregates might have allocated data we don't have direct pointers to.
1329 */
1330 MemoryContextResetAndDeleteChildren(winstate->partcontext);
1331 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
1332 for (i = 0; i < winstate->numaggs; i++)
1333 {
1334 if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1335 MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
1336 }
1337
1338 if (winstate->buffer)
1339 tuplestore_end(winstate->buffer);
1340 winstate->buffer = NULL;
1341 winstate->partition_spooled = false;
1342 }
1343
1344 /*
1345 * row_is_in_frame
1346 * Determine whether a row is in the current row's window frame according
1347 * to our window framing rule
1348 *
1349 * The caller must have already determined that the row is in the partition
1350 * and fetched it into a slot. This function just encapsulates the framing
1351 * rules.
1352 *
1353 * Returns:
1354 * -1, if the row is out of frame and no succeeding rows can be in frame
1355 * 0, if the row is out of frame but succeeding rows might be in frame
1356 * 1, if the row is in frame
1357 *
1358 * May clobber winstate->temp_slot_2.
1359 */
1360 static int
row_is_in_frame(WindowAggState * winstate,int64 pos,TupleTableSlot * slot)1361 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1362 {
1363 int frameOptions = winstate->frameOptions;
1364
1365 Assert(pos >= 0); /* else caller error */
1366
1367 /*
1368 * First, check frame starting conditions. We might as well delegate this
1369 * to update_frameheadpos always; it doesn't add any notable cost.
1370 */
1371 update_frameheadpos(winstate);
1372 if (pos < winstate->frameheadpos)
1373 return 0;
1374
1375 /*
1376 * Okay so far, now check frame ending conditions. Here, we avoid calling
1377 * update_frametailpos in simple cases, so as not to spool tuples further
1378 * ahead than necessary.
1379 */
1380 if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1381 {
1382 if (frameOptions & FRAMEOPTION_ROWS)
1383 {
1384 /* rows after current row are out of frame */
1385 if (pos > winstate->currentpos)
1386 return -1;
1387 }
1388 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1389 {
1390 /* following row that is not peer is out of frame */
1391 if (pos > winstate->currentpos &&
1392 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1393 return -1;
1394 }
1395 else
1396 Assert(false);
1397 }
1398 else if (frameOptions & FRAMEOPTION_END_OFFSET)
1399 {
1400 if (frameOptions & FRAMEOPTION_ROWS)
1401 {
1402 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1403
1404 /* rows after current row + offset are out of frame */
1405 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1406 offset = -offset;
1407
1408 if (pos > winstate->currentpos + offset)
1409 return -1;
1410 }
1411 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1412 {
1413 /* hard cases, so delegate to update_frametailpos */
1414 update_frametailpos(winstate);
1415 if (pos >= winstate->frametailpos)
1416 return -1;
1417 }
1418 else
1419 Assert(false);
1420 }
1421
1422 /* Check exclusion clause */
1423 if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
1424 {
1425 if (pos == winstate->currentpos)
1426 return 0;
1427 }
1428 else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
1429 ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
1430 pos != winstate->currentpos))
1431 {
1432 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1433
1434 /* If no ORDER BY, all rows are peers with each other */
1435 if (node->ordNumCols == 0)
1436 return 0;
1437 /* Otherwise, check the group boundaries */
1438 if (pos >= winstate->groupheadpos)
1439 {
1440 update_grouptailpos(winstate);
1441 if (pos < winstate->grouptailpos)
1442 return 0;
1443 }
1444 }
1445
1446 /* If we get here, it's in frame */
1447 return 1;
1448 }
1449
1450 /*
1451 * update_frameheadpos
1452 * make frameheadpos valid for the current row
1453 *
1454 * Note that frameheadpos is computed without regard for any window exclusion
1455 * clause; the current row and/or its peers are considered part of the frame
1456 * for this purpose even if they must be excluded later.
1457 *
1458 * May clobber winstate->temp_slot_2.
1459 */
1460 static void
update_frameheadpos(WindowAggState * winstate)1461 update_frameheadpos(WindowAggState *winstate)
1462 {
1463 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1464 int frameOptions = winstate->frameOptions;
1465 MemoryContext oldcontext;
1466
1467 if (winstate->framehead_valid)
1468 return; /* already known for current row */
1469
1470 /* We may be called in a short-lived context */
1471 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1472
1473 if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1474 {
1475 /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1476 winstate->frameheadpos = 0;
1477 winstate->framehead_valid = true;
1478 }
1479 else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1480 {
1481 if (frameOptions & FRAMEOPTION_ROWS)
1482 {
1483 /* In ROWS mode, frame head is the same as current */
1484 winstate->frameheadpos = winstate->currentpos;
1485 winstate->framehead_valid = true;
1486 }
1487 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1488 {
1489 /* If no ORDER BY, all rows are peers with each other */
1490 if (node->ordNumCols == 0)
1491 {
1492 winstate->frameheadpos = 0;
1493 winstate->framehead_valid = true;
1494 MemoryContextSwitchTo(oldcontext);
1495 return;
1496 }
1497
1498 /*
1499 * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the
1500 * first row that is a peer of current row. We keep a copy of the
1501 * last-known frame head row in framehead_slot, and advance as
1502 * necessary. Note that if we reach end of partition, we will
1503 * leave frameheadpos = end+1 and framehead_slot empty.
1504 */
1505 tuplestore_select_read_pointer(winstate->buffer,
1506 winstate->framehead_ptr);
1507 if (winstate->frameheadpos == 0 &&
1508 TupIsNull(winstate->framehead_slot))
1509 {
1510 /* fetch first row into framehead_slot, if we didn't already */
1511 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1512 winstate->framehead_slot))
1513 elog(ERROR, "unexpected end of tuplestore");
1514 }
1515
1516 while (!TupIsNull(winstate->framehead_slot))
1517 {
1518 if (are_peers(winstate, winstate->framehead_slot,
1519 winstate->ss.ss_ScanTupleSlot))
1520 break; /* this row is the correct frame head */
1521 /* Note we advance frameheadpos even if the fetch fails */
1522 winstate->frameheadpos++;
1523 spool_tuples(winstate, winstate->frameheadpos);
1524 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1525 winstate->framehead_slot))
1526 break; /* end of partition */
1527 }
1528 winstate->framehead_valid = true;
1529 }
1530 else
1531 Assert(false);
1532 }
1533 else if (frameOptions & FRAMEOPTION_START_OFFSET)
1534 {
1535 if (frameOptions & FRAMEOPTION_ROWS)
1536 {
1537 /* In ROWS mode, bound is physically n before/after current */
1538 int64 offset = DatumGetInt64(winstate->startOffsetValue);
1539
1540 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1541 offset = -offset;
1542
1543 winstate->frameheadpos = winstate->currentpos + offset;
1544 /* frame head can't go before first row */
1545 if (winstate->frameheadpos < 0)
1546 winstate->frameheadpos = 0;
1547 else if (winstate->frameheadpos > winstate->currentpos + 1)
1548 {
1549 /* make sure frameheadpos is not past end of partition */
1550 spool_tuples(winstate, winstate->frameheadpos - 1);
1551 if (winstate->frameheadpos > winstate->spooled_rows)
1552 winstate->frameheadpos = winstate->spooled_rows;
1553 }
1554 winstate->framehead_valid = true;
1555 }
1556 else if (frameOptions & FRAMEOPTION_RANGE)
1557 {
1558 /*
1559 * In RANGE START_OFFSET mode, frame head is the first row that
1560 * satisfies the in_range constraint relative to the current row.
1561 * We keep a copy of the last-known frame head row in
1562 * framehead_slot, and advance as necessary. Note that if we
1563 * reach end of partition, we will leave frameheadpos = end+1 and
1564 * framehead_slot empty.
1565 */
1566 int sortCol = node->ordColIdx[0];
1567 bool sub,
1568 less;
1569
1570 /* We must have an ordering column */
1571 Assert(node->ordNumCols == 1);
1572
1573 /* Precompute flags for in_range checks */
1574 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1575 sub = true; /* subtract startOffset from current row */
1576 else
1577 sub = false; /* add it */
1578 less = false; /* normally, we want frame head >= sum */
1579 /* If sort order is descending, flip both flags */
1580 if (!winstate->inRangeAsc)
1581 {
1582 sub = !sub;
1583 less = true;
1584 }
1585
1586 tuplestore_select_read_pointer(winstate->buffer,
1587 winstate->framehead_ptr);
1588 if (winstate->frameheadpos == 0 &&
1589 TupIsNull(winstate->framehead_slot))
1590 {
1591 /* fetch first row into framehead_slot, if we didn't already */
1592 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1593 winstate->framehead_slot))
1594 elog(ERROR, "unexpected end of tuplestore");
1595 }
1596
1597 while (!TupIsNull(winstate->framehead_slot))
1598 {
1599 Datum headval,
1600 currval;
1601 bool headisnull,
1602 currisnull;
1603
1604 headval = slot_getattr(winstate->framehead_slot, sortCol,
1605 &headisnull);
1606 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1607 &currisnull);
1608 if (headisnull || currisnull)
1609 {
1610 /* order of the rows depends only on nulls_first */
1611 if (winstate->inRangeNullsFirst)
1612 {
1613 /* advance head if head is null and curr is not */
1614 if (!headisnull || currisnull)
1615 break;
1616 }
1617 else
1618 {
1619 /* advance head if head is not null and curr is null */
1620 if (headisnull || !currisnull)
1621 break;
1622 }
1623 }
1624 else
1625 {
1626 if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
1627 winstate->inRangeColl,
1628 headval,
1629 currval,
1630 winstate->startOffsetValue,
1631 BoolGetDatum(sub),
1632 BoolGetDatum(less))))
1633 break; /* this row is the correct frame head */
1634 }
1635 /* Note we advance frameheadpos even if the fetch fails */
1636 winstate->frameheadpos++;
1637 spool_tuples(winstate, winstate->frameheadpos);
1638 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1639 winstate->framehead_slot))
1640 break; /* end of partition */
1641 }
1642 winstate->framehead_valid = true;
1643 }
1644 else if (frameOptions & FRAMEOPTION_GROUPS)
1645 {
1646 /*
1647 * In GROUPS START_OFFSET mode, frame head is the first row of the
1648 * first peer group whose number satisfies the offset constraint.
1649 * We keep a copy of the last-known frame head row in
1650 * framehead_slot, and advance as necessary. Note that if we
1651 * reach end of partition, we will leave frameheadpos = end+1 and
1652 * framehead_slot empty.
1653 */
1654 int64 offset = DatumGetInt64(winstate->startOffsetValue);
1655 int64 minheadgroup;
1656
1657 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1658 minheadgroup = winstate->currentgroup - offset;
1659 else
1660 minheadgroup = winstate->currentgroup + offset;
1661
1662 tuplestore_select_read_pointer(winstate->buffer,
1663 winstate->framehead_ptr);
1664 if (winstate->frameheadpos == 0 &&
1665 TupIsNull(winstate->framehead_slot))
1666 {
1667 /* fetch first row into framehead_slot, if we didn't already */
1668 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1669 winstate->framehead_slot))
1670 elog(ERROR, "unexpected end of tuplestore");
1671 }
1672
1673 while (!TupIsNull(winstate->framehead_slot))
1674 {
1675 if (winstate->frameheadgroup >= minheadgroup)
1676 break; /* this row is the correct frame head */
1677 ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
1678 /* Note we advance frameheadpos even if the fetch fails */
1679 winstate->frameheadpos++;
1680 spool_tuples(winstate, winstate->frameheadpos);
1681 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1682 winstate->framehead_slot))
1683 break; /* end of partition */
1684 if (!are_peers(winstate, winstate->temp_slot_2,
1685 winstate->framehead_slot))
1686 winstate->frameheadgroup++;
1687 }
1688 ExecClearTuple(winstate->temp_slot_2);
1689 winstate->framehead_valid = true;
1690 }
1691 else
1692 Assert(false);
1693 }
1694 else
1695 Assert(false);
1696
1697 MemoryContextSwitchTo(oldcontext);
1698 }
1699
1700 /*
1701 * update_frametailpos
1702 * make frametailpos valid for the current row
1703 *
1704 * Note that frametailpos is computed without regard for any window exclusion
1705 * clause; the current row and/or its peers are considered part of the frame
1706 * for this purpose even if they must be excluded later.
1707 *
1708 * May clobber winstate->temp_slot_2.
1709 */
1710 static void
update_frametailpos(WindowAggState * winstate)1711 update_frametailpos(WindowAggState *winstate)
1712 {
1713 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1714 int frameOptions = winstate->frameOptions;
1715 MemoryContext oldcontext;
1716
1717 if (winstate->frametail_valid)
1718 return; /* already known for current row */
1719
1720 /* We may be called in a short-lived context */
1721 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1722
1723 if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1724 {
1725 /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1726 spool_tuples(winstate, -1);
1727 winstate->frametailpos = winstate->spooled_rows;
1728 winstate->frametail_valid = true;
1729 }
1730 else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1731 {
1732 if (frameOptions & FRAMEOPTION_ROWS)
1733 {
1734 /* In ROWS mode, exactly the rows up to current are in frame */
1735 winstate->frametailpos = winstate->currentpos + 1;
1736 winstate->frametail_valid = true;
1737 }
1738 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1739 {
1740 /* If no ORDER BY, all rows are peers with each other */
1741 if (node->ordNumCols == 0)
1742 {
1743 spool_tuples(winstate, -1);
1744 winstate->frametailpos = winstate->spooled_rows;
1745 winstate->frametail_valid = true;
1746 MemoryContextSwitchTo(oldcontext);
1747 return;
1748 }
1749
1750 /*
1751 * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last
1752 * row that is a peer of current row, frame tail is the row after
1753 * that (if any). We keep a copy of the last-known frame tail row
1754 * in frametail_slot, and advance as necessary. Note that if we
1755 * reach end of partition, we will leave frametailpos = end+1 and
1756 * frametail_slot empty.
1757 */
1758 tuplestore_select_read_pointer(winstate->buffer,
1759 winstate->frametail_ptr);
1760 if (winstate->frametailpos == 0 &&
1761 TupIsNull(winstate->frametail_slot))
1762 {
1763 /* fetch first row into frametail_slot, if we didn't already */
1764 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1765 winstate->frametail_slot))
1766 elog(ERROR, "unexpected end of tuplestore");
1767 }
1768
1769 while (!TupIsNull(winstate->frametail_slot))
1770 {
1771 if (winstate->frametailpos > winstate->currentpos &&
1772 !are_peers(winstate, winstate->frametail_slot,
1773 winstate->ss.ss_ScanTupleSlot))
1774 break; /* this row is the frame tail */
1775 /* Note we advance frametailpos even if the fetch fails */
1776 winstate->frametailpos++;
1777 spool_tuples(winstate, winstate->frametailpos);
1778 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1779 winstate->frametail_slot))
1780 break; /* end of partition */
1781 }
1782 winstate->frametail_valid = true;
1783 }
1784 else
1785 Assert(false);
1786 }
1787 else if (frameOptions & FRAMEOPTION_END_OFFSET)
1788 {
1789 if (frameOptions & FRAMEOPTION_ROWS)
1790 {
1791 /* In ROWS mode, bound is physically n before/after current */
1792 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1793
1794 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1795 offset = -offset;
1796
1797 winstate->frametailpos = winstate->currentpos + offset + 1;
1798 /* smallest allowable value of frametailpos is 0 */
1799 if (winstate->frametailpos < 0)
1800 winstate->frametailpos = 0;
1801 else if (winstate->frametailpos > winstate->currentpos + 1)
1802 {
1803 /* make sure frametailpos is not past end of partition */
1804 spool_tuples(winstate, winstate->frametailpos - 1);
1805 if (winstate->frametailpos > winstate->spooled_rows)
1806 winstate->frametailpos = winstate->spooled_rows;
1807 }
1808 winstate->frametail_valid = true;
1809 }
1810 else if (frameOptions & FRAMEOPTION_RANGE)
1811 {
1812 /*
1813 * In RANGE END_OFFSET mode, frame end is the last row that
1814 * satisfies the in_range constraint relative to the current row,
1815 * frame tail is the row after that (if any). We keep a copy of
1816 * the last-known frame tail row in frametail_slot, and advance as
1817 * necessary. Note that if we reach end of partition, we will
1818 * leave frametailpos = end+1 and frametail_slot empty.
1819 */
1820 int sortCol = node->ordColIdx[0];
1821 bool sub,
1822 less;
1823
1824 /* We must have an ordering column */
1825 Assert(node->ordNumCols == 1);
1826
1827 /* Precompute flags for in_range checks */
1828 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1829 sub = true; /* subtract endOffset from current row */
1830 else
1831 sub = false; /* add it */
1832 less = true; /* normally, we want frame tail <= sum */
1833 /* If sort order is descending, flip both flags */
1834 if (!winstate->inRangeAsc)
1835 {
1836 sub = !sub;
1837 less = false;
1838 }
1839
1840 tuplestore_select_read_pointer(winstate->buffer,
1841 winstate->frametail_ptr);
1842 if (winstate->frametailpos == 0 &&
1843 TupIsNull(winstate->frametail_slot))
1844 {
1845 /* fetch first row into frametail_slot, if we didn't already */
1846 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1847 winstate->frametail_slot))
1848 elog(ERROR, "unexpected end of tuplestore");
1849 }
1850
1851 while (!TupIsNull(winstate->frametail_slot))
1852 {
1853 Datum tailval,
1854 currval;
1855 bool tailisnull,
1856 currisnull;
1857
1858 tailval = slot_getattr(winstate->frametail_slot, sortCol,
1859 &tailisnull);
1860 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1861 &currisnull);
1862 if (tailisnull || currisnull)
1863 {
1864 /* order of the rows depends only on nulls_first */
1865 if (winstate->inRangeNullsFirst)
1866 {
1867 /* advance tail if tail is null or curr is not */
1868 if (!tailisnull)
1869 break;
1870 }
1871 else
1872 {
1873 /* advance tail if tail is not null or curr is null */
1874 if (!currisnull)
1875 break;
1876 }
1877 }
1878 else
1879 {
1880 if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc,
1881 winstate->inRangeColl,
1882 tailval,
1883 currval,
1884 winstate->endOffsetValue,
1885 BoolGetDatum(sub),
1886 BoolGetDatum(less))))
1887 break; /* this row is the correct frame tail */
1888 }
1889 /* Note we advance frametailpos even if the fetch fails */
1890 winstate->frametailpos++;
1891 spool_tuples(winstate, winstate->frametailpos);
1892 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1893 winstate->frametail_slot))
1894 break; /* end of partition */
1895 }
1896 winstate->frametail_valid = true;
1897 }
1898 else if (frameOptions & FRAMEOPTION_GROUPS)
1899 {
1900 /*
1901 * In GROUPS END_OFFSET mode, frame end is the last row of the
1902 * last peer group whose number satisfies the offset constraint,
1903 * and frame tail is the row after that (if any). We keep a copy
1904 * of the last-known frame tail row in frametail_slot, and advance
1905 * as necessary. Note that if we reach end of partition, we will
1906 * leave frametailpos = end+1 and frametail_slot empty.
1907 */
1908 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1909 int64 maxtailgroup;
1910
1911 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1912 maxtailgroup = winstate->currentgroup - offset;
1913 else
1914 maxtailgroup = winstate->currentgroup + offset;
1915
1916 tuplestore_select_read_pointer(winstate->buffer,
1917 winstate->frametail_ptr);
1918 if (winstate->frametailpos == 0 &&
1919 TupIsNull(winstate->frametail_slot))
1920 {
1921 /* fetch first row into frametail_slot, if we didn't already */
1922 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1923 winstate->frametail_slot))
1924 elog(ERROR, "unexpected end of tuplestore");
1925 }
1926
1927 while (!TupIsNull(winstate->frametail_slot))
1928 {
1929 if (winstate->frametailgroup > maxtailgroup)
1930 break; /* this row is the correct frame tail */
1931 ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot);
1932 /* Note we advance frametailpos even if the fetch fails */
1933 winstate->frametailpos++;
1934 spool_tuples(winstate, winstate->frametailpos);
1935 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1936 winstate->frametail_slot))
1937 break; /* end of partition */
1938 if (!are_peers(winstate, winstate->temp_slot_2,
1939 winstate->frametail_slot))
1940 winstate->frametailgroup++;
1941 }
1942 ExecClearTuple(winstate->temp_slot_2);
1943 winstate->frametail_valid = true;
1944 }
1945 else
1946 Assert(false);
1947 }
1948 else
1949 Assert(false);
1950
1951 MemoryContextSwitchTo(oldcontext);
1952 }
1953
1954 /*
1955 * update_grouptailpos
1956 * make grouptailpos valid for the current row
1957 *
1958 * May clobber winstate->temp_slot_2.
1959 */
1960 static void
update_grouptailpos(WindowAggState * winstate)1961 update_grouptailpos(WindowAggState *winstate)
1962 {
1963 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1964 MemoryContext oldcontext;
1965
1966 if (winstate->grouptail_valid)
1967 return; /* already known for current row */
1968
1969 /* We may be called in a short-lived context */
1970 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1971
1972 /* If no ORDER BY, all rows are peers with each other */
1973 if (node->ordNumCols == 0)
1974 {
1975 spool_tuples(winstate, -1);
1976 winstate->grouptailpos = winstate->spooled_rows;
1977 winstate->grouptail_valid = true;
1978 MemoryContextSwitchTo(oldcontext);
1979 return;
1980 }
1981
1982 /*
1983 * Because grouptail_valid is reset only when current row advances into a
1984 * new peer group, we always reach here knowing that grouptailpos needs to
1985 * be advanced by at least one row. Hence, unlike the otherwise similar
1986 * case for frame tail tracking, we do not need persistent storage of the
1987 * group tail row.
1988 */
1989 Assert(winstate->grouptailpos <= winstate->currentpos);
1990 tuplestore_select_read_pointer(winstate->buffer,
1991 winstate->grouptail_ptr);
1992 for (;;)
1993 {
1994 /* Note we advance grouptailpos even if the fetch fails */
1995 winstate->grouptailpos++;
1996 spool_tuples(winstate, winstate->grouptailpos);
1997 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1998 winstate->temp_slot_2))
1999 break; /* end of partition */
2000 if (winstate->grouptailpos > winstate->currentpos &&
2001 !are_peers(winstate, winstate->temp_slot_2,
2002 winstate->ss.ss_ScanTupleSlot))
2003 break; /* this row is the group tail */
2004 }
2005 ExecClearTuple(winstate->temp_slot_2);
2006 winstate->grouptail_valid = true;
2007
2008 MemoryContextSwitchTo(oldcontext);
2009 }
2010
2011
2012 /* -----------------
2013 * ExecWindowAgg
2014 *
2015 * ExecWindowAgg receives tuples from its outer subplan and
2016 * stores them into a tuplestore, then processes window functions.
2017 * This node doesn't reduce nor qualify any row so the number of
2018 * returned rows is exactly the same as its outer subplan's result.
2019 * -----------------
2020 */
2021 static TupleTableSlot *
ExecWindowAgg(PlanState * pstate)2022 ExecWindowAgg(PlanState *pstate)
2023 {
2024 WindowAggState *winstate = castNode(WindowAggState, pstate);
2025 ExprContext *econtext;
2026 int i;
2027 int numfuncs;
2028
2029 CHECK_FOR_INTERRUPTS();
2030
2031 if (winstate->all_done)
2032 return NULL;
2033
2034 /*
2035 * Compute frame offset values, if any, during first call (or after a
2036 * rescan). These are assumed to hold constant throughout the scan; if
2037 * user gives us a volatile expression, we'll only use its initial value.
2038 */
2039 if (winstate->all_first)
2040 {
2041 int frameOptions = winstate->frameOptions;
2042 ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
2043 Datum value;
2044 bool isnull;
2045 int16 len;
2046 bool byval;
2047
2048 if (frameOptions & FRAMEOPTION_START_OFFSET)
2049 {
2050 Assert(winstate->startOffset != NULL);
2051 value = ExecEvalExprSwitchContext(winstate->startOffset,
2052 econtext,
2053 &isnull);
2054 if (isnull)
2055 ereport(ERROR,
2056 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2057 errmsg("frame starting offset must not be null")));
2058 /* copy value into query-lifespan context */
2059 get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
2060 &len, &byval);
2061 winstate->startOffsetValue = datumCopy(value, byval, len);
2062 if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2063 {
2064 /* value is known to be int8 */
2065 int64 offset = DatumGetInt64(value);
2066
2067 if (offset < 0)
2068 ereport(ERROR,
2069 (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2070 errmsg("frame starting offset must not be negative")));
2071 }
2072 }
2073 if (frameOptions & FRAMEOPTION_END_OFFSET)
2074 {
2075 Assert(winstate->endOffset != NULL);
2076 value = ExecEvalExprSwitchContext(winstate->endOffset,
2077 econtext,
2078 &isnull);
2079 if (isnull)
2080 ereport(ERROR,
2081 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2082 errmsg("frame ending offset must not be null")));
2083 /* copy value into query-lifespan context */
2084 get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
2085 &len, &byval);
2086 winstate->endOffsetValue = datumCopy(value, byval, len);
2087 if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2088 {
2089 /* value is known to be int8 */
2090 int64 offset = DatumGetInt64(value);
2091
2092 if (offset < 0)
2093 ereport(ERROR,
2094 (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2095 errmsg("frame ending offset must not be negative")));
2096 }
2097 }
2098 winstate->all_first = false;
2099 }
2100
2101 if (winstate->buffer == NULL)
2102 {
2103 /* Initialize for first partition and set current row = 0 */
2104 begin_partition(winstate);
2105 /* If there are no input rows, we'll detect that and exit below */
2106 }
2107 else
2108 {
2109 /* Advance current row within partition */
2110 winstate->currentpos++;
2111 /* This might mean that the frame moves, too */
2112 winstate->framehead_valid = false;
2113 winstate->frametail_valid = false;
2114 /* we don't need to invalidate grouptail here; see below */
2115 }
2116
2117 /*
2118 * Spool all tuples up to and including the current row, if we haven't
2119 * already
2120 */
2121 spool_tuples(winstate, winstate->currentpos);
2122
2123 /* Move to the next partition if we reached the end of this partition */
2124 if (winstate->partition_spooled &&
2125 winstate->currentpos >= winstate->spooled_rows)
2126 {
2127 release_partition(winstate);
2128
2129 if (winstate->more_partitions)
2130 {
2131 begin_partition(winstate);
2132 Assert(winstate->spooled_rows > 0);
2133 }
2134 else
2135 {
2136 winstate->all_done = true;
2137 return NULL;
2138 }
2139 }
2140
2141 /* final output execution is in ps_ExprContext */
2142 econtext = winstate->ss.ps.ps_ExprContext;
2143
2144 /* Clear the per-output-tuple context for current row */
2145 ResetExprContext(econtext);
2146
2147 /*
2148 * Read the current row from the tuplestore, and save in ScanTupleSlot.
2149 * (We can't rely on the outerplan's output slot because we may have to
2150 * read beyond the current row. Also, we have to actually copy the row
2151 * out of the tuplestore, since window function evaluation might cause the
2152 * tuplestore to dump its state to disk.)
2153 *
2154 * In GROUPS mode, or when tracking a group-oriented exclusion clause, we
2155 * must also detect entering a new peer group and update associated state
2156 * when that happens. We use temp_slot_2 to temporarily hold the previous
2157 * row for this purpose.
2158 *
2159 * Current row must be in the tuplestore, since we spooled it above.
2160 */
2161 tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
2162 if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
2163 FRAMEOPTION_EXCLUDE_GROUP |
2164 FRAMEOPTION_EXCLUDE_TIES)) &&
2165 winstate->currentpos > 0)
2166 {
2167 ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
2168 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2169 winstate->ss.ss_ScanTupleSlot))
2170 elog(ERROR, "unexpected end of tuplestore");
2171 if (!are_peers(winstate, winstate->temp_slot_2,
2172 winstate->ss.ss_ScanTupleSlot))
2173 {
2174 winstate->currentgroup++;
2175 winstate->groupheadpos = winstate->currentpos;
2176 winstate->grouptail_valid = false;
2177 }
2178 ExecClearTuple(winstate->temp_slot_2);
2179 }
2180 else
2181 {
2182 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2183 winstate->ss.ss_ScanTupleSlot))
2184 elog(ERROR, "unexpected end of tuplestore");
2185 }
2186
2187 /*
2188 * Evaluate true window functions
2189 */
2190 numfuncs = winstate->numfuncs;
2191 for (i = 0; i < numfuncs; i++)
2192 {
2193 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
2194
2195 if (perfuncstate->plain_agg)
2196 continue;
2197 eval_windowfunction(winstate, perfuncstate,
2198 &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
2199 &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
2200 }
2201
2202 /*
2203 * Evaluate aggregates
2204 */
2205 if (winstate->numaggs > 0)
2206 eval_windowaggregates(winstate);
2207
2208 /*
2209 * If we have created auxiliary read pointers for the frame or group
2210 * boundaries, force them to be kept up-to-date, because we don't know
2211 * whether the window function(s) will do anything that requires that.
2212 * Failing to advance the pointers would result in being unable to trim
2213 * data from the tuplestore, which is bad. (If we could know in advance
2214 * whether the window functions will use frame boundary info, we could
2215 * skip creating these pointers in the first place ... but unfortunately
2216 * the window function API doesn't require that.)
2217 */
2218 if (winstate->framehead_ptr >= 0)
2219 update_frameheadpos(winstate);
2220 if (winstate->frametail_ptr >= 0)
2221 update_frametailpos(winstate);
2222 if (winstate->grouptail_ptr >= 0)
2223 update_grouptailpos(winstate);
2224
2225 /*
2226 * Truncate any no-longer-needed rows from the tuplestore.
2227 */
2228 tuplestore_trim(winstate->buffer);
2229
2230 /*
2231 * Form and return a projection tuple using the windowfunc results and the
2232 * current row. Setting ecxt_outertuple arranges that any Vars will be
2233 * evaluated with respect to that row.
2234 */
2235 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2236
2237 return ExecProject(winstate->ss.ps.ps_ProjInfo);
2238 }
2239
/* -----------------
 * ExecInitWindowAgg
 *
 *	Creates the run-time information for the WindowAgg node produced by the
 *	planner and initializes its outer subtree
 *
 *	node	is the WindowAgg plan node to build state for
 *	estate	is the executor state the new node belongs to
 *	eflags	are the executor flags; backward scan and mark/restore are
 *			not supported (asserted below)
 *
 *	Returns the newly built WindowAggState.  Raises an error if a window
 *	function's winref does not match the node's, or if the current user
 *	lacks EXECUTE permission on a window function.
 * -----------------
 */
WindowAggState *
ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
{
	WindowAggState *winstate;
	Plan	   *outerPlan;
	ExprContext *econtext;
	ExprContext *tmpcontext;
	WindowStatePerFunc perfunc;
	WindowStatePerAgg peragg;
	int			frameOptions = node->frameOptions;
	int			numfuncs,
				wfuncno,
				numaggs,
				aggno;
	TupleDesc	scanDesc;
	ListCell   *l;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	winstate = makeNode(WindowAggState);
	winstate->ss.ps.plan = (Plan *) node;
	winstate->ss.ps.state = estate;
	winstate->ss.ps.ExecProcNode = ExecWindowAgg;

	/*
	 * Create expression contexts.  We need two, one for per-input-tuple
	 * processing and one for per-output-tuple processing.  We cheat a little
	 * by using ExecAssignExprContext() to build both.
	 *
	 * The first context built is saved as winstate->tmpcontext; the second
	 * call then overwrites ps_ExprContext, so the second context is the one
	 * left installed in the PlanState.
	 */
	ExecAssignExprContext(estate, &winstate->ss.ps);
	tmpcontext = winstate->ss.ps.ps_ExprContext;
	winstate->tmpcontext = tmpcontext;
	ExecAssignExprContext(estate, &winstate->ss.ps);

	/* Create long-lived context for storage of partition-local memory etc */
	winstate->partcontext =
		AllocSetContextCreate(CurrentMemoryContext,
							  "WindowAgg Partition",
							  ALLOCSET_DEFAULT_SIZES);

	/*
	 * Create mid-lived context for aggregate trans values etc.
	 *
	 * Note that moving aggregates each use their own private context, not
	 * this one.
	 */
	winstate->aggcontext =
		AllocSetContextCreate(CurrentMemoryContext,
							  "WindowAgg Aggregates",
							  ALLOCSET_DEFAULT_SIZES);

	/*
	 * WindowAgg nodes never have quals, since they can only occur at the
	 * logical top level of a query (ie, after any WHERE or HAVING filters)
	 */
	Assert(node->plan.qual == NIL);
	winstate->ss.ps.qual = NULL;

	/*
	 * initialize child nodes
	 */
	outerPlan = outerPlan(node);
	outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);

	/*
	 * initialize source tuple type (which is also the tuple type that we'll
	 * store in the tuplestore and use in all our working slots).
	 */
	ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss);
	scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;

	/*
	 * tuple table initialization
	 *
	 * All working slots share the scan tuple descriptor.
	 */
	winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc);
	winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc);
	winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc);
	winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc);

	/*
	 * create frame head and tail slots only if needed (must create slots in
	 * exactly the same cases that update_frameheadpos and update_frametailpos
	 * need them)
	 *
	 * Both stay NULL otherwise; the cleanup and rescan routines test for
	 * that before touching them.
	 */
	winstate->framehead_slot = winstate->frametail_slot = NULL;

	if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
	{
		if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
			 node->ordNumCols != 0) ||
			(frameOptions & FRAMEOPTION_START_OFFSET))
			winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc);
		if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
			 node->ordNumCols != 0) ||
			(frameOptions & FRAMEOPTION_END_OFFSET))
			winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc);
	}

	/*
	 * Initialize result slot, type and projection.
	 */
	ExecInitResultTupleSlotTL(estate, &winstate->ss.ps);
	ExecAssignProjectionInfo(&winstate->ss.ps, NULL);

	/* Set up data for comparing tuples */
	if (node->partNumCols > 0)
		winstate->partEqfunction =
			execTuplesMatchPrepare(scanDesc,
								   node->partNumCols,
								   node->partColIdx,
								   node->partOperators,
								   &winstate->ss.ps);

	if (node->ordNumCols > 0)
		winstate->ordEqfunction =
			execTuplesMatchPrepare(scanDesc,
								   node->ordNumCols,
								   node->ordColIdx,
								   node->ordOperators,
								   &winstate->ss.ps);

	/*
	 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
	 *
	 * NOTE(review): numfuncs, numaggs and the funcs list are read from
	 * winstate here without being assigned anywhere in this function;
	 * presumably they are filled in as a side effect of the expression
	 * initialization done for the projection above -- confirm before
	 * reordering any of these steps.
	 */
	numfuncs = winstate->numfuncs;
	numaggs = winstate->numaggs;
	econtext = winstate->ss.ps.ps_ExprContext;
	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);

	/*
	 * allocate per-wfunc/per-agg state information.
	 *
	 * The arrays are sized by the counts read above, i.e. before duplicate
	 * window functions are merged by the loop below.
	 */
	perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
	peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
	winstate->perfunc = perfunc;
	winstate->peragg = peragg;

	/*
	 * wfuncno and aggno hold the index of the last array entry assigned so
	 * far; they start at -1 so the first pre-increment yields index 0.
	 */
	wfuncno = -1;
	aggno = -1;
	foreach(l, winstate->funcs)
	{
		WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
		WindowFunc *wfunc = wfuncstate->wfunc;
		WindowStatePerFunc perfuncstate;
		AclResult	aclresult;
		int			i;

		if (wfunc->winref != node->winref)	/* planner screwed up? */
			elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
				 wfunc->winref, node->winref);

		/*
		 * Look for a previous duplicate window function.  Only calls that
		 * compare equal() and contain no volatile functions are shared.
		 */
		for (i = 0; i <= wfuncno; i++)
		{
			if (equal(wfunc, perfunc[i].wfunc) &&
				!contain_volatile_functions((Node *) wfunc))
				break;
		}
		if (i <= wfuncno)
		{
			/* Found a match to an existing entry, so just mark it */
			wfuncstate->wfuncno = i;
			continue;
		}

		/* No match, so assign a new per-function (perfunc) record */
		perfuncstate = &perfunc[++wfuncno];

		/* Mark WindowFunc state node with assigned index in the result array */
		wfuncstate->wfuncno = wfuncno;

		/* Check permission to call window function */
		aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
									 ACL_EXECUTE);
		if (aclresult != ACLCHECK_OK)
			aclcheck_error(aclresult, OBJECT_FUNCTION,
						   get_func_name(wfunc->winfnoid));
		InvokeFunctionExecuteHook(wfunc->winfnoid);

		/* Fill in the perfuncstate data */
		perfuncstate->wfuncstate = wfuncstate;
		perfuncstate->wfunc = wfunc;
		perfuncstate->numArguments = list_length(wfuncstate->args);

		/* function lookup data is kept in the per-query memory context */
		fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
					  econtext->ecxt_per_query_memory);
		fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);

		perfuncstate->winCollation = wfunc->inputcollid;

		get_typlenbyval(wfunc->wintype,
						&perfuncstate->resulttypeLen,
						&perfuncstate->resulttypeByVal);

		/*
		 * If it's really just a plain aggregate function, we'll emulate the
		 * Agg environment for it.
		 */
		perfuncstate->plain_agg = wfunc->winagg;
		if (wfunc->winagg)
		{
			WindowStatePerAgg peraggstate;

			/* link the per-function and per-aggregate records to each other */
			perfuncstate->aggno = ++aggno;
			peraggstate = &winstate->peragg[aggno];
			initialize_peragg(winstate, wfunc, peraggstate);
			peraggstate->wfuncno = wfuncno;
		}
		else
		{
			/* true window function: give it its own WindowObject */
			WindowObject winobj = makeNode(WindowObjectData);

			winobj->winstate = winstate;
			winobj->argstates = wfuncstate->args;
			winobj->localmem = NULL;
			perfuncstate->winobj = winobj;
		}
	}

	/* Update numfuncs, numaggs to match number of unique functions found */
	winstate->numfuncs = wfuncno + 1;
	winstate->numaggs = aggno + 1;

	/* Set up WindowObject for aggregates, if needed */
	if (winstate->numaggs > 0)
	{
		/* a single WindowObject is shared by all plain aggregates */
		WindowObject agg_winobj = makeNode(WindowObjectData);

		agg_winobj->winstate = winstate;
		agg_winobj->argstates = NIL;
		agg_winobj->localmem = NULL;
		/* make sure markptr = -1 to invalidate. It may not get used */
		agg_winobj->markptr = -1;
		agg_winobj->readptr = -1;
		winstate->agg_winobj = agg_winobj;
	}

	/* copy frame options to state node for easy access */
	winstate->frameOptions = frameOptions;

	/* initialize frame bound offset expressions */
	winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
										 (PlanState *) winstate);
	winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
									   (PlanState *) winstate);

	/* Lookup in_range support functions if needed */
	if (OidIsValid(node->startInRangeFunc))
		fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc);
	if (OidIsValid(node->endInRangeFunc))
		fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc);
	winstate->inRangeColl = node->inRangeColl;
	winstate->inRangeAsc = node->inRangeAsc;
	winstate->inRangeNullsFirst = node->inRangeNullsFirst;

	/*
	 * Initial scan state: all_first makes the first ExecWindowAgg call
	 * compute the frame offset values; no partition has been read yet.
	 */
	winstate->all_first = true;
	winstate->partition_spooled = false;
	winstate->more_partitions = false;

	return winstate;
}
2513
2514 /* -----------------
2515 * ExecEndWindowAgg
2516 * -----------------
2517 */
2518 void
ExecEndWindowAgg(WindowAggState * node)2519 ExecEndWindowAgg(WindowAggState *node)
2520 {
2521 PlanState *outerPlan;
2522 int i;
2523
2524 release_partition(node);
2525
2526 ExecClearTuple(node->ss.ss_ScanTupleSlot);
2527 ExecClearTuple(node->first_part_slot);
2528 ExecClearTuple(node->agg_row_slot);
2529 ExecClearTuple(node->temp_slot_1);
2530 ExecClearTuple(node->temp_slot_2);
2531 if (node->framehead_slot)
2532 ExecClearTuple(node->framehead_slot);
2533 if (node->frametail_slot)
2534 ExecClearTuple(node->frametail_slot);
2535
2536 /*
2537 * Free both the expr contexts.
2538 */
2539 ExecFreeExprContext(&node->ss.ps);
2540 node->ss.ps.ps_ExprContext = node->tmpcontext;
2541 ExecFreeExprContext(&node->ss.ps);
2542
2543 for (i = 0; i < node->numaggs; i++)
2544 {
2545 if (node->peragg[i].aggcontext != node->aggcontext)
2546 MemoryContextDelete(node->peragg[i].aggcontext);
2547 }
2548 MemoryContextDelete(node->partcontext);
2549 MemoryContextDelete(node->aggcontext);
2550
2551 pfree(node->perfunc);
2552 pfree(node->peragg);
2553
2554 outerPlan = outerPlanState(node);
2555 ExecEndNode(outerPlan);
2556 }
2557
2558 /* -----------------
2559 * ExecReScanWindowAgg
2560 * -----------------
2561 */
2562 void
ExecReScanWindowAgg(WindowAggState * node)2563 ExecReScanWindowAgg(WindowAggState *node)
2564 {
2565 PlanState *outerPlan = outerPlanState(node);
2566 ExprContext *econtext = node->ss.ps.ps_ExprContext;
2567
2568 node->all_done = false;
2569 node->all_first = true;
2570
2571 /* release tuplestore et al */
2572 release_partition(node);
2573
2574 /* release all temp tuples, but especially first_part_slot */
2575 ExecClearTuple(node->ss.ss_ScanTupleSlot);
2576 ExecClearTuple(node->first_part_slot);
2577 ExecClearTuple(node->agg_row_slot);
2578 ExecClearTuple(node->temp_slot_1);
2579 ExecClearTuple(node->temp_slot_2);
2580 if (node->framehead_slot)
2581 ExecClearTuple(node->framehead_slot);
2582 if (node->frametail_slot)
2583 ExecClearTuple(node->frametail_slot);
2584
2585 /* Forget current wfunc values */
2586 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2587 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2588
2589 /*
2590 * if chgParam of subnode is not null then plan will be re-scanned by
2591 * first ExecProcNode.
2592 */
2593 if (outerPlan->chgParam == NULL)
2594 ExecReScan(outerPlan);
2595 }
2596
2597 /*
2598 * initialize_peragg
2599 *
2600 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2601 */
2602 static WindowStatePerAggData *
initialize_peragg(WindowAggState * winstate,WindowFunc * wfunc,WindowStatePerAgg peraggstate)2603 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2604 WindowStatePerAgg peraggstate)
2605 {
2606 Oid inputTypes[FUNC_MAX_ARGS];
2607 int numArguments;
2608 HeapTuple aggTuple;
2609 Form_pg_aggregate aggform;
2610 Oid aggtranstype;
2611 AttrNumber initvalAttNo;
2612 AclResult aclresult;
2613 bool use_ma_code;
2614 Oid transfn_oid,
2615 invtransfn_oid,
2616 finalfn_oid;
2617 bool finalextra;
2618 char finalmodify;
2619 Expr *transfnexpr,
2620 *invtransfnexpr,
2621 *finalfnexpr;
2622 Datum textInitVal;
2623 int i;
2624 ListCell *lc;
2625
2626 numArguments = list_length(wfunc->args);
2627
2628 i = 0;
2629 foreach(lc, wfunc->args)
2630 {
2631 inputTypes[i++] = exprType((Node *) lfirst(lc));
2632 }
2633
2634 aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2635 if (!HeapTupleIsValid(aggTuple))
2636 elog(ERROR, "cache lookup failed for aggregate %u",
2637 wfunc->winfnoid);
2638 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2639
2640 /*
2641 * Figure out whether we want to use the moving-aggregate implementation,
	 * and collect the right set of fields from the pg_aggregate entry.
2643 *
2644 * It's possible that an aggregate would supply a safe moving-aggregate
2645 * implementation and an unsafe normal one, in which case our hand is
2646 * forced. Otherwise, if the frame head can't move, we don't need
2647 * moving-aggregate code. Even if we'd like to use it, don't do so if the
2648 * aggregate's arguments (and FILTER clause if any) contain any calls to
2649 * volatile functions. Otherwise, the difference between restarting and
2650 * not restarting the aggregation would be user-visible.
2651 */
2652 if (!OidIsValid(aggform->aggminvtransfn))
2653 use_ma_code = false; /* sine qua non */
2654 else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2655 aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2656 use_ma_code = true; /* decision forced by safety */
2657 else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
2658 use_ma_code = false; /* non-moving frame head */
2659 else if (contain_volatile_functions((Node *) wfunc))
2660 use_ma_code = false; /* avoid possible behavioral change */
2661 else
2662 use_ma_code = true; /* yes, let's use it */
2663 if (use_ma_code)
2664 {
2665 peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2666 peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2667 peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2668 finalextra = aggform->aggmfinalextra;
2669 finalmodify = aggform->aggmfinalmodify;
2670 aggtranstype = aggform->aggmtranstype;
2671 initvalAttNo = Anum_pg_aggregate_aggminitval;
2672 }
2673 else
2674 {
2675 peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2676 peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2677 peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2678 finalextra = aggform->aggfinalextra;
2679 finalmodify = aggform->aggfinalmodify;
2680 aggtranstype = aggform->aggtranstype;
2681 initvalAttNo = Anum_pg_aggregate_agginitval;
2682 }
2683
2684 /*
2685 * ExecInitWindowAgg already checked permission to call aggregate function
2686 * ... but we still need to check the component functions
2687 */
2688
2689 /* Check that aggregate owner has permission to call component fns */
2690 {
2691 HeapTuple procTuple;
2692 Oid aggOwner;
2693
2694 procTuple = SearchSysCache1(PROCOID,
2695 ObjectIdGetDatum(wfunc->winfnoid));
2696 if (!HeapTupleIsValid(procTuple))
2697 elog(ERROR, "cache lookup failed for function %u",
2698 wfunc->winfnoid);
2699 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2700 ReleaseSysCache(procTuple);
2701
2702 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2703 ACL_EXECUTE);
2704 if (aclresult != ACLCHECK_OK)
2705 aclcheck_error(aclresult, OBJECT_FUNCTION,
2706 get_func_name(transfn_oid));
2707 InvokeFunctionExecuteHook(transfn_oid);
2708
2709 if (OidIsValid(invtransfn_oid))
2710 {
2711 aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2712 ACL_EXECUTE);
2713 if (aclresult != ACLCHECK_OK)
2714 aclcheck_error(aclresult, OBJECT_FUNCTION,
2715 get_func_name(invtransfn_oid));
2716 InvokeFunctionExecuteHook(invtransfn_oid);
2717 }
2718
2719 if (OidIsValid(finalfn_oid))
2720 {
2721 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2722 ACL_EXECUTE);
2723 if (aclresult != ACLCHECK_OK)
2724 aclcheck_error(aclresult, OBJECT_FUNCTION,
2725 get_func_name(finalfn_oid));
2726 InvokeFunctionExecuteHook(finalfn_oid);
2727 }
2728 }
2729
2730 /*
2731 * If the selected finalfn isn't read-only, we can't run this aggregate as
2732 * a window function. This is a user-facing error, so we take a bit more
2733 * care with the error message than elsewhere in this function.
2734 */
2735 if (finalmodify != AGGMODIFY_READ_ONLY)
2736 ereport(ERROR,
2737 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2738 errmsg("aggregate function %s does not support use as a window function",
2739 format_procedure(wfunc->winfnoid))));
2740
2741 /* Detect how many arguments to pass to the finalfn */
2742 if (finalextra)
2743 peraggstate->numFinalArgs = numArguments + 1;
2744 else
2745 peraggstate->numFinalArgs = 1;
2746
2747 /* resolve actual type of transition state, if polymorphic */
2748 aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2749 aggtranstype,
2750 inputTypes,
2751 numArguments);
2752
2753 /* build expression trees using actual argument & result types */
2754 build_aggregate_transfn_expr(inputTypes,
2755 numArguments,
2756 0, /* no ordered-set window functions yet */
2757 false, /* no variadic window functions yet */
2758 aggtranstype,
2759 wfunc->inputcollid,
2760 transfn_oid,
2761 invtransfn_oid,
2762 &transfnexpr,
2763 &invtransfnexpr);
2764
2765 /* set up infrastructure for calling the transfn(s) and finalfn */
2766 fmgr_info(transfn_oid, &peraggstate->transfn);
2767 fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2768
2769 if (OidIsValid(invtransfn_oid))
2770 {
2771 fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2772 fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2773 }
2774
2775 if (OidIsValid(finalfn_oid))
2776 {
2777 build_aggregate_finalfn_expr(inputTypes,
2778 peraggstate->numFinalArgs,
2779 aggtranstype,
2780 wfunc->wintype,
2781 wfunc->inputcollid,
2782 finalfn_oid,
2783 &finalfnexpr);
2784 fmgr_info(finalfn_oid, &peraggstate->finalfn);
2785 fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2786 }
2787
2788 /* get info about relevant datatypes */
2789 get_typlenbyval(wfunc->wintype,
2790 &peraggstate->resulttypeLen,
2791 &peraggstate->resulttypeByVal);
2792 get_typlenbyval(aggtranstype,
2793 &peraggstate->transtypeLen,
2794 &peraggstate->transtypeByVal);
2795
2796 /*
2797 * initval is potentially null, so don't try to access it as a struct
2798 * field. Must do it the hard way with SysCacheGetAttr.
2799 */
2800 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2801 &peraggstate->initValueIsNull);
2802
2803 if (peraggstate->initValueIsNull)
2804 peraggstate->initValue = (Datum) 0;
2805 else
2806 peraggstate->initValue = GetAggInitVal(textInitVal,
2807 aggtranstype);
2808
2809 /*
2810 * If the transfn is strict and the initval is NULL, make sure input type
2811 * and transtype are the same (or at least binary-compatible), so that
2812 * it's OK to use the first input value as the initial transValue. This
2813 * should have been checked at agg definition time, but we must check
2814 * again in case the transfn's strictness property has been changed.
2815 */
2816 if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2817 {
2818 if (numArguments < 1 ||
2819 !IsBinaryCoercible(inputTypes[0], aggtranstype))
2820 ereport(ERROR,
2821 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2822 errmsg("aggregate %u needs to have compatible input type and transition type",
2823 wfunc->winfnoid)));
2824 }
2825
2826 /*
2827 * Insist that forward and inverse transition functions have the same
2828 * strictness setting. Allowing them to differ would require handling
2829 * more special cases in advance_windowaggregate and
2830 * advance_windowaggregate_base, for no discernible benefit. This should
2831 * have been checked at agg definition time, but we must check again in
2832 * case either function's strictness property has been changed.
2833 */
2834 if (OidIsValid(invtransfn_oid) &&
2835 peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2836 ereport(ERROR,
2837 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2838 errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2839
2840 /*
2841 * Moving aggregates use their own aggcontext.
2842 *
2843 * This is necessary because they might restart at different times, so we
2844 * might never be able to reset the shared context otherwise. We can't
2845 * make it the aggregates' responsibility to clean up after themselves,
2846 * because strict aggregates must be restarted whenever we remove their
2847 * last non-NULL input, which the aggregate won't be aware is happening.
2848 * Also, just pfree()ing the transValue upon restarting wouldn't help,
2849 * since we'd miss any indirectly referenced data. We could, in theory,
2850 * make the memory allocation rules for moving aggregates different than
2851 * they have historically been for plain aggregates, but that seems grotty
2852 * and likely to lead to memory leaks.
2853 */
2854 if (OidIsValid(invtransfn_oid))
2855 peraggstate->aggcontext =
2856 AllocSetContextCreate(CurrentMemoryContext,
2857 "WindowAgg Per Aggregate",
2858 ALLOCSET_DEFAULT_SIZES);
2859 else
2860 peraggstate->aggcontext = winstate->aggcontext;
2861
2862 ReleaseSysCache(aggTuple);
2863
2864 return peraggstate;
2865 }
2866
2867 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)2868 GetAggInitVal(Datum textInitVal, Oid transtype)
2869 {
2870 Oid typinput,
2871 typioparam;
2872 char *strInitVal;
2873 Datum initVal;
2874
2875 getTypeInputInfo(transtype, &typinput, &typioparam);
2876 strInitVal = TextDatumGetCString(textInitVal);
2877 initVal = OidInputFunctionCall(typinput, strInitVal,
2878 typioparam, -1);
2879 pfree(strInitVal);
2880 return initVal;
2881 }
2882
2883 /*
2884 * are_peers
2885 * compare two rows to see if they are equal according to the ORDER BY clause
2886 *
2887 * NB: this does not consider the window frame mode.
2888 */
2889 static bool
are_peers(WindowAggState * winstate,TupleTableSlot * slot1,TupleTableSlot * slot2)2890 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
2891 TupleTableSlot *slot2)
2892 {
2893 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2894 ExprContext *econtext = winstate->tmpcontext;
2895
2896 /* If no ORDER BY, all rows are peers with each other */
2897 if (node->ordNumCols == 0)
2898 return true;
2899
2900 econtext->ecxt_outertuple = slot1;
2901 econtext->ecxt_innertuple = slot2;
2902 return ExecQualAndReset(winstate->ordEqfunction, econtext);
2903 }
2904
2905 /*
2906 * window_gettupleslot
2907 * Fetch the pos'th tuple of the current partition into the slot,
2908 * using the winobj's read pointer
2909 *
2910 * Returns true if successful, false if no such row
2911 */
2912 static bool
window_gettupleslot(WindowObject winobj,int64 pos,TupleTableSlot * slot)2913 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
2914 {
2915 WindowAggState *winstate = winobj->winstate;
2916 MemoryContext oldcontext;
2917
2918 /* often called repeatedly in a row */
2919 CHECK_FOR_INTERRUPTS();
2920
2921 /* Don't allow passing -1 to spool_tuples here */
2922 if (pos < 0)
2923 return false;
2924
2925 /* If necessary, fetch the tuple into the spool */
2926 spool_tuples(winstate, pos);
2927
2928 if (pos >= winstate->spooled_rows)
2929 return false;
2930
2931 if (pos < winobj->markpos)
2932 elog(ERROR, "cannot fetch row before WindowObject's mark position");
2933
2934 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2935
2936 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2937
2938 /*
2939 * Advance or rewind until we are within one tuple of the one we want.
2940 */
2941 if (winobj->seekpos < pos - 1)
2942 {
2943 if (!tuplestore_skiptuples(winstate->buffer,
2944 pos - 1 - winobj->seekpos,
2945 true))
2946 elog(ERROR, "unexpected end of tuplestore");
2947 winobj->seekpos = pos - 1;
2948 }
2949 else if (winobj->seekpos > pos + 1)
2950 {
2951 if (!tuplestore_skiptuples(winstate->buffer,
2952 winobj->seekpos - (pos + 1),
2953 false))
2954 elog(ERROR, "unexpected end of tuplestore");
2955 winobj->seekpos = pos + 1;
2956 }
2957 else if (winobj->seekpos == pos)
2958 {
2959 /*
2960 * There's no API to refetch the tuple at the current position. We
2961 * have to move one tuple forward, and then one backward. (We don't
2962 * do it the other way because we might try to fetch the row before
2963 * our mark, which isn't allowed.) XXX this case could stand to be
2964 * optimized.
2965 */
2966 tuplestore_advance(winstate->buffer, true);
2967 winobj->seekpos++;
2968 }
2969
2970 /*
2971 * Now we should be on the tuple immediately before or after the one we
2972 * want, so just fetch forwards or backwards as appropriate.
2973 */
2974 if (winobj->seekpos > pos)
2975 {
2976 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2977 elog(ERROR, "unexpected end of tuplestore");
2978 winobj->seekpos--;
2979 }
2980 else
2981 {
2982 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2983 elog(ERROR, "unexpected end of tuplestore");
2984 winobj->seekpos++;
2985 }
2986
2987 Assert(winobj->seekpos == pos);
2988
2989 MemoryContextSwitchTo(oldcontext);
2990
2991 return true;
2992 }
2993
2994
2995 /***********************************************************************
2996 * API exposed to window functions
2997 ***********************************************************************/
2998
2999
3000 /*
3001 * WinGetPartitionLocalMemory
3002 * Get working memory that lives till end of partition processing
3003 *
3004 * On first call within a given partition, this allocates and zeroes the
3005 * requested amount of space. Subsequent calls just return the same chunk.
3006 *
3007 * Memory obtained this way is normally used to hold state that should be
3008 * automatically reset for each new partition. If a window function wants
3009 * to hold state across the whole query, fcinfo->fn_extra can be used in the
3010 * usual way for that.
3011 */
3012 void *
WinGetPartitionLocalMemory(WindowObject winobj,Size sz)3013 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
3014 {
3015 Assert(WindowObjectIsValid(winobj));
3016 if (winobj->localmem == NULL)
3017 winobj->localmem =
3018 MemoryContextAllocZero(winobj->winstate->partcontext, sz);
3019 return winobj->localmem;
3020 }
3021
3022 /*
3023 * WinGetCurrentPosition
3024 * Return the current row's position (counting from 0) within the current
3025 * partition.
3026 */
3027 int64
WinGetCurrentPosition(WindowObject winobj)3028 WinGetCurrentPosition(WindowObject winobj)
3029 {
3030 Assert(WindowObjectIsValid(winobj));
3031 return winobj->winstate->currentpos;
3032 }
3033
3034 /*
3035 * WinGetPartitionRowCount
3036 * Return total number of rows contained in the current partition.
3037 *
3038 * Note: this is a relatively expensive operation because it forces the
3039 * whole partition to be "spooled" into the tuplestore at once. Once
3040 * executed, however, additional calls within the same partition are cheap.
3041 */
3042 int64
WinGetPartitionRowCount(WindowObject winobj)3043 WinGetPartitionRowCount(WindowObject winobj)
3044 {
3045 Assert(WindowObjectIsValid(winobj));
3046 spool_tuples(winobj->winstate, -1);
3047 return winobj->winstate->spooled_rows;
3048 }
3049
3050 /*
3051 * WinSetMarkPosition
3052 * Set the "mark" position for the window object, which is the oldest row
3053 * number (counting from 0) it is allowed to fetch during all subsequent
3054 * operations within the current partition.
3055 *
3056 * Window functions do not have to call this, but are encouraged to move the
3057 * mark forward when possible to keep the tuplestore size down and prevent
3058 * having to spill rows to disk.
3059 */
3060 void
WinSetMarkPosition(WindowObject winobj,int64 markpos)3061 WinSetMarkPosition(WindowObject winobj, int64 markpos)
3062 {
3063 WindowAggState *winstate;
3064
3065 Assert(WindowObjectIsValid(winobj));
3066 winstate = winobj->winstate;
3067
3068 if (markpos < winobj->markpos)
3069 elog(ERROR, "cannot move WindowObject's mark position backward");
3070 tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
3071 if (markpos > winobj->markpos)
3072 {
3073 tuplestore_skiptuples(winstate->buffer,
3074 markpos - winobj->markpos,
3075 true);
3076 winobj->markpos = markpos;
3077 }
3078 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3079 if (markpos > winobj->seekpos)
3080 {
3081 tuplestore_skiptuples(winstate->buffer,
3082 markpos - winobj->seekpos,
3083 true);
3084 winobj->seekpos = markpos;
3085 }
3086 }
3087
3088 /*
3089 * WinRowsArePeers
3090 * Compare two rows (specified by absolute position in partition) to see
3091 * if they are equal according to the ORDER BY clause.
3092 *
3093 * NB: this does not consider the window frame mode.
3094 */
3095 bool
WinRowsArePeers(WindowObject winobj,int64 pos1,int64 pos2)3096 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
3097 {
3098 WindowAggState *winstate;
3099 WindowAgg *node;
3100 TupleTableSlot *slot1;
3101 TupleTableSlot *slot2;
3102 bool res;
3103
3104 Assert(WindowObjectIsValid(winobj));
3105 winstate = winobj->winstate;
3106 node = (WindowAgg *) winstate->ss.ps.plan;
3107
3108 /* If no ORDER BY, all rows are peers; don't bother to fetch them */
3109 if (node->ordNumCols == 0)
3110 return true;
3111
3112 /*
3113 * Note: OK to use temp_slot_2 here because we aren't calling any
3114 * frame-related functions (those tend to clobber temp_slot_2).
3115 */
3116 slot1 = winstate->temp_slot_1;
3117 slot2 = winstate->temp_slot_2;
3118
3119 if (!window_gettupleslot(winobj, pos1, slot1))
3120 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3121 pos1);
3122 if (!window_gettupleslot(winobj, pos2, slot2))
3123 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3124 pos2);
3125
3126 res = are_peers(winstate, slot1, slot2);
3127
3128 ExecClearTuple(slot1);
3129 ExecClearTuple(slot2);
3130
3131 return res;
3132 }
3133
3134 /*
3135 * WinGetFuncArgInPartition
3136 * Evaluate a window function's argument expression on a specified
3137 * row of the partition. The row is identified in lseek(2) style,
3138 * i.e. relative to the current, first, or last row.
3139 *
3140 * argno: argument number to evaluate (counted from 0)
3141 * relpos: signed rowcount offset from the seek position
3142 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
3143 * set_mark: If the row is found and set_mark is true, the mark is moved to
3144 * the row as a side-effect.
3145 * isnull: output argument, receives isnull status of result
3146 * isout: output argument, set to indicate whether target row position
3147 * is out of partition (can pass NULL if caller doesn't care about this)
3148 *
3149 * Specifying a nonexistent row is not an error, it just causes a null result
3150 * (plus setting *isout true, if isout isn't NULL).
3151 */
3152 Datum
WinGetFuncArgInPartition(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)3153 WinGetFuncArgInPartition(WindowObject winobj, int argno,
3154 int relpos, int seektype, bool set_mark,
3155 bool *isnull, bool *isout)
3156 {
3157 WindowAggState *winstate;
3158 ExprContext *econtext;
3159 TupleTableSlot *slot;
3160 bool gottuple;
3161 int64 abs_pos;
3162
3163 Assert(WindowObjectIsValid(winobj));
3164 winstate = winobj->winstate;
3165 econtext = winstate->ss.ps.ps_ExprContext;
3166 slot = winstate->temp_slot_1;
3167
3168 switch (seektype)
3169 {
3170 case WINDOW_SEEK_CURRENT:
3171 abs_pos = winstate->currentpos + relpos;
3172 break;
3173 case WINDOW_SEEK_HEAD:
3174 abs_pos = relpos;
3175 break;
3176 case WINDOW_SEEK_TAIL:
3177 spool_tuples(winstate, -1);
3178 abs_pos = winstate->spooled_rows - 1 + relpos;
3179 break;
3180 default:
3181 elog(ERROR, "unrecognized window seek type: %d", seektype);
3182 abs_pos = 0; /* keep compiler quiet */
3183 break;
3184 }
3185
3186 gottuple = window_gettupleslot(winobj, abs_pos, slot);
3187
3188 if (!gottuple)
3189 {
3190 if (isout)
3191 *isout = true;
3192 *isnull = true;
3193 return (Datum) 0;
3194 }
3195 else
3196 {
3197 if (isout)
3198 *isout = false;
3199 if (set_mark)
3200 WinSetMarkPosition(winobj, abs_pos);
3201 econtext->ecxt_outertuple = slot;
3202 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3203 econtext, isnull);
3204 }
3205 }
3206
3207 /*
3208 * WinGetFuncArgInFrame
3209 * Evaluate a window function's argument expression on a specified
3210 * row of the window frame. The row is identified in lseek(2) style,
3211 * i.e. relative to the first or last row of the frame. (We do not
3212 * support WINDOW_SEEK_CURRENT here, because it's not very clear what
3213 * that should mean if the current row isn't part of the frame.)
3214 *
3215 * argno: argument number to evaluate (counted from 0)
3216 * relpos: signed rowcount offset from the seek position
3217 * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL
3218 * set_mark: If the row is found/in frame and set_mark is true, the mark is
3219 * moved to the row as a side-effect.
3220 * isnull: output argument, receives isnull status of result
3221 * isout: output argument, set to indicate whether target row position
3222 * is out of frame (can pass NULL if caller doesn't care about this)
3223 *
3224 * Specifying a nonexistent or not-in-frame row is not an error, it just
3225 * causes a null result (plus setting *isout true, if isout isn't NULL).
3226 *
3227 * Note that some exclusion-clause options lead to situations where the
3228 * rows that are in-frame are not consecutive in the partition. But we
3229 * count only in-frame rows when measuring relpos.
3230 *
3231 * The set_mark flag is interpreted as meaning that the caller will specify
3232 * a constant (or, perhaps, monotonically increasing) relpos in successive
3233 * calls, so that *if there is no exclusion clause* there will be no need
3234 * to fetch a row before the previously fetched row. But we do not expect
3235 * the caller to know how to account for exclusion clauses. Therefore,
3236 * if there is an exclusion clause we take responsibility for adjusting the
3237 * mark request to something that will be safe given the above assumption
3238 * about relpos.
3239 */
3240 Datum
WinGetFuncArgInFrame(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)3241 WinGetFuncArgInFrame(WindowObject winobj, int argno,
3242 int relpos, int seektype, bool set_mark,
3243 bool *isnull, bool *isout)
3244 {
3245 WindowAggState *winstate;
3246 ExprContext *econtext;
3247 TupleTableSlot *slot;
3248 int64 abs_pos;
3249 int64 mark_pos;
3250
3251 Assert(WindowObjectIsValid(winobj));
3252 winstate = winobj->winstate;
3253 econtext = winstate->ss.ps.ps_ExprContext;
3254 slot = winstate->temp_slot_1;
3255
3256 switch (seektype)
3257 {
3258 case WINDOW_SEEK_CURRENT:
3259 elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3260 abs_pos = mark_pos = 0; /* keep compiler quiet */
3261 break;
3262 case WINDOW_SEEK_HEAD:
3263 /* rejecting relpos < 0 is easy and simplifies code below */
3264 if (relpos < 0)
3265 goto out_of_frame;
3266 update_frameheadpos(winstate);
3267 abs_pos = winstate->frameheadpos + relpos;
3268 mark_pos = abs_pos;
3269
3270 /*
3271 * Account for exclusion option if one is active, but advance only
3272 * abs_pos not mark_pos. This prevents changes of the current
3273 * row's peer group from resulting in trying to fetch a row before
3274 * some previous mark position.
3275 *
3276 * Note that in some corner cases such as current row being
3277 * outside frame, these calculations are theoretically too simple,
3278 * but it doesn't matter because we'll end up deciding the row is
3279 * out of frame. We do not attempt to avoid fetching rows past
3280 * end of frame; that would happen in some cases anyway.
3281 */
3282 switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3283 {
3284 case 0:
3285 /* no adjustment needed */
3286 break;
3287 case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3288 if (abs_pos >= winstate->currentpos &&
3289 winstate->currentpos >= winstate->frameheadpos)
3290 abs_pos++;
3291 break;
3292 case FRAMEOPTION_EXCLUDE_GROUP:
3293 update_grouptailpos(winstate);
3294 if (abs_pos >= winstate->groupheadpos &&
3295 winstate->grouptailpos > winstate->frameheadpos)
3296 {
3297 int64 overlapstart = Max(winstate->groupheadpos,
3298 winstate->frameheadpos);
3299
3300 abs_pos += winstate->grouptailpos - overlapstart;
3301 }
3302 break;
3303 case FRAMEOPTION_EXCLUDE_TIES:
3304 update_grouptailpos(winstate);
3305 if (abs_pos >= winstate->groupheadpos &&
3306 winstate->grouptailpos > winstate->frameheadpos)
3307 {
3308 int64 overlapstart = Max(winstate->groupheadpos,
3309 winstate->frameheadpos);
3310
3311 if (abs_pos == overlapstart)
3312 abs_pos = winstate->currentpos;
3313 else
3314 abs_pos += winstate->grouptailpos - overlapstart - 1;
3315 }
3316 break;
3317 default:
3318 elog(ERROR, "unrecognized frame option state: 0x%x",
3319 winstate->frameOptions);
3320 break;
3321 }
3322 break;
3323 case WINDOW_SEEK_TAIL:
3324 /* rejecting relpos > 0 is easy and simplifies code below */
3325 if (relpos > 0)
3326 goto out_of_frame;
3327 update_frametailpos(winstate);
3328 abs_pos = winstate->frametailpos - 1 + relpos;
3329
3330 /*
3331 * Account for exclusion option if one is active. If there is no
3332 * exclusion, we can safely set the mark at the accessed row. But
3333 * if there is, we can only mark the frame start, because we can't
3334 * be sure how far back in the frame the exclusion might cause us
3335 * to fetch in future. Furthermore, we have to actually check
3336 * against frameheadpos here, since it's unsafe to try to fetch a
3337 * row before frame start if the mark might be there already.
3338 */
3339 switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3340 {
3341 case 0:
3342 /* no adjustment needed */
3343 mark_pos = abs_pos;
3344 break;
3345 case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3346 if (abs_pos <= winstate->currentpos &&
3347 winstate->currentpos < winstate->frametailpos)
3348 abs_pos--;
3349 update_frameheadpos(winstate);
3350 if (abs_pos < winstate->frameheadpos)
3351 goto out_of_frame;
3352 mark_pos = winstate->frameheadpos;
3353 break;
3354 case FRAMEOPTION_EXCLUDE_GROUP:
3355 update_grouptailpos(winstate);
3356 if (abs_pos < winstate->grouptailpos &&
3357 winstate->groupheadpos < winstate->frametailpos)
3358 {
3359 int64 overlapend = Min(winstate->grouptailpos,
3360 winstate->frametailpos);
3361
3362 abs_pos -= overlapend - winstate->groupheadpos;
3363 }
3364 update_frameheadpos(winstate);
3365 if (abs_pos < winstate->frameheadpos)
3366 goto out_of_frame;
3367 mark_pos = winstate->frameheadpos;
3368 break;
3369 case FRAMEOPTION_EXCLUDE_TIES:
3370 update_grouptailpos(winstate);
3371 if (abs_pos < winstate->grouptailpos &&
3372 winstate->groupheadpos < winstate->frametailpos)
3373 {
3374 int64 overlapend = Min(winstate->grouptailpos,
3375 winstate->frametailpos);
3376
3377 if (abs_pos == overlapend - 1)
3378 abs_pos = winstate->currentpos;
3379 else
3380 abs_pos -= overlapend - 1 - winstate->groupheadpos;
3381 }
3382 update_frameheadpos(winstate);
3383 if (abs_pos < winstate->frameheadpos)
3384 goto out_of_frame;
3385 mark_pos = winstate->frameheadpos;
3386 break;
3387 default:
3388 elog(ERROR, "unrecognized frame option state: 0x%x",
3389 winstate->frameOptions);
3390 mark_pos = 0; /* keep compiler quiet */
3391 break;
3392 }
3393 break;
3394 default:
3395 elog(ERROR, "unrecognized window seek type: %d", seektype);
3396 abs_pos = mark_pos = 0; /* keep compiler quiet */
3397 break;
3398 }
3399
3400 if (!window_gettupleslot(winobj, abs_pos, slot))
3401 goto out_of_frame;
3402
3403 /* The code above does not detect all out-of-frame cases, so check */
3404 if (row_is_in_frame(winstate, abs_pos, slot) <= 0)
3405 goto out_of_frame;
3406
3407 if (isout)
3408 *isout = false;
3409 if (set_mark)
3410 WinSetMarkPosition(winobj, mark_pos);
3411 econtext->ecxt_outertuple = slot;
3412 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3413 econtext, isnull);
3414
3415 out_of_frame:
3416 if (isout)
3417 *isout = true;
3418 *isnull = true;
3419 return (Datum) 0;
3420 }
3421
3422 /*
3423 * WinGetFuncArgCurrent
3424 * Evaluate a window function's argument expression on the current row.
3425 *
3426 * argno: argument number to evaluate (counted from 0)
3427 * isnull: output argument, receives isnull status of result
3428 *
3429 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
3430 * WinGetFuncArgInFrame targeting the current row, because it will succeed
3431 * even if the WindowObject's mark has been set beyond the current row.
3432 * This should generally be used for "ordinary" arguments of a window
3433 * function, such as the offset argument of lead() or lag().
3434 */
3435 Datum
WinGetFuncArgCurrent(WindowObject winobj,int argno,bool * isnull)3436 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
3437 {
3438 WindowAggState *winstate;
3439 ExprContext *econtext;
3440
3441 Assert(WindowObjectIsValid(winobj));
3442 winstate = winobj->winstate;
3443
3444 econtext = winstate->ss.ps.ps_ExprContext;
3445
3446 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
3447 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3448 econtext, isnull);
3449 }
3450