1 /*-------------------------------------------------------------------------
2  *
3  * nodeHash.c
4  *	  Routines to hash relations for hashjoin
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/executor/nodeHash.c
12  *
13  * See note on parallelism in nodeHashjoin.c.
14  *
15  *-------------------------------------------------------------------------
16  */
17 /*
18  * INTERFACE ROUTINES
19  *		MultiExecHash	- generate an in-memory hash table of the relation
20  *		ExecInitHash	- initialize node and subnodes
21  *		ExecEndHash		- shutdown node and subnodes
22  */
23 
24 #include "postgres.h"
25 
26 #include <math.h>
27 #include <limits.h>
28 
29 #include "access/htup_details.h"
30 #include "access/parallel.h"
31 #include "catalog/pg_statistic.h"
32 #include "commands/tablespace.h"
33 #include "executor/execdebug.h"
34 #include "executor/hashjoin.h"
35 #include "executor/nodeHash.h"
36 #include "executor/nodeHashjoin.h"
37 #include "miscadmin.h"
38 #include "pgstat.h"
39 #include "port/atomics.h"
40 #include "utils/dynahash.h"
41 #include "utils/memutils.h"
42 #include "utils/lsyscache.h"
43 #include "utils/syscache.h"
44 
45 
46 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
47 static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
48 static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable);
49 static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable);
50 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
51 					  int mcvsToUse);
52 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
53 						TupleTableSlot *slot,
54 						uint32 hashvalue,
55 						int bucketNumber);
56 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
57 
58 static void *dense_alloc(HashJoinTable hashtable, Size size);
59 static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
60 						   size_t size,
61 						   dsa_pointer *shared);
62 static void MultiExecPrivateHash(HashState *node);
63 static void MultiExecParallelHash(HashState *node);
64 static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table,
65 						   int bucketno);
66 static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table,
67 						  HashJoinTuple tuple);
68 static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head,
69 						  HashJoinTuple tuple,
70 						  dsa_pointer tuple_shared);
71 static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch);
72 static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable);
73 static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable);
74 static void ExecParallelHashRepartitionRest(HashJoinTable hashtable);
75 static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table,
76 							  dsa_pointer *shared);
77 static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
78 							  int batchno,
79 							  size_t size);
80 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
81 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
82 
83 
84 /* ----------------------------------------------------------------
85  *		ExecHash
86  *
87  *		stub for pro forma compliance
88  * ----------------------------------------------------------------
89  */
90 static TupleTableSlot *
91 ExecHash(PlanState *pstate)
92 {
93 	elog(ERROR, "Hash node does not support ExecProcNode call convention");
94 	return NULL;
95 }
96 
97 /* ----------------------------------------------------------------
98  *		MultiExecHash
99  *
100  *		build hash table for hashjoin, doing partitioning if more
101  *		than one batch is required.
102  * ----------------------------------------------------------------
103  */
104 Node *
105 MultiExecHash(HashState *node)
106 {
107 	/* must provide our own instrumentation support */
108 	if (node->ps.instrument)
109 		InstrStartNode(node->ps.instrument);
110 
111 	if (node->parallel_state != NULL)
112 		MultiExecParallelHash(node);
113 	else
114 		MultiExecPrivateHash(node);
115 
116 	/* must provide our own instrumentation support */
117 	if (node->ps.instrument)
118 		InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
119 
120 	/*
121 	 * We do not return the hash table directly because it's not a subtype of
122 	 * Node, and so would violate the MultiExecProcNode API.  Instead, our
123 	 * parent Hashjoin node is expected to know how to fish it out of our node
124 	 * state.  Ugly but not really worth cleaning up, since Hashjoin knows
125 	 * quite a bit more about Hash besides that.
126 	 */
127 	return NULL;
128 }
129 
130 /* ----------------------------------------------------------------
131  *		MultiExecPrivateHash
132  *
133  *		parallel-oblivious version, building a backend-private
134  *		hash table and (if necessary) batch files.
135  * ----------------------------------------------------------------
136  */
137 static void
138 MultiExecPrivateHash(HashState *node)
139 {
140 	PlanState  *outerNode;
141 	List	   *hashkeys;
142 	HashJoinTable hashtable;
143 	TupleTableSlot *slot;
144 	ExprContext *econtext;
145 	uint32		hashvalue;
146 
147 	/*
148 	 * get state info from node
149 	 */
150 	outerNode = outerPlanState(node);
151 	hashtable = node->hashtable;
152 
153 	/*
154 	 * set expression context
155 	 */
156 	hashkeys = node->hashkeys;
157 	econtext = node->ps.ps_ExprContext;
158 
159 	/*
160 	 * get all inner tuples and insert into the hash table (or temp files)
161 	 */
162 	for (;;)
163 	{
164 		slot = ExecProcNode(outerNode);
165 		if (TupIsNull(slot))
166 			break;
167 		/* We have to compute the hash value */
168 		econtext->ecxt_innertuple = slot;
169 		if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
170 								 false, hashtable->keepNulls,
171 								 &hashvalue))
172 		{
173 			int			bucketNumber;
174 
175 			bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
176 			if (bucketNumber != INVALID_SKEW_BUCKET_NO)
177 			{
178 				/* It's a skew tuple, so put it into that hash table */
179 				ExecHashSkewTableInsert(hashtable, slot, hashvalue,
180 										bucketNumber);
181 				hashtable->skewTuples += 1;
182 			}
183 			else
184 			{
185 				/* Not subject to skew optimization, so insert normally */
186 				ExecHashTableInsert(hashtable, slot, hashvalue);
187 			}
188 			hashtable->totalTuples += 1;
189 		}
190 	}
191 
192 	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
193 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
194 		ExecHashIncreaseNumBuckets(hashtable);
195 
196 	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
197 	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
198 	if (hashtable->spaceUsed > hashtable->spacePeak)
199 		hashtable->spacePeak = hashtable->spaceUsed;
200 
201 	hashtable->partialTuples = hashtable->totalTuples;
202 }
203 
204 /* ----------------------------------------------------------------
205  *		MultiExecParallelHash
206  *
207  *		parallel-aware version, building a shared hash table and
208  *		(if necessary) batch files using the combined effort of
209  *		a set of co-operating backends.
210  * ----------------------------------------------------------------
211  */
212 static void
213 MultiExecParallelHash(HashState *node)
214 {
215 	ParallelHashJoinState *pstate;
216 	PlanState  *outerNode;
217 	List	   *hashkeys;
218 	HashJoinTable hashtable;
219 	TupleTableSlot *slot;
220 	ExprContext *econtext;
221 	uint32		hashvalue;
222 	Barrier    *build_barrier;
223 	int			i;
224 
225 	/*
226 	 * get state info from node
227 	 */
228 	outerNode = outerPlanState(node);
229 	hashtable = node->hashtable;
230 
231 	/*
232 	 * set expression context
233 	 */
234 	hashkeys = node->hashkeys;
235 	econtext = node->ps.ps_ExprContext;
236 
237 	/*
238 	 * Synchronize the parallel hash table build.  At this stage we know that
239 	 * the shared hash table has been or is being set up by
240 	 * ExecHashTableCreate(), but we don't know if our peers have returned
241 	 * from there or are here in MultiExecParallelHash(), and if so how far
242 	 * through they are.  To find out, we check the build_barrier phase and
243 	 * then jump to the right step in the build algorithm.
244 	 */
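	/*
	 * For reference (the authoritative list of phase macros is in
	 * executor/hashjoin.h), the build barrier is expected to move through
	 * PHJ_BUILD_ELECTING, PHJ_BUILD_ALLOCATING, PHJ_BUILD_HASHING_INNER,
	 * PHJ_BUILD_HASHING_OUTER and PHJ_BUILD_DONE.  Only PHJ_BUILD_ALLOCATING
	 * and PHJ_BUILD_HASHING_INNER need handling in the switch below, since
	 * ExecHashTableCreate() already got us past PHJ_BUILD_ELECTING; if we
	 * attach any later than that, no case matches and the switch body is
	 * skipped entirely, which is why the Assert at the end of this function
	 * accepts PHJ_BUILD_HASHING_OUTER or PHJ_BUILD_DONE.
	 */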
245 	pstate = hashtable->parallel_state;
246 	build_barrier = &pstate->build_barrier;
247 	Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
248 	switch (BarrierPhase(build_barrier))
249 	{
250 		case PHJ_BUILD_ALLOCATING:
251 
252 			/*
253 			 * Either I just allocated the initial hash table in
254 			 * ExecHashTableCreate(), or someone else is doing that.  Either
255 			 * way, wait for everyone to arrive here so we can proceed.
256 			 */
257 			BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);
258 			/* Fall through. */
259 
260 		case PHJ_BUILD_HASHING_INNER:
261 
262 			/*
263 			 * It's time to begin hashing, or if we just arrived here then
264 			 * hashing is already underway, so join in that effort.  While
265 			 * hashing we have to be prepared to help increase the number of
266 			 * batches or buckets at any time, and if we arrived here when
267 			 * that was already underway we'll have to help complete that work
268 			 * immediately so that it's safe to access batches and buckets
269 			 * below.
270 			 */
271 			if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
272 				PHJ_GROW_BATCHES_ELECTING)
273 				ExecParallelHashIncreaseNumBatches(hashtable);
274 			if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
275 				PHJ_GROW_BUCKETS_ELECTING)
276 				ExecParallelHashIncreaseNumBuckets(hashtable);
277 			ExecParallelHashEnsureBatchAccessors(hashtable);
278 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
279 			for (;;)
280 			{
281 				slot = ExecProcNode(outerNode);
282 				if (TupIsNull(slot))
283 					break;
284 				econtext->ecxt_innertuple = slot;
285 				if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
286 										 false, hashtable->keepNulls,
287 										 &hashvalue))
288 					ExecParallelHashTableInsert(hashtable, slot, hashvalue);
289 				hashtable->partialTuples++;
290 			}
291 
292 			/*
293 			 * Make sure that any tuples we wrote to disk are visible to
294 			 * others before anyone tries to load them.
295 			 */
296 			for (i = 0; i < hashtable->nbatch; ++i)
297 				sts_end_write(hashtable->batches[i].inner_tuples);
298 
299 			/*
300 			 * Update shared counters.  We need an accurate total tuple count
301 			 * to control the empty table optimization.
302 			 */
303 			ExecParallelHashMergeCounters(hashtable);
304 
305 			BarrierDetach(&pstate->grow_buckets_barrier);
306 			BarrierDetach(&pstate->grow_batches_barrier);
307 
308 			/*
309 			 * Wait for everyone to finish building and flushing files and
310 			 * counters.
311 			 */
312 			if (BarrierArriveAndWait(build_barrier,
313 									 WAIT_EVENT_HASH_BUILD_HASHING_INNER))
314 			{
315 				/*
316 				 * Elect one backend to disable any further growth.  Batches
317 				 * are now fixed.  While building them we made sure they'd fit
318 				 * in our memory budget when we load them back in later (or we
319 				 * tried to do that and gave up because we detected extreme
320 				 * skew).
321 				 */
322 				pstate->growth = PHJ_GROWTH_DISABLED;
323 			}
324 	}
325 
326 	/*
327 	 * We're not yet attached to a batch.  We all agree on the dimensions and
328 	 * number of inner tuples (for the empty table optimization).
329 	 */
330 	hashtable->curbatch = -1;
331 	hashtable->nbuckets = pstate->nbuckets;
332 	hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
333 	hashtable->totalTuples = pstate->total_tuples;
334 	ExecParallelHashEnsureBatchAccessors(hashtable);
335 
336 	/*
337 	 * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
338 	 * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
339 	 * there already).
340 	 */
341 	Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
342 		   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
343 }
344 
345 /* ----------------------------------------------------------------
346  *		ExecInitHash
347  *
348  *		Init routine for Hash node
349  * ----------------------------------------------------------------
350  */
351 HashState *
352 ExecInitHash(Hash *node, EState *estate, int eflags)
353 {
354 	HashState  *hashstate;
355 
356 	/* check for unsupported flags */
357 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
358 
359 	/*
360 	 * create state structure
361 	 */
362 	hashstate = makeNode(HashState);
363 	hashstate->ps.plan = (Plan *) node;
364 	hashstate->ps.state = estate;
365 	hashstate->ps.ExecProcNode = ExecHash;
366 	hashstate->hashtable = NULL;
367 	hashstate->hashkeys = NIL;	/* will be set by parent HashJoin */
368 
369 	/*
370 	 * Miscellaneous initialization
371 	 *
372 	 * create expression context for node
373 	 */
374 	ExecAssignExprContext(estate, &hashstate->ps);
375 
376 	/*
377 	 * initialize child nodes
378 	 */
379 	outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
380 
381 	/*
382 	 * initialize our result slot and type. No need to build projection
383 	 * because this node doesn't do projections.
384 	 */
385 	ExecInitResultTupleSlotTL(estate, &hashstate->ps);
386 	hashstate->ps.ps_ProjInfo = NULL;
387 
388 	/*
389 	 * initialize child expressions
390 	 */
391 	hashstate->ps.qual =
392 		ExecInitQual(node->plan.qual, (PlanState *) hashstate);
393 
394 	return hashstate;
395 }
396 
397 /* ---------------------------------------------------------------
398  *		ExecEndHash
399  *
400  *		clean up routine for Hash node
401  * ----------------------------------------------------------------
402  */
403 void
404 ExecEndHash(HashState *node)
405 {
406 	PlanState  *outerPlan;
407 
408 	/*
409 	 * free exprcontext
410 	 */
411 	ExecFreeExprContext(&node->ps);
412 
413 	/*
414 	 * shut down the subplan
415 	 */
416 	outerPlan = outerPlanState(node);
417 	ExecEndNode(outerPlan);
418 }
419 
420 
421 /* ----------------------------------------------------------------
422  *		ExecHashTableCreate
423  *
424  *		create an empty hashtable data structure for hashjoin.
425  * ----------------------------------------------------------------
426  */
427 HashJoinTable
428 ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
429 {
430 	Hash	   *node;
431 	HashJoinTable hashtable;
432 	Plan	   *outerNode;
433 	size_t		space_allowed;
434 	int			nbuckets;
435 	int			nbatch;
436 	double		rows;
437 	int			num_skew_mcvs;
438 	int			log2_nbuckets;
439 	int			nkeys;
440 	int			i;
441 	ListCell   *ho;
442 	MemoryContext oldcxt;
443 
444 	/*
445 	 * Get information about the size of the relation to be hashed (it's the
446 	 * "outer" subtree of this node, but the inner relation of the hashjoin).
447 	 * Compute the appropriate size of the hash table.
448 	 */
449 	node = (Hash *) state->ps.plan;
450 	outerNode = outerPlan(node);
451 
452 	/*
453 	 * If this is a shared hash table with a partial plan, then we can't use
454 	 * outerNode->plan_rows to estimate its size.  We need an estimate of the
455 	 * total number of rows across all copies of the partial plan.
456 	 */
457 	rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
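	/*
	 * (Clarifying note: for a partial plan, plan_rows is a per-worker row
	 * estimate, while rows_total is the planner's estimate of the total row
	 * count across all participants, which is what should size the shared
	 * hash table.)
	 */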
458 
459 	ExecChooseHashTableSize(rows, outerNode->plan_width,
460 							OidIsValid(node->skewTable),
461 							state->parallel_state != NULL,
462 							state->parallel_state != NULL ?
463 							state->parallel_state->nparticipants - 1 : 0,
464 							&space_allowed,
465 							&nbuckets, &nbatch, &num_skew_mcvs);
466 
467 	/* nbuckets must be a power of 2 */
468 	log2_nbuckets = my_log2(nbuckets);
469 	Assert(nbuckets == (1 << log2_nbuckets));
470 
471 	/*
472 	 * Initialize the hash table control block.
473 	 *
474 	 * The hashtable control block is just palloc'd from the executor's
475 	 * per-query memory context.  Everything else should be kept inside the
476 	 * subsidiary hashCxt or batchCxt.
477 	 */
478 	hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
479 	hashtable->nbuckets = nbuckets;
480 	hashtable->nbuckets_original = nbuckets;
481 	hashtable->nbuckets_optimal = nbuckets;
482 	hashtable->log2_nbuckets = log2_nbuckets;
483 	hashtable->log2_nbuckets_optimal = log2_nbuckets;
484 	hashtable->buckets.unshared = NULL;
485 	hashtable->keepNulls = keepNulls;
486 	hashtable->skewEnabled = false;
487 	hashtable->skewBucket = NULL;
488 	hashtable->skewBucketLen = 0;
489 	hashtable->nSkewBuckets = 0;
490 	hashtable->skewBucketNums = NULL;
491 	hashtable->nbatch = nbatch;
492 	hashtable->curbatch = 0;
493 	hashtable->nbatch_original = nbatch;
494 	hashtable->nbatch_outstart = nbatch;
495 	hashtable->growEnabled = true;
496 	hashtable->totalTuples = 0;
497 	hashtable->partialTuples = 0;
498 	hashtable->skewTuples = 0;
499 	hashtable->innerBatchFile = NULL;
500 	hashtable->outerBatchFile = NULL;
501 	hashtable->spaceUsed = 0;
502 	hashtable->spacePeak = 0;
503 	hashtable->spaceAllowed = space_allowed;
504 	hashtable->spaceUsedSkew = 0;
505 	hashtable->spaceAllowedSkew =
506 		hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
507 	hashtable->chunks = NULL;
508 	hashtable->current_chunk = NULL;
509 	hashtable->parallel_state = state->parallel_state;
510 	hashtable->area = state->ps.state->es_query_dsa;
511 	hashtable->batches = NULL;
512 
513 #ifdef HJDEBUG
514 	printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
515 		   hashtable, nbatch, nbuckets);
516 #endif
517 
518 	/*
519 	 * Create temporary memory contexts in which to keep the hashtable working
520 	 * storage.  See notes in executor/hashjoin.h.
521 	 */
522 	hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
523 											   "HashTableContext",
524 											   ALLOCSET_DEFAULT_SIZES);
525 
526 	hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
527 												"HashBatchContext",
528 												ALLOCSET_DEFAULT_SIZES);
529 
530 	/* Allocate data that will live for the life of the hashjoin */
531 
532 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
533 
534 	/*
535 	 * Get info about the hash functions to be used for each hash key. Also
536 	 * remember whether the join operators are strict.
537 	 */
538 	nkeys = list_length(hashOperators);
539 	hashtable->outer_hashfunctions =
540 		(FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
541 	hashtable->inner_hashfunctions =
542 		(FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
543 	hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
544 	i = 0;
545 	foreach(ho, hashOperators)
546 	{
547 		Oid			hashop = lfirst_oid(ho);
548 		Oid			left_hashfn;
549 		Oid			right_hashfn;
550 
551 		if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
552 			elog(ERROR, "could not find hash function for hash operator %u",
553 				 hashop);
554 		fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
555 		fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
556 		hashtable->hashStrict[i] = op_strict(hashop);
557 		i++;
558 	}
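	/*
	 * As an illustration: for a join on int4 = int4, the hash operator above
	 * resolves to hashint4 on both sides, and op_strict() reports the
	 * operator as strict, so ExecHashGetHashValue() can discard a NULL key
	 * early unless keepNulls says otherwise.
	 */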
559 
560 	if (nbatch > 1 && hashtable->parallel_state == NULL)
561 	{
562 		/*
563 		 * allocate and initialize the file arrays in hashCxt (not needed for
564 		 * parallel case which uses shared tuplestores instead of raw files)
565 		 */
566 		hashtable->innerBatchFile = (BufFile **)
567 			palloc0(nbatch * sizeof(BufFile *));
568 		hashtable->outerBatchFile = (BufFile **)
569 			palloc0(nbatch * sizeof(BufFile *));
570 		/* The files will not be opened until needed... */
571 		/* ... but make sure we have temp tablespaces established for them */
572 		PrepareTempTablespaces();
573 	}
574 
575 	MemoryContextSwitchTo(oldcxt);
576 
577 	if (hashtable->parallel_state)
578 	{
579 		ParallelHashJoinState *pstate = hashtable->parallel_state;
580 		Barrier    *build_barrier;
581 
582 		/*
583 		 * Attach to the build barrier.  The corresponding detach operation is
584 		 * in ExecHashTableDetach.  Note that we won't attach to the
585 		 * batch_barrier for batch 0 yet.  We'll attach later and start it out
586 		 * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
587 		 * and then loaded while hashing (the standard hybrid hash join
588 		 * algorithm), and we'll coordinate that using build_barrier.
589 		 */
590 		build_barrier = &pstate->build_barrier;
591 		BarrierAttach(build_barrier);
592 
593 		/*
594 		 * So far we have no idea whether there are any other participants,
595 		 * and if so, what phase they are working on.  The only thing we care
596 		 * about at this point is whether someone has already created the
597 		 * SharedHashJoinBatch objects and the hash table for batch 0.  One
598 		 * backend will be elected to do that now if necessary.
599 		 */
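		/*
		 * Note on the election idiom used below: BarrierArriveAndWait()
		 * returns true in exactly one of the participants waiting at a given
		 * phase (see storage/ipc/barrier.c), so only one backend runs the
		 * one-time setup inside the if-block; the rest get false and simply
		 * proceed.
		 */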
600 		if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
601 			BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING))
602 		{
603 			pstate->nbatch = nbatch;
604 			pstate->space_allowed = space_allowed;
605 			pstate->growth = PHJ_GROWTH_OK;
606 
607 			/* Set up the shared state for coordinating batches. */
608 			ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
609 
610 			/*
611 			 * Allocate batch 0's hash table up front so we can load it
612 			 * directly while hashing.
613 			 */
614 			pstate->nbuckets = nbuckets;
615 			ExecParallelHashTableAlloc(hashtable, 0);
616 		}
617 
618 		/*
619 		 * The next Parallel Hash synchronization point is in
620 		 * MultiExecParallelHash(), which will progress it all the way to
621 		 * PHJ_BUILD_DONE.  The caller must not return control from this
622 		 * executor node between now and then.
623 		 */
624 	}
625 	else
626 	{
627 		/*
628 		 * Prepare context for the first-scan space allocations; allocate the
629 		 * hashbucket array therein, and set each bucket "empty".
630 		 */
631 		MemoryContextSwitchTo(hashtable->batchCxt);
632 
633 		hashtable->buckets.unshared = (HashJoinTuple *)
634 			palloc0(nbuckets * sizeof(HashJoinTuple));
635 
636 		/*
637 		 * Set up for skew optimization, if possible and there's a need for
638 		 * more than one batch.  (In a one-batch join, there's no point in
639 		 * it.)
640 		 */
641 		if (nbatch > 1)
642 			ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
643 
644 		MemoryContextSwitchTo(oldcxt);
645 	}
646 
647 	return hashtable;
648 }
649 
650 
651 /*
652  * Compute appropriate size for hashtable given the estimated size of the
653  * relation to be hashed (number of rows and average row width).
654  *
655  * This is exported so that the planner's costsize.c can use it.
656  */
657 
658 /* Target bucket loading (tuples per bucket) */
659 #define NTUP_PER_BUCKET			1
660 
661 void
662 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
663 						bool try_combined_work_mem,
664 						int parallel_workers,
665 						size_t *space_allowed,
666 						int *numbuckets,
667 						int *numbatches,
668 						int *num_skew_mcvs)
669 {
670 	int			tupsize;
671 	double		inner_rel_bytes;
672 	long		bucket_bytes;
673 	long		hash_table_bytes;
674 	long		skew_table_bytes;
675 	long		max_pointers;
676 	long		mppow2;
677 	int			nbatch = 1;
678 	int			nbuckets;
679 	double		dbuckets;
680 
681 	/* Force a plausible relation size if no info */
682 	if (ntuples <= 0.0)
683 		ntuples = 1000.0;
684 
685 	/*
686 	 * Estimate tupsize based on footprint of tuple in hashtable... note this
687 	 * does not allow for any palloc overhead.  The manipulations of spaceUsed
688 	 * don't count palloc overhead either.
689 	 */
690 	tupsize = HJTUPLE_OVERHEAD +
691 		MAXALIGN(SizeofMinimalTupleHeader) +
692 		MAXALIGN(tupwidth);
693 	inner_rel_bytes = ntuples * tupsize;
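	/*
	 * Rough illustration only (exact sizes depend on the platform and
	 * MAXALIGN): with 8-byte alignment, HJTUPLE_OVERHEAD plus the maxaligned
	 * minimal tuple header comes to roughly 40 bytes, so a 32-byte-wide tuple
	 * costs about 72 bytes of hashtable space, and a million such tuples need
	 * on the order of 70MB before bucket headers are counted.
	 */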
694 
695 	/*
696 	 * Target in-memory hashtable size is work_mem kilobytes.
697 	 */
698 	hash_table_bytes = work_mem * 1024L;
699 
700 	/*
701 	 * Parallel Hash tries to use the combined work_mem of all workers to
702 	 * avoid the need to batch.  If that won't work, it falls back to work_mem
703 	 * per worker and tries to process batches in parallel.
704 	 */
705 	if (try_combined_work_mem)
706 		hash_table_bytes += hash_table_bytes * parallel_workers;
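	/*
	 * For example (illustrative numbers): with work_mem = 4MB and 3 planned
	 * workers, the combined budget tried first is 4MB * (1 + 3) = 16MB; only
	 * if that still isn't enough do we recurse below with
	 * try_combined_work_mem = false and fall back to 4MB per participant.
	 */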
707 
708 	*space_allowed = hash_table_bytes;
709 
710 	/*
711 	 * If skew optimization is possible, estimate the number of skew buckets
712 	 * that will fit in the memory allowed, and decrement the assumed space
713 	 * available for the main hash table accordingly.
714 	 *
715 	 * We make the optimistic assumption that each skew bucket will contain
716 	 * one inner-relation tuple.  If that turns out to be low, we will recover
717 	 * at runtime by reducing the number of skew buckets.
718 	 *
719 	 * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
720 	 * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
721 	 * will round up to the next power of 2 and then multiply by 4 to reduce
722 	 * collisions.
723 	 */
724 	if (useskew)
725 	{
726 		skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
727 
728 		/*----------
729 		 * Divisor is:
730 		 * size of a hash tuple +
731 		 * worst-case size of skewBucket[] per MCV +
732 		 * size of skewBucketNums[] entry +
733 		 * size of skew bucket struct itself
734 		 *----------
735 		 */
736 		*num_skew_mcvs = skew_table_bytes / (tupsize +
737 											 (8 * sizeof(HashSkewBucket *)) +
738 											 sizeof(int) +
739 											 SKEW_BUCKET_OVERHEAD);
740 		if (*num_skew_mcvs > 0)
741 			hash_table_bytes -= skew_table_bytes;
742 	}
743 	else
744 		*num_skew_mcvs = 0;
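	/*
	 * Illustrative only, assuming SKEW_WORK_MEM_PERCENT is 2 and a 64-bit
	 * build where the three non-tupsize divisor terms add up to roughly 84
	 * bytes: with hash_table_bytes = 4MB, skew_table_bytes is about 80kB, and
	 * with an 80-byte tupsize that budgets roughly 500 MCVs.
	 */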
745 
746 	/*
747 	 * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
748 	 * memory is filled, assuming a single batch; but limit the value so that
749 	 * the pointer arrays we'll try to allocate do not exceed work_mem nor
750 	 * MaxAllocSize.
751 	 *
752 	 * Note that both nbuckets and nbatch must be powers of 2 to make
753 	 * ExecHashGetBucketAndBatch fast.
754 	 */
755 	max_pointers = *space_allowed / sizeof(HashJoinTuple);
756 	max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
757 	/* If max_pointers isn't a power of 2, must round it down to one */
758 	mppow2 = 1L << my_log2(max_pointers);
759 	if (max_pointers != mppow2)
760 		max_pointers = mppow2 / 2;
761 
762 	/* Also ensure we avoid integer overflow in nbatch and nbuckets */
763 	/* (this step is redundant given the current value of MaxAllocSize) */
764 	max_pointers = Min(max_pointers, INT_MAX / 2);
765 
766 	dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
767 	dbuckets = Min(dbuckets, max_pointers);
768 	nbuckets = (int) dbuckets;
769 	/* don't let nbuckets be really small, though ... */
770 	nbuckets = Max(nbuckets, 1024);
771 	/* ... and force it to be a power of 2. */
772 	nbuckets = 1 << my_log2(nbuckets);
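	/*
	 * For example, an estimate of 100,000 tuples with NTUP_PER_BUCKET = 1
	 * gives dbuckets = 100000, which is rounded up here to nbuckets = 131072
	 * (2^17), assuming max_pointers didn't clamp it first (my_log2() computes
	 * ceil(log2)).
	 */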
773 
774 	/*
775 	 * If there's not enough space to store the projected number of tuples and
776 	 * the required bucket headers, we will need multiple batches.
777 	 */
778 	bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
779 	if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
780 	{
781 		/* We'll need multiple batches */
782 		long		lbuckets;
783 		double		dbatch;
784 		int			minbatch;
785 		long		bucket_size;
786 
787 		/*
788 		 * If Parallel Hash with combined work_mem would still need multiple
789 		 * batches, we'll have to fall back to regular work_mem budget.
790 		 */
791 		if (try_combined_work_mem)
792 		{
793 			ExecChooseHashTableSize(ntuples, tupwidth, useskew,
794 									false, parallel_workers,
795 									space_allowed,
796 									numbuckets,
797 									numbatches,
798 									num_skew_mcvs);
799 			return;
800 		}
801 
802 		/*
803 		 * Estimate the number of buckets we'll want to have when work_mem is
804 		 * entirely full.  Each bucket will contain a bucket pointer plus
805 		 * NTUP_PER_BUCKET tuples, whose projected size already includes
806 		 * overhead for the hash code, pointer to the next tuple, etc.
807 		 */
808 		bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
809 		lbuckets = 1L << my_log2(hash_table_bytes / bucket_size);
810 		lbuckets = Min(lbuckets, max_pointers);
811 		nbuckets = (int) lbuckets;
812 		nbuckets = 1 << my_log2(nbuckets);
813 		bucket_bytes = nbuckets * sizeof(HashJoinTuple);
814 
815 		/*
816 		 * Buckets are simple pointers to hashjoin tuples, while tupsize
817 		 * includes the pointer, hash code, and MinimalTupleData.  So buckets
818 		 * should never really exceed 25% of work_mem (even for
819 		 * NTUP_PER_BUCKET=1); except maybe for work_mem values that are not
820 		 * 2^N bytes, where we might get more because of doubling. So let's
821 		 * look for 50% here.
822 		 */
823 		Assert(bucket_bytes <= hash_table_bytes / 2);
824 
825 		/* Calculate required number of batches. */
826 		dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
827 		dbatch = Min(dbatch, max_pointers);
828 		minbatch = (int) dbatch;
829 		nbatch = 2;
830 		while (nbatch < minbatch)
831 			nbatch <<= 1;
832 	}
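	/*
	 * For example, if inner_rel_bytes were ~64MB and hash_table_bytes -
	 * bucket_bytes were ~3MB, dbatch would be 22 and the loop above would
	 * round nbatch up to 32.
	 */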
833 
834 	Assert(nbuckets > 0);
835 	Assert(nbatch > 0);
836 
837 	*numbuckets = nbuckets;
838 	*numbatches = nbatch;
839 }
840 
841 
842 /* ----------------------------------------------------------------
843  *		ExecHashTableDestroy
844  *
845  *		destroy a hash table
846  * ----------------------------------------------------------------
847  */
848 void
849 ExecHashTableDestroy(HashJoinTable hashtable)
850 {
851 	int			i;
852 
853 	/*
854 	 * Make sure all the temp files are closed.  We skip batch 0, since it
855 	 * can't have any temp files (and the arrays might not even exist if
856 	 * nbatch is only 1).  Parallel hash joins don't use these files.
857 	 */
858 	if (hashtable->innerBatchFile != NULL)
859 	{
860 		for (i = 1; i < hashtable->nbatch; i++)
861 		{
862 			if (hashtable->innerBatchFile[i])
863 				BufFileClose(hashtable->innerBatchFile[i]);
864 			if (hashtable->outerBatchFile[i])
865 				BufFileClose(hashtable->outerBatchFile[i]);
866 		}
867 	}
868 
869 	/* Release working memory (batchCxt is a child, so it goes away too) */
870 	MemoryContextDelete(hashtable->hashCxt);
871 
872 	/* And drop the control block */
873 	pfree(hashtable);
874 }
875 
876 /*
877  * ExecHashIncreaseNumBatches
878  *		increase the original number of batches in order to reduce
879  *		current memory consumption
880  */
881 static void
882 ExecHashIncreaseNumBatches(HashJoinTable hashtable)
883 {
884 	int			oldnbatch = hashtable->nbatch;
885 	int			curbatch = hashtable->curbatch;
886 	int			nbatch;
887 	MemoryContext oldcxt;
888 	long		ninmemory;
889 	long		nfreed;
890 	HashMemoryChunk oldchunks;
891 
892 	/* do nothing if we've decided to shut off growth */
893 	if (!hashtable->growEnabled)
894 		return;
895 
896 	/* safety check to avoid overflow */
897 	if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
898 		return;
899 
900 	nbatch = oldnbatch * 2;
901 	Assert(nbatch > 1);
902 
903 #ifdef HJDEBUG
904 	printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
905 		   hashtable, nbatch, hashtable->spaceUsed);
906 #endif
907 
908 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
909 
910 	if (hashtable->innerBatchFile == NULL)
911 	{
912 		/* we had no file arrays before */
913 		hashtable->innerBatchFile = (BufFile **)
914 			palloc0(nbatch * sizeof(BufFile *));
915 		hashtable->outerBatchFile = (BufFile **)
916 			palloc0(nbatch * sizeof(BufFile *));
917 		/* time to establish the temp tablespaces, too */
918 		PrepareTempTablespaces();
919 	}
920 	else
921 	{
922 		/* enlarge arrays and zero out added entries */
923 		hashtable->innerBatchFile = (BufFile **)
924 			repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
925 		hashtable->outerBatchFile = (BufFile **)
926 			repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
927 		MemSet(hashtable->innerBatchFile + oldnbatch, 0,
928 			   (nbatch - oldnbatch) * sizeof(BufFile *));
929 		MemSet(hashtable->outerBatchFile + oldnbatch, 0,
930 			   (nbatch - oldnbatch) * sizeof(BufFile *));
931 	}
932 
933 	MemoryContextSwitchTo(oldcxt);
934 
935 	hashtable->nbatch = nbatch;
936 
937 	/*
938 	 * Scan through the existing hash table entries and dump out any that are
939 	 * no longer of the current batch.
940 	 */
941 	ninmemory = nfreed = 0;
942 
943 	/* If we know we need to resize nbuckets, we can do it while rebatching. */
944 	if (hashtable->nbuckets_optimal != hashtable->nbuckets)
945 	{
946 		/* we never decrease the number of buckets */
947 		Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
948 
949 		hashtable->nbuckets = hashtable->nbuckets_optimal;
950 		hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
951 
952 		hashtable->buckets.unshared =
953 			repalloc(hashtable->buckets.unshared,
954 					 sizeof(HashJoinTuple) * hashtable->nbuckets);
955 	}
956 
957 	/*
958 	 * We will scan through the chunks directly, so that we can reset the
959 	 * buckets now and not have to keep track of which tuples in the buckets
960 	 * have already been processed.  We will free the old chunks as we go.
961 	 */
962 	memset(hashtable->buckets.unshared, 0,
963 		   sizeof(HashJoinTuple) * hashtable->nbuckets);
964 	oldchunks = hashtable->chunks;
965 	hashtable->chunks = NULL;
966 
967 	/* so, let's scan through the old chunks, and all tuples in each chunk */
968 	while (oldchunks != NULL)
969 	{
970 		HashMemoryChunk nextchunk = oldchunks->next.unshared;
971 
972 		/* position within the buffer (up to oldchunks->used) */
973 		size_t		idx = 0;
974 
975 		/* process all tuples stored in this chunk (and then free it) */
976 		while (idx < oldchunks->used)
977 		{
978 			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx);
979 			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
980 			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
981 			int			bucketno;
982 			int			batchno;
983 
984 			ninmemory++;
985 			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
986 									  &bucketno, &batchno);
987 
988 			if (batchno == curbatch)
989 			{
990 				/* keep tuple in memory - copy it into the new chunk */
991 				HashJoinTuple copyTuple;
992 
993 				copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
994 				memcpy(copyTuple, hashTuple, hashTupleSize);
995 
996 				/* and add it back to the appropriate bucket */
997 				copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
998 				hashtable->buckets.unshared[bucketno] = copyTuple;
999 			}
1000 			else
1001 			{
1002 				/* dump it out */
1003 				Assert(batchno > curbatch);
1004 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
1005 									  hashTuple->hashvalue,
1006 									  &hashtable->innerBatchFile[batchno]);
1007 
1008 				hashtable->spaceUsed -= hashTupleSize;
1009 				nfreed++;
1010 			}
1011 
1012 			/* next tuple in this chunk */
1013 			idx += MAXALIGN(hashTupleSize);
1014 
1015 			/* allow this loop to be cancellable */
1016 			CHECK_FOR_INTERRUPTS();
1017 		}
1018 
1019 		/* we're done with this chunk - free it and proceed to the next one */
1020 		pfree(oldchunks);
1021 		oldchunks = nextchunk;
1022 	}
1023 
1024 #ifdef HJDEBUG
1025 	printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
1026 		   hashtable, nfreed, ninmemory, hashtable->spaceUsed);
1027 #endif
1028 
1029 	/*
1030 	 * If we dumped out either all or none of the tuples in the table, disable
1031 	 * further expansion of nbatch.  This situation implies that we have
1032 	 * enough tuples of identical hashvalues to overflow spaceAllowed.
1033 	 * Increasing nbatch will not fix it since there's no way to subdivide the
1034 	 * group any more finely. We have to just gut it out and hope the server
1035 	 * has enough RAM.
1036 	 */
1037 	if (nfreed == 0 || nfreed == ninmemory)
1038 	{
1039 		hashtable->growEnabled = false;
1040 #ifdef HJDEBUG
1041 		printf("Hashjoin %p: disabling further increase of nbatch\n",
1042 			   hashtable);
1043 #endif
1044 	}
1045 }
1046 
1047 /*
1048  * ExecParallelHashIncreaseNumBatches
1049  *		Every participant attached to grow_batches_barrier must run this
1050  *		function when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES.
1051  */
1052 static void
1053 ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
1054 {
1055 	ParallelHashJoinState *pstate = hashtable->parallel_state;
1056 	int			i;
1057 
1058 	Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
1059 
1060 	/*
1061 	 * It's unlikely, but we need to be prepared for new participants to show
1062 	 * up while we're in the middle of this operation so we need to switch on
1063 	 * barrier phase here.
1064 	 */
1065 	switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
1066 	{
1067 		case PHJ_GROW_BATCHES_ELECTING:
1068 
1069 			/*
1070 			 * Elect one participant to prepare to grow the number of batches.
1071 			 * This involves reallocating or resetting the buckets of batch 0
1072 			 * in preparation for all participants to begin repartitioning the
1073 			 * tuples.
1074 			 */
1075 			if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
1076 									 WAIT_EVENT_HASH_GROW_BATCHES_ELECTING))
1077 			{
1078 				dsa_pointer_atomic *buckets;
1079 				ParallelHashJoinBatch *old_batch0;
1080 				int			new_nbatch;
1081 				int			i;
1082 
1083 				/* Move the old batch out of the way. */
1084 				old_batch0 = hashtable->batches[0].shared;
1085 				pstate->old_batches = pstate->batches;
1086 				pstate->old_nbatch = hashtable->nbatch;
1087 				pstate->batches = InvalidDsaPointer;
1088 
1089 				/* Free this backend's old accessors. */
1090 				ExecParallelHashCloseBatchAccessors(hashtable);
1091 
1092 				/* Figure out how many batches to use. */
1093 				if (hashtable->nbatch == 1)
1094 				{
1095 					/*
1096 					 * We are going from single-batch to multi-batch.  We need
1097 					 * to switch from one large combined memory budget to the
1098 					 * regular work_mem budget.
1099 					 */
1100 					pstate->space_allowed = work_mem * 1024L;
1101 
1102 					/*
1103 					 * The combined work_mem of all participants wasn't
1104 					 * enough. Therefore one batch per participant would be
1105 					 * approximately equivalent and would probably also be
1106 					 * insufficient.  So try two batches per participant,
1107 					 * rounded up to a power of two.
1108 					 */
1109 					new_nbatch = 1 << my_log2(pstate->nparticipants * 2);
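					/*
					 * For example, 3 participants give my_log2(6) = 3 and
					 * hence new_nbatch = 8; 4 participants likewise give 8.
					 */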
1110 				}
1111 				else
1112 				{
1113 					/*
1114 					 * We were already multi-batched.  Try doubling the number
1115 					 * of batches.
1116 					 */
1117 					new_nbatch = hashtable->nbatch * 2;
1118 				}
1119 
1120 				/* Allocate new larger generation of batches. */
1121 				Assert(hashtable->nbatch == pstate->nbatch);
1122 				ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch);
1123 				Assert(hashtable->nbatch == pstate->nbatch);
1124 
1125 				/* Replace or recycle batch 0's bucket array. */
1126 				if (pstate->old_nbatch == 1)
1127 				{
1128 					double		dtuples;
1129 					double		dbuckets;
1130 					int			new_nbuckets;
1131 
1132 					/*
1133 					 * We probably also need a smaller bucket array.  How many
1134 					 * tuples do we expect per batch, assuming we have only
1135 					 * half of them so far?  Normally we don't need to change
1136 					 * the bucket array's size, because the size of each batch
1137 					 * stays the same as we add more batches, but in this
1138 					 * special case we move from a large batch to many smaller
1139 					 * batches and it would be wasteful to keep the large
1140 					 * array.
1141 					 */
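					/*
					 * Worked example (illustrative): if batch 0 held 500,000
					 * tuples when memory filled and new_nbatch is 8, we
					 * assume ~1,000,000 inner tuples in total, i.e. ~125,000
					 * per batch, so the new bucket array gets 131072 entries
					 * after rounding up to a power of two (and never fewer
					 * than 1024).
					 */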
1142 					dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
1143 					dbuckets = ceil(dtuples / NTUP_PER_BUCKET);
1144 					dbuckets = Min(dbuckets,
1145 								   MaxAllocSize / sizeof(dsa_pointer_atomic));
1146 					new_nbuckets = (int) dbuckets;
1147 					new_nbuckets = Max(new_nbuckets, 1024);
1148 					new_nbuckets = 1 << my_log2(new_nbuckets);
1149 					dsa_free(hashtable->area, old_batch0->buckets);
1150 					hashtable->batches[0].shared->buckets =
1151 						dsa_allocate(hashtable->area,
1152 									 sizeof(dsa_pointer_atomic) * new_nbuckets);
1153 					buckets = (dsa_pointer_atomic *)
1154 						dsa_get_address(hashtable->area,
1155 										hashtable->batches[0].shared->buckets);
1156 					for (i = 0; i < new_nbuckets; ++i)
1157 						dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
1158 					pstate->nbuckets = new_nbuckets;
1159 				}
1160 				else
1161 				{
1162 					/* Recycle the existing bucket array. */
1163 					hashtable->batches[0].shared->buckets = old_batch0->buckets;
1164 					buckets = (dsa_pointer_atomic *)
1165 						dsa_get_address(hashtable->area, old_batch0->buckets);
1166 					for (i = 0; i < hashtable->nbuckets; ++i)
1167 						dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer);
1168 				}
1169 
1170 				/* Move all chunks to the work queue for parallel processing. */
1171 				pstate->chunk_work_queue = old_batch0->chunks;
1172 
1173 				/* Disable further growth temporarily while we're growing. */
1174 				pstate->growth = PHJ_GROWTH_DISABLED;
1175 			}
1176 			else
1177 			{
1178 				/* All other participants just flush their tuples to disk. */
1179 				ExecParallelHashCloseBatchAccessors(hashtable);
1180 			}
1181 			/* Fall through. */
1182 
1183 		case PHJ_GROW_BATCHES_ALLOCATING:
1184 			/* Wait for the above to be finished. */
1185 			BarrierArriveAndWait(&pstate->grow_batches_barrier,
1186 								 WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING);
1187 			/* Fall through. */
1188 
1189 		case PHJ_GROW_BATCHES_REPARTITIONING:
1190 			/* Make sure that we have the current dimensions and buckets. */
1191 			ExecParallelHashEnsureBatchAccessors(hashtable);
1192 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1193 			/* Then partition, flush counters. */
1194 			ExecParallelHashRepartitionFirst(hashtable);
1195 			ExecParallelHashRepartitionRest(hashtable);
1196 			ExecParallelHashMergeCounters(hashtable);
1197 			/* Wait for the above to be finished. */
1198 			BarrierArriveAndWait(&pstate->grow_batches_barrier,
1199 								 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING);
1200 			/* Fall through. */
1201 
1202 		case PHJ_GROW_BATCHES_DECIDING:
1203 
1204 			/*
1205 			 * Elect one participant to clean up and decide whether further
1206 			 * repartitioning is needed, or should be disabled because it's
1207 			 * not helping.
1208 			 */
1209 			if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
1210 									 WAIT_EVENT_HASH_GROW_BATCHES_DECIDING))
1211 			{
1212 				bool		space_exhausted = false;
1213 				bool		extreme_skew_detected = false;
1214 
1215 				/* Make sure that we have the current dimensions and buckets. */
1216 				ExecParallelHashEnsureBatchAccessors(hashtable);
1217 				ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1218 
1219 				/* Are any of the new generation of batches exhausted? */
1220 				for (i = 0; i < hashtable->nbatch; ++i)
1221 				{
1222 					ParallelHashJoinBatch *batch = hashtable->batches[i].shared;
1223 
1224 					if (batch->space_exhausted ||
1225 						batch->estimated_size > pstate->space_allowed)
1226 					{
1227 						int			parent;
1228 
1229 						space_exhausted = true;
1230 
1231 						/*
1232 						 * Did this batch receive ALL of the tuples from its
1233 						 * parent batch?  That would indicate that further
1234 						 * repartitioning isn't going to help (the hash values
1235 						 * are probably all the same).
1236 						 */
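						/*
						 * (Why i % old_nbatch identifies the parent:
						 * ExecHashGetBucketAndBatch() takes batchno from the
						 * hash bits above the bucket bits, so when nbatch
						 * grows by a power of two, every tuple now assigned
						 * to batch i previously hashed to batch
						 * i % old_nbatch.)
						 */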
1237 						parent = i % pstate->old_nbatch;
1238 						if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples)
1239 							extreme_skew_detected = true;
1240 					}
1241 				}
1242 
1243 				/* Don't keep growing if it's not helping or we'd overflow. */
1244 				if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
1245 					pstate->growth = PHJ_GROWTH_DISABLED;
1246 				else if (space_exhausted)
1247 					pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
1248 				else
1249 					pstate->growth = PHJ_GROWTH_OK;
1250 
1251 				/* Free the old batches in shared memory. */
1252 				dsa_free(hashtable->area, pstate->old_batches);
1253 				pstate->old_batches = InvalidDsaPointer;
1254 			}
1255 			/* Fall through. */
1256 
1257 		case PHJ_GROW_BATCHES_FINISHING:
1258 			/* Wait for the above to complete. */
1259 			BarrierArriveAndWait(&pstate->grow_batches_barrier,
1260 								 WAIT_EVENT_HASH_GROW_BATCHES_FINISHING);
1261 	}
1262 }
1263 
1264 /*
1265  * Repartition the tuples currently loaded into memory for inner batch 0
1266  * because the number of batches has been increased.  Some tuples are retained
1267  * in memory and some are written out to a later batch.
1268  */
1269 static void
1270 ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
1271 {
1272 	dsa_pointer chunk_shared;
1273 	HashMemoryChunk chunk;
1274 
1275 	Assert(hashtable->nbatch == hashtable->parallel_state->nbatch);
1276 
1277 	while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared)))
1278 	{
1279 		size_t		idx = 0;
1280 
1281 		/* Repartition all tuples in this chunk. */
1282 		while (idx < chunk->used)
1283 		{
1284 			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1285 			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
1286 			HashJoinTuple copyTuple;
1287 			dsa_pointer shared;
1288 			int			bucketno;
1289 			int			batchno;
1290 
1291 			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1292 									  &bucketno, &batchno);
1293 
1294 			Assert(batchno < hashtable->nbatch);
1295 			if (batchno == 0)
1296 			{
1297 				/* It still belongs in batch 0.  Copy to a new chunk. */
1298 				copyTuple =
1299 					ExecParallelHashTupleAlloc(hashtable,
1300 											   HJTUPLE_OVERHEAD + tuple->t_len,
1301 											   &shared);
1302 				copyTuple->hashvalue = hashTuple->hashvalue;
1303 				memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len);
1304 				ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1305 										  copyTuple, shared);
1306 			}
1307 			else
1308 			{
1309 				size_t		tuple_size =
1310 				MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1311 
1312 				/* It belongs in a later batch. */
1313 				hashtable->batches[batchno].estimated_size += tuple_size;
1314 				sts_puttuple(hashtable->batches[batchno].inner_tuples,
1315 							 &hashTuple->hashvalue, tuple);
1316 			}
1317 
1318 			/* Count this tuple. */
1319 			++hashtable->batches[0].old_ntuples;
1320 			++hashtable->batches[batchno].ntuples;
1321 
1322 			idx += MAXALIGN(HJTUPLE_OVERHEAD +
1323 							HJTUPLE_MINTUPLE(hashTuple)->t_len);
1324 		}
1325 
1326 		/* Free this chunk. */
1327 		dsa_free(hashtable->area, chunk_shared);
1328 
1329 		CHECK_FOR_INTERRUPTS();
1330 	}
1331 }
1332 
1333 /*
1334  * Help repartition inner batches 1..n.
1335  */
1336 static void
1337 ExecParallelHashRepartitionRest(HashJoinTable hashtable)
1338 {
1339 	ParallelHashJoinState *pstate = hashtable->parallel_state;
1340 	int			old_nbatch = pstate->old_nbatch;
1341 	SharedTuplestoreAccessor **old_inner_tuples;
1342 	ParallelHashJoinBatch *old_batches;
1343 	int			i;
1344 
1345 	/* Get our hands on the previous generation of batches. */
1346 	old_batches = (ParallelHashJoinBatch *)
1347 		dsa_get_address(hashtable->area, pstate->old_batches);
1348 	old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch);
1349 	for (i = 1; i < old_nbatch; ++i)
1350 	{
1351 		ParallelHashJoinBatch *shared =
1352 		NthParallelHashJoinBatch(old_batches, i);
1353 
1354 		old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared),
1355 										 ParallelWorkerNumber + 1,
1356 										 &pstate->fileset);
1357 	}
1358 
1359 	/* Join in the effort to repartition them. */
1360 	for (i = 1; i < old_nbatch; ++i)
1361 	{
1362 		MinimalTuple tuple;
1363 		uint32		hashvalue;
1364 
1365 		/* Scan one partition from the previous generation. */
1366 		sts_begin_parallel_scan(old_inner_tuples[i]);
1367 		while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue)))
1368 		{
1369 			size_t		tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1370 			int			bucketno;
1371 			int			batchno;
1372 
1373 			/* Decide which partition it goes to in the new generation. */
1374 			ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1375 									  &batchno);
1376 
1377 			hashtable->batches[batchno].estimated_size += tuple_size;
1378 			++hashtable->batches[batchno].ntuples;
1379 			++hashtable->batches[i].old_ntuples;
1380 
1381 			/* Store the tuple in its new batch. */
1382 			sts_puttuple(hashtable->batches[batchno].inner_tuples,
1383 						 &hashvalue, tuple);
1384 
1385 			CHECK_FOR_INTERRUPTS();
1386 		}
1387 		sts_end_parallel_scan(old_inner_tuples[i]);
1388 	}
1389 
1390 	pfree(old_inner_tuples);
1391 }
1392 
1393 /*
1394  * Transfer the backend-local per-batch counters to the shared totals.
1395  */
1396 static void
1397 ExecParallelHashMergeCounters(HashJoinTable hashtable)
1398 {
1399 	ParallelHashJoinState *pstate = hashtable->parallel_state;
1400 	int			i;
1401 
1402 	LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
1403 	pstate->total_tuples = 0;
1404 	for (i = 0; i < hashtable->nbatch; ++i)
1405 	{
1406 		ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i];
1407 
1408 		batch->shared->size += batch->size;
1409 		batch->shared->estimated_size += batch->estimated_size;
1410 		batch->shared->ntuples += batch->ntuples;
1411 		batch->shared->old_ntuples += batch->old_ntuples;
1412 		batch->size = 0;
1413 		batch->estimated_size = 0;
1414 		batch->ntuples = 0;
1415 		batch->old_ntuples = 0;
1416 		pstate->total_tuples += batch->shared->ntuples;
1417 	}
1418 	LWLockRelease(&pstate->lock);
1419 }
1420 
1421 /*
1422  * ExecHashIncreaseNumBuckets
1423  *		increase the original number of buckets in order to reduce
1424  *		number of tuples per bucket
1425  */
1426 static void
1427 ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
1428 {
1429 	HashMemoryChunk chunk;
1430 
1431 	/* do nothing if not an increase (it's called increase for a reason) */
1432 	if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
1433 		return;
1434 
1435 #ifdef HJDEBUG
1436 	printf("Hashjoin %p: increasing nbuckets %d => %d\n",
1437 		   hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal);
1438 #endif
1439 
1440 	hashtable->nbuckets = hashtable->nbuckets_optimal;
1441 	hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
1442 
1443 	Assert(hashtable->nbuckets > 1);
1444 	Assert(hashtable->nbuckets <= (INT_MAX / 2));
1445 	Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
1446 
1447 	/*
1448 	 * Just reallocate the proper number of buckets - we don't need to walk
1449 	 * through them - we can walk the dense-allocated chunks (just like in
1450 	 * ExecHashIncreaseNumBatches, but without all the copying into new
1451 	 * chunks)
1452 	 */
1453 	hashtable->buckets.unshared =
1454 		(HashJoinTuple *) repalloc(hashtable->buckets.unshared,
1455 								   hashtable->nbuckets * sizeof(HashJoinTuple));
1456 
1457 	memset(hashtable->buckets.unshared, 0,
1458 		   hashtable->nbuckets * sizeof(HashJoinTuple));
1459 
1460 	/* scan through all tuples in all chunks to rebuild the hash table */
1461 	for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
1462 	{
1463 		/* process all tuples stored in this chunk */
1464 		size_t		idx = 0;
1465 
1466 		while (idx < chunk->used)
1467 		{
1468 			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1469 			int			bucketno;
1470 			int			batchno;
1471 
1472 			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1473 									  &bucketno, &batchno);
1474 
1475 			/* add the tuple to the proper bucket */
1476 			hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1477 			hashtable->buckets.unshared[bucketno] = hashTuple;
1478 
1479 			/* advance index past the tuple */
1480 			idx += MAXALIGN(HJTUPLE_OVERHEAD +
1481 							HJTUPLE_MINTUPLE(hashTuple)->t_len);
1482 		}
1483 
1484 		/* allow this loop to be cancellable */
1485 		CHECK_FOR_INTERRUPTS();
1486 	}
1487 }
1488 
1489 static void
1490 ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
1491 {
1492 	ParallelHashJoinState *pstate = hashtable->parallel_state;
1493 	int			i;
1494 	HashMemoryChunk chunk;
1495 	dsa_pointer chunk_s;
1496 
1497 	Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
1498 
1499 	/*
1500 	 * It's unlikely, but we need to be prepared for new participants to show
1501 	 * up while we're in the middle of this operation so we need to switch on
1502 	 * barrier phase here.
1503 	 */
1504 	switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
1505 	{
1506 		case PHJ_GROW_BUCKETS_ELECTING:
1507 			/* Elect one participant to prepare to increase nbuckets. */
1508 			if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1509 									 WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING))
1510 			{
1511 				size_t		size;
1512 				dsa_pointer_atomic *buckets;
1513 
1514 				/* Double the size of the bucket array. */
1515 				pstate->nbuckets *= 2;
1516 				size = pstate->nbuckets * sizeof(dsa_pointer_atomic);
1517 				hashtable->batches[0].shared->size += size / 2;
1518 				dsa_free(hashtable->area, hashtable->batches[0].shared->buckets);
1519 				hashtable->batches[0].shared->buckets =
1520 					dsa_allocate(hashtable->area, size);
1521 				buckets = (dsa_pointer_atomic *)
1522 					dsa_get_address(hashtable->area,
1523 									hashtable->batches[0].shared->buckets);
1524 				for (i = 0; i < pstate->nbuckets; ++i)
1525 					dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
1526 
1527 				/* Put the chunk list onto the work queue. */
1528 				pstate->chunk_work_queue = hashtable->batches[0].shared->chunks;
1529 
1530 				/* Clear the flag. */
1531 				pstate->growth = PHJ_GROWTH_OK;
1532 			}
1533 			/* Fall through. */
1534 
1535 		case PHJ_GROW_BUCKETS_ALLOCATING:
1536 			/* Wait for the above to complete. */
1537 			BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1538 								 WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING);
1539 			/* Fall through. */
1540 
1541 		case PHJ_GROW_BUCKETS_REINSERTING:
1542 			/* Reinsert all tuples into the hash table. */
1543 			ExecParallelHashEnsureBatchAccessors(hashtable);
1544 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
1545 			while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s)))
1546 			{
1547 				size_t		idx = 0;
1548 
1549 				while (idx < chunk->used)
1550 				{
1551 					HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
1552 					dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx;
1553 					int			bucketno;
1554 					int			batchno;
1555 
1556 					ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
1557 											  &bucketno, &batchno);
1558 					Assert(batchno == 0);
1559 
1560 					/* add the tuple to the proper bucket */
1561 					ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1562 											  hashTuple, shared);
1563 
1564 					/* advance index past the tuple */
1565 					idx += MAXALIGN(HJTUPLE_OVERHEAD +
1566 									HJTUPLE_MINTUPLE(hashTuple)->t_len);
1567 				}
1568 
1569 				/* allow this loop to be cancellable */
1570 				CHECK_FOR_INTERRUPTS();
1571 			}
1572 			BarrierArriveAndWait(&pstate->grow_buckets_barrier,
1573 								 WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING);
1574 	}
1575 }
1576 
1577 /*
1578  * ExecHashTableInsert
1579  *		insert a tuple into the hash table depending on the hash value
1580  *		it may just go to a temp file for later batches
1581  *
1582  * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
1583  * tuple; the minimal case in particular is certain to happen while reloading
1584  * tuples from batch files.  We could save some cycles in the regular-tuple
1585  * case by not forcing the slot contents into minimal form; not clear if it's
1586  * worth the messiness required.
1587  */
1588 void
1589 ExecHashTableInsert(HashJoinTable hashtable,
1590 					TupleTableSlot *slot,
1591 					uint32 hashvalue)
1592 {
1593 	MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
1594 	int			bucketno;
1595 	int			batchno;
1596 
1597 	ExecHashGetBucketAndBatch(hashtable, hashvalue,
1598 							  &bucketno, &batchno);
1599 
1600 	/*
1601 	 * decide whether to put the tuple in the hash table or a temp file
1602 	 */
1603 	if (batchno == hashtable->curbatch)
1604 	{
1605 		/*
1606 		 * put the tuple in the hash table
1607 		 */
1608 		HashJoinTuple hashTuple;
1609 		int			hashTupleSize;
1610 		double		ntuples = (hashtable->totalTuples - hashtable->skewTuples);
1611 
1612 		/* Create the HashJoinTuple */
1613 		hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1614 		hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
1615 
1616 		hashTuple->hashvalue = hashvalue;
1617 		memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1618 
1619 		/*
1620 		 * We always reset the tuple-matched flag on insertion.  This is okay
1621 		 * even when reloading a tuple from a batch file, since the tuple
1622 		 * could not possibly have been matched to an outer tuple before it
1623 		 * went into the batch file.
1624 		 */
1625 		HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1626 
1627 		/* Push it onto the front of the bucket's list */
1628 		hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
1629 		hashtable->buckets.unshared[bucketno] = hashTuple;
1630 
1631 		/*
1632 		 * Increase the (optimal) number of buckets if we just exceeded the
1633 		 * NTUP_PER_BUCKET threshold, but only when there's still a single
1634 		 * batch.
1635 		 */
1636 		if (hashtable->nbatch == 1 &&
1637 			ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
1638 		{
1639 			/* Guard against integer overflow and alloc size overflow */
1640 			if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
1641 				hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
1642 			{
1643 				hashtable->nbuckets_optimal *= 2;
1644 				hashtable->log2_nbuckets_optimal += 1;
1645 			}
1646 		}
1647 
1648 		/* Account for space used, and back off if we've used too much */
1649 		hashtable->spaceUsed += hashTupleSize;
1650 		if (hashtable->spaceUsed > hashtable->spacePeak)
1651 			hashtable->spacePeak = hashtable->spaceUsed;
1652 		if (hashtable->spaceUsed +
1653 			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
1654 			> hashtable->spaceAllowed)
1655 			ExecHashIncreaseNumBatches(hashtable);
1656 	}
1657 	else
1658 	{
1659 		/*
1660 		 * put the tuple into a temp file for later batches
1661 		 */
1662 		Assert(batchno > hashtable->curbatch);
1663 		ExecHashJoinSaveTuple(tuple,
1664 							  hashvalue,
1665 							  &hashtable->innerBatchFile[batchno]);
1666 	}
1667 }
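/*
 * Illustrative example (not executed code): suppose a single-batch build with
 * nbuckets_optimal = 1024.  Once the number of non-skew tuples exceeds
 * 1024 * NTUP_PER_BUCKET, the insert above doubles nbuckets_optimal to 2048
 * and bumps log2_nbuckets_optimal from 10 to 11; the bucket array itself is
 * only rebuilt later (see ExecHashIncreaseNumBuckets()).  Separately, if
 * spaceUsed plus the projected bucket array would exceed spaceAllowed, the
 * insert calls ExecHashIncreaseNumBatches() to spill future tuples to disk.
 */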
1668 
1669 /*
1670  * ExecParallelHashTableInsert
1671  *		insert a tuple into a shared hash table or shared batch tuplestore
1672  */
1673 void
1674 ExecParallelHashTableInsert(HashJoinTable hashtable,
1675 							TupleTableSlot *slot,
1676 							uint32 hashvalue)
1677 {
1678 	MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
1679 	dsa_pointer shared;
1680 	int			bucketno;
1681 	int			batchno;
1682 
1683 retry:
1684 	ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1685 
1686 	if (batchno == 0)
1687 	{
1688 		HashJoinTuple hashTuple;
1689 
1690 		/* Try to load it into memory. */
1691 		Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
1692 			   PHJ_BUILD_HASHING_INNER);
1693 		hashTuple = ExecParallelHashTupleAlloc(hashtable,
1694 											   HJTUPLE_OVERHEAD + tuple->t_len,
1695 											   &shared);
1696 		if (hashTuple == NULL)
1697 			goto retry;
1698 
1699 		/* Store the hash value in the HashJoinTuple header. */
1700 		hashTuple->hashvalue = hashvalue;
1701 		memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1702 
1703 		/* Push it onto the front of the bucket's list */
1704 		ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1705 								  hashTuple, shared);
1706 	}
1707 	else
1708 	{
1709 		size_t		tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
1710 
1711 		Assert(batchno > 0);
1712 
1713 		/* Try to preallocate space in the batch if necessary. */
1714 		if (hashtable->batches[batchno].preallocated < tuple_size)
1715 		{
1716 			if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
1717 				goto retry;
1718 		}
1719 
1720 		Assert(hashtable->batches[batchno].preallocated >= tuple_size);
1721 		hashtable->batches[batchno].preallocated -= tuple_size;
1722 		sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
1723 					 tuple);
1724 	}
1725 	++hashtable->batches[batchno].ntuples;
1726 }
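/*
 * Note on the retry loop above (descriptive only): ExecParallelHashTupleAlloc
 * returns NULL when this worker had to help grow the number of batches or
 * buckets first.  After helping, the tuple is re-routed from the top, because
 * its batch assignment may have changed; for example, a tuple that mapped to
 * batch 0 while nbatch was 1 may map to any of batches 0-3 once nbatch has
 * been doubled twice.
 */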
1727 
1728 /*
1729  * Insert a tuple into the current hash table.  Unlike
1730  * ExecParallelHashTableInsert, this version is not prepared to send the tuple
1731  * to other batches or to run out of memory, and should only be called with
1732  * tuples that belong in the current batch once growth has been disabled.
1733  */
1734 void
1735 ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
1736 										TupleTableSlot *slot,
1737 										uint32 hashvalue)
1738 {
1739 	MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
1740 	HashJoinTuple hashTuple;
1741 	dsa_pointer shared;
1742 	int			batchno;
1743 	int			bucketno;
1744 
1745 	ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1746 	Assert(batchno == hashtable->curbatch);
1747 	hashTuple = ExecParallelHashTupleAlloc(hashtable,
1748 										   HJTUPLE_OVERHEAD + tuple->t_len,
1749 										   &shared);
1750 	hashTuple->hashvalue = hashvalue;
1751 	memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1752 	HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1753 	ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
1754 							  hashTuple, shared);
1755 }
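/*
 * Descriptive note: the allocation above is not checked for NULL because,
 * with growth disabled (the precondition stated in the header comment),
 * ExecParallelHashTupleAlloc never asks the caller to retry; it only returns
 * NULL when batch or bucket growth is pending (see the
 * PHJ_GROWTH_NEED_MORE_* checks in that function).
 */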
1756 
1757 /*
1758  * ExecHashGetHashValue
1759  *		Compute the hash value for a tuple
1760  *
1761  * The tuple to be tested must be in either econtext->ecxt_outertuple or
1762  * econtext->ecxt_innertuple.  Vars in the hashkeys expressions should have
1763  * varno either OUTER_VAR or INNER_VAR.
1764  *
1765  * A true result means the tuple's hash value has been successfully computed
1766  * and stored at *hashvalue.  A false result means the tuple cannot match
1767  * because it contains a null attribute, and hence it should be discarded
1768  * immediately.  (If keep_nulls is true then false is never returned.)
1769  */
1770 bool
1771 ExecHashGetHashValue(HashJoinTable hashtable,
1772 					 ExprContext *econtext,
1773 					 List *hashkeys,
1774 					 bool outer_tuple,
1775 					 bool keep_nulls,
1776 					 uint32 *hashvalue)
1777 {
1778 	uint32		hashkey = 0;
1779 	FmgrInfo   *hashfunctions;
1780 	ListCell   *hk;
1781 	int			i = 0;
1782 	MemoryContext oldContext;
1783 
1784 	/*
1785 	 * We reset the eval context each time to reclaim any memory leaked in the
1786 	 * hashkey expressions.
1787 	 */
1788 	ResetExprContext(econtext);
1789 
1790 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
1791 
1792 	if (outer_tuple)
1793 		hashfunctions = hashtable->outer_hashfunctions;
1794 	else
1795 		hashfunctions = hashtable->inner_hashfunctions;
1796 
1797 	foreach(hk, hashkeys)
1798 	{
1799 		ExprState  *keyexpr = (ExprState *) lfirst(hk);
1800 		Datum		keyval;
1801 		bool		isNull;
1802 
1803 		/* rotate hashkey left 1 bit at each step */
1804 		hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
1805 
1806 		/*
1807 		 * Get the join attribute value of the tuple
1808 		 */
1809 		keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
1810 
1811 		/*
1812 		 * If the attribute is NULL, and the join operator is strict, then
1813 		 * this tuple cannot pass the join qual so we can reject it
1814 		 * immediately (unless we're scanning the outside of an outer join, in
1815 		 * which case we must not reject it).  Otherwise we act like the
1816 		 * hashcode of NULL is zero (this will support operators that act like
1817 		 * IS NOT DISTINCT, though not any more-random behavior).  We treat
1818 		 * the hash support function as strict even if the operator is not.
1819 		 *
1820 		 * Note: currently, all hashjoinable operators must be strict since
1821 		 * the hash index AM assumes that.  However, it takes so little extra
1822 		 * code here to allow non-strict that we may as well do it.
1823 		 */
1824 		if (isNull)
1825 		{
1826 			if (hashtable->hashStrict[i] && !keep_nulls)
1827 			{
1828 				MemoryContextSwitchTo(oldContext);
1829 				return false;	/* cannot match */
1830 			}
1831 			/* else, leave hashkey unmodified, equivalent to hashcode 0 */
1832 		}
1833 		else
1834 		{
1835 			/* Compute the hash function */
1836 			uint32		hkey;
1837 
1838 			hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], keyval));
1839 			hashkey ^= hkey;
1840 		}
1841 
1842 		i++;
1843 	}
1844 
1845 	MemoryContextSwitchTo(oldContext);
1846 
1847 	*hashvalue = hashkey;
1848 	return true;
1849 }
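/*
 * Illustrative note: for a two-key hash clause with per-key hashes h1 and h2,
 * the loop above produces ((h1 rotated left 1 bit) XOR h2); a NULL key simply
 * contributes zero.  The left-rotation keeps (a, b) and (b, a) from always
 * hashing identically.
 */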
1850 
1851 /*
1852  * Rotate the bits of "word" to the right by n bits.
1853  */
1854 static inline uint32
1855 pg_rotate_right32(uint32 word, int n)
1856 {
1857 	return (word >> n) | (word << (sizeof(word) * BITS_PER_BYTE - n));
1858 }
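/*
 * Illustrative example: pg_rotate_right32(0x80000001, 1) == 0xC0000000, since
 * the low-order 1 bit wraps around to become the new high-order bit.
 */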
1859 
1860 /*
1861  * ExecHashGetBucketAndBatch
1862  *		Determine the bucket number and batch number for a hash value
1863  *
1864  * Note: on-the-fly increases of nbatch must not change the bucket number
1865  * for a given hash code (since we don't move tuples to different hash
1866  * chains), and must only cause the batch number to remain the same or
1867  * increase.  Our algorithm is
1868  *		bucketno = hashvalue MOD nbuckets
1869  *		batchno = ROR(hashvalue, log2_nbuckets) MOD nbatch
1870  * where nbuckets and nbatch are both expected to be powers of 2, so we can
1871  * do the computations by shifting and masking.  (This assumes that all hash
1872  * functions are good about randomizing all their output bits, else we are
1873  * likely to have very skewed bucket or batch occupancy.)
1874  *
1875  * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic
1876  * bucket count growth.  Once we start batching, the value is fixed and does
1877  * not change over the course of the join (making it possible to compute batch
1878  * number the way we do here).
1879  *
1880  * nbatch is always a power of 2; we increase it only by doubling it.  This
1881  * effectively adds one more bit to the top of the batchno.  In very large
1882  * joins, we might run out of bits to add, so we do this by rotating the hash
1883  * value.  This causes batchno to steal bits from bucketno when the number of
1884  * virtual buckets exceeds 2^32.  It's better to have longer bucket chains
1885  * than to lose the ability to divide batches.
1886  */
1887 void
1888 ExecHashGetBucketAndBatch(HashJoinTable hashtable,
1889 						  uint32 hashvalue,
1890 						  int *bucketno,
1891 						  int *batchno)
1892 {
1893 	uint32		nbuckets = (uint32) hashtable->nbuckets;
1894 	uint32		nbatch = (uint32) hashtable->nbatch;
1895 
1896 	if (nbatch > 1)
1897 	{
1898 		*bucketno = hashvalue & (nbuckets - 1);
1899 		*batchno = pg_rotate_right32(hashvalue,
1900 									 hashtable->log2_nbuckets) & (nbatch - 1);
1901 	}
1902 	else
1903 	{
1904 		*bucketno = hashvalue & (nbuckets - 1);
1905 		*batchno = 0;
1906 	}
1907 }
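/*
 * Worked example (illustrative numbers only): with nbuckets = 1024
 * (log2_nbuckets = 10) and nbatch = 4, hashvalue 0xDEADBEEF yields
 *		bucketno = 0xDEADBEEF & 0x3FF = 0x2EF
 *		batchno = pg_rotate_right32(0xDEADBEEF, 10) & 0x3 = 3
 * i.e. the batch number is taken from the two bits just above the bucket
 * bits.
 */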
1908 
1909 /*
1910  * ExecScanHashBucket
1911  *		scan a hash bucket for matches to the current outer tuple
1912  *
1913  * The current outer tuple must be stored in econtext->ecxt_outertuple.
1914  *
1915  * On success, the inner tuple is stored into hjstate->hj_CurTuple and
1916  * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
1917  * for the latter.
1918  */
1919 bool
1920 ExecScanHashBucket(HashJoinState *hjstate,
1921 				   ExprContext *econtext)
1922 {
1923 	ExprState  *hjclauses = hjstate->hashclauses;
1924 	HashJoinTable hashtable = hjstate->hj_HashTable;
1925 	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1926 	uint32		hashvalue = hjstate->hj_CurHashValue;
1927 
1928 	/*
1929 	 * hj_CurTuple is the address of the tuple last returned from the current
1930 	 * bucket, or NULL if it's time to start scanning a new bucket.
1931 	 *
1932 	 * If the tuple hashed to a skew bucket then scan the skew bucket,
1933 	 * otherwise scan the standard hashtable bucket.
1934 	 */
1935 	if (hashTuple != NULL)
1936 		hashTuple = hashTuple->next.unshared;
1937 	else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1938 		hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1939 	else
1940 		hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
1941 
1942 	while (hashTuple != NULL)
1943 	{
1944 		if (hashTuple->hashvalue == hashvalue)
1945 		{
1946 			TupleTableSlot *inntuple;
1947 
1948 			/* insert hashtable's tuple into exec slot so ExecQual sees it */
1949 			inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1950 											 hjstate->hj_HashTupleSlot,
1951 											 false);	/* do not pfree */
1952 			econtext->ecxt_innertuple = inntuple;
1953 
1954 			if (ExecQualAndReset(hjclauses, econtext))
1955 			{
1956 				hjstate->hj_CurTuple = hashTuple;
1957 				return true;
1958 			}
1959 		}
1960 
1961 		hashTuple = hashTuple->next.unshared;
1962 	}
1963 
1964 	/*
1965 	 * no match
1966 	 */
1967 	return false;
1968 }
1969 
1970 /*
1971  * ExecParallelScanHashBucket
1972  *		scan a hash bucket for matches to the current outer tuple
1973  *
1974  * The current outer tuple must be stored in econtext->ecxt_outertuple.
1975  *
1976  * On success, the inner tuple is stored into hjstate->hj_CurTuple and
1977  * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
1978  * for the latter.
1979  */
1980 bool
1981 ExecParallelScanHashBucket(HashJoinState *hjstate,
1982 						   ExprContext *econtext)
1983 {
1984 	ExprState  *hjclauses = hjstate->hashclauses;
1985 	HashJoinTable hashtable = hjstate->hj_HashTable;
1986 	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1987 	uint32		hashvalue = hjstate->hj_CurHashValue;
1988 
1989 	/*
1990 	 * hj_CurTuple is the address of the tuple last returned from the current
1991 	 * bucket, or NULL if it's time to start scanning a new bucket.
1992 	 */
1993 	if (hashTuple != NULL)
1994 		hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
1995 	else
1996 		hashTuple = ExecParallelHashFirstTuple(hashtable,
1997 											   hjstate->hj_CurBucketNo);
1998 
1999 	while (hashTuple != NULL)
2000 	{
2001 		if (hashTuple->hashvalue == hashvalue)
2002 		{
2003 			TupleTableSlot *inntuple;
2004 
2005 			/* insert hashtable's tuple into exec slot so ExecQual sees it */
2006 			inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2007 											 hjstate->hj_HashTupleSlot,
2008 											 false);	/* do not pfree */
2009 			econtext->ecxt_innertuple = inntuple;
2010 
2011 			if (ExecQualAndReset(hjclauses, econtext))
2012 			{
2013 				hjstate->hj_CurTuple = hashTuple;
2014 				return true;
2015 			}
2016 		}
2017 
2018 		hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
2019 	}
2020 
2021 	/*
2022 	 * no match
2023 	 */
2024 	return false;
2025 }
2026 
2027 /*
2028  * ExecPrepHashTableForUnmatched
2029  *		set up for a series of ExecScanHashTableForUnmatched calls
2030  */
2031 void
2032 ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
2033 {
2034 	/*----------
2035 	 * During this scan we use the HashJoinState fields as follows:
2036 	 *
2037 	 * hj_CurBucketNo: next regular bucket to scan
2038 	 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
2039 	 * hj_CurTuple: last tuple returned, or NULL to start next bucket
2040 	 *----------
2041 	 */
2042 	hjstate->hj_CurBucketNo = 0;
2043 	hjstate->hj_CurSkewBucketNo = 0;
2044 	hjstate->hj_CurTuple = NULL;
2045 }
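/*
 * Typical usage (sketch, mirroring the outer-join fill phase in
 * nodeHashjoin.c): call ExecPrepHashTableForUnmatched() once, then loop with
 *
 *		while (ExecScanHashTableForUnmatched(hjstate, econtext))
 *			... emit the inner tuple null-extended on the outer side ...
 *
 * until it returns false, meaning every bucket (including skew buckets) has
 * been visited.
 */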
2046 
2047 /*
2048  * ExecScanHashTableForUnmatched
2049  *		scan the hash table for unmatched inner tuples
2050  *
2051  * On success, the inner tuple is stored into hjstate->hj_CurTuple and
2052  * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
2053  * for the latter.
2054  */
2055 bool
2056 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
2057 {
2058 	HashJoinTable hashtable = hjstate->hj_HashTable;
2059 	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
2060 
2061 	for (;;)
2062 	{
2063 		/*
2064 		 * hj_CurTuple is the address of the tuple last returned from the
2065 		 * current bucket, or NULL if it's time to start scanning a new
2066 		 * bucket.
2067 		 */
2068 		if (hashTuple != NULL)
2069 			hashTuple = hashTuple->next.unshared;
2070 		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
2071 		{
2072 			hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
2073 			hjstate->hj_CurBucketNo++;
2074 		}
2075 		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
2076 		{
2077 			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
2078 
2079 			hashTuple = hashtable->skewBucket[j]->tuples;
2080 			hjstate->hj_CurSkewBucketNo++;
2081 		}
2082 		else
2083 			break;				/* finished all buckets */
2084 
2085 		while (hashTuple != NULL)
2086 		{
2087 			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
2088 			{
2089 				TupleTableSlot *inntuple;
2090 
2091 				/* insert hashtable's tuple into exec slot */
2092 				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
2093 												 hjstate->hj_HashTupleSlot,
2094 												 false);	/* do not pfree */
2095 				econtext->ecxt_innertuple = inntuple;
2096 
2097 				/*
2098 				 * Reset temp memory each time; although this function doesn't
2099 				 * do any qual eval, the caller will, so let's keep it
2100 				 * parallel to ExecScanHashBucket.
2101 				 */
2102 				ResetExprContext(econtext);
2103 
2104 				hjstate->hj_CurTuple = hashTuple;
2105 				return true;
2106 			}
2107 
2108 			hashTuple = hashTuple->next.unshared;
2109 		}
2110 
2111 		/* allow this loop to be cancellable */
2112 		CHECK_FOR_INTERRUPTS();
2113 	}
2114 
2115 	/*
2116 	 * no more unmatched tuples
2117 	 */
2118 	return false;
2119 }
2120 
2121 /*
2122  * ExecHashTableReset
2123  *
2124  *		reset hash table header for new batch
2125  */
2126 void
2127 ExecHashTableReset(HashJoinTable hashtable)
2128 {
2129 	MemoryContext oldcxt;
2130 	int			nbuckets = hashtable->nbuckets;
2131 
2132 	/*
2133 	 * Release all the hash buckets and tuples acquired in the prior pass, and
2134 	 * reinitialize the context for a new pass.
2135 	 */
2136 	MemoryContextReset(hashtable->batchCxt);
2137 	oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
2138 
2139 	/* Reallocate and reinitialize the hash bucket headers. */
2140 	hashtable->buckets.unshared = (HashJoinTuple *)
2141 		palloc0(nbuckets * sizeof(HashJoinTuple));
2142 
2143 	hashtable->spaceUsed = 0;
2144 
2145 	MemoryContextSwitchTo(oldcxt);
2146 
2147 	/* Forget the chunks (the memory was freed by the context reset above). */
2148 	hashtable->chunks = NULL;
2149 }
2150 
2151 /*
2152  * ExecHashTableResetMatchFlags
2153  *		Clear all the HeapTupleHeaderHasMatch flags in the table
2154  */
2155 void
2156 ExecHashTableResetMatchFlags(HashJoinTable hashtable)
2157 {
2158 	HashJoinTuple tuple;
2159 	int			i;
2160 
2161 	/* Reset all flags in the main table ... */
2162 	for (i = 0; i < hashtable->nbuckets; i++)
2163 	{
2164 		for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
2165 			 tuple = tuple->next.unshared)
2166 			HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2167 	}
2168 
2169 	/* ... and the same for the skew buckets, if any */
2170 	for (i = 0; i < hashtable->nSkewBuckets; i++)
2171 	{
2172 		int			j = hashtable->skewBucketNums[i];
2173 		HashSkewBucket *skewBucket = hashtable->skewBucket[j];
2174 
2175 		for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
2176 			HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
2177 	}
2178 }
2179 
2180 
2181 void
2182 ExecReScanHash(HashState *node)
2183 {
2184 	/*
2185 	 * if chgParam of subnode is not null then plan will be re-scanned by
2186 	 * first ExecProcNode.
2187 	 */
2188 	if (node->ps.lefttree->chgParam == NULL)
2189 		ExecReScan(node->ps.lefttree);
2190 }
2191 
2192 
2193 /*
2194  * ExecHashBuildSkewHash
2195  *
2196  *		Set up for skew optimization if we can identify the most common values
2197  *		(MCVs) of the outer relation's join key.  We make a skew hash bucket
2198  *		for the hash value of each MCV, up to the number of slots allowed
2199  *		based on available memory.
2200  */
2201 static void
2202 ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
2203 {
2204 	HeapTupleData *statsTuple;
2205 	AttStatsSlot sslot;
2206 
2207 	/* Do nothing if planner didn't identify the outer relation's join key */
2208 	if (!OidIsValid(node->skewTable))
2209 		return;
2210 	/* Also, do nothing if we don't have room for at least one skew bucket */
2211 	if (mcvsToUse <= 0)
2212 		return;
2213 
2214 	/*
2215 	 * Try to find the MCV statistics for the outer relation's join key.
2216 	 */
2217 	statsTuple = SearchSysCache3(STATRELATTINH,
2218 								 ObjectIdGetDatum(node->skewTable),
2219 								 Int16GetDatum(node->skewColumn),
2220 								 BoolGetDatum(node->skewInherit));
2221 	if (!HeapTupleIsValid(statsTuple))
2222 		return;
2223 
2224 	if (get_attstatsslot(&sslot, statsTuple,
2225 						 STATISTIC_KIND_MCV, InvalidOid,
2226 						 ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS))
2227 	{
2228 		double		frac;
2229 		int			nbuckets;
2230 		FmgrInfo   *hashfunctions;
2231 		int			i;
2232 
2233 		if (mcvsToUse > sslot.nvalues)
2234 			mcvsToUse = sslot.nvalues;
2235 
2236 		/*
2237 		 * Calculate the expected fraction of outer relation that will
2238 		 * participate in the skew optimization.  If this isn't at least
2239 		 * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
2240 		 */
2241 		frac = 0;
2242 		for (i = 0; i < mcvsToUse; i++)
2243 			frac += sslot.numbers[i];
2244 		if (frac < SKEW_MIN_OUTER_FRACTION)
2245 		{
2246 			free_attstatsslot(&sslot);
2247 			ReleaseSysCache(statsTuple);
2248 			return;
2249 		}
2250 
2251 		/*
2252 		 * Okay, set up the skew hashtable.
2253 		 *
2254 		 * skewBucket[] is an open addressing hashtable with a power of 2 size
2255 		 * that is greater than the number of MCV values.  (This ensures there
2256 		 * will be at least one null entry, so searches will always
2257 		 * terminate.)
2258 		 *
2259 		 * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or
2260 		 * MaxAllocSize/sizeof(void *)/8, but that is not currently possible
2261 		 * since we limit pg_statistic entries to much less than that.
2262 		 */
2263 		nbuckets = 2;
2264 		while (nbuckets <= mcvsToUse)
2265 			nbuckets <<= 1;
2266 		/* use two more bits just to help avoid collisions */
2267 		nbuckets <<= 2;
2268 
2269 		hashtable->skewEnabled = true;
2270 		hashtable->skewBucketLen = nbuckets;
2271 
2272 		/*
2273 		 * We allocate the bucket memory in the hashtable's batch context. It
2274 		 * is only needed during the first batch, and this ensures it will be
2275 		 * automatically removed once the first batch is done.
2276 		 */
2277 		hashtable->skewBucket = (HashSkewBucket **)
2278 			MemoryContextAllocZero(hashtable->batchCxt,
2279 								   nbuckets * sizeof(HashSkewBucket *));
2280 		hashtable->skewBucketNums = (int *)
2281 			MemoryContextAllocZero(hashtable->batchCxt,
2282 								   mcvsToUse * sizeof(int));
2283 
2284 		hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
2285 			+ mcvsToUse * sizeof(int);
2286 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
2287 			+ mcvsToUse * sizeof(int);
2288 		if (hashtable->spaceUsed > hashtable->spacePeak)
2289 			hashtable->spacePeak = hashtable->spaceUsed;
2290 
2291 		/*
2292 		 * Create a skew bucket for each MCV hash value.
2293 		 *
2294 		 * Note: it is very important that we create the buckets in order of
2295 		 * decreasing MCV frequency.  If we have to remove some buckets, they
2296 		 * must be removed in reverse order of creation (see notes in
2297 		 * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
2298 		 * be removed first.
2299 		 */
2300 		hashfunctions = hashtable->outer_hashfunctions;
2301 
2302 		for (i = 0; i < mcvsToUse; i++)
2303 		{
2304 			uint32		hashvalue;
2305 			int			bucket;
2306 
2307 			hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
2308 													 sslot.values[i]));
2309 
2310 			/*
2311 			 * While we have not hit a hole in the hashtable and have not hit
2312 			 * the desired bucket, we have collided with some previous hash
2313 			 * value, so try the next bucket location.  NB: this code must
2314 			 * match ExecHashGetSkewBucket.
2315 			 */
2316 			bucket = hashvalue & (nbuckets - 1);
2317 			while (hashtable->skewBucket[bucket] != NULL &&
2318 				   hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2319 				bucket = (bucket + 1) & (nbuckets - 1);
2320 
2321 			/*
2322 			 * If we found an existing bucket with the same hashvalue, leave
2323 			 * it alone.  It's okay for two MCVs to share a hashvalue.
2324 			 */
2325 			if (hashtable->skewBucket[bucket] != NULL)
2326 				continue;
2327 
2328 			/* Okay, create a new skew bucket for this hashvalue. */
2329 			hashtable->skewBucket[bucket] = (HashSkewBucket *)
2330 				MemoryContextAlloc(hashtable->batchCxt,
2331 								   sizeof(HashSkewBucket));
2332 			hashtable->skewBucket[bucket]->hashvalue = hashvalue;
2333 			hashtable->skewBucket[bucket]->tuples = NULL;
2334 			hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
2335 			hashtable->nSkewBuckets++;
2336 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
2337 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
2338 			if (hashtable->spaceUsed > hashtable->spacePeak)
2339 				hashtable->spacePeak = hashtable->spaceUsed;
2340 		}
2341 
2342 		free_attstatsslot(&sslot);
2343 	}
2344 
2345 	ReleaseSysCache(statsTuple);
2346 }
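/*
 * Sizing example (illustrative): with mcvsToUse = 100, the loop above picks
 * the smallest power of 2 greater than 100 (128) and then shifts left twice,
 * giving skewBucketLen = 512; the table is therefore at most ~20% full, which
 * keeps the linear-probe searches short.
 */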
2347 
2348 /*
2349  * ExecHashGetSkewBucket
2350  *
2351  *		Returns the index of the skew bucket for this hashvalue,
2352  *		or INVALID_SKEW_BUCKET_NO if the hashvalue is not
2353  *		associated with any active skew bucket.
2354  */
2355 int
2356 ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
2357 {
2358 	int			bucket;
2359 
2360 	/*
2361 	 * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
2362 	 * particular, this happens after the initial batch is done).
2363 	 */
2364 	if (!hashtable->skewEnabled)
2365 		return INVALID_SKEW_BUCKET_NO;
2366 
2367 	/*
2368 	 * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
2369 	 */
2370 	bucket = hashvalue & (hashtable->skewBucketLen - 1);
2371 
2372 	/*
2373 	 * While we have not hit a hole in the hashtable and have not hit the
2374 	 * desired bucket, we have collided with some other hash value, so try the
2375 	 * next bucket location.
2376 	 */
2377 	while (hashtable->skewBucket[bucket] != NULL &&
2378 		   hashtable->skewBucket[bucket]->hashvalue != hashvalue)
2379 		bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
2380 
2381 	/*
2382 	 * Found the desired bucket?
2383 	 */
2384 	if (hashtable->skewBucket[bucket] != NULL)
2385 		return bucket;
2386 
2387 	/*
2388 	 * There must not be any hashtable entry for this hash value.
2389 	 */
2390 	return INVALID_SKEW_BUCKET_NO;
2391 }
2392 
2393 /*
2394  * ExecHashSkewTableInsert
2395  *
2396  *		Insert a tuple into the skew hashtable.
2397  *
2398  * This should generally match up with the current-batch case in
2399  * ExecHashTableInsert.
2400  */
2401 static void
2402 ExecHashSkewTableInsert(HashJoinTable hashtable,
2403 						TupleTableSlot *slot,
2404 						uint32 hashvalue,
2405 						int bucketNumber)
2406 {
2407 	MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
2408 	HashJoinTuple hashTuple;
2409 	int			hashTupleSize;
2410 
2411 	/* Create the HashJoinTuple */
2412 	hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
2413 	hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
2414 												   hashTupleSize);
2415 	hashTuple->hashvalue = hashvalue;
2416 	memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
2417 	HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
2418 
2419 	/* Push it onto the front of the skew bucket's list */
2420 	hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples;
2421 	hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
2422 	Assert(hashTuple != hashTuple->next.unshared);
2423 
2424 	/* Account for space used, and back off if we've used too much */
2425 	hashtable->spaceUsed += hashTupleSize;
2426 	hashtable->spaceUsedSkew += hashTupleSize;
2427 	if (hashtable->spaceUsed > hashtable->spacePeak)
2428 		hashtable->spacePeak = hashtable->spaceUsed;
2429 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
2430 		ExecHashRemoveNextSkewBucket(hashtable);
2431 
2432 	/* Check we are not over the total spaceAllowed, either */
2433 	if (hashtable->spaceUsed > hashtable->spaceAllowed)
2434 		ExecHashIncreaseNumBatches(hashtable);
2435 }
2436 
2437 /*
2438  *		ExecHashRemoveNextSkewBucket
2439  *
2440  *		Remove the least valuable skew bucket by pushing its tuples into
2441  *		the main hash table.
2442  */
2443 static void
2444 ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
2445 {
2446 	int			bucketToRemove;
2447 	HashSkewBucket *bucket;
2448 	uint32		hashvalue;
2449 	int			bucketno;
2450 	int			batchno;
2451 	HashJoinTuple hashTuple;
2452 
2453 	/* Locate the bucket to remove */
2454 	bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
2455 	bucket = hashtable->skewBucket[bucketToRemove];
2456 
2457 	/*
2458 	 * Calculate which bucket and batch the tuples belong to in the main
2459 	 * hashtable.  They all have the same hash value, so it's the same for all
2460 	 * of them.  Also note that it's not possible for nbatch to increase while
2461 	 * we are processing the tuples.
2462 	 */
2463 	hashvalue = bucket->hashvalue;
2464 	ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
2465 
2466 	/* Process all tuples in the bucket */
2467 	hashTuple = bucket->tuples;
2468 	while (hashTuple != NULL)
2469 	{
2470 		HashJoinTuple nextHashTuple = hashTuple->next.unshared;
2471 		MinimalTuple tuple;
2472 		Size		tupleSize;
2473 
2474 		/*
2475 		 * This code must agree with ExecHashTableInsert.  We do not use
2476 		 * ExecHashTableInsert directly as ExecHashTableInsert expects a
2477 		 * TupleTableSlot while we already have HashJoinTuples.
2478 		 */
2479 		tuple = HJTUPLE_MINTUPLE(hashTuple);
2480 		tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
2481 
2482 		/* Decide whether to put the tuple in the hash table or a temp file */
2483 		if (batchno == hashtable->curbatch)
2484 		{
2485 			/* Move the tuple to the main hash table */
2486 			HashJoinTuple copyTuple;
2487 
2488 			/*
2489 			 * We must copy the tuple into the dense storage, else it will not
2490 			 * be found by, eg, ExecHashIncreaseNumBatches.
2491 			 */
2492 			copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize);
2493 			memcpy(copyTuple, hashTuple, tupleSize);
2494 			pfree(hashTuple);
2495 
2496 			copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
2497 			hashtable->buckets.unshared[bucketno] = copyTuple;
2498 
2499 			/* We have reduced skew space, but overall space doesn't change */
2500 			hashtable->spaceUsedSkew -= tupleSize;
2501 		}
2502 		else
2503 		{
2504 			/* Put the tuple into a temp file for later batches */
2505 			Assert(batchno > hashtable->curbatch);
2506 			ExecHashJoinSaveTuple(tuple, hashvalue,
2507 								  &hashtable->innerBatchFile[batchno]);
2508 			pfree(hashTuple);
2509 			hashtable->spaceUsed -= tupleSize;
2510 			hashtable->spaceUsedSkew -= tupleSize;
2511 		}
2512 
2513 		hashTuple = nextHashTuple;
2514 
2515 		/* allow this loop to be cancellable */
2516 		CHECK_FOR_INTERRUPTS();
2517 	}
2518 
2519 	/*
2520 	 * Free the bucket struct itself and reset the hashtable entry to NULL.
2521 	 *
2522 	 * NOTE: this is not nearly as simple as it looks on the surface, because
2523 	 * of the possibility of collisions in the hashtable.  Suppose that hash
2524 	 * values A and B collide at a particular hashtable entry, and that A was
2525 	 * entered first so B gets shifted to a different table entry.  If we were
2526 	 * to remove A first then ExecHashGetSkewBucket would mistakenly start
2527 	 * reporting that B is not in the hashtable, because it would hit the NULL
2528 	 * before finding B.  However, we always remove entries in the reverse
2529 	 * order of creation, so this failure cannot happen.
2530 	 */
2531 	hashtable->skewBucket[bucketToRemove] = NULL;
2532 	hashtable->nSkewBuckets--;
2533 	pfree(bucket);
2534 	hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
2535 	hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
2536 
2537 	/*
2538 	 * If we have removed all skew buckets then give up on skew optimization.
2539 	 * Release the arrays since they aren't useful any more.
2540 	 */
2541 	if (hashtable->nSkewBuckets == 0)
2542 	{
2543 		hashtable->skewEnabled = false;
2544 		pfree(hashtable->skewBucket);
2545 		pfree(hashtable->skewBucketNums);
2546 		hashtable->skewBucket = NULL;
2547 		hashtable->skewBucketNums = NULL;
2548 		hashtable->spaceUsed -= hashtable->spaceUsedSkew;
2549 		hashtable->spaceUsedSkew = 0;
2550 	}
2551 }
2552 
2553 /*
2554  * Reserve space in the DSM segment for instrumentation data.
2555  */
2556 void
2557 ExecHashEstimate(HashState *node, ParallelContext *pcxt)
2558 {
2559 	size_t		size;
2560 
2561 	/* don't need this if not instrumenting or no workers */
2562 	if (!node->ps.instrument || pcxt->nworkers == 0)
2563 		return;
2564 
2565 	size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
2566 	size = add_size(size, offsetof(SharedHashInfo, hinstrument));
2567 	shm_toc_estimate_chunk(&pcxt->estimator, size);
2568 	shm_toc_estimate_keys(&pcxt->estimator, 1);
2569 }
2570 
2571 /*
2572  * Set up a space in the DSM for all workers to record instrumentation data
2573  * about their hash table.
2574  */
2575 void
2576 ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
2577 {
2578 	size_t		size;
2579 
2580 	/* don't need this if not instrumenting or no workers */
2581 	if (!node->ps.instrument || pcxt->nworkers == 0)
2582 		return;
2583 
2584 	size = offsetof(SharedHashInfo, hinstrument) +
2585 		pcxt->nworkers * sizeof(HashInstrumentation);
2586 	node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
2587 	memset(node->shared_info, 0, size);
2588 	node->shared_info->num_workers = pcxt->nworkers;
2589 	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
2590 				   node->shared_info);
2591 }
2592 
2593 /*
2594  * Locate the DSM space for hash table instrumentation data that we'll write
2595  * to at shutdown time.
2596  */
2597 void
2598 ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
2599 {
2600 	SharedHashInfo *shared_info;
2601 
2602 	/* don't need this if not instrumenting */
2603 	if (!node->ps.instrument)
2604 		return;
2605 
2606 	shared_info = (SharedHashInfo *)
2607 		shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
2608 	node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
2609 }
2610 
2611 /*
2612  * Copy instrumentation data from this worker's hash table (if it built one)
2613  * to DSM memory so the leader can retrieve it.  This must be done in
2614  * ExecShutdownHash() rather than ExecEndHash() because the latter runs after
2615  * we've detached from the DSM segment.
2616  */
2617 void
2618 ExecShutdownHash(HashState *node)
2619 {
2620 	if (node->hinstrument && node->hashtable)
2621 		ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
2622 }
2623 
2624 /*
2625  * Retrieve instrumentation data from workers before the DSM segment is
2626  * detached, so that EXPLAIN can access it.
2627  */
2628 void
2629 ExecHashRetrieveInstrumentation(HashState *node)
2630 {
2631 	SharedHashInfo *shared_info = node->shared_info;
2632 	size_t		size;
2633 
2634 	if (shared_info == NULL)
2635 		return;
2636 
2637 	/* Replace node->shared_info with a copy in backend-local memory. */
2638 	size = offsetof(SharedHashInfo, hinstrument) +
2639 		shared_info->num_workers * sizeof(HashInstrumentation);
2640 	node->shared_info = palloc(size);
2641 	memcpy(node->shared_info, shared_info, size);
2642 }
2643 
2644 /*
2645  * Copy the instrumentation data from 'hashtable' into a HashInstrumentation
2646  * struct.
2647  */
2648 void
2649 ExecHashGetInstrumentation(HashInstrumentation *instrument,
2650 						   HashJoinTable hashtable)
2651 {
2652 	instrument->nbuckets = hashtable->nbuckets;
2653 	instrument->nbuckets_original = hashtable->nbuckets_original;
2654 	instrument->nbatch = hashtable->nbatch;
2655 	instrument->nbatch_original = hashtable->nbatch_original;
2656 	instrument->space_peak = hashtable->spacePeak;
2657 }
2658 
2659 /*
2660  * Allocate 'size' bytes from the currently active HashMemoryChunk
2661  */
2662 static void *
2663 dense_alloc(HashJoinTable hashtable, Size size)
2664 {
2665 	HashMemoryChunk newChunk;
2666 	char	   *ptr;
2667 
2668 	/* just in case the size is not already aligned properly */
2669 	size = MAXALIGN(size);
2670 
2671 	/*
2672 	 * If tuple size is larger than threshold, allocate a separate chunk.
2673 	 */
2674 	if (size > HASH_CHUNK_THRESHOLD)
2675 	{
2676 		/* allocate new chunk and put it at the beginning of the list */
2677 		newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
2678 														HASH_CHUNK_HEADER_SIZE + size);
2679 		newChunk->maxlen = size;
2680 		newChunk->used = size;
2681 		newChunk->ntuples = 1;
2682 
2683 		/*
2684 		 * Add this chunk to the list after the first existing chunk, so that
2685 		 * we don't lose the remaining space in the "current" chunk.
2686 		 */
2687 		if (hashtable->chunks != NULL)
2688 		{
2689 			newChunk->next = hashtable->chunks->next;
2690 			hashtable->chunks->next.unshared = newChunk;
2691 		}
2692 		else
2693 		{
2694 			newChunk->next.unshared = hashtable->chunks;
2695 			hashtable->chunks = newChunk;
2696 		}
2697 
2698 		return HASH_CHUNK_DATA(newChunk);
2699 	}
2700 
2701 	/*
2702 	 * See if we have enough space for it in the current chunk (if any). If
2703 	 * not, allocate a fresh chunk.
2704 	 */
2705 	if ((hashtable->chunks == NULL) ||
2706 		(hashtable->chunks->maxlen - hashtable->chunks->used) < size)
2707 	{
2708 		/* allocate new chunk and put it at the beginning of the list */
2709 		newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
2710 														HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE);
2711 
2712 		newChunk->maxlen = HASH_CHUNK_SIZE;
2713 		newChunk->used = size;
2714 		newChunk->ntuples = 1;
2715 
2716 		newChunk->next.unshared = hashtable->chunks;
2717 		hashtable->chunks = newChunk;
2718 
2719 		return HASH_CHUNK_DATA(newChunk);
2720 	}
2721 
2722 	/* There is enough space in the current chunk, let's add the tuple */
2723 	ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used;
2724 	hashtable->chunks->used += size;
2725 	hashtable->chunks->ntuples += 1;
2726 
2727 	/* return pointer to the start of the tuple memory */
2728 	return ptr;
2729 }
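/*
 * Illustration (assuming the usual 32kB HASH_CHUNK_SIZE with a threshold of a
 * quarter of that): a stream of 64-byte tuples is packed roughly 500 to a
 * chunk before a fresh chunk is allocated, while a single 10kB tuple gets a
 * dedicated chunk that is linked in second so the partially filled current
 * chunk keeps accepting small tuples.
 */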
2730 
2731 /*
2732  * Allocate space for a tuple in shared dense storage.  This is equivalent to
2733  * dense_alloc but for Parallel Hash using shared memory.
2734  *
2735  * While loading a tuple into shared memory, we might run out of memory and
2736  * decide to repartition, or determine that the load factor is too high and
2737  * decide to expand the bucket array, or discover that another participant has
2738  * commanded us to help do that.  Return NULL if the number of buckets or
2739  * batches has changed, indicating that the caller must retry (considering the
2740  * possibility that the tuple no longer belongs in the same batch).
2741  */
2742 static HashJoinTuple
2743 ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
2744 						   dsa_pointer *shared)
2745 {
2746 	ParallelHashJoinState *pstate = hashtable->parallel_state;
2747 	dsa_pointer chunk_shared;
2748 	HashMemoryChunk chunk;
2749 	Size		chunk_size;
2750 	HashJoinTuple result;
2751 	int			curbatch = hashtable->curbatch;
2752 
2753 	size = MAXALIGN(size);
2754 
2755 	/*
2756 	 * Fast path: if there is enough space in this backend's current chunk,
2757 	 * then we can allocate without any locking.
2758 	 */
2759 	chunk = hashtable->current_chunk;
2760 	if (chunk != NULL &&
2761 		size <= HASH_CHUNK_THRESHOLD &&
2762 		chunk->maxlen - chunk->used >= size)
2763 	{
2764 
2765 		chunk_shared = hashtable->current_chunk_shared;
2766 		Assert(chunk == dsa_get_address(hashtable->area, chunk_shared));
2767 		*shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used;
2768 		result = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + chunk->used);
2769 		chunk->used += size;
2770 
2771 		Assert(chunk->used <= chunk->maxlen);
2772 		Assert(result == dsa_get_address(hashtable->area, *shared));
2773 
2774 		return result;
2775 	}
2776 
2777 	/* Slow path: try to allocate a new chunk. */
2778 	LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
2779 
2780 	/*
2781 	 * Check if we need to help increase the number of buckets or batches.
2782 	 */
2783 	if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
2784 		pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
2785 	{
2786 		ParallelHashGrowth growth = pstate->growth;
2787 
2788 		hashtable->current_chunk = NULL;
2789 		LWLockRelease(&pstate->lock);
2790 
2791 		/* Another participant has commanded us to help grow. */
2792 		if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
2793 			ExecParallelHashIncreaseNumBatches(hashtable);
2794 		else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
2795 			ExecParallelHashIncreaseNumBuckets(hashtable);
2796 
2797 		/* The caller must retry. */
2798 		return NULL;
2799 	}
2800 
2801 	/* Oversized tuples get their own chunk. */
2802 	if (size > HASH_CHUNK_THRESHOLD)
2803 		chunk_size = size + HASH_CHUNK_HEADER_SIZE;
2804 	else
2805 		chunk_size = HASH_CHUNK_SIZE;
2806 
2807 	/* Check if it's time to grow batches or buckets. */
2808 	if (pstate->growth != PHJ_GROWTH_DISABLED)
2809 	{
2810 		Assert(curbatch == 0);
2811 		Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
2812 
2813 		/*
2814 		 * Check if our space limit would be exceeded.  To avoid choking on
2815 		 * very large tuples or a very low work_mem setting, we'll always allow
2816 		 * each backend to allocate at least one chunk.
2817 		 */
2818 		if (hashtable->batches[0].at_least_one_chunk &&
2819 			hashtable->batches[0].shared->size +
2820 			chunk_size > pstate->space_allowed)
2821 		{
2822 			pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
2823 			hashtable->batches[0].shared->space_exhausted = true;
2824 			LWLockRelease(&pstate->lock);
2825 
2826 			return NULL;
2827 		}
2828 
2829 		/* Check if our load factor limit would be exceeded. */
2830 		if (hashtable->nbatch == 1)
2831 		{
2832 			hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples;
2833 			hashtable->batches[0].ntuples = 0;
2834 			/* Guard against integer overflow and alloc size overflow */
2835 			if (hashtable->batches[0].shared->ntuples + 1 >
2836 				hashtable->nbuckets * NTUP_PER_BUCKET &&
2837 				hashtable->nbuckets < (INT_MAX / 2) &&
2838 				hashtable->nbuckets * 2 <=
2839 				MaxAllocSize / sizeof(dsa_pointer_atomic))
2840 			{
2841 				pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS;
2842 				LWLockRelease(&pstate->lock);
2843 
2844 				return NULL;
2845 			}
2846 		}
2847 	}
2848 
2849 	/* We are cleared to allocate a new chunk. */
2850 	chunk_shared = dsa_allocate(hashtable->area, chunk_size);
2851 	hashtable->batches[curbatch].shared->size += chunk_size;
2852 	hashtable->batches[curbatch].at_least_one_chunk = true;
2853 
2854 	/* Set up the chunk. */
2855 	chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared);
2856 	*shared = chunk_shared + HASH_CHUNK_HEADER_SIZE;
2857 	chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE;
2858 	chunk->used = size;
2859 
2860 	/*
2861 	 * Push it onto the list of chunks, so that it can be found if we need to
2862 	 * increase the number of buckets or batches (batch 0 only) and later for
2863 	 * freeing the memory (all batches).
2864 	 */
2865 	chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
2866 	hashtable->batches[curbatch].shared->chunks = chunk_shared;
2867 
2868 	if (size <= HASH_CHUNK_THRESHOLD)
2869 	{
2870 		/*
2871 		 * Make this the current chunk so that we can use the fast path to
2872 		 * fill the rest of it up in future calls.
2873 		 */
2874 		hashtable->current_chunk = chunk;
2875 		hashtable->current_chunk_shared = chunk_shared;
2876 	}
2877 	LWLockRelease(&pstate->lock);
2878 
2879 	Assert(HASH_CHUNK_DATA(chunk) == dsa_get_address(hashtable->area, *shared));
2880 	result = (HashJoinTuple) HASH_CHUNK_DATA(chunk);
2881 
2882 	return result;
2883 }
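/*
 * Caller contract (descriptive): a NULL result above does not mean
 * out-of-memory.  It means this worker detected, or helped with, a pending
 * increase in the number of batches or buckets; the caller must recompute the
 * tuple's bucket and batch and try again, as ExecParallelHashTableInsert()
 * does with its retry label.
 */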
2884 
2885 /*
2886  * One backend needs to set up the shared batch state including tuplestores.
2887  * Other backends will ensure they have correctly configured accessors by
2888  * calling ExecParallelHashEnsureBatchAccessors().
2889  */
2890 static void
2891 ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
2892 {
2893 	ParallelHashJoinState *pstate = hashtable->parallel_state;
2894 	ParallelHashJoinBatch *batches;
2895 	MemoryContext oldcxt;
2896 	int			i;
2897 
2898 	Assert(hashtable->batches == NULL);
2899 
2900 	/* Allocate space. */
2901 	pstate->batches =
2902 		dsa_allocate0(hashtable->area,
2903 					  EstimateParallelHashJoinBatch(hashtable) * nbatch);
2904 	pstate->nbatch = nbatch;
2905 	batches = dsa_get_address(hashtable->area, pstate->batches);
2906 
2907 	/* Use hash join memory context. */
2908 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
2909 
2910 	/* Allocate this backend's accessor array. */
2911 	hashtable->nbatch = nbatch;
2912 	hashtable->batches = (ParallelHashJoinBatchAccessor *)
2913 		palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch);
2914 
2915 	/* Set up the shared state, tuplestores and backend-local accessors. */
2916 	for (i = 0; i < hashtable->nbatch; ++i)
2917 	{
2918 		ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
2919 		ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
2920 		char		name[MAXPGPATH];
2921 
2922 		/*
2923 		 * All members of shared were zero-initialized.  We just need to set
2924 		 * up the Barrier.
2925 		 */
2926 		BarrierInit(&shared->batch_barrier, 0);
2927 		if (i == 0)
2928 		{
2929 			/* Batch 0 doesn't need to be loaded. */
2930 			BarrierAttach(&shared->batch_barrier);
2931 			while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
2932 				BarrierArriveAndWait(&shared->batch_barrier, 0);
2933 			BarrierDetach(&shared->batch_barrier);
2934 		}
2935 
2936 		/* Initialize accessor state.  All members were zero-initialized. */
2937 		accessor->shared = shared;
2938 
2939 		/* Initialize the shared tuplestores. */
2940 		snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch);
2941 		accessor->inner_tuples =
2942 			sts_initialize(ParallelHashJoinBatchInner(shared),
2943 						   pstate->nparticipants,
2944 						   ParallelWorkerNumber + 1,
2945 						   sizeof(uint32),
2946 						   SHARED_TUPLESTORE_SINGLE_PASS,
2947 						   &pstate->fileset,
2948 						   name);
2949 		snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch);
2950 		accessor->outer_tuples =
2951 			sts_initialize(ParallelHashJoinBatchOuter(shared,
2952 													  pstate->nparticipants),
2953 						   pstate->nparticipants,
2954 						   ParallelWorkerNumber + 1,
2955 						   sizeof(uint32),
2956 						   SHARED_TUPLESTORE_SINGLE_PASS,
2957 						   &pstate->fileset,
2958 						   name);
2959 	}
2960 
2961 	MemoryContextSwitchTo(oldcxt);
2962 }
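/*
 * Descriptive note: batch 0 is special-cased above because its inner side is
 * built directly into shared memory rather than being reloaded from a
 * tuplestore, so its barrier is advanced immediately to the probing phase.
 */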
2963 
2964 /*
2965  * Free the current set of ParallelHashJoinBatchAccessor objects.
2966  */
2967 static void
2968 ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
2969 {
2970 	int			i;
2971 
2972 	for (i = 0; i < hashtable->nbatch; ++i)
2973 	{
2974 		/* Make sure no files are left open. */
2975 		sts_end_write(hashtable->batches[i].inner_tuples);
2976 		sts_end_write(hashtable->batches[i].outer_tuples);
2977 		sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
2978 		sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
2979 	}
2980 	pfree(hashtable->batches);
2981 	hashtable->batches = NULL;
2982 }
2983 
2984 /*
2985  * Make sure this backend has up-to-date accessors for the current set of
2986  * batches.
2987  */
2988 static void
2989 ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
2990 {
2991 	ParallelHashJoinState *pstate = hashtable->parallel_state;
2992 	ParallelHashJoinBatch *batches;
2993 	MemoryContext oldcxt;
2994 	int			i;
2995 
2996 	if (hashtable->batches != NULL)
2997 	{
2998 		if (hashtable->nbatch == pstate->nbatch)
2999 			return;
3000 		ExecParallelHashCloseBatchAccessors(hashtable);
3001 	}
3002 
3003 	/*
3004 	 * It's possible for a backend to start up very late so that the whole
3005 	 * join is finished and the shm state for tracking batches has already
3006 	 * been freed by ExecHashTableDetach().  In that case we'll just leave
3007 	 * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
3008 	 * up early.
3009 	 */
3010 	if (!DsaPointerIsValid(pstate->batches))
3011 		return;
3012 
3013 	/* Use hash join memory context. */
3014 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
3015 
3016 	/* Allocate this backend's accessor array. */
3017 	hashtable->nbatch = pstate->nbatch;
3018 	hashtable->batches = (ParallelHashJoinBatchAccessor *)
3019 		palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch);
3020 
3021 	/* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */
3022 	batches = (ParallelHashJoinBatch *)
3023 		dsa_get_address(hashtable->area, pstate->batches);
3024 
3025 	/* Set up the accessor array and attach to the tuplestores. */
3026 	for (i = 0; i < hashtable->nbatch; ++i)
3027 	{
3028 		ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
3029 		ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
3030 
3031 		accessor->shared = shared;
3032 		accessor->preallocated = 0;
3033 		accessor->done = false;
3034 		accessor->inner_tuples =
3035 			sts_attach(ParallelHashJoinBatchInner(shared),
3036 					   ParallelWorkerNumber + 1,
3037 					   &pstate->fileset);
3038 		accessor->outer_tuples =
3039 			sts_attach(ParallelHashJoinBatchOuter(shared,
3040 												  pstate->nparticipants),
3041 					   ParallelWorkerNumber + 1,
3042 					   &pstate->fileset);
3043 	}
3044 
3045 	MemoryContextSwitchTo(oldcxt);
3046 }
3047 
3048 /*
3049  * Allocate an empty shared memory hash table for a given batch.
3050  */
3051 void
3052 ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
3053 {
3054 	ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
3055 	dsa_pointer_atomic *buckets;
3056 	int			nbuckets = hashtable->parallel_state->nbuckets;
3057 	int			i;
3058 
3059 	batch->buckets =
3060 		dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
3061 	buckets = (dsa_pointer_atomic *)
3062 		dsa_get_address(hashtable->area, batch->buckets);
3063 	for (i = 0; i < nbuckets; ++i)
3064 		dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
3065 }
3066 
3067 /*
3068  * If we are currently attached to a shared hash join batch, detach.  If we
3069  * are last to detach, clean up.
3070  */
3071 void
3072 ExecHashTableDetachBatch(HashJoinTable hashtable)
3073 {
3074 	if (hashtable->parallel_state != NULL &&
3075 		hashtable->curbatch >= 0)
3076 	{
3077 		int			curbatch = hashtable->curbatch;
3078 		ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
3079 
3080 		/* Make sure any temporary files are closed. */
3081 		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
3082 		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
3083 
3084 		/* Detach from the batch we were last working on. */
3085 		if (BarrierArriveAndDetach(&batch->batch_barrier))
3086 		{
3087 			/*
3088 			 * Technically we shouldn't access the barrier because we're no
3089 			 * longer attached, but since there is no way it's moving after
3090 			 * this point it seems safe to make the following assertion.
3091 			 */
3092 			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
3093 
3094 			/* Free shared chunks and buckets. */
3095 			while (DsaPointerIsValid(batch->chunks))
3096 			{
3097 				HashMemoryChunk chunk =
3098 				dsa_get_address(hashtable->area, batch->chunks);
3099 				dsa_pointer next = chunk->next.shared;
3100 
3101 				dsa_free(hashtable->area, batch->chunks);
3102 				batch->chunks = next;
3103 			}
3104 			if (DsaPointerIsValid(batch->buckets))
3105 			{
3106 				dsa_free(hashtable->area, batch->buckets);
3107 				batch->buckets = InvalidDsaPointer;
3108 			}
3109 		}
3110 
3111 		/*
3112 		 * Track the largest batch we've been attached to.  Though each
3113 		 * backend might see a different subset of batches, explain.c will
3114 		 * scan the results from all backends to find the largest value.
3115 		 */
3116 		hashtable->spacePeak =
3117 			Max(hashtable->spacePeak,
3118 				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
3119 
3120 		/* Remember that we are not attached to a batch. */
3121 		hashtable->curbatch = -1;
3122 	}
3123 }
3124 
3125 /*
3126  * Detach from all shared resources.  If we are last to detach, clean up.
3127  */
3128 void
3129 ExecHashTableDetach(HashJoinTable hashtable)
3130 {
3131 	if (hashtable->parallel_state)
3132 	{
3133 		ParallelHashJoinState *pstate = hashtable->parallel_state;
3134 		int			i;
3135 
3136 		/* Make sure any temporary files are closed. */
3137 		if (hashtable->batches)
3138 		{
3139 			for (i = 0; i < hashtable->nbatch; ++i)
3140 			{
3141 				sts_end_write(hashtable->batches[i].inner_tuples);
3142 				sts_end_write(hashtable->batches[i].outer_tuples);
3143 				sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
3144 				sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
3145 			}
3146 		}
3147 
3148 		/* If we're last to detach, clean up shared memory. */
3149 		if (BarrierDetach(&pstate->build_barrier))
3150 		{
3151 			if (DsaPointerIsValid(pstate->batches))
3152 			{
3153 				dsa_free(hashtable->area, pstate->batches);
3154 				pstate->batches = InvalidDsaPointer;
3155 			}
3156 		}
3157 
3158 		hashtable->parallel_state = NULL;
3159 	}
3160 }
3161 
/*
 * Get the first tuple in a given bucket identified by number.
 */
static inline HashJoinTuple
ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
{
	HashJoinTuple tuple;
	dsa_pointer p;

	Assert(hashtable->parallel_state);
	p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
	tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p);

	return tuple;
}

/*
 * Get the next tuple in the same bucket as 'tuple'.
 */
static inline HashJoinTuple
ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
{
	HashJoinTuple next;

	Assert(hashtable->parallel_state);
	next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared);

	return next;
}
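
/*
 * Together, the two accessors above support the usual shared bucket scan.
 * A minimal sketch of such a loop (variable names are illustrative only):
 *
 *		for (tuple = ExecParallelHashFirstTuple(hashtable, bucketno);
 *			 tuple != NULL;
 *			 tuple = ExecParallelHashNextTuple(hashtable, tuple))
 *			... check whether tuple matches the probe tuple ...
 *
 * The chain terminates because dsa_get_address() returns NULL for
 * InvalidDsaPointer.
 */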

/*
 * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
 */
static inline void
ExecParallelHashPushTuple(dsa_pointer_atomic *head,
						  HashJoinTuple tuple,
						  dsa_pointer tuple_shared)
{
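	/*
	 * Classic lock-free push: point the new tuple's next link at the current
	 * head, then try to swing the head to the new tuple with a
	 * compare-and-exchange; if another backend pushed concurrently, the CAS
	 * fails and we retry with the freshly read head.
	 */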
	for (;;)
	{
		tuple->next.shared = dsa_pointer_atomic_read(head);
		if (dsa_pointer_atomic_compare_exchange(head,
												&tuple->next.shared,
												tuple_shared))
			break;
	}
}

/*
 * Prepare to work on a given batch.
 */
void
ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
{
	Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);

	hashtable->curbatch = batchno;
	hashtable->buckets.shared = (dsa_pointer_atomic *)
		dsa_get_address(hashtable->area,
						hashtable->batches[batchno].shared->buckets);
	hashtable->nbuckets = hashtable->parallel_state->nbuckets;
	hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
	hashtable->current_chunk = NULL;
	hashtable->current_chunk_shared = InvalidDsaPointer;
	hashtable->batches[batchno].at_least_one_chunk = false;
}
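
/*
 * After ExecParallelHashTableSetCurrentBatch() returns, buckets.shared and
 * nbuckets in the hashtable refer to the chosen batch's shared bucket array,
 * so accessors such as ExecParallelHashFirstTuple() operate on that batch.
 */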

/*
 * Take the next available chunk from the queue of chunks being worked on in
 * parallel.  Return NULL if there are none left.  Otherwise return a pointer
 * to the chunk, and set *shared to the DSA pointer to the chunk.
 */
static HashMemoryChunk
ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
{
	ParallelHashJoinState *pstate = hashtable->parallel_state;
	HashMemoryChunk chunk;

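	/*
	 * The work queue is a shared singly-linked list of chunks; pop the head
	 * element while holding the ParallelHashJoinState lock.
	 */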
	LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
	if (DsaPointerIsValid(pstate->chunk_work_queue))
	{
		*shared = pstate->chunk_work_queue;
		chunk = (HashMemoryChunk)
			dsa_get_address(hashtable->area, *shared);
		pstate->chunk_work_queue = chunk->next.shared;
	}
	else
		chunk = NULL;
	LWLockRelease(&pstate->lock);

	return chunk;
}

/*
 * Increase the space preallocated in this backend for a given inner batch by
 * at least a given amount.  This allows us to track whether a given batch
 * would fit in memory when loaded back in.  Also increase the number of
 * batches or buckets if required.
 *
 * This maintains a running estimate of how much space will be taken when we
 * load the batch back into memory by simulating the way chunks will be handed
 * out to workers.  It's not perfectly accurate because the tuples will be
 * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but
 * it should be pretty close.  It tends to overestimate by a fraction of a
 * chunk per worker because all workers gang up to preallocate during hashing,
 * whereas each batch tends to be reloaded by a single worker when there are
 * enough batches to go around, leaving fewer partially filled chunks.  This
 * effect is bounded by nparticipants.
 *
 * Return false if the number of batches or buckets has changed; the caller
 * should then recompute which batch the tuple belongs in and call again.
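 *
 * A caller is expected to retry in a loop, recomputing the tuple's batch
 * number each time in case nbatch has changed underneath it.  A minimal
 * sketch of that pattern, simplified from the parallel insert path (variable
 * names here are illustrative):
 *
 *		while (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
 *			ExecHashGetBucketAndBatch(hashtable, hashvalue,
 *									  &bucketno, &batchno);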
 */
static bool
ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
{
	ParallelHashJoinState *pstate = hashtable->parallel_state;
	ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno];
	size_t		want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE);

	Assert(batchno > 0);
	Assert(batchno < hashtable->nbatch);
	Assert(size == MAXALIGN(size));

	LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);

	/* Has another participant commanded us to help grow? */
	if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
		pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
	{
		ParallelHashGrowth growth = pstate->growth;

		LWLockRelease(&pstate->lock);
		if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
			ExecParallelHashIncreaseNumBatches(hashtable);
		else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
			ExecParallelHashIncreaseNumBuckets(hashtable);

		return false;
	}

	if (pstate->growth != PHJ_GROWTH_DISABLED &&
		batch->at_least_one_chunk &&
		(batch->shared->estimated_size + want + HASH_CHUNK_HEADER_SIZE
		 > pstate->space_allowed))
	{
		/*
		 * We have determined that this batch would exceed the space budget if
		 * loaded into memory.  Command all participants to help repartition.
		 */
		batch->shared->space_exhausted = true;
		pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
		LWLockRelease(&pstate->lock);

		return false;
	}

	batch->at_least_one_chunk = true;
	batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE;
	batch->preallocated = want;
	LWLockRelease(&pstate->lock);

	return true;
}