1 /*------------------------------------------------------------------------- 2 * 3 * hashjoin.h 4 * internal structures for hash joins 5 * 6 * 7 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group 8 * Portions Copyright (c) 1994, Regents of the University of California 9 * 10 * src/include/executor/hashjoin.h 11 * 12 *------------------------------------------------------------------------- 13 */ 14 #ifndef HASHJOIN_H 15 #define HASHJOIN_H 16 17 #include "nodes/execnodes.h" 18 #include "port/atomics.h" 19 #include "storage/barrier.h" 20 #include "storage/buffile.h" 21 #include "storage/lwlock.h" 22 23 /* ---------------------------------------------------------------- 24 * hash-join hash table structures 25 * 26 * Each active hashjoin has a HashJoinTable control block, which is 27 * palloc'd in the executor's per-query context. All other storage needed 28 * for the hashjoin is kept in private memory contexts, two for each hashjoin. 29 * This makes it easy and fast to release the storage when we don't need it 30 * anymore. (Exception: data associated with the temp files lives in the 31 * per-query context too, since we always call buffile.c in that context.) 32 * 33 * The hashtable contexts are made children of the per-query context, ensuring 34 * that they will be discarded at end of statement even if the join is 35 * aborted early by an error. (Likewise, any temporary files we make will 36 * be cleaned up by the virtual file manager in event of an error.) 37 * 38 * Storage that should live through the entire join is allocated from the 39 * "hashCxt", while storage that is only wanted for the current batch is 40 * allocated in the "batchCxt". By resetting the batchCxt at the end of 41 * each batch, we free all the per-batch storage reliably and without tedium. 42 * 43 * During first scan of inner relation, we get its tuples from executor. 44 * If nbatch > 1 then tuples that don't belong in first batch get saved 45 * into inner-batch temp files. 
 * The same statements apply for the
 * first scan of the outer relation, except we write tuples to outer-batch
 * temp files.  After finishing the first scan, we do the following for
 * each remaining batch:
 *	1. Read tuples from inner batch file, load into hash buckets.
 *	2. Read tuples from outer batch file, match to hash buckets and output.
 *
 * It is possible to increase nbatch on the fly if the in-memory hash table
 * gets too big.  The hash-value-to-batch computation is arranged so that this
 * can only cause a tuple to go into a later batch than previously thought,
 * never into an earlier batch.  When we increase nbatch, we rescan the hash
 * table and dump out any tuples that are now of a later batch to the correct
 * inner batch file.  Subsequently, while reading either inner or outer batch
 * files, we might find tuples that no longer belong to the current batch;
 * if so, we just dump them out to the correct batch file.
 * ----------------------------------------------------------------
 */

/* these are in nodes/execnodes.h: */
/* typedef struct HashJoinTupleData *HashJoinTuple; */
/* typedef struct HashJoinTableData *HashJoinTable; */

/*
 * Header prepended to every tuple stored in a hash-join table.  The actual
 * tuple data (MinimalTuple format) immediately follows this header, at
 * offset HJTUPLE_OVERHEAD (maxaligned); see HJTUPLE_MINTUPLE below.
 */
typedef struct HashJoinTupleData
{
	/*
	 * Link to the next tuple in the same bucket.  A non-parallel hash join
	 * uses the backend-private pointer "unshared"; Parallel Hash uses the
	 * DSA-relative pointer "shared" instead, since tuples then live in a
	 * DSA area rather than private memory.
	 */
	union
	{
		struct HashJoinTupleData *unshared;
		dsa_pointer shared;
	}			next;
	uint32		hashvalue;		/* tuple's hash code */
	/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
}			HashJoinTupleData;

/* maxaligned size of the per-tuple header above */
#define HJTUPLE_OVERHEAD  MAXALIGN(sizeof(HashJoinTupleData))
/* fetch the MinimalTuple stored just after a HashJoinTupleData header */
#define HJTUPLE_MINTUPLE(hjtup)  \
	((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))

