1 /*-------------------------------------------------------------------------
2  *
3  * nodeWindowAgg.c
4  *	  routines to handle WindowAgg nodes.
5  *
6  * A WindowAgg node evaluates "window functions" across suitable partitions
7  * of the input tuple set.  Any one WindowAgg works for just a single window
8  * specification, though it can evaluate multiple window functions sharing
9  * identical window specifications.  The input tuples are required to be
10  * delivered in sorted order, with the PARTITION BY columns (if any) as
11  * major sort keys and the ORDER BY columns (if any) as minor sort keys.
12  * (The planner generates a stack of WindowAggs with intervening Sort nodes
13  * as needed, if a query involves more than one window specification.)
14  *
15  * Since window functions can require access to any or all of the rows in
16  * the current partition, we accumulate rows of the partition into a
17  * tuplestore.  The window functions are called using the WindowObject API
18  * so that they can access those rows as needed.
19  *
20  * We also support using plain aggregate functions as window functions.
21  * For these, the regular Agg-node environment is emulated for each partition.
22  * As required by the SQL spec, the output represents the value of the
23  * aggregate function over all rows in the current row's window frame.
24  *
25  *
26  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
27  * Portions Copyright (c) 1994, Regents of the University of California
28  *
29  * IDENTIFICATION
30  *	  src/backend/executor/nodeWindowAgg.c
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include "postgres.h"
35 
36 #include "access/htup_details.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_aggregate.h"
39 #include "catalog/pg_proc.h"
40 #include "executor/executor.h"
41 #include "executor/nodeWindowAgg.h"
42 #include "miscadmin.h"
43 #include "nodes/nodeFuncs.h"
44 #include "optimizer/clauses.h"
45 #include "parser/parse_agg.h"
46 #include "parser/parse_coerce.h"
47 #include "utils/acl.h"
48 #include "utils/builtins.h"
49 #include "utils/datum.h"
50 #include "utils/lsyscache.h"
51 #include "utils/memutils.h"
52 #include "utils/syscache.h"
53 #include "windowapi.h"
54 
55 /*
56  * All the window function APIs are called with this object, which is passed
57  * to window functions as fcinfo->context.
58  */
59 typedef struct WindowObjectData
60 {
61 	NodeTag		type;
62 	WindowAggState *winstate;	/* parent WindowAggState */
63 	List	   *argstates;		/* ExprState trees for fn's arguments */
64 	void	   *localmem;		/* WinGetPartitionLocalMemory's chunk */
65 	int			markptr;		/* tuplestore mark pointer for this fn */
66 	int			readptr;		/* tuplestore read pointer for this fn */
67 	int64		markpos;		/* row that markptr is positioned on */
68 	int64		seekpos;		/* row that readptr is positioned on */
69 } WindowObjectData;
70 
71 /*
72  * We have one WindowStatePerFunc struct for each window function and
73  * window aggregate handled by this node.
74  */
75 typedef struct WindowStatePerFuncData
76 {
77 	/* Links to WindowFunc expr and state nodes this working state is for */
78 	WindowFuncExprState *wfuncstate;
79 	WindowFunc *wfunc;
80 
81 	int			numArguments;	/* number of arguments */
82 
83 	FmgrInfo	flinfo;			/* fmgr lookup data for window function */
84 
85 	Oid			winCollation;	/* collation derived for window function */
86 
87 	/*
88 	 * We need the len and byval info for the result of each function in order
89 	 * to know how to copy/delete values.
90 	 */
91 	int16		resulttypeLen;
92 	bool		resulttypeByVal;
93 
94 	bool		plain_agg;		/* is it just a plain aggregate function? */
95 	int			aggno;			/* if so, index of its PerAggData */
96 
97 	WindowObject winobj;		/* object used in window function API */
98 }	WindowStatePerFuncData;
99 
100 /*
101  * For plain aggregate window functions, we also have one of these.
102  */
103 typedef struct WindowStatePerAggData
104 {
105 	/* Oids of transition functions */
106 	Oid			transfn_oid;
107 	Oid			invtransfn_oid; /* may be InvalidOid */
108 	Oid			finalfn_oid;	/* may be InvalidOid */
109 
110 	/*
111 	 * fmgr lookup data for transition functions --- only valid when
112 	 * corresponding oid is not InvalidOid.  Note in particular that fn_strict
113 	 * flags are kept here.
114 	 */
115 	FmgrInfo	transfn;
116 	FmgrInfo	invtransfn;
117 	FmgrInfo	finalfn;
118 
119 	int			numFinalArgs;	/* number of arguments to pass to finalfn */
120 
121 	/*
122 	 * initial value from pg_aggregate entry
123 	 */
124 	Datum		initValue;
125 	bool		initValueIsNull;
126 
127 	/*
128 	 * cached value for current frame boundaries
129 	 */
130 	Datum		resultValue;
131 	bool		resultValueIsNull;
132 
133 	/*
134 	 * We need the len and byval info for the agg's input, result, and
135 	 * transition data types in order to know how to copy/delete values.
136 	 */
137 	int16		inputtypeLen,
138 				resulttypeLen,
139 				transtypeLen;
140 	bool		inputtypeByVal,
141 				resulttypeByVal,
142 				transtypeByVal;
143 
144 	int			wfuncno;		/* index of associated PerFuncData */
145 
146 	/* Context holding transition value and possibly other subsidiary data */
147 	MemoryContext aggcontext;	/* may be private, or winstate->aggcontext */
148 
149 	/* Current transition value */
150 	Datum		transValue;		/* current transition value */
151 	bool		transValueIsNull;
152 
153 	int64		transValueCount;	/* number of currently-aggregated rows */
154 
155 	/* Data local to eval_windowaggregates() */
156 	bool		restart;		/* need to restart this agg in this cycle? */
157 } WindowStatePerAggData;
158 
159 static void initialize_windowaggregate(WindowAggState *winstate,
160 						   WindowStatePerFunc perfuncstate,
161 						   WindowStatePerAgg peraggstate);
162 static void advance_windowaggregate(WindowAggState *winstate,
163 						WindowStatePerFunc perfuncstate,
164 						WindowStatePerAgg peraggstate);
165 static bool advance_windowaggregate_base(WindowAggState *winstate,
166 							 WindowStatePerFunc perfuncstate,
167 							 WindowStatePerAgg peraggstate);
168 static void finalize_windowaggregate(WindowAggState *winstate,
169 						 WindowStatePerFunc perfuncstate,
170 						 WindowStatePerAgg peraggstate,
171 						 Datum *result, bool *isnull);
172 
173 static void eval_windowaggregates(WindowAggState *winstate);
174 static void eval_windowfunction(WindowAggState *winstate,
175 					WindowStatePerFunc perfuncstate,
176 					Datum *result, bool *isnull);
177 
178 static void begin_partition(WindowAggState *winstate);
179 static void spool_tuples(WindowAggState *winstate, int64 pos);
180 static void release_partition(WindowAggState *winstate);
181 
182 static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
183 				TupleTableSlot *slot);
184 static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
185 static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
186 
187 static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
188 				  WindowFunc *wfunc,
189 				  WindowStatePerAgg peraggstate);
190 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
191 
192 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
193 		  TupleTableSlot *slot2);
194 static bool window_gettupleslot(WindowObject winobj, int64 pos,
195 					TupleTableSlot *slot);
196 
197 
198 /*
199  * initialize_windowaggregate
200  * parallel to initialize_aggregates in nodeAgg.c
201  */
202 static void
initialize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)203 initialize_windowaggregate(WindowAggState *winstate,
204 						   WindowStatePerFunc perfuncstate,
205 						   WindowStatePerAgg peraggstate)
206 {
207 	MemoryContext oldContext;
208 
209 	/*
210 	 * If we're using a private aggcontext, we may reset it here.  But if the
211 	 * context is shared, we don't know which other aggregates may still need
212 	 * it, so we must leave it to the caller to reset at an appropriate time.
213 	 */
214 	if (peraggstate->aggcontext != winstate->aggcontext)
215 		MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
216 
217 	if (peraggstate->initValueIsNull)
218 		peraggstate->transValue = peraggstate->initValue;
219 	else
220 	{
221 		oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
222 		peraggstate->transValue = datumCopy(peraggstate->initValue,
223 											peraggstate->transtypeByVal,
224 											peraggstate->transtypeLen);
225 		MemoryContextSwitchTo(oldContext);
226 	}
227 	peraggstate->transValueIsNull = peraggstate->initValueIsNull;
228 	peraggstate->transValueCount = 0;
229 	peraggstate->resultValue = (Datum) 0;
230 	peraggstate->resultValueIsNull = true;
231 }
232 
233 /*
234  * advance_windowaggregate
235  * parallel to advance_aggregates in nodeAgg.c
236  */
237 static void
advance_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)238 advance_windowaggregate(WindowAggState *winstate,
239 						WindowStatePerFunc perfuncstate,
240 						WindowStatePerAgg peraggstate)
241 {
242 	WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
243 	int			numArguments = perfuncstate->numArguments;
244 	FunctionCallInfoData fcinfodata;
245 	FunctionCallInfo fcinfo = &fcinfodata;
246 	Datum		newVal;
247 	ListCell   *arg;
248 	int			i;
249 	MemoryContext oldContext;
250 	ExprContext *econtext = winstate->tmpcontext;
251 	ExprState  *filter = wfuncstate->aggfilter;
252 
253 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
254 
255 	/* Skip anything FILTERed out */
256 	if (filter)
257 	{
258 		bool		isnull;
259 		Datum		res = ExecEvalExpr(filter, econtext, &isnull, NULL);
260 
261 		if (isnull || !DatumGetBool(res))
262 		{
263 			MemoryContextSwitchTo(oldContext);
264 			return;
265 		}
266 	}
267 
268 	/* We start from 1, since the 0th arg will be the transition value */
269 	i = 1;
270 	foreach(arg, wfuncstate->args)
271 	{
272 		ExprState  *argstate = (ExprState *) lfirst(arg);
273 
274 		fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
275 									  &fcinfo->argnull[i], NULL);
276 		i++;
277 	}
278 
279 	if (peraggstate->transfn.fn_strict)
280 	{
281 		/*
282 		 * For a strict transfn, nothing happens when there's a NULL input; we
283 		 * just keep the prior transValue.  Note transValueCount doesn't
284 		 * change either.
285 		 */
286 		for (i = 1; i <= numArguments; i++)
287 		{
288 			if (fcinfo->argnull[i])
289 			{
290 				MemoryContextSwitchTo(oldContext);
291 				return;
292 			}
293 		}
294 
295 		/*
296 		 * For strict transition functions with initial value NULL we use the
297 		 * first non-NULL input as the initial state.  (We already checked
298 		 * that the agg's input type is binary-compatible with its transtype,
299 		 * so straight copy here is OK.)
300 		 *
301 		 * We must copy the datum into aggcontext if it is pass-by-ref.  We do
302 		 * not need to pfree the old transValue, since it's NULL.
303 		 */
304 		if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
305 		{
306 			MemoryContextSwitchTo(peraggstate->aggcontext);
307 			peraggstate->transValue = datumCopy(fcinfo->arg[1],
308 												peraggstate->transtypeByVal,
309 												peraggstate->transtypeLen);
310 			peraggstate->transValueIsNull = false;
311 			peraggstate->transValueCount = 1;
312 			MemoryContextSwitchTo(oldContext);
313 			return;
314 		}
315 
316 		if (peraggstate->transValueIsNull)
317 		{
318 			/*
319 			 * Don't call a strict function with NULL inputs.  Note it is
320 			 * possible to get here despite the above tests, if the transfn is
321 			 * strict *and* returned a NULL on a prior cycle.  If that happens
322 			 * we will propagate the NULL all the way to the end.  That can
323 			 * only happen if there's no inverse transition function, though,
324 			 * since we disallow transitions back to NULL when there is one.
325 			 */
326 			MemoryContextSwitchTo(oldContext);
327 			Assert(!OidIsValid(peraggstate->invtransfn_oid));
328 			return;
329 		}
330 	}
331 
332 	/*
333 	 * OK to call the transition function.  Set winstate->curaggcontext while
334 	 * calling it, for possible use by AggCheckCallContext.
335 	 */
336 	InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
337 							 numArguments + 1,
338 							 perfuncstate->winCollation,
339 							 (void *) winstate, NULL);
340 	fcinfo->arg[0] = peraggstate->transValue;
341 	fcinfo->argnull[0] = peraggstate->transValueIsNull;
342 	winstate->curaggcontext = peraggstate->aggcontext;
343 	newVal = FunctionCallInvoke(fcinfo);
344 	winstate->curaggcontext = NULL;
345 
346 	/*
347 	 * Moving-aggregate transition functions must not return null, see
348 	 * advance_windowaggregate_base().
349 	 */
350 	if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
351 		ereport(ERROR,
352 				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
353 		errmsg("moving-aggregate transition function must not return null")));
354 
355 	/*
356 	 * We must track the number of rows included in transValue, since to
357 	 * remove the last input, advance_windowaggregate_base() mustn't call the
358 	 * inverse transition function, but simply reset transValue back to its
359 	 * initial value.
360 	 */
361 	peraggstate->transValueCount++;
362 
363 	/*
364 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
365 	 * free the prior transValue.  But if transfn returned a pointer to its
366 	 * first input, we don't need to do anything.  Also, if transfn returned a
367 	 * pointer to a R/W expanded object that is already a child of the
368 	 * aggcontext, assume we can adopt that value without copying it.
369 	 */
370 	if (!peraggstate->transtypeByVal &&
371 		DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
372 	{
373 		if (!fcinfo->isnull)
374 		{
375 			MemoryContextSwitchTo(peraggstate->aggcontext);
376 			if (DatumIsReadWriteExpandedObject(newVal,
377 											   false,
378 											   peraggstate->transtypeLen) &&
379 				MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
380 				 /* do nothing */ ;
381 			else
382 				newVal = datumCopy(newVal,
383 								   peraggstate->transtypeByVal,
384 								   peraggstate->transtypeLen);
385 		}
386 		if (!peraggstate->transValueIsNull)
387 		{
388 			if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
389 											   false,
390 											   peraggstate->transtypeLen))
391 				DeleteExpandedObject(peraggstate->transValue);
392 			else
393 				pfree(DatumGetPointer(peraggstate->transValue));
394 		}
395 	}
396 
397 	MemoryContextSwitchTo(oldContext);
398 	peraggstate->transValue = newVal;
399 	peraggstate->transValueIsNull = fcinfo->isnull;
400 }
401 
402 /*
403  * advance_windowaggregate_base
404  * Remove the oldest tuple from an aggregation.
405  *
406  * This is very much like advance_windowaggregate, except that we will call
407  * the inverse transition function (which caller must have checked is
408  * available).
409  *
410  * Returns true if we successfully removed the current row from this
411  * aggregate, false if not (in the latter case, caller is responsible
412  * for cleaning up by restarting the aggregation).
413  */
414 static bool
advance_windowaggregate_base(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate)415 advance_windowaggregate_base(WindowAggState *winstate,
416 							 WindowStatePerFunc perfuncstate,
417 							 WindowStatePerAgg peraggstate)
418 {
419 	WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
420 	int			numArguments = perfuncstate->numArguments;
421 	FunctionCallInfoData fcinfodata;
422 	FunctionCallInfo fcinfo = &fcinfodata;
423 	Datum		newVal;
424 	ListCell   *arg;
425 	int			i;
426 	MemoryContext oldContext;
427 	ExprContext *econtext = winstate->tmpcontext;
428 	ExprState  *filter = wfuncstate->aggfilter;
429 
430 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
431 
432 	/* Skip anything FILTERed out */
433 	if (filter)
434 	{
435 		bool		isnull;
436 		Datum		res = ExecEvalExpr(filter, econtext, &isnull, NULL);
437 
438 		if (isnull || !DatumGetBool(res))
439 		{
440 			MemoryContextSwitchTo(oldContext);
441 			return true;
442 		}
443 	}
444 
445 	/* We start from 1, since the 0th arg will be the transition value */
446 	i = 1;
447 	foreach(arg, wfuncstate->args)
448 	{
449 		ExprState  *argstate = (ExprState *) lfirst(arg);
450 
451 		fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
452 									  &fcinfo->argnull[i], NULL);
453 		i++;
454 	}
455 
456 	if (peraggstate->invtransfn.fn_strict)
457 	{
458 		/*
459 		 * For a strict (inv)transfn, nothing happens when there's a NULL
460 		 * input; we just keep the prior transValue.  Note transValueCount
461 		 * doesn't change either.
462 		 */
463 		for (i = 1; i <= numArguments; i++)
464 		{
465 			if (fcinfo->argnull[i])
466 			{
467 				MemoryContextSwitchTo(oldContext);
468 				return true;
469 			}
470 		}
471 	}
472 
473 	/* There should still be an added but not yet removed value */
474 	Assert(peraggstate->transValueCount > 0);
475 
476 	/*
477 	 * In moving-aggregate mode, the state must never be NULL, except possibly
478 	 * before any rows have been aggregated (which is surely not the case at
479 	 * this point).  This restriction allows us to interpret a NULL result
480 	 * from the inverse function as meaning "sorry, can't do an inverse
481 	 * transition in this case".  We already checked this in
482 	 * advance_windowaggregate, but just for safety, check again.
483 	 */
484 	if (peraggstate->transValueIsNull)
485 		elog(ERROR, "aggregate transition value is NULL before inverse transition");
486 
487 	/*
488 	 * We mustn't use the inverse transition function to remove the last
489 	 * input.  Doing so would yield a non-NULL state, whereas we should be in
490 	 * the initial state afterwards which may very well be NULL.  So instead,
491 	 * we simply re-initialize the aggregate in this case.
492 	 */
493 	if (peraggstate->transValueCount == 1)
494 	{
495 		MemoryContextSwitchTo(oldContext);
496 		initialize_windowaggregate(winstate,
497 								   &winstate->perfunc[peraggstate->wfuncno],
498 								   peraggstate);
499 		return true;
500 	}
501 
502 	/*
503 	 * OK to call the inverse transition function.  Set
504 	 * winstate->curaggcontext while calling it, for possible use by
505 	 * AggCheckCallContext.
506 	 */
507 	InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
508 							 numArguments + 1,
509 							 perfuncstate->winCollation,
510 							 (void *) winstate, NULL);
511 	fcinfo->arg[0] = peraggstate->transValue;
512 	fcinfo->argnull[0] = peraggstate->transValueIsNull;
513 	winstate->curaggcontext = peraggstate->aggcontext;
514 	newVal = FunctionCallInvoke(fcinfo);
515 	winstate->curaggcontext = NULL;
516 
517 	/*
518 	 * If the function returns NULL, report failure, forcing a restart.
519 	 */
520 	if (fcinfo->isnull)
521 	{
522 		MemoryContextSwitchTo(oldContext);
523 		return false;
524 	}
525 
526 	/* Update number of rows included in transValue */
527 	peraggstate->transValueCount--;
528 
529 	/*
530 	 * If pass-by-ref datatype, must copy the new value into aggcontext and
531 	 * free the prior transValue.  But if invtransfn returned a pointer to its
532 	 * first input, we don't need to do anything.  Also, if invtransfn
533 	 * returned a pointer to a R/W expanded object that is already a child of
534 	 * the aggcontext, assume we can adopt that value without copying it.
535 	 *
536 	 * Note: the checks for null values here will never fire, but it seems
537 	 * best to have this stanza look just like advance_windowaggregate.
538 	 */
539 	if (!peraggstate->transtypeByVal &&
540 		DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
541 	{
542 		if (!fcinfo->isnull)
543 		{
544 			MemoryContextSwitchTo(peraggstate->aggcontext);
545 			if (DatumIsReadWriteExpandedObject(newVal,
546 											   false,
547 											   peraggstate->transtypeLen) &&
548 				MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext)
549 				 /* do nothing */ ;
550 			else
551 				newVal = datumCopy(newVal,
552 								   peraggstate->transtypeByVal,
553 								   peraggstate->transtypeLen);
554 		}
555 		if (!peraggstate->transValueIsNull)
556 		{
557 			if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
558 											   false,
559 											   peraggstate->transtypeLen))
560 				DeleteExpandedObject(peraggstate->transValue);
561 			else
562 				pfree(DatumGetPointer(peraggstate->transValue));
563 		}
564 	}
565 
566 	MemoryContextSwitchTo(oldContext);
567 	peraggstate->transValue = newVal;
568 	peraggstate->transValueIsNull = fcinfo->isnull;
569 
570 	return true;
571 }
572 
573 /*
574  * finalize_windowaggregate
575  * parallel to finalize_aggregate in nodeAgg.c
576  */
577 static void
finalize_windowaggregate(WindowAggState * winstate,WindowStatePerFunc perfuncstate,WindowStatePerAgg peraggstate,Datum * result,bool * isnull)578 finalize_windowaggregate(WindowAggState *winstate,
579 						 WindowStatePerFunc perfuncstate,
580 						 WindowStatePerAgg peraggstate,
581 						 Datum *result, bool *isnull)
582 {
583 	MemoryContext oldContext;
584 
585 	oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
586 
587 	/*
588 	 * Apply the agg's finalfn if one is provided, else return transValue.
589 	 */
590 	if (OidIsValid(peraggstate->finalfn_oid))
591 	{
592 		int			numFinalArgs = peraggstate->numFinalArgs;
593 		FunctionCallInfoData fcinfo;
594 		bool		anynull;
595 		int			i;
596 
597 		InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn),
598 								 numFinalArgs,
599 								 perfuncstate->winCollation,
600 								 (void *) winstate, NULL);
601 		fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue,
602 											   peraggstate->transValueIsNull,
603 												   peraggstate->transtypeLen);
604 		fcinfo.argnull[0] = peraggstate->transValueIsNull;
605 		anynull = peraggstate->transValueIsNull;
606 
607 		/* Fill any remaining argument positions with nulls */
608 		for (i = 1; i < numFinalArgs; i++)
609 		{
610 			fcinfo.arg[i] = (Datum) 0;
611 			fcinfo.argnull[i] = true;
612 			anynull = true;
613 		}
614 
615 		if (fcinfo.flinfo->fn_strict && anynull)
616 		{
617 			/* don't call a strict function with NULL inputs */
618 			*result = (Datum) 0;
619 			*isnull = true;
620 		}
621 		else
622 		{
623 			winstate->curaggcontext = peraggstate->aggcontext;
624 			*result = FunctionCallInvoke(&fcinfo);
625 			winstate->curaggcontext = NULL;
626 			*isnull = fcinfo.isnull;
627 		}
628 	}
629 	else
630 	{
631 		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
632 		*result = peraggstate->transValue;
633 		*isnull = peraggstate->transValueIsNull;
634 	}
635 
636 	/*
637 	 * If result is pass-by-ref, make sure it is in the right context.
638 	 */
639 	if (!peraggstate->resulttypeByVal && !*isnull &&
640 		!MemoryContextContains(CurrentMemoryContext,
641 							   DatumGetPointer(*result)))
642 		*result = datumCopy(*result,
643 							peraggstate->resulttypeByVal,
644 							peraggstate->resulttypeLen);
645 	MemoryContextSwitchTo(oldContext);
646 }
647 
648 /*
649  * eval_windowaggregates
650  * evaluate plain aggregates being used as window functions
651  *
652  * This differs from nodeAgg.c in two ways.  First, if the window's frame
653  * start position moves, we use the inverse transition function (if it exists)
654  * to remove rows from the transition value.  And second, we expect to be
655  * able to call aggregate final functions repeatedly after aggregating more
656  * data onto the same transition value.  This is not a behavior required by
657  * nodeAgg.c.
658  */
659 static void
eval_windowaggregates(WindowAggState * winstate)660 eval_windowaggregates(WindowAggState *winstate)
661 {
662 	WindowStatePerAgg peraggstate;
663 	int			wfuncno,
664 				numaggs,
665 				numaggs_restart,
666 				i;
667 	int64		aggregatedupto_nonrestarted;
668 	MemoryContext oldContext;
669 	ExprContext *econtext;
670 	WindowObject agg_winobj;
671 	TupleTableSlot *agg_row_slot;
672 	TupleTableSlot *temp_slot;
673 
674 	numaggs = winstate->numaggs;
675 	if (numaggs == 0)
676 		return;					/* nothing to do */
677 
678 	/* final output execution is in ps_ExprContext */
679 	econtext = winstate->ss.ps.ps_ExprContext;
680 	agg_winobj = winstate->agg_winobj;
681 	agg_row_slot = winstate->agg_row_slot;
682 	temp_slot = winstate->temp_slot_1;
683 
684 	/*
685 	 * Currently, we support only a subset of the SQL-standard window framing
686 	 * rules.
687 	 *
688 	 * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
689 	 * a contiguous group of rows extending forward from the start of the
690 	 * partition, and rows only enter the frame, never exit it, as the current
691 	 * row advances forward.  This makes it possible to use an incremental
692 	 * strategy for evaluating aggregates: we run the transition function for
693 	 * each row added to the frame, and run the final function whenever we
694 	 * need the current aggregate value.  This is considerably more efficient
695 	 * than the naive approach of re-running the entire aggregate calculation
696 	 * for each current row.  It does assume that the final function doesn't
697 	 * damage the running transition value, but we have the same assumption in
698 	 * nodeAgg.c too (when it rescans an existing hash table).
699 	 *
700 	 * If the frame start does sometimes move, we can still optimize as above
701 	 * whenever successive rows share the same frame head, but if the frame
702 	 * head moves beyond the previous head we try to remove those rows using
703 	 * the aggregate's inverse transition function.  This function restores
704 	 * the aggregate's current state to what it would be if the removed row
705 	 * had never been aggregated in the first place.  Inverse transition
706 	 * functions may optionally return NULL, indicating that the function was
707 	 * unable to remove the tuple from aggregation.  If this happens, or if
708 	 * the aggregate doesn't have an inverse transition function at all, we
709 	 * must perform the aggregation all over again for all tuples within the
710 	 * new frame boundaries.
711 	 *
712 	 * In many common cases, multiple rows share the same frame and hence the
713 	 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
714 	 * window, then all rows are peers and so they all have window frame equal
715 	 * to the whole partition.)  We optimize such cases by calculating the
716 	 * aggregate value once when we reach the first row of a peer group, and
717 	 * then returning the saved value for all subsequent rows.
718 	 *
719 	 * 'aggregatedupto' keeps track of the first row that has not yet been
720 	 * accumulated into the aggregate transition values.  Whenever we start a
721 	 * new peer group, we accumulate forward to the end of the peer group.
722 	 */
723 
724 	/*
725 	 * First, update the frame head position.
726 	 *
727 	 * The frame head should never move backwards, and the code below wouldn't
728 	 * cope if it did, so for safety we complain if it does.
729 	 */
730 	update_frameheadpos(agg_winobj, temp_slot);
731 	if (winstate->frameheadpos < winstate->aggregatedbase)
732 		elog(ERROR, "window frame head moved backward");
733 
734 	/*
735 	 * If the frame didn't change compared to the previous row, we can re-use
736 	 * the result values that were previously saved at the bottom of this
737 	 * function.  Since we don't know the current frame's end yet, this is not
738 	 * possible to check for fully.  But if the frame end mode is UNBOUNDED
739 	 * FOLLOWING or CURRENT ROW, and the current row lies within the previous
740 	 * row's frame, then the two frames' ends must coincide.  Note that on the
741 	 * first row aggregatedbase == aggregatedupto, meaning this test must
742 	 * fail, so we don't need to check the "there was no previous row" case
743 	 * explicitly here.
744 	 */
745 	if (winstate->aggregatedbase == winstate->frameheadpos &&
746 		(winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
747 								   FRAMEOPTION_END_CURRENT_ROW)) &&
748 		winstate->aggregatedbase <= winstate->currentpos &&
749 		winstate->aggregatedupto > winstate->currentpos)
750 	{
751 		for (i = 0; i < numaggs; i++)
752 		{
753 			peraggstate = &winstate->peragg[i];
754 			wfuncno = peraggstate->wfuncno;
755 			econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
756 			econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
757 		}
758 		return;
759 	}
760 
761 	/*----------
762 	 * Initialize restart flags.
763 	 *
764 	 * We restart the aggregation:
765 	 *	 - if we're processing the first row in the partition, or
766 	 *	 - if the frame's head moved and we cannot use an inverse
767 	 *	   transition function, or
768 	 *	 - if the new frame doesn't overlap the old one
769 	 *
770 	 * Note that we don't strictly need to restart in the last case, but if
771 	 * we're going to remove all rows from the aggregation anyway, a restart
772 	 * surely is faster.
773 	 *----------
774 	 */
775 	numaggs_restart = 0;
776 	for (i = 0; i < numaggs; i++)
777 	{
778 		peraggstate = &winstate->peragg[i];
779 		if (winstate->currentpos == 0 ||
780 			(winstate->aggregatedbase != winstate->frameheadpos &&
781 			 !OidIsValid(peraggstate->invtransfn_oid)) ||
782 			winstate->aggregatedupto <= winstate->frameheadpos)
783 		{
784 			peraggstate->restart = true;
785 			numaggs_restart++;
786 		}
787 		else
788 			peraggstate->restart = false;
789 	}
790 
791 	/*
792 	 * If we have any possibly-moving aggregates, attempt to advance
793 	 * aggregatedbase to match the frame's head by removing input rows that
794 	 * fell off the top of the frame from the aggregations.  This can fail,
795 	 * i.e. advance_windowaggregate_base() can return false, in which case
796 	 * we'll restart that aggregate below.
797 	 */
798 	while (numaggs_restart < numaggs &&
799 		   winstate->aggregatedbase < winstate->frameheadpos)
800 	{
801 		/*
802 		 * Fetch the next tuple of those being removed. This should never fail
803 		 * as we should have been here before.
804 		 */
805 		if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
806 								 temp_slot))
807 			elog(ERROR, "could not re-fetch previously fetched frame row");
808 
809 		/* Set tuple context for evaluation of aggregate arguments */
810 		winstate->tmpcontext->ecxt_outertuple = temp_slot;
811 
812 		/*
813 		 * Perform the inverse transition for each aggregate function in the
814 		 * window, unless it has already been marked as needing a restart.
815 		 */
816 		for (i = 0; i < numaggs; i++)
817 		{
818 			bool		ok;
819 
820 			peraggstate = &winstate->peragg[i];
821 			if (peraggstate->restart)
822 				continue;
823 
824 			wfuncno = peraggstate->wfuncno;
825 			ok = advance_windowaggregate_base(winstate,
826 											  &winstate->perfunc[wfuncno],
827 											  peraggstate);
828 			if (!ok)
829 			{
830 				/* Inverse transition function has failed, must restart */
831 				peraggstate->restart = true;
832 				numaggs_restart++;
833 			}
834 		}
835 
836 		/* Reset per-input-tuple context after each tuple */
837 		ResetExprContext(winstate->tmpcontext);
838 
839 		/* And advance the aggregated-row state */
840 		winstate->aggregatedbase++;
841 		ExecClearTuple(temp_slot);
842 	}
843 
844 	/*
845 	 * If we successfully advanced the base rows of all the aggregates,
846 	 * aggregatedbase now equals frameheadpos; but if we failed for any, we
847 	 * must forcibly update aggregatedbase.
848 	 */
849 	winstate->aggregatedbase = winstate->frameheadpos;
850 
851 	/*
852 	 * If we created a mark pointer for aggregates, keep it pushed up to frame
853 	 * head, so that tuplestore can discard unnecessary rows.
854 	 */
855 	if (agg_winobj->markptr >= 0)
856 		WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
857 
858 	/*
859 	 * Now restart the aggregates that require it.
860 	 *
861 	 * We assume that aggregates using the shared context always restart if
862 	 * *any* aggregate restarts, and we may thus clean up the shared
863 	 * aggcontext if that is the case.  Private aggcontexts are reset by
864 	 * initialize_windowaggregate() if their owning aggregate restarts. If we
865 	 * aren't restarting an aggregate, we need to free any previously saved
866 	 * result for it, else we'll leak memory.
867 	 */
868 	if (numaggs_restart > 0)
869 		MemoryContextResetAndDeleteChildren(winstate->aggcontext);
870 	for (i = 0; i < numaggs; i++)
871 	{
872 		peraggstate = &winstate->peragg[i];
873 
874 		/* Aggregates using the shared ctx must restart if *any* agg does */
875 		Assert(peraggstate->aggcontext != winstate->aggcontext ||
876 			   numaggs_restart == 0 ||
877 			   peraggstate->restart);
878 
879 		if (peraggstate->restart)
880 		{
881 			wfuncno = peraggstate->wfuncno;
882 			initialize_windowaggregate(winstate,
883 									   &winstate->perfunc[wfuncno],
884 									   peraggstate);
885 		}
886 		else if (!peraggstate->resultValueIsNull)
887 		{
888 			if (!peraggstate->resulttypeByVal)
889 				pfree(DatumGetPointer(peraggstate->resultValue));
890 			peraggstate->resultValue = (Datum) 0;
891 			peraggstate->resultValueIsNull = true;
892 		}
893 	}
894 
895 	/*
896 	 * Non-restarted aggregates now contain the rows between aggregatedbase
897 	 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
898 	 * contain no rows.  If there are any restarted aggregates, we must thus
899 	 * begin aggregating anew at frameheadpos, otherwise we may simply
900 	 * continue at aggregatedupto.  We must remember the old value of
901 	 * aggregatedupto to know how long to skip advancing non-restarted
902 	 * aggregates.  If we modify aggregatedupto, we must also clear
903 	 * agg_row_slot, per the loop invariant below.
904 	 */
905 	aggregatedupto_nonrestarted = winstate->aggregatedupto;
906 	if (numaggs_restart > 0 &&
907 		winstate->aggregatedupto != winstate->frameheadpos)
908 	{
909 		winstate->aggregatedupto = winstate->frameheadpos;
910 		ExecClearTuple(agg_row_slot);
911 	}
912 
913 	/*
914 	 * Advance until we reach a row not in frame (or end of partition).
915 	 *
916 	 * Note the loop invariant: agg_row_slot is either empty or holds the row
917 	 * at position aggregatedupto.  We advance aggregatedupto after processing
918 	 * a row.
919 	 */
920 	for (;;)
921 	{
922 		/* Fetch next row if we didn't already */
923 		if (TupIsNull(agg_row_slot))
924 		{
925 			if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
926 									 agg_row_slot))
927 				break;			/* must be end of partition */
928 		}
929 
930 		/* Exit loop (for now) if not in frame */
931 		if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
932 			break;
933 
934 		/* Set tuple context for evaluation of aggregate arguments */
935 		winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
936 
937 		/* Accumulate row into the aggregates */
938 		for (i = 0; i < numaggs; i++)
939 		{
940 			peraggstate = &winstate->peragg[i];
941 
942 			/* Non-restarted aggs skip until aggregatedupto_nonrestarted */
943 			if (!peraggstate->restart &&
944 				winstate->aggregatedupto < aggregatedupto_nonrestarted)
945 				continue;
946 
947 			wfuncno = peraggstate->wfuncno;
948 			advance_windowaggregate(winstate,
949 									&winstate->perfunc[wfuncno],
950 									peraggstate);
951 		}
952 
953 		/* Reset per-input-tuple context after each tuple */
954 		ResetExprContext(winstate->tmpcontext);
955 
956 		/* And advance the aggregated-row state */
957 		winstate->aggregatedupto++;
958 		ExecClearTuple(agg_row_slot);
959 	}
960 
961 	/* The frame's end is not supposed to move backwards, ever */
962 	Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
963 
964 	/*
965 	 * finalize aggregates and fill result/isnull fields.
966 	 */
967 	for (i = 0; i < numaggs; i++)
968 	{
969 		Datum	   *result;
970 		bool	   *isnull;
971 
972 		peraggstate = &winstate->peragg[i];
973 		wfuncno = peraggstate->wfuncno;
974 		result = &econtext->ecxt_aggvalues[wfuncno];
975 		isnull = &econtext->ecxt_aggnulls[wfuncno];
976 		finalize_windowaggregate(winstate,
977 								 &winstate->perfunc[wfuncno],
978 								 peraggstate,
979 								 result, isnull);
980 
981 		/*
982 		 * save the result in case next row shares the same frame.
983 		 *
984 		 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
985 		 * advance that the next row can't possibly share the same frame. Is
986 		 * it worth detecting that and skipping this code?
987 		 */
988 		if (!peraggstate->resulttypeByVal && !*isnull)
989 		{
990 			oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
991 			peraggstate->resultValue =
992 				datumCopy(*result,
993 						  peraggstate->resulttypeByVal,
994 						  peraggstate->resulttypeLen);
995 			MemoryContextSwitchTo(oldContext);
996 		}
997 		else
998 		{
999 			peraggstate->resultValue = *result;
1000 		}
1001 		peraggstate->resultValueIsNull = *isnull;
1002 	}
1003 }
1004 
1005 /*
1006  * eval_windowfunction
1007  *
1008  * Arguments of window functions are not evaluated here, because a window
1009  * function can need random access to arbitrary rows in the partition.
1010  * The window function uses the special WinGetFuncArgInPartition and
1011  * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
1012  * it wants.
1013  */
1014 static void
eval_windowfunction(WindowAggState * winstate,WindowStatePerFunc perfuncstate,Datum * result,bool * isnull)1015 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
1016 					Datum *result, bool *isnull)
1017 {
1018 	FunctionCallInfoData fcinfo;
1019 	MemoryContext oldContext;
1020 
1021 	oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
1022 
1023 	/*
1024 	 * We don't pass any normal arguments to a window function, but we do pass
1025 	 * it the number of arguments, in order to permit window function
1026 	 * implementations to support varying numbers of arguments.  The real info
1027 	 * goes through the WindowObject, which is passed via fcinfo->context.
1028 	 */
1029 	InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
1030 							 perfuncstate->numArguments,
1031 							 perfuncstate->winCollation,
1032 							 (void *) perfuncstate->winobj, NULL);
1033 	/* Just in case, make all the regular argument slots be null */
1034 	memset(fcinfo.argnull, true, perfuncstate->numArguments);
1035 	/* Window functions don't have a current aggregate context, either */
1036 	winstate->curaggcontext = NULL;
1037 
1038 	*result = FunctionCallInvoke(&fcinfo);
1039 	*isnull = fcinfo.isnull;
1040 
1041 	/*
1042 	 * Make sure pass-by-ref data is allocated in the appropriate context. (We
1043 	 * need this in case the function returns a pointer into some short-lived
1044 	 * tuple, as is entirely possible.)
1045 	 */
1046 	if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
1047 		!MemoryContextContains(CurrentMemoryContext,
1048 							   DatumGetPointer(*result)))
1049 		*result = datumCopy(*result,
1050 							perfuncstate->resulttypeByVal,
1051 							perfuncstate->resulttypeLen);
1052 
1053 	MemoryContextSwitchTo(oldContext);
1054 }
1055 
1056 /*
1057  * begin_partition
1058  * Start buffering rows of the next partition.
1059  */
1060 static void
begin_partition(WindowAggState * winstate)1061 begin_partition(WindowAggState *winstate)
1062 {
1063 	PlanState  *outerPlan = outerPlanState(winstate);
1064 	int			numfuncs = winstate->numfuncs;
1065 	int			i;
1066 
1067 	winstate->partition_spooled = false;
1068 	winstate->framehead_valid = false;
1069 	winstate->frametail_valid = false;
1070 	winstate->spooled_rows = 0;
1071 	winstate->currentpos = 0;
1072 	winstate->frameheadpos = 0;
1073 	winstate->frametailpos = -1;
1074 	ExecClearTuple(winstate->agg_row_slot);
1075 
1076 	/*
1077 	 * If this is the very first partition, we need to fetch the first input
1078 	 * row to store in first_part_slot.
1079 	 */
1080 	if (TupIsNull(winstate->first_part_slot))
1081 	{
1082 		TupleTableSlot *outerslot = ExecProcNode(outerPlan);
1083 
1084 		if (!TupIsNull(outerslot))
1085 			ExecCopySlot(winstate->first_part_slot, outerslot);
1086 		else
1087 		{
1088 			/* outer plan is empty, so we have nothing to do */
1089 			winstate->partition_spooled = true;
1090 			winstate->more_partitions = false;
1091 			return;
1092 		}
1093 	}
1094 
1095 	/* Create new tuplestore for this partition */
1096 	winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
1097 
1098 	/*
1099 	 * Set up read pointers for the tuplestore.  The current pointer doesn't
1100 	 * need BACKWARD capability, but the per-window-function read pointers do,
1101 	 * and the aggregate pointer does if frame start is movable.
1102 	 */
1103 	winstate->current_ptr = 0;	/* read pointer 0 is pre-allocated */
1104 
1105 	/* reset default REWIND capability bit for current ptr */
1106 	tuplestore_set_eflags(winstate->buffer, 0);
1107 
1108 	/* create read pointers for aggregates, if needed */
1109 	if (winstate->numaggs > 0)
1110 	{
1111 		WindowObject agg_winobj = winstate->agg_winobj;
1112 		int			readptr_flags = 0;
1113 
1114 		/* If the frame head is potentially movable ... */
1115 		if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
1116 		{
1117 			/* ... create a mark pointer to track the frame head */
1118 			agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
1119 			/* and the read pointer will need BACKWARD capability */
1120 			readptr_flags |= EXEC_FLAG_BACKWARD;
1121 		}
1122 
1123 		agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1124 															readptr_flags);
1125 		agg_winobj->markpos = -1;
1126 		agg_winobj->seekpos = -1;
1127 
1128 		/* Also reset the row counters for aggregates */
1129 		winstate->aggregatedbase = 0;
1130 		winstate->aggregatedupto = 0;
1131 	}
1132 
1133 	/* create mark and read pointers for each real window function */
1134 	for (i = 0; i < numfuncs; i++)
1135 	{
1136 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1137 
1138 		if (!perfuncstate->plain_agg)
1139 		{
1140 			WindowObject winobj = perfuncstate->winobj;
1141 
1142 			winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
1143 															0);
1144 			winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
1145 														 EXEC_FLAG_BACKWARD);
1146 			winobj->markpos = -1;
1147 			winobj->seekpos = -1;
1148 		}
1149 	}
1150 
1151 	/*
1152 	 * Store the first tuple into the tuplestore (it's always available now;
1153 	 * we either read it above, or saved it at the end of previous partition)
1154 	 */
1155 	tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
1156 	winstate->spooled_rows++;
1157 }
1158 
1159 /*
1160  * Read tuples from the outer node, up to and including position 'pos', and
1161  * store them into the tuplestore. If pos is -1, reads the whole partition.
1162  */
1163 static void
spool_tuples(WindowAggState * winstate,int64 pos)1164 spool_tuples(WindowAggState *winstate, int64 pos)
1165 {
1166 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1167 	PlanState  *outerPlan;
1168 	TupleTableSlot *outerslot;
1169 	MemoryContext oldcontext;
1170 
1171 	if (!winstate->buffer)
1172 		return;					/* just a safety check */
1173 	if (winstate->partition_spooled)
1174 		return;					/* whole partition done already */
1175 
1176 	/*
1177 	 * If the tuplestore has spilled to disk, alternate reading and writing
1178 	 * becomes quite expensive due to frequent buffer flushes.  It's cheaper
1179 	 * to force the entire partition to get spooled in one go.
1180 	 *
1181 	 * XXX this is a horrid kluge --- it'd be better to fix the performance
1182 	 * problem inside tuplestore.  FIXME
1183 	 */
1184 	if (!tuplestore_in_memory(winstate->buffer))
1185 		pos = -1;
1186 
1187 	outerPlan = outerPlanState(winstate);
1188 
1189 	/* Must be in query context to call outerplan */
1190 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1191 
1192 	while (winstate->spooled_rows <= pos || pos == -1)
1193 	{
1194 		outerslot = ExecProcNode(outerPlan);
1195 		if (TupIsNull(outerslot))
1196 		{
1197 			/* reached the end of the last partition */
1198 			winstate->partition_spooled = true;
1199 			winstate->more_partitions = false;
1200 			break;
1201 		}
1202 
1203 		if (node->partNumCols > 0)
1204 		{
1205 			/* Check if this tuple still belongs to the current partition */
1206 			if (!execTuplesMatch(winstate->first_part_slot,
1207 								 outerslot,
1208 								 node->partNumCols, node->partColIdx,
1209 								 winstate->partEqfunctions,
1210 								 winstate->tmpcontext->ecxt_per_tuple_memory))
1211 			{
1212 				/*
1213 				 * end of partition; copy the tuple for the next cycle.
1214 				 */
1215 				ExecCopySlot(winstate->first_part_slot, outerslot);
1216 				winstate->partition_spooled = true;
1217 				winstate->more_partitions = true;
1218 				break;
1219 			}
1220 		}
1221 
1222 		/* Still in partition, so save it into the tuplestore */
1223 		tuplestore_puttupleslot(winstate->buffer, outerslot);
1224 		winstate->spooled_rows++;
1225 	}
1226 
1227 	MemoryContextSwitchTo(oldcontext);
1228 }
1229 
1230 /*
1231  * release_partition
1232  * clear information kept within a partition, including
1233  * tuplestore and aggregate results.
1234  */
1235 static void
release_partition(WindowAggState * winstate)1236 release_partition(WindowAggState *winstate)
1237 {
1238 	int			i;
1239 
1240 	for (i = 0; i < winstate->numfuncs; i++)
1241 	{
1242 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1243 
1244 		/* Release any partition-local state of this window function */
1245 		if (perfuncstate->winobj)
1246 			perfuncstate->winobj->localmem = NULL;
1247 	}
1248 
1249 	/*
1250 	 * Release all partition-local memory (in particular, any partition-local
1251 	 * state that we might have trashed our pointers to in the above loop, and
1252 	 * any aggregate temp data).  We don't rely on retail pfree because some
1253 	 * aggregates might have allocated data we don't have direct pointers to.
1254 	 */
1255 	MemoryContextResetAndDeleteChildren(winstate->partcontext);
1256 	MemoryContextResetAndDeleteChildren(winstate->aggcontext);
1257 	for (i = 0; i < winstate->numaggs; i++)
1258 	{
1259 		if (winstate->peragg[i].aggcontext != winstate->aggcontext)
1260 			MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
1261 	}
1262 
1263 	if (winstate->buffer)
1264 		tuplestore_end(winstate->buffer);
1265 	winstate->buffer = NULL;
1266 	winstate->partition_spooled = false;
1267 }
1268 
1269 /*
1270  * row_is_in_frame
1271  * Determine whether a row is in the current row's window frame according
1272  * to our window framing rule
1273  *
1274  * The caller must have already determined that the row is in the partition
1275  * and fetched it into a slot.  This function just encapsulates the framing
1276  * rules.
1277  */
1278 static bool
row_is_in_frame(WindowAggState * winstate,int64 pos,TupleTableSlot * slot)1279 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
1280 {
1281 	int			frameOptions = winstate->frameOptions;
1282 
1283 	Assert(pos >= 0);			/* else caller error */
1284 
1285 	/* First, check frame starting conditions */
1286 	if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1287 	{
1288 		if (frameOptions & FRAMEOPTION_ROWS)
1289 		{
1290 			/* rows before current row are out of frame */
1291 			if (pos < winstate->currentpos)
1292 				return false;
1293 		}
1294 		else if (frameOptions & FRAMEOPTION_RANGE)
1295 		{
1296 			/* preceding row that is not peer is out of frame */
1297 			if (pos < winstate->currentpos &&
1298 				!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1299 				return false;
1300 		}
1301 		else
1302 			Assert(false);
1303 	}
1304 	else if (frameOptions & FRAMEOPTION_START_VALUE)
1305 	{
1306 		if (frameOptions & FRAMEOPTION_ROWS)
1307 		{
1308 			int64		offset = DatumGetInt64(winstate->startOffsetValue);
1309 
1310 			/* rows before current row + offset are out of frame */
1311 			if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1312 				offset = -offset;
1313 
1314 			if (pos < winstate->currentpos + offset)
1315 				return false;
1316 		}
1317 		else if (frameOptions & FRAMEOPTION_RANGE)
1318 		{
1319 			/* parser should have rejected this */
1320 			elog(ERROR, "window frame with value offset is not implemented");
1321 		}
1322 		else
1323 			Assert(false);
1324 	}
1325 
1326 	/* Okay so far, now check frame ending conditions */
1327 	if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1328 	{
1329 		if (frameOptions & FRAMEOPTION_ROWS)
1330 		{
1331 			/* rows after current row are out of frame */
1332 			if (pos > winstate->currentpos)
1333 				return false;
1334 		}
1335 		else if (frameOptions & FRAMEOPTION_RANGE)
1336 		{
1337 			/* following row that is not peer is out of frame */
1338 			if (pos > winstate->currentpos &&
1339 				!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1340 				return false;
1341 		}
1342 		else
1343 			Assert(false);
1344 	}
1345 	else if (frameOptions & FRAMEOPTION_END_VALUE)
1346 	{
1347 		if (frameOptions & FRAMEOPTION_ROWS)
1348 		{
1349 			int64		offset = DatumGetInt64(winstate->endOffsetValue);
1350 
1351 			/* rows after current row + offset are out of frame */
1352 			if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1353 				offset = -offset;
1354 
1355 			if (pos > winstate->currentpos + offset)
1356 				return false;
1357 		}
1358 		else if (frameOptions & FRAMEOPTION_RANGE)
1359 		{
1360 			/* parser should have rejected this */
1361 			elog(ERROR, "window frame with value offset is not implemented");
1362 		}
1363 		else
1364 			Assert(false);
1365 	}
1366 
1367 	/* If we get here, it's in frame */
1368 	return true;
1369 }
1370 
1371 /*
1372  * update_frameheadpos
1373  * make frameheadpos valid for the current row
1374  *
1375  * Uses the winobj's read pointer for any required fetches; hence, if the
1376  * frame mode is one that requires row comparisons, the winobj's mark must
1377  * not be past the currently known frame head.  Also uses the specified slot
1378  * for any required fetches.
1379  */
1380 static void
update_frameheadpos(WindowObject winobj,TupleTableSlot * slot)1381 update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
1382 {
1383 	WindowAggState *winstate = winobj->winstate;
1384 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1385 	int			frameOptions = winstate->frameOptions;
1386 
1387 	if (winstate->framehead_valid)
1388 		return;					/* already known for current row */
1389 
1390 	if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
1391 	{
1392 		/* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
1393 		winstate->frameheadpos = 0;
1394 		winstate->framehead_valid = true;
1395 	}
1396 	else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
1397 	{
1398 		if (frameOptions & FRAMEOPTION_ROWS)
1399 		{
1400 			/* In ROWS mode, frame head is the same as current */
1401 			winstate->frameheadpos = winstate->currentpos;
1402 			winstate->framehead_valid = true;
1403 		}
1404 		else if (frameOptions & FRAMEOPTION_RANGE)
1405 		{
1406 			int64		fhprev;
1407 
1408 			/* If no ORDER BY, all rows are peers with each other */
1409 			if (node->ordNumCols == 0)
1410 			{
1411 				winstate->frameheadpos = 0;
1412 				winstate->framehead_valid = true;
1413 				return;
1414 			}
1415 
1416 			/*
1417 			 * In RANGE START_CURRENT mode, frame head is the first row that
1418 			 * is a peer of current row.  We search backwards from current,
1419 			 * which could be a bit inefficient if peer sets are large. Might
1420 			 * be better to have a separate read pointer that moves forward
1421 			 * tracking the frame head.
1422 			 */
1423 			fhprev = winstate->currentpos - 1;
1424 			for (;;)
1425 			{
1426 				/* assume the frame head can't go backwards */
1427 				if (fhprev < winstate->frameheadpos)
1428 					break;
1429 				if (!window_gettupleslot(winobj, fhprev, slot))
1430 					break;		/* start of partition */
1431 				if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1432 					break;		/* not peer of current row */
1433 				fhprev--;
1434 			}
1435 			winstate->frameheadpos = fhprev + 1;
1436 			winstate->framehead_valid = true;
1437 		}
1438 		else
1439 			Assert(false);
1440 	}
1441 	else if (frameOptions & FRAMEOPTION_START_VALUE)
1442 	{
1443 		if (frameOptions & FRAMEOPTION_ROWS)
1444 		{
1445 			/* In ROWS mode, bound is physically n before/after current */
1446 			int64		offset = DatumGetInt64(winstate->startOffsetValue);
1447 
1448 			if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1449 				offset = -offset;
1450 
1451 			winstate->frameheadpos = winstate->currentpos + offset;
1452 			/* frame head can't go before first row */
1453 			if (winstate->frameheadpos < 0)
1454 				winstate->frameheadpos = 0;
1455 			else if (winstate->frameheadpos > winstate->currentpos)
1456 			{
1457 				/* make sure frameheadpos is not past end of partition */
1458 				spool_tuples(winstate, winstate->frameheadpos - 1);
1459 				if (winstate->frameheadpos > winstate->spooled_rows)
1460 					winstate->frameheadpos = winstate->spooled_rows;
1461 			}
1462 			winstate->framehead_valid = true;
1463 		}
1464 		else if (frameOptions & FRAMEOPTION_RANGE)
1465 		{
1466 			/* parser should have rejected this */
1467 			elog(ERROR, "window frame with value offset is not implemented");
1468 		}
1469 		else
1470 			Assert(false);
1471 	}
1472 	else
1473 		Assert(false);
1474 }
1475 
1476 /*
1477  * update_frametailpos
1478  * make frametailpos valid for the current row
1479  *
1480  * Uses the winobj's read pointer for any required fetches; hence, if the
1481  * frame mode is one that requires row comparisons, the winobj's mark must
1482  * not be past the currently known frame tail.  Also uses the specified slot
1483  * for any required fetches.
1484  */
1485 static void
update_frametailpos(WindowObject winobj,TupleTableSlot * slot)1486 update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
1487 {
1488 	WindowAggState *winstate = winobj->winstate;
1489 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
1490 	int			frameOptions = winstate->frameOptions;
1491 
1492 	if (winstate->frametail_valid)
1493 		return;					/* already known for current row */
1494 
1495 	if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1496 	{
1497 		/* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1498 		spool_tuples(winstate, -1);
1499 		winstate->frametailpos = winstate->spooled_rows - 1;
1500 		winstate->frametail_valid = true;
1501 	}
1502 	else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1503 	{
1504 		if (frameOptions & FRAMEOPTION_ROWS)
1505 		{
1506 			/* In ROWS mode, exactly the rows up to current are in frame */
1507 			winstate->frametailpos = winstate->currentpos;
1508 			winstate->frametail_valid = true;
1509 		}
1510 		else if (frameOptions & FRAMEOPTION_RANGE)
1511 		{
1512 			int64		ftnext;
1513 
1514 			/* If no ORDER BY, all rows are peers with each other */
1515 			if (node->ordNumCols == 0)
1516 			{
1517 				spool_tuples(winstate, -1);
1518 				winstate->frametailpos = winstate->spooled_rows - 1;
1519 				winstate->frametail_valid = true;
1520 				return;
1521 			}
1522 
1523 			/*
1524 			 * Else we have to search for the first non-peer of the current
1525 			 * row.  We assume the current value of frametailpos is a lower
1526 			 * bound on the possible frame tail location, ie, frame tail never
1527 			 * goes backward, and that currentpos is also a lower bound, ie,
1528 			 * frame end always >= current row.
1529 			 */
1530 			ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1531 			for (;;)
1532 			{
1533 				if (!window_gettupleslot(winobj, ftnext, slot))
1534 					break;		/* end of partition */
1535 				if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1536 					break;		/* not peer of current row */
1537 				ftnext++;
1538 			}
1539 			winstate->frametailpos = ftnext - 1;
1540 			winstate->frametail_valid = true;
1541 		}
1542 		else
1543 			Assert(false);
1544 	}
1545 	else if (frameOptions & FRAMEOPTION_END_VALUE)
1546 	{
1547 		if (frameOptions & FRAMEOPTION_ROWS)
1548 		{
1549 			/* In ROWS mode, bound is physically n before/after current */
1550 			int64		offset = DatumGetInt64(winstate->endOffsetValue);
1551 
1552 			if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1553 				offset = -offset;
1554 
1555 			winstate->frametailpos = winstate->currentpos + offset;
1556 			/* smallest allowable value of frametailpos is -1 */
1557 			if (winstate->frametailpos < 0)
1558 				winstate->frametailpos = -1;
1559 			else if (winstate->frametailpos > winstate->currentpos)
1560 			{
1561 				/* make sure frametailpos is not past last row of partition */
1562 				spool_tuples(winstate, winstate->frametailpos);
1563 				if (winstate->frametailpos >= winstate->spooled_rows)
1564 					winstate->frametailpos = winstate->spooled_rows - 1;
1565 			}
1566 			winstate->frametail_valid = true;
1567 		}
1568 		else if (frameOptions & FRAMEOPTION_RANGE)
1569 		{
1570 			/* parser should have rejected this */
1571 			elog(ERROR, "window frame with value offset is not implemented");
1572 		}
1573 		else
1574 			Assert(false);
1575 	}
1576 	else
1577 		Assert(false);
1578 }
1579 
1580 
1581 /* -----------------
1582  * ExecWindowAgg
1583  *
1584  *	ExecWindowAgg receives tuples from its outer subplan and
1585  *	stores them into a tuplestore, then processes window functions.
1586  *	This node doesn't reduce nor qualify any row so the number of
1587  *	returned rows is exactly the same as its outer subplan's result
1588  *	(ignoring the case of SRFs in the targetlist, that is).
1589  * -----------------
1590  */
1591 TupleTableSlot *
ExecWindowAgg(WindowAggState * winstate)1592 ExecWindowAgg(WindowAggState *winstate)
1593 {
1594 	TupleTableSlot *result;
1595 	ExprDoneCond isDone;
1596 	ExprContext *econtext;
1597 	int			i;
1598 	int			numfuncs;
1599 
1600 	if (winstate->all_done)
1601 		return NULL;
1602 
1603 	/*
1604 	 * Check to see if we're still projecting out tuples from a previous
1605 	 * output tuple (because there is a function-returning-set in the
1606 	 * projection expressions).  If so, try to project another one.
1607 	 */
1608 	if (winstate->ss.ps.ps_TupFromTlist)
1609 	{
1610 		TupleTableSlot *result;
1611 		ExprDoneCond isDone;
1612 
1613 		result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1614 		if (isDone == ExprMultipleResult)
1615 			return result;
1616 		/* Done with that source tuple... */
1617 		winstate->ss.ps.ps_TupFromTlist = false;
1618 	}
1619 
1620 	/*
1621 	 * Compute frame offset values, if any, during first call.
1622 	 */
1623 	if (winstate->all_first)
1624 	{
1625 		int			frameOptions = winstate->frameOptions;
1626 		ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
1627 		Datum		value;
1628 		bool		isnull;
1629 		int16		len;
1630 		bool		byval;
1631 
1632 		if (frameOptions & FRAMEOPTION_START_VALUE)
1633 		{
1634 			Assert(winstate->startOffset != NULL);
1635 			value = ExecEvalExprSwitchContext(winstate->startOffset,
1636 											  econtext,
1637 											  &isnull,
1638 											  NULL);
1639 			if (isnull)
1640 				ereport(ERROR,
1641 						(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1642 						 errmsg("frame starting offset must not be null")));
1643 			/* copy value into query-lifespan context */
1644 			get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1645 							&len, &byval);
1646 			winstate->startOffsetValue = datumCopy(value, byval, len);
1647 			if (frameOptions & FRAMEOPTION_ROWS)
1648 			{
1649 				/* value is known to be int8 */
1650 				int64		offset = DatumGetInt64(value);
1651 
1652 				if (offset < 0)
1653 					ereport(ERROR,
1654 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1655 					  errmsg("frame starting offset must not be negative")));
1656 			}
1657 		}
1658 		if (frameOptions & FRAMEOPTION_END_VALUE)
1659 		{
1660 			Assert(winstate->endOffset != NULL);
1661 			value = ExecEvalExprSwitchContext(winstate->endOffset,
1662 											  econtext,
1663 											  &isnull,
1664 											  NULL);
1665 			if (isnull)
1666 				ereport(ERROR,
1667 						(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1668 						 errmsg("frame ending offset must not be null")));
1669 			/* copy value into query-lifespan context */
1670 			get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1671 							&len, &byval);
1672 			winstate->endOffsetValue = datumCopy(value, byval, len);
1673 			if (frameOptions & FRAMEOPTION_ROWS)
1674 			{
1675 				/* value is known to be int8 */
1676 				int64		offset = DatumGetInt64(value);
1677 
1678 				if (offset < 0)
1679 					ereport(ERROR,
1680 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1681 						errmsg("frame ending offset must not be negative")));
1682 			}
1683 		}
1684 		winstate->all_first = false;
1685 	}
1686 
1687 restart:
1688 	if (winstate->buffer == NULL)
1689 	{
1690 		/* Initialize for first partition and set current row = 0 */
1691 		begin_partition(winstate);
1692 		/* If there are no input rows, we'll detect that and exit below */
1693 	}
1694 	else
1695 	{
1696 		/* Advance current row within partition */
1697 		winstate->currentpos++;
1698 		/* This might mean that the frame moves, too */
1699 		winstate->framehead_valid = false;
1700 		winstate->frametail_valid = false;
1701 	}
1702 
1703 	/*
1704 	 * Spool all tuples up to and including the current row, if we haven't
1705 	 * already
1706 	 */
1707 	spool_tuples(winstate, winstate->currentpos);
1708 
1709 	/* Move to the next partition if we reached the end of this partition */
1710 	if (winstate->partition_spooled &&
1711 		winstate->currentpos >= winstate->spooled_rows)
1712 	{
1713 		release_partition(winstate);
1714 
1715 		if (winstate->more_partitions)
1716 		{
1717 			begin_partition(winstate);
1718 			Assert(winstate->spooled_rows > 0);
1719 		}
1720 		else
1721 		{
1722 			winstate->all_done = true;
1723 			return NULL;
1724 		}
1725 	}
1726 
1727 	/* final output execution is in ps_ExprContext */
1728 	econtext = winstate->ss.ps.ps_ExprContext;
1729 
1730 	/* Clear the per-output-tuple context for current row */
1731 	ResetExprContext(econtext);
1732 
1733 	/*
1734 	 * Read the current row from the tuplestore, and save in ScanTupleSlot.
1735 	 * (We can't rely on the outerplan's output slot because we may have to
1736 	 * read beyond the current row.  Also, we have to actually copy the row
1737 	 * out of the tuplestore, since window function evaluation might cause the
1738 	 * tuplestore to dump its state to disk.)
1739 	 *
1740 	 * Current row must be in the tuplestore, since we spooled it above.
1741 	 */
1742 	tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1743 	if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1744 								 winstate->ss.ss_ScanTupleSlot))
1745 		elog(ERROR, "unexpected end of tuplestore");
1746 
1747 	/*
1748 	 * Evaluate true window functions
1749 	 */
1750 	numfuncs = winstate->numfuncs;
1751 	for (i = 0; i < numfuncs; i++)
1752 	{
1753 		WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1754 
1755 		if (perfuncstate->plain_agg)
1756 			continue;
1757 		eval_windowfunction(winstate, perfuncstate,
1758 			  &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1759 			  &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1760 	}
1761 
1762 	/*
1763 	 * Evaluate aggregates
1764 	 */
1765 	if (winstate->numaggs > 0)
1766 		eval_windowaggregates(winstate);
1767 
1768 	/*
1769 	 * Truncate any no-longer-needed rows from the tuplestore.
1770 	 */
1771 	tuplestore_trim(winstate->buffer);
1772 
1773 	/*
1774 	 * Form and return a projection tuple using the windowfunc results and the
1775 	 * current row.  Setting ecxt_outertuple arranges that any Vars will be
1776 	 * evaluated with respect to that row.
1777 	 */
1778 	econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1779 	result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1780 
1781 	if (isDone == ExprEndResult)
1782 	{
1783 		/* SRF in tlist returned no rows, so advance to next input tuple */
1784 		goto restart;
1785 	}
1786 
1787 	winstate->ss.ps.ps_TupFromTlist =
1788 		(isDone == ExprMultipleResult);
1789 	return result;
1790 }
1791 
1792 /* -----------------
1793  * ExecInitWindowAgg
1794  *
1795  *	Creates the run-time information for the WindowAgg node produced by the
1796  *	planner and initializes its outer subtree
1797  * -----------------
1798  */
1799 WindowAggState *
ExecInitWindowAgg(WindowAgg * node,EState * estate,int eflags)1800 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1801 {
1802 	WindowAggState *winstate;
1803 	Plan	   *outerPlan;
1804 	ExprContext *econtext;
1805 	ExprContext *tmpcontext;
1806 	WindowStatePerFunc perfunc;
1807 	WindowStatePerAgg peragg;
1808 	int			numfuncs,
1809 				wfuncno,
1810 				numaggs,
1811 				aggno;
1812 	ListCell   *l;
1813 
1814 	/* check for unsupported flags */
1815 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1816 
1817 	/*
1818 	 * create state structure
1819 	 */
1820 	winstate = makeNode(WindowAggState);
1821 	winstate->ss.ps.plan = (Plan *) node;
1822 	winstate->ss.ps.state = estate;
1823 
1824 	/*
1825 	 * Create expression contexts.  We need two, one for per-input-tuple
1826 	 * processing and one for per-output-tuple processing.  We cheat a little
1827 	 * by using ExecAssignExprContext() to build both.
1828 	 */
1829 	ExecAssignExprContext(estate, &winstate->ss.ps);
1830 	tmpcontext = winstate->ss.ps.ps_ExprContext;
1831 	winstate->tmpcontext = tmpcontext;
1832 	ExecAssignExprContext(estate, &winstate->ss.ps);
1833 
1834 	/* Create long-lived context for storage of partition-local memory etc */
1835 	winstate->partcontext =
1836 		AllocSetContextCreate(CurrentMemoryContext,
1837 							  "WindowAgg Partition",
1838 							  ALLOCSET_DEFAULT_SIZES);
1839 
1840 	/*
1841 	 * Create mid-lived context for aggregate trans values etc.
1842 	 *
1843 	 * Note that moving aggregates each use their own private context, not
1844 	 * this one.
1845 	 */
1846 	winstate->aggcontext =
1847 		AllocSetContextCreate(CurrentMemoryContext,
1848 							  "WindowAgg Aggregates",
1849 							  ALLOCSET_DEFAULT_SIZES);
1850 
1851 	/*
1852 	 * tuple table initialization
1853 	 */
1854 	ExecInitScanTupleSlot(estate, &winstate->ss);
1855 	ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1856 	winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1857 	winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1858 	winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1859 	winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1860 
1861 	winstate->ss.ps.targetlist = (List *)
1862 		ExecInitExpr((Expr *) node->plan.targetlist,
1863 					 (PlanState *) winstate);
1864 
1865 	/*
1866 	 * WindowAgg nodes never have quals, since they can only occur at the
1867 	 * logical top level of a query (ie, after any WHERE or HAVING filters)
1868 	 */
1869 	Assert(node->plan.qual == NIL);
1870 	winstate->ss.ps.qual = NIL;
1871 
1872 	/*
1873 	 * initialize child nodes
1874 	 */
1875 	outerPlan = outerPlan(node);
1876 	outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1877 
1878 	/*
1879 	 * initialize source tuple type (which is also the tuple type that we'll
1880 	 * store in the tuplestore and use in all our working slots).
1881 	 */
1882 	ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1883 
1884 	ExecSetSlotDescriptor(winstate->first_part_slot,
1885 						  winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1886 	ExecSetSlotDescriptor(winstate->agg_row_slot,
1887 						  winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1888 	ExecSetSlotDescriptor(winstate->temp_slot_1,
1889 						  winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1890 	ExecSetSlotDescriptor(winstate->temp_slot_2,
1891 						  winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1892 
1893 	/*
1894 	 * Initialize result tuple type and projection info.
1895 	 */
1896 	ExecAssignResultTypeFromTL(&winstate->ss.ps);
1897 	ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1898 
1899 	winstate->ss.ps.ps_TupFromTlist = false;
1900 
1901 	/* Set up data for comparing tuples */
1902 	if (node->partNumCols > 0)
1903 		winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1904 														node->partOperators);
1905 	if (node->ordNumCols > 0)
1906 		winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1907 														  node->ordOperators);
1908 
1909 	/*
1910 	 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1911 	 */
1912 	numfuncs = winstate->numfuncs;
1913 	numaggs = winstate->numaggs;
1914 	econtext = winstate->ss.ps.ps_ExprContext;
1915 	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1916 	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1917 
1918 	/*
1919 	 * allocate per-wfunc/per-agg state information.
1920 	 */
1921 	perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1922 	peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1923 	winstate->perfunc = perfunc;
1924 	winstate->peragg = peragg;
1925 
1926 	wfuncno = -1;
1927 	aggno = -1;
1928 	foreach(l, winstate->funcs)
1929 	{
1930 		WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1931 		WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1932 		WindowStatePerFunc perfuncstate;
1933 		AclResult	aclresult;
1934 		int			i;
1935 
1936 		if (wfunc->winref != node->winref)		/* planner screwed up? */
1937 			elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1938 				 wfunc->winref, node->winref);
1939 
1940 		/* Look for a previous duplicate window function */
1941 		for (i = 0; i <= wfuncno; i++)
1942 		{
1943 			if (equal(wfunc, perfunc[i].wfunc) &&
1944 				!contain_volatile_functions((Node *) wfunc))
1945 				break;
1946 		}
1947 		if (i <= wfuncno)
1948 		{
1949 			/* Found a match to an existing entry, so just mark it */
1950 			wfuncstate->wfuncno = i;
1951 			continue;
1952 		}
1953 
1954 		/* Nope, so assign a new PerAgg record */
1955 		perfuncstate = &perfunc[++wfuncno];
1956 
1957 		/* Mark WindowFunc state node with assigned index in the result array */
1958 		wfuncstate->wfuncno = wfuncno;
1959 
1960 		/* Check permission to call window function */
1961 		aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1962 									 ACL_EXECUTE);
1963 		if (aclresult != ACLCHECK_OK)
1964 			aclcheck_error(aclresult, ACL_KIND_PROC,
1965 						   get_func_name(wfunc->winfnoid));
1966 		InvokeFunctionExecuteHook(wfunc->winfnoid);
1967 
1968 		/* Fill in the perfuncstate data */
1969 		perfuncstate->wfuncstate = wfuncstate;
1970 		perfuncstate->wfunc = wfunc;
1971 		perfuncstate->numArguments = list_length(wfuncstate->args);
1972 
1973 		fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1974 					  econtext->ecxt_per_query_memory);
1975 		fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1976 
1977 		perfuncstate->winCollation = wfunc->inputcollid;
1978 
1979 		get_typlenbyval(wfunc->wintype,
1980 						&perfuncstate->resulttypeLen,
1981 						&perfuncstate->resulttypeByVal);
1982 
1983 		/*
1984 		 * If it's really just a plain aggregate function, we'll emulate the
1985 		 * Agg environment for it.
1986 		 */
1987 		perfuncstate->plain_agg = wfunc->winagg;
1988 		if (wfunc->winagg)
1989 		{
1990 			WindowStatePerAgg peraggstate;
1991 
1992 			perfuncstate->aggno = ++aggno;
1993 			peraggstate = &winstate->peragg[aggno];
1994 			initialize_peragg(winstate, wfunc, peraggstate);
1995 			peraggstate->wfuncno = wfuncno;
1996 		}
1997 		else
1998 		{
1999 			WindowObject winobj = makeNode(WindowObjectData);
2000 
2001 			winobj->winstate = winstate;
2002 			winobj->argstates = wfuncstate->args;
2003 			winobj->localmem = NULL;
2004 			perfuncstate->winobj = winobj;
2005 		}
2006 	}
2007 
2008 	/* Update numfuncs, numaggs to match number of unique functions found */
2009 	winstate->numfuncs = wfuncno + 1;
2010 	winstate->numaggs = aggno + 1;
2011 
2012 	/* Set up WindowObject for aggregates, if needed */
2013 	if (winstate->numaggs > 0)
2014 	{
2015 		WindowObject agg_winobj = makeNode(WindowObjectData);
2016 
2017 		agg_winobj->winstate = winstate;
2018 		agg_winobj->argstates = NIL;
2019 		agg_winobj->localmem = NULL;
2020 		/* make sure markptr = -1 to invalidate. It may not get used */
2021 		agg_winobj->markptr = -1;
2022 		agg_winobj->readptr = -1;
2023 		winstate->agg_winobj = agg_winobj;
2024 	}
2025 
2026 	/* copy frame options to state node for easy access */
2027 	winstate->frameOptions = node->frameOptions;
2028 
2029 	/* initialize frame bound offset expressions */
2030 	winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
2031 										 (PlanState *) winstate);
2032 	winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
2033 									   (PlanState *) winstate);
2034 
2035 	winstate->all_first = true;
2036 	winstate->partition_spooled = false;
2037 	winstate->more_partitions = false;
2038 
2039 	return winstate;
2040 }
2041 
2042 /* -----------------
2043  * ExecEndWindowAgg
2044  * -----------------
2045  */
2046 void
ExecEndWindowAgg(WindowAggState * node)2047 ExecEndWindowAgg(WindowAggState *node)
2048 {
2049 	PlanState  *outerPlan;
2050 	int			i;
2051 
2052 	release_partition(node);
2053 
2054 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
2055 	ExecClearTuple(node->first_part_slot);
2056 	ExecClearTuple(node->agg_row_slot);
2057 	ExecClearTuple(node->temp_slot_1);
2058 	ExecClearTuple(node->temp_slot_2);
2059 
2060 	/*
2061 	 * Free both the expr contexts.
2062 	 */
2063 	ExecFreeExprContext(&node->ss.ps);
2064 	node->ss.ps.ps_ExprContext = node->tmpcontext;
2065 	ExecFreeExprContext(&node->ss.ps);
2066 
2067 	for (i = 0; i < node->numaggs; i++)
2068 	{
2069 		if (node->peragg[i].aggcontext != node->aggcontext)
2070 			MemoryContextDelete(node->peragg[i].aggcontext);
2071 	}
2072 	MemoryContextDelete(node->partcontext);
2073 	MemoryContextDelete(node->aggcontext);
2074 
2075 	pfree(node->perfunc);
2076 	pfree(node->peragg);
2077 
2078 	outerPlan = outerPlanState(node);
2079 	ExecEndNode(outerPlan);
2080 }
2081 
2082 /* -----------------
2083  * ExecReScanWindowAgg
2084  * -----------------
2085  */
2086 void
ExecReScanWindowAgg(WindowAggState * node)2087 ExecReScanWindowAgg(WindowAggState *node)
2088 {
2089 	PlanState  *outerPlan = outerPlanState(node);
2090 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
2091 
2092 	node->all_done = false;
2093 
2094 	node->ss.ps.ps_TupFromTlist = false;
2095 	node->all_first = true;
2096 
2097 	/* release tuplestore et al */
2098 	release_partition(node);
2099 
2100 	/* release all temp tuples, but especially first_part_slot */
2101 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
2102 	ExecClearTuple(node->first_part_slot);
2103 	ExecClearTuple(node->agg_row_slot);
2104 	ExecClearTuple(node->temp_slot_1);
2105 	ExecClearTuple(node->temp_slot_2);
2106 
2107 	/* Forget current wfunc values */
2108 	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
2109 	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
2110 
2111 	/*
2112 	 * if chgParam of subnode is not null then plan will be re-scanned by
2113 	 * first ExecProcNode.
2114 	 */
2115 	if (outerPlan->chgParam == NULL)
2116 		ExecReScan(outerPlan);
2117 }
2118 
/*
 * initialize_peragg
 *
 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
 *
 * Fills in *peraggstate for wfunc, a plain aggregate being used as a window
 * function in winstate.  The aggregate's pg_aggregate row is looked up, and
 * either the regular or the moving-aggregate set of support functions is
 * selected.  The aggregate owner's permission to execute those functions is
 * checked, fmgr lookup data is set up for them, and the initial transition
 * value plus the result/transition datatype properties are recorded.
 *
 * The choice of implementation reads winstate->frameOptions, so that field
 * must already hold the plan's frame options when this is called.
 *
 * Returns peraggstate.  Raises an error if a catalog lookup fails, if a
 * permission check fails, or if the strictness sanity checks below fail.
 */
