1 /*------------------------------------------------------------------------- 2 * 3 * nodeWindowAgg.c 4 * routines to handle WindowAgg nodes. 5 * 6 * A WindowAgg node evaluates "window functions" across suitable partitions 7 * of the input tuple set. Any one WindowAgg works for just a single window 8 * specification, though it can evaluate multiple window functions sharing 9 * identical window specifications. The input tuples are required to be 10 * delivered in sorted order, with the PARTITION BY columns (if any) as 11 * major sort keys and the ORDER BY columns (if any) as minor sort keys. 12 * (The planner generates a stack of WindowAggs with intervening Sort nodes 13 * as needed, if a query involves more than one window specification.) 14 * 15 * Since window functions can require access to any or all of the rows in 16 * the current partition, we accumulate rows of the partition into a 17 * tuplestore. The window functions are called using the WindowObject API 18 * so that they can access those rows as needed. 19 * 20 * We also support using plain aggregate functions as window functions. 21 * For these, the regular Agg-node environment is emulated for each partition. 22 * As required by the SQL spec, the output represents the value of the 23 * aggregate function over all rows in the current row's window frame. 
24 * 25 * 26 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group 27 * Portions Copyright (c) 1994, Regents of the University of California 28 * 29 * IDENTIFICATION 30 * src/backend/executor/nodeWindowAgg.c 31 * 32 *------------------------------------------------------------------------- 33 */ 34 #include "postgres.h" 35 36 #include "access/htup_details.h" 37 #include "catalog/objectaccess.h" 38 #include "catalog/pg_aggregate.h" 39 #include "catalog/pg_proc.h" 40 #include "executor/executor.h" 41 #include "executor/nodeWindowAgg.h" 42 #include "miscadmin.h" 43 #include "nodes/nodeFuncs.h" 44 #include "optimizer/optimizer.h" 45 #include "parser/parse_agg.h" 46 #include "parser/parse_coerce.h" 47 #include "utils/acl.h" 48 #include "utils/builtins.h" 49 #include "utils/datum.h" 50 #include "utils/expandeddatum.h" 51 #include "utils/lsyscache.h" 52 #include "utils/memutils.h" 53 #include "utils/regproc.h" 54 #include "utils/syscache.h" 55 #include "windowapi.h" 56 57 /* 58 * All the window function APIs are called with this object, which is passed 59 * to window functions as fcinfo->context. 60 */ 61 typedef struct WindowObjectData 62 { 63 NodeTag type; 64 WindowAggState *winstate; /* parent WindowAggState */ 65 List *argstates; /* ExprState trees for fn's arguments */ TidExprListCreate(TidScanState * tidstate)66 void *localmem; /* WinGetPartitionLocalMemory's chunk */ 67 int markptr; /* tuplestore mark pointer for this fn */ 68 int readptr; /* tuplestore read pointer for this fn */ 69 int64 markpos; /* row that markptr is positioned on */ 70 int64 seekpos; /* row that readptr is positioned on */ 71 } WindowObjectData; 72 73 /* 74 * We have one WindowStatePerFunc struct for each window function and 75 * window aggregate handled by this node. 
76 */ 77 typedef struct WindowStatePerFuncData 78 { 79 /* Links to WindowFunc expr and state nodes this working state is for */ 80 WindowFuncExprState *wfuncstate; 81 WindowFunc *wfunc; 82 83 int numArguments; /* number of arguments */ 84 85 FmgrInfo flinfo; /* fmgr lookup data for window function */ 86 87 Oid winCollation; /* collation derived for window function */ 88 89 /* 90 * We need the len and byval info for the result of each function in order 91 * to know how to copy/delete values. 92 */ 93 int16 resulttypeLen; 94 bool resulttypeByVal; 95 96 bool plain_agg; /* is it just a plain aggregate function? */ 97 int aggno; /* if so, index of its WindowStatePerAggData */ 98 99 WindowObject winobj; /* object used in window function API */ 100 } WindowStatePerFuncData; 101 102 /* 103 * For plain aggregate window functions, we also have one of these. 104 */ 105 typedef struct WindowStatePerAggData 106 { 107 /* Oids of transition functions */ 108 Oid transfn_oid; 109 Oid invtransfn_oid; /* may be InvalidOid */ 110 Oid finalfn_oid; /* may be InvalidOid */ 111 112 /* 113 * fmgr lookup data for transition functions --- only valid when 114 * corresponding oid is not InvalidOid. Note in particular that fn_strict 115 * flags are kept here. 116 */ 117 FmgrInfo transfn; 118 FmgrInfo invtransfn; 119 FmgrInfo finalfn; 120 121 int numFinalArgs; /* number of arguments to pass to finalfn */ 122 123 /* 124 * initial value from pg_aggregate entry 125 */ 126 Datum initValue; 127 bool initValueIsNull; 128 129 /* TidListEval(TidScanState * tidstate)130 * cached value for current frame boundaries 131 */ 132 Datum resultValue; 133 bool resultValueIsNull; 134 135 /* 136 * We need the len and byval info for the agg's input, result, and 137 * transition data types in order to know how to copy/delete values. 
138 */ 139 int16 inputtypeLen, 140 resulttypeLen, 141 transtypeLen; 142 bool inputtypeByVal, 143 resulttypeByVal, 144 transtypeByVal; 145 146 int wfuncno; /* index of associated WindowStatePerFuncData */ 147 148 /* Context holding transition value and possibly other subsidiary data */ 149 MemoryContext aggcontext; /* may be private, or winstate->aggcontext */ 150 151 /* Current transition value */ 152 Datum transValue; /* current transition value */ 153 bool transValueIsNull; 154 155 int64 transValueCount; /* number of currently-aggregated rows */ 156 157 /* Data local to eval_windowaggregates() */ 158 bool restart; /* need to restart this agg in this cycle? */ 159 } WindowStatePerAggData; 160 161 static void initialize_windowaggregate(WindowAggState *winstate, 162 WindowStatePerFunc perfuncstate, 163 WindowStatePerAgg peraggstate); 164 static void advance_windowaggregate(WindowAggState *winstate, 165 WindowStatePerFunc perfuncstate, 166 WindowStatePerAgg peraggstate); 167 static bool advance_windowaggregate_base(WindowAggState *winstate, 168 WindowStatePerFunc perfuncstate, 169 WindowStatePerAgg peraggstate); 170 static void finalize_windowaggregate(WindowAggState *winstate, 171 WindowStatePerFunc perfuncstate, 172 WindowStatePerAgg peraggstate, 173 Datum *result, bool *isnull); 174 175 static void eval_windowaggregates(WindowAggState *winstate); 176 static void eval_windowfunction(WindowAggState *winstate, 177 WindowStatePerFunc perfuncstate, 178 Datum *result, bool *isnull); 179 180 static void begin_partition(WindowAggState *winstate); 181 static void spool_tuples(WindowAggState *winstate, int64 pos); 182 static void release_partition(WindowAggState *winstate); 183 184 static int row_is_in_frame(WindowAggState *winstate, int64 pos, 185 TupleTableSlot *slot); 186 static void update_frameheadpos(WindowAggState *winstate); 187 static void update_frametailpos(WindowAggState *winstate); 188 static void update_grouptailpos(WindowAggState *winstate); 189 190 static 
WindowStatePerAggData *initialize_peragg(WindowAggState *winstate, 191 WindowFunc *wfunc, 192 WindowStatePerAgg peraggstate); 193 static Datum GetAggInitVal(Datum textInitVal, Oid transtype); 194 195 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1, 196 TupleTableSlot *slot2); 197 static bool window_gettupleslot(WindowObject winobj, int64 pos, 198 TupleTableSlot *slot); 199 200 201 /* 202 * initialize_windowaggregate 203 * parallel to initialize_aggregates in nodeAgg.c 204 */ 205 static void 206 initialize_windowaggregate(WindowAggState *winstate, 207 WindowStatePerFunc perfuncstate, 208 WindowStatePerAgg peraggstate) 209 { 210 MemoryContext oldContext; 211 212 /* 213 * If we're using a private aggcontext, we may reset it here. But if the 214 * context is shared, we don't know which other aggregates may still need 215 * it, so we must leave it to the caller to reset at an appropriate time. 216 */ 217 if (peraggstate->aggcontext != winstate->aggcontext) 218 MemoryContextResetAndDeleteChildren(peraggstate->aggcontext); 219 220 if (peraggstate->initValueIsNull) 221 peraggstate->transValue = peraggstate->initValue; 222 else 223 { 224 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); 225 peraggstate->transValue = datumCopy(peraggstate->initValue, 226 peraggstate->transtypeByVal, 227 peraggstate->transtypeLen); 228 MemoryContextSwitchTo(oldContext); 229 } 230 peraggstate->transValueIsNull = peraggstate->initValueIsNull; 231 peraggstate->transValueCount = 0; 232 peraggstate->resultValue = (Datum) 0; 233 peraggstate->resultValueIsNull = true; 234 } 235 236 /* 237 * advance_windowaggregate 238 * parallel to advance_aggregates in nodeAgg.c 239 */ 240 static void 241 advance_windowaggregate(WindowAggState *winstate, 242 WindowStatePerFunc perfuncstate, 243 WindowStatePerAgg peraggstate) 244 { 245 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); 246 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; 247 int numArguments = 
perfuncstate->numArguments; 248 Datum newVal; 249 ListCell *arg; 250 int i; 251 MemoryContext oldContext; 252 ExprContext *econtext = winstate->tmpcontext; 253 ExprState *filter = wfuncstate->aggfilter; 254 255 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); 256 257 /* Skip anything FILTERed out */ 258 if (filter) 259 { 260 bool isnull; 261 Datum res = ExecEvalExpr(filter, econtext, &isnull); 262 263 if (isnull || !DatumGetBool(res)) 264 { 265 MemoryContextSwitchTo(oldContext); 266 return; 267 } 268 } 269 270 /* We start from 1, since the 0th arg will be the transition value */ 271 i = 1; 272 foreach(arg, wfuncstate->args) 273 { 274 ExprState *argstate = (ExprState *) lfirst(arg); 275 276 fcinfo->args[i].value = ExecEvalExpr(argstate, econtext, 277 &fcinfo->args[i].isnull); 278 i++; 279 } 280 itemptr_comparator(const void * a,const void * b)281 if (peraggstate->transfn.fn_strict) 282 { 283 /* 284 * For a strict transfn, nothing happens when there's a NULL input; we 285 * just keep the prior transValue. Note transValueCount doesn't 286 * change either. 287 */ 288 for (i = 1; i <= numArguments; i++) 289 { 290 if (fcinfo->args[i].isnull) 291 { 292 MemoryContextSwitchTo(oldContext); 293 return; 294 } 295 } 296 297 /* 298 * For strict transition functions with initial value NULL we use the 299 * first non-NULL input as the initial state. (We already checked 300 * that the agg's input type is binary-compatible with its transtype, 301 * so straight copy here is OK.) 302 * 303 * We must copy the datum into aggcontext if it is pass-by-ref. We do 304 * not need to pfree the old transValue, since it's NULL. 
305 */ 306 if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull) 307 { 308 MemoryContextSwitchTo(peraggstate->aggcontext); 309 peraggstate->transValue = datumCopy(fcinfo->args[1].value, 310 peraggstate->transtypeByVal, 311 peraggstate->transtypeLen); 312 peraggstate->transValueIsNull = false; 313 peraggstate->transValueCount = 1; 314 MemoryContextSwitchTo(oldContext); 315 return; 316 } 317 318 if (peraggstate->transValueIsNull) 319 { 320 /* 321 * Don't call a strict function with NULL inputs. Note it is 322 * possible to get here despite the above tests, if the transfn is 323 * strict *and* returned a NULL on a prior cycle. If that happens 324 * we will propagate the NULL all the way to the end. That can 325 * only happen if there's no inverse transition function, though, 326 * since we disallow transitions back to NULL when there is one. 327 */ 328 MemoryContextSwitchTo(oldContext); 329 Assert(!OidIsValid(peraggstate->invtransfn_oid)); 330 return; 331 } 332 } 333 334 /* 335 * OK to call the transition function. Set winstate->curaggcontext while 336 * calling it, for possible use by AggCheckCallContext. 337 */ 338 InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), 339 numArguments + 1, 340 perfuncstate->winCollation, 341 (void *) winstate, NULL); 342 fcinfo->args[0].value = peraggstate->transValue; 343 fcinfo->args[0].isnull = peraggstate->transValueIsNull; 344 winstate->curaggcontext = peraggstate->aggcontext; 345 newVal = FunctionCallInvoke(fcinfo); 346 winstate->curaggcontext = NULL; 347 348 /* 349 * Moving-aggregate transition functions must not return null, see 350 * advance_windowaggregate_base(). 
351 */ 352 if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid)) 353 ereport(ERROR, 354 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), 355 errmsg("moving-aggregate transition function must not return null"))); 356 357 /* 358 * We must track the number of rows included in transValue, since to 359 * remove the last input, advance_windowaggregate_base() mustn't call the 360 * inverse transition function, but simply reset transValue back to its 361 * initial value. 362 */ 363 peraggstate->transValueCount++; 364 365 /* 366 * If pass-by-ref datatype, must copy the new value into aggcontext and 367 * free the prior transValue. But if transfn returned a pointer to its 368 * first input, we don't need to do anything. Also, if transfn returned a 369 * pointer to a R/W expanded object that is already a child of the 370 * aggcontext, assume we can adopt that value without copying it. 371 */ 372 if (!peraggstate->transtypeByVal && 373 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) 374 { 375 if (!fcinfo->isnull) 376 { 377 MemoryContextSwitchTo(peraggstate->aggcontext); 378 if (DatumIsReadWriteExpandedObject(newVal, 379 false, 380 peraggstate->transtypeLen) && 381 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext) 382 /* do nothing */ ; 383 else 384 newVal = datumCopy(newVal, 385 peraggstate->transtypeByVal, 386 peraggstate->transtypeLen); 387 } 388 if (!peraggstate->transValueIsNull) 389 { 390 if (DatumIsReadWriteExpandedObject(peraggstate->transValue, 391 false, 392 peraggstate->transtypeLen)) 393 DeleteExpandedObject(peraggstate->transValue); 394 else 395 pfree(DatumGetPointer(peraggstate->transValue)); 396 } 397 } 398 399 MemoryContextSwitchTo(oldContext); 400 peraggstate->transValue = newVal; TidRecheck(TidScanState * node,TupleTableSlot * slot)401 peraggstate->transValueIsNull = fcinfo->isnull; 402 } 403 404 /* 405 * advance_windowaggregate_base 406 * Remove the oldest tuple from an aggregation. 
407 * 408 * This is very much like advance_windowaggregate, except that we will call 409 * the inverse transition function (which caller must have checked is 410 * available). 411 * 412 * Returns true if we successfully removed the current row from this 413 * aggregate, false if not (in the latter case, caller is responsible 414 * for cleaning up by restarting the aggregation). 415 */ 416 static bool 417 advance_windowaggregate_base(WindowAggState *winstate, 418 WindowStatePerFunc perfuncstate, 419 WindowStatePerAgg peraggstate) 420 { 421 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); 422 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; 423 int numArguments = perfuncstate->numArguments; 424 Datum newVal; 425 ListCell *arg; 426 int i; 427 MemoryContext oldContext; 428 ExprContext *econtext = winstate->tmpcontext; 429 ExprState *filter = wfuncstate->aggfilter; 430 ExecTidScan(PlanState * pstate)431 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); 432 433 /* Skip anything FILTERed out */ 434 if (filter) 435 { 436 bool isnull; 437 Datum res = ExecEvalExpr(filter, econtext, &isnull); 438 439 if (isnull || !DatumGetBool(res)) 440 { 441 MemoryContextSwitchTo(oldContext); 442 return true; 443 } 444 } ExecReScanTidScan(TidScanState * node)445 446 /* We start from 1, since the 0th arg will be the transition value */ 447 i = 1; 448 foreach(arg, wfuncstate->args) 449 { 450 ExprState *argstate = (ExprState *) lfirst(arg); 451 452 fcinfo->args[i].value = ExecEvalExpr(argstate, econtext, 453 &fcinfo->args[i].isnull); 454 i++; 455 } 456 457 if (peraggstate->invtransfn.fn_strict) 458 { 459 /* 460 * For a strict (inv)transfn, nothing happens when there's a NULL 461 * input; we just keep the prior transValue. Note transValueCount 462 * doesn't change either. 
463 */ 464 for (i = 1; i <= numArguments; i++) 465 { 466 if (fcinfo->args[i].isnull) 467 { ExecEndTidScan(TidScanState * node)468 MemoryContextSwitchTo(oldContext); 469 return true; 470 } 471 } 472 } 473 474 /* There should still be an added but not yet removed value */ 475 Assert(peraggstate->transValueCount > 0); 476 477 /* 478 * In moving-aggregate mode, the state must never be NULL, except possibly 479 * before any rows have been aggregated (which is surely not the case at 480 * this point). This restriction allows us to interpret a NULL result 481 * from the inverse function as meaning "sorry, can't do an inverse 482 * transition in this case". We already checked this in 483 * advance_windowaggregate, but just for safety, check again. 484 */ 485 if (peraggstate->transValueIsNull) 486 elog(ERROR, "aggregate transition value is NULL before inverse transition"); 487 488 /* 489 * We mustn't use the inverse transition function to remove the last 490 * input. Doing so would yield a non-NULL state, whereas we should be in 491 * the initial state afterwards which may very well be NULL. So instead, 492 * we simply re-initialize the aggregate in this case. 493 */ 494 if (peraggstate->transValueCount == 1) 495 { 496 MemoryContextSwitchTo(oldContext); 497 initialize_windowaggregate(winstate, ExecInitTidScan(TidScan * node,EState * estate,int eflags)498 &winstate->perfunc[peraggstate->wfuncno], 499 peraggstate); 500 return true; 501 } 502 503 /* 504 * OK to call the inverse transition function. Set 505 * winstate->curaggcontext while calling it, for possible use by 506 * AggCheckCallContext. 
507 */ 508 InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn), 509 numArguments + 1, 510 perfuncstate->winCollation, 511 (void *) winstate, NULL); 512 fcinfo->args[0].value = peraggstate->transValue; 513 fcinfo->args[0].isnull = peraggstate->transValueIsNull; 514 winstate->curaggcontext = peraggstate->aggcontext; 515 newVal = FunctionCallInvoke(fcinfo); 516 winstate->curaggcontext = NULL; 517 518 /* 519 * If the function returns NULL, report failure, forcing a restart. 520 */ 521 if (fcinfo->isnull) 522 { 523 MemoryContextSwitchTo(oldContext); 524 return false; 525 } 526 527 /* Update number of rows included in transValue */ 528 peraggstate->transValueCount--; 529 530 /* 531 * If pass-by-ref datatype, must copy the new value into aggcontext and 532 * free the prior transValue. But if invtransfn returned a pointer to its 533 * first input, we don't need to do anything. Also, if invtransfn 534 * returned a pointer to a R/W expanded object that is already a child of 535 * the aggcontext, assume we can adopt that value without copying it. 536 * 537 * Note: the checks for null values here will never fire, but it seems 538 * best to have this stanza look just like advance_windowaggregate. 
539 */ 540 if (!peraggstate->transtypeByVal && 541 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) 542 { 543 if (!fcinfo->isnull) 544 { 545 MemoryContextSwitchTo(peraggstate->aggcontext); 546 if (DatumIsReadWriteExpandedObject(newVal, 547 false, 548 peraggstate->transtypeLen) && 549 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext) 550 /* do nothing */ ; 551 else 552 newVal = datumCopy(newVal, 553 peraggstate->transtypeByVal, 554 peraggstate->transtypeLen); 555 } 556 if (!peraggstate->transValueIsNull) 557 { 558 if (DatumIsReadWriteExpandedObject(peraggstate->transValue, 559 false, 560 peraggstate->transtypeLen)) 561 DeleteExpandedObject(peraggstate->transValue); 562 else 563 pfree(DatumGetPointer(peraggstate->transValue)); 564 } 565 } 566 567 MemoryContextSwitchTo(oldContext); 568 peraggstate->transValue = newVal; 569 peraggstate->transValueIsNull = fcinfo->isnull; 570 571 return true; 572 } 573 574 /* 575 * finalize_windowaggregate 576 * parallel to finalize_aggregate in nodeAgg.c 577 */ 578 static void 579 finalize_windowaggregate(WindowAggState *winstate, 580 WindowStatePerFunc perfuncstate, 581 WindowStatePerAgg peraggstate, 582 Datum *result, bool *isnull) 583 { 584 MemoryContext oldContext; 585 586 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); 587 588 /* 589 * Apply the agg's finalfn if one is provided, else return transValue. 
590 */ 591 if (OidIsValid(peraggstate->finalfn_oid)) 592 { 593 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); 594 int numFinalArgs = peraggstate->numFinalArgs; 595 bool anynull; 596 int i; 597 598 InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn), 599 numFinalArgs, 600 perfuncstate->winCollation, 601 (void *) winstate, NULL); 602 fcinfo->args[0].value = 603 MakeExpandedObjectReadOnly(peraggstate->transValue, 604 peraggstate->transValueIsNull, 605 peraggstate->transtypeLen); 606 fcinfo->args[0].isnull = peraggstate->transValueIsNull; 607 anynull = peraggstate->transValueIsNull; 608 609 /* Fill any remaining argument positions with nulls */ 610 for (i = 1; i < numFinalArgs; i++) 611 { 612 fcinfo->args[i].value = (Datum) 0; 613 fcinfo->args[i].isnull = true; 614 anynull = true; 615 } 616 617 if (fcinfo->flinfo->fn_strict && anynull) 618 { 619 /* don't call a strict function with NULL inputs */ 620 *result = (Datum) 0; 621 *isnull = true; 622 } 623 else 624 { 625 winstate->curaggcontext = peraggstate->aggcontext; 626 *result = FunctionCallInvoke(fcinfo); 627 winstate->curaggcontext = NULL; 628 *isnull = fcinfo->isnull; 629 } 630 } 631 else 632 { 633 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */ 634 *result = peraggstate->transValue; 635 *isnull = peraggstate->transValueIsNull; 636 } 637 638 /* 639 * If result is pass-by-ref, make sure it is in the right context. 640 */ 641 if (!peraggstate->resulttypeByVal && !*isnull && 642 !MemoryContextContains(CurrentMemoryContext, 643 DatumGetPointer(*result))) 644 *result = datumCopy(*result, 645 peraggstate->resulttypeByVal, 646 peraggstate->resulttypeLen); 647 MemoryContextSwitchTo(oldContext); 648 } 649 650 /* 651 * eval_windowaggregates 652 * evaluate plain aggregates being used as window functions 653 * 654 * This differs from nodeAgg.c in two ways. 
First, if the window's frame 655 * start position moves, we use the inverse transition function (if it exists) 656 * to remove rows from the transition value. And second, we expect to be 657 * able to call aggregate final functions repeatedly after aggregating more 658 * data onto the same transition value. This is not a behavior required by 659 * nodeAgg.c. 660 */ 661 static void 662 eval_windowaggregates(WindowAggState *winstate) 663 { 664 WindowStatePerAgg peraggstate; 665 int wfuncno, 666 numaggs, 667 numaggs_restart, 668 i; 669 int64 aggregatedupto_nonrestarted; 670 MemoryContext oldContext; 671 ExprContext *econtext; 672 WindowObject agg_winobj; 673 TupleTableSlot *agg_row_slot; 674 TupleTableSlot *temp_slot; 675 676 numaggs = winstate->numaggs; 677 if (numaggs == 0) 678 return; /* nothing to do */ 679 680 /* final output execution is in ps_ExprContext */ 681 econtext = winstate->ss.ps.ps_ExprContext; 682 agg_winobj = winstate->agg_winobj; 683 agg_row_slot = winstate->agg_row_slot; 684 temp_slot = winstate->temp_slot_1; 685 686 /* 687 * If the window's frame start clause is UNBOUNDED_PRECEDING and no 688 * exclusion clause is specified, then the window frame consists of a 689 * contiguous group of rows extending forward from the start of the 690 * partition, and rows only enter the frame, never exit it, as the current 691 * row advances forward. This makes it possible to use an incremental 692 * strategy for evaluating aggregates: we run the transition function for 693 * each row added to the frame, and run the final function whenever we 694 * need the current aggregate value. This is considerably more efficient 695 * than the naive approach of re-running the entire aggregate calculation 696 * for each current row. It does assume that the final function doesn't 697 * damage the running transition value, but we have the same assumption in 698 * nodeAgg.c too (when it rescans an existing hash table). 
699 * 700 * If the frame start does sometimes move, we can still optimize as above 701 * whenever successive rows share the same frame head, but if the frame 702 * head moves beyond the previous head we try to remove those rows using 703 * the aggregate's inverse transition function. This function restores 704 * the aggregate's current state to what it would be if the removed row 705 * had never been aggregated in the first place. Inverse transition 706 * functions may optionally return NULL, indicating that the function was 707 * unable to remove the tuple from aggregation. If this happens, or if 708 * the aggregate doesn't have an inverse transition function at all, we 709 * must perform the aggregation all over again for all tuples within the 710 * new frame boundaries. 711 * 712 * If there's any exclusion clause, then we may have to aggregate over a 713 * non-contiguous set of rows, so we punt and recalculate for every row. 714 * (For some frame end choices, it might be that the frame is always 715 * contiguous anyway, but that's an optimization to investigate later.) 716 * 717 * In many common cases, multiple rows share the same frame and hence the 718 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE 719 * window, then all rows are peers and so they all have window frame equal 720 * to the whole partition.) We optimize such cases by calculating the 721 * aggregate value once when we reach the first row of a peer group, and 722 * then returning the saved value for all subsequent rows. 723 * 724 * 'aggregatedupto' keeps track of the first row that has not yet been 725 * accumulated into the aggregate transition values. Whenever we start a 726 * new peer group, we accumulate forward to the end of the peer group. 727 */ 728 729 /* 730 * First, update the frame head position. 731 * 732 * The frame head should never move backwards, and the code below wouldn't 733 * cope if it did, so for safety we complain if it does. 
734 */ 735 update_frameheadpos(winstate); 736 if (winstate->frameheadpos < winstate->aggregatedbase) 737 elog(ERROR, "window frame head moved backward"); 738 739 /* 740 * If the frame didn't change compared to the previous row, we can re-use 741 * the result values that were previously saved at the bottom of this 742 * function. Since we don't know the current frame's end yet, this is not 743 * possible to check for fully. But if the frame end mode is UNBOUNDED 744 * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the 745 * current row lies within the previous row's frame, then the two frames' 746 * ends must coincide. Note that on the first row aggregatedbase == 747 * aggregatedupto, meaning this test must fail, so we don't need to check 748 * the "there was no previous row" case explicitly here. 749 */ 750 if (winstate->aggregatedbase == winstate->frameheadpos && 751 (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | 752 FRAMEOPTION_END_CURRENT_ROW)) && 753 !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) && 754 winstate->aggregatedbase <= winstate->currentpos && 755 winstate->aggregatedupto > winstate->currentpos) 756 { 757 for (i = 0; i < numaggs; i++) 758 { 759 peraggstate = &winstate->peragg[i]; 760 wfuncno = peraggstate->wfuncno; 761 econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; 762 econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; 763 } 764 return; 765 } 766 767 /*---------- 768 * Initialize restart flags. 769 * 770 * We restart the aggregation: 771 * - if we're processing the first row in the partition, or 772 * - if the frame's head moved and we cannot use an inverse 773 * transition function, or 774 * - we have an EXCLUSION clause, or 775 * - if the new frame doesn't overlap the old one 776 * 777 * Note that we don't strictly need to restart in the last case, but if 778 * we're going to remove all rows from the aggregation anyway, a restart 779 * surely is faster. 
780 *---------- 781 */ 782 numaggs_restart = 0; 783 for (i = 0; i < numaggs; i++) 784 { 785 peraggstate = &winstate->peragg[i]; 786 if (winstate->currentpos == 0 || 787 (winstate->aggregatedbase != winstate->frameheadpos && 788 !OidIsValid(peraggstate->invtransfn_oid)) || 789 (winstate->frameOptions & FRAMEOPTION_EXCLUSION) || 790 winstate->aggregatedupto <= winstate->frameheadpos) 791 { 792 peraggstate->restart = true; 793 numaggs_restart++; 794 } 795 else 796 peraggstate->restart = false; 797 } 798 799 /* 800 * If we have any possibly-moving aggregates, attempt to advance 801 * aggregatedbase to match the frame's head by removing input rows that 802 * fell off the top of the frame from the aggregations. This can fail, 803 * i.e. advance_windowaggregate_base() can return false, in which case 804 * we'll restart that aggregate below. 805 */ 806 while (numaggs_restart < numaggs && 807 winstate->aggregatedbase < winstate->frameheadpos) 808 { 809 /* 810 * Fetch the next tuple of those being removed. This should never fail 811 * as we should have been here before. 812 */ 813 if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase, 814 temp_slot)) 815 elog(ERROR, "could not re-fetch previously fetched frame row"); 816 817 /* Set tuple context for evaluation of aggregate arguments */ 818 winstate->tmpcontext->ecxt_outertuple = temp_slot; 819 820 /* 821 * Perform the inverse transition for each aggregate function in the 822 * window, unless it has already been marked as needing a restart. 
823 */ 824 for (i = 0; i < numaggs; i++) 825 { 826 bool ok; 827 828 peraggstate = &winstate->peragg[i]; 829 if (peraggstate->restart) 830 continue; 831 832 wfuncno = peraggstate->wfuncno; 833 ok = advance_windowaggregate_base(winstate, 834 &winstate->perfunc[wfuncno], 835 peraggstate); 836 if (!ok) 837 { 838 /* Inverse transition function has failed, must restart */ 839 peraggstate->restart = true; 840 numaggs_restart++; 841 } 842 } 843 844 /* Reset per-input-tuple context after each tuple */ 845 ResetExprContext(winstate->tmpcontext); 846 847 /* And advance the aggregated-row state */ 848 winstate->aggregatedbase++; 849 ExecClearTuple(temp_slot); 850 } 851 852 /* 853 * If we successfully advanced the base rows of all the aggregates, 854 * aggregatedbase now equals frameheadpos; but if we failed for any, we 855 * must forcibly update aggregatedbase. 856 */ 857 winstate->aggregatedbase = winstate->frameheadpos; 858 859 /* 860 * If we created a mark pointer for aggregates, keep it pushed up to frame 861 * head, so that tuplestore can discard unnecessary rows. 862 */ 863 if (agg_winobj->markptr >= 0) 864 WinSetMarkPosition(agg_winobj, winstate->frameheadpos); 865 866 /* 867 * Now restart the aggregates that require it. 868 * 869 * We assume that aggregates using the shared context always restart if 870 * *any* aggregate restarts, and we may thus clean up the shared 871 * aggcontext if that is the case. Private aggcontexts are reset by 872 * initialize_windowaggregate() if their owning aggregate restarts. If we 873 * aren't restarting an aggregate, we need to free any previously saved 874 * result for it, else we'll leak memory. 
875 */ 876 if (numaggs_restart > 0) 877 MemoryContextResetAndDeleteChildren(winstate->aggcontext); 878 for (i = 0; i < numaggs; i++) 879 { 880 peraggstate = &winstate->peragg[i]; 881 882 /* Aggregates using the shared ctx must restart if *any* agg does */ 883 Assert(peraggstate->aggcontext != winstate->aggcontext || 884 numaggs_restart == 0 || 885 peraggstate->restart); 886 887 if (peraggstate->restart) 888 { 889 wfuncno = peraggstate->wfuncno; 890 initialize_windowaggregate(winstate, 891 &winstate->perfunc[wfuncno], 892 peraggstate); 893 } 894 else if (!peraggstate->resultValueIsNull) 895 { 896 if (!peraggstate->resulttypeByVal) 897 pfree(DatumGetPointer(peraggstate->resultValue)); 898 peraggstate->resultValue = (Datum) 0; 899 peraggstate->resultValueIsNull = true; 900 } 901 } 902 903 /* 904 * Non-restarted aggregates now contain the rows between aggregatedbase 905 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates 906 * contain no rows. If there are any restarted aggregates, we must thus 907 * begin aggregating anew at frameheadpos, otherwise we may simply 908 * continue at aggregatedupto. We must remember the old value of 909 * aggregatedupto to know how long to skip advancing non-restarted 910 * aggregates. If we modify aggregatedupto, we must also clear 911 * agg_row_slot, per the loop invariant below. 912 */ 913 aggregatedupto_nonrestarted = winstate->aggregatedupto; 914 if (numaggs_restart > 0 && 915 winstate->aggregatedupto != winstate->frameheadpos) 916 { 917 winstate->aggregatedupto = winstate->frameheadpos; 918 ExecClearTuple(agg_row_slot); 919 } 920 921 /* 922 * Advance until we reach a row not in frame (or end of partition). 923 * 924 * Note the loop invariant: agg_row_slot is either empty or holds the row 925 * at position aggregatedupto. We advance aggregatedupto after processing 926 * a row. 
927 */ 928 for (;;) 929 { 930 int ret; 931 932 /* Fetch next row if we didn't already */ 933 if (TupIsNull(agg_row_slot)) 934 { 935 if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto, 936 agg_row_slot)) 937 break; /* must be end of partition */ 938 } 939 940 /* 941 * Exit loop if no more rows can be in frame. Skip aggregation if 942 * current row is not in frame but there might be more in the frame. 943 */ 944 ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot); 945 if (ret < 0) 946 break; 947 if (ret == 0) 948 goto next_tuple; 949 950 /* Set tuple context for evaluation of aggregate arguments */ 951 winstate->tmpcontext->ecxt_outertuple = agg_row_slot; 952 953 /* Accumulate row into the aggregates */ 954 for (i = 0; i < numaggs; i++) 955 { 956 peraggstate = &winstate->peragg[i]; 957 958 /* Non-restarted aggs skip until aggregatedupto_nonrestarted */ 959 if (!peraggstate->restart && 960 winstate->aggregatedupto < aggregatedupto_nonrestarted) 961 continue; 962 963 wfuncno = peraggstate->wfuncno; 964 advance_windowaggregate(winstate, 965 &winstate->perfunc[wfuncno], 966 peraggstate); 967 } 968 969 next_tuple: 970 /* Reset per-input-tuple context after each tuple */ 971 ResetExprContext(winstate->tmpcontext); 972 973 /* And advance the aggregated-row state */ 974 winstate->aggregatedupto++; 975 ExecClearTuple(agg_row_slot); 976 } 977 978 /* The frame's end is not supposed to move backwards, ever */ 979 Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); 980 981 /* 982 * finalize aggregates and fill result/isnull fields. 
983 */ 984 for (i = 0; i < numaggs; i++) 985 { 986 Datum *result; 987 bool *isnull; 988 989 peraggstate = &winstate->peragg[i]; 990 wfuncno = peraggstate->wfuncno; 991 result = &econtext->ecxt_aggvalues[wfuncno]; 992 isnull = &econtext->ecxt_aggnulls[wfuncno]; 993 finalize_windowaggregate(winstate, 994 &winstate->perfunc[wfuncno], 995 peraggstate, 996 result, isnull); 997 998 /* 999 * save the result in case next row shares the same frame. 1000 * 1001 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in 1002 * advance that the next row can't possibly share the same frame. Is 1003 * it worth detecting that and skipping this code? 1004 */ 1005 if (!peraggstate->resulttypeByVal && !*isnull) 1006 { 1007 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); 1008 peraggstate->resultValue = 1009 datumCopy(*result, 1010 peraggstate->resulttypeByVal, 1011 peraggstate->resulttypeLen); 1012 MemoryContextSwitchTo(oldContext); 1013 } 1014 else 1015 { 1016 peraggstate->resultValue = *result; 1017 } 1018 peraggstate->resultValueIsNull = *isnull; 1019 } 1020 } 1021 1022 /* 1023 * eval_windowfunction 1024 * 1025 * Arguments of window functions are not evaluated here, because a window 1026 * function can need random access to arbitrary rows in the partition. 1027 * The window function uses the special WinGetFuncArgInPartition and 1028 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows 1029 * it wants. 
1030 */ 1031 static void 1032 eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, 1033 Datum *result, bool *isnull) 1034 { 1035 LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); 1036 MemoryContext oldContext; 1037 1038 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); 1039 1040 /* 1041 * We don't pass any normal arguments to a window function, but we do pass 1042 * it the number of arguments, in order to permit window function 1043 * implementations to support varying numbers of arguments. The real info 1044 * goes through the WindowObject, which is passed via fcinfo->context. 1045 */ 1046 InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo), 1047 perfuncstate->numArguments, 1048 perfuncstate->winCollation, 1049 (void *) perfuncstate->winobj, NULL); 1050 /* Just in case, make all the regular argument slots be null */ 1051 for (int argno = 0; argno < perfuncstate->numArguments; argno++) 1052 fcinfo->args[argno].isnull = true; 1053 /* Window functions don't have a current aggregate context, either */ 1054 winstate->curaggcontext = NULL; 1055 1056 *result = FunctionCallInvoke(fcinfo); 1057 *isnull = fcinfo->isnull; 1058 1059 /* 1060 * Make sure pass-by-ref data is allocated in the appropriate context. (We 1061 * need this in case the function returns a pointer into some short-lived 1062 * tuple, as is entirely possible.) 1063 */ 1064 if (!perfuncstate->resulttypeByVal && !fcinfo->isnull && 1065 !MemoryContextContains(CurrentMemoryContext, 1066 DatumGetPointer(*result))) 1067 *result = datumCopy(*result, 1068 perfuncstate->resulttypeByVal, 1069 perfuncstate->resulttypeLen); 1070 1071 MemoryContextSwitchTo(oldContext); 1072 } 1073 1074 /* 1075 * begin_partition 1076 * Start buffering rows of the next partition. 
1077 */ 1078 static void 1079 begin_partition(WindowAggState *winstate) 1080 { 1081 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1082 PlanState *outerPlan = outerPlanState(winstate); 1083 int frameOptions = winstate->frameOptions; 1084 int numfuncs = winstate->numfuncs; 1085 int i; 1086 1087 winstate->partition_spooled = false; 1088 winstate->framehead_valid = false; 1089 winstate->frametail_valid = false; 1090 winstate->grouptail_valid = false; 1091 winstate->spooled_rows = 0; 1092 winstate->currentpos = 0; 1093 winstate->frameheadpos = 0; 1094 winstate->frametailpos = 0; 1095 winstate->currentgroup = 0; 1096 winstate->frameheadgroup = 0; 1097 winstate->frametailgroup = 0; 1098 winstate->groupheadpos = 0; 1099 winstate->grouptailpos = -1; /* see update_grouptailpos */ 1100 ExecClearTuple(winstate->agg_row_slot); 1101 if (winstate->framehead_slot) 1102 ExecClearTuple(winstate->framehead_slot); 1103 if (winstate->frametail_slot) 1104 ExecClearTuple(winstate->frametail_slot); 1105 1106 /* 1107 * If this is the very first partition, we need to fetch the first input 1108 * row to store in first_part_slot. 1109 */ 1110 if (TupIsNull(winstate->first_part_slot)) 1111 { 1112 TupleTableSlot *outerslot = ExecProcNode(outerPlan); 1113 1114 if (!TupIsNull(outerslot)) 1115 ExecCopySlot(winstate->first_part_slot, outerslot); 1116 else 1117 { 1118 /* outer plan is empty, so we have nothing to do */ 1119 winstate->partition_spooled = true; 1120 winstate->more_partitions = false; 1121 return; 1122 } 1123 } 1124 1125 /* Create new tuplestore for this partition */ 1126 winstate->buffer = tuplestore_begin_heap(false, false, work_mem); 1127 1128 /* 1129 * Set up read pointers for the tuplestore. The current pointer doesn't 1130 * need BACKWARD capability, but the per-window-function read pointers do, 1131 * and the aggregate pointer does if we might need to restart aggregation. 
1132 */ 1133 winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */ 1134 1135 /* reset default REWIND capability bit for current ptr */ 1136 tuplestore_set_eflags(winstate->buffer, 0); 1137 1138 /* create read pointers for aggregates, if needed */ 1139 if (winstate->numaggs > 0) 1140 { 1141 WindowObject agg_winobj = winstate->agg_winobj; 1142 int readptr_flags = 0; 1143 1144 /* 1145 * If the frame head is potentially movable, or we have an EXCLUSION 1146 * clause, we might need to restart aggregation ... 1147 */ 1148 if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) || 1149 (frameOptions & FRAMEOPTION_EXCLUSION)) 1150 { 1151 /* ... so create a mark pointer to track the frame head */ 1152 agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0); 1153 /* and the read pointer will need BACKWARD capability */ 1154 readptr_flags |= EXEC_FLAG_BACKWARD; 1155 } 1156 1157 agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer, 1158 readptr_flags); 1159 agg_winobj->markpos = -1; 1160 agg_winobj->seekpos = -1; 1161 1162 /* Also reset the row counters for aggregates */ 1163 winstate->aggregatedbase = 0; 1164 winstate->aggregatedupto = 0; 1165 } 1166 1167 /* create mark and read pointers for each real window function */ 1168 for (i = 0; i < numfuncs; i++) 1169 { 1170 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); 1171 1172 if (!perfuncstate->plain_agg) 1173 { 1174 WindowObject winobj = perfuncstate->winobj; 1175 1176 winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 1177 0); 1178 winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer, 1179 EXEC_FLAG_BACKWARD); 1180 winobj->markpos = -1; 1181 winobj->seekpos = -1; 1182 } 1183 } 1184 1185 /* 1186 * If we are in RANGE or GROUPS mode, then determining frame boundaries 1187 * requires physical access to the frame endpoint rows, except in certain 1188 * degenerate cases. 
We create read pointers to point to those rows, to 1189 * simplify access and ensure that the tuplestore doesn't discard the 1190 * endpoint rows prematurely. (Must create pointers in exactly the same 1191 * cases that update_frameheadpos and update_frametailpos need them.) 1192 */ 1193 winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */ 1194 1195 if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 1196 { 1197 if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) && 1198 node->ordNumCols != 0) || 1199 (frameOptions & FRAMEOPTION_START_OFFSET)) 1200 winstate->framehead_ptr = 1201 tuplestore_alloc_read_pointer(winstate->buffer, 0); 1202 if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) && 1203 node->ordNumCols != 0) || 1204 (frameOptions & FRAMEOPTION_END_OFFSET)) 1205 winstate->frametail_ptr = 1206 tuplestore_alloc_read_pointer(winstate->buffer, 0); 1207 } 1208 1209 /* 1210 * If we have an exclusion clause that requires knowing the boundaries of 1211 * the current row's peer group, we create a read pointer to track the 1212 * tail position of the peer group (i.e., first row of the next peer 1213 * group). The head position does not require its own pointer because we 1214 * maintain that as a side effect of advancing the current row. 1215 */ 1216 winstate->grouptail_ptr = -1; 1217 1218 if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP | 1219 FRAMEOPTION_EXCLUDE_TIES)) && 1220 node->ordNumCols != 0) 1221 { 1222 winstate->grouptail_ptr = 1223 tuplestore_alloc_read_pointer(winstate->buffer, 0); 1224 } 1225 1226 /* 1227 * Store the first tuple into the tuplestore (it's always available now; 1228 * we either read it above, or saved it at the end of previous partition) 1229 */ 1230 tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot); 1231 winstate->spooled_rows++; 1232 } 1233 1234 /* 1235 * Read tuples from the outer node, up to and including position 'pos', and 1236 * store them into the tuplestore. 
If pos is -1, reads the whole partition. 1237 */ 1238 static void 1239 spool_tuples(WindowAggState *winstate, int64 pos) 1240 { 1241 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1242 PlanState *outerPlan; 1243 TupleTableSlot *outerslot; 1244 MemoryContext oldcontext; 1245 1246 if (!winstate->buffer) 1247 return; /* just a safety check */ 1248 if (winstate->partition_spooled) 1249 return; /* whole partition done already */ 1250 1251 /* 1252 * If the tuplestore has spilled to disk, alternate reading and writing 1253 * becomes quite expensive due to frequent buffer flushes. It's cheaper 1254 * to force the entire partition to get spooled in one go. 1255 * 1256 * XXX this is a horrid kluge --- it'd be better to fix the performance 1257 * problem inside tuplestore. FIXME 1258 */ 1259 if (!tuplestore_in_memory(winstate->buffer)) 1260 pos = -1; 1261 1262 outerPlan = outerPlanState(winstate); 1263 1264 /* Must be in query context to call outerplan */ 1265 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 1266 1267 while (winstate->spooled_rows <= pos || pos == -1) 1268 { 1269 outerslot = ExecProcNode(outerPlan); 1270 if (TupIsNull(outerslot)) 1271 { 1272 /* reached the end of the last partition */ 1273 winstate->partition_spooled = true; 1274 winstate->more_partitions = false; 1275 break; 1276 } 1277 1278 if (node->partNumCols > 0) 1279 { 1280 ExprContext *econtext = winstate->tmpcontext; 1281 1282 econtext->ecxt_innertuple = winstate->first_part_slot; 1283 econtext->ecxt_outertuple = outerslot; 1284 1285 /* Check if this tuple still belongs to the current partition */ 1286 if (!ExecQualAndReset(winstate->partEqfunction, econtext)) 1287 { 1288 /* 1289 * end of partition; copy the tuple for the next cycle. 
1290 */ 1291 ExecCopySlot(winstate->first_part_slot, outerslot); 1292 winstate->partition_spooled = true; 1293 winstate->more_partitions = true; 1294 break; 1295 } 1296 } 1297 1298 /* Still in partition, so save it into the tuplestore */ 1299 tuplestore_puttupleslot(winstate->buffer, outerslot); 1300 winstate->spooled_rows++; 1301 } 1302 1303 MemoryContextSwitchTo(oldcontext); 1304 } 1305 1306 /* 1307 * release_partition 1308 * clear information kept within a partition, including 1309 * tuplestore and aggregate results. 1310 */ 1311 static void 1312 release_partition(WindowAggState *winstate) 1313 { 1314 int i; 1315 1316 for (i = 0; i < winstate->numfuncs; i++) 1317 { 1318 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); 1319 1320 /* Release any partition-local state of this window function */ 1321 if (perfuncstate->winobj) 1322 perfuncstate->winobj->localmem = NULL; 1323 } 1324 1325 /* 1326 * Release all partition-local memory (in particular, any partition-local 1327 * state that we might have trashed our pointers to in the above loop, and 1328 * any aggregate temp data). We don't rely on retail pfree because some 1329 * aggregates might have allocated data we don't have direct pointers to. 1330 */ 1331 MemoryContextResetAndDeleteChildren(winstate->partcontext); 1332 MemoryContextResetAndDeleteChildren(winstate->aggcontext); 1333 for (i = 0; i < winstate->numaggs; i++) 1334 { 1335 if (winstate->peragg[i].aggcontext != winstate->aggcontext) 1336 MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext); 1337 } 1338 1339 if (winstate->buffer) 1340 tuplestore_end(winstate->buffer); 1341 winstate->buffer = NULL; 1342 winstate->partition_spooled = false; 1343 } 1344 1345 /* 1346 * row_is_in_frame 1347 * Determine whether a row is in the current row's window frame according 1348 * to our window framing rule 1349 * 1350 * The caller must have already determined that the row is in the partition 1351 * and fetched it into a slot. 
This function just encapsulates the framing
 * rules.
 *
 * Returns:
 * -1, if the row is out of frame and no succeeding rows can be in frame
 * 0, if the row is out of frame but succeeding rows might be in frame
 * 1, if the row is in frame
 *
 * In ROWS mode with an offset frame end, the last in-frame position is
 * computed with saturation at PG_INT64_MAX, so that a very large offset is
 * treated as "beyond any row" instead of wrapping around to a negative
 * position.
 *
 * May clobber winstate->temp_slot_2.
 */
