/*-------------------------------------------------------------------------
 *
 * nodeHash.c
 *    Routines to hash relations for hashjoin
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeHash.c
 *
 * See note on parallelism in nodeHashjoin.c.
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *      MultiExecHash   - generate an in-memory hash table of the relation
 *      ExecInitHash    - initialize node and subnodes
 *      ExecEndHash     - shutdown node and subnodes
 */

#include "postgres.h"

#include <math.h>
#include <limits.h>

#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/pg_statistic.h"
#include "commands/tablespace.h"
#include "executor/execdebug.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "port/pg_bitutils.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"

static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
                                  int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
                                    TupleTableSlot *slot,
                                    uint32 hashvalue,
                                    int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);

static void *dense_alloc(HashJoinTable hashtable, Size size);
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
                                                size_t size,
                                                dsa_pointer *shared);
static void MultiExecPrivateHash(HashState *node);
static void MultiExecParallelHash(HashState *node);
static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table,
                                                       int bucketno);
static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table,
                                                      HashJoinTuple tuple);
static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head,
                                             HashJoinTuple tuple,
                                             dsa_pointer tuple_shared);
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch);
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable);
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable);
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable);
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table,
                                                     dsa_pointer *shared);
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
                                          int batchno,
                                          size_t size);
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);


/* ----------------------------------------------------------------
 *      ExecHash
 *
 *      stub for pro forma compliance
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecHash(PlanState *pstate)
{
    elog(ERROR, "Hash node does not support ExecProcNode call convention");
    return NULL;
}

/* ----------------------------------------------------------------
 *      MultiExecHash
 *
 *      build hash table for hashjoin, doing partitioning if more
 *      than one batch is required.
 * ----------------------------------------------------------------
 */
Node *
MultiExecHash(HashState *node)
{
    /* must provide our own instrumentation support */
    if (node->ps.instrument)
        InstrStartNode(node->ps.instrument);

    if (node->parallel_state != NULL)
        MultiExecParallelHash(node);
    else
        MultiExecPrivateHash(node);

    /* must provide our own instrumentation support */
    if (node->ps.instrument)
        InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);

    /*
     * We do not return the hash table directly because it's not a subtype of
     * Node, and so would violate the MultiExecProcNode API.  Instead, our
     * parent Hashjoin node is expected to know how to fish it out of our node
     * state.  Ugly but not really worth cleaning up, since Hashjoin knows
     * quite a bit more about Hash besides that.
     */
    return NULL;
}

/* ----------------------------------------------------------------
 *      MultiExecPrivateHash
 *
 *      parallel-oblivious version, building a backend-private
 *      hash table and (if necessary) batch files.
 * ----------------------------------------------------------------
 */
static void
MultiExecPrivateHash(HashState *node)
{
    PlanState  *outerNode;
    List       *hashkeys;
    HashJoinTable hashtable;
    TupleTableSlot *slot;
    ExprContext *econtext;
    uint32      hashvalue;

    /*
     * get state info from node
     */
    outerNode = outerPlanState(node);
    hashtable = node->hashtable;

    /*
     * set expression context
     */
    hashkeys = node->hashkeys;
    econtext = node->ps.ps_ExprContext;

    /*
     * Get all tuples from the node below the Hash node and insert into the
     * hash table (or temp files).
     */
    for (;;)
    {
        slot = ExecProcNode(outerNode);
        if (TupIsNull(slot))
            break;
        /* We have to compute the hash value */
        econtext->ecxt_outertuple = slot;
        if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
                                 false, hashtable->keepNulls,
                                 &hashvalue))
        {
            int         bucketNumber;

            bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
            if (bucketNumber != INVALID_SKEW_BUCKET_NO)
            {
                /* It's a skew tuple, so put it into that hash table */
                ExecHashSkewTableInsert(hashtable, slot, hashvalue,
                                        bucketNumber);
                hashtable->skewTuples += 1;
            }
            else
            {
                /* Not subject to skew optimization, so insert normally */
                ExecHashTableInsert(hashtable, slot, hashvalue);
            }
            hashtable->totalTuples += 1;
        }
    }

    /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
    if (hashtable->nbuckets != hashtable->nbuckets_optimal)
        ExecHashIncreaseNumBuckets(hashtable);

    /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
    hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
    if (hashtable->spaceUsed > hashtable->spacePeak)
        hashtable->spacePeak = hashtable->spaceUsed;

    hashtable->partialTuples = hashtable->totalTuples;
}

/* ----------------------------------------------------------------
 *      MultiExecParallelHash
 *
 *      parallel-aware version, building a shared hash table and
 *      (if necessary) batch files using the combined effort of
 *      a set of co-operating backends.
 * ----------------------------------------------------------------
 */
static void
MultiExecParallelHash(HashState *node)
{
    ParallelHashJoinState *pstate;
    PlanState  *outerNode;
    List       *hashkeys;
    HashJoinTable hashtable;
    TupleTableSlot *slot;
    ExprContext *econtext;
    uint32      hashvalue;
    Barrier    *build_barrier;
    int         i;

    /*
     * get state info from node
     */
    outerNode = outerPlanState(node);
    hashtable = node->hashtable;

    /*
     * set expression context
     */
    hashkeys = node->hashkeys;
    econtext = node->ps.ps_ExprContext;

    /*
     * Synchronize the parallel hash table build.  At this stage we know that
     * the shared hash table has been or is being set up by
     * ExecHashTableCreate(), but we don't know if our peers have returned
     * from there or are here in MultiExecParallelHash(), and if so how far
     * through they are.  To find out, we check the build_barrier phase then
     * and jump to the right step in the build algorithm.
     */
    pstate = hashtable->parallel_state;
    build_barrier = &pstate->build_barrier;
    Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
    switch (BarrierPhase(build_barrier))
    {
        case PHJ_BUILD_ALLOCATING:

            /*
             * Either I just allocated the initial hash table in
             * ExecHashTableCreate(), or someone else is doing that.  Either
             * way, wait for everyone to arrive here so we can proceed.
             */
            BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
            /* Fall through. */

        case PHJ_BUILD_HASHING_INNER:

            /*
             * It's time to begin hashing, or if we just arrived here then
             * hashing is already underway, so join in that effort.  While
             * hashing we have to be prepared to help increase the number of
             * batches or buckets at any time, and if we arrived here when
             * that was already underway we'll have to help complete that work
             * immediately so that it's safe to access batches and buckets
             * below.
             */
            if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
                PHJ_GROW_BATCHES_ELECTING)
                ExecParallelHashIncreaseNumBatches(hashtable);
            if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
                PHJ_GROW_BUCKETS_ELECTING)
                ExecParallelHashIncreaseNumBuckets(hashtable);
            ExecParallelHashEnsureBatchAccessors(hashtable);
            ExecParallelHashTableSetCurrentBatch(hashtable, 0);
            for (;;)
            {
                slot = ExecProcNode(outerNode);
                if (TupIsNull(slot))
                    break;
                econtext->ecxt_outertuple = slot;
                if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
                                         false, hashtable->keepNulls,
                                         &hashvalue))
                    ExecParallelHashTableInsert(hashtable, slot, hashvalue);
                hashtable->partialTuples++;
            }

            /*
             * Make sure that any tuples we wrote to disk are visible to
             * others before anyone tries to load them.
             */
            for (i = 0; i < hashtable->nbatch; ++i)
                sts_end_write(hashtable->batches[i].inner_tuples);

            /*
             * Update shared counters.  We need an accurate total tuple count
             * to control the empty table optimization.
             */
            ExecParallelHashMergeCounters(hashtable);

            BarrierDetach(&pstate->grow_buckets_barrier);
            BarrierDetach(&pstate->grow_batches_barrier);

            /*
             * Wait for everyone to finish building and flushing files and
             * counters.
             */
            if (BarrierArriveAndWait(build_barrier,
                                     WAIT_EVENT_HASH_BUILD_HASH_INNER))
            {
                /*
                 * Elect one backend to disable any further growth.  Batches
                 * are now fixed.  While building them we made sure they'd fit
                 * in our memory budget when we load them back in later (or we
                 * tried to do that and gave up because we detected extreme
                 * skew).
                 */
                pstate->growth = PHJ_GROWTH_DISABLED;
            }
    }

    /*
     * We're not yet attached to a batch.  We all agree on the dimensions and
     * number of inner tuples (for the empty table optimization).
     */
    hashtable->curbatch = -1;
    hashtable->nbuckets = pstate->nbuckets;
    hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
    hashtable->totalTuples = pstate->total_tuples;
    ExecParallelHashEnsureBatchAccessors(hashtable);

    /*
     * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
     * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
     * there already).
     */
    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
}

/* ----------------------------------------------------------------
 *      ExecInitHash
 *
 *      Init routine for Hash node
 * ----------------------------------------------------------------
 */
HashState *
ExecInitHash(Hash *node, EState *estate, int eflags)
{
    HashState  *hashstate;

    /* check for unsupported flags */
    Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

    /*
     * create state structure
     */
    hashstate = makeNode(HashState);
    hashstate->ps.plan = (Plan *) node;
    hashstate->ps.state = estate;
    hashstate->ps.ExecProcNode = ExecHash;
    hashstate->hashtable = NULL;
    hashstate->hashkeys = NIL;  /* will be set by parent HashJoin */

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &hashstate->ps);

    /*
     * initialize child nodes
     */
    outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);

    /*
     * initialize our result slot and type.  No need to build projection
     * because this node doesn't do projections.
     */
    ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
    hashstate->ps.ps_ProjInfo = NULL;

    /*
     * initialize child expressions
     */
    Assert(node->plan.qual == NIL);
    hashstate->hashkeys =
        ExecInitExprList(node->hashkeys, (PlanState *) hashstate);

    return hashstate;
}

/* ----------------------------------------------------------------
 *      ExecEndHash
 *
 *      clean up routine for Hash node
 * ----------------------------------------------------------------
 */
void
ExecEndHash(HashState *node)
{
    PlanState  *outerPlan;

    /*
     * free exprcontext
     */
    ExecFreeExprContext(&node->ps);

    /*
     * shut down the subplan
     */
    outerPlan = outerPlanState(node);
    ExecEndNode(outerPlan);
}


/* ----------------------------------------------------------------
 *      ExecHashTableCreate
 *
 *      create an empty hashtable data structure for hashjoin.
 * ----------------------------------------------------------------
 */
HashJoinTable
ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, bool keepNulls)
{
    Hash       *node;
    HashJoinTable hashtable;
    Plan       *outerNode;
    size_t      space_allowed;
    int         nbuckets;
    int         nbatch;
    double      rows;
    int         num_skew_mcvs;
    int         log2_nbuckets;
    int         nkeys;
    int         i;
    ListCell   *ho;
    ListCell   *hc;
    MemoryContext oldcxt;

    /*
     * Get information about the size of the relation to be hashed (it's the
     * "outer" subtree of this node, but the inner relation of the hashjoin).
     * Compute the appropriate size of the hash table.
     */
    node = (Hash *) state->ps.plan;
    outerNode = outerPlan(node);

    /*
     * If this is a shared hash table with a partial plan, then we can't use
     * outerNode->plan_rows to estimate its size.  We need an estimate of the
     * total number of rows across all copies of the partial plan.
     */
    rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;

    ExecChooseHashTableSize(rows, outerNode->plan_width,
                            OidIsValid(node->skewTable),
                            state->parallel_state != NULL,
                            state->parallel_state != NULL ?
                            state->parallel_state->nparticipants - 1 : 0,
                            &space_allowed,
                            &nbuckets, &nbatch, &num_skew_mcvs);

    /* nbuckets must be a power of 2 */
    log2_nbuckets = my_log2(nbuckets);
    Assert(nbuckets == (1 << log2_nbuckets));

    /*
     * Initialize the hash table control block.
     *
     * The hashtable control block is just palloc'd from the executor's
     * per-query memory context.  Everything else should be kept inside the
     * subsidiary hashCxt or batchCxt.
     */
    hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
    hashtable->nbuckets = nbuckets;
    hashtable->nbuckets_original = nbuckets;
    hashtable->nbuckets_optimal = nbuckets;
    hashtable->log2_nbuckets = log2_nbuckets;
    hashtable->log2_nbuckets_optimal = log2_nbuckets;
    hashtable->buckets.unshared = NULL;
    hashtable->keepNulls = keepNulls;
    hashtable->skewEnabled = false;
    hashtable->skewBucket = NULL;
    hashtable->skewBucketLen = 0;
    hashtable->nSkewBuckets = 0;
    hashtable->skewBucketNums = NULL;
    hashtable->nbatch = nbatch;
    hashtable->curbatch = 0;
    hashtable->nbatch_original = nbatch;
    hashtable->nbatch_outstart = nbatch;
    hashtable->growEnabled = true;
    hashtable->totalTuples = 0;
    hashtable->partialTuples = 0;
    hashtable->skewTuples = 0;
    hashtable->innerBatchFile = NULL;
    hashtable->outerBatchFile = NULL;
    hashtable->spaceUsed = 0;
    hashtable->spacePeak = 0;
    hashtable->spaceAllowed = space_allowed;
    hashtable->spaceUsedSkew = 0;
    hashtable->spaceAllowedSkew =
        hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100;
    hashtable->chunks = NULL;
    hashtable->current_chunk = NULL;
    hashtable->parallel_state = state->parallel_state;
    hashtable->area = state->ps.state->es_query_dsa;
    hashtable->batches = NULL;

#ifdef HJDEBUG
    printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
           hashtable, nbatch, nbuckets);
#endif

    /*
     * Create temporary memory contexts in which to keep the hashtable working
     * storage.  See notes in executor/hashjoin.h.
     */
    hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
                                               "HashTableContext",
                                               ALLOCSET_DEFAULT_SIZES);

    hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
                                                "HashBatchContext",
                                                ALLOCSET_DEFAULT_SIZES);

    /* Allocate data that will live for the life of the hashjoin */

    oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

    /*
     * Get info about the hash functions to be used for each hash key.  Also
     * remember whether the join operators are strict.
     */
    nkeys = list_length(hashOperators);
    hashtable->outer_hashfunctions =
        (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
    hashtable->inner_hashfunctions =
        (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
    hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
    hashtable->collations = (Oid *) palloc(nkeys * sizeof(Oid));
    i = 0;
    forboth(ho, hashOperators, hc, hashCollations)
    {
        Oid         hashop = lfirst_oid(ho);
        Oid         left_hashfn;
        Oid         right_hashfn;

        if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
            elog(ERROR, "could not find hash function for hash operator %u",
                 hashop);
        fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
        fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
        hashtable->hashStrict[i] = op_strict(hashop);
        hashtable->collations[i] = lfirst_oid(hc);
        i++;
    }

    if (nbatch > 1 && hashtable->parallel_state == NULL)
    {
        /*
         * allocate and initialize the file arrays in hashCxt (not needed for
         * parallel case which uses shared tuplestores instead of raw files)
         */
        hashtable->innerBatchFile = (BufFile **)
            palloc0(nbatch * sizeof(BufFile *));
        hashtable->outerBatchFile = (BufFile **)
            palloc0(nbatch * sizeof(BufFile *));
        /* The files will not be opened until needed... */
        /* ... but make sure we have temp tablespaces established for them */
        PrepareTempTablespaces();
    }

    MemoryContextSwitchTo(oldcxt);

    if (hashtable->parallel_state)
    {
        ParallelHashJoinState *pstate = hashtable->parallel_state;
        Barrier    *build_barrier;

        /*
         * Attach to the build barrier.  The corresponding detach operation is
         * in ExecHashTableDetach.  Note that we won't attach to the
         * batch_barrier for batch 0 yet.  We'll attach later and start it out
         * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
         * and then loaded while hashing (the standard hybrid hash join
         * algorithm), and we'll coordinate that using build_barrier.
         */
        build_barrier = &pstate->build_barrier;
        BarrierAttach(build_barrier);

        /*
         * So far we have no idea whether there are any other participants,
         * and if so, what phase they are working on.  The only thing we care
         * about at this point is whether someone has already created the
         * SharedHashJoinBatch objects and the hash table for batch 0.  One
         * backend will be elected to do that now if necessary.
         */
        if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
            BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
        {
            pstate->nbatch = nbatch;
            pstate->space_allowed = space_allowed;
            pstate->growth = PHJ_GROWTH_OK;

            /* Set up the shared state for coordinating batches. */
            ExecParallelHashJoinSetUpBatches(hashtable, nbatch);

            /*
             * Allocate batch 0's hash table up front so we can load it
             * directly while hashing.
             */
            pstate->nbuckets = nbuckets;
            ExecParallelHashTableAlloc(hashtable, 0);
        }

        /*
         * The next Parallel Hash synchronization point is in
         * MultiExecParallelHash(), which will progress it all the way to
         * PHJ_BUILD_DONE.  The caller must not return control from this
         * executor node between now and then.
         */
    }
    else
    {
        /*
         * Prepare context for the first-scan space allocations; allocate the
         * hashbucket array therein, and set each bucket "empty".
         */
        MemoryContextSwitchTo(hashtable->batchCxt);

        hashtable->buckets.unshared = (HashJoinTuple *)
            palloc0(nbuckets * sizeof(HashJoinTuple));

        /*
         * Set up for skew optimization, if possible and there's a need for
         * more than one batch.  (In a one-batch join, there's no point in
         * it.)
         */
        if (nbatch > 1)
            ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);

        MemoryContextSwitchTo(oldcxt);
    }

    return hashtable;
}
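
/*
 * Usage sketch (illustrative only, not part of the original file), loosely
 * modeled on the HJ_BUILD_HASHTABLE step in nodeHashjoin.c: the parent hash
 * join creates the table and then asks the Hash node to populate it.  The
 * exact HashJoinState field names below are assumptions for this example.
 *
 *      hashtable = ExecHashTableCreate(hashNode,
 *                                      node->hj_HashOperators,
 *                                      node->hj_Collations,
 *                                      HJ_FILL_INNER(node));
 *      node->hj_HashTable = hashtable;
 *      (void) MultiExecProcNode((PlanState *) hashNode);
 */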

/*
 * Compute appropriate size for hashtable given the estimated size of the
 * relation to be hashed (number of rows and average row width).
 *
 * This is exported so that the planner's costsize.c can use it.
 */

/* Target bucket loading (tuples per bucket) */
#define NTUP_PER_BUCKET         1

void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                        bool try_combined_hash_mem,
                        int parallel_workers,
                        size_t *space_allowed,
                        int *numbuckets,
                        int *numbatches,
                        int *num_skew_mcvs)
{
    int         tupsize;
    double      inner_rel_bytes;
    size_t      hash_table_bytes;
    size_t      bucket_bytes;
    size_t      max_pointers;
    int         nbatch = 1;
    int         nbuckets;
    double      dbuckets;

    /* Force a plausible relation size if no info */
    if (ntuples <= 0.0)
        ntuples = 1000.0;

    /*
     * Estimate tupsize based on footprint of tuple in hashtable... note this
     * does not allow for any palloc overhead.  The manipulations of spaceUsed
     * don't count palloc overhead either.
     */
    tupsize = HJTUPLE_OVERHEAD +
        MAXALIGN(SizeofMinimalTupleHeader) +
        MAXALIGN(tupwidth);
    inner_rel_bytes = ntuples * tupsize;

    /*
     * Compute in-memory hashtable size limit from GUCs.
     */
    hash_table_bytes = get_hash_memory_limit();

    /*
     * Parallel Hash tries to use the combined hash_mem of all workers to
     * avoid the need to batch.  If that won't work, it falls back to hash_mem
     * per worker and tries to process batches in parallel.
     */
    if (try_combined_hash_mem)
    {
        /* Careful, this could overflow size_t */
        double      newlimit;

        newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
        newlimit = Min(newlimit, (double) SIZE_MAX);
        hash_table_bytes = (size_t) newlimit;
    }

    *space_allowed = hash_table_bytes;

    /*
     * If skew optimization is possible, estimate the number of skew buckets
     * that will fit in the memory allowed, and decrement the assumed space
     * available for the main hash table accordingly.
     *
     * We make the optimistic assumption that each skew bucket will contain
     * one inner-relation tuple.  If that turns out to be low, we will recover
     * at runtime by reducing the number of skew buckets.
     *
     * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
     * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
     * will round up to the next power of 2 and then multiply by 4 to reduce
     * collisions.
     */
    if (useskew)
    {
        size_t      bytes_per_mcv;
        size_t      skew_mcvs;

        /*----------
         * Compute number of MCVs we could hold in hash_table_bytes
         *
         * Divisor is:
         * size of a hash tuple +
         * worst-case size of skewBucket[] per MCV +
         * size of skewBucketNums[] entry +
         * size of skew bucket struct itself
         *----------
         */
        bytes_per_mcv = tupsize +
            (8 * sizeof(HashSkewBucket *)) +
            sizeof(int) +
            SKEW_BUCKET_OVERHEAD;
        skew_mcvs = hash_table_bytes / bytes_per_mcv;

        /*
         * Now scale by SKEW_HASH_MEM_PERCENT (we do it in this order so as
         * not to worry about size_t overflow in the multiplication)
         */
        skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100;

        /* Now clamp to integer range */
        skew_mcvs = Min(skew_mcvs, INT_MAX);

        *num_skew_mcvs = (int) skew_mcvs;

        /* Reduce hash_table_bytes by the amount needed for the skew table */
        if (skew_mcvs > 0)
            hash_table_bytes -= skew_mcvs * bytes_per_mcv;
    }
    else
        *num_skew_mcvs = 0;

    /*
     * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
     * memory is filled, assuming a single batch; but limit the value so that
     * the pointer arrays we'll try to allocate do not exceed hash_table_bytes
     * nor MaxAllocSize.
     *
     * Note that both nbuckets and nbatch must be powers of 2 to make
     * ExecHashGetBucketAndBatch fast.
     */
    max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
    max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
    /* If max_pointers isn't a power of 2, must round it down to one */
    max_pointers = pg_prevpower2_size_t(max_pointers);

    /* Also ensure we avoid integer overflow in nbatch and nbuckets */
    /* (this step is redundant given the current value of MaxAllocSize) */
    max_pointers = Min(max_pointers, INT_MAX / 2 + 1);

    dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
    dbuckets = Min(dbuckets, max_pointers);
    nbuckets = (int) dbuckets;
    /* don't let nbuckets be really small, though ... */
    nbuckets = Max(nbuckets, 1024);
    /* ... and force it to be a power of 2. */
    nbuckets = pg_nextpower2_32(nbuckets);

    /*
     * If there's not enough space to store the projected number of tuples and
     * the required bucket headers, we will need multiple batches.
     */
    bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
    if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
    {
        /* We'll need multiple batches */
        size_t      sbuckets;
        double      dbatch;
        int         minbatch;
        size_t      bucket_size;

        /*
         * If Parallel Hash with combined hash_mem would still need multiple
         * batches, we'll have to fall back to regular hash_mem budget.
         */
        if (try_combined_hash_mem)
        {
            ExecChooseHashTableSize(ntuples, tupwidth, useskew,
                                    false, parallel_workers,
                                    space_allowed,
                                    numbuckets,
                                    numbatches,
                                    num_skew_mcvs);
            return;
        }

        /*
         * Estimate the number of buckets we'll want to have when hash_mem is
         * entirely full.  Each bucket will contain a bucket pointer plus
         * NTUP_PER_BUCKET tuples, whose projected size already includes
         * overhead for the hash code, pointer to the next tuple, etc.
         */
        bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
        sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
        sbuckets = Min(sbuckets, max_pointers);
        nbuckets = (int) sbuckets;
        nbuckets = pg_nextpower2_32(nbuckets);
        bucket_bytes = nbuckets * sizeof(HashJoinTuple);

        /*
         * Buckets are simple pointers to hashjoin tuples, while tupsize
         * includes the pointer, hash code, and MinimalTupleData.  So buckets
         * should never really exceed 25% of hash_mem (even for
         * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not
         * 2^N bytes, where we might get more because of doubling.  So let's
         * look for 50% here.
         */
        Assert(bucket_bytes <= hash_table_bytes / 2);

        /* Calculate required number of batches. */
        dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
        dbatch = Min(dbatch, max_pointers);
        minbatch = (int) dbatch;
        nbatch = pg_nextpower2_32(Max(2, minbatch));
    }

    Assert(nbuckets > 0);
    Assert(nbatch > 0);

    *numbuckets = nbuckets;
    *numbatches = nbatch;
}
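
/*
 * Worked example (illustrative only; exact byte counts vary with platform
 * alignment and header sizes): suppose ntuples = 1,000,000, tupsize works
 * out to 64 bytes, and hash_table_bytes is 4MB with no skew carve-out.
 * Then inner_rel_bytes = 64,000,000, far above the memory limit, so the
 * multi-batch path is taken.  With bucket_size = 64 + 8 = 72,
 * 4,194,304 / 72 = 58,254, which rounds up to nbuckets = 65,536 and
 * bucket_bytes = 524,288.  Finally,
 * dbatch = ceil(64,000,000 / (4,194,304 - 524,288)) = 18, and
 * pg_nextpower2_32 rounds that up to nbatch = 32.
 */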

/* ----------------------------------------------------------------
 *      ExecHashTableDestroy
 *
 *      destroy a hash table
 * ----------------------------------------------------------------
 */
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
    int         i;

    /*
     * Make sure all the temp files are closed.  We skip batch 0, since it
     * can't have any temp files (and the arrays might not even exist if
     * nbatch is only 1).  Parallel hash joins don't use these files.
     */
    if (hashtable->innerBatchFile != NULL)
    {
        for (i = 1; i < hashtable->nbatch; i++)
        {
            if (hashtable->innerBatchFile[i])
                BufFileClose(hashtable->innerBatchFile[i]);
            if (hashtable->outerBatchFile[i])
                BufFileClose(hashtable->outerBatchFile[i]);
        }
    }

    /* Release working memory (batchCxt is a child, so it goes away too) */
    MemoryContextDelete(hashtable->hashCxt);

    /* And drop the control block */
    pfree(hashtable);
}

/*
 * ExecHashIncreaseNumBatches
 *      increase the original number of batches in order to reduce
 *      current memory consumption
 */
static void
ExecHashIncreaseNumBatches(HashJoinTable hashtable)
{
    int         oldnbatch = hashtable->nbatch;
    int         curbatch = hashtable->curbatch;
    int         nbatch;
    MemoryContext oldcxt;
    long        ninmemory;
    long        nfreed;
    HashMemoryChunk oldchunks;

    /* do nothing if we've decided to shut off growth */
    if (!hashtable->growEnabled)
        return;

    /* safety check to avoid overflow */
    if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
        return;

    nbatch = oldnbatch * 2;
    Assert(nbatch > 1);

#ifdef HJDEBUG
    printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
           hashtable, nbatch, hashtable->spaceUsed);
#endif

    oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);

    if (hashtable->innerBatchFile == NULL)
    {
        /* we had no file arrays before */
        hashtable->innerBatchFile = (BufFile **)
            palloc0(nbatch * sizeof(BufFile *));
        hashtable->outerBatchFile = (BufFile **)
            palloc0(nbatch * sizeof(BufFile *));
        /* time to establish the temp tablespaces, too */
        PrepareTempTablespaces();
    }
    else
    {
        /* enlarge arrays and zero out added entries */
        hashtable->innerBatchFile = (BufFile **)
            repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
        hashtable->outerBatchFile = (BufFile **)
            repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
        MemSet(hashtable->innerBatchFile + oldnbatch, 0,
               (nbatch - oldnbatch) * sizeof(BufFile *));
        MemSet(hashtable->outerBatchFile + oldnbatch, 0,
               (nbatch - oldnbatch) * sizeof(BufFile *));
    }

    MemoryContextSwitchTo(oldcxt);

    hashtable->nbatch = nbatch;

    /*
     * Scan through the existing hash table entries and dump out any that are
     * no longer of the current batch.
     */
    ninmemory = nfreed = 0;

    /* If we know we need to resize nbuckets, we can do it while rebatching. */
    if (hashtable->nbuckets_optimal != hashtable->nbuckets)
    {
        /* we never decrease the number of buckets */
        Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);

        hashtable->nbuckets = hashtable->nbuckets_optimal;
        hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;

        hashtable->buckets.unshared =
            repalloc(hashtable->buckets.unshared,
                     sizeof(HashJoinTuple) * hashtable->nbuckets);
    }

    /*
     * We will scan through the chunks directly, so that we can reset the
     * buckets now and not have to keep track which tuples in the buckets have
     * already been processed.  We will free the old chunks as we go.
     */
    memset(hashtable->buckets.unshared, 0,
           sizeof(HashJoinTuple) * hashtable->nbuckets);
    oldchunks = hashtable->chunks;
    hashtable->chunks = NULL;

    /* so, let's scan through the old chunks, and all tuples in each chunk */
    while (oldchunks != NULL)
    {
        HashMemoryChunk nextchunk = oldchunks->next.unshared;

        /* position within the buffer (up to oldchunks->used) */
        size_t      idx = 0;

        /* process all tuples stored in this chunk (and then free it) */
        while (idx < oldchunks->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx);
            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
            int         hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
            int         bucketno;
            int         batchno;

            ninmemory++;
            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            if (batchno == curbatch)
            {
                /* keep tuple in memory - copy it into the new chunk */
                HashJoinTuple copyTuple;

                copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
                memcpy(copyTuple, hashTuple, hashTupleSize);

                /* and add it back to the appropriate bucket */
                copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
                hashtable->buckets.unshared[bucketno] = copyTuple;
            }
            else
            {
                /* dump it out */
                Assert(batchno > curbatch);
                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                      hashTuple->hashvalue,
                                      &hashtable->innerBatchFile[batchno]);

                hashtable->spaceUsed -= hashTupleSize;
                nfreed++;
            }

            /* next tuple in this chunk */
            idx += MAXALIGN(hashTupleSize);

            /* allow this loop to be cancellable */
            CHECK_FOR_INTERRUPTS();
        }

        /* we're done with this chunk - free it and proceed to the next one */
        pfree(oldchunks);
        oldchunks = nextchunk;
    }

#ifdef HJDEBUG
    printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
           hashtable, nfreed, ninmemory, hashtable->spaceUsed);
#endif

    /*
     * If we dumped out either all or none of the tuples in the table, disable
     * further expansion of nbatch.  This situation implies that we have
     * enough tuples of identical hashvalues to overflow spaceAllowed.
     * Increasing nbatch will not fix it since there's no way to subdivide the
     * group any more finely.  We have to just gut it out and hope the server
     * has enough RAM.
     */
    if (nfreed == 0 || nfreed == ninmemory)
    {
        hashtable->growEnabled = false;
#ifdef HJDEBUG
        printf("Hashjoin %p: disabling further increase of nbatch\n",
               hashtable);
#endif
    }
}
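
/*
 * Illustrative note (not part of the original file): because nbatch only
 * ever doubles, each increase adds one more bit to the batch number.  A
 * tuple whose relevant hash bits are 0b110 is in batch 2 while nbatch = 4
 * (0b110 & 0b011) and moves to batch 6 after doubling (0b110 & 0b111), so
 * a tuple's batch number can only stay the same or grow; batches already
 * written out never need to receive tuples back from later batches.
 */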

/*
 * ExecParallelHashIncreaseNumBatches
 *      Every participant attached to grow_batches_barrier must run this
 *      function when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES.
 */
static void
ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         i;

    Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);

    /*
     * It's unlikely, but we need to be prepared for new participants to show
     * up while we're in the middle of this operation, so we need to switch on
     * barrier phase here.
     */
    switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
    {
        case PHJ_GROW_BATCHES_ELECTING:

            /*
             * Elect one participant to prepare to grow the number of batches.
             * This involves reallocating or resetting the buckets of batch 0
             * in preparation for all participants to begin repartitioning the
             * tuples.
             */
            if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                     WAIT_EVENT_HASH_GROW_BATCHES_ELECT))
            {
                dsa_pointer_atomic *buckets;
                ParallelHashJoinBatch *old_batch0;
                int         new_nbatch;
                int         i;

                /* Move the old batch out of the way. */
                old_batch0 = hashtable->batches[0].shared;
                pstate->old_batches = pstate->batches;
                pstate->old_nbatch = hashtable->nbatch;
                pstate->batches = InvalidDsaPointer;

                /* Free this backend's old accessors. */
                ExecParallelHashCloseBatchAccessors(hashtable);

                /* Figure out how many batches to use. */
                if (hashtable->nbatch == 1)
                {
                    /*
                     * We are going from single-batch to multi-batch.  We need
                     * to switch from one large combined memory budget to the
                     * regular hash_mem budget.
                     */
                    pstate->space_allowed = get_hash_memory_limit();

                    /*
                     * The combined hash_mem of all participants wasn't
                     * enough.  Therefore one batch per participant would be
                     * approximately equivalent and would probably also be
                     * insufficient.  So try two batches per participant,
                     * rounded up to a power of two.
                     */
                    new_nbatch = pg_nextpower2_32(pstate->nparticipants * 2);
                }
                else
                {
                    /*
                     * We were already multi-batched.  Try doubling the number
                     * of batches.
                     */
                    new_nbatch = hashtable->nbatch * 2;
                }

                /* Allocate new larger generation of batches. */
                Assert(hashtable->nbatch == pstate->nbatch);
                ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch);
                Assert(hashtable->nbatch == pstate->nbatch);

                /* Replace or recycle batch 0's bucket array. */
                if (pstate->old_nbatch == 1)
                {
                    double      dtuples;
                    double      dbuckets;
                    int         new_nbuckets;

                    /*
                     * We probably also need a smaller bucket array.  How many
                     * tuples do we expect per batch, assuming we have only
                     * half of them so far?  Normally we don't need to change
                     * the bucket array's size, because the size of each batch
                     * stays the same as we add more batches, but in this
                     * special case we move from a large batch to many smaller
                     * batches and it would be wasteful to keep the large
                     * array.
                     */
                    dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
                    dbuckets = ceil(dtuples / NTUP_PER_BUCKET);
                    dbuckets = Min(dbuckets,
                                   MaxAllocSize / sizeof(dsa_pointer_atomic));
                    new_nbuckets = (int) dbuckets;
                    new_nbuckets = Max(new_nbuckets, 1024);
                    new_nbuckets = pg_nextpower2_32(new_nbuckets);
                    dsa_free(hashtable->area, old_batch0->buckets);
                    hashtable->batches[0].shared->buckets =
                        dsa_allocate(hashtable->area,
                                     sizeof(dsa_pointer_atomic) * new_nbuckets);
                    buckets = (dsa_pointer_atomic *)
                        dsa_get_address(hashtable->area,
                                        hashtable->batches[0].shared->buckets);
                    for (i = 0; i < new_nbuckets; ++i)
                        dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
                    pstate->nbuckets = new_nbuckets;
                }
                else
                {
                    /* Recycle the existing bucket array. */
                    hashtable->batches[0].shared->buckets = old_batch0->buckets;
                    buckets = (dsa_pointer_atomic *)
                        dsa_get_address(hashtable->area, old_batch0->buckets);
                    for (i = 0; i < hashtable->nbuckets; ++i)
                        dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer);
                }

                /* Move all chunks to the work queue for parallel processing. */
                pstate->chunk_work_queue = old_batch0->chunks;

                /* Disable further growth temporarily while we're growing. */
                pstate->growth = PHJ_GROWTH_DISABLED;
            }
            else
            {
                /* All other participants just flush their tuples to disk. */
                ExecParallelHashCloseBatchAccessors(hashtable);
            }
            /* Fall through. */

        case PHJ_GROW_BATCHES_ALLOCATING:
            /* Wait for the above to be finished. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE);
            /* Fall through. */

        case PHJ_GROW_BATCHES_REPARTITIONING:
            /* Make sure that we have the current dimensions and buckets. */
            ExecParallelHashEnsureBatchAccessors(hashtable);
            ExecParallelHashTableSetCurrentBatch(hashtable, 0);
            /* Then partition, flush counters. */
            ExecParallelHashRepartitionFirst(hashtable);
            ExecParallelHashRepartitionRest(hashtable);
            ExecParallelHashMergeCounters(hashtable);
            /* Wait for the above to be finished. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
            /* Fall through. */

        case PHJ_GROW_BATCHES_DECIDING:

            /*
             * Elect one participant to clean up and decide whether further
             * repartitioning is needed, or should be disabled because it's
             * not helping.
             */
            if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                     WAIT_EVENT_HASH_GROW_BATCHES_DECIDE))
            {
                bool        space_exhausted = false;
                bool        extreme_skew_detected = false;

                /* Make sure that we have the current dimensions and buckets. */
                ExecParallelHashEnsureBatchAccessors(hashtable);
                ExecParallelHashTableSetCurrentBatch(hashtable, 0);

                /* Are any of the new generation of batches exhausted? */
                for (i = 0; i < hashtable->nbatch; ++i)
                {
                    ParallelHashJoinBatch *batch = hashtable->batches[i].shared;

                    if (batch->space_exhausted ||
                        batch->estimated_size > pstate->space_allowed)
                    {
                        int         parent;

                        space_exhausted = true;

                        /*
                         * Did this batch receive ALL of the tuples from its
                         * parent batch?  That would indicate that further
                         * repartitioning isn't going to help (the hash values
                         * are probably all the same).
                         */
                        parent = i % pstate->old_nbatch;
                        if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples)
                            extreme_skew_detected = true;
                    }
                }

                /* Don't keep growing if it's not helping or we'd overflow. */
                if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
                    pstate->growth = PHJ_GROWTH_DISABLED;
                else if (space_exhausted)
                    pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
                else
                    pstate->growth = PHJ_GROWTH_OK;

                /* Free the old batches in shared memory. */
                dsa_free(hashtable->area, pstate->old_batches);
                pstate->old_batches = InvalidDsaPointer;
            }
            /* Fall through. */

        case PHJ_GROW_BATCHES_FINISHING:
            /* Wait for the above to complete. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
    }
}

/*
 * Repartition the tuples currently loaded into memory for inner batch 0
 * because the number of batches has been increased.  Some tuples are retained
 * in memory and some are written out to a later batch.
 */
static void
ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
{
    dsa_pointer chunk_shared;
    HashMemoryChunk chunk;

    Assert(hashtable->nbatch == hashtable->parallel_state->nbatch);

    while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared)))
    {
        size_t      idx = 0;

        /* Repartition all tuples in this chunk. */
        while (idx < chunk->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
            HashJoinTuple copyTuple;
            dsa_pointer shared;
            int         bucketno;
            int         batchno;

            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            Assert(batchno < hashtable->nbatch);
            if (batchno == 0)
            {
                /* It still belongs in batch 0.  Copy to a new chunk. */
                copyTuple =
                    ExecParallelHashTupleAlloc(hashtable,
                                               HJTUPLE_OVERHEAD + tuple->t_len,
                                               &shared);
                copyTuple->hashvalue = hashTuple->hashvalue;
                memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len);
                ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                          copyTuple, shared);
            }
            else
            {
                size_t      tuple_size =
                    MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

                /* It belongs in a later batch. */
                hashtable->batches[batchno].estimated_size += tuple_size;
                sts_puttuple(hashtable->batches[batchno].inner_tuples,
                             &hashTuple->hashvalue, tuple);
            }

            /* Count this tuple. */
            ++hashtable->batches[0].old_ntuples;
            ++hashtable->batches[batchno].ntuples;

            idx += MAXALIGN(HJTUPLE_OVERHEAD +
                            HJTUPLE_MINTUPLE(hashTuple)->t_len);
        }

        /* Free this chunk. */
        dsa_free(hashtable->area, chunk_shared);

        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Help repartition inner batches 1..n.
 */
static void
ExecParallelHashRepartitionRest(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         old_nbatch = pstate->old_nbatch;
    SharedTuplestoreAccessor **old_inner_tuples;
    ParallelHashJoinBatch *old_batches;
    int         i;

    /* Get our hands on the previous generation of batches. */
    old_batches = (ParallelHashJoinBatch *)
        dsa_get_address(hashtable->area, pstate->old_batches);
    old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch);
    for (i = 1; i < old_nbatch; ++i)
    {
        ParallelHashJoinBatch *shared =
            NthParallelHashJoinBatch(old_batches, i);

        old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared),
                                         ParallelWorkerNumber + 1,
                                         &pstate->fileset);
    }

    /* Join in the effort to repartition them. */
    for (i = 1; i < old_nbatch; ++i)
    {
        MinimalTuple tuple;
        uint32      hashvalue;

        /* Scan one partition from the previous generation. */
        sts_begin_parallel_scan(old_inner_tuples[i]);
        while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue)))
        {
            size_t      tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
            int         bucketno;
            int         batchno;

            /* Decide which partition it goes to in the new generation. */
            ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
                                      &batchno);

            hashtable->batches[batchno].estimated_size += tuple_size;
            ++hashtable->batches[batchno].ntuples;
            ++hashtable->batches[i].old_ntuples;

            /* Store the tuple in its new batch. */
            sts_puttuple(hashtable->batches[batchno].inner_tuples,
                         &hashvalue, tuple);

            CHECK_FOR_INTERRUPTS();
        }
        sts_end_parallel_scan(old_inner_tuples[i]);
    }

    pfree(old_inner_tuples);
}

/*
 * Transfer the backend-local per-batch counters to the shared totals.
 */
static void
ExecParallelHashMergeCounters(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         i;

    LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
    pstate->total_tuples = 0;
    for (i = 0; i < hashtable->nbatch; ++i)
    {
        ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i];

        batch->shared->size += batch->size;
        batch->shared->estimated_size += batch->estimated_size;
        batch->shared->ntuples += batch->ntuples;
        batch->shared->old_ntuples += batch->old_ntuples;
        batch->size = 0;
        batch->estimated_size = 0;
        batch->ntuples = 0;
        batch->old_ntuples = 0;
        pstate->total_tuples += batch->shared->ntuples;
    }
    LWLockRelease(&pstate->lock);
}

/*
 * ExecHashIncreaseNumBuckets
 *      increase the original number of buckets in order to reduce
 *      number of tuples per bucket
 */
static void
ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
{
    HashMemoryChunk chunk;

    /* do nothing if not an increase (it's called increase for a reason) */
    if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
        return;

#ifdef HJDEBUG
    printf("Hashjoin %p: increasing nbuckets %d => %d\n",
           hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal);
#endif

    hashtable->nbuckets = hashtable->nbuckets_optimal;
    hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;

    Assert(hashtable->nbuckets > 1);
    Assert(hashtable->nbuckets <= (INT_MAX / 2));
    Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));

    /*
     * Just reallocate the proper number of buckets - we don't need to walk
     * through them - we can walk the dense-allocated chunks (just like in
     * ExecHashIncreaseNumBatches, but without all the copying into new
     * chunks)
     */
    hashtable->buckets.unshared =
        (HashJoinTuple *) repalloc(hashtable->buckets.unshared,
                                   hashtable->nbuckets * sizeof(HashJoinTuple));

    memset(hashtable->buckets.unshared, 0,
           hashtable->nbuckets * sizeof(HashJoinTuple));

    /* scan through all tuples in all chunks to rebuild the hash table */
    for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
    {
        /* process all tuples stored in this chunk */
        size_t      idx = 0;

        while (idx < chunk->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
            int         bucketno;
            int         batchno;

            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            /* add the tuple to the proper bucket */
            hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
            hashtable->buckets.unshared[bucketno] = hashTuple;

            /* advance index past the tuple */
            idx += MAXALIGN(HJTUPLE_OVERHEAD +
                            HJTUPLE_MINTUPLE(hashTuple)->t_len);
        }

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }
}

static void
ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         i;
    HashMemoryChunk chunk;
    dsa_pointer chunk_s;

    Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);

    /*
     * It's unlikely, but we need to be prepared for new participants to show
     * up while we're in the middle of this operation, so we need to switch on
     * barrier phase here.
     */
    switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
    {
        case PHJ_GROW_BUCKETS_ELECTING:
            /* Elect one participant to prepare to increase nbuckets. */
            if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                     WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
            {
                size_t      size;
                dsa_pointer_atomic *buckets;

                /* Double the size of the bucket array. */
                pstate->nbuckets *= 2;
                size = pstate->nbuckets * sizeof(dsa_pointer_atomic);
                hashtable->batches[0].shared->size += size / 2;
                dsa_free(hashtable->area, hashtable->batches[0].shared->buckets);
                hashtable->batches[0].shared->buckets =
                    dsa_allocate(hashtable->area, size);
                buckets = (dsa_pointer_atomic *)
                    dsa_get_address(hashtable->area,
                                    hashtable->batches[0].shared->buckets);
                for (i = 0; i < pstate->nbuckets; ++i)
                    dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);

                /* Put the chunk list onto the work queue. */
                pstate->chunk_work_queue = hashtable->batches[0].shared->chunks;

                /* Clear the flag. */
                pstate->growth = PHJ_GROWTH_OK;
            }
            /* Fall through. */

        case PHJ_GROW_BUCKETS_ALLOCATING:
            /* Wait for the above to complete. */
            BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                 WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE);
            /* Fall through. */

        case PHJ_GROW_BUCKETS_REINSERTING:
            /* Reinsert all tuples into the hash table. */
            ExecParallelHashEnsureBatchAccessors(hashtable);
            ExecParallelHashTableSetCurrentBatch(hashtable, 0);
            while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s)))
            {
                size_t      idx = 0;

                while (idx < chunk->used)
                {
                    HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
                    dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx;
                    int         bucketno;
                    int         batchno;

                    ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                              &bucketno, &batchno);
                    Assert(batchno == 0);

                    /* add the tuple to the proper bucket */
                    ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                              hashTuple, shared);

                    /* advance index past the tuple */
                    idx += MAXALIGN(HJTUPLE_OVERHEAD +
                                    HJTUPLE_MINTUPLE(hashTuple)->t_len);
                }

                /* allow this loop to be cancellable */
                CHECK_FOR_INTERRUPTS();
            }
            BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                 WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT);
    }
}

/*
 * ExecHashTableInsert
 *      insert a tuple into the hash table depending on the hash value
 *      it may just go to a temp file for later batches
 *
 * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
 * tuple; the minimal case in particular is certain to happen while reloading
 * tuples from batch files.  We could save some cycles in the regular-tuple
 * case by not forcing the slot contents into minimal form; not clear if it's
 * worth the messiness required.
 */
void
ExecHashTableInsert(HashJoinTable hashtable,
                    TupleTableSlot *slot,
                    uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    int         bucketno;
    int         batchno;

    ExecHashGetBucketAndBatch(hashtable, hashvalue,
                              &bucketno, &batchno);

    /*
     * decide whether to put the tuple in the hash table or a temp file
     */
    if (batchno == hashtable->curbatch)
    {
        /*
         * put the tuple in hash table
         */
        HashJoinTuple hashTuple;
        int         hashTupleSize;
        double      ntuples = (hashtable->totalTuples - hashtable->skewTuples);

        /* Create the HashJoinTuple */
        hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
        hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);

        hashTuple->hashvalue = hashvalue;
        memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);

        /*
         * We always reset the tuple-matched flag on insertion.  This is okay
         * even when reloading a tuple from a batch file, since the tuple
         * could not possibly have been matched to an outer tuple before it
         * went into the batch file.
         */
        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

        /* Push it onto the front of the bucket's list */
        hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
        hashtable->buckets.unshared[bucketno] = hashTuple;

        /*
         * Increase the (optimal) number of buckets if we just exceeded the
         * NTUP_PER_BUCKET threshold, but only when there's still a single
         * batch.
         */
        if (hashtable->nbatch == 1 &&
            ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
        {
            /* Guard against integer overflow and alloc size overflow */
            if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
                hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
            {
                hashtable->nbuckets_optimal *= 2;
                hashtable->log2_nbuckets_optimal += 1;
            }
        }

        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        if (hashtable->spaceUsed > hashtable->spacePeak)
            hashtable->spacePeak = hashtable->spaceUsed;
        if (hashtable->spaceUsed +
            hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
            > hashtable->spaceAllowed)
            ExecHashIncreaseNumBatches(hashtable);
    }
    else
    {
        /*
         * put the tuple into a temp file for later batches
         */
        Assert(batchno > hashtable->curbatch);
        ExecHashJoinSaveTuple(tuple,
                              hashvalue,
                              &hashtable->innerBatchFile[batchno]);
    }

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}
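
/*
 * Illustrative note (not part of the original file): with NTUP_PER_BUCKET
 * = 1, the doubling logic above fires as soon as the number of non-skew
 * tuples exceeds the current nbuckets_optimal; e.g. inserting the 1025th
 * tuple into a single-batch table with nbuckets_optimal = 1024 bumps it to
 * 2048.  Only the target is updated here; the bucket array itself is
 * resized later, by ExecHashIncreaseNumBuckets() at the end of the build
 * or by ExecHashIncreaseNumBatches() during a rebatch.
 */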

/*
 * ExecParallelHashTableInsert
 *      insert a tuple into a shared hash table or shared batch tuplestore
 */
void
ExecParallelHashTableInsert(HashJoinTable hashtable,
                            TupleTableSlot *slot,
                            uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    dsa_pointer shared;
    int         bucketno;
    int         batchno;

retry:
    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);

    if (batchno == 0)
    {
        HashJoinTuple hashTuple;

        /* Try to load it into memory. */
        Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
               PHJ_BUILD_HASHING_INNER);
        hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                               HJTUPLE_OVERHEAD + tuple->t_len,
                                               &shared);
        if (hashTuple == NULL)
            goto retry;

        /* Store the hash value in the HashJoinTuple header. */
        hashTuple->hashvalue = hashvalue;
        memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);

        /* Push it onto the front of the bucket's list */
        ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                  hashTuple, shared);
    }
    else
    {
        size_t      tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

        Assert(batchno > 0);

        /* Try to preallocate space in the batch if necessary. */
        if (hashtable->batches[batchno].preallocated < tuple_size)
        {
            if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
                goto retry;
        }

        Assert(hashtable->batches[batchno].preallocated >= tuple_size);
        hashtable->batches[batchno].preallocated -= tuple_size;
        sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
                     tuple);
    }
    ++hashtable->batches[batchno].ntuples;

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}

/*
 * Insert a tuple into the current hash table.  Unlike
 * ExecParallelHashTableInsert, this version is not prepared to send the tuple
 * to other batches or to run out of memory, and should only be called with
 * tuples that belong in the current batch once growth has been disabled.
 */
void
ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
                                        TupleTableSlot *slot,
                                        uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    HashJoinTuple hashTuple;
    dsa_pointer shared;
    int         batchno;
    int         bucketno;

    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
    Assert(batchno == hashtable->curbatch);
    hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                           HJTUPLE_OVERHEAD + tuple->t_len,
                                           &shared);
    hashTuple->hashvalue = hashvalue;
    memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
    HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
    ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                              hashTuple, shared);

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}

/*
 * ExecHashGetHashValue
 *      Compute the hash value for a tuple
 *
 * The tuple to be tested must be in econtext->ecxt_outertuple (thus Vars in
 * the hashkeys expressions need to have OUTER_VAR as varno).  If outer_tuple
 * is false (meaning it's the HashJoin's inner node, Hash), econtext,
 * hashkeys, and slot need to be from Hash, with hashkeys/slot referencing and
 * being suitable for tuples from the node below the Hash.  Conversely, if
 * outer_tuple is true, econtext is from HashJoin, and hashkeys/slot need to
 * be appropriate for tuples from HashJoin's outer node.
 *
 * A true result means the tuple's hash value has been successfully computed
 * and stored at *hashvalue.  A false result means the tuple cannot match
 * because it contains a null attribute, and hence it should be discarded
 * immediately.  (If keep_nulls is true then false is never returned.)
 */
bool
ExecHashGetHashValue(HashJoinTable hashtable,
                     ExprContext *econtext,
                     List *hashkeys,
                     bool outer_tuple,
                     bool keep_nulls,
                     uint32 *hashvalue)
{
    uint32      hashkey = 0;
    FmgrInfo   *hashfunctions;
    ListCell   *hk;
    int         i = 0;
    MemoryContext oldContext;

    /*
     * We reset the eval context each time to reclaim any memory leaked in the
     * hashkey expressions.
     */
    ResetExprContext(econtext);

    oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

    if (outer_tuple)
        hashfunctions = hashtable->outer_hashfunctions;
    else
        hashfunctions = hashtable->inner_hashfunctions;

    foreach(hk, hashkeys)
    {
        ExprState  *keyexpr = (ExprState *) lfirst(hk);
        Datum       keyval;
        bool        isNull;

        /* rotate hashkey left 1 bit at each step */
        hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);

        /*
         * Get the join attribute value of the tuple
         */
        keyval = ExecEvalExpr(keyexpr, econtext, &isNull);

        /*
         * If the attribute is NULL, and the join operator is strict, then
         * this tuple cannot pass the join qual so we can reject it
         * immediately (unless we're scanning the outside of an outer join, in
         * which case we must not reject it).  Otherwise we act like the
         * hashcode of NULL is zero (this will support operators that act like
         * IS NOT DISTINCT, though not any more-random behavior).  We treat
         * the hash support function as strict even if the operator is not.
         *
         * Note: currently, all hashjoinable operators must be strict since
         * the hash index AM assumes that.  However, it takes so little extra
         * code here to allow non-strict that we may as well do it.
         */
        if (isNull)
        {
            if (hashtable->hashStrict[i] && !keep_nulls)
            {
                MemoryContextSwitchTo(oldContext);
                return false;   /* cannot match */
            }
            /* else, leave hashkey unmodified, equivalent to hashcode 0 */
        }
        else
        {
            /* Compute the hash function */
            uint32      hkey;

            hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval));
            hashkey ^= hkey;
        }

        i++;
    }

    MemoryContextSwitchTo(oldContext);

    *hashvalue = hashkey;
    return true;
}
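
/*
 * Worked example (illustrative only): for two hash keys with per-key hashes
 * h1 and h2, the loop above computes
 *
 *      hashkey = rol1(0) ^ h1 = h1
 *      hashkey = rol1(h1) ^ h2
 *
 * where rol1 is a 1-bit left rotation.  Rotating before each XOR keeps the
 * combination order-sensitive, so equal values in different key positions
 * don't simply cancel out; a NULL key (with keep_nulls) contributes the
 * equivalent of hash value zero.
 */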
1809 */ 1810 bool 1811 ExecHashGetHashValue(HashJoinTable hashtable, 1812 ExprContext *econtext, 1813 List *hashkeys, 1814 bool outer_tuple, 1815 bool keep_nulls, 1816 uint32 *hashvalue) 1817 { 1818 uint32 hashkey = 0; 1819 FmgrInfo *hashfunctions; 1820 ListCell *hk; 1821 int i = 0; 1822 MemoryContext oldContext; 1823 1824 /* 1825 * We reset the eval context each time to reclaim any memory leaked in the 1826 * hashkey expressions. 1827 */ 1828 ResetExprContext(econtext); 1829 1830 oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); 1831 1832 if (outer_tuple) 1833 hashfunctions = hashtable->outer_hashfunctions; 1834 else 1835 hashfunctions = hashtable->inner_hashfunctions; 1836 1837 foreach(hk, hashkeys) 1838 { 1839 ExprState *keyexpr = (ExprState *) lfirst(hk); 1840 Datum keyval; 1841 bool isNull; 1842 1843 /* rotate hashkey left 1 bit at each step */ 1844 hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); 1845 1846 /* 1847 * Get the join attribute value of the tuple 1848 */ 1849 keyval = ExecEvalExpr(keyexpr, econtext, &isNull); 1850 1851 /* 1852 * If the attribute is NULL, and the join operator is strict, then 1853 * this tuple cannot pass the join qual so we can reject it 1854 * immediately (unless we're scanning the outside of an outer join, in 1855 * which case we must not reject it). Otherwise we act like the 1856 * hashcode of NULL is zero (this will support operators that act like 1857 * IS NOT DISTINCT, though not any more-random behavior). We treat 1858 * the hash support function as strict even if the operator is not. 1859 * 1860 * Note: currently, all hashjoinable operators must be strict since 1861 * the hash index AM assumes that. However, it takes so little extra 1862 * code here to allow non-strict that we may as well do it. 1863 */ 1864 if (isNull) 1865 { 1866 if (hashtable->hashStrict[i] && !keep_nulls) 1867 { 1868 MemoryContextSwitchTo(oldContext); 1869 return false; /* cannot match */ 1870 } 1871 /* else, leave hashkey unmodified, equivalent to hashcode 0 */ 1872 } 1873 else 1874 { 1875 /* Compute the hash function */ 1876 uint32 hkey; 1877 1878 hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], hashtable->collations[i], keyval)); 1879 hashkey ^= hkey; 1880 } 1881 1882 i++; 1883 } 1884 1885 MemoryContextSwitchTo(oldContext); 1886 1887 *hashvalue = hashkey; 1888 return true; 1889 } 1890 1891 /* 1892 * ExecHashGetBucketAndBatch 1893 * Determine the bucket number and batch number for a hash value 1894 * 1895 * Note: on-the-fly increases of nbatch must not change the bucket number 1896 * for a given hash code (since we don't move tuples to different hash 1897 * chains), and must only cause the batch number to remain the same or 1898 * increase. Our algorithm is 1899 * bucketno = hashvalue MOD nbuckets 1900 * batchno = ROR(hashvalue, log2_nbuckets) MOD nbatch 1901 * where nbuckets and nbatch are both expected to be powers of 2, so we can 1902 * do the computations by shifting and masking. (This assumes that all hash 1903 * functions are good about randomizing all their output bits, else we are 1904 * likely to have very skewed bucket or batch occupancy.) 1905 * 1906 * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic 1907 * bucket count growth. Once we start batching, the value is fixed and does 1908 * not change over the course of the join (making it possible to compute batch 1909 * number the way we do here). 1910 * 1911 * nbatch is always a power of 2; we increase it only by doubling it. 
This 1912 * effectively adds one more bit to the top of the batchno. In very large 1913 * joins, we might run out of bits to add, so we do this by rotating the hash 1914 * value. This causes batchno to steal bits from bucketno when the number of 1915 * virtual buckets exceeds 2^32. It's better to have longer bucket chains 1916 * than to lose the ability to divide batches. 1917 */ 1918 void 1919 ExecHashGetBucketAndBatch(HashJoinTable hashtable, 1920 uint32 hashvalue, 1921 int *bucketno, 1922 int *batchno) 1923 { 1924 uint32 nbuckets = (uint32) hashtable->nbuckets; 1925 uint32 nbatch = (uint32) hashtable->nbatch; 1926 1927 if (nbatch > 1) 1928 { 1929 *bucketno = hashvalue & (nbuckets - 1); 1930 *batchno = pg_rotate_right32(hashvalue, 1931 hashtable->log2_nbuckets) & (nbatch - 1); 1932 } 1933 else 1934 { 1935 *bucketno = hashvalue & (nbuckets - 1); 1936 *batchno = 0; 1937 } 1938 } 1939 1940 /* 1941 * ExecScanHashBucket 1942 * scan a hash bucket for matches to the current outer tuple 1943 * 1944 * The current outer tuple must be stored in econtext->ecxt_outertuple. 1945 * 1946 * On success, the inner tuple is stored into hjstate->hj_CurTuple and 1947 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot 1948 * for the latter. 1949 */ 1950 bool 1951 ExecScanHashBucket(HashJoinState *hjstate, 1952 ExprContext *econtext) 1953 { 1954 ExprState *hjclauses = hjstate->hashclauses; 1955 HashJoinTable hashtable = hjstate->hj_HashTable; 1956 HashJoinTuple hashTuple = hjstate->hj_CurTuple; 1957 uint32 hashvalue = hjstate->hj_CurHashValue; 1958 1959 /* 1960 * hj_CurTuple is the address of the tuple last returned from the current 1961 * bucket, or NULL if it's time to start scanning a new bucket. 1962 * 1963 * If the tuple hashed to a skew bucket then scan the skew bucket 1964 * otherwise scan the standard hashtable bucket. 1965 */ 1966 if (hashTuple != NULL) 1967 hashTuple = hashTuple->next.unshared; 1968 else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) 1969 hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; 1970 else 1971 hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; 1972 1973 while (hashTuple != NULL) 1974 { 1975 if (hashTuple->hashvalue == hashvalue) 1976 { 1977 TupleTableSlot *inntuple; 1978 1979 /* insert hashtable's tuple into exec slot so ExecQual sees it */ 1980 inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), 1981 hjstate->hj_HashTupleSlot, 1982 false); /* do not pfree */ 1983 econtext->ecxt_innertuple = inntuple; 1984 1985 if (ExecQualAndReset(hjclauses, econtext)) 1986 { 1987 hjstate->hj_CurTuple = hashTuple; 1988 return true; 1989 } 1990 } 1991 1992 hashTuple = hashTuple->next.unshared; 1993 } 1994 1995 /* 1996 * no match 1997 */ 1998 return false; 1999 } 2000 2001 /* 2002 * ExecParallelScanHashBucket 2003 * scan a hash bucket for matches to the current outer tuple 2004 * 2005 * The current outer tuple must be stored in econtext->ecxt_outertuple. 2006 * 2007 * On success, the inner tuple is stored into hjstate->hj_CurTuple and 2008 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot 2009 * for the latter. 
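 *
 * A hedged sketch of the expected calling pattern (the real driver lives
 * in nodeHashjoin.c; this is illustrative only):
 *
 *		while (ExecParallelScanHashBucket(hjstate, econtext))
 *		{
 *			-- hj_CurTuple/ecxt_innertuple now hold a hash-clause match;
 *			-- evaluate any remaining join quals and emit the joined row,
 *			-- then loop to look for more matches in the same bucket
 *		}
 *		-- false: bucket exhausted; fetch the next outer tuple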
2010 */ 2011 bool 2012 ExecParallelScanHashBucket(HashJoinState *hjstate, 2013 ExprContext *econtext) 2014 { 2015 ExprState *hjclauses = hjstate->hashclauses; 2016 HashJoinTable hashtable = hjstate->hj_HashTable; 2017 HashJoinTuple hashTuple = hjstate->hj_CurTuple; 2018 uint32 hashvalue = hjstate->hj_CurHashValue; 2019 2020 /* 2021 * hj_CurTuple is the address of the tuple last returned from the current 2022 * bucket, or NULL if it's time to start scanning a new bucket. 2023 */ 2024 if (hashTuple != NULL) 2025 hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); 2026 else 2027 hashTuple = ExecParallelHashFirstTuple(hashtable, 2028 hjstate->hj_CurBucketNo); 2029 2030 while (hashTuple != NULL) 2031 { 2032 if (hashTuple->hashvalue == hashvalue) 2033 { 2034 TupleTableSlot *inntuple; 2035 2036 /* insert hashtable's tuple into exec slot so ExecQual sees it */ 2037 inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), 2038 hjstate->hj_HashTupleSlot, 2039 false); /* do not pfree */ 2040 econtext->ecxt_innertuple = inntuple; 2041 2042 if (ExecQualAndReset(hjclauses, econtext)) 2043 { 2044 hjstate->hj_CurTuple = hashTuple; 2045 return true; 2046 } 2047 } 2048 2049 hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); 2050 } 2051 2052 /* 2053 * no match 2054 */ 2055 return false; 2056 } 2057 2058 /* 2059 * ExecPrepHashTableForUnmatched 2060 * set up for a series of ExecScanHashTableForUnmatched calls 2061 */ 2062 void 2063 ExecPrepHashTableForUnmatched(HashJoinState *hjstate) 2064 { 2065 /*---------- 2066 * During this scan we use the HashJoinState fields as follows: 2067 * 2068 * hj_CurBucketNo: next regular bucket to scan 2069 * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums) 2070 * hj_CurTuple: last tuple returned, or NULL to start next bucket 2071 *---------- 2072 */ 2073 hjstate->hj_CurBucketNo = 0; 2074 hjstate->hj_CurSkewBucketNo = 0; 2075 hjstate->hj_CurTuple = NULL; 2076 } 2077 2078 /* 2079 * ExecScanHashTableForUnmatched 2080 * scan the hash table for unmatched inner tuples 2081 * 2082 * On success, the inner tuple is stored into hjstate->hj_CurTuple and 2083 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot 2084 * for the latter. 2085 */ 2086 bool 2087 ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) 2088 { 2089 HashJoinTable hashtable = hjstate->hj_HashTable; 2090 HashJoinTuple hashTuple = hjstate->hj_CurTuple; 2091 2092 for (;;) 2093 { 2094 /* 2095 * hj_CurTuple is the address of the tuple last returned from the 2096 * current bucket, or NULL if it's time to start scanning a new 2097 * bucket. 
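 *
 * Buckets are visited in a fixed order: all regular buckets first
 * (tracked by hj_CurBucketNo), then the skew buckets, indexed through
 * skewBucketNums[] by hj_CurSkewBucketNo, exactly as initialized by
 * ExecPrepHashTableForUnmatched.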
2098 */ 2099 if (hashTuple != NULL) 2100 hashTuple = hashTuple->next.unshared; 2101 else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) 2102 { 2103 hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; 2104 hjstate->hj_CurBucketNo++; 2105 } 2106 else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) 2107 { 2108 int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; 2109 2110 hashTuple = hashtable->skewBucket[j]->tuples; 2111 hjstate->hj_CurSkewBucketNo++; 2112 } 2113 else 2114 break; /* finished all buckets */ 2115 2116 while (hashTuple != NULL) 2117 { 2118 if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) 2119 { 2120 TupleTableSlot *inntuple; 2121 2122 /* insert hashtable's tuple into exec slot */ 2123 inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), 2124 hjstate->hj_HashTupleSlot, 2125 false); /* do not pfree */ 2126 econtext->ecxt_innertuple = inntuple; 2127 2128 /* 2129 * Reset temp memory each time; although this function doesn't 2130 * do any qual eval, the caller will, so let's keep it 2131 * parallel to ExecScanHashBucket. 2132 */ 2133 ResetExprContext(econtext); 2134 2135 hjstate->hj_CurTuple = hashTuple; 2136 return true; 2137 } 2138 2139 hashTuple = hashTuple->next.unshared; 2140 } 2141 2142 /* allow this loop to be cancellable */ 2143 CHECK_FOR_INTERRUPTS(); 2144 } 2145 2146 /* 2147 * no more unmatched tuples 2148 */ 2149 return false; 2150 } 2151 2152 /* 2153 * ExecHashTableReset 2154 * 2155 * reset hash table header for new batch 2156 */ 2157 void 2158 ExecHashTableReset(HashJoinTable hashtable) 2159 { 2160 MemoryContext oldcxt; 2161 int nbuckets = hashtable->nbuckets; 2162 2163 /* 2164 * Release all the hash buckets and tuples acquired in the prior pass, and 2165 * reinitialize the context for a new pass. 2166 */ 2167 MemoryContextReset(hashtable->batchCxt); 2168 oldcxt = MemoryContextSwitchTo(hashtable->batchCxt); 2169 2170 /* Reallocate and reinitialize the hash bucket headers. */ 2171 hashtable->buckets.unshared = (HashJoinTuple *) 2172 palloc0(nbuckets * sizeof(HashJoinTuple)); 2173 2174 hashtable->spaceUsed = 0; 2175 2176 MemoryContextSwitchTo(oldcxt); 2177 2178 /* Forget the chunks (the memory was freed by the context reset above). */ 2179 hashtable->chunks = NULL; 2180 } 2181 2182 /* 2183 * ExecHashTableResetMatchFlags 2184 * Clear all the HeapTupleHeaderHasMatch flags in the table 2185 */ 2186 void 2187 ExecHashTableResetMatchFlags(HashJoinTable hashtable) 2188 { 2189 HashJoinTuple tuple; 2190 int i; 2191 2192 /* Reset all flags in the main table ... */ 2193 for (i = 0; i < hashtable->nbuckets; i++) 2194 { 2195 for (tuple = hashtable->buckets.unshared[i]; tuple != NULL; 2196 tuple = tuple->next.unshared) 2197 HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); 2198 } 2199 2200 /* ... and the same for the skew buckets, if any */ 2201 for (i = 0; i < hashtable->nSkewBuckets; i++) 2202 { 2203 int j = hashtable->skewBucketNums[i]; 2204 HashSkewBucket *skewBucket = hashtable->skewBucket[j]; 2205 2206 for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared) 2207 HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple)); 2208 } 2209 } 2210 2211 2212 void 2213 ExecReScanHash(HashState *node) 2214 { 2215 /* 2216 * if chgParam of subnode is not null then plan will be re-scanned by 2217 * first ExecProcNode. 
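 * If chgParam is NULL, nothing else will trigger that rescan, so we must
 * rescan the subnode ourselves here.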
2218 */ 2219 if (node->ps.lefttree->chgParam == NULL) 2220 ExecReScan(node->ps.lefttree); 2221 } 2222 2223 2224 /* 2225 * ExecHashBuildSkewHash 2226 * 2227 * Set up for skew optimization if we can identify the most common values 2228 * (MCVs) of the outer relation's join key. We make a skew hash bucket 2229 * for the hash value of each MCV, up to the number of slots allowed 2230 * based on available memory. 2231 */ 2232 static void 2233 ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) 2234 { 2235 HeapTupleData *statsTuple; 2236 AttStatsSlot sslot; 2237 2238 /* Do nothing if planner didn't identify the outer relation's join key */ 2239 if (!OidIsValid(node->skewTable)) 2240 return; 2241 /* Also, do nothing if we don't have room for at least one skew bucket */ 2242 if (mcvsToUse <= 0) 2243 return; 2244 2245 /* 2246 * Try to find the MCV statistics for the outer relation's join key. 2247 */ 2248 statsTuple = SearchSysCache3(STATRELATTINH, 2249 ObjectIdGetDatum(node->skewTable), 2250 Int16GetDatum(node->skewColumn), 2251 BoolGetDatum(node->skewInherit)); 2252 if (!HeapTupleIsValid(statsTuple)) 2253 return; 2254 2255 if (get_attstatsslot(&sslot, statsTuple, 2256 STATISTIC_KIND_MCV, InvalidOid, 2257 ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS)) 2258 { 2259 double frac; 2260 int nbuckets; 2261 FmgrInfo *hashfunctions; 2262 int i; 2263 2264 if (mcvsToUse > sslot.nvalues) 2265 mcvsToUse = sslot.nvalues; 2266 2267 /* 2268 * Calculate the expected fraction of outer relation that will 2269 * participate in the skew optimization. If this isn't at least 2270 * SKEW_MIN_OUTER_FRACTION, don't use skew optimization. 2271 */ 2272 frac = 0; 2273 for (i = 0; i < mcvsToUse; i++) 2274 frac += sslot.numbers[i]; 2275 if (frac < SKEW_MIN_OUTER_FRACTION) 2276 { 2277 free_attstatsslot(&sslot); 2278 ReleaseSysCache(statsTuple); 2279 return; 2280 } 2281 2282 /* 2283 * Okay, set up the skew hashtable. 2284 * 2285 * skewBucket[] is an open addressing hashtable with a power of 2 size 2286 * that is greater than the number of MCV values. (This ensures there 2287 * will be at least one null entry, so searches will always 2288 * terminate.) 2289 * 2290 * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or 2291 * MaxAllocSize/sizeof(void *)/8, but that is not currently possible 2292 * since we limit pg_statistic entries to much less than that. 2293 */ 2294 nbuckets = pg_nextpower2_32(mcvsToUse + 1); 2295 /* use two more bits just to help avoid collisions */ 2296 nbuckets <<= 2; 2297 2298 hashtable->skewEnabled = true; 2299 hashtable->skewBucketLen = nbuckets; 2300 2301 /* 2302 * We allocate the bucket memory in the hashtable's batch context. It 2303 * is only needed during the first batch, and this ensures it will be 2304 * automatically removed once the first batch is done. 2305 */ 2306 hashtable->skewBucket = (HashSkewBucket **) 2307 MemoryContextAllocZero(hashtable->batchCxt, 2308 nbuckets * sizeof(HashSkewBucket *)); 2309 hashtable->skewBucketNums = (int *) 2310 MemoryContextAllocZero(hashtable->batchCxt, 2311 mcvsToUse * sizeof(int)); 2312 2313 hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *) 2314 + mcvsToUse * sizeof(int); 2315 hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) 2316 + mcvsToUse * sizeof(int); 2317 if (hashtable->spaceUsed > hashtable->spacePeak) 2318 hashtable->spacePeak = hashtable->spaceUsed; 2319 2320 /* 2321 * Create a skew bucket for each MCV hash value. 
2322 * 2323 * Note: it is very important that we create the buckets in order of 2324 * decreasing MCV frequency. If we have to remove some buckets, they 2325 * must be removed in reverse order of creation (see notes in 2326 * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to 2327 * be removed first. 2328 */ 2329 hashfunctions = hashtable->outer_hashfunctions; 2330 2331 for (i = 0; i < mcvsToUse; i++) 2332 { 2333 uint32 hashvalue; 2334 int bucket; 2335 2336 hashvalue = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[0], 2337 hashtable->collations[0], 2338 sslot.values[i])); 2339 2340 /* 2341 * While we have not hit a hole in the hashtable and have not hit 2342 * the desired bucket, we have collided with some previous hash 2343 * value, so try the next bucket location. NB: this code must 2344 * match ExecHashGetSkewBucket. 2345 */ 2346 bucket = hashvalue & (nbuckets - 1); 2347 while (hashtable->skewBucket[bucket] != NULL && 2348 hashtable->skewBucket[bucket]->hashvalue != hashvalue) 2349 bucket = (bucket + 1) & (nbuckets - 1); 2350 2351 /* 2352 * If we found an existing bucket with the same hashvalue, leave 2353 * it alone. It's okay for two MCVs to share a hashvalue. 2354 */ 2355 if (hashtable->skewBucket[bucket] != NULL) 2356 continue; 2357 2358 /* Okay, create a new skew bucket for this hashvalue. */ 2359 hashtable->skewBucket[bucket] = (HashSkewBucket *) 2360 MemoryContextAlloc(hashtable->batchCxt, 2361 sizeof(HashSkewBucket)); 2362 hashtable->skewBucket[bucket]->hashvalue = hashvalue; 2363 hashtable->skewBucket[bucket]->tuples = NULL; 2364 hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket; 2365 hashtable->nSkewBuckets++; 2366 hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; 2367 hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; 2368 if (hashtable->spaceUsed > hashtable->spacePeak) 2369 hashtable->spacePeak = hashtable->spaceUsed; 2370 } 2371 2372 free_attstatsslot(&sslot); 2373 } 2374 2375 ReleaseSysCache(statsTuple); 2376 } 2377 2378 /* 2379 * ExecHashGetSkewBucket 2380 * 2381 * Returns the index of the skew bucket for this hashvalue, 2382 * or INVALID_SKEW_BUCKET_NO if the hashvalue is not 2383 * associated with any active skew bucket. 2384 */ 2385 int 2386 ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue) 2387 { 2388 int bucket; 2389 2390 /* 2391 * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in 2392 * particular, this happens after the initial batch is done). 2393 */ 2394 if (!hashtable->skewEnabled) 2395 return INVALID_SKEW_BUCKET_NO; 2396 2397 /* 2398 * Since skewBucketLen is a power of 2, we can do a modulo by ANDing. 2399 */ 2400 bucket = hashvalue & (hashtable->skewBucketLen - 1); 2401 2402 /* 2403 * While we have not hit a hole in the hashtable and have not hit the 2404 * desired bucket, we have collided with some other hash value, so try the 2405 * next bucket location. 2406 */ 2407 while (hashtable->skewBucket[bucket] != NULL && 2408 hashtable->skewBucket[bucket]->hashvalue != hashvalue) 2409 bucket = (bucket + 1) & (hashtable->skewBucketLen - 1); 2410 2411 /* 2412 * Found the desired bucket? 2413 */ 2414 if (hashtable->skewBucket[bucket] != NULL) 2415 return bucket; 2416 2417 /* 2418 * There must not be any hashtable entry for this hash value. 2419 */ 2420 return INVALID_SKEW_BUCKET_NO; 2421 } 2422 2423 /* 2424 * ExecHashSkewTableInsert 2425 * 2426 * Insert a tuple into the skew hashtable. 2427 * 2428 * This should generally match up with the current-batch case in 2429 * ExecHashTableInsert. 
2430 */ 2431 static void 2432 ExecHashSkewTableInsert(HashJoinTable hashtable, 2433 TupleTableSlot *slot, 2434 uint32 hashvalue, 2435 int bucketNumber) 2436 { 2437 bool shouldFree; 2438 MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); 2439 HashJoinTuple hashTuple; 2440 int hashTupleSize; 2441 2442 /* Create the HashJoinTuple */ 2443 hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; 2444 hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, 2445 hashTupleSize); 2446 hashTuple->hashvalue = hashvalue; 2447 memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); 2448 HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); 2449 2450 /* Push it onto the front of the skew bucket's list */ 2451 hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples; 2452 hashtable->skewBucket[bucketNumber]->tuples = hashTuple; 2453 Assert(hashTuple != hashTuple->next.unshared); 2454 2455 /* Account for space used, and back off if we've used too much */ 2456 hashtable->spaceUsed += hashTupleSize; 2457 hashtable->spaceUsedSkew += hashTupleSize; 2458 if (hashtable->spaceUsed > hashtable->spacePeak) 2459 hashtable->spacePeak = hashtable->spaceUsed; 2460 while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) 2461 ExecHashRemoveNextSkewBucket(hashtable); 2462 2463 /* Check we are not over the total spaceAllowed, either */ 2464 if (hashtable->spaceUsed > hashtable->spaceAllowed) 2465 ExecHashIncreaseNumBatches(hashtable); 2466 2467 if (shouldFree) 2468 heap_free_minimal_tuple(tuple); 2469 } 2470 2471 /* 2472 * ExecHashRemoveNextSkewBucket 2473 * 2474 * Remove the least valuable skew bucket by pushing its tuples into 2475 * the main hash table. 2476 */ 2477 static void 2478 ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) 2479 { 2480 int bucketToRemove; 2481 HashSkewBucket *bucket; 2482 uint32 hashvalue; 2483 int bucketno; 2484 int batchno; 2485 HashJoinTuple hashTuple; 2486 2487 /* Locate the bucket to remove */ 2488 bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1]; 2489 bucket = hashtable->skewBucket[bucketToRemove]; 2490 2491 /* 2492 * Calculate which bucket and batch the tuples belong to in the main 2493 * hashtable. They all have the same hash value, so it's the same for all 2494 * of them. Also note that it's not possible for nbatch to increase while 2495 * we are processing the tuples. 2496 */ 2497 hashvalue = bucket->hashvalue; 2498 ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); 2499 2500 /* Process all tuples in the bucket */ 2501 hashTuple = bucket->tuples; 2502 while (hashTuple != NULL) 2503 { 2504 HashJoinTuple nextHashTuple = hashTuple->next.unshared; 2505 MinimalTuple tuple; 2506 Size tupleSize; 2507 2508 /* 2509 * This code must agree with ExecHashTableInsert. We do not use 2510 * ExecHashTableInsert directly as ExecHashTableInsert expects a 2511 * TupleTableSlot while we already have HashJoinTuples. 2512 */ 2513 tuple = HJTUPLE_MINTUPLE(hashTuple); 2514 tupleSize = HJTUPLE_OVERHEAD + tuple->t_len; 2515 2516 /* Decide whether to put the tuple in the hash table or a temp file */ 2517 if (batchno == hashtable->curbatch) 2518 { 2519 /* Move the tuple to the main hash table */ 2520 HashJoinTuple copyTuple; 2521 2522 /* 2523 * We must copy the tuple into the dense storage, else it will not 2524 * be found by, eg, ExecHashIncreaseNumBatches. 
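 * (Skew tuples were palloc'd individually in batchCxt by
 * ExecHashSkewTableInsert, precisely so they could be pfree'd here one
 * at a time; but the batch- and bucket-resizing code walks the
 * hashtable->chunks list rather than the bucket chains, so a tuple
 * living outside the chunks would be invisible to a resize.)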
2525 */ 2526 copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize); 2527 memcpy(copyTuple, hashTuple, tupleSize); 2528 pfree(hashTuple); 2529 2530 copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; 2531 hashtable->buckets.unshared[bucketno] = copyTuple; 2532 2533 /* We have reduced skew space, but overall space doesn't change */ 2534 hashtable->spaceUsedSkew -= tupleSize; 2535 } 2536 else 2537 { 2538 /* Put the tuple into a temp file for later batches */ 2539 Assert(batchno > hashtable->curbatch); 2540 ExecHashJoinSaveTuple(tuple, hashvalue, 2541 &hashtable->innerBatchFile[batchno]); 2542 pfree(hashTuple); 2543 hashtable->spaceUsed -= tupleSize; 2544 hashtable->spaceUsedSkew -= tupleSize; 2545 } 2546 2547 hashTuple = nextHashTuple; 2548 2549 /* allow this loop to be cancellable */ 2550 CHECK_FOR_INTERRUPTS(); 2551 } 2552 2553 /* 2554 * Free the bucket struct itself and reset the hashtable entry to NULL. 2555 * 2556 * NOTE: this is not nearly as simple as it looks on the surface, because 2557 * of the possibility of collisions in the hashtable. Suppose that hash 2558 * values A and B collide at a particular hashtable entry, and that A was 2559 * entered first so B gets shifted to a different table entry. If we were 2560 * to remove A first then ExecHashGetSkewBucket would mistakenly start 2561 * reporting that B is not in the hashtable, because it would hit the NULL 2562 * before finding B. However, we always remove entries in the reverse 2563 * order of creation, so this failure cannot happen. 2564 */ 2565 hashtable->skewBucket[bucketToRemove] = NULL; 2566 hashtable->nSkewBuckets--; 2567 pfree(bucket); 2568 hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD; 2569 hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD; 2570 2571 /* 2572 * If we have removed all skew buckets then give up on skew optimization. 2573 * Release the arrays since they aren't useful any more. 2574 */ 2575 if (hashtable->nSkewBuckets == 0) 2576 { 2577 hashtable->skewEnabled = false; 2578 pfree(hashtable->skewBucket); 2579 pfree(hashtable->skewBucketNums); 2580 hashtable->skewBucket = NULL; 2581 hashtable->skewBucketNums = NULL; 2582 hashtable->spaceUsed -= hashtable->spaceUsedSkew; 2583 hashtable->spaceUsedSkew = 0; 2584 } 2585 } 2586 2587 /* 2588 * Reserve space in the DSM segment for instrumentation data. 2589 */ 2590 void 2591 ExecHashEstimate(HashState *node, ParallelContext *pcxt) 2592 { 2593 size_t size; 2594 2595 /* don't need this if not instrumenting or no workers */ 2596 if (!node->ps.instrument || pcxt->nworkers == 0) 2597 return; 2598 2599 size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation)); 2600 size = add_size(size, offsetof(SharedHashInfo, hinstrument)); 2601 shm_toc_estimate_chunk(&pcxt->estimator, size); 2602 shm_toc_estimate_keys(&pcxt->estimator, 1); 2603 } 2604 2605 /* 2606 * Set up a space in the DSM for all workers to record instrumentation data 2607 * about their hash table. 2608 */ 2609 void 2610 ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) 2611 { 2612 size_t size; 2613 2614 /* don't need this if not instrumenting or no workers */ 2615 if (!node->ps.instrument || pcxt->nworkers == 0) 2616 return; 2617 2618 size = offsetof(SharedHashInfo, hinstrument) + 2619 pcxt->nworkers * sizeof(HashInstrumentation); 2620 node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size); 2621 2622 /* Each per-worker area must start out as zeroes. 
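 * ExecHashAccumInstrumentation assumes it is merging into an
 * initially-zeroed struct.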
*/ 2623 memset(node->shared_info, 0, size); 2624 2625 node->shared_info->num_workers = pcxt->nworkers; 2626 shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, 2627 node->shared_info); 2628 } 2629 2630 /* 2631 * Locate the DSM space for hash table instrumentation data that we'll write 2632 * to at shutdown time. 2633 */ 2634 void 2635 ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) 2636 { 2637 SharedHashInfo *shared_info; 2638 2639 /* don't need this if not instrumenting */ 2640 if (!node->ps.instrument) 2641 return; 2642 2643 /* 2644 * Find our entry in the shared area, and set up a pointer to it so that 2645 * we'll accumulate stats there when shutting down or rebuilding the hash 2646 * table. 2647 */ 2648 shared_info = (SharedHashInfo *) 2649 shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); 2650 node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber]; 2651 } 2652 2653 /* 2654 * Collect EXPLAIN stats if needed, saving them into DSM memory if 2655 * ExecHashInitializeWorker was called, or local storage if not. In the 2656 * parallel case, this must be done in ExecShutdownHash() rather than 2657 * ExecEndHash() because the latter runs after we've detached from the DSM 2658 * segment. 2659 */ 2660 void 2661 ExecShutdownHash(HashState *node) 2662 { 2663 /* Allocate save space if EXPLAIN'ing and we didn't do so already */ 2664 if (node->ps.instrument && !node->hinstrument) 2665 node->hinstrument = (HashInstrumentation *) 2666 palloc0(sizeof(HashInstrumentation)); 2667 /* Now accumulate data for the current (final) hash table */ 2668 if (node->hinstrument && node->hashtable) 2669 ExecHashAccumInstrumentation(node->hinstrument, node->hashtable); 2670 } 2671 2672 /* 2673 * Retrieve instrumentation data from workers before the DSM segment is 2674 * detached, so that EXPLAIN can access it. 2675 */ 2676 void 2677 ExecHashRetrieveInstrumentation(HashState *node) 2678 { 2679 SharedHashInfo *shared_info = node->shared_info; 2680 size_t size; 2681 2682 if (shared_info == NULL) 2683 return; 2684 2685 /* Replace node->shared_info with a copy in backend-local memory. */ 2686 size = offsetof(SharedHashInfo, hinstrument) + 2687 shared_info->num_workers * sizeof(HashInstrumentation); 2688 node->shared_info = palloc(size); 2689 memcpy(node->shared_info, shared_info, size); 2690 } 2691 2692 /* 2693 * Accumulate instrumentation data from 'hashtable' into an 2694 * initially-zeroed HashInstrumentation struct. 2695 * 2696 * This is used to merge information across successive hash table instances 2697 * within a single plan node. We take the maximum values of each interesting 2698 * number. The largest nbuckets and largest nbatch values might have occurred 2699 * in different instances, so there's some risk of confusion from reporting 2700 * unrelated numbers; but there's a bigger risk of misdiagnosing a performance 2701 * issue if we don't report the largest values. Similarly, we want to report 2702 * the largest spacePeak regardless of whether it happened in the same 2703 * instance as the largest nbuckets or nbatch. All the instances should have 2704 * the same nbuckets_original and nbatch_original; but there's little value 2705 * in depending on that here, so handle them the same way. 
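 *
 * For example (purely illustrative numbers): if one instance ran with
 * nbuckets = 1024, nbatch = 8, spacePeak = 4MB and a rebuilt instance ran
 * with nbuckets = 2048, nbatch = 2, spacePeak = 3MB, we report
 * nbuckets = 2048, nbatch = 8, spacePeak = 4MB, even though no single
 * instance exhibited that exact combination.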
2706 */ 2707 void 2708 ExecHashAccumInstrumentation(HashInstrumentation *instrument, 2709 HashJoinTable hashtable) 2710 { 2711 instrument->nbuckets = Max(instrument->nbuckets, 2712 hashtable->nbuckets); 2713 instrument->nbuckets_original = Max(instrument->nbuckets_original, 2714 hashtable->nbuckets_original); 2715 instrument->nbatch = Max(instrument->nbatch, 2716 hashtable->nbatch); 2717 instrument->nbatch_original = Max(instrument->nbatch_original, 2718 hashtable->nbatch_original); 2719 instrument->space_peak = Max(instrument->space_peak, 2720 hashtable->spacePeak); 2721 } 2722 2723 /* 2724 * Allocate 'size' bytes from the currently active HashMemoryChunk 2725 */ 2726 static void * 2727 dense_alloc(HashJoinTable hashtable, Size size) 2728 { 2729 HashMemoryChunk newChunk; 2730 char *ptr; 2731 2732 /* just in case the size is not already aligned properly */ 2733 size = MAXALIGN(size); 2734 2735 /* 2736 * If tuple size is larger than threshold, allocate a separate chunk. 2737 */ 2738 if (size > HASH_CHUNK_THRESHOLD) 2739 { 2740 /* allocate new chunk and put it at the beginning of the list */ 2741 newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, 2742 HASH_CHUNK_HEADER_SIZE + size); 2743 newChunk->maxlen = size; 2744 newChunk->used = size; 2745 newChunk->ntuples = 1; 2746 2747 /* 2748 * Add this chunk to the list after the first existing chunk, so that 2749 * we don't lose the remaining space in the "current" chunk. 2750 */ 2751 if (hashtable->chunks != NULL) 2752 { 2753 newChunk->next = hashtable->chunks->next; 2754 hashtable->chunks->next.unshared = newChunk; 2755 } 2756 else 2757 { 2758 newChunk->next.unshared = hashtable->chunks; 2759 hashtable->chunks = newChunk; 2760 } 2761 2762 return HASH_CHUNK_DATA(newChunk); 2763 } 2764 2765 /* 2766 * See if we have enough space for it in the current chunk (if any). If 2767 * not, allocate a fresh chunk. 2768 */ 2769 if ((hashtable->chunks == NULL) || 2770 (hashtable->chunks->maxlen - hashtable->chunks->used) < size) 2771 { 2772 /* allocate new chunk and put it at the beginning of the list */ 2773 newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt, 2774 HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE); 2775 2776 newChunk->maxlen = HASH_CHUNK_SIZE; 2777 newChunk->used = size; 2778 newChunk->ntuples = 1; 2779 2780 newChunk->next.unshared = hashtable->chunks; 2781 hashtable->chunks = newChunk; 2782 2783 return HASH_CHUNK_DATA(newChunk); 2784 } 2785 2786 /* There is enough space in the current chunk, let's add the tuple */ 2787 ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used; 2788 hashtable->chunks->used += size; 2789 hashtable->chunks->ntuples += 1; 2790 2791 /* return pointer to the start of the tuple memory */ 2792 return ptr; 2793 } 2794 2795 /* 2796 * Allocate space for a tuple in shared dense storage. This is equivalent to 2797 * dense_alloc but for Parallel Hash using shared memory. 2798 * 2799 * While loading a tuple into shared memory, we might run out of memory and 2800 * decide to repartition, or determine that the load factor is too high and 2801 * decide to expand the bucket array, or discover that another participant has 2802 * commanded us to help do that. Return NULL if number of buckets or batches 2803 * has changed, indicating that the caller must retry (considering the 2804 * possibility that the tuple no longer belongs in the same batch). 
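 *
 * A minimal caller sketch (hypothetical, mirroring what
 * ExecParallelHashTableInsert actually does):
 *
 *	retry:
 *		ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
 *		...
 *		hashTuple = ExecParallelHashTupleAlloc(hashtable, size, &shared);
 *		if (hashTuple == NULL)
 *			goto retry;		-- nbatch/nbuckets changed; recompute and retry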
2805 */ 2806 static HashJoinTuple 2807 ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, 2808 dsa_pointer *shared) 2809 { 2810 ParallelHashJoinState *pstate = hashtable->parallel_state; 2811 dsa_pointer chunk_shared; 2812 HashMemoryChunk chunk; 2813 Size chunk_size; 2814 HashJoinTuple result; 2815 int curbatch = hashtable->curbatch; 2816 2817 size = MAXALIGN(size); 2818 2819 /* 2820 * Fast path: if there is enough space in this backend's current chunk, 2821 * then we can allocate without any locking. 2822 */ 2823 chunk = hashtable->current_chunk; 2824 if (chunk != NULL && 2825 size <= HASH_CHUNK_THRESHOLD && 2826 chunk->maxlen - chunk->used >= size) 2827 { 2828 2829 chunk_shared = hashtable->current_chunk_shared; 2830 Assert(chunk == dsa_get_address(hashtable->area, chunk_shared)); 2831 *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used; 2832 result = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + chunk->used); 2833 chunk->used += size; 2834 2835 Assert(chunk->used <= chunk->maxlen); 2836 Assert(result == dsa_get_address(hashtable->area, *shared)); 2837 2838 return result; 2839 } 2840 2841 /* Slow path: try to allocate a new chunk. */ 2842 LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); 2843 2844 /* 2845 * Check if we need to help increase the number of buckets or batches. 2846 */ 2847 if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || 2848 pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) 2849 { 2850 ParallelHashGrowth growth = pstate->growth; 2851 2852 hashtable->current_chunk = NULL; 2853 LWLockRelease(&pstate->lock); 2854 2855 /* Another participant has commanded us to help grow. */ 2856 if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) 2857 ExecParallelHashIncreaseNumBatches(hashtable); 2858 else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) 2859 ExecParallelHashIncreaseNumBuckets(hashtable); 2860 2861 /* The caller must retry. */ 2862 return NULL; 2863 } 2864 2865 /* Oversized tuples get their own chunk. */ 2866 if (size > HASH_CHUNK_THRESHOLD) 2867 chunk_size = size + HASH_CHUNK_HEADER_SIZE; 2868 else 2869 chunk_size = HASH_CHUNK_SIZE; 2870 2871 /* Check if it's time to grow batches or buckets. */ 2872 if (pstate->growth != PHJ_GROWTH_DISABLED) 2873 { 2874 Assert(curbatch == 0); 2875 Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER); 2876 2877 /* 2878 * Check if our space limit would be exceeded. To avoid choking on 2879 * very large tuples or very low hash_mem setting, we'll always allow 2880 * each backend to allocate at least one chunk. 2881 */ 2882 if (hashtable->batches[0].at_least_one_chunk && 2883 hashtable->batches[0].shared->size + 2884 chunk_size > pstate->space_allowed) 2885 { 2886 pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; 2887 hashtable->batches[0].shared->space_exhausted = true; 2888 LWLockRelease(&pstate->lock); 2889 2890 return NULL; 2891 } 2892 2893 /* Check if our load factor limit would be exceeded. */ 2894 if (hashtable->nbatch == 1) 2895 { 2896 hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples; 2897 hashtable->batches[0].ntuples = 0; 2898 /* Guard against integer overflow and alloc size overflow */ 2899 if (hashtable->batches[0].shared->ntuples + 1 > 2900 hashtable->nbuckets * NTUP_PER_BUCKET && 2901 hashtable->nbuckets < (INT_MAX / 2) && 2902 hashtable->nbuckets * 2 <= 2903 MaxAllocSize / sizeof(dsa_pointer_atomic)) 2904 { 2905 pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS; 2906 LWLockRelease(&pstate->lock); 2907 2908 return NULL; 2909 } 2910 } 2911 } 2912 2913 /* We are cleared to allocate a new chunk. 
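 * Note that pstate->lock is still held at this point; it is not released
 * until the new chunk has been linked into the batch's chunk list below.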
*/
2914 	chunk_shared = dsa_allocate(hashtable->area, chunk_size);
2915 	hashtable->batches[curbatch].shared->size += chunk_size;
2916 	hashtable->batches[curbatch].at_least_one_chunk = true;
2917 
2918 	/* Set up the chunk. */
2919 	chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared);
2920 	*shared = chunk_shared + HASH_CHUNK_HEADER_SIZE;
2921 	chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE;
2922 	chunk->used = size;
2923 
2924 	/*
2925 	 * Push it onto the list of chunks, so that it can be found if we need to
2926 	 * increase the number of buckets or batches (batch 0 only) and later for
2927 	 * freeing the memory (all batches).
2928 	 */
2929 	chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
2930 	hashtable->batches[curbatch].shared->chunks = chunk_shared;
2931 
2932 	if (size <= HASH_CHUNK_THRESHOLD)
2933 	{
2934 		/*
2935 		 * Make this the current chunk so that we can use the fast path to
2936 		 * fill the rest of it up in future calls.
2937 		 */
2938 		hashtable->current_chunk = chunk;
2939 		hashtable->current_chunk_shared = chunk_shared;
2940 	}
2941 	LWLockRelease(&pstate->lock);
2942 
2943 	Assert(HASH_CHUNK_DATA(chunk) == dsa_get_address(hashtable->area, *shared));
2944 	result = (HashJoinTuple) HASH_CHUNK_DATA(chunk);
2945 
2946 	return result;
2947 }
2948 
2949 /*
2950  * One backend needs to set up the shared batch state including tuplestores.
2951  * Other backends will ensure they have correctly configured accessors by
2952  * calling ExecParallelHashEnsureBatchAccessors().
2953  */
2954 static void
2955 ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
2956 {
2957 	ParallelHashJoinState *pstate = hashtable->parallel_state;
2958 	ParallelHashJoinBatch *batches;
2959 	MemoryContext oldcxt;
2960 	int			i;
2961 
2962 	Assert(hashtable->batches == NULL);
2963 
2964 	/* Allocate space. */
2965 	pstate->batches =
2966 		dsa_allocate0(hashtable->area,
2967 					  EstimateParallelHashJoinBatch(hashtable) * nbatch);
2968 	pstate->nbatch = nbatch;
2969 	batches = dsa_get_address(hashtable->area, pstate->batches);
2970 
2971 	/* Use hash join memory context. */
2972 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
2973 
2974 	/* Allocate this backend's accessor array. */
2975 	hashtable->nbatch = nbatch;
2976 	hashtable->batches = (ParallelHashJoinBatchAccessor *)
2977 		palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch);
2978 
2979 	/* Set up the shared state, tuplestores and backend-local accessors. */
2980 	for (i = 0; i < hashtable->nbatch; ++i)
2981 	{
2982 		ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
2983 		ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
2984 		char		name[MAXPGPATH];
2985 
2986 		/*
2987 		 * All members of shared were zero-initialized.  We just need to set
2988 		 * up the Barrier.
2989 		 */
2990 		BarrierInit(&shared->batch_barrier, 0);
2991 		if (i == 0)
2992 		{
2993 			/* Batch 0 doesn't need to be loaded. */
2994 			BarrierAttach(&shared->batch_barrier);
2995 			while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
2996 				BarrierArriveAndWait(&shared->batch_barrier, 0);
2997 			BarrierDetach(&shared->batch_barrier);
2998 		}
2999 
3000 		/* Initialize accessor state.  All members were zero-initialized. */
3001 		accessor->shared = shared;
3002 
3003 		/* Initialize the shared tuplestores.
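 * The names generated below encode batch membership: for instance,
 * "i3of8" and "o3of8" would identify the inner and outer tuplestores of
 * batch 3 out of 8, within the join's shared fileset.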
*/ 3004 snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch); 3005 accessor->inner_tuples = 3006 sts_initialize(ParallelHashJoinBatchInner(shared), 3007 pstate->nparticipants, 3008 ParallelWorkerNumber + 1, 3009 sizeof(uint32), 3010 SHARED_TUPLESTORE_SINGLE_PASS, 3011 &pstate->fileset, 3012 name); 3013 snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch); 3014 accessor->outer_tuples = 3015 sts_initialize(ParallelHashJoinBatchOuter(shared, 3016 pstate->nparticipants), 3017 pstate->nparticipants, 3018 ParallelWorkerNumber + 1, 3019 sizeof(uint32), 3020 SHARED_TUPLESTORE_SINGLE_PASS, 3021 &pstate->fileset, 3022 name); 3023 } 3024 3025 MemoryContextSwitchTo(oldcxt); 3026 } 3027 3028 /* 3029 * Free the current set of ParallelHashJoinBatchAccessor objects. 3030 */ 3031 static void 3032 ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable) 3033 { 3034 int i; 3035 3036 for (i = 0; i < hashtable->nbatch; ++i) 3037 { 3038 /* Make sure no files are left open. */ 3039 sts_end_write(hashtable->batches[i].inner_tuples); 3040 sts_end_write(hashtable->batches[i].outer_tuples); 3041 sts_end_parallel_scan(hashtable->batches[i].inner_tuples); 3042 sts_end_parallel_scan(hashtable->batches[i].outer_tuples); 3043 } 3044 pfree(hashtable->batches); 3045 hashtable->batches = NULL; 3046 } 3047 3048 /* 3049 * Make sure this backend has up-to-date accessors for the current set of 3050 * batches. 3051 */ 3052 static void 3053 ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) 3054 { 3055 ParallelHashJoinState *pstate = hashtable->parallel_state; 3056 ParallelHashJoinBatch *batches; 3057 MemoryContext oldcxt; 3058 int i; 3059 3060 if (hashtable->batches != NULL) 3061 { 3062 if (hashtable->nbatch == pstate->nbatch) 3063 return; 3064 ExecParallelHashCloseBatchAccessors(hashtable); 3065 } 3066 3067 /* 3068 * It's possible for a backend to start up very late so that the whole 3069 * join is finished and the shm state for tracking batches has already 3070 * been freed by ExecHashTableDetach(). In that case we'll just leave 3071 * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives 3072 * up early. 3073 */ 3074 if (!DsaPointerIsValid(pstate->batches)) 3075 return; 3076 3077 /* Use hash join memory context. */ 3078 oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); 3079 3080 /* Allocate this backend's accessor array. */ 3081 hashtable->nbatch = pstate->nbatch; 3082 hashtable->batches = (ParallelHashJoinBatchAccessor *) 3083 palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch); 3084 3085 /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */ 3086 batches = (ParallelHashJoinBatch *) 3087 dsa_get_address(hashtable->area, pstate->batches); 3088 3089 /* Set up the accessor array and attach to the tuplestores. */ 3090 for (i = 0; i < hashtable->nbatch; ++i) 3091 { 3092 ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i]; 3093 ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i); 3094 3095 accessor->shared = shared; 3096 accessor->preallocated = 0; 3097 accessor->done = false; 3098 accessor->inner_tuples = 3099 sts_attach(ParallelHashJoinBatchInner(shared), 3100 ParallelWorkerNumber + 1, 3101 &pstate->fileset); 3102 accessor->outer_tuples = 3103 sts_attach(ParallelHashJoinBatchOuter(shared, 3104 pstate->nparticipants), 3105 ParallelWorkerNumber + 1, 3106 &pstate->fileset); 3107 } 3108 3109 MemoryContextSwitchTo(oldcxt); 3110 } 3111 3112 /* 3113 * Allocate an empty shared memory hash table for a given batch. 
3114 */ 3115 void 3116 ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno) 3117 { 3118 ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared; 3119 dsa_pointer_atomic *buckets; 3120 int nbuckets = hashtable->parallel_state->nbuckets; 3121 int i; 3122 3123 batch->buckets = 3124 dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets); 3125 buckets = (dsa_pointer_atomic *) 3126 dsa_get_address(hashtable->area, batch->buckets); 3127 for (i = 0; i < nbuckets; ++i) 3128 dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer); 3129 } 3130 3131 /* 3132 * If we are currently attached to a shared hash join batch, detach. If we 3133 * are last to detach, clean up. 3134 */ 3135 void 3136 ExecHashTableDetachBatch(HashJoinTable hashtable) 3137 { 3138 if (hashtable->parallel_state != NULL && 3139 hashtable->curbatch >= 0) 3140 { 3141 int curbatch = hashtable->curbatch; 3142 ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; 3143 3144 /* Make sure any temporary files are closed. */ 3145 sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); 3146 sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); 3147 3148 /* Detach from the batch we were last working on. */ 3149 if (BarrierArriveAndDetach(&batch->batch_barrier)) 3150 { 3151 /* 3152 * Technically we shouldn't access the barrier because we're no 3153 * longer attached, but since there is no way it's moving after 3154 * this point it seems safe to make the following assertion. 3155 */ 3156 Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); 3157 3158 /* Free shared chunks and buckets. */ 3159 while (DsaPointerIsValid(batch->chunks)) 3160 { 3161 HashMemoryChunk chunk = 3162 dsa_get_address(hashtable->area, batch->chunks); 3163 dsa_pointer next = chunk->next.shared; 3164 3165 dsa_free(hashtable->area, batch->chunks); 3166 batch->chunks = next; 3167 } 3168 if (DsaPointerIsValid(batch->buckets)) 3169 { 3170 dsa_free(hashtable->area, batch->buckets); 3171 batch->buckets = InvalidDsaPointer; 3172 } 3173 } 3174 3175 /* 3176 * Track the largest batch we've been attached to. Though each 3177 * backend might see a different subset of batches, explain.c will 3178 * scan the results from all backends to find the largest value. 3179 */ 3180 hashtable->spacePeak = 3181 Max(hashtable->spacePeak, 3182 batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); 3183 3184 /* Remember that we are not attached to a batch. */ 3185 hashtable->curbatch = -1; 3186 } 3187 } 3188 3189 /* 3190 * Detach from all shared resources. If we are last to detach, clean up. 3191 */ 3192 void 3193 ExecHashTableDetach(HashJoinTable hashtable) 3194 { 3195 if (hashtable->parallel_state) 3196 { 3197 ParallelHashJoinState *pstate = hashtable->parallel_state; 3198 int i; 3199 3200 /* Make sure any temporary files are closed. */ 3201 if (hashtable->batches) 3202 { 3203 for (i = 0; i < hashtable->nbatch; ++i) 3204 { 3205 sts_end_write(hashtable->batches[i].inner_tuples); 3206 sts_end_write(hashtable->batches[i].outer_tuples); 3207 sts_end_parallel_scan(hashtable->batches[i].inner_tuples); 3208 sts_end_parallel_scan(hashtable->batches[i].outer_tuples); 3209 } 3210 } 3211 3212 /* If we're last to detach, clean up shared memory. 
*/ 3213 if (BarrierDetach(&pstate->build_barrier)) 3214 { 3215 if (DsaPointerIsValid(pstate->batches)) 3216 { 3217 dsa_free(hashtable->area, pstate->batches); 3218 pstate->batches = InvalidDsaPointer; 3219 } 3220 } 3221 3222 hashtable->parallel_state = NULL; 3223 } 3224 } 3225 3226 /* 3227 * Get the first tuple in a given bucket identified by number. 3228 */ 3229 static inline HashJoinTuple 3230 ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno) 3231 { 3232 HashJoinTuple tuple; 3233 dsa_pointer p; 3234 3235 Assert(hashtable->parallel_state); 3236 p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]); 3237 tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p); 3238 3239 return tuple; 3240 } 3241 3242 /* 3243 * Get the next tuple in the same bucket as 'tuple'. 3244 */ 3245 static inline HashJoinTuple 3246 ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple) 3247 { 3248 HashJoinTuple next; 3249 3250 Assert(hashtable->parallel_state); 3251 next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared); 3252 3253 return next; 3254 } 3255 3256 /* 3257 * Insert a tuple at the front of a chain of tuples in DSA memory atomically. 3258 */ 3259 static inline void 3260 ExecParallelHashPushTuple(dsa_pointer_atomic *head, 3261 HashJoinTuple tuple, 3262 dsa_pointer tuple_shared) 3263 { 3264 for (;;) 3265 { 3266 tuple->next.shared = dsa_pointer_atomic_read(head); 3267 if (dsa_pointer_atomic_compare_exchange(head, 3268 &tuple->next.shared, 3269 tuple_shared)) 3270 break; 3271 } 3272 } 3273 3274 /* 3275 * Prepare to work on a given batch. 3276 */ 3277 void 3278 ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno) 3279 { 3280 Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer); 3281 3282 hashtable->curbatch = batchno; 3283 hashtable->buckets.shared = (dsa_pointer_atomic *) 3284 dsa_get_address(hashtable->area, 3285 hashtable->batches[batchno].shared->buckets); 3286 hashtable->nbuckets = hashtable->parallel_state->nbuckets; 3287 hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); 3288 hashtable->current_chunk = NULL; 3289 hashtable->current_chunk_shared = InvalidDsaPointer; 3290 hashtable->batches[batchno].at_least_one_chunk = false; 3291 } 3292 3293 /* 3294 * Take the next available chunk from the queue of chunks being worked on in 3295 * parallel. Return NULL if there are none left. Otherwise return a pointer 3296 * to the chunk, and set *shared to the DSA pointer to the chunk. 3297 */ 3298 static HashMemoryChunk 3299 ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared) 3300 { 3301 ParallelHashJoinState *pstate = hashtable->parallel_state; 3302 HashMemoryChunk chunk; 3303 3304 LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); 3305 if (DsaPointerIsValid(pstate->chunk_work_queue)) 3306 { 3307 *shared = pstate->chunk_work_queue; 3308 chunk = (HashMemoryChunk) 3309 dsa_get_address(hashtable->area, *shared); 3310 pstate->chunk_work_queue = chunk->next.shared; 3311 } 3312 else 3313 chunk = NULL; 3314 LWLockRelease(&pstate->lock); 3315 3316 return chunk; 3317 } 3318 3319 /* 3320 * Increase the space preallocated in this backend for a given inner batch by 3321 * at least a given amount. This allows us to track whether a given batch 3322 * would fit in memory when loaded back in. Also increase the number of 3323 * batches or buckets if required. 
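 *
 * As a rough sketch of the accounting done below: each successful call
 * reserves want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE)
 * bytes for this backend's 'preallocated' budget and charges
 * want + HASH_CHUNK_HEADER_SIZE against the batch's estimated_size, so
 * writers grab roughly chunk-sized slabs under the lock and then carve
 * per-tuple space out of them without retaking the lock for every tuple.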
3324 * 3325 * This maintains a running estimation of how much space will be taken when we 3326 * load the batch back into memory by simulating the way chunks will be handed 3327 * out to workers. It's not perfectly accurate because the tuples will be 3328 * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but 3329 * it should be pretty close. It tends to overestimate by a fraction of a 3330 * chunk per worker since all workers gang up to preallocate during hashing, 3331 * but workers tend to reload batches alone if there are enough to go around, 3332 * leaving fewer partially filled chunks. This effect is bounded by 3333 * nparticipants. 3334 * 3335 * Return false if the number of batches or buckets has changed, and the 3336 * caller should reconsider which batch a given tuple now belongs in and call 3337 * again. 3338 */ 3339 static bool 3340 ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) 3341 { 3342 ParallelHashJoinState *pstate = hashtable->parallel_state; 3343 ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno]; 3344 size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE); 3345 3346 Assert(batchno > 0); 3347 Assert(batchno < hashtable->nbatch); 3348 Assert(size == MAXALIGN(size)); 3349 3350 LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); 3351 3352 /* Has another participant commanded us to help grow? */ 3353 if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES || 3354 pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS) 3355 { 3356 ParallelHashGrowth growth = pstate->growth; 3357 3358 LWLockRelease(&pstate->lock); 3359 if (growth == PHJ_GROWTH_NEED_MORE_BATCHES) 3360 ExecParallelHashIncreaseNumBatches(hashtable); 3361 else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS) 3362 ExecParallelHashIncreaseNumBuckets(hashtable); 3363 3364 return false; 3365 } 3366 3367 if (pstate->growth != PHJ_GROWTH_DISABLED && 3368 batch->at_least_one_chunk && 3369 (batch->shared->estimated_size + want + HASH_CHUNK_HEADER_SIZE 3370 > pstate->space_allowed)) 3371 { 3372 /* 3373 * We have determined that this batch would exceed the space budget if 3374 * loaded into memory. Command all participants to help repartition. 3375 */ 3376 batch->shared->space_exhausted = true; 3377 pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES; 3378 LWLockRelease(&pstate->lock); 3379 3380 return false; 3381 } 3382 3383 batch->at_least_one_chunk = true; 3384 batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE; 3385 batch->preallocated = want; 3386 LWLockRelease(&pstate->lock); 3387 3388 return true; 3389 } 3390 3391 /* 3392 * Calculate the limit on how much memory can be used by Hash and similar 3393 * plan types. This is work_mem times hash_mem_multiplier, and is 3394 * expressed in bytes. 3395 * 3396 * Exported for use by the planner, as well as other hash-like executor 3397 * nodes. This is a rather random place for this, but there is no better 3398 * place. 3399 */ 3400 size_t 3401 get_hash_memory_limit(void) 3402 { 3403 double mem_limit; 3404 3405 /* Do initial calculation in double arithmetic */ 3406 mem_limit = (double) work_mem * hash_mem_multiplier * 1024.0; 3407 3408 /* Clamp in case it doesn't fit in size_t */ 3409 mem_limit = Min(mem_limit, (double) SIZE_MAX); 3410 3411 return (size_t) mem_limit; 3412 } 3413 3414 /* 3415 * Convert the hash memory limit to an integer number of kilobytes, 3416 * that is something comparable to work_mem. Like work_mem, we clamp 3417 * the result to ensure that multiplying it by 1024 fits in a long int. 
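 *
 * Worked example (illustrative settings): with work_mem = 4096 (kB) and
 * hash_mem_multiplier = 2.0, get_hash_memory_limit() returns 8388608
 * bytes, and this function returns 8192 (kB).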
3418 * 3419 * This is deprecated since it may understate the actual memory limit. 3420 * It is unused in core and will eventually be removed. 3421 */ 3422 int 3423 get_hash_mem(void) 3424 { 3425 size_t mem_limit = get_hash_memory_limit(); 3426 3427 /* Remove the kilobyte factor */ 3428 mem_limit /= 1024; 3429 3430 /* Clamp to MAX_KILOBYTES, like work_mem */ 3431 mem_limit = Min(mem_limit, (size_t) MAX_KILOBYTES); 3432 3433 return (int) mem_limit; 3434 } 3435