1 /*------------------------------------------------------------------------- 2 * 3 * nodeWindowAgg.c 4 * routines to handle WindowAgg nodes. 5 * 6 * A WindowAgg node evaluates "window functions" across suitable partitions 7 * of the input tuple set. Any one WindowAgg works for just a single window 8 * specification, though it can evaluate multiple window functions sharing 9 * identical window specifications. The input tuples are required to be 10 * delivered in sorted order, with the PARTITION BY columns (if any) as 11 * major sort keys and the ORDER BY columns (if any) as minor sort keys. 12 * (The planner generates a stack of WindowAggs with intervening Sort nodes 13 * as needed, if a query involves more than one window specification.) 14 * 15 * Since window functions can require access to any or all of the rows in 16 * the current partition, we accumulate rows of the partition into a 17 * tuplestore. The window functions are called using the WindowObject API 18 * so that they can access those rows as needed. 19 * 20 * We also support using plain aggregate functions as window functions. 21 * For these, the regular Agg-node environment is emulated for each partition. 22 * As required by the SQL spec, the output represents the value of the 23 * aggregate function over all rows in the current row's window frame. 
24 * 25 * 26 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group 27 * Portions Copyright (c) 1994, Regents of the University of California 28 * 29 * IDENTIFICATION 30 * src/backend/executor/nodeWindowAgg.c 31 * 32 *------------------------------------------------------------------------- 33 */ 34 #include "postgres.h" 35 36 #include "access/htup_details.h" 37 #include "catalog/objectaccess.h" 38 #include "catalog/pg_aggregate.h" 39 #include "catalog/pg_proc.h" 40 #include "executor/executor.h" 41 #include "executor/nodeWindowAgg.h" 42 #include "miscadmin.h" 43 #include "nodes/nodeFuncs.h" 44 #include "optimizer/clauses.h" 45 #include "parser/parse_agg.h" 46 #include "parser/parse_coerce.h" 47 #include "utils/acl.h" 48 #include "utils/builtins.h" 49 #include "utils/datum.h" 50 #include "utils/lsyscache.h" 51 #include "utils/memutils.h" 52 #include "utils/regproc.h" 53 #include "utils/syscache.h" 54 #include "windowapi.h" 55 56 /* 57 * All the window function APIs are called with this object, which is passed 58 * to window functions as fcinfo->context. 59 */ 60 typedef struct WindowObjectData 61 { 62 NodeTag type; 63 WindowAggState *winstate; /* parent WindowAggState */ 64 List *argstates; /* ExprState trees for fn's arguments */ 65 void *localmem; /* WinGetPartitionLocalMemory's chunk */ 66 int markptr; /* tuplestore mark pointer for this fn */ 67 int readptr; /* tuplestore read pointer for this fn */ 68 int64 markpos; /* row that markptr is positioned on */ 69 int64 seekpos; /* row that readptr is positioned on */ 70 } WindowObjectData; 71 72 /* 73 * We have one WindowStatePerFunc struct for each window function and 74 * window aggregate handled by this node. 
75 */ 76 typedef struct WindowStatePerFuncData 77 { 78 /* Links to WindowFunc expr and state nodes this working state is for */ 79 WindowFuncExprState *wfuncstate; 80 WindowFunc *wfunc; 81 82 int numArguments; /* number of arguments */ 83 84 FmgrInfo flinfo; /* fmgr lookup data for window function */ 85 86 Oid winCollation; /* collation derived for window function */ 87 88 /* 89 * We need the len and byval info for the result of each function in order 90 * to know how to copy/delete values. 91 */ 92 int16 resulttypeLen; 93 bool resulttypeByVal; 94 95 bool plain_agg; /* is it just a plain aggregate function? */ 96 int aggno; /* if so, index of its PerAggData */ 97 98 WindowObject winobj; /* object used in window function API */ 99 } WindowStatePerFuncData; 100 101 /* 102 * For plain aggregate window functions, we also have one of these. 103 */ 104 typedef struct WindowStatePerAggData 105 { 106 /* Oids of transition functions */ 107 Oid transfn_oid; 108 Oid invtransfn_oid; /* may be InvalidOid */ 109 Oid finalfn_oid; /* may be InvalidOid */ 110 111 /* 112 * fmgr lookup data for transition functions --- only valid when 113 * corresponding oid is not InvalidOid. Note in particular that fn_strict 114 * flags are kept here. 115 */ 116 FmgrInfo transfn; 117 FmgrInfo invtransfn; 118 FmgrInfo finalfn; 119 120 int numFinalArgs; /* number of arguments to pass to finalfn */ 121 122 /* 123 * initial value from pg_aggregate entry 124 */ 125 Datum initValue; 126 bool initValueIsNull; 127 128 /* 129 * cached value for current frame boundaries 130 */ 131 Datum resultValue; 132 bool resultValueIsNull; 133 134 /* 135 * We need the len and byval info for the agg's input, result, and 136 * transition data types in order to know how to copy/delete values. 
137 */ 138 int16 inputtypeLen, 139 resulttypeLen, 140 transtypeLen; 141 bool inputtypeByVal, 142 resulttypeByVal, 143 transtypeByVal; 144 145 int wfuncno; /* index of associated PerFuncData */ 146 147 /* Context holding transition value and possibly other subsidiary data */ 148 MemoryContext aggcontext; /* may be private, or winstate->aggcontext */ 149 150 /* Current transition value */ 151 Datum transValue; /* current transition value */ 152 bool transValueIsNull; 153 154 int64 transValueCount; /* number of currently-aggregated rows */ 155 156 /* Data local to eval_windowaggregates() */ 157 bool restart; /* need to restart this agg in this cycle? */ 158 } WindowStatePerAggData; 159 160 static void initialize_windowaggregate(WindowAggState *winstate, 161 WindowStatePerFunc perfuncstate, 162 WindowStatePerAgg peraggstate); 163 static void advance_windowaggregate(WindowAggState *winstate, 164 WindowStatePerFunc perfuncstate, 165 WindowStatePerAgg peraggstate); 166 static bool advance_windowaggregate_base(WindowAggState *winstate, 167 WindowStatePerFunc perfuncstate, 168 WindowStatePerAgg peraggstate); 169 static void finalize_windowaggregate(WindowAggState *winstate, 170 WindowStatePerFunc perfuncstate, 171 WindowStatePerAgg peraggstate, 172 Datum *result, bool *isnull); 173 174 static void eval_windowaggregates(WindowAggState *winstate); 175 static void eval_windowfunction(WindowAggState *winstate, 176 WindowStatePerFunc perfuncstate, 177 Datum *result, bool *isnull); 178 179 static void begin_partition(WindowAggState *winstate); 180 static void spool_tuples(WindowAggState *winstate, int64 pos); 181 static void release_partition(WindowAggState *winstate); 182 183 static int row_is_in_frame(WindowAggState *winstate, int64 pos, 184 TupleTableSlot *slot); 185 static void update_frameheadpos(WindowAggState *winstate); 186 static void update_frametailpos(WindowAggState *winstate); 187 static void update_grouptailpos(WindowAggState *winstate); 188 189 static 
WindowStatePerAggData *initialize_peragg(WindowAggState *winstate, 190 WindowFunc *wfunc, 191 WindowStatePerAgg peraggstate); 192 static Datum GetAggInitVal(Datum textInitVal, Oid transtype); 193 194 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1, 195 TupleTableSlot *slot2); 196 static bool window_gettupleslot(WindowObject winobj, int64 pos, 197 TupleTableSlot *slot); 198 199 200 /* 201 * initialize_windowaggregate 202 * parallel to initialize_aggregates in nodeAgg.c 203 */ 204 static void 205 initialize_windowaggregate(WindowAggState *winstate, 206 WindowStatePerFunc perfuncstate, 207 WindowStatePerAgg peraggstate) 208 { 209 MemoryContext oldContext; 210 211 /* 212 * If we're using a private aggcontext, we may reset it here. But if the 213 * context is shared, we don't know which other aggregates may still need 214 * it, so we must leave it to the caller to reset at an appropriate time. 215 */ 216 if (peraggstate->aggcontext != winstate->aggcontext) 217 MemoryContextResetAndDeleteChildren(peraggstate->aggcontext); 218 219 if (peraggstate->initValueIsNull) 220 peraggstate->transValue = peraggstate->initValue; 221 else 222 { 223 oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); 224 peraggstate->transValue = datumCopy(peraggstate->initValue, 225 peraggstate->transtypeByVal, 226 peraggstate->transtypeLen); 227 MemoryContextSwitchTo(oldContext); 228 } 229 peraggstate->transValueIsNull = peraggstate->initValueIsNull; 230 peraggstate->transValueCount = 0; 231 peraggstate->resultValue = (Datum) 0; 232 peraggstate->resultValueIsNull = true; 233 } 234 235 /* 236 * advance_windowaggregate 237 * parallel to advance_aggregates in nodeAgg.c 238 */ 239 static void 240 advance_windowaggregate(WindowAggState *winstate, 241 WindowStatePerFunc perfuncstate, 242 WindowStatePerAgg peraggstate) 243 { 244 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; 245 int numArguments = perfuncstate->numArguments; 246 FunctionCallInfoData fcinfodata; 
247 FunctionCallInfo fcinfo = &fcinfodata; 248 Datum newVal; 249 ListCell *arg; 250 int i; 251 MemoryContext oldContext; 252 ExprContext *econtext = winstate->tmpcontext; 253 ExprState *filter = wfuncstate->aggfilter; 254 255 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); 256 257 /* Skip anything FILTERed out */ 258 if (filter) 259 { 260 bool isnull; 261 Datum res = ExecEvalExpr(filter, econtext, &isnull); 262 263 if (isnull || !DatumGetBool(res)) 264 { 265 MemoryContextSwitchTo(oldContext); 266 return; 267 } 268 } 269 270 /* We start from 1, since the 0th arg will be the transition value */ 271 i = 1; 272 foreach(arg, wfuncstate->args) 273 { 274 ExprState *argstate = (ExprState *) lfirst(arg); 275 276 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext, 277 &fcinfo->argnull[i]); 278 i++; 279 } 280 281 if (peraggstate->transfn.fn_strict) 282 { 283 /* 284 * For a strict transfn, nothing happens when there's a NULL input; we 285 * just keep the prior transValue. Note transValueCount doesn't 286 * change either. 287 */ 288 for (i = 1; i <= numArguments; i++) 289 { 290 if (fcinfo->argnull[i]) 291 { 292 MemoryContextSwitchTo(oldContext); 293 return; 294 } 295 } 296 297 /* 298 * For strict transition functions with initial value NULL we use the 299 * first non-NULL input as the initial state. (We already checked 300 * that the agg's input type is binary-compatible with its transtype, 301 * so straight copy here is OK.) 302 * 303 * We must copy the datum into aggcontext if it is pass-by-ref. We do 304 * not need to pfree the old transValue, since it's NULL. 
305 */ 306 if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull) 307 { 308 MemoryContextSwitchTo(peraggstate->aggcontext); 309 peraggstate->transValue = datumCopy(fcinfo->arg[1], 310 peraggstate->transtypeByVal, 311 peraggstate->transtypeLen); 312 peraggstate->transValueIsNull = false; 313 peraggstate->transValueCount = 1; 314 MemoryContextSwitchTo(oldContext); 315 return; 316 } 317 318 if (peraggstate->transValueIsNull) 319 { 320 /* 321 * Don't call a strict function with NULL inputs. Note it is 322 * possible to get here despite the above tests, if the transfn is 323 * strict *and* returned a NULL on a prior cycle. If that happens 324 * we will propagate the NULL all the way to the end. That can 325 * only happen if there's no inverse transition function, though, 326 * since we disallow transitions back to NULL when there is one. 327 */ 328 MemoryContextSwitchTo(oldContext); 329 Assert(!OidIsValid(peraggstate->invtransfn_oid)); 330 return; 331 } 332 } 333 334 /* 335 * OK to call the transition function. Set winstate->curaggcontext while 336 * calling it, for possible use by AggCheckCallContext. 337 */ 338 InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), 339 numArguments + 1, 340 perfuncstate->winCollation, 341 (void *) winstate, NULL); 342 fcinfo->arg[0] = peraggstate->transValue; 343 fcinfo->argnull[0] = peraggstate->transValueIsNull; 344 winstate->curaggcontext = peraggstate->aggcontext; 345 newVal = FunctionCallInvoke(fcinfo); 346 winstate->curaggcontext = NULL; 347 348 /* 349 * Moving-aggregate transition functions must not return null, see 350 * advance_windowaggregate_base(). 
351 */ 352 if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid)) 353 ereport(ERROR, 354 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), 355 errmsg("moving-aggregate transition function must not return null"))); 356 357 /* 358 * We must track the number of rows included in transValue, since to 359 * remove the last input, advance_windowaggregate_base() mustn't call the 360 * inverse transition function, but simply reset transValue back to its 361 * initial value. 362 */ 363 peraggstate->transValueCount++; 364 365 /* 366 * If pass-by-ref datatype, must copy the new value into aggcontext and 367 * free the prior transValue. But if transfn returned a pointer to its 368 * first input, we don't need to do anything. Also, if transfn returned a 369 * pointer to a R/W expanded object that is already a child of the 370 * aggcontext, assume we can adopt that value without copying it. 371 */ 372 if (!peraggstate->transtypeByVal && 373 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) 374 { 375 if (!fcinfo->isnull) 376 { 377 MemoryContextSwitchTo(peraggstate->aggcontext); 378 if (DatumIsReadWriteExpandedObject(newVal, 379 false, 380 peraggstate->transtypeLen) && 381 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext) 382 /* do nothing */ ; 383 else 384 newVal = datumCopy(newVal, 385 peraggstate->transtypeByVal, 386 peraggstate->transtypeLen); 387 } 388 if (!peraggstate->transValueIsNull) 389 { 390 if (DatumIsReadWriteExpandedObject(peraggstate->transValue, 391 false, 392 peraggstate->transtypeLen)) 393 DeleteExpandedObject(peraggstate->transValue); 394 else 395 pfree(DatumGetPointer(peraggstate->transValue)); 396 } 397 } 398 399 MemoryContextSwitchTo(oldContext); 400 peraggstate->transValue = newVal; 401 peraggstate->transValueIsNull = fcinfo->isnull; 402 } 403 404 /* 405 * advance_windowaggregate_base 406 * Remove the oldest tuple from an aggregation. 
407 * 408 * This is very much like advance_windowaggregate, except that we will call 409 * the inverse transition function (which caller must have checked is 410 * available). 411 * 412 * Returns true if we successfully removed the current row from this 413 * aggregate, false if not (in the latter case, caller is responsible 414 * for cleaning up by restarting the aggregation). 415 */ 416 static bool 417 advance_windowaggregate_base(WindowAggState *winstate, 418 WindowStatePerFunc perfuncstate, 419 WindowStatePerAgg peraggstate) 420 { 421 WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; 422 int numArguments = perfuncstate->numArguments; 423 FunctionCallInfoData fcinfodata; 424 FunctionCallInfo fcinfo = &fcinfodata; 425 Datum newVal; 426 ListCell *arg; 427 int i; 428 MemoryContext oldContext; 429 ExprContext *econtext = winstate->tmpcontext; 430 ExprState *filter = wfuncstate->aggfilter; 431 432 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); 433 434 /* Skip anything FILTERed out */ 435 if (filter) 436 { 437 bool isnull; 438 Datum res = ExecEvalExpr(filter, econtext, &isnull); 439 440 if (isnull || !DatumGetBool(res)) 441 { 442 MemoryContextSwitchTo(oldContext); 443 return true; 444 } 445 } 446 447 /* We start from 1, since the 0th arg will be the transition value */ 448 i = 1; 449 foreach(arg, wfuncstate->args) 450 { 451 ExprState *argstate = (ExprState *) lfirst(arg); 452 453 fcinfo->arg[i] = ExecEvalExpr(argstate, econtext, 454 &fcinfo->argnull[i]); 455 i++; 456 } 457 458 if (peraggstate->invtransfn.fn_strict) 459 { 460 /* 461 * For a strict (inv)transfn, nothing happens when there's a NULL 462 * input; we just keep the prior transValue. Note transValueCount 463 * doesn't change either. 
464 */ 465 for (i = 1; i <= numArguments; i++) 466 { 467 if (fcinfo->argnull[i]) 468 { 469 MemoryContextSwitchTo(oldContext); 470 return true; 471 } 472 } 473 } 474 475 /* There should still be an added but not yet removed value */ 476 Assert(peraggstate->transValueCount > 0); 477 478 /* 479 * In moving-aggregate mode, the state must never be NULL, except possibly 480 * before any rows have been aggregated (which is surely not the case at 481 * this point). This restriction allows us to interpret a NULL result 482 * from the inverse function as meaning "sorry, can't do an inverse 483 * transition in this case". We already checked this in 484 * advance_windowaggregate, but just for safety, check again. 485 */ 486 if (peraggstate->transValueIsNull) 487 elog(ERROR, "aggregate transition value is NULL before inverse transition"); 488 489 /* 490 * We mustn't use the inverse transition function to remove the last 491 * input. Doing so would yield a non-NULL state, whereas we should be in 492 * the initial state afterwards which may very well be NULL. So instead, 493 * we simply re-initialize the aggregate in this case. 494 */ 495 if (peraggstate->transValueCount == 1) 496 { 497 MemoryContextSwitchTo(oldContext); 498 initialize_windowaggregate(winstate, 499 &winstate->perfunc[peraggstate->wfuncno], 500 peraggstate); 501 return true; 502 } 503 504 /* 505 * OK to call the inverse transition function. Set 506 * winstate->curaggcontext while calling it, for possible use by 507 * AggCheckCallContext. 508 */ 509 InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn), 510 numArguments + 1, 511 perfuncstate->winCollation, 512 (void *) winstate, NULL); 513 fcinfo->arg[0] = peraggstate->transValue; 514 fcinfo->argnull[0] = peraggstate->transValueIsNull; 515 winstate->curaggcontext = peraggstate->aggcontext; 516 newVal = FunctionCallInvoke(fcinfo); 517 winstate->curaggcontext = NULL; 518 519 /* 520 * If the function returns NULL, report failure, forcing a restart. 
521 */ 522 if (fcinfo->isnull) 523 { 524 MemoryContextSwitchTo(oldContext); 525 return false; 526 } 527 528 /* Update number of rows included in transValue */ 529 peraggstate->transValueCount--; 530 531 /* 532 * If pass-by-ref datatype, must copy the new value into aggcontext and 533 * free the prior transValue. But if invtransfn returned a pointer to its 534 * first input, we don't need to do anything. Also, if invtransfn 535 * returned a pointer to a R/W expanded object that is already a child of 536 * the aggcontext, assume we can adopt that value without copying it. 537 * 538 * Note: the checks for null values here will never fire, but it seems 539 * best to have this stanza look just like advance_windowaggregate. 540 */ 541 if (!peraggstate->transtypeByVal && 542 DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) 543 { 544 if (!fcinfo->isnull) 545 { 546 MemoryContextSwitchTo(peraggstate->aggcontext); 547 if (DatumIsReadWriteExpandedObject(newVal, 548 false, 549 peraggstate->transtypeLen) && 550 MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext) 551 /* do nothing */ ; 552 else 553 newVal = datumCopy(newVal, 554 peraggstate->transtypeByVal, 555 peraggstate->transtypeLen); 556 } 557 if (!peraggstate->transValueIsNull) 558 { 559 if (DatumIsReadWriteExpandedObject(peraggstate->transValue, 560 false, 561 peraggstate->transtypeLen)) 562 DeleteExpandedObject(peraggstate->transValue); 563 else 564 pfree(DatumGetPointer(peraggstate->transValue)); 565 } 566 } 567 568 MemoryContextSwitchTo(oldContext); 569 peraggstate->transValue = newVal; 570 peraggstate->transValueIsNull = fcinfo->isnull; 571 572 return true; 573 } 574 575 /* 576 * finalize_windowaggregate 577 * parallel to finalize_aggregate in nodeAgg.c 578 */ 579 static void 580 finalize_windowaggregate(WindowAggState *winstate, 581 WindowStatePerFunc perfuncstate, 582 WindowStatePerAgg peraggstate, 583 Datum *result, bool *isnull) 584 { 585 MemoryContext oldContext; 
586 587 oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); 588 589 /* 590 * Apply the agg's finalfn if one is provided, else return transValue. 591 */ 592 if (OidIsValid(peraggstate->finalfn_oid)) 593 { 594 int numFinalArgs = peraggstate->numFinalArgs; 595 FunctionCallInfoData fcinfo; 596 bool anynull; 597 int i; 598 599 InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 600 numFinalArgs, 601 perfuncstate->winCollation, 602 (void *) winstate, NULL); 603 fcinfo.arg[0] = MakeExpandedObjectReadOnly(peraggstate->transValue, 604 peraggstate->transValueIsNull, 605 peraggstate->transtypeLen); 606 fcinfo.argnull[0] = peraggstate->transValueIsNull; 607 anynull = peraggstate->transValueIsNull; 608 609 /* Fill any remaining argument positions with nulls */ 610 for (i = 1; i < numFinalArgs; i++) 611 { 612 fcinfo.arg[i] = (Datum) 0; 613 fcinfo.argnull[i] = true; 614 anynull = true; 615 } 616 617 if (fcinfo.flinfo->fn_strict && anynull) 618 { 619 /* don't call a strict function with NULL inputs */ 620 *result = (Datum) 0; 621 *isnull = true; 622 } 623 else 624 { 625 winstate->curaggcontext = peraggstate->aggcontext; 626 *result = FunctionCallInvoke(&fcinfo); 627 winstate->curaggcontext = NULL; 628 *isnull = fcinfo.isnull; 629 } 630 } 631 else 632 { 633 /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */ 634 *result = peraggstate->transValue; 635 *isnull = peraggstate->transValueIsNull; 636 } 637 638 /* 639 * If result is pass-by-ref, make sure it is in the right context. 640 */ 641 if (!peraggstate->resulttypeByVal && !*isnull && 642 !MemoryContextContains(CurrentMemoryContext, 643 DatumGetPointer(*result))) 644 *result = datumCopy(*result, 645 peraggstate->resulttypeByVal, 646 peraggstate->resulttypeLen); 647 MemoryContextSwitchTo(oldContext); 648 } 649 650 /* 651 * eval_windowaggregates 652 * evaluate plain aggregates being used as window functions 653 * 654 * This differs from nodeAgg.c in two ways. 
First, if the window's frame 655 * start position moves, we use the inverse transition function (if it exists) 656 * to remove rows from the transition value. And second, we expect to be 657 * able to call aggregate final functions repeatedly after aggregating more 658 * data onto the same transition value. This is not a behavior required by 659 * nodeAgg.c. 660 */ 661 static void 662 eval_windowaggregates(WindowAggState *winstate) 663 { 664 WindowStatePerAgg peraggstate; 665 int wfuncno, 666 numaggs, 667 numaggs_restart, 668 i; 669 int64 aggregatedupto_nonrestarted; 670 MemoryContext oldContext; 671 ExprContext *econtext; 672 WindowObject agg_winobj; 673 TupleTableSlot *agg_row_slot; 674 TupleTableSlot *temp_slot; 675 676 numaggs = winstate->numaggs; 677 if (numaggs == 0) 678 return; /* nothing to do */ 679 680 /* final output execution is in ps_ExprContext */ 681 econtext = winstate->ss.ps.ps_ExprContext; 682 agg_winobj = winstate->agg_winobj; 683 agg_row_slot = winstate->agg_row_slot; 684 temp_slot = winstate->temp_slot_1; 685 686 /* 687 * If the window's frame start clause is UNBOUNDED_PRECEDING and no 688 * exclusion clause is specified, then the window frame consists of a 689 * contiguous group of rows extending forward from the start of the 690 * partition, and rows only enter the frame, never exit it, as the current 691 * row advances forward. This makes it possible to use an incremental 692 * strategy for evaluating aggregates: we run the transition function for 693 * each row added to the frame, and run the final function whenever we 694 * need the current aggregate value. This is considerably more efficient 695 * than the naive approach of re-running the entire aggregate calculation 696 * for each current row. It does assume that the final function doesn't 697 * damage the running transition value, but we have the same assumption in 698 * nodeAgg.c too (when it rescans an existing hash table). 
699 * 700 * If the frame start does sometimes move, we can still optimize as above 701 * whenever successive rows share the same frame head, but if the frame 702 * head moves beyond the previous head we try to remove those rows using 703 * the aggregate's inverse transition function. This function restores 704 * the aggregate's current state to what it would be if the removed row 705 * had never been aggregated in the first place. Inverse transition 706 * functions may optionally return NULL, indicating that the function was 707 * unable to remove the tuple from aggregation. If this happens, or if 708 * the aggregate doesn't have an inverse transition function at all, we 709 * must perform the aggregation all over again for all tuples within the 710 * new frame boundaries. 711 * 712 * If there's any exclusion clause, then we may have to aggregate over a 713 * non-contiguous set of rows, so we punt and recalculate for every row. 714 * (For some frame end choices, it might be that the frame is always 715 * contiguous anyway, but that's an optimization to investigate later.) 716 * 717 * In many common cases, multiple rows share the same frame and hence the 718 * same aggregate value. (In particular, if there's no ORDER BY in a RANGE 719 * window, then all rows are peers and so they all have window frame equal 720 * to the whole partition.) We optimize such cases by calculating the 721 * aggregate value once when we reach the first row of a peer group, and 722 * then returning the saved value for all subsequent rows. 723 * 724 * 'aggregatedupto' keeps track of the first row that has not yet been 725 * accumulated into the aggregate transition values. Whenever we start a 726 * new peer group, we accumulate forward to the end of the peer group. 727 */ 728 729 /* 730 * First, update the frame head position. 731 * 732 * The frame head should never move backwards, and the code below wouldn't 733 * cope if it did, so for safety we complain if it does. 
734 */ 735 update_frameheadpos(winstate); 736 if (winstate->frameheadpos < winstate->aggregatedbase) 737 elog(ERROR, "window frame head moved backward"); 738 739 /* 740 * If the frame didn't change compared to the previous row, we can re-use 741 * the result values that were previously saved at the bottom of this 742 * function. Since we don't know the current frame's end yet, this is not 743 * possible to check for fully. But if the frame end mode is UNBOUNDED 744 * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the 745 * current row lies within the previous row's frame, then the two frames' 746 * ends must coincide. Note that on the first row aggregatedbase == 747 * aggregatedupto, meaning this test must fail, so we don't need to check 748 * the "there was no previous row" case explicitly here. 749 */ 750 if (winstate->aggregatedbase == winstate->frameheadpos && 751 (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | 752 FRAMEOPTION_END_CURRENT_ROW)) && 753 !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) && 754 winstate->aggregatedbase <= winstate->currentpos && 755 winstate->aggregatedupto > winstate->currentpos) 756 { 757 for (i = 0; i < numaggs; i++) 758 { 759 peraggstate = &winstate->peragg[i]; 760 wfuncno = peraggstate->wfuncno; 761 econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; 762 econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; 763 } 764 return; 765 } 766 767 /*---------- 768 * Initialize restart flags. 769 * 770 * We restart the aggregation: 771 * - if we're processing the first row in the partition, or 772 * - if the frame's head moved and we cannot use an inverse 773 * transition function, or 774 * - we have an EXCLUSION clause, or 775 * - if the new frame doesn't overlap the old one 776 * 777 * Note that we don't strictly need to restart in the last case, but if 778 * we're going to remove all rows from the aggregation anyway, a restart 779 * surely is faster. 
780 *---------- 781 */ 782 numaggs_restart = 0; 783 for (i = 0; i < numaggs; i++) 784 { 785 peraggstate = &winstate->peragg[i]; 786 if (winstate->currentpos == 0 || 787 (winstate->aggregatedbase != winstate->frameheadpos && 788 !OidIsValid(peraggstate->invtransfn_oid)) || 789 (winstate->frameOptions & FRAMEOPTION_EXCLUSION) || 790 winstate->aggregatedupto <= winstate->frameheadpos) 791 { 792 peraggstate->restart = true; 793 numaggs_restart++; 794 } 795 else 796 peraggstate->restart = false; 797 } 798 799 /* 800 * If we have any possibly-moving aggregates, attempt to advance 801 * aggregatedbase to match the frame's head by removing input rows that 802 * fell off the top of the frame from the aggregations. This can fail, 803 * i.e. advance_windowaggregate_base() can return false, in which case 804 * we'll restart that aggregate below. 805 */ 806 while (numaggs_restart < numaggs && 807 winstate->aggregatedbase < winstate->frameheadpos) 808 { 809 /* 810 * Fetch the next tuple of those being removed. This should never fail 811 * as we should have been here before. 812 */ 813 if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase, 814 temp_slot)) 815 elog(ERROR, "could not re-fetch previously fetched frame row"); 816 817 /* Set tuple context for evaluation of aggregate arguments */ 818 winstate->tmpcontext->ecxt_outertuple = temp_slot; 819 820 /* 821 * Perform the inverse transition for each aggregate function in the 822 * window, unless it has already been marked as needing a restart. 
823 */ 824 for (i = 0; i < numaggs; i++) 825 { 826 bool ok; 827 828 peraggstate = &winstate->peragg[i]; 829 if (peraggstate->restart) 830 continue; 831 832 wfuncno = peraggstate->wfuncno; 833 ok = advance_windowaggregate_base(winstate, 834 &winstate->perfunc[wfuncno], 835 peraggstate); 836 if (!ok) 837 { 838 /* Inverse transition function has failed, must restart */ 839 peraggstate->restart = true; 840 numaggs_restart++; 841 } 842 } 843 844 /* Reset per-input-tuple context after each tuple */ 845 ResetExprContext(winstate->tmpcontext); 846 847 /* And advance the aggregated-row state */ 848 winstate->aggregatedbase++; 849 ExecClearTuple(temp_slot); 850 } 851 852 /* 853 * If we successfully advanced the base rows of all the aggregates, 854 * aggregatedbase now equals frameheadpos; but if we failed for any, we 855 * must forcibly update aggregatedbase. 856 */ 857 winstate->aggregatedbase = winstate->frameheadpos; 858 859 /* 860 * If we created a mark pointer for aggregates, keep it pushed up to frame 861 * head, so that tuplestore can discard unnecessary rows. 862 */ 863 if (agg_winobj->markptr >= 0) 864 WinSetMarkPosition(agg_winobj, winstate->frameheadpos); 865 866 /* 867 * Now restart the aggregates that require it. 868 * 869 * We assume that aggregates using the shared context always restart if 870 * *any* aggregate restarts, and we may thus clean up the shared 871 * aggcontext if that is the case. Private aggcontexts are reset by 872 * initialize_windowaggregate() if their owning aggregate restarts. If we 873 * aren't restarting an aggregate, we need to free any previously saved 874 * result for it, else we'll leak memory. 
875 */ 876 if (numaggs_restart > 0) 877 MemoryContextResetAndDeleteChildren(winstate->aggcontext); 878 for (i = 0; i < numaggs; i++) 879 { 880 peraggstate = &winstate->peragg[i]; 881 882 /* Aggregates using the shared ctx must restart if *any* agg does */ 883 Assert(peraggstate->aggcontext != winstate->aggcontext || 884 numaggs_restart == 0 || 885 peraggstate->restart); 886 887 if (peraggstate->restart) 888 { 889 wfuncno = peraggstate->wfuncno; 890 initialize_windowaggregate(winstate, 891 &winstate->perfunc[wfuncno], 892 peraggstate); 893 } 894 else if (!peraggstate->resultValueIsNull) 895 { 896 if (!peraggstate->resulttypeByVal) 897 pfree(DatumGetPointer(peraggstate->resultValue)); 898 peraggstate->resultValue = (Datum) 0; 899 peraggstate->resultValueIsNull = true; 900 } 901 } 902 903 /* 904 * Non-restarted aggregates now contain the rows between aggregatedbase 905 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates 906 * contain no rows. If there are any restarted aggregates, we must thus 907 * begin aggregating anew at frameheadpos, otherwise we may simply 908 * continue at aggregatedupto. We must remember the old value of 909 * aggregatedupto to know how long to skip advancing non-restarted 910 * aggregates. If we modify aggregatedupto, we must also clear 911 * agg_row_slot, per the loop invariant below. 912 */ 913 aggregatedupto_nonrestarted = winstate->aggregatedupto; 914 if (numaggs_restart > 0 && 915 winstate->aggregatedupto != winstate->frameheadpos) 916 { 917 winstate->aggregatedupto = winstate->frameheadpos; 918 ExecClearTuple(agg_row_slot); 919 } 920 921 /* 922 * Advance until we reach a row not in frame (or end of partition). 923 * 924 * Note the loop invariant: agg_row_slot is either empty or holds the row 925 * at position aggregatedupto. We advance aggregatedupto after processing 926 * a row. 
927 */ 928 for (;;) 929 { 930 int ret; 931 932 /* Fetch next row if we didn't already */ 933 if (TupIsNull(agg_row_slot)) 934 { 935 if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto, 936 agg_row_slot)) 937 break; /* must be end of partition */ 938 } 939 940 /* 941 * Exit loop if no more rows can be in frame. Skip aggregation if 942 * current row is not in frame but there might be more in the frame. 943 */ 944 ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot); 945 if (ret < 0) 946 break; 947 if (ret == 0) 948 goto next_tuple; 949 950 /* Set tuple context for evaluation of aggregate arguments */ 951 winstate->tmpcontext->ecxt_outertuple = agg_row_slot; 952 953 /* Accumulate row into the aggregates */ 954 for (i = 0; i < numaggs; i++) 955 { 956 peraggstate = &winstate->peragg[i]; 957 958 /* Non-restarted aggs skip until aggregatedupto_nonrestarted */ 959 if (!peraggstate->restart && 960 winstate->aggregatedupto < aggregatedupto_nonrestarted) 961 continue; 962 963 wfuncno = peraggstate->wfuncno; 964 advance_windowaggregate(winstate, 965 &winstate->perfunc[wfuncno], 966 peraggstate); 967 } 968 969 next_tuple: 970 /* Reset per-input-tuple context after each tuple */ 971 ResetExprContext(winstate->tmpcontext); 972 973 /* And advance the aggregated-row state */ 974 winstate->aggregatedupto++; 975 ExecClearTuple(agg_row_slot); 976 } 977 978 /* The frame's end is not supposed to move backwards, ever */ 979 Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); 980 981 /* 982 * finalize aggregates and fill result/isnull fields. 
*/
	for (i = 0; i < numaggs; i++)
	{
		Datum	   *result;
		bool	   *isnull;

		peraggstate = &winstate->peragg[i];
		wfuncno = peraggstate->wfuncno;
		result = &econtext->ecxt_aggvalues[wfuncno];
		isnull = &econtext->ecxt_aggnulls[wfuncno];
		finalize_windowaggregate(winstate,
								 &winstate->perfunc[wfuncno],
								 peraggstate,
								 result, isnull);

		/*
		 * save the result in case next row shares the same frame.
		 *
		 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
		 * advance that the next row can't possibly share the same frame. Is
		 * it worth detecting that and skipping this code?
		 */
		if (!peraggstate->resulttypeByVal && !*isnull)
		{
			oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
			peraggstate->resultValue =
				datumCopy(*result,
						  peraggstate->resulttypeByVal,
						  peraggstate->resulttypeLen);
			MemoryContextSwitchTo(oldContext);
		}
		else
		{
			peraggstate->resultValue = *result;
		}
		peraggstate->resultValueIsNull = *isnull;
	}
}