/*
 * If the outer relation's distribution is sufficiently nonuniform, we attempt
 * to optimize the join by treating the hash values corresponding to the outer
 * relation's MCVs specially.
 * Inner relation tuples matching these hash
 * values go into the "skew" hashtable instead of the main hashtable, and
 * outer relation tuples with these hash values are matched against that
 * table instead of the main one.  Thus, tuples with these hash values are
 * effectively handled as part of the first batch and will never go to disk.
 * The skew hashtable is limited to SKEW_WORK_MEM_PERCENT of the total memory
 * allowed for the join; while building the hashtables, we decrease the number
 * of MCVs being specially treated if needed to stay under this limit.
 *
 * Note: you might wonder why we look at the outer relation stats for this,
 * rather than the inner.  One reason is that the outer relation is typically
 * bigger, so we get more I/O savings by optimizing for its most common values.
 * Also, for similarly-sized relations, the planner prefers to put the more
 * uniformly distributed relation on the inside, so we're more likely to find
 * interesting skew in the outer relation.
 */
typedef struct HashSkewBucket
{
	uint32		hashvalue;		/* common hash value */
	HashJoinTuple tuples;		/* linked list of inner-relation tuples */
} HashSkewBucket;

/* maxaligned size of one skew bucket, for memory accounting */
#define SKEW_BUCKET_OVERHEAD  MAXALIGN(sizeof(HashSkewBucket))
/* sentinel meaning "hash value has no skew bucket" */
#define INVALID_SKEW_BUCKET_NO	(-1)
/* skew hashtable may use at most this percent of the join's memory budget */
#define SKEW_WORK_MEM_PERCENT  2
/* minimum fraction of outer tuples an MCV must cover to be worth skewing */
#define SKEW_MIN_OUTER_FRACTION  0.01

/*
 * To reduce palloc overhead, the HashJoinTuples for the current batch are
 * packed in 32kB buffers instead of pallocing each tuple individually.
 */
typedef struct HashMemoryChunkData
{
	int			ntuples;		/* number of tuples stored in this chunk */
	size_t		maxlen;			/* size of the chunk's tuple buffer */
	size_t		used;			/* number of buffer bytes already used */

	/*
	 * Pointer to the next chunk (linked list).  As with HashJoinTupleData,
	 * "unshared" is used for private-memory chunks and "shared" (a
	 * DSA-relative pointer) for Parallel Hash chunks.
	 */
	union
	{
		struct HashMemoryChunkData *unshared;
		dsa_pointer shared;
	}			next;

	/*
	 * The chunk's tuple buffer starts after the HashMemoryChunkData struct,
	 * at offset HASH_CHUNK_HEADER_SIZE (which must be maxaligned).  Note that
	 * that offset is not included in "maxlen" or "used".
	 */
}			HashMemoryChunkData;

typedef struct HashMemoryChunkData *HashMemoryChunk;

/* size of a normal tuple chunk, header included */
#define HASH_CHUNK_SIZE			(32 * 1024L)
/* maxaligned offset of the tuple buffer within a chunk */
#define HASH_CHUNK_HEADER_SIZE	MAXALIGN(sizeof(HashMemoryChunkData))
/* address of a chunk's tuple buffer */
#define HASH_CHUNK_DATA(hc)		(((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)
/* tuples exceeding HASH_CHUNK_THRESHOLD bytes are put in their own chunk */
#define HASH_CHUNK_THRESHOLD	(HASH_CHUNK_SIZE / 4)

/*
 * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
 * object in shared memory to coordinate access to it.  Since they are
 * followed by variable-sized objects, they are arranged in contiguous memory
 * but not accessed directly as an array.
 */
typedef struct ParallelHashJoinBatch
{
	dsa_pointer buckets;		/* array of hash table buckets */
	Barrier		batch_barrier;	/* synchronization for joining this batch */

	dsa_pointer chunks;			/* chunks of tuples loaded */
	size_t		size;			/* size of buckets + chunks in memory */
	size_t		estimated_size; /* size of buckets + chunks while writing */
	size_t		ntuples;		/* number of tuples loaded */
	size_t		old_ntuples;	/* number of tuples before repartitioning */
	bool		space_exhausted;	/* did this batch hit its memory budget?
									 * NOTE(review): presumed from name --
									 * confirm against nodeHash.c */

	/*
	 * Variable-sized SharedTuplestore objects follow this struct in memory.
	 * See the accessor macros below.
	 */
}			ParallelHashJoinBatch;

/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchInner(batch)							  \
	((SharedTuplestore *)											  \
	 ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))

/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchOuter(batch, nparticipants)		\
	((SharedTuplestore *)										\
	 ((char *) ParallelHashJoinBatchInner(batch) +				\
	  MAXALIGN(sts_estimate(nparticipants))))

