1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  *	  routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set.  Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications.  The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore.  The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  *	  src/backend/executor/nodeWindowAgg.c
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35 
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/optimizer.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/expandeddatum.h"
51 #include "utils/lsyscache.h"
52 #include "utils/memutils.h"
53 #include "utils/regproc.h"
54 #include "utils/syscache.h"
55 #include "windowapi.h"
56 
57 /*
58  * All the window function APIs are called with this object, which is passed
59  * to window functions as fcinfo->context.
60  */
61 typedef struct WindowObjectData
62 {
63 	NodeTag		type;
64 	WindowAggState *winstate;	/* parent WindowAggState */
65 	List	   *argstates;		/* ExprState trees for fn's arguments */
66 	void	   *localmem;		/* WinGetPartitionLocalMemory's chunk */
67 	int			markptr;		/* tuplestore mark pointer for this fn */
68 	int			readptr;		/* tuplestore read pointer for this fn */
69 	int64		markpos;		/* row that markptr is positioned on */
70 	int64		seekpos;		/* row that readptr is positioned on */
71 } WindowObjectData;
72 
73 /*
74  * We have one WindowStatePerFunc struct for each window function and
75  * window aggregate handled by this node.
76  */
77 typedef struct WindowStatePerFuncData
78 {
79 	/* Links to WindowFunc expr and state nodes this working state is for */
80 	WindowFuncExprState *wfuncstate;
81 	WindowFunc *wfunc;
82 
83 	int			numArguments;	/* number of arguments */
84 
85 	FmgrInfo	flinfo;			/* fmgr lookup data for window function */
86 
87 	Oid			winCollation;	/* collation derived for window function */
88 
89 	/*
90 	 * We need the len and byval info for the result of each function in order
91 	 * to know how to copy/delete values.
92 	 */
93 	int16		resulttypeLen;
94 	bool		resulttypeByVal;
95 
96 	bool		plain_agg;		/* is it just a plain aggregate function? */
97 	int			aggno;			/* if so, index of its WindowStatePerAggData */
98 
99 	WindowObject winobj;		/* object used in window function API */
100 }			WindowStatePerFuncData;
101 
102 /*
103  * For plain aggregate window functions, we also have one of these.
104  */
105 typedef struct WindowStatePerAggData
106 {
107 	/* Oids of transition functions */
108 	Oid			transfn_oid;
109 	Oid			invtransfn_oid; /* may be InvalidOid */
110 	Oid			finalfn_oid;	/* may be InvalidOid */
111 
112 	/*
113 	 * fmgr lookup data for transition functions --- only valid when
114 	 * corresponding oid is not InvalidOid.  Note in particular that fn_strict
115 	 * flags are kept here.
116 	 */
117 	FmgrInfo	transfn;
118 	FmgrInfo	invtransfn;
119 	FmgrInfo	finalfn;
120 
121 	int			numFinalArgs;	/* number of arguments to pass to finalfn */
122 
123 	/*
124 	 * initial value from pg_aggregate entry
125 	 */
126 	Datum		initValue;
127 	bool		initValueIsNull;
128 
129 	/*
130 	 * cached value for current frame boundaries
131 	 */
132 	Datum		resultValue;
133 	bool		resultValueIsNull;
134 
135 	/*
136 	 * We need the len and byval info for the agg's input, result, and
137 	 * transition data types in order to know how to copy/delete values.
138 	 */
139 	int16		inputtypeLen,
140 				resulttypeLen,
141 				transtypeLen;
142 	bool		inputtypeByVal,
143 				resulttypeByVal,
144 				transtypeByVal;
145 
146 	int			wfuncno;		/* index of associated WindowStatePerFuncData */
147 
148 	/* Context holding transition value and possibly other subsidiary data */
149 	MemoryContext aggcontext;	/* may be private, or winstate->aggcontext */
150 
151 	/* Current transition value */
152 	Datum		transValue;		/* current transition value */
153 	bool		transValueIsNull;
154 
155 	int64		transValueCount;	/* number of currently-aggregated rows */
156 
157 	/* Data local to eval_windowaggregates() */
158 	bool		restart;		/* need to restart this agg in this cycle? */
159 } WindowStatePerAggData;
160 
161 static void initialize_windowaggregate(WindowAggState *winstate,
162 									   WindowStatePerFunc perfuncstate,
163 									   WindowStatePerAgg peraggstate);
164 static void advance_windowaggregate(WindowAggState *winstate,
165 									WindowStatePerFunc perfuncstate,
166 									WindowStatePerAgg peraggstate);
167 static bool advance_windowaggregate_base(WindowAggState *winstate,
168 										 WindowStatePerFunc perfuncstate,
169 										 WindowStatePerAgg peraggstate);
170 static void finalize_windowaggregate(WindowAggState *winstate,
171 									 WindowStatePerFunc perfuncstate,
172 									 WindowStatePerAgg peraggstate,
173 									 Datum *result, bool *isnull);
174 
175 static void eval_windowaggregates(WindowAggState *winstate);
176 static void eval_windowfunction(WindowAggState *winstate,
177 								WindowStatePerFunc perfuncstate,
178 								Datum *result, bool *isnull);
179 
180 static void begin_partition(WindowAggState *winstate);
181 static void spool_tuples(WindowAggState *winstate, int64 pos);
182 static void release_partition(WindowAggState *winstate);
183 
184 static int	row_is_in_frame(WindowAggState *winstate, int64 pos,
185 							TupleTableSlot *slot);
186 static void update_frameheadpos(WindowAggState *winstate);
187 static void update_frametailpos(WindowAggState *winstate);
188 static void update_grouptailpos(WindowAggState *winstate);
189 
190 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
191 												WindowFunc *wfunc,
192 												WindowStatePerAgg peraggstate);
193 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
194 
195 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
196 					  TupleTableSlot *slot2);
197 static bool window_gettupleslot(WindowObject winobj, int64 pos,
198 								TupleTableSlot *slot);
199 
200 
201 /*
202  * initialize_windowaggregate
203  * parallel to initialize_aggregates in nodeAgg.c
204  */
205 static void
initialize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)206 initialize_windowaggregate(WindowAggState *winstate,
207 						   WindowStatePerFunc perfuncstate,
208 						   WindowStatePerAgg peraggstate)
209 {
210 	MemoryContext oldContext;
211 
212 	/*
213 	 * If we're using a private aggcontext, we may reset it here.  But if the
214 	 * context is shared, we don't know which other aggregates may still need
215 	 * it, so we must leave it to the caller to reset at an appropriate time.
216 	 */
217 	if (peraggstate->aggcontext != winstate->aggcontext)
218 		MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
219 
220 	if (peraggstate->initValueIsNull)
221 		peraggstate->transValue = peraggstate->initValue;
222 	else
223 	{
224 		oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
225 		peraggstate->transValue = datumCopy(peraggstate->initValue,
226 											peraggstate->transtypeByVal,
227 											peraggstate->transtypeLen);
228 		MemoryContextSwitchTo(oldContext);
229 	}
230 	peraggstate->transValueIsNull = peraggstate->initValueIsNull;
231 	peraggstate->transValueCount = 0;
232 	peraggstate->resultValue = (Datum) 0;
233 	peraggstate->resultValueIsNull = true;
234 }
235 
236 /*
237  * advance_windowaggregate
238  * parallel to advance_aggregates in nodeAgg.c
239  */
240 static void
advance_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)241 advance_windowaggregate(WindowAggState *winstate,
242 						WindowStatePerFunc perfuncstate,
243 						WindowStatePerAgg peraggstate)
244 {
245 	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
246 	WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
247 	int			numArguments = perfuncstate->numArguments;
248 	Datum		newVal;
249 	ListCell   *arg;
250 	int			i;
251 	MemoryContext oldContext;
252 	ExprContext *econtext = winstate->tmpcontext;
253 	ExprState  *filter = wfuncstate->aggfilter;
254 
255 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
256 
257 	/* Skip anything FILTERed out */
258 	if (filter)
259 	{
260 		bool		isnull;
261 		Datum		res = ExecEvalExpr(filter, econtext, &isnull);
262 
263 		if (isnull || !DatumGetBool(res))
264 		{
265 			MemoryContextSwitchTo(oldContext);
266 			return;
267 		}
268 	}
269 
270 	/* We start from 1, since the 0th arg will be the transition value */
271 	i = 1;
272 	foreach(arg, wfuncstate->args)
273 	{
274 		ExprState  *argstate = (ExprState *) lfirst(arg);
275 
276 		fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
277 											 &fcinfo->args[i].isnull);
278 		i++;
279 	}
280 
281 	if (peraggstate->transfn.fn_strict)
282 	{
283 		/*
284 		 * For a strict transfn, nothing happens when there's a NULL input; we
285 		 * just keep the prior transValue.  Note transValueCount doesn't
286 		 * change either.
287 		 */
288 		for (i = 1; i <= numArguments; i++)
289 		{
290 			if (fcinfo->args[i].isnull)
291 			{
292 				MemoryContextSwitchTo(oldContext);
293 				return;
294 			}
295 		}
296 
297 		/*
298 		 * For strict transition functions with initial value NULL we use the
299 		 * first non-NULL input as the initial state.  (We already checked
300 		 * that the agg's input type is binary-compatible with its transtype,
301 		 * so straight copy here is OK.)
302 		 *
303 		 * We must copy the datum into aggcontext if it is pass-by-ref.  We do
304 		 * not need to pfree the old transValue, since it's NULL.
305 		 */
306 		if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
307 		{
308 			MemoryContextSwitchTo(peraggstate->aggcontext);
309 			peraggstate->transValue = datumCopy(fcinfo->args[1].value,
310 												peraggstate->transtypeByVal,
311 												peraggstate->transtypeLen);
312 			peraggstate->transValueIsNull = false;
313 			peraggstate->transValueCount = 1;
314 			MemoryContextSwitchTo(oldContext);
315 			return;
316 		}
317 
318 		if (peraggstate->transValueIsNull)
319 		{
320 			/*
321 			 * Don't call a strict function with NULL inputs.  Note it is
322 			 * possible to get here despite the above tests, if the transfn is
323 			 * strict *and* returned a NULL on a prior cycle.  If that happens
324 			 * we will propagate the NULL all the way to the end.  That can
325 			 * only happen if there's no inverse transition function, though,
326 			 * since we disallow transitions back to NULL when there is one.
327 			 */
328 			MemoryContextSwitchTo(oldContext);
329 			Assert(!OidIsValid(peraggstate->invtransfn_oid));
330 			return;
331 		}
332 	}
333 
334 	/*
335 	 * OK to call the transition function.  Set winstate->curaggcontext while
336 	 * calling it, for possible use by AggCheckCallContext.
337 	 */
338 	InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
339 							 numArguments + 1,
340 							 perfuncstate->winCollation,
341 							 (void *) winstate, NULL);
342 	fcinfo->args[0].value = peraggstate->transValue;
343 	fcinfo->args[0].isnull = peraggstate->transValueIsNull;
344 	winstate->curaggcontext = peraggstate->aggcontext;
345 	newVal = FunctionCallInvoke(fcinfo);
346 	winstate->curaggcontext = NULL;
347 
348 	/*
349 	 * Moving-aggregate transition functions must not return null, see
350 	 * advance_windowaggregate_base().
351 	 */
352 	if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
353 		ereport(ERROR,
354 				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
355 				 errmsg("moving-aggregate transition function must not return null")));
356 
357 	/*
358 	 * We must track the number of rows included in transValue, since to
359 	 * remove the last input, advance_windowaggregate_base() mustn't call the
360 	 * inverse transition function, but simply reset transValue back to its
361 	 * initial value.
362 	 */
363 	peraggstate->transValueCount++;
364 
365 	/*
366 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
367 	 * free the prior transValue.  But if transfn returned a pointer to its
368 	 * first input, we don't need to do anything.  Also, if transfn returned a
369 	 * pointer to a R/W expanded object that is already a child of the
370 	 * aggcontext, assume we can adopt that value without copying it.
371 	 */
372 	if (!peraggstate->transtypeByVal &&
373 		DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
374 	{
375 		if (!fcinfo->isnull)
376 		{
377 			MemoryContextSwitchTo(peraggstate->aggcontext);
378 			if (DatumIsReadWriteExpandedObject(newVal,
379 											   false,
380 											   peraggstate->transtypeLen) &&
381 				MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
382 				 /* do nothing */ ;
383 			else
384 				newVal = datumCopy(newVal,
385 								   peraggstate->transtypeByVal,
386 								   peraggstate->transtypeLen);
387 		}
388 		if (!peraggstate->transValueIsNull)
389 		{
390 			if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
391 											   false,
392 											   peraggstate->transtypeLen))
393 				DeleteExpandedObject(peraggstate->transValue);
394 			else
395 				pfree(DatumGetPointer(peraggstate->transValue));
396 		}
397 	}
398 
399 	MemoryContextSwitchTo(oldContext);
400 	peraggstate->transValue = newVal;
401 	peraggstate->transValueIsNull = fcinfo->isnull;
402 }
403 
404 /*
405  * advance_windowaggregate_base
406  * Remove the oldest tuple from an aggregation.
407  *
408  * This is very much like advance_windowaggregate, except that we will call
409  * the inverse transition function (which caller must have checked is
410  * available).
411  *
412  * Returns true if we successfully removed the current row from this
413  * aggregate, false if not (in the latter case, caller is responsible
414  * for cleaning up by restarting the aggregation).
415  */
416 static bool
advance_windowaggregate_base(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)417 advance_windowaggregate_base(WindowAggState *winstate,
418 							 WindowStatePerFunc perfuncstate,
419 							 WindowStatePerAgg peraggstate)
420 {
421 	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
422 	WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
423 	int			numArguments = perfuncstate->numArguments;
424 	Datum		newVal;
425 	ListCell   *arg;
426 	int			i;
427 	MemoryContext oldContext;
428 	ExprContext *econtext = winstate->tmpcontext;
429 	ExprState  *filter = wfuncstate->aggfilter;
430 
431 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
432 
433 	/* Skip anything FILTERed out */
434 	if (filter)
435 	{
436 		bool		isnull;
437 		Datum		res = ExecEvalExpr(filter, econtext, &isnull);
438 
439 		if (isnull || !DatumGetBool(res))
440 		{
441 			MemoryContextSwitchTo(oldContext);
442 			return true;
443 		}
444 	}
445 
446 	/* We start from 1, since the 0th arg will be the transition value */
447 	i = 1;
448 	foreach(arg, wfuncstate->args)
449 	{
450 		ExprState  *argstate = (ExprState *) lfirst(arg);
451 
452 		fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
453 											 &fcinfo->args[i].isnull);
454 		i++;
455 	}
456 
457 	if (peraggstate->invtransfn.fn_strict)
458 	{
459 		/*
460 		 * For a strict (inv)transfn, nothing happens when there's a NULL
461 		 * input; we just keep the prior transValue.  Note transValueCount
462 		 * doesn't change either.
463 		 */
464 		for (i = 1; i <= numArguments; i++)
465 		{
466 			if (fcinfo->args[i].isnull)
467 			{
468 				MemoryContextSwitchTo(oldContext);
469 				return true;
470 			}
471 		}
472 	}
473 
474 	/* There should still be an added but not yet removed value */
475 	Assert(peraggstate->transValueCount > 0);
476 
477 	/*
478 	 * In moving-aggregate mode, the state must never be NULL, except possibly
479 	 * before any rows have been aggregated (which is surely not the case at
480 	 * this point).  This restriction allows us to interpret a NULL result
481 	 * from the inverse function as meaning "sorry, can't do an inverse
482 	 * transition in this case".  We already checked this in
483 	 * advance_windowaggregate, but just for safety, check again.
484 	 */
485 	if (peraggstate->transValueIsNull)
486 		elog(ERROR, "aggregate transition value is NULL before inverse transition");
487 
488 	/*
489 	 * We mustn't use the inverse transition function to remove the last
490 	 * input.  Doing so would yield a non-NULL state, whereas we should be in
491 	 * the initial state afterwards which may very well be NULL.  So instead,
492 	 * we simply re-initialize the aggregate in this case.
493 	 */
494 	if (peraggstate->transValueCount == 1)
495 	{
496 		MemoryContextSwitchTo(oldContext);
497 		initialize_windowaggregate(winstate,
498 								   &winstate->perfunc[peraggstate->wfuncno],
499 								   peraggstate);
500 		return true;
501 	}
502 
503 	/*
504 	 * OK to call the inverse transition function.  Set
505 	 * winstate->curaggcontext while calling it, for possible use by
506 	 * AggCheckCallContext.
507 	 */
508 	InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
509 							 numArguments + 1,
510 							 perfuncstate->winCollation,
511 							 (void *) winstate, NULL);
512 	fcinfo->args[0].value = peraggstate->transValue;
513 	fcinfo->args[0].isnull = peraggstate->transValueIsNull;
514 	winstate->curaggcontext = peraggstate->aggcontext;
515 	newVal = FunctionCallInvoke(fcinfo);
516 	winstate->curaggcontext = NULL;
517 
518 	/*
519 	 * If the function returns NULL, report failure, forcing a restart.
520 	 */
521 	if (fcinfo->isnull)
522 	{
523 		MemoryContextSwitchTo(oldContext);
524 		return false;
525 	}
526 
527 	/* Update number of rows included in transValue */
528 	peraggstate->transValueCount--;
529 
530 	/*
531 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
532 	 * free the prior transValue.  But if invtransfn returned a pointer to its
533 	 * first input, we don't need to do anything.  Also, if invtransfn
534 	 * returned a pointer to a R/W expanded object that is already a child of
535 	 * the aggcontext, assume we can adopt that value without copying it.
536 	 *
537 	 * Note: the checks for null values here will never fire, but it seems
538 	 * best to have this stanza look just like advance_windowaggregate.
539 	 */
540 	if (!peraggstate->transtypeByVal &&
541 		DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
542 	{
543 		if (!fcinfo->isnull)
544 		{
545 			MemoryContextSwitchTo(peraggstate->aggcontext);
546 			if (DatumIsReadWriteExpandedObject(newVal,
547 											   false,
548 											   peraggstate->transtypeLen) &&
549 				MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
550 				 /* do nothing */ ;
551 			else
552 				newVal = datumCopy(newVal,
553 								   peraggstate->transtypeByVal,
554 								   peraggstate->transtypeLen);
555 		}
556 		if (!peraggstate->transValueIsNull)
557 		{
558 			if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
559 											   false,
560 											   peraggstate->transtypeLen))
561 				DeleteExpandedObject(peraggstate->transValue);
562 			else
563 				pfree(DatumGetPointer(peraggstate->transValue));
564 		}
565 	}
566 
567 	MemoryContextSwitchTo(oldContext);
568 	peraggstate->transValue = newVal;
569 	peraggstate->transValueIsNull = fcinfo->isnull;
570 
571 	return true;
572 }
573 
574 /*
575  * finalize_windowaggregate
576  * parallel to finalize_aggregate in nodeAgg.c
577  */
578 static void
finalize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate,Datum * result,bool * isnull)579 finalize_windowaggregate(WindowAggState *winstate,
580 						 WindowStatePerFunc perfuncstate,
581 						 WindowStatePerAgg peraggstate,
582 						 Datum *result, bool *isnull)
583 {
584 	MemoryContext oldContext;
585 
586 	oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
587 
588 	/*
589 	 * Apply the agg's finalfn if one is provided, else return transValue.
590 	 */
591 	if (OidIsValid(peraggstate->finalfn_oid))
592 	{
593 		LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
594 		int			numFinalArgs = peraggstate->numFinalArgs;
595 		bool		anynull;
596 		int			i;
597 
598 		InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn),
599 								 numFinalArgs,
600 								 perfuncstate->winCollation,
601 								 (void *) winstate, NULL);
602 		fcinfo->args[0].value =
603 			MakeExpandedObjectReadOnly(peraggstate->transValue,
604 									   peraggstate->transValueIsNull,
605 									   peraggstate->transtypeLen);
606 		fcinfo->args[0].isnull = peraggstate->transValueIsNull;
607 		anynull = peraggstate->transValueIsNull;
608 
609 		/* Fill any remaining argument positions with nulls */
610 		for (i = 1; i < numFinalArgs; i++)
611 		{
612 			fcinfo->args[i].value = (Datum) 0;
613 			fcinfo->args[i].isnull = true;
614 			anynull = true;
615 		}
616 
617 		if (fcinfo->flinfo->fn_strict && anynull)
618 		{
619 			/* don't call a strict function with NULL inputs */
620 			*result = (Datum) 0;
621 			*isnull = true;
622 		}
623 		else
624 		{
625 			winstate->curaggcontext = peraggstate->aggcontext;
626 			*result = FunctionCallInvoke(fcinfo);
627 			winstate->curaggcontext = NULL;
628 			*isnull = fcinfo->isnull;
629 		}
630 	}
631 	else
632 	{
633 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
634 		*result = peraggstate->transValue;
635 		*isnull = peraggstate->transValueIsNull;
636 	}
637 
638 	/*
639 	 * If result is pass-by-ref, make sure it is in the right context.
640 	 */
641 	if (!peraggstate->resulttypeByVal && !*isnull &&
642 		!MemoryContextContains(CurrentMemoryContext,
643 							   DatumGetPointer(*result)))
644 		*result = datumCopy(*result,
645 							peraggstate->resulttypeByVal,
646 							peraggstate->resulttypeLen);
647 	MemoryContextSwitchTo(oldContext);
648 }
649 
650 /*
651  * eval_windowaggregates
652  * evaluate plain aggregates being used as window functions
653  *
654  * This differs from nodeAgg.c in two ways.  First, if the window's frame
655  * start position moves, we use the inverse transition function (if it exists)
656  * to remove rows from the transition value.  And second, we expect to be
657  * able to call aggregate final functions repeatedly after aggregating more
658  * data onto the same transition value.  This is not a behavior required by
659  * nodeAgg.c.
660  */
661 static void
eval_windowaggregates(WindowAggState * winstate)662 eval_windowaggregates(WindowAggState *winstate)
663 {
664 	WindowStatePerAgg peraggstate;
665 	int			wfuncno,
666 				numaggs,
667 				numaggs_restart,
668 				i;
669 	int64		aggregatedupto_nonrestarted;
670 	MemoryContext oldContext;
671 	ExprContext *econtext;
672 	WindowObject agg_winobj;
673 	TupleTableSlot *agg_row_slot;
674 	TupleTableSlot *temp_slot;
675 
676 	numaggs = winstate->numaggs;
677 	if (numaggs == 0)
678 		return;					/* nothing to do */
679 
680 	/* final output execution is in ps_ExprContext */
681 	econtext = winstate->ss.ps.ps_ExprContext;
682 	agg_winobj = winstate->agg_winobj;
683 	agg_row_slot = winstate->agg_row_slot;
684 	temp_slot = winstate->temp_slot_1;
685 
686 	/*
687 	 * If the window's frame start clause is UNBOUNDED_PRECEDING and no
688 	 * exclusion clause is specified, then the window frame consists of a
689 	 * contiguous group of rows extending forward from the start of the
690 	 * partition, and rows only enter the frame, never exit it, as the current
691 	 * row advances forward.  This makes it possible to use an incremental
692 	 * strategy for evaluating aggregates: we run the transition function for
693 	 * each row added to the frame, and run the final function whenever we
694 	 * need the current aggregate value.  This is considerably more efficient
695 	 * than the naive approach of re-running the entire aggregate calculation
696 	 * for each current row.  It does assume that the final function doesn't
697 	 * damage the running transition value, but we have the same assumption in
698 	 * nodeAgg.c too (when it rescans an existing hash table).
699 	 *
700 	 * If the frame start does sometimes move, we can still optimize as above
701 	 * whenever successive rows share the same frame head, but if the frame
702 	 * head moves beyond the previous head we try to remove those rows using
703 	 * the aggregate's inverse transition function.  This function restores
704 	 * the aggregate's current state to what it would be if the removed row
705 	 * had never been aggregated in the first place.  Inverse transition
706 	 * functions may optionally return NULL, indicating that the function was
707 	 * unable to remove the tuple from aggregation.  If this happens, or if
708 	 * the aggregate doesn't have an inverse transition function at all, we
709 	 * must perform the aggregation all over again for all tuples within the
710 	 * new frame boundaries.
711 	 *
712 	 * If there's any exclusion clause, then we may have to aggregate over a
713 	 * non-contiguous set of rows, so we punt and recalculate for every row.
714 	 * (For some frame end choices, it might be that the frame is always
715 	 * contiguous anyway, but that's an optimization to investigate later.)
716 	 *
717 	 * In many common cases, multiple rows share the same frame and hence the
718 	 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
719 	 * window, then all rows are peers and so they all have window frame equal
720 	 * to the whole partition.)  We optimize such cases by calculating the
721 	 * aggregate value once when we reach the first row of a peer group, and
722 	 * then returning the saved value for all subsequent rows.
723 	 *
724 	 * 'aggregatedupto' keeps track of the first row that has not yet been
725 	 * accumulated into the aggregate transition values.  Whenever we start a
726 	 * new peer group, we accumulate forward to the end of the peer group.
727 	 */
728 
729 	/*
730 	 * First, update the frame head position.
731 	 *
732 	 * The frame head should never move backwards, and the code below wouldn't
733 	 * cope if it did, so for safety we complain if it does.
734 	 */
735 	update_frameheadpos(winstate);
736 	if (winstate->frameheadpos < winstate->aggregatedbase)
737 		elog(ERROR, "window frame head moved backward");
738 
739 	/*
740 	 * If the frame didn't change compared to the previous row, we can re-use
741 	 * the result values that were previously saved at the bottom of this
742 	 * function.  Since we don't know the current frame's end yet, this is not
743 	 * possible to check for fully.  But if the frame end mode is UNBOUNDED
744 	 * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the
745 	 * current row lies within the previous row's frame, then the two frames'
746 	 * ends must coincide.  Note that on the first row aggregatedbase ==
747 	 * aggregatedupto, meaning this test must fail, so we don't need to check
748 	 * the "there was no previous row" case explicitly here.
749 	 */
750 	if (winstate->aggregatedbase == winstate->frameheadpos &&
751 		(winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
752 								   FRAMEOPTION_END_CURRENT_ROW)) &&
753 		!(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
754 		winstate->aggregatedbase <= winstate->currentpos &&
755 		winstate->aggregatedupto > winstate->currentpos)
756 	{
757 		for (i = 0; i < numaggs; i++)
758 		{
759 			peraggstate = &winstate->peragg[i];
760 			wfuncno = peraggstate->wfuncno;
761 			econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
762 			econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
763 		}
764 		return;
765 	}
766 
767 	/*----------
768 	 * Initialize restart flags.
769 	 *
770 	 * We restart the aggregation:
771 	 *	 - if we're processing the first row in the partition, or
772 	 *	 - if the frame's head moved and we cannot use an inverse
773 	 *	   transition function, or
774 	 *	 - we have an EXCLUSION clause, or
775 	 *	 - if the new frame doesn't overlap the old one
776 	 *
777 	 * Note that we don't strictly need to restart in the last case, but if
778 	 * we're going to remove all rows from the aggregation anyway, a restart
779 	 * surely is faster.
780 	 *----------
781 	 */
782 	numaggs_restart = 0;
783 	for (i = 0; i < numaggs; i++)
784 	{
785 		peraggstate = &winstate->peragg[i];
786 		if (winstate->currentpos == 0 ||
787 			(winstate->aggregatedbase != winstate->frameheadpos &&
788 			 !OidIsValid(peraggstate->invtransfn_oid)) ||
789 			(winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
790 			winstate->aggregatedupto <= winstate->frameheadpos)
791 		{
792 			peraggstate->restart = true;
793 			numaggs_restart++;
794 		}
795 		else
796 			peraggstate->restart = false;
797 	}
798 
799 	/*
800 	 * If we have any possibly-moving aggregates, attempt to advance
801 	 * aggregatedbase to match the frame's head by removing input rows that
802 	 * fell off the top of the frame from the aggregations.  This can fail,
803 	 * i.e. advance_windowaggregate_base() can return false, in which case
804 	 * we'll restart that aggregate below.
805 	 */
806 	while (numaggs_restart < numaggs &&
807 		   winstate->aggregatedbase < winstate->frameheadpos)
808 	{
809 		/*
810 		 * Fetch the next tuple of those being removed. This should never fail
811 		 * as we should have been here before.
812 		 */
813 		if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
814 								 temp_slot))
815 			elog(ERROR, "could not re-fetch previously fetched frame row");
816 
817 		/* Set tuple context for evaluation of aggregate arguments */
818 		winstate->tmpcontext->ecxt_outertuple = temp_slot;
819 
820 		/*
821 		 * Perform the inverse transition for each aggregate function in the
822 		 * window, unless it has already been marked as needing a restart.
823 		 */
824 		for (i = 0; i < numaggs; i++)
825 		{
826 			bool		ok;
827 
828 			peraggstate = &winstate->peragg[i];
829 			if (peraggstate->restart)
830 				continue;
831 
832 			wfuncno = peraggstate->wfuncno;
833 			ok = advance_windowaggregate_base(winstate,
834 											  &winstate->perfunc[wfuncno],
835 											  peraggstate);
836 			if (!ok)
837 			{
838 				/* Inverse transition function has failed, must restart */
839 				peraggstate->restart = true;
840 				numaggs_restart++;
841 			}
842 		}
843 
844 		/* Reset per-input-tuple context after each tuple */
845 		ResetExprContext(winstate->tmpcontext);
846 
847 		/* And advance the aggregated-row state */
848 		winstate->aggregatedbase++;
849 		ExecClearTuple(temp_slot);
850 	}
851 
852 	/*
853 	 * If we successfully advanced the base rows of all the aggregates,
854 	 * aggregatedbase now equals frameheadpos; but if we failed for any, we
855 	 * must forcibly update aggregatedbase.
856 	 */
857 	winstate->aggregatedbase = winstate->frameheadpos;
858 
859 	/*
860 	 * If we created a mark pointer for aggregates, keep it pushed up to frame
861 	 * head, so that tuplestore can discard unnecessary rows.
862 	 */
863 	if (agg_winobj->markptr >= 0)
864 		WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
865 
866 	/*
867 	 * Now restart the aggregates that require it.
868 	 *
869 	 * We assume that aggregates using the shared context always restart if
870 	 * *any* aggregate restarts, and we may thus clean up the shared
871 	 * aggcontext if that is the case.  Private aggcontexts are reset by
872 	 * initialize_windowaggregate() if their owning aggregate restarts. If we
873 	 * aren't restarting an aggregate, we need to free any previously saved
874 	 * result for it, else we'll leak memory.
875 	 */
876 	if (numaggs_restart > 0)
877 		MemoryContextResetAndDeleteChildren(winstate->aggcontext);
878 	for (i = 0; i < numaggs; i++)
879 	{
880 		peraggstate = &winstate->peragg[i];
881 
882 		/* Aggregates using the shared ctx must restart if *any* agg does */
883 		Assert(peraggstate->aggcontext != winstate->aggcontext ||
884 			   numaggs_restart == 0 ||
885 			   peraggstate->restart);
886 
887 		if (peraggstate->restart)
888 		{
889 			wfuncno = peraggstate->wfuncno;
890 			initialize_windowaggregate(winstate,
891 									   &winstate->perfunc[wfuncno],
892 									   peraggstate);
893 		}
894 		else if (!peraggstate->resultValueIsNull)
895 		{
896 			if (!peraggstate->resulttypeByVal)
897 				pfree(DatumGetPointer(peraggstate->resultValue));
898 			peraggstate->resultValue = (Datum) 0;
899 			peraggstate->resultValueIsNull = true;
900 		}
901 	}
902 
903 	/*
904 	 * Non-restarted aggregates now contain the rows between aggregatedbase
905 	 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
906 	 * contain no rows.  If there are any restarted aggregates, we must thus
907 	 * begin aggregating anew at frameheadpos, otherwise we may simply
908 	 * continue at aggregatedupto.  We must remember the old value of
909 	 * aggregatedupto to know how long to skip advancing non-restarted
910 	 * aggregates.  If we modify aggregatedupto, we must also clear
911 	 * agg_row_slot, per the loop invariant below.
912 	 */
913 	aggregatedupto_nonrestarted = winstate->aggregatedupto;
914 	if (numaggs_restart > 0 &&
915 		winstate->aggregatedupto != winstate->frameheadpos)
916 	{
917 		winstate->aggregatedupto = winstate->frameheadpos;
918 		ExecClearTuple(agg_row_slot);
919 	}
920 
921 	/*
922 	 * Advance until we reach a row not in frame (or end of partition).
923 	 *
924 	 * Note the loop invariant: agg_row_slot is either empty or holds the row
925 	 * at position aggregatedupto.  We advance aggregatedupto after processing
926 	 * a row.
927 	 */
928 	for (;;)
929 	{
930 		int			ret;
931 
932 		/* Fetch next row if we didn't already */
933 		if (TupIsNull(agg_row_slot))
934 		{
935 			if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
936 									 agg_row_slot))
937 				break;			/* must be end of partition */
938 		}
939 
940 		/*
941 		 * Exit loop if no more rows can be in frame.  Skip aggregation if
942 		 * current row is not in frame but there might be more in the frame.
943 		 */
944 		ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
945 		if (ret < 0)
946 			break;
947 		if (ret == 0)
948 			goto next_tuple;
949 
950 		/* Set tuple context for evaluation of aggregate arguments */
951 		winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
952 
953 		/* Accumulate row into the aggregates */
954 		for (i = 0; i < numaggs; i++)
955 		{
956 			peraggstate = &winstate->peragg[i];
957 
958 			/* Non-restarted aggs skip until aggregatedupto_nonrestarted */
959 			if (!peraggstate->restart &&
960 				winstate->aggregatedupto < aggregatedupto_nonrestarted)
961 				continue;
962 
963 			wfuncno = peraggstate->wfuncno;
964 			advance_windowaggregate(winstate,
965 									&winstate->perfunc[wfuncno],
966 									peraggstate);
967 		}
968 
969 next_tuple:
970 		/* Reset per-input-tuple context after each tuple */
971 		ResetExprContext(winstate->tmpcontext);
972 
973 		/* And advance the aggregated-row state */
974 		winstate->aggregatedupto++;
975 		ExecClearTuple(agg_row_slot);
976 	}
977 
978 	/* The frame's end is not supposed to move backwards, ever */
979 	Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
980 
981 	/*
982 	 * finalize aggregates and fill result/isnull fields.
983 	 */
984 	for (i = 0; i < numaggs; i++)
985 	{
986 		Datum	   *result;
987 		bool	   *isnull;
988 
989 		peraggstate = &winstate->peragg[i];
990 		wfuncno = peraggstate->wfuncno;
991 		result = &econtext->ecxt_aggvalues[wfuncno];
992 		isnull = &econtext->ecxt_aggnulls[wfuncno];
993 		finalize_windowaggregate(winstate,
994 								 &winstate->perfunc[wfuncno],
995 								 peraggstate,
996 								 result, isnull);
997 
998 		/*
999 		 * save the result in case next row shares the same frame.
1000 		 *
1001 		 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
1002 		 * advance that the next row can't possibly share the same frame. Is
1003 		 * it worth detecting that and skipping this code?
1004 		 */
1005 		if (!peraggstate->resulttypeByVal && !*isnull)
1006 		{
1007 			oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
1008 			peraggstate->resultValue =
1009 				datumCopy(*result,
1010 						  peraggstate->resulttypeByVal,
1011 						  peraggstate->resulttypeLen);
1012 			MemoryContextSwitchTo(oldContext);
1013 		}
1014 		else
1015 		{
1016 			peraggstate->resultValue = *result;
1017 		}
1018 		peraggstate->resultValueIsNull = *isnull;
1019 	}
1020 }
1021 
1022 /*
1023  * eval_windowfunction
1024  *
1025  * Arguments of window functions are not evaluated here, because a window
1026  * function can need random access to arbitrary rows in the partition.
1027  * The window function uses the special WinGetFuncArgInPartition and
1028  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1029  * it wants.
1030  */
1031 static void
eval_windowfunction(WindowAggState * winstate,WindowStatePerFunc perfuncstate,Datum * result,bool * isnull)1032 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1033 					Datum *result, bool *isnull)
1034 {
1035 	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
1036 	MemoryContext oldContext;
1037 
1038 	oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1039 
1040 	/*
1041 	 * We don't pass any normal arguments to a window function, but we do pass
1042 	 * it the number of arguments, in order to permit window function
1043 	 * implementations to support varying numbers of arguments.  The real info
1044 	 * goes through the WindowObject, which is passed via fcinfo->context.
1045 	 */
1046 	InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo),
1047 							 perfuncstate->numArguments,
1048 							 perfuncstate->winCollation,
1049 							 (void *) perfuncstate->winobj, NULL);
1050 	/* Just in case, make all the regular argument slots be null */
1051 	for (int argno = 0; argno < perfuncstate->numArguments; argno++)
1052 		fcinfo->args[argno].isnull = true;
1053 	/* Window functions don't have a current aggregate context, either */
1054 	winstate->curaggcontext = NULL;
1055 
1056 	*result = FunctionCallInvoke(fcinfo);
1057 	*isnull = fcinfo->isnull;
1058 
1059 	/*
1060 	 * Make sure pass-by-ref data is allocated in the appropriate context. (We
1061 	 * need this in case the function returns a pointer into some short-lived
1062 	 * tuple, as is entirely possible.)
1063 	 */
1064 	if (!perfuncstate->resulttypeByVal && !fcinfo->isnull &&
1065 		!MemoryContextContains(CurrentMemoryContext,
1066 							   DatumGetPointer(*result)))
1067 		*result = datumCopy(*result,
1068 							perfuncstate->resulttypeByVal,
1069 							perfuncstate->resulttypeLen);
1070 
1071 	MemoryContextSwitchTo(oldContext);
1072 }
1073 
1074 /*
1075  * begin_partition
1076  * Start buffering rows of the next partition.
1077  */
1078 static void
begin_partition(WindowAggState * winstate)1079 begin_partition(WindowAggState *winstate)
1080 {
1081 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1082 	PlanState  *outerPlan = outerPlanState(winstate);
1083 	int			frameOptions = winstate->frameOptions;
1084 	int			numfuncs = winstate->numfuncs;
1085 	int			i;
1086 
1087 	winstate->partition_spooled = false;
1088 	winstate->framehead_valid = false;
1089 	winstate->frametail_valid = false;
1090 	winstate->grouptail_valid = false;
1091 	winstate->spooled_rows = 0;
1092 	winstate->currentpos = 0;
1093 	winstate->frameheadpos = 0;
1094 	winstate->frametailpos = 0;
1095 	winstate->currentgroup = 0;
1096 	winstate->frameheadgroup = 0;
1097 	winstate->frametailgroup = 0;
1098 	winstate->groupheadpos = 0;
1099 	winstate->grouptailpos = -1;	/* see update_grouptailpos */
1100 	ExecClearTuple(winstate->agg_row_slot);
1101 	if (winstate->framehead_slot)
1102 		ExecClearTuple(winstate->framehead_slot);
1103 	if (winstate->frametail_slot)
1104 		ExecClearTuple(winstate->frametail_slot);
1105 
1106 	/*
1107 	 * If this is the very first partition, we need to fetch the first input
1108 	 * row to store in first_part_slot.
1109 	 */
1110 	if (TupIsNull(winstate->first_part_slot))
1111 	{
1112 		TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1113 
1114 		if (!TupIsNull(outerslot))
1115 			ExecCopySlot(winstate->first_part_slot, outerslot);
1116 		else
1117 		{
1118 			/* outer plan is empty, so we have nothing to do */
1119 			winstate->partition_spooled = true;
1120 			winstate->more_partitions = false;
1121 			return;
1122 		}
1123 	}
1124 
1125 	/* Create new tuplestore for this partition */
1126 	winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1127 
1128 	/*
1129 	 * Set up read pointers for the tuplestore.  The current pointer doesn't
1130 	 * need BACKWARD capability, but the per-window-function read pointers do,
1131 	 * and the aggregate pointer does if we might need to restart aggregation.
1132 	 */
1133 	winstate->current_ptr = 0;	/* read pointer 0 is pre-allocated */
1134 
1135 	/* reset default REWIND capability bit for current ptr */
1136 	tuplestore_set_eflags(winstate->buffer, 0);
1137 
1138 	/* create read pointers for aggregates, if needed */
1139 	if (winstate->numaggs > 0)
1140 	{
1141 		WindowObject agg_winobj = winstate->agg_winobj;
1142 		int			readptr_flags = 0;
1143 
1144 		/*
1145 		 * If the frame head is potentially movable, or we have an EXCLUSION
1146 		 * clause, we might need to restart aggregation ...
1147 		 */
1148 		if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) ||
1149 			(frameOptions & FRAMEOPTION_EXCLUSION))
1150 		{
1151 			/* ... so create a mark pointer to track the frame head */
1152 			agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1153 			/* and the read pointer will need BACKWARD capability */
1154 			readptr_flags |= EXEC_FLAG_BACKWARD;
1155 		}
1156 
1157 		agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1158 															readptr_flags);
1159 		agg_winobj->markpos = -1;
1160 		agg_winobj->seekpos = -1;
1161 
1162 		/* Also reset the row counters for aggregates */
1163 		winstate->aggregatedbase = 0;
1164 		winstate->aggregatedupto = 0;
1165 	}
1166 
1167 	/* create mark and read pointers for each real window function */
1168 	for (i = 0; i < numfuncs; i++)
1169 	{
1170 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1171 
1172 		if (!perfuncstate->plain_agg)
1173 		{
1174 			WindowObject winobj = perfuncstate->winobj;
1175 
1176 			winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1177 															0);
1178 			winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1179 															EXEC_FLAG_BACKWARD);
1180 			winobj->markpos = -1;
1181 			winobj->seekpos = -1;
1182 		}
1183 	}
1184 
1185 	/*
1186 	 * If we are in RANGE or GROUPS mode, then determining frame boundaries
1187 	 * requires physical access to the frame endpoint rows, except in certain
1188 	 * degenerate cases.  We create read pointers to point to those rows, to
1189 	 * simplify access and ensure that the tuplestore doesn't discard the
1190 	 * endpoint rows prematurely.  (Must create pointers in exactly the same
1191 	 * cases that update_frameheadpos and update_frametailpos need them.)
1192 	 */
1193 	winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */
1194 
1195 	if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1196 	{
1197 		if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
1198 			 node->ordNumCols != 0) ||
1199 			(frameOptions & FRAMEOPTION_START_OFFSET))
1200 			winstate->framehead_ptr =
1201 				tuplestore_alloc_read_pointer(winstate->buffer, 0);
1202 		if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
1203 			 node->ordNumCols != 0) ||
1204 			(frameOptions & FRAMEOPTION_END_OFFSET))
1205 			winstate->frametail_ptr =
1206 				tuplestore_alloc_read_pointer(winstate->buffer, 0);
1207 	}
1208 
1209 	/*
1210 	 * If we have an exclusion clause that requires knowing the boundaries of
1211 	 * the current row's peer group, we create a read pointer to track the
1212 	 * tail position of the peer group (i.e., first row of the next peer
1213 	 * group).  The head position does not require its own pointer because we
1214 	 * maintain that as a side effect of advancing the current row.
1215 	 */
1216 	winstate->grouptail_ptr = -1;
1217 
1218 	if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP |
1219 						 FRAMEOPTION_EXCLUDE_TIES)) &&
1220 		node->ordNumCols != 0)
1221 	{
1222 		winstate->grouptail_ptr =
1223 			tuplestore_alloc_read_pointer(winstate->buffer, 0);
1224 	}
1225 
1226 	/*
1227 	 * Store the first tuple into the tuplestore (it's always available now;
1228 	 * we either read it above, or saved it at the end of previous partition)
1229 	 */
1230 	tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1231 	winstate->spooled_rows++;
1232 }
1233 
1234 /*
1235  * Read tuples from the outer node, up to and including position 'pos', and
1236  * store them into the tuplestore. If pos is -1, reads the whole partition.
1237  */
1238 static void
spool_tuples(WindowAggState * winstate,int64 pos)1239 spool_tuples(WindowAggState *winstate, int64 pos)
1240 {
1241 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1242 	PlanState  *outerPlan;
1243 	TupleTableSlot *outerslot;
1244 	MemoryContext oldcontext;
1245 
1246 	if (!winstate->buffer)
1247 		return;					/* just a safety check */
1248 	if (winstate->partition_spooled)
1249 		return;					/* whole partition done already */
1250 
1251 	/*
1252 	 * If the tuplestore has spilled to disk, alternate reading and writing
1253 	 * becomes quite expensive due to frequent buffer flushes.  It's cheaper
1254 	 * to force the entire partition to get spooled in one go.
1255 	 *
1256 	 * XXX this is a horrid kluge --- it'd be better to fix the performance
1257 	 * problem inside tuplestore.  FIXME
1258 	 */
1259 	if (!tuplestore_in_memory(winstate->buffer))
1260 		pos = -1;
1261 
1262 	outerPlan = outerPlanState(winstate);
1263 
1264 	/* Must be in query context to call outerplan */
1265 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1266 
1267 	while (winstate->spooled_rows <= pos || pos == -1)
1268 	{
1269 		outerslot = ExecProcNode(outerPlan);
1270 		if (TupIsNull(outerslot))
1271 		{
1272 			/* reached the end of the last partition */
1273 			winstate->partition_spooled = true;
1274 			winstate->more_partitions = false;
1275 			break;
1276 		}
1277 
1278 		if (node->partNumCols > 0)
1279 		{
1280 			ExprContext *econtext = winstate->tmpcontext;
1281 
1282 			econtext->ecxt_innertuple = winstate->first_part_slot;
1283 			econtext->ecxt_outertuple = outerslot;
1284 
1285 			/* Check if this tuple still belongs to the current partition */
1286 			if (!ExecQualAndReset(winstate->partEqfunction, econtext))
1287 			{
1288 				/*
1289 				 * end of partition; copy the tuple for the next cycle.
1290 				 */
1291 				ExecCopySlot(winstate->first_part_slot, outerslot);
1292 				winstate->partition_spooled = true;
1293 				winstate->more_partitions = true;
1294 				break;
1295 			}
1296 		}
1297 
1298 		/* Still in partition, so save it into the tuplestore */
1299 		tuplestore_puttupleslot(winstate->buffer, outerslot);
1300 		winstate->spooled_rows++;
1301 	}
1302 
1303 	MemoryContextSwitchTo(oldcontext);
1304 }
1305 
1306 /*
1307  * release_partition
1308  * clear information kept within a partition, including
1309  * tuplestore and aggregate results.
1310  */
1311 static void
release_partition(WindowAggState * winstate)1312 release_partition(WindowAggState *winstate)
1313 {
1314 	int			i;
1315 
1316 	for (i = 0; i < winstate->numfuncs; i++)
1317 	{
1318 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1319 
1320 		/* Release any partition-local state of this window function */
1321 		if (perfuncstate->winobj)
1322 			perfuncstate->winobj->localmem = NULL;
1323 	}
1324 
1325 	/*
1326 	 * Release all partition-local memory (in particular, any partition-local
1327 	 * state that we might have trashed our pointers to in the above loop, and
1328 	 * any aggregate temp data).  We don't rely on retail pfree because some
1329 	 * aggregates might have allocated data we don't have direct pointers to.
1330 	 */
1331 	MemoryContextResetAndDeleteChildren(winstate->partcontext);
1332 	MemoryContextResetAndDeleteChildren(winstate->aggcontext);
1333 	for (i = 0; i < winstate->numaggs; i++)
1334 	{
1335 		if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1336 			MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
1337 	}
1338 
1339 	if (winstate->buffer)
1340 		tuplestore_end(winstate->buffer);
1341 	winstate->buffer = NULL;
1342 	winstate->partition_spooled = false;
1343 }
1344 
1345 /*
1346  * row_is_in_frame
1347  * Determine whether a row is in the current row's window frame according
1348  * to our window framing rule
1349  *
1350  * The caller must have already determined that the row is in the partition
1351  * and fetched it into a slot.  This function just encapsulates the framing
1352  * rules.
1353  *
1354  * Returns:
1355  * -1, if the row is out of frame and no succeeding rows can be in frame
1356  * 0, if the row is out of frame but succeeding rows might be in frame
1357  * 1, if the row is in frame
1358  *
1359  * May clobber winstate->temp_slot_2.
1360  */
1361 static int
row_is_in_frame(WindowAggState * winstate,int64 pos,TupleTableSlot * slot)1362 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1363 {
1364 	int			frameOptions = winstate->frameOptions;
1365 
1366 	Assert(pos >= 0);			/* else caller error */
1367 
1368 	/*
1369 	 * First, check frame starting conditions.  We might as well delegate this
1370 	 * to update_frameheadpos always; it doesn't add any notable cost.
1371 	 */
1372 	update_frameheadpos(winstate);
1373 	if (pos < winstate->frameheadpos)
1374 		return 0;
1375 
1376 	/*
1377 	 * Okay so far, now check frame ending conditions.  Here, we avoid calling
1378 	 * update_frametailpos in simple cases, so as not to spool tuples further
1379 	 * ahead than necessary.
1380 	 */
1381 	if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1382 	{
1383 		if (frameOptions & FRAMEOPTION_ROWS)
1384 		{
1385 			/* rows after current row are out of frame */
1386 			if (pos > winstate->currentpos)
1387 				return -1;
1388 		}
1389 		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1390 		{
1391 			/* following row that is not peer is out of frame */
1392 			if (pos > winstate->currentpos &&
1393 				!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1394 				return -1;
1395 		}
1396 		else
1397 			Assert(false);
1398 	}
1399 	else if (frameOptions & FRAMEOPTION_END_OFFSET)
1400 	{
1401 		if (frameOptions & FRAMEOPTION_ROWS)
1402 		{
1403 			int64		offset = DatumGetInt64(winstate->endOffsetValue);
1404 
1405 			/* rows after current row + offset are out of frame */
1406 			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1407 				offset = -offset;
1408 
1409 			if (pos > winstate->currentpos + offset)
1410 				return -1;
1411 		}
1412 		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1413 		{
1414 			/* hard cases, so delegate to update_frametailpos */
1415 			update_frametailpos(winstate);
1416 			if (pos >= winstate->frametailpos)
1417 				return -1;
1418 		}
1419 		else
1420 			Assert(false);
1421 	}
1422 
1423 	/* Check exclusion clause */
1424 	if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
1425 	{
1426 		if (pos == winstate->currentpos)
1427 			return 0;
1428 	}
1429 	else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
1430 			 ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
1431 			  pos != winstate->currentpos))
1432 	{
1433 		WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1434 
1435 		/* If no ORDER BY, all rows are peers with each other */
1436 		if (node->ordNumCols == 0)
1437 			return 0;
1438 		/* Otherwise, check the group boundaries */
1439 		if (pos >= winstate->groupheadpos)
1440 		{
1441 			update_grouptailpos(winstate);
1442 			if (pos < winstate->grouptailpos)
1443 				return 0;
1444 		}
1445 	}
1446 
1447 	/* If we get here, it's in frame */
1448 	return 1;
1449 }
1450 
1451 /*
1452  * update_frameheadpos
1453  * make frameheadpos valid for the current row
1454  *
1455  * Note that frameheadpos is computed without regard for any window exclusion
1456  * clause; the current row and/or its peers are considered part of the frame
1457  * for this purpose even if they must be excluded later.
1458  *
1459  * May clobber winstate->temp_slot_2.
1460  */
1461 static void
update_frameheadpos(WindowAggState * winstate)1462 update_frameheadpos(WindowAggState *winstate)
1463 {
1464 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1465 	int			frameOptions = winstate->frameOptions;
1466 	MemoryContext oldcontext;
1467 
1468 	if (winstate->framehead_valid)
1469 		return;					/* already known for current row */
1470 
1471 	/* We may be called in a short-lived context */
1472 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1473 
1474 	if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1475 	{
1476 		/* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1477 		winstate->frameheadpos = 0;
1478 		winstate->framehead_valid = true;
1479 	}
1480 	else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1481 	{
1482 		if (frameOptions & FRAMEOPTION_ROWS)
1483 		{
1484 			/* In ROWS mode, frame head is the same as current */
1485 			winstate->frameheadpos = winstate->currentpos;
1486 			winstate->framehead_valid = true;
1487 		}
1488 		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1489 		{
1490 			/* If no ORDER BY, all rows are peers with each other */
1491 			if (node->ordNumCols == 0)
1492 			{
1493 				winstate->frameheadpos = 0;
1494 				winstate->framehead_valid = true;
1495 				MemoryContextSwitchTo(oldcontext);
1496 				return;
1497 			}
1498 
1499 			/*
1500 			 * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the
1501 			 * first row that is a peer of current row.  We keep a copy of the
1502 			 * last-known frame head row in framehead_slot, and advance as
1503 			 * necessary.  Note that if we reach end of partition, we will
1504 			 * leave frameheadpos = end+1 and framehead_slot empty.
1505 			 */
1506 			tuplestore_select_read_pointer(winstate->buffer,
1507 										   winstate->framehead_ptr);
1508 			if (winstate->frameheadpos == 0 &&
1509 				TupIsNull(winstate->framehead_slot))
1510 			{
1511 				/* fetch first row into framehead_slot, if we didn't already */
1512 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1513 											 winstate->framehead_slot))
1514 					elog(ERROR, "unexpected end of tuplestore");
1515 			}
1516 
1517 			while (!TupIsNull(winstate->framehead_slot))
1518 			{
1519 				if (are_peers(winstate, winstate->framehead_slot,
1520 							  winstate->ss.ss_ScanTupleSlot))
1521 					break;		/* this row is the correct frame head */
1522 				/* Note we advance frameheadpos even if the fetch fails */
1523 				winstate->frameheadpos++;
1524 				spool_tuples(winstate, winstate->frameheadpos);
1525 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1526 											 winstate->framehead_slot))
1527 					break;		/* end of partition */
1528 			}
1529 			winstate->framehead_valid = true;
1530 		}
1531 		else
1532 			Assert(false);
1533 	}
1534 	else if (frameOptions & FRAMEOPTION_START_OFFSET)
1535 	{
1536 		if (frameOptions & FRAMEOPTION_ROWS)
1537 		{
1538 			/* In ROWS mode, bound is physically n before/after current */
1539 			int64		offset = DatumGetInt64(winstate->startOffsetValue);
1540 
1541 			if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1542 				offset = -offset;
1543 
1544 			winstate->frameheadpos = winstate->currentpos + offset;
1545 			/* frame head can't go before first row */
1546 			if (winstate->frameheadpos < 0)
1547 				winstate->frameheadpos = 0;
1548 			else if (winstate->frameheadpos > winstate->currentpos + 1)
1549 			{
1550 				/* make sure frameheadpos is not past end of partition */
1551 				spool_tuples(winstate, winstate->frameheadpos - 1);
1552 				if (winstate->frameheadpos > winstate->spooled_rows)
1553 					winstate->frameheadpos = winstate->spooled_rows;
1554 			}
1555 			winstate->framehead_valid = true;
1556 		}
1557 		else if (frameOptions & FRAMEOPTION_RANGE)
1558 		{
1559 			/*
1560 			 * In RANGE START_OFFSET mode, frame head is the first row that
1561 			 * satisfies the in_range constraint relative to the current row.
1562 			 * We keep a copy of the last-known frame head row in
1563 			 * framehead_slot, and advance as necessary.  Note that if we
1564 			 * reach end of partition, we will leave frameheadpos = end+1 and
1565 			 * framehead_slot empty.
1566 			 */
1567 			int			sortCol = node->ordColIdx[0];
1568 			bool		sub,
1569 						less;
1570 
1571 			/* We must have an ordering column */
1572 			Assert(node->ordNumCols == 1);
1573 
1574 			/* Precompute flags for in_range checks */
1575 			if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1576 				sub = true;		/* subtract startOffset from current row */
1577 			else
1578 				sub = false;	/* add it */
1579 			less = false;		/* normally, we want frame head >= sum */
1580 			/* If sort order is descending, flip both flags */
1581 			if (!winstate->inRangeAsc)
1582 			{
1583 				sub = !sub;
1584 				less = true;
1585 			}
1586 
1587 			tuplestore_select_read_pointer(winstate->buffer,
1588 										   winstate->framehead_ptr);
1589 			if (winstate->frameheadpos == 0 &&
1590 				TupIsNull(winstate->framehead_slot))
1591 			{
1592 				/* fetch first row into framehead_slot, if we didn't already */
1593 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1594 											 winstate->framehead_slot))
1595 					elog(ERROR, "unexpected end of tuplestore");
1596 			}
1597 
1598 			while (!TupIsNull(winstate->framehead_slot))
1599 			{
1600 				Datum		headval,
1601 							currval;
1602 				bool		headisnull,
1603 							currisnull;
1604 
1605 				headval = slot_getattr(winstate->framehead_slot, sortCol,
1606 									   &headisnull);
1607 				currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1608 									   &currisnull);
1609 				if (headisnull || currisnull)
1610 				{
1611 					/* order of the rows depends only on nulls_first */
1612 					if (winstate->inRangeNullsFirst)
1613 					{
1614 						/* advance head if head is null and curr is not */
1615 						if (!headisnull || currisnull)
1616 							break;
1617 					}
1618 					else
1619 					{
1620 						/* advance head if head is not null and curr is null */
1621 						if (headisnull || !currisnull)
1622 							break;
1623 					}
1624 				}
1625 				else
1626 				{
1627 					if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
1628 													   winstate->inRangeColl,
1629 													   headval,
1630 													   currval,
1631 													   winstate->startOffsetValue,
1632 													   BoolGetDatum(sub),
1633 													   BoolGetDatum(less))))
1634 						break;	/* this row is the correct frame head */
1635 				}
1636 				/* Note we advance frameheadpos even if the fetch fails */
1637 				winstate->frameheadpos++;
1638 				spool_tuples(winstate, winstate->frameheadpos);
1639 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1640 											 winstate->framehead_slot))
1641 					break;		/* end of partition */
1642 			}
1643 			winstate->framehead_valid = true;
1644 		}
1645 		else if (frameOptions & FRAMEOPTION_GROUPS)
1646 		{
1647 			/*
1648 			 * In GROUPS START_OFFSET mode, frame head is the first row of the
1649 			 * first peer group whose number satisfies the offset constraint.
1650 			 * We keep a copy of the last-known frame head row in
1651 			 * framehead_slot, and advance as necessary.  Note that if we
1652 			 * reach end of partition, we will leave frameheadpos = end+1 and
1653 			 * framehead_slot empty.
1654 			 */
1655 			int64		offset = DatumGetInt64(winstate->startOffsetValue);
1656 			int64		minheadgroup;
1657 
1658 			if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
1659 				minheadgroup = winstate->currentgroup - offset;
1660 			else
1661 				minheadgroup = winstate->currentgroup + offset;
1662 
1663 			tuplestore_select_read_pointer(winstate->buffer,
1664 										   winstate->framehead_ptr);
1665 			if (winstate->frameheadpos == 0 &&
1666 				TupIsNull(winstate->framehead_slot))
1667 			{
1668 				/* fetch first row into framehead_slot, if we didn't already */
1669 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1670 											 winstate->framehead_slot))
1671 					elog(ERROR, "unexpected end of tuplestore");
1672 			}
1673 
1674 			while (!TupIsNull(winstate->framehead_slot))
1675 			{
1676 				if (winstate->frameheadgroup >= minheadgroup)
1677 					break;		/* this row is the correct frame head */
1678 				ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
1679 				/* Note we advance frameheadpos even if the fetch fails */
1680 				winstate->frameheadpos++;
1681 				spool_tuples(winstate, winstate->frameheadpos);
1682 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1683 											 winstate->framehead_slot))
1684 					break;		/* end of partition */
1685 				if (!are_peers(winstate, winstate->temp_slot_2,
1686 							   winstate->framehead_slot))
1687 					winstate->frameheadgroup++;
1688 			}
1689 			ExecClearTuple(winstate->temp_slot_2);
1690 			winstate->framehead_valid = true;
1691 		}
1692 		else
1693 			Assert(false);
1694 	}
1695 	else
1696 		Assert(false);
1697 
1698 	MemoryContextSwitchTo(oldcontext);
1699 }
1700 
1701 /*
1702  * update_frametailpos
1703  * make frametailpos valid for the current row
1704  *
1705  * Note that frametailpos is computed without regard for any window exclusion
1706  * clause; the current row and/or its peers are considered part of the frame
1707  * for this purpose even if they must be excluded later.
1708  *
1709  * May clobber winstate->temp_slot_2.
1710  */
1711 static void
update_frametailpos(WindowAggState * winstate)1712 update_frametailpos(WindowAggState *winstate)
1713 {
1714 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1715 	int			frameOptions = winstate->frameOptions;
1716 	MemoryContext oldcontext;
1717 
1718 	if (winstate->frametail_valid)
1719 		return;					/* already known for current row */
1720 
1721 	/* We may be called in a short-lived context */
1722 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1723 
1724 	if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1725 	{
1726 		/* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1727 		spool_tuples(winstate, -1);
1728 		winstate->frametailpos = winstate->spooled_rows;
1729 		winstate->frametail_valid = true;
1730 	}
1731 	else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1732 	{
1733 		if (frameOptions & FRAMEOPTION_ROWS)
1734 		{
1735 			/* In ROWS mode, exactly the rows up to current are in frame */
1736 			winstate->frametailpos = winstate->currentpos + 1;
1737 			winstate->frametail_valid = true;
1738 		}
1739 		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
1740 		{
1741 			/* If no ORDER BY, all rows are peers with each other */
1742 			if (node->ordNumCols == 0)
1743 			{
1744 				spool_tuples(winstate, -1);
1745 				winstate->frametailpos = winstate->spooled_rows;
1746 				winstate->frametail_valid = true;
1747 				MemoryContextSwitchTo(oldcontext);
1748 				return;
1749 			}
1750 
1751 			/*
1752 			 * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last
1753 			 * row that is a peer of current row, frame tail is the row after
1754 			 * that (if any).  We keep a copy of the last-known frame tail row
1755 			 * in frametail_slot, and advance as necessary.  Note that if we
1756 			 * reach end of partition, we will leave frametailpos = end+1 and
1757 			 * frametail_slot empty.
1758 			 */
1759 			tuplestore_select_read_pointer(winstate->buffer,
1760 										   winstate->frametail_ptr);
1761 			if (winstate->frametailpos == 0 &&
1762 				TupIsNull(winstate->frametail_slot))
1763 			{
1764 				/* fetch first row into frametail_slot, if we didn't already */
1765 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1766 											 winstate->frametail_slot))
1767 					elog(ERROR, "unexpected end of tuplestore");
1768 			}
1769 
1770 			while (!TupIsNull(winstate->frametail_slot))
1771 			{
1772 				if (winstate->frametailpos > winstate->currentpos &&
1773 					!are_peers(winstate, winstate->frametail_slot,
1774 							   winstate->ss.ss_ScanTupleSlot))
1775 					break;		/* this row is the frame tail */
1776 				/* Note we advance frametailpos even if the fetch fails */
1777 				winstate->frametailpos++;
1778 				spool_tuples(winstate, winstate->frametailpos);
1779 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1780 											 winstate->frametail_slot))
1781 					break;		/* end of partition */
1782 			}
1783 			winstate->frametail_valid = true;
1784 		}
1785 		else
1786 			Assert(false);
1787 	}
1788 	else if (frameOptions & FRAMEOPTION_END_OFFSET)
1789 	{
1790 		if (frameOptions & FRAMEOPTION_ROWS)
1791 		{
1792 			/* In ROWS mode, bound is physically n before/after current */
1793 			int64		offset = DatumGetInt64(winstate->endOffsetValue);
1794 
1795 			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1796 				offset = -offset;
1797 
1798 			winstate->frametailpos = winstate->currentpos + offset + 1;
1799 			/* smallest allowable value of frametailpos is 0 */
1800 			if (winstate->frametailpos < 0)
1801 				winstate->frametailpos = 0;
1802 			else if (winstate->frametailpos > winstate->currentpos + 1)
1803 			{
1804 				/* make sure frametailpos is not past end of partition */
1805 				spool_tuples(winstate, winstate->frametailpos - 1);
1806 				if (winstate->frametailpos > winstate->spooled_rows)
1807 					winstate->frametailpos = winstate->spooled_rows;
1808 			}
1809 			winstate->frametail_valid = true;
1810 		}
1811 		else if (frameOptions & FRAMEOPTION_RANGE)
1812 		{
1813 			/*
1814 			 * In RANGE END_OFFSET mode, frame end is the last row that
1815 			 * satisfies the in_range constraint relative to the current row,
1816 			 * frame tail is the row after that (if any).  We keep a copy of
1817 			 * the last-known frame tail row in frametail_slot, and advance as
1818 			 * necessary.  Note that if we reach end of partition, we will
1819 			 * leave frametailpos = end+1 and frametail_slot empty.
1820 			 */
1821 			int			sortCol = node->ordColIdx[0];
1822 			bool		sub,
1823 						less;
1824 
1825 			/* We must have an ordering column */
1826 			Assert(node->ordNumCols == 1);
1827 
1828 			/* Precompute flags for in_range checks */
1829 			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1830 				sub = true;		/* subtract endOffset from current row */
1831 			else
1832 				sub = false;	/* add it */
1833 			less = true;		/* normally, we want frame tail <= sum */
1834 			/* If sort order is descending, flip both flags */
1835 			if (!winstate->inRangeAsc)
1836 			{
1837 				sub = !sub;
1838 				less = false;
1839 			}
1840 
1841 			tuplestore_select_read_pointer(winstate->buffer,
1842 										   winstate->frametail_ptr);
1843 			if (winstate->frametailpos == 0 &&
1844 				TupIsNull(winstate->frametail_slot))
1845 			{
1846 				/* fetch first row into frametail_slot, if we didn't already */
1847 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1848 											 winstate->frametail_slot))
1849 					elog(ERROR, "unexpected end of tuplestore");
1850 			}
1851 
1852 			while (!TupIsNull(winstate->frametail_slot))
1853 			{
1854 				Datum		tailval,
1855 							currval;
1856 				bool		tailisnull,
1857 							currisnull;
1858 
1859 				tailval = slot_getattr(winstate->frametail_slot, sortCol,
1860 									   &tailisnull);
1861 				currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol,
1862 									   &currisnull);
1863 				if (tailisnull || currisnull)
1864 				{
1865 					/* order of the rows depends only on nulls_first */
1866 					if (winstate->inRangeNullsFirst)
1867 					{
1868 						/* advance tail if tail is null or curr is not */
1869 						if (!tailisnull)
1870 							break;
1871 					}
1872 					else
1873 					{
1874 						/* advance tail if tail is not null or curr is null */
1875 						if (!currisnull)
1876 							break;
1877 					}
1878 				}
1879 				else
1880 				{
1881 					if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc,
1882 														winstate->inRangeColl,
1883 														tailval,
1884 														currval,
1885 														winstate->endOffsetValue,
1886 														BoolGetDatum(sub),
1887 														BoolGetDatum(less))))
1888 						break;	/* this row is the correct frame tail */
1889 				}
1890 				/* Note we advance frametailpos even if the fetch fails */
1891 				winstate->frametailpos++;
1892 				spool_tuples(winstate, winstate->frametailpos);
1893 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1894 											 winstate->frametail_slot))
1895 					break;		/* end of partition */
1896 			}
1897 			winstate->frametail_valid = true;
1898 		}
1899 		else if (frameOptions & FRAMEOPTION_GROUPS)
1900 		{
1901 			/*
1902 			 * In GROUPS END_OFFSET mode, frame end is the last row of the
1903 			 * last peer group whose number satisfies the offset constraint,
1904 			 * and frame tail is the row after that (if any).  We keep a copy
1905 			 * of the last-known frame tail row in frametail_slot, and advance
1906 			 * as necessary.  Note that if we reach end of partition, we will
1907 			 * leave frametailpos = end+1 and frametail_slot empty.
1908 			 */
1909 			int64		offset = DatumGetInt64(winstate->endOffsetValue);
1910 			int64		maxtailgroup;
1911 
1912 			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
1913 				maxtailgroup = winstate->currentgroup - offset;
1914 			else
1915 				maxtailgroup = winstate->currentgroup + offset;
1916 
1917 			tuplestore_select_read_pointer(winstate->buffer,
1918 										   winstate->frametail_ptr);
1919 			if (winstate->frametailpos == 0 &&
1920 				TupIsNull(winstate->frametail_slot))
1921 			{
1922 				/* fetch first row into frametail_slot, if we didn't already */
1923 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1924 											 winstate->frametail_slot))
1925 					elog(ERROR, "unexpected end of tuplestore");
1926 			}
1927 
1928 			while (!TupIsNull(winstate->frametail_slot))
1929 			{
1930 				if (winstate->frametailgroup > maxtailgroup)
1931 					break;		/* this row is the correct frame tail */
1932 				ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot);
1933 				/* Note we advance frametailpos even if the fetch fails */
1934 				winstate->frametailpos++;
1935 				spool_tuples(winstate, winstate->frametailpos);
1936 				if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1937 											 winstate->frametail_slot))
1938 					break;		/* end of partition */
1939 				if (!are_peers(winstate, winstate->temp_slot_2,
1940 							   winstate->frametail_slot))
1941 					winstate->frametailgroup++;
1942 			}
1943 			ExecClearTuple(winstate->temp_slot_2);
1944 			winstate->frametail_valid = true;
1945 		}
1946 		else
1947 			Assert(false);
1948 	}
1949 	else
1950 		Assert(false);
1951 
1952 	MemoryContextSwitchTo(oldcontext);
1953 }
1954 
1955 /*
1956  * update_grouptailpos
1957  * make grouptailpos valid for the current row
1958  *
1959  * May clobber winstate->temp_slot_2.
1960  */
1961 static void
update_grouptailpos(WindowAggState * winstate)1962 update_grouptailpos(WindowAggState *winstate)
1963 {
1964 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1965 	MemoryContext oldcontext;
1966 
1967 	if (winstate->grouptail_valid)
1968 		return;					/* already known for current row */
1969 
1970 	/* We may be called in a short-lived context */
1971 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1972 
1973 	/* If no ORDER BY, all rows are peers with each other */
1974 	if (node->ordNumCols == 0)
1975 	{
1976 		spool_tuples(winstate, -1);
1977 		winstate->grouptailpos = winstate->spooled_rows;
1978 		winstate->grouptail_valid = true;
1979 		MemoryContextSwitchTo(oldcontext);
1980 		return;
1981 	}
1982 
1983 	/*
1984 	 * Because grouptail_valid is reset only when current row advances into a
1985 	 * new peer group, we always reach here knowing that grouptailpos needs to
1986 	 * be advanced by at least one row.  Hence, unlike the otherwise similar
1987 	 * case for frame tail tracking, we do not need persistent storage of the
1988 	 * group tail row.
1989 	 */
1990 	Assert(winstate->grouptailpos <= winstate->currentpos);
1991 	tuplestore_select_read_pointer(winstate->buffer,
1992 								   winstate->grouptail_ptr);
1993 	for (;;)
1994 	{
1995 		/* Note we advance grouptailpos even if the fetch fails */
1996 		winstate->grouptailpos++;
1997 		spool_tuples(winstate, winstate->grouptailpos);
1998 		if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1999 									 winstate->temp_slot_2))
2000 			break;				/* end of partition */
2001 		if (winstate->grouptailpos > winstate->currentpos &&
2002 			!are_peers(winstate, winstate->temp_slot_2,
2003 					   winstate->ss.ss_ScanTupleSlot))
2004 			break;				/* this row is the group tail */
2005 	}
2006 	ExecClearTuple(winstate->temp_slot_2);
2007 	winstate->grouptail_valid = true;
2008 
2009 	MemoryContextSwitchTo(oldcontext);
2010 }
2011 
2012 
2013 /* -----------------
2014  * ExecWindowAgg
2015  *
2016  *	ExecWindowAgg receives tuples from its outer subplan and
2017  *	stores them into a tuplestore, then processes window functions.
2018  *	This node doesn't reduce nor qualify any row so the number of
2019  *	returned rows is exactly the same as its outer subplan's result.
2020  * -----------------
2021  */
2022 static TupleTableSlot *
ExecWindowAgg(PlanState * pstate)2023 ExecWindowAgg(PlanState *pstate)
2024 {
2025 	WindowAggState *winstate = castNode(WindowAggState, pstate);
2026 	ExprContext *econtext;
2027 	int			i;
2028 	int			numfuncs;
2029 
2030 	CHECK_FOR_INTERRUPTS();
2031 
2032 	if (winstate->all_done)
2033 		return NULL;
2034 
2035 	/*
2036 	 * Compute frame offset values, if any, during first call (or after a
2037 	 * rescan).  These are assumed to hold constant throughout the scan; if
2038 	 * user gives us a volatile expression, we'll only use its initial value.
2039 	 */
2040 	if (winstate->all_first)
2041 	{
2042 		int			frameOptions = winstate->frameOptions;
2043 		ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
2044 		Datum		value;
2045 		bool		isnull;
2046 		int16		len;
2047 		bool		byval;
2048 
2049 		if (frameOptions & FRAMEOPTION_START_OFFSET)
2050 		{
2051 			Assert(winstate->startOffset != NULL);
2052 			value = ExecEvalExprSwitchContext(winstate->startOffset,
2053 											  econtext,
2054 											  &isnull);
2055 			if (isnull)
2056 				ereport(ERROR,
2057 						(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2058 						 errmsg("frame starting offset must not be null")));
2059 			/* copy value into query-lifespan context */
2060 			get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
2061 							&len, &byval);
2062 			winstate->startOffsetValue = datumCopy(value, byval, len);
2063 			if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2064 			{
2065 				/* value is known to be int8 */
2066 				int64		offset = DatumGetInt64(value);
2067 
2068 				if (offset < 0)
2069 					ereport(ERROR,
2070 							(errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2071 							 errmsg("frame starting offset must not be negative")));
2072 			}
2073 		}
2074 		if (frameOptions & FRAMEOPTION_END_OFFSET)
2075 		{
2076 			Assert(winstate->endOffset != NULL);
2077 			value = ExecEvalExprSwitchContext(winstate->endOffset,
2078 											  econtext,
2079 											  &isnull);
2080 			if (isnull)
2081 				ereport(ERROR,
2082 						(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
2083 						 errmsg("frame ending offset must not be null")));
2084 			/* copy value into query-lifespan context */
2085 			get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
2086 							&len, &byval);
2087 			winstate->endOffsetValue = datumCopy(value, byval, len);
2088 			if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS))
2089 			{
2090 				/* value is known to be int8 */
2091 				int64		offset = DatumGetInt64(value);
2092 
2093 				if (offset < 0)
2094 					ereport(ERROR,
2095 							(errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE),
2096 							 errmsg("frame ending offset must not be negative")));
2097 			}
2098 		}
2099 		winstate->all_first = false;
2100 	}
2101 
2102 	if (winstate->buffer == NULL)
2103 	{
2104 		/* Initialize for first partition and set current row = 0 */
2105 		begin_partition(winstate);
2106 		/* If there are no input rows, we'll detect that and exit below */
2107 	}
2108 	else
2109 	{
2110 		/* Advance current row within partition */
2111 		winstate->currentpos++;
2112 		/* This might mean that the frame moves, too */
2113 		winstate->framehead_valid = false;
2114 		winstate->frametail_valid = false;
2115 		/* we don't need to invalidate grouptail here; see below */
2116 	}
2117 
2118 	/*
2119 	 * Spool all tuples up to and including the current row, if we haven't
2120 	 * already
2121 	 */
2122 	spool_tuples(winstate, winstate->currentpos);
2123 
2124 	/* Move to the next partition if we reached the end of this partition */
2125 	if (winstate->partition_spooled &&
2126 		winstate->currentpos >= winstate->spooled_rows)
2127 	{
2128 		release_partition(winstate);
2129 
2130 		if (winstate->more_partitions)
2131 		{
2132 			begin_partition(winstate);
2133 			Assert(winstate->spooled_rows > 0);
2134 		}
2135 		else
2136 		{
2137 			winstate->all_done = true;
2138 			return NULL;
2139 		}
2140 	}
2141 
2142 	/* final output execution is in ps_ExprContext */
2143 	econtext = winstate->ss.ps.ps_ExprContext;
2144 
2145 	/* Clear the per-output-tuple context for current row */
2146 	ResetExprContext(econtext);
2147 
2148 	/*
2149 	 * Read the current row from the tuplestore, and save in ScanTupleSlot.
2150 	 * (We can't rely on the outerplan's output slot because we may have to
2151 	 * read beyond the current row.  Also, we have to actually copy the row
2152 	 * out of the tuplestore, since window function evaluation might cause the
2153 	 * tuplestore to dump its state to disk.)
2154 	 *
2155 	 * In GROUPS mode, or when tracking a group-oriented exclusion clause, we
2156 	 * must also detect entering a new peer group and update associated state
2157 	 * when that happens.  We use temp_slot_2 to temporarily hold the previous
2158 	 * row for this purpose.
2159 	 *
2160 	 * Current row must be in the tuplestore, since we spooled it above.
2161 	 */
2162 	tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
2163 	if ((winstate->frameOptions & (FRAMEOPTION_GROUPS |
2164 								   FRAMEOPTION_EXCLUDE_GROUP |
2165 								   FRAMEOPTION_EXCLUDE_TIES)) &&
2166 		winstate->currentpos > 0)
2167 	{
2168 		ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot);
2169 		if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2170 									 winstate->ss.ss_ScanTupleSlot))
2171 			elog(ERROR, "unexpected end of tuplestore");
2172 		if (!are_peers(winstate, winstate->temp_slot_2,
2173 					   winstate->ss.ss_ScanTupleSlot))
2174 		{
2175 			winstate->currentgroup++;
2176 			winstate->groupheadpos = winstate->currentpos;
2177 			winstate->grouptail_valid = false;
2178 		}
2179 		ExecClearTuple(winstate->temp_slot_2);
2180 	}
2181 	else
2182 	{
2183 		if (!tuplestore_gettupleslot(winstate->buffer, true, true,
2184 									 winstate->ss.ss_ScanTupleSlot))
2185 			elog(ERROR, "unexpected end of tuplestore");
2186 	}
2187 
2188 	/*
2189 	 * Evaluate true window functions
2190 	 */
2191 	numfuncs = winstate->numfuncs;
2192 	for (i = 0; i < numfuncs; i++)
2193 	{
2194 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
2195 
2196 		if (perfuncstate->plain_agg)
2197 			continue;
2198 		eval_windowfunction(winstate, perfuncstate,
2199 							&(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
2200 							&(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
2201 	}
2202 
2203 	/*
2204 	 * Evaluate aggregates
2205 	 */
2206 	if (winstate->numaggs > 0)
2207 		eval_windowaggregates(winstate);
2208 
2209 	/*
2210 	 * If we have created auxiliary read pointers for the frame or group
2211 	 * boundaries, force them to be kept up-to-date, because we don't know
2212 	 * whether the window function(s) will do anything that requires that.
2213 	 * Failing to advance the pointers would result in being unable to trim
2214 	 * data from the tuplestore, which is bad.  (If we could know in advance
2215 	 * whether the window functions will use frame boundary info, we could
2216 	 * skip creating these pointers in the first place ... but unfortunately
2217 	 * the window function API doesn't require that.)
2218 	 */
2219 	if (winstate->framehead_ptr >= 0)
2220 		update_frameheadpos(winstate);
2221 	if (winstate->frametail_ptr >= 0)
2222 		update_frametailpos(winstate);
2223 	if (winstate->grouptail_ptr >= 0)
2224 		update_grouptailpos(winstate);
2225 
2226 	/*
2227 	 * Truncate any no-longer-needed rows from the tuplestore.
2228 	 */
2229 	tuplestore_trim(winstate->buffer);
2230 
2231 	/*
2232 	 * Form and return a projection tuple using the windowfunc results and the
2233 	 * current row.  Setting ecxt_outertuple arranges that any Vars will be
2234 	 * evaluated with respect to that row.
2235 	 */
2236 	econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2237 
2238 	return ExecProject(winstate->ss.ps.ps_ProjInfo);
2239 }
2240 
2241 /* -----------------
2242  * ExecInitWindowAgg
2243  *
2244  *	Creates the run-time information for the WindowAgg node produced by the
2245  *	planner and initializes its outer subtree
2246  * -----------------
2247  */
2248 WindowAggState *
ExecInitWindowAgg(WindowAgg * node,EState * estate,int eflags)2249 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
2250 {
2251 	WindowAggState *winstate;
2252 	Plan	   *outerPlan;
2253 	ExprContext *econtext;
2254 	ExprContext *tmpcontext;
2255 	WindowStatePerFunc perfunc;
2256 	WindowStatePerAgg peragg;
2257 	int			frameOptions = node->frameOptions;
2258 	int			numfuncs,
2259 				wfuncno,
2260 				numaggs,
2261 				aggno;
2262 	TupleDesc	scanDesc;
2263 	ListCell   *l;
2264 
2265 	/* check for unsupported flags */
2266 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
2267 
2268 	/*
2269 	 * create state structure
2270 	 */
2271 	winstate = makeNode(WindowAggState);
2272 	winstate->ss.ps.plan = (Plan *) node;
2273 	winstate->ss.ps.state = estate;
2274 	winstate->ss.ps.ExecProcNode = ExecWindowAgg;
2275 
2276 	/*
2277 	 * Create expression contexts.  We need two, one for per-input-tuple
2278 	 * processing and one for per-output-tuple processing.  We cheat a little
2279 	 * by using ExecAssignExprContext() to build both.
2280 	 */
2281 	ExecAssignExprContext(estate, &winstate->ss.ps);
2282 	tmpcontext = winstate->ss.ps.ps_ExprContext;
2283 	winstate->tmpcontext = tmpcontext;
2284 	ExecAssignExprContext(estate, &winstate->ss.ps);
2285 
2286 	/* Create long-lived context for storage of partition-local memory etc */
2287 	winstate->partcontext =
2288 		AllocSetContextCreate(CurrentMemoryContext,
2289 							  "WindowAgg Partition",
2290 							  ALLOCSET_DEFAULT_SIZES);
2291 
2292 	/*
2293 	 * Create mid-lived context for aggregate trans values etc.
2294 	 *
2295 	 * Note that moving aggregates each use their own private context, not
2296 	 * this one.
2297 	 */
2298 	winstate->aggcontext =
2299 		AllocSetContextCreate(CurrentMemoryContext,
2300 							  "WindowAgg Aggregates",
2301 							  ALLOCSET_DEFAULT_SIZES);
2302 
2303 	/*
2304 	 * WindowAgg nodes never have quals, since they can only occur at the
2305 	 * logical top level of a query (ie, after any WHERE or HAVING filters)
2306 	 */
2307 	Assert(node->plan.qual == NIL);
2308 	winstate->ss.ps.qual = NULL;
2309 
2310 	/*
2311 	 * initialize child nodes
2312 	 */
2313 	outerPlan = outerPlan(node);
2314 	outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
2315 
2316 	/*
2317 	 * initialize source tuple type (which is also the tuple type that we'll
2318 	 * store in the tuplestore and use in all our working slots).
2319 	 */
2320 	ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple);
2321 	scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
2322 
2323 	/* the outer tuple isn't the child's tuple, but always a minimal tuple */
2324 	winstate->ss.ps.outeropsset = true;
2325 	winstate->ss.ps.outerops = &TTSOpsMinimalTuple;
2326 	winstate->ss.ps.outeropsfixed = true;
2327 
2328 	/*
2329 	 * tuple table initialization
2330 	 */
2331 	winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2332 													   &TTSOpsMinimalTuple);
2333 	winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2334 													&TTSOpsMinimalTuple);
2335 	winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc,
2336 												   &TTSOpsMinimalTuple);
2337 	winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc,
2338 												   &TTSOpsMinimalTuple);
2339 
2340 	/*
2341 	 * create frame head and tail slots only if needed (must create slots in
2342 	 * exactly the same cases that update_frameheadpos and update_frametailpos
2343 	 * need them)
2344 	 */
2345 	winstate->framehead_slot = winstate->frametail_slot = NULL;
2346 
2347 	if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
2348 	{
2349 		if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) &&
2350 			 node->ordNumCols != 0) ||
2351 			(frameOptions & FRAMEOPTION_START_OFFSET))
2352 			winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2353 															  &TTSOpsMinimalTuple);
2354 		if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
2355 			 node->ordNumCols != 0) ||
2356 			(frameOptions & FRAMEOPTION_END_OFFSET))
2357 			winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc,
2358 															  &TTSOpsMinimalTuple);
2359 	}
2360 
2361 	/*
2362 	 * Initialize result slot, type and projection.
2363 	 */
2364 	ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual);
2365 	ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
2366 
2367 	/* Set up data for comparing tuples */
2368 	if (node->partNumCols > 0)
2369 		winstate->partEqfunction =
2370 			execTuplesMatchPrepare(scanDesc,
2371 								   node->partNumCols,
2372 								   node->partColIdx,
2373 								   node->partOperators,
2374 								   node->partCollations,
2375 								   &winstate->ss.ps);
2376 
2377 	if (node->ordNumCols > 0)
2378 		winstate->ordEqfunction =
2379 			execTuplesMatchPrepare(scanDesc,
2380 								   node->ordNumCols,
2381 								   node->ordColIdx,
2382 								   node->ordOperators,
2383 								   node->ordCollations,
2384 								   &winstate->ss.ps);
2385 
2386 	/*
2387 	 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
2388 	 */
2389 	numfuncs = winstate->numfuncs;
2390 	numaggs = winstate->numaggs;
2391 	econtext = winstate->ss.ps.ps_ExprContext;
2392 	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
2393 	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
2394 
2395 	/*
2396 	 * allocate per-wfunc/per-agg state information.
2397 	 */
2398 	perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
2399 	peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
2400 	winstate->perfunc = perfunc;
2401 	winstate->peragg = peragg;
2402 
2403 	wfuncno = -1;
2404 	aggno = -1;
2405 	foreach(l, winstate->funcs)
2406 	{
2407 		WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
2408 		WindowFunc *wfunc = wfuncstate->wfunc;
2409 		WindowStatePerFunc perfuncstate;
2410 		AclResult	aclresult;
2411 		int			i;
2412 
2413 		if (wfunc->winref != node->winref)	/* planner screwed up? */
2414 			elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
2415 				 wfunc->winref, node->winref);
2416 
2417 		/* Look for a previous duplicate window function */
2418 		for (i = 0; i <= wfuncno; i++)
2419 		{
2420 			if (equal(wfunc, perfunc[i].wfunc) &&
2421 				!contain_volatile_functions((Node *) wfunc))
2422 				break;
2423 		}
2424 		if (i <= wfuncno)
2425 		{
2426 			/* Found a match to an existing entry, so just mark it */
2427 			wfuncstate->wfuncno = i;
2428 			continue;
2429 		}
2430 
2431 		/* Nope, so assign a new PerAgg record */
2432 		perfuncstate = &perfunc[++wfuncno];
2433 
2434 		/* Mark WindowFunc state node with assigned index in the result array */
2435 		wfuncstate->wfuncno = wfuncno;
2436 
2437 		/* Check permission to call window function */
2438 		aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
2439 									 ACL_EXECUTE);
2440 		if (aclresult != ACLCHECK_OK)
2441 			aclcheck_error(aclresult, OBJECT_FUNCTION,
2442 						   get_func_name(wfunc->winfnoid));
2443 		InvokeFunctionExecuteHook(wfunc->winfnoid);
2444 
2445 		/* Fill in the perfuncstate data */
2446 		perfuncstate->wfuncstate = wfuncstate;
2447 		perfuncstate->wfunc = wfunc;
2448 		perfuncstate->numArguments = list_length(wfuncstate->args);
2449 
2450 		fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
2451 					  econtext->ecxt_per_query_memory);
2452 		fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
2453 
2454 		perfuncstate->winCollation = wfunc->inputcollid;
2455 
2456 		get_typlenbyval(wfunc->wintype,
2457 						&perfuncstate->resulttypeLen,
2458 						&perfuncstate->resulttypeByVal);
2459 
2460 		/*
2461 		 * If it's really just a plain aggregate function, we'll emulate the
2462 		 * Agg environment for it.
2463 		 */
2464 		perfuncstate->plain_agg = wfunc->winagg;
2465 		if (wfunc->winagg)
2466 		{
2467 			WindowStatePerAgg peraggstate;
2468 
2469 			perfuncstate->aggno = ++aggno;
2470 			peraggstate = &winstate->peragg[aggno];
2471 			initialize_peragg(winstate, wfunc, peraggstate);
2472 			peraggstate->wfuncno = wfuncno;
2473 		}
2474 		else
2475 		{
2476 			WindowObject winobj = makeNode(WindowObjectData);
2477 
2478 			winobj->winstate = winstate;
2479 			winobj->argstates = wfuncstate->args;
2480 			winobj->localmem = NULL;
2481 			perfuncstate->winobj = winobj;
2482 		}
2483 	}
2484 
2485 	/* Update numfuncs, numaggs to match number of unique functions found */
2486 	winstate->numfuncs = wfuncno + 1;
2487 	winstate->numaggs = aggno + 1;
2488 
2489 	/* Set up WindowObject for aggregates, if needed */
2490 	if (winstate->numaggs > 0)
2491 	{
2492 		WindowObject agg_winobj = makeNode(WindowObjectData);
2493 
2494 		agg_winobj->winstate = winstate;
2495 		agg_winobj->argstates = NIL;
2496 		agg_winobj->localmem = NULL;
2497 		/* make sure markptr = -1 to invalidate. It may not get used */
2498 		agg_winobj->markptr = -1;
2499 		agg_winobj->readptr = -1;
2500 		winstate->agg_winobj = agg_winobj;
2501 	}
2502 
2503 	/* copy frame options to state node for easy access */
2504 	winstate->frameOptions = frameOptions;
2505 
2506 	/* initialize frame bound offset expressions */
2507 	winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2508 										 (PlanState *) winstate);
2509 	winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2510 									   (PlanState *) winstate);
2511 
2512 	/* Lookup in_range support functions if needed */
2513 	if (OidIsValid(node->startInRangeFunc))
2514 		fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc);
2515 	if (OidIsValid(node->endInRangeFunc))
2516 		fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc);
2517 	winstate->inRangeColl = node->inRangeColl;
2518 	winstate->inRangeAsc = node->inRangeAsc;
2519 	winstate->inRangeNullsFirst = node->inRangeNullsFirst;
2520 
2521 	winstate->all_first = true;
2522 	winstate->partition_spooled = false;
2523 	winstate->more_partitions = false;
2524 
2525 	return winstate;
2526 }
2527 
2528 /* -----------------
2529  * ExecEndWindowAgg
2530  * -----------------
2531  */
2532 void
ExecEndWindowAgg(WindowAggState * node)2533 ExecEndWindowAgg(WindowAggState *node)
2534 {
2535 	PlanState  *outerPlan;
2536 	int			i;
2537 
2538 	release_partition(node);
2539 
2540 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
2541 	ExecClearTuple(node->first_part_slot);
2542 	ExecClearTuple(node->agg_row_slot);
2543 	ExecClearTuple(node->temp_slot_1);
2544 	ExecClearTuple(node->temp_slot_2);
2545 	if (node->framehead_slot)
2546 		ExecClearTuple(node->framehead_slot);
2547 	if (node->frametail_slot)
2548 		ExecClearTuple(node->frametail_slot);
2549 
2550 	/*
2551 	 * Free both the expr contexts.
2552 	 */
2553 	ExecFreeExprContext(&node->ss.ps);
2554 	node->ss.ps.ps_ExprContext = node->tmpcontext;
2555 	ExecFreeExprContext(&node->ss.ps);
2556 
2557 	for (i = 0; i < node->numaggs; i++)
2558 	{
2559 		if (node->peragg[i].aggcontext != node->aggcontext)
2560 			MemoryContextDelete(node->peragg[i].aggcontext);
2561 	}
2562 	MemoryContextDelete(node->partcontext);
2563 	MemoryContextDelete(node->aggcontext);
2564 
2565 	pfree(node->perfunc);
2566 	pfree(node->peragg);
2567 
2568 	outerPlan = outerPlanState(node);
2569 	ExecEndNode(outerPlan);
2570 }
2571 
2572 /* -----------------
2573  * ExecReScanWindowAgg
2574  * -----------------
2575  */
2576 void
ExecReScanWindowAgg(WindowAggState * node)2577 ExecReScanWindowAgg(WindowAggState *node)
2578 {
2579 	PlanState  *outerPlan = outerPlanState(node);
2580 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
2581 
2582 	node->all_done = false;
2583 	node->all_first = true;
2584 
2585 	/* release tuplestore et al */
2586 	release_partition(node);
2587 
2588 	/* release all temp tuples, but especially first_part_slot */
2589 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
2590 	ExecClearTuple(node->first_part_slot);
2591 	ExecClearTuple(node->agg_row_slot);
2592 	ExecClearTuple(node->temp_slot_1);
2593 	ExecClearTuple(node->temp_slot_2);
2594 	if (node->framehead_slot)
2595 		ExecClearTuple(node->framehead_slot);
2596 	if (node->frametail_slot)
2597 		ExecClearTuple(node->frametail_slot);
2598 
2599 	/* Forget current wfunc values */
2600 	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2601 	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2602 
2603 	/*
2604 	 * if chgParam of subnode is not null then plan will be re-scanned by
2605 	 * first ExecProcNode.
2606 	 */
2607 	if (outerPlan->chgParam == NULL)
2608 		ExecReScan(outerPlan);
2609 }
2610 
2611 /*
2612  * initialize_peragg
2613  *
2614  * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
2615  */
2616 static WindowStatePerAggData *
initialize_peragg(WindowAggState * winstate,WindowFunc * wfunc,WindowStatePerAgg peraggstate)2617 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
2618 				  WindowStatePerAgg peraggstate)
2619 {
2620 	Oid			inputTypes[FUNC_MAX_ARGS];
2621 	int			numArguments;
2622 	HeapTuple	aggTuple;
2623 	Form_pg_aggregate aggform;
2624 	Oid			aggtranstype;
2625 	AttrNumber	initvalAttNo;
2626 	AclResult	aclresult;
2627 	bool		use_ma_code;
2628 	Oid			transfn_oid,
2629 				invtransfn_oid,
2630 				finalfn_oid;
2631 	bool		finalextra;
2632 	char		finalmodify;
2633 	Expr	   *transfnexpr,
2634 			   *invtransfnexpr,
2635 			   *finalfnexpr;
2636 	Datum		textInitVal;
2637 	int			i;
2638 	ListCell   *lc;
2639 
2640 	numArguments = list_length(wfunc->args);
2641 
2642 	i = 0;
2643 	foreach(lc, wfunc->args)
2644 	{
2645 		inputTypes[i++] = exprType((Node *) lfirst(lc));
2646 	}
2647 
2648 	aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
2649 	if (!HeapTupleIsValid(aggTuple))
2650 		elog(ERROR, "cache lookup failed for aggregate %u",
2651 			 wfunc->winfnoid);
2652 	aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
2653 
2654 	/*
2655 	 * Figure out whether we want to use the moving-aggregate implementation,
2656 	 * and collect the right set of fields from the pg_attribute entry.
2657 	 *
2658 	 * It's possible that an aggregate would supply a safe moving-aggregate
2659 	 * implementation and an unsafe normal one, in which case our hand is
2660 	 * forced.  Otherwise, if the frame head can't move, we don't need
2661 	 * moving-aggregate code.  Even if we'd like to use it, don't do so if the
2662 	 * aggregate's arguments (and FILTER clause if any) contain any calls to
2663 	 * volatile functions.  Otherwise, the difference between restarting and
2664 	 * not restarting the aggregation would be user-visible.
2665 	 */
2666 	if (!OidIsValid(aggform->aggminvtransfn))
2667 		use_ma_code = false;	/* sine qua non */
2668 	else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
2669 			 aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
2670 		use_ma_code = true;		/* decision forced by safety */
2671 	else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
2672 		use_ma_code = false;	/* non-moving frame head */
2673 	else if (contain_volatile_functions((Node *) wfunc))
2674 		use_ma_code = false;	/* avoid possible behavioral change */
2675 	else
2676 		use_ma_code = true;		/* yes, let's use it */
2677 	if (use_ma_code)
2678 	{
2679 		peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
2680 		peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
2681 		peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
2682 		finalextra = aggform->aggmfinalextra;
2683 		finalmodify = aggform->aggmfinalmodify;
2684 		aggtranstype = aggform->aggmtranstype;
2685 		initvalAttNo = Anum_pg_aggregate_aggminitval;
2686 	}
2687 	else
2688 	{
2689 		peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
2690 		peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
2691 		peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
2692 		finalextra = aggform->aggfinalextra;
2693 		finalmodify = aggform->aggfinalmodify;
2694 		aggtranstype = aggform->aggtranstype;
2695 		initvalAttNo = Anum_pg_aggregate_agginitval;
2696 	}
2697 
2698 	/*
2699 	 * ExecInitWindowAgg already checked permission to call aggregate function
2700 	 * ... but we still need to check the component functions
2701 	 */
2702 
2703 	/* Check that aggregate owner has permission to call component fns */
2704 	{
2705 		HeapTuple	procTuple;
2706 		Oid			aggOwner;
2707 
2708 		procTuple = SearchSysCache1(PROCOID,
2709 									ObjectIdGetDatum(wfunc->winfnoid));
2710 		if (!HeapTupleIsValid(procTuple))
2711 			elog(ERROR, "cache lookup failed for function %u",
2712 				 wfunc->winfnoid);
2713 		aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
2714 		ReleaseSysCache(procTuple);
2715 
2716 		aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
2717 									 ACL_EXECUTE);
2718 		if (aclresult != ACLCHECK_OK)
2719 			aclcheck_error(aclresult, OBJECT_FUNCTION,
2720 						   get_func_name(transfn_oid));
2721 		InvokeFunctionExecuteHook(transfn_oid);
2722 
2723 		if (OidIsValid(invtransfn_oid))
2724 		{
2725 			aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
2726 										 ACL_EXECUTE);
2727 			if (aclresult != ACLCHECK_OK)
2728 				aclcheck_error(aclresult, OBJECT_FUNCTION,
2729 							   get_func_name(invtransfn_oid));
2730 			InvokeFunctionExecuteHook(invtransfn_oid);
2731 		}
2732 
2733 		if (OidIsValid(finalfn_oid))
2734 		{
2735 			aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
2736 										 ACL_EXECUTE);
2737 			if (aclresult != ACLCHECK_OK)
2738 				aclcheck_error(aclresult, OBJECT_FUNCTION,
2739 							   get_func_name(finalfn_oid));
2740 			InvokeFunctionExecuteHook(finalfn_oid);
2741 		}
2742 	}
2743 
2744 	/*
2745 	 * If the selected finalfn isn't read-only, we can't run this aggregate as
2746 	 * a window function.  This is a user-facing error, so we take a bit more
2747 	 * care with the error message than elsewhere in this function.
2748 	 */
2749 	if (finalmodify != AGGMODIFY_READ_ONLY)
2750 		ereport(ERROR,
2751 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2752 				 errmsg("aggregate function %s does not support use as a window function",
2753 						format_procedure(wfunc->winfnoid))));
2754 
2755 	/* Detect how many arguments to pass to the finalfn */
2756 	if (finalextra)
2757 		peraggstate->numFinalArgs = numArguments + 1;
2758 	else
2759 		peraggstate->numFinalArgs = 1;
2760 
2761 	/* resolve actual type of transition state, if polymorphic */
2762 	aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
2763 											   aggtranstype,
2764 											   inputTypes,
2765 											   numArguments);
2766 
2767 	/* build expression trees using actual argument & result types */
2768 	build_aggregate_transfn_expr(inputTypes,
2769 								 numArguments,
2770 								 0, /* no ordered-set window functions yet */
2771 								 false, /* no variadic window functions yet */
2772 								 aggtranstype,
2773 								 wfunc->inputcollid,
2774 								 transfn_oid,
2775 								 invtransfn_oid,
2776 								 &transfnexpr,
2777 								 &invtransfnexpr);
2778 
2779 	/* set up infrastructure for calling the transfn(s) and finalfn */
2780 	fmgr_info(transfn_oid, &peraggstate->transfn);
2781 	fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
2782 
2783 	if (OidIsValid(invtransfn_oid))
2784 	{
2785 		fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
2786 		fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
2787 	}
2788 
2789 	if (OidIsValid(finalfn_oid))
2790 	{
2791 		build_aggregate_finalfn_expr(inputTypes,
2792 									 peraggstate->numFinalArgs,
2793 									 aggtranstype,
2794 									 wfunc->wintype,
2795 									 wfunc->inputcollid,
2796 									 finalfn_oid,
2797 									 &finalfnexpr);
2798 		fmgr_info(finalfn_oid, &peraggstate->finalfn);
2799 		fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
2800 	}
2801 
2802 	/* get info about relevant datatypes */
2803 	get_typlenbyval(wfunc->wintype,
2804 					&peraggstate->resulttypeLen,
2805 					&peraggstate->resulttypeByVal);
2806 	get_typlenbyval(aggtranstype,
2807 					&peraggstate->transtypeLen,
2808 					&peraggstate->transtypeByVal);
2809 
2810 	/*
2811 	 * initval is potentially null, so don't try to access it as a struct
2812 	 * field. Must do it the hard way with SysCacheGetAttr.
2813 	 */
2814 	textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
2815 								  &peraggstate->initValueIsNull);
2816 
2817 	if (peraggstate->initValueIsNull)
2818 		peraggstate->initValue = (Datum) 0;
2819 	else
2820 		peraggstate->initValue = GetAggInitVal(textInitVal,
2821 											   aggtranstype);
2822 
2823 	/*
2824 	 * If the transfn is strict and the initval is NULL, make sure input type
2825 	 * and transtype are the same (or at least binary-compatible), so that
2826 	 * it's OK to use the first input value as the initial transValue.  This
2827 	 * should have been checked at agg definition time, but we must check
2828 	 * again in case the transfn's strictness property has been changed.
2829 	 */
2830 	if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
2831 	{
2832 		if (numArguments < 1 ||
2833 			!IsBinaryCoercible(inputTypes[0], aggtranstype))
2834 			ereport(ERROR,
2835 					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2836 					 errmsg("aggregate %u needs to have compatible input type and transition type",
2837 							wfunc->winfnoid)));
2838 	}
2839 
2840 	/*
2841 	 * Insist that forward and inverse transition functions have the same
2842 	 * strictness setting.  Allowing them to differ would require handling
2843 	 * more special cases in advance_windowaggregate and
2844 	 * advance_windowaggregate_base, for no discernible benefit.  This should
2845 	 * have been checked at agg definition time, but we must check again in
2846 	 * case either function's strictness property has been changed.
2847 	 */
2848 	if (OidIsValid(invtransfn_oid) &&
2849 		peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
2850 		ereport(ERROR,
2851 				(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
2852 				 errmsg("strictness of aggregate's forward and inverse transition functions must match")));
2853 
2854 	/*
2855 	 * Moving aggregates use their own aggcontext.
2856 	 *
2857 	 * This is necessary because they might restart at different times, so we
2858 	 * might never be able to reset the shared context otherwise.  We can't
2859 	 * make it the aggregates' responsibility to clean up after themselves,
2860 	 * because strict aggregates must be restarted whenever we remove their
2861 	 * last non-NULL input, which the aggregate won't be aware is happening.
2862 	 * Also, just pfree()ing the transValue upon restarting wouldn't help,
2863 	 * since we'd miss any indirectly referenced data.  We could, in theory,
2864 	 * make the memory allocation rules for moving aggregates different than
2865 	 * they have historically been for plain aggregates, but that seems grotty
2866 	 * and likely to lead to memory leaks.
2867 	 */
2868 	if (OidIsValid(invtransfn_oid))
2869 		peraggstate->aggcontext =
2870 			AllocSetContextCreate(CurrentMemoryContext,
2871 								  "WindowAgg Per Aggregate",
2872 								  ALLOCSET_DEFAULT_SIZES);
2873 	else
2874 		peraggstate->aggcontext = winstate->aggcontext;
2875 
2876 	ReleaseSysCache(aggTuple);
2877 
2878 	return peraggstate;
2879 }
2880 
2881 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)2882 GetAggInitVal(Datum textInitVal, Oid transtype)
2883 {
2884 	Oid			typinput,
2885 				typioparam;
2886 	char	   *strInitVal;
2887 	Datum		initVal;
2888 
2889 	getTypeInputInfo(transtype, &typinput, &typioparam);
2890 	strInitVal = TextDatumGetCString(textInitVal);
2891 	initVal = OidInputFunctionCall(typinput, strInitVal,
2892 								   typioparam, -1);
2893 	pfree(strInitVal);
2894 	return initVal;
2895 }
2896 
2897 /*
2898  * are_peers
2899  * compare two rows to see if they are equal according to the ORDER BY clause
2900  *
2901  * NB: this does not consider the window frame mode.
2902  */
2903 static bool
are_peers(WindowAggState * winstate,TupleTableSlot * slot1,TupleTableSlot * slot2)2904 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
2905 		  TupleTableSlot *slot2)
2906 {
2907 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
2908 	ExprContext *econtext = winstate->tmpcontext;
2909 
2910 	/* If no ORDER BY, all rows are peers with each other */
2911 	if (node->ordNumCols == 0)
2912 		return true;
2913 
2914 	econtext->ecxt_outertuple = slot1;
2915 	econtext->ecxt_innertuple = slot2;
2916 	return ExecQualAndReset(winstate->ordEqfunction, econtext);
2917 }
2918 
2919 /*
2920  * window_gettupleslot
2921  *	Fetch the pos'th tuple of the current partition into the slot,
2922  *	using the winobj's read pointer
2923  *
2924  * Returns true if successful, false if no such row
2925  */
2926 static bool
window_gettupleslot(WindowObject winobj,int64 pos,TupleTableSlot * slot)2927 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
2928 {
2929 	WindowAggState *winstate = winobj->winstate;
2930 	MemoryContext oldcontext;
2931 
2932 	/* often called repeatedly in a row */
2933 	CHECK_FOR_INTERRUPTS();
2934 
2935 	/* Don't allow passing -1 to spool_tuples here */
2936 	if (pos < 0)
2937 		return false;
2938 
2939 	/* If necessary, fetch the tuple into the spool */
2940 	spool_tuples(winstate, pos);
2941 
2942 	if (pos >= winstate->spooled_rows)
2943 		return false;
2944 
2945 	if (pos < winobj->markpos)
2946 		elog(ERROR, "cannot fetch row before WindowObject's mark position");
2947 
2948 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2949 
2950 	tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2951 
2952 	/*
2953 	 * Advance or rewind until we are within one tuple of the one we want.
2954 	 */
2955 	if (winobj->seekpos < pos - 1)
2956 	{
2957 		if (!tuplestore_skiptuples(winstate->buffer,
2958 								   pos - 1 - winobj->seekpos,
2959 								   true))
2960 			elog(ERROR, "unexpected end of tuplestore");
2961 		winobj->seekpos = pos - 1;
2962 	}
2963 	else if (winobj->seekpos > pos + 1)
2964 	{
2965 		if (!tuplestore_skiptuples(winstate->buffer,
2966 								   winobj->seekpos - (pos + 1),
2967 								   false))
2968 			elog(ERROR, "unexpected end of tuplestore");
2969 		winobj->seekpos = pos + 1;
2970 	}
2971 	else if (winobj->seekpos == pos)
2972 	{
2973 		/*
2974 		 * There's no API to refetch the tuple at the current position.  We
2975 		 * have to move one tuple forward, and then one backward.  (We don't
2976 		 * do it the other way because we might try to fetch the row before
2977 		 * our mark, which isn't allowed.)  XXX this case could stand to be
2978 		 * optimized.
2979 		 */
2980 		tuplestore_advance(winstate->buffer, true);
2981 		winobj->seekpos++;
2982 	}
2983 
2984 	/*
2985 	 * Now we should be on the tuple immediately before or after the one we
2986 	 * want, so just fetch forwards or backwards as appropriate.
2987 	 */
2988 	if (winobj->seekpos > pos)
2989 	{
2990 		if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2991 			elog(ERROR, "unexpected end of tuplestore");
2992 		winobj->seekpos--;
2993 	}
2994 	else
2995 	{
2996 		if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2997 			elog(ERROR, "unexpected end of tuplestore");
2998 		winobj->seekpos++;
2999 	}
3000 
3001 	Assert(winobj->seekpos == pos);
3002 
3003 	MemoryContextSwitchTo(oldcontext);
3004 
3005 	return true;
3006 }
3007 
3008 
3009 /***********************************************************************
3010  * API exposed to window functions
3011  ***********************************************************************/
3012 
3013 
3014 /*
3015  * WinGetPartitionLocalMemory
3016  *		Get working memory that lives till end of partition processing
3017  *
3018  * On first call within a given partition, this allocates and zeroes the
3019  * requested amount of space.  Subsequent calls just return the same chunk.
3020  *
3021  * Memory obtained this way is normally used to hold state that should be
3022  * automatically reset for each new partition.  If a window function wants
3023  * to hold state across the whole query, fcinfo->fn_extra can be used in the
3024  * usual way for that.
3025  */
3026 void *
WinGetPartitionLocalMemory(WindowObject winobj,Size sz)3027 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
3028 {
3029 	Assert(WindowObjectIsValid(winobj));
3030 	if (winobj->localmem == NULL)
3031 		winobj->localmem =
3032 			MemoryContextAllocZero(winobj->winstate->partcontext, sz);
3033 	return winobj->localmem;
3034 }
3035 
3036 /*
3037  * WinGetCurrentPosition
3038  *		Return the current row's position (counting from 0) within the current
3039  *		partition.
3040  */
3041 int64
WinGetCurrentPosition(WindowObject winobj)3042 WinGetCurrentPosition(WindowObject winobj)
3043 {
3044 	Assert(WindowObjectIsValid(winobj));
3045 	return winobj->winstate->currentpos;
3046 }
3047 
3048 /*
3049  * WinGetPartitionRowCount
3050  *		Return total number of rows contained in the current partition.
3051  *
3052  * Note: this is a relatively expensive operation because it forces the
3053  * whole partition to be "spooled" into the tuplestore at once.  Once
3054  * executed, however, additional calls within the same partition are cheap.
3055  */
3056 int64
WinGetPartitionRowCount(WindowObject winobj)3057 WinGetPartitionRowCount(WindowObject winobj)
3058 {
3059 	Assert(WindowObjectIsValid(winobj));
3060 	spool_tuples(winobj->winstate, -1);
3061 	return winobj->winstate->spooled_rows;
3062 }
3063 
3064 /*
3065  * WinSetMarkPosition
3066  *		Set the "mark" position for the window object, which is the oldest row
3067  *		number (counting from 0) it is allowed to fetch during all subsequent
3068  *		operations within the current partition.
3069  *
3070  * Window functions do not have to call this, but are encouraged to move the
3071  * mark forward when possible to keep the tuplestore size down and prevent
3072  * having to spill rows to disk.
3073  */
3074 void
WinSetMarkPosition(WindowObject winobj,int64 markpos)3075 WinSetMarkPosition(WindowObject winobj, int64 markpos)
3076 {
3077 	WindowAggState *winstate;
3078 
3079 	Assert(WindowObjectIsValid(winobj));
3080 	winstate = winobj->winstate;
3081 
3082 	if (markpos < winobj->markpos)
3083 		elog(ERROR, "cannot move WindowObject's mark position backward");
3084 	tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
3085 	if (markpos > winobj->markpos)
3086 	{
3087 		tuplestore_skiptuples(winstate->buffer,
3088 							  markpos - winobj->markpos,
3089 							  true);
3090 		winobj->markpos = markpos;
3091 	}
3092 	tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
3093 	if (markpos > winobj->seekpos)
3094 	{
3095 		tuplestore_skiptuples(winstate->buffer,
3096 							  markpos - winobj->seekpos,
3097 							  true);
3098 		winobj->seekpos = markpos;
3099 	}
3100 }
3101 
3102 /*
3103  * WinRowsArePeers
3104  *		Compare two rows (specified by absolute position in partition) to see
3105  *		if they are equal according to the ORDER BY clause.
3106  *
3107  * NB: this does not consider the window frame mode.
3108  */
3109 bool
WinRowsArePeers(WindowObject winobj,int64 pos1,int64 pos2)3110 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
3111 {
3112 	WindowAggState *winstate;
3113 	WindowAgg  *node;
3114 	TupleTableSlot *slot1;
3115 	TupleTableSlot *slot2;
3116 	bool		res;
3117 
3118 	Assert(WindowObjectIsValid(winobj));
3119 	winstate = winobj->winstate;
3120 	node = (WindowAgg *) winstate->ss.ps.plan;
3121 
3122 	/* If no ORDER BY, all rows are peers; don't bother to fetch them */
3123 	if (node->ordNumCols == 0)
3124 		return true;
3125 
3126 	/*
3127 	 * Note: OK to use temp_slot_2 here because we aren't calling any
3128 	 * frame-related functions (those tend to clobber temp_slot_2).
3129 	 */
3130 	slot1 = winstate->temp_slot_1;
3131 	slot2 = winstate->temp_slot_2;
3132 
3133 	if (!window_gettupleslot(winobj, pos1, slot1))
3134 		elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3135 			 pos1);
3136 	if (!window_gettupleslot(winobj, pos2, slot2))
3137 		elog(ERROR, "specified position is out of window: " INT64_FORMAT,
3138 			 pos2);
3139 
3140 	res = are_peers(winstate, slot1, slot2);
3141 
3142 	ExecClearTuple(slot1);
3143 	ExecClearTuple(slot2);
3144 
3145 	return res;
3146 }
3147 
3148 /*
3149  * WinGetFuncArgInPartition
3150  *		Evaluate a window function's argument expression on a specified
3151  *		row of the partition.  The row is identified in lseek(2) style,
3152  *		i.e. relative to the current, first, or last row.
3153  *
3154  * argno: argument number to evaluate (counted from 0)
3155  * relpos: signed rowcount offset from the seek position
3156  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
3157  * set_mark: If the row is found and set_mark is true, the mark is moved to
3158  *		the row as a side-effect.
3159  * isnull: output argument, receives isnull status of result
3160  * isout: output argument, set to indicate whether target row position
3161  *		is out of partition (can pass NULL if caller doesn't care about this)
3162  *
3163  * Specifying a nonexistent row is not an error, it just causes a null result
3164  * (plus setting *isout true, if isout isn't NULL).
3165  */
3166 Datum
WinGetFuncArgInPartition(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)3167 WinGetFuncArgInPartition(WindowObject winobj, int argno,
3168 						 int relpos, int seektype, bool set_mark,
3169 						 bool *isnull, bool *isout)
3170 {
3171 	WindowAggState *winstate;
3172 	ExprContext *econtext;
3173 	TupleTableSlot *slot;
3174 	bool		gottuple;
3175 	int64		abs_pos;
3176 
3177 	Assert(WindowObjectIsValid(winobj));
3178 	winstate = winobj->winstate;
3179 	econtext = winstate->ss.ps.ps_ExprContext;
3180 	slot = winstate->temp_slot_1;
3181 
3182 	switch (seektype)
3183 	{
3184 		case WINDOW_SEEK_CURRENT:
3185 			abs_pos = winstate->currentpos + relpos;
3186 			break;
3187 		case WINDOW_SEEK_HEAD:
3188 			abs_pos = relpos;
3189 			break;
3190 		case WINDOW_SEEK_TAIL:
3191 			spool_tuples(winstate, -1);
3192 			abs_pos = winstate->spooled_rows - 1 + relpos;
3193 			break;
3194 		default:
3195 			elog(ERROR, "unrecognized window seek type: %d", seektype);
3196 			abs_pos = 0;		/* keep compiler quiet */
3197 			break;
3198 	}
3199 
3200 	gottuple = window_gettupleslot(winobj, abs_pos, slot);
3201 
3202 	if (!gottuple)
3203 	{
3204 		if (isout)
3205 			*isout = true;
3206 		*isnull = true;
3207 		return (Datum) 0;
3208 	}
3209 	else
3210 	{
3211 		if (isout)
3212 			*isout = false;
3213 		if (set_mark)
3214 			WinSetMarkPosition(winobj, abs_pos);
3215 		econtext->ecxt_outertuple = slot;
3216 		return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3217 							econtext, isnull);
3218 	}
3219 }
3220 
3221 /*
3222  * WinGetFuncArgInFrame
3223  *		Evaluate a window function's argument expression on a specified
3224  *		row of the window frame.  The row is identified in lseek(2) style,
3225  *		i.e. relative to the first or last row of the frame.  (We do not
3226  *		support WINDOW_SEEK_CURRENT here, because it's not very clear what
3227  *		that should mean if the current row isn't part of the frame.)
3228  *
3229  * argno: argument number to evaluate (counted from 0)
3230  * relpos: signed rowcount offset from the seek position
3231  * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL
3232  * set_mark: If the row is found/in frame and set_mark is true, the mark is
3233  *		moved to the row as a side-effect.
3234  * isnull: output argument, receives isnull status of result
3235  * isout: output argument, set to indicate whether target row position
3236  *		is out of frame (can pass NULL if caller doesn't care about this)
3237  *
3238  * Specifying a nonexistent or not-in-frame row is not an error, it just
3239  * causes a null result (plus setting *isout true, if isout isn't NULL).
3240  *
3241  * Note that some exclusion-clause options lead to situations where the
3242  * rows that are in-frame are not consecutive in the partition.  But we
3243  * count only in-frame rows when measuring relpos.
3244  *
3245  * The set_mark flag is interpreted as meaning that the caller will specify
3246  * a constant (or, perhaps, monotonically increasing) relpos in successive
3247  * calls, so that *if there is no exclusion clause* there will be no need
3248  * to fetch a row before the previously fetched row.  But we do not expect
3249  * the caller to know how to account for exclusion clauses.  Therefore,
3250  * if there is an exclusion clause we take responsibility for adjusting the
3251  * mark request to something that will be safe given the above assumption
3252  * about relpos.
3253  */
3254 Datum
WinGetFuncArgInFrame(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)3255 WinGetFuncArgInFrame(WindowObject winobj, int argno,
3256 					 int relpos, int seektype, bool set_mark,
3257 					 bool *isnull, bool *isout)
3258 {
3259 	WindowAggState *winstate;
3260 	ExprContext *econtext;
3261 	TupleTableSlot *slot;
3262 	int64		abs_pos;
3263 	int64		mark_pos;
3264 
3265 	Assert(WindowObjectIsValid(winobj));
3266 	winstate = winobj->winstate;
3267 	econtext = winstate->ss.ps.ps_ExprContext;
3268 	slot = winstate->temp_slot_1;
3269 
3270 	switch (seektype)
3271 	{
3272 		case WINDOW_SEEK_CURRENT:
3273 			elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame");
3274 			abs_pos = mark_pos = 0; /* keep compiler quiet */
3275 			break;
3276 		case WINDOW_SEEK_HEAD:
3277 			/* rejecting relpos < 0 is easy and simplifies code below */
3278 			if (relpos < 0)
3279 				goto out_of_frame;
3280 			update_frameheadpos(winstate);
3281 			abs_pos = winstate->frameheadpos + relpos;
3282 			mark_pos = abs_pos;
3283 
3284 			/*
3285 			 * Account for exclusion option if one is active, but advance only
3286 			 * abs_pos not mark_pos.  This prevents changes of the current
3287 			 * row's peer group from resulting in trying to fetch a row before
3288 			 * some previous mark position.
3289 			 *
3290 			 * Note that in some corner cases such as current row being
3291 			 * outside frame, these calculations are theoretically too simple,
3292 			 * but it doesn't matter because we'll end up deciding the row is
3293 			 * out of frame.  We do not attempt to avoid fetching rows past
3294 			 * end of frame; that would happen in some cases anyway.
3295 			 */
3296 			switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3297 			{
3298 				case 0:
3299 					/* no adjustment needed */
3300 					break;
3301 				case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3302 					if (abs_pos >= winstate->currentpos &&
3303 						winstate->currentpos >= winstate->frameheadpos)
3304 						abs_pos++;
3305 					break;
3306 				case FRAMEOPTION_EXCLUDE_GROUP:
3307 					update_grouptailpos(winstate);
3308 					if (abs_pos >= winstate->groupheadpos &&
3309 						winstate->grouptailpos > winstate->frameheadpos)
3310 					{
3311 						int64		overlapstart = Max(winstate->groupheadpos,
3312 													   winstate->frameheadpos);
3313 
3314 						abs_pos += winstate->grouptailpos - overlapstart;
3315 					}
3316 					break;
3317 				case FRAMEOPTION_EXCLUDE_TIES:
3318 					update_grouptailpos(winstate);
3319 					if (abs_pos >= winstate->groupheadpos &&
3320 						winstate->grouptailpos > winstate->frameheadpos)
3321 					{
3322 						int64		overlapstart = Max(winstate->groupheadpos,
3323 													   winstate->frameheadpos);
3324 
3325 						if (abs_pos == overlapstart)
3326 							abs_pos = winstate->currentpos;
3327 						else
3328 							abs_pos += winstate->grouptailpos - overlapstart - 1;
3329 					}
3330 					break;
3331 				default:
3332 					elog(ERROR, "unrecognized frame option state: 0x%x",
3333 						 winstate->frameOptions);
3334 					break;
3335 			}
3336 			break;
3337 		case WINDOW_SEEK_TAIL:
3338 			/* rejecting relpos > 0 is easy and simplifies code below */
3339 			if (relpos > 0)
3340 				goto out_of_frame;
3341 			update_frametailpos(winstate);
3342 			abs_pos = winstate->frametailpos - 1 + relpos;
3343 
3344 			/*
3345 			 * Account for exclusion option if one is active.  If there is no
3346 			 * exclusion, we can safely set the mark at the accessed row.  But
3347 			 * if there is, we can only mark the frame start, because we can't
3348 			 * be sure how far back in the frame the exclusion might cause us
3349 			 * to fetch in future.  Furthermore, we have to actually check
3350 			 * against frameheadpos here, since it's unsafe to try to fetch a
3351 			 * row before frame start if the mark might be there already.
3352 			 */
3353 			switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION)
3354 			{
3355 				case 0:
3356 					/* no adjustment needed */
3357 					mark_pos = abs_pos;
3358 					break;
3359 				case FRAMEOPTION_EXCLUDE_CURRENT_ROW:
3360 					if (abs_pos <= winstate->currentpos &&
3361 						winstate->currentpos < winstate->frametailpos)
3362 						abs_pos--;
3363 					update_frameheadpos(winstate);
3364 					if (abs_pos < winstate->frameheadpos)
3365 						goto out_of_frame;
3366 					mark_pos = winstate->frameheadpos;
3367 					break;
3368 				case FRAMEOPTION_EXCLUDE_GROUP:
3369 					update_grouptailpos(winstate);
3370 					if (abs_pos < winstate->grouptailpos &&
3371 						winstate->groupheadpos < winstate->frametailpos)
3372 					{
3373 						int64		overlapend = Min(winstate->grouptailpos,
3374 													 winstate->frametailpos);
3375 
3376 						abs_pos -= overlapend - winstate->groupheadpos;
3377 					}
3378 					update_frameheadpos(winstate);
3379 					if (abs_pos < winstate->frameheadpos)
3380 						goto out_of_frame;
3381 					mark_pos = winstate->frameheadpos;
3382 					break;
3383 				case FRAMEOPTION_EXCLUDE_TIES:
3384 					update_grouptailpos(winstate);
3385 					if (abs_pos < winstate->grouptailpos &&
3386 						winstate->groupheadpos < winstate->frametailpos)
3387 					{
3388 						int64		overlapend = Min(winstate->grouptailpos,
3389 													 winstate->frametailpos);
3390 
3391 						if (abs_pos == overlapend - 1)
3392 							abs_pos = winstate->currentpos;
3393 						else
3394 							abs_pos -= overlapend - 1 - winstate->groupheadpos;
3395 					}
3396 					update_frameheadpos(winstate);
3397 					if (abs_pos < winstate->frameheadpos)
3398 						goto out_of_frame;
3399 					mark_pos = winstate->frameheadpos;
3400 					break;
3401 				default:
3402 					elog(ERROR, "unrecognized frame option state: 0x%x",
3403 						 winstate->frameOptions);
3404 					mark_pos = 0;	/* keep compiler quiet */
3405 					break;
3406 			}
3407 			break;
3408 		default:
3409 			elog(ERROR, "unrecognized window seek type: %d", seektype);
3410 			abs_pos = mark_pos = 0; /* keep compiler quiet */
3411 			break;
3412 	}
3413 
3414 	if (!window_gettupleslot(winobj, abs_pos, slot))
3415 		goto out_of_frame;
3416 
3417 	/* The code above does not detect all out-of-frame cases, so check */
3418 	if (row_is_in_frame(winstate, abs_pos, slot) <= 0)
3419 		goto out_of_frame;
3420 
3421 	if (isout)
3422 		*isout = false;
3423 	if (set_mark)
3424 		WinSetMarkPosition(winobj, mark_pos);
3425 	econtext->ecxt_outertuple = slot;
3426 	return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3427 						econtext, isnull);
3428 
3429 out_of_frame:
3430 	if (isout)
3431 		*isout = true;
3432 	*isnull = true;
3433 	return (Datum) 0;
3434 }
3435 
3436 /*
3437  * WinGetFuncArgCurrent
3438  *		Evaluate a window function's argument expression on the current row.
3439  *
3440  * argno: argument number to evaluate (counted from 0)
3441  * isnull: output argument, receives isnull status of result
3442  *
3443  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
3444  * WinGetFuncArgInFrame targeting the current row, because it will succeed
3445  * even if the WindowObject's mark has been set beyond the current row.
3446  * This should generally be used for "ordinary" arguments of a window
3447  * function, such as the offset argument of lead() or lag().
3448  */
3449 Datum
WinGetFuncArgCurrent(WindowObject winobj,int argno,bool * isnull)3450 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
3451 {
3452 	WindowAggState *winstate;
3453 	ExprContext *econtext;
3454 
3455 	Assert(WindowObjectIsValid(winobj));
3456 	winstate = winobj->winstate;
3457 
3458 	econtext = winstate->ss.ps.ps_ExprContext;
3459 
3460 	econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
3461 	return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
3462 						econtext, isnull);
3463 }
3464