/*
 * eval_windowfunction
 *
 * Arguments of window functions are not evaluated here, because a window
 * function can need random access to arbitrary rows in the partition.
 * The window function uses the special WinGetFuncArgInPartition and
 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
 * it wants.
 *
 * winstate: execution state of the WindowAgg node.
 * perfuncstate: per-function state; supplies the function lookup info, the
 *		argument count, the collation and the WindowObject handed to the
 *		function as its call context.
 * result, isnull: output parameters receiving the function's return value
 *		and its null flag.
 *
 * The function is invoked with the node's per-tuple memory context current,
 * and the caller's context is restored before returning.  As a side effect,
 * winstate->curaggcontext is set to NULL.
 */
static void
eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
					Datum *result, bool *isnull)
{
	FunctionCallInfoData fcinfo;
	MemoryContext oldContext;

	/* Anything the function allocates should live only as long as this row */
	oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

	/*
	 * We don't pass any normal arguments to a window function, but we do pass
	 * it the number of arguments, in order to permit window function
	 * implementations to support varying numbers of arguments.  The real info
	 * goes through the WindowObject, which is passed via fcinfo->context.
	 */
	InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
							 perfuncstate->numArguments,
							 perfuncstate->winCollation,
							 (void *) perfuncstate->winobj, NULL);
	/* Just in case, make all the regular argument slots be null */
	memset(fcinfo.argnull, true, perfuncstate->numArguments);
	/* Window functions don't have a current aggregate context, either */
	winstate->curaggcontext = NULL;

	*result = FunctionCallInvoke(&fcinfo);
	*isnull = fcinfo.isnull;

	/*
	 * Make sure pass-by-ref data is allocated in the appropriate context. (We
	 * need this in case the function returns a pointer into some short-lived
	 * tuple, as is entirely possible.)  The copy is skipped when the datum is
	 * already reported to live in the current (per-tuple) context.
	 */
	if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
		!MemoryContextContains(CurrentMemoryContext,
							   DatumGetPointer(*result)))
		*result = datumCopy(*result,
							perfuncstate->resulttypeByVal,
							perfuncstate->resulttypeLen);

	MemoryContextSwitchTo(oldContext);
}