/* Total size of a ParallelHashJoinBatch and tuplestores. */
#define EstimateParallelHashJoinBatch(hashtable)						\
	(MAXALIGN(sizeof(ParallelHashJoinBatch)) +							\
	 MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)

/*
 * Accessor for the nth ParallelHashJoinBatch given the base.  Note: this
 * refers to a variable "hashtable" from the caller's scope (via
 * EstimateParallelHashJoinBatch), not only to its parameters.
 */
#define NthParallelHashJoinBatch(base, n)								\
	((ParallelHashJoinBatch *)											\
	 ((char *) (base) +													\
	  EstimateParallelHashJoinBatch(hashtable) * (n)))

/*
 * Each backend requires a small amount of per-batch state to interact with
 * each ParallelHashJoinBatch.
 */
typedef struct ParallelHashJoinBatchAccessor
{
	ParallelHashJoinBatch *shared;	/* pointer to shared state */

	/* Per-backend partial counters to reduce contention. */
	size_t		preallocated;	/* pre-allocated space for this backend */
	size_t		ntuples;		/* number of tuples */
	size_t		size;			/* size of partition in memory */
	size_t		estimated_size; /* size of partition on disk */
	size_t		old_ntuples;	/* how many tuples before repartitioning? */
	bool		at_least_one_chunk; /* has this backend allocated a chunk? */

	bool		done;			/* flag to remember that a batch is done */
	SharedTuplestoreAccessor *inner_tuples; /* this backend's view of inner batch */
	SharedTuplestoreAccessor *outer_tuples; /* this backend's view of outer batch */
}			ParallelHashJoinBatchAccessor;

/*
 * While hashing the inner relation, any participant might determine that it's
 * time to increase the number of buckets to reduce the load factor or batches
 * to reduce the memory size.  This is indicated by setting the growth flag to
 * these values.
 */
typedef enum ParallelHashGrowth
{
	/* The current dimensions are sufficient. */
	PHJ_GROWTH_OK,
	/* The load factor is too high, so we need to add buckets. */
	PHJ_GROWTH_NEED_MORE_BUCKETS,
	/* The memory budget would be exhausted, so we need to repartition. */
	PHJ_GROWTH_NEED_MORE_BATCHES,
	/* Repartitioning didn't help last time, so don't try to do that again. */
	PHJ_GROWTH_DISABLED
}			ParallelHashGrowth;

/*
 * The shared state used to coordinate a Parallel Hash Join.  This is stored
 * in the DSM segment.
 */
typedef struct ParallelHashJoinState
{
	dsa_pointer batches;		/* array of ParallelHashJoinBatch */
	dsa_pointer old_batches;	/* previous generation during repartition */
	int			nbatch;			/* number of batches now */
	int			old_nbatch;		/* previous number of batches */
	int			nbuckets;		/* number of buckets */
	ParallelHashGrowth growth;	/* control batch/bucket growth */
	dsa_pointer chunk_work_queue;	/* chunk work queue */
	int			nparticipants;	/* number of participating backends */
	size_t		space_allowed;	/* memory budget for the whole join */
	size_t		total_tuples;	/* total number of inner tuples */
	LWLock		lock;			/* lock protecting the above */

	Barrier		build_barrier;	/* synchronization for the build phases */
	Barrier		grow_batches_barrier;	/* sync for PHJ_GROW_BATCHES_* phases */
	Barrier		grow_buckets_barrier;	/* sync for PHJ_GROW_BUCKETS_* phases */
	pg_atomic_uint32 distributor;	/* counter for load balancing */

	SharedFileSet fileset;		/* space for shared temporary files */
}			ParallelHashJoinState;

/* The phases for building batches, used by build_barrier. */
#define PHJ_BUILD_ELECTING				0
#define PHJ_BUILD_ALLOCATING			1
#define PHJ_BUILD_HASHING_INNER			2
#define PHJ_BUILD_HASHING_OUTER			3
#define PHJ_BUILD_DONE					4

/* The phases for probing each batch, used by batch_barrier. */
#define PHJ_BATCH_ELECTING				0
#define PHJ_BATCH_ALLOCATING			1
#define PHJ_BATCH_LOADING				2
#define PHJ_BATCH_PROBING				3
#define PHJ_BATCH_DONE					4