static int
row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
{
	int			frameOptions = winstate->frameOptions;

	Assert(pos >= 0);			/* else caller error */

	/*
	 * First, check frame starting conditions.  We might as well delegate this
	 * to update_frameheadpos always; it doesn't add any notable cost.
	 */
	update_frameheadpos(winstate);
	if (pos < winstate->frameheadpos)
		return 0;

	/*
	 * Okay so far, now check frame ending conditions.  Here, we avoid calling
	 * update_frametailpos in simple cases, so as not to spool tuples further
	 * ahead than necessary.
	 */
	if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
	{
		if (frameOptions & FRAMEOPTION_ROWS)
		{
			/* rows after current row are out of frame */
			if (pos > winstate->currentpos)
				return -1;
		}
		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
		{
			/* following row that is not peer is out of frame */
			if (pos > winstate->currentpos &&
				!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
				return -1;
		}
		else
			Assert(false);
	}
	else if (frameOptions & FRAMEOPTION_END_OFFSET)
	{
		if (frameOptions & FRAMEOPTION_ROWS)
		{
			int64		offset = DatumGetInt64(winstate->endOffsetValue);
			int64		lastpos;

			/* rows after current row + offset are out of frame */
			if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING)
				offset = -offset;

			/*
			 * Add without overflowing.  currentpos is a row index that
			 * begin_partition starts at 0, so it is taken to be non-negative
			 * here; the sum can then only overflow upwards, and saturating
			 * at the largest int64 keeps every real row inside the bound.
			 */
			if (offset > PG_INT64_MAX - winstate->currentpos)
				lastpos = PG_INT64_MAX;
			else
				lastpos = winstate->currentpos + offset;

			if (pos > lastpos)
				return -1;
		}
		else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
		{
			/* hard cases, so delegate to update_frametailpos */
			update_frametailpos(winstate);
			if (pos >= winstate->frametailpos)
				return -1;
		}
		else
			Assert(false);
	}

	/* Check exclusion clause */
	if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW)
	{
		if (pos == winstate->currentpos)
			return 0;
	}
	else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) ||
			 ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) &&
			  pos != winstate->currentpos))
	{
		WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;

		/* If no ORDER BY, all rows are peers with each other */
		if (node->ordNumCols == 0)
			return 0;
		/* Otherwise, check the group boundaries */
		if (pos >= winstate->groupheadpos)
		{
			update_grouptailpos(winstate);
			if (pos < winstate->grouptailpos)
				return 0;
		}
	}

	/* If we get here, it's in frame */
	return 1;
}

