1 /*-------------------------------------------------------------------------
2  *
3  * hashjoin.h
4  *	  internal structures for hash joins
5  *
6  *
7  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/include/executor/hashjoin.h
11  *
12  *-------------------------------------------------------------------------
13  */
14 #ifndef HASHJOIN_H
15 #define HASHJOIN_H
16 
17 #include "nodes/execnodes.h"
18 #include "port/atomics.h"
19 #include "storage/barrier.h"
20 #include "storage/buffile.h"
21 #include "storage/lwlock.h"
22 
23 /* ----------------------------------------------------------------
24  *				hash-join hash table structures
25  *
26  * Each active hashjoin has a HashJoinTable control block, which is
27  * palloc'd in the executor's per-query context.  All other storage needed
28  * for the hashjoin is kept in private memory contexts, two for each hashjoin.
29  * This makes it easy and fast to release the storage when we don't need it
30  * anymore.  (Exception: data associated with the temp files lives in the
31  * per-query context too, since we always call buffile.c in that context.)
32  *
33  * The hashtable contexts are made children of the per-query context, ensuring
34  * that they will be discarded at end of statement even if the join is
35  * aborted early by an error.  (Likewise, any temporary files we make will
36  * be cleaned up by the virtual file manager in event of an error.)
37  *
38  * Storage that should live through the entire join is allocated from the
39  * "hashCxt", while storage that is only wanted for the current batch is
40  * allocated in the "batchCxt".  By resetting the batchCxt at the end of
41  * each batch, we free all the per-batch storage reliably and without tedium.
42  *
43  * During first scan of inner relation, we get its tuples from executor.
44  * If nbatch > 1 then tuples that don't belong in first batch get saved
45  * into inner-batch temp files. The same statements apply for the
46  * first scan of the outer relation, except we write tuples to outer-batch
47  * temp files.  After finishing the first scan, we do the following for
48  * each remaining batch:
49  *	1. Read tuples from inner batch file, load into hash buckets.
50  *	2. Read tuples from outer batch file, match to hash buckets and output.
51  *
52  * It is possible to increase nbatch on the fly if the in-memory hash table
53  * gets too big.  The hash-value-to-batch computation is arranged so that this
54  * can only cause a tuple to go into a later batch than previously thought,
55  * never into an earlier batch.  When we increase nbatch, we rescan the hash
56  * table and dump out any tuples that are now of a later batch to the correct
57  * inner batch file.  Subsequently, while reading either inner or outer batch
58  * files, we might find tuples that no longer belong to the current batch;
59  * if so, we just dump them out to the correct batch file.
60  * ----------------------------------------------------------------
61  */
62 
63 /* these are in nodes/execnodes.h: */
64 /* typedef struct HashJoinTupleData *HashJoinTuple; */
65 /* typedef struct HashJoinTableData *HashJoinTable; */
66 
67 typedef struct HashJoinTupleData
68 {
69 	/* link to next tuple in same bucket */
70 	union
71 	{
72 		struct HashJoinTupleData *unshared;
73 		dsa_pointer shared;
74 	}			next;
75 	uint32		hashvalue;		/* tuple's hash code */
76 	/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
77 }			HashJoinTupleData;
78 
79 #define HJTUPLE_OVERHEAD  MAXALIGN(sizeof(HashJoinTupleData))
80 #define HJTUPLE_MINTUPLE(hjtup)  \
81 	((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
82 
83 /*
84  * If the outer relation's distribution is sufficiently nonuniform, we attempt
85  * to optimize the join by treating the hash values corresponding to the outer
86  * relation's MCVs specially.  Inner relation tuples matching these hash
87  * values go into the "skew" hashtable instead of the main hashtable, and
88  * outer relation tuples with these hash values are matched against that
89  * table instead of the main one.  Thus, tuples with these hash values are
90  * effectively handled as part of the first batch and will never go to disk.
91  * The skew hashtable is limited to SKEW_WORK_MEM_PERCENT of the total memory
92  * allowed for the join; while building the hashtables, we decrease the number
93  * of MCVs being specially treated if needed to stay under this limit.
94  *
95  * Note: you might wonder why we look at the outer relation stats for this,
96  * rather than the inner.  One reason is that the outer relation is typically
97  * bigger, so we get more I/O savings by optimizing for its most common values.
98  * Also, for similarly-sized relations, the planner prefers to put the more
99  * uniformly distributed relation on the inside, so we're more likely to find
100  * interesting skew in the outer relation.
101  */
102 typedef struct HashSkewBucket
103 {
104 	uint32		hashvalue;		/* common hash value */
105 	HashJoinTuple tuples;		/* linked list of inner-relation tuples */
106 } HashSkewBucket;
107 
108 #define SKEW_BUCKET_OVERHEAD  MAXALIGN(sizeof(HashSkewBucket))
109 #define INVALID_SKEW_BUCKET_NO	(-1)
110 #define SKEW_WORK_MEM_PERCENT  2
111 #define SKEW_MIN_OUTER_FRACTION  0.01
112 
113 /*
114  * To reduce palloc overhead, the HashJoinTuples for the current batch are
115  * packed in 32kB buffers instead of pallocing each tuple individually.
116  */
117 typedef struct HashMemoryChunkData
118 {
119 	int			ntuples;		/* number of tuples stored in this chunk */
120 	size_t		maxlen;			/* size of the chunk's tuple buffer */
121 	size_t		used;			/* number of buffer bytes already used */
122 
123 	/* pointer to the next chunk (linked list) */
124 	union
125 	{
126 		struct HashMemoryChunkData *unshared;
127 		dsa_pointer shared;
128 	}			next;
129 
130 	/*
131 	 * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
132 	 * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned).  Note that
133 	 * that offset is not included in "maxlen" or "used".
134 	 */
135 }			HashMemoryChunkData;
136 
137 typedef struct HashMemoryChunkData *HashMemoryChunk;
138 
139 #define HASH_CHUNK_SIZE			(32 * 1024L)
140 #define HASH_CHUNK_HEADER_SIZE	MAXALIGN(sizeof(HashMemoryChunkData))
141 #define HASH_CHUNK_DATA(hc)		(((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
142 /* tuples exceeding HASH_CHUNK_THRESHOLD bytes are put in their own chunk */
143 #define HASH_CHUNK_THRESHOLD	(HASH_CHUNK_SIZE / 4)
144 
145 /*
146  * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
147  * object in shared memory to coordinate access to it.  Since they are
148  * followed by variable-sized objects, they are arranged in contiguous memory
149  * but not accessed directly as an array.
150  */
151 typedef struct ParallelHashJoinBatch
152 {
153 	dsa_pointer buckets;		/* array of hash table buckets */
154 	Barrier		batch_barrier;	/* synchronization for joining this batch */
155 
156 	dsa_pointer chunks;			/* chunks of tuples loaded */
157 	size_t		size;			/* size of buckets + chunks in memory */
158 	size_t		estimated_size; /* size of buckets + chunks while writing */
159 	size_t		ntuples;		/* number of tuples loaded */
160 	size_t		old_ntuples;	/* number of tuples before repartitioning */
161 	bool		space_exhausted;
162 
163 	/*
164 	 * Variable-sized SharedTuplestore objects follow this struct in memory.
165 	 * See the accessor macros below.
166 	 */
167 } ParallelHashJoinBatch;
168 
169 /* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
170 #define ParallelHashJoinBatchInner(batch)							\
171 	((SharedTuplestore *)											\
172 	 ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))
173 
174 /* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
175 #define ParallelHashJoinBatchOuter(batch, nparticipants) \
176 	((SharedTuplestore *)												\
177 	 ((char *) ParallelHashJoinBatchInner(batch) +						\
178 	  MAXALIGN(sts_estimate(nparticipants))))
179 
180 /* Total size of a ParallelHashJoinBatch and tuplestores. */
181 #define EstimateParallelHashJoinBatch(hashtable)						\
182 	(MAXALIGN(sizeof(ParallelHashJoinBatch)) +							\
183 	 MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)
184 
185 /* Accessor for the nth ParallelHashJoinBatch given the base. */
186 #define NthParallelHashJoinBatch(base, n)								\
187 	((ParallelHashJoinBatch *)											\
188 	 ((char *) (base) +													\
189 	  EstimateParallelHashJoinBatch(hashtable) *  (n)))
190 
191 /*
192  * Each backend requires a small amount of per-batch state to interact with
193  * each ParallelHashJoinBatch.
194  */
195 typedef struct ParallelHashJoinBatchAccessor
196 {
197 	ParallelHashJoinBatch *shared;	/* pointer to shared state */
198 
199 	/* Per-backend partial counters to reduce contention. */
200 	size_t		preallocated;	/* pre-allocated space for this backend */
201 	size_t		ntuples;		/* number of tuples */
202 	size_t		size;			/* size of partition in memory */
203 	size_t		estimated_size; /* size of partition on disk */
204 	size_t		old_ntuples;	/* how many tuples before repartitioning? */
205 	bool		at_least_one_chunk; /* has this backend allocated a chunk? */
206 
207 	bool		done;			/* flag to remember that a batch is done */
208 	SharedTuplestoreAccessor *inner_tuples;
209 	SharedTuplestoreAccessor *outer_tuples;
210 } ParallelHashJoinBatchAccessor;
211 
212 /*
213  * While hashing the inner relation, any participant might determine that it's
214  * time to increase the number of buckets to reduce the load factor or batches
215  * to reduce the memory size.  This is indicated by setting the growth flag to
216  * these values.
217  */
218 typedef enum ParallelHashGrowth
219 {
220 	/* The current dimensions are sufficient. */
221 	PHJ_GROWTH_OK,
222 	/* The load factor is too high, so we need to add buckets. */
223 	PHJ_GROWTH_NEED_MORE_BUCKETS,
224 	/* The memory budget would be exhausted, so we need to repartition. */
225 	PHJ_GROWTH_NEED_MORE_BATCHES,
226 	/* Repartitioning didn't help last time, so don't try to do that again. */
227 	PHJ_GROWTH_DISABLED
228 } ParallelHashGrowth;
229 
230 /*
231  * The shared state used to coordinate a Parallel Hash Join.  This is stored
232  * in the DSM segment.
233  */
234 typedef struct ParallelHashJoinState
235 {
236 	dsa_pointer batches;		/* array of ParallelHashJoinBatch */
237 	dsa_pointer old_batches;	/* previous generation during repartition */
238 	int			nbatch;			/* number of batches now */
239 	int			old_nbatch;		/* previous number of batches */
240 	int			nbuckets;		/* number of buckets */
241 	ParallelHashGrowth growth;	/* control batch/bucket growth */
242 	dsa_pointer chunk_work_queue;	/* chunk work queue */
243 	int			nparticipants;
244 	size_t		space_allowed;
245 	size_t		total_tuples;	/* total number of inner tuples */
246 	LWLock		lock;			/* lock protecting the above */
247 
248 	Barrier		build_barrier;	/* synchronization for the build phases */
249 	Barrier		grow_batches_barrier;
250 	Barrier		grow_buckets_barrier;
251 	pg_atomic_uint32 distributor;	/* counter for load balancing */
252 
253 	SharedFileSet fileset;		/* space for shared temporary files */
254 } ParallelHashJoinState;
255 
256 /* The phases for building batches, used by build_barrier. */
257 #define PHJ_BUILD_ELECTING				0
258 #define PHJ_BUILD_ALLOCATING			1
259 #define PHJ_BUILD_HASHING_INNER			2
260 #define PHJ_BUILD_HASHING_OUTER			3
261 #define PHJ_BUILD_DONE					4
262 
263 /* The phases for probing each batch, used by for batch_barrier. */
264 #define PHJ_BATCH_ELECTING				0
265 #define PHJ_BATCH_ALLOCATING			1
266 #define PHJ_BATCH_LOADING				2
267 #define PHJ_BATCH_PROBING				3
268 #define PHJ_BATCH_DONE					4
269 
270 /* The phases of batch growth while hashing, for grow_batches_barrier. */
271 #define PHJ_GROW_BATCHES_ELECTING		0
272 #define PHJ_GROW_BATCHES_ALLOCATING		1
273 #define PHJ_GROW_BATCHES_REPARTITIONING 2
274 #define PHJ_GROW_BATCHES_DECIDING		3
275 #define PHJ_GROW_BATCHES_FINISHING		4
276 #define PHJ_GROW_BATCHES_PHASE(n)		((n) % 5)	/* circular phases */
277 
278 /* The phases of bucket growth while hashing, for grow_buckets_barrier. */
279 #define PHJ_GROW_BUCKETS_ELECTING		0
280 #define PHJ_GROW_BUCKETS_ALLOCATING		1
281 #define PHJ_GROW_BUCKETS_REINSERTING	2
282 #define PHJ_GROW_BUCKETS_PHASE(n)		((n) % 3)	/* circular phases */
283 
284 typedef struct HashJoinTableData
285 {
286 	int			nbuckets;		/* # buckets in the in-memory hash table */
287 	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */
288 
289 	int			nbuckets_original;	/* # buckets when starting the first hash */
290 	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
291 	int			log2_nbuckets_optimal;	/* log2(nbuckets_optimal) */
292 
293 	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
294 	union
295 	{
296 		/* unshared array is per-batch storage, as are all the tuples */
297 		struct HashJoinTupleData **unshared;
298 		/* shared array is per-query DSA area, as are all the tuples */
299 		dsa_pointer_atomic *shared;
300 	}			buckets;
301 
302 	bool		keepNulls;		/* true to store unmatchable NULL tuples */
303 
304 	bool		skewEnabled;	/* are we using skew optimization? */
305 	HashSkewBucket **skewBucket;	/* hashtable of skew buckets */
306 	int			skewBucketLen;	/* size of skewBucket array (a power of 2!) */
307 	int			nSkewBuckets;	/* number of active skew buckets */
308 	int		   *skewBucketNums; /* array indexes of active skew buckets */
309 
310 	int			nbatch;			/* number of batches */
311 	int			curbatch;		/* current batch #; 0 during 1st pass */
312 
313 	int			nbatch_original;	/* nbatch when we started inner scan */
314 	int			nbatch_outstart;	/* nbatch when we started outer scan */
315 
316 	bool		growEnabled;	/* flag to shut off nbatch increases */
317 
318 	double		totalTuples;	/* # tuples obtained from inner plan */
319 	double		partialTuples;	/* # tuples obtained from inner plan by me */
320 	double		skewTuples;		/* # tuples inserted into skew tuples */
321 
322 	/*
323 	 * These arrays are allocated for the life of the hash join, but only if
324 	 * nbatch > 1.  A file is opened only when we first write a tuple into it
325 	 * (otherwise its pointer remains NULL).  Note that the zero'th array
326 	 * elements never get used, since we will process rather than dump out any
327 	 * tuples of batch zero.
328 	 */
329 	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
330 	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
331 
332 	/*
333 	 * Info about the datatype-specific hash functions for the datatypes being
334 	 * hashed. These are arrays of the same length as the number of hash join
335 	 * clauses (hash keys).
336 	 */
337 	FmgrInfo   *outer_hashfunctions;	/* lookup data for hash functions */
338 	FmgrInfo   *inner_hashfunctions;	/* lookup data for hash functions */
339 	bool	   *hashStrict;		/* is each hash join operator strict? */
340 
341 	Size		spaceUsed;		/* memory space currently used by tuples */
342 	Size		spaceAllowed;	/* upper limit for space used */
343 	Size		spacePeak;		/* peak space used */
344 	Size		spaceUsedSkew;	/* skew hash table's current space usage */
345 	Size		spaceAllowedSkew;	/* upper limit for skew hashtable */
346 
347 	MemoryContext hashCxt;		/* context for whole-hash-join storage */
348 	MemoryContext batchCxt;		/* context for this-batch-only storage */
349 
350 	/* used for dense allocation of tuples (into linked chunks) */
351 	HashMemoryChunk chunks;		/* one list for the whole batch */
352 
353 	/* Shared and private state for Parallel Hash. */
354 	HashMemoryChunk current_chunk;	/* this backend's current chunk */
355 	dsa_area   *area;			/* DSA area to allocate memory from */
356 	ParallelHashJoinState *parallel_state;
357 	ParallelHashJoinBatchAccessor *batches;
358 	dsa_pointer current_chunk_shared;
359 }			HashJoinTableData;
360 
361 #endif							/* HASHJOIN_H */
362