1 /*-------------------------------------------------------------------------
2  *
3  * nodeMemoize.c
4  *	  Routines to handle caching of results from parameterized nodes
5  *
6  * Portions Copyright (c) 2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeMemoize.c
12  *
13  * Memoize nodes are intended to sit above parameterized nodes in the plan
14  * tree in order to cache results from them.  The intention here is that a
15  * repeat scan with a parameter value that has already been seen by the node
16  * can fetch tuples from the cache rather than having to re-scan the outer
17  * node all over again.  The query planner may choose to make use of one of
18  * these when it thinks rescans for previously seen values are likely enough
19  * to warrant adding the additional node.
20  *
21  * The cache itself is a hash table.  When the cache fills, we never spill
22  * tuples to disk; instead, we evict the least recently used cache entry.
23  * We track recency by always pushing new entries, and entries we look up,
24  * onto the tail of a doubly linked list.  Older entries therefore
25  * accumulate at the head of this LRU list, which is where eviction starts
26  * (see cache_reduce_memory).
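 *
 * As an illustrative sketch only (using the dlist calls from lib/ilist.h the
 * same way the code below does): a cache hit does
 *		dlist_move_tail(&mstate->lru_list, &entry->key->lru_node);
 * a newly created entry does
 *		dlist_push_tail(&mstate->lru_list, &entry->key->lru_node);
 * and eviction walks the list from its head with dlist_foreach_modify()
 * until enough memory has been freed.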
27  *
28  * Sometimes our callers won't run their scans to completion.  For example,
29  * a semi-join only needs to run until it finds a matching tuple, and once
30  * it does, the join operator skips to the next outer tuple and does not
31  * execute the inner side again for that scan.  Because of this, we must
32  * keep track of when a cache entry is complete; by default, an entry becomes
33  * complete only when we run out of tuples to read.  However, sometimes we
34  * can mark the cache entry as complete without exhausting the scan of all
35  * tuples.  One case is unique joins, where the join operator knows that there
36  * will only be at most one match for any given outer tuple.  In order to
37  * support such cases we allow the "singlerow" option to be set for the cache.
38  * This option marks the cache entry as complete after we read the first tuple
39  * from the subnode.
40  *
41  * It's possible when we're filling the cache for a given set of parameters
42  * that we're unable to free enough memory to store any more tuples.  If this
43  * happens then we'll have already evicted all other cache entries.  When
44  * caching another tuple would cause us to exceed our memory budget, we must
45  * free the entry that we're currently populating and move the state machine
46  * into MEMO_CACHE_BYPASS_MODE.  This means that we'll not attempt to cache
47  * any further tuples for this particular scan.  We don't have the memory for
48  * it.  The state machine will be reset again on the next rescan.  If the
49  * memory requirements to cache the next parameter's tuples are less
50  * demanding, then that may allow us to start putting useful entries back into
51  * the cache again.
52  *
53  *
54  * INTERFACE ROUTINES
55  *		ExecMemoize			- lookup cache, exec subplan when not found
56  *		ExecInitMemoize		- initialize node and subnodes
57  *		ExecEndMemoize		- shutdown node and subnodes
58  *		ExecReScanMemoize	- rescan the memoize node
59  *
60  *		ExecMemoizeEstimate		estimates DSM space needed for parallel plan
61  *		ExecMemoizeInitializeDSM initialize DSM for parallel plan
62  *		ExecMemoizeInitializeWorker attach to DSM info in parallel worker
63  *		ExecMemoizeRetrieveInstrumentation get instrumentation from worker
64  *-------------------------------------------------------------------------
65  */
66 
67 #include "postgres.h"
68 
69 #include "common/hashfn.h"
70 #include "executor/executor.h"
71 #include "executor/nodeMemoize.h"
72 #include "lib/ilist.h"
73 #include "miscadmin.h"
74 #include "utils/lsyscache.h"
75 
76 /* States of the ExecMemoize state machine */
77 #define MEMO_CACHE_LOOKUP			1	/* Attempt to perform a cache lookup */
78 #define MEMO_CACHE_FETCH_NEXT_TUPLE	2	/* Get another tuple from the cache */
79 #define MEMO_FILLING_CACHE			3	/* Read outer node to fill cache */
80 #define MEMO_CACHE_BYPASS_MODE		4	/* Bypass mode.  Just read from our
81 										 * subplan without caching anything */
82 #define MEMO_END_OF_SCAN			5	/* Ready for rescan */
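
/*
 * For orientation only, the usual transitions between these states, as
 * driven by ExecMemoize() below, are roughly:
 *
 *		MEMO_CACHE_LOOKUP  -> MEMO_CACHE_FETCH_NEXT_TUPLE	(cache hit)
 *		MEMO_CACHE_LOOKUP  -> MEMO_FILLING_CACHE			(cache miss)
 *		MEMO_FILLING_CACHE -> MEMO_CACHE_BYPASS_MODE		(out of cache memory)
 *		any of the above   -> MEMO_END_OF_SCAN				(no more tuples)
 *
 * ExecReScanMemoize() puts the state machine back into MEMO_CACHE_LOOKUP.
 */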
83 
84 
85 /* Helper macros for memory accounting */
86 #define EMPTY_ENTRY_MEMORY_BYTES(e)		(sizeof(MemoizeEntry) + \
87 										 sizeof(MemoizeKey) + \
88 										 (e)->key->params->t_len)
89 #define CACHE_TUPLE_BYTES(t)			(sizeof(MemoizeTuple) + \
90 										 (t)->mintuple->t_len)
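
/*
 * As a purely illustrative sizing example (the numbers are hypothetical): an
 * entry whose parameter MinimalTuple has t_len = 32 and which caches two
 * tuples of t_len = 48 each is accounted for as
 *
 *		(sizeof(MemoizeEntry) + sizeof(MemoizeKey) + 32) +
 *		2 * (sizeof(MemoizeTuple) + 48)
 *
 * bytes, i.e. one EMPTY_ENTRY_MEMORY_BYTES() plus one CACHE_TUPLE_BYTES()
 * per cached tuple.
 */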
91 
92 /* MemoizeTuple stores an individually cached tuple */
93 typedef struct MemoizeTuple
94 {
95 	MinimalTuple mintuple;		/* Cached tuple */
96 	struct MemoizeTuple *next;	/* The next tuple with the same parameter
97 								 * values or NULL if it's the last one */
98 } MemoizeTuple;
99 
100 /*
101  * MemoizeKey
102  *		The hash table key for cached entries plus the LRU list link
103  */
104 typedef struct MemoizeKey
105 {
106 	MinimalTuple params;
107 	dlist_node	lru_node;		/* Pointer to next/prev key in LRU list */
108 } MemoizeKey;
109 
110 /*
111  * MemoizeEntry
112  *		The data struct that the cache hash table stores
113  */
114 typedef struct MemoizeEntry
115 {
116 	MemoizeKey *key;			/* Hash key for hash table lookups */
117 	MemoizeTuple *tuplehead;	/* Pointer to the first tuple or NULL if
118 								 * no tuples are cached for this entry */
119 	uint32		hash;			/* Hash value (cached) */
120 	char		status;			/* Hash status */
121 	bool		complete;		/* Did we read the outer plan to completion? */
122 } MemoizeEntry;
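
/*
 * Purely as an illustration of how the structures above chain together (this
 * is not executor code): walking every cached tuple for one entry looks like
 *
 *		for (MemoizeTuple *t = entry->tuplehead; t != NULL; t = t->next)
 *			... t->mintuple is one cached result row ...
 *
 * while entry->key->params holds the parameter values those rows belong to,
 * and entry->key->lru_node links the entry into the LRU list.
 */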
123 
124 
125 #define SH_PREFIX memoize
126 #define SH_ELEMENT_TYPE MemoizeEntry
127 #define SH_KEY_TYPE MemoizeKey *
128 #define SH_SCOPE static inline
129 #define SH_DECLARE
130 #include "lib/simplehash.h"
131 
132 static uint32 MemoizeHash_hash(struct memoize_hash *tb,
133 							   const MemoizeKey *key);
134 static int MemoizeHash_equal(struct memoize_hash *tb,
135 							 const MemoizeKey *params1,
136 							 const MemoizeKey *params2);
137 
138 #define SH_PREFIX memoize
139 #define SH_ELEMENT_TYPE MemoizeEntry
140 #define SH_KEY_TYPE MemoizeKey *
141 #define SH_KEY key
142 #define SH_HASH_KEY(tb, key) MemoizeHash_hash(tb, key)
143 #define SH_EQUAL(tb, a, b) (MemoizeHash_equal(tb, a, b) == 0)
144 #define SH_SCOPE static inline
145 #define SH_STORE_HASH
146 #define SH_GET_HASH(tb, a) a->hash
147 #define SH_DEFINE
148 #include "lib/simplehash.h"
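
/*
 * With the parameters above, simplehash.h expands into a static hash table
 * implementation whose generated functions are used throughout this file,
 * along the lines of:
 *
 *		table = memoize_create(context, nelements, private_data);
 *		entry = memoize_insert(table, NULL, &found);
 *		entry = memoize_lookup(table, NULL);
 *		memoize_delete_item(table, entry);
 */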
149 
150 /*
151  * MemoizeHash_hash
152  *		Hash function for simplehash hashtable.  'key' is unused here as we
153  *		require that all table lookups first populate the MemoizeState's
154  *		probeslot with the key values to be looked up.
155  */
156 static uint32
157 MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key)
158 {
159 	MemoizeState *mstate = (MemoizeState *) tb->private_data;
160 	TupleTableSlot *pslot = mstate->probeslot;
161 	uint32		hashkey = 0;
162 	int			numkeys = mstate->nkeys;
163 	FmgrInfo   *hashfunctions = mstate->hashfunctions;
164 	Oid		   *collations = mstate->collations;
165 
166 	for (int i = 0; i < numkeys; i++)
167 	{
168 		/* rotate hashkey left 1 bit at each step */
169 		hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
170 
171 		if (!pslot->tts_isnull[i])	/* treat nulls as having hash key 0 */
172 		{
173 			uint32		hkey;
174 
175 			hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i],
176 													collations[i], pslot->tts_values[i]));
177 			hashkey ^= hkey;
178 		}
179 	}
180 
181 	return murmurhash32(hashkey);
182 }
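
/*
 * As a worked illustration of the combining step above (not extra code):
 * with two key columns whose per-column hashes are h0 and h1, the loop
 * computes rot1(rot1(0) ^ h0) ^ h1 = rot1(h0) ^ h1, where rot1() is a 1-bit
 * left rotation, and the result is then finalized with murmurhash32().  NULL
 * key columns simply contribute nothing to the XOR.
 */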
183 
184 /*
185  * MemoizeHash_equal
186  *		Equality function for confirming hash value matches during a hash
187  *		table lookup.  'key2' is never used; the probeslot always holds what is
188  *		being looked up.  Returns 0 on a match, as SH_EQUAL above expects.
189  */
190 static int
191 MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *key1,
192 			  const MemoizeKey *key2)
193 {
194 	MemoizeState *mstate = (MemoizeState *) tb->private_data;
195 	ExprContext *econtext = mstate->ss.ps.ps_ExprContext;
196 	TupleTableSlot *tslot = mstate->tableslot;
197 	TupleTableSlot *pslot = mstate->probeslot;
198 
199 	/* probeslot should have already been prepared by prepare_probe_slot() */
200 	ExecStoreMinimalTuple(key1->params, tslot, false);
201 
202 	econtext->ecxt_innertuple = tslot;
203 	econtext->ecxt_outertuple = pslot;
204 	return !ExecQualAndReset(mstate->cache_eq_expr, econtext);
205 }
206 
207 /*
208  * Initialize the hash table to empty.
209  */
210 static void
211 build_hash_table(MemoizeState *mstate, uint32 size)
212 {
213 	/* Make a guess at a good size when we're not given a valid size. */
214 	if (size == 0)
215 		size = 1024;
216 
217 	/* memoize_create will convert the size to a power of 2 */
218 	mstate->hashtable = memoize_create(mstate->tableContext, size, mstate);
219 }
220 
221 /*
222  * prepare_probe_slot
223  *		Populate mstate's probeslot with the values from the tuple stored
224  *		in 'key'.  If 'key' is NULL, then perform the population by evaluating
225  *		mstate's param_exprs.
226  */
227 static inline void
228 prepare_probe_slot(MemoizeState *mstate, MemoizeKey *key)
229 {
230 	TupleTableSlot *pslot = mstate->probeslot;
231 	TupleTableSlot *tslot = mstate->tableslot;
232 	int			numKeys = mstate->nkeys;
233 
234 	ExecClearTuple(pslot);
235 
236 	if (key == NULL)
237 	{
238 		/* Set the probeslot's values based on the current parameter values */
239 		for (int i = 0; i < numKeys; i++)
240 			pslot->tts_values[i] = ExecEvalExpr(mstate->param_exprs[i],
241 												mstate->ss.ps.ps_ExprContext,
242 												&pslot->tts_isnull[i]);
243 	}
244 	else
245 	{
246 		/* Process the key's MinimalTuple and store the values in probeslot */
247 		ExecStoreMinimalTuple(key->params, tslot, false);
248 		slot_getallattrs(tslot);
249 		memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys);
250 		memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys);
251 	}
252 
253 	ExecStoreVirtualTuple(pslot);
254 }
255 
256 /*
257  * entry_purge_tuples
258  *		Remove all tuples from the cache entry pointed to by 'entry'.  This
259  *		leaves an empty cache entry.  Also, update the memory accounting to
260  *		reflect the removal of the tuples.
261  */
262 static inline void
263 entry_purge_tuples(MemoizeState *mstate, MemoizeEntry *entry)
264 {
265 	MemoizeTuple *tuple = entry->tuplehead;
266 	uint64		freed_mem = 0;
267 
268 	while (tuple != NULL)
269 	{
270 		MemoizeTuple *next = tuple->next;
271 
272 		freed_mem += CACHE_TUPLE_BYTES(tuple);
273 
274 		/* Free memory used for this tuple */
275 		pfree(tuple->mintuple);
276 		pfree(tuple);
277 
278 		tuple = next;
279 	}
280 
281 	entry->complete = false;
282 	entry->tuplehead = NULL;
283 
284 	/* Update the memory accounting */
285 	mstate->mem_used -= freed_mem;
286 }
287 
288 /*
289  * remove_cache_entry
290  *		Remove 'entry' from the cache and free memory used by it.
291  */
292 static void
293 remove_cache_entry(MemoizeState *mstate, MemoizeEntry *entry)
294 {
295 	MemoizeKey *key = entry->key;
296 
297 	dlist_delete(&entry->key->lru_node);
298 
299 	/* Remove all of the tuples from this entry */
300 	entry_purge_tuples(mstate, entry);
301 
302 	/*
303 	 * Update memory accounting. entry_purge_tuples should have already
304 	 * subtracted the memory used for each cached tuple.  Here we just update
305 	 * the amount used by the entry itself.
306 	 */
307 	mstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry);
308 
309 	/* Remove the entry from the cache */
310 	memoize_delete_item(mstate->hashtable, entry);
311 
312 	pfree(key->params);
313 	pfree(key);
314 }
315 
316 /*
317  * cache_reduce_memory
318  *		Evict older and less recently used items from the cache in order to
319  *		reduce the memory consumption back to something below the
320  *		MemoizeState's mem_limit.
321  *
322  * 'specialkey', if not NULL, causes the function to return false if the entry
323  * which the key belongs to is removed from the cache.
324  */
325 static bool
326 cache_reduce_memory(MemoizeState *mstate, MemoizeKey *specialkey)
327 {
328 	bool		specialkey_intact = true;	/* for now */
329 	dlist_mutable_iter iter;
330 	uint64		evictions = 0;
331 
332 	/* Update peak memory usage */
333 	if (mstate->mem_used > mstate->stats.mem_peak)
334 		mstate->stats.mem_peak = mstate->mem_used;
335 
336 	/* We expect only to be called when we've gone over budget on memory */
337 	Assert(mstate->mem_used > mstate->mem_limit);
338 
339 	/* Begin the eviction process at the head of the LRU list. */
340 	dlist_foreach_modify(iter, &mstate->lru_list)
341 	{
342 		MemoizeKey *key = dlist_container(MemoizeKey, lru_node, iter.cur);
343 		MemoizeEntry *entry;
344 
345 		/*
346 		 * Populate the hash probe slot in preparation for looking up this LRU
347 		 * entry.
348 		 */
349 		prepare_probe_slot(mstate, key);
350 
351 		/*
352 		 * Ideally the LRU list pointers would be stored in the entry itself
353 		 * rather than in the key.  Unfortunately, we can't do that as the
354 		 * simplehash.h code may resize the table and allocate new memory for
355 		 * entries which would result in those pointers pointing to the old
356 		 * buckets.  However, it's fine to use the key to store this as that's
357 		 * only referenced by a pointer in the entry, which of course follows
358 		 * the entry whenever the hash table is resized.  Since we only have a
359 		 * pointer to the key here, we must perform a hash table lookup to
360 		 * find the entry that the key belongs to.
361 		 */
362 		entry = memoize_lookup(mstate->hashtable, NULL);
363 
364 		/* A good spot to check for corruption of the table and LRU list. */
365 		Assert(entry != NULL);
366 		Assert(entry->key == key);
367 
368 		/*
369 		 * If we're being called to free memory while the cache is being
370 		 * populated with new tuples, then we'd better take some care as we
371 		 * could end up freeing the entry which 'specialkey' belongs to.
372 		 * Generally callers will pass 'specialkey' as the key for the cache
373 		 * entry which is currently being populated, so we must set
374 		 * 'specialkey_intact' to false to inform the caller the specialkey
375 		 * entry has been removed.
376 		 */
377 		if (key == specialkey)
378 			specialkey_intact = false;
379 
380 		/*
381 		 * Finally remove the entry.  This will remove from the LRU list too.
382 		 */
383 		remove_cache_entry(mstate, entry);
384 
385 		evictions++;
386 
387 		/* Exit if we've freed enough memory */
388 		if (mstate->mem_used <= mstate->mem_limit)
389 			break;
390 	}
391 
392 	mstate->stats.cache_evictions += evictions;	/* Update Stats */
393 
394 	return specialkey_intact;
395 }
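
/*
 * Illustrative calling pattern only (this mirrors how cache_lookup() and
 * cache_store_tuple() below use the function): a caller that is part-way
 * through populating an entry passes that entry's key as 'specialkey' and
 * must stop caching for the current scan if an eviction of it is reported,
 * e.g.
 *
 *		if (!cache_reduce_memory(mstate, key))
 *			return false;		(or NULL, depending on the caller)
 */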
396 
397 /*
398  * cache_lookup
399  *		Perform a lookup to see if we've already cached tuples based on the
400  *		scan's current parameters.  If we find an existing entry we move it to
401  *		the end of the LRU list, set *found to true then return it.  If we
402  *		don't find an entry then we create a new one and add it to the end of
403  *		the LRU list.  We also update cache memory accounting and remove older
404  *		entries if we go over the memory budget.  If we managed to free enough
405  *		memory we return the new entry, else we return NULL.
406  *
407  * Callers can assume we'll never return NULL when *found is true.
408  */
409 static MemoizeEntry *
410 cache_lookup(MemoizeState *mstate, bool *found)
411 {
412 	MemoizeKey *key;
413 	MemoizeEntry *entry;
414 	MemoryContext oldcontext;
415 
416 	/* prepare the probe slot with the current scan parameters */
417 	prepare_probe_slot(mstate, NULL);
418 
419 	/*
420 	 * Find or create an entry for the current parameters.  No need to pass a
421 	 * valid key since the hash function uses mstate's probeslot, populated above.
422 	 */
423 	entry = memoize_insert(mstate->hashtable, NULL, found);
424 
425 	if (*found)
426 	{
427 		/*
428 		 * Move existing entry to the tail of the LRU list to mark it as the
429 		 * most recently used item.
430 		 */
431 		dlist_move_tail(&mstate->lru_list, &entry->key->lru_node);
432 
433 		return entry;
434 	}
435 
436 	oldcontext = MemoryContextSwitchTo(mstate->tableContext);
437 
438 	/* Allocate a new key */
439 	entry->key = key = (MemoizeKey *) palloc(sizeof(MemoizeKey));
440 	key->params = ExecCopySlotMinimalTuple(mstate->probeslot);
441 
442 	/* Update the total cache memory utilization */
443 	mstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry);
444 
445 	/* Initialize this entry */
446 	entry->complete = false;
447 	entry->tuplehead = NULL;
448 
449 	/*
450 	 * Since this is the most recently used entry, push this entry onto the
451 	 * end of the LRU list.
452 	 */
453 	dlist_push_tail(&mstate->lru_list, &entry->key->lru_node);
454 
455 	mstate->last_tuple = NULL;
456 
457 	MemoryContextSwitchTo(oldcontext);
458 
459 	/*
460 	 * If we've gone over our memory budget, then we'll free up some space in
461 	 * the cache.
462 	 */
463 	if (mstate->mem_used > mstate->mem_limit)
464 	{
465 		/*
466 		 * Try to free up some memory.  It's highly unlikely that we'll fail
467 		 * to do so here since the entry we've just added is yet to contain
468 		 * any tuples and we're able to remove any other entry to reduce the
469 		 * memory consumption.
470 		 */
471 		if (unlikely(!cache_reduce_memory(mstate, key)))
472 			return NULL;
473 
474 		/*
475 		 * The process of removing entries from the cache may have caused the
476 		 * code in simplehash.h to shuffle elements to earlier buckets in the
477 		 * hash table.  If it has, we'll need to find the entry again by
478 		 * performing a lookup.  Fortunately, we can detect if this has
479 		 * happened by seeing if the entry is still in use and that the key
480 		 * pointer matches our expected key.
481 		 */
482 		if (entry->status != memoize_SH_IN_USE || entry->key != key)
483 		{
484 			/*
485 			 * We need to repopulate the probeslot as lookups performed during
486 			 * the cache evictions above will have stored some other key.
487 			 */
488 			prepare_probe_slot(mstate, key);
489 
490 			/* Re-find the newly added entry */
491 			entry = memoize_lookup(mstate->hashtable, NULL);
492 			Assert(entry != NULL);
493 		}
494 	}
495 
496 	return entry;
497 }
498 
499 /*
500  * cache_store_tuple
501  *		Add the tuple stored in 'slot' to the mstate's current cache entry.
502  *		The cache entry must have already been made with cache_lookup().
503  *		mstate's last_tuple field must point to the tail of mstate->entry's
504  *		list of tuples.
505  */
506 static bool
507 cache_store_tuple(MemoizeState *mstate, TupleTableSlot *slot)
508 {
509 	MemoizeTuple *tuple;
510 	MemoizeEntry *entry = mstate->entry;
511 	MemoryContext oldcontext;
512 
513 	Assert(slot != NULL);
514 	Assert(entry != NULL);
515 
516 	oldcontext = MemoryContextSwitchTo(mstate->tableContext);
517 
518 	tuple = (MemoizeTuple *) palloc(sizeof(MemoizeTuple));
519 	tuple->mintuple = ExecCopySlotMinimalTuple(slot);
520 	tuple->next = NULL;
521 
522 	/* Account for the memory we just consumed */
523 	mstate->mem_used += CACHE_TUPLE_BYTES(tuple);
524 
525 	if (entry->tuplehead == NULL)
526 	{
527 		/*
528 		 * This is the first tuple for this entry, so just point the list head
529 		 * to it.
530 		 */
531 		entry->tuplehead = tuple;
532 	}
533 	else
534 	{
535 		/* push this tuple onto the tail of the list */
536 		mstate->last_tuple->next = tuple;
537 	}
538 
539 	mstate->last_tuple = tuple;
540 	MemoryContextSwitchTo(oldcontext);
541 
542 	/*
543 	 * If we've gone over our memory budget then free up some space in the
544 	 * cache.
545 	 */
546 	if (mstate->mem_used > mstate->mem_limit)
547 	{
548 		MemoizeKey *key = entry->key;
549 
550 		if (!cache_reduce_memory(mstate, key))
551 			return false;
552 
553 		/*
554 		 * The process of removing entries from the cache may have caused the
555 		 * code in simplehash.h to shuffle elements to earlier buckets in the
556 		 * hash table.  If it has, we'll need to find the entry again by
557 		 * performing a lookup.  Fortunately, we can detect if this has
558 		 * happened by seeing if the entry is still in use and that the key
559 		 * pointer matches our expected key.
560 		 */
561 		if (entry->status != memoize_SH_IN_USE || entry->key != key)
562 		{
563 			/*
564 			 * We need to repopulate the probeslot as lookups performed during
565 			 * the cache evictions above will have stored some other key.
566 			 */
567 			prepare_probe_slot(mstate, key);
568 
569 			/* Re-find the entry */
570 			mstate->entry = entry = memoize_lookup(mstate->hashtable, NULL);
571 			Assert(entry != NULL);
572 		}
573 	}
574 
575 	return true;
576 }
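
/*
 * Sketch of the calling pattern that cache_lookup() and cache_store_tuple()
 * expect (ExecMemoize() below is their only caller; this is not a separate
 * API):
 *
 *		entry = cache_lookup(mstate, &found);	may return NULL on overflow
 *		...
 *		if (!cache_store_tuple(mstate, outerslot))
 *			... switch to MEMO_CACHE_BYPASS_MODE ...
 *		...
 *		entry->complete = true;		after the subplan has been read to the end
 */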
577 
578 static TupleTableSlot *
579 ExecMemoize(PlanState *pstate)
580 {
581 	MemoizeState *node = castNode(MemoizeState, pstate);
582 	PlanState  *outerNode;
583 	TupleTableSlot *slot;
584 
585 	switch (node->mstatus)
586 	{
587 		case MEMO_CACHE_LOOKUP:
588 			{
589 				MemoizeEntry *entry;
590 				TupleTableSlot *outerslot;
591 				bool		found;
592 
593 				Assert(node->entry == NULL);
594 
595 				/*
596 				 * We're only ever in this state for the first call of the
597 				 * scan.  Here we have a look to see if we've already seen the
598 				 * current parameters before and if we have already cached a
599 				 * complete set of records that the outer plan will return for
600 				 * these parameters.
601 				 *
602 				 * When we find a valid cache entry, we'll return the first
603 				 * tuple from it. If not found, we'll create a cache entry and
604 				 * then try to fetch a tuple from the outer scan.  If we find
605 				 * one there, we'll try to cache it.
606 				 */
607 
608 				/* see if we've got anything cached for the current parameters */
609 				entry = cache_lookup(node, &found);
610 
611 				if (found && entry->complete)
612 				{
613 					node->stats.cache_hits += 1;	/* stats update */
614 
615 					/*
616 					 * Set last_tuple and entry so that the state
617 					 * MEMO_CACHE_FETCH_NEXT_TUPLE can easily find the next
618 					 * tuple for these parameters.
619 					 */
620 					node->last_tuple = entry->tuplehead;
621 					node->entry = entry;
622 
623 					/* Fetch the first cached tuple, if there is one */
624 					if (entry->tuplehead)
625 					{
626 						node->mstatus = MEMO_CACHE_FETCH_NEXT_TUPLE;
627 
628 						slot = node->ss.ps.ps_ResultTupleSlot;
629 						ExecStoreMinimalTuple(entry->tuplehead->mintuple,
630 											  slot, false);
631 
632 						return slot;
633 					}
634 
635 					/* The cache entry contains no tuples. */
636 					node->mstatus = MEMO_END_OF_SCAN;
637 					return NULL;
638 				}
639 
640 				/* Handle cache miss */
641 				node->stats.cache_misses += 1;	/* stats update */
642 
643 				if (found)
644 				{
645 					/*
646 					 * A cache entry was found, but the scan for that entry
647 					 * did not run to completion.  We'll just remove all
648 					 * tuples and start again.  It might be tempting to
649 					 * continue where we left off, but there's no guarantee
650 					 * the outer node will produce the tuples in the same
651 					 * order as it did last time.
652 					 */
653 					entry_purge_tuples(node, entry);
654 				}
655 
656 				/* Scan the outer node for a tuple to cache */
657 				outerNode = outerPlanState(node);
658 				outerslot = ExecProcNode(outerNode);
659 				if (TupIsNull(outerslot))
660 				{
661 					/*
662 					 * cache_lookup may have returned NULL due to failure to
663 					 * free enough cache space, so ensure we don't do anything
664 					 * here that assumes it worked. There's no need to go into
665 					 * bypass mode here as we're setting mstatus to end of
666 					 * scan.
667 					 */
668 					if (likely(entry))
669 						entry->complete = true;
670 
671 					node->mstatus = MEMO_END_OF_SCAN;
672 					return NULL;
673 				}
674 
675 				node->entry = entry;
676 
677 				/*
678 				 * If we failed to create the entry or failed to store the
679 				 * tuple in the entry, then go into bypass mode.
680 				 */
681 				if (unlikely(entry == NULL ||
682 							 !cache_store_tuple(node, outerslot)))
683 				{
684 					node->stats.cache_overflows += 1;	/* stats update */
685 
686 					node->mstatus = MEMO_CACHE_BYPASS_MODE;
687 
688 					/*
689 					 * No need to clear out last_tuple as we'll stay in bypass
690 					 * mode until the end of the scan.
691 					 */
692 				}
693 				else
694 				{
695 					/*
696 					 * If we only expect a single row from this scan then we
697 					 * can mark that we're not expecting more.  This allows
698 					 * cache lookups to work even when the scan has not been
699 					 * executed to completion.
700 					 */
701 					entry->complete = node->singlerow;
702 					node->mstatus = MEMO_FILLING_CACHE;
703 				}
704 
705 				slot = node->ss.ps.ps_ResultTupleSlot;
706 				ExecCopySlot(slot, outerslot);
707 				return slot;
708 			}
709 
710 		case MEMO_CACHE_FETCH_NEXT_TUPLE:
711 			{
712 				/* We shouldn't be in this state if these are not set */
713 				Assert(node->entry != NULL);
714 				Assert(node->last_tuple != NULL);
715 
716 				/* Skip to the next tuple to output */
717 				node->last_tuple = node->last_tuple->next;
718 
719 				/* No more tuples in the cache */
720 				if (node->last_tuple == NULL)
721 				{
722 					node->mstatus = MEMO_END_OF_SCAN;
723 					return NULL;
724 				}
725 
726 				slot = node->ss.ps.ps_ResultTupleSlot;
727 				ExecStoreMinimalTuple(node->last_tuple->mintuple, slot,
728 									  false);
729 
730 				return slot;
731 			}
732 
733 		case MEMO_FILLING_CACHE:
734 			{
735 				TupleTableSlot *outerslot;
736 				MemoizeEntry *entry = node->entry;
737 
738 				/* entry should already have been set by MEMO_CACHE_LOOKUP */
739 				Assert(entry != NULL);
740 
741 				/*
742 				 * When in the MEMO_FILLING_CACHE state, we've just had a
743 				 * cache miss and are populating the cache with the current
744 				 * scan tuples.
745 				 */
746 				outerNode = outerPlanState(node);
747 				outerslot = ExecProcNode(outerNode);
748 				if (TupIsNull(outerslot))
749 				{
750 					/* No more tuples.  Mark it as complete */
751 					entry->complete = true;
752 					node->mstatus = MEMO_END_OF_SCAN;
753 					return NULL;
754 				}
755 
756 				/*
757 				 * Check that the planner set the singlerow flag correctly.  It
758 				 * should only set it if each cache entry can return at most
759 				 * one row.
760 				 */
761 				if (unlikely(entry->complete))
762 					elog(ERROR, "cache entry already complete");
763 
764 				/* Record the tuple in the current cache entry */
765 				if (unlikely(!cache_store_tuple(node, outerslot)))
766 				{
767 					/* Couldn't store it?  Handle overflow */
768 					node->stats.cache_overflows += 1;	/* stats update */
769 
770 					node->mstatus = MEMO_CACHE_BYPASS_MODE;
771 
772 					/*
773 					 * No need to clear out entry or last_tuple as we'll stay
774 					 * in bypass mode until the end of the scan.
775 					 */
776 				}
777 
778 				slot = node->ss.ps.ps_ResultTupleSlot;
779 				ExecCopySlot(slot, outerslot);
780 				return slot;
781 			}
782 
783 		case MEMO_CACHE_BYPASS_MODE:
784 			{
785 				TupleTableSlot *outerslot;
786 
787 				/*
788 				 * When in bypass mode we just continue to read tuples without
789 				 * caching.  We need to wait until the next rescan before we
790 				 * can come out of this mode.
791 				 */
792 				outerNode = outerPlanState(node);
793 				outerslot = ExecProcNode(outerNode);
794 				if (TupIsNull(outerslot))
795 				{
796 					node->mstatus = MEMO_END_OF_SCAN;
797 					return NULL;
798 				}
799 
800 				slot = node->ss.ps.ps_ResultTupleSlot;
801 				ExecCopySlot(slot, outerslot);
802 				return slot;
803 			}
804 
805 		case MEMO_END_OF_SCAN:
806 
807 			/*
808 			 * We've already returned NULL for this scan, but just in case
809 			 * something calls us again by mistake.
810 			 */
811 			return NULL;
812 
813 		default:
814 			elog(ERROR, "unrecognized memoize state: %d",
815 				 (int) node->mstatus);
816 			return NULL;
817 	}							/* switch */
818 }
819 
820 MemoizeState *
821 ExecInitMemoize(Memoize *node, EState *estate, int eflags)
822 {
823 	MemoizeState *mstate = makeNode(MemoizeState);
824 	Plan	   *outerNode;
825 	int			i;
826 	int			nkeys;
827 	Oid		   *eqfuncoids;
828 
829 	/* check for unsupported flags */
830 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
831 
832 	mstate->ss.ps.plan = (Plan *) node;
833 	mstate->ss.ps.state = estate;
834 	mstate->ss.ps.ExecProcNode = ExecMemoize;
835 
836 	/*
837 	 * Miscellaneous initialization
838 	 *
839 	 * create expression context for node
840 	 */
841 	ExecAssignExprContext(estate, &mstate->ss.ps);
842 
843 	outerNode = outerPlan(node);
844 	outerPlanState(mstate) = ExecInitNode(outerNode, estate, eflags);
845 
846 	/*
847 	 * Initialize return slot and type. No need to initialize projection info
848 	 * because this node doesn't do projections.
849 	 */
850 	ExecInitResultTupleSlotTL(&mstate->ss.ps, &TTSOpsMinimalTuple);
851 	mstate->ss.ps.ps_ProjInfo = NULL;
852 
853 	/*
854 	 * Initialize scan slot and type.
855 	 */
856 	ExecCreateScanSlotFromOuterPlan(estate, &mstate->ss, &TTSOpsMinimalTuple);
857 
858 	/*
859 	 * Set the state machine to look up the cache.  We won't find anything
860 	 * until we cache something, but this saves a special case to create the
861 	 * first entry.
862 	 */
863 	mstate->mstatus = MEMO_CACHE_LOOKUP;
864 
865 	mstate->nkeys = nkeys = node->numKeys;
866 	mstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs);
867 	mstate->tableslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
868 												 &TTSOpsMinimalTuple);
869 	mstate->probeslot = MakeSingleTupleTableSlot(mstate->hashkeydesc,
870 												 &TTSOpsVirtual);
871 
872 	mstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *));
873 	mstate->collations = node->collations; /* Just point directly to the plan
874 											* data */
875 	mstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
876 
877 	eqfuncoids = palloc(nkeys * sizeof(Oid));
878 
879 	for (i = 0; i < nkeys; i++)
880 	{
881 		Oid			hashop = node->hashOperators[i];
882 		Oid			left_hashfn;
883 		Oid			right_hashfn;
884 		Expr	   *param_expr = (Expr *) list_nth(node->param_exprs, i);
885 
886 		if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
887 			elog(ERROR, "could not find hash function for hash operator %u",
888 				 hashop);
889 
890 		fmgr_info(left_hashfn, &mstate->hashfunctions[i]);
891 
892 		mstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) mstate);
893 		eqfuncoids[i] = get_opcode(hashop);
894 	}
895 
896 	mstate->cache_eq_expr = ExecBuildParamSetEqual(mstate->hashkeydesc,
897 												   &TTSOpsMinimalTuple,
898 												   &TTSOpsVirtual,
899 												   eqfuncoids,
900 												   node->collations,
901 												   node->param_exprs,
902 												   (PlanState *) mstate);
903 
904 	pfree(eqfuncoids);
905 	mstate->mem_used = 0;
906 
907 	/* Limit the total memory consumed by the cache to this */
908 	mstate->mem_limit = get_hash_memory_limit();
909 
910 	/* A memory context dedicated for the cache */
911 	mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext,
912 												 "MemoizeHashTable",
913 												 ALLOCSET_DEFAULT_SIZES);
914 
915 	dlist_init(&mstate->lru_list);
916 	mstate->last_tuple = NULL;
917 	mstate->entry = NULL;
918 
919 	/*
920 	 * Record whether we can assume a cache entry is complete after fetching
921 	 * the first record for it.  Some callers might not call us again after
922 	 * getting the first match; e.g., a join operator performing a unique
923 	 * join can skip to the next outer tuple after getting the first matching
924 	 * inner tuple.  In that case the cache entry is complete after the first
925 	 * tuple, and this flag lets us mark it as such.
926 	 */
927 	mstate->singlerow = node->singlerow;
928 
929 	/* Zero the statistics counters */
930 	memset(&mstate->stats, 0, sizeof(MemoizeInstrumentation));
931 
932 	/* Allocate and set up the actual cache */
933 	build_hash_table(mstate, node->est_entries);
934 
935 	return mstate;
936 }
937 
938 void
939 ExecEndMemoize(MemoizeState *node)
940 {
941 #ifdef USE_ASSERT_CHECKING
942 	/* Validate the memory accounting code is correct in assert builds. */
943 	{
944 		int			count;
945 		uint64		mem = 0;
946 		memoize_iterator i;
947 		MemoizeEntry *entry;
948 
949 		memoize_start_iterate(node->hashtable, &i);
950 
951 		count = 0;
952 		while ((entry = memoize_iterate(node->hashtable, &i)) != NULL)
953 		{
954 			MemoizeTuple *tuple = entry->tuplehead;
955 
956 			mem += EMPTY_ENTRY_MEMORY_BYTES(entry);
957 			while (tuple != NULL)
958 			{
959 				mem += CACHE_TUPLE_BYTES(tuple);
960 				tuple = tuple->next;
961 			}
962 			count++;
963 		}
964 
965 		Assert(count == node->hashtable->members);
966 		Assert(mem == node->mem_used);
967 	}
968 #endif
969 
970 	/*
971 	 * When ending a parallel worker, copy the statistics gathered by the
972 	 * worker back into shared memory so that it can be picked up by the main
973 	 * process to report in EXPLAIN ANALYZE.
974 	 */
975 	if (node->shared_info != NULL && IsParallelWorker())
976 	{
977 		MemoizeInstrumentation *si;
978 
979 		/* Make mem_peak available for EXPLAIN */
980 		if (node->stats.mem_peak == 0)
981 			node->stats.mem_peak = node->mem_used;
982 
983 		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
984 		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
985 		memcpy(si, &node->stats, sizeof(MemoizeInstrumentation));
986 	}
987 
988 	/* Remove the cache context */
989 	MemoryContextDelete(node->tableContext);
990 
991 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
992 	/* must drop pointer to cache result tuple */
993 	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
994 
995 	/*
996 	 * free exprcontext
997 	 */
998 	ExecFreeExprContext(&node->ss.ps);
999 
1000 	/*
1001 	 * shut down the subplan
1002 	 */
1003 	ExecEndNode(outerPlanState(node));
1004 }
1005 
1006 void
1007 ExecReScanMemoize(MemoizeState *node)
1008 {
1009 	PlanState  *outerPlan = outerPlanState(node);
1010 
1011 	/* Mark that we must look up the cache for a new set of parameters */
1012 	node->mstatus = MEMO_CACHE_LOOKUP;
1013 
1014 	/* nullify pointers used for the last scan */
1015 	node->entry = NULL;
1016 	node->last_tuple = NULL;
1017 
1018 	/*
1019 	 * if chgParam of subnode is not null then plan will be re-scanned by
1020 	 * first ExecProcNode.
1021 	 */
1022 	if (outerPlan->chgParam == NULL)
1023 		ExecReScan(outerPlan);
1024 
1025 }
1026 
1027 /*
1028  * ExecEstimateCacheEntryOverheadBytes
1029  *		For use in the query planner to help it estimate the amount of memory
1030  *		required to store a single entry in the cache.
1031  */
1032 double
1033 ExecEstimateCacheEntryOverheadBytes(double ntuples)
1034 {
1035 	return sizeof(MemoizeEntry) + sizeof(MemoizeKey) + sizeof(MemoizeTuple) *
1036 			ntuples;
1037 }
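
/*
 * For example, with ntuples = 10 the estimate is sizeof(MemoizeEntry) +
 * sizeof(MemoizeKey) + 10 * sizeof(MemoizeTuple).  Note that this covers
 * only the per-entry and per-tuple bookkeeping overhead visible in the
 * expression above, not the width of the cached tuples themselves.
 */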
1038 
1039 /* ----------------------------------------------------------------
1040  *						Parallel Query Support
1041  * ----------------------------------------------------------------
1042  */
1043 
1044  /* ----------------------------------------------------------------
1045   *		ExecMemoizeEstimate
1046   *
1047   *		Estimate space required to propagate memoize statistics.
1048   * ----------------------------------------------------------------
1049   */
1050 void
1051 ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)
1052 {
1053 	Size		size;
1054 
1055 	/* don't need this if not instrumenting or no workers */
1056 	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1057 		return;
1058 
1059 	size = mul_size(pcxt->nworkers, sizeof(MemoizeInstrumentation));
1060 	size = add_size(size, offsetof(SharedMemoizeInfo, sinstrument));
1061 	shm_toc_estimate_chunk(&pcxt->estimator, size);
1062 	shm_toc_estimate_keys(&pcxt->estimator, 1);
1063 }
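
/*
 * As a worked example of the sizing above: with 4 planned workers the chunk
 * requested from the DSM estimator is offsetof(SharedMemoizeInfo, sinstrument)
 * + 4 * sizeof(MemoizeInstrumentation), i.e. one MemoizeInstrumentation slot
 * per worker plus the struct header, under a single shm_toc key.
 */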
1064 
1065 /* ----------------------------------------------------------------
1066  *		ExecMemoizeInitializeDSM
1067  *
1068  *		Initialize DSM space for memoize statistics.
1069  * ----------------------------------------------------------------
1070  */
1071 void
1072 ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)
1073 {
1074 	Size		size;
1075 
1076 	/* don't need this if not instrumenting or no workers */
1077 	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
1078 		return;
1079 
1080 	size = offsetof(SharedMemoizeInfo, sinstrument)
1081 		+ pcxt->nworkers * sizeof(MemoizeInstrumentation);
1082 	node->shared_info = shm_toc_allocate(pcxt->toc, size);
1083 	/* ensure any unfilled slots will contain zeroes */
1084 	memset(node->shared_info, 0, size);
1085 	node->shared_info->num_workers = pcxt->nworkers;
1086 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
1087 				   node->shared_info);
1088 }
1089 
1090 /* ----------------------------------------------------------------
1091  *		ExecMemoizeInitializeWorker
1092  *
1093  *		Attach worker to DSM space for memoize statistics.
1094  * ----------------------------------------------------------------
1095  */
1096 void
1097 ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)
1098 {
1099 	node->shared_info =
1100 		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
1101 }
1102 
1103 /* ----------------------------------------------------------------
1104  *		ExecMemoizeRetrieveInstrumentation
1105  *
1106  *		Transfer memoize statistics from DSM to private memory.
1107  * ----------------------------------------------------------------
1108  */
1109 void
1110 ExecMemoizeRetrieveInstrumentation(MemoizeState *node)
1111 {
1112 	Size		size;
1113 	SharedMemoizeInfo *si;
1114 
1115 	if (node->shared_info == NULL)
1116 		return;
1117 
1118 	size = offsetof(SharedMemoizeInfo, sinstrument)
1119 		+ node->shared_info->num_workers * sizeof(MemoizeInstrumentation);
1120 	si = palloc(size);
1121 	memcpy(si, node->shared_info, size);
1122 	node->shared_info = si;
1123 }
1124