1 /*-------------------------------------------------------------------------
2 *
3 * nodeWindowAgg.c
4 * routines to handle WindowAgg nodes.
5 *
6 * A WindowAgg node evaluates "window functions" across suitable partitions
7 * of the input tuple set. Any one WindowAgg works for just a single window
8 * specification, though it can evaluate multiple window functions sharing
9 * identical window specifications. The input tuples are required to be
10 * delivered in sorted order, with the PARTITION BY columns (if any) as
11 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12 * (The planner generates a stack of WindowAggs with intervening Sort nodes
13 * as needed, if a query involves more than one window specification.)
14 *
15 * Since window functions can require access to any or all of the rows in
16 * the current partition, we accumulate rows of the partition into a
17 * tuplestore. The window functions are called using the WindowObject API
18 * so that they can access those rows as needed.
19 *
20 * We also support using plain aggregate functions as window functions.
21 * For these, the regular Agg-node environment is emulated for each partition.
22 * As required by the SQL spec, the output represents the value of the
23 * aggregate function over all rows in the current row's window frame.
24 *
25 *
26 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
27 * Portions Copyright (c) 1994, Regents of the University of California
28 *
29 * IDENTIFICATION
30 * src/backend/executor/nodeWindowAgg.c
31 *
32 *-------------------------------------------------------------------------
33 */
34 #include "postgres.h"
35
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/syscache.h"
53 #include "windowapi.h"
54
55 /*
56 * All the window function APIs are called with this object, which is passed
57 * to window functions as fcinfo->context.
58 */
59 typedef struct WindowObjectData
60 {
61 NodeTag type;
62 WindowAggState *winstate; /* parent WindowAggState */
63 List *argstates; /* ExprState trees for fn's arguments */
64 void *localmem; /* WinGetPartitionLocalMemory's chunk */
65 int markptr; /* tuplestore mark pointer for this fn */
66 int readptr; /* tuplestore read pointer for this fn */
67 int64 markpos; /* row that markptr is positioned on */
68 int64 seekpos; /* row that readptr is positioned on */
69 } WindowObjectData;
70
71 /*
72 * We have one WindowStatePerFunc struct for each window function and
73 * window aggregate handled by this node.
74 */
75 typedef struct WindowStatePerFuncData
76 {
77 /* Links to WindowFunc expr and state nodes this working state is for */
78 WindowFuncExprState *wfuncstate;
79 WindowFunc *wfunc;
80
81 int numArguments; /* number of arguments */
82
83 FmgrInfo flinfo; /* fmgr lookup data for window function */
84
85 Oid winCollation; /* collation derived for window function */
86
87 /*
88 * We need the len and byval info for the result of each function in order
89 * to know how to copy/delete values.
90 */
91 int16 resulttypeLen;
92 bool resulttypeByVal;
93
94 bool plain_agg; /* is it just a plain aggregate function? */
95 int aggno; /* if so, index of its PerAggData */
96
97 WindowObject winobj; /* object used in window function API */
98 } WindowStatePerFuncData;
99
100 /*
101 * For plain aggregate window functions, we also have one of these.
102 */
103 typedef struct WindowStatePerAggData
104 {
105 /* Oids of transition functions */
106 Oid transfn_oid;
107 Oid invtransfn_oid; /* may be InvalidOid */
108 Oid finalfn_oid; /* may be InvalidOid */
109
110 /*
111 * fmgr lookup data for transition functions --- only valid when
112 * corresponding oid is not InvalidOid. Note in particular that fn_strict
113 * flags are kept here.
114 */
115 FmgrInfo transfn;
116 FmgrInfo invtransfn;
117 FmgrInfo finalfn;
118
119 int numFinalArgs; /* number of arguments to pass to finalfn */
120
121 /*
122 * initial value from pg_aggregate entry
123 */
124 Datum initValue;
125 bool initValueIsNull;
126
127 /*
128 * cached value for current frame boundaries
129 */
130 Datum resultValue;
131 bool resultValueIsNull;
132
133 /*
134 * We need the len and byval info for the agg's input, result, and
135 * transition data types in order to know how to copy/delete values.
136 */
137 int16 inputtypeLen,
138 resulttypeLen,
139 transtypeLen;
140 bool inputtypeByVal,
141 resulttypeByVal,
142 transtypeByVal;
143
144 int wfuncno; /* index of associated PerFuncData */
145
146 /* Context holding transition value and possibly other subsidiary data */
147 MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
148
149 /* Current transition value */
150 Datum transValue; /* current transition value */
151 bool transValueIsNull;
152
153 int64 transValueCount; /* number of currently-aggregated rows */
154
155 /* Data local to eval_windowaggregates() */
156 bool restart; /* need to restart this agg in this cycle? */
157 } WindowStatePerAggData;
158
159 static void initialize_windowaggregate(WindowAggState *winstate,
160 WindowStatePerFunc perfuncstate,
161 WindowStatePerAgg peraggstate);
162 static void advance_windowaggregate(WindowAggState *winstate,
163 WindowStatePerFunc perfuncstate,
164 WindowStatePerAgg peraggstate);
165 static bool advance_windowaggregate_base(WindowAggState *winstate,
166 WindowStatePerFunc perfuncstate,
167 WindowStatePerAgg peraggstate);
168 static void finalize_windowaggregate(WindowAggState *winstate,
169 WindowStatePerFunc perfuncstate,
170 WindowStatePerAgg peraggstate,
171 Datum *result, bool *isnull);
172
173 static void eval_windowaggregates(WindowAggState *winstate);
174 static void eval_windowfunction(WindowAggState *winstate,
175 WindowStatePerFunc perfuncstate,
176 Datum *result, bool *isnull);
177
178 static void begin_partition(WindowAggState *winstate);
179 static void spool_tuples(WindowAggState *winstate, int64 pos);
180 static void release_partition(WindowAggState *winstate);
181
182 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
183 TupleTableSlot *slot);
184 static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
185 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
186
187 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
188 WindowFunc *wfunc,
189 WindowStatePerAgg peraggstate);
190 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
191
192 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
193 TupleTableSlot *slot2);
194 static bool window_gettupleslot(WindowObject winobj, int64 pos,
195 TupleTableSlot *slot);
196
197
198 /*
199 * initialize_windowaggregate
200 * parallel to initialize_aggregates in nodeAgg.c
201 */
202 static void
initialize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)203 initialize_windowaggregate(WindowAggState *winstate,
204 WindowStatePerFunc perfuncstate,
205 WindowStatePerAgg peraggstate)
206 {
207 MemoryContext oldContext;
208
209 /*
210 * If we're using a private aggcontext, we may reset it here. But if the
211 * context is shared, we don't know which other aggregates may still need
212 * it, so we must leave it to the caller to reset at an appropriate time.
213 */
214 if (peraggstate->aggcontext != winstate->aggcontext)
215 MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
216
217 if (peraggstate->initValueIsNull)
218 peraggstate->transValue = peraggstate->initValue;
219 else
220 {
221 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
222 peraggstate->transValue = datumCopy(peraggstate->initValue,
223 peraggstate->transtypeByVal,
224 peraggstate->transtypeLen);
225 MemoryContextSwitchTo(oldContext);
226 }
227 peraggstate->transValueIsNull = peraggstate->initValueIsNull;
228 peraggstate->transValueCount = 0;
229 peraggstate->resultValue = (Datum) 0;
230 peraggstate->resultValueIsNull = true;
231 }
232
233 /*
234 * advance_windowaggregate
235 * parallel to advance_aggregates in nodeAgg.c
236 */
237 static void
advance_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)238 advance_windowaggregate(WindowAggState *winstate,
239 WindowStatePerFunc perfuncstate,
240 WindowStatePerAgg peraggstate)
241 {
242 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
243 int numArguments = perfuncstate->numArguments;
244 FunctionCallInfoData fcinfodata;
245 FunctionCallInfo fcinfo = &fcinfodata;
246 Datum newVal;
247 ListCell *arg;
248 int i;
249 MemoryContext oldContext;
250 ExprContext *econtext = winstate->tmpcontext;
251 ExprState *filter = wfuncstate->aggfilter;
252
253 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
254
255 /* Skip anything FILTERed out */
256 if (filter)
257 {
258 bool isnull;
259 Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL);
260
261 if (isnull || !DatumGetBool(res))
262 {
263 MemoryContextSwitchTo(oldContext);
264 return;
265 }
266 }
267
268 /* We start from 1, since the 0th arg will be the transition value */
269 i = 1;
270 foreach(arg, wfuncstate->args)
271 {
272 ExprState *argstate = (ExprState *) lfirst(arg);
273
274 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
275 &fcinfo->argnull[i], NULL);
276 i++;
277 }
278
279 if (peraggstate->transfn.fn_strict)
280 {
281 /*
282 * For a strict transfn, nothing happens when there's a NULL input; we
283 * just keep the prior transValue. Note transValueCount doesn't
284 * change either.
285 */
286 for (i = 1; i <= numArguments; i++)
287 {
288 if (fcinfo->argnull[i])
289 {
290 MemoryContextSwitchTo(oldContext);
291 return;
292 }
293 }
294
295 /*
296 * For strict transition functions with initial value NULL we use the
297 * first non-NULL input as the initial state. (We already checked
298 * that the agg's input type is binary-compatible with its transtype,
299 * so straight copy here is OK.)
300 *
301 * We must copy the datum into aggcontext if it is pass-by-ref. We do
302 * not need to pfree the old transValue, since it's NULL.
303 */
304 if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
305 {
306 MemoryContextSwitchTo(peraggstate->aggcontext);
307 peraggstate->transValue = datumCopy(fcinfo->arg[1],
308 peraggstate->transtypeByVal,
309 peraggstate->transtypeLen);
310 peraggstate->transValueIsNull = false;
311 peraggstate->transValueCount = 1;
312 MemoryContextSwitchTo(oldContext);
313 return;
314 }
315
316 if (peraggstate->transValueIsNull)
317 {
318 /*
319 * Don't call a strict function with NULL inputs. Note it is
320 * possible to get here despite the above tests, if the transfn is
321 * strict *and* returned a NULL on a prior cycle. If that happens
322 * we will propagate the NULL all the way to the end. That can
323 * only happen if there's no inverse transition function, though,
324 * since we disallow transitions back to NULL when there is one.
325 */
326 MemoryContextSwitchTo(oldContext);
327 Assert(!OidIsValid(peraggstate->invtransfn_oid));
328 return;
329 }
330 }
331
332 /*
333 * OK to call the transition function. Set winstate->curaggcontext while
334 * calling it, for possible use by AggCheckCallContext.
335 */
336 InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
337 numArguments + 1,
338 perfuncstate->winCollation,
339 (void *) winstate, NULL);
340 fcinfo->arg[0] = peraggstate->transValue;
341 fcinfo->argnull[0] = peraggstate->transValueIsNull;
342 winstate->curaggcontext = peraggstate->aggcontext;
343 newVal = FunctionCallInvoke(fcinfo);
344 winstate->curaggcontext = NULL;
345
346 /*
347 * Moving-aggregate transition functions must not return null, see
348 * advance_windowaggregate_base().
349 */
350 if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
351 ereport(ERROR,
352 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
353 errmsg("moving-aggregate transition function must not return null")));
354
355 /*
356 * We must track the number of rows included in transValue, since to
357 * remove the last input, advance_windowaggregate_base() mustn't call the
358 * inverse transition function, but simply reset transValue back to its
359 * initial value.
360 */
361 peraggstate->transValueCount++;
362
363 /*
364 * If pass-by-ref datatype, must copy the new value into aggcontext and
365 * free the prior transValue. But if transfn returned a pointer to its
366 * first input, we don't need to do anything. Also, if transfn returned a
367 * pointer to a R/W expanded object that is already a child of the
368 * aggcontext, assume we can adopt that value without copying it.
369 */
370 if (!peraggstate->transtypeByVal &&
371 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
372 {
373 if (!fcinfo->isnull)
374 {
375 MemoryContextSwitchTo(peraggstate->aggcontext);
376 if (DatumIsReadWriteExpandedObject(newVal,
377 false,
378 peraggstate->transtypeLen) &&
379 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
380 /* do nothing */ ;
381 else
382 newVal = datumCopy(newVal,
383 peraggstate->transtypeByVal,
384 peraggstate->transtypeLen);
385 }
386 if (!peraggstate->transValueIsNull)
387 {
388 if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
389 false,
390 peraggstate->transtypeLen))
391 DeleteExpandedObject(peraggstate->transValue);
392 else
393 pfree(DatumGetPointer(peraggstate->transValue));
394 }
395 }
396
397 MemoryContextSwitchTo(oldContext);
398 peraggstate->transValue = newVal;
399 peraggstate->transValueIsNull = fcinfo->isnull;
400 }
401
402 /*
403 * advance_windowaggregate_base
404 * Remove the oldest tuple from an aggregation.
405 *
406 * This is very much like advance_windowaggregate, except that we will call
407 * the inverse transition function (which caller must have checked is
408 * available).
409 *
410 * Returns true if we successfully removed the current row from this
411 * aggregate, false if not (in the latter case, caller is responsible
412 * for cleaning up by restarting the aggregation).
413 */
414 static bool
advance_windowaggregate_base(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)415 advance_windowaggregate_base(WindowAggState *winstate,
416 WindowStatePerFunc perfuncstate,
417 WindowStatePerAgg peraggstate)
418 {
419 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
420 int numArguments = perfuncstate->numArguments;
421 FunctionCallInfoData fcinfodata;
422 FunctionCallInfo fcinfo = &fcinfodata;
423 Datum newVal;
424 ListCell *arg;
425 int i;
426 MemoryContext oldContext;
427 ExprContext *econtext = winstate->tmpcontext;
428 ExprState *filter = wfuncstate->aggfilter;
429
430 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
431
432 /* Skip anything FILTERed out */
433 if (filter)
434 {
435 bool isnull;
436 Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL);
437
438 if (isnull || !DatumGetBool(res))
439 {
440 MemoryContextSwitchTo(oldContext);
441 return true;
442 }
443 }
444
445 /* We start from 1, since the 0th arg will be the transition value */
446 i = 1;
447 foreach(arg, wfuncstate->args)
448 {
449 ExprState *argstate = (ExprState *) lfirst(arg);
450
451 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
452 &fcinfo->argnull[i], NULL);
453 i++;
454 }
455
456 if (peraggstate->invtransfn.fn_strict)
457 {
458 /*
459 * For a strict (inv)transfn, nothing happens when there's a NULL
460 * input; we just keep the prior transValue. Note transValueCount
461 * doesn't change either.
462 */
463 for (i = 1; i <= numArguments; i++)
464 {
465 if (fcinfo->argnull[i])
466 {
467 MemoryContextSwitchTo(oldContext);
468 return true;
469 }
470 }
471 }
472
473 /* There should still be an added but not yet removed value */
474 Assert(peraggstate->transValueCount > 0);
475
476 /*
477 * In moving-aggregate mode, the state must never be NULL, except possibly
478 * before any rows have been aggregated (which is surely not the case at
479 * this point). This restriction allows us to interpret a NULL result
480 * from the inverse function as meaning "sorry, can't do an inverse
481 * transition in this case". We already checked this in
482 * advance_windowaggregate, but just for safety, check again.
483 */
484 if (peraggstate->transValueIsNull)
485 elog(ERROR, "aggregate transition value is NULL before inverse transition");
486
487 /*
488 * We mustn't use the inverse transition function to remove the last
489 * input. Doing so would yield a non-NULL state, whereas we should be in
490 * the initial state afterwards which may very well be NULL. So instead,
491 * we simply re-initialize the aggregate in this case.
492 */
493 if (peraggstate->transValueCount == 1)
494 {
495 MemoryContextSwitchTo(oldContext);
496 initialize_windowaggregate(winstate,
497 &winstate->perfunc[peraggstate->wfuncno],
498 peraggstate);
499 return true;
500 }
501
502 /*
503 * OK to call the inverse transition function. Set
504 * winstate->curaggcontext while calling it, for possible use by
505 * AggCheckCallContext.
506 */
507 InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
508 numArguments + 1,
509 perfuncstate->winCollation,
510 (void *) winstate, NULL);
511 fcinfo->arg[0] = peraggstate->transValue;
512 fcinfo->argnull[0] = peraggstate->transValueIsNull;
513 winstate->curaggcontext = peraggstate->aggcontext;
514 newVal = FunctionCallInvoke(fcinfo);
515 winstate->curaggcontext = NULL;
516
517 /*
518 * If the function returns NULL, report failure, forcing a restart.
519 */
520 if (fcinfo->isnull)
521 {
522 MemoryContextSwitchTo(oldContext);
523 return false;
524 }
525
526 /* Update number of rows included in transValue */
527 peraggstate->transValueCount--;
528
529 /*
530 * If pass-by-ref datatype, must copy the new value into aggcontext and
531 * free the prior transValue. But if invtransfn returned a pointer to its
532 * first input, we don't need to do anything. Also, if invtransfn
533 * returned a pointer to a R/W expanded object that is already a child of
534 * the aggcontext, assume we can adopt that value without copying it.
535 *
536 * Note: the checks for null values here will never fire, but it seems
537 * best to have this stanza look just like advance_windowaggregate.
538 */
539 if (!peraggstate->transtypeByVal &&
540 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
541 {
542 if (!fcinfo->isnull)
543 {
544 MemoryContextSwitchTo(peraggstate->aggcontext);
545 if (DatumIsReadWriteExpandedObject(newVal,
546 false,
547 peraggstate->transtypeLen) &&
548 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
549 /* do nothing */ ;
550 else
551 newVal = datumCopy(newVal,
552 peraggstate->transtypeByVal,
553 peraggstate->transtypeLen);
554 }
555 if (!peraggstate->transValueIsNull)
556 {
557 if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
558 false,
559 peraggstate->transtypeLen))
560 DeleteExpandedObject(peraggstate->transValue);
561 else
562 pfree(DatumGetPointer(peraggstate->transValue));
563 }
564 }
565
566 MemoryContextSwitchTo(oldContext);
567 peraggstate->transValue = newVal;
568 peraggstate->transValueIsNull = fcinfo->isnull;
569
570 return true;
571 }
572
573 /*
574 * finalize_windowaggregate
575 * parallel to finalize_aggregate in nodeAgg.c
576 */
577 static void
finalize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate,Datum * result,bool * isnull)578 finalize_windowaggregate(WindowAggState *winstate,
579 WindowStatePerFunc perfuncstate,
580 WindowStatePerAgg peraggstate,
581 Datum *result, bool *isnull)
582 {
583 MemoryContext oldContext;
584
585 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
586
587 /*
588 * Apply the agg's finalfn if one is provided, else return transValue.
589 */
590 if (OidIsValid(peraggstate->finalfn_oid))
591 {
592 int numFinalArgs = peraggstate->numFinalArgs;
593 FunctionCallInfoData fcinfo;
594 bool anynull;
595 int i;
596
597 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn),
598 numFinalArgs,
599 perfuncstate->winCollation,
600 (void *) winstate, NULL);
601 fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
602 peraggstate->transValueIsNull,
603 peraggstate->transtypeLen);
604 fcinfo.argnull[0] = peraggstate->transValueIsNull;
605 anynull = peraggstate->transValueIsNull;
606
607 /* Fill any remaining argument positions with nulls */
608 for (i = 1; i < numFinalArgs; i++)
609 {
610 fcinfo.arg[i] = (Datum) 0;
611 fcinfo.argnull[i] = true;
612 anynull = true;
613 }
614
615 if (fcinfo.flinfo->fn_strict && anynull)
616 {
617 /* don't call a strict function with NULL inputs */
618 *result = (Datum) 0;
619 *isnull = true;
620 }
621 else
622 {
623 winstate->curaggcontext = peraggstate->aggcontext;
624 *result = FunctionCallInvoke(&fcinfo);
625 winstate->curaggcontext = NULL;
626 *isnull = fcinfo.isnull;
627 }
628 }
629 else
630 {
631 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
632 *result = peraggstate->transValue;
633 *isnull = peraggstate->transValueIsNull;
634 }
635
636 /*
637 * If result is pass-by-ref, make sure it is in the right context.
638 */
639 if (!peraggstate->resulttypeByVal && !*isnull &&
640 !MemoryContextContains(CurrentMemoryContext,
641 DatumGetPointer(*result)))
642 *result = datumCopy(*result,
643 peraggstate->resulttypeByVal,
644 peraggstate->resulttypeLen);
645 MemoryContextSwitchTo(oldContext);
646 }
647
648 /*
649 * eval_windowaggregates
650 * evaluate plain aggregates being used as window functions
651 *
652 * This differs from nodeAgg.c in two ways. First, if the window's frame
653 * start position moves, we use the inverse transition function (if it exists)
654 * to remove rows from the transition value. And second, we expect to be
655 * able to call aggregate final functions repeatedly after aggregating more
656 * data onto the same transition value. This is not a behavior required by
657 * nodeAgg.c.
658 */
659 static void
eval_windowaggregates(WindowAggState * winstate)660 eval_windowaggregates(WindowAggState *winstate)
661 {
662 WindowStatePerAgg peraggstate;
663 int wfuncno,
664 numaggs,
665 numaggs_restart,
666 i;
667 int64 aggregatedupto_nonrestarted;
668 MemoryContext oldContext;
669 ExprContext *econtext;
670 WindowObject agg_winobj;
671 TupleTableSlot *agg_row_slot;
672 TupleTableSlot *temp_slot;
673
674 numaggs = winstate->numaggs;
675 if (numaggs == 0)
676 return; /* nothing to do */
677
678 /* final output execution is in ps_ExprContext */
679 econtext = winstate->ss.ps.ps_ExprContext;
680 agg_winobj = winstate->agg_winobj;
681 agg_row_slot = winstate->agg_row_slot;
682 temp_slot = winstate->temp_slot_1;
683
684 /*
685 * Currently, we support only a subset of the SQL-standard window framing
686 * rules.
687 *
688 * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
689 * a contiguous group of rows extending forward from the start of the
690 * partition, and rows only enter the frame, never exit it, as the current
691 * row advances forward. This makes it possible to use an incremental
692 * strategy for evaluating aggregates: we run the transition function for
693 * each row added to the frame, and run the final function whenever we
694 * need the current aggregate value. This is considerably more efficient
695 * than the naive approach of re-running the entire aggregate calculation
696 * for each current row. It does assume that the final function doesn't
697 * damage the running transition value, but we have the same assumption in
698 * nodeAgg.c too (when it rescans an existing hash table).
699 *
700 * If the frame start does sometimes move, we can still optimize as above
701 * whenever successive rows share the same frame head, but if the frame
702 * head moves beyond the previous head we try to remove those rows using
703 * the aggregate's inverse transition function. This function restores
704 * the aggregate's current state to what it would be if the removed row
705 * had never been aggregated in the first place. Inverse transition
706 * functions may optionally return NULL, indicating that the function was
707 * unable to remove the tuple from aggregation. If this happens, or if
708 * the aggregate doesn't have an inverse transition function at all, we
709 * must perform the aggregation all over again for all tuples within the
710 * new frame boundaries.
711 *
712 * In many common cases, multiple rows share the same frame and hence the
713 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
714 * window, then all rows are peers and so they all have window frame equal
715 * to the whole partition.) We optimize such cases by calculating the
716 * aggregate value once when we reach the first row of a peer group, and
717 * then returning the saved value for all subsequent rows.
718 *
719 * 'aggregatedupto' keeps track of the first row that has not yet been
720 * accumulated into the aggregate transition values. Whenever we start a
721 * new peer group, we accumulate forward to the end of the peer group.
722 */
723
724 /*
725 * First, update the frame head position.
726 *
727 * The frame head should never move backwards, and the code below wouldn't
728 * cope if it did, so for safety we complain if it does.
729 */
730 update_frameheadpos(agg_winobj, temp_slot);
731 if (winstate->frameheadpos < winstate->aggregatedbase)
732 elog(ERROR, "window frame head moved backward");
733
734 /*
735 * If the frame didn't change compared to the previous row, we can re-use
736 * the result values that were previously saved at the bottom of this
737 * function. Since we don't know the current frame's end yet, this is not
738 * possible to check for fully. But if the frame end mode is UNBOUNDED
739 * FOLLOWING or CURRENT ROW, and the current row lies within the previous
740 * row's frame, then the two frames' ends must coincide. Note that on the
741 * first row aggregatedbase == aggregatedupto, meaning this test must
742 * fail, so we don't need to check the "there was no previous row" case
743 * explicitly here.
744 */
745 if (winstate->aggregatedbase == winstate->frameheadpos &&
746 (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
747 FRAMEOPTION_END_CURRENT_ROW)) &&
748 winstate->aggregatedbase <= winstate->currentpos &&
749 winstate->aggregatedupto > winstate->currentpos)
750 {
751 for (i = 0; i < numaggs; i++)
752 {
753 peraggstate = &winstate->peragg[i];
754 wfuncno = peraggstate->wfuncno;
755 econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
756 econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
757 }
758 return;
759 }
760
761 /*----------
762 * Initialize restart flags.
763 *
764 * We restart the aggregation:
765 * - if we're processing the first row in the partition, or
766 * - if the frame's head moved and we cannot use an inverse
767 * transition function, or
768 * - if the new frame doesn't overlap the old one
769 *
770 * Note that we don't strictly need to restart in the last case, but if
771 * we're going to remove all rows from the aggregation anyway, a restart
772 * surely is faster.
773 *----------
774 */
775 numaggs_restart = 0;
776 for (i = 0; i < numaggs; i++)
777 {
778 peraggstate = &winstate->peragg[i];
779 if (winstate->currentpos == 0 ||
780 (winstate->aggregatedbase != winstate->frameheadpos &&
781 !OidIsValid(peraggstate->invtransfn_oid)) ||
782 winstate->aggregatedupto <= winstate->frameheadpos)
783 {
784 peraggstate->restart = true;
785 numaggs_restart++;
786 }
787 else
788 peraggstate->restart = false;
789 }
790
791 /*
792 * If we have any possibly-moving aggregates, attempt to advance
793 * aggregatedbase to match the frame's head by removing input rows that
794 * fell off the top of the frame from the aggregations. This can fail,
795 * i.e. advance_windowaggregate_base() can return false, in which case
796 * we'll restart that aggregate below.
797 */
798 while (numaggs_restart < numaggs &&
799 winstate->aggregatedbase < winstate->frameheadpos)
800 {
801 /*
802 * Fetch the next tuple of those being removed. This should never fail
803 * as we should have been here before.
804 */
805 if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
806 temp_slot))
807 elog(ERROR, "could not re-fetch previously fetched frame row");
808
809 /* Set tuple context for evaluation of aggregate arguments */
810 winstate->tmpcontext->ecxt_outertuple = temp_slot;
811
812 /*
813 * Perform the inverse transition for each aggregate function in the
814 * window, unless it has already been marked as needing a restart.
815 */
816 for (i = 0; i < numaggs; i++)
817 {
818 bool ok;
819
820 peraggstate = &winstate->peragg[i];
821 if (peraggstate->restart)
822 continue;
823
824 wfuncno = peraggstate->wfuncno;
825 ok = advance_windowaggregate_base(winstate,
826 &winstate->perfunc[wfuncno],
827 peraggstate);
828 if (!ok)
829 {
830 /* Inverse transition function has failed, must restart */
831 peraggstate->restart = true;
832 numaggs_restart++;
833 }
834 }
835
836 /* Reset per-input-tuple context after each tuple */
837 ResetExprContext(winstate->tmpcontext);
838
839 /* And advance the aggregated-row state */
840 winstate->aggregatedbase++;
841 ExecClearTuple(temp_slot);
842 }
843
844 /*
845 * If we successfully advanced the base rows of all the aggregates,
846 * aggregatedbase now equals frameheadpos; but if we failed for any, we
847 * must forcibly update aggregatedbase.
848 */
849 winstate->aggregatedbase = winstate->frameheadpos;
850
851 /*
852 * If we created a mark pointer for aggregates, keep it pushed up to frame
853 * head, so that tuplestore can discard unnecessary rows.
854 */
855 if (agg_winobj->markptr >= 0)
856 WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
857
858 /*
859 * Now restart the aggregates that require it.
860 *
861 * We assume that aggregates using the shared context always restart if
862 * *any* aggregate restarts, and we may thus clean up the shared
863 * aggcontext if that is the case. Private aggcontexts are reset by
864 * initialize_windowaggregate() if their owning aggregate restarts. If we
865 * aren't restarting an aggregate, we need to free any previously saved
866 * result for it, else we'll leak memory.
867 */
868 if (numaggs_restart > 0)
869 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
870 for (i = 0; i < numaggs; i++)
871 {
872 peraggstate = &winstate->peragg[i];
873
874 /* Aggregates using the shared ctx must restart if *any* agg does */
875 Assert(peraggstate->aggcontext != winstate->aggcontext ||
876 numaggs_restart == 0 ||
877 peraggstate->restart);
878
879 if (peraggstate->restart)
880 {
881 wfuncno = peraggstate->wfuncno;
882 initialize_windowaggregate(winstate,
883 &winstate->perfunc[wfuncno],
884 peraggstate);
885 }
886 else if (!peraggstate->resultValueIsNull)
887 {
888 if (!peraggstate->resulttypeByVal)
889 pfree(DatumGetPointer(peraggstate->resultValue));
890 peraggstate->resultValue = (Datum) 0;
891 peraggstate->resultValueIsNull = true;
892 }
893 }
894
895 /*
896 * Non-restarted aggregates now contain the rows between aggregatedbase
897 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
898 * contain no rows. If there are any restarted aggregates, we must thus
899 * begin aggregating anew at frameheadpos, otherwise we may simply
900 * continue at aggregatedupto. We must remember the old value of
901 * aggregatedupto to know how long to skip advancing non-restarted
902 * aggregates. If we modify aggregatedupto, we must also clear
903 * agg_row_slot, per the loop invariant below.
904 */
905 aggregatedupto_nonrestarted = winstate->aggregatedupto;
906 if (numaggs_restart > 0 &&
907 winstate->aggregatedupto != winstate->frameheadpos)
908 {
909 winstate->aggregatedupto = winstate->frameheadpos;
910 ExecClearTuple(agg_row_slot);
911 }
912
913 /*
914 * Advance until we reach a row not in frame (or end of partition).
915 *
916 * Note the loop invariant: agg_row_slot is either empty or holds the row
917 * at position aggregatedupto. We advance aggregatedupto after processing
918 * a row.
919 */
920 for (;;)
921 {
922 /* Fetch next row if we didn't already */
923 if (TupIsNull(agg_row_slot))
924 {
925 if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
926 agg_row_slot))
927 break; /* must be end of partition */
928 }
929
930 /* Exit loop (for now) if not in frame */
931 if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
932 break;
933
934 /* Set tuple context for evaluation of aggregate arguments */
935 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
936
937 /* Accumulate row into the aggregates */
938 for (i = 0; i < numaggs; i++)
939 {
940 peraggstate = &winstate->peragg[i];
941
942 /* Non-restarted aggs skip until aggregatedupto_nonrestarted */
943 if (!peraggstate->restart &&
944 winstate->aggregatedupto < aggregatedupto_nonrestarted)
945 continue;
946
947 wfuncno = peraggstate->wfuncno;
948 advance_windowaggregate(winstate,
949 &winstate->perfunc[wfuncno],
950 peraggstate);
951 }
952
953 /* Reset per-input-tuple context after each tuple */
954 ResetExprContext(winstate->tmpcontext);
955
956 /* And advance the aggregated-row state */
957 winstate->aggregatedupto++;
958 ExecClearTuple(agg_row_slot);
959 }
960
961 /* The frame's end is not supposed to move backwards, ever */
962 Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
963
964 /*
965 * finalize aggregates and fill result/isnull fields.
966 */
967 for (i = 0; i < numaggs; i++)
968 {
969 Datum *result;
970 bool *isnull;
971
972 peraggstate = &winstate->peragg[i];
973 wfuncno = peraggstate->wfuncno;
974 result = &econtext->ecxt_aggvalues[wfuncno];
975 isnull = &econtext->ecxt_aggnulls[wfuncno];
976 finalize_windowaggregate(winstate,
977 &winstate->perfunc[wfuncno],
978 peraggstate,
979 result, isnull);
980
981 /*
982 * save the result in case next row shares the same frame.
983 *
984 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
985 * advance that the next row can't possibly share the same frame. Is
986 * it worth detecting that and skipping this code?
987 */
988 if (!peraggstate->resulttypeByVal && !*isnull)
989 {
990 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
991 peraggstate->resultValue =
992 datumCopy(*result,
993 peraggstate->resulttypeByVal,
994 peraggstate->resulttypeLen);
995 MemoryContextSwitchTo(oldContext);
996 }
997 else
998 {
999 peraggstate->resultValue = *result;
1000 }
1001 peraggstate->resultValueIsNull = *isnull;
1002 }
1003 }
1004
1005 /*
1006 * eval_windowfunction
1007 *
1008 * Arguments of window functions are not evaluated here, because a window
1009 * function can need random access to arbitrary rows in the partition.
1010 * The window function uses the special WinGetFuncArgInPartition and
1011 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1012 * it wants.
1013 */
1014 static void
eval_windowfunction(WindowAggState * winstate,WindowStatePerFunc perfuncstate,Datum * result,bool * isnull)1015 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1016 Datum *result, bool *isnull)
1017 {
1018 FunctionCallInfoData fcinfo;
1019 MemoryContext oldContext;
1020
1021 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1022
1023 /*
1024 * We don't pass any normal arguments to a window function, but we do pass
1025 * it the number of arguments, in order to permit window function
1026 * implementations to support varying numbers of arguments. The real info
1027 * goes through the WindowObject, which is passed via fcinfo->context.
1028 */
1029 InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
1030 perfuncstate->numArguments,
1031 perfuncstate->winCollation,
1032 (void *) perfuncstate->winobj, NULL);
1033 /* Just in case, make all the regular argument slots be null */
1034 memset(fcinfo.argnull, true, perfuncstate->numArguments);
1035 /* Window functions don't have a current aggregate context, either */
1036 winstate->curaggcontext = NULL;
1037
1038 *result = FunctionCallInvoke(&fcinfo);
1039 *isnull = fcinfo.isnull;
1040
1041 /*
1042 * Make sure pass-by-ref data is allocated in the appropriate context. (We
1043 * need this in case the function returns a pointer into some short-lived
1044 * tuple, as is entirely possible.)
1045 */
1046 if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
1047 !MemoryContextContains(CurrentMemoryContext,
1048 DatumGetPointer(*result)))
1049 *result = datumCopy(*result,
1050 perfuncstate->resulttypeByVal,
1051 perfuncstate->resulttypeLen);
1052
1053 MemoryContextSwitchTo(oldContext);
1054 }
1055
1056 /*
1057 * begin_partition
1058 * Start buffering rows of the next partition.
1059 */
1060 static void
begin_partition(WindowAggState * winstate)1061 begin_partition(WindowAggState *winstate)
1062 {
1063 PlanState *outerPlan = outerPlanState(winstate);
1064 int numfuncs = winstate->numfuncs;
1065 int i;
1066
1067 winstate->partition_spooled = false;
1068 winstate->framehead_valid = false;
1069 winstate->frametail_valid = false;
1070 winstate->spooled_rows = 0;
1071 winstate->currentpos = 0;
1072 winstate->frameheadpos = 0;
1073 winstate->frametailpos = -1;
1074 ExecClearTuple(winstate->agg_row_slot);
1075
1076 /*
1077 * If this is the very first partition, we need to fetch the first input
1078 * row to store in first_part_slot.
1079 */
1080 if (TupIsNull(winstate->first_part_slot))
1081 {
1082 TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1083
1084 if (!TupIsNull(outerslot))
1085 ExecCopySlot(winstate->first_part_slot, outerslot);
1086 else
1087 {
1088 /* outer plan is empty, so we have nothing to do */
1089 winstate->partition_spooled = true;
1090 winstate->more_partitions = false;
1091 return;
1092 }
1093 }
1094
1095 /* Create new tuplestore for this partition */
1096 winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1097
1098 /*
1099 * Set up read pointers for the tuplestore. The current pointer doesn't
1100 * need BACKWARD capability, but the per-window-function read pointers do,
1101 * and the aggregate pointer does if frame start is movable.
1102 */
1103 winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
1104
1105 /* reset default REWIND capability bit for current ptr */
1106 tuplestore_set_eflags(winstate->buffer, 0);
1107
1108 /* create read pointers for aggregates, if needed */
1109 if (winstate->numaggs > 0)
1110 {
1111 WindowObject agg_winobj = winstate->agg_winobj;
1112 int readptr_flags = 0;
1113
1114 /* If the frame head is potentially movable ... */
1115 if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
1116 {
1117 /* ... create a mark pointer to track the frame head */
1118 agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1119 /* and the read pointer will need BACKWARD capability */
1120 readptr_flags |= EXEC_FLAG_BACKWARD;
1121 }
1122
1123 agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1124 readptr_flags);
1125 agg_winobj->markpos = -1;
1126 agg_winobj->seekpos = -1;
1127
1128 /* Also reset the row counters for aggregates */
1129 winstate->aggregatedbase = 0;
1130 winstate->aggregatedupto = 0;
1131 }
1132
1133 /* create mark and read pointers for each real window function */
1134 for (i = 0; i < numfuncs; i++)
1135 {
1136 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1137
1138 if (!perfuncstate->plain_agg)
1139 {
1140 WindowObject winobj = perfuncstate->winobj;
1141
1142 winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1143 0);
1144 winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1145 EXEC_FLAG_BACKWARD);
1146 winobj->markpos = -1;
1147 winobj->seekpos = -1;
1148 }
1149 }
1150
1151 /*
1152 * Store the first tuple into the tuplestore (it's always available now;
1153 * we either read it above, or saved it at the end of previous partition)
1154 */
1155 tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1156 winstate->spooled_rows++;
1157 }
1158
1159 /*
1160 * Read tuples from the outer node, up to and including position 'pos', and
1161 * store them into the tuplestore. If pos is -1, reads the whole partition.
1162 */
1163 static void
spool_tuples(WindowAggState * winstate,int64 pos)1164 spool_tuples(WindowAggState *winstate, int64 pos)
1165 {
1166 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1167 PlanState *outerPlan;
1168 TupleTableSlot *outerslot;
1169 MemoryContext oldcontext;
1170
1171 if (!winstate->buffer)
1172 return; /* just a safety check */
1173 if (winstate->partition_spooled)
1174 return; /* whole partition done already */
1175
1176 /*
1177 * If the tuplestore has spilled to disk, alternate reading and writing
1178 * becomes quite expensive due to frequent buffer flushes. It's cheaper
1179 * to force the entire partition to get spooled in one go.
1180 *
1181 * XXX this is a horrid kluge --- it'd be better to fix the performance
1182 * problem inside tuplestore. FIXME
1183 */
1184 if (!tuplestore_in_memory(winstate->buffer))
1185 pos = -1;
1186
1187 outerPlan = outerPlanState(winstate);
1188
1189 /* Must be in query context to call outerplan */
1190 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1191
1192 while (winstate->spooled_rows <= pos || pos == -1)
1193 {
1194 outerslot = ExecProcNode(outerPlan);
1195 if (TupIsNull(outerslot))
1196 {
1197 /* reached the end of the last partition */
1198 winstate->partition_spooled = true;
1199 winstate->more_partitions = false;
1200 break;
1201 }
1202
1203 if (node->partNumCols > 0)
1204 {
1205 /* Check if this tuple still belongs to the current partition */
1206 if (!execTuplesMatch(winstate->first_part_slot,
1207 outerslot,
1208 node->partNumCols, node->partColIdx,
1209 winstate->partEqfunctions,
1210 winstate->tmpcontext->ecxt_per_tuple_memory))
1211 {
1212 /*
1213 * end of partition; copy the tuple for the next cycle.
1214 */
1215 ExecCopySlot(winstate->first_part_slot, outerslot);
1216 winstate->partition_spooled = true;
1217 winstate->more_partitions = true;
1218 break;
1219 }
1220 }
1221
1222 /* Still in partition, so save it into the tuplestore */
1223 tuplestore_puttupleslot(winstate->buffer, outerslot);
1224 winstate->spooled_rows++;
1225 }
1226
1227 MemoryContextSwitchTo(oldcontext);
1228 }
1229
1230 /*
1231 * release_partition
1232 * clear information kept within a partition, including
1233 * tuplestore and aggregate results.
1234 */
1235 static void
release_partition(WindowAggState * winstate)1236 release_partition(WindowAggState *winstate)
1237 {
1238 int i;
1239
1240 for (i = 0; i < winstate->numfuncs; i++)
1241 {
1242 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1243
1244 /* Release any partition-local state of this window function */
1245 if (perfuncstate->winobj)
1246 perfuncstate->winobj->localmem = NULL;
1247 }
1248
1249 /*
1250 * Release all partition-local memory (in particular, any partition-local
1251 * state that we might have trashed our pointers to in the above loop, and
1252 * any aggregate temp data). We don't rely on retail pfree because some
1253 * aggregates might have allocated data we don't have direct pointers to.
1254 */
1255 MemoryContextResetAndDeleteChildren(winstate->partcontext);
1256 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
1257 for (i = 0; i < winstate->numaggs; i++)
1258 {
1259 if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1260 MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
1261 }
1262
1263 if (winstate->buffer)
1264 tuplestore_end(winstate->buffer);
1265 winstate->buffer = NULL;
1266 winstate->partition_spooled = false;
1267 }
1268
1269 /*
1270 * row_is_in_frame
1271 * Determine whether a row is in the current row's window frame according
1272 * to our window framing rule
1273 *
1274 * The caller must have already determined that the row is in the partition
1275 * and fetched it into a slot. This function just encapsulates the framing
1276 * rules.
1277 */
1278 static bool
row_is_in_frame(WindowAggState * winstate,int64 pos,TupleTableSlot * slot)1279 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1280 {
1281 int frameOptions = winstate->frameOptions;
1282
1283 Assert(pos >= 0); /* else caller error */
1284
1285 /* First, check frame starting conditions */
1286 if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1287 {
1288 if (frameOptions & FRAMEOPTION_ROWS)
1289 {
1290 /* rows before current row are out of frame */
1291 if (pos < winstate->currentpos)
1292 return false;
1293 }
1294 else if (frameOptions & FRAMEOPTION_RANGE)
1295 {
1296 /* preceding row that is not peer is out of frame */
1297 if (pos < winstate->currentpos &&
1298 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1299 return false;
1300 }
1301 else
1302 Assert(false);
1303 }
1304 else if (frameOptions & FRAMEOPTION_START_VALUE)
1305 {
1306 if (frameOptions & FRAMEOPTION_ROWS)
1307 {
1308 int64 offset = DatumGetInt64(winstate->startOffsetValue);
1309
1310 /* rows before current row + offset are out of frame */
1311 if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1312 offset = -offset;
1313
1314 if (pos < winstate->currentpos + offset)
1315 return false;
1316 }
1317 else if (frameOptions & FRAMEOPTION_RANGE)
1318 {
1319 /* parser should have rejected this */
1320 elog(ERROR, "window frame with value offset is not implemented");
1321 }
1322 else
1323 Assert(false);
1324 }
1325
1326 /* Okay so far, now check frame ending conditions */
1327 if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1328 {
1329 if (frameOptions & FRAMEOPTION_ROWS)
1330 {
1331 /* rows after current row are out of frame */
1332 if (pos > winstate->currentpos)
1333 return false;
1334 }
1335 else if (frameOptions & FRAMEOPTION_RANGE)
1336 {
1337 /* following row that is not peer is out of frame */
1338 if (pos > winstate->currentpos &&
1339 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1340 return false;
1341 }
1342 else
1343 Assert(false);
1344 }
1345 else if (frameOptions & FRAMEOPTION_END_VALUE)
1346 {
1347 if (frameOptions & FRAMEOPTION_ROWS)
1348 {
1349 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1350
1351 /* rows after current row + offset are out of frame */
1352 if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1353 offset = -offset;
1354
1355 if (pos > winstate->currentpos + offset)
1356 return false;
1357 }
1358 else if (frameOptions & FRAMEOPTION_RANGE)
1359 {
1360 /* parser should have rejected this */
1361 elog(ERROR, "window frame with value offset is not implemented");
1362 }
1363 else
1364 Assert(false);
1365 }
1366
1367 /* If we get here, it's in frame */
1368 return true;
1369 }
1370
1371 /*
1372 * update_frameheadpos
1373 * make frameheadpos valid for the current row
1374 *
1375 * Uses the winobj's read pointer for any required fetches; hence, if the
1376 * frame mode is one that requires row comparisons, the winobj's mark must
1377 * not be past the currently known frame head. Also uses the specified slot
1378 * for any required fetches.
1379 */
1380 static void
update_frameheadpos(WindowObject winobj,TupleTableSlot * slot)1381 update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
1382 {
1383 WindowAggState *winstate = winobj->winstate;
1384 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1385 int frameOptions = winstate->frameOptions;
1386
1387 if (winstate->framehead_valid)
1388 return; /* already known for current row */
1389
1390 if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1391 {
1392 /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1393 winstate->frameheadpos = 0;
1394 winstate->framehead_valid = true;
1395 }
1396 else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1397 {
1398 if (frameOptions & FRAMEOPTION_ROWS)
1399 {
1400 /* In ROWS mode, frame head is the same as current */
1401 winstate->frameheadpos = winstate->currentpos;
1402 winstate->framehead_valid = true;
1403 }
1404 else if (frameOptions & FRAMEOPTION_RANGE)
1405 {
1406 int64 fhprev;
1407
1408 /* If no ORDER BY, all rows are peers with each other */
1409 if (node->ordNumCols == 0)
1410 {
1411 winstate->frameheadpos = 0;
1412 winstate->framehead_valid = true;
1413 return;
1414 }
1415
1416 /*
1417 * In RANGE START_CURRENT mode, frame head is the first row that
1418 * is a peer of current row. We search backwards from current,
1419 * which could be a bit inefficient if peer sets are large. Might
1420 * be better to have a separate read pointer that moves forward
1421 * tracking the frame head.
1422 */
1423 fhprev = winstate->currentpos - 1;
1424 for (;;)
1425 {
1426 /* assume the frame head can't go backwards */
1427 if (fhprev < winstate->frameheadpos)
1428 break;
1429 if (!window_gettupleslot(winobj, fhprev, slot))
1430 break; /* start of partition */
1431 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1432 break; /* not peer of current row */
1433 fhprev--;
1434 }
1435 winstate->frameheadpos = fhprev + 1;
1436 winstate->framehead_valid = true;
1437 }
1438 else
1439 Assert(false);
1440 }
1441 else if (frameOptions & FRAMEOPTION_START_VALUE)
1442 {
1443 if (frameOptions & FRAMEOPTION_ROWS)
1444 {
1445 /* In ROWS mode, bound is physically n before/after current */
1446 int64 offset = DatumGetInt64(winstate->startOffsetValue);
1447
1448 if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1449 offset = -offset;
1450
1451 winstate->frameheadpos = winstate->currentpos + offset;
1452 /* frame head can't go before first row */
1453 if (winstate->frameheadpos < 0)
1454 winstate->frameheadpos = 0;
1455 else if (winstate->frameheadpos > winstate->currentpos)
1456 {
1457 /* make sure frameheadpos is not past end of partition */
1458 spool_tuples(winstate, winstate->frameheadpos - 1);
1459 if (winstate->frameheadpos > winstate->spooled_rows)
1460 winstate->frameheadpos = winstate->spooled_rows;
1461 }
1462 winstate->framehead_valid = true;
1463 }
1464 else if (frameOptions & FRAMEOPTION_RANGE)
1465 {
1466 /* parser should have rejected this */
1467 elog(ERROR, "window frame with value offset is not implemented");
1468 }
1469 else
1470 Assert(false);
1471 }
1472 else
1473 Assert(false);
1474 }
1475
1476 /*
1477 * update_frametailpos
1478 * make frametailpos valid for the current row
1479 *
1480 * Uses the winobj's read pointer for any required fetches; hence, if the
1481 * frame mode is one that requires row comparisons, the winobj's mark must
1482 * not be past the currently known frame tail. Also uses the specified slot
1483 * for any required fetches.
1484 */
1485 static void
update_frametailpos(WindowObject winobj,TupleTableSlot * slot)1486 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
1487 {
1488 WindowAggState *winstate = winobj->winstate;
1489 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1490 int frameOptions = winstate->frameOptions;
1491
1492 if (winstate->frametail_valid)
1493 return; /* already known for current row */
1494
1495 if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1496 {
1497 /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1498 spool_tuples(winstate, -1);
1499 winstate->frametailpos = winstate->spooled_rows - 1;
1500 winstate->frametail_valid = true;
1501 }
1502 else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1503 {
1504 if (frameOptions & FRAMEOPTION_ROWS)
1505 {
1506 /* In ROWS mode, exactly the rows up to current are in frame */
1507 winstate->frametailpos = winstate->currentpos;
1508 winstate->frametail_valid = true;
1509 }
1510 else if (frameOptions & FRAMEOPTION_RANGE)
1511 {
1512 int64 ftnext;
1513
1514 /* If no ORDER BY, all rows are peers with each other */
1515 if (node->ordNumCols == 0)
1516 {
1517 spool_tuples(winstate, -1);
1518 winstate->frametailpos = winstate->spooled_rows - 1;
1519 winstate->frametail_valid = true;
1520 return;
1521 }
1522
1523 /*
1524 * Else we have to search for the first non-peer of the current
1525 * row. We assume the current value of frametailpos is a lower
1526 * bound on the possible frame tail location, ie, frame tail never
1527 * goes backward, and that currentpos is also a lower bound, ie,
1528 * frame end always >= current row.
1529 */
1530 ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1531 for (;;)
1532 {
1533 if (!window_gettupleslot(winobj, ftnext, slot))
1534 break; /* end of partition */
1535 if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1536 break; /* not peer of current row */
1537 ftnext++;
1538 }
1539 winstate->frametailpos = ftnext - 1;
1540 winstate->frametail_valid = true;
1541 }
1542 else
1543 Assert(false);
1544 }
1545 else if (frameOptions & FRAMEOPTION_END_VALUE)
1546 {
1547 if (frameOptions & FRAMEOPTION_ROWS)
1548 {
1549 /* In ROWS mode, bound is physically n before/after current */
1550 int64 offset = DatumGetInt64(winstate->endOffsetValue);
1551
1552 if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1553 offset = -offset;
1554
1555 winstate->frametailpos = winstate->currentpos + offset;
1556 /* smallest allowable value of frametailpos is -1 */
1557 if (winstate->frametailpos < 0)
1558 winstate->frametailpos = -1;
1559 else if (winstate->frametailpos > winstate->currentpos)
1560 {
1561 /* make sure frametailpos is not past last row of partition */
1562 spool_tuples(winstate, winstate->frametailpos);
1563 if (winstate->frametailpos >= winstate->spooled_rows)
1564 winstate->frametailpos = winstate->spooled_rows - 1;
1565 }
1566 winstate->frametail_valid = true;
1567 }
1568 else if (frameOptions & FRAMEOPTION_RANGE)
1569 {
1570 /* parser should have rejected this */
1571 elog(ERROR, "window frame with value offset is not implemented");
1572 }
1573 else
1574 Assert(false);
1575 }
1576 else
1577 Assert(false);
1578 }
1579
1580
1581 /* -----------------
1582 * ExecWindowAgg
1583 *
1584 * ExecWindowAgg receives tuples from its outer subplan and
1585 * stores them into a tuplestore, then processes window functions.
1586 * This node doesn't reduce nor qualify any row so the number of
1587 * returned rows is exactly the same as its outer subplan's result
1588 * (ignoring the case of SRFs in the targetlist, that is).
1589 * -----------------
1590 */
1591 TupleTableSlot *
ExecWindowAgg(WindowAggState * winstate)1592 ExecWindowAgg(WindowAggState *winstate)
1593 {
1594 TupleTableSlot *result;
1595 ExprDoneCond isDone;
1596 ExprContext *econtext;
1597 int i;
1598 int numfuncs;
1599
1600 if (winstate->all_done)
1601 return NULL;
1602
1603 /*
1604 * Check to see if we're still projecting out tuples from a previous
1605 * output tuple (because there is a function-returning-set in the
1606 * projection expressions). If so, try to project another one.
1607 */
1608 if (winstate->ss.ps.ps_TupFromTlist)
1609 {
1610 TupleTableSlot *result;
1611 ExprDoneCond isDone;
1612
1613 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1614 if (isDone == ExprMultipleResult)
1615 return result;
1616 /* Done with that source tuple... */
1617 winstate->ss.ps.ps_TupFromTlist = false;
1618 }
1619
1620 /*
1621 * Compute frame offset values, if any, during first call.
1622 */
1623 if (winstate->all_first)
1624 {
1625 int frameOptions = winstate->frameOptions;
1626 ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
1627 Datum value;
1628 bool isnull;
1629 int16 len;
1630 bool byval;
1631
1632 if (frameOptions & FRAMEOPTION_START_VALUE)
1633 {
1634 Assert(winstate->startOffset != NULL);
1635 value = ExecEvalExprSwitchContext(winstate->startOffset,
1636 econtext,
1637 &isnull,
1638 NULL);
1639 if (isnull)
1640 ereport(ERROR,
1641 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1642 errmsg("frame starting offset must not be null")));
1643 /* copy value into query-lifespan context */
1644 get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1645 &len, &byval);
1646 winstate->startOffsetValue = datumCopy(value, byval, len);
1647 if (frameOptions & FRAMEOPTION_ROWS)
1648 {
1649 /* value is known to be int8 */
1650 int64 offset = DatumGetInt64(value);
1651
1652 if (offset < 0)
1653 ereport(ERROR,
1654 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1655 errmsg("frame starting offset must not be negative")));
1656 }
1657 }
1658 if (frameOptions & FRAMEOPTION_END_VALUE)
1659 {
1660 Assert(winstate->endOffset != NULL);
1661 value = ExecEvalExprSwitchContext(winstate->endOffset,
1662 econtext,
1663 &isnull,
1664 NULL);
1665 if (isnull)
1666 ereport(ERROR,
1667 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1668 errmsg("frame ending offset must not be null")));
1669 /* copy value into query-lifespan context */
1670 get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1671 &len, &byval);
1672 winstate->endOffsetValue = datumCopy(value, byval, len);
1673 if (frameOptions & FRAMEOPTION_ROWS)
1674 {
1675 /* value is known to be int8 */
1676 int64 offset = DatumGetInt64(value);
1677
1678 if (offset < 0)
1679 ereport(ERROR,
1680 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1681 errmsg("frame ending offset must not be negative")));
1682 }
1683 }
1684 winstate->all_first = false;
1685 }
1686
1687 restart:
1688 if (winstate->buffer == NULL)
1689 {
1690 /* Initialize for first partition and set current row = 0 */
1691 begin_partition(winstate);
1692 /* If there are no input rows, we'll detect that and exit below */
1693 }
1694 else
1695 {
1696 /* Advance current row within partition */
1697 winstate->currentpos++;
1698 /* This might mean that the frame moves, too */
1699 winstate->framehead_valid = false;
1700 winstate->frametail_valid = false;
1701 }
1702
1703 /*
1704 * Spool all tuples up to and including the current row, if we haven't
1705 * already
1706 */
1707 spool_tuples(winstate, winstate->currentpos);
1708
1709 /* Move to the next partition if we reached the end of this partition */
1710 if (winstate->partition_spooled &&
1711 winstate->currentpos >= winstate->spooled_rows)
1712 {
1713 release_partition(winstate);
1714
1715 if (winstate->more_partitions)
1716 {
1717 begin_partition(winstate);
1718 Assert(winstate->spooled_rows > 0);
1719 }
1720 else
1721 {
1722 winstate->all_done = true;
1723 return NULL;
1724 }
1725 }
1726
1727 /* final output execution is in ps_ExprContext */
1728 econtext = winstate->ss.ps.ps_ExprContext;
1729
1730 /* Clear the per-output-tuple context for current row */
1731 ResetExprContext(econtext);
1732
1733 /*
1734 * Read the current row from the tuplestore, and save in ScanTupleSlot.
1735 * (We can't rely on the outerplan's output slot because we may have to
1736 * read beyond the current row. Also, we have to actually copy the row
1737 * out of the tuplestore, since window function evaluation might cause the
1738 * tuplestore to dump its state to disk.)
1739 *
1740 * Current row must be in the tuplestore, since we spooled it above.
1741 */
1742 tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1743 if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1744 winstate->ss.ss_ScanTupleSlot))
1745 elog(ERROR, "unexpected end of tuplestore");
1746
1747 /*
1748 * Evaluate true window functions
1749 */
1750 numfuncs = winstate->numfuncs;
1751 for (i = 0; i < numfuncs; i++)
1752 {
1753 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1754
1755 if (perfuncstate->plain_agg)
1756 continue;
1757 eval_windowfunction(winstate, perfuncstate,
1758 &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1759 &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1760 }
1761
1762 /*
1763 * Evaluate aggregates
1764 */
1765 if (winstate->numaggs > 0)
1766 eval_windowaggregates(winstate);
1767
1768 /*
1769 * Truncate any no-longer-needed rows from the tuplestore.
1770 */
1771 tuplestore_trim(winstate->buffer);
1772
1773 /*
1774 * Form and return a projection tuple using the windowfunc results and the
1775 * current row. Setting ecxt_outertuple arranges that any Vars will be
1776 * evaluated with respect to that row.
1777 */
1778 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1779 result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1780
1781 if (isDone == ExprEndResult)
1782 {
1783 /* SRF in tlist returned no rows, so advance to next input tuple */
1784 goto restart;
1785 }
1786
1787 winstate->ss.ps.ps_TupFromTlist =
1788 (isDone == ExprMultipleResult);
1789 return result;
1790 }
1791
1792 /* -----------------
1793 * ExecInitWindowAgg
1794 *
1795 * Creates the run-time information for the WindowAgg node produced by the
1796 * planner and initializes its outer subtree
1797 * -----------------
1798 */
1799 WindowAggState *
ExecInitWindowAgg(WindowAgg * node,EState * estate,int eflags)1800 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1801 {
1802 WindowAggState *winstate;
1803 Plan *outerPlan;
1804 ExprContext *econtext;
1805 ExprContext *tmpcontext;
1806 WindowStatePerFunc perfunc;
1807 WindowStatePerAgg peragg;
1808 int numfuncs,
1809 wfuncno,
1810 numaggs,
1811 aggno;
1812 ListCell *l;
1813
1814 /* check for unsupported flags */
1815 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1816
1817 /*
1818 * create state structure
1819 */
1820 winstate = makeNode(WindowAggState);
1821 winstate->ss.ps.plan = (Plan *) node;
1822 winstate->ss.ps.state = estate;
1823
1824 /*
1825 * Create expression contexts. We need two, one for per-input-tuple
1826 * processing and one for per-output-tuple processing. We cheat a little
1827 * by using ExecAssignExprContext() to build both.
1828 */
1829 ExecAssignExprContext(estate, &winstate->ss.ps);
1830 tmpcontext = winstate->ss.ps.ps_ExprContext;
1831 winstate->tmpcontext = tmpcontext;
1832 ExecAssignExprContext(estate, &winstate->ss.ps);
1833
1834 /* Create long-lived context for storage of partition-local memory etc */
1835 winstate->partcontext =
1836 AllocSetContextCreate(CurrentMemoryContext,
1837 "WindowAgg Partition",
1838 ALLOCSET_DEFAULT_SIZES);
1839
1840 /*
1841 * Create mid-lived context for aggregate trans values etc.
1842 *
1843 * Note that moving aggregates each use their own private context, not
1844 * this one.
1845 */
1846 winstate->aggcontext =
1847 AllocSetContextCreate(CurrentMemoryContext,
1848 "WindowAgg Aggregates",
1849 ALLOCSET_DEFAULT_SIZES);
1850
1851 /*
1852 * tuple table initialization
1853 */
1854 ExecInitScanTupleSlot(estate, &winstate->ss);
1855 ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1856 winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1857 winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1858 winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1859 winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1860
1861 winstate->ss.ps.targetlist = (List *)
1862 ExecInitExpr((Expr *) node->plan.targetlist,
1863 (PlanState *) winstate);
1864
1865 /*
1866 * WindowAgg nodes never have quals, since they can only occur at the
1867 * logical top level of a query (ie, after any WHERE or HAVING filters)
1868 */
1869 Assert(node->plan.qual == NIL);
1870 winstate->ss.ps.qual = NIL;
1871
1872 /*
1873 * initialize child nodes
1874 */
1875 outerPlan = outerPlan(node);
1876 outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1877
1878 /*
1879 * initialize source tuple type (which is also the tuple type that we'll
1880 * store in the tuplestore and use in all our working slots).
1881 */
1882 ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1883
1884 ExecSetSlotDescriptor(winstate->first_part_slot,
1885 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1886 ExecSetSlotDescriptor(winstate->agg_row_slot,
1887 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1888 ExecSetSlotDescriptor(winstate->temp_slot_1,
1889 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1890 ExecSetSlotDescriptor(winstate->temp_slot_2,
1891 winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1892
1893 /*
1894 * Initialize result tuple type and projection info.
1895 */
1896 ExecAssignResultTypeFromTL(&winstate->ss.ps);
1897 ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1898
1899 winstate->ss.ps.ps_TupFromTlist = false;
1900
1901 /* Set up data for comparing tuples */
1902 if (node->partNumCols > 0)
1903 winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1904 node->partOperators);
1905 if (node->ordNumCols > 0)
1906 winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1907 node->ordOperators);
1908
1909 /*
1910 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1911 */
1912 numfuncs = winstate->numfuncs;
1913 numaggs = winstate->numaggs;
1914 econtext = winstate->ss.ps.ps_ExprContext;
1915 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1916 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1917
1918 /*
1919 * allocate per-wfunc/per-agg state information.
1920 */
1921 perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1922 peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1923 winstate->perfunc = perfunc;
1924 winstate->peragg = peragg;
1925
1926 wfuncno = -1;
1927 aggno = -1;
1928 foreach(l, winstate->funcs)
1929 {
1930 WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1931 WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1932 WindowStatePerFunc perfuncstate;
1933 AclResult aclresult;
1934 int i;
1935
1936 if (wfunc->winref != node->winref) /* planner screwed up? */
1937 elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1938 wfunc->winref, node->winref);
1939
1940 /* Look for a previous duplicate window function */
1941 for (i = 0; i <= wfuncno; i++)
1942 {
1943 if (equal(wfunc, perfunc[i].wfunc) &&
1944 !contain_volatile_functions((Node *) wfunc))
1945 break;
1946 }
1947 if (i <= wfuncno)
1948 {
1949 /* Found a match to an existing entry, so just mark it */
1950 wfuncstate->wfuncno = i;
1951 continue;
1952 }
1953
1954 /* Nope, so assign a new PerAgg record */
1955 perfuncstate = &perfunc[++wfuncno];
1956
1957 /* Mark WindowFunc state node with assigned index in the result array */
1958 wfuncstate->wfuncno = wfuncno;
1959
1960 /* Check permission to call window function */
1961 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1962 ACL_EXECUTE);
1963 if (aclresult != ACLCHECK_OK)
1964 aclcheck_error(aclresult, ACL_KIND_PROC,
1965 get_func_name(wfunc->winfnoid));
1966 InvokeFunctionExecuteHook(wfunc->winfnoid);
1967
1968 /* Fill in the perfuncstate data */
1969 perfuncstate->wfuncstate = wfuncstate;
1970 perfuncstate->wfunc = wfunc;
1971 perfuncstate->numArguments = list_length(wfuncstate->args);
1972
1973 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1974 econtext->ecxt_per_query_memory);
1975 fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1976
1977 perfuncstate->winCollation = wfunc->inputcollid;
1978
1979 get_typlenbyval(wfunc->wintype,
1980 &perfuncstate->resulttypeLen,
1981 &perfuncstate->resulttypeByVal);
1982
1983 /*
1984 * If it's really just a plain aggregate function, we'll emulate the
1985 * Agg environment for it.
1986 */
1987 perfuncstate->plain_agg = wfunc->winagg;
1988 if (wfunc->winagg)
1989 {
1990 WindowStatePerAgg peraggstate;
1991
1992 perfuncstate->aggno = ++aggno;
1993 peraggstate = &winstate->peragg[aggno];
1994 initialize_peragg(winstate, wfunc, peraggstate);
1995 peraggstate->wfuncno = wfuncno;
1996 }
1997 else
1998 {
1999 WindowObject winobj = makeNode(WindowObjectData);
2000
2001 winobj->winstate = winstate;
2002 winobj->argstates = wfuncstate->args;
2003 winobj->localmem = NULL;
2004 perfuncstate->winobj = winobj;
2005 }
2006 }
2007
2008 /* Update numfuncs, numaggs to match number of unique functions found */
2009 winstate->numfuncs = wfuncno + 1;
2010 winstate->numaggs = aggno + 1;
2011
2012 /* Set up WindowObject for aggregates, if needed */
2013 if (winstate->numaggs > 0)
2014 {
2015 WindowObject agg_winobj = makeNode(WindowObjectData);
2016
2017 agg_winobj->winstate = winstate;
2018 agg_winobj->argstates = NIL;
2019 agg_winobj->localmem = NULL;
2020 /* make sure markptr = -1 to invalidate. It may not get used */
2021 agg_winobj->markptr = -1;
2022 agg_winobj->readptr = -1;
2023 winstate->agg_winobj = agg_winobj;
2024 }
2025
2026 /* copy frame options to state node for easy access */
2027 winstate->frameOptions = node->frameOptions;
2028
2029 /* initialize frame bound offset expressions */
2030 winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2031 (PlanState *) winstate);
2032 winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2033 (PlanState *) winstate);
2034
2035 winstate->all_first = true;
2036 winstate->partition_spooled = false;
2037 winstate->more_partitions = false;
2038
2039 return winstate;
2040 }
2041
2042 /* -----------------
2043 * ExecEndWindowAgg
2044 * -----------------
2045 */
2046 void
ExecEndWindowAgg(WindowAggState * node)2047 ExecEndWindowAgg(WindowAggState *node)
2048 {
2049 PlanState *outerPlan;
2050 int i;
2051
2052 release_partition(node);
2053
2054 ExecClearTuple(node->ss.ss_ScanTupleSlot);
2055 ExecClearTuple(node->first_part_slot);
2056 ExecClearTuple(node->agg_row_slot);
2057 ExecClearTuple(node->temp_slot_1);
2058 ExecClearTuple(node->temp_slot_2);
2059
2060 /*
2061 * Free both the expr contexts.
2062 */
2063 ExecFreeExprContext(&node->ss.ps);
2064 node->ss.ps.ps_ExprContext = node->tmpcontext;
2065 ExecFreeExprContext(&node->ss.ps);
2066
2067 for (i = 0; i < node->numaggs; i++)
2068 {
2069 if (node->peragg[i].aggcontext != node->aggcontext)
2070 MemoryContextDelete(node->peragg[i].aggcontext);
2071 }
2072 MemoryContextDelete(node->partcontext);
2073 MemoryContextDelete(node->aggcontext);
2074
2075 pfree(node->perfunc);
2076 pfree(node->peragg);
2077
2078 outerPlan = outerPlanState(node);
2079 ExecEndNode(outerPlan);
2080 }
2081
2082 /* -----------------
2083 * ExecReScanWindowAgg
2084 * -----------------
2085 */
2086 void
ExecReScanWindowAgg(WindowAggState * node)2087 ExecReScanWindowAgg(WindowAggState *node)
2088 {
2089 PlanState *outerPlan = outerPlanState(node);
2090 ExprContext *econtext = node->ss.ps.ps_ExprContext;
2091
2092 node->all_done = false;
2093
2094 node->ss.ps.ps_TupFromTlist = false;
2095 node->all_first = true;
2096
2097 /* release tuplestore et al */
2098 release_partition(node);
2099
2100 /* release all temp tuples, but especially first_part_slot */
2101 ExecClearTuple(node->ss.ss_ScanTupleSlot);
2102 ExecClearTuple(node->first_part_slot);
2103 ExecClearTuple(node->agg_row_slot);
2104 ExecClearTuple(node->temp_slot_1);
2105 ExecClearTuple(node->temp_slot_2);
2106
2107 /* Forget current wfunc values */
2108 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2109 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2110
2111 /*
2112 * if chgParam of subnode is not null then plan will be re-scanned by
2113 * first ExecProcNode.
2114 */
2115 if (outerPlan->chgParam == NULL)
2116 ExecReScan(outerPlan);
2117 }
2118
2119 /*
2120 * initialize_peragg
2121 *
2122 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2123 */
2124 static WindowStatePerAggData *
initialize_peragg(WindowAggState * winstate,WindowFunc * wfunc,WindowStatePerAgg peraggstate)2125 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2126 WindowStatePerAgg peraggstate)
2127 {
2128 Oid inputTypes[FUNC_MAX_ARGS];
2129 int numArguments;
2130 HeapTuple aggTuple;
2131 Form_pg_aggregate aggform;
2132 Oid aggtranstype;
2133 AttrNumber initvalAttNo;
2134 AclResult aclresult;
2135 Oid transfn_oid,
2136 invtransfn_oid,
2137 finalfn_oid;
2138 bool finalextra;
2139 Expr *transfnexpr,
2140 *invtransfnexpr,
2141 *finalfnexpr;
2142 Datum textInitVal;
2143 int i;
2144 ListCell *lc;
2145
2146 numArguments = list_length(wfunc->args);
2147
2148 i = 0;
2149 foreach(lc, wfunc->args)
2150 {
2151 inputTypes[i++] = exprType((Node *) lfirst(lc));
2152 }
2153
2154 aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2155 if (!HeapTupleIsValid(aggTuple))
2156 elog(ERROR, "cache lookup failed for aggregate %u",
2157 wfunc->winfnoid);
2158 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2159
2160 /*
2161 * Figure out whether we want to use the moving-aggregate implementation,
2162 * and collect the right set of fields from the pg_attribute entry.
2163 *
2164 * If the frame head can't move, we don't need moving-aggregate code. Even
2165 * if we'd like to use it, don't do so if the aggregate's arguments (and
2166 * FILTER clause if any) contain any calls to volatile functions.
2167 * Otherwise, the difference between restarting and not restarting the
2168 * aggregation would be user-visible.
2169 */
2170 if (OidIsValid(aggform->aggminvtransfn) &&
2171 !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) &&
2172 !contain_volatile_functions((Node *) wfunc))
2173 {
2174 peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2175 peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2176 peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2177 finalextra = aggform->aggmfinalextra;
2178 aggtranstype = aggform->aggmtranstype;
2179 initvalAttNo = Anum_pg_aggregate_aggminitval;
2180 }
2181 else
2182 {
2183 peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2184 peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2185 peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2186 finalextra = aggform->aggfinalextra;
2187 aggtranstype = aggform->aggtranstype;
2188 initvalAttNo = Anum_pg_aggregate_agginitval;
2189 }
2190
2191 /*
2192 * ExecInitWindowAgg already checked permission to call aggregate function
2193 * ... but we still need to check the component functions
2194 */
2195
2196 /* Check that aggregate owner has permission to call component fns */
2197 {
2198 HeapTuple procTuple;
2199 Oid aggOwner;
2200
2201 procTuple = SearchSysCache1(PROCOID,
2202 ObjectIdGetDatum(wfunc->winfnoid));
2203 if (!HeapTupleIsValid(procTuple))
2204 elog(ERROR, "cache lookup failed for function %u",
2205 wfunc->winfnoid);
2206 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2207 ReleaseSysCache(procTuple);
2208
2209 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2210 ACL_EXECUTE);
2211 if (aclresult != ACLCHECK_OK)
2212 aclcheck_error(aclresult, ACL_KIND_PROC,
2213 get_func_name(transfn_oid));
2214 InvokeFunctionExecuteHook(transfn_oid);
2215
2216 if (OidIsValid(invtransfn_oid))
2217 {
2218 aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2219 ACL_EXECUTE);
2220 if (aclresult != ACLCHECK_OK)
2221 aclcheck_error(aclresult, ACL_KIND_PROC,
2222 get_func_name(invtransfn_oid));
2223 InvokeFunctionExecuteHook(invtransfn_oid);
2224 }
2225
2226 if (OidIsValid(finalfn_oid))
2227 {
2228 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2229 ACL_EXECUTE);
2230 if (aclresult != ACLCHECK_OK)
2231 aclcheck_error(aclresult, ACL_KIND_PROC,
2232 get_func_name(finalfn_oid));
2233 InvokeFunctionExecuteHook(finalfn_oid);
2234 }
2235 }
2236
2237 /* Detect how many arguments to pass to the finalfn */
2238 if (finalextra)
2239 peraggstate->numFinalArgs = numArguments + 1;
2240 else
2241 peraggstate->numFinalArgs = 1;
2242
2243 /* resolve actual type of transition state, if polymorphic */
2244 aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2245 aggtranstype,
2246 inputTypes,
2247 numArguments);
2248
2249 /* build expression trees using actual argument & result types */
2250 build_aggregate_transfn_expr(inputTypes,
2251 numArguments,
2252 0, /* no ordered-set window functions yet */
2253 false, /* no variadic window functions yet */
2254 aggtranstype,
2255 wfunc->inputcollid,
2256 transfn_oid,
2257 invtransfn_oid,
2258 &transfnexpr,
2259 &invtransfnexpr);
2260
2261 /* set up infrastructure for calling the transfn(s) and finalfn */
2262 fmgr_info(transfn_oid, &peraggstate->transfn);
2263 fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2264
2265 if (OidIsValid(invtransfn_oid))
2266 {
2267 fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2268 fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2269 }
2270
2271 if (OidIsValid(finalfn_oid))
2272 {
2273 build_aggregate_finalfn_expr(inputTypes,
2274 peraggstate->numFinalArgs,
2275 aggtranstype,
2276 wfunc->wintype,
2277 wfunc->inputcollid,
2278 finalfn_oid,
2279 &finalfnexpr);
2280 fmgr_info(finalfn_oid, &peraggstate->finalfn);
2281 fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2282 }
2283
2284 /* get info about relevant datatypes */
2285 get_typlenbyval(wfunc->wintype,
2286 &peraggstate->resulttypeLen,
2287 &peraggstate->resulttypeByVal);
2288 get_typlenbyval(aggtranstype,
2289 &peraggstate->transtypeLen,
2290 &peraggstate->transtypeByVal);
2291
2292 /*
2293 * initval is potentially null, so don't try to access it as a struct
2294 * field. Must do it the hard way with SysCacheGetAttr.
2295 */
2296 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2297 &peraggstate->initValueIsNull);
2298
2299 if (peraggstate->initValueIsNull)
2300 peraggstate->initValue = (Datum) 0;
2301 else
2302 peraggstate->initValue = GetAggInitVal(textInitVal,
2303 aggtranstype);
2304
2305 /*
2306 * If the transfn is strict and the initval is NULL, make sure input type
2307 * and transtype are the same (or at least binary-compatible), so that
2308 * it's OK to use the first input value as the initial transValue. This
2309 * should have been checked at agg definition time, but we must check
2310 * again in case the transfn's strictness property has been changed.
2311 */
2312 if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2313 {
2314 if (numArguments < 1 ||
2315 !IsBinaryCoercible(inputTypes[0], aggtranstype))
2316 ereport(ERROR,
2317 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2318 errmsg("aggregate %u needs to have compatible input type and transition type",
2319 wfunc->winfnoid)));
2320 }
2321
2322 /*
2323 * Insist that forward and inverse transition functions have the same
2324 * strictness setting. Allowing them to differ would require handling
2325 * more special cases in advance_windowaggregate and
2326 * advance_windowaggregate_base, for no discernible benefit. This should
2327 * have been checked at agg definition time, but we must check again in
2328 * case either function's strictness property has been changed.
2329 */
2330 if (OidIsValid(invtransfn_oid) &&
2331 peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2332 ereport(ERROR,
2333 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2334 errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2335
2336 /*
2337 * Moving aggregates use their own aggcontext.
2338 *
2339 * This is necessary because they might restart at different times, so we
2340 * might never be able to reset the shared context otherwise. We can't
2341 * make it the aggregates' responsibility to clean up after themselves,
2342 * because strict aggregates must be restarted whenever we remove their
2343 * last non-NULL input, which the aggregate won't be aware is happening.
2344 * Also, just pfree()ing the transValue upon restarting wouldn't help,
2345 * since we'd miss any indirectly referenced data. We could, in theory,
2346 * make the memory allocation rules for moving aggregates different than
2347 * they have historically been for plain aggregates, but that seems grotty
2348 * and likely to lead to memory leaks.
2349 */
2350 if (OidIsValid(invtransfn_oid))
2351 peraggstate->aggcontext =
2352 AllocSetContextCreate(CurrentMemoryContext,
2353 "WindowAgg Per Aggregate",
2354 ALLOCSET_DEFAULT_SIZES);
2355 else
2356 peraggstate->aggcontext = winstate->aggcontext;
2357
2358 ReleaseSysCache(aggTuple);
2359
2360 return peraggstate;
2361 }
2362
2363 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)2364 GetAggInitVal(Datum textInitVal, Oid transtype)
2365 {
2366 Oid typinput,
2367 typioparam;
2368 char *strInitVal;
2369 Datum initVal;
2370
2371 getTypeInputInfo(transtype, &typinput, &typioparam);
2372 strInitVal = TextDatumGetCString(textInitVal);
2373 initVal = OidInputFunctionCall(typinput, strInitVal,
2374 typioparam, -1);
2375 pfree(strInitVal);
2376 return initVal;
2377 }
2378
2379 /*
2380 * are_peers
2381 * compare two rows to see if they are equal according to the ORDER BY clause
2382 *
2383 * NB: this does not consider the window frame mode.
2384 */
2385 static bool
are_peers(WindowAggState * winstate,TupleTableSlot * slot1,TupleTableSlot * slot2)2386 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
2387 TupleTableSlot *slot2)
2388 {
2389 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
2390
2391 /* If no ORDER BY, all rows are peers with each other */
2392 if (node->ordNumCols == 0)
2393 return true;
2394
2395 return execTuplesMatch(slot1, slot2,
2396 node->ordNumCols, node->ordColIdx,
2397 winstate->ordEqfunctions,
2398 winstate->tmpcontext->ecxt_per_tuple_memory);
2399 }
2400
2401 /*
2402 * window_gettupleslot
2403 * Fetch the pos'th tuple of the current partition into the slot,
2404 * using the winobj's read pointer
2405 *
2406 * Returns true if successful, false if no such row
2407 */
2408 static bool
window_gettupleslot(WindowObject winobj,int64 pos,TupleTableSlot * slot)2409 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
2410 {
2411 WindowAggState *winstate = winobj->winstate;
2412 MemoryContext oldcontext;
2413
2414 /* Don't allow passing -1 to spool_tuples here */
2415 if (pos < 0)
2416 return false;
2417
2418 /* If necessary, fetch the tuple into the spool */
2419 spool_tuples(winstate, pos);
2420
2421 if (pos >= winstate->spooled_rows)
2422 return false;
2423
2424 if (pos < winobj->markpos)
2425 elog(ERROR, "cannot fetch row before WindowObject's mark position");
2426
2427 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2428
2429 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2430
2431 /*
2432 * Advance or rewind until we are within one tuple of the one we want.
2433 */
2434 if (winobj->seekpos < pos - 1)
2435 {
2436 if (!tuplestore_skiptuples(winstate->buffer,
2437 pos - 1 - winobj->seekpos,
2438 true))
2439 elog(ERROR, "unexpected end of tuplestore");
2440 winobj->seekpos = pos - 1;
2441 }
2442 else if (winobj->seekpos > pos + 1)
2443 {
2444 if (!tuplestore_skiptuples(winstate->buffer,
2445 winobj->seekpos - (pos + 1),
2446 false))
2447 elog(ERROR, "unexpected end of tuplestore");
2448 winobj->seekpos = pos + 1;
2449 }
2450 else if (winobj->seekpos == pos)
2451 {
2452 /*
2453 * There's no API to refetch the tuple at the current position. We
2454 * have to move one tuple forward, and then one backward. (We don't
2455 * do it the other way because we might try to fetch the row before
2456 * our mark, which isn't allowed.) XXX this case could stand to be
2457 * optimized.
2458 */
2459 tuplestore_advance(winstate->buffer, true);
2460 winobj->seekpos++;
2461 }
2462
2463 /*
2464 * Now we should be on the tuple immediately before or after the one we
2465 * want, so just fetch forwards or backwards as appropriate.
2466 */
2467 if (winobj->seekpos > pos)
2468 {
2469 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2470 elog(ERROR, "unexpected end of tuplestore");
2471 winobj->seekpos--;
2472 }
2473 else
2474 {
2475 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2476 elog(ERROR, "unexpected end of tuplestore");
2477 winobj->seekpos++;
2478 }
2479
2480 Assert(winobj->seekpos == pos);
2481
2482 MemoryContextSwitchTo(oldcontext);
2483
2484 return true;
2485 }
2486
2487
2488 /***********************************************************************
2489 * API exposed to window functions
2490 ***********************************************************************/
2491
2492
2493 /*
2494 * WinGetPartitionLocalMemory
2495 * Get working memory that lives till end of partition processing
2496 *
2497 * On first call within a given partition, this allocates and zeroes the
2498 * requested amount of space. Subsequent calls just return the same chunk.
2499 *
2500 * Memory obtained this way is normally used to hold state that should be
2501 * automatically reset for each new partition. If a window function wants
2502 * to hold state across the whole query, fcinfo->fn_extra can be used in the
2503 * usual way for that.
2504 */
2505 void *
WinGetPartitionLocalMemory(WindowObject winobj,Size sz)2506 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
2507 {
2508 Assert(WindowObjectIsValid(winobj));
2509 if (winobj->localmem == NULL)
2510 winobj->localmem =
2511 MemoryContextAllocZero(winobj->winstate->partcontext, sz);
2512 return winobj->localmem;
2513 }
2514
2515 /*
2516 * WinGetCurrentPosition
2517 * Return the current row's position (counting from 0) within the current
2518 * partition.
2519 */
2520 int64
WinGetCurrentPosition(WindowObject winobj)2521 WinGetCurrentPosition(WindowObject winobj)
2522 {
2523 Assert(WindowObjectIsValid(winobj));
2524 return winobj->winstate->currentpos;
2525 }
2526
2527 /*
2528 * WinGetPartitionRowCount
2529 * Return total number of rows contained in the current partition.
2530 *
2531 * Note: this is a relatively expensive operation because it forces the
2532 * whole partition to be "spooled" into the tuplestore at once. Once
2533 * executed, however, additional calls within the same partition are cheap.
2534 */
2535 int64
WinGetPartitionRowCount(WindowObject winobj)2536 WinGetPartitionRowCount(WindowObject winobj)
2537 {
2538 Assert(WindowObjectIsValid(winobj));
2539 spool_tuples(winobj->winstate, -1);
2540 return winobj->winstate->spooled_rows;
2541 }
2542
2543 /*
2544 * WinSetMarkPosition
2545 * Set the "mark" position for the window object, which is the oldest row
2546 * number (counting from 0) it is allowed to fetch during all subsequent
2547 * operations within the current partition.
2548 *
2549 * Window functions do not have to call this, but are encouraged to move the
2550 * mark forward when possible to keep the tuplestore size down and prevent
2551 * having to spill rows to disk.
2552 */
2553 void
WinSetMarkPosition(WindowObject winobj,int64 markpos)2554 WinSetMarkPosition(WindowObject winobj, int64 markpos)
2555 {
2556 WindowAggState *winstate;
2557
2558 Assert(WindowObjectIsValid(winobj));
2559 winstate = winobj->winstate;
2560
2561 if (markpos < winobj->markpos)
2562 elog(ERROR, "cannot move WindowObject's mark position backward");
2563 tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2564 if (markpos > winobj->markpos)
2565 {
2566 tuplestore_skiptuples(winstate->buffer,
2567 markpos - winobj->markpos,
2568 true);
2569 winobj->markpos = markpos;
2570 }
2571 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2572 if (markpos > winobj->seekpos)
2573 {
2574 tuplestore_skiptuples(winstate->buffer,
2575 markpos - winobj->seekpos,
2576 true);
2577 winobj->seekpos = markpos;
2578 }
2579 }
2580
2581 /*
2582 * WinRowsArePeers
2583 * Compare two rows (specified by absolute position in window) to see
2584 * if they are equal according to the ORDER BY clause.
2585 *
2586 * NB: this does not consider the window frame mode.
2587 */
2588 bool
WinRowsArePeers(WindowObject winobj,int64 pos1,int64 pos2)2589 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2590 {
2591 WindowAggState *winstate;
2592 WindowAgg *node;
2593 TupleTableSlot *slot1;
2594 TupleTableSlot *slot2;
2595 bool res;
2596
2597 Assert(WindowObjectIsValid(winobj));
2598 winstate = winobj->winstate;
2599 node = (WindowAgg *) winstate->ss.ps.plan;
2600
2601 /* If no ORDER BY, all rows are peers; don't bother to fetch them */
2602 if (node->ordNumCols == 0)
2603 return true;
2604
2605 slot1 = winstate->temp_slot_1;
2606 slot2 = winstate->temp_slot_2;
2607
2608 if (!window_gettupleslot(winobj, pos1, slot1))
2609 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2610 pos1);
2611 if (!window_gettupleslot(winobj, pos2, slot2))
2612 elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2613 pos2);
2614
2615 res = are_peers(winstate, slot1, slot2);
2616
2617 ExecClearTuple(slot1);
2618 ExecClearTuple(slot2);
2619
2620 return res;
2621 }
2622
2623 /*
2624 * WinGetFuncArgInPartition
2625 * Evaluate a window function's argument expression on a specified
2626 * row of the partition. The row is identified in lseek(2) style,
2627 * i.e. relative to the current, first, or last row.
2628 *
2629 * argno: argument number to evaluate (counted from 0)
2630 * relpos: signed rowcount offset from the seek position
2631 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2632 * set_mark: If the row is found and set_mark is true, the mark is moved to
2633 * the row as a side-effect.
2634 * isnull: output argument, receives isnull status of result
2635 * isout: output argument, set to indicate whether target row position
2636 * is out of partition (can pass NULL if caller doesn't care about this)
2637 *
2638 * Specifying a nonexistent row is not an error, it just causes a null result
2639 * (plus setting *isout true, if isout isn't NULL).
2640 */
2641 Datum
WinGetFuncArgInPartition(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)2642 WinGetFuncArgInPartition(WindowObject winobj, int argno,
2643 int relpos, int seektype, bool set_mark,
2644 bool *isnull, bool *isout)
2645 {
2646 WindowAggState *winstate;
2647 ExprContext *econtext;
2648 TupleTableSlot *slot;
2649 bool gottuple;
2650 int64 abs_pos;
2651
2652 Assert(WindowObjectIsValid(winobj));
2653 winstate = winobj->winstate;
2654 econtext = winstate->ss.ps.ps_ExprContext;
2655 slot = winstate->temp_slot_1;
2656
2657 switch (seektype)
2658 {
2659 case WINDOW_SEEK_CURRENT:
2660 abs_pos = winstate->currentpos + relpos;
2661 break;
2662 case WINDOW_SEEK_HEAD:
2663 abs_pos = relpos;
2664 break;
2665 case WINDOW_SEEK_TAIL:
2666 spool_tuples(winstate, -1);
2667 abs_pos = winstate->spooled_rows - 1 + relpos;
2668 break;
2669 default:
2670 elog(ERROR, "unrecognized window seek type: %d", seektype);
2671 abs_pos = 0; /* keep compiler quiet */
2672 break;
2673 }
2674
2675 gottuple = window_gettupleslot(winobj, abs_pos, slot);
2676
2677 if (!gottuple)
2678 {
2679 if (isout)
2680 *isout = true;
2681 *isnull = true;
2682 return (Datum) 0;
2683 }
2684 else
2685 {
2686 if (isout)
2687 *isout = false;
2688 if (set_mark)
2689 {
2690 int frameOptions = winstate->frameOptions;
2691 int64 mark_pos = abs_pos;
2692
2693 /*
2694 * In RANGE mode with a moving frame head, we must not let the
2695 * mark advance past frameheadpos, since that row has to be
2696 * fetchable during future update_frameheadpos calls.
2697 *
2698 * XXX it is very ugly to pollute window functions' marks with
2699 * this consideration; it could for instance mask a logic bug that
2700 * lets a window function fetch rows before what it had claimed
2701 * was its mark. Perhaps use a separate mark for frame head
2702 * probes?
2703 */
2704 if ((frameOptions & FRAMEOPTION_RANGE) &&
2705 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2706 {
2707 update_frameheadpos(winobj, winstate->temp_slot_2);
2708 if (mark_pos > winstate->frameheadpos)
2709 mark_pos = winstate->frameheadpos;
2710 }
2711 WinSetMarkPosition(winobj, mark_pos);
2712 }
2713 econtext->ecxt_outertuple = slot;
2714 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2715 econtext, isnull, NULL);
2716 }
2717 }
2718
2719 /*
2720 * WinGetFuncArgInFrame
2721 * Evaluate a window function's argument expression on a specified
2722 * row of the window frame. The row is identified in lseek(2) style,
2723 * i.e. relative to the current, first, or last row.
2724 *
2725 * argno: argument number to evaluate (counted from 0)
2726 * relpos: signed rowcount offset from the seek position
2727 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2728 * set_mark: If the row is found and set_mark is true, the mark is moved to
2729 * the row as a side-effect.
2730 * isnull: output argument, receives isnull status of result
2731 * isout: output argument, set to indicate whether target row position
2732 * is out of frame (can pass NULL if caller doesn't care about this)
2733 *
2734 * Specifying a nonexistent row is not an error, it just causes a null result
2735 * (plus setting *isout true, if isout isn't NULL).
2736 */
2737 Datum
WinGetFuncArgInFrame(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)2738 WinGetFuncArgInFrame(WindowObject winobj, int argno,
2739 int relpos, int seektype, bool set_mark,
2740 bool *isnull, bool *isout)
2741 {
2742 WindowAggState *winstate;
2743 ExprContext *econtext;
2744 TupleTableSlot *slot;
2745 bool gottuple;
2746 int64 abs_pos;
2747
2748 Assert(WindowObjectIsValid(winobj));
2749 winstate = winobj->winstate;
2750 econtext = winstate->ss.ps.ps_ExprContext;
2751 slot = winstate->temp_slot_1;
2752
2753 switch (seektype)
2754 {
2755 case WINDOW_SEEK_CURRENT:
2756 abs_pos = winstate->currentpos + relpos;
2757 break;
2758 case WINDOW_SEEK_HEAD:
2759 update_frameheadpos(winobj, slot);
2760 abs_pos = winstate->frameheadpos + relpos;
2761 break;
2762 case WINDOW_SEEK_TAIL:
2763 update_frametailpos(winobj, slot);
2764 abs_pos = winstate->frametailpos + relpos;
2765 break;
2766 default:
2767 elog(ERROR, "unrecognized window seek type: %d", seektype);
2768 abs_pos = 0; /* keep compiler quiet */
2769 break;
2770 }
2771
2772 gottuple = window_gettupleslot(winobj, abs_pos, slot);
2773 if (gottuple)
2774 gottuple = row_is_in_frame(winstate, abs_pos, slot);
2775
2776 if (!gottuple)
2777 {
2778 if (isout)
2779 *isout = true;
2780 *isnull = true;
2781 return (Datum) 0;
2782 }
2783 else
2784 {
2785 if (isout)
2786 *isout = false;
2787 if (set_mark)
2788 {
2789 int frameOptions = winstate->frameOptions;
2790 int64 mark_pos = abs_pos;
2791
2792 /*
2793 * In RANGE mode with a moving frame head, we must not let the
2794 * mark advance past frameheadpos, since that row has to be
2795 * fetchable during future update_frameheadpos calls.
2796 *
2797 * XXX it is very ugly to pollute window functions' marks with
2798 * this consideration; it could for instance mask a logic bug that
2799 * lets a window function fetch rows before what it had claimed
2800 * was its mark. Perhaps use a separate mark for frame head
2801 * probes?
2802 */
2803 if ((frameOptions & FRAMEOPTION_RANGE) &&
2804 !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2805 {
2806 update_frameheadpos(winobj, winstate->temp_slot_2);
2807 if (mark_pos > winstate->frameheadpos)
2808 mark_pos = winstate->frameheadpos;
2809 }
2810 WinSetMarkPosition(winobj, mark_pos);
2811 }
2812 econtext->ecxt_outertuple = slot;
2813 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2814 econtext, isnull, NULL);
2815 }
2816 }
2817
2818 /*
2819 * WinGetFuncArgCurrent
2820 * Evaluate a window function's argument expression on the current row.
2821 *
2822 * argno: argument number to evaluate (counted from 0)
2823 * isnull: output argument, receives isnull status of result
2824 *
2825 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2826 * WinGetFuncArgInFrame targeting the current row, because it will succeed
2827 * even if the WindowObject's mark has been set beyond the current row.
2828 * This should generally be used for "ordinary" arguments of a window
2829 * function, such as the offset argument of lead() or lag().
2830 */
2831 Datum
WinGetFuncArgCurrent(WindowObject winobj,int argno,bool * isnull)2832 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2833 {
2834 WindowAggState *winstate;
2835 ExprContext *econtext;
2836
2837 Assert(WindowObjectIsValid(winobj));
2838 winstate = winobj->winstate;
2839
2840 econtext = winstate->ss.ps.ps_ExprContext;
2841
2842 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2843 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2844 econtext, isnull, NULL);
2845 }
2846