/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING		0
#define PHJ_GROW_BATCHES_ALLOCATING		1
#define PHJ_GROW_BATCHES_REPARTITIONING 2
#define PHJ_GROW_BATCHES_DECIDING		3
#define PHJ_GROW_BATCHES_FINISHING		4
#define PHJ_GROW_BATCHES_PHASE(n)		((n) % 5)	/* circular phases */

/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
#define PHJ_GROW_BUCKETS_ELECTING		0
#define PHJ_GROW_BUCKETS_ALLOCATING		1
#define PHJ_GROW_BUCKETS_REINSERTING	2
#define PHJ_GROW_BUCKETS_PHASE(n)		((n) % 3)	/* circular phases */

/*
 * Per-hashjoin control block, palloc'd in the executor's per-query context
 * (see the file header comment for the overall storage scheme).
 */
typedef struct HashJoinTableData
{
	int			nbuckets;		/* # buckets in the in-memory hash table */
	int			log2_nbuckets;	/* its log2 (nbuckets must be a power of 2) */

	int			nbuckets_original;	/* # buckets when starting the first hash */
	int			nbuckets_optimal;	/* optimal # buckets (per batch) */
	int			log2_nbuckets_optimal;	/* log2(nbuckets_optimal) */

	/* buckets[i] is head of list of tuples in i'th in-memory bucket */
	union
	{
		/* unshared array is per-batch storage, as are all the tuples */
		struct HashJoinTupleData **unshared;
		/* shared array is per-query DSA area, as are all the tuples */
		dsa_pointer_atomic *shared;
	}			buckets;

	bool		keepNulls;		/* true to store unmatchable NULL tuples */

	bool		skewEnabled;	/* are we using skew optimization? */
	HashSkewBucket **skewBucket;	/* hashtable of skew buckets */
	int			skewBucketLen;	/* size of skewBucket array (a power of 2!) */
	int			nSkewBuckets;	/* number of active skew buckets */
	int		   *skewBucketNums; /* array indexes of active skew buckets */

	int			nbatch;			/* number of batches */
	int			curbatch;		/* current batch #; 0 during 1st pass */

	int			nbatch_original;	/* nbatch when we started inner scan */
	int			nbatch_outstart;	/* nbatch when we started outer scan */

	bool		growEnabled;	/* flag to shut off nbatch increases */

	double		totalTuples;	/* # tuples obtained from inner plan */
	double		partialTuples;	/* # tuples obtained from inner plan by me */
	double		skewTuples;		/* # tuples inserted into skew hashtable */

	/*
	 * These arrays are allocated for the life of the hash join, but only if
	 * nbatch > 1.  A file is opened only when we first write a tuple into it
	 * (otherwise its pointer remains NULL).  Note that the zero'th array
	 * elements never get used, since we will process rather than dump out any
	 * tuples of batch zero.
	 */
	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */

	/*
	 * Info about the datatype-specific hash functions for the datatypes being
	 * hashed. These are arrays of the same length as the number of hash join
	 * clauses (hash keys).
	 */
	FmgrInfo   *outer_hashfunctions;	/* lookup data for hash functions */
	FmgrInfo   *inner_hashfunctions;	/* lookup data for hash functions */
	bool	   *hashStrict;		/* is each hash join operator strict? */

	Size		spaceUsed;		/* memory space currently used by tuples */
	Size		spaceAllowed;	/* upper limit for space used */
	Size		spacePeak;		/* peak space used */
	Size		spaceUsedSkew;	/* skew hash table's current space usage */
	Size		spaceAllowedSkew;	/* upper limit for skew hashtable */

	MemoryContext hashCxt;		/* context for whole-hash-join storage */
	MemoryContext batchCxt;		/* context for this-batch-only storage */

	/* used for dense allocation of tuples (into linked chunks) */
	HashMemoryChunk chunks;		/* one list for the whole batch */

	/* Shared and private state for Parallel Hash. */
	HashMemoryChunk current_chunk;	/* this backend's current chunk */
	dsa_area   *area;			/* DSA area to allocate memory from */
	ParallelHashJoinState *parallel_state;	/* NULL unless Parallel Hash */
	ParallelHashJoinBatchAccessor *batches; /* per-backend batch accessors */
	dsa_pointer current_chunk_shared;	/* DSA address of current_chunk */
}			HashJoinTableData;

#endif							/* HASHJOIN_H */