/*-------------------------------------------------------------------------
 *
 * nodeMemoize.c
 *	  Routines to handle caching of results from parameterized nodes
 *
 * Portions Copyright (c) 2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeMemoize.c
 *
 * Memoize nodes are intended to sit above parameterized nodes in the plan
 * tree in order to cache results from them.  The intention here is that a
 * repeat scan with a parameter value that has already been seen by the node
 * can fetch tuples from the cache rather than having to re-scan the outer
 * node all over again.  The query planner may choose to make use of one of
 * these when it thinks rescans for previously seen values are likely enough
 * to warrant adding the additional node.
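 *
 * As a purely hypothetical illustration (table and index names invented),
 * a plan using Memoize might take the following shape, with the Memoize
 * node caching the parameterized index scan's results per distinct value
 * of t1.x:
 *
 *	Nested Loop
 *	  ->  Seq Scan on t1
 *	  ->  Memoize
 *			Cache Key: t1.x
 *			->  Index Scan using t2_y_idx on t2
 *				  Index Cond: (y = t1.x)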
 *
 * The cache itself is a hash table.  When the cache fills, we never spill
 * tuples to disk; instead, we evict the least recently used cache entry.
 * We track recency by always pushing new entries, and entries we look up,
 * onto the tail of a doubly linked list.  This means that the least
 * recently used items always accumulate at the head of this LRU list,
 * which is where eviction begins.
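 *
 * For example (keys purely illustrative): after looking up keys A, B and C
 * and then A again, the LRU list reads B, C, A from head to tail, so B is
 * the first entry evicted should memory need to be reclaimed.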
 *
 * Sometimes our callers won't run their scans to completion.  For example,
 * a semi-join only needs to run until it finds a matching tuple; once it
 * does, the join operator skips to the next outer tuple and does not
 * execute the inner side again on that scan.  Because of this, we must
 * keep track of when a cache entry is complete.  By default, an entry
 * becomes complete when we run out of tuples to read during the scan.
 * However, there are cases where we can mark the cache entry as complete
 * without exhausting the scan of all tuples.  One case is unique joins,
 * where the join operator knows that there will only be at most one match
 * for any given outer tuple.  In order to support such cases we allow the
 * "singlerow" option to be set for the cache.  This option marks the cache
 * entry as complete after we read the first tuple from the subnode.
 *
 * It's possible when we're filling the cache for a given set of parameters
 * that we're unable to free enough memory to store any more tuples.  If
 * this happens then we'll have already evicted all other cache entries.
 * When caching another tuple would cause us to exceed our memory budget,
 * we must free the entry that we're currently populating and move the
 * state machine into MEMO_CACHE_BYPASS_MODE.  This means that we'll not
 * attempt to cache any further tuples for this particular scan.  We don't
 * have the memory for it.  The state machine will be reset again on the
 * next rescan.  If the memory requirements to cache the next parameter's
 * tuples are less demanding, then that may allow us to start putting
 * useful entries back into the cache again.
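 *
 * As a concrete (and purely illustrative) example: with a memory budget of
 * 1MB, if the scan for one parameter set alone produces more than 1MB of
 * tuples, every other entry will be evicted, the partially built entry
 * will then be freed, and the remainder of that scan runs uncached in
 * bypass mode.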
 *
 *
 * INTERFACE ROUTINES
 *		ExecMemoize			- lookup cache, exec subplan when not found
 *		ExecInitMemoize		- initialize node and subnodes
 *		ExecEndMemoize		- shutdown node and subnodes
 *		ExecReScanMemoize	- rescan the memoize node
 *
 *		ExecMemoizeEstimate		estimates DSM space needed for parallel plan
 *		ExecMemoizeInitializeDSM	initialize DSM for parallel plan
 *		ExecMemoizeInitializeWorker	attach to DSM info in parallel worker
 *		ExecMemoizeRetrieveInstrumentation	get instrumentation from worker
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "common/hashfn.h"
#include "executor/executor.h"
#include "executor/nodeMemoize.h"
#include "lib/ilist.h"
#include "miscadmin.h"
#include "utils/lsyscache.h"

/* States of the ExecMemoize state machine */
#define MEMO_CACHE_LOOKUP			1	/* Attempt to perform a cache lookup */
#define MEMO_CACHE_FETCH_NEXT_TUPLE	2	/* Get another tuple from the cache */
#define MEMO_FILLING_CACHE			3	/* Read outer node to fill cache */
#define MEMO_CACHE_BYPASS_MODE		4	/* Bypass mode.  Just read from our
										 * subplan without caching anything */
#define MEMO_END_OF_SCAN			5	/* Ready for rescan */
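
/*
 * Typical transitions between these states (an illustrative sketch; see
 * ExecMemoize() for the authoritative logic):
 *
 *	complete cache hit:	MEMO_CACHE_LOOKUP -> MEMO_CACHE_FETCH_NEXT_TUPLE
 *						(repeated per cached tuple) -> MEMO_END_OF_SCAN
 *	cache miss:			MEMO_CACHE_LOOKUP -> MEMO_FILLING_CACHE
 *						(repeated per outer tuple) -> MEMO_END_OF_SCAN
 *	out of memory:		MEMO_CACHE_LOOKUP or MEMO_FILLING_CACHE ->
 *						MEMO_CACHE_BYPASS_MODE -> MEMO_END_OF_SCAN
 *
 * ExecReScanMemoize() resets the machine to MEMO_CACHE_LOOKUP.
 */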


/* Helper macros for memory accounting */
#define EMPTY_ENTRY_MEMORY_BYTES(e)	(sizeof(MemoizeEntry) + \
									 sizeof(MemoizeKey) + \
									 (e)->key->params->t_len)
#define CACHE_TUPLE_BYTES(t)		(sizeof(MemoizeTuple) + \
									 (t)->mintuple->t_len)
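
/*
 * For example (sizes purely illustrative): an entry whose key tuple is
 * 32 bytes and which caches two 100-byte tuples is accounted for as
 *
 *	EMPTY_ENTRY_MEMORY_BYTES:	sizeof(MemoizeEntry) + sizeof(MemoizeKey) + 32
 *	CACHE_TUPLE_BYTES (x2):		2 * (sizeof(MemoizeTuple) + 100)
 *
 * mem_used is increased by these amounts as the entry is built and
 * decreased by the same amounts when the entry is purged or removed.
 */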

/*
 * MemoizeTuple
 *		Stores an individually cached tuple
 */
typedef struct MemoizeTuple
{
	MinimalTuple mintuple;		/* Cached tuple */
	struct MemoizeTuple *next;	/* The next tuple with the same parameter
								 * values or NULL if it's the last one */
} MemoizeTuple;

/*
 * MemoizeKey
 *		The hash table key for cached entries plus the LRU list link
 */
typedef struct MemoizeKey
{
	MinimalTuple params;
	dlist_node	lru_node;		/* Pointer to next/prev key in LRU list */
} MemoizeKey;

/*
 * MemoizeEntry
 *		The data struct that the cache hash table stores
 */
typedef struct MemoizeEntry
{
	MemoizeKey *key;			/* Hash key for hash table lookups */
	MemoizeTuple *tuplehead;	/* Pointer to the first tuple or NULL if no
								 * tuples are cached for this entry */
	uint32		hash;			/* Hash value (cached) */
	char		status;			/* Hash status */
	bool		complete;		/* Did we read the outer plan to completion? */
} MemoizeEntry;


#define SH_PREFIX memoize
#define SH_ELEMENT_TYPE MemoizeEntry
#define SH_KEY_TYPE MemoizeKey *
#define SH_SCOPE static inline
#define SH_DECLARE
#include "lib/simplehash.h"

static uint32 MemoizeHash_hash(struct memoize_hash *tb,
							   const MemoizeKey *key);
static int	MemoizeHash_equal(struct memoize_hash *tb,
							  const MemoizeKey *params1,
							  const MemoizeKey *params2);

#define SH_PREFIX memoize
#define SH_ELEMENT_TYPE MemoizeEntry
#define SH_KEY_TYPE MemoizeKey *
#define SH_KEY key
#define SH_HASH_KEY(tb, key) MemoizeHash_hash(tb, key)
#define SH_EQUAL(tb, a, b) (MemoizeHash_equal(tb, a, b) == 0)
#define SH_SCOPE static inline
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) a->hash
#define SH_DEFINE
#include "lib/simplehash.h"

/*
 * MemoizeHash_hash
 *		Hash function for simplehash hashtable.  'key' is unused here as we
 *		require that all table lookups first populate the MemoizeState's
 *		probeslot with the key values to be looked up.
 */
static uint32
MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key)
{
	MemoizeState *mstate = (MemoizeState *) tb->private_data;
	TupleTableSlot *pslot = mstate->probeslot;
	uint32		hashkey = 0;
	int			numkeys = mstate->nkeys;
	FmgrInfo   *hashfunctions = mstate->hashfunctions;
	Oid		   *collations = mstate->collations;

	for (int i = 0; i < numkeys; i++)
	{
		/* rotate hashkey left 1 bit at each step */
		hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

		if (!pslot->tts_isnull[i])	/* treat nulls as having hash key 0 */
		{
			uint32		hkey;

			hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i],
													collations[i], pslot->tts_values[i]));
			hashkey ^= hkey;
		}
	}

	return murmurhash32(hashkey);
}
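
/*
 * To make the combining scheme above concrete: for two key columns with
 * per-column hashes h0 and h1, the value returned is (sketch only)
 *
 *	murmurhash32(rotate_left_1(h0) ^ h1)
 *
 * where NULL columns contribute 0.  The final murmurhash32() scatters the
 * combined value across all 32 bits.
 */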

/*
 * MemoizeHash_equal
 *		Equality function for confirming hash value matches during a hash
 *		table lookup.  'key2' is never used.  Instead the MemoizeState's
 *		probeslot is always populated with details of what's being looked
 *		up.  Returns zero when the keys match and non-zero otherwise, which
 *		is why SH_EQUAL above compares the result against 0.
 */
static int
MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *key1,
				  const MemoizeKey *key2)
{
	MemoizeState *mstate = (MemoizeState *) tb->private_data;
	ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
	TupleTableSlot *tslot = mstate->tableslot;
	TupleTableSlot *pslot = mstate->probeslot;

	/* probeslot should have already been prepared by prepare_probe_slot() */
	ExecStoreMinimalTuple(key1->params, tslot, false);

	econtext->ecxt_innertuple = tslot;
	econtext->ecxt_outertuple = pslot;
	return !ExecQualAndReset(mstate->cache_eq_expr, econtext);
}

/*
 * Initialize the hash table to empty.
 */
static void
build_hash_table(MemoizeState *mstate, uint32 size)
{
	/* Make a guess at a good size when we're not given a valid size. */
	if (size == 0)
		size = 1024;

	/* memoize_create will convert the size to a power of 2 */
	mstate->hashtable = memoize_create(mstate->tableContext, size, mstate);
}
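
/*
 * Note: the 'size' given above comes from the planner's est_entries (see
 * the build_hash_table() call in ExecInitMemoize()); zero means no valid
 * estimate was given, hence the 1024 default guess.
 */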

/*
 * prepare_probe_slot
 *		Populate mstate's probeslot with the values from the tuple stored
 *		in 'key'.  If 'key' is NULL, then perform the population by
 *		evaluating mstate's param_exprs.
 */
static inline void
prepare_probe_slot(MemoizeState *mstate, MemoizeKey *key)
{
	TupleTableSlot *pslot = mstate->probeslot;
	TupleTableSlot *tslot = mstate->tableslot;
	int			numKeys = mstate->nkeys;

	ExecClearTuple(pslot);

	if (key == NULL)
	{
		/* Set the probeslot's values based on the current parameter values */
		for (int i = 0; i < numKeys; i++)
			pslot->tts_values[i] = ExecEvalExpr(mstate->param_exprs[i],
												mstate->ss.ps.ps_ExprContext,
												&pslot->tts_isnull[i]);
	}
	else
	{
		/* Process the key's MinimalTuple and store the values in probeslot */
		ExecStoreMinimalTuple(key->params, tslot, false);
		slot_getallattrs(tslot);
		memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys);
		memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys);
	}

	ExecStoreVirtualTuple(pslot);
}

/*
 * entry_purge_tuples
 *		Remove all tuples from the cache entry pointed to by 'entry'.  This
 *		leaves an empty cache entry.  Also, update the memory accounting to
 *		reflect the removal of the tuples.
 */
static inline void
entry_purge_tuples(MemoizeState *mstate, MemoizeEntry *entry)
{
	MemoizeTuple *tuple = entry->tuplehead;
	uint64		freed_mem = 0;

	while (tuple != NULL)
	{
		MemoizeTuple *next = tuple->next;

		freed_mem += CACHE_TUPLE_BYTES(tuple);

		/* Free memory used for this tuple */
		pfree(tuple->mintuple);
		pfree(tuple);

		tuple = next;
	}

	entry->complete = false;
	entry->tuplehead = NULL;

	/* Update the memory accounting */
	mstate->mem_used -= freed_mem;
}
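
/*
 * Note that the entry itself remains in the hash table and on the LRU list
 * after the purge; remove_cache_entry() below performs the full removal.
 */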

/*
 * remove_cache_entry
 *		Remove 'entry' from the cache and free memory used by it.
 */
static void
remove_cache_entry(MemoizeState *mstate, MemoizeEntry *entry)
{
	MemoizeKey *key = entry->key;

	dlist_delete(&entry->key->lru_node);

	/* Remove all of the tuples from this entry */
	entry_purge_tuples(mstate, entry);

	/*
	 * Update memory accounting.  entry_purge_tuples should have already
	 * subtracted the memory used for each cached tuple.  Here we just
	 * update the amount used by the entry itself.
	 */
	mstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry);

	/* Remove the entry from the cache */
	memoize_delete_item(mstate->hashtable, entry);

	pfree(key->params);
	pfree(key);
}

/*
 * cache_reduce_memory
 *		Evict older and less recently used items from the cache in order to
 *		reduce the memory consumption back to something below the
 *		MemoizeState's mem_limit.
 *
 * 'specialkey', if not NULL, causes the function to return false if the
 * entry which the key belongs to is removed from the cache.
 */
static bool
cache_reduce_memory(MemoizeState *mstate, MemoizeKey *specialkey)
{
	bool		specialkey_intact = true;	/* for now */
	dlist_mutable_iter iter;
	uint64		evictions = 0;

	/* Update peak memory usage */
	if (mstate->mem_used > mstate->stats.mem_peak)
		mstate->stats.mem_peak = mstate->mem_used;

	/* We expect only to be called when we've gone over budget on memory */
	Assert(mstate->mem_used > mstate->mem_limit);

	/* Begin the eviction process at the head of the LRU list. */
	dlist_foreach_modify(iter, &mstate->lru_list)
	{
		MemoizeKey *key = dlist_container(MemoizeKey, lru_node, iter.cur);
		MemoizeEntry *entry;

		/*
		 * Populate the hash probe slot in preparation for looking up this
		 * LRU entry.
		 */
		prepare_probe_slot(mstate, key);

		/*
		 * Ideally the LRU list pointers would be stored in the entry itself
		 * rather than in the key.  Unfortunately, we can't do that as the
		 * simplehash.h code may resize the table and allocate new memory
		 * for entries, which would result in those pointers pointing to the
		 * old buckets.  However, it's fine to use the key to store this as
		 * the key is only referenced by a pointer in the entry, which of
		 * course follows the entry whenever the hash table is resized.
		 * Since we only have a pointer to the key here, we must perform a
		 * hash table lookup to find the entry that the key belongs to.
		 */
		entry = memoize_lookup(mstate->hashtable, NULL);

		/* A good spot to check for corruption of the table and LRU list. */
		Assert(entry != NULL);
		Assert(entry->key == key);

		/*
		 * If we're being called to free memory while the cache is being
		 * populated with new tuples, then we'd better take some care as we
		 * could end up freeing the entry which 'specialkey' belongs to.
		 * Generally callers will pass 'specialkey' as the key for the cache
		 * entry which is currently being populated, so we must set
		 * 'specialkey_intact' to false to inform the caller that the
		 * specialkey entry has been removed.
		 */
		if (key == specialkey)
			specialkey_intact = false;

		/*
		 * Finally remove the entry.  This will remove it from the LRU list
		 * too.
		 */
		remove_cache_entry(mstate, entry);

		evictions++;

		/* Exit if we've freed enough memory */
		if (mstate->mem_used <= mstate->mem_limit)
			break;
	}

	mstate->stats.cache_evictions += evictions; /* Update Stats */

	return specialkey_intact;
}
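
/*
 * For instance, cache_store_tuple() below passes the key of the entry it
 * is currently filling as 'specialkey'; if that entry had to be evicted to
 * get back under budget, the false return value tells the caller to give
 * up on caching and switch to MEMO_CACHE_BYPASS_MODE.
 */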

/*
 * cache_lookup
 *		Perform a lookup to see if we've already cached tuples based on the
 *		scan's current parameters.  If we find an existing entry we move it
 *		to the end of the LRU list, set *found to true then return it.  If
 *		we don't find an entry then we create a new one and add it to the
 *		end of the LRU list.  We also update cache memory accounting and
 *		remove older entries if we go over the memory budget.  If we
 *		managed to free enough memory we return the new entry, else we
 *		return NULL.
 *
 * Callers can assume we'll never return NULL when *found is true.
 */
static MemoizeEntry *
cache_lookup(MemoizeState *mstate, bool *found)
{
	MemoizeKey *key;
	MemoizeEntry *entry;
	MemoryContext oldcontext;

	/* prepare the probe slot with the current scan parameters */
	prepare_probe_slot(mstate, NULL);

	/*
	 * Add the new entry to the cache.  No need to pass a valid key since
	 * the hash function uses mstate's probeslot, which we populated above.
	 */
	entry = memoize_insert(mstate->hashtable, NULL, found);

	if (*found)
	{
		/*
		 * Move existing entry to the tail of the LRU list to mark it as
		 * the most recently used item.
		 */
		dlist_move_tail(&mstate->lru_list, &entry->key->lru_node);

		return entry;
	}

	oldcontext = MemoryContextSwitchTo(mstate->tableContext);

	/* Allocate a new key */
	entry->key = key = (MemoizeKey *) palloc(sizeof(MemoizeKey));
	key->params = ExecCopySlotMinimalTuple(mstate->probeslot);

	/* Update the total cache memory utilization */
	mstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry);

	/* Initialize this entry */
	entry->complete = false;
	entry->tuplehead = NULL;

	/*
	 * Since this is the most recently used entry, push this entry onto the
	 * end of the LRU list.
	 */
	dlist_push_tail(&mstate->lru_list, &entry->key->lru_node);

	mstate->last_tuple = NULL;

	MemoryContextSwitchTo(oldcontext);

	/*
	 * If we've gone over our memory budget, then we'll free up some space
	 * in the cache.
	 */
	if (mstate->mem_used > mstate->mem_limit)
	{
		/*
		 * Try to free up some memory.  It's highly unlikely that we'll
		 * fail to do so here since the entry we've just added is yet to
		 * contain any tuples and we're able to remove any other entry to
		 * reduce the memory consumption.
		 */
		if (unlikely(!cache_reduce_memory(mstate, key)))
			return NULL;

		/*
		 * The process of removing entries from the cache may have caused
		 * the code in simplehash.h to shuffle elements to earlier buckets
		 * in the hash table.  If it has, we'll need to find the entry
		 * again by performing a lookup.  Fortunately, we can detect if
		 * this has happened by seeing if the entry is still in use and
		 * that the key pointer matches our expected key.
		 */
		if (entry->status != memoize_SH_IN_USE || entry->key != key)
		{
			/*
			 * We need to repopulate the probeslot as lookups performed
			 * during the cache evictions above will have stored some other
			 * key.
			 */
			prepare_probe_slot(mstate, key);

			/* Re-find the newly added entry */
			entry = memoize_lookup(mstate->hashtable, NULL);
			Assert(entry != NULL);
		}
	}

	return entry;
}

/*
 * cache_store_tuple
 *		Add the tuple stored in 'slot' to the mstate's current cache entry.
 *		The cache entry must have already been made with cache_lookup().
 *		mstate's last_tuple field must point to the tail of mstate->entry's
 *		list of tuples.
 */
static bool
cache_store_tuple(MemoizeState *mstate, TupleTableSlot *slot)
{
	MemoizeTuple *tuple;
	MemoizeEntry *entry = mstate->entry;
	MemoryContext oldcontext;

	Assert(slot != NULL);
	Assert(entry != NULL);

	oldcontext = MemoryContextSwitchTo(mstate->tableContext);

	tuple = (MemoizeTuple *) palloc(sizeof(MemoizeTuple));
	tuple->mintuple = ExecCopySlotMinimalTuple(slot);
	tuple->next = NULL;

	/* Account for the memory we just consumed */
	mstate->mem_used += CACHE_TUPLE_BYTES(tuple);

	if (entry->tuplehead == NULL)
	{
		/*
		 * This is the first tuple for this entry, so just point the list
		 * head to it.
		 */
		entry->tuplehead = tuple;
	}
	else
	{
		/* push this tuple onto the tail of the list */
		mstate->last_tuple->next = tuple;
	}

	mstate->last_tuple = tuple;
	MemoryContextSwitchTo(oldcontext);

	/*
	 * If we've gone over our memory budget then free up some space in the
	 * cache.
	 */
	if (mstate->mem_used > mstate->mem_limit)
	{
		MemoizeKey *key = entry->key;

		if (!cache_reduce_memory(mstate, key))
			return false;

		/*
		 * The process of removing entries from the cache may have caused
		 * the code in simplehash.h to shuffle elements to earlier buckets
		 * in the hash table.  If it has, we'll need to find the entry
		 * again by performing a lookup.  Fortunately, we can detect if
		 * this has happened by seeing if the entry is still in use and
		 * that the key pointer matches our expected key.
		 */
		if (entry->status != memoize_SH_IN_USE || entry->key != key)
		{
			/*
			 * We need to repopulate the probeslot as lookups performed
			 * during the cache evictions above will have stored some other
			 * key.
			 */
			prepare_probe_slot(mstate, key);

			/* Re-find the entry */
			mstate->entry = entry = memoize_lookup(mstate->hashtable, NULL);
			Assert(entry != NULL);
		}
	}

	return true;
}

static TupleTableSlot *
ExecMemoize(PlanState *pstate)
{
	MemoizeState *node = castNode(MemoizeState, pstate);
	PlanState  *outerNode;
	TupleTableSlot *slot;

	switch (node->mstatus)
	{
		case MEMO_CACHE_LOOKUP:
			{
				MemoizeEntry *entry;
				TupleTableSlot *outerslot;
				bool		found;

				Assert(node->entry == NULL);

				/*
				 * We're only ever in this state for the first call of the
				 * scan.  Here we have a look to see if we've already seen
				 * the current parameters before and if we have already
				 * cached a complete set of records that the outer plan
				 * will return for these parameters.
				 *
				 * When we find a valid cache entry, we'll return the first
				 * tuple from it.  If not found, we'll create a cache entry
				 * and then try to fetch a tuple from the outer scan.  If
				 * we find one there, we'll try to cache it.
				 */

				/* see if we've got anything cached for the current parameters */
				entry = cache_lookup(node, &found);

				if (found && entry->complete)
				{
					node->stats.cache_hits += 1;	/* stats update */

					/*
					 * Set last_tuple and entry so that the state
					 * MEMO_CACHE_FETCH_NEXT_TUPLE can easily find the next
					 * tuple for these parameters.
					 */
					node->last_tuple = entry->tuplehead;
					node->entry = entry;

					/* Fetch the first cached tuple, if there is one */
					if (entry->tuplehead)
					{
						node->mstatus = MEMO_CACHE_FETCH_NEXT_TUPLE;

						slot = node->ss.ps.ps_ResultTupleSlot;
						ExecStoreMinimalTuple(entry->tuplehead->mintuple,
											  slot, false);

						return slot;
					}

					/* The cache entry is void of any tuples. */
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				/* Handle cache miss */
				node->stats.cache_misses += 1;	/* stats update */

				if (found)
				{
					/*
					 * A cache entry was found, but the scan for that entry
					 * did not run to completion.  We'll just remove all
					 * tuples and start again.  It might be tempting to
					 * continue where we left off, but there's no guarantee
					 * the outer node will produce the tuples in the same
					 * order as it did last time.
					 */
					entry_purge_tuples(node, entry);
				}

				/* Scan the outer node for a tuple to cache */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					/*
					 * cache_lookup may have returned NULL due to failure
					 * to free enough cache space, so ensure we don't do
					 * anything here that assumes it worked.  There's no
					 * need to go into bypass mode here as we're setting
					 * mstatus to end of scan.
					 */
					if (likely(entry))
						entry->complete = true;

					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				node->entry = entry;

				/*
				 * If we failed to create the entry or failed to store the
				 * tuple in the entry, then go into bypass mode.
				 */
				if (unlikely(entry == NULL ||
							 !cache_store_tuple(node, outerslot)))
				{
					node->stats.cache_overflows += 1;	/* stats update */

					node->mstatus = MEMO_CACHE_BYPASS_MODE;

					/*
					 * No need to clear out last_tuple as we'll stay in
					 * bypass mode until the end of the scan.
					 */
				}
				else
				{
					/*
					 * If we only expect a single row from this scan then
					 * we can mark that we're not expecting more.  This
					 * allows cache lookups to work even when the scan has
					 * not been executed to completion.
					 */
					entry->complete = node->singlerow;
					node->mstatus = MEMO_FILLING_CACHE;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_CACHE_FETCH_NEXT_TUPLE:
			{
				/* We shouldn't be in this state if these are not set */
				Assert(node->entry != NULL);
				Assert(node->last_tuple != NULL);

				/* Skip to the next tuple to output */
				node->last_tuple = node->last_tuple->next;

				/* No more tuples in the cache */
				if (node->last_tuple == NULL)
				{
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecStoreMinimalTuple(node->last_tuple->mintuple, slot,
									  false);

				return slot;
			}

		case MEMO_FILLING_CACHE:
			{
				TupleTableSlot *outerslot;
				MemoizeEntry *entry = node->entry;

				/* entry should already have been set by MEMO_CACHE_LOOKUP */
				Assert(entry != NULL);

				/*
				 * When in the MEMO_FILLING_CACHE state, we've just had a
				 * cache miss and are populating the cache with the current
				 * scan tuples.
				 */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					/* No more tuples.  Mark it as complete */
					entry->complete = true;
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				/*
				 * Validate if the planner properly set the singlerow flag.
				 * It should only set that if each cache entry can, at
				 * most, return 1 row.
				 */
				if (unlikely(entry->complete))
					elog(ERROR, "cache entry already complete");

				/* Record the tuple in the current cache entry */
				if (unlikely(!cache_store_tuple(node, outerslot)))
				{
					/* Couldn't store it?  Handle overflow */
					node->stats.cache_overflows += 1;	/* stats update */

					node->mstatus = MEMO_CACHE_BYPASS_MODE;

					/*
					 * No need to clear out entry or last_tuple as we'll
					 * stay in bypass mode until the end of the scan.
					 */
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_CACHE_BYPASS_MODE:
			{
				TupleTableSlot *outerslot;

				/*
				 * When in bypass mode we just continue to read tuples
				 * without caching.  We need to wait until the next rescan
				 * before we can come out of this mode.
				 */
				outerNode = outerPlanState(node);
				outerslot = ExecProcNode(outerNode);
				if (TupIsNull(outerslot))
				{
					node->mstatus = MEMO_END_OF_SCAN;
					return NULL;
				}

				slot = node->ss.ps.ps_ResultTupleSlot;
				ExecCopySlot(slot, outerslot);
				return slot;
			}

		case MEMO_END_OF_SCAN:

			/*
			 * We've already returned NULL for this scan, but just in case
			 * something calls us again by mistake.
			 */
			return NULL;

		default:
			elog(ERROR, "unrecognized memoize state: %d",
				 (int) node->mstatus);
			return NULL;
	}							/* switch */
}

MemoizeState *
ExecInitMemoize(Memoize *node, EState *estate, int eflags)
{
	MemoizeState *mstate = makeNode(MemoizeState);
	Plan	   *outerNode;
	int			i;
	int			nkeys;
	Oid		   *eqfuncoids;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	mstate->ss.ps.plan = (Plan *) node;
	mstate->ss.ps.state = estate;
	mstate->ss.ps.ExecProcNode = ExecMemoize;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &mstate->ss.ps);

	outerNode = outerPlan(node);
	outerPlanState(mstate) = ExecInitNode(outerNode, estate, eflags);

	/*
	 * Initialize return slot and type.  No need to initialize projection
	 * info because this node doesn't do projections.
	 */
	ExecInitResultTupleSlotTL(&mstate->ss.ps, &TTSOpsMinimalTuple);
	mstate->ss.ps.ps_ProjInfo = NULL;

	/*
	 * Initialize scan slot and type.
	 */
	ExecCreateScanSlotFromOuterPlan(estate, &mstate->ss, &TTSOpsMinimalTuple);

	/*
	 * Set the state machine to lookup the cache.  We won't find anything
	 * until we cache something, but this saves a special case to create
	 * the first entry.
	 */
	mstate->mstatus = MEMO_CACHE_LOOKUP;

	mstate->nkeys = nkeys = node->numKeys;
	mstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs);
	mstate->tableslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
												 &TTSOpsMinimalTuple);
	mstate->probeslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
												 &TTSOpsVirtual);

	mstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *));
	mstate->collations = node->collations; /* Just point directly to the
											* plan data */
	mstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));

	eqfuncoids = palloc(nkeys * sizeof(Oid));

	for (i = 0; i < nkeys; i++)
	{
		Oid			hashop = node->hashOperators[i];
		Oid			left_hashfn;
		Oid			right_hashfn;
		Expr	   *param_expr = (Expr *) list_nth(node->param_exprs, i);

		if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
			elog(ERROR, "could not find hash function for hash operator %u",
				 hashop);

		fmgr_info(left_hashfn, &mstate->hashfunctions[i]);

		mstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) mstate);
		eqfuncoids[i] = get_opcode(hashop);
	}

	mstate->cache_eq_expr = ExecBuildParamSetEqual(mstate->hashkeydesc,
												   &TTSOpsMinimalTuple,
												   &TTSOpsVirtual,
												   eqfuncoids,
												   node->collations,
												   node->param_exprs,
												   (PlanState *) mstate);

	pfree(eqfuncoids);
	mstate->mem_used = 0;

	/* Limit the total memory consumed by the cache to this */
	mstate->mem_limit = get_hash_memory_limit();

	/* A memory context dedicated for the cache */
	mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext,
												 "MemoizeHashTable",
												 ALLOCSET_DEFAULT_SIZES);

	dlist_init(&mstate->lru_list);
	mstate->last_tuple = NULL;
	mstate->entry = NULL;

	/*
	 * Mark if we can assume the cache entry is completed after we get the
	 * first record for it.  Some callers might not call us again after
	 * getting the first match.  e.g. A join operator performing a unique
	 * join is able to skip to the next outer tuple after getting the first
	 * matching inner tuple.  In this case, the cache entry is complete
	 * after getting the first tuple, so we mark it accordingly.
	 */
	mstate->singlerow = node->singlerow;

	/* Zero the statistics counters */
	memset(&mstate->stats, 0, sizeof(MemoizeInstrumentation));

	/* Allocate and set up the actual cache */
	build_hash_table(mstate, node->est_entries);

	return mstate;
}

void
ExecEndMemoize(MemoizeState *node)
{
#ifdef USE_ASSERT_CHECKING
	/* Validate the memory accounting code is correct in assert builds. */
	{
		int			count;
		uint64		mem = 0;
		memoize_iterator i;
		MemoizeEntry *entry;

		memoize_start_iterate(node->hashtable, &i);

		count = 0;
		while ((entry = memoize_iterate(node->hashtable, &i)) != NULL)
		{
			MemoizeTuple *tuple = entry->tuplehead;

			mem += EMPTY_ENTRY_MEMORY_BYTES(entry);
			while (tuple != NULL)
			{
				mem += CACHE_TUPLE_BYTES(tuple);
				tuple = tuple->next;
			}
			count++;
		}

		Assert(count == node->hashtable->members);
		Assert(mem == node->mem_used);
	}
#endif

	/*
	 * When ending a parallel worker, copy the statistics gathered by the
	 * worker back into shared memory so that it can be picked up by the
	 * main process to report in EXPLAIN ANALYZE.
	 */
	if (node->shared_info != NULL && IsParallelWorker())
	{
		MemoizeInstrumentation *si;

		/* Make mem_peak available for EXPLAIN */
		if (node->stats.mem_peak == 0)
			node->stats.mem_peak = node->mem_used;

		Assert(ParallelWorkerNumber < node->shared_info->num_workers);
		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
		memcpy(si, &node->stats, sizeof(MemoizeInstrumentation));
	}

	/* Remove the cache context */
	MemoryContextDelete(node->tableContext);

	ExecClearTuple(node->ss.ss_ScanTupleSlot);
	/* must drop pointer to cache result tuple */
	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);

	/*
	 * free exprcontext
	 */
	ExecFreeExprContext(&node->ss.ps);

	/*
	 * shut down the subplan
	 */
	ExecEndNode(outerPlanState(node));
}

void
ExecReScanMemoize(MemoizeState *node)
{
	PlanState  *outerPlan = outerPlanState(node);

	/* Mark that we must lookup the cache for a new set of parameters */
	node->mstatus = MEMO_CACHE_LOOKUP;

	/* nullify pointers used for the last scan */
	node->entry = NULL;
	node->last_tuple = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}

/*
 * ExecEstimateCacheEntryOverheadBytes
 *		For use in the query planner to help it estimate the amount of
 *		memory required to store a single entry in the cache.
 */
double
ExecEstimateCacheEntryOverheadBytes(double ntuples)
{
	return sizeof(MemoizeEntry) + sizeof(MemoizeKey) + sizeof(MemoizeTuple) *
		ntuples;
}
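
/*
 * A sketch of how the planner might combine this with the estimated tuple
 * data size (the variable names here are hypothetical, not the planner's
 * own):
 *
 *	est_entry_bytes = est_tuple_data_bytes +
 *		ExecEstimateCacheEntryOverheadBytes(tuples_per_entry);
 *
 * i.e. the value returned above covers only the per-entry and per-tuple
 * bookkeeping structs; the cached tuple data itself is costed separately.
 */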

/* ----------------------------------------------------------------
 *						Parallel Query Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecMemoizeEstimate
 *
 *		Estimate space required to propagate memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = mul_size(pcxt->nworkers, sizeof(MemoizeInstrumentation));
	size = add_size(size, offsetof(SharedMemoizeInfo, sinstrument));
	shm_toc_estimate_chunk(&pcxt->estimator, size);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeInitializeDSM
 *
 *		Initialize DSM space for memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = offsetof(SharedMemoizeInfo, sinstrument)
		+ pcxt->nworkers * sizeof(MemoizeInstrumentation);
	node->shared_info = shm_toc_allocate(pcxt->toc, size);
	/* ensure any unfilled slots will contain zeroes */
	memset(node->shared_info, 0, size);
	node->shared_info->num_workers = pcxt->nworkers;
	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
				   node->shared_info);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeInitializeWorker
 *
 *		Attach worker to DSM space for memoize statistics.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)
{
	node->shared_info =
		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
}

/* ----------------------------------------------------------------
 *		ExecMemoizeRetrieveInstrumentation
 *
 *		Transfer memoize statistics from DSM to private memory.
 * ----------------------------------------------------------------
 */
void
ExecMemoizeRetrieveInstrumentation(MemoizeState *node)
{
	Size		size;
	SharedMemoizeInfo *si;

	if (node->shared_info == NULL)
		return;

	size = offsetof(SharedMemoizeInfo, sinstrument)
		+ node->shared_info->num_workers * sizeof(MemoizeInstrumentation);
	si = palloc(size);
	memcpy(si, node->shared_info, size);
	node->shared_info = si;
}