static WindowStatePerAggData *
initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
				  WindowStatePerAgg peraggstate)
{
	Oid			inputTypes[FUNC_MAX_ARGS];
	int			numArguments;
	HeapTuple	aggTuple;
	Form_pg_aggregate aggform;
	Oid			aggtranstype;
	AttrNumber	initvalAttNo;
	AclResult	aclresult;
	Oid			transfn_oid,
				invtransfn_oid,
				finalfn_oid;
	bool		finalextra;
	Expr	   *transfnexpr,
			   *invtransfnexpr,
			   *finalfnexpr;
	Datum		textInitVal;
	int			i;
	ListCell   *lc;

	numArguments = list_length(wfunc->args);

	/* collect the datatype of each argument expression, in order */
	i = 0;
	foreach(lc, wfunc->args)
	{
		inputTypes[i++] = exprType((Node *) lfirst(lc));
	}

	/* fetch the aggregate's catalog row; it is released at the very end */
	aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
	if (!HeapTupleIsValid(aggTuple))
		elog(ERROR, "cache lookup failed for aggregate %u",
			 wfunc->winfnoid);
	aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);

	/*
	 * Figure out whether we want to use the moving-aggregate implementation,
	 * and collect the right set of fields from the pg_aggregate entry.
	 *
	 * If the frame head can't move, we don't need moving-aggregate code. Even
	 * if we'd like to use it, don't do so if the aggregate's arguments (and
	 * FILTER clause if any) contain any calls to volatile functions.
	 * Otherwise, the difference between restarting and not restarting the
	 * aggregation would be user-visible.
	 */
	if (OidIsValid(aggform->aggminvtransfn) &&
		!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) &&
		!contain_volatile_functions((Node *) wfunc))
	{
		/* moving-aggregate variant: use the aggm* fields */
		peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
		peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
		peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
		finalextra = aggform->aggmfinalextra;
		aggtranstype = aggform->aggmtranstype;
		initvalAttNo = Anum_pg_aggregate_aggminitval;
	}
	else
	{
		/* regular variant: no inverse transition function is used */
		peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
		peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
		peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
		finalextra = aggform->aggfinalextra;
		aggtranstype = aggform->aggtranstype;
		initvalAttNo = Anum_pg_aggregate_agginitval;
	}

	/*
	 * ExecInitWindowAgg already checked permission to call aggregate function
	 * ... but we still need to check the component functions
	 */

	/* Check that aggregate owner has permission to call component fns */
	{
		HeapTuple	procTuple;
		Oid			aggOwner;

		/* the owner is taken from the aggregate's pg_proc row */
		procTuple = SearchSysCache1(PROCOID,
									ObjectIdGetDatum(wfunc->winfnoid));
		if (!HeapTupleIsValid(procTuple))
			elog(ERROR, "cache lookup failed for function %u",
				 wfunc->winfnoid);
		aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
		ReleaseSysCache(procTuple);

		/* the transition function is always checked */
		aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
									 ACL_EXECUTE);
		if (aclresult != ACLCHECK_OK)
			aclcheck_error(aclresult, ACL_KIND_PROC,
						   get_func_name(transfn_oid));
		InvokeFunctionExecuteHook(transfn_oid);

		/* the inverse transition function only exists in moving mode */
		if (OidIsValid(invtransfn_oid))
		{
			aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
										 ACL_EXECUTE);
			if (aclresult != ACLCHECK_OK)
				aclcheck_error(aclresult, ACL_KIND_PROC,
							   get_func_name(invtransfn_oid));
			InvokeFunctionExecuteHook(invtransfn_oid);
		}

		/* a final function is optional */
		if (OidIsValid(finalfn_oid))
		{
			aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
										 ACL_EXECUTE);
			if (aclresult != ACLCHECK_OK)
				aclcheck_error(aclresult, ACL_KIND_PROC,
							   get_func_name(finalfn_oid));
			InvokeFunctionExecuteHook(finalfn_oid);
		}
	}

	/* Detect how many arguments to pass to the finalfn */
	if (finalextra)
		peraggstate->numFinalArgs = numArguments + 1;
	else
		peraggstate->numFinalArgs = 1;

	/* resolve actual type of transition state, if polymorphic */
	aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
											   aggtranstype,
											   inputTypes,
											   numArguments);

	/* build expression trees using actual argument & result types */
	build_aggregate_transfn_expr(inputTypes,
								 numArguments,
								 0,		/* no ordered-set window functions yet */
								 false, /* no variadic window functions yet */
								 aggtranstype,
								 wfunc->inputcollid,
								 transfn_oid,
								 invtransfn_oid,
								 &transfnexpr,
								 &invtransfnexpr);

	/* set up infrastructure for calling the transfn(s) and finalfn */
	fmgr_info(transfn_oid, &peraggstate->transfn);
	fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);

	if (OidIsValid(invtransfn_oid))
	{
		fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
		fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
	}

	if (OidIsValid(finalfn_oid))
	{
		build_aggregate_finalfn_expr(inputTypes,
									 peraggstate->numFinalArgs,
									 aggtranstype,
									 wfunc->wintype,
									 wfunc->inputcollid,
									 finalfn_oid,
									 &finalfnexpr);
		fmgr_info(finalfn_oid, &peraggstate->finalfn);
		fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
	}

	/* get info about relevant datatypes */
	get_typlenbyval(wfunc->wintype,
					&peraggstate->resulttypeLen,
					&peraggstate->resulttypeByVal);
	get_typlenbyval(aggtranstype,
					&peraggstate->transtypeLen,
					&peraggstate->transtypeByVal);

	/*
	 * initval is potentially null, so don't try to access it as a struct
	 * field. Must do it the hard way with SysCacheGetAttr.
	 */
	textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
								  &peraggstate->initValueIsNull);

	/* convert the textual initial value into a Datum of the transition type */
	if (peraggstate->initValueIsNull)
		peraggstate->initValue = (Datum) 0;
	else
		peraggstate->initValue = GetAggInitVal(textInitVal,
											   aggtranstype);

	/*
	 * If the transfn is strict and the initval is NULL, make sure input type
	 * and transtype are the same (or at least binary-compatible), so that
	 * it's OK to use the first input value as the initial transValue.  This
	 * should have been checked at agg definition time, but we must check
	 * again in case the transfn's strictness property has been changed.
	 */
	if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
	{
		if (numArguments < 1 ||
			!IsBinaryCoercible(inputTypes[0], aggtranstype))
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
					 errmsg("aggregate %u needs to have compatible input type and transition type",
							wfunc->winfnoid)));
	}

	/*
	 * Insist that forward and inverse transition functions have the same
	 * strictness setting.  Allowing them to differ would require handling
	 * more special cases in advance_windowaggregate and
	 * advance_windowaggregate_base, for no discernible benefit.  This should
	 * have been checked at agg definition time, but we must check again in
	 * case either function's strictness property has been changed.
	 */
	if (OidIsValid(invtransfn_oid) &&
		peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
				 errmsg("strictness of aggregate's forward and inverse transition functions must match")));

	/*
	 * Moving aggregates use their own aggcontext.
	 *
	 * This is necessary because they might restart at different times, so we
	 * might never be able to reset the shared context otherwise.  We can't
	 * make it the aggregates' responsibility to clean up after themselves,
	 * because strict aggregates must be restarted whenever we remove their
	 * last non-NULL input, which the aggregate won't be aware is happening.
	 * Also, just pfree()ing the transValue upon restarting wouldn't help,
	 * since we'd miss any indirectly referenced data.  We could, in theory,
	 * make the memory allocation rules for moving aggregates different than
	 * they have historically been for plain aggregates, but that seems grotty
	 * and likely to lead to memory leaks.
	 */
	if (OidIsValid(invtransfn_oid))
		peraggstate->aggcontext =
			AllocSetContextCreate(CurrentMemoryContext,
								  "WindowAgg Per Aggregate",
								  ALLOCSET_DEFAULT_SIZES);
	else
		peraggstate->aggcontext = winstate->aggcontext;

	/* done with the pg_aggregate row */
	ReleaseSysCache(aggTuple);

	return peraggstate;
}
2362 
2363 static Datum
GetAggInitVal(Datum textInitVal,Oid transtype)2364 GetAggInitVal(Datum textInitVal, Oid transtype)
2365 {
2366 	Oid			typinput,
2367 				typioparam;
2368 	char	   *strInitVal;
2369 	Datum		initVal;
2370 
2371 	getTypeInputInfo(transtype, &typinput, &typioparam);
2372 	strInitVal = TextDatumGetCString(textInitVal);
2373 	initVal = OidInputFunctionCall(typinput, strInitVal,
2374 								   typioparam, -1);
2375 	pfree(strInitVal);
2376 	return initVal;
2377 }
2378 
2379 /*
2380  * are_peers
2381  * compare two rows to see if they are equal according to the ORDER BY clause
2382  *
2383  * NB: this does not consider the window frame mode.
2384  */
2385 static bool
are_peers(WindowAggState * winstate,TupleTableSlot * slot1,TupleTableSlot * slot2)2386 are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
2387 		  TupleTableSlot *slot2)
2388 {
2389 	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
2390 
2391 	/* If no ORDER BY, all rows are peers with each other */
2392 	if (node->ordNumCols == 0)
2393 		return true;
2394 
2395 	return execTuplesMatch(slot1, slot2,
2396 						   node->ordNumCols, node->ordColIdx,
2397 						   winstate->ordEqfunctions,
2398 						   winstate->tmpcontext->ecxt_per_tuple_memory);
2399 }
2400 
2401 /*
2402  * window_gettupleslot
2403  *	Fetch the pos'th tuple of the current partition into the slot,
2404  *	using the winobj's read pointer
2405  *
2406  * Returns true if successful, false if no such row
2407  */
2408 static bool
window_gettupleslot(WindowObject winobj,int64 pos,TupleTableSlot * slot)2409 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
2410 {
2411 	WindowAggState *winstate = winobj->winstate;
2412 	MemoryContext oldcontext;
2413 
2414 	/* Don't allow passing -1 to spool_tuples here */
2415 	if (pos < 0)
2416 		return false;
2417 
2418 	/* If necessary, fetch the tuple into the spool */
2419 	spool_tuples(winstate, pos);
2420 
2421 	if (pos >= winstate->spooled_rows)
2422 		return false;
2423 
2424 	if (pos < winobj->markpos)
2425 		elog(ERROR, "cannot fetch row before WindowObject's mark position");
2426 
2427 	oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
2428 
2429 	tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2430 
2431 	/*
2432 	 * Advance or rewind until we are within one tuple of the one we want.
2433 	 */
2434 	if (winobj->seekpos < pos - 1)
2435 	{
2436 		if (!tuplestore_skiptuples(winstate->buffer,
2437 								   pos - 1 - winobj->seekpos,
2438 								   true))
2439 			elog(ERROR, "unexpected end of tuplestore");
2440 		winobj->seekpos = pos - 1;
2441 	}
2442 	else if (winobj->seekpos > pos + 1)
2443 	{
2444 		if (!tuplestore_skiptuples(winstate->buffer,
2445 								   winobj->seekpos - (pos + 1),
2446 								   false))
2447 			elog(ERROR, "unexpected end of tuplestore");
2448 		winobj->seekpos = pos + 1;
2449 	}
2450 	else if (winobj->seekpos == pos)
2451 	{
2452 		/*
2453 		 * There's no API to refetch the tuple at the current position.  We
2454 		 * have to move one tuple forward, and then one backward.  (We don't
2455 		 * do it the other way because we might try to fetch the row before
2456 		 * our mark, which isn't allowed.)  XXX this case could stand to be
2457 		 * optimized.
2458 		 */
2459 		tuplestore_advance(winstate->buffer, true);
2460 		winobj->seekpos++;
2461 	}
2462 
2463 	/*
2464 	 * Now we should be on the tuple immediately before or after the one we
2465 	 * want, so just fetch forwards or backwards as appropriate.
2466 	 */
2467 	if (winobj->seekpos > pos)
2468 	{
2469 		if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
2470 			elog(ERROR, "unexpected end of tuplestore");
2471 		winobj->seekpos--;
2472 	}
2473 	else
2474 	{
2475 		if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
2476 			elog(ERROR, "unexpected end of tuplestore");
2477 		winobj->seekpos++;
2478 	}
2479 
2480 	Assert(winobj->seekpos == pos);
2481 
2482 	MemoryContextSwitchTo(oldcontext);
2483 
2484 	return true;
2485 }
2486 
2487 
2488 /***********************************************************************
2489  * API exposed to window functions
2490  ***********************************************************************/
2491 
2492 
2493 /*
2494  * WinGetPartitionLocalMemory
2495  *		Get working memory that lives till end of partition processing
2496  *
2497  * On first call within a given partition, this allocates and zeroes the
2498  * requested amount of space.  Subsequent calls just return the same chunk.
2499  *
2500  * Memory obtained this way is normally used to hold state that should be
2501  * automatically reset for each new partition.  If a window function wants
2502  * to hold state across the whole query, fcinfo->fn_extra can be used in the
2503  * usual way for that.
2504  */
2505 void *
WinGetPartitionLocalMemory(WindowObject winobj,Size sz)2506 WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
2507 {
2508 	Assert(WindowObjectIsValid(winobj));
2509 	if (winobj->localmem == NULL)
2510 		winobj->localmem =
2511 			MemoryContextAllocZero(winobj->winstate->partcontext, sz);
2512 	return winobj->localmem;
2513 }
2514 
2515 /*
2516  * WinGetCurrentPosition
2517  *		Return the current row's position (counting from 0) within the current
2518  *		partition.
2519  */
2520 int64
WinGetCurrentPosition(WindowObject winobj)2521 WinGetCurrentPosition(WindowObject winobj)
2522 {
2523 	Assert(WindowObjectIsValid(winobj));
2524 	return winobj->winstate->currentpos;
2525 }
2526 
2527 /*
2528  * WinGetPartitionRowCount
2529  *		Return total number of rows contained in the current partition.
2530  *
2531  * Note: this is a relatively expensive operation because it forces the
2532  * whole partition to be "spooled" into the tuplestore at once.  Once
2533  * executed, however, additional calls within the same partition are cheap.
2534  */
2535 int64
WinGetPartitionRowCount(WindowObject winobj)2536 WinGetPartitionRowCount(WindowObject winobj)
2537 {
2538 	Assert(WindowObjectIsValid(winobj));
2539 	spool_tuples(winobj->winstate, -1);
2540 	return winobj->winstate->spooled_rows;
2541 }
2542 
2543 /*
2544  * WinSetMarkPosition
2545  *		Set the "mark" position for the window object, which is the oldest row
2546  *		number (counting from 0) it is allowed to fetch during all subsequent
2547  *		operations within the current partition.
2548  *
2549  * Window functions do not have to call this, but are encouraged to move the
2550  * mark forward when possible to keep the tuplestore size down and prevent
2551  * having to spill rows to disk.
2552  */
2553 void
WinSetMarkPosition(WindowObject winobj,int64 markpos)2554 WinSetMarkPosition(WindowObject winobj, int64 markpos)
2555 {
2556 	WindowAggState *winstate;
2557 
2558 	Assert(WindowObjectIsValid(winobj));
2559 	winstate = winobj->winstate;
2560 
2561 	if (markpos < winobj->markpos)
2562 		elog(ERROR, "cannot move WindowObject's mark position backward");
2563 	tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2564 	if (markpos > winobj->markpos)
2565 	{
2566 		tuplestore_skiptuples(winstate->buffer,
2567 							  markpos - winobj->markpos,
2568 							  true);
2569 		winobj->markpos = markpos;
2570 	}
2571 	tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2572 	if (markpos > winobj->seekpos)
2573 	{
2574 		tuplestore_skiptuples(winstate->buffer,
2575 							  markpos - winobj->seekpos,
2576 							  true);
2577 		winobj->seekpos = markpos;
2578 	}
2579 }
2580 
2581 /*
2582  * WinRowsArePeers
2583  *		Compare two rows (specified by absolute position in window) to see
2584  *		if they are equal according to the ORDER BY clause.
2585  *
2586  * NB: this does not consider the window frame mode.
2587  */
2588 bool
WinRowsArePeers(WindowObject winobj,int64 pos1,int64 pos2)2589 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2590 {
2591 	WindowAggState *winstate;
2592 	WindowAgg  *node;
2593 	TupleTableSlot *slot1;
2594 	TupleTableSlot *slot2;
2595 	bool		res;
2596 
2597 	Assert(WindowObjectIsValid(winobj));
2598 	winstate = winobj->winstate;
2599 	node = (WindowAgg *) winstate->ss.ps.plan;
2600 
2601 	/* If no ORDER BY, all rows are peers; don't bother to fetch them */
2602 	if (node->ordNumCols == 0)
2603 		return true;
2604 
2605 	slot1 = winstate->temp_slot_1;
2606 	slot2 = winstate->temp_slot_2;
2607 
2608 	if (!window_gettupleslot(winobj, pos1, slot1))
2609 		elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2610 			 pos1);
2611 	if (!window_gettupleslot(winobj, pos2, slot2))
2612 		elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2613 			 pos2);
2614 
2615 	res = are_peers(winstate, slot1, slot2);
2616 
2617 	ExecClearTuple(slot1);
2618 	ExecClearTuple(slot2);
2619 
2620 	return res;
2621 }
2622 
2623 /*
2624  * WinGetFuncArgInPartition
2625  *		Evaluate a window function's argument expression on a specified
2626  *		row of the partition.  The row is identified in lseek(2) style,
2627  *		i.e. relative to the current, first, or last row.
2628  *
2629  * argno: argument number to evaluate (counted from 0)
2630  * relpos: signed rowcount offset from the seek position
2631  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2632  * set_mark: If the row is found and set_mark is true, the mark is moved to
2633  *		the row as a side-effect.
2634  * isnull: output argument, receives isnull status of result
2635  * isout: output argument, set to indicate whether target row position
2636  *		is out of partition (can pass NULL if caller doesn't care about this)
2637  *
2638  * Specifying a nonexistent row is not an error, it just causes a null result
2639  * (plus setting *isout true, if isout isn't NULL).
2640  */
2641 Datum
WinGetFuncArgInPartition(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)2642 WinGetFuncArgInPartition(WindowObject winobj, int argno,
2643 						 int relpos, int seektype, bool set_mark,
2644 						 bool *isnull, bool *isout)
2645 {
2646 	WindowAggState *winstate;
2647 	ExprContext *econtext;
2648 	TupleTableSlot *slot;
2649 	bool		gottuple;
2650 	int64		abs_pos;
2651 
2652 	Assert(WindowObjectIsValid(winobj));
2653 	winstate = winobj->winstate;
2654 	econtext = winstate->ss.ps.ps_ExprContext;
2655 	slot = winstate->temp_slot_1;
2656 
2657 	switch (seektype)
2658 	{
2659 		case WINDOW_SEEK_CURRENT:
2660 			abs_pos = winstate->currentpos + relpos;
2661 			break;
2662 		case WINDOW_SEEK_HEAD:
2663 			abs_pos = relpos;
2664 			break;
2665 		case WINDOW_SEEK_TAIL:
2666 			spool_tuples(winstate, -1);
2667 			abs_pos = winstate->spooled_rows - 1 + relpos;
2668 			break;
2669 		default:
2670 			elog(ERROR, "unrecognized window seek type: %d", seektype);
2671 			abs_pos = 0;		/* keep compiler quiet */
2672 			break;
2673 	}
2674 
2675 	gottuple = window_gettupleslot(winobj, abs_pos, slot);
2676 
2677 	if (!gottuple)
2678 	{
2679 		if (isout)
2680 			*isout = true;
2681 		*isnull = true;
2682 		return (Datum) 0;
2683 	}
2684 	else
2685 	{
2686 		if (isout)
2687 			*isout = false;
2688 		if (set_mark)
2689 		{
2690 			int			frameOptions = winstate->frameOptions;
2691 			int64		mark_pos = abs_pos;
2692 
2693 			/*
2694 			 * In RANGE mode with a moving frame head, we must not let the
2695 			 * mark advance past frameheadpos, since that row has to be
2696 			 * fetchable during future update_frameheadpos calls.
2697 			 *
2698 			 * XXX it is very ugly to pollute window functions' marks with
2699 			 * this consideration; it could for instance mask a logic bug that
2700 			 * lets a window function fetch rows before what it had claimed
2701 			 * was its mark.  Perhaps use a separate mark for frame head
2702 			 * probes?
2703 			 */
2704 			if ((frameOptions & FRAMEOPTION_RANGE) &&
2705 				!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2706 			{
2707 				update_frameheadpos(winobj, winstate->temp_slot_2);
2708 				if (mark_pos > winstate->frameheadpos)
2709 					mark_pos = winstate->frameheadpos;
2710 			}
2711 			WinSetMarkPosition(winobj, mark_pos);
2712 		}
2713 		econtext->ecxt_outertuple = slot;
2714 		return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2715 							econtext, isnull, NULL);
2716 	}
2717 }
2718 
2719 /*
2720  * WinGetFuncArgInFrame
2721  *		Evaluate a window function's argument expression on a specified
2722  *		row of the window frame.  The row is identified in lseek(2) style,
2723  *		i.e. relative to the current, first, or last row.
2724  *
2725  * argno: argument number to evaluate (counted from 0)
2726  * relpos: signed rowcount offset from the seek position
2727  * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2728  * set_mark: If the row is found and set_mark is true, the mark is moved to
2729  *		the row as a side-effect.
2730  * isnull: output argument, receives isnull status of result
2731  * isout: output argument, set to indicate whether target row position
2732  *		is out of frame (can pass NULL if caller doesn't care about this)
2733  *
2734  * Specifying a nonexistent row is not an error, it just causes a null result
2735  * (plus setting *isout true, if isout isn't NULL).
2736  */
2737 Datum
WinGetFuncArgInFrame(WindowObject winobj,int argno,int relpos,int seektype,bool set_mark,bool * isnull,bool * isout)2738 WinGetFuncArgInFrame(WindowObject winobj, int argno,
2739 					 int relpos, int seektype, bool set_mark,
2740 					 bool *isnull, bool *isout)
2741 {
2742 	WindowAggState *winstate;
2743 	ExprContext *econtext;
2744 	TupleTableSlot *slot;
2745 	bool		gottuple;
2746 	int64		abs_pos;
2747 
2748 	Assert(WindowObjectIsValid(winobj));
2749 	winstate = winobj->winstate;
2750 	econtext = winstate->ss.ps.ps_ExprContext;
2751 	slot = winstate->temp_slot_1;
2752 
2753 	switch (seektype)
2754 	{
2755 		case WINDOW_SEEK_CURRENT:
2756 			abs_pos = winstate->currentpos + relpos;
2757 			break;
2758 		case WINDOW_SEEK_HEAD:
2759 			update_frameheadpos(winobj, slot);
2760 			abs_pos = winstate->frameheadpos + relpos;
2761 			break;
2762 		case WINDOW_SEEK_TAIL:
2763 			update_frametailpos(winobj, slot);
2764 			abs_pos = winstate->frametailpos + relpos;
2765 			break;
2766 		default:
2767 			elog(ERROR, "unrecognized window seek type: %d", seektype);
2768 			abs_pos = 0;		/* keep compiler quiet */
2769 			break;
2770 	}
2771 
2772 	gottuple = window_gettupleslot(winobj, abs_pos, slot);
2773 	if (gottuple)
2774 		gottuple = row_is_in_frame(winstate, abs_pos, slot);
2775 
2776 	if (!gottuple)
2777 	{
2778 		if (isout)
2779 			*isout = true;
2780 		*isnull = true;
2781 		return (Datum) 0;
2782 	}
2783 	else
2784 	{
2785 		if (isout)
2786 			*isout = false;
2787 		if (set_mark)
2788 		{
2789 			int			frameOptions = winstate->frameOptions;
2790 			int64		mark_pos = abs_pos;
2791 
2792 			/*
2793 			 * In RANGE mode with a moving frame head, we must not let the
2794 			 * mark advance past frameheadpos, since that row has to be
2795 			 * fetchable during future update_frameheadpos calls.
2796 			 *
2797 			 * XXX it is very ugly to pollute window functions' marks with
2798 			 * this consideration; it could for instance mask a logic bug that
2799 			 * lets a window function fetch rows before what it had claimed
2800 			 * was its mark.  Perhaps use a separate mark for frame head
2801 			 * probes?
2802 			 */
2803 			if ((frameOptions & FRAMEOPTION_RANGE) &&
2804 				!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2805 			{
2806 				update_frameheadpos(winobj, winstate->temp_slot_2);
2807 				if (mark_pos > winstate->frameheadpos)
2808 					mark_pos = winstate->frameheadpos;
2809 			}
2810 			WinSetMarkPosition(winobj, mark_pos);
2811 		}
2812 		econtext->ecxt_outertuple = slot;
2813 		return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2814 							econtext, isnull, NULL);
2815 	}
2816 }
2817 
2818 /*
2819  * WinGetFuncArgCurrent
2820  *		Evaluate a window function's argument expression on the current row.
2821  *
2822  * argno: argument number to evaluate (counted from 0)
2823  * isnull: output argument, receives isnull status of result
2824  *
2825  * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2826  * WinGetFuncArgInFrame targeting the current row, because it will succeed
2827  * even if the WindowObject's mark has been set beyond the current row.
2828  * This should generally be used for "ordinary" arguments of a window
2829  * function, such as the offset argument of lead() or lag().
2830  */
2831 Datum
WinGetFuncArgCurrent(WindowObject winobj,int argno,bool * isnull)2832 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2833 {
2834 	WindowAggState *winstate;
2835 	ExprContext *econtext;
2836 
2837 	Assert(WindowObjectIsValid(winobj));
2838 	winstate = winobj->winstate;
2839 
2840 	econtext = winstate->ss.ps.ps_ExprContext;
2841 
2842 	econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2843 	return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2844 						econtext, isnull, NULL);
2845 }
2846