/*
 * begin_partition
 * Start buffering rows of the next partition.
1076 */ 1077 static void 1078 begin_partition(WindowAggState *winstate) 1079 { 1080 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1081 PlanState *outerPlan = outerPlanState(winstate); 1082 int frameOptions = winstate->frameOptions; 1083 int numfuncs = winstate->numfuncs; 1084 int i; 1085 1086 winstate->partition_spooled = false; 1087 winstate->framehead_valid = false; 1088 winstate->frametail_valid = false; 1089 winstate->grouptail_valid = false; 1090 winstate->spooled_rows = 0; 1091 winstate->currentpos = 0; 1092 winstate->frameheadpos = 0; 1093 winstate->frametailpos = 0; 1094 winstate->currentgroup = 0; 1095 winstate->frameheadgroup = 0; 1096 winstate->frametailgroup = 0; 1097 winstate->groupheadpos = 0; 1098 winstate->grouptailpos = -1; /* see update_grouptailpos */ 1099 ExecClearTuple(winstate->agg_row_slot); 1100 if (winstate->framehead_slot) 1101 ExecClearTuple(winstate->framehead_slot); 1102 if (winstate->frametail_slot) 1103 ExecClearTuple(winstate->frametail_slot); 1104 1105 /* 1106 * If this is the very first partition, we need to fetch the first input 1107 * row to store in first_part_slot. 1108 */ 1109 if (TupIsNull(winstate->first_part_slot)) 1110 { 1111 TupleTableSlot *outerslot = ExecProcNode(outerPlan); 1112 1113 if (!TupIsNull(outerslot)) 1114 ExecCopySlot(winstate->first_part_slot, outerslot); 1115 else 1116 { 1117 /* outer plan is empty, so we have nothing to do */ 1118 winstate->partition_spooled = true; 1119 winstate->more_partitions = false; 1120 return; 1121 } 1122 } 1123 1124 /* Create new tuplestore for this partition */ 1125 winstate->buffer = tuplestore_begin_heap(false, false, work_mem); 1126 1127 /* 1128 * Set up read pointers for the tuplestore. The current pointer doesn't 1129 * need BACKWARD capability, but the per-window-function read pointers do, 1130 * and the aggregate pointer does if we might need to restart aggregation. 
1131 */ 1132 winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */ 1133 1134 /* reset default REWIND capability bit for current ptr */ 1135 tuplestore_set_eflags(winstate->buffer, 0); 1136 1137 /* create read pointers for aggregates, if needed */ 1138 if (winstate->numaggs > 0) 1139 { 1140 WindowObject agg_winobj = winstate->agg_winobj; 1141 int readptr_flags = 0; 1142 1143 /* 1144 * If the frame head is potentially movable, or we have an EXCLUSION 1145 * clause, we might need to restart aggregation ... 1146 */ 1147 if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) || 1148 (frameOptions & FRAMEOPTION_EXCLUSION)) 1149 { 1150 /* ... so create a mark pointer to track the frame head */ 1151 agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0); 1152 /* and the read pointer will need BACKWARD capability */ 1153 readptr_flags |= EXEC_FLAG_BACKWARD; 1154 } 1155 1156 agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer, 1157 readptr_flags); 1158 agg_winobj->markpos = -1; 1159 agg_winobj->seekpos = -1; 1160 1161 /* Also reset the row counters for aggregates */ 1162 winstate->aggregatedbase = 0; 1163 winstate->aggregatedupto = 0; 1164 } 1165 1166 /* create mark and read pointers for each real window function */ 1167 for (i = 0; i < numfuncs; i++) 1168 { 1169 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); 1170 1171 if (!perfuncstate->plain_agg) 1172 { 1173 WindowObject winobj = perfuncstate->winobj; 1174 1175 winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 1176 0); 1177 winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer, 1178 EXEC_FLAG_BACKWARD); 1179 winobj->markpos = -1; 1180 winobj->seekpos = -1; 1181 } 1182 } 1183 1184 /* 1185 * If we are in RANGE or GROUPS mode, then determining frame boundaries 1186 * requires physical access to the frame endpoint rows, except in certain 1187 * degenerate cases. 
We create read pointers to point to those rows, to 1188 * simplify access and ensure that the tuplestore doesn't discard the 1189 * endpoint rows prematurely. (Must create pointers in exactly the same 1190 * cases that update_frameheadpos and update_frametailpos need them.) 1191 */ 1192 winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */ 1193 1194 if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 1195 { 1196 if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) && 1197 node->ordNumCols != 0) || 1198 (frameOptions & FRAMEOPTION_START_OFFSET)) 1199 winstate->framehead_ptr = 1200 tuplestore_alloc_read_pointer(winstate->buffer, 0); 1201 if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) && 1202 node->ordNumCols != 0) || 1203 (frameOptions & FRAMEOPTION_END_OFFSET)) 1204 winstate->frametail_ptr = 1205 tuplestore_alloc_read_pointer(winstate->buffer, 0); 1206 } 1207 1208 /* 1209 * If we have an exclusion clause that requires knowing the boundaries of 1210 * the current row's peer group, we create a read pointer to track the 1211 * tail position of the peer group (i.e., first row of the next peer 1212 * group). The head position does not require its own pointer because we 1213 * maintain that as a side effect of advancing the current row. 1214 */ 1215 winstate->grouptail_ptr = -1; 1216 1217 if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP | 1218 FRAMEOPTION_EXCLUDE_TIES)) && 1219 node->ordNumCols != 0) 1220 { 1221 winstate->grouptail_ptr = 1222 tuplestore_alloc_read_pointer(winstate->buffer, 0); 1223 } 1224 1225 /* 1226 * Store the first tuple into the tuplestore (it's always available now; 1227 * we either read it above, or saved it at the end of previous partition) 1228 */ 1229 tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot); 1230 winstate->spooled_rows++; 1231 } 1232 1233 /* 1234 * Read tuples from the outer node, up to and including position 'pos', and 1235 * store them into the tuplestore. 
If pos is -1, reads the whole partition. 1236 */ 1237 static void 1238 spool_tuples(WindowAggState *winstate, int64 pos) 1239 { 1240 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1241 PlanState *outerPlan; 1242 TupleTableSlot *outerslot; 1243 MemoryContext oldcontext; 1244 1245 if (!winstate->buffer) 1246 return; /* just a safety check */ 1247 if (winstate->partition_spooled) 1248 return; /* whole partition done already */ 1249 1250 /* 1251 * If the tuplestore has spilled to disk, alternate reading and writing 1252 * becomes quite expensive due to frequent buffer flushes. It's cheaper 1253 * to force the entire partition to get spooled in one go. 1254 * 1255 * XXX this is a horrid kluge --- it'd be better to fix the performance 1256 * problem inside tuplestore. FIXME 1257 */ 1258 if (!tuplestore_in_memory(winstate->buffer)) 1259 pos = -1; 1260 1261 outerPlan = outerPlanState(winstate); 1262 1263 /* Must be in query context to call outerplan */ 1264 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 1265 1266 while (winstate->spooled_rows <= pos || pos == -1) 1267 { 1268 outerslot = ExecProcNode(outerPlan); 1269 if (TupIsNull(outerslot)) 1270 { 1271 /* reached the end of the last partition */ 1272 winstate->partition_spooled = true; 1273 winstate->more_partitions = false; 1274 break; 1275 } 1276 1277 if (node->partNumCols > 0) 1278 { 1279 ExprContext *econtext = winstate->tmpcontext; 1280 1281 econtext->ecxt_innertuple = winstate->first_part_slot; 1282 econtext->ecxt_outertuple = outerslot; 1283 1284 /* Check if this tuple still belongs to the current partition */ 1285 if (!ExecQualAndReset(winstate->partEqfunction, econtext)) 1286 { 1287 /* 1288 * end of partition; copy the tuple for the next cycle. 
1289 */ 1290 ExecCopySlot(winstate->first_part_slot, outerslot); 1291 winstate->partition_spooled = true; 1292 winstate->more_partitions = true; 1293 break; 1294 } 1295 } 1296 1297 /* Still in partition, so save it into the tuplestore */ 1298 tuplestore_puttupleslot(winstate->buffer, outerslot); 1299 winstate->spooled_rows++; 1300 } 1301 1302 MemoryContextSwitchTo(oldcontext); 1303 } 1304 1305 /* 1306 * release_partition 1307 * clear information kept within a partition, including 1308 * tuplestore and aggregate results. 1309 */ 1310 static void 1311 release_partition(WindowAggState *winstate) 1312 { 1313 int i; 1314 1315 for (i = 0; i < winstate->numfuncs; i++) 1316 { 1317 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); 1318 1319 /* Release any partition-local state of this window function */ 1320 if (perfuncstate->winobj) 1321 perfuncstate->winobj->localmem = NULL; 1322 } 1323 1324 /* 1325 * Release all partition-local memory (in particular, any partition-local 1326 * state that we might have trashed our pointers to in the above loop, and 1327 * any aggregate temp data). We don't rely on retail pfree because some 1328 * aggregates might have allocated data we don't have direct pointers to. 1329 */ 1330 MemoryContextResetAndDeleteChildren(winstate->partcontext); 1331 MemoryContextResetAndDeleteChildren(winstate->aggcontext); 1332 for (i = 0; i < winstate->numaggs; i++) 1333 { 1334 if (winstate->peragg[i].aggcontext != winstate->aggcontext) 1335 MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext); 1336 } 1337 1338 if (winstate->buffer) 1339 tuplestore_end(winstate->buffer); 1340 winstate->buffer = NULL; 1341 winstate->partition_spooled = false; 1342 } 1343 1344 /* 1345 * row_is_in_frame 1346 * Determine whether a row is in the current row's window frame according 1347 * to our window framing rule 1348 * 1349 * The caller must have already determined that the row is in the partition 1350 * and fetched it into a slot. 
This function just encapsulates the framing 1351 * rules. 1352 * 1353 * Returns: 1354 * -1, if the row is out of frame and no succeeding rows can be in frame 1355 * 0, if the row is out of frame but succeeding rows might be in frame 1356 * 1, if the row is in frame 1357 * 1358 * May clobber winstate->temp_slot_2. 1359 */ 1360 static int 1361 row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot) 1362 { 1363 int frameOptions = winstate->frameOptions; 1364 1365 Assert(pos >= 0); /* else caller error */ 1366 1367 /* 1368 * First, check frame starting conditions. We might as well delegate this 1369 * to update_frameheadpos always; it doesn't add any notable cost. 1370 */ 1371 update_frameheadpos(winstate); 1372 if (pos < winstate->frameheadpos) 1373 return 0; 1374 1375 /* 1376 * Okay so far, now check frame ending conditions. Here, we avoid calling 1377 * update_frametailpos in simple cases, so as not to spool tuples further 1378 * ahead than necessary. 1379 */ 1380 if (frameOptions & FRAMEOPTION_END_CURRENT_ROW) 1381 { 1382 if (frameOptions & FRAMEOPTION_ROWS) 1383 { 1384 /* rows after current row are out of frame */ 1385 if (pos > winstate->currentpos) 1386 return -1; 1387 } 1388 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 1389 { 1390 /* following row that is not peer is out of frame */ 1391 if (pos > winstate->currentpos && 1392 !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot)) 1393 return -1; 1394 } 1395 else 1396 Assert(false); 1397 } 1398 else if (frameOptions & FRAMEOPTION_END_OFFSET) 1399 { 1400 if (frameOptions & FRAMEOPTION_ROWS) 1401 { 1402 int64 offset = DatumGetInt64(winstate->endOffsetValue); 1403 1404 /* rows after current row + offset are out of frame */ 1405 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) 1406 offset = -offset; 1407 1408 if (pos > winstate->currentpos + offset) 1409 return -1; 1410 } 1411 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 1412 { 1413 /* hard cases, 
so delegate to update_frametailpos */ 1414 update_frametailpos(winstate); 1415 if (pos >= winstate->frametailpos) 1416 return -1; 1417 } 1418 else 1419 Assert(false); 1420 } 1421 1422 /* Check exclusion clause */ 1423 if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW) 1424 { 1425 if (pos == winstate->currentpos) 1426 return 0; 1427 } 1428 else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) || 1429 ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) && 1430 pos != winstate->currentpos)) 1431 { 1432 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1433 1434 /* If no ORDER BY, all rows are peers with each other */ 1435 if (node->ordNumCols == 0) 1436 return 0; 1437 /* Otherwise, check the group boundaries */ 1438 if (pos >= winstate->groupheadpos) 1439 { 1440 update_grouptailpos(winstate); 1441 if (pos < winstate->grouptailpos) 1442 return 0; 1443 } 1444 } 1445 1446 /* If we get here, it's in frame */ 1447 return 1; 1448 } 1449 1450 /* 1451 * update_frameheadpos 1452 * make frameheadpos valid for the current row 1453 * 1454 * Note that frameheadpos is computed without regard for any window exclusion 1455 * clause; the current row and/or its peers are considered part of the frame 1456 * for this purpose even if they must be excluded later. 1457 * 1458 * May clobber winstate->temp_slot_2. 
1459 */ 1460 static void 1461 update_frameheadpos(WindowAggState *winstate) 1462 { 1463 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1464 int frameOptions = winstate->frameOptions; 1465 MemoryContext oldcontext; 1466 1467 if (winstate->framehead_valid) 1468 return; /* already known for current row */ 1469 1470 /* We may be called in a short-lived context */ 1471 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 1472 1473 if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) 1474 { 1475 /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */ 1476 winstate->frameheadpos = 0; 1477 winstate->framehead_valid = true; 1478 } 1479 else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW) 1480 { 1481 if (frameOptions & FRAMEOPTION_ROWS) 1482 { 1483 /* In ROWS mode, frame head is the same as current */ 1484 winstate->frameheadpos = winstate->currentpos; 1485 winstate->framehead_valid = true; 1486 } 1487 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 1488 { 1489 /* If no ORDER BY, all rows are peers with each other */ 1490 if (node->ordNumCols == 0) 1491 { 1492 winstate->frameheadpos = 0; 1493 winstate->framehead_valid = true; 1494 MemoryContextSwitchTo(oldcontext); 1495 return; 1496 } 1497 1498 /* 1499 * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the 1500 * first row that is a peer of current row. We keep a copy of the 1501 * last-known frame head row in framehead_slot, and advance as 1502 * necessary. Note that if we reach end of partition, we will 1503 * leave frameheadpos = end+1 and framehead_slot empty. 
1504 */ 1505 tuplestore_select_read_pointer(winstate->buffer, 1506 winstate->framehead_ptr); 1507 if (winstate->frameheadpos == 0 && 1508 TupIsNull(winstate->framehead_slot)) 1509 { 1510 /* fetch first row into framehead_slot, if we didn't already */ 1511 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1512 winstate->framehead_slot)) 1513 elog(ERROR, "unexpected end of tuplestore"); 1514 } 1515 1516 while (!TupIsNull(winstate->framehead_slot)) 1517 { 1518 if (are_peers(winstate, winstate->framehead_slot, 1519 winstate->ss.ss_ScanTupleSlot)) 1520 break; /* this row is the correct frame head */ 1521 /* Note we advance frameheadpos even if the fetch fails */ 1522 winstate->frameheadpos++; 1523 spool_tuples(winstate, winstate->frameheadpos); 1524 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1525 winstate->framehead_slot)) 1526 break; /* end of partition */ 1527 } 1528 winstate->framehead_valid = true; 1529 } 1530 else 1531 Assert(false); 1532 } 1533 else if (frameOptions & FRAMEOPTION_START_OFFSET) 1534 { 1535 if (frameOptions & FRAMEOPTION_ROWS) 1536 { 1537 /* In ROWS mode, bound is physically n before/after current */ 1538 int64 offset = DatumGetInt64(winstate->startOffsetValue); 1539 1540 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) 1541 offset = -offset; 1542 1543 winstate->frameheadpos = winstate->currentpos + offset; 1544 /* frame head can't go before first row */ 1545 if (winstate->frameheadpos < 0) 1546 winstate->frameheadpos = 0; 1547 else if (winstate->frameheadpos > winstate->currentpos + 1) 1548 { 1549 /* make sure frameheadpos is not past end of partition */ 1550 spool_tuples(winstate, winstate->frameheadpos - 1); 1551 if (winstate->frameheadpos > winstate->spooled_rows) 1552 winstate->frameheadpos = winstate->spooled_rows; 1553 } 1554 winstate->framehead_valid = true; 1555 } 1556 else if (frameOptions & FRAMEOPTION_RANGE) 1557 { 1558 /* 1559 * In RANGE START_OFFSET mode, frame head is the first row that 1560 * satisfies 
the in_range constraint relative to the current row. 1561 * We keep a copy of the last-known frame head row in 1562 * framehead_slot, and advance as necessary. Note that if we 1563 * reach end of partition, we will leave frameheadpos = end+1 and 1564 * framehead_slot empty. 1565 */ 1566 int sortCol = node->ordColIdx[0]; 1567 bool sub, 1568 less; 1569 1570 /* We must have an ordering column */ 1571 Assert(node->ordNumCols == 1); 1572 1573 /* Precompute flags for in_range checks */ 1574 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) 1575 sub = true; /* subtract startOffset from current row */ 1576 else 1577 sub = false; /* add it */ 1578 less = false; /* normally, we want frame head >= sum */ 1579 /* If sort order is descending, flip both flags */ 1580 if (!winstate->inRangeAsc) 1581 { 1582 sub = !sub; 1583 less = true; 1584 } 1585 1586 tuplestore_select_read_pointer(winstate->buffer, 1587 winstate->framehead_ptr); 1588 if (winstate->frameheadpos == 0 && 1589 TupIsNull(winstate->framehead_slot)) 1590 { 1591 /* fetch first row into framehead_slot, if we didn't already */ 1592 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1593 winstate->framehead_slot)) 1594 elog(ERROR, "unexpected end of tuplestore"); 1595 } 1596 1597 while (!TupIsNull(winstate->framehead_slot)) 1598 { 1599 Datum headval, 1600 currval; 1601 bool headisnull, 1602 currisnull; 1603 1604 headval = slot_getattr(winstate->framehead_slot, sortCol, 1605 &headisnull); 1606 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, 1607 &currisnull); 1608 if (headisnull || currisnull) 1609 { 1610 /* order of the rows depends only on nulls_first */ 1611 if (winstate->inRangeNullsFirst) 1612 { 1613 /* advance head if head is null and curr is not */ 1614 if (!headisnull || currisnull) 1615 break; 1616 } 1617 else 1618 { 1619 /* advance head if head is not null and curr is null */ 1620 if (headisnull || !currisnull) 1621 break; 1622 } 1623 } 1624 else 1625 { 1626 if 
(DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc, 1627 winstate->inRangeColl, 1628 headval, 1629 currval, 1630 winstate->startOffsetValue, 1631 BoolGetDatum(sub), 1632 BoolGetDatum(less)))) 1633 break; /* this row is the correct frame head */ 1634 } 1635 /* Note we advance frameheadpos even if the fetch fails */ 1636 winstate->frameheadpos++; 1637 spool_tuples(winstate, winstate->frameheadpos); 1638 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1639 winstate->framehead_slot)) 1640 break; /* end of partition */ 1641 } 1642 winstate->framehead_valid = true; 1643 } 1644 else if (frameOptions & FRAMEOPTION_GROUPS) 1645 { 1646 /* 1647 * In GROUPS START_OFFSET mode, frame head is the first row of the 1648 * first peer group whose number satisfies the offset constraint. 1649 * We keep a copy of the last-known frame head row in 1650 * framehead_slot, and advance as necessary. Note that if we 1651 * reach end of partition, we will leave frameheadpos = end+1 and 1652 * framehead_slot empty. 
1653 */ 1654 int64 offset = DatumGetInt64(winstate->startOffsetValue); 1655 int64 minheadgroup; 1656 1657 if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) 1658 minheadgroup = winstate->currentgroup - offset; 1659 else 1660 minheadgroup = winstate->currentgroup + offset; 1661 1662 tuplestore_select_read_pointer(winstate->buffer, 1663 winstate->framehead_ptr); 1664 if (winstate->frameheadpos == 0 && 1665 TupIsNull(winstate->framehead_slot)) 1666 { 1667 /* fetch first row into framehead_slot, if we didn't already */ 1668 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1669 winstate->framehead_slot)) 1670 elog(ERROR, "unexpected end of tuplestore"); 1671 } 1672 1673 while (!TupIsNull(winstate->framehead_slot)) 1674 { 1675 if (winstate->frameheadgroup >= minheadgroup) 1676 break; /* this row is the correct frame head */ 1677 ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot); 1678 /* Note we advance frameheadpos even if the fetch fails */ 1679 winstate->frameheadpos++; 1680 spool_tuples(winstate, winstate->frameheadpos); 1681 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1682 winstate->framehead_slot)) 1683 break; /* end of partition */ 1684 if (!are_peers(winstate, winstate->temp_slot_2, 1685 winstate->framehead_slot)) 1686 winstate->frameheadgroup++; 1687 } 1688 ExecClearTuple(winstate->temp_slot_2); 1689 winstate->framehead_valid = true; 1690 } 1691 else 1692 Assert(false); 1693 } 1694 else 1695 Assert(false); 1696 1697 MemoryContextSwitchTo(oldcontext); 1698 } 1699 1700 /* 1701 * update_frametailpos 1702 * make frametailpos valid for the current row 1703 * 1704 * Note that frametailpos is computed without regard for any window exclusion 1705 * clause; the current row and/or its peers are considered part of the frame 1706 * for this purpose even if they must be excluded later. 1707 * 1708 * May clobber winstate->temp_slot_2. 
1709 */ 1710 static void 1711 update_frametailpos(WindowAggState *winstate) 1712 { 1713 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1714 int frameOptions = winstate->frameOptions; 1715 MemoryContext oldcontext; 1716 1717 if (winstate->frametail_valid) 1718 return; /* already known for current row */ 1719 1720 /* We may be called in a short-lived context */ 1721 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 1722 1723 if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING) 1724 { 1725 /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */ 1726 spool_tuples(winstate, -1); 1727 winstate->frametailpos = winstate->spooled_rows; 1728 winstate->frametail_valid = true; 1729 } 1730 else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW) 1731 { 1732 if (frameOptions & FRAMEOPTION_ROWS) 1733 { 1734 /* In ROWS mode, exactly the rows up to current are in frame */ 1735 winstate->frametailpos = winstate->currentpos + 1; 1736 winstate->frametail_valid = true; 1737 } 1738 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 1739 { 1740 /* If no ORDER BY, all rows are peers with each other */ 1741 if (node->ordNumCols == 0) 1742 { 1743 spool_tuples(winstate, -1); 1744 winstate->frametailpos = winstate->spooled_rows; 1745 winstate->frametail_valid = true; 1746 MemoryContextSwitchTo(oldcontext); 1747 return; 1748 } 1749 1750 /* 1751 * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last 1752 * row that is a peer of current row, frame tail is the row after 1753 * that (if any). We keep a copy of the last-known frame tail row 1754 * in frametail_slot, and advance as necessary. Note that if we 1755 * reach end of partition, we will leave frametailpos = end+1 and 1756 * frametail_slot empty. 
1757 */ 1758 tuplestore_select_read_pointer(winstate->buffer, 1759 winstate->frametail_ptr); 1760 if (winstate->frametailpos == 0 && 1761 TupIsNull(winstate->frametail_slot)) 1762 { 1763 /* fetch first row into frametail_slot, if we didn't already */ 1764 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1765 winstate->frametail_slot)) 1766 elog(ERROR, "unexpected end of tuplestore"); 1767 } 1768 1769 while (!TupIsNull(winstate->frametail_slot)) 1770 { 1771 if (winstate->frametailpos > winstate->currentpos && 1772 !are_peers(winstate, winstate->frametail_slot, 1773 winstate->ss.ss_ScanTupleSlot)) 1774 break; /* this row is the frame tail */ 1775 /* Note we advance frametailpos even if the fetch fails */ 1776 winstate->frametailpos++; 1777 spool_tuples(winstate, winstate->frametailpos); 1778 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1779 winstate->frametail_slot)) 1780 break; /* end of partition */ 1781 } 1782 winstate->frametail_valid = true; 1783 } 1784 else 1785 Assert(false); 1786 } 1787 else if (frameOptions & FRAMEOPTION_END_OFFSET) 1788 { 1789 if (frameOptions & FRAMEOPTION_ROWS) 1790 { 1791 /* In ROWS mode, bound is physically n before/after current */ 1792 int64 offset = DatumGetInt64(winstate->endOffsetValue); 1793 1794 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) 1795 offset = -offset; 1796 1797 winstate->frametailpos = winstate->currentpos + offset + 1; 1798 /* smallest allowable value of frametailpos is 0 */ 1799 if (winstate->frametailpos < 0) 1800 winstate->frametailpos = 0; 1801 else if (winstate->frametailpos > winstate->currentpos + 1) 1802 { 1803 /* make sure frametailpos is not past end of partition */ 1804 spool_tuples(winstate, winstate->frametailpos - 1); 1805 if (winstate->frametailpos > winstate->spooled_rows) 1806 winstate->frametailpos = winstate->spooled_rows; 1807 } 1808 winstate->frametail_valid = true; 1809 } 1810 else if (frameOptions & FRAMEOPTION_RANGE) 1811 { 1812 /* 1813 * In RANGE END_OFFSET 
mode, frame end is the last row that 1814 * satisfies the in_range constraint relative to the current row, 1815 * frame tail is the row after that (if any). We keep a copy of 1816 * the last-known frame tail row in frametail_slot, and advance as 1817 * necessary. Note that if we reach end of partition, we will 1818 * leave frametailpos = end+1 and frametail_slot empty. 1819 */ 1820 int sortCol = node->ordColIdx[0]; 1821 bool sub, 1822 less; 1823 1824 /* We must have an ordering column */ 1825 Assert(node->ordNumCols == 1); 1826 1827 /* Precompute flags for in_range checks */ 1828 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) 1829 sub = true; /* subtract endOffset from current row */ 1830 else 1831 sub = false; /* add it */ 1832 less = true; /* normally, we want frame tail <= sum */ 1833 /* If sort order is descending, flip both flags */ 1834 if (!winstate->inRangeAsc) 1835 { 1836 sub = !sub; 1837 less = false; 1838 } 1839 1840 tuplestore_select_read_pointer(winstate->buffer, 1841 winstate->frametail_ptr); 1842 if (winstate->frametailpos == 0 && 1843 TupIsNull(winstate->frametail_slot)) 1844 { 1845 /* fetch first row into frametail_slot, if we didn't already */ 1846 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1847 winstate->frametail_slot)) 1848 elog(ERROR, "unexpected end of tuplestore"); 1849 } 1850 1851 while (!TupIsNull(winstate->frametail_slot)) 1852 { 1853 Datum tailval, 1854 currval; 1855 bool tailisnull, 1856 currisnull; 1857 1858 tailval = slot_getattr(winstate->frametail_slot, sortCol, 1859 &tailisnull); 1860 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, 1861 &currisnull); 1862 if (tailisnull || currisnull) 1863 { 1864 /* order of the rows depends only on nulls_first */ 1865 if (winstate->inRangeNullsFirst) 1866 { 1867 /* advance tail if tail is null or curr is not */ 1868 if (!tailisnull) 1869 break; 1870 } 1871 else 1872 { 1873 /* advance tail if tail is not null or curr is null */ 1874 if (!currisnull) 1875 break; 
1876 } 1877 } 1878 else 1879 { 1880 if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc, 1881 winstate->inRangeColl, 1882 tailval, 1883 currval, 1884 winstate->endOffsetValue, 1885 BoolGetDatum(sub), 1886 BoolGetDatum(less)))) 1887 break; /* this row is the correct frame tail */ 1888 } 1889 /* Note we advance frametailpos even if the fetch fails */ 1890 winstate->frametailpos++; 1891 spool_tuples(winstate, winstate->frametailpos); 1892 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1893 winstate->frametail_slot)) 1894 break; /* end of partition */ 1895 } 1896 winstate->frametail_valid = true; 1897 } 1898 else if (frameOptions & FRAMEOPTION_GROUPS) 1899 { 1900 /* 1901 * In GROUPS END_OFFSET mode, frame end is the last row of the 1902 * last peer group whose number satisfies the offset constraint, 1903 * and frame tail is the row after that (if any). We keep a copy 1904 * of the last-known frame tail row in frametail_slot, and advance 1905 * as necessary. Note that if we reach end of partition, we will 1906 * leave frametailpos = end+1 and frametail_slot empty. 
1907 */ 1908 int64 offset = DatumGetInt64(winstate->endOffsetValue); 1909 int64 maxtailgroup; 1910 1911 if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) 1912 maxtailgroup = winstate->currentgroup - offset; 1913 else 1914 maxtailgroup = winstate->currentgroup + offset; 1915 1916 tuplestore_select_read_pointer(winstate->buffer, 1917 winstate->frametail_ptr); 1918 if (winstate->frametailpos == 0 && 1919 TupIsNull(winstate->frametail_slot)) 1920 { 1921 /* fetch first row into frametail_slot, if we didn't already */ 1922 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1923 winstate->frametail_slot)) 1924 elog(ERROR, "unexpected end of tuplestore"); 1925 } 1926 1927 while (!TupIsNull(winstate->frametail_slot)) 1928 { 1929 if (winstate->frametailgroup > maxtailgroup) 1930 break; /* this row is the correct frame tail */ 1931 ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot); 1932 /* Note we advance frametailpos even if the fetch fails */ 1933 winstate->frametailpos++; 1934 spool_tuples(winstate, winstate->frametailpos); 1935 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1936 winstate->frametail_slot)) 1937 break; /* end of partition */ 1938 if (!are_peers(winstate, winstate->temp_slot_2, 1939 winstate->frametail_slot)) 1940 winstate->frametailgroup++; 1941 } 1942 ExecClearTuple(winstate->temp_slot_2); 1943 winstate->frametail_valid = true; 1944 } 1945 else 1946 Assert(false); 1947 } 1948 else 1949 Assert(false); 1950 1951 MemoryContextSwitchTo(oldcontext); 1952 } 1953 1954 /* 1955 * update_grouptailpos 1956 * make grouptailpos valid for the current row 1957 * 1958 * May clobber winstate->temp_slot_2. 
1959 */ 1960 static void 1961 update_grouptailpos(WindowAggState *winstate) 1962 { 1963 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 1964 MemoryContext oldcontext; 1965 1966 if (winstate->grouptail_valid) 1967 return; /* already known for current row */ 1968 1969 /* We may be called in a short-lived context */ 1970 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 1971 1972 /* If no ORDER BY, all rows are peers with each other */ 1973 if (node->ordNumCols == 0) 1974 { 1975 spool_tuples(winstate, -1); 1976 winstate->grouptailpos = winstate->spooled_rows; 1977 winstate->grouptail_valid = true; 1978 MemoryContextSwitchTo(oldcontext); 1979 return; 1980 } 1981 1982 /* 1983 * Because grouptail_valid is reset only when current row advances into a 1984 * new peer group, we always reach here knowing that grouptailpos needs to 1985 * be advanced by at least one row. Hence, unlike the otherwise similar 1986 * case for frame tail tracking, we do not need persistent storage of the 1987 * group tail row. 
1988 */ 1989 Assert(winstate->grouptailpos <= winstate->currentpos); 1990 tuplestore_select_read_pointer(winstate->buffer, 1991 winstate->grouptail_ptr); 1992 for (;;) 1993 { 1994 /* Note we advance grouptailpos even if the fetch fails */ 1995 winstate->grouptailpos++; 1996 spool_tuples(winstate, winstate->grouptailpos); 1997 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 1998 winstate->temp_slot_2)) 1999 break; /* end of partition */ 2000 if (winstate->grouptailpos > winstate->currentpos && 2001 !are_peers(winstate, winstate->temp_slot_2, 2002 winstate->ss.ss_ScanTupleSlot)) 2003 break; /* this row is the group tail */ 2004 } 2005 ExecClearTuple(winstate->temp_slot_2); 2006 winstate->grouptail_valid = true; 2007 2008 MemoryContextSwitchTo(oldcontext); 2009 } 2010 2011 2012 /* ----------------- 2013 * ExecWindowAgg 2014 * 2015 * ExecWindowAgg receives tuples from its outer subplan and 2016 * stores them into a tuplestore, then processes window functions. 2017 * This node doesn't reduce nor qualify any row so the number of 2018 * returned rows is exactly the same as its outer subplan's result. 2019 * ----------------- 2020 */ 2021 static TupleTableSlot * 2022 ExecWindowAgg(PlanState *pstate) 2023 { 2024 WindowAggState *winstate = castNode(WindowAggState, pstate); 2025 ExprContext *econtext; 2026 int i; 2027 int numfuncs; 2028 2029 CHECK_FOR_INTERRUPTS(); 2030 2031 if (winstate->all_done) 2032 return NULL; 2033 2034 /* 2035 * Compute frame offset values, if any, during first call (or after a 2036 * rescan). These are assumed to hold constant throughout the scan; if 2037 * user gives us a volatile expression, we'll only use its initial value. 
2038 */ 2039 if (winstate->all_first) 2040 { 2041 int frameOptions = winstate->frameOptions; 2042 ExprContext *econtext = winstate->ss.ps.ps_ExprContext; 2043 Datum value; 2044 bool isnull; 2045 int16 len; 2046 bool byval; 2047 2048 if (frameOptions & FRAMEOPTION_START_OFFSET) 2049 { 2050 Assert(winstate->startOffset != NULL); 2051 value = ExecEvalExprSwitchContext(winstate->startOffset, 2052 econtext, 2053 &isnull); 2054 if (isnull) 2055 ereport(ERROR, 2056 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), 2057 errmsg("frame starting offset must not be null"))); 2058 /* copy value into query-lifespan context */ 2059 get_typlenbyval(exprType((Node *) winstate->startOffset->expr), 2060 &len, &byval); 2061 winstate->startOffsetValue = datumCopy(value, byval, len); 2062 if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS)) 2063 { 2064 /* value is known to be int8 */ 2065 int64 offset = DatumGetInt64(value); 2066 2067 if (offset < 0) 2068 ereport(ERROR, 2069 (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE), 2070 errmsg("frame starting offset must not be negative"))); 2071 } 2072 } 2073 if (frameOptions & FRAMEOPTION_END_OFFSET) 2074 { 2075 Assert(winstate->endOffset != NULL); 2076 value = ExecEvalExprSwitchContext(winstate->endOffset, 2077 econtext, 2078 &isnull); 2079 if (isnull) 2080 ereport(ERROR, 2081 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), 2082 errmsg("frame ending offset must not be null"))); 2083 /* copy value into query-lifespan context */ 2084 get_typlenbyval(exprType((Node *) winstate->endOffset->expr), 2085 &len, &byval); 2086 winstate->endOffsetValue = datumCopy(value, byval, len); 2087 if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS)) 2088 { 2089 /* value is known to be int8 */ 2090 int64 offset = DatumGetInt64(value); 2091 2092 if (offset < 0) 2093 ereport(ERROR, 2094 (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE), 2095 errmsg("frame ending offset must not be negative"))); 2096 } 2097 } 2098 winstate->all_first = false; 2099 } 2100 
2101 if (winstate->buffer == NULL) 2102 { 2103 /* Initialize for first partition and set current row = 0 */ 2104 begin_partition(winstate); 2105 /* If there are no input rows, we'll detect that and exit below */ 2106 } 2107 else 2108 { 2109 /* Advance current row within partition */ 2110 winstate->currentpos++; 2111 /* This might mean that the frame moves, too */ 2112 winstate->framehead_valid = false; 2113 winstate->frametail_valid = false; 2114 /* we don't need to invalidate grouptail here; see below */ 2115 } 2116 2117 /* 2118 * Spool all tuples up to and including the current row, if we haven't 2119 * already 2120 */ 2121 spool_tuples(winstate, winstate->currentpos); 2122 2123 /* Move to the next partition if we reached the end of this partition */ 2124 if (winstate->partition_spooled && 2125 winstate->currentpos >= winstate->spooled_rows) 2126 { 2127 release_partition(winstate); 2128 2129 if (winstate->more_partitions) 2130 { 2131 begin_partition(winstate); 2132 Assert(winstate->spooled_rows > 0); 2133 } 2134 else 2135 { 2136 winstate->all_done = true; 2137 return NULL; 2138 } 2139 } 2140 2141 /* final output execution is in ps_ExprContext */ 2142 econtext = winstate->ss.ps.ps_ExprContext; 2143 2144 /* Clear the per-output-tuple context for current row */ 2145 ResetExprContext(econtext); 2146 2147 /* 2148 * Read the current row from the tuplestore, and save in ScanTupleSlot. 2149 * (We can't rely on the outerplan's output slot because we may have to 2150 * read beyond the current row. Also, we have to actually copy the row 2151 * out of the tuplestore, since window function evaluation might cause the 2152 * tuplestore to dump its state to disk.) 2153 * 2154 * In GROUPS mode, or when tracking a group-oriented exclusion clause, we 2155 * must also detect entering a new peer group and update associated state 2156 * when that happens. We use temp_slot_2 to temporarily hold the previous 2157 * row for this purpose. 
2158 * 2159 * Current row must be in the tuplestore, since we spooled it above. 2160 */ 2161 tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr); 2162 if ((winstate->frameOptions & (FRAMEOPTION_GROUPS | 2163 FRAMEOPTION_EXCLUDE_GROUP | 2164 FRAMEOPTION_EXCLUDE_TIES)) && 2165 winstate->currentpos > 0) 2166 { 2167 ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot); 2168 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 2169 winstate->ss.ss_ScanTupleSlot)) 2170 elog(ERROR, "unexpected end of tuplestore"); 2171 if (!are_peers(winstate, winstate->temp_slot_2, 2172 winstate->ss.ss_ScanTupleSlot)) 2173 { 2174 winstate->currentgroup++; 2175 winstate->groupheadpos = winstate->currentpos; 2176 winstate->grouptail_valid = false; 2177 } 2178 ExecClearTuple(winstate->temp_slot_2); 2179 } 2180 else 2181 { 2182 if (!tuplestore_gettupleslot(winstate->buffer, true, true, 2183 winstate->ss.ss_ScanTupleSlot)) 2184 elog(ERROR, "unexpected end of tuplestore"); 2185 } 2186 2187 /* 2188 * Evaluate true window functions 2189 */ 2190 numfuncs = winstate->numfuncs; 2191 for (i = 0; i < numfuncs; i++) 2192 { 2193 WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); 2194 2195 if (perfuncstate->plain_agg) 2196 continue; 2197 eval_windowfunction(winstate, perfuncstate, 2198 &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]), 2199 &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno])); 2200 } 2201 2202 /* 2203 * Evaluate aggregates 2204 */ 2205 if (winstate->numaggs > 0) 2206 eval_windowaggregates(winstate); 2207 2208 /* 2209 * If we have created auxiliary read pointers for the frame or group 2210 * boundaries, force them to be kept up-to-date, because we don't know 2211 * whether the window function(s) will do anything that requires that. 2212 * Failing to advance the pointers would result in being unable to trim 2213 * data from the tuplestore, which is bad. 
(If we could know in advance 2214 * whether the window functions will use frame boundary info, we could 2215 * skip creating these pointers in the first place ... but unfortunately 2216 * the window function API doesn't require that.) 2217 */ 2218 if (winstate->framehead_ptr >= 0) 2219 update_frameheadpos(winstate); 2220 if (winstate->frametail_ptr >= 0) 2221 update_frametailpos(winstate); 2222 if (winstate->grouptail_ptr >= 0) 2223 update_grouptailpos(winstate); 2224 2225 /* 2226 * Truncate any no-longer-needed rows from the tuplestore. 2227 */ 2228 tuplestore_trim(winstate->buffer); 2229 2230 /* 2231 * Form and return a projection tuple using the windowfunc results and the 2232 * current row. Setting ecxt_outertuple arranges that any Vars will be 2233 * evaluated with respect to that row. 2234 */ 2235 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot; 2236 2237 return ExecProject(winstate->ss.ps.ps_ProjInfo); 2238 } 2239 2240 /* ----------------- 2241 * ExecInitWindowAgg 2242 * 2243 * Creates the run-time information for the WindowAgg node produced by the 2244 * planner and initializes its outer subtree 2245 * ----------------- 2246 */ 2247 WindowAggState * 2248 ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) 2249 { 2250 WindowAggState *winstate; 2251 Plan *outerPlan; 2252 ExprContext *econtext; 2253 ExprContext *tmpcontext; 2254 WindowStatePerFunc perfunc; 2255 WindowStatePerAgg peragg; 2256 int frameOptions = node->frameOptions; 2257 int numfuncs, 2258 wfuncno, 2259 numaggs, 2260 aggno; 2261 TupleDesc scanDesc; 2262 ListCell *l; 2263 2264 /* check for unsupported flags */ 2265 Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); 2266 2267 /* 2268 * create state structure 2269 */ 2270 winstate = makeNode(WindowAggState); 2271 winstate->ss.ps.plan = (Plan *) node; 2272 winstate->ss.ps.state = estate; 2273 winstate->ss.ps.ExecProcNode = ExecWindowAgg; 2274 2275 /* 2276 * Create expression contexts. 
We need two, one for per-input-tuple 2277 * processing and one for per-output-tuple processing. We cheat a little 2278 * by using ExecAssignExprContext() to build both. 2279 */ 2280 ExecAssignExprContext(estate, &winstate->ss.ps); 2281 tmpcontext = winstate->ss.ps.ps_ExprContext; 2282 winstate->tmpcontext = tmpcontext; 2283 ExecAssignExprContext(estate, &winstate->ss.ps); 2284 2285 /* Create long-lived context for storage of partition-local memory etc */ 2286 winstate->partcontext = 2287 AllocSetContextCreate(CurrentMemoryContext, 2288 "WindowAgg Partition", 2289 ALLOCSET_DEFAULT_SIZES); 2290 2291 /* 2292 * Create mid-lived context for aggregate trans values etc. 2293 * 2294 * Note that moving aggregates each use their own private context, not 2295 * this one. 2296 */ 2297 winstate->aggcontext = 2298 AllocSetContextCreate(CurrentMemoryContext, 2299 "WindowAgg Aggregates", 2300 ALLOCSET_DEFAULT_SIZES); 2301 2302 /* 2303 * WindowAgg nodes never have quals, since they can only occur at the 2304 * logical top level of a query (ie, after any WHERE or HAVING filters) 2305 */ 2306 Assert(node->plan.qual == NIL); 2307 winstate->ss.ps.qual = NULL; 2308 2309 /* 2310 * initialize child nodes 2311 */ 2312 outerPlan = outerPlan(node); 2313 outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags); 2314 2315 /* 2316 * initialize source tuple type (which is also the tuple type that we'll 2317 * store in the tuplestore and use in all our working slots). 
2318 */ 2319 ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss); 2320 scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; 2321 2322 /* 2323 * tuple table initialization 2324 */ 2325 winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc); 2326 winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc); 2327 winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc); 2328 winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc); 2329 2330 /* 2331 * create frame head and tail slots only if needed (must create slots in 2332 * exactly the same cases that update_frameheadpos and update_frametailpos 2333 * need them) 2334 */ 2335 winstate->framehead_slot = winstate->frametail_slot = NULL; 2336 2337 if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) 2338 { 2339 if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) && 2340 node->ordNumCols != 0) || 2341 (frameOptions & FRAMEOPTION_START_OFFSET)) 2342 winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc); 2343 if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) && 2344 node->ordNumCols != 0) || 2345 (frameOptions & FRAMEOPTION_END_OFFSET)) 2346 winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc); 2347 } 2348 2349 /* 2350 * Initialize result slot, type and projection. 2351 */ 2352 ExecInitResultTupleSlotTL(estate, &winstate->ss.ps); 2353 ExecAssignProjectionInfo(&winstate->ss.ps, NULL); 2354 2355 /* Set up data for comparing tuples */ 2356 if (node->partNumCols > 0) 2357 winstate->partEqfunction = 2358 execTuplesMatchPrepare(scanDesc, 2359 node->partNumCols, 2360 node->partColIdx, 2361 node->partOperators, 2362 &winstate->ss.ps); 2363 2364 if (node->ordNumCols > 0) 2365 winstate->ordEqfunction = 2366 execTuplesMatchPrepare(scanDesc, 2367 node->ordNumCols, 2368 node->ordColIdx, 2369 node->ordOperators, 2370 &winstate->ss.ps); 2371 2372 /* 2373 * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes. 
2374 */ 2375 numfuncs = winstate->numfuncs; 2376 numaggs = winstate->numaggs; 2377 econtext = winstate->ss.ps.ps_ExprContext; 2378 econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs); 2379 econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs); 2380 2381 /* 2382 * allocate per-wfunc/per-agg state information. 2383 */ 2384 perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs); 2385 peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs); 2386 winstate->perfunc = perfunc; 2387 winstate->peragg = peragg; 2388 2389 wfuncno = -1; 2390 aggno = -1; 2391 foreach(l, winstate->funcs) 2392 { 2393 WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l); 2394 WindowFunc *wfunc = wfuncstate->wfunc; 2395 WindowStatePerFunc perfuncstate; 2396 AclResult aclresult; 2397 int i; 2398 2399 if (wfunc->winref != node->winref) /* planner screwed up? */ 2400 elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u", 2401 wfunc->winref, node->winref); 2402 2403 /* Look for a previous duplicate window function */ 2404 for (i = 0; i <= wfuncno; i++) 2405 { 2406 if (equal(wfunc, perfunc[i].wfunc) && 2407 !contain_volatile_functions((Node *) wfunc)) 2408 break; 2409 } 2410 if (i <= wfuncno) 2411 { 2412 /* Found a match to an existing entry, so just mark it */ 2413 wfuncstate->wfuncno = i; 2414 continue; 2415 } 2416 2417 /* Nope, so assign a new PerAgg record */ 2418 perfuncstate = &perfunc[++wfuncno]; 2419 2420 /* Mark WindowFunc state node with assigned index in the result array */ 2421 wfuncstate->wfuncno = wfuncno; 2422 2423 /* Check permission to call window function */ 2424 aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(), 2425 ACL_EXECUTE); 2426 if (aclresult != ACLCHECK_OK) 2427 aclcheck_error(aclresult, OBJECT_FUNCTION, 2428 get_func_name(wfunc->winfnoid)); 2429 InvokeFunctionExecuteHook(wfunc->winfnoid); 2430 2431 /* Fill in the perfuncstate data */ 2432 
perfuncstate->wfuncstate = wfuncstate; 2433 perfuncstate->wfunc = wfunc; 2434 perfuncstate->numArguments = list_length(wfuncstate->args); 2435 2436 fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo, 2437 econtext->ecxt_per_query_memory); 2438 fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo); 2439 2440 perfuncstate->winCollation = wfunc->inputcollid; 2441 2442 get_typlenbyval(wfunc->wintype, 2443 &perfuncstate->resulttypeLen, 2444 &perfuncstate->resulttypeByVal); 2445 2446 /* 2447 * If it's really just a plain aggregate function, we'll emulate the 2448 * Agg environment for it. 2449 */ 2450 perfuncstate->plain_agg = wfunc->winagg; 2451 if (wfunc->winagg) 2452 { 2453 WindowStatePerAgg peraggstate; 2454 2455 perfuncstate->aggno = ++aggno; 2456 peraggstate = &winstate->peragg[aggno]; 2457 initialize_peragg(winstate, wfunc, peraggstate); 2458 peraggstate->wfuncno = wfuncno; 2459 } 2460 else 2461 { 2462 WindowObject winobj = makeNode(WindowObjectData); 2463 2464 winobj->winstate = winstate; 2465 winobj->argstates = wfuncstate->args; 2466 winobj->localmem = NULL; 2467 perfuncstate->winobj = winobj; 2468 } 2469 } 2470 2471 /* Update numfuncs, numaggs to match number of unique functions found */ 2472 winstate->numfuncs = wfuncno + 1; 2473 winstate->numaggs = aggno + 1; 2474 2475 /* Set up WindowObject for aggregates, if needed */ 2476 if (winstate->numaggs > 0) 2477 { 2478 WindowObject agg_winobj = makeNode(WindowObjectData); 2479 2480 agg_winobj->winstate = winstate; 2481 agg_winobj->argstates = NIL; 2482 agg_winobj->localmem = NULL; 2483 /* make sure markptr = -1 to invalidate. 
It may not get used */ 2484 agg_winobj->markptr = -1; 2485 agg_winobj->readptr = -1; 2486 winstate->agg_winobj = agg_winobj; 2487 } 2488 2489 /* copy frame options to state node for easy access */ 2490 winstate->frameOptions = frameOptions; 2491 2492 /* initialize frame bound offset expressions */ 2493 winstate->startOffset = ExecInitExpr((Expr *) node->startOffset, 2494 (PlanState *) winstate); 2495 winstate->endOffset = ExecInitExpr((Expr *) node->endOffset, 2496 (PlanState *) winstate); 2497 2498 /* Lookup in_range support functions if needed */ 2499 if (OidIsValid(node->startInRangeFunc)) 2500 fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc); 2501 if (OidIsValid(node->endInRangeFunc)) 2502 fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc); 2503 winstate->inRangeColl = node->inRangeColl; 2504 winstate->inRangeAsc = node->inRangeAsc; 2505 winstate->inRangeNullsFirst = node->inRangeNullsFirst; 2506 2507 winstate->all_first = true; 2508 winstate->partition_spooled = false; 2509 winstate->more_partitions = false; 2510 2511 return winstate; 2512 } 2513 2514 /* ----------------- 2515 * ExecEndWindowAgg 2516 * ----------------- 2517 */ 2518 void 2519 ExecEndWindowAgg(WindowAggState *node) 2520 { 2521 PlanState *outerPlan; 2522 int i; 2523 2524 release_partition(node); 2525 2526 ExecClearTuple(node->ss.ss_ScanTupleSlot); 2527 ExecClearTuple(node->first_part_slot); 2528 ExecClearTuple(node->agg_row_slot); 2529 ExecClearTuple(node->temp_slot_1); 2530 ExecClearTuple(node->temp_slot_2); 2531 if (node->framehead_slot) 2532 ExecClearTuple(node->framehead_slot); 2533 if (node->frametail_slot) 2534 ExecClearTuple(node->frametail_slot); 2535 2536 /* 2537 * Free both the expr contexts. 
2538 */ 2539 ExecFreeExprContext(&node->ss.ps); 2540 node->ss.ps.ps_ExprContext = node->tmpcontext; 2541 ExecFreeExprContext(&node->ss.ps); 2542 2543 for (i = 0; i < node->numaggs; i++) 2544 { 2545 if (node->peragg[i].aggcontext != node->aggcontext) 2546 MemoryContextDelete(node->peragg[i].aggcontext); 2547 } 2548 MemoryContextDelete(node->partcontext); 2549 MemoryContextDelete(node->aggcontext); 2550 2551 pfree(node->perfunc); 2552 pfree(node->peragg); 2553 2554 outerPlan = outerPlanState(node); 2555 ExecEndNode(outerPlan); 2556 } 2557 2558 /* ----------------- 2559 * ExecReScanWindowAgg 2560 * ----------------- 2561 */ 2562 void 2563 ExecReScanWindowAgg(WindowAggState *node) 2564 { 2565 PlanState *outerPlan = outerPlanState(node); 2566 ExprContext *econtext = node->ss.ps.ps_ExprContext; 2567 2568 node->all_done = false; 2569 node->all_first = true; 2570 2571 /* release tuplestore et al */ 2572 release_partition(node); 2573 2574 /* release all temp tuples, but especially first_part_slot */ 2575 ExecClearTuple(node->ss.ss_ScanTupleSlot); 2576 ExecClearTuple(node->first_part_slot); 2577 ExecClearTuple(node->agg_row_slot); 2578 ExecClearTuple(node->temp_slot_1); 2579 ExecClearTuple(node->temp_slot_2); 2580 if (node->framehead_slot) 2581 ExecClearTuple(node->framehead_slot); 2582 if (node->frametail_slot) 2583 ExecClearTuple(node->frametail_slot); 2584 2585 /* Forget current wfunc values */ 2586 MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs); 2587 MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs); 2588 2589 /* 2590 * if chgParam of subnode is not null then plan will be re-scanned by 2591 * first ExecProcNode. 2592 */ 2593 if (outerPlan->chgParam == NULL) 2594 ExecReScan(outerPlan); 2595 } 2596 2597 /* 2598 * initialize_peragg 2599 * 2600 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently. 
2601 */ 2602 static WindowStatePerAggData * 2603 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, 2604 WindowStatePerAgg peraggstate) 2605 { 2606 Oid inputTypes[FUNC_MAX_ARGS]; 2607 int numArguments; 2608 HeapTuple aggTuple; 2609 Form_pg_aggregate aggform; 2610 Oid aggtranstype; 2611 AttrNumber initvalAttNo; 2612 AclResult aclresult; 2613 bool use_ma_code; 2614 Oid transfn_oid, 2615 invtransfn_oid, 2616 finalfn_oid; 2617 bool finalextra; 2618 char finalmodify; 2619 Expr *transfnexpr, 2620 *invtransfnexpr, 2621 *finalfnexpr; 2622 Datum textInitVal; 2623 int i; 2624 ListCell *lc; 2625 2626 numArguments = list_length(wfunc->args); 2627 2628 i = 0; 2629 foreach(lc, wfunc->args) 2630 { 2631 inputTypes[i++] = exprType((Node *) lfirst(lc)); 2632 } 2633 2634 aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid)); 2635 if (!HeapTupleIsValid(aggTuple)) 2636 elog(ERROR, "cache lookup failed for aggregate %u", 2637 wfunc->winfnoid); 2638 aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); 2639 2640 /* 2641 * Figure out whether we want to use the moving-aggregate implementation, 2642 * and collect the right set of fields from the pg_attribute entry. 2643 * 2644 * It's possible that an aggregate would supply a safe moving-aggregate 2645 * implementation and an unsafe normal one, in which case our hand is 2646 * forced. Otherwise, if the frame head can't move, we don't need 2647 * moving-aggregate code. Even if we'd like to use it, don't do so if the 2648 * aggregate's arguments (and FILTER clause if any) contain any calls to 2649 * volatile functions. Otherwise, the difference between restarting and 2650 * not restarting the aggregation would be user-visible. 
2651 */ 2652 if (!OidIsValid(aggform->aggminvtransfn)) 2653 use_ma_code = false; /* sine qua non */ 2654 else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY && 2655 aggform->aggfinalmodify != AGGMODIFY_READ_ONLY) 2656 use_ma_code = true; /* decision forced by safety */ 2657 else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) 2658 use_ma_code = false; /* non-moving frame head */ 2659 else if (contain_volatile_functions((Node *) wfunc)) 2660 use_ma_code = false; /* avoid possible behavioral change */ 2661 else 2662 use_ma_code = true; /* yes, let's use it */ 2663 if (use_ma_code) 2664 { 2665 peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; 2666 peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; 2667 peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; 2668 finalextra = aggform->aggmfinalextra; 2669 finalmodify = aggform->aggmfinalmodify; 2670 aggtranstype = aggform->aggmtranstype; 2671 initvalAttNo = Anum_pg_aggregate_aggminitval; 2672 } 2673 else 2674 { 2675 peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; 2676 peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; 2677 peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; 2678 finalextra = aggform->aggfinalextra; 2679 finalmodify = aggform->aggfinalmodify; 2680 aggtranstype = aggform->aggtranstype; 2681 initvalAttNo = Anum_pg_aggregate_agginitval; 2682 } 2683 2684 /* 2685 * ExecInitWindowAgg already checked permission to call aggregate function 2686 * ... 
but we still need to check the component functions 2687 */ 2688 2689 /* Check that aggregate owner has permission to call component fns */ 2690 { 2691 HeapTuple procTuple; 2692 Oid aggOwner; 2693 2694 procTuple = SearchSysCache1(PROCOID, 2695 ObjectIdGetDatum(wfunc->winfnoid)); 2696 if (!HeapTupleIsValid(procTuple)) 2697 elog(ERROR, "cache lookup failed for function %u", 2698 wfunc->winfnoid); 2699 aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner; 2700 ReleaseSysCache(procTuple); 2701 2702 aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, 2703 ACL_EXECUTE); 2704 if (aclresult != ACLCHECK_OK) 2705 aclcheck_error(aclresult, OBJECT_FUNCTION, 2706 get_func_name(transfn_oid)); 2707 InvokeFunctionExecuteHook(transfn_oid); 2708 2709 if (OidIsValid(invtransfn_oid)) 2710 { 2711 aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner, 2712 ACL_EXECUTE); 2713 if (aclresult != ACLCHECK_OK) 2714 aclcheck_error(aclresult, OBJECT_FUNCTION, 2715 get_func_name(invtransfn_oid)); 2716 InvokeFunctionExecuteHook(invtransfn_oid); 2717 } 2718 2719 if (OidIsValid(finalfn_oid)) 2720 { 2721 aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, 2722 ACL_EXECUTE); 2723 if (aclresult != ACLCHECK_OK) 2724 aclcheck_error(aclresult, OBJECT_FUNCTION, 2725 get_func_name(finalfn_oid)); 2726 InvokeFunctionExecuteHook(finalfn_oid); 2727 } 2728 } 2729 2730 /* 2731 * If the selected finalfn isn't read-only, we can't run this aggregate as 2732 * a window function. This is a user-facing error, so we take a bit more 2733 * care with the error message than elsewhere in this function. 
2734 */ 2735 if (finalmodify != AGGMODIFY_READ_ONLY) 2736 ereport(ERROR, 2737 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 2738 errmsg("aggregate function %s does not support use as a window function", 2739 format_procedure(wfunc->winfnoid)))); 2740 2741 /* Detect how many arguments to pass to the finalfn */ 2742 if (finalextra) 2743 peraggstate->numFinalArgs = numArguments + 1; 2744 else 2745 peraggstate->numFinalArgs = 1; 2746 2747 /* resolve actual type of transition state, if polymorphic */ 2748 aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid, 2749 aggtranstype, 2750 inputTypes, 2751 numArguments); 2752 2753 /* build expression trees using actual argument & result types */ 2754 build_aggregate_transfn_expr(inputTypes, 2755 numArguments, 2756 0, /* no ordered-set window functions yet */ 2757 false, /* no variadic window functions yet */ 2758 aggtranstype, 2759 wfunc->inputcollid, 2760 transfn_oid, 2761 invtransfn_oid, 2762 &transfnexpr, 2763 &invtransfnexpr); 2764 2765 /* set up infrastructure for calling the transfn(s) and finalfn */ 2766 fmgr_info(transfn_oid, &peraggstate->transfn); 2767 fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn); 2768 2769 if (OidIsValid(invtransfn_oid)) 2770 { 2771 fmgr_info(invtransfn_oid, &peraggstate->invtransfn); 2772 fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn); 2773 } 2774 2775 if (OidIsValid(finalfn_oid)) 2776 { 2777 build_aggregate_finalfn_expr(inputTypes, 2778 peraggstate->numFinalArgs, 2779 aggtranstype, 2780 wfunc->wintype, 2781 wfunc->inputcollid, 2782 finalfn_oid, 2783 &finalfnexpr); 2784 fmgr_info(finalfn_oid, &peraggstate->finalfn); 2785 fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn); 2786 } 2787 2788 /* get info about relevant datatypes */ 2789 get_typlenbyval(wfunc->wintype, 2790 &peraggstate->resulttypeLen, 2791 &peraggstate->resulttypeByVal); 2792 get_typlenbyval(aggtranstype, 2793 &peraggstate->transtypeLen, 2794 &peraggstate->transtypeByVal); 2795 
2796 /* 2797 * initval is potentially null, so don't try to access it as a struct 2798 * field. Must do it the hard way with SysCacheGetAttr. 2799 */ 2800 textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo, 2801 &peraggstate->initValueIsNull); 2802 2803 if (peraggstate->initValueIsNull) 2804 peraggstate->initValue = (Datum) 0; 2805 else 2806 peraggstate->initValue = GetAggInitVal(textInitVal, 2807 aggtranstype); 2808 2809 /* 2810 * If the transfn is strict and the initval is NULL, make sure input type 2811 * and transtype are the same (or at least binary-compatible), so that 2812 * it's OK to use the first input value as the initial transValue. This 2813 * should have been checked at agg definition time, but we must check 2814 * again in case the transfn's strictness property has been changed. 2815 */ 2816 if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) 2817 { 2818 if (numArguments < 1 || 2819 !IsBinaryCoercible(inputTypes[0], aggtranstype)) 2820 ereport(ERROR, 2821 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), 2822 errmsg("aggregate %u needs to have compatible input type and transition type", 2823 wfunc->winfnoid))); 2824 } 2825 2826 /* 2827 * Insist that forward and inverse transition functions have the same 2828 * strictness setting. Allowing them to differ would require handling 2829 * more special cases in advance_windowaggregate and 2830 * advance_windowaggregate_base, for no discernible benefit. This should 2831 * have been checked at agg definition time, but we must check again in 2832 * case either function's strictness property has been changed. 2833 */ 2834 if (OidIsValid(invtransfn_oid) && 2835 peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict) 2836 ereport(ERROR, 2837 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), 2838 errmsg("strictness of aggregate's forward and inverse transition functions must match"))); 2839 2840 /* 2841 * Moving aggregates use their own aggcontext. 
2842 * 2843 * This is necessary because they might restart at different times, so we 2844 * might never be able to reset the shared context otherwise. We can't 2845 * make it the aggregates' responsibility to clean up after themselves, 2846 * because strict aggregates must be restarted whenever we remove their 2847 * last non-NULL input, which the aggregate won't be aware is happening. 2848 * Also, just pfree()ing the transValue upon restarting wouldn't help, 2849 * since we'd miss any indirectly referenced data. We could, in theory, 2850 * make the memory allocation rules for moving aggregates different than 2851 * they have historically been for plain aggregates, but that seems grotty 2852 * and likely to lead to memory leaks. 2853 */ 2854 if (OidIsValid(invtransfn_oid)) 2855 peraggstate->aggcontext = 2856 AllocSetContextCreate(CurrentMemoryContext, 2857 "WindowAgg Per Aggregate", 2858 ALLOCSET_DEFAULT_SIZES); 2859 else 2860 peraggstate->aggcontext = winstate->aggcontext; 2861 2862 ReleaseSysCache(aggTuple); 2863 2864 return peraggstate; 2865 } 2866 2867 static Datum 2868 GetAggInitVal(Datum textInitVal, Oid transtype) 2869 { 2870 Oid typinput, 2871 typioparam; 2872 char *strInitVal; 2873 Datum initVal; 2874 2875 getTypeInputInfo(transtype, &typinput, &typioparam); 2876 strInitVal = TextDatumGetCString(textInitVal); 2877 initVal = OidInputFunctionCall(typinput, strInitVal, 2878 typioparam, -1); 2879 pfree(strInitVal); 2880 return initVal; 2881 } 2882 2883 /* 2884 * are_peers 2885 * compare two rows to see if they are equal according to the ORDER BY clause 2886 * 2887 * NB: this does not consider the window frame mode. 
2888 */ 2889 static bool 2890 are_peers(WindowAggState *winstate, TupleTableSlot *slot1, 2891 TupleTableSlot *slot2) 2892 { 2893 WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; 2894 ExprContext *econtext = winstate->tmpcontext; 2895 2896 /* If no ORDER BY, all rows are peers with each other */ 2897 if (node->ordNumCols == 0) 2898 return true; 2899 2900 econtext->ecxt_outertuple = slot1; 2901 econtext->ecxt_innertuple = slot2; 2902 return ExecQualAndReset(winstate->ordEqfunction, econtext); 2903 } 2904 2905 /* 2906 * window_gettupleslot 2907 * Fetch the pos'th tuple of the current partition into the slot, 2908 * using the winobj's read pointer 2909 * 2910 * Returns true if successful, false if no such row 2911 */ 2912 static bool 2913 window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot) 2914 { 2915 WindowAggState *winstate = winobj->winstate; 2916 MemoryContext oldcontext; 2917 2918 /* often called repeatedly in a row */ 2919 CHECK_FOR_INTERRUPTS(); 2920 2921 /* Don't allow passing -1 to spool_tuples here */ 2922 if (pos < 0) 2923 return false; 2924 2925 /* If necessary, fetch the tuple into the spool */ 2926 spool_tuples(winstate, pos); 2927 2928 if (pos >= winstate->spooled_rows) 2929 return false; 2930 2931 if (pos < winobj->markpos) 2932 elog(ERROR, "cannot fetch row before WindowObject's mark position"); 2933 2934 oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); 2935 2936 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr); 2937 2938 /* 2939 * Advance or rewind until we are within one tuple of the one we want. 
2940 */ 2941 if (winobj->seekpos < pos - 1) 2942 { 2943 if (!tuplestore_skiptuples(winstate->buffer, 2944 pos - 1 - winobj->seekpos, 2945 true)) 2946 elog(ERROR, "unexpected end of tuplestore"); 2947 winobj->seekpos = pos - 1; 2948 } 2949 else if (winobj->seekpos > pos + 1) 2950 { 2951 if (!tuplestore_skiptuples(winstate->buffer, 2952 winobj->seekpos - (pos + 1), 2953 false)) 2954 elog(ERROR, "unexpected end of tuplestore"); 2955 winobj->seekpos = pos + 1; 2956 } 2957 else if (winobj->seekpos == pos) 2958 { 2959 /* 2960 * There's no API to refetch the tuple at the current position. We 2961 * have to move one tuple forward, and then one backward. (We don't 2962 * do it the other way because we might try to fetch the row before 2963 * our mark, which isn't allowed.) XXX this case could stand to be 2964 * optimized. 2965 */ 2966 tuplestore_advance(winstate->buffer, true); 2967 winobj->seekpos++; 2968 } 2969 2970 /* 2971 * Now we should be on the tuple immediately before or after the one we 2972 * want, so just fetch forwards or backwards as appropriate. 2973 */ 2974 if (winobj->seekpos > pos) 2975 { 2976 if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot)) 2977 elog(ERROR, "unexpected end of tuplestore"); 2978 winobj->seekpos--; 2979 } 2980 else 2981 { 2982 if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot)) 2983 elog(ERROR, "unexpected end of tuplestore"); 2984 winobj->seekpos++; 2985 } 2986 2987 Assert(winobj->seekpos == pos); 2988 2989 MemoryContextSwitchTo(oldcontext); 2990 2991 return true; 2992 } 2993 2994 2995 /*********************************************************************** 2996 * API exposed to window functions 2997 ***********************************************************************/ 2998 2999 3000 /* 3001 * WinGetPartitionLocalMemory 3002 * Get working memory that lives till end of partition processing 3003 * 3004 * On first call within a given partition, this allocates and zeroes the 3005 * requested amount of space. 
Subsequent calls just return the same chunk. 3006 * 3007 * Memory obtained this way is normally used to hold state that should be 3008 * automatically reset for each new partition. If a window function wants 3009 * to hold state across the whole query, fcinfo->fn_extra can be used in the 3010 * usual way for that. 3011 */ 3012 void * 3013 WinGetPartitionLocalMemory(WindowObject winobj, Size sz) 3014 { 3015 Assert(WindowObjectIsValid(winobj)); 3016 if (winobj->localmem == NULL) 3017 winobj->localmem = 3018 MemoryContextAllocZero(winobj->winstate->partcontext, sz); 3019 return winobj->localmem; 3020 } 3021 3022 /* 3023 * WinGetCurrentPosition 3024 * Return the current row's position (counting from 0) within the current 3025 * partition. 3026 */ 3027 int64 3028 WinGetCurrentPosition(WindowObject winobj) 3029 { 3030 Assert(WindowObjectIsValid(winobj)); 3031 return winobj->winstate->currentpos; 3032 } 3033 3034 /* 3035 * WinGetPartitionRowCount 3036 * Return total number of rows contained in the current partition. 3037 * 3038 * Note: this is a relatively expensive operation because it forces the 3039 * whole partition to be "spooled" into the tuplestore at once. Once 3040 * executed, however, additional calls within the same partition are cheap. 3041 */ 3042 int64 3043 WinGetPartitionRowCount(WindowObject winobj) 3044 { 3045 Assert(WindowObjectIsValid(winobj)); 3046 spool_tuples(winobj->winstate, -1); 3047 return winobj->winstate->spooled_rows; 3048 } 3049 3050 /* 3051 * WinSetMarkPosition 3052 * Set the "mark" position for the window object, which is the oldest row 3053 * number (counting from 0) it is allowed to fetch during all subsequent 3054 * operations within the current partition. 3055 * 3056 * Window functions do not have to call this, but are encouraged to move the 3057 * mark forward when possible to keep the tuplestore size down and prevent 3058 * having to spill rows to disk. 
3059 */ 3060 void 3061 WinSetMarkPosition(WindowObject winobj, int64 markpos) 3062 { 3063 WindowAggState *winstate; 3064 3065 Assert(WindowObjectIsValid(winobj)); 3066 winstate = winobj->winstate; 3067 3068 if (markpos < winobj->markpos) 3069 elog(ERROR, "cannot move WindowObject's mark position backward"); 3070 tuplestore_select_read_pointer(winstate->buffer, winobj->markptr); 3071 if (markpos > winobj->markpos) 3072 { 3073 tuplestore_skiptuples(winstate->buffer, 3074 markpos - winobj->markpos, 3075 true); 3076 winobj->markpos = markpos; 3077 } 3078 tuplestore_select_read_pointer(winstate->buffer, winobj->readptr); 3079 if (markpos > winobj->seekpos) 3080 { 3081 tuplestore_skiptuples(winstate->buffer, 3082 markpos - winobj->seekpos, 3083 true); 3084 winobj->seekpos = markpos; 3085 } 3086 } 3087 3088 /* 3089 * WinRowsArePeers 3090 * Compare two rows (specified by absolute position in partition) to see 3091 * if they are equal according to the ORDER BY clause. 3092 * 3093 * NB: this does not consider the window frame mode. 3094 */ 3095 bool 3096 WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2) 3097 { 3098 WindowAggState *winstate; 3099 WindowAgg *node; 3100 TupleTableSlot *slot1; 3101 TupleTableSlot *slot2; 3102 bool res; 3103 3104 Assert(WindowObjectIsValid(winobj)); 3105 winstate = winobj->winstate; 3106 node = (WindowAgg *) winstate->ss.ps.plan; 3107 3108 /* If no ORDER BY, all rows are peers; don't bother to fetch them */ 3109 if (node->ordNumCols == 0) 3110 return true; 3111 3112 /* 3113 * Note: OK to use temp_slot_2 here because we aren't calling any 3114 * frame-related functions (those tend to clobber temp_slot_2). 
3115 */ 3116 slot1 = winstate->temp_slot_1; 3117 slot2 = winstate->temp_slot_2; 3118 3119 if (!window_gettupleslot(winobj, pos1, slot1)) 3120 elog(ERROR, "specified position is out of window: " INT64_FORMAT, 3121 pos1); 3122 if (!window_gettupleslot(winobj, pos2, slot2)) 3123 elog(ERROR, "specified position is out of window: " INT64_FORMAT, 3124 pos2); 3125 3126 res = are_peers(winstate, slot1, slot2); 3127 3128 ExecClearTuple(slot1); 3129 ExecClearTuple(slot2); 3130 3131 return res; 3132 } 3133 3134 /* 3135 * WinGetFuncArgInPartition 3136 * Evaluate a window function's argument expression on a specified 3137 * row of the partition. The row is identified in lseek(2) style, 3138 * i.e. relative to the current, first, or last row. 3139 * 3140 * argno: argument number to evaluate (counted from 0) 3141 * relpos: signed rowcount offset from the seek position 3142 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL 3143 * set_mark: If the row is found and set_mark is true, the mark is moved to 3144 * the row as a side-effect. 3145 * isnull: output argument, receives isnull status of result 3146 * isout: output argument, set to indicate whether target row position 3147 * is out of partition (can pass NULL if caller doesn't care about this) 3148 * 3149 * Specifying a nonexistent row is not an error, it just causes a null result 3150 * (plus setting *isout true, if isout isn't NULL). 
3151 */ 3152 Datum 3153 WinGetFuncArgInPartition(WindowObject winobj, int argno, 3154 int relpos, int seektype, bool set_mark, 3155 bool *isnull, bool *isout) 3156 { 3157 WindowAggState *winstate; 3158 ExprContext *econtext; 3159 TupleTableSlot *slot; 3160 bool gottuple; 3161 int64 abs_pos; 3162 3163 Assert(WindowObjectIsValid(winobj)); 3164 winstate = winobj->winstate; 3165 econtext = winstate->ss.ps.ps_ExprContext; 3166 slot = winstate->temp_slot_1; 3167 3168 switch (seektype) 3169 { 3170 case WINDOW_SEEK_CURRENT: 3171 abs_pos = winstate->currentpos + relpos; 3172 break; 3173 case WINDOW_SEEK_HEAD: 3174 abs_pos = relpos; 3175 break; 3176 case WINDOW_SEEK_TAIL: 3177 spool_tuples(winstate, -1); 3178 abs_pos = winstate->spooled_rows - 1 + relpos; 3179 break; 3180 default: 3181 elog(ERROR, "unrecognized window seek type: %d", seektype); 3182 abs_pos = 0; /* keep compiler quiet */ 3183 break; 3184 } 3185 3186 gottuple = window_gettupleslot(winobj, abs_pos, slot); 3187 3188 if (!gottuple) 3189 { 3190 if (isout) 3191 *isout = true; 3192 *isnull = true; 3193 return (Datum) 0; 3194 } 3195 else 3196 { 3197 if (isout) 3198 *isout = false; 3199 if (set_mark) 3200 WinSetMarkPosition(winobj, abs_pos); 3201 econtext->ecxt_outertuple = slot; 3202 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), 3203 econtext, isnull); 3204 } 3205 } 3206 3207 /* 3208 * WinGetFuncArgInFrame 3209 * Evaluate a window function's argument expression on a specified 3210 * row of the window frame. The row is identified in lseek(2) style, 3211 * i.e. relative to the first or last row of the frame. (We do not 3212 * support WINDOW_SEEK_CURRENT here, because it's not very clear what 3213 * that should mean if the current row isn't part of the frame.) 
3214 * 3215 * argno: argument number to evaluate (counted from 0) 3216 * relpos: signed rowcount offset from the seek position 3217 * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL 3218 * set_mark: If the row is found/in frame and set_mark is true, the mark is 3219 * moved to the row as a side-effect. 3220 * isnull: output argument, receives isnull status of result 3221 * isout: output argument, set to indicate whether target row position 3222 * is out of frame (can pass NULL if caller doesn't care about this) 3223 * 3224 * Specifying a nonexistent or not-in-frame row is not an error, it just 3225 * causes a null result (plus setting *isout true, if isout isn't NULL). 3226 * 3227 * Note that some exclusion-clause options lead to situations where the 3228 * rows that are in-frame are not consecutive in the partition. But we 3229 * count only in-frame rows when measuring relpos. 3230 * 3231 * The set_mark flag is interpreted as meaning that the caller will specify 3232 * a constant (or, perhaps, monotonically increasing) relpos in successive 3233 * calls, so that *if there is no exclusion clause* there will be no need 3234 * to fetch a row before the previously fetched row. But we do not expect 3235 * the caller to know how to account for exclusion clauses. Therefore, 3236 * if there is an exclusion clause we take responsibility for adjusting the 3237 * mark request to something that will be safe given the above assumption 3238 * about relpos. 
3239 */ 3240 Datum 3241 WinGetFuncArgInFrame(WindowObject winobj, int argno, 3242 int relpos, int seektype, bool set_mark, 3243 bool *isnull, bool *isout) 3244 { 3245 WindowAggState *winstate; 3246 ExprContext *econtext; 3247 TupleTableSlot *slot; 3248 int64 abs_pos; 3249 int64 mark_pos; 3250 3251 Assert(WindowObjectIsValid(winobj)); 3252 winstate = winobj->winstate; 3253 econtext = winstate->ss.ps.ps_ExprContext; 3254 slot = winstate->temp_slot_1; 3255 3256 switch (seektype) 3257 { 3258 case WINDOW_SEEK_CURRENT: 3259 elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame"); 3260 abs_pos = mark_pos = 0; /* keep compiler quiet */ 3261 break; 3262 case WINDOW_SEEK_HEAD: 3263 /* rejecting relpos < 0 is easy and simplifies code below */ 3264 if (relpos < 0) 3265 goto out_of_frame; 3266 update_frameheadpos(winstate); 3267 abs_pos = winstate->frameheadpos + relpos; 3268 mark_pos = abs_pos; 3269 3270 /* 3271 * Account for exclusion option if one is active, but advance only 3272 * abs_pos not mark_pos. This prevents changes of the current 3273 * row's peer group from resulting in trying to fetch a row before 3274 * some previous mark position. 3275 * 3276 * Note that in some corner cases such as current row being 3277 * outside frame, these calculations are theoretically too simple, 3278 * but it doesn't matter because we'll end up deciding the row is 3279 * out of frame. We do not attempt to avoid fetching rows past 3280 * end of frame; that would happen in some cases anyway. 
3281 */ 3282 switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION) 3283 { 3284 case 0: 3285 /* no adjustment needed */ 3286 break; 3287 case FRAMEOPTION_EXCLUDE_CURRENT_ROW: 3288 if (abs_pos >= winstate->currentpos && 3289 winstate->currentpos >= winstate->frameheadpos) 3290 abs_pos++; 3291 break; 3292 case FRAMEOPTION_EXCLUDE_GROUP: 3293 update_grouptailpos(winstate); 3294 if (abs_pos >= winstate->groupheadpos && 3295 winstate->grouptailpos > winstate->frameheadpos) 3296 { 3297 int64 overlapstart = Max(winstate->groupheadpos, 3298 winstate->frameheadpos); 3299 3300 abs_pos += winstate->grouptailpos - overlapstart; 3301 } 3302 break; 3303 case FRAMEOPTION_EXCLUDE_TIES: 3304 update_grouptailpos(winstate); 3305 if (abs_pos >= winstate->groupheadpos && 3306 winstate->grouptailpos > winstate->frameheadpos) 3307 { 3308 int64 overlapstart = Max(winstate->groupheadpos, 3309 winstate->frameheadpos); 3310 3311 if (abs_pos == overlapstart) 3312 abs_pos = winstate->currentpos; 3313 else 3314 abs_pos += winstate->grouptailpos - overlapstart - 1; 3315 } 3316 break; 3317 default: 3318 elog(ERROR, "unrecognized frame option state: 0x%x", 3319 winstate->frameOptions); 3320 break; 3321 } 3322 break; 3323 case WINDOW_SEEK_TAIL: 3324 /* rejecting relpos > 0 is easy and simplifies code below */ 3325 if (relpos > 0) 3326 goto out_of_frame; 3327 update_frametailpos(winstate); 3328 abs_pos = winstate->frametailpos - 1 + relpos; 3329 3330 /* 3331 * Account for exclusion option if one is active. If there is no 3332 * exclusion, we can safely set the mark at the accessed row. But 3333 * if there is, we can only mark the frame start, because we can't 3334 * be sure how far back in the frame the exclusion might cause us 3335 * to fetch in future. Furthermore, we have to actually check 3336 * against frameheadpos here, since it's unsafe to try to fetch a 3337 * row before frame start if the mark might be there already. 
3338 */ 3339 switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION) 3340 { 3341 case 0: 3342 /* no adjustment needed */ 3343 mark_pos = abs_pos; 3344 break; 3345 case FRAMEOPTION_EXCLUDE_CURRENT_ROW: 3346 if (abs_pos <= winstate->currentpos && 3347 winstate->currentpos < winstate->frametailpos) 3348 abs_pos--; 3349 update_frameheadpos(winstate); 3350 if (abs_pos < winstate->frameheadpos) 3351 goto out_of_frame; 3352 mark_pos = winstate->frameheadpos; 3353 break; 3354 case FRAMEOPTION_EXCLUDE_GROUP: 3355 update_grouptailpos(winstate); 3356 if (abs_pos < winstate->grouptailpos && 3357 winstate->groupheadpos < winstate->frametailpos) 3358 { 3359 int64 overlapend = Min(winstate->grouptailpos, 3360 winstate->frametailpos); 3361 3362 abs_pos -= overlapend - winstate->groupheadpos; 3363 } 3364 update_frameheadpos(winstate); 3365 if (abs_pos < winstate->frameheadpos) 3366 goto out_of_frame; 3367 mark_pos = winstate->frameheadpos; 3368 break; 3369 case FRAMEOPTION_EXCLUDE_TIES: 3370 update_grouptailpos(winstate); 3371 if (abs_pos < winstate->grouptailpos && 3372 winstate->groupheadpos < winstate->frametailpos) 3373 { 3374 int64 overlapend = Min(winstate->grouptailpos, 3375 winstate->frametailpos); 3376 3377 if (abs_pos == overlapend - 1) 3378 abs_pos = winstate->currentpos; 3379 else 3380 abs_pos -= overlapend - 1 - winstate->groupheadpos; 3381 } 3382 update_frameheadpos(winstate); 3383 if (abs_pos < winstate->frameheadpos) 3384 goto out_of_frame; 3385 mark_pos = winstate->frameheadpos; 3386 break; 3387 default: 3388 elog(ERROR, "unrecognized frame option state: 0x%x", 3389 winstate->frameOptions); 3390 mark_pos = 0; /* keep compiler quiet */ 3391 break; 3392 } 3393 break; 3394 default: 3395 elog(ERROR, "unrecognized window seek type: %d", seektype); 3396 abs_pos = mark_pos = 0; /* keep compiler quiet */ 3397 break; 3398 } 3399 3400 if (!window_gettupleslot(winobj, abs_pos, slot)) 3401 goto out_of_frame; 3402 3403 /* The code above does not detect all out-of-frame cases, 
so check */ 3404 if (row_is_in_frame(winstate, abs_pos, slot) <= 0) 3405 goto out_of_frame; 3406 3407 if (isout) 3408 *isout = false; 3409 if (set_mark) 3410 WinSetMarkPosition(winobj, mark_pos); 3411 econtext->ecxt_outertuple = slot; 3412 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), 3413 econtext, isnull); 3414 3415 out_of_frame: 3416 if (isout) 3417 *isout = true; 3418 *isnull = true; 3419 return (Datum) 0; 3420 } 3421 3422 /* 3423 * WinGetFuncArgCurrent 3424 * Evaluate a window function's argument expression on the current row. 3425 * 3426 * argno: argument number to evaluate (counted from 0) 3427 * isnull: output argument, receives isnull status of result 3428 * 3429 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or 3430 * WinGetFuncArgInFrame targeting the current row, because it will succeed 3431 * even if the WindowObject's mark has been set beyond the current row. 3432 * This should generally be used for "ordinary" arguments of a window 3433 * function, such as the offset argument of lead() or lag(). 3434 */ 3435 Datum 3436 WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull) 3437 { 3438 WindowAggState *winstate; 3439 ExprContext *econtext; 3440 3441 Assert(WindowObjectIsValid(winobj)); 3442 winstate = winobj->winstate; 3443 3444 econtext = winstate->ss.ps.ps_ExprContext; 3445 3446 econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot; 3447 return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), 3448 econtext, isnull); 3449 } 3450