/*-------------------------------------------------------------------------
 *
 * nodeMemoize.c
 *	  Routines to handle caching of results from parameterized nodes
 *
 * Portions Copyright (c) 2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeMemoize.c
 *
 * Memoize nodes are intended to sit above parameterized nodes in the plan
 * tree in order to cache results from them.  The intention here is that a
 * repeat scan with a parameter value that has already been seen by the node
 * can fetch tuples from the cache rather than having to re-scan the outer
 * node all over again.  The query planner may choose to make use of one of
 * these when it thinks rescans for previously seen values are likely enough
 * to warrant adding the additional node.
 *
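 * As a concrete illustration (the table and index names here are purely
 * hypothetical), the planner might produce a plan shaped like this, where
 * the Memoize node caches the parameterized index scan's results for each
 * distinct value of t1.x:
 *
 *		Nested Loop
 *		  ->  Seq Scan on t1
 *		  ->  Memoize
 *				Cache Key: t1.x
 *				->  Index Scan using t2_x_idx on t2
 *					  Index Cond: (x = t1.x)
 *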
 * The cache is implemented as a hash table.  When the cache fills, we never
 * spill tuples to disk; instead, we evict the least recently used cache
 * entry.  We track the least recently used entry by always pushing new
 * entries and entries we look up onto the tail of a doubly linked list.
 * This means that older items always bubble up to the head of this LRU
 * list, which is where eviction starts.
 *
 * Sometimes our callers won't run their scans to completion.  For example,
 * a semi-join only needs to run until it finds a matching tuple, and once
 * it does, the join operator skips to the next outer tuple and does not
 * execute the inner side again on that scan.  Because of this, we must keep
 * track of when a cache entry is complete; by default, we know it is when
 * we run out of tuples to read during the scan.  However, there are cases
 * where we can mark the cache entry as complete without exhausting the scan
 * of all tuples.  One case is unique joins, where the join operator knows
 * that there will only be at most one match for any given outer tuple.  In
 * order to support such cases we allow the "singlerow" option to be set for
 * the cache.  This option marks the cache entry as complete after we read
 * the first tuple from the subnode.
 *
 * It's possible when we're filling the cache for a given set of parameters
 * that we're unable to free enough memory to store any more tuples.  If
 * this happens then we'll have already evicted all other cache entries.
 * When caching another tuple would cause us to exceed our memory budget, we
 * must free the entry that we're currently populating and move the state
 * machine into MEMO_CACHE_BYPASS_MODE.  This means that we'll not attempt
 * to cache any further tuples for this particular scan; we don't have the
 * memory for it.  The state machine will be reset again on the next rescan.
 * If the memory requirements to cache the next parameter's tuples are less
 * demanding, then that may allow us to start putting useful entries back
 * into the cache again.
 *
 *
 * INTERFACE ROUTINES
 *		ExecMemoize			- lookup cache, exec subplan when not found
 *		ExecInitMemoize		- initialize node and subnodes
 *		ExecEndMemoize		- shutdown node and subnodes
 *		ExecReScanMemoize	- rescan the memoize node
 *
 *		ExecMemoizeEstimate		estimates DSM space needed for parallel plan
 *		ExecMemoizeInitializeDSM	initialize DSM for parallel plan
 *		ExecMemoizeInitializeWorker	attach to DSM info in parallel worker
 *		ExecMemoizeRetrieveInstrumentation	get instrumentation from worker
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "common/hashfn.h"
#include "executor/executor.h"
#include "executor/nodeMemoize.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"

/* States of the ExecMemoize state machine */
#define MEMO_CACHE_LOOKUP			1	/* Attempt to perform a cache lookup */
#define MEMO_CACHE_FETCH_NEXT_TUPLE	2	/* Get another tuple from the cache */
#define MEMO_FILLING_CACHE			3	/* Read outer node to fill cache */
#define MEMO_CACHE_BYPASS_MODE		4	/* Bypass mode.  Just read from our
										 * subplan without caching anything */
#define MEMO_END_OF_SCAN			5	/* Ready for rescan */
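
/*
 * A rough sketch of the transitions ExecMemoize() makes between these
 * states (each scan begins in MEMO_CACHE_LOOKUP, and ExecReScanMemoize()
 * resets the state machine back to MEMO_CACHE_LOOKUP):
 *
 *	MEMO_CACHE_LOOKUP	-> MEMO_CACHE_FETCH_NEXT_TUPLE	(cache hit)
 *	MEMO_CACHE_LOOKUP	-> MEMO_FILLING_CACHE			(cache miss)
 *	MEMO_CACHE_LOOKUP	-> MEMO_CACHE_BYPASS_MODE		(couldn't cache tuple)
 *	MEMO_FILLING_CACHE	-> MEMO_CACHE_BYPASS_MODE		(couldn't cache tuple)
 *
 * and every state moves to MEMO_END_OF_SCAN once there are no more tuples
 * to return for the current parameters.
 */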


/* Helper macros for memory accounting */
#define EMPTY_ENTRY_MEMORY_BYTES(e)	(sizeof(MemoizeEntry) + \
									 sizeof(MemoizeKey) + \
									 (e)->key->params->t_len)
#define CACHE_TUPLE_BYTES(t)	(sizeof(MemoizeTuple) + \
								 (t)->mintuple->t_len)
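
/*
 * A quick sketch of how the accounting below fits together: cache_lookup()
 * charges EMPTY_ENTRY_MEMORY_BYTES() against MemoizeState's mem_used when
 * it creates a new entry, cache_store_tuple() charges CACHE_TUPLE_BYTES()
 * for each tuple it caches, and entry_purge_tuples() and
 * remove_cache_entry() subtract those same amounts again when tuples and
 * entries are freed.
 */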

/* MemoizeTuple Stores an individually cached tuple */
typedef struct MemoizeTuple
{
	MinimalTuple mintuple;		/* Cached tuple */
	struct MemoizeTuple *next;	/* The next tuple with the same parameter
								 * values or NULL if it's the last one */
} MemoizeTuple;

/*
 * MemoizeKey
 *		The hash table key for cached entries plus the LRU list link
 */
typedef struct MemoizeKey
{
	MinimalTuple params;
	dlist_node	lru_node;		/* Pointer to next/prev key in LRU list */
} MemoizeKey;

/*
 * MemoizeEntry
 *		The data struct that the cache hash table stores
 */
typedef struct MemoizeEntry
{
	MemoizeKey *key;			/* Hash key for hash table lookups */
	MemoizeTuple *tuplehead;	/* Pointer to the first tuple or NULL if no
								 * tuples are cached for this entry */
	uint32		hash;			/* Hash value (cached) */
	char		status;			/* Hash status */
	bool		complete;		/* Did we read the outer plan to completion? */
} MemoizeEntry;


#define SH_PREFIX memoize
#define SH_ELEMENT_TYPE MemoizeEntry
#define SH_KEY_TYPE MemoizeKey *
#define SH_SCOPE static inline
#define SH_DECLARE
#include "lib/simplehash.h"

static uint32 MemoizeHash_hash(struct memoize_hash *tb,
							   const MemoizeKey *key);
static int	MemoizeHash_equal(struct memoize_hash *tb,
							  const MemoizeKey *params1,
							  const MemoizeKey *params2);

#define SH_PREFIX memoize
#define SH_ELEMENT_TYPE MemoizeEntry
#define SH_KEY_TYPE MemoizeKey *
#define SH_KEY key
#define SH_HASH_KEY(tb, key) MemoizeHash_hash(tb, key)
#define SH_EQUAL(tb, a, b) (MemoizeHash_equal(tb, a, b) == 0)
#define SH_SCOPE static inline
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) a->hash
#define SH_DEFINE
#include "lib/simplehash.h"
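
/*
 * Note on the double inclusion above: simplehash.h is included twice by
 * design.  Roughly speaking, the first pass (SH_DECLARE) emits the
 * memoize_hash type and function declarations, which the prototypes for
 * MemoizeHash_hash() and MemoizeHash_equal() need to reference; the second
 * pass (SH_DEFINE) emits the hash table function definitions, which call
 * those two functions through SH_HASH_KEY and SH_EQUAL.
 */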

/*
 * MemoizeHash_hash
 *		Hash function for simplehash hashtable.  'key' is unused here as we
 *		require that all table lookups first populate the MemoizeState's
 *		probeslot with the key values to be looked up.
 */
static uint32
MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key)
{
	MemoizeState *mstate = (MemoizeState *) tb->private_data;
	TupleTableSlot *pslot = mstate->probeslot;
	uint32		hashkey = 0;
	int			numkeys = mstate->nkeys;
	FmgrInfo   *hashfunctions = mstate->hashfunctions;
	Oid		   *collations = mstate->collations;

	for (int i = 0; i < numkeys; i++)
	{
		/* rotate hashkey left 1 bit at each step */
		hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

		if (!pslot->tts_isnull[i])	/* treat nulls as having hash key 0 */
		{
			uint32		hkey;

			hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i],
													collations[i],
													pslot->tts_values[i]));
			hashkey ^= hkey;
		}
	}

	return murmurhash32(hashkey);
}

/*
 * MemoizeHash_equal
 *		Equality function for confirming hash value matches during a hash
 *		table lookup.  'key2' is never used.  Instead the MemoizeState's
 *		probeslot is always populated with details of what's being looked
 *		up.  Returns zero when the keys match, in the style of memcmp(),
 *		to suit the SH_EQUAL definition above.
 */
static int
MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *key1,
				  const MemoizeKey *key2)
{
	MemoizeState *mstate = (MemoizeState *) tb->private_data;
	ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
	TupleTableSlot *tslot = mstate->tableslot;
	TupleTableSlot *pslot = mstate->probeslot;

	/* probeslot should have already been prepared by prepare_probe_slot() */
	ExecStoreMinimalTuple(key1->params, tslot, false);

	econtext->ecxt_innertuple = tslot;
	econtext->ecxt_outertuple = pslot;
	return !ExecQualAndReset(mstate->cache_eq_expr, econtext);
}

/*
 * Initialize the hash table to empty.
 */
static void
build_hash_table(MemoizeState *mstate, uint32 size)
{
	/* Make a guess at a good size when we're not given a valid size. */
	if (size == 0)
		size = 1024;

	/* memoize_create will convert the size to a power of 2 */
	mstate->hashtable = memoize_create(mstate->tableContext, size, mstate);
}

/*
 * prepare_probe_slot
 *		Populate mstate's probeslot with the values from the tuple stored
 *		in 'key'.  If 'key' is NULL, then perform the population by
 *		evaluating mstate's param_exprs.
 */
static inline void
prepare_probe_slot(MemoizeState *mstate, MemoizeKey *key)
{
	TupleTableSlot *pslot = mstate->probeslot;
	TupleTableSlot *tslot = mstate->tableslot;
	int			numKeys = mstate->nkeys;

	ExecClearTuple(pslot);

	if (key == NULL)
	{
		/* Set the probeslot's values based on the current parameter values */
		for (int i = 0; i < numKeys; i++)
			pslot->tts_values[i] = ExecEvalExpr(mstate->param_exprs[i],
												mstate->ss.ps.ps_ExprContext,
												&pslot->tts_isnull[i]);
	}
	else
	{
		/* Process the key's MinimalTuple and store the values in probeslot */
		ExecStoreMinimalTuple(key->params, tslot, false);
		slot_getallattrs(tslot);
		memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys);
		memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys);
	}

	ExecStoreVirtualTuple(pslot);
}

/*
 * entry_purge_tuples
 *		Remove all tuples from the cache entry pointed to by 'entry'.  This
 *		leaves an empty cache entry.  Also, update the memory accounting to
 *		reflect the removal of the tuples.
 */
static inline void
entry_purge_tuples(MemoizeState *mstate, MemoizeEntry *entry)
{
	MemoizeTuple *tuple = entry->tuplehead;
	uint64		freed_mem = 0;

	while (tuple != NULL)
	{
		MemoizeTuple *next = tuple->next;

		freed_mem += CACHE_TUPLE_BYTES(tuple);

		/* Free memory used for this tuple */
		pfree(tuple->mintuple);
		pfree(tuple);

		tuple = next;
	}

	entry->complete = false;
	entry->tuplehead = NULL;

	/* Update the memory accounting */
	mstate->mem_used -= freed_mem;
}

/*
 * remove_cache_entry
 *		Remove 'entry' from the cache and free memory used by it.
 */
static void
remove_cache_entry(MemoizeState *mstate, MemoizeEntry *entry)
{
	MemoizeKey *key = entry->key;

	dlist_delete(&entry->key->lru_node);

	/* Remove all of the tuples from this entry */
	entry_purge_tuples(mstate, entry);

	/*
	 * Update memory accounting.  entry_purge_tuples should have already
	 * subtracted the memory used for each cached tuple.  Here we just update
	 * the amount used by the entry itself.
	 */
	mstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry);

	/* Remove the entry from the cache */
	memoize_delete_item(mstate->hashtable, entry);

	pfree(key->params);
	pfree(key);
}

/*
 * cache_reduce_memory
 *		Evict older and less recently used items from the cache in order to
 *		reduce the memory consumption back to something below the
 *		MemoizeState's mem_limit.
 *
 * 'specialkey', if not NULL, causes the function to return false if the
 * entry which the key belongs to is removed from the cache.
 */
static bool
cache_reduce_memory(MemoizeState *mstate, MemoizeKey *specialkey)
{
	bool		specialkey_intact = true;	/* for now */
	dlist_mutable_iter iter;
	uint64		evictions = 0;

	/* Update peak memory usage */
	if (mstate->mem_used > mstate->stats.mem_peak)
		mstate->stats.mem_peak = mstate->mem_used;

	/* We expect only to be called when we've gone over budget on memory */
	Assert(mstate->mem_used > mstate->mem_limit);

	/* Start the eviction process at the head of the LRU list */
	dlist_foreach_modify(iter, &mstate->lru_list)
	{
		MemoizeKey *key = dlist_container(MemoizeKey, lru_node, iter.cur);
		MemoizeEntry *entry;

		/*
		 * Populate the hash probe slot in preparation for looking up this
		 * LRU entry.
		 */
		prepare_probe_slot(mstate, key);

		/*
		 * Ideally the LRU list pointers would be stored in the entry itself
		 * rather than in the key.  Unfortunately, we can't do that as the
		 * simplehash.h code may resize the table and allocate new memory for
		 * entries, which would result in those pointers pointing to the old
		 * buckets.  However, it's fine to use the key to store this as
		 * that's only referenced by a pointer in the entry, which of course
		 * follows the entry whenever the hash table is resized.  Since we
		 * only have a pointer to the key here, we must perform a hash table
		 * lookup to find the entry that the key belongs to.
		 */
		entry = memoize_lookup(mstate->hashtable, NULL);

		/* A good spot to check for corruption of the table and LRU list */
		Assert(entry != NULL);
		Assert(entry->key == key);

		/*
		 * If we're being called to free memory while the cache is being
		 * populated with new tuples, then we'd better take some care as we
		 * could end up freeing the entry which 'specialkey' belongs to.
		 * Generally callers will pass 'specialkey' as the key for the cache
		 * entry which is currently being populated, so we must set
		 * 'specialkey_intact' to false to inform the caller the specialkey
		 * entry has been removed.
		 */
		if (key == specialkey)
			specialkey_intact = false;

		/*
		 * Finally remove the entry.  This will remove it from the LRU list
		 * too.
		 */
		remove_cache_entry(mstate, entry);

		evictions++;

		/* Exit if we've freed enough memory */
		if (mstate->mem_used <= mstate->mem_limit)
			break;
	}

	mstate->stats.cache_evictions += evictions; /* Update Stats */

	return specialkey_intact;
}

/*
 * cache_lookup
 *		Perform a lookup to see if we've already cached tuples based on the
 *		scan's current parameters.  If we find an existing entry we move it
 *		to the end of the LRU list, set *found to true then return it.  If
 *		we don't find an entry then we create a new one and add it to the
 *		end of the LRU list.  We also update cache memory accounting and
 *		remove older entries if we go over the memory budget.  If we managed
 *		to free enough memory we return the new entry, else we return NULL.
 *
 * Callers can assume we'll never return NULL when *found is true.
 */
static MemoizeEntry *
cache_lookup(MemoizeState *mstate, bool *found)
{
	MemoizeKey *key;
	MemoizeEntry *entry;
	MemoryContext oldcontext;

	/* prepare the probe slot with the current scan parameters */
	prepare_probe_slot(mstate, NULL);

	/*
	 * Add the new entry to the cache.  No need to pass a valid key since the
	 * hash function uses mstate's probeslot, which we populated above.
	 */
	entry = memoize_insert(mstate->hashtable, NULL, found);

	if (*found)
	{
		/*
		 * Move existing entry to the tail of the LRU list to mark it as the
		 * most recently used item.
		 */
		dlist_move_tail(&mstate->lru_list, &entry->key->lru_node);

		return entry;
	}

	oldcontext = MemoryContextSwitchTo(mstate->tableContext);

	/* Allocate a new key */
	entry->key = key = (MemoizeKey *) palloc(sizeof(MemoizeKey));
	key->params = ExecCopySlotMinimalTuple(mstate->probeslot);

	/* Update the total cache memory utilization */
	mstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry);

	/* Initialize this entry */
	entry->complete = false;
	entry->tuplehead = NULL;

	/*
	 * Since this is the most recently used entry, push this entry onto the
	 * end of the LRU list.
	 */
	dlist_push_tail(&mstate->lru_list, &entry->key->lru_node);

	mstate->last_tuple = NULL;

	MemoryContextSwitchTo(oldcontext);

	/*
	 * If we've gone over our memory budget, then we'll free up some space in
	 * the cache.
	 */
	if (mstate->mem_used > mstate->mem_limit)
	{
		/*
		 * Try to free up some memory.  It's highly unlikely that we'll fail
		 * to do so here since the entry we've just added is yet to contain
		 * any tuples and we're able to remove any other entry to reduce the
		 * memory consumption.
		 */
		if (unlikely(!cache_reduce_memory(mstate, key)))
			return NULL;

		/*
		 * The process of removing entries from the cache may have caused the
		 * code in simplehash.h to shuffle elements to earlier buckets in the
		 * hash table.  If it has, we'll need to find the entry again by
		 * performing a lookup.  Fortunately, we can detect if this has
		 * happened by seeing if the entry is still in use and that the key
		 * pointer matches our expected key.
		 */
		if (entry->status != memoize_SH_IN_USE || entry->key != key)
		{
			/*
			 * We need to repopulate the probeslot as lookups performed
			 * during the cache evictions above will have stored some other
			 * key.
			 */
			prepare_probe_slot(mstate, key);

			/* Re-find the newly added entry */
			entry = memoize_lookup(mstate->hashtable, NULL);
			Assert(entry != NULL);
		}
	}

	return entry;
}

/*
 * cache_store_tuple
 *		Add the tuple stored in 'slot' to the mstate's current cache entry.
 *		The cache entry must have already been made with cache_lookup().
 *		mstate's last_tuple field must point to the tail of mstate->entry's
 *		list of tuples.
 */
static bool
cache_store_tuple(MemoizeState *mstate, TupleTableSlot *slot)
{
	MemoizeTuple *tuple;
	MemoizeEntry *entry = mstate->entry;
	MemoryContext oldcontext;

	Assert(slot != NULL);
	Assert(entry != NULL);

	oldcontext = MemoryContextSwitchTo(mstate->tableContext);

	tuple = (MemoizeTuple *) palloc(sizeof(MemoizeTuple));
	tuple->mintuple = ExecCopySlotMinimalTuple(slot);
	tuple->next = NULL;

	/* Account for the memory we just consumed */
	mstate->mem_used += CACHE_TUPLE_BYTES(tuple);

	if (entry->tuplehead == NULL)
	{
		/*
		 * This is the first tuple for this entry, so just point the list
		 * head to it.
		 */
		entry->tuplehead = tuple;
	}
	else
	{
		/* push this tuple onto the tail of the list */
		mstate->last_tuple->next = tuple;
	}

	mstate->last_tuple = tuple;
	MemoryContextSwitchTo(oldcontext);

	/*
	 * If we've gone over our memory budget then free up some space in the
	 * cache.
	 */
	if (mstate->mem_used > mstate->mem_limit)
	{
		MemoizeKey *key = entry->key;

		if (!cache_reduce_memory(mstate, key))
			return false;

		/*
		 * The process of removing entries from the cache may have caused the
		 * code in simplehash.h to shuffle elements to earlier buckets in the
		 * hash table.  If it has, we'll need to find the entry again by
		 * performing a lookup.  Fortunately, we can detect if this has
		 * happened by seeing if the entry is still in use and that the key
		 * pointer matches our expected key.
		 */
		if (entry->status != memoize_SH_IN_USE || entry->key != key)
		{
			/*
			 * We need to repopulate the probeslot as lookups performed
			 * during the cache evictions above will have stored some other
			 * key.
			 */
			prepare_probe_slot(mstate, key);

			/* Re-find the entry */
			mstate->entry = entry = memoize_lookup(mstate->hashtable, NULL);
			Assert(entry != NULL);
		}
	}

	return true;
}

static TupleTableSlot *
ExecMemoize(PlanState *pstate)
{
	MemoizeState *node = castNode(MemoizeState, pstate);
	PlanState  *outerNode;
	TupleTableSlot *slot;

	switch (node->mstatus)
	{
		case MEMO_CACHE_LOOKUP:
			{
				MemoizeEntry *entry;
				TupleTableSlot *outerslot;
				bool		found;

				Assert(node->entry == NULL);

				/*
				 * We're only ever in this state for the first call of the
				 * scan.  Here we have a look to see if we've already seen
				 * the current parameters before and, if so, whether we have
				 * already cached a complete set of records that the outer
				 * plan will return for these parameters.
				 *
				 * When we find a valid cache entry, we'll return the first
				 * tuple from it.  If not found, we'll create a cache entry
				 * and then try to fetch a tuple from the outer scan.  If we
				 * find one there, we'll try to cache it.
				 */

				/* see if we've got anything cached for the current parameters */
				entry = cache_lookup(node, &found);

				if (found && entry->complete)
				{
					node->stats.cache_hits += 1;	/* stats update */

					/*
					 * Set last_tuple and entry so that the state
					 * MEMO_CACHE_FETCH_NEXT_TUPLE can easily find the next
					 * tuple for these parameters.
					 */
					node->last_tuple = entry->tuplehead;
					node->entry = entry;

					/* Fetch the first cached tuple, if there is one */
					if (entry->tuplehead)
					{
						node->mstatus = MEMO_CACHE_FETCH_NEXT_TUPLE;

						slot = node->ss.ps.ps_ResultTupleSlot;
						ExecStoreMinimalTuple(entry->tuplehead->mintuple,
											  slot, false);

						return slot;
					}

					/* The cache entry is void of any tuples. */
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				/* Handle cache miss */
				node->stats.cache_misses += 1;	/* stats update */

				if (found)
				{
					/*
					 * A cache entry was found, but the scan for that entry
					 * did not run to completion.  We'll just remove all
					 * tuples and start again.  It might be tempting to
					 * continue where we left off, but there's no guarantee
					 * the outer node will produce the tuples in the same
					 * order as it did last time.
					 */
					entry_purge_tuples(node, entry);
				}

				/* Scan the outer node for a tuple to cache */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					/*
					 * cache_lookup may have returned NULL due to failure to
					 * free enough cache space, so ensure we don't do
					 * anything here that assumes it worked.  There's no
					 * need to go into bypass mode here as we're setting
					 * mstatus to end of scan.
					 */
					if (likely(entry))
						entry->complete = true;

					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				node->entry = entry;

				/*
				 * If we failed to create the entry or failed to store the
				 * tuple in the entry, then go into bypass mode.
				 */
				if (unlikely(entry == NULL ||
							 !cache_store_tuple(node, outerslot)))
				{
					node->stats.cache_overflows += 1;	/* stats update */

					node->mstatus = MEMO_CACHE_BYPASS_MODE;

					/*
					 * No need to clear out last_tuple as we'll stay in
					 * bypass mode until the end of the scan.
					 */
				}
				else
				{
					/*
					 * If we only expect a single row from this scan then we
					 * can mark that we're not expecting more.  This allows
					 * cache lookups to work even when the scan has not been
					 * executed to completion.
					 */
					entry->complete = node->singlerow;
					node->mstatus = MEMO_FILLING_CACHE;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_CACHE_FETCH_NEXT_TUPLE:
			{
				/* We shouldn't be in this state if these are not set */
				Assert(node->entry != NULL);
				Assert(node->last_tuple != NULL);

				/* Skip to the next tuple to output */
				node->last_tuple = node->last_tuple->next;

				/* No more tuples in the cache */
				if (node->last_tuple == NULL)
				{
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecStoreMinimalTuple(node->last_tuple->mintuple, slot,
									  false);

				return slot;
			}

		case MEMO_FILLING_CACHE:
			{
				TupleTableSlot *outerslot;
				MemoizeEntry *entry = node->entry;

				/* entry should already have been set by MEMO_CACHE_LOOKUP */
				Assert(entry != NULL);

				/*
				 * When in the MEMO_FILLING_CACHE state, we've just had a
				 * cache miss and are populating the cache with the current
				 * scan tuples.
				 */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					/* No more tuples.  Mark it as complete */
					entry->complete = true;
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				/*
				 * Validate if the planner properly set the singlerow flag.
				 * It should only set that if each cache entry can, at most,
				 * return 1 row.
				 */
				if (unlikely(entry->complete))
					elog(ERROR, "cache entry already complete");

				/* Record the tuple in the current cache entry */
				if (unlikely(!cache_store_tuple(node, outerslot)))
				{
					/* Couldn't store it?  Handle overflow */
					node->stats.cache_overflows += 1;	/* stats update */

					node->mstatus = MEMO_CACHE_BYPASS_MODE;

					/*
					 * No need to clear out entry or last_tuple as we'll
					 * stay in bypass mode until the end of the scan.
					 */
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_CACHE_BYPASS_MODE:
			{
				TupleTableSlot *outerslot;

				/*
				 * When in bypass mode we just continue to read tuples
				 * without caching.  We need to wait until the next rescan
				 * before we can come out of this mode.
				 */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_END_OF_SCAN:

			/*
			 * We've already returned NULL for this scan, but just in case
			 * something calls us again by mistake.
			 */
			return NULL;

		default:
			elog(ERROR, "unrecognized memoize state: %d",
				 (int) node->mstatus);
			return NULL;
	}							/* switch */
}

MemoizeState *
ExecInitMemoize(Memoize *node, EState *estate, int eflags)
{
	MemoizeState *mstate = makeNode(MemoizeState);
	Plan	   *outerNode;
	int			i;
	int			nkeys;
	Oid		   *eqfuncoids;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	mstate->ss.ps.plan = (Plan *) node;
	mstate->ss.ps.state = estate;
	mstate->ss.ps.ExecProcNode = ExecMemoize;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &mstate->ss.ps);

	outerNode = outerPlan(node);
	outerPlanState(mstate) = ExecInitNode(outerNode, estate, eflags);

	/*
	 * Initialize return slot and type.  No need to initialize projection
	 * info because this node doesn't do projections.
	 */
	ExecInitResultTupleSlotTL(&mstate->ss.ps, &TTSOpsMinimalTuple);
	mstate->ss.ps.ps_ProjInfo = NULL;

	/*
	 * Initialize scan slot and type.
	 */
	ExecCreateScanSlotFromOuterPlan(estate, &mstate->ss, &TTSOpsMinimalTuple);

	/*
	 * Set the state machine to lookup the cache.  We won't find anything
	 * until we cache something, but this saves a special case to create the
	 * first entry.
	 */
	mstate->mstatus = MEMO_CACHE_LOOKUP;

	mstate->nkeys = nkeys = node->numKeys;
	mstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs);
	mstate->tableslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
												 &TTSOpsMinimalTuple);
	mstate->probeslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
												 &TTSOpsVirtual);

	mstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *));
	mstate->collations = node->collations; /* Just point directly to the
											* plan data */
	mstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));

	eqfuncoids = palloc(nkeys * sizeof(Oid));

	for (i = 0; i < nkeys; i++)
	{
		Oid			hashop = node->hashOperators[i];
		Oid			left_hashfn;
		Oid			right_hashfn;
		Expr	   *param_expr = (Expr *) list_nth(node->param_exprs, i);

		if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
			elog(ERROR, "could not find hash function for hash operator %u",
				 hashop);

		fmgr_info(left_hashfn, &mstate->hashfunctions[i]);

		mstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) mstate);
		eqfuncoids[i] = get_opcode(hashop);
	}

	mstate->cache_eq_expr = ExecBuildParamSetEqual(mstate->hashkeydesc,
												   &TTSOpsMinimalTuple,
												   &TTSOpsVirtual,
												   eqfuncoids,
												   node->collations,
												   node->param_exprs,
												   (PlanState *) mstate);

	pfree(eqfuncoids);
	mstate->mem_used = 0;

	/* Limit the total memory consumed by the cache to this */
	mstate->mem_limit = get_hash_memory_limit();

	/* A memory context dedicated for the cache */
	mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext,
												 "MemoizeHashTable",
												 ALLOCSET_DEFAULT_SIZES);

	dlist_init(&mstate->lru_list);
	mstate->last_tuple = NULL;
	mstate->entry = NULL;

	/*
	 * Mark if we can assume the cache entry is completed after we get the
	 * first record for it.  Some callers might not call us again after
	 * getting the first match, e.g. a join operator performing a unique
	 * join is able to skip to the next outer tuple after getting the first
	 * matching inner tuple.  In this case, the cache entry is complete
	 * after we read the first tuple, so we can mark it as such.
	 */
	mstate->singlerow = node->singlerow;

	/* Zero the statistics counters */
	memset(&mstate->stats, 0, sizeof(MemoizeInstrumentation));

	/* Allocate and set up the actual cache */
	build_hash_table(mstate, node->est_entries);

	return mstate;
}

void
ExecEndMemoize(MemoizeState *node)
{
#ifdef USE_ASSERT_CHECKING
	/* Validate the memory accounting code is correct in assert builds. */
	{
		int			count;
		uint64		mem = 0;
		memoize_iterator i;
		MemoizeEntry *entry;

		memoize_start_iterate(node->hashtable, &i);

		count = 0;
		while ((entry = memoize_iterate(node->hashtable, &i)) != NULL)
		{
			MemoizeTuple *tuple = entry->tuplehead;

			mem += EMPTY_ENTRY_MEMORY_BYTES(entry);
			while (tuple != NULL)
			{
				mem += CACHE_TUPLE_BYTES(tuple);
				tuple = tuple->next;
			}
			count++;
		}

		Assert(count == node->hashtable->members);
		Assert(mem == node->mem_used);
	}
#endif

	/*
	 * When ending a parallel worker, copy the statistics gathered by the
	 * worker back into shared memory so that it can be picked up by the
	 * main process to report in EXPLAIN ANALYZE.
	 */
	if (node->shared_info != NULL && IsParallelWorker())
	{
		MemoizeInstrumentation *si;

		/* Make mem_peak available for EXPLAIN */
		if (node->stats.mem_peak == 0)
			node->stats.mem_peak = node->mem_used;

		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
		memcpy(si, &node->stats, sizeof(MemoizeInstrumentation));
	}

	/* Remove the cache context */
	MemoryContextDelete(node->tableContext);

	ExecClearTuple(node->ss.ss_ScanTupleSlot);
	/* must drop pointer to cache result tuple */
	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);

	/*
	 * free exprcontext
	 */
	ExecFreeExprContext(&node->ss.ps);

	/*
	 * shut down the subplan
	 */
	ExecEndNode(outerPlanState(node));
}

void
ExecReScanMemoize(MemoizeState *node)
{
	PlanState  *outerPlan = outerPlanState(node);

	/* Mark that we must lookup the cache for a new set of parameters */
	node->mstatus = MEMO_CACHE_LOOKUP;

	/* nullify pointers used for the last scan */
	node->entry = NULL;
	node->last_tuple = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}

/*
 * ExecEstimateCacheEntryOverheadBytes
 *		For use in the query planner to help it estimate the amount of
 *		memory required to store a single entry in the cache.
 */
double
ExecEstimateCacheEntryOverheadBytes(double ntuples)
{
	return sizeof(MemoizeEntry) + sizeof(MemoizeKey) + sizeof(MemoizeTuple) *
		ntuples;
}
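
/*
 * For reference, the planner's cost_memoize_rescan() in costsize.c is the
 * expected caller here; it adds this per-entry overhead to the estimated
 * size of the cached tuples themselves when working out how many entries
 * are likely to fit within the cache's memory limit.
 */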

/* ----------------------------------------------------------------
 *						Parallel Query Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecMemoizeEstimate
 *
 *		Estimate space required to propagate memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = mul_size(pcxt->nworkers, sizeof(MemoizeInstrumentation));
	size = add_size(size, offsetof(SharedMemoizeInfo, sinstrument));
	shm_toc_estimate_chunk(&pcxt->estimator, size);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeInitializeDSM
 *
 *		Initialize DSM space for memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = offsetof(SharedMemoizeInfo, sinstrument)
		+ pcxt->nworkers * sizeof(MemoizeInstrumentation);
	node->shared_info = shm_toc_allocate(pcxt->toc, size);
	/* ensure any unfilled slots will contain zeroes */
	memset(node->shared_info, 0, size);
	node->shared_info->num_workers = pcxt->nworkers;
	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
				   node->shared_info);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeInitializeWorker
 *
 *		Attach worker to DSM space for memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)
{
	node->shared_info =
		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeRetrieveInstrumentation
 *
 *		Transfer memoize statistics from DSM to private memory.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeRetrieveInstrumentation(MemoizeState *node)
{
	Size		size;
	SharedMemoizeInfo *si;

	if (node->shared_info == NULL)
		return;

	size = offsetof(SharedMemoizeInfo, sinstrument)
		+ node->shared_info->num_workers * sizeof(MemoizeInstrumentation);
	si = palloc(size);
	memcpy(si, node->shared_info, size);
	node->shared_info = si;
}