/*
 * update_frameheadpos
 * make frameheadpos valid for the current row
 *
 * Note that frameheadpos is computed without regard for any window exclusion
 * clause; the current row and/or its peers are considered part of the frame
 * for this purpose even if they must be excluded later.
 *
 * May clobber winstate->temp_slot_2.
1460 */ 1461 static void 1462 update_frameheadpos(WindowAggState *winstate) 1463 { 1464 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1465 int frameOptions = winstate->frameOptions; 1466 MemoryContext oldcontext; 1467 1468 if (winstate->framehead_valid) 1469 return; /* already known for current row */ 1470 1471 /* We may be called in a short-lived context */ 1472 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 1473 1474 if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) 1475 { 1476 /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */ 1477 winstate->frameheadpos = 0; 1478 winstate->framehead_valid = true; 1479 } 1480 else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW) 1481 { 1482 if (frameOptions & FRAMEOPTION_ROWS) 1483 { 1484 /* In ROWS mode, frame head is the same as current */ 1485 winstate->frameheadpos = winstate->currentpos; 1486 winstate->framehead_valid = true; 1487 } 1488 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 1489 { 1490 /* If no ORDER BY, all rows are peers with each other */ 1491 if (node->ordNumCols == 0) 1492 { 1493 winstate->frameheadpos = 0; 1494 winstate->framehead_valid = true; 1495 MemoryContextSwitchTo(oldcontext); 1496 return; 1497 } 1498 1499 /* 1500 * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the 1501 * first row that is a peer of current row. We keep a copy of the 1502 * last-known frame head row in framehead_slot, and advance as 1503 * necessary. Note that if we reach end of partition, we will 1504 * leave frameheadpos = end+1 and framehead_slot empty. 
1505 */ 1506 tuplestore_select_read_pointer(winstate->buffer, 1507 winstate->framehead_ptr); 1508 if (winstate->frameheadpos == 0 && 1509 TupIsNull(winstate->framehead_slot)) 1510 { 1511 /* fetch first row into framehead_slot, if we didn't already */ 1512 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1513 winstate->framehead_slot)) 1514 elog(ERROR, "unexpected end of tuplestore"); 1515 } 1516 1517 while (!TupIsNull(winstate->framehead_slot)) 1518 { 1519 if (are_peers(winstate, winstate->framehead_slot, 1520 winstate->ss.ss_ScanTupleSlot)) 1521 break; /* this row is the correct frame head */ 1522 /* Note we advance frameheadpos even if the fetch fails */ 1523 winstate->frameheadpos++; 1524 spool_tuples(winstate, winstate->frameheadpos); 1525 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1526 winstate->framehead_slot)) 1527 break; /* end of partition */ 1528 } 1529 winstate->framehead_valid = true; 1530 } 1531 else 1532 Assert(false); 1533 } 1534 else if (frameOptions & FRAMEOPTION_START_OFFSET) 1535 { 1536 if (frameOptions & FRAMEOPTION_ROWS) 1537 { 1538 /* In ROWS mode, bound is physically n before/after current */ 1539 int64 offset = DatumGetInt64(winstate->startOffsetValue); 1540 1541 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) 1542 offset = -offset; 1543 1544 winstate->frameheadpos = winstate->currentpos + offset; 1545 /* frame head can't go before first row */ 1546 if (winstate->frameheadpos < 0) 1547 winstate->frameheadpos = 0; 1548 else if (winstate->frameheadpos > winstate->currentpos + 1) 1549 { 1550 /* make sure frameheadpos is not past end of partition */ 1551 spool_tuples(winstate, winstate->frameheadpos - 1); 1552 if (winstate->frameheadpos > winstate->spooled_rows) 1553 winstate->frameheadpos = winstate->spooled_rows; 1554 } 1555 winstate->framehead_valid = true; 1556 } 1557 else if (frameOptions & FRAMEOPTION_RANGE) 1558 { 1559 /* 1560 * In RANGE START_OFFSET mode, frame head is the first row that 1561 * satisfies 
the in_range constraint relative to the current row. 1562 * We keep a copy of the last-known frame head row in 1563 * framehead_slot, and advance as necessary. Note that if we 1564 * reach end of partition, we will leave frameheadpos = end+1 and 1565 * framehead_slot empty. 1566 */ 1567 int sortCol = node->ordColIdx[0]; 1568 bool sub, 1569 less; 1570 1571 /* We must have an ordering column */ 1572 Assert(node->ordNumCols == 1); 1573 1574 /* Precompute flags for in_range checks */ 1575 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) 1576 sub = true; /* subtract startOffset from current row */ 1577 else 1578 sub = false; /* add it */ 1579 less = false; /* normally, we want frame head >= sum */ 1580 /* If sort order is descending, flip both flags */ 1581 if (!winstate->inRangeAsc) 1582 { 1583 sub = !sub; 1584 less = true; 1585 } 1586 1587 tuplestore_select_read_pointer(winstate->buffer, 1588 winstate->framehead_ptr); 1589 if (winstate->frameheadpos == 0 && 1590 TupIsNull(winstate->framehead_slot)) 1591 { 1592 /* fetch first row into framehead_slot, if we didn't already */ 1593 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1594 winstate->framehead_slot)) 1595 elog(ERROR, "unexpected end of tuplestore"); 1596 } 1597 1598 while (!TupIsNull(winstate->framehead_slot)) 1599 { 1600 Datum headval, 1601 currval; 1602 bool headisnull, 1603 currisnull; 1604 1605 headval = slot_getattr(winstate->framehead_slot, sortCol, 1606 &headisnull); 1607 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, 1608 &currisnull); 1609 if (headisnull || currisnull) 1610 { 1611 /* order of the rows depends only on nulls_first */ 1612 if (winstate->inRangeNullsFirst) 1613 { 1614 /* advance head if head is null and curr is not */ 1615 if (!headisnull || currisnull) 1616 break; 1617 } 1618 else 1619 { 1620 /* advance head if head is not null and curr is null */ 1621 if (headisnull || !currisnull) 1622 break; 1623 } 1624 } 1625 else 1626 { 1627 if 
(DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc, 1628 winstate->inRangeColl, 1629 headval, 1630 currval, 1631 winstate->startOffsetValue, 1632 BoolGetDatum(sub), 1633 BoolGetDatum(less)))) 1634 break; /* this row is the correct frame head */ 1635 } 1636 /* Note we advance frameheadpos even if the fetch fails */ 1637 winstate->frameheadpos++; 1638 spool_tuples(winstate, winstate->frameheadpos); 1639 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1640 winstate->framehead_slot)) 1641 break; /* end of partition */ 1642 } 1643 winstate->framehead_valid = true; 1644 } 1645 else if (frameOptions & FRAMEOPTION_GROUPS) 1646 { 1647 /* 1648 * In GROUPS START_OFFSET mode, frame head is the first row of the 1649 * first peer group whose number satisfies the offset constraint. 1650 * We keep a copy of the last-known frame head row in 1651 * framehead_slot, and advance as necessary. Note that if we 1652 * reach end of partition, we will leave frameheadpos = end+1 and 1653 * framehead_slot empty. 
1654 */ 1655 int64 offset = DatumGetInt64(winstate->startOffsetValue); 1656 int64 minheadgroup; 1657 1658 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) 1659 minheadgroup = winstate->currentgroup - offset; 1660 else 1661 minheadgroup = winstate->currentgroup + offset; 1662 1663 tuplestore_select_read_pointer(winstate->buffer, 1664 winstate->framehead_ptr); 1665 if (winstate->frameheadpos == 0 && 1666 TupIsNull(winstate->framehead_slot)) 1667 { 1668 /* fetch first row into framehead_slot, if we didn't already */ 1669 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1670 winstate->framehead_slot)) 1671 elog(ERROR, "unexpected end of tuplestore"); 1672 } 1673 1674 while (!TupIsNull(winstate->framehead_slot)) 1675 { 1676 if (winstate->frameheadgroup >= minheadgroup) 1677 break; /* this row is the correct frame head */ 1678 ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot); 1679 /* Note we advance frameheadpos even if the fetch fails */ 1680 winstate->frameheadpos++; 1681 spool_tuples(winstate, winstate->frameheadpos); 1682 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1683 winstate->framehead_slot)) 1684 break; /* end of partition */ 1685 if (!are_peers(winstate, winstate->temp_slot_2, 1686 winstate->framehead_slot)) 1687 winstate->frameheadgroup++; 1688 } 1689 ExecClearTuple(winstate->temp_slot_2); 1690 winstate->framehead_valid = true; 1691 } 1692 else 1693 Assert(false); 1694 } 1695 else 1696 Assert(false); 1697 1698 MemoryContextSwitchTo(oldcontext); 1699 } 1700 1701 /* 1702 * update_frametailpos 1703 * make frametailpos valid for the current row 1704 * 1705 * Note that frametailpos is computed without regard for any window exclusion 1706 * clause; the current row and/or its peers are considered part of the frame 1707 * for this purpose even if they must be excluded later. 1708 * 1709 * May clobber winstate->temp_slot_2. 
1710 */ 1711 static void 1712 update_frametailpos(WindowAggState *winstate) 1713 { 1714 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1715 int frameOptions = winstate->frameOptions; 1716 MemoryContext oldcontext; 1717 1718 if (winstate->frametail_valid) 1719 return; /* already known for current row */ 1720 1721 /* We may be called in a short-lived context */ 1722 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 1723 1724 if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING) 1725 { 1726 /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */ 1727 spool_tuples(winstate, -1); 1728 winstate->frametailpos = winstate->spooled_rows; 1729 winstate->frametail_valid = true; 1730 } 1731 else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW) 1732 { 1733 if (frameOptions & FRAMEOPTION_ROWS) 1734 { 1735 /* In ROWS mode, exactly the rows up to current are in frame */ 1736 winstate->frametailpos = winstate->currentpos + 1; 1737 winstate->frametail_valid = true; 1738 } 1739 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 1740 { 1741 /* If no ORDER BY, all rows are peers with each other */ 1742 if (node->ordNumCols == 0) 1743 { 1744 spool_tuples(winstate, -1); 1745 winstate->frametailpos = winstate->spooled_rows; 1746 winstate->frametail_valid = true; 1747 MemoryContextSwitchTo(oldcontext); 1748 return; 1749 } 1750 1751 /* 1752 * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last 1753 * row that is a peer of current row, frame tail is the row after 1754 * that (if any). We keep a copy of the last-known frame tail row 1755 * in frametail_slot, and advance as necessary. Note that if we 1756 * reach end of partition, we will leave frametailpos = end+1 and 1757 * frametail_slot empty. 
1758 */ 1759 tuplestore_select_read_pointer(winstate->buffer, 1760 winstate->frametail_ptr); 1761 if (winstate->frametailpos == 0 && 1762 TupIsNull(winstate->frametail_slot)) 1763 { 1764 /* fetch first row into frametail_slot, if we didn't already */ 1765 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1766 winstate->frametail_slot)) 1767 elog(ERROR, "unexpected end of tuplestore"); 1768 } 1769 1770 while (!TupIsNull(winstate->frametail_slot)) 1771 { 1772 if (winstate->frametailpos > winstate->currentpos && 1773 !are_peers(winstate, winstate->frametail_slot, 1774 winstate->ss.ss_ScanTupleSlot)) 1775 break; /* this row is the frame tail */ 1776 /* Note we advance frametailpos even if the fetch fails */ 1777 winstate->frametailpos++; 1778 spool_tuples(winstate, winstate->frametailpos); 1779 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1780 winstate->frametail_slot)) 1781 break; /* end of partition */ 1782 } 1783 winstate->frametail_valid = true; 1784 } 1785 else 1786 Assert(false); 1787 } 1788 else if (frameOptions & FRAMEOPTION_END_OFFSET) 1789 { 1790 if (frameOptions & FRAMEOPTION_ROWS) 1791 { 1792 /* In ROWS mode, bound is physically n before/after current */ 1793 int64 offset = DatumGetInt64(winstate->endOffsetValue); 1794 1795 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) 1796 offset = -offset; 1797 1798 winstate->frametailpos = winstate->currentpos + offset + 1; 1799 /* smallest allowable value of frametailpos is 0 */ 1800 if (winstate->frametailpos < 0) 1801 winstate->frametailpos = 0; 1802 else if (winstate->frametailpos > winstate->currentpos + 1) 1803 { 1804 /* make sure frametailpos is not past end of partition */ 1805 spool_tuples(winstate, winstate->frametailpos - 1); 1806 if (winstate->frametailpos > winstate->spooled_rows) 1807 winstate->frametailpos = winstate->spooled_rows; 1808 } 1809 winstate->frametail_valid = true; 1810 } 1811 else if (frameOptions & FRAMEOPTION_RANGE) 1812 { 1813 /* 1814 * In RANGE END_OFFSET 
mode, frame end is the last row that 1815 * satisfies the in_range constraint relative to the current row, 1816 * frame tail is the row after that (if any). We keep a copy of 1817 * the last-known frame tail row in frametail_slot, and advance as 1818 * necessary. Note that if we reach end of partition, we will 1819 * leave frametailpos = end+1 and frametail_slot empty. 1820 */ 1821 int sortCol = node->ordColIdx[0]; 1822 bool sub, 1823 less; 1824 1825 /* We must have an ordering column */ 1826 Assert(node->ordNumCols == 1); 1827 1828 /* Precompute flags for in_range checks */ 1829 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) 1830 sub = true; /* subtract endOffset from current row */ 1831 else 1832 sub = false; /* add it */ 1833 less = true; /* normally, we want frame tail <= sum */ 1834 /* If sort order is descending, flip both flags */ 1835 if (!winstate->inRangeAsc) 1836 { 1837 sub = !sub; 1838 less = false; 1839 } 1840 1841 tuplestore_select_read_pointer(winstate->buffer, 1842 winstate->frametail_ptr); 1843 if (winstate->frametailpos == 0 && 1844 TupIsNull(winstate->frametail_slot)) 1845 { 1846 /* fetch first row into frametail_slot, if we didn't already */ 1847 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1848 winstate->frametail_slot)) 1849 elog(ERROR, "unexpected end of tuplestore"); 1850 } 1851 1852 while (!TupIsNull(winstate->frametail_slot)) 1853 { 1854 Datum tailval, 1855 currval; 1856 bool tailisnull, 1857 currisnull; 1858 1859 tailval = slot_getattr(winstate->frametail_slot, sortCol, 1860 &tailisnull); 1861 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, 1862 &currisnull); 1863 if (tailisnull || currisnull) 1864 { 1865 /* order of the rows depends only on nulls_first */ 1866 if (winstate->inRangeNullsFirst) 1867 { 1868 /* advance tail if tail is null or curr is not */ 1869 if (!tailisnull) 1870 break; 1871 } 1872 else 1873 { 1874 /* advance tail if tail is not null or curr is null */ 1875 if (!currisnull) 1876 break; 
1877 } 1878 } 1879 else 1880 { 1881 if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc, 1882 winstate->inRangeColl, 1883 tailval, 1884 currval, 1885 winstate->endOffsetValue, 1886 BoolGetDatum(sub), 1887 BoolGetDatum(less)))) 1888 break; /* this row is the correct frame tail */ 1889 } 1890 /* Note we advance frametailpos even if the fetch fails */ 1891 winstate->frametailpos++; 1892 spool_tuples(winstate, winstate->frametailpos); 1893 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1894 winstate->frametail_slot)) 1895 break; /* end of partition */ 1896 } 1897 winstate->frametail_valid = true; 1898 } 1899 else if (frameOptions & FRAMEOPTION_GROUPS) 1900 { 1901 /* 1902 * In GROUPS END_OFFSET mode, frame end is the last row of the 1903 * last peer group whose number satisfies the offset constraint, 1904 * and frame tail is the row after that (if any). We keep a copy 1905 * of the last-known frame tail row in frametail_slot, and advance 1906 * as necessary. Note that if we reach end of partition, we will 1907 * leave frametailpos = end+1 and frametail_slot empty. 
1908 */ 1909 int64 offset = DatumGetInt64(winstate->endOffsetValue); 1910 int64 maxtailgroup; 1911 1912 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) 1913 maxtailgroup = winstate->currentgroup - offset; 1914 else 1915 maxtailgroup = winstate->currentgroup + offset; 1916 1917 tuplestore_select_read_pointer(winstate->buffer, 1918 winstate->frametail_ptr); 1919 if (winstate->frametailpos == 0 && 1920 TupIsNull(winstate->frametail_slot)) 1921 { 1922 /* fetch first row into frametail_slot, if we didn't already */ 1923 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1924 winstate->frametail_slot)) 1925 elog(ERROR, "unexpected end of tuplestore"); 1926 } 1927 1928 while (!TupIsNull(winstate->frametail_slot)) 1929 { 1930 if (winstate->frametailgroup > maxtailgroup) 1931 break; /* this row is the correct frame tail */ 1932 ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot); 1933 /* Note we advance frametailpos even if the fetch fails */ 1934 winstate->frametailpos++; 1935 spool_tuples(winstate, winstate->frametailpos); 1936 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1937 winstate->frametail_slot)) 1938 break; /* end of partition */ 1939 if (!are_peers(winstate, winstate->temp_slot_2, 1940 winstate->frametail_slot)) 1941 winstate->frametailgroup++; 1942 } 1943 ExecClearTuple(winstate->temp_slot_2); 1944 winstate->frametail_valid = true; 1945 } 1946 else 1947 Assert(false); 1948 } 1949 else 1950 Assert(false); 1951 1952 MemoryContextSwitchTo(oldcontext); 1953 } 1954 1955 /* 1956 * update_grouptailpos 1957 * make grouptailpos valid for the current row 1958 * 1959 * May clobber winstate->temp_slot_2. 
1960 */ 1961 static void 1962 update_grouptailpos(WindowAggState *winstate) 1963 { 1964 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1965 MemoryContext oldcontext; 1966 1967 if (winstate->grouptail_valid) 1968 return; /* already known for current row */ 1969 1970 /* We may be called in a short-lived context */ 1971 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 1972 1973 /* If no ORDER BY, all rows are peers with each other */ 1974 if (node->ordNumCols == 0) 1975 { 1976 spool_tuples(winstate, -1); 1977 winstate->grouptailpos = winstate->spooled_rows; 1978 winstate->grouptail_valid = true; 1979 MemoryContextSwitchTo(oldcontext); 1980 return; 1981 } 1982 1983 /* 1984 * Because grouptail_valid is reset only when current row advances into a 1985 * new peer group, we always reach here knowing that grouptailpos needs to 1986 * be advanced by at least one row. Hence, unlike the otherwise similar 1987 * case for frame tail tracking, we do not need persistent storage of the 1988 * group tail row. 
1989 */ 1990 Assert(winstate->grouptailpos <= winstate->currentpos); 1991 tuplestore_select_read_pointer(winstate->buffer, 1992 winstate->grouptail_ptr); 1993 for (;;) 1994 { 1995 /* Note we advance grouptailpos even if the fetch fails */ 1996 winstate->grouptailpos++; 1997 spool_tuples(winstate, winstate->grouptailpos); 1998 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1999 winstate->temp_slot_2)) 2000 break; /* end of partition */ 2001 if (winstate->grouptailpos > winstate->currentpos && 2002 !are_peers(winstate, winstate->temp_slot_2, 2003 winstate->ss.ss_ScanTupleSlot)) 2004 break; /* this row is the group tail */ 2005 } 2006 ExecClearTuple(winstate->temp_slot_2); 2007 winstate->grouptail_valid = true; 2008 2009 MemoryContextSwitchTo(oldcontext); 2010 } 2011 2012 2013 /* ----------------- 2014 * ExecWindowAgg 2015 * 2016 * ExecWindowAgg receives tuples from its outer subplan and 2017 * stores them into a tuplestore, then processes window functions. 2018 * This node doesn't reduce nor qualify any row so the number of 2019 * returned rows is exactly the same as its outer subplan's result. 2020 * ----------------- 2021 */ 2022 static TupleTableSlot * 2023 ExecWindowAgg(PlanState *pstate) 2024 { 2025 WindowAggState *winstate = castNode(WindowAggState, pstate); 2026 ExprContext *econtext; 2027 int i; 2028 int numfuncs; 2029 2030 CHECK_FOR_INTERRUPTS(); 2031 2032 if (winstate->all_done) 2033 return NULL; 2034 2035 /* 2036 * Compute frame offset values, if any, during first call (or after a 2037 * rescan). These are assumed to hold constant throughout the scan; if 2038 * user gives us a volatile expression, we'll only use its initial value. 
2039 */ 2040 if (winstate->all_first) 2041 { 2042 int frameOptions = winstate->frameOptions; 2043 ExprContext *econtext = winstate->ss.ps.ps_ExprContext; 2044 Datum value; 2045 bool isnull; 2046 int16 len; 2047 bool byval; 2048 2049 if (frameOptions & FRAMEOPTION_START_OFFSET) 2050 { 2051 Assert(winstate->startOffset != NULL); 2052 value = ExecEvalExprSwitchContext(winstate->startOffset, 2053 econtext, 2054 &isnull); 2055 if (isnull) 2056 ereport(ERROR, 2057 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), 2058 errmsg("frame starting offset must not be null"))); 2059 /* copy value into query-lifespan context */ 2060 get_typlenbyval(exprType((Node *) winstate->startOffset->expr), 2061 &len, &byval); 2062 winstate->startOffsetValue = datumCopy(value, byval, len); 2063 if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS)) 2064 { 2065 /* value is known to be int8 */ 2066 int64 offset = DatumGetInt64(value); 2067 2068 if (offset < 0) 2069 ereport(ERROR, 2070 (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE), 2071 errmsg("frame starting offset must not be negative"))); 2072 } 2073 } 2074 if (frameOptions & FRAMEOPTION_END_OFFSET) 2075 { 2076 Assert(winstate->endOffset != NULL); 2077 value = ExecEvalExprSwitchContext(winstate->endOffset, 2078 econtext, 2079 &isnull); 2080 if (isnull) 2081 ereport(ERROR, 2082 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), 2083 errmsg("frame ending offset must not be null"))); 2084 /* copy value into query-lifespan context */ 2085 get_typlenbyval(exprType((Node *) winstate->endOffset->expr), 2086 &len, &byval); 2087 winstate->endOffsetValue = datumCopy(value, byval, len); 2088 if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS)) 2089 { 2090 /* value is known to be int8 */ 2091 int64 offset = DatumGetInt64(value); 2092 2093 if (offset < 0) 2094 ereport(ERROR, 2095 (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE), 2096 errmsg("frame ending offset must not be negative"))); 2097 } 2098 } 2099 winstate->all_first = false; 2100 } 2101 
2102 if (winstate->buffer == NULL) 2103 { 2104 /* Initialize for first partition and set current row = 0 */ 2105 begin_partition(winstate); 2106 /* If there are no input rows, we'll detect that and exit below */ 2107 } 2108 else 2109 { 2110 /* Advance current row within partition */ 2111 winstate->currentpos++; 2112 /* This might mean that the frame moves, too */ 2113 winstate->framehead_valid = false; 2114 winstate->frametail_valid = false; 2115 /* we don't need to invalidate grouptail here; see below */ 2116 } 2117 2118 /* 2119 * Spool all tuples up to and including the current row, if we haven't 2120 * already 2121 */ 2122 spool_tuples(winstate, winstate->currentpos); 2123 2124 /* Move to the next partition if we reached the end of this partition */ 2125 if (winstate->partition_spooled && 2126 winstate->currentpos >= winstate->spooled_rows) 2127 { 2128 release_partition(winstate); 2129 2130 if (winstate->more_partitions) 2131 { 2132 begin_partition(winstate); 2133 Assert(winstate->spooled_rows > 0); 2134 } 2135 else 2136 { 2137 winstate->all_done = true; 2138 return NULL; 2139 } 2140 } 2141 2142 /* final output execution is in ps_ExprContext */ 2143 econtext = winstate->ss.ps.ps_ExprContext; 2144 2145 /* Clear the per-output-tuple context for current row */ 2146 ResetExprContext(econtext); 2147 2148 /* 2149 * Read the current row from the tuplestore, and save in ScanTupleSlot. 2150 * (We can't rely on the outerplan's output slot because we may have to 2151 * read beyond the current row. Also, we have to actually copy the row 2152 * out of the tuplestore, since window function evaluation might cause the 2153 * tuplestore to dump its state to disk.) 2154 * 2155 * In GROUPS mode, or when tracking a group-oriented exclusion clause, we 2156 * must also detect entering a new peer group and update associated state 2157 * when that happens. We use temp_slot_2 to temporarily hold the previous 2158 * row for this purpose. 
2159 * 2160 * Current row must be in the tuplestore, since we spooled it above. 2161 */ 2162 tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr); 2163 if ((winstate->frameOptions & (FRAMEOPTION_GROUPS | 2164 FRAMEOPTION_EXCLUDE_GROUP | 2165 FRAMEOPTION_EXCLUDE_TIES)) && 2166 winstate->currentpos > 0) 2167 { 2168 ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot); 2169 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 2170 winstate->ss.ss_ScanTupleSlot)) 2171 elog(ERROR, "unexpected end of tuplestore"); 2172 if (!are_peers(winstate, winstate->temp_slot_2, 2173 winstate->ss.ss_ScanTupleSlot)) 2174 { 2175 winstate->currentgroup++; 2176 winstate->groupheadpos = winstate->currentpos; 2177 winstate->grouptail_valid = false; 2178 } 2179 ExecClearTuple(winstate->temp_slot_2); 2180 } 2181 else 2182 { 2183 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 2184 winstate->ss.ss_ScanTupleSlot)) 2185 elog(ERROR, "unexpected end of tuplestore"); 2186 } 2187 2188 /* 2189 * Evaluate true window functions 2190 */ 2191 numfuncs = winstate->numfuncs; 2192 for (i = 0; i < numfuncs; i++) 2193 { 2194 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); 2195 2196 if (perfuncstate->plain_agg) 2197 continue; 2198 eval_windowfunction(winstate, perfuncstate, 2199 &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]), 2200 &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno])); 2201 } 2202 2203 /* 2204 * Evaluate aggregates 2205 */ 2206 if (winstate->numaggs > 0) 2207 eval_windowaggregates(winstate); 2208 2209 /* 2210 * If we have created auxiliary read pointers for the frame or group 2211 * boundaries, force them to be kept up-to-date, because we don't know 2212 * whether the window function(s) will do anything that requires that. 2213 * Failing to advance the pointers would result in being unable to trim 2214 * data from the tuplestore, which is bad. 
(If we could know in advance 2215 * whether the window functions will use frame boundary info, we could 2216 * skip creating these pointers in the first place ... but unfortunately 2217 * the window function API doesn't require that.) 2218 */ 2219 if (winstate->framehead_ptr >= 0) 2220 update_frameheadpos(winstate); 2221 if (winstate->frametail_ptr >= 0) 2222 update_frametailpos(winstate); 2223 if (winstate->grouptail_ptr >= 0) 2224 update_grouptailpos(winstate); 2225 2226 /* 2227 * Truncate any no-longer-needed rows from the tuplestore. 2228 */ 2229 tuplestore_trim(winstate->buffer); 2230 2231 /* 2232 * Form and return a projection tuple using the windowfunc results and the 2233 * current row. Setting ecxt_outertuple arranges that any Vars will be 2234 * evaluated with respect to that row. 2235 */ 2236 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot; 2237 2238 return ExecProject(winstate->ss.ps.ps_ProjInfo); 2239 } 2240 2241 /* ----------------- 2242 * ExecInitWindowAgg 2243 * 2244 * Creates the run-time information for the WindowAgg node produced by the 2245 * planner and initializes its outer subtree 2246 * ----------------- 2247 */ 2248 WindowAggState * 2249 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) 2250 { 2251 WindowAggState *winstate; 2252 Plan *outerPlan; 2253 ExprContext *econtext; 2254 ExprContext *tmpcontext; 2255 WindowStatePerFunc perfunc; 2256 WindowStatePerAgg peragg; 2257 int frameOptions = node->frameOptions; 2258 int numfuncs, 2259 wfuncno, 2260 numaggs, 2261 aggno; 2262 TupleDesc scanDesc; 2263 ListCell *l; 2264 2265 /* check for unsupported flags */ 2266 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); 2267 2268 /* 2269 * create state structure 2270 */ 2271 winstate = makeNode(WindowAggState); 2272 winstate->ss.ps.plan = (Plan *) node; 2273 winstate->ss.ps.state = estate; 2274 winstate->ss.ps.ExecProcNode = ExecWindowAgg; 2275 2276 /* 2277 * Create expression contexts. 
We need two, one for per-input-tuple 2278 * processing and one for per-output-tuple processing. We cheat a little 2279 * by using ExecAssignExprContext() to build both. 2280 */ 2281 ExecAssignExprContext(estate, &winstate->ss.ps); 2282 tmpcontext = winstate->ss.ps.ps_ExprContext; 2283 winstate->tmpcontext = tmpcontext; 2284 ExecAssignExprContext(estate, &winstate->ss.ps); 2285 2286 /* Create long-lived context for storage of partition-local memory etc */ 2287 winstate->partcontext = 2288 AllocSetContextCreate(CurrentMemoryContext, 2289 "WindowAgg Partition", 2290 ALLOCSET_DEFAULT_SIZES); 2291 2292 /* 2293 * Create mid-lived context for aggregate trans values etc. 2294 * 2295 * Note that moving aggregates each use their own private context, not 2296 * this one. 2297 */ 2298 winstate->aggcontext = 2299 AllocSetContextCreate(CurrentMemoryContext, 2300 "WindowAgg Aggregates", 2301 ALLOCSET_DEFAULT_SIZES); 2302 2303 /* 2304 * WindowAgg nodes never have quals, since they can only occur at the 2305 * logical top level of a query (ie, after any WHERE or HAVING filters) 2306 */ 2307 Assert(node->plan.qual == NIL); 2308 winstate->ss.ps.qual = NULL; 2309 2310 /* 2311 * initialize child nodes 2312 */ 2313 outerPlan = outerPlan(node); 2314 outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags); 2315 2316 /* 2317 * initialize source tuple type (which is also the tuple type that we'll 2318 * store in the tuplestore and use in all our working slots). 
2319 */ 2320 ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple); 2321 scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; 2322 2323 /* the outer tuple isn't the child's tuple, but always a minimal tuple */ 2324 winstate->ss.ps.outeropsset = true; 2325 winstate->ss.ps.outerops = &TTSOpsMinimalTuple; 2326 winstate->ss.ps.outeropsfixed = true; 2327 2328 /* 2329 * tuple table initialization 2330 */ 2331 winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc, 2332 &TTSOpsMinimalTuple); 2333 winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc, 2334 &TTSOpsMinimalTuple); 2335 winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc, 2336 &TTSOpsMinimalTuple); 2337 winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc, 2338 &TTSOpsMinimalTuple); 2339 2340 /* 2341 * create frame head and tail slots only if needed (must create slots in 2342 * exactly the same cases that update_frameheadpos and update_frametailpos 2343 * need them) 2344 */ 2345 winstate->framehead_slot = winstate->frametail_slot = NULL; 2346 2347 if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 2348 { 2349 if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) && 2350 node->ordNumCols != 0) || 2351 (frameOptions & FRAMEOPTION_START_OFFSET)) 2352 winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc, 2353 &TTSOpsMinimalTuple); 2354 if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) && 2355 node->ordNumCols != 0) || 2356 (frameOptions & FRAMEOPTION_END_OFFSET)) 2357 winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc, 2358 &TTSOpsMinimalTuple); 2359 } 2360 2361 /* 2362 * Initialize result slot, type and projection. 
2363 */ 2364 ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual); 2365 ExecAssignProjectionInfo(&winstate->ss.ps, NULL); 2366 2367 /* Set up data for comparing tuples */ 2368 if (node->partNumCols > 0) 2369 winstate->partEqfunction = 2370 execTuplesMatchPrepare(scanDesc, 2371 node->partNumCols, 2372 node->partColIdx, 2373 node->partOperators, 2374 node->partCollations, 2375 &winstate->ss.ps); 2376 2377 if (node->ordNumCols > 0) 2378 winstate->ordEqfunction = 2379 execTuplesMatchPrepare(scanDesc, 2380 node->ordNumCols, 2381 node->ordColIdx, 2382 node->ordOperators, 2383 node->ordCollations, 2384 &winstate->ss.ps); 2385 2386 /* 2387 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes. 2388 */ 2389 numfuncs = winstate->numfuncs; 2390 numaggs = winstate->numaggs; 2391 econtext = winstate->ss.ps.ps_ExprContext; 2392 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs); 2393 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs); 2394 2395 /* 2396 * allocate per-wfunc/per-agg state information. 2397 */ 2398 perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs); 2399 peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs); 2400 winstate->perfunc = perfunc; 2401 winstate->peragg = peragg; 2402 2403 wfuncno = -1; 2404 aggno = -1; 2405 foreach(l, winstate->funcs) 2406 { 2407 WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l); 2408 WindowFunc *wfunc = wfuncstate->wfunc; 2409 WindowStatePerFunc perfuncstate; 2410 AclResult aclresult; 2411 int i; 2412 2413 if (wfunc->winref != node->winref) /* planner screwed up? 
*/ 2414 elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u", 2415 wfunc->winref, node->winref); 2416 2417 /* Look for a previous duplicate window function */ 2418 for (i = 0; i <= wfuncno; i++) 2419 { 2420 if (equal(wfunc, perfunc[i].wfunc) && 2421 !contain_volatile_functions((Node *) wfunc)) 2422 break; 2423 } 2424 if (i <= wfuncno) 2425 { 2426 /* Found a match to an existing entry, so just mark it */ 2427 wfuncstate->wfuncno = i; 2428 continue; 2429 } 2430 2431 /* Nope, so assign a new PerAgg record */ 2432 perfuncstate = &perfunc[++wfuncno]; 2433 2434 /* Mark WindowFunc state node with assigned index in the result array */ 2435 wfuncstate->wfuncno = wfuncno; 2436 2437 /* Check permission to call window function */ 2438 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(), 2439 ACL_EXECUTE); 2440 if (aclresult != ACLCHECK_OK) 2441 aclcheck_error(aclresult, OBJECT_FUNCTION, 2442 get_func_name(wfunc->winfnoid)); 2443 InvokeFunctionExecuteHook(wfunc->winfnoid); 2444 2445 /* Fill in the perfuncstate data */ 2446 perfuncstate->wfuncstate = wfuncstate; 2447 perfuncstate->wfunc = wfunc; 2448 perfuncstate->numArguments = list_length(wfuncstate->args); 2449 2450 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo, 2451 econtext->ecxt_per_query_memory); 2452 fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo); 2453 2454 perfuncstate->winCollation = wfunc->inputcollid; 2455 2456 get_typlenbyval(wfunc->wintype, 2457 &perfuncstate->resulttypeLen, 2458 &perfuncstate->resulttypeByVal); 2459 2460 /* 2461 * If it's really just a plain aggregate function, we'll emulate the 2462 * Agg environment for it. 
2463 */ 2464 perfuncstate->plain_agg = wfunc->winagg; 2465 if (wfunc->winagg) 2466 { 2467 WindowStatePerAgg peraggstate; 2468 2469 perfuncstate->aggno = ++aggno; 2470 peraggstate = &winstate->peragg[aggno]; 2471 initialize_peragg(winstate, wfunc, peraggstate); 2472 peraggstate->wfuncno = wfuncno; 2473 } 2474 else 2475 { 2476 WindowObject winobj = makeNode(WindowObjectData); 2477 2478 winobj->winstate = winstate; 2479 winobj->argstates = wfuncstate->args; 2480 winobj->localmem = NULL; 2481 perfuncstate->winobj = winobj; 2482 } 2483 } 2484 2485 /* Update numfuncs, numaggs to match number of unique functions found */ 2486 winstate->numfuncs = wfuncno + 1; 2487 winstate->numaggs = aggno + 1; 2488 2489 /* Set up WindowObject for aggregates, if needed */ 2490 if (winstate->numaggs > 0) 2491 { 2492 WindowObject agg_winobj = makeNode(WindowObjectData); 2493 2494 agg_winobj->winstate = winstate; 2495 agg_winobj->argstates = NIL; 2496 agg_winobj->localmem = NULL; 2497 /* make sure markptr = -1 to invalidate. 
It may not get used */ 2498 agg_winobj->markptr = -1; 2499 agg_winobj->readptr = -1; 2500 winstate->agg_winobj = agg_winobj; 2501 } 2502 2503 /* copy frame options to state node for easy access */ 2504 winstate->frameOptions = frameOptions; 2505 2506 /* initialize frame bound offset expressions */ 2507 winstate->startOffset = ExecInitExpr((Expr *) node->startOffset, 2508 (PlanState *) winstate); 2509 winstate->endOffset = ExecInitExpr((Expr *) node->endOffset, 2510 (PlanState *) winstate); 2511 2512 /* Lookup in_range support functions if needed */ 2513 if (OidIsValid(node->startInRangeFunc)) 2514 fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc); 2515 if (OidIsValid(node->endInRangeFunc)) 2516 fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc); 2517 winstate->inRangeColl = node->inRangeColl; 2518 winstate->inRangeAsc = node->inRangeAsc; 2519 winstate->inRangeNullsFirst = node->inRangeNullsFirst; 2520 2521 winstate->all_first = true; 2522 winstate->partition_spooled = false; 2523 winstate->more_partitions = false; 2524 2525 return winstate; 2526 } 2527 2528 /* ----------------- 2529 * ExecEndWindowAgg 2530 * ----------------- 2531 */ 2532 void 2533 ExecEndWindowAgg(WindowAggState *node) 2534 { 2535 PlanState *outerPlan; 2536 int i; 2537 2538 release_partition(node); 2539 2540 ExecClearTuple(node->ss.ss_ScanTupleSlot); 2541 ExecClearTuple(node->first_part_slot); 2542 ExecClearTuple(node->agg_row_slot); 2543 ExecClearTuple(node->temp_slot_1); 2544 ExecClearTuple(node->temp_slot_2); 2545 if (node->framehead_slot) 2546 ExecClearTuple(node->framehead_slot); 2547 if (node->frametail_slot) 2548 ExecClearTuple(node->frametail_slot); 2549 2550 /* 2551 * Free both the expr contexts. 
2552 */ 2553 ExecFreeExprContext(&node->ss.ps); 2554 node->ss.ps.ps_ExprContext = node->tmpcontext; 2555 ExecFreeExprContext(&node->ss.ps); 2556 2557 for (i = 0; i < node->numaggs; i++) 2558 { 2559 if (node->peragg[i].aggcontext != node->aggcontext) 2560 MemoryContextDelete(node->peragg[i].aggcontext); 2561 } 2562 MemoryContextDelete(node->partcontext); 2563 MemoryContextDelete(node->aggcontext); 2564 2565 pfree(node->perfunc); 2566 pfree(node->peragg); 2567 2568 outerPlan = outerPlanState(node); 2569 ExecEndNode(outerPlan); 2570 } 2571 2572 /* ----------------- 2573 * ExecReScanWindowAgg 2574 * ----------------- 2575 */ 2576 void 2577 ExecReScanWindowAgg(WindowAggState *node) 2578 { 2579 PlanState *outerPlan = outerPlanState(node); 2580 ExprContext *econtext = node->ss.ps.ps_ExprContext; 2581 2582 node->all_done = false; 2583 node->all_first = true; 2584 2585 /* release tuplestore et al */ 2586 release_partition(node); 2587 2588 /* release all temp tuples, but especially first_part_slot */ 2589 ExecClearTuple(node->ss.ss_ScanTupleSlot); 2590 ExecClearTuple(node->first_part_slot); 2591 ExecClearTuple(node->agg_row_slot); 2592 ExecClearTuple(node->temp_slot_1); 2593 ExecClearTuple(node->temp_slot_2); 2594 if (node->framehead_slot) 2595 ExecClearTuple(node->framehead_slot); 2596 if (node->frametail_slot) 2597 ExecClearTuple(node->frametail_slot); 2598 2599 /* Forget current wfunc values */ 2600 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs); 2601 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs); 2602 2603 /* 2604 * if chgParam of subnode is not null then plan will be re-scanned by 2605 * first ExecProcNode. 2606 */ 2607 if (outerPlan->chgParam == NULL) 2608 ExecReScan(outerPlan); 2609 } 2610 2611 /* 2612 * initialize_peragg 2613 * 2614 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently. 
2615 */ 2616 static WindowStatePerAggData * 2617 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, 2618 WindowStatePerAgg peraggstate) 2619 { 2620 Oid inputTypes[FUNC_MAX_ARGS]; 2621 int numArguments; 2622 HeapTuple aggTuple; 2623 Form_pg_aggregate aggform; 2624 Oid aggtranstype; 2625 AttrNumber initvalAttNo; 2626 AclResult aclresult; 2627 bool use_ma_code; 2628 Oid transfn_oid, 2629 invtransfn_oid, 2630 finalfn_oid; 2631 bool finalextra; 2632 char finalmodify; 2633 Expr *transfnexpr, 2634 *invtransfnexpr, 2635 *finalfnexpr; 2636 Datum textInitVal; 2637 int i; 2638 ListCell *lc; 2639 2640 numArguments = list_length(wfunc->args); 2641 2642 i = 0; 2643 foreach(lc, wfunc->args) 2644 { 2645 inputTypes[i++] = exprType((Node *) lfirst(lc)); 2646 } 2647 2648 aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid)); 2649 if (!HeapTupleIsValid(aggTuple)) 2650 elog(ERROR, "cache lookup failed for aggregate %u", 2651 wfunc->winfnoid); 2652 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); 2653 2654 /* 2655 * Figure out whether we want to use the moving-aggregate implementation, 2656 * and collect the right set of fields from the pg_attribute entry. 2657 * 2658 * It's possible that an aggregate would supply a safe moving-aggregate 2659 * implementation and an unsafe normal one, in which case our hand is 2660 * forced. Otherwise, if the frame head can't move, we don't need 2661 * moving-aggregate code. Even if we'd like to use it, don't do so if the 2662 * aggregate's arguments (and FILTER clause if any) contain any calls to 2663 * volatile functions. Otherwise, the difference between restarting and 2664 * not restarting the aggregation would be user-visible. 
2665 */ 2666 if (!OidIsValid(aggform->aggminvtransfn)) 2667 use_ma_code = false; /* sine qua non */ 2668 else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY && 2669 aggform->aggfinalmodify != AGGMODIFY_READ_ONLY) 2670 use_ma_code = true; /* decision forced by safety */ 2671 else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) 2672 use_ma_code = false; /* non-moving frame head */ 2673 else if (contain_volatile_functions((Node *) wfunc)) 2674 use_ma_code = false; /* avoid possible behavioral change */ 2675 else 2676 use_ma_code = true; /* yes, let's use it */ 2677 if (use_ma_code) 2678 { 2679 peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; 2680 peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; 2681 peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; 2682 finalextra = aggform->aggmfinalextra; 2683 finalmodify = aggform->aggmfinalmodify; 2684 aggtranstype = aggform->aggmtranstype; 2685 initvalAttNo = Anum_pg_aggregate_aggminitval; 2686 } 2687 else 2688 { 2689 peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; 2690 peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; 2691 peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; 2692 finalextra = aggform->aggfinalextra; 2693 finalmodify = aggform->aggfinalmodify; 2694 aggtranstype = aggform->aggtranstype; 2695 initvalAttNo = Anum_pg_aggregate_agginitval; 2696 } 2697 2698 /* 2699 * ExecInitWindowAgg already checked permission to call aggregate function 2700 * ... 
but we still need to check the component functions 2701 */ 2702 2703 /* Check that aggregate owner has permission to call component fns */ 2704 { 2705 HeapTuple procTuple; 2706 Oid aggOwner; 2707 2708 procTuple = SearchSysCache1(PROCOID, 2709 ObjectIdGetDatum(wfunc->winfnoid)); 2710 if (!HeapTupleIsValid(procTuple)) 2711 elog(ERROR, "cache lookup failed for function %u", 2712 wfunc->winfnoid); 2713 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner; 2714 ReleaseSysCache(procTuple); 2715 2716 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, 2717 ACL_EXECUTE); 2718 if (aclresult != ACLCHECK_OK) 2719 aclcheck_error(aclresult, OBJECT_FUNCTION, 2720 get_func_name(transfn_oid)); 2721 InvokeFunctionExecuteHook(transfn_oid); 2722 2723 if (OidIsValid(invtransfn_oid)) 2724 { 2725 aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner, 2726 ACL_EXECUTE); 2727 if (aclresult != ACLCHECK_OK) 2728 aclcheck_error(aclresult, OBJECT_FUNCTION, 2729 get_func_name(invtransfn_oid)); 2730 InvokeFunctionExecuteHook(invtransfn_oid); 2731 } 2732 2733 if (OidIsValid(finalfn_oid)) 2734 { 2735 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, 2736 ACL_EXECUTE); 2737 if (aclresult != ACLCHECK_OK) 2738 aclcheck_error(aclresult, OBJECT_FUNCTION, 2739 get_func_name(finalfn_oid)); 2740 InvokeFunctionExecuteHook(finalfn_oid); 2741 } 2742 } 2743 2744 /* 2745 * If the selected finalfn isn't read-only, we can't run this aggregate as 2746 * a window function. This is a user-facing error, so we take a bit more 2747 * care with the error message than elsewhere in this function. 
2748 */ 2749 if (finalmodify != AGGMODIFY_READ_ONLY) 2750 ereport(ERROR, 2751 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 2752 errmsg("aggregate function %s does not support use as a window function", 2753 format_procedure(wfunc->winfnoid)))); 2754 2755 /* Detect how many arguments to pass to the finalfn */ 2756 if (finalextra) 2757 peraggstate->numFinalArgs = numArguments + 1; 2758 else 2759 peraggstate->numFinalArgs = 1; 2760 2761 /* resolve actual type of transition state, if polymorphic */ 2762 aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid, 2763 aggtranstype, 2764 inputTypes, 2765 numArguments); 2766 2767 /* build expression trees using actual argument & result types */ 2768 build_aggregate_transfn_expr(inputTypes, 2769 numArguments, 2770 0, /* no ordered-set window functions yet */ 2771 false, /* no variadic window functions yet */ 2772 aggtranstype, 2773 wfunc->inputcollid, 2774 transfn_oid, 2775 invtransfn_oid, 2776 &transfnexpr, 2777 &invtransfnexpr); 2778 2779 /* set up infrastructure for calling the transfn(s) and finalfn */ 2780 fmgr_info(transfn_oid, &peraggstate->transfn); 2781 fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn); 2782 2783 if (OidIsValid(invtransfn_oid)) 2784 { 2785 fmgr_info(invtransfn_oid, &peraggstate->invtransfn); 2786 fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn); 2787 } 2788 2789 if (OidIsValid(finalfn_oid)) 2790 { 2791 build_aggregate_finalfn_expr(inputTypes, 2792 peraggstate->numFinalArgs, 2793 aggtranstype, 2794 wfunc->wintype, 2795 wfunc->inputcollid, 2796 finalfn_oid, 2797 &finalfnexpr); 2798 fmgr_info(finalfn_oid, &peraggstate->finalfn); 2799 fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn); 2800 } 2801 2802 /* get info about relevant datatypes */ 2803 get_typlenbyval(wfunc->wintype, 2804 &peraggstate->resulttypeLen, 2805 &peraggstate->resulttypeByVal); 2806 get_typlenbyval(aggtranstype, 2807 &peraggstate->transtypeLen, 2808 &peraggstate->transtypeByVal); 2809 
2810 /* 2811 * initval is potentially null, so don't try to access it as a struct 2812 * field. Must do it the hard way with SysCacheGetAttr. 2813 */ 2814 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo, 2815 &peraggstate->initValueIsNull); 2816 2817 if (peraggstate->initValueIsNull) 2818 peraggstate->initValue = (Datum) 0; 2819 else 2820 peraggstate->initValue = GetAggInitVal(textInitVal, 2821 aggtranstype); 2822 2823 /* 2824 * If the transfn is strict and the initval is NULL, make sure input type 2825 * and transtype are the same (or at least binary-compatible), so that 2826 * it's OK to use the first input value as the initial transValue. This 2827 * should have been checked at agg definition time, but we must check 2828 * again in case the transfn's strictness property has been changed. 2829 */ 2830 if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) 2831 { 2832 if (numArguments < 1 || 2833 !IsBinaryCoercible(inputTypes[0], aggtranstype)) 2834 ereport(ERROR, 2835 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), 2836 errmsg("aggregate %u needs to have compatible input type and transition type", 2837 wfunc->winfnoid))); 2838 } 2839 2840 /* 2841 * Insist that forward and inverse transition functions have the same 2842 * strictness setting. Allowing them to differ would require handling 2843 * more special cases in advance_windowaggregate and 2844 * advance_windowaggregate_base, for no discernible benefit. This should 2845 * have been checked at agg definition time, but we must check again in 2846 * case either function's strictness property has been changed. 2847 */ 2848 if (OidIsValid(invtransfn_oid) && 2849 peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict) 2850 ereport(ERROR, 2851 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), 2852 errmsg("strictness of aggregate's forward and inverse transition functions must match"))); 2853 2854 /* 2855 * Moving aggregates use their own aggcontext. 
2856 * 2857 * This is necessary because they might restart at different times, so we 2858 * might never be able to reset the shared context otherwise. We can't 2859 * make it the aggregates' responsibility to clean up after themselves, 2860 * because strict aggregates must be restarted whenever we remove their 2861 * last non-NULL input, which the aggregate won't be aware is happening. 2862 * Also, just pfree()ing the transValue upon restarting wouldn't help, 2863 * since we'd miss any indirectly referenced data. We could, in theory, 2864 * make the memory allocation rules for moving aggregates different than 2865 * they have historically been for plain aggregates, but that seems grotty 2866 * and likely to lead to memory leaks. 2867 */ 2868 if (OidIsValid(invtransfn_oid)) 2869 peraggstate->aggcontext = 2870 AllocSetContextCreate(CurrentMemoryContext, 2871 "WindowAgg Per Aggregate", 2872 ALLOCSET_DEFAULT_SIZES); 2873 else 2874 peraggstate->aggcontext = winstate->aggcontext; 2875 2876 ReleaseSysCache(aggTuple); 2877 2878 return peraggstate; 2879 } 2880 2881 static Datum 2882 GetAggInitVal(Datum textInitVal, Oid transtype) 2883 { 2884 Oid typinput, 2885 typioparam; 2886 char *strInitVal; 2887 Datum initVal; 2888 2889 getTypeInputInfo(transtype, &typinput, &typioparam); 2890 strInitVal = TextDatumGetCString(textInitVal); 2891 initVal = OidInputFunctionCall(typinput, strInitVal, 2892 typioparam, -1); 2893 pfree(strInitVal); 2894 return initVal; 2895 } 2896 2897 /* 2898 * are_peers 2899 * compare two rows to see if they are equal according to the ORDER BY clause 2900 * 2901 * NB: this does not consider the window frame mode. 
2902 */ 2903 static bool 2904 are_peers(WindowAggState *winstate, TupleTableSlot *slot1, 2905 TupleTableSlot *slot2) 2906 { 2907 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 2908 ExprContext *econtext = winstate->tmpcontext; 2909 2910 /* If no ORDER BY, all rows are peers with each other */ 2911 if (node->ordNumCols == 0) 2912 return true; 2913 2914 econtext->ecxt_outertuple = slot1; 2915 econtext->ecxt_innertuple = slot2; 2916 return ExecQualAndReset(winstate->ordEqfunction, econtext); 2917 } 2918 2919 /* 2920 * window_gettupleslot 2921 * Fetch the pos'th tuple of the current partition into the slot, 2922 * using the winobj's read pointer 2923 * 2924 * Returns true if successful, false if no such row 2925 */ 2926 static bool 2927 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot) 2928 { 2929 WindowAggState *winstate = winobj->winstate; 2930 MemoryContext oldcontext; 2931 2932 /* often called repeatedly in a row */ 2933 CHECK_FOR_INTERRUPTS(); 2934 2935 /* Don't allow passing -1 to spool_tuples here */ 2936 if (pos < 0) 2937 return false; 2938 2939 /* If necessary, fetch the tuple into the spool */ 2940 spool_tuples(winstate, pos); 2941 2942 if (pos >= winstate->spooled_rows) 2943 return false; 2944 2945 if (pos < winobj->markpos) 2946 elog(ERROR, "cannot fetch row before WindowObject's mark position"); 2947 2948 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 2949 2950 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr); 2951 2952 /* 2953 * Advance or rewind until we are within one tuple of the one we want. 
2954 */ 2955 if (winobj->seekpos < pos - 1) 2956 { 2957 if (!tuplestore_skiptuples(winstate->buffer, 2958 pos - 1 - winobj->seekpos, 2959 true)) 2960 elog(ERROR, "unexpected end of tuplestore"); 2961 winobj->seekpos = pos - 1; 2962 } 2963 else if (winobj->seekpos > pos + 1) 2964 { 2965 if (!tuplestore_skiptuples(winstate->buffer, 2966 winobj->seekpos - (pos + 1), 2967 false)) 2968 elog(ERROR, "unexpected end of tuplestore"); 2969 winobj->seekpos = pos + 1; 2970 } 2971 else if (winobj->seekpos == pos) 2972 { 2973 /* 2974 * There's no API to refetch the tuple at the current position. We 2975 * have to move one tuple forward, and then one backward. (We don't 2976 * do it the other way because we might try to fetch the row before 2977 * our mark, which isn't allowed.) XXX this case could stand to be 2978 * optimized. 2979 */ 2980 tuplestore_advance(winstate->buffer, true); 2981 winobj->seekpos++; 2982 } 2983 2984 /* 2985 * Now we should be on the tuple immediately before or after the one we 2986 * want, so just fetch forwards or backwards as appropriate. 2987 */ 2988 if (winobj->seekpos > pos) 2989 { 2990 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot)) 2991 elog(ERROR, "unexpected end of tuplestore"); 2992 winobj->seekpos--; 2993 } 2994 else 2995 { 2996 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot)) 2997 elog(ERROR, "unexpected end of tuplestore"); 2998 winobj->seekpos++; 2999 } 3000 3001 Assert(winobj->seekpos == pos); 3002 3003 MemoryContextSwitchTo(oldcontext); 3004 3005 return true; 3006 } 3007 3008 3009 /*********************************************************************** 3010 * API exposed to window functions 3011 ***********************************************************************/ 3012 3013 3014 /* 3015 * WinGetPartitionLocalMemory 3016 * Get working memory that lives till end of partition processing 3017 * 3018 * On first call within a given partition, this allocates and zeroes the 3019 * requested amount of space. 
Subsequent calls just return the same chunk. 3020 * 3021 * Memory obtained this way is normally used to hold state that should be 3022 * automatically reset for each new partition. If a window function wants 3023 * to hold state across the whole query, fcinfo->fn_extra can be used in the 3024 * usual way for that. 3025 */ 3026 void * 3027 WinGetPartitionLocalMemory(WindowObject winobj, Size sz) 3028 { 3029 Assert(WindowObjectIsValid(winobj)); 3030 if (winobj->localmem == NULL) 3031 winobj->localmem = 3032 MemoryContextAllocZero(winobj->winstate->partcontext, sz); 3033 return winobj->localmem; 3034 } 3035 3036 /* 3037 * WinGetCurrentPosition 3038 * Return the current row's position (counting from 0) within the current 3039 * partition. 3040 */ 3041 int64 3042 WinGetCurrentPosition(WindowObject winobj) 3043 { 3044 Assert(WindowObjectIsValid(winobj)); 3045 return winobj->winstate->currentpos; 3046 } 3047 3048 /* 3049 * WinGetPartitionRowCount 3050 * Return total number of rows contained in the current partition. 3051 * 3052 * Note: this is a relatively expensive operation because it forces the 3053 * whole partition to be "spooled" into the tuplestore at once. Once 3054 * executed, however, additional calls within the same partition are cheap. 3055 */ 3056 int64 3057 WinGetPartitionRowCount(WindowObject winobj) 3058 { 3059 Assert(WindowObjectIsValid(winobj)); 3060 spool_tuples(winobj->winstate, -1); 3061 return winobj->winstate->spooled_rows; 3062 } 3063 3064 /* 3065 * WinSetMarkPosition 3066 * Set the "mark" position for the window object, which is the oldest row 3067 * number (counting from 0) it is allowed to fetch during all subsequent 3068 * operations within the current partition. 3069 * 3070 * Window functions do not have to call this, but are encouraged to move the 3071 * mark forward when possible to keep the tuplestore size down and prevent 3072 * having to spill rows to disk. 
3073 */ 3074 void 3075 WinSetMarkPosition(WindowObject winobj, int64 markpos) 3076 { 3077 WindowAggState *winstate; 3078 3079 Assert(WindowObjectIsValid(winobj)); 3080 winstate = winobj->winstate; 3081 3082 if (markpos < winobj->markpos) 3083 elog(ERROR, "cannot move WindowObject's mark position backward"); 3084 tuplestore_select_read_pointer(winstate->buffer, winobj->markptr); 3085 if (markpos > winobj->markpos) 3086 { 3087 tuplestore_skiptuples(winstate->buffer, 3088 markpos - winobj->markpos, 3089 true); 3090 winobj->markpos = markpos; 3091 } 3092 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr); 3093 if (markpos > winobj->seekpos) 3094 { 3095 tuplestore_skiptuples(winstate->buffer, 3096 markpos - winobj->seekpos, 3097 true); 3098 winobj->seekpos = markpos; 3099 } 3100 } 3101 3102 /* 3103 * WinRowsArePeers 3104 * Compare two rows (specified by absolute position in partition) to see 3105 * if they are equal according to the ORDER BY clause. 3106 * 3107 * NB: this does not consider the window frame mode. 3108 */ 3109 bool 3110 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2) 3111 { 3112 WindowAggState *winstate; 3113 WindowAgg *node; 3114 TupleTableSlot *slot1; 3115 TupleTableSlot *slot2; 3116 bool res; 3117 3118 Assert(WindowObjectIsValid(winobj)); 3119 winstate = winobj->winstate; 3120 node = (WindowAgg *) winstate->ss.ps.plan; 3121 3122 /* If no ORDER BY, all rows are peers; don't bother to fetch them */ 3123 if (node->ordNumCols == 0) 3124 return true; 3125 3126 /* 3127 * Note: OK to use temp_slot_2 here because we aren't calling any 3128 * frame-related functions (those tend to clobber temp_slot_2). 
3129 */ 3130 slot1 = winstate->temp_slot_1; 3131 slot2 = winstate->temp_slot_2; 3132 3133 if (!window_gettupleslot(winobj, pos1, slot1)) 3134 elog(ERROR, "specified position is out of window: " INT64_FORMAT, 3135 pos1); 3136 if (!window_gettupleslot(winobj, pos2, slot2)) 3137 elog(ERROR, "specified position is out of window: " INT64_FORMAT, 3138 pos2); 3139 3140 res = are_peers(winstate, slot1, slot2); 3141 3142 ExecClearTuple(slot1); 3143 ExecClearTuple(slot2); 3144 3145 return res; 3146 } 3147 3148 /* 3149 * WinGetFuncArgInPartition 3150 * Evaluate a window function's argument expression on a specified 3151 * row of the partition. The row is identified in lseek(2) style, 3152 * i.e. relative to the current, first, or last row. 3153 * 3154 * argno: argument number to evaluate (counted from 0) 3155 * relpos: signed rowcount offset from the seek position 3156 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL 3157 * set_mark: If the row is found and set_mark is true, the mark is moved to 3158 * the row as a side-effect. 3159 * isnull: output argument, receives isnull status of result 3160 * isout: output argument, set to indicate whether target row position 3161 * is out of partition (can pass NULL if caller doesn't care about this) 3162 * 3163 * Specifying a nonexistent row is not an error, it just causes a null result 3164 * (plus setting *isout true, if isout isn't NULL). 
3165 */ 3166 Datum 3167 WinGetFuncArgInPartition(WindowObject winobj, int argno, 3168 int relpos, int seektype, bool set_mark, 3169 bool *isnull, bool *isout) 3170 { 3171 WindowAggState *winstate; 3172 ExprContext *econtext; 3173 TupleTableSlot *slot; 3174 bool gottuple; 3175 int64 abs_pos; 3176 3177 Assert(WindowObjectIsValid(winobj)); 3178 winstate = winobj->winstate; 3179 econtext = winstate->ss.ps.ps_ExprContext; 3180 slot = winstate->temp_slot_1; 3181 3182 switch (seektype) 3183 { 3184 case WINDOW_SEEK_CURRENT: 3185 abs_pos = winstate->currentpos + relpos; 3186 break; 3187 case WINDOW_SEEK_HEAD: 3188 abs_pos = relpos; 3189 break; 3190 case WINDOW_SEEK_TAIL: 3191 spool_tuples(winstate, -1); 3192 abs_pos = winstate->spooled_rows - 1 + relpos; 3193 break; 3194 default: 3195 elog(ERROR, "unrecognized window seek type: %d", seektype); 3196 abs_pos = 0; /* keep compiler quiet */ 3197 break; 3198 } 3199 3200 gottuple = window_gettupleslot(winobj, abs_pos, slot); 3201 3202 if (!gottuple) 3203 { 3204 if (isout) 3205 *isout = true; 3206 *isnull = true; 3207 return (Datum) 0; 3208 } 3209 else 3210 { 3211 if (isout) 3212 *isout = false; 3213 if (set_mark) 3214 WinSetMarkPosition(winobj, abs_pos); 3215 econtext->ecxt_outertuple = slot; 3216 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), 3217 econtext, isnull); 3218 } 3219 } 3220 3221 /* 3222 * WinGetFuncArgInFrame 3223 * Evaluate a window function's argument expression on a specified 3224 * row of the window frame. The row is identified in lseek(2) style, 3225 * i.e. relative to the first or last row of the frame. (We do not 3226 * support WINDOW_SEEK_CURRENT here, because it's not very clear what 3227 * that should mean if the current row isn't part of the frame.) 
3228 * 3229 * argno: argument number to evaluate (counted from 0) 3230 * relpos: signed rowcount offset from the seek position 3231 * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL 3232 * set_mark: If the row is found/in frame and set_mark is true, the mark is 3233 * moved to the row as a side-effect. 3234 * isnull: output argument, receives isnull status of result 3235 * isout: output argument, set to indicate whether target row position 3236 * is out of frame (can pass NULL if caller doesn't care about this) 3237 * 3238 * Specifying a nonexistent or not-in-frame row is not an error, it just 3239 * causes a null result (plus setting *isout true, if isout isn't NULL). 3240 * 3241 * Note that some exclusion-clause options lead to situations where the 3242 * rows that are in-frame are not consecutive in the partition. But we 3243 * count only in-frame rows when measuring relpos. 3244 * 3245 * The set_mark flag is interpreted as meaning that the caller will specify 3246 * a constant (or, perhaps, monotonically increasing) relpos in successive 3247 * calls, so that *if there is no exclusion clause* there will be no need 3248 * to fetch a row before the previously fetched row. But we do not expect 3249 * the caller to know how to account for exclusion clauses. Therefore, 3250 * if there is an exclusion clause we take responsibility for adjusting the 3251 * mark request to something that will be safe given the above assumption 3252 * about relpos. 
3253 */ 3254 Datum 3255 WinGetFuncArgInFrame(WindowObject winobj, int argno, 3256 int relpos, int seektype, bool set_mark, 3257 bool *isnull, bool *isout) 3258 { 3259 WindowAggState *winstate; 3260 ExprContext *econtext; 3261 TupleTableSlot *slot; 3262 int64 abs_pos; 3263 int64 mark_pos; 3264 3265 Assert(WindowObjectIsValid(winobj)); 3266 winstate = winobj->winstate; 3267 econtext = winstate->ss.ps.ps_ExprContext; 3268 slot = winstate->temp_slot_1; 3269 3270 switch (seektype) 3271 { 3272 case WINDOW_SEEK_CURRENT: 3273 elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame"); 3274 abs_pos = mark_pos = 0; /* keep compiler quiet */ 3275 break; 3276 case WINDOW_SEEK_HEAD: 3277 /* rejecting relpos < 0 is easy and simplifies code below */ 3278 if (relpos < 0) 3279 goto out_of_frame; 3280 update_frameheadpos(winstate); 3281 abs_pos = winstate->frameheadpos + relpos; 3282 mark_pos = abs_pos; 3283 3284 /* 3285 * Account for exclusion option if one is active, but advance only 3286 * abs_pos not mark_pos. This prevents changes of the current 3287 * row's peer group from resulting in trying to fetch a row before 3288 * some previous mark position. 3289 * 3290 * Note that in some corner cases such as current row being 3291 * outside frame, these calculations are theoretically too simple, 3292 * but it doesn't matter because we'll end up deciding the row is 3293 * out of frame. We do not attempt to avoid fetching rows past 3294 * end of frame; that would happen in some cases anyway. 
3295 */ 3296 switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION) 3297 { 3298 case 0: 3299 /* no adjustment needed */ 3300 break; 3301 case FRAMEOPTION_EXCLUDE_CURRENT_ROW: 3302 if (abs_pos >= winstate->currentpos && 3303 winstate->currentpos >= winstate->frameheadpos) 3304 abs_pos++; 3305 break; 3306 case FRAMEOPTION_EXCLUDE_GROUP: 3307 update_grouptailpos(winstate); 3308 if (abs_pos >= winstate->groupheadpos && 3309 winstate->grouptailpos > winstate->frameheadpos) 3310 { 3311 int64 overlapstart = Max(winstate->groupheadpos, 3312 winstate->frameheadpos); 3313 3314 abs_pos += winstate->grouptailpos - overlapstart; 3315 } 3316 break; 3317 case FRAMEOPTION_EXCLUDE_TIES: 3318 update_grouptailpos(winstate); 3319 if (abs_pos >= winstate->groupheadpos && 3320 winstate->grouptailpos > winstate->frameheadpos) 3321 { 3322 int64 overlapstart = Max(winstate->groupheadpos, 3323 winstate->frameheadpos); 3324 3325 if (abs_pos == overlapstart) 3326 abs_pos = winstate->currentpos; 3327 else 3328 abs_pos += winstate->grouptailpos - overlapstart - 1; 3329 } 3330 break; 3331 default: 3332 elog(ERROR, "unrecognized frame option state: 0x%x", 3333 winstate->frameOptions); 3334 break; 3335 } 3336 break; 3337 case WINDOW_SEEK_TAIL: 3338 /* rejecting relpos > 0 is easy and simplifies code below */ 3339 if (relpos > 0) 3340 goto out_of_frame; 3341 update_frametailpos(winstate); 3342 abs_pos = winstate->frametailpos - 1 + relpos; 3343 3344 /* 3345 * Account for exclusion option if one is active. If there is no 3346 * exclusion, we can safely set the mark at the accessed row. But 3347 * if there is, we can only mark the frame start, because we can't 3348 * be sure how far back in the frame the exclusion might cause us 3349 * to fetch in future. Furthermore, we have to actually check 3350 * against frameheadpos here, since it's unsafe to try to fetch a 3351 * row before frame start if the mark might be there already. 
3352 */ 3353 switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION) 3354 { 3355 case 0: 3356 /* no adjustment needed */ 3357 mark_pos = abs_pos; 3358 break; 3359 case FRAMEOPTION_EXCLUDE_CURRENT_ROW: 3360 if (abs_pos <= winstate->currentpos && 3361 winstate->currentpos < winstate->frametailpos) 3362 abs_pos--; 3363 update_frameheadpos(winstate); 3364 if (abs_pos < winstate->frameheadpos) 3365 goto out_of_frame; 3366 mark_pos = winstate->frameheadpos; 3367 break; 3368 case FRAMEOPTION_EXCLUDE_GROUP: 3369 update_grouptailpos(winstate); 3370 if (abs_pos < winstate->grouptailpos && 3371 winstate->groupheadpos < winstate->frametailpos) 3372 { 3373 int64 overlapend = Min(winstate->grouptailpos, 3374 winstate->frametailpos); 3375 3376 abs_pos -= overlapend - winstate->groupheadpos; 3377 } 3378 update_frameheadpos(winstate); 3379 if (abs_pos < winstate->frameheadpos) 3380 goto out_of_frame; 3381 mark_pos = winstate->frameheadpos; 3382 break; 3383 case FRAMEOPTION_EXCLUDE_TIES: 3384 update_grouptailpos(winstate); 3385 if (abs_pos < winstate->grouptailpos && 3386 winstate->groupheadpos < winstate->frametailpos) 3387 { 3388 int64 overlapend = Min(winstate->grouptailpos, 3389 winstate->frametailpos); 3390 3391 if (abs_pos == overlapend - 1) 3392 abs_pos = winstate->currentpos; 3393 else 3394 abs_pos -= overlapend - 1 - winstate->groupheadpos; 3395 } 3396 update_frameheadpos(winstate); 3397 if (abs_pos < winstate->frameheadpos) 3398 goto out_of_frame; 3399 mark_pos = winstate->frameheadpos; 3400 break; 3401 default: 3402 elog(ERROR, "unrecognized frame option state: 0x%x", 3403 winstate->frameOptions); 3404 mark_pos = 0; /* keep compiler quiet */ 3405 break; 3406 } 3407 break; 3408 default: 3409 elog(ERROR, "unrecognized window seek type: %d", seektype); 3410 abs_pos = mark_pos = 0; /* keep compiler quiet */ 3411 break; 3412 } 3413 3414 if (!window_gettupleslot(winobj, abs_pos, slot)) 3415 goto out_of_frame; 3416 3417 /* The code above does not detect all out-of-frame cases, 
so check */ 3418 if (row_is_in_frame(winstate, abs_pos, slot) <= 0) 3419 goto out_of_frame; 3420 3421 if (isout) 3422 *isout = false; 3423 if (set_mark) 3424 WinSetMarkPosition(winobj, mark_pos); 3425 econtext->ecxt_outertuple = slot; 3426 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), 3427 econtext, isnull); 3428 3429 out_of_frame: 3430 if (isout) 3431 *isout = true; 3432 *isnull = true; 3433 return (Datum) 0; 3434 } 3435 3436 /* 3437 * WinGetFuncArgCurrent 3438 * Evaluate a window function's argument expression on the current row. 3439 * 3440 * argno: argument number to evaluate (counted from 0) 3441 * isnull: output argument, receives isnull status of result 3442 * 3443 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or 3444 * WinGetFuncArgInFrame targeting the current row, because it will succeed 3445 * even if the WindowObject's mark has been set beyond the current row. 3446 * This should generally be used for "ordinary" arguments of a window 3447 * function, such as the offset argument of lead() or lag(). 3448 */ 3449 Datum 3450 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull) 3451 { 3452 WindowAggState *winstate; 3453 ExprContext *econtext; 3454 3455 Assert(WindowObjectIsValid(winobj)); 3456 winstate = winobj->winstate; 3457 3458 econtext = winstate->ss.ps.ps_ExprContext; 3459 3460 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot; 3461 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), 3462 econtext, isnull); 3463 } 3464