/*-------------------------------------------------------------------------
 *
 * nodeWindowAgg.c
 *	  routines to handle WindowAgg nodes.
 *
 * A WindowAgg node evaluates "window functions" across suitable partitions
 * of the input tuple set.  Any one WindowAgg works for just a single window
 * specification, though it can evaluate multiple window functions sharing
 * identical window specifications.  The input tuples are required to be
 * delivered in sorted order, with the PARTITION BY columns (if any) as
 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
 * (The planner generates a stack of WindowAggs with intervening Sort nodes
 * as needed, if a query involves more than one window specification.)
 *
 * Since window functions can require access to any or all of the rows in
 * the current partition, we accumulate rows of the partition into a
 * tuplestore.  The window functions are called using the WindowObject API
 * so that they can access those rows as needed.
 *
 * We also support using plain aggregate functions as window functions.
 * For these, the regular Agg-node environment is emulated for each partition.
 * As required by the SQL spec, the output represents the value of the
 * aggregate function over all rows in the current row's window frame.
 *
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeWindowAgg.c
 *
 *-------------------------------------------------------------------------
 */
34 #include "postgres.h"
35 
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/optimizer.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/regproc.h"
53 #include "utils/syscache.h"
54 #include "windowapi.h"
55 
56 /*
57  * All the window function APIs are called with this object, which is passed
58  * to window functions as fcinfo->context.
59  */
60 typedef struct WindowObjectData
61 {
62 	NodeTag		type;
63 	WindowAggState *winstate;	/* parent WindowAggState */
64 	List	   *argstates;		/* ExprState trees for fn's arguments */
65 	void	   *localmem;		/* WinGetPartitionLocalMemory's chunk */
66 	int			markptr;		/* tuplestore mark pointer for this fn */
67 	int			readptr;		/* tuplestore read pointer for this fn */
68 	int64		markpos;		/* row that markptr is positioned on */
69 	int64		seekpos;		/* row that readptr is positioned on */
70 } WindowObjectData;
71 
72 /*
73  * We have one WindowStatePerFunc struct for each window function and
74  * window aggregate handled by this node.
75  */
76 typedef struct WindowStatePerFuncData
77 {
78 	/* Links to WindowFunc expr and state nodes this working state is for */
79 	WindowFuncExprState *wfuncstate;
80 	WindowFunc *wfunc;
81 
82 	int			numArguments;	/* number of arguments */
83 
84 	FmgrInfo	flinfo;			/* fmgr lookup data for window function */
85 
86 	Oid			winCollation;	/* collation derived for window function */
87 
88 	/*
89 	 * We need the len and byval info for the result of each function in order
90 	 * to know how to copy/delete values.
91 	 */
92 	int16		resulttypeLen;
93 	bool		resulttypeByVal;
94 
95 	bool		plain_agg;		/* is it just a plain aggregate function? */
96 	int			aggno;			/* if so, index of its PerAggData */
97 
98 	WindowObject winobj;		/* object used in window function API */
99 }			WindowStatePerFuncData;
100 
101 /*
102  * For plain aggregate window functions, we also have one of these.
103  */
104 typedef struct WindowStatePerAggData
105 {
106 	/* Oids of transition functions */
107 	Oid			transfn_oid;
108 	Oid			invtransfn_oid; /* may be InvalidOid */
109 	Oid			finalfn_oid;	/* may be InvalidOid */
110 
111 	/*
112 	 * fmgr lookup data for transition functions --- only valid when
113 	 * corresponding oid is not InvalidOid.  Note in particular that fn_strict
114 	 * flags are kept here.
115 	 */
116 	FmgrInfo	transfn;
117 	FmgrInfo	invtransfn;
118 	FmgrInfo	finalfn;
119 
120 	int			numFinalArgs;	/* number of arguments to pass to finalfn */
121 
122 	/*
123 	 * initial value from pg_aggregate entry
124 	 */
125 	Datum		initValue;
126 	bool		initValueIsNull;
127 
128 	/*
129 	 * cached value for current frame boundaries
130 	 */
131 	Datum		resultValue;
132 	bool		resultValueIsNull;
133 
134 	/*
135 	 * We need the len and byval info for the agg's input, result, and
136 	 * transition data types in order to know how to copy/delete values.
137 	 */
138 	int16		inputtypeLen,
139 				resulttypeLen,
140 				transtypeLen;
141 	bool		inputtypeByVal,
142 				resulttypeByVal,
143 				transtypeByVal;
144 
145 	int			wfuncno;		/* index of associated PerFuncData */
146 
147 	/* Context holding transition value and possibly other subsidiary data */
148 	MemoryContext aggcontext;	/* may be private, or winstate->aggcontext */
149 
150 	/* Current transition value */
151 	Datum		transValue;		/* current transition value */
152 	bool		transValueIsNull;
153 
154 	int64		transValueCount;	/* number of currently-aggregated rows */
155 
156 	/* Data local to eval_windowaggregates() */
157 	bool		restart;		/* need to restart this agg in this cycle? */
158 } WindowStatePerAggData;
159 
160 static void initialize_windowaggregate(WindowAggState *winstate,
161 									   WindowStatePerFunc perfuncstate,
162 									   WindowStatePerAgg peraggstate);
163 static void advance_windowaggregate(WindowAggState *winstate,
164 									WindowStatePerFunc perfuncstate,
165 									WindowStatePerAgg peraggstate);
166 static bool advance_windowaggregate_base(WindowAggState *winstate,
167 										 WindowStatePerFunc perfuncstate,
168 										 WindowStatePerAgg peraggstate);
169 static void finalize_windowaggregate(WindowAggState *winstate,
170 									 WindowStatePerFunc perfuncstate,
171 									 WindowStatePerAgg peraggstate,
172 									 Datum *result, bool *isnull);
173 
174 static void eval_windowaggregates(WindowAggState *winstate);
175 static void eval_windowfunction(WindowAggState *winstate,
176 								WindowStatePerFunc perfuncstate,
177 								Datum *result, bool *isnull);
178 
179 static void begin_partition(WindowAggState *winstate);
180 static void spool_tuples(WindowAggState *winstate, int64 pos);
181 static void release_partition(WindowAggState *winstate);
182 
183 static int	row_is_in_frame(WindowAggState *winstate, int64 pos,
184 							TupleTableSlot *slot);
185 static void update_frameheadpos(WindowAggState *winstate);
186 static void update_frametailpos(WindowAggState *winstate);
187 static void update_grouptailpos(WindowAggState *winstate);
188 
189 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
190 												WindowFunc *wfunc,
191 												WindowStatePerAgg peraggstate);
192 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
193 
194 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
195 					  TupleTableSlot *slot2);
196 static bool window_gettupleslot(WindowObject winobj, int64 pos,
197 								TupleTableSlot *slot);
198 
199 
200 /*
201  * initialize_windowaggregate
202  * parallel to initialize_aggregates in nodeAgg.c
203  */
204 static void
initialize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)205 initialize_windowaggregate(WindowAggState *winstate,
206 						   WindowStatePerFunc perfuncstate,
207 						   WindowStatePerAgg peraggstate)
208 {
209 	MemoryContext oldContext;
210 
211 	/*
212 	 * If we're using a private aggcontext, we may reset it here.  But if the
213 	 * context is shared, we don't know which other aggregates may still need
214 	 * it, so we must leave it to the caller to reset at an appropriate time.
215 	 */
216 	if (peraggstate->aggcontext != winstate->aggcontext)
217 		MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
218 
219 	if (peraggstate->initValueIsNull)
220 		peraggstate->transValue = peraggstate->initValue;
221 	else
222 	{
223 		oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
224 		peraggstate->transValue = datumCopy(peraggstate->initValue,
225 											peraggstate->transtypeByVal,
226 											peraggstate->transtypeLen);
227 		MemoryContextSwitchTo(oldContext);
228 	}
229 	peraggstate->transValueIsNull = peraggstate->initValueIsNull;
230 	peraggstate->transValueCount = 0;
231 	peraggstate->resultValue = (Datum) 0;
232 	peraggstate->resultValueIsNull = true;
233 }
234 
235 /*
236  * advance_windowaggregate
237  * parallel to advance_aggregates in nodeAgg.c
238  */
239 static void
advance_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)240 advance_windowaggregate(WindowAggState *winstate,
241 						WindowStatePerFunc perfuncstate,
242 						WindowStatePerAgg peraggstate)
243 {
244 	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
245 	WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
246 	int			numArguments = perfuncstate->numArguments;
247 	Datum		newVal;
248 	ListCell   *arg;
249 	int			i;
250 	MemoryContext oldContext;
251 	ExprContext *econtext = winstate->tmpcontext;
252 	ExprState  *filter = wfuncstate->aggfilter;
253 
254 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
255 
256 	/* Skip anything FILTERed out */
257 	if (filter)
258 	{
259 		bool		isnull;
260 		Datum		res = ExecEvalExpr(filter, econtext, &isnull);
261 
262 		if (isnull || !DatumGetBool(res))
263 		{
264 			MemoryContextSwitchTo(oldContext);
265 			return;
266 		}
267 	}
268 
269 	/* We start from 1, since the 0th arg will be the transition value */
270 	i = 1;
271 	foreach(arg, wfuncstate->args)
272 	{
273 		ExprState  *argstate = (ExprState *) lfirst(arg);
274 
275 		fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
276 											 &fcinfo->args[i].isnull);
277 		i++;
278 	}
279 
280 	if (peraggstate->transfn.fn_strict)
281 	{
282 		/*
283 		 * For a strict transfn, nothing happens when there's a NULL input; we
284 		 * just keep the prior transValue.  Note transValueCount doesn't
285 		 * change either.
286 		 */
287 		for (i = 1; i <= numArguments; i++)
288 		{
289 			if (fcinfo->args[i].isnull)
290 			{
291 				MemoryContextSwitchTo(oldContext);
292 				return;
293 			}
294 		}
295 
296 		/*
297 		 * For strict transition functions with initial value NULL we use the
298 		 * first non-NULL input as the initial state.  (We already checked
299 		 * that the agg's input type is binary-compatible with its transtype,
300 		 * so straight copy here is OK.)
301 		 *
302 		 * We must copy the datum into aggcontext if it is pass-by-ref.  We do
303 		 * not need to pfree the old transValue, since it's NULL.
304 		 */
305 		if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
306 		{
307 			MemoryContextSwitchTo(peraggstate->aggcontext);
308 			peraggstate->transValue = datumCopy(fcinfo->args[1].value,
309 												peraggstate->transtypeByVal,
310 												peraggstate->transtypeLen);
311 			peraggstate->transValueIsNull = false;
312 			peraggstate->transValueCount = 1;
313 			MemoryContextSwitchTo(oldContext);
314 			return;
315 		}
316 
317 		if (peraggstate->transValueIsNull)
318 		{
319 			/*
320 			 * Don't call a strict function with NULL inputs.  Note it is
321 			 * possible to get here despite the above tests, if the transfn is
322 			 * strict *and* returned a NULL on a prior cycle.  If that happens
323 			 * we will propagate the NULL all the way to the end.  That can
324 			 * only happen if there's no inverse transition function, though,
325 			 * since we disallow transitions back to NULL when there is one.
326 			 */
327 			MemoryContextSwitchTo(oldContext);
328 			Assert(!OidIsValid(peraggstate->invtransfn_oid));
329 			return;
330 		}
331 	}
332 
333 	/*
334 	 * OK to call the transition function.  Set winstate->curaggcontext while
335 	 * calling it, for possible use by AggCheckCallContext.
336 	 */
337 	InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
338 							 numArguments + 1,
339 							 perfuncstate->winCollation,
340 							 (void *) winstate, NULL);
341 	fcinfo->args[0].value = peraggstate->transValue;
342 	fcinfo->args[0].isnull = peraggstate->transValueIsNull;
343 	winstate->curaggcontext = peraggstate->aggcontext;
344 	newVal = FunctionCallInvoke(fcinfo);
345 	winstate->curaggcontext = NULL;
346 
347 	/*
348 	 * Moving-aggregate transition functions must not return null, see
349 	 * advance_windowaggregate_base().
350 	 */
351 	if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
352 		ereport(ERROR,
353 				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
354 				 errmsg("moving-aggregate transition function must not return null")));
355 
356 	/*
357 	 * We must track the number of rows included in transValue, since to
358 	 * remove the last input, advance_windowaggregate_base() mustn't call the
359 	 * inverse transition function, but simply reset transValue back to its
360 	 * initial value.
361 	 */
362 	peraggstate->transValueCount++;
363 
364 	/*
365 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
366 	 * free the prior transValue.  But if transfn returned a pointer to its
367 	 * first input, we don't need to do anything.  Also, if transfn returned a
368 	 * pointer to a R/W expanded object that is already a child of the
369 	 * aggcontext, assume we can adopt that value without copying it.
370 	 */
371 	if (!peraggstate->transtypeByVal &&
372 		DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
373 	{
374 		if (!fcinfo->isnull)
375 		{
376 			MemoryContextSwitchTo(peraggstate->aggcontext);
377 			if (DatumIsReadWriteExpandedObject(newVal,
378 											   false,
379 											   peraggstate->transtypeLen) &&
380 				MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
381 				 /* do nothing */ ;
382 			else
383 				newVal = datumCopy(newVal,
384 								   peraggstate->transtypeByVal,
385 								   peraggstate->transtypeLen);
386 		}
387 		if (!peraggstate->transValueIsNull)
388 		{
389 			if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
390 											   false,
391 											   peraggstate->transtypeLen))
392 				DeleteExpandedObject(peraggstate->transValue);
393 			else
394 				pfree(DatumGetPointer(peraggstate->transValue));
395 		}
396 	}
397 
398 	MemoryContextSwitchTo(oldContext);
399 	peraggstate->transValue = newVal;
400 	peraggstate->transValueIsNull = fcinfo->isnull;
401 }
402 
403 /*
404  * advance_windowaggregate_base
405  * Remove the oldest tuple from an aggregation.
406  *
407  * This is very much like advance_windowaggregate, except that we will call
408  * the inverse transition function (which caller must have checked is
409  * available).
410  *
411  * Returns true if we successfully removed the current row from this
412  * aggregate, false if not (in the latter case, caller is responsible
413  * for cleaning up by restarting the aggregation).
414  */
415 static bool
advance_windowaggregate_base(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)416 advance_windowaggregate_base(WindowAggState *winstate,
417 							 WindowStatePerFunc perfuncstate,
418 							 WindowStatePerAgg peraggstate)
419 {
420 	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
421 	WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
422 	int			numArguments = perfuncstate->numArguments;
423 	Datum		newVal;
424 	ListCell   *arg;
425 	int			i;
426 	MemoryContext oldContext;
427 	ExprContext *econtext = winstate->tmpcontext;
428 	ExprState  *filter = wfuncstate->aggfilter;
429 
430 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
431 
432 	/* Skip anything FILTERed out */
433 	if (filter)
434 	{
435 		bool		isnull;
436 		Datum		res = ExecEvalExpr(filter, econtext, &isnull);
437 
438 		if (isnull || !DatumGetBool(res))
439 		{
440 			MemoryContextSwitchTo(oldContext);
441 			return true;
442 		}
443 	}
444 
445 	/* We start from 1, since the 0th arg will be the transition value */
446 	i = 1;
447 	foreach(arg, wfuncstate->args)
448 	{
449 		ExprState  *argstate = (ExprState *) lfirst(arg);
450 
451 		fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
452 											 &fcinfo->args[i].isnull);
453 		i++;
454 	}
455 
456 	if (peraggstate->invtransfn.fn_strict)
457 	{
458 		/*
459 		 * For a strict (inv)transfn, nothing happens when there's a NULL
460 		 * input; we just keep the prior transValue.  Note transValueCount
461 		 * doesn't change either.
462 		 */
463 		for (i = 1; i <= numArguments; i++)
464 		{
465 			if (fcinfo->args[i].isnull)
466 			{
467 				MemoryContextSwitchTo(oldContext);
468 				return true;
469 			}
470 		}
471 	}
472 
473 	/* There should still be an added but not yet removed value */
474 	Assert(peraggstate->transValueCount > 0);
475 
476 	/*
477 	 * In moving-aggregate mode, the state must never be NULL, except possibly
478 	 * before any rows have been aggregated (which is surely not the case at
479 	 * this point).  This restriction allows us to interpret a NULL result
480 	 * from the inverse function as meaning "sorry, can't do an inverse
481 	 * transition in this case".  We already checked this in
482 	 * advance_windowaggregate, but just for safety, check again.
483 	 */
484 	if (peraggstate->transValueIsNull)
485 		elog(ERROR, "aggregate transition value is NULL before inverse transition");
486 
487 	/*
488 	 * We mustn't use the inverse transition function to remove the last
489 	 * input.  Doing so would yield a non-NULL state, whereas we should be in
490 	 * the initial state afterwards which may very well be NULL.  So instead,
491 	 * we simply re-initialize the aggregate in this case.
492 	 */
493 	if (peraggstate->transValueCount == 1)
494 	{
495 		MemoryContextSwitchTo(oldContext);
496 		initialize_windowaggregate(winstate,
497 								   &winstate->perfunc[peraggstate->wfuncno],
498 								   peraggstate);
499 		return true;
500 	}
501 
502 	/*
503 	 * OK to call the inverse transition function.  Set
504 	 * winstate->curaggcontext while calling it, for possible use by
505 	 * AggCheckCallContext.
506 	 */
507 	InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
508 							 numArguments + 1,
509 							 perfuncstate->winCollation,
510 							 (void *) winstate, NULL);
511 	fcinfo->args[0].value = peraggstate->transValue;
512 	fcinfo->args[0].isnull = peraggstate->transValueIsNull;
513 	winstate->curaggcontext = peraggstate->aggcontext;
514 	newVal = FunctionCallInvoke(fcinfo);
515 	winstate->curaggcontext = NULL;
516 
517 	/*
518 	 * If the function returns NULL, report failure, forcing a restart.
519 	 */
520 	if (fcinfo->isnull)
521 	{
522 		MemoryContextSwitchTo(oldContext);
523 		return false;
524 	}
525 
526 	/* Update number of rows included in transValue */
527 	peraggstate->transValueCount--;
528 
529 	/*
530 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
531 	 * free the prior transValue.  But if invtransfn returned a pointer to its
532 	 * first input, we don't need to do anything.  Also, if invtransfn
533 	 * returned a pointer to a R/W expanded object that is already a child of
534 	 * the aggcontext, assume we can adopt that value without copying it.
535 	 *
536 	 * Note: the checks for null values here will never fire, but it seems
537 	 * best to have this stanza look just like advance_windowaggregate.
538 	 */
539 	if (!peraggstate->transtypeByVal &&
540 		DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
541 	{
542 		if (!fcinfo->isnull)
543 		{
544 			MemoryContextSwitchTo(peraggstate->aggcontext);
545 			if (DatumIsReadWriteExpandedObject(newVal,
546 											   false,
547 											   peraggstate->transtypeLen) &&
548 				MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
549 				 /* do nothing */ ;
550 			else
551 				newVal = datumCopy(newVal,
552 								   peraggstate->transtypeByVal,
553 								   peraggstate->transtypeLen);
554 		}
555 		if (!peraggstate->transValueIsNull)
556 		{
557 			if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
558 											   false,
559 											   peraggstate->transtypeLen))
560 				DeleteExpandedObject(peraggstate->transValue);
561 			else
562 				pfree(DatumGetPointer(peraggstate->transValue));
563 		}
564 	}
565 
566 	MemoryContextSwitchTo(oldContext);
567 	peraggstate->transValue = newVal;
568 	peraggstate->transValueIsNull = fcinfo->isnull;
569 
570 	return true;
571 }
572 
573 /*
574  * finalize_windowaggregate
575  * parallel to finalize_aggregate in nodeAgg.c
576  */
577 static void
finalize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate,Datum * result,bool * isnull)578 finalize_windowaggregate(WindowAggState *winstate,
579 						 WindowStatePerFunc perfuncstate,
580 						 WindowStatePerAgg peraggstate,
581 						 Datum *result, bool *isnull)
582 {
583 	MemoryContext oldContext;
584 
585 	oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
586 
587 	/*
588 	 * Apply the agg's finalfn if one is provided, else return transValue.
589 	 */
590 	if (OidIsValid(peraggstate->finalfn_oid))
591 	{
592 		LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
593 		int			numFinalArgs = peraggstate->numFinalArgs;
594 		bool		anynull;
595 		int			i;
596 
597 		InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn),
598 								 numFinalArgs,
599 								 perfuncstate->winCollation,
600 								 (void *) winstate, NULL);
601 		fcinfo->args[0].value =
602 			MakeExpandedObjectReadOnly(peraggstate->transValue,
603 									   peraggstate->transValueIsNull,
604 									   peraggstate->transtypeLen);
605 		fcinfo->args[0].isnull = peraggstate->transValueIsNull;
606 		anynull = peraggstate->transValueIsNull;
607 
608 		/* Fill any remaining argument positions with nulls */
609 		for (i = 1; i < numFinalArgs; i++)
610 		{
611 			fcinfo->args[i].value = (Datum) 0;
612 			fcinfo->args[i].isnull = true;
613 			anynull = true;
614 		}
615 
616 		if (fcinfo->flinfo->fn_strict && anynull)
617 		{
618 			/* don't call a strict function with NULL inputs */
619 			*result = (Datum) 0;
620 			*isnull = true;
621 		}
622 		else
623 		{
624 			winstate->curaggcontext = peraggstate->aggcontext;
625 			*result = FunctionCallInvoke(fcinfo);
626 			winstate->curaggcontext = NULL;
627 			*isnull = fcinfo->isnull;
628 		}
629 	}
630 	else
631 	{
632 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
633 		*result = peraggstate->transValue;
634 		*isnull = peraggstate->transValueIsNull;
635 	}
636 
637 	/*
638 	 * If result is pass-by-ref, make sure it is in the right context.
639 	 */
640 	if (!peraggstate->resulttypeByVal && !*isnull &&
641 		!MemoryContextContains(CurrentMemoryContext,
642 							   DatumGetPointer(*result)))
643 		*result = datumCopy(*result,
644 							peraggstate->resulttypeByVal,
645 							peraggstate->resulttypeLen);
646 	MemoryContextSwitchTo(oldContext);
647 }
648 
649 /*
650  * eval_windowaggregates
651  * evaluate plain aggregates being used as window functions
652  *
653  * This differs from nodeAgg.c in two ways.  First, if the window's frame
654  * start position moves, we use the inverse transition function (if it exists)
655  * to remove rows from the transition value.  And second, we expect to be
656  * able to call aggregate final functions repeatedly after aggregating more
657  * data onto the same transition value.  This is not a behavior required by
658  * nodeAgg.c.
659  */
660 static void
eval_windowaggregates(WindowAggState * winstate)661 eval_windowaggregates(WindowAggState *winstate)
662 {
663 	WindowStatePerAgg peraggstate;
664 	int			wfuncno,
665 				numaggs,
666 				numaggs_restart,
667 				i;
668 	int64		aggregatedupto_nonrestarted;
669 	MemoryContext oldContext;
670 	ExprContext *econtext;
671 	WindowObject agg_winobj;
672 	TupleTableSlot *agg_row_slot;
673 	TupleTableSlot *temp_slot;
674 
675 	numaggs = winstate->numaggs;
676 	if (numaggs == 0)
677 		return;					/* nothing to do */
678 
679 	/* final output execution is in ps_ExprContext */
680 	econtext = winstate->ss.ps.ps_ExprContext;
681 	agg_winobj = winstate->agg_winobj;
682 	agg_row_slot = winstate->agg_row_slot;
683 	temp_slot = winstate->temp_slot_1;
684 
685 	/*
686 	 * If the window's frame start clause is UNBOUNDED_PRECEDING and no
687 	 * exclusion clause is specified, then the window frame consists of a
688 	 * contiguous group of rows extending forward from the start of the
689 	 * partition, and rows only enter the frame, never exit it, as the current
690 	 * row advances forward.  This makes it possible to use an incremental
691 	 * strategy for evaluating aggregates: we run the transition function for
692 	 * each row added to the frame, and run the final function whenever we
693 	 * need the current aggregate value.  This is considerably more efficient
694 	 * than the naive approach of re-running the entire aggregate calculation
695 	 * for each current row.  It does assume that the final function doesn't
696 	 * damage the running transition value, but we have the same assumption in
697 	 * nodeAgg.c too (when it rescans an existing hash table).
698 	 *
699 	 * If the frame start does sometimes move, we can still optimize as above
700 	 * whenever successive rows share the same frame head, but if the frame
701 	 * head moves beyond the previous head we try to remove those rows using
702 	 * the aggregate's inverse transition function.  This function restores
703 	 * the aggregate's current state to what it would be if the removed row
704 	 * had never been aggregated in the first place.  Inverse transition
705 	 * functions may optionally return NULL, indicating that the function was
706 	 * unable to remove the tuple from aggregation.  If this happens, or if
707 	 * the aggregate doesn't have an inverse transition function at all, we
708 	 * must perform the aggregation all over again for all tuples within the
709 	 * new frame boundaries.
710 	 *
711 	 * If there's any exclusion clause, then we may have to aggregate over a
712 	 * non-contiguous set of rows, so we punt and recalculate for every row.
713 	 * (For some frame end choices, it might be that the frame is always
714 	 * contiguous anyway, but that's an optimization to investigate later.)
715 	 *
716 	 * In many common cases, multiple rows share the same frame and hence the
717 	 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
718 	 * window, then all rows are peers and so they all have window frame equal
719 	 * to the whole partition.)  We optimize such cases by calculating the
720 	 * aggregate value once when we reach the first row of a peer group, and
721 	 * then returning the saved value for all subsequent rows.
722 	 *
723 	 * 'aggregatedupto' keeps track of the first row that has not yet been
724 	 * accumulated into the aggregate transition values.  Whenever we start a
725 	 * new peer group, we accumulate forward to the end of the peer group.
726 	 */
727 
728 	/*
729 	 * First, update the frame head position.
730 	 *
731 	 * The frame head should never move backwards, and the code below wouldn't
732 	 * cope if it did, so for safety we complain if it does.
733 	 */
734 	update_frameheadpos(winstate);
735 	if (winstate->frameheadpos < winstate->aggregatedbase)
736 		elog(ERROR, "window frame head moved backward");
737 
738 	/*
739 	 * If the frame didn't change compared to the previous row, we can re-use
740 	 * the result values that were previously saved at the bottom of this
741 	 * function.  Since we don't know the current frame's end yet, this is not
742 	 * possible to check for fully.  But if the frame end mode is UNBOUNDED
743 	 * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the
744 	 * current row lies within the previous row's frame, then the two frames'
745 	 * ends must coincide.  Note that on the first row aggregatedbase ==
746 	 * aggregatedupto, meaning this test must fail, so we don't need to check
747 	 * the "there was no previous row" case explicitly here.
748 	 */
749 	if (winstate->aggregatedbase == winstate->frameheadpos &&
750 		(winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
751 								   FRAMEOPTION_END_CURRENT_ROW)) &&
752 		!(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
753 		winstate->aggregatedbase <= winstate->currentpos &&
754 		winstate->aggregatedupto > winstate->currentpos)
755 	{
756 		for (i = 0; i < numaggs; i++)
757 		{
758 			peraggstate = &winstate->peragg[i];
759 			wfuncno = peraggstate->wfuncno;
760 			econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
761 			econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
762 		}
763 		return;
764 	}
765 
766 	/*----------
767 	 * Initialize restart flags.
768 	 *
769 	 * We restart the aggregation:
770 	 *	 - if we're processing the first row in the partition, or
771 	 *	 - if the frame's head moved and we cannot use an inverse
772 	 *	   transition function, or
773 	 *	 - we have an EXCLUSION clause, or
774 	 *	 - if the new frame doesn't overlap the old one
775 	 *
776 	 * Note that we don't strictly need to restart in the last case, but if
777 	 * we're going to remove all rows from the aggregation anyway, a restart
778 	 * surely is faster.
779 	 *----------
780 	 */
781 	numaggs_restart = 0;
782 	for (i = 0; i < numaggs; i++)
783 	{
784 		peraggstate = &winstate->peragg[i];
785 		if (winstate->currentpos == 0 ||
786 			(winstate->aggregatedbase != winstate->frameheadpos &&
787 			 !OidIsValid(peraggstate->invtransfn_oid)) ||
788 			(winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
789 			winstate->aggregatedupto <= winstate->frameheadpos)
790 		{
791 			peraggstate->restart = true;
792 			numaggs_restart++;
793 		}
794 		else
795 			peraggstate->restart = false;
796 	}
797 
798 	/*
799 	 * If we have any possibly-moving aggregates, attempt to advance
800 	 * aggregatedbase to match the frame's head by removing input rows that
801 	 * fell off the top of the frame from the aggregations.  This can fail,
802 	 * i.e. advance_windowaggregate_base() can return false, in which case
803 	 * we'll restart that aggregate below.
804 	 */
805 	while (numaggs_restart < numaggs &&
806 		   winstate->aggregatedbase < winstate->frameheadpos)
807 	{
808 		/*
809 		 * Fetch the next tuple of those being removed. This should never fail
810 		 * as we should have been here before.
811 		 */
812 		if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
813 								 temp_slot))
814 			elog(ERROR, "could not re-fetch previously fetched frame row");
815 
816 		/* Set tuple context for evaluation of aggregate arguments */
817 		winstate->tmpcontext->ecxt_outertuple = temp_slot;
818 
819 		/*
820 		 * Perform the inverse transition for each aggregate function in the
821 		 * window, unless it has already been marked as needing a restart.
822 		 */
823 		for (i = 0; i < numaggs; i++)
824 		{
825 			bool		ok;
826 
827 			peraggstate = &winstate->peragg[i];
828 			if (peraggstate->restart)
829 				continue;
830 
831 			wfuncno = peraggstate->wfuncno;
832 			ok = advance_windowaggregate_base(winstate,
833 											  &winstate->perfunc[wfuncno],
834 											  peraggstate);
835 			if (!ok)
836 			{
837 				/* Inverse transition function has failed, must restart */
838 				peraggstate->restart = true;
839 				numaggs_restart++;
840 			}
841 		}
842 
843 		/* Reset per-input-tuple context after each tuple */
844 		ResetExprContext(winstate->tmpcontext);
845 
846 		/* And advance the aggregated-row state */
847 		winstate->aggregatedbase++;
848 		ExecClearTuple(temp_slot);
849 	}
850 
851 	/*
852 	 * If we successfully advanced the base rows of all the aggregates,
853 	 * aggregatedbase now equals frameheadpos; but if we failed for any, we
854 	 * must forcibly update aggregatedbase.
855 	 */
856 	winstate->aggregatedbase = winstate->frameheadpos;
857 
858 	/*
859 	 * If we created a mark pointer for aggregates, keep it pushed up to frame
860 	 * head, so that tuplestore can discard unnecessary rows.
861 	 */
862 	if (agg_winobj->markptr >= 0)
863 		WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
864 
865 	/*
866 	 * Now restart the aggregates that require it.
867 	 *
868 	 * We assume that aggregates using the shared context always restart if
869 	 * *any* aggregate restarts, and we may thus clean up the shared
870 	 * aggcontext if that is the case.  Private aggcontexts are reset by
871 	 * initialize_windowaggregate() if their owning aggregate restarts. If we
872 	 * aren't restarting an aggregate, we need to free any previously saved
873 	 * result for it, else we'll leak memory.
874 	 */
875 	if (numaggs_restart > 0)
876 		MemoryContextResetAndDeleteChildren(winstate->aggcontext);
877 	for (i = 0; i < numaggs; i++)
878 	{
879 		peraggstate = &winstate->peragg[i];
880 
881 		/* Aggregates using the shared ctx must restart if *any* agg does */
882 		Assert(peraggstate->aggcontext != winstate->aggcontext ||
883 			   numaggs_restart == 0 ||
884 			   peraggstate->restart);
885 
886 		if (peraggstate->restart)
887 		{
888 			wfuncno = peraggstate->wfuncno;
889 			initialize_windowaggregate(winstate,
890 									   &winstate->perfunc[wfuncno],
891 									   peraggstate);
892 		}
893 		else if (!peraggstate->resultValueIsNull)
894 		{
895 			if (!peraggstate->resulttypeByVal)
896 				pfree(DatumGetPointer(peraggstate->resultValue));
897 			peraggstate->resultValue = (Datum) 0;
898 			peraggstate->resultValueIsNull = true;
899 		}
900 	}
901 
902 	/*
903 	 * Non-restarted aggregates now contain the rows between aggregatedbase
904 	 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
905 	 * contain no rows.  If there are any restarted aggregates, we must thus
906 	 * begin aggregating anew at frameheadpos, otherwise we may simply
907 	 * continue at aggregatedupto.  We must remember the old value of
908 	 * aggregatedupto to know how long to skip advancing non-restarted
909 	 * aggregates.  If we modify aggregatedupto, we must also clear
910 	 * agg_row_slot, per the loop invariant below.
911 	 */
912 	aggregatedupto_nonrestarted = winstate->aggregatedupto;
913 	if (numaggs_restart > 0 &&
914 		winstate->aggregatedupto != winstate->frameheadpos)
915 	{
916 		winstate->aggregatedupto = winstate->frameheadpos;
917 		ExecClearTuple(agg_row_slot);
918 	}
919 
920 	/*
921 	 * Advance until we reach a row not in frame (or end of partition).
922 	 *
923 	 * Note the loop invariant: agg_row_slot is either empty or holds the row
924 	 * at position aggregatedupto.  We advance aggregatedupto after processing
925 	 * a row.
926 	 */
927 	for (;;)
928 	{
929 		int			ret;
930 
931 		/* Fetch next row if we didn't already */
932 		if (TupIsNull(agg_row_slot))
933 		{
934 			if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
935 									 agg_row_slot))
936 				break;			/* must be end of partition */
937 		}
938 
939 		/*
940 		 * Exit loop if no more rows can be in frame.  Skip aggregation if
941 		 * current row is not in frame but there might be more in the frame.
942 		 */
943 		ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
944 		if (ret < 0)
945 			break;
946 		if (ret == 0)
947 			goto next_tuple;
948 
949 		/* Set tuple context for evaluation of aggregate arguments */
950 		winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
951 
952 		/* Accumulate row into the aggregates */
953 		for (i = 0; i < numaggs; i++)
954 		{
955 			peraggstate = &winstate->peragg[i];
956 
957 			/* Non-restarted aggs skip until aggregatedupto_nonrestarted */
958 			if (!peraggstate->restart &&
959 				winstate->aggregatedupto < aggregatedupto_nonrestarted)
960 				continue;
961 
962 			wfuncno = peraggstate->wfuncno;
963 			advance_windowaggregate(winstate,
964 									&winstate->perfunc[wfuncno],
965 									peraggstate);
966 		}
967 
968 next_tuple:
969 		/* Reset per-input-tuple context after each tuple */
970 		ResetExprContext(winstate->tmpcontext);
971 
972 		/* And advance the aggregated-row state */
973 		winstate->aggregatedupto++;
974 		ExecClearTuple(agg_row_slot);
975 	}
976 
977 	/* The frame's end is not supposed to move backwards, ever */
978 	Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
979 
980 	/*
981 	 * finalize aggregates and fill result/isnull fields.
982 	 */
983 	for (i = 0; i < numaggs; i++)
984 	{
985 		Datum	   *result;
986 		bool	   *isnull;
987 
988 		peraggstate = &winstate->peragg[i];
989 		wfuncno = peraggstate->wfuncno;
990 		result = &econtext->ecxt_aggvalues[wfuncno];
991 		isnull = &econtext->ecxt_aggnulls[wfuncno];
992 		finalize_windowaggregate(winstate,
993 								 &winstate->perfunc[wfuncno],
994 								 peraggstate,
995 								 result, isnull);
996 
997 		/*
998 		 * save the result in case next row shares the same frame.
999 		 *
1000 		 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
1001 		 * advance that the next row can't possibly share the same frame. Is
1002 		 * it worth detecting that and skipping this code?
1003 		 */
1004 		if (!peraggstate->resulttypeByVal && !*isnull)
1005 		{
1006 			oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
1007 			peraggstate->resultValue =
1008 				datumCopy(*result,
1009 						  peraggstate->resulttypeByVal,
1010 						  peraggstate->resulttypeLen);
1011 			MemoryContextSwitchTo(oldContext);
1012 		}
1013 		else
1014 		{
1015 			peraggstate->resultValue = *result;
1016 		}
1017 		peraggstate->resultValueIsNull = *isnull;
1018 	}
1019 }
1020 
1021 /*
1022  * eval_windowfunction
1023  *
1024  * Arguments of window functions are not evaluated here, because a window
1025  * function can need random access to arbitrary rows in the partition.
1026  * The window function uses the special WinGetFuncArgInPartition and
1027  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1028  * it wants.
1029  */
1030 static void
eval_windowfunction(WindowAggState * winstate,WindowStatePerFunc perfuncstate,Datum * result,bool * isnull)1031 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1032 					Datum *result, bool *isnull)
1033 {
1034 	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1035 	MemoryContext oldContext;
1036 
1037 	oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1038 
1039 	/*
1040 	 * We don't pass any normal arguments to a window function, but we do pass
1041 	 * it the number of arguments, in order to permit window function
1042 	 * implementations to support varying numbers of arguments.  The real info
1043 	 * goes through the WindowObject, which is passed via fcinfo->context.
1044 	 */
1045 	InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo),
1046 							 perfuncstate->numArguments,
1047 							 perfuncstate->winCollation,
1048 							 (void *) perfuncstate->winobj, NULL);
1049 	/* Just in case, make all the regular argument slots be null */
1050 	for (int argno = 0; argno < perfuncstate->numArguments; argno++)
1051 		fcinfo->args[argno].isnull = true;
1052 	/* Window functions don't have a current aggregate context, either */
1053 	winstate->curaggcontext = NULL;
1054 
1055 	*result = FunctionCallInvoke(fcinfo);
1056 	*isnull = fcinfo->isnull;
1057 
1058 	/*
1059 	 * Make sure pass-by-ref data is allocated in the appropriate context. (We
1060 	 * need this in case the function returns a pointer into some short-lived
1061 	 * tuple, as is entirely possible.)
1062 	 */
1063 	if (!perfuncstate->resulttypeByVal && !fcinfo->isnull &&
1064 		!MemoryContextContains(CurrentMemoryContext,
1065 							   DatumGetPointer(*result)))
1066 		*result = datumCopy(*result,
1067 							perfuncstate->resulttypeByVal,
1068 							perfuncstate->resulttypeLen);
1069 
1070 	MemoryContextSwitchTo(oldContext);
1071 }
1072 
1073 /*
1074  * begin_partition
1075  * Start buffering rows of the next partition.
1076  */
1077 static void
begin_partition(WindowAggState * winstate)1078 begin_partition(WindowAggState *winstate)
1079 {
1080 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1081 	PlanState  *outerPlan = outerPlanState(winstate);
1082 	int			frameOptions = winstate->frameOptions;
1083 	int			numfuncs = winstate->numfuncs;
1084 	int			i;
1085 
1086 	winstate->partition_spooled = false;
1087 	winstate->framehead_valid = false;
1088 	winstate->frametail_valid = false;
1089 	winstate->grouptail_valid = false;
1090 	winstate->spooled_rows = 0;
1091 	winstate->currentpos = 0;
1092 	winstate->frameheadpos = 0;
1093 	winstate->frametailpos = 0;
1094 	winstate->currentgroup = 0;
1095 	winstate->frameheadgroup = 0;
1096 	winstate->frametailgroup = 0;
1097 	winstate->groupheadpos = 0;
1098 	winstate->grouptailpos = -1;	/* see update_grouptailpos */
1099 	ExecClearTuple(winstate->agg_row_slot);
1100 	if (winstate->framehead_slot)
1101 		ExecClearTuple(winstate->framehead_slot);
1102 	if (winstate->frametail_slot)
1103 		ExecClearTuple(winstate->frametail_slot);
1104 
1105 	/*
1106 	 * If this is the very first partition, we need to fetch the first input
1107 	 * row to store in first_part_slot.
1108 	 */
1109 	if (TupIsNull(winstate->first_part_slot))
1110 	{
1111 		TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1112 
1113 		if (!TupIsNull(outerslot))
1114 			ExecCopySlot(winstate->first_part_slot, outerslot);
1115 		else
1116 		{
1117 			/* outer plan is empty, so we have nothing to do */
1118 			winstate->partition_spooled = true;
1119 			winstate->more_partitions = false;
1120 			return;
1121 		}
1122 	}
1123 
1124 	/* Create new tuplestore for this partition */
1125 	winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1126 
1127 	/*
1128 	 * Set up read pointers for the tuplestore.  The current pointer doesn't
1129 	 * need BACKWARD capability, but the per-window-function read pointers do,
1130 	 * and the aggregate pointer does if we might need to restart aggregation.
1131 	 */
1132 	winstate->current_ptr = 0;	/* read pointer 0 is pre-allocated */
1133 
1134 	/* reset default REWIND capability bit for current ptr */
1135 	tuplestore_set_eflags(winstate->buffer, 0);
1136 
1137 	/* create read pointers for aggregates, if needed */
1138 	if (winstate->numaggs > 0)
1139 	{
1140 		WindowObject agg_winobj = winstate->agg_winobj;
1141 		int			readptr_flags = 0;
1142 
1143 		/*
1144 		 * If the frame head is potentially movable, or we have an EXCLUSION
1145 		 * clause, we might need to restart aggregation ...
1146 		 */
1147 		if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) ||
1148 			(frameOptions & FRAMEOPTION_EXCLUSION))
1149 		{
1150 			/* ... so create a mark pointer to track the frame head */
1151 			agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1152 			/* and the read pointer will need BACKWARD capability */
1153 			readptr_flags |= EXEC_FLAG_BACKWARD;
1154 		}
1155 
1156 		agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1157 															readptr_flags);
1158 		agg_winobj->markpos = -1;
1159 		agg_winobj->seekpos = -1;
1160 
1161 		/* Also reset the row counters for aggregates */
1162 		winstate->aggregatedbase = 0;
1163 		winstate->aggregatedupto = 0;
1164 	}
1165 
1166 	/* create mark and read pointers for each real window function */
1167 	for (i = 0; i < numfuncs; i++)
1168 	{
1169 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1170 
1171 		if (!perfuncstate->plain_agg)
1172 		{
1173 			WindowObject winobj = perfuncstate->winobj;
1174 
1175 			winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1176 															0);
1177 			winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1178 															EXEC_FLAG_BACKWARD);
1179 			winobj->markpos = -1;
1180 			winobj->seekpos = -1;
1181 		}
1182 	}
1183 
1184 	/*
1185 	 * If we are in RANGE or GROUPS mode, then determining frame boundaries
1186 	 * requires physical access to the frame endpoint rows, except in certain
1187 	 * degenerate cases.  We create read pointers to point to those rows, to
1188 	 * simplify access and ensure that the tuplestore doesn't discard the
1189 	 * endpoint rows prematurely.  (Must create pointers in exactly the same
1190 	 * cases that update_frameheadpos and update_frametailpos need them.)
1191 	 */
1192 	winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */
1193 
1194 	if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1195 	{
1196 		if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
1197 			 node->ordNumCols != 0) ||
1198 			(frameOptions & FRAMEOPTION_START_OFFSET))
1199 			winstate->framehead_ptr =
1200 				tuplestore_alloc_read_pointer(winstate->buffer, 0);
1201 		if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
1202 			 node->ordNumCols != 0) ||
1203 			(frameOptions & FRAMEOPTION_END_OFFSET))
1204 			winstate->frametail_ptr =
1205 				tuplestore_alloc_read_pointer(winstate->buffer, 0);
1206 	}
1207 
1208 	/*
1209 	 * If we have an exclusion clause that requires knowing the boundaries of
1210 	 * the current row's peer group, we create a read pointer to track the
1211 	 * tail position of the peer group (i.e., first row of the next peer
1212 	 * group).  The head position does not require its own pointer because we
1213 	 * maintain that as a side effect of advancing the current row.
1214 	 */
1215 	winstate->grouptail_ptr = -1;
1216 
1217 	if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP |
1218 						 FRAMEOPTION_EXCLUDE_TIES)) &&
1219 		node->ordNumCols != 0)
1220 	{
1221 		winstate->grouptail_ptr =
1222 			tuplestore_alloc_read_pointer(winstate->buffer, 0);
1223 	}
1224 
1225 	/*
1226 	 * Store the first tuple into the tuplestore (it's always available now;
1227 	 * we either read it above, or saved it at the end of previous partition)
1228 	 */
1229 	tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1230 	winstate->spooled_rows++;
1231 }
1232 
1233 /*
1234  * Read tuples from the outer node, up to and including position 'pos', and
1235  * store them into the tuplestore. If pos is -1, reads the whole partition.
1236  */
1237 static void
spool_tuples(WindowAggState * winstate,int64 pos)1238 spool_tuples(WindowAggState *winstate, int64 pos)
1239 {
1240 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1241 	PlanState  *outerPlan;
1242 	TupleTableSlot *outerslot;
1243 	MemoryContext oldcontext;
1244 
1245 	if (!winstate->buffer)
1246 		return;					/* just a safety check */
1247 	if (winstate->partition_spooled)
1248 		return;					/* whole partition done already */
1249 
1250 	/*
1251 	 * If the tuplestore has spilled to disk, alternate reading and writing
1252 	 * becomes quite expensive due to frequent buffer flushes.  It's cheaper
1253 	 * to force the entire partition to get spooled in one go.
1254 	 *
1255 	 * XXX this is a horrid kluge --- it'd be better to fix the performance
1256 	 * problem inside tuplestore.  FIXME
1257 	 */
1258 	if (!tuplestore_in_memory(winstate->buffer))
1259 		pos = -1;
1260 
1261 	outerPlan = outerPlanState(winstate);
1262 
1263 	/* Must be in query context to call outerplan */
1264 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1265 
1266 	while (winstate->spooled_rows <= pos || pos == -1)
1267 	{
1268 		outerslot = ExecProcNode(outerPlan);
1269 		if (TupIsNull(outerslot))
1270 		{
1271 			/* reached the end of the last partition */
1272 			winstate->partition_spooled = true;
1273 			winstate->more_partitions = false;
1274 			break;
1275 		}
1276 
1277 		if (node->partNumCols > 0)
1278 		{
1279 			ExprContext *econtext = winstate->tmpcontext;
1280 
1281 			econtext->ecxt_innertuple = winstate->first_part_slot;
1282 			econtext->ecxt_outertuple = outerslot;
1283 
1284 			/* Check if this tuple still belongs to the current partition */
1285 			if (!ExecQualAndReset(winstate->partEqfunction, econtext))
1286 			{
1287 				/*
1288 				 * end of partition; copy the tuple for the next cycle.
1289 				 */
1290 				ExecCopySlot(winstate->first_part_slot, outerslot);
1291 				winstate->partition_spooled = true;
1292 				winstate->more_partitions = true;
1293 				break;
1294 			}
1295 		}
1296 
1297 		/* Still in partition, so save it into the tuplestore */
1298 		tuplestore_puttupleslot(winstate->buffer, outerslot);
1299 		winstate->spooled_rows++;
1300 	}
1301 
1302 	MemoryContextSwitchTo(oldcontext);
1303 }
1304 
1305 /*
1306  * release_partition
1307  * clear information kept within a partition, including
1308  * tuplestore and aggregate results.
1309  */
1310 static void
release_partition(WindowAggState * winstate)1311 release_partition(WindowAggState *winstate)
1312 {
1313 	int			i;
1314 
1315 	for (i = 0; i < winstate->numfuncs; i++)
1316 	{
1317 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1318 
1319 		/* Release any partition-local state of this window function */
1320 		if (perfuncstate->winobj)
1321 			perfuncstate->winobj->localmem = NULL;
1322 	}
1323 
1324 	/*
1325 	 * Release all partition-local memory (in particular, any partition-local
1326 	 * state that we might have trashed our pointers to in the above loop, and
1327 	 * any aggregate temp data).  We don't rely on retail pfree because some
1328 	 * aggregates might have allocated data we don't have direct pointers to.
1329 	 */
1330 	MemoryContextResetAndDeleteChildren(winstate->partcontext);
1331 	MemoryContextResetAndDeleteChildren(winstate->aggcontext);
1332 	for (i = 0; i < winstate->numaggs; i++)
1333 	{
1334 		if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1335 			MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
1336 	}
1337 
1338 	if (winstate->buffer)
1339 		tuplestore_end(winstate->buffer);
1340 	winstate->buffer = NULL;
1341 	winstate->partition_spooled = false;
1342 }
1343 
1344 /*
1345  * row_is_in_frame
1346  * Determine whether a row is in the current row's window frame according
1347  * to our window framing rule
1348  *
1349  * The caller must have already determined that the row is in the partition
1350  * and fetched it into a slot.  This function just encapsulates the framing
1351  * rules.
1352  *
1353  * Returns:
1354  * -1, if the row is out of frame and no succeeding rows can be in frame
1355  * 0, if the row is out of frame but succeeding rows might be in frame
1356  * 1, if the row is in frame
1357  *
1358  * May clobber winstate->temp_slot_2.
1359  */
1360 static int
row_is_in_frame(WindowAggState * winstate,int64 pos,TupleTableSlot * slot)1361 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1362 {
1363 	int			frameOptions = winstate->frameOptions;
1364 
1365 	Assert(pos >= 0);			/* else caller error */
1366 
1367 	/*
1368 	 * First, check frame starting conditions.  We might as well delegate this
1369 	 * to update_frameheadpos always; it doesn't add any notable cost.
1370 	 */
1371 	update_frameheadpos(winstate);
1372 	if (pos < winstate->frameheadpos)
1373 		return 0;
1374 
1375 	/*
1376 	 * Okay so far, now check frame ending conditions.  Here, we avoid calling
1377 	 * update_frametailpos in simple cases, so as not to spool tuples further
1378 	 * ahead than necessary.
1379 	 */
1380 	if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1381 	{
1382 		if (frameOptions & FRAMEOPTION_ROWS)
1383 		{
1384 			/* rows after current row are out of frame */
1385 			if (pos > winstate->currentpos)
1386 				return -1;
1387 		}
1388 		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1389 		{
1390 			/* following row that is not peer is out of frame */
1391 			if (pos > winstate->currentpos &&
1392 				!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1393 				return -1;
1394 		}
1395 		else
1396 			Assert(false);
1397 	}
1398 	else if (frameOptions & FRAMEOPTION_END_OFFSET)
1399 	{
1400 		if (frameOptions & FRAMEOPTION_ROWS)
1401 		{
1402 			int64		offset = DatumGetInt64(winstate->endOffsetValue);
1403 
1404 			/* rows after current row + offset are out of frame */
1405 			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1406 				offset = -offset;
1407 
1408 			if (pos > winstate->currentpos + offset)
1409 				return -1;
1410 		}
1411 		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1412 		{
1413 			/* hard cases, so delegate to update_frametailpos */
1414 			update_frametailpos(winstate);
1415 			if (pos >= winstate->frametailpos)
1416 				return -1;
1417 		}
1418 		else
1419 			Assert(false);
1420 	}
1421 
1422 	/* Check exclusion clause */
1423 	if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
1424 	{
1425 		if (pos == winstate->currentpos)
1426 			return 0;
1427 	}
1428 	else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
1429 			 ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
1430 			  pos != winstate->currentpos))
1431 	{
1432 		WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1433 
1434 		/* If no ORDER BY, all rows are peers with each other */
1435 		if (node->ordNumCols == 0)
1436 			return 0;
1437 		/* Otherwise, check the group boundaries */
1438 		if (pos >= winstate->groupheadpos)
1439 		{
1440 			update_grouptailpos(winstate);
1441 			if (pos < winstate->grouptailpos)
1442 				return 0;
1443 		}
1444 	}
1445 
1446 	/* If we get here, it's in frame */
1447 	return 1;
1448 }
1449 
1450 /*
1451  * update_frameheadpos
1452  * make frameheadpos valid for the current row
1453  *
1454  * Note that frameheadpos is computed without regard for any window exclusion
1455  * clause; the current row and/or its peers are considered part of the frame
1456  * for this purpose even if they must be excluded later.
1457  *
1458  * May clobber winstate->temp_slot_2.
1459  */
1460 static void
update_frameheadpos(WindowAggState * winstate)1461 update_frameheadpos(WindowAggState *winstate)
1462 {
1463 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1464 	int			frameOptions = winstate->frameOptions;
1465 	MemoryContext oldcontext;
1466 
1467 	if (winstate->framehead_valid)
1468 		return;					/* already known for current row */
1469 
1470 	/* We may be called in a short-lived context */
1471 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1472 
1473 	if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1474 	{
1475 		/* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1476 		winstate->frameheadpos = 0;
1477 		winstate->framehead_valid = true;
1478 	}
1479 	else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1480 	{
1481 		if (frameOptions & FRAMEOPTION_ROWS)
1482 		{
1483 			/* In ROWS mode, frame head is the same as current */
1484 			winstate->frameheadpos = winstate->currentpos;
1485 			winstate->framehead_valid = true;
1486 		}
1487 		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1488 		{
1489 			/* If no ORDER BY, all rows are peers with each other */
1490 			if (node->ordNumCols == 0)
1491 			{
1492 				winstate->frameheadpos = 0;
1493 				winstate->framehead_valid = true;
1494 				MemoryContextSwitchTo(oldcontext);
1495 				return;
1496 			}
1497 
1498 			/*
1499 			 * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the
1500 			 * first row that is a peer of current row.  We keep a copy of the
1501 			 * last-known frame head row in framehead_slot, and advance as
1502 			 * necessary.  Note that if we reach end of partition, we will
1503 			 * leave frameheadpos = end+1 and framehead_slot empty.
1504 			 */
1505 			tuplestore_select_read_pointer(winstate->buffer,
1506 										   winstate->framehead_ptr);
1507 			if (winstate->frameheadpos == 0 &&
1508 				TupIsNull(winstate->framehead_slot))
1509 			{
1510 				/* fetch first row into framehead_slot, if we didn't already */
1511 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1512 											 winstate->framehead_slot))
1513 					elog(ERROR, "unexpected end of tuplestore");
1514 			}
1515 
1516 			while (!TupIsNull(winstate->framehead_slot))
1517 			{
1518 				if (are_peers(winstate, winstate->framehead_slot,
1519 							  winstate->ss.ss_ScanTupleSlot))
1520 					break;		/* this row is the correct frame head */
1521 				/* Note we advance frameheadpos even if the fetch fails */
1522 				winstate->frameheadpos++;
1523 				spool_tuples(winstate, winstate->frameheadpos);
1524 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1525 											 winstate->framehead_slot))
1526 					break;		/* end of partition */
1527 			}
1528 			winstate->framehead_valid = true;
1529 		}
1530 		else
1531 			Assert(false);
1532 	}
1533 	else if (frameOptions & FRAMEOPTION_START_OFFSET)
1534 	{
1535 		if (frameOptions & FRAMEOPTION_ROWS)
1536 		{
1537 			/* In ROWS mode, bound is physically n before/after current */
1538 			int64		offset = DatumGetInt64(winstate->startOffsetValue);
1539 
1540 			if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1541 				offset = -offset;
1542 
1543 			winstate->frameheadpos = winstate->currentpos + offset;
1544 			/* frame head can't go before first row */
1545 			if (winstate->frameheadpos < 0)
1546 				winstate->frameheadpos = 0;
1547 			else if (winstate->frameheadpos > winstate->currentpos + 1)
1548 			{
1549 				/* make sure frameheadpos is not past end of partition */
1550 				spool_tuples(winstate, winstate->frameheadpos - 1);
1551 				if (winstate->frameheadpos > winstate->spooled_rows)
1552 					winstate->frameheadpos = winstate->spooled_rows;
1553 			}
1554 			winstate->framehead_valid = true;
1555 		}
1556 		else if (frameOptions & FRAMEOPTION_RANGE)
1557 		{
1558 			/*
1559 			 * In RANGE START_OFFSET mode, frame head is the first row that
1560 			 * satisfies the in_range constraint relative to the current row.
1561 			 * We keep a copy of the last-known frame head row in
1562 			 * framehead_slot, and advance as necessary.  Note that if we
1563 			 * reach end of partition, we will leave frameheadpos = end+1 and
1564 			 * framehead_slot empty.
1565 			 */
1566 			int			sortCol = node->ordColIdx[0];
1567 			bool		sub,
1568 						less;
1569 
1570 			/* We must have an ordering column */
1571 			Assert(node->ordNumCols == 1);
1572 
1573 			/* Precompute flags for in_range checks */
1574 			if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1575 				sub = true;		/* subtract startOffset from current row */
1576 			else
1577 				sub = false;	/* add it */
1578 			less = false;		/* normally, we want frame head >= sum */
1579 			/* If sort order is descending, flip both flags */
1580 			if (!winstate->inRangeAsc)
1581 			{
1582 				sub = !sub;
1583 				less = true;
1584 			}
1585 
1586 			tuplestore_select_read_pointer(winstate->buffer,
1587 										   winstate->framehead_ptr);
1588 			if (winstate->frameheadpos == 0 &&
1589 				TupIsNull(winstate->framehead_slot))
1590 			{
1591 				/* fetch first row into framehead_slot, if we didn't already */
1592 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1593 											 winstate->framehead_slot))
1594 					elog(ERROR, "unexpected end of tuplestore");
1595 			}
1596 
1597 			while (!TupIsNull(winstate->framehead_slot))
1598 			{
1599 				Datum		headval,
1600 							currval;
1601 				bool		headisnull,
1602 							currisnull;
1603 
1604 				headval = slot_getattr(winstate->framehead_slot, sortCol,
1605 									   &headisnull);
1606 				currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1607 									   &currisnull);
1608 				if (headisnull || currisnull)
1609 				{
1610 					/* order of the rows depends only on nulls_first */
1611 					if (winstate->inRangeNullsFirst)
1612 					{
1613 						/* advance head if head is null and curr is not */
1614 						if (!headisnull || currisnull)
1615 							break;
1616 					}
1617 					else
1618 					{
1619 						/* advance head if head is not null and curr is null */
1620 						if (headisnull || !currisnull)
1621 							break;
1622 					}
1623 				}
1624 				else
1625 				{
1626 					if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
1627 													   winstate->inRangeColl,
1628 													   headval,
1629 													   currval,
1630 													   winstate->startOffsetValue,
1631 													   BoolGetDatum(sub),
1632 													   BoolGetDatum(less))))
1633 						break;	/* this row is the correct frame head */
1634 				}
1635 				/* Note we advance frameheadpos even if the fetch fails */
1636 				winstate->frameheadpos++;
1637 				spool_tuples(winstate, winstate->frameheadpos);
1638 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1639 											 winstate->framehead_slot))
1640 					break;		/* end of partition */
1641 			}
1642 			winstate->framehead_valid = true;
1643 		}
1644 		else if (frameOptions & FRAMEOPTION_GROUPS)
1645 		{
1646 			/*
1647 			 * In GROUPS START_OFFSET mode, frame head is the first row of the
1648 			 * first peer group whose number satisfies the offset constraint.
1649 			 * We keep a copy of the last-known frame head row in
1650 			 * framehead_slot, and advance as necessary.  Note that if we
1651 			 * reach end of partition, we will leave frameheadpos = end+1 and
1652 			 * framehead_slot empty.
1653 			 */
1654 			int64		offset = DatumGetInt64(winstate->startOffsetValue);
1655 			int64		minheadgroup;
1656 
1657 			if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1658 				minheadgroup = winstate->currentgroup - offset;
1659 			else
1660 				minheadgroup = winstate->currentgroup + offset;
1661 
1662 			tuplestore_select_read_pointer(winstate->buffer,
1663 										   winstate->framehead_ptr);
1664 			if (winstate->frameheadpos == 0 &&
1665 				TupIsNull(winstate->framehead_slot))
1666 			{
1667 				/* fetch first row into framehead_slot, if we didn't already */
1668 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1669 											 winstate->framehead_slot))
1670 					elog(ERROR, "unexpected end of tuplestore");
1671 			}
1672 
1673 			while (!TupIsNull(winstate->framehead_slot))
1674 			{
1675 				if (winstate->frameheadgroup >= minheadgroup)
1676 					break;		/* this row is the correct frame head */
1677 				ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
1678 				/* Note we advance frameheadpos even if the fetch fails */
1679 				winstate->frameheadpos++;
1680 				spool_tuples(winstate, winstate->frameheadpos);
1681 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1682 											 winstate->framehead_slot))
1683 					break;		/* end of partition */
1684 				if (!are_peers(winstate, winstate->temp_slot_2,
1685 							   winstate->framehead_slot))
1686 					winstate->frameheadgroup++;
1687 			}
1688 			ExecClearTuple(winstate->temp_slot_2);
1689 			winstate->framehead_valid = true;
1690 		}
1691 		else
1692 			Assert(false);
1693 	}
1694 	else
1695 		Assert(false);
1696 
1697 	MemoryContextSwitchTo(oldcontext);
1698 }
1699 
1700 /*
1701  * update_frametailpos
1702  * make frametailpos valid for the current row
1703  *
1704  * Note that frametailpos is computed without regard for any window exclusion
1705  * clause; the current row and/or its peers are considered part of the frame
1706  * for this purpose even if they must be excluded later.
1707  *
1708  * May clobber winstate->temp_slot_2.
1709  */
1710 static void
update_frametailpos(WindowAggState * winstate)1711 update_frametailpos(WindowAggState *winstate)
1712 {
1713 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1714 	int			frameOptions = winstate->frameOptions;
1715 	MemoryContext oldcontext;
1716 
1717 	if (winstate->frametail_valid)
1718 		return;					/* already known for current row */
1719 
1720 	/* We may be called in a short-lived context */
1721 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1722 
1723 	if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1724 	{
1725 		/* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1726 		spool_tuples(winstate, -1);
1727 		winstate->frametailpos = winstate->spooled_rows;
1728 		winstate->frametail_valid = true;
1729 	}
1730 	else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1731 	{
1732 		if (frameOptions & FRAMEOPTION_ROWS)
1733 		{
1734 			/* In ROWS mode, exactly the rows up to current are in frame */
1735 			winstate->frametailpos = winstate->currentpos + 1;
1736 			winstate->frametail_valid = true;
1737 		}
1738 		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1739 		{
1740 			/* If no ORDER BY, all rows are peers with each other */
1741 			if (node->ordNumCols == 0)
1742 			{
1743 				spool_tuples(winstate, -1);
1744 				winstate->frametailpos = winstate->spooled_rows;
1745 				winstate->frametail_valid = true;
1746 				MemoryContextSwitchTo(oldcontext);
1747 				return;
1748 			}
1749 
1750 			/*
1751 			 * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last
1752 			 * row that is a peer of current row, frame tail is the row after
1753 			 * that (if any).  We keep a copy of the last-known frame tail row
1754 			 * in frametail_slot, and advance as necessary.  Note that if we
1755 			 * reach end of partition, we will leave frametailpos = end+1 and
1756 			 * frametail_slot empty.
1757 			 */
1758 			tuplestore_select_read_pointer(winstate->buffer,
1759 										   winstate->frametail_ptr);
1760 			if (winstate->frametailpos == 0 &&
1761 				TupIsNull(winstate->frametail_slot))
1762 			{
1763 				/* fetch first row into frametail_slot, if we didn't already */
1764 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1765 											 winstate->frametail_slot))
1766 					elog(ERROR, "unexpected end of tuplestore");
1767 			}
1768 
1769 			while (!TupIsNull(winstate->frametail_slot))
1770 			{
1771 				if (winstate->frametailpos > winstate->currentpos &&
1772 					!are_peers(winstate, winstate->frametail_slot,
1773 							   winstate->ss.ss_ScanTupleSlot))
1774 					break;		/* this row is the frame tail */
1775 				/* Note we advance frametailpos even if the fetch fails */
1776 				winstate->frametailpos++;
1777 				spool_tuples(winstate, winstate->frametailpos);
1778 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1779 											 winstate->frametail_slot))
1780 					break;		/* end of partition */
1781 			}
1782 			winstate->frametail_valid = true;
1783 		}
1784 		else
1785 			Assert(false);
1786 	}
1787 	else if (frameOptions & FRAMEOPTION_END_OFFSET)
1788 	{
1789 		if (frameOptions & FRAMEOPTION_ROWS)
1790 		{
1791 			/* In ROWS mode, bound is physically n before/after current */
1792 			int64		offset = DatumGetInt64(winstate->endOffsetValue);
1793 
1794 			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1795 				offset = -offset;
1796 
1797 			winstate->frametailpos = winstate->currentpos + offset + 1;
1798 			/* smallest allowable value of frametailpos is 0 */
1799 			if (winstate->frametailpos < 0)
1800 				winstate->frametailpos = 0;
1801 			else if (winstate->frametailpos > winstate->currentpos + 1)
1802 			{
1803 				/* make sure frametailpos is not past end of partition */
1804 				spool_tuples(winstate, winstate->frametailpos - 1);
1805 				if (winstate->frametailpos > winstate->spooled_rows)
1806 					winstate->frametailpos = winstate->spooled_rows;
1807 			}
1808 			winstate->frametail_valid = true;
1809 		}
1810 		else if (frameOptions & FRAMEOPTION_RANGE)
1811 		{
1812 			/*
1813 			 * In RANGE END_OFFSET mode, frame end is the last row that
1814 			 * satisfies the in_range constraint relative to the current row,
1815 			 * frame tail is the row after that (if any).  We keep a copy of
1816 			 * the last-known frame tail row in frametail_slot, and advance as
1817 			 * necessary.  Note that if we reach end of partition, we will
1818 			 * leave frametailpos = end+1 and frametail_slot empty.
1819 			 */
1820 			int			sortCol = node->ordColIdx[0];
1821 			bool		sub,
1822 						less;
1823 
1824 			/* We must have an ordering column */
1825 			Assert(node->ordNumCols == 1);
1826 
1827 			/* Precompute flags for in_range checks */
1828 			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1829 				sub = true;		/* subtract endOffset from current row */
1830 			else
1831 				sub = false;	/* add it */
1832 			less = true;		/* normally, we want frame tail <= sum */
1833 			/* If sort order is descending, flip both flags */
1834 			if (!winstate->inRangeAsc)
1835 			{
1836 				sub = !sub;
1837 				less = false;
1838 			}
1839 
1840 			tuplestore_select_read_pointer(winstate->buffer,
1841 										   winstate->frametail_ptr);
1842 			if (winstate->frametailpos == 0 &&
1843 				TupIsNull(winstate->frametail_slot))
1844 			{
1845 				/* fetch first row into frametail_slot, if we didn't already */
1846 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1847 											 winstate->frametail_slot))
1848 					elog(ERROR, "unexpected end of tuplestore");
1849 			}
1850 
1851 			while (!TupIsNull(winstate->frametail_slot))
1852 			{
1853 				Datum		tailval,
1854 							currval;
1855 				bool		tailisnull,
1856 							currisnull;
1857 
1858 				tailval = slot_getattr(winstate->frametail_slot, sortCol,
1859 									   &tailisnull);
1860 				currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1861 									   &currisnull);
1862 				if (tailisnull || currisnull)
1863 				{
1864 					/* order of the rows depends only on nulls_first */
1865 					if (winstate->inRangeNullsFirst)
1866 					{
1867 						/* advance tail if tail is null or curr is not */
1868 						if (!tailisnull)
1869 							break;
1870 					}
1871 					else
1872 					{
1873 						/* advance tail if tail is not null or curr is null */
1874 						if (!currisnull)
1875 							break;
1876 					}
1877 				}
1878 				else
1879 				{
1880 					if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc,
1881 														winstate->inRangeColl,
1882 														tailval,
1883 														currval,
1884 														winstate->endOffsetValue,
1885 														BoolGetDatum(sub),
1886 														BoolGetDatum(less))))
1887 						break;	/* this row is the correct frame tail */
1888 				}
1889 				/* Note we advance frametailpos even if the fetch fails */
1890 				winstate->frametailpos++;
1891 				spool_tuples(winstate, winstate->frametailpos);
1892 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1893 											 winstate->frametail_slot))
1894 					break;		/* end of partition */
1895 			}
1896 			winstate->frametail_valid = true;
1897 		}
1898 		else if (frameOptions & FRAMEOPTION_GROUPS)
1899 		{
1900 			/*
1901 			 * In GROUPS END_OFFSET mode, frame end is the last row of the
1902 			 * last peer group whose number satisfies the offset constraint,
1903 			 * and frame tail is the row after that (if any).  We keep a copy
1904 			 * of the last-known frame tail row in frametail_slot, and advance
1905 			 * as necessary.  Note that if we reach end of partition, we will
1906 			 * leave frametailpos = end+1 and frametail_slot empty.
1907 			 */
1908 			int64		offset = DatumGetInt64(winstate->endOffsetValue);
1909 			int64		maxtailgroup;
1910 
1911 			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1912 				maxtailgroup = winstate->currentgroup - offset;
1913 			else
1914 				maxtailgroup = winstate->currentgroup + offset;
1915 
1916 			tuplestore_select_read_pointer(winstate->buffer,
1917 										   winstate->frametail_ptr);
1918 			if (winstate->frametailpos == 0 &&
1919 				TupIsNull(winstate->frametail_slot))
1920 			{
1921 				/* fetch first row into frametail_slot, if we didn't already */
1922 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1923 											 winstate->frametail_slot))
1924 					elog(ERROR, "unexpected end of tuplestore");
1925 			}
1926 
1927 			while (!TupIsNull(winstate->frametail_slot))
1928 			{
1929 				if (winstate->frametailgroup > maxtailgroup)
1930 					break;		/* this row is the correct frame tail */
1931 				ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot);
1932 				/* Note we advance frametailpos even if the fetch fails */
1933 				winstate->frametailpos++;
1934 				spool_tuples(winstate, winstate->frametailpos);
1935 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1936 											 winstate->frametail_slot))
1937 					break;		/* end of partition */
1938 				if (!are_peers(winstate, winstate->temp_slot_2,
1939 							   winstate->frametail_slot))
1940 					winstate->frametailgroup++;
1941 			}
1942 			ExecClearTuple(winstate->temp_slot_2);
1943 			winstate->frametail_valid = true;
1944 		}
1945 		else
1946 			Assert(false);
1947 	}
1948 	else
1949 		Assert(false);
1950 
1951 	MemoryContextSwitchTo(oldcontext);
1952 }
1953 
1954 /*
1955  * update_grouptailpos
1956  * make grouptailpos valid for the current row
1957  *
1958  * May clobber winstate->temp_slot_2.
1959  */
1960 static void
update_grouptailpos(WindowAggState * winstate)1961 update_grouptailpos(WindowAggState *winstate)
1962 {
1963 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1964 	MemoryContext oldcontext;
1965 
1966 	if (winstate->grouptail_valid)
1967 		return;					/* already known for current row */
1968 
1969 	/* We may be called in a short-lived context */
1970 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1971 
1972 	/* If no ORDER BY, all rows are peers with each other */
1973 	if (node->ordNumCols == 0)
1974 	{
1975 		spool_tuples(winstate, -1);
1976 		winstate->grouptailpos = winstate->spooled_rows;
1977 		winstate->grouptail_valid = true;
1978 		MemoryContextSwitchTo(oldcontext);
1979 		return;
1980 	}
1981 
1982 	/*
1983 	 * Because grouptail_valid is reset only when current row advances into a
1984 	 * new peer group, we always reach here knowing that grouptailpos needs to
1985 	 * be advanced by at least one row.  Hence, unlike the otherwise similar
1986 	 * case for frame tail tracking, we do not need persistent storage of the
1987 	 * group tail row.
1988 	 */
1989 	Assert(winstate->grouptailpos <= winstate->currentpos);
1990 	tuplestore_select_read_pointer(winstate->buffer,
1991 								   winstate->grouptail_ptr);
1992 	for (;;)
1993 	{
1994 		/* Note we advance grouptailpos even if the fetch fails */
1995 		winstate->grouptailpos++;
1996 		spool_tuples(winstate, winstate->grouptailpos);
1997 		if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1998 									 winstate->temp_slot_2))
1999 			break;				/* end of partition */
2000 		if (winstate->grouptailpos > winstate->currentpos &&
2001 			!are_peers(winstate, winstate->temp_slot_2,
2002 					   winstate->ss.ss_ScanTupleSlot))
2003 			break;				/* this row is the group tail */
2004 	}
2005 	ExecClearTuple(winstate->temp_slot_2);
2006 	winstate->grouptail_valid = true;
2007 
2008 	MemoryContextSwitchTo(oldcontext);
2009 }
2010 
2011 
2012 /* -----------------
2013  * ExecWindowAgg
2014  *
2015  *	ExecWindowAgg receives tuples from its outer subplan and
2016  *	stores them into a tuplestore, then processes window functions.
2017  *	This node doesn't reduce nor qualify any row so the number of
2018  *	returned rows is exactly the same as its outer subplan's result.
2019  * -----------------
2020  */
2021 static TupleTableSlot *
ExecWindowAgg(PlanState * pstate)2022 ExecWindowAgg(PlanState *pstate)
2023 {
2024 	WindowAggState *winstate = castNode(WindowAggState, pstate);
2025 	ExprContext *econtext;
2026 	int			i;
2027 	int			numfuncs;
2028 
2029 	CHECK_FOR_INTERRUPTS();
2030 
2031 	if (winstate->all_done)
2032 		return NULL;
2033 
2034 	/*
2035 	 * Compute frame offset values, if any, during first call (or after a
2036 	 * rescan).  These are assumed to hold constant throughout the scan; if
2037 	 * user gives us a volatile expression, we'll only use its initial value.
2038 	 */
2039 	if (winstate->all_first)
2040 	{
2041 		int			frameOptions = winstate->frameOptions;
2042 		ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
2043 		Datum		value;
2044 		bool		isnull;
2045 		int16		len;
2046 		bool		byval;
2047 
2048 		if (frameOptions & FRAMEOPTION_START_OFFSET)
2049 		{
2050 			Assert(winstate->startOffset != NULL);
2051 			value = ExecEvalExprSwitchContext(winstate->startOffset,
2052 											  econtext,
2053 											  &isnull);
2054 			if (isnull)
2055 				ereport(ERROR,
2056 						(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2057 						 errmsg("frame starting offset must not be null")));
2058 			/* copy value into query-lifespan context */
2059 			get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
2060 							&len, &byval);
2061 			winstate->startOffsetValue = datumCopy(value, byval, len);
2062 			if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2063 			{
2064 				/* value is known to be int8 */
2065 				int64		offset = DatumGetInt64(value);
2066 
2067 				if (offset < 0)
2068 					ereport(ERROR,
2069 							(errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2070 							 errmsg("frame starting offset must not be negative")));
2071 			}
2072 		}
2073 		if (frameOptions & FRAMEOPTION_END_OFFSET)
2074 		{
2075 			Assert(winstate->endOffset != NULL);
2076 			value = ExecEvalExprSwitchContext(winstate->endOffset,
2077 											  econtext,
2078 											  &isnull);
2079 			if (isnull)
2080 				ereport(ERROR,
2081 						(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2082 						 errmsg("frame ending offset must not be null")));
2083 			/* copy value into query-lifespan context */
2084 			get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
2085 							&len, &byval);
2086 			winstate->endOffsetValue = datumCopy(value, byval, len);
2087 			if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2088 			{
2089 				/* value is known to be int8 */
2090 				int64		offset = DatumGetInt64(value);
2091 
2092 				if (offset < 0)
2093 					ereport(ERROR,
2094 							(errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2095 							 errmsg("frame ending offset must not be negative")));
2096 			}
2097 		}
2098 		winstate->all_first = false;
2099 	}
2100 
2101 	if (winstate->buffer == NULL)
2102 	{
2103 		/* Initialize for first partition and set current row = 0 */
2104 		begin_partition(winstate);
2105 		/* If there are no input rows, we'll detect that and exit below */
2106 	}
2107 	else
2108 	{
2109 		/* Advance current row within partition */
2110 		winstate->currentpos++;
2111 		/* This might mean that the frame moves, too */
2112 		winstate->framehead_valid = false;
2113 		winstate->frametail_valid = false;
2114 		/* we don't need to invalidate grouptail here; see below */
2115 	}
2116 
2117 	/*
2118 	 * Spool all tuples up to and including the current row, if we haven't
2119 	 * already
2120 	 */
2121 	spool_tuples(winstate, winstate->currentpos);
2122 
2123 	/* Move to the next partition if we reached the end of this partition */
2124 	if (winstate->partition_spooled &&
2125 		winstate->currentpos >= winstate->spooled_rows)
2126 	{
2127 		release_partition(winstate);
2128 
2129 		if (winstate->more_partitions)
2130 		{
2131 			begin_partition(winstate);
2132 			Assert(winstate->spooled_rows > 0);
2133 		}
2134 		else
2135 		{
2136 			winstate->all_done = true;
2137 			return NULL;
2138 		}
2139 	}
2140 
2141 	/* final output execution is in ps_ExprContext */
2142 	econtext = winstate->ss.ps.ps_ExprContext;
2143 
2144 	/* Clear the per-output-tuple context for current row */
2145 	ResetExprContext(econtext);
2146 
2147 	/*
2148 	 * Read the current row from the tuplestore, and save in ScanTupleSlot.
2149 	 * (We can't rely on the outerplan's output slot because we may have to
2150 	 * read beyond the current row.  Also, we have to actually copy the row
2151 	 * out of the tuplestore, since window function evaluation might cause the
2152 	 * tuplestore to dump its state to disk.)
2153 	 *
2154 	 * In GROUPS mode, or when tracking a group-oriented exclusion clause, we
2155 	 * must also detect entering a new peer group and update associated state
2156 	 * when that happens.  We use temp_slot_2 to temporarily hold the previous
2157 	 * row for this purpose.
2158 	 *
2159 	 * Current row must be in the tuplestore, since we spooled it above.
2160 	 */
2161 	tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
2162 	if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
2163 								   FRAMEOPTION_EXCLUDE_GROUP |
2164 								   FRAMEOPTION_EXCLUDE_TIES)) &&
2165 		winstate->currentpos > 0)
2166 	{
2167 		ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
2168 		if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2169 									 winstate->ss.ss_ScanTupleSlot))
2170 			elog(ERROR, "unexpected end of tuplestore");
2171 		if (!are_peers(winstate, winstate->temp_slot_2,
2172 					   winstate->ss.ss_ScanTupleSlot))
2173 		{
2174 			winstate->currentgroup++;
2175 			winstate->groupheadpos = winstate->currentpos;
2176 			winstate->grouptail_valid = false;
2177 		}
2178 		ExecClearTuple(winstate->temp_slot_2);
2179 	}
2180 	else
2181 	{
2182 		if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2183 									 winstate->ss.ss_ScanTupleSlot))
2184 			elog(ERROR, "unexpected end of tuplestore");
2185 	}
2186 
2187 	/*
2188 	 * Evaluate true window functions
2189 	 */
2190 	numfuncs = winstate->numfuncs;
2191 	for (i = 0; i < numfuncs; i++)
2192 	{
2193 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
2194 
2195 		if (perfuncstate->plain_agg)
2196 			continue;
2197 		eval_windowfunction(winstate, perfuncstate,
2198 							&(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
2199 							&(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
2200 	}
2201 
2202 	/*
2203 	 * Evaluate aggregates
2204 	 */
2205 	if (winstate->numaggs > 0)
2206 		eval_windowaggregates(winstate);
2207 
2208 	/*
2209 	 * If we have created auxiliary read pointers for the frame or group
2210 	 * boundaries, force them to be kept up-to-date, because we don't know
2211 	 * whether the window function(s) will do anything that requires that.
2212 	 * Failing to advance the pointers would result in being unable to trim
2213 	 * data from the tuplestore, which is bad.  (If we could know in advance
2214 	 * whether the window functions will use frame boundary info, we could
2215 	 * skip creating these pointers in the first place ... but unfortunately
2216 	 * the window function API doesn't require that.)
2217 	 */
2218 	if (winstate->framehead_ptr >= 0)
2219 		update_frameheadpos(winstate);
2220 	if (winstate->frametail_ptr >= 0)
2221 		update_frametailpos(winstate);
2222 	if (winstate->grouptail_ptr >= 0)
2223 		update_grouptailpos(winstate);
2224 
2225 	/*
2226 	 * Truncate any no-longer-needed rows from the tuplestore.
2227 	 */
2228 	tuplestore_trim(winstate->buffer);
2229 
2230 	/*
2231 	 * Form and return a projection tuple using the windowfunc results and the
2232 	 * current row.  Setting ecxt_outertuple arranges that any Vars will be
2233 	 * evaluated with respect to that row.
2234 	 */
2235 	econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2236 
2237 	return ExecProject(winstate->ss.ps.ps_ProjInfo);
2238 }
2239 
2240 /* -----------------
2241  * ExecInitWindowAgg
2242  *
2243  *	Creates the run-time information for the WindowAgg node produced by the
2244  *	planner and initializes its outer subtree
2245  * -----------------
2246  */
2247 WindowAggState *
ExecInitWindowAgg(WindowAgg * node,EState * estate,int eflags)2248 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
2249 {
2250 	WindowAggState *winstate;
2251 	Plan	   *outerPlan;
2252 	ExprContext *econtext;
2253 	ExprContext *tmpcontext;
2254 	WindowStatePerFunc perfunc;
2255 	WindowStatePerAgg peragg;
2256 	int			frameOptions = node->frameOptions;
2257 	int			numfuncs,
2258 				wfuncno,
2259 				numaggs,
2260 				aggno;
2261 	TupleDesc	scanDesc;
2262 	ListCell   *l;
2263 
2264 	/* check for unsupported flags */
2265 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2266 
2267 	/*
2268 	 * create state structure
2269 	 */
2270 	winstate = makeNode(WindowAggState);
2271 	winstate->ss.ps.plan = (Plan *) node;
2272 	winstate->ss.ps.state = estate;
2273 	winstate->ss.ps.ExecProcNode = ExecWindowAgg;
2274 
2275 	/*
2276 	 * Create expression contexts.  We need two, one for per-input-tuple
2277 	 * processing and one for per-output-tuple processing.  We cheat a little
2278 	 * by using ExecAssignExprContext() to build both.
2279 	 */
2280 	ExecAssignExprContext(estate, &winstate->ss.ps);
2281 	tmpcontext = winstate->ss.ps.ps_ExprContext;
2282 	winstate->tmpcontext = tmpcontext;
2283 	ExecAssignExprContext(estate, &winstate->ss.ps);
2284 
2285 	/* Create long-lived context for storage of partition-local memory etc */
2286 	winstate->partcontext =
2287 		AllocSetContextCreate(CurrentMemoryContext,
2288 							  "WindowAgg Partition",
2289 							  ALLOCSET_DEFAULT_SIZES);
2290 
2291 	/*
2292 	 * Create mid-lived context for aggregate trans values etc.
2293 	 *
2294 	 * Note that moving aggregates each use their own private context, not
2295 	 * this one.
2296 	 */
2297 	winstate->aggcontext =
2298 		AllocSetContextCreate(CurrentMemoryContext,
2299 							  "WindowAgg Aggregates",
2300 							  ALLOCSET_DEFAULT_SIZES);
2301 
2302 	/*
2303 	 * WindowAgg nodes never have quals, since they can only occur at the
2304 	 * logical top level of a query (ie, after any WHERE or HAVING filters)
2305 	 */
2306 	Assert(node->plan.qual == NIL);
2307 	winstate->ss.ps.qual = NULL;
2308 
2309 	/*
2310 	 * initialize child nodes
2311 	 */
2312 	outerPlan = outerPlan(node);
2313 	outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
2314 
2315 	/*
2316 	 * initialize source tuple type (which is also the tuple type that we'll
2317 	 * store in the tuplestore and use in all our working slots).
2318 	 */
2319 	ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple);
2320 	scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2321 
2322 	/* the outer tuple isn't the child's tuple, but always a minimal tuple */
2323 	winstate->ss.ps.outeropsset = true;
2324 	winstate->ss.ps.outerops = &TTSOpsMinimalTuple;
2325 	winstate->ss.ps.outeropsfixed = true;
2326 
2327 	/*
2328 	 * tuple table initialization
2329 	 */
2330 	winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2331 													   &TTSOpsMinimalTuple);
2332 	winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2333 													&TTSOpsMinimalTuple);
2334 	winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc,
2335 												   &TTSOpsMinimalTuple);
2336 	winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc,
2337 												   &TTSOpsMinimalTuple);
2338 
2339 	/*
2340 	 * create frame head and tail slots only if needed (must create slots in
2341 	 * exactly the same cases that update_frameheadpos and update_frametailpos
2342 	 * need them)
2343 	 */
2344 	winstate->framehead_slot = winstate->frametail_slot = NULL;
2345 
2346 	if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
2347 	{
2348 		if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
2349 			 node->ordNumCols != 0) ||
2350 			(frameOptions & FRAMEOPTION_START_OFFSET))
2351 			winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2352 															  &TTSOpsMinimalTuple);
2353 		if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
2354 			 node->ordNumCols != 0) ||
2355 			(frameOptions & FRAMEOPTION_END_OFFSET))
2356 			winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2357 															  &TTSOpsMinimalTuple);
2358 	}
2359 
2360 	/*
2361 	 * Initialize result slot, type and projection.
2362 	 */
2363 	ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual);
2364 	ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
2365 
2366 	/* Set up data for comparing tuples */
2367 	if (node->partNumCols > 0)
2368 		winstate->partEqfunction =
2369 			execTuplesMatchPrepare(scanDesc,
2370 								   node->partNumCols,
2371 								   node->partColIdx,
2372 								   node->partOperators,
2373 								   node->partCollations,
2374 								   &winstate->ss.ps);
2375 
2376 	if (node->ordNumCols > 0)
2377 		winstate->ordEqfunction =
2378 			execTuplesMatchPrepare(scanDesc,
2379 								   node->ordNumCols,
2380 								   node->ordColIdx,
2381 								   node->ordOperators,
2382 								   node->ordCollations,
2383 								   &winstate->ss.ps);
2384 
2385 	/*
2386 	 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
2387 	 */
2388 	numfuncs = winstate->numfuncs;
2389 	numaggs = winstate->numaggs;
2390 	econtext = winstate->ss.ps.ps_ExprContext;
2391 	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
2392 	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
2393 
2394 	/*
2395 	 * allocate per-wfunc/per-agg state information.
2396 	 */
2397 	perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
2398 	peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
2399 	winstate->perfunc = perfunc;
2400 	winstate->peragg = peragg;
2401 
2402 	wfuncno = -1;
2403 	aggno = -1;
2404 	foreach(l, winstate->funcs)
2405 	{
2406 		WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
2407 		WindowFunc *wfunc = wfuncstate->wfunc;
2408 		WindowStatePerFunc perfuncstate;
2409 		AclResult	aclresult;
2410 		int			i;
2411 
2412 		if (wfunc->winref != node->winref)	/* planner screwed up? */
2413 			elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
2414 				 wfunc->winref, node->winref);
2415 
2416 		/* Look for a previous duplicate window function */
2417 		for (i = 0; i <= wfuncno; i++)
2418 		{
2419 			if (equal(wfunc, perfunc[i].wfunc) &&
2420 				!contain_volatile_functions((Node *) wfunc))
2421 				break;
2422 		}
2423 		if (i <= wfuncno)
2424 		{
2425 			/* Found a match to an existing entry, so just mark it */
2426 			wfuncstate->wfuncno = i;
2427 			continue;
2428 		}
2429 
2430 		/* Nope, so assign a new PerAgg record */
2431 		perfuncstate = &perfunc[++wfuncno];
2432 
2433 		/* Mark WindowFunc state node with assigned index in the result array */
2434 		wfuncstate->wfuncno = wfuncno;
2435 
2436 		/* Check permission to call window function */
2437 		aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
2438 									 ACL_EXECUTE);
2439 		if (aclresult != ACLCHECK_OK)
2440 			aclcheck_error(aclresult, OBJECT_FUNCTION,
2441 						   get_func_name(wfunc->winfnoid));
2442 		InvokeFunctionExecuteHook(wfunc->winfnoid);
2443 
2444 		/* Fill in the perfuncstate data */
2445 		perfuncstate->wfuncstate = wfuncstate;
2446 		perfuncstate->wfunc = wfunc;
2447 		perfuncstate->numArguments = list_length(wfuncstate->args);
2448 
2449 		fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
2450 					  econtext->ecxt_per_query_memory);
2451 		fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
2452 
2453 		perfuncstate->winCollation = wfunc->inputcollid;
2454 
2455 		get_typlenbyval(wfunc->wintype,
2456 						&perfuncstate->resulttypeLen,
2457 						&perfuncstate->resulttypeByVal);
2458 
2459 		/*
2460 		 * If it's really just a plain aggregate function, we'll emulate the
2461 		 * Agg environment for it.
2462 		 */
2463 		perfuncstate->plain_agg = wfunc->winagg;
2464 		if (wfunc->winagg)
2465 		{
2466 			WindowStatePerAgg peraggstate;
2467 
2468 			perfuncstate->aggno = ++aggno;
2469 			peraggstate = &winstate->peragg[aggno];
2470 			initialize_peragg(winstate, wfunc, peraggstate);
2471 			peraggstate->wfuncno = wfuncno;
2472 		}
2473 		else
2474 		{
2475 			WindowObject winobj = makeNode(WindowObjectData);
2476 
2477 			winobj->winstate = winstate;
2478 			winobj->argstates = wfuncstate->args;
2479 			winobj->localmem = NULL;
2480 			perfuncstate->winobj = winobj;
2481 		}
2482 	}
2483 
2484 	/* Update numfuncs, numaggs to match number of unique functions found */
2485 	winstate->numfuncs = wfuncno + 1;
2486 	winstate->numaggs = aggno + 1;
2487 
2488 	/* Set up WindowObject for aggregates, if needed */
2489 	if (winstate->numaggs > 0)
2490 	{
2491 		WindowObject agg_winobj = makeNode(WindowObjectData);
2492 
2493 		agg_winobj->winstate = winstate;
2494 		agg_winobj->argstates = NIL;
2495 		agg_winobj->localmem = NULL;
2496 		/* make sure markptr = -1 to invalidate. It may not get used */
2497 		agg_winobj->markptr = -1;
2498 		agg_winobj->readptr = -1;
2499 		winstate->agg_winobj = agg_winobj;
2500 	}
2501 
2502 	/* copy frame options to state node for easy access */
2503 	winstate->frameOptions = frameOptions;
2504 
2505 	/* initialize frame bound offset expressions */
2506 	winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2507 										 (PlanState *) winstate);
2508 	winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2509 									   (PlanState *) winstate);
2510 
2511 	/* Lookup in_range support functions if needed */
2512 	if (OidIsValid(node->startInRangeFunc))
2513 		fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc);
2514 	if (OidIsValid(node->endInRangeFunc))
2515 		fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc);
2516 	winstate->inRangeColl = node->inRangeColl;
2517 	winstate->inRangeAsc = node->inRangeAsc;
2518 	winstate->inRangeNullsFirst = node->inRangeNullsFirst;
2519 
2520 	winstate->all_first = true;
2521 	winstate->partition_spooled = false;
2522 	winstate->more_partitions = false;
2523 
2524 	return winstate;
2525 }
2526 
2527 /* -----------------
2528  * ExecEndWindowAgg
2529  * -----------------
2530  */
2531 void
ExecEndWindowAgg(WindowAggState * node)2532 ExecEndWindowAgg(WindowAggState *node)
2533 {
2534 	PlanState  *outerPlan;
2535 	int			i;
2536 
2537 	release_partition(node);
2538 
2539 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
2540 	ExecClearTuple(node->first_part_slot);
2541 	ExecClearTuple(node->agg_row_slot);
2542 	ExecClearTuple(node->temp_slot_1);
2543 	ExecClearTuple(node->temp_slot_2);
2544 	if (node->framehead_slot)
2545 		ExecClearTuple(node->framehead_slot);
2546 	if (node->frametail_slot)
2547 		ExecClearTuple(node->frametail_slot);
2548 
2549 	/*
2550 	 * Free both the expr contexts.
2551 	 */
2552 	ExecFreeExprContext(&node->ss.ps);
2553 	node->ss.ps.ps_ExprContext = node->tmpcontext;
2554 	ExecFreeExprContext(&node->ss.ps);
2555 
2556 	for (i = 0; i < node->numaggs; i++)
2557 	{
2558 		if (node->peragg[i].aggcontext != node->aggcontext)
2559 			MemoryContextDelete(node->peragg[i].aggcontext);
2560 	}
2561 	MemoryContextDelete(node->partcontext);
2562 	MemoryContextDelete(node->aggcontext);
2563 
2564 	pfree(node->perfunc);
2565 	pfree(node->peragg);
2566 
2567 	outerPlan = outerPlanState(node);
2568 	ExecEndNode(outerPlan);
2569 }
2570 
2571 /* -----------------
2572  * ExecReScanWindowAgg
2573  * -----------------
2574  */
2575 void
ExecReScanWindowAgg(WindowAggState * node)2576 ExecReScanWindowAgg(WindowAggState *node)
2577 {
2578 	PlanState  *outerPlan = outerPlanState(node);
2579 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
2580 
2581 	node->all_done = false;
2582 	node->all_first = true;
2583 
2584 	/* release tuplestore et al */
2585 	release_partition(node);
2586 
2587 	/* release all temp tuples, but especially first_part_slot */
2588 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
2589 	ExecClearTuple(node->first_part_slot);
2590 	ExecClearTuple(node->agg_row_slot);
2591 	ExecClearTuple(node->temp_slot_1);
2592 	ExecClearTuple(node->temp_slot_2);
2593 	if (node->framehead_slot)
2594 		ExecClearTuple(node->framehead_slot);
2595 	if (node->frametail_slot)
2596 		ExecClearTuple(node->frametail_slot);
2597 
2598 	/* Forget current wfunc values */
2599 	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2600 	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2601 
2602 	/*
2603 	 * if chgParam of subnode is not null then plan will be re-scanned by
2604 	 * first ExecProcNode.
2605 	 */
2606 	if (outerPlan->chgParam == NULL)
2607 		ExecReScan(outerPlan);
2608 }
2609 
2610 /*
2611  * initialize_peragg
2612  *
2613  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2614  */
2615 static WindowStatePerAggData *
initialize_peragg(WindowAggState * winstate,WindowFunc * wfunc,WindowStatePerAgg peraggstate)2616 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2617 				  WindowStatePerAgg peraggstate)
2618 {
2619 	Oid			inputTypes[FUNC_MAX_ARGS];
2620 	int			numArguments;
2621 	HeapTuple	aggTuple;
2622 	Form_pg_aggregate aggform;
2623 	Oid			aggtranstype;
2624 	AttrNumber	initvalAttNo;
2625 	AclResult	aclresult;
2626 	bool		use_ma_code;
2627 	Oid			transfn_oid,
2628 				invtransfn_oid,
2629 				finalfn_oid;
2630 	bool		finalextra;
2631 	char		finalmodify;
2632 	Expr	   *transfnexpr,
2633 			   *invtransfnexpr,
2634 			   *finalfnexpr;
2635 	Datum		textInitVal;
2636 	int			i;
2637 	ListCell   *lc;
2638 
2639 	numArguments = list_length(wfunc->args);
2640 
2641 	i = 0;
2642 	foreach(lc, wfunc->args)
2643 	{
2644 		inputTypes[i++] = exprType((Node *) lfirst(lc));
2645 	}
2646 
2647 	aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2648 	if (!HeapTupleIsValid(aggTuple))
2649 		elog(ERROR, "cache lookup failed for aggregate %u",
2650 			 wfunc->winfnoid);
2651 	aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2652 
2653 	/*
2654 	 * Figure out whether we want to use the moving-aggregate implementation,
2655 	 * and collect the right set of fields from the pg_attribute entry.
2656 	 *
2657 	 * It's possible that an aggregate would supply a safe moving-aggregate
2658 	 * implementation and an unsafe normal one, in which case our hand is
2659 	 * forced.  Otherwise, if the frame head can't move, we don't need
2660 	 * moving-aggregate code.  Even if we'd like to use it, don't do so if the
2661 	 * aggregate's arguments (and FILTER clause if any) contain any calls to
2662 	 * volatile functions.  Otherwise, the difference between restarting and
2663 	 * not restarting the aggregation would be user-visible.
2664 	 */
2665 	if (!OidIsValid(aggform->aggminvtransfn))
2666 		use_ma_code = false;	/* sine qua non */
2667 	else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2668 			 aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2669 		use_ma_code = true;		/* decision forced by safety */
2670 	else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
2671 		use_ma_code = false;	/* non-moving frame head */
2672 	else if (contain_volatile_functions((Node *) wfunc))
2673 		use_ma_code = false;	/* avoid possible behavioral change */
2674 	else
2675 		use_ma_code = true;		/* yes, let's use it */
2676 	if (use_ma_code)
2677 	{
2678 		peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2679 		peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2680 		peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2681 		finalextra = aggform->aggmfinalextra;
2682 		finalmodify = aggform->aggmfinalmodify;
2683 		aggtranstype = aggform->aggmtranstype;
2684 		initvalAttNo = Anum_pg_aggregate_aggminitval;
2685 	}
2686 	else
2687 	{
2688 		peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2689 		peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2690 		peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2691 		finalextra = aggform->aggfinalextra;
2692 		finalmodify = aggform->aggfinalmodify;
2693 		aggtranstype = aggform->aggtranstype;
2694 		initvalAttNo = Anum_pg_aggregate_agginitval;
2695 	}
2696 
2697 	/*
2698 	 * ExecInitWindowAgg already checked permission to call aggregate function
2699 	 * ... but we still need to check the component functions
2700 	 */
2701 
2702 	/* Check that aggregate owner has permission to call component fns */
2703 	{
2704 		HeapTuple	procTuple;
2705 		Oid			aggOwner;
2706 
2707 		procTuple = SearchSysCache1(PROCOID,
2708 									ObjectIdGetDatum(wfunc->winfnoid));
2709 		if (!HeapTupleIsValid(procTuple))
2710 			elog(ERROR, "cache lookup failed for function %u",
2711 				 wfunc->winfnoid);
2712 		aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2713 		ReleaseSysCache(procTuple);
2714 
2715 		aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2716 									 ACL_EXECUTE);
2717 		if (aclresult != ACLCHECK_OK)
2718 			aclcheck_error(aclresult, OBJECT_FUNCTION,
2719 						   get_func_name(transfn_oid));
2720 		InvokeFunctionExecuteHook(transfn_oid);
2721 
2722 		if (OidIsValid(invtransfn_oid))
2723 		{
2724 			aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2725 										 ACL_EXECUTE);
2726 			if (aclresult != ACLCHECK_OK)
2727 				aclcheck_error(aclresult, OBJECT_FUNCTION,
2728 							   get_func_name(invtransfn_oid));
2729 			InvokeFunctionExecuteHook(invtransfn_oid);
2730 		}
2731 
2732 		if (OidIsValid(finalfn_oid))
2733 		{
2734 			aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2735 										 ACL_EXECUTE);
2736 			if (aclresult != ACLCHECK_OK)
2737 				aclcheck_error(aclresult, OBJECT_FUNCTION,
2738 							   get_func_name(finalfn_oid));
2739 			InvokeFunctionExecuteHook(finalfn_oid);
2740 		}
2741 	}
2742 
2743 	/*
2744 	 * If the selected finalfn isn't read-only, we can't run this aggregate as
2745 	 * a window function.  This is a user-facing error, so we take a bit more
2746 	 * care with the error message than elsewhere in this function.
2747 	 */
2748 	if (finalmodify != AGGMODIFY_READ_ONLY)
2749 		ereport(ERROR,
2750 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2751 				 errmsg("aggregate function %s does not support use as a window function",
2752 						format_procedure(wfunc->winfnoid))));
2753 
2754 	/* Detect how many arguments to pass to the finalfn */
2755 	if (finalextra)
2756 		peraggstate->numFinalArgs = numArguments + 1;
2757 	else
2758 		peraggstate->numFinalArgs = 1;
2759 
2760 	/* resolve actual type of transition state, if polymorphic */
2761 	aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2762 											   aggtranstype,
2763 											   inputTypes,
2764 											   numArguments);
2765 
2766 	/* build expression trees using actual argument & result types */
2767 	build_aggregate_transfn_expr(inputTypes,
2768 								 numArguments,
2769 								 0, /* no ordered-set window functions yet */
2770 								 false, /* no variadic window functions yet */
2771 								 aggtranstype,
2772 								 wfunc->inputcollid,
2773 								 transfn_oid,
2774 								 invtransfn_oid,
2775 								 &transfnexpr,
2776 								 &invtransfnexpr);
2777 
2778 	/* set up infrastructure for calling the transfn(s) and finalfn */
2779 	fmgr_info(transfn_oid, &peraggstate->transfn);
2780 	fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2781 
2782 	if (OidIsValid(invtransfn_oid))
2783 	{
2784 		fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2785 		fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2786 	}
2787 
2788 	if (OidIsValid(finalfn_oid))
2789 	{
2790 		build_aggregate_finalfn_expr(inputTypes,
2791 									 peraggstate->numFinalArgs,
2792 									 aggtranstype,
2793 									 wfunc->wintype,
2794 									 wfunc->inputcollid,
2795 									 finalfn_oid,
2796 									 &finalfnexpr);
2797 		fmgr_info(finalfn_oid, &peraggstate->finalfn);
2798 		fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2799 	}
2800 
2801 	/* get info about relevant datatypes */
2802 	get_typlenbyval(wfunc->wintype,
2803 					&peraggstate->resulttypeLen,
2804 					&peraggstate->resulttypeByVal);
2805 	get_typlenbyval(aggtranstype,
2806 					&peraggstate->transtypeLen,
2807 					&peraggstate->transtypeByVal);
2808 
2809 	/*
2810 	 * initval is potentially null, so don't try to access it as a struct
2811 	 * field. Must do it the hard way with SysCacheGetAttr.
2812 	 */
2813 	textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2814 								  &peraggstate->initValueIsNull);
2815 
2816 	if (peraggstate->initValueIsNull)
2817 		peraggstate->initValue = (Datum) 0;
2818 	else
2819 		peraggstate->initValue = GetAggInitVal(textInitVal,
2820 											   aggtranstype);
2821 
2822 	/*
2823 	 * If the transfn is strict and the initval is NULL, make sure input type
2824 	 * and transtype are the same (or at least binary-compatible), so that
2825 	 * it's OK to use the first input value as the initial transValue.  This
2826 	 * should have been checked at agg definition time, but we must check
2827 	 * again in case the transfn's strictness property has been changed.
2828 	 */
2829 	if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2830 	{
2831 		if (numArguments < 1 ||
2832 			!IsBinaryCoercible(inputTypes[0], aggtranstype))
2833 			ereport(ERROR,
2834 					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2835 					 errmsg("aggregate %u needs to have compatible input type and transition type",
2836 							wfunc->winfnoid)));
2837 	}
2838 
2839 	/*
2840 	 * Insist that forward and inverse transition functions have the same
2841 	 * strictness setting.  Allowing them to differ would require handling
2842 	 * more special cases in advance_windowaggregate and
2843 	 * advance_windowaggregate_base, for no discernible benefit.  This should
2844 	 * have been checked at agg definition time, but we must check again in
2845 	 * case either function's strictness property has been changed.
2846 	 */
2847 	if (OidIsValid(invtransfn_oid) &&
2848 		peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2849 		ereport(ERROR,
2850 				(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2851 				 errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2852 
2853 	/*
2854 	 * Moving aggregates use their own aggcontext.
2855 	 *
2856 	 * This is necessary because they might restart at different times, so we
2857 	 * might never be able to reset the shared context otherwise.  We can't
2858 	 * make it the aggregates' responsibility to clean up after themselves,
2859 	 * because strict aggregates must be restarted whenever we remove their
2860 	 * last non-NULL input, which the aggregate won't be aware is happening.
2861 	 * Also, just pfree()ing the transValue upon restarting wouldn't help,
2862 	 * since we'd miss any indirectly referenced data.  We could, in theory,
2863 	 * make the memory allocation rules for moving aggregates different than
2864 	 * they have historically been for plain aggregates, but that seems grotty
2865 	 * and likely to lead to memory leaks.
2866 	 */
2867 	if (OidIsValid(invtransfn_oid))
2868 		peraggstate->aggcontext =
2869 			AllocSetContextCreate(CurrentMemoryContext,
2870 								  "WindowAgg Per Aggregate",
2871 								  ALLOCSET_DEFAULT_SIZES);
2872 	else
2873 		peraggstate->aggcontext = winstate->aggcontext;
2874 
2875 	ReleaseSysCache(aggTuple);
2876 
2877 	return peraggstate;
2878 }
2879 
2880 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)2881 GetAggInitVal(Datum textInitVal, Oid transtype)
2882 {
2883 	Oid			typinput,
2884 				typioparam;
2885 	char	   *strInitVal;
2886 	Datum		initVal;
2887 
2888 	getTypeInputInfo(transtype, &typinput, &typioparam);
2889 	strInitVal = TextDatumGetCString(textInitVal);
2890 	initVal = OidInputFunctionCall(typinput, strInitVal,
2891 								   typioparam, -1);
2892 	pfree(strInitVal);
2893 	return initVal;
2894 }
2895 
2896 /*
2897  * are_peers
2898  * compare two rows to see if they are equal according to the ORDER BY clause
2899  *
2900  * NB: this does not consider the window frame mode.
2901  */
2902 static bool
are_peers(WindowAggState * winstate,TupleTableSlot * slot1,TupleTableSlot * slot2)2903 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
2904 		  TupleTableSlot *slot2)
2905 {
2906 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
2907 	ExprContext *econtext = winstate->tmpcontext;
2908 
2909 	/* If no ORDER BY, all rows are peers with each other */
2910 	if (node->ordNumCols == 0)
2911 		return true;
2912 
2913 	econtext->ecxt_outertuple = slot1;
2914 	econtext->ecxt_innertuple = slot2;
2915 	return ExecQualAndReset(winstate->ordEqfunction, econtext);
2916 }
2917 
2918 /*
2919  * window_gettupleslot
2920  *	Fetch the pos'th tuple of the current partition into the slot,
2921  *	using the winobj's read pointer
2922  *
2923  * Returns true if successful, false if no such row
2924  */
2925 static bool
window_gettupleslot(WindowObject winobj,int64 pos,TupleTableSlot * slot)2926 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
2927 {
2928 	WindowAggState *winstate = winobj->winstate;
2929 	MemoryContext oldcontext;
2930 
2931 	/* often called repeatedly in a row */
2932 	CHECK_FOR_INTERRUPTS();
2933 
2934 	/* Don't allow passing -1 to spool_tuples here */
2935 	if (pos < 0)
2936 		return false;
2937 
2938 	/* If necessary, fetch the tuple into the spool */
2939 	spool_tuples(winstate, pos);
2940 
2941 	if (pos >= winstate->spooled_rows)
2942 		return false;
2943 
2944 	if (pos < winobj->markpos)
2945 		elog(ERROR, "cannot fetch row before WindowObject's mark position");
2946 
2947 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2948 
2949 	tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2950 
2951 	/*
2952 	 * Advance or rewind until we are within one tuple of the one we want.
2953 	 */
2954 	if (winobj->seekpos < pos - 1)
2955 	{
2956 		if (!tuplestore_skiptuples(winstate->buffer,
2957 								   pos - 1 - winobj->seekpos,
2958 								   true))
2959 			elog(ERROR, "unexpected end of tuplestore");
2960 		winobj->seekpos = pos - 1;
2961 	}
2962 	else if (winobj->seekpos > pos + 1)
2963 	{
2964 		if (!tuplestore_skiptuples(winstate->buffer,
2965 								   winobj->seekpos - (pos + 1),
2966 								   false))
2967 			elog(ERROR, "unexpected end of tuplestore");
2968 		winobj->seekpos = pos + 1;
2969 	}
2970 	else if (winobj->seekpos == pos)
2971 	{
2972 		/*
2973 		 * There's no API to refetch the tuple at the current position.  We
2974 		 * have to move one tuple forward, and then one backward.  (We don't
2975 		 * do it the other way because we might try to fetch the row before
2976 		 * our mark, which isn't allowed.)  XXX this case could stand to be
2977 		 * optimized.
2978 		 */
2979 		tuplestore_advance(winstate->buffer, true);
2980 		winobj->seekpos++;
2981 	}
2982 
2983 	/*
2984 	 * Now we should be on the tuple immediately before or after the one we
2985 	 * want, so just fetch forwards or backwards as appropriate.
2986 	 */
2987 	if (winobj->seekpos > pos)
2988 	{
2989 		if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2990 			elog(ERROR, "unexpected end of tuplestore");
2991 		winobj->seekpos--;
2992 	}
2993 	else
2994 	{
2995 		if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2996 			elog(ERROR, "unexpected end of tuplestore");
2997 		winobj->seekpos++;
2998 	}
2999 
3000 	Assert(winobj->seekpos == pos);
3001 
3002 	MemoryContextSwitchTo(oldcontext);
3003 
3004 	return true;
3005 }
3006 
3007 
3008 /***********************************************************************
3009  * API exposed to window functions
3010  ***********************************************************************/
3011 
3012 
3013 /*
3014  * WinGetPartitionLocalMemory
3015  *		Get working memory that lives till end of partition processing
3016  *
3017  * On first call within a given partition, this allocates and zeroes the
3018  * requested amount of space.  Subsequent calls just return the same chunk.
3019  *
3020  * Memory obtained this way is normally used to hold state that should be
3021  * automatically reset for each new partition.  If a window function wants
3022  * to hold state across the whole query, fcinfo->fn_extra can be used in the
3023  * usual way for that.
3024  */
3025 void *
WinGetPartitionLocalMemory(WindowObject winobj,Size sz)3026 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
3027 {
3028 	Assert(WindowObjectIsValid(winobj));
3029 	if (winobj->localmem == NULL)
3030 		winobj->localmem =
3031 			MemoryContextAllocZero(winobj->winstate->partcontext, sz);
3032 	return winobj->localmem;
3033 }
3034 
3035 /*
3036  * WinGetCurrentPosition
3037  *		Return the current row's position (counting from 0) within the current
3038  *		partition.
3039  */
3040 int64
WinGetCurrentPosition(WindowObject winobj)3041 WinGetCurrentPosition(WindowObject winobj)
3042 {
3043 	Assert(WindowObjectIsValid(winobj));
3044 	return winobj->winstate->currentpos;
3045 }
3046 
3047 /*
3048  * WinGetPartitionRowCount
3049  *		Return total number of rows contained in the current partition.
3050  *
3051  * Note: this is a relatively expensive operation because it forces the
3052  * whole partition to be "spooled" into the tuplestore at once.  Once
3053  * executed, however, additional calls within the same partition are cheap.
3054  */
3055 int64
WinGetPartitionRowCount(WindowObject winobj)3056 WinGetPartitionRowCount(WindowObject winobj)
3057 {
3058 	Assert(WindowObjectIsValid(winobj));
3059 	spool_tuples(winobj->winstate, -1);
3060 	return winobj->winstate->spooled_rows;
3061 }
3062 
3063 /*
3064  * WinSetMarkPosition
3065  *		Set the "mark" position for the window object, which is the oldest row
3066  *		number (counting from 0) it is allowed to fetch during all subsequent
3067  *		operations within the current partition.
3068  *
3069  * Window functions do not have to call this, but are encouraged to move the
3070  * mark forward when possible to keep the tuplestore size down and prevent
3071  * having to spill rows to disk.
3072  */
3073 void
WinSetMarkPosition(WindowObject winobj,int64 markpos)3074 WinSetMarkPosition(WindowObject winobj, int64 markpos)
3075 {
3076 	WindowAggState *winstate;
3077 
3078 	Assert(WindowObjectIsValid(winobj));
3079 	winstate = winobj->winstate;
3080 
3081 	if (markpos < winobj->markpos)
3082 		elog(ERROR, "cannot move WindowObject's mark position backward");
3083 	tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
3084 	if (markpos > winobj->markpos)
3085 	{
3086 		tuplestore_skiptuples(winstate->buffer,
3087 							  markpos - winobj->markpos,
3088 							  true);
3089 		winobj->markpos = markpos;
3090 	}
3091 	tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3092 	if (markpos > winobj->seekpos)
3093 	{
3094 		tuplestore_skiptuples(winstate->buffer,
3095 							  markpos - winobj->seekpos,
3096 							  true);
3097 		winobj->seekpos = markpos;
3098 	}
3099 }
3100 
3101 /*
3102  * WinRowsArePeers
3103  *		Compare two rows (specified by absolute position in partition) to see
3104  *		if they are equal according to the ORDER BY clause.
3105  *
3106  * NB: this does not consider the window frame mode.
3107  */
3108 bool
WinRowsArePeers(WindowObject winobj,int64 pos1,int64 pos2)3109 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
3110 {
3111 	WindowAggState *winstate;
3112 	WindowAgg  *node;
3113 	TupleTableSlot *slot1;
3114 	TupleTableSlot *slot2;
3115 	bool		res;
3116 
3117 	Assert(WindowObjectIsValid(winobj));
3118 	winstate = winobj->winstate;
3119 	node = (WindowAgg *) winstate->ss.ps.plan;
3120 
3121 	/* If no ORDER BY, all rows are peers; don't bother to fetch them */
3122 	if (node->ordNumCols == 0)
3123 		return true;
3124 
3125 	/*
3126 	 * Note: OK to use temp_slot_2 here because we aren't calling any
3127 	 * frame-related functions (those tend to clobber temp_slot_2).
3128 	 */
3129 	slot1 = winstate->temp_slot_1;
3130 	slot2 = winstate->temp_slot_2;
3131 
3132 	if (!window_gettupleslot(winobj, pos1, slot1))
3133 		elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3134 			 pos1);
3135 	if (!window_gettupleslot(winobj, pos2, slot2))
3136 		elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3137 			 pos2);
3138 
3139 	res = are_peers(winstate, slot1, slot2);
3140 
3141 	ExecClearTuple(slot1);
3142 	ExecClearTuple(slot2);
3143 
3144 	return res;
3145 }
3146 
3147 /*
3148  * WinGetFuncArgInPartition
3149  *		Evaluate a window function's argument expression on a specified
3150  *		row of the partition.  The row is identified in lseek(2) style,
3151  *		i.e. relative to the current, first, or last row.
3152  *
3153  * argno: argument number to evaluate (counted from 0)
3154  * relpos: signed rowcount offset from the seek position
3155  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
3156  * set_mark: If the row is found and set_mark is true, the mark is moved to
3157  *		the row as a side-effect.
3158  * isnull: output argument, receives isnull status of result
3159  * isout: output argument, set to indicate whether target row position
3160  *		is out of partition (can pass NULL if caller doesn't care about this)
3161  *
3162  * Specifying a nonexistent row is not an error, it just causes a null result
3163  * (plus setting *isout true, if isout isn't NULL).
3164  */
3165 Datum
WinGetFuncArgInPartition(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)3166 WinGetFuncArgInPartition(WindowObject winobj, int argno,
3167 						 int relpos, int seektype, bool set_mark,
3168 						 bool *isnull, bool *isout)
3169 {
3170 	WindowAggState *winstate;
3171 	ExprContext *econtext;
3172 	TupleTableSlot *slot;
3173 	bool		gottuple;
3174 	int64		abs_pos;
3175 
3176 	Assert(WindowObjectIsValid(winobj));
3177 	winstate = winobj->winstate;
3178 	econtext = winstate->ss.ps.ps_ExprContext;
3179 	slot = winstate->temp_slot_1;
3180 
3181 	switch (seektype)
3182 	{
3183 		case WINDOW_SEEK_CURRENT:
3184 			abs_pos = winstate->currentpos + relpos;
3185 			break;
3186 		case WINDOW_SEEK_HEAD:
3187 			abs_pos = relpos;
3188 			break;
3189 		case WINDOW_SEEK_TAIL:
3190 			spool_tuples(winstate, -1);
3191 			abs_pos = winstate->spooled_rows - 1 + relpos;
3192 			break;
3193 		default:
3194 			elog(ERROR, "unrecognized window seek type: %d", seektype);
3195 			abs_pos = 0;		/* keep compiler quiet */
3196 			break;
3197 	}
3198 
3199 	gottuple = window_gettupleslot(winobj, abs_pos, slot);
3200 
3201 	if (!gottuple)
3202 	{
3203 		if (isout)
3204 			*isout = true;
3205 		*isnull = true;
3206 		return (Datum) 0;
3207 	}
3208 	else
3209 	{
3210 		if (isout)
3211 			*isout = false;
3212 		if (set_mark)
3213 			WinSetMarkPosition(winobj, abs_pos);
3214 		econtext->ecxt_outertuple = slot;
3215 		return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3216 							econtext, isnull);
3217 	}
3218 }
3219 
3220 /*
3221  * WinGetFuncArgInFrame
3222  *		Evaluate a window function's argument expression on a specified
3223  *		row of the window frame.  The row is identified in lseek(2) style,
3224  *		i.e. relative to the first or last row of the frame.  (We do not
3225  *		support WINDOW_SEEK_CURRENT here, because it's not very clear what
3226  *		that should mean if the current row isn't part of the frame.)
3227  *
3228  * argno: argument number to evaluate (counted from 0)
3229  * relpos: signed rowcount offset from the seek position
3230  * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL
3231  * set_mark: If the row is found/in frame and set_mark is true, the mark is
3232  *		moved to the row as a side-effect.
3233  * isnull: output argument, receives isnull status of result
3234  * isout: output argument, set to indicate whether target row position
3235  *		is out of frame (can pass NULL if caller doesn't care about this)
3236  *
3237  * Specifying a nonexistent or not-in-frame row is not an error, it just
3238  * causes a null result (plus setting *isout true, if isout isn't NULL).
3239  *
3240  * Note that some exclusion-clause options lead to situations where the
3241  * rows that are in-frame are not consecutive in the partition.  But we
3242  * count only in-frame rows when measuring relpos.
3243  *
3244  * The set_mark flag is interpreted as meaning that the caller will specify
3245  * a constant (or, perhaps, monotonically increasing) relpos in successive
3246  * calls, so that *if there is no exclusion clause* there will be no need
3247  * to fetch a row before the previously fetched row.  But we do not expect
3248  * the caller to know how to account for exclusion clauses.  Therefore,
3249  * if there is an exclusion clause we take responsibility for adjusting the
3250  * mark request to something that will be safe given the above assumption
3251  * about relpos.
3252  */
3253 Datum
WinGetFuncArgInFrame(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)3254 WinGetFuncArgInFrame(WindowObject winobj, int argno,
3255 					 int relpos, int seektype, bool set_mark,
3256 					 bool *isnull, bool *isout)
3257 {
3258 	WindowAggState *winstate;
3259 	ExprContext *econtext;
3260 	TupleTableSlot *slot;
3261 	int64		abs_pos;
3262 	int64		mark_pos;
3263 
3264 	Assert(WindowObjectIsValid(winobj));
3265 	winstate = winobj->winstate;
3266 	econtext = winstate->ss.ps.ps_ExprContext;
3267 	slot = winstate->temp_slot_1;
3268 
3269 	switch (seektype)
3270 	{
3271 		case WINDOW_SEEK_CURRENT:
3272 			elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3273 			abs_pos = mark_pos = 0; /* keep compiler quiet */
3274 			break;
3275 		case WINDOW_SEEK_HEAD:
3276 			/* rejecting relpos < 0 is easy and simplifies code below */
3277 			if (relpos < 0)
3278 				goto out_of_frame;
3279 			update_frameheadpos(winstate);
3280 			abs_pos = winstate->frameheadpos + relpos;
3281 			mark_pos = abs_pos;
3282 
3283 			/*
3284 			 * Account for exclusion option if one is active, but advance only
3285 			 * abs_pos not mark_pos.  This prevents changes of the current
3286 			 * row's peer group from resulting in trying to fetch a row before
3287 			 * some previous mark position.
3288 			 *
3289 			 * Note that in some corner cases such as current row being
3290 			 * outside frame, these calculations are theoretically too simple,
3291 			 * but it doesn't matter because we'll end up deciding the row is
3292 			 * out of frame.  We do not attempt to avoid fetching rows past
3293 			 * end of frame; that would happen in some cases anyway.
3294 			 */
3295 			switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3296 			{
3297 				case 0:
3298 					/* no adjustment needed */
3299 					break;
3300 				case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3301 					if (abs_pos >= winstate->currentpos &&
3302 						winstate->currentpos >= winstate->frameheadpos)
3303 						abs_pos++;
3304 					break;
3305 				case FRAMEOPTION_EXCLUDE_GROUP:
3306 					update_grouptailpos(winstate);
3307 					if (abs_pos >= winstate->groupheadpos &&
3308 						winstate->grouptailpos > winstate->frameheadpos)
3309 					{
3310 						int64		overlapstart = Max(winstate->groupheadpos,
3311 													   winstate->frameheadpos);
3312 
3313 						abs_pos += winstate->grouptailpos - overlapstart;
3314 					}
3315 					break;
3316 				case FRAMEOPTION_EXCLUDE_TIES:
3317 					update_grouptailpos(winstate);
3318 					if (abs_pos >= winstate->groupheadpos &&
3319 						winstate->grouptailpos > winstate->frameheadpos)
3320 					{
3321 						int64		overlapstart = Max(winstate->groupheadpos,
3322 													   winstate->frameheadpos);
3323 
3324 						if (abs_pos == overlapstart)
3325 							abs_pos = winstate->currentpos;
3326 						else
3327 							abs_pos += winstate->grouptailpos - overlapstart - 1;
3328 					}
3329 					break;
3330 				default:
3331 					elog(ERROR, "unrecognized frame option state: 0x%x",
3332 						 winstate->frameOptions);
3333 					break;
3334 			}
3335 			break;
3336 		case WINDOW_SEEK_TAIL:
3337 			/* rejecting relpos > 0 is easy and simplifies code below */
3338 			if (relpos > 0)
3339 				goto out_of_frame;
3340 			update_frametailpos(winstate);
3341 			abs_pos = winstate->frametailpos - 1 + relpos;
3342 
3343 			/*
3344 			 * Account for exclusion option if one is active.  If there is no
3345 			 * exclusion, we can safely set the mark at the accessed row.  But
3346 			 * if there is, we can only mark the frame start, because we can't
3347 			 * be sure how far back in the frame the exclusion might cause us
3348 			 * to fetch in future.  Furthermore, we have to actually check
3349 			 * against frameheadpos here, since it's unsafe to try to fetch a
3350 			 * row before frame start if the mark might be there already.
3351 			 */
3352 			switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3353 			{
3354 				case 0:
3355 					/* no adjustment needed */
3356 					mark_pos = abs_pos;
3357 					break;
3358 				case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3359 					if (abs_pos <= winstate->currentpos &&
3360 						winstate->currentpos < winstate->frametailpos)
3361 						abs_pos--;
3362 					update_frameheadpos(winstate);
3363 					if (abs_pos < winstate->frameheadpos)
3364 						goto out_of_frame;
3365 					mark_pos = winstate->frameheadpos;
3366 					break;
3367 				case FRAMEOPTION_EXCLUDE_GROUP:
3368 					update_grouptailpos(winstate);
3369 					if (abs_pos < winstate->grouptailpos &&
3370 						winstate->groupheadpos < winstate->frametailpos)
3371 					{
3372 						int64		overlapend = Min(winstate->grouptailpos,
3373 													 winstate->frametailpos);
3374 
3375 						abs_pos -= overlapend - winstate->groupheadpos;
3376 					}
3377 					update_frameheadpos(winstate);
3378 					if (abs_pos < winstate->frameheadpos)
3379 						goto out_of_frame;
3380 					mark_pos = winstate->frameheadpos;
3381 					break;
3382 				case FRAMEOPTION_EXCLUDE_TIES:
3383 					update_grouptailpos(winstate);
3384 					if (abs_pos < winstate->grouptailpos &&
3385 						winstate->groupheadpos < winstate->frametailpos)
3386 					{
3387 						int64		overlapend = Min(winstate->grouptailpos,
3388 													 winstate->frametailpos);
3389 
3390 						if (abs_pos == overlapend - 1)
3391 							abs_pos = winstate->currentpos;
3392 						else
3393 							abs_pos -= overlapend - 1 - winstate->groupheadpos;
3394 					}
3395 					update_frameheadpos(winstate);
3396 					if (abs_pos < winstate->frameheadpos)
3397 						goto out_of_frame;
3398 					mark_pos = winstate->frameheadpos;
3399 					break;
3400 				default:
3401 					elog(ERROR, "unrecognized frame option state: 0x%x",
3402 						 winstate->frameOptions);
3403 					mark_pos = 0;	/* keep compiler quiet */
3404 					break;
3405 			}
3406 			break;
3407 		default:
3408 			elog(ERROR, "unrecognized window seek type: %d", seektype);
3409 			abs_pos = mark_pos = 0; /* keep compiler quiet */
3410 			break;
3411 	}
3412 
3413 	if (!window_gettupleslot(winobj, abs_pos, slot))
3414 		goto out_of_frame;
3415 
3416 	/* The code above does not detect all out-of-frame cases, so check */
3417 	if (row_is_in_frame(winstate, abs_pos, slot) <= 0)
3418 		goto out_of_frame;
3419 
3420 	if (isout)
3421 		*isout = false;
3422 	if (set_mark)
3423 		WinSetMarkPosition(winobj, mark_pos);
3424 	econtext->ecxt_outertuple = slot;
3425 	return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3426 						econtext, isnull);
3427 
3428 out_of_frame:
3429 	if (isout)
3430 		*isout = true;
3431 	*isnull = true;
3432 	return (Datum) 0;
3433 }
3434 
3435 /*
3436  * WinGetFuncArgCurrent
3437  *		Evaluate a window function's argument expression on the current row.
3438  *
3439  * argno: argument number to evaluate (counted from 0)
3440  * isnull: output argument, receives isnull status of result
3441  *
3442  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
3443  * WinGetFuncArgInFrame targeting the current row, because it will succeed
3444  * even if the WindowObject's mark has been set beyond the current row.
3445  * This should generally be used for "ordinary" arguments of a window
3446  * function, such as the offset argument of lead() or lag().
3447  */
3448 Datum
WinGetFuncArgCurrent(WindowObject winobj,int argno,bool * isnull)3449 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
3450 {
3451 	WindowAggState *winstate;
3452 	ExprContext *econtext;
3453 
3454 	Assert(WindowObjectIsValid(winobj));
3455 	winstate = winobj->winstate;
3456 
3457 	econtext = winstate->ss.ps.ps_ExprContext;
3458 
3459 	econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
3460 	return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3461 						econtext, isnull);
3462 }
3463