/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *	  Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeHashjoin.c
 *
 * PARALLELISM
 *
 * Hash joins can participate in parallel query execution in several ways.  A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan.  In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete.  A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters.  The shared state machine is managed with a Barrier IPC
 * primitive.  When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
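 *
 * For example, one phase transition might be coordinated roughly like this
 * (a sketch of the Barrier API from storage/ipc/barrier.c, not code from
 * this file; "wait_event" stands in for a real wait event constant):
 *
 *     phase = BarrierAttach(barrier);
 *     ... do the work appropriate for the current phase ...
 *     if (BarrierArriveAndWait(barrier, wait_event))
 *         ... we are the one participant elected to do serial work ...
 *     BarrierDetach(barrier);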
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin.  For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
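 *
 * The pattern looks roughly like this (a simplified sketch, with PHJ_XXX_*
 * standing in for one of the real phase families; the actual instances are
 * the switch statements in ExecParallelHashJoinNewBatch() below and in
 * nodeHash.c):
 *
 *     switch (BarrierAttach(barrier))
 *     {
 *         case PHJ_XXX_ELECTING:
 *             ... maybe get elected to do one-time setup ...
 *             (fall through)
 *         case PHJ_XXX_WORKING:
 *             ... join in whatever parallel work remains ...
 *             break;
 *         case PHJ_XXX_DONE:
 *             ... too late to help; just detach ...
 *             break;
 *     }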
 *
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows:
 *
 *   PHJ_BUILD_ELECTING              -- initial state
 *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
 *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
 *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_DONE                  -- building done, probing can begin
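 *
 * (The symbolic names are defined in executor/hashjoin.h as consecutive
 * integers starting at zero, so they can be compared directly against
 * BarrierPhase().)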
 *
 * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets.  Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECTING       -- initial state
 *   PHJ_GROW_BATCHES_ALLOCATING     -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
 *   PHJ_GROW_BATCHES_FINISHING      -- one cleans up, detects skew
 *
 *   PHJ_GROW_BUCKETS_ELECTING       -- initial state
 *   PHJ_GROW_BUCKETS_ALLOCATING     -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERTING    -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we may end up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target.  For that reason it's a
 * separate pair of barriers using circular phases.
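 *
 * "Circular" means that the barrier's ever-increasing phase number is mapped
 * back onto the symbolic phases with a modulo, along these lines (a sketch;
 * see the PHJ_GROW_BATCHES_PHASE() and PHJ_GROW_BUCKETS_PHASE() macros in
 * executor/hashjoin.h):
 *
 *     current_phase = PHJ_GROW_BATCHES_PHASE(BarrierPhase(barrier));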
 *
 * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently.  In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
 * Once PHJ_BUILD_DONE is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around.  For each batch there is a separate barrier
 * with the following phases:
 *
 *  PHJ_BATCH_ELECTING       -- initial state
 *  PHJ_BATCH_ALLOCATING     -- one allocates buckets
 *  PHJ_BATCH_LOADING        -- all load the hash table from disk
 *  PHJ_BATCH_PROBING        -- all probe
 *  PHJ_BATCH_DONE           -- end
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
 * PHJ_BUILD_HASHING_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * hash_mem of all participants to create a large shared hash table.  If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular hash_mem sized hash tables.
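 *
 * ("hash_mem" here denotes the memory budget allowed for a hash table,
 * i.e. work_mem scaled by the hash_mem_multiplier GUC.)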
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * already arrived.  Practically, that means that we never return a tuple
 * while attached to a barrier, unless the barrier has reached its final
 * state.  In the slightly special case of the per-batch barrier, we return
 * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
 * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
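 *
 * That final step looks roughly like this (a sketch of the logic in
 * ExecParallelHashJoinNewBatch() below and ExecHashTableDetachBatch() in
 * nodeHash.c):
 *
 *     if (BarrierArriveAndDetach(batch_barrier))
 *         ... we were the last to detach, so the phase is now
 *             PHJ_BATCH_DONE and the batch's hash table can be freed ...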
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/sharedtuplestore.h"


/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE		1
#define HJ_NEED_NEW_OUTER		2
#define HJ_SCAN_BUCKET			3
#define HJ_FILL_OUTER_TUPLE		4
#define HJ_FILL_INNER_TUPLES	5
#define HJ_NEED_NEW_BATCH		6

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
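
/*
 * Note the deliberate cross-over above: null-filling the outer side of the
 * join requires a null *inner* tuple, and vice versa.  The null slots are
 * only created for the join types that need them (see ExecInitHashJoin()
 * below), so the presence of a slot doubles as the join-type test.
 */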

static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
												 HashJoinState *hjstate,
												 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
														 HashJoinState *hjstate,
														 uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
												 BufFile *file,
												 uint32 *hashvalue,
												 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);


/* ----------------------------------------------------------------
 *		ExecHashJoinImpl
 *
 *		This function implements the Hybrid Hashjoin algorithm.  It is marked
 *		with an always-inline attribute so that ExecHashJoin() and
 *		ExecParallelHashJoin() can inline it.  Compilers that respect the
 *		attribute should create versions specialized for parallel == true and
 *		parallel == false with unnecessary branches removed.
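 *
 *		(pg_attribute_always_inline expands to a compiler-specific
 *		always-inline attribute where available, e.g.
 *		__attribute__((always_inline)) for GCC/Clang or __forceinline
 *		for MSVC; see the definition in c.h.)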
 *
 *		Note: the relation we build the hash table on is the "inner";
 *			  the other one is the "outer".
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
	HashJoinState *node = castNode(HashJoinState, pstate);
	PlanState  *outerNode;
	HashState  *hashNode;
	ExprState  *joinqual;
	ExprState  *otherqual;
	ExprContext *econtext;
	HashJoinTable hashtable;
	TupleTableSlot *outerTupleSlot;
	uint32		hashvalue;
	int			batchno;
	ParallelHashJoinState *parallel_state;

	/*
	 * get information from HashJoin node
	 */
	joinqual = node->js.joinqual;
	otherqual = node->js.ps.qual;
	hashNode = (HashState *) innerPlanState(node);
	outerNode = outerPlanState(node);
	hashtable = node->hj_HashTable;
	econtext = node->js.ps.ps_ExprContext;
	parallel_state = hashNode->parallel_state;

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	ResetExprContext(econtext);

	/*
	 * run the hash join state machine
	 */
	for (;;)
	{
		/*
		 * It's possible to iterate this loop many times before returning a
		 * tuple, in some pathological cases such as needing to move much of
		 * the current batch to a later batch.  So let's check for interrupts
		 * each time through.
		 */
		CHECK_FOR_INTERRUPTS();

		switch (node->hj_JoinState)
		{
			case HJ_BUILD_HASHTABLE:

				/*
				 * First time through: build hash table for inner relation.
				 */
				Assert(hashtable == NULL);

				/*
				 * If the outer relation is completely empty, and it's not a
				 * right/full join, we can quit without building the hash
				 * table.  However, for an inner join it is only a win to
				 * check this when the outer relation's startup cost is less
				 * than the projected cost of building the hash table.
				 * Otherwise it's best to build the hash table first and see
				 * if the inner relation is empty.  (When it's a left join, we
				 * should always make this check, since we aren't going to be
				 * able to skip the join on the strength of an empty inner
				 * relation anyway.)
				 *
				 * If we are rescanning the join, we make use of information
				 * gained on the previous scan: don't bother to try the
				 * prefetch if the previous scan found the outer relation
				 * nonempty. This is not 100% reliable since with new
				 * parameters the outer relation might yield different
				 * results, but it's a good heuristic.
				 *
				 * The only way to make the check is to try to fetch a tuple
				 * from the outer plan node.  If we succeed, we have to stash
				 * it away for later consumption by ExecHashJoinOuterGetTuple.
				 */
				if (HJ_FILL_INNER(node))
				{
					/* no way to avoid building the hash table */
					node->hj_FirstOuterTupleSlot = NULL;
				}
				else if (parallel)
				{
					/*
					 * The empty-outer optimization is not implemented for
					 * shared hash tables, because no one participant can
					 * determine that there are no outer tuples, and it's not
					 * yet clear that it's worth the synchronization overhead
					 * of reaching consensus to figure that out.  So we have
					 * to build the hash table.
					 */
					node->hj_FirstOuterTupleSlot = NULL;
				}
				else if (HJ_FILL_OUTER(node) ||
						 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
						  !node->hj_OuterNotEmpty))
				{
					node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
					if (TupIsNull(node->hj_FirstOuterTupleSlot))
					{
						node->hj_OuterNotEmpty = false;
						return NULL;
					}
					else
						node->hj_OuterNotEmpty = true;
				}
				else
					node->hj_FirstOuterTupleSlot = NULL;

				/*
				 * Create the hash table.  If using Parallel Hash, then
				 * whoever gets here first will create the hash table and any
				 * later arrivals will merely attach to it.
				 */
				hashtable = ExecHashTableCreate(hashNode,
												node->hj_HashOperators,
												node->hj_Collations,
												HJ_FILL_INNER(node));
				node->hj_HashTable = hashtable;

				/*
				 * Execute the Hash node, to build the hash table.  If using
				 * Parallel Hash, then we'll try to help hashing unless we
				 * arrived too late.
				 */
				hashNode->hashtable = hashtable;
				(void) MultiExecProcNode((PlanState *) hashNode);

				/*
				 * If the inner relation is completely empty, and we're not
				 * doing a left outer join, we can quit without scanning the
				 * outer relation.
				 */
				if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
					return NULL;

				/*
				 * need to remember whether nbatch has increased since we
				 * began scanning the outer relation
				 */
				hashtable->nbatch_outstart = hashtable->nbatch;

				/*
				 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
				 * tuple above, because ExecHashJoinOuterGetTuple will
				 * immediately set it again.)
				 */
				node->hj_OuterNotEmpty = false;

				if (parallel)
				{
					Barrier    *build_barrier;

					build_barrier = &parallel_state->build_barrier;
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
						   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
					if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
					{
						/*
						 * If multi-batch, we need to hash the outer relation
						 * up front.
						 */
						if (hashtable->nbatch > 1)
							ExecParallelHashJoinPartitionOuter(node);
						BarrierArriveAndWait(build_barrier,
											 WAIT_EVENT_HASH_BUILD_HASH_OUTER);
					}
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);

					/* Each backend should now select a batch to work on. */
					hashtable->curbatch = -1;
					node->hj_JoinState = HJ_NEED_NEW_BATCH;

					continue;
				}
				else
					node->hj_JoinState = HJ_NEED_NEW_OUTER;

				/* FALL THRU */

			case HJ_NEED_NEW_OUTER:

				/*
				 * We don't have an outer tuple; try to get the next one
				 */
				if (parallel)
					outerTupleSlot =
						ExecParallelHashJoinOuterGetTuple(outerNode, node,
														  &hashvalue);
				else
					outerTupleSlot =
						ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

				if (TupIsNull(outerTupleSlot))
				{
					/* end of batch, or maybe whole join */
					if (HJ_FILL_INNER(node))
					{
						/* set up to scan for unmatched inner tuples */
						ExecPrepHashTableForUnmatched(node);
						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
					}
					else
						node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				econtext->ecxt_outertuple = outerTupleSlot;
				node->hj_MatchedOuter = false;

				/*
				 * Find the corresponding bucket for this tuple in the main
				 * hash table or skew hash table.
				 */
				node->hj_CurHashValue = hashvalue;
				ExecHashGetBucketAndBatch(hashtable, hashvalue,
										  &node->hj_CurBucketNo, &batchno);
				node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
																 hashvalue);
				node->hj_CurTuple = NULL;

				/*
				 * The tuple might not belong to the current batch (where
				 * "current batch" includes the skew buckets if any).
				 */
				if (batchno != hashtable->curbatch &&
					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
				{
					bool		shouldFree;
					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
																	  &shouldFree);

					/*
					 * Need to postpone this outer tuple to a later batch.
					 * Save it in the corresponding outer-batch file.
					 */
					Assert(parallel_state == NULL);
					Assert(batchno > hashtable->curbatch);
					ExecHashJoinSaveTuple(mintuple, hashvalue,
										  &hashtable->outerBatchFile[batchno]);

					if (shouldFree)
						heap_free_minimal_tuple(mintuple);

					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
					continue;
				}

				/* OK, let's scan the bucket for matches */
				node->hj_JoinState = HJ_SCAN_BUCKET;

				/* FALL THRU */

			case HJ_SCAN_BUCKET:

				/*
				 * Scan the selected hash bucket for matches to current outer
				 */
				if (parallel)
				{
					if (!ExecParallelScanHashBucket(node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}
				else
				{
					if (!ExecScanHashBucket(node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}

				/*
				 * We've got a match, but still need to test non-hashed quals.
				 * ExecScanHashBucket already set up all the state needed to
				 * call ExecQual.
				 *
				 * If we pass the qual, then save state for next call and have
				 * ExecProject form the projection, store it in the tuple
				 * table, and return the slot.
				 *
				 * Only the joinquals determine tuple match status, but all
				 * quals must pass to actually return the tuple.
				 */
				if (joinqual == NULL || ExecQual(joinqual, econtext))
				{
					node->hj_MatchedOuter = true;

					if (parallel)
					{
						/*
						 * Full/right outer joins are currently not supported
						 * for parallel joins, so we don't need to set the
						 * match bit.  Experiments show that it's worth
						 * avoiding the shared memory traffic on large
						 * systems.
						 */
						Assert(!HJ_FILL_INNER(node));
					}
					else
					{
						/*
						 * This is really only needed if HJ_FILL_INNER(node),
						 * but we'll avoid the branch and just set it always.
						 */
						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
					}

					/* In an antijoin, we never return a matched tuple */
					if (node->js.jointype == JOIN_ANTI)
					{
						node->hj_JoinState = HJ_NEED_NEW_OUTER;
						continue;
					}

					/*
					 * If we only need to join to the first matching inner
					 * tuple, then consider returning this one, but after that
					 * continue with next outer tuple.
					 */
					if (node->js.single_match)
						node->hj_JoinState = HJ_NEED_NEW_OUTER;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				else
					InstrCountFiltered1(node, 1);
				break;

			case HJ_FILL_OUTER_TUPLE:

				/*
				 * The current outer tuple has run out of matches, so check
				 * whether to emit a dummy outer-join tuple.  Whether we emit
				 * one or not, the next state is NEED_NEW_OUTER.
				 */
				node->hj_JoinState = HJ_NEED_NEW_OUTER;

				if (!node->hj_MatchedOuter &&
					HJ_FILL_OUTER(node))
				{
					/*
					 * Generate a fake join tuple with nulls for the inner
					 * tuple, and return it if it passes the non-join quals.
					 */
					econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				break;

			case HJ_FILL_INNER_TUPLES:

				/*
				 * We have finished a batch, but we are doing right/full join,
				 * so any unmatched inner tuples in the hashtable have to be
				 * emitted before we continue to the next batch.
				 */
				if (!ExecScanHashTableForUnmatched(node, econtext))
				{
					/* no more unmatched tuples */
					node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				/*
				 * Generate a fake join tuple with nulls for the outer tuple,
				 * and return it if it passes the non-join quals.
				 */
				econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

				if (otherqual == NULL || ExecQual(otherqual, econtext))
					return ExecProject(node->js.ps.ps_ProjInfo);
				else
					InstrCountFiltered2(node, 1);
				break;

			case HJ_NEED_NEW_BATCH:

				/*
				 * Try to advance to next batch.  Done if there are no more.
				 */
				if (parallel)
				{
					if (!ExecParallelHashJoinNewBatch(node))
						return NULL;	/* end of parallel-aware join */
				}
				else
				{
					if (!ExecHashJoinNewBatch(node))
						return NULL;	/* end of parallel-oblivious join */
				}
				node->hj_JoinState = HJ_NEED_NEW_OUTER;
				break;

			default:
				elog(ERROR, "unrecognized hashjoin state: %d",
					 (int) node->hj_JoinState);
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecHashJoin
 *
 *		Parallel-oblivious version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *			/* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
	/*
	 * On sufficiently smart compilers this should be inlined with the
	 * parallel-aware branches removed.
	 */
	return ExecHashJoinImpl(pstate, false);
}

/* ----------------------------------------------------------------
 *		ExecParallelHashJoin
 *
 *		Parallel-aware version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *			/* return: a tuple or NULL */
ExecParallelHashJoin(PlanState *pstate)
{
	/*
	 * On sufficiently smart compilers this should be inlined with the
	 * parallel-oblivious branches removed.
	 */
	return ExecHashJoinImpl(pstate, true);
}

/* ----------------------------------------------------------------
 *		ExecInitHashJoin
 *
 *		Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
	HashJoinState *hjstate;
	Plan	   *outerNode;
	Hash	   *hashNode;
	TupleDesc	outerDesc,
				innerDesc;
	const TupleTableSlotOps *ops;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	hjstate = makeNode(HashJoinState);
	hjstate->js.ps.plan = (Plan *) node;
	hjstate->js.ps.state = estate;

	/*
	 * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
	 * where this function may be replaced with a parallel version, if we
	 * managed to launch a parallel query.
	 */
	hjstate->js.ps.ExecProcNode = ExecHashJoin;
	hjstate->js.jointype = node->join.jointype;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &hjstate->js.ps);

	/*
	 * initialize child nodes
	 *
	 * Note: we could suppress the REWIND flag for the inner input, which
	 * would amount to betting that the hash will be a single batch.  Not
	 * clear if this would be a win or not.
	 */
	outerNode = outerPlan(node);
	hashNode = (Hash *) innerPlan(node);

	outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
	outerDesc = ExecGetResultType(outerPlanState(hjstate));
	innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
	innerDesc = ExecGetResultType(innerPlanState(hjstate));

	/*
	 * Initialize result slot, type and projection.
	 */
	ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
	ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

	/*
	 * tuple table initialization
	 */
	ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
	hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
														ops);

	/*
	 * detect whether we need only consider the first matching inner tuple
	 */
	hjstate->js.single_match = (node->join.inner_unique ||
								node->join.jointype == JOIN_SEMI);

	/* set up null tuples for outer joins, if needed */
	switch (node->join.jointype)
	{
		case JOIN_INNER:
		case JOIN_SEMI:
			break;
		case JOIN_LEFT:
		case JOIN_ANTI:
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		case JOIN_RIGHT:
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			break;
		case JOIN_FULL:
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		default:
			elog(ERROR, "unrecognized join type: %d",
				 (int) node->join.jointype);
	}

	/*
	 * now for some voodoo.  our temporary tuple slot is actually the result
	 * tuple slot of the Hash node (which is our inner plan).  we can do this
	 * because Hash nodes don't return tuples via ExecProcNode() -- instead
	 * the hash join node uses ExecScanHashBucket() to get at the contents of
	 * the hash table.  -cim 6/9/91
	 */
	{
		HashState  *hashstate = (HashState *) innerPlanState(hjstate);
		TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

		hjstate->hj_HashTupleSlot = slot;
	}

	/*
	 * initialize child expressions
	 */
	hjstate->js.ps.qual =
		ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
	hjstate->js.joinqual =
		ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
	hjstate->hashclauses =
		ExecInitQual(node->hashclauses, (PlanState *) hjstate);

	/*
	 * initialize hash-specific info
	 */
	hjstate->hj_HashTable = NULL;
	hjstate->hj_FirstOuterTupleSlot = NULL;

	hjstate->hj_CurHashValue = 0;
	hjstate->hj_CurBucketNo = 0;
	hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	hjstate->hj_CurTuple = NULL;

	hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
												 (PlanState *) hjstate);
	hjstate->hj_HashOperators = node->hashoperators;
	hjstate->hj_Collations = node->hashcollations;

	hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
	hjstate->hj_MatchedOuter = false;
	hjstate->hj_OuterNotEmpty = false;

	return hjstate;
}

/* ----------------------------------------------------------------
 *		ExecEndHashJoin
 *
 *		clean up routine for HashJoin node
 * ----------------------------------------------------------------
 */
void
ExecEndHashJoin(HashJoinState *node)
{
	/*
	 * Free hash table
	 */
	if (node->hj_HashTable)
	{
		ExecHashTableDestroy(node->hj_HashTable);
		node->hj_HashTable = NULL;
	}

	/*
	 * Free the exprcontext
	 */
	ExecFreeExprContext(&node->js.ps);

	/*
	 * clean out the tuple table
	 */
	ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
	ExecClearTuple(node->hj_OuterTupleSlot);
	ExecClearTuple(node->hj_HashTupleSlot);

	/*
	 * clean up subtrees
	 */
	ExecEndNode(outerPlanState(node));
	ExecEndNode(innerPlanState(node));
}

/*
 * ExecHashJoinOuterGetTuple
 *
 *		get the next outer tuple for a parallel-oblivious hashjoin: either by
 *		executing the outer plan node in the first pass, or from the temp
 *		files for the hashjoin batches.
 *
 * Returns a null slot if no more outer tuples (within the current batch).
 *
 * On success, the tuple's hash value is stored at *hashvalue --- this is
 * either originally computed, or re-read from the temp file.
 */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
						  HashJoinState *hjstate,
						  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	if (curbatch == 0)			/* if it is the first pass */
	{
		/*
		 * Check to see if first outer tuple was already fetched by
		 * ExecHashJoin() and not used yet.
		 */
		slot = hjstate->hj_FirstOuterTupleSlot;
		if (!TupIsNull(slot))
			hjstate->hj_FirstOuterTupleSlot = NULL;
		else
			slot = ExecProcNode(outerNode);

		while (!TupIsNull(slot))
		{
			/*
			 * We have to compute the tuple's hash value.
			 */
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;
			if (ExecHashGetHashValue(hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,	/* outer tuple */
									 HJ_FILL_OUTER(hjstate),
									 hashvalue))
			{
				/* remember outer relation is not empty for possible rescan */
				hjstate->hj_OuterNotEmpty = true;

				return slot;
			}

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		BufFile    *file = hashtable->outerBatchFile[curbatch];

		/*
		 * In outer-join cases, we could get here even though the batch file
		 * is empty.
		 */
		if (file == NULL)
			return NULL;

		slot = ExecHashJoinGetSavedTuple(hjstate,
										 file,
										 hashvalue,
										 hjstate->hj_OuterTupleSlot);
		if (!TupIsNull(slot))
			return slot;
	}

	/* End of this batch */
	return NULL;
}

/*
 * ExecHashJoinOuterGetTuple variant for the parallel case.
 */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
								  HashJoinState *hjstate,
								  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	/*
	 * In the Parallel Hash case we only run the outer plan directly for
	 * single-batch hash joins.  Otherwise we have to go to batch files, even
	 * for batch 0.
	 */
	if (curbatch == 0 && hashtable->nbatch == 1)
	{
		slot = ExecProcNode(outerNode);

		while (!TupIsNull(slot))
		{
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;
			if (ExecHashGetHashValue(hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,	/* outer tuple */
									 HJ_FILL_OUTER(hjstate),
									 hashvalue))
				return slot;

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		MinimalTuple tuple;

		tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
									   hashvalue);
		if (tuple != NULL)
		{
			ExecForceStoreMinimalTuple(tuple,
									   hjstate->hj_OuterTupleSlot,
									   false);
			slot = hjstate->hj_OuterTupleSlot;
			return slot;
		}
		else
			ExecClearTuple(hjstate->hj_OuterTupleSlot);
	}

	/* End of this batch */
	return NULL;
}

/*
 * ExecHashJoinNewBatch
 *		switch to a new hashjoin batch
 *
 * Returns true if successful, false if there are no more batches.
 */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			nbatch;
	int			curbatch;
	BufFile    *innerFile;
	TupleTableSlot *slot;
	uint32		hashvalue;

	nbatch = hashtable->nbatch;
	curbatch = hashtable->curbatch;

	if (curbatch > 0)
	{
		/*
		 * We no longer need the previous outer batch file; close it right
		 * away to free disk space.
		 */
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
	}
	else						/* we just finished the first batch */
	{
		/*
		 * Reset some of the skew optimization state variables, since we no
		 * longer need to consider skew tuples after the first batch. The
		 * memory context reset we are about to do will release the skew
		 * hashtable itself.
		 */
		hashtable->skewEnabled = false;
		hashtable->skewBucket = NULL;
		hashtable->skewBucketNums = NULL;
		hashtable->nSkewBuckets = 0;
		hashtable->spaceUsedSkew = 0;
	}

	/*
	 * We can always skip over any batches that are completely empty on both
	 * sides.  We can sometimes skip over batches that are empty on only one
	 * side, but there are exceptions:
	 *
	 * 1. In a left/full outer join, we have to process outer batches even if
	 * the inner batch is empty.  Similarly, in a right/full outer join, we
	 * have to process inner batches even if the outer batch is empty.
	 *
	 * 2. If we have increased nbatch since the initial estimate, we have to
	 * scan inner batches since they might contain tuples that need to be
	 * reassigned to later inner batches.
	 *
	 * 3. Similarly, if we have increased nbatch since starting the outer
	 * scan, we have to rescan outer batches in case they contain tuples that
	 * need to be reassigned.
	 */
	curbatch++;
	while (curbatch < nbatch &&
		   (hashtable->outerBatchFile[curbatch] == NULL ||
			hashtable->innerBatchFile[curbatch] == NULL))
	{
		if (hashtable->outerBatchFile[curbatch] &&
			HJ_FILL_OUTER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			HJ_FILL_INNER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_original)
			break;				/* must process due to rule 2 */
		if (hashtable->outerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_outstart)
			break;				/* must process due to rule 3 */
		/* We can ignore this batch. */
		/* Release associated temp files right away. */
		if (hashtable->innerBatchFile[curbatch])
			BufFileClose(hashtable->innerBatchFile[curbatch]);
		hashtable->innerBatchFile[curbatch] = NULL;
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
		curbatch++;
	}

	if (curbatch >= nbatch)
		return false;			/* no more batches */

	hashtable->curbatch = curbatch;

	/*
	 * Reload the hash table with the new inner batch (which could be empty)
	 */
	ExecHashTableReset(hashtable);

	innerFile = hashtable->innerBatchFile[curbatch];

	if (innerFile != NULL)
	{
		if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));

		while ((slot = ExecHashJoinGetSavedTuple(hjstate,
												 innerFile,
												 &hashvalue,
												 hjstate->hj_HashTupleSlot)))
		{
			/*
			 * NOTE: some tuples may be sent to future batches.  Also, it is
			 * possible for hashtable->nbatch to be increased here!
			 */
			ExecHashTableInsert(hashtable, slot, hashvalue);
		}

		/*
		 * after we build the hash table, the inner batch file is no longer
		 * needed
		 */
		BufFileClose(innerFile);
		hashtable->innerBatchFile[curbatch] = NULL;
	}

	/*
	 * Rewind outer batch file (if present), so that we can start reading it.
	 */
	if (hashtable->outerBatchFile[curbatch] != NULL)
	{
		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));
	}

	return true;
}

/*
 * Choose a batch to work on, and attach to it.  Returns true if successful,
 * false if there are no more batches.
 */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			start_batchno;
	int			batchno;

	/*
	 * If we started up so late that the batch tracking array has been freed
	 * already by ExecHashTableDetach(), then we are finished.  See also
	 * ExecParallelHashEnsureBatchAccessors().
	 */
	if (hashtable->batches == NULL)
		return false;

	/*
	 * If we were already attached to a batch, remember not to bother checking
	 * it again, and detach from it (possibly freeing the hash table if we are
	 * last to detach).
	 */
	if (hashtable->curbatch >= 0)
	{
		hashtable->batches[hashtable->curbatch].done = true;
		ExecHashTableDetachBatch(hashtable);
	}

	/*
	 * Search for a batch that isn't done.  We use an atomic counter to start
	 * our search at a different batch in every participant when there are
	 * more batches than participants.
	 */
	batchno = start_batchno =
		pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
		hashtable->nbatch;
	do
	{
		uint32		hashvalue;
		MinimalTuple tuple;
		TupleTableSlot *slot;

		if (!hashtable->batches[batchno].done)
		{
			SharedTuplestoreAccessor *inner_tuples;
			Barrier    *batch_barrier =
			&hashtable->batches[batchno].shared->batch_barrier;

			switch (BarrierAttach(batch_barrier))
			{
				case PHJ_BATCH_ELECTING:

					/* One backend allocates the hash table. */
					if (BarrierArriveAndWait(batch_barrier,
											 WAIT_EVENT_HASH_BATCH_ELECT))
						ExecParallelHashTableAlloc(hashtable, batchno);
					/* Fall through. */

				case PHJ_BATCH_ALLOCATING:
					/* Wait for allocation to complete. */
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_ALLOCATE);
					/* Fall through. */

				case PHJ_BATCH_LOADING:
					/* Start (or join in) loading tuples. */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					inner_tuples = hashtable->batches[batchno].inner_tuples;
					sts_begin_parallel_scan(inner_tuples);
					while ((tuple = sts_parallel_scan_next(inner_tuples,
														   &hashvalue)))
					{
						ExecForceStoreMinimalTuple(tuple,
												   hjstate->hj_HashTupleSlot,
												   false);
						slot = hjstate->hj_HashTupleSlot;
						ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
																hashvalue);
					}
					sts_end_parallel_scan(inner_tuples);
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_LOAD);
					/* Fall through. */

				case PHJ_BATCH_PROBING:

					/*
					 * This batch is ready to probe.  Return control to
					 * caller. We stay attached to batch_barrier so that the
					 * hash table stays alive until everyone's finished
					 * probing it, but no participant is allowed to wait at
					 * this barrier again (or else a deadlock could occur).
					 * All attached participants must eventually call
					 * BarrierArriveAndDetach() so that the final phase
					 * PHJ_BATCH_DONE can be reached.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
					return true;

				case PHJ_BATCH_DONE:

					/*
					 * Already done.  Detach and go around again (if any
					 * remain).
					 */
					BarrierDetach(batch_barrier);
					hashtable->batches[batchno].done = true;
					hashtable->curbatch = -1;
					break;

				default:
					elog(ERROR, "unexpected batch phase %d",
						 BarrierPhase(batch_barrier));
			}
		}
		batchno = (batchno + 1) % hashtable->nbatch;
	} while (batchno != start_batchno);

	return false;
}

/*
 * ExecHashJoinSaveTuple
 *		save a tuple to a batch file.
 *
 * The data recorded in the file for each tuple is its hash value,
 * then the tuple in MinimalTuple format.
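 *
 * A sketch of the record layout (the MinimalTuple's leading length word
 * serves as the length header, so it is not stored twice):
 *
 *     uint32 hashvalue
 *     uint32 t_len              -- first word of the MinimalTuple itself
 *     ...                       -- remaining t_len - sizeof(uint32) bytes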
 *
 * Note: it is important always to call this in the regular executor
 * context, not in a shorter-lived context; else the temp file buffers
 * will get messed up.
 */
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
					  BufFile **fileptr)
{
	BufFile    *file = *fileptr;

	if (file == NULL)
	{
		/* First write to this batch file, so open it. */
		file = BufFileCreateTemp(false);
		*fileptr = file;
	}

	BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
	BufFileWrite(file, (void *) tuple, tuple->t_len);
}

/*
 * ExecHashJoinGetSavedTuple
 *		read the next tuple from a batch file.  Return NULL if no more.
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
						  BufFile *file,
						  uint32 *hashvalue,
						  TupleTableSlot *tupleSlot)
{
	uint32		header[2];
	size_t		nread;
	MinimalTuple tuple;

	/*
	 * We check for interrupts here because this is typically taken as an
	 * alternative code path to an ExecProcNode() call, which would include
	 * such a check.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Since both the hash value and the MinimalTuple length word are uint32,
	 * we can read them both in one BufFileRead() call without any type
	 * cheating.
	 */
	nread = BufFileRead(file, (void *) header, sizeof(header));
	if (nread == 0)				/* end of file */
	{
		ExecClearTuple(tupleSlot);
		return NULL;
	}
	if (nread != sizeof(header))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from hash-join temporary file: read only %zu of %zu bytes",
						nread, sizeof(header))));
	*hashvalue = header[0];
	tuple = (MinimalTuple) palloc(header[1]);
	tuple->t_len = header[1];
	nread = BufFileRead(file,
						(void *) ((char *) tuple + sizeof(uint32)),
						header[1] - sizeof(uint32));
	if (nread != header[1] - sizeof(uint32))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from hash-join temporary file: read only %zu of %zu bytes",
						nread, header[1] - sizeof(uint32))));
	ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
	return tupleSlot;
}


void
ExecReScanHashJoin(HashJoinState *node)
{
	/*
	 * In a multi-batch join, we currently have to do rescans the hard way,
	 * primarily because batch temp files may have already been released. But
	 * if it's a single-batch join, and there is no parameter change for the
	 * inner subnode, then we can just re-use the existing hash table without
	 * rebuilding it.
	 */
	if (node->hj_HashTable != NULL)
	{
		if (node->hj_HashTable->nbatch == 1 &&
			node->js.ps.righttree->chgParam == NULL)
		{
			/*
			 * Okay to reuse the hash table; needn't rescan inner, either.
			 *
			 * However, if it's a right/full join, we'd better reset the
			 * inner-tuple match flags contained in the table.
			 */
			if (HJ_FILL_INNER(node))
				ExecHashTableResetMatchFlags(node->hj_HashTable);

			/*
			 * Also, we need to reset our state about the emptiness of the
			 * outer relation, so that the new scan of the outer will update
			 * it correctly if it turns out to be empty this time. (There's no
			 * harm in clearing it now because ExecHashJoin won't need the
			 * info.  In the other cases, where the hash table doesn't exist
			 * or we are destroying it, we leave this state alone because
			 * ExecHashJoin will need it the first time through.)
			 */
			node->hj_OuterNotEmpty = false;

			/* ExecHashJoin can skip the BUILD_HASHTABLE step */
			node->hj_JoinState = HJ_NEED_NEW_OUTER;
		}
		else
		{
			/* must destroy and rebuild hash table */
			HashState  *hashNode = castNode(HashState, innerPlanState(node));

			Assert(hashNode->hashtable == node->hj_HashTable);
			/* accumulate stats from old hash table, if wanted */
			/* (this should match ExecShutdownHash) */
			if (hashNode->ps.instrument && !hashNode->hinstrument)
				hashNode->hinstrument = (HashInstrumentation *)
					palloc0(sizeof(HashInstrumentation));
			if (hashNode->hinstrument)
				ExecHashAccumInstrumentation(hashNode->hinstrument,
											 hashNode->hashtable);
			/* for safety, be sure to clear child plan node's pointer too */
			hashNode->hashtable = NULL;

			ExecHashTableDestroy(node->hj_HashTable);
			node->hj_HashTable = NULL;
			node->hj_JoinState = HJ_BUILD_HASHTABLE;

			/*
			 * if chgParam of subnode is not null then plan will be re-scanned
			 * by first ExecProcNode.
			 */
			if (node->js.ps.righttree->chgParam == NULL)
				ExecReScan(node->js.ps.righttree);
		}
	}

	/* Always reset intra-tuple state */
	node->hj_CurHashValue = 0;
	node->hj_CurBucketNo = 0;
	node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	node->hj_CurTuple = NULL;

	node->hj_MatchedOuter = false;
	node->hj_FirstOuterTupleSlot = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (node->js.ps.lefttree->chgParam == NULL)
		ExecReScan(node->js.ps.lefttree);
}

void
ExecShutdownHashJoin(HashJoinState *node)
{
	if (node->hj_HashTable)
	{
		/*
		 * Detach from shared state before DSM memory goes away.  This makes
		 * sure that we don't have any pointers into DSM memory by the time
		 * ExecEndHashJoin runs.
		 */
		ExecHashTableDetachBatch(node->hj_HashTable);
		ExecHashTableDetach(node->hj_HashTable);
	}
}

static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
	PlanState  *outerState = outerPlanState(hjstate);
	ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
	HashJoinTable hashtable = hjstate->hj_HashTable;
	TupleTableSlot *slot;
	uint32		hashvalue;
	int			i;

	Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

	/* Execute outer plan, writing all tuples to shared tuplestores. */
	for (;;)
	{
		slot = ExecProcNode(outerState);
		if (TupIsNull(slot))
			break;
		econtext->ecxt_outertuple = slot;
		if (ExecHashGetHashValue(hashtable, econtext,
								 hjstate->hj_OuterHashKeys,
								 true,	/* outer tuple */
								 HJ_FILL_OUTER(hjstate),
								 &hashvalue))
		{
			int			batchno;
			int			bucketno;
			bool		shouldFree;
			MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

			ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
									  &batchno);
			sts_puttuple(hashtable->batches[batchno].outer_tuples,
						 &hashvalue, mintup);

			if (shouldFree)
				heap_free_minimal_tuple(mintup);
		}
		CHECK_FOR_INTERRUPTS();
	}

	/* Make sure all outer partitions are readable by any backend. */
	for (i = 0; i < hashtable->nbatch; ++i)
		sts_end_write(hashtable->batches[i].outer_tuples);
}

void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	HashState  *hashNode;
	ParallelHashJoinState *pstate;

	/*
	 * Disable shared hash table mode if we failed to create a real DSM
	 * segment, because that means that we don't have a DSA area to work with.
	 */
	if (pcxt->seg == NULL)
		return;

	ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

	/*
	 * Set up the state needed to coordinate access to the shared hash
	 * table(s), using the plan node ID as the toc key.
	 */
	pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
	shm_toc_insert(pcxt->toc, plan_node_id, pstate);

	/*
	 * Set up the shared hash join state with no batches initially.
	 * ExecHashTableCreate() will prepare at least one later and set nbatch
	 * and space_allowed.
	 */
	pstate->nbatch = 0;
	pstate->space_allowed = 0;
	pstate->batches = InvalidDsaPointer;
	pstate->old_batches = InvalidDsaPointer;
	pstate->nbuckets = 0;
	pstate->growth = PHJ_GROWTH_OK;
	pstate->chunk_work_queue = InvalidDsaPointer;
	pg_atomic_init_u32(&pstate->distributor, 0);
	pstate->nparticipants = pcxt->nworkers + 1;
	pstate->total_tuples = 0;
	LWLockInitialize(&pstate->lock,
					 LWTRANCHE_PARALLEL_HASH_JOIN);
	BarrierInit(&pstate->build_barrier, 0);
	BarrierInit(&pstate->grow_batches_barrier, 0);
	BarrierInit(&pstate->grow_buckets_barrier, 0);

	/* Set up the space we'll use for shared temporary files. */
	SharedFileSetInit(&pstate->fileset, pcxt->seg);

	/* Initialize the shared state in the hash node. */
	hashNode = (HashState *) innerPlanState(state);
	hashNode->parallel_state = pstate;
}

/* ----------------------------------------------------------------
 *		ExecHashJoinReInitializeDSM
 *
 *		Reset shared state before beginning a fresh scan.
 * ----------------------------------------------------------------
 */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	ParallelHashJoinState *pstate =
	shm_toc_lookup(cxt->toc, plan_node_id, false);

	/*
	 * It would be possible to reuse the shared hash table in single-batch
	 * cases by resetting and then fast-forwarding build_barrier to
	 * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
	 * currently shared hash tables are already freed by now (by the last
	 * participant to detach from the batch).  We could consider keeping it
	 * around for single-batch joins.  We'd also need to adjust
	 * finalize_plan() so that it doesn't record a dummy dependency for
	 * Parallel Hash nodes, preventing the rescan optimization.  For now we
	 * don't try.
	 */

	/* Detach, freeing any remaining shared memory. */
	if (state->hj_HashTable != NULL)
	{
		ExecHashTableDetachBatch(state->hj_HashTable);
		ExecHashTableDetach(state->hj_HashTable);
	}

	/* Clear any shared batch files. */
	SharedFileSetDeleteAll(&pstate->fileset);

	/* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
	BarrierInit(&pstate->build_barrier, 0);
}

void
ExecHashJoinInitializeWorker(HashJoinState *state,
							 ParallelWorkerContext *pwcxt)
{
	HashState  *hashNode;
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	ParallelHashJoinState *pstate =
	shm_toc_lookup(pwcxt->toc, plan_node_id, false);

	/* Attach to the space for shared temporary files. */
	SharedFileSetAttach(&pstate->fileset, pwcxt->seg);

	/* Attach to the shared state in the hash node. */
	hashNode = (HashState *) innerPlanState(state);
	hashNode->parallel_state = pstate;

	ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
}