1 /* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/hash_join_iterator.h"
24
25 #include <sys/types.h>
26 #include <algorithm>
27 #include <cmath>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "extra/lz4/my_xxhash.h"
33 #include "field_types.h"
34 #include "my_alloc.h"
35 #include "my_bit.h"
36 #include "my_bitmap.h"
37 #include "my_dbug.h"
38 #include "my_inttypes.h"
39 #include "my_sys.h"
40 #include "mysqld_error.h"
41 #include "scope_guard.h"
42 #include "sql/handler.h"
43 #include "sql/hash_join_buffer.h"
44 #include "sql/item.h"
45 #include "sql/item_cmpfunc.h"
46 #include "sql/pfs_batch_mode.h"
47 #include "sql/row_iterator.h"
48 #include "sql/sql_class.h"
49 #include "sql/sql_executor.h"
50 #include "sql/sql_optimizer.h"
51 #include "sql/sql_select.h"
52 #include "sql/table.h"
53
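// Out-of-line definition of the static constexpr member declared in the class.
// Under pre-C++17 rules, such a definition is needed if kMaxChunks is ODR-used
// (e.g. bound to a const reference).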
54 constexpr size_t HashJoinIterator::kMaxChunks;
55
56 HashJoinIterator::HashJoinIterator(
57 THD *thd, unique_ptr_destroy_only<RowIterator> build_input,
58 qep_tab_map build_input_tables,
59 unique_ptr_destroy_only<RowIterator> probe_input,
60 qep_tab_map probe_input_tables, size_t max_memory_available,
61 const std::vector<HashJoinCondition> &join_conditions,
62 bool allow_spill_to_disk, JoinType join_type, const JOIN *join,
63 const std::vector<Item *> &extra_conditions)
64 : RowIterator(thd),
65 m_state(State::READING_ROW_FROM_PROBE_ITERATOR),
66 m_build_input(move(build_input)),
67 m_probe_input(move(probe_input)),
68 m_probe_input_tables(join, probe_input_tables),
69 m_build_input_tables(join, build_input_tables),
70 m_row_buffer(m_build_input_tables, join_conditions, max_memory_available),
71 m_join_conditions(PSI_NOT_INSTRUMENTED, join_conditions.data(),
72 join_conditions.data() + join_conditions.size()),
73 m_chunk_files_on_disk(thd->mem_root, kMaxChunks),
74 m_allow_spill_to_disk(allow_spill_to_disk),
75 m_join_type(join_type) {
76 DBUG_ASSERT(m_build_input != nullptr);
77 DBUG_ASSERT(m_probe_input != nullptr);
78
79 // If there are multiple extra conditions, merge them into a single AND-ed
80 // condition, so evaluation of the item is a bit easier.
81 if (extra_conditions.size() == 1) {
82 m_extra_condition = extra_conditions.front();
83 } else if (extra_conditions.size() > 1) {
84 List<Item> items;
85 for (Item *cond : extra_conditions) {
86 items.push_back(cond);
87 }
88 m_extra_condition = new Item_cond_and(items);
89 m_extra_condition->quick_fix_field();
90 m_extra_condition->update_used_tables();
91 m_extra_condition->apply_is_true();
92 }
93
94 // Mark that this iterator will provide the row ID, so that iterators above
95 // this one do not call position(). See QEP_TAB::rowid_status for more
96 // details.
97 for (const hash_join_buffer::Table &it : m_build_input_tables.tables()) {
98 if (it.qep_tab->rowid_status == NEED_TO_CALL_POSITION_FOR_ROWID) {
99 it.qep_tab->rowid_status = ROWID_PROVIDED_BY_ITERATOR_READ_CALL;
100 }
101 }
102
103 for (const hash_join_buffer::Table &it : m_probe_input_tables.tables()) {
104 if (it.qep_tab->rowid_status == NEED_TO_CALL_POSITION_FOR_ROWID) {
105 it.qep_tab->rowid_status = ROWID_PROVIDED_BY_ITERATOR_READ_CALL;
106 }
107 }
108
109 if (m_probe_input_tables.tables().size() == 1) {
110 // If there is more than one table, batch mode will be handled by the join
111 // iterators on the probe side.
112 m_probe_input_batch_mode =
113 m_probe_input_tables.tables()[0].qep_tab->pfs_batch_update(join);
114 }
115 }
116
117 bool HashJoinIterator::InitRowBuffer() {
118 // After the row buffer is initialized, we want the row buffer iterators to
119 // point to the end of the row buffer in order to have a clean state. But on
120 // some platforms, especially Windows, the iterator assignment operator will
121 // try to access the data it points to. This may be problematic if the hash
122 // join iterator is being re-inited; the iterators will point to data that has
123 // already been freed when doing the iterator assignment. To avoid having the
124 // iterators point to any data, call the destructors so that they have a
125 // clean state.
126 {
127 // Due to a bug in LLVM, we have to introduce a non-nested alias in order to
128 // call the destructor (https://bugs.llvm.org//show_bug.cgi?id=12350).
129 using iterator = hash_join_buffer::HashJoinRowBuffer::hash_map_iterator;
130 m_hash_map_iterator.iterator::~iterator();
131 m_hash_map_end.iterator::~iterator();
132 }
133
134 if (m_row_buffer.Init(kHashTableSeed)) {
135 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
136 return true;
137 }
138
139 m_hash_map_iterator = m_row_buffer.end();
140 m_hash_map_end = m_row_buffer.end();
141 return false;
142 }
143
144 // Mark that blobs should be copied for each table that contains at least one
145 // geometry column.
146 static void MarkCopyBlobsIfTableContainsGeometry(
147 const hash_join_buffer::TableCollection &table_collection) {
148 for (const hash_join_buffer::Table &table : table_collection.tables()) {
149 for (const hash_join_buffer::Column &col : table.columns) {
150 if (col.field_type == MYSQL_TYPE_GEOMETRY) {
151 table.qep_tab->table()->copy_blobs = true;
152 break;
153 }
154 }
155 }
156 }
157
158 bool HashJoinIterator::InitProbeIterator() {
159 DBUG_ASSERT(m_state == State::READING_ROW_FROM_PROBE_ITERATOR);
160
161 if (m_probe_input->Init()) {
162 return true;
163 }
164
165 if (m_probe_input_batch_mode) {
166 m_probe_input->StartPSIBatchMode();
167 }
168 return false;
169 }
170
171 bool HashJoinIterator::Init() {
172 // Prepare to read the build input into the hash map.
173 if (m_build_input->Init()) {
174 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
175 return true;
176 }
177
178 // We always start out by doing everything in memory.
179 m_hash_join_type = HashJoinType::IN_MEMORY;
180 m_write_to_probe_row_saving = false;
181
182 m_build_iterator_has_more_rows = true;
183 m_probe_input->EndPSIBatchModeIfStarted();
184 m_probe_row_match_flag = false;
185
186 // Set up the buffer that is used when
187 // a) moving a row between the tables' record buffers, and
188 // b) constructing a join key from join conditions.
189 size_t upper_row_size = 0;
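// Note: inputs with blob columns are skipped below, since their row size has
// no fixed upper bound; for those, the buffer is instead expected to grow on
// demand when rows are stored.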
190 if (!m_build_input_tables.has_blob_column()) {
191 upper_row_size =
192 hash_join_buffer::ComputeRowSizeUpperBound(m_build_input_tables);
193 }
194
195 if (!m_probe_input_tables.has_blob_column()) {
196 upper_row_size = std::max(
197 upper_row_size,
198 hash_join_buffer::ComputeRowSizeUpperBound(m_probe_input_tables));
199 }
200
201 if (m_temporary_row_and_join_key_buffer.reserve(upper_row_size)) {
202 my_error(ER_OUTOFMEMORY, MYF(0), upper_row_size);
203 return true; // oom
204 }
205
206 // If any of the tables contains a geometry column, we must ensure that
207 // the geometry data is copied to the row buffer (see
208 // Field_geom::store_internal) instead of only setting the pointer to the
209 // data. This is needed if the hash join spills to disk; when we read a row
210 // back from a chunk file, row data is stored in a temporary buffer. If not told
211 // otherwise, Field_geom::store_internal will only store the pointer to the
212 // data, and not the data itself. The data this field points to will then
213 // become invalid when the temporary buffer is used for something else.
214 MarkCopyBlobsIfTableContainsGeometry(m_probe_input_tables);
215 MarkCopyBlobsIfTableContainsGeometry(m_build_input_tables);
216
217 // Close any leftover files from previous iterations.
218 m_chunk_files_on_disk.clear();
219
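// Reset the chunk bookkeeping; m_current_chunk == -1 means that we are not
// reading from any chunk pair yet.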
220 m_build_chunk_current_row = 0;
221 m_probe_chunk_current_row = 0;
222 m_current_chunk = -1;
223
224 // Build the hash table
225 if (BuildHashTable()) {
226 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
227 return true;
228 }
229
230 if (m_state == State::END_OF_ROWS) {
231 // BuildHashTable() decided that the join is done (the build input is
232 // empty, and we are in an inner-/semijoin. Anti-/outer join must output
233 // NULL-complemented rows from the probe input).
234 return false;
235 }
236
237 if (m_join_type == JoinType::ANTI && m_join_conditions.empty() &&
238 m_extra_condition == nullptr && !m_row_buffer.empty()) {
239 // For degenerate antijoins, we know we will never output anything
240 // if there's anything in the hash table, so we can end right away.
241 // (We also don't need to read more than one row, but
242 // CreateHashJoinAccessPath() has already added a LIMIT 1 for us
243 // in this case.)
244 m_state = State::END_OF_ROWS;
245 return false;
246 }
247
248 return InitProbeIterator();
249 }
250
251 // Construct a join key from a list of join conditions, where the join key from
252 // each join condition is concatenated together in the output buffer
253 // "join_key_buffer". The function returns true if a SQL NULL value is found.
254 static bool ConstructJoinKey(
255 THD *thd, const Prealloced_array<HashJoinCondition, 4> &join_conditions,
256 table_map tables_bitmap, String *join_key_buffer) {
257 join_key_buffer->length(0);
258 for (const HashJoinCondition &hash_join_condition : join_conditions) {
259 if (hash_join_condition.join_condition()->append_join_key_for_hash_join(
260 thd, tables_bitmap, hash_join_condition, join_key_buffer)) {
261 // The join condition returned SQL NULL.
262 return true;
263 }
264 }
265 return false;
266 }
267
268 // Write a single row to a HashJoinChunk. The row must lie in the record buffer
269 // (record[0]) for each involved table. The row is put into one of the chunks in
270 // the input vector "chunks"; which chunk to use is decided by the hash value of
271 // the join attribute.
272 static bool WriteRowToChunk(
273 THD *thd, Mem_root_array<ChunkPair> *chunks, bool write_to_build_chunk,
274 const hash_join_buffer::TableCollection &tables,
275 const Prealloced_array<HashJoinCondition, 4> &join_conditions,
276 const uint32 xxhash_seed, bool row_has_match,
277 bool store_row_with_null_in_join_key, String *join_key_and_row_buffer) {
278 bool null_in_join_key = ConstructJoinKey(
279 thd, join_conditions, tables.tables_bitmap(), join_key_and_row_buffer);
280
281 if (null_in_join_key && !store_row_with_null_in_join_key) {
282 // NULL values will never match in an inner join or a semijoin. The optimizer
283 // will often set up a NULL filter for inner joins, but not in all cases. So
284 // we must handle this gracefully instead of asserting.
285 return false;
286 }
287
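// Hash the entire serialized join key. The same hash function and seed are
// used for both inputs, so rows with equal join keys always land in the same
// chunk pair.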
288 const uint64_t join_key_hash =
289 MY_XXH64(join_key_and_row_buffer->ptr(),
290 join_key_and_row_buffer->length(), xxhash_seed);
291
292 DBUG_ASSERT((chunks->size() & (chunks->size() - 1)) == 0);
293 // Since we know that the number of chunks will be a power of two, do a
294 // bitwise AND instead of (join_key_hash % chunks->size()).
295 const size_t chunk_index = join_key_hash & (chunks->size() - 1);
296 ChunkPair &chunk_pair = (*chunks)[chunk_index];
297 if (write_to_build_chunk) {
298 return chunk_pair.build_chunk.WriteRowToChunk(join_key_and_row_buffer,
299 row_has_match);
300 } else {
301 return chunk_pair.probe_chunk.WriteRowToChunk(join_key_and_row_buffer,
302 row_has_match);
303 }
304 }
305
306 // Request the row ID for all tables where it should be kept.
307 void RequestRowId(const Prealloced_array<hash_join_buffer::Table, 4> &tables) {
308 for (const hash_join_buffer::Table &it : tables) {
309 TABLE *table = it.qep_tab->table();
310 if (it.rowid_status == NEED_TO_CALL_POSITION_FOR_ROWID &&
311 can_call_position(table)) {
312 table->file->position(table->record[0]);
313 }
314 }
315 }
316
317 // Write all the remaining rows from the given iterator out to chunk files
318 // on disk. If the function returns true, an unrecoverable error occurred
319 // (IO error etc.).
320 static bool WriteRowsToChunks(
321 THD *thd, RowIterator *iterator,
322 const hash_join_buffer::TableCollection &tables,
323 const Prealloced_array<HashJoinCondition, 4> &join_conditions,
324 const uint32 xxhash_seed, Mem_root_array<ChunkPair> *chunks,
325 bool write_to_build_chunk, bool write_rows_with_null_in_join_key,
326 String *join_key_buffer) {
327 for (;;) { // Termination condition within loop.
328 int res = iterator->Read();
329 if (res == 1) {
330 DBUG_ASSERT(thd->is_error()); // my_error should have been called.
331 return true;
332 }
333
334 if (res == -1) {
335 return false; // EOF; success.
336 }
337
338 DBUG_ASSERT(res == 0);
339
340 RequestRowId(tables.tables());
341 if (WriteRowToChunk(thd, chunks, write_to_build_chunk, tables,
342 join_conditions, xxhash_seed, /*row_has_match=*/false,
343 write_rows_with_null_in_join_key, join_key_buffer)) {
344 DBUG_ASSERT(thd->is_error()); // my_error should have been called.
345 return true;
346 }
347 }
348 }
349
350 // Initialize all HashJoinChunks for both inputs. When estimating how many
351 // chunks we need, we first assume that the estimated row count from the planner
352 // is correct. Furthermore, we assume that the current row buffer is
353 // representative of the overall row density, so that if we divide the
354 // (estimated) number of remaining rows by the number of rows read so far and
355 // use that as our chunk count, we will get on-disk chunks that each will fit
356 // into RAM when we read them back later. As a safeguard, we subtract a small
357 // percentage (reduction factor), since we'd rather get one or two extra chunks
358 // instead of having to re-read the probe input multiple times. We limit the
359 // number of chunks per input, so we don't risk hitting the server's limit for
360 // number of open files.
361 static bool InitializeChunkFiles(
362 size_t estimated_rows_produced_by_join, size_t rows_in_hash_table,
363 size_t max_chunk_files,
364 const hash_join_buffer::TableCollection &probe_tables,
365 const hash_join_buffer::TableCollection &build_tables,
366 bool include_match_flag_for_probe, Mem_root_array<ChunkPair> *chunk_pairs) {
367 constexpr double kReductionFactor = 0.9;
368 const size_t reduced_rows_in_hash_table =
369 std::max<size_t>(1, rows_in_hash_table * kReductionFactor);
370
371 // Avoid underflow, since the hash table may contain more rows than the
372 // estimate from the planner.
373 const size_t remaining_rows =
374 std::max(rows_in_hash_table, estimated_rows_produced_by_join) -
375 rows_in_hash_table;
376
377 const size_t chunks_needed = std::max<size_t>(
378 1, std::ceil(remaining_rows / reduced_rows_in_hash_table));
379 const size_t num_chunks = std::min(max_chunk_files, chunks_needed);
380
381 // Ensure that the number of chunks is always a power of two. This allows
382 // us to do some optimizations when calculating which chunk a row should
383 // be placed in.
384 const size_t num_chunks_pow_2 = my_round_up_to_next_power(num_chunks);
385
386 DBUG_ASSERT(chunk_pairs != nullptr && chunk_pairs->empty());
387 chunk_pairs->resize(num_chunks_pow_2);
388 for (ChunkPair &chunk_pair : *chunk_pairs) {
389 if (chunk_pair.build_chunk.Init(build_tables, /*uses_match_flags=*/false) ||
390 chunk_pair.probe_chunk.Init(probe_tables,
391 include_match_flag_for_probe)) {
392 my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
393 return true;
394 }
395 }
396
397 return false;
398 }
399
400 bool HashJoinIterator::BuildHashTable() {
401 if (!m_build_iterator_has_more_rows) {
402 m_state = State::END_OF_ROWS;
403 return false;
404 }
405
406 // Restore the last row that was inserted into the row buffer. This is
407 // necessary if the build input is a nested loop with a filter on the inner
408 // side, like this:
409 //
410 // +---Hash join---+
411 // | |
412 // Nested loop t1
413 // | |
414 // t3 Filter: (t3.i < t2.i)
415 // |
416 // t2
417 //
418 // If the hash join is not allowed to spill to disk, we may need to re-fill
419 // the hash table multiple times. If the nested loop happens to be in the
420 // state "reading inner rows" when a re-fill is triggered, the filter will
421 // look at the data in t3's record buffer in order to evaluate the filter. The
422 // row in t3's record buffer may be any of the rows that were stored in the
423 // hash table, and not the last row returned from t3. To ensure that the
424 // filter is looking at the correct data, restore the last row that was
425 // inserted into the hash table.
426 if (m_row_buffer.Initialized() &&
427 m_row_buffer.LastRowStored() != m_row_buffer.end()) {
428 hash_join_buffer::LoadIntoTableBuffers(
429 m_build_input_tables, m_row_buffer.LastRowStored()->second);
430 }
431
432 if (InitRowBuffer()) {
433 return true;
434 }
435
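// For semijoins and antijoins without extra conditions, we only need to know
// whether a given join key exists, so the row buffer can keep a single row
// per distinct key (see RejectDuplicateKeys()).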
436 const bool reject_duplicate_keys = RejectDuplicateKeys();
437 const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
438
439 // If Init() is called multiple times (e.g., if the hash join is inside a
440 // dependent subquery), we must clear the NULL row flag, as it may have been
441 // set by the previous execution of this hash join.
442 m_build_input->SetNullRowFlag(/*is_null_row=*/false);
443
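// RAII guard: starts performance schema batch mode on the build input while we
// drain it below, and ends batch mode again when the guard goes out of scope.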
444 PFSBatchMode batch_mode(m_build_input.get());
445 for (;;) { // Termination condition within loop.
446 int res = m_build_input->Read();
447 if (res == 1) {
448 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
449 return true;
450 }
451
452 if (res == -1) {
453 m_build_iterator_has_more_rows = false;
454 // If the build input was empty, the result of inner joins and semijoins
455 // will also be empty. However, the output of antijoins will then be all
456 // the rows from the probe input.
457 if (m_row_buffer.empty() && m_join_type != JoinType::ANTI &&
458 m_join_type != JoinType::OUTER) {
459 m_state = State::END_OF_ROWS;
460 return false;
461 }
462
463 // As we managed to read to the end of the build iterator, this is the
464 // last time we will read from the probe iterator. Thus, we can disable
465 // probe row saving again (it was enabled if the hash table ran out of
466 // memory _and_ we were not allowed to spill to disk).
467 m_write_to_probe_row_saving = false;
468 SetReadingProbeRowState();
469 return false;
470 }
471 DBUG_ASSERT(res == 0);
472 RequestRowId(m_build_input_tables.tables());
473
474 const hash_join_buffer::StoreRowResult store_row_result =
475 m_row_buffer.StoreRow(thd(), reject_duplicate_keys,
476 store_rows_with_null_in_join_key);
477 switch (store_row_result) {
478 case hash_join_buffer::StoreRowResult::ROW_STORED:
479 break;
480 case hash_join_buffer::StoreRowResult::BUFFER_FULL: {
481 // The row buffer is full, so start spilling to disk (if allowed). Note
482 // that the row buffer checks for OOM _after_ the row was inserted, so
483 // we should always manage to insert at least one row.
484 DBUG_ASSERT(!m_row_buffer.empty());
485
486 // If we are not allowed to spill to disk, just go on to reading from
487 // the probe iterator.
488 if (!m_allow_spill_to_disk) {
489 if (m_join_type != JoinType::INNER) {
490 // Enable probe row saving, so that unmatched probe rows are written
491 // to the probe row saving file. After the next refill of the hash
492 // table, we will read rows from the probe row saving file, ensuring
493 // that we only read unmatched probe rows.
494 InitWritingToProbeRowSavingFile();
495 }
496 SetReadingProbeRowState();
497 return false;
498 }
499
500 // Ideally, we would use the estimated row count from the iterator. But
501 // not all iterators have the row count available (e.g.
502 // RemoveDuplicatesIterator), so get the row count directly from the
503 // QEP_TAB.
504 const QEP_TAB *last_table_in_join =
505 m_build_input_tables.tables().back().qep_tab;
506 if (InitializeChunkFiles(
507 last_table_in_join->position()->prefix_rowcount,
508 m_row_buffer.size(), kMaxChunks, m_probe_input_tables,
509 m_build_input_tables,
510 /*include_match_flag_for_probe=*/m_join_type == JoinType::OUTER,
511 &m_chunk_files_on_disk)) {
512 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
513 return true;
514 }
515
516 // Write the remaining rows from the build input out to chunk files.
517 // The probe input will be written out to chunk files later; we will do
518 // it _after_ we have checked the probe input for matches against the
519 // rows that are already written to the hash table. An alternative
520 // approach would be to write out the remaining rows from the build
521 // _and_ the rows that already are in the hash table. In that case, we
522 // could also write out the entire probe input to disk here as well. But
523 // we don't want to waste the rows that we already have stored in
524 // memory.
525 //
526 // We never write out rows with NULL in the join key for the build/right
527 // input, as these rows will never match any join condition.
528 if (WriteRowsToChunks(thd(), m_build_input.get(), m_build_input_tables,
529 m_join_conditions, kChunkPartitioningHashSeed,
530 &m_chunk_files_on_disk,
531 true /* write_to_build_chunks */,
532 false /* write_rows_with_null_in_join_key */,
533 &m_temporary_row_and_join_key_buffer)) {
534 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
535 return true;
536 }
537
538 // Flush and position all chunk files from the build input at the
539 // beginning.
540 for (ChunkPair &chunk_pair : m_chunk_files_on_disk) {
541 if (chunk_pair.build_chunk.Rewind()) {
542 DBUG_ASSERT(
543 thd()->is_error()); // my_error should have been called.
544 return true;
545 }
546 }
547 SetReadingProbeRowState();
548 return false;
549 }
550 case hash_join_buffer::StoreRowResult::FATAL_ERROR:
551 // An unrecoverable error. Most likely, malloc failed, so report OOM.
552 // Note that we cannot say for sure how much memory we tried to allocate
553 // when failing, so just report 'join_buffer_size' as the amount of
554 // memory we tried to allocate.
555 my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
556 thd()->variables.join_buff_size);
557 return true;
558 }
559 }
560 }
561
562 bool HashJoinIterator::ReadNextHashJoinChunk() {
563 // See if we should proceed to the next pair of chunk files. In general,
564 // it works like this: if we are at the end of the build chunk, move to the
565 // next. If not, keep reading from the same chunk pair. We also move to the
566 // next pair of chunk files if the probe chunk file is empty.
567 bool move_to_next_chunk = false;
568 if (m_current_chunk == -1) {
569 // We are before the first chunk, so move to the next.
570 move_to_next_chunk = true;
571 } else if (m_build_chunk_current_row >=
572 m_chunk_files_on_disk[m_current_chunk].build_chunk.num_rows()) {
573 // We are done reading all the rows from the build chunk.
574 move_to_next_chunk = true;
575 } else if (m_chunk_files_on_disk[m_current_chunk].probe_chunk.num_rows() ==
576 0) {
577 // The probe chunk file is empty.
578 move_to_next_chunk = true;
579 }
580
581 if (move_to_next_chunk) {
582 m_current_chunk++;
583 m_build_chunk_current_row = 0;
584
585 // Since we are moving to a new set of chunk files, ensure that we read from
586 // the chunk file and not from the probe row saving file.
587 m_read_from_probe_row_saving = false;
588 }
589
590 if (m_current_chunk == static_cast<int>(m_chunk_files_on_disk.size())) {
591 // We have moved past the last chunk, so we are done.
592 m_state = State::END_OF_ROWS;
593 return false;
594 }
595
596 if (InitRowBuffer()) {
597 return true;
598 }
599
600 HashJoinChunk &build_chunk =
601 m_chunk_files_on_disk[m_current_chunk].build_chunk;
602
603 const bool reject_duplicate_keys = RejectDuplicateKeys();
604 const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
605 for (; m_build_chunk_current_row < build_chunk.num_rows();
606 ++m_build_chunk_current_row) {
607 // Read the next row from the chunk file, and put it in the in-memory row
608 // buffer. If the buffer becomes full, do the probe phase against the rows we
609 // managed to put in the buffer, and continue reading where we left off in the
610 // next iteration.
611 if (build_chunk.LoadRowFromChunk(&m_temporary_row_and_join_key_buffer,
612 /*matched=*/nullptr)) {
613 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
614 return true;
615 }
616
617 hash_join_buffer::StoreRowResult store_row_result = m_row_buffer.StoreRow(
618 thd(), reject_duplicate_keys, store_rows_with_null_in_join_key);
619
620 if (store_row_result == hash_join_buffer::StoreRowResult::BUFFER_FULL) {
621 // The row buffer checks for OOM _after_ the row was inserted, so we
622 // should always manage to insert at least one row.
623 DBUG_ASSERT(!m_row_buffer.empty());
624
625 // Since the last row read was actually stored in the buffer, increment
626 // the row counter manually before breaking out of the loop.
627 ++m_build_chunk_current_row;
628 break;
629 } else if (store_row_result ==
630 hash_join_buffer::StoreRowResult::FATAL_ERROR) {
631 // An unrecoverable error. Most likely, malloc failed, so report OOM.
632 // Note that we cannot say for sure how much memory we tried to allocate
633 // when failing, so just report 'join_buffer_size' as the amount of
634 // memory we tried to allocate.
635 my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
636 thd()->variables.join_buff_size);
637 return true;
638 }
639
640 DBUG_ASSERT(store_row_result ==
641 hash_join_buffer::StoreRowResult::ROW_STORED);
642 }
643
644 // Prepare to do a lookup in the hash table for all rows from the probe
645 // chunk.
646 if (m_chunk_files_on_disk[m_current_chunk].probe_chunk.Rewind()) {
647 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
648 return true;
649 }
650 m_probe_chunk_current_row = 0;
651 SetReadingProbeRowState();
652
653 if (m_build_chunk_current_row < build_chunk.num_rows() &&
654 m_join_type != JoinType::INNER) {
655 // The build chunk did not fit into memory, causing us to refill the hash
656 // table once the probe input is consumed. If we don't take any special
657 // action, we can end up outputting the same probe row twice if the probe
658 // phase finds a match in both iterations through the hash table.
659 // By enabling probe row saving, unmatched probe rows are written to a probe
660 // row saving file. After the next hash table refill, we load the probe rows
661 // from the probe row saving file instead of from the probe chunk, thus
662 // ensuring that we only see unmatched probe rows. Note that we have not
663 // started reading probe rows yet, but we are about to do so.
664 InitWritingToProbeRowSavingFile();
665 } else {
666 m_write_to_probe_row_saving = false;
667 }
668
669 return false;
670 }
671
672 bool HashJoinIterator::ReadRowFromProbeIterator() {
673 DBUG_ASSERT(m_current_chunk == -1);
674
675 int result = m_probe_input->Read();
676 if (result == 1) {
677 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
678 return true;
679 }
680
681 if (result == 0) {
682 RequestRowId(m_probe_input_tables.tables());
683
684 // A row from the probe iterator is ready.
685 LookupProbeRowInHashTable();
686 return false;
687 }
688
689 DBUG_ASSERT(result == -1);
690 m_probe_input->EndPSIBatchModeIfStarted();
691
692 // The probe iterator is out of rows. We may be in three different situations
693 // here (ordered from most common to less common):
694 // 1. The build input is also empty, and the join is done. The iterator state
695 // will go into "LOADING_NEXT_CHUNK_PAIR", and we will see that there are
696 // no chunk files when trying to load the next pair of chunk files.
697 // 2. We have degraded into an on-disk hash join, and we will now start
698 // reading from chunk files on disk.
699 // 3. The build input is not empty, and we have not degraded into an on-disk
700 // hash join (i.e. we were not allowed to, due to a LIMIT in the query), so we
701 // re-populate the hash table with the remaining rows from the build input.
702 if (m_allow_spill_to_disk) {
703 m_hash_join_type = HashJoinType::SPILL_TO_DISK;
704 m_state = State::LOADING_NEXT_CHUNK_PAIR;
705 return false;
706 }
707
708 m_hash_join_type = HashJoinType::IN_MEMORY_WITH_HASH_TABLE_REFILL;
709 if (m_write_to_probe_row_saving) {
710 // If probe row saving is enabled, it means that the probe row saving write
711 // file contains all the rows from the probe input that should be
712 // read/processed again. We must swap the probe row saving writing and probe
713 // row saving reading file _before_ calling BuildHashTable, since
714 // BuildHashTable may initialize (and thus clear) the probe row saving write
715 // file, losing any rows written to said file.
716 if (InitReadingFromProbeRowSavingFile()) {
717 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
718 return true;
719 }
720 }
721
722 if (BuildHashTable()) {
723 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
724 return true;
725 }
726
727 switch (m_state) {
728 case State::END_OF_ROWS:
729 // BuildHashTable() decided that the join is done (the build input is
730 // empty, and we are in an inner-/semijoin. Anti-/outer join must output
731 // NULL-complemented rows from the probe input).
732 return false;
733 case State::READING_ROW_FROM_PROBE_ITERATOR:
734 // Start reading from the beginning of the probe iterator.
735 return InitProbeIterator();
736 case State::READING_ROW_FROM_PROBE_ROW_SAVING_FILE:
737 // The probe row saving read file is already initialized for reading
738 // further up in this function.
739 return false;
740 default:
741 DBUG_ASSERT(false);
742 return true;
743 }
744 }
745
746 bool HashJoinIterator::ReadRowFromProbeChunkFile() {
747 DBUG_ASSERT(on_disk_hash_join() && m_current_chunk != -1);
748
749 // Read one row from the current HashJoinChunk, and put
750 // that row into the record buffer of the probe input table.
751 HashJoinChunk &current_probe_chunk =
752 m_chunk_files_on_disk[m_current_chunk].probe_chunk;
753 if (m_probe_chunk_current_row >= current_probe_chunk.num_rows()) {
754 // No more rows in the current probe chunk, so load the next chunk of
755 // build rows into the hash table.
756 if (m_write_to_probe_row_saving) {
757 // If probe row saving is enabled, the build chunk did not fit in memory.
758 // This causes us to refill the hash table with the rows from the build
759 // chunk that did not fit, and thus read the probe chunk multiple times.
760 // This can be problematic for semijoin; we do not want to output a probe
761 // row that has a match in both parts of the hash table. To mitigate
762 // this, we write probe rows that do not have a match in the hash table
763 // to a probe row saving file (m_probe_row_saving_write_file), and read
764 // from said file instead of from the probe input the next time.
765 if (InitReadingFromProbeRowSavingFile()) {
766 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
767 return true;
768 }
769 } else {
770 m_read_from_probe_row_saving = false;
771 }
772
773 m_state = State::LOADING_NEXT_CHUNK_PAIR;
774 return false;
775 } else if (current_probe_chunk.LoadRowFromChunk(
776 &m_temporary_row_and_join_key_buffer,
777 &m_probe_row_match_flag)) {
778 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
779 return true;
780 }
781
782 m_probe_chunk_current_row++;
783
784 // A row from the chunk file is ready.
785 LookupProbeRowInHashTable();
786 return false;
787 }
788
789 bool HashJoinIterator::ReadRowFromProbeRowSavingFile() {
790 // Read one row from the probe row saving file, and put that row into the
791 // record buffer of the probe input table.
792 if (m_probe_row_saving_read_file_current_row >=
793 m_probe_row_saving_read_file.num_rows()) {
794 // We are done reading all the rows from the probe row saving file. If probe
795 // row saving is still enabled, we have a new set of rows in the probe row
796 // saving write file.
797 if (m_write_to_probe_row_saving) {
798 if (InitReadingFromProbeRowSavingFile()) {
799 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
800 return true;
801 }
802 } else {
803 m_read_from_probe_row_saving = false;
804 }
805
806 // If we are executing an on-disk hash join, go and load the next pair of
807 // chunk files. If we are doing everything in memory with multiple hash
808 // table refills, go and refill the hash table.
809 if (m_hash_join_type == HashJoinType::SPILL_TO_DISK) {
810 m_state = State::LOADING_NEXT_CHUNK_PAIR;
811 return false;
812 }
813 DBUG_ASSERT(m_hash_join_type ==
814 HashJoinType::IN_MEMORY_WITH_HASH_TABLE_REFILL);
815
816 // No more rows in the probe row saving file.
817 if (BuildHashTable()) {
818 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
819 return true;
820 }
821
822 if (m_state == State::END_OF_ROWS) {
823 // BuildHashTable() decided that the join is done (the build input is
824 // empty).
825 return false;
826 }
827
828 SetReadingProbeRowState();
829 return false;
830 } else if (m_probe_row_saving_read_file.LoadRowFromChunk(
831 &m_temporary_row_and_join_key_buffer,
832 &m_probe_row_match_flag)) {
833 DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
834 return true;
835 }
836
837 m_probe_row_saving_read_file_current_row++;
838
839 // A row from the chunk file is ready.
840 LookupProbeRowInHashTable();
841 return false;
842 }
843
844 void HashJoinIterator::LookupProbeRowInHashTable() {
845 if (m_join_conditions.empty()) {
846 // Skip the call to equal_range() in case we don't have any join conditions.
847 // This can save up to 20% of the execution time in multi-table joins.
848 m_hash_map_iterator = m_row_buffer.begin();
849 m_hash_map_end = m_row_buffer.end();
850 m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
851 return;
852 }
853
854 // Extract the join key from the probe input, and use that key as the lookup
855 // key in the hash table.
856 bool null_in_join_key = ConstructJoinKey(
857 thd(), m_join_conditions, m_probe_input_tables.tables_bitmap(),
858 &m_temporary_row_and_join_key_buffer);
859
860 if (null_in_join_key) {
861 if (m_join_type == JoinType::ANTI || m_join_type == JoinType::OUTER) {
862 // SQL NULL was found, and we will never find a matching row in the hash
863 // table. Let us indicate that, so that a null-complemented row is
864 // returned.
865 m_hash_map_iterator = m_row_buffer.end();
866 m_hash_map_end = m_row_buffer.end();
867 m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
868 } else {
869 SetReadingProbeRowState();
870 }
871 return;
872 }
873
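// Wrap the serialized join key in a (pointer, length) view suitable for
// looking up matching rows in the hash table.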
874 hash_join_buffer::Key key(
875 pointer_cast<const uchar *>(m_temporary_row_and_join_key_buffer.ptr()),
876 m_temporary_row_and_join_key_buffer.length());
877
878 if ((m_join_type == JoinType::SEMI || m_join_type == JoinType::ANTI) &&
879 m_extra_condition == nullptr) {
880 // find() has a better average complexity than equal_range() (constant vs.
881 // linear in the number of matching elements). And for semijoins, we are
882 // only interested in the first match anyway, so this may give a nice
883 // speedup. An exception to this is if we have any "extra" conditions that
884 // need to be evaluated after the hash table lookup, but before the row is
885 // returned; we may need to read through the entire hash table to find a row
886 // that satisfies the extra condition(s).
887 m_hash_map_iterator = m_row_buffer.find(key);
888 m_hash_map_end = m_row_buffer.end();
889 } else {
890 auto range = m_row_buffer.equal_range(key);
891 m_hash_map_iterator = range.first;
892 m_hash_map_end = range.second;
893 }
894
895 m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
896 }
897
898 int HashJoinIterator::ReadJoinedRow() {
899 if (m_hash_map_iterator == m_hash_map_end) {
900 // Signal that we have reached the end of hash table entries. Let the caller
901 // determine which state we end up in.
902 return -1;
903 }
904
905 // A row is ready in the hash table, so put the data from the hash table row
906 // into the record buffers of the build input tables.
907 hash_join_buffer::LoadIntoTableBuffers(m_build_input_tables,
908 m_hash_map_iterator->second);
909 return 0;
910 }
911
912 bool HashJoinIterator::WriteProbeRowToDiskIfApplicable() {
913 // If we are spilling to disk, we need to match the row against rows from
914 // the build input that are written out to chunk files. So we need to write
915 // the probe row to chunk files as well. Semijoin/antijoin is an exception to
916 // this; if the probe row already got a match in the hash table, we do not
917 // need to write it out to disk. Outer joins should always write the row out
918 // to disk, since the probe/left input should return NULL-complemented rows
919 // even if the join condition contains SQL NULL.
920 const bool write_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
921 if (m_state == State::READING_FIRST_ROW_FROM_HASH_TABLE) {
922 const bool found_match = m_hash_map_iterator != m_hash_map_end;
923
924 if ((m_join_type == JoinType::INNER || m_join_type == JoinType::OUTER) ||
925 !found_match) {
926 if (on_disk_hash_join() && m_current_chunk == -1) {
927 if (WriteRowToChunk(thd(), &m_chunk_files_on_disk,
928 false /* write_to_build_chunk */,
929 m_probe_input_tables, m_join_conditions,
930 kChunkPartitioningHashSeed, found_match,
931 write_rows_with_null_in_join_key,
932 &m_temporary_row_and_join_key_buffer)) {
933 return true;
934 }
935 }
936
937 if (m_write_to_probe_row_saving &&
938 m_probe_row_saving_write_file.WriteRowToChunk(
939 &m_temporary_row_and_join_key_buffer,
940 found_match || m_probe_row_match_flag)) {
941 return true;
942 }
943 }
944 }
945
946 return false;
947 }
948
949 bool HashJoinIterator::JoinedRowPassesExtraConditions() const {
950 if (m_extra_condition != nullptr) {
951 return m_extra_condition->val_int() != 0;
952 }
953
954 return true;
955 }
956
957 int HashJoinIterator::ReadNextJoinedRowFromHashTable() {
958 int res;
959 bool passes_extra_conditions = false;
960 do {
961 res = ReadJoinedRow();
962
963 // ReadJoinedRow() can only return 0 (row is ready) or -1 (EOF).
964 DBUG_ASSERT(res == 0 || res == -1);
965
966 // Evaluate any extra conditions that are attached to this iterator before
967 // we return a row.
968 if (res == 0) {
969 passes_extra_conditions = JoinedRowPassesExtraConditions();
970 if (thd()->is_error()) {
971 // Evaluation of extra conditions raised an error, so abort the join.
972 return 1;
973 }
974
975 if (!passes_extra_conditions) {
976 // Advance to the next matching row in the hash table. Note that the
977 // iterator stays in the state READING_FIRST_ROW_FROM_HASH_TABLE even
978 // though we are not actually reading the first row anymore. This is
979 // because WriteProbeRowToDiskIfApplicable() needs to know if this is
980 // the first row that matches both the join condition and any extra
981 // conditions; only unmatched rows will be written to disk.
982 ++m_hash_map_iterator;
983 }
984 }
985 } while (res == 0 && !passes_extra_conditions);
986
987 // The row passed all extra conditions (or we are out of rows in the hash
988 // table), so we can now write the row to disk.
989 // Inner and outer joins: Write out all rows from the probe input (given that
990 // we have degraded into on-disk hash join).
991 // Semijoin and antijoin: Write out rows that do not have any matching row in
992 // the hash table.
993 if (WriteProbeRowToDiskIfApplicable()) {
994 return 1;
995 }
996
997 if (res == -1) {
998 // If we did not find a matching row in the hash table, antijoin and outer
999 // join should output the last row read from the probe input together with a
1000 // NULL-complemented row from the build input. However, in case of on-disk
1001 // antijoin, a row from the probe input can match a row from the build input
1002 // that has already been written out to disk. So for on-disk antijoin, we
1003 // cannot output any rows until we have started reading from chunk files.
1004 //
1005 // On-disk outer join is a bit more tricky; we can only output a
1006 // NULL-complemented row if the probe row did not match anything from the
1007 // build input while doing any of the probe phases. We can have multiple
1008 // probe phases if e.g. a build chunk file is too big to fit in memory; we
1009 // would have to read the build chunk in multiple smaller chunks while doing
1010 // a probe phase for each of these smaller chunks. To keep track of this,
1011 // each probe row is prefixed with a match flag in the chunk files.
1012 bool return_null_complemented_row = false;
1013 if ((on_disk_hash_join() && m_current_chunk == -1) ||
1014 m_write_to_probe_row_saving) {
1015 return_null_complemented_row = false;
1016 } else if (m_join_type == JoinType::ANTI) {
1017 return_null_complemented_row = true;
1018 } else if (m_join_type == JoinType::OUTER &&
1019 m_state == State::READING_FIRST_ROW_FROM_HASH_TABLE &&
1020 !m_probe_row_match_flag) {
1021 return_null_complemented_row = true;
1022 }
1023
1024 SetReadingProbeRowState();
1025
1026 if (return_null_complemented_row) {
1027 m_build_input->SetNullRowFlag(true);
1028 return 0;
1029 }
1030 return -1;
1031 }
1032
1033 // We have a matching row ready.
1034 switch (m_join_type) {
1035 case JoinType::SEMI:
1036 // Semijoin should return the first matching row, and then go to the next
1037 // row from the probe input.
1038 SetReadingProbeRowState();
1039 break;
1040 case JoinType::ANTI:
1041 // Antijoin should immediately go to the next row from the probe input,
1042 // without returning the matching row.
1043 SetReadingProbeRowState();
1044 return -1; // Read the next row.
1045 case JoinType::OUTER:
1046 case JoinType::INNER:
1047 // Inner and outer joins should return all matching rows from the hash table
1048 // before moving to the next row from the probe input.
1049 m_state = State::READING_FROM_HASH_TABLE;
1050 break;
1051 }
1052
1053 ++m_hash_map_iterator;
1054 return 0;
1055 }
1056
1057 int HashJoinIterator::Read() {
1058 for (;;) {
1059 if (thd()->killed) { // Aborted by user.
1060 thd()->send_kill_message();
1061 return 1;
1062 }
1063
1064 switch (m_state) {
1065 case State::LOADING_NEXT_CHUNK_PAIR:
1066 if (ReadNextHashJoinChunk()) {
1067 return 1;
1068 }
1069 break;
1070 case State::READING_ROW_FROM_PROBE_ITERATOR:
1071 if (ReadRowFromProbeIterator()) {
1072 return 1;
1073 }
1074 break;
1075 case State::READING_ROW_FROM_PROBE_CHUNK_FILE:
1076 if (ReadRowFromProbeChunkFile()) {
1077 return 1;
1078 }
1079 break;
1080 case State::READING_ROW_FROM_PROBE_ROW_SAVING_FILE:
1081 if (ReadRowFromProbeRowSavingFile()) {
1082 return 1;
1083 }
1084 break;
1085 case State::READING_FIRST_ROW_FROM_HASH_TABLE:
1086 case State::READING_FROM_HASH_TABLE: {
1087 const int res = ReadNextJoinedRowFromHashTable();
1088 if (res == 0) {
1089 // A joined row is ready, so send it to the client.
1090 return 0;
1091 }
1092
1093 if (res == -1) {
1094 // No more matching rows in the hash table, or antijoin found a
1095 // matching row. Read a new row from the probe input.
1096 continue;
1097 }
1098
1099 // An error occurred, so abort the join.
1100 DBUG_ASSERT(res == 1);
1101 return res;
1102 }
1103 case State::END_OF_ROWS:
1104 return -1;
1105 }
1106 }
1107
1108 // Unreachable.
1109 DBUG_ASSERT(false);
1110 return 1;
1111 }
1112
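// Describes this iterator, e.g. for EXPLAIN FORMAT=TREE output.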
1113 std::vector<std::string> HashJoinIterator::DebugString() const {
1114 std::string ret;
1115
1116 switch (m_join_type) {
1117 case JoinType::INNER:
1118 ret += "Inner hash join";
1119 break;
1120 case JoinType::SEMI:
1121 ret += "Hash semijoin";
1122 break;
1123 case JoinType::ANTI:
1124 ret += "Hash antijoin";
1125 break;
1126 case JoinType::OUTER:
1127 ret += "Left hash join";
1128 break;
1129 default:
1130 DBUG_ASSERT(false);
1131 ret += "not implemented";
1132 break;
1133 }
1134
1135 if (m_join_conditions.empty()) {
1136 ret.append(" (no condition)");
1137 } else {
1138 for (const HashJoinCondition &join_condition : m_join_conditions) {
1139 if (join_condition.join_condition() !=
1140 m_join_conditions[0].join_condition()) {
1141 ret.push_back(',');
1142 }
1143 if (!join_condition.store_full_sort_key()) {
1144 ret.append(" (<hash>(" + ItemToString(join_condition.left_extractor()) +
1145 ")=<hash>(" +
1146 ItemToString(join_condition.right_extractor()) + "))");
1147 } else {
1148 ret.append(" " + ItemToString(join_condition.join_condition()));
1149 }
1150 }
1151 }
1152
1153 if (m_extra_condition != nullptr) {
1154 ret.append(", extra conditions: " + ItemToString(m_extra_condition));
1155 }
1156
1157 return {ret};
1158 }
1159
1160 bool HashJoinIterator::InitWritingToProbeRowSavingFile() {
1161 m_write_to_probe_row_saving = true;
1162 return m_probe_row_saving_write_file.Init(m_probe_input_tables,
1163 m_join_type == JoinType::OUTER);
1164 }
1165
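// Swap roles of the probe row saving files: the rows we have been writing are
// made available for reading from the start of the file.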
1166 bool HashJoinIterator::InitReadingFromProbeRowSavingFile() {
1167 m_probe_row_saving_read_file = std::move(m_probe_row_saving_write_file);
1168 m_probe_row_saving_read_file_current_row = 0;
1169 m_read_from_probe_row_saving = true;
1170 return m_probe_row_saving_read_file.Rewind();
1171 }
1172
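// Decide where the next probe row should come from: the probe iterator when
// everything fits in memory, the probe row saving file during hash table
// refills, or the probe chunk files when we have spilled to disk.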
1173 void HashJoinIterator::SetReadingProbeRowState() {
1174 switch (m_hash_join_type) {
1175 case HashJoinType::IN_MEMORY:
1176 m_state = State::READING_ROW_FROM_PROBE_ITERATOR;
1177 break;
1178 case HashJoinType::IN_MEMORY_WITH_HASH_TABLE_REFILL:
1179 if (m_join_type == JoinType::INNER) {
1180 // As inner joins do not need probe row match flags, probe row saving
1181 // will never be activated for inner joins.
1182 m_state = State::READING_ROW_FROM_PROBE_ITERATOR;
1183 } else {
1184 m_state = State::READING_ROW_FROM_PROBE_ROW_SAVING_FILE;
1185 }
1186 break;
1187 case HashJoinType::SPILL_TO_DISK:
1188 if (m_read_from_probe_row_saving) {
1189 // Probe row saving may be activated if a build chunk did not fit in
1190 // memory.
1191 m_state = State::READING_ROW_FROM_PROBE_ROW_SAVING_FILE;
1192 return;
1193 }
1194 m_state = State::READING_ROW_FROM_PROBE_CHUNK_FILE;
1195 break;
1196 }
1197 }
1198