/* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#include "sql/hash_join_iterator.h"

#include <sys/types.h>
#include <algorithm>
#include <cmath>
#include <string>
#include <utility>
#include <vector>

#include "extra/lz4/my_xxhash.h"
#include "field_types.h"
#include "my_alloc.h"
#include "my_bit.h"
#include "my_bitmap.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_sys.h"
#include "mysqld_error.h"
#include "scope_guard.h"
#include "sql/handler.h"
#include "sql/hash_join_buffer.h"
#include "sql/item.h"
#include "sql/item_cmpfunc.h"
#include "sql/pfs_batch_mode.h"
#include "sql/row_iterator.h"
#include "sql/sql_class.h"
#include "sql/sql_executor.h"
#include "sql/sql_optimizer.h"
#include "sql/sql_select.h"
#include "sql/table.h"

constexpr size_t HashJoinIterator::kMaxChunks;

HashJoinIterator::HashJoinIterator(
    THD *thd, unique_ptr_destroy_only<RowIterator> build_input,
    qep_tab_map build_input_tables,
    unique_ptr_destroy_only<RowIterator> probe_input,
    qep_tab_map probe_input_tables, size_t max_memory_available,
    const std::vector<HashJoinCondition> &join_conditions,
    bool allow_spill_to_disk, JoinType join_type, const JOIN *join,
    const std::vector<Item *> &extra_conditions)
    : RowIterator(thd),
      m_state(State::READING_ROW_FROM_PROBE_ITERATOR),
      m_build_input(move(build_input)),
      m_probe_input(move(probe_input)),
      m_probe_input_tables(join, probe_input_tables),
      m_build_input_tables(join, build_input_tables),
      m_row_buffer(m_build_input_tables, join_conditions, max_memory_available),
      m_join_conditions(PSI_NOT_INSTRUMENTED, join_conditions.data(),
                        join_conditions.data() + join_conditions.size()),
      m_chunk_files_on_disk(thd->mem_root, kMaxChunks),
      m_allow_spill_to_disk(allow_spill_to_disk),
      m_join_type(join_type) {
  DBUG_ASSERT(m_build_input != nullptr);
  DBUG_ASSERT(m_probe_input != nullptr);

  // If there are multiple extra conditions, merge them into a single AND-ed
  // condition, so evaluation of the item is a bit easier.
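  // For example, the two extra conditions (t1.x > 3) and (t2.y < 7) are
  // merged into the single condition ((t1.x > 3) AND (t2.y < 7)).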
  if (extra_conditions.size() == 1) {
    m_extra_condition = extra_conditions.front();
  } else if (extra_conditions.size() > 1) {
    List<Item> items;
    for (Item *cond : extra_conditions) {
      items.push_back(cond);
    }
    m_extra_condition = new Item_cond_and(items);
    m_extra_condition->quick_fix_field();
    m_extra_condition->update_used_tables();
    m_extra_condition->apply_is_true();
  }

  // Mark that this iterator will provide the row ID, so that iterators above
  // this one do not call position(). See QEP_TAB::rowid_status for more
  // details.
  for (const hash_join_buffer::Table &it : m_build_input_tables.tables()) {
    if (it.qep_tab->rowid_status == NEED_TO_CALL_POSITION_FOR_ROWID) {
      it.qep_tab->rowid_status = ROWID_PROVIDED_BY_ITERATOR_READ_CALL;
    }
  }

  for (const hash_join_buffer::Table &it : m_probe_input_tables.tables()) {
    if (it.qep_tab->rowid_status == NEED_TO_CALL_POSITION_FOR_ROWID) {
      it.qep_tab->rowid_status = ROWID_PROVIDED_BY_ITERATOR_READ_CALL;
    }
  }

  if (m_probe_input_tables.tables().size() == 1) {
    // If there is only one table on the probe side, handle batch mode here.
    // With more than one table, batch mode is handled by the join iterators
    // on the probe side.
    m_probe_input_batch_mode =
        m_probe_input_tables.tables()[0].qep_tab->pfs_batch_update(join);
  }
}

bool HashJoinIterator::InitRowBuffer() {
  // After the row buffer is initialized, we want the row buffer iterators to
  // point to the end of the row buffer in order to have a clean state. But on
  // some platforms, especially Windows, the iterator assignment operator will
  // try to access the data it points to. This may be problematic if the hash
  // join iterator is being re-inited; the iterators will point to data that
  // has already been freed when doing the iterator assignment. To keep the
  // iterators from pointing to any data, call the destructors so that they
  // have a clean state.
  {
    // Due to a bug in LLVM, we have to introduce a non-nested alias in order to
    // call the destructor (https://bugs.llvm.org//show_bug.cgi?id=12350).
    using iterator = hash_join_buffer::HashJoinRowBuffer::hash_map_iterator;
    m_hash_map_iterator.iterator::~iterator();
    m_hash_map_end.iterator::~iterator();
  }

  if (m_row_buffer.Init(kHashTableSeed)) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }

  m_hash_map_iterator = m_row_buffer.end();
  m_hash_map_end = m_row_buffer.end();
  return false;
}

// Mark that blobs should be copied for each table that contains at least one
// geometry column.
static void MarkCopyBlobsIfTableContainsGeometry(
    const hash_join_buffer::TableCollection &table_collection) {
  for (const hash_join_buffer::Table &table : table_collection.tables()) {
    for (const hash_join_buffer::Column &col : table.columns) {
      if (col.field_type == MYSQL_TYPE_GEOMETRY) {
        table.qep_tab->table()->copy_blobs = true;
        break;
      }
    }
  }
}

bool HashJoinIterator::InitProbeIterator() {
  DBUG_ASSERT(m_state == State::READING_ROW_FROM_PROBE_ITERATOR);

  if (m_probe_input->Init()) {
    return true;
  }

  if (m_probe_input_batch_mode) {
    m_probe_input->StartPSIBatchMode();
  }
  return false;
}

bool HashJoinIterator::Init() {
  // Prepare to read the build input into the hash map.
  if (m_build_input->Init()) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }

  // We always start out by doing everything in memory.
  m_hash_join_type = HashJoinType::IN_MEMORY;
  m_write_to_probe_row_saving = false;

  m_build_iterator_has_more_rows = true;
  m_probe_input->EndPSIBatchModeIfStarted();
  m_probe_row_match_flag = false;

  // Set up the buffer that is used when
  // a) moving a row between the tables' record buffers, and
  // b) constructing a join key from join conditions.
  size_t upper_row_size = 0;
  if (!m_build_input_tables.has_blob_column()) {
    upper_row_size =
        hash_join_buffer::ComputeRowSizeUpperBound(m_build_input_tables);
  }

  if (!m_probe_input_tables.has_blob_column()) {
    upper_row_size = std::max(
        upper_row_size,
        hash_join_buffer::ComputeRowSizeUpperBound(m_probe_input_tables));
  }

  if (m_temporary_row_and_join_key_buffer.reserve(upper_row_size)) {
    my_error(ER_OUTOFMEMORY, MYF(0), upper_row_size);
    return true;  // oom
  }

  // If any of the tables contains a geometry column, we must ensure that
  // the geometry data is copied to the row buffer (see
  // Field_geom::store_internal) instead of only setting the pointer to the
  // data. This is needed if the hash join spills to disk; when we read a row
  // back from a chunk file, the row data is stored in a temporary buffer. If
  // not told otherwise, Field_geom::store_internal will only store the pointer
  // to the data, and not the data itself. The data this field points to will
  // then become invalid when the temporary buffer is used for something else.
  MarkCopyBlobsIfTableContainsGeometry(m_probe_input_tables);
  MarkCopyBlobsIfTableContainsGeometry(m_build_input_tables);

  // Close any leftover files from previous iterations.
  m_chunk_files_on_disk.clear();

  m_build_chunk_current_row = 0;
  m_probe_chunk_current_row = 0;
  m_current_chunk = -1;

  // Build the hash table.
  if (BuildHashTable()) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }

  if (m_state == State::END_OF_ROWS) {
    // BuildHashTable() decided that the join is done (the build input is
    // empty, and we are in an inner join or semijoin. Antijoin and outer join
    // must output NULL-complemented rows from the probe input).
    return false;
  }

  if (m_join_type == JoinType::ANTI && m_join_conditions.empty() &&
      m_extra_condition == nullptr && !m_row_buffer.empty()) {
    // For degenerate antijoins, we know we will never output anything
    // if there's anything in the hash table, so we can end right away.
    // (We also don't need to read more than one row, but
    // CreateHashJoinAccessPath() has already added a LIMIT 1 for us
    // in this case.)
    m_state = State::END_OF_ROWS;
    return false;
  }

  return InitProbeIterator();
}

// Construct a join key from a list of join conditions, where the join key from
// each join condition is concatenated together in the output buffer
// "join_key_buffer". The function returns true if a SQL NULL value is found.
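// For example, with the join conditions (t1.a = t2.a AND t1.b = t2.b), the
// buffer will contain the key bytes extracted for the "a" condition,
// immediately followed by the key bytes for the "b" condition.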
static bool ConstructJoinKey(
    THD *thd, const Prealloced_array<HashJoinCondition, 4> &join_conditions,
    table_map tables_bitmap, String *join_key_buffer) {
  join_key_buffer->length(0);
  for (const HashJoinCondition &hash_join_condition : join_conditions) {
    if (hash_join_condition.join_condition()->append_join_key_for_hash_join(
            thd, tables_bitmap, hash_join_condition, join_key_buffer)) {
      // The join condition returned SQL NULL.
      return true;
    }
  }
  return false;
}

// Write a single row to a HashJoinChunk. The row must lie in the record buffer
// (record[0]) for each involved table. The row is put into one of the chunks in
// the input vector "chunks"; which chunk to use is decided by the hash value of
// the join attribute.
static bool WriteRowToChunk(
    THD *thd, Mem_root_array<ChunkPair> *chunks, bool write_to_build_chunk,
    const hash_join_buffer::TableCollection &tables,
    const Prealloced_array<HashJoinCondition, 4> &join_conditions,
    const uint32 xxhash_seed, bool row_has_match,
    bool store_row_with_null_in_join_key, String *join_key_and_row_buffer) {
  bool null_in_join_key = ConstructJoinKey(
      thd, join_conditions, tables.tables_bitmap(), join_key_and_row_buffer);

  if (null_in_join_key && !store_row_with_null_in_join_key) {
    // NULL values will never match in an inner join or a semijoin. The
    // optimizer will often set up a NULL filter for inner joins, but not in
    // all cases. So we must handle this gracefully instead of asserting.
    return false;
  }

  const uint64_t join_key_hash =
      MY_XXH64(join_key_and_row_buffer->ptr(),
               join_key_and_row_buffer->length(), xxhash_seed);

  DBUG_ASSERT((chunks->size() & (chunks->size() - 1)) == 0);
  // Since we know that the number of chunks will be a power of two, do a
  // bitwise AND instead of (join_key_hash % chunks->size()).
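  // For example, with 8 chunks the mask is 0b111, so a join key hash of 13
  // (0b1101) is placed in chunk 13 & 7 = 5, the same chunk as 13 % 8 = 5.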
  const size_t chunk_index = join_key_hash & (chunks->size() - 1);
  ChunkPair &chunk_pair = (*chunks)[chunk_index];
  if (write_to_build_chunk) {
    return chunk_pair.build_chunk.WriteRowToChunk(join_key_and_row_buffer,
                                                  row_has_match);
  } else {
    return chunk_pair.probe_chunk.WriteRowToChunk(join_key_and_row_buffer,
                                                  row_has_match);
  }
}

// Request the row ID for all tables where it should be kept.
void RequestRowId(const Prealloced_array<hash_join_buffer::Table, 4> &tables) {
  for (const hash_join_buffer::Table &it : tables) {
    TABLE *table = it.qep_tab->table();
    if (it.rowid_status == NEED_TO_CALL_POSITION_FOR_ROWID &&
        can_call_position(table)) {
      table->file->position(table->record[0]);
    }
  }
}

// Write all the remaining rows from the given iterator out to chunk files
// on disk. If the function returns true, an unrecoverable error occurred
// (IO error etc.).
static bool WriteRowsToChunks(
    THD *thd, RowIterator *iterator,
    const hash_join_buffer::TableCollection &tables,
    const Prealloced_array<HashJoinCondition, 4> &join_conditions,
    const uint32 xxhash_seed, Mem_root_array<ChunkPair> *chunks,
    bool write_to_build_chunk, bool write_rows_with_null_in_join_key,
    String *join_key_buffer) {
  for (;;) {  // Termination condition within loop.
    int res = iterator->Read();
    if (res == 1) {
      DBUG_ASSERT(thd->is_error());  // my_error should have been called.
      return true;
    }

    if (res == -1) {
      return false;  // EOF; success.
    }

    DBUG_ASSERT(res == 0);

    RequestRowId(tables.tables());
    if (WriteRowToChunk(thd, chunks, write_to_build_chunk, tables,
                        join_conditions, xxhash_seed, /*row_has_match=*/false,
                        write_rows_with_null_in_join_key, join_key_buffer)) {
      DBUG_ASSERT(thd->is_error());  // my_error should have been called.
      return true;
    }
  }
}

// Initialize all HashJoinChunks for both inputs. When estimating how many
// chunks we need, we first assume that the estimated row count from the
// planner is correct. Furthermore, we assume that the current row buffer is
// representative of the overall row density, so that if we divide the
// (estimated) number of remaining rows by the number of rows read so far and
// use that as our chunk count, we will get on-disk chunks that each will fit
// into RAM when we read them back later. As a safeguard, we subtract a small
// percentage (reduction factor), since we'd rather get one or two extra chunks
// instead of having to re-read the probe input multiple times. We limit the
// number of chunks per input, so we don't risk hitting the server's limit for
// number of open files.
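//
// For example, if the planner estimated 1000 rows in total and the in-memory
// hash table filled up after 100 rows, the reduced row count is 90 (100 *
// 0.9), the remaining row count is 900, and we need ceil(900 / 90) = 10
// chunks, which is then rounded up to the next power of two, 16.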
static bool InitializeChunkFiles(
    size_t estimated_rows_produced_by_join, size_t rows_in_hash_table,
    size_t max_chunk_files,
    const hash_join_buffer::TableCollection &probe_tables,
    const hash_join_buffer::TableCollection &build_tables,
    bool include_match_flag_for_probe, Mem_root_array<ChunkPair> *chunk_pairs) {
  constexpr double kReductionFactor = 0.9;
  const size_t reduced_rows_in_hash_table =
      std::max<size_t>(1, rows_in_hash_table * kReductionFactor);

  // Avoid underflow, since the hash table may contain more rows than the
  // estimate from the planner.
  const size_t remaining_rows =
      std::max(rows_in_hash_table, estimated_rows_produced_by_join) -
      rows_in_hash_table;

  // Divide as floating point so that std::ceil has an effect; with plain
  // integer division, the result would be truncated before the call.
  const size_t chunks_needed = std::max<size_t>(
      1, std::ceil(static_cast<double>(remaining_rows) /
                   reduced_rows_in_hash_table));
  const size_t num_chunks = std::min(max_chunk_files, chunks_needed);

  // Ensure that the number of chunks is always a power of two. This allows
  // us to do some optimizations when calculating which chunk a row should
  // be placed in.
  const size_t num_chunks_pow_2 = my_round_up_to_next_power(num_chunks);

  DBUG_ASSERT(chunk_pairs != nullptr && chunk_pairs->empty());
  chunk_pairs->resize(num_chunks_pow_2);
  for (ChunkPair &chunk_pair : *chunk_pairs) {
    if (chunk_pair.build_chunk.Init(build_tables, /*uses_match_flags=*/false) ||
        chunk_pair.probe_chunk.Init(probe_tables,
                                    include_match_flag_for_probe)) {
      my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
      return true;
    }
  }

  return false;
}

bool HashJoinIterator::BuildHashTable() {
  if (!m_build_iterator_has_more_rows) {
    m_state = State::END_OF_ROWS;
    return false;
  }

  // Restore the last row that was inserted into the row buffer. This is
  // necessary if the build input is a nested loop with a filter on the inner
  // side, like this:
  //
  //        +---Hash join---+
  //        |               |
  //  Nested loop          t1
  //  |         |
  //  t3    Filter: (t3.i < t2.i)
  //               |
  //              t2
  //
  // If the hash join is not allowed to spill to disk, we may need to re-fill
  // the hash table multiple times. If the nested loop happens to be in the
  // state "reading inner rows" when a re-fill is triggered, the filter will
  // look at the data in t3's record buffer in order to evaluate the filter.
  // The row in t3's record buffer may be any of the rows that were stored in
  // the hash table, and not the last row returned from t3. To ensure that the
  // filter is looking at the correct data, restore the last row that was
  // inserted into the hash table.
  if (m_row_buffer.Initialized() &&
      m_row_buffer.LastRowStored() != m_row_buffer.end()) {
    hash_join_buffer::LoadIntoTableBuffers(
        m_build_input_tables, m_row_buffer.LastRowStored()->second);
  }

  if (InitRowBuffer()) {
    return true;
  }

  const bool reject_duplicate_keys = RejectDuplicateKeys();
  const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;

  // If Init() is called multiple times (e.g., if hash join is inside a
  // dependent subquery), we must clear the NULL row flag, as it may have been
  // set by the previous execution of this hash join.
  m_build_input->SetNullRowFlag(/*is_null_row=*/false);

  PFSBatchMode batch_mode(m_build_input.get());
  for (;;) {  // Termination condition within loop.
    int res = m_build_input->Read();
    if (res == 1) {
      DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
      return true;
    }

    if (res == -1) {
      m_build_iterator_has_more_rows = false;
      // If the build input was empty, the result of inner joins and semijoins
      // will also be empty, and we are done. Antijoins and outer joins,
      // however, must still output (possibly NULL-complemented) rows from the
      // probe input.
      if (m_row_buffer.empty() && m_join_type != JoinType::ANTI &&
          m_join_type != JoinType::OUTER) {
        m_state = State::END_OF_ROWS;
        return false;
      }

      // As we managed to read to the end of the build iterator, this is the
      // last time we will read from the probe iterator. Thus, we can disable
      // probe row saving again (it was enabled if the hash table ran out of
      // memory _and_ we were not allowed to spill to disk).
      m_write_to_probe_row_saving = false;
      SetReadingProbeRowState();
      return false;
    }
    DBUG_ASSERT(res == 0);
    RequestRowId(m_build_input_tables.tables());

    const hash_join_buffer::StoreRowResult store_row_result =
        m_row_buffer.StoreRow(thd(), reject_duplicate_keys,
                              store_rows_with_null_in_join_key);
    switch (store_row_result) {
      case hash_join_buffer::StoreRowResult::ROW_STORED:
        break;
      case hash_join_buffer::StoreRowResult::BUFFER_FULL: {
        // The row buffer is full, so start spilling to disk (if allowed). Note
        // that the row buffer checks for OOM _after_ the row was inserted, so
        // we should always manage to insert at least one row.
        DBUG_ASSERT(!m_row_buffer.empty());

        // If we are not allowed to spill to disk, just go on to reading from
        // the probe iterator.
        if (!m_allow_spill_to_disk) {
          if (m_join_type != JoinType::INNER) {
            // Enable probe row saving, so that unmatched probe rows are written
            // to the probe row saving file. After the next refill of the hash
            // table, we will read rows from the probe row saving file, ensuring
            // that we only read unmatched probe rows.
            InitWritingToProbeRowSavingFile();
          }
          SetReadingProbeRowState();
          return false;
        }

        // Ideally, we would use the estimated row count from the iterator. But
        // not all iterators have the row count available (i.e.
        // RemoveDuplicatesIterator), so get the row count directly from the
        // QEP_TAB.
        const QEP_TAB *last_table_in_join =
            m_build_input_tables.tables().back().qep_tab;
        if (InitializeChunkFiles(
                last_table_in_join->position()->prefix_rowcount,
                m_row_buffer.size(), kMaxChunks, m_probe_input_tables,
                m_build_input_tables,
                /*include_match_flag_for_probe=*/m_join_type == JoinType::OUTER,
                &m_chunk_files_on_disk)) {
          DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
          return true;
        }

        // Write the remaining rows from the build input out to chunk files.
        // The probe input will be written out to chunk files later; we will do
        // it _after_ we have checked the probe input for matches against the
        // rows that are already written to the hash table. An alternative
        // approach would be to write out the remaining rows from the build
        // _and_ the rows that already are in the hash table. In that case, we
        // could also write out the entire probe input to disk here as well. But
        // we don't want to waste the rows that we already have stored in
        // memory.
        //
        // We never write out rows with NULL in the join key for the build/right
        // input, as these rows will never match in a join condition.
        if (WriteRowsToChunks(thd(), m_build_input.get(), m_build_input_tables,
                              m_join_conditions, kChunkPartitioningHashSeed,
                              &m_chunk_files_on_disk,
                              true /* write_to_build_chunks */,
                              false /* write_rows_with_null_in_join_key */,
                              &m_temporary_row_and_join_key_buffer)) {
          DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
          return true;
        }

        // Flush and position all chunk files from the build input at the
        // beginning.
        for (ChunkPair &chunk_pair : m_chunk_files_on_disk) {
          if (chunk_pair.build_chunk.Rewind()) {
            DBUG_ASSERT(
                thd()->is_error());  // my_error should have been called.
            return true;
          }
        }
        SetReadingProbeRowState();
        return false;
      }
      case hash_join_buffer::StoreRowResult::FATAL_ERROR:
        // An unrecoverable error. Most likely, malloc failed, so report OOM.
        // Note that we cannot say for sure how much memory we tried to allocate
        // when failing, so just report 'join_buffer_size' as the amount of
        // memory we tried to allocate.
        my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
                 thd()->variables.join_buff_size);
        return true;
    }
  }
}

bool HashJoinIterator::ReadNextHashJoinChunk() {
  // See if we should proceed to the next pair of chunk files. In general, it
  // works like this: if we are at the end of the build chunk, move to the next
  // pair. If not, keep reading from the same chunk pair. We also move to the
  // next pair of chunk files if the probe chunk file is empty.
  bool move_to_next_chunk = false;
  if (m_current_chunk == -1) {
    // We are before the first chunk, so move to the next.
    move_to_next_chunk = true;
  } else if (m_build_chunk_current_row >=
             m_chunk_files_on_disk[m_current_chunk].build_chunk.num_rows()) {
    // We are done reading all the rows from the build chunk.
    move_to_next_chunk = true;
  } else if (m_chunk_files_on_disk[m_current_chunk].probe_chunk.num_rows() ==
             0) {
    // The probe chunk file is empty.
    move_to_next_chunk = true;
  }

  if (move_to_next_chunk) {
    m_current_chunk++;
    m_build_chunk_current_row = 0;

    // Since we are moving to a new set of chunk files, ensure that we read from
    // the chunk file and not from the probe row saving file.
    m_read_from_probe_row_saving = false;
  }

  if (m_current_chunk == static_cast<int>(m_chunk_files_on_disk.size())) {
    // We have moved past the last chunk, so we are done.
    m_state = State::END_OF_ROWS;
    return false;
  }

  if (InitRowBuffer()) {
    return true;
  }

  HashJoinChunk &build_chunk =
      m_chunk_files_on_disk[m_current_chunk].build_chunk;

  const bool reject_duplicate_keys = RejectDuplicateKeys();
  const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
  for (; m_build_chunk_current_row < build_chunk.num_rows();
       ++m_build_chunk_current_row) {
    // Read the next row from the chunk file, and put it in the in-memory row
    // buffer. If the buffer becomes full, do the probe phase against the rows
    // we managed to put in the buffer, and continue reading where we left off
    // in the next iteration.
    if (build_chunk.LoadRowFromChunk(&m_temporary_row_and_join_key_buffer,
                                     /*matched=*/nullptr)) {
      DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
      return true;
    }

    hash_join_buffer::StoreRowResult store_row_result = m_row_buffer.StoreRow(
        thd(), reject_duplicate_keys, store_rows_with_null_in_join_key);

    if (store_row_result == hash_join_buffer::StoreRowResult::BUFFER_FULL) {
      // The row buffer checks for OOM _after_ the row was inserted, so we
      // should always manage to insert at least one row.
      DBUG_ASSERT(!m_row_buffer.empty());

      // Since the last row read was actually stored in the buffer, increment
      // the row counter manually before breaking out of the loop.
      ++m_build_chunk_current_row;
      break;
    } else if (store_row_result ==
               hash_join_buffer::StoreRowResult::FATAL_ERROR) {
      // An unrecoverable error. Most likely, malloc failed, so report OOM.
      // Note that we cannot say for sure how much memory we tried to allocate
      // when failing, so just report 'join_buffer_size' as the amount of
      // memory we tried to allocate.
      my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
               thd()->variables.join_buff_size);
      return true;
    }

    DBUG_ASSERT(store_row_result ==
                hash_join_buffer::StoreRowResult::ROW_STORED);
  }

  // Prepare to do a lookup in the hash table for all rows from the probe
  // chunk.
  if (m_chunk_files_on_disk[m_current_chunk].probe_chunk.Rewind()) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }
  m_probe_chunk_current_row = 0;
  SetReadingProbeRowState();

  if (m_build_chunk_current_row < build_chunk.num_rows() &&
      m_join_type != JoinType::INNER) {
    // The build chunk did not fit into memory, causing us to refill the hash
    // table once the probe input is consumed. If we don't take any special
    // action, we can end up outputting the same probe row twice if the probe
    // phase finds a match in both iterations through the hash table.
    // By enabling probe row saving, unmatched probe rows are written to a probe
    // row saving file. After the next hash table refill, we load the probe rows
    // from the probe row saving file instead of from the probe chunk, and thus
    // ensure that we only see unmatched probe rows. Note that we have not
    // started reading probe rows yet, but we are about to do so.
    InitWritingToProbeRowSavingFile();
  } else {
    m_write_to_probe_row_saving = false;
  }

  return false;
}

bool HashJoinIterator::ReadRowFromProbeIterator() {
  DBUG_ASSERT(m_current_chunk == -1);

  int result = m_probe_input->Read();
  if (result == 1) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }

  if (result == 0) {
    RequestRowId(m_probe_input_tables.tables());

    // A row from the probe iterator is ready.
    LookupProbeRowInHashTable();
    return false;
  }

  DBUG_ASSERT(result == -1);
  m_probe_input->EndPSIBatchModeIfStarted();

  // The probe iterator is out of rows. We may be in three different situations
  // here (ordered from most common to least common):
  // 1. The build input is also empty, and the join is done. The iterator state
  //    will go into "LOADING_NEXT_CHUNK_PAIR", and we will see that there are
  //    no chunk files when trying to load the next pair of chunk files.
  // 2. We have degraded into an on-disk hash join, and we will now start
  //    reading from chunk files on disk.
  // 3. The build input is not empty, and we have not degraded into an on-disk
  //    hash join (i.e. we were not allowed to, due to a LIMIT in the query);
  //    re-populate the hash table with the remaining rows from the build
  //    input.
  if (m_allow_spill_to_disk) {
    m_hash_join_type = HashJoinType::SPILL_TO_DISK;
    m_state = State::LOADING_NEXT_CHUNK_PAIR;
    return false;
  }

  m_hash_join_type = HashJoinType::IN_MEMORY_WITH_HASH_TABLE_REFILL;
  if (m_write_to_probe_row_saving) {
    // If probe row saving is enabled, it means that the probe row saving write
    // file contains all the rows from the probe input that should be
    // read/processed again. We must swap the probe row saving write and read
    // files _before_ calling BuildHashTable, since BuildHashTable may
    // initialize (and thus clear) the probe row saving write file, losing any
    // rows written to said file.
    if (InitReadingFromProbeRowSavingFile()) {
      DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
      return true;
    }
  }

  if (BuildHashTable()) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }

  switch (m_state) {
    case State::END_OF_ROWS:
      // BuildHashTable() decided that the join is done (the build input is
      // empty, and we are in an inner join or semijoin. Antijoin and outer join
      // must output NULL-complemented rows from the probe input).
      return false;
    case State::READING_ROW_FROM_PROBE_ITERATOR:
      // Start reading from the beginning of the probe iterator.
      return InitProbeIterator();
    case State::READING_ROW_FROM_PROBE_ROW_SAVING_FILE:
      // The probe row saving read file is already initialized for reading
      // further up in this function.
      return false;
    default:
      DBUG_ASSERT(false);
      return true;
  }
}

bool HashJoinIterator::ReadRowFromProbeChunkFile() {
  DBUG_ASSERT(on_disk_hash_join() && m_current_chunk != -1);

  // Read one row from the current HashJoinChunk, and put
  // that row into the record buffer of the probe input table.
  HashJoinChunk &current_probe_chunk =
      m_chunk_files_on_disk[m_current_chunk].probe_chunk;
  if (m_probe_chunk_current_row >= current_probe_chunk.num_rows()) {
    // No more rows in the current probe chunk, so load the next chunk of
    // build rows into the hash table.
    if (m_write_to_probe_row_saving) {
      // If probe row saving is enabled, the build chunk did not fit in memory.
      // This causes us to refill the hash table with the rows from the build
      // chunk that did not fit, and thus read the probe chunk multiple times.
      // This can be problematic for semijoin; we do not want to output a probe
      // row that has a match in both parts of the hash table. To mitigate
      // this, we write probe rows that do not have a match in the hash table
      // to a probe row saving file (m_probe_row_saving_write_file), and read
      // from said file instead of from the probe input the next time.
      if (InitReadingFromProbeRowSavingFile()) {
        DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
        return true;
      }
    } else {
      m_read_from_probe_row_saving = false;
    }

    m_state = State::LOADING_NEXT_CHUNK_PAIR;
    return false;
  } else if (current_probe_chunk.LoadRowFromChunk(
                 &m_temporary_row_and_join_key_buffer,
                 &m_probe_row_match_flag)) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }

  m_probe_chunk_current_row++;

  // A row from the chunk file is ready.
  LookupProbeRowInHashTable();
  return false;
}

bool HashJoinIterator::ReadRowFromProbeRowSavingFile() {
  // Read one row from the probe row saving file, and put that row into the
  // record buffer of the probe input table.
  if (m_probe_row_saving_read_file_current_row >=
      m_probe_row_saving_read_file.num_rows()) {
    // We are done reading all the rows from the probe row saving file. If probe
    // row saving is still enabled, we have a new set of rows in the probe row
    // saving write file.
    if (m_write_to_probe_row_saving) {
      if (InitReadingFromProbeRowSavingFile()) {
        DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
        return true;
      }
    } else {
      m_read_from_probe_row_saving = false;
    }

    // If we are executing an on-disk hash join, go and load the next pair of
    // chunk files. If we are doing everything in memory with multiple hash
    // table refills, go and refill the hash table.
    if (m_hash_join_type == HashJoinType::SPILL_TO_DISK) {
      m_state = State::LOADING_NEXT_CHUNK_PAIR;
      return false;
    }
    DBUG_ASSERT(m_hash_join_type ==
                HashJoinType::IN_MEMORY_WITH_HASH_TABLE_REFILL);

    // No more rows in the probe row saving file.
    if (BuildHashTable()) {
      DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
      return true;
    }

    if (m_state == State::END_OF_ROWS) {
      // BuildHashTable() decided that the join is done (the build input is
      // empty).
      return false;
    }

    SetReadingProbeRowState();
    return false;
  } else if (m_probe_row_saving_read_file.LoadRowFromChunk(
                 &m_temporary_row_and_join_key_buffer,
                 &m_probe_row_match_flag)) {
    DBUG_ASSERT(thd()->is_error());  // my_error should have been called.
    return true;
  }

  m_probe_row_saving_read_file_current_row++;

  // A row from the probe row saving file is ready.
  LookupProbeRowInHashTable();
  return false;
}

void HashJoinIterator::LookupProbeRowInHashTable() {
  if (m_join_conditions.empty()) {
    // Skip the call to equal_range in case we don't have any join conditions.
    // This can save up to 20% in case of multi-table joins.
    m_hash_map_iterator = m_row_buffer.begin();
    m_hash_map_end = m_row_buffer.end();
    m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
    return;
  }

  // Extract the join key from the probe input, and use that key as the lookup
  // key in the hash table.
  bool null_in_join_key = ConstructJoinKey(
      thd(), m_join_conditions, m_probe_input_tables.tables_bitmap(),
      &m_temporary_row_and_join_key_buffer);

  if (null_in_join_key) {
    if (m_join_type == JoinType::ANTI || m_join_type == JoinType::OUTER) {
      // SQL NULL was found, and we will never find a matching row in the hash
      // table. Let us indicate that, so that a null-complemented row is
      // returned.
      m_hash_map_iterator = m_row_buffer.end();
      m_hash_map_end = m_row_buffer.end();
      m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
    } else {
      SetReadingProbeRowState();
    }
    return;
  }

  hash_join_buffer::Key key(
      pointer_cast<const uchar *>(m_temporary_row_and_join_key_buffer.ptr()),
      m_temporary_row_and_join_key_buffer.length());

  if ((m_join_type == JoinType::SEMI || m_join_type == JoinType::ANTI) &&
      m_extra_condition == nullptr) {
    // find() has a better average complexity than equal_range() (constant vs.
    // linear in the number of matching elements). And for semijoins, we are
    // only interested in the first match anyway, so this may give a nice
    // speedup. An exception to this is if we have any "extra" conditions that
    // need to be evaluated after the hash table lookup, but before the row is
    // returned; we may need to read through the entire hash table to find a row
    // that satisfies the extra condition(s).
    m_hash_map_iterator = m_row_buffer.find(key);
    m_hash_map_end = m_row_buffer.end();
  } else {
    auto range = m_row_buffer.equal_range(key);
    m_hash_map_iterator = range.first;
    m_hash_map_end = range.second;
  }

  m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
}

int HashJoinIterator::ReadJoinedRow() {
  if (m_hash_map_iterator == m_hash_map_end) {
    // Signal that we have reached the end of hash table entries. Let the caller
    // determine which state we end up in.
    return -1;
  }

  // A row is ready in the hash table, so put the data from the hash table row
  // into the record buffers of the build input tables.
  hash_join_buffer::LoadIntoTableBuffers(m_build_input_tables,
                                         m_hash_map_iterator->second);
  return 0;
}

bool HashJoinIterator::WriteProbeRowToDiskIfApplicable() {
  // If we are spilling to disk, we need to match the row against rows from
  // the build input that are written out to chunk files. So we need to write
  // the probe row to chunk files as well. Semijoin/antijoin has an exception to
  // this; if the probe input already got a match in the hash table, we do not
  // need to write it out to disk. Outer joins should always write the row out
  // to disk, since the probe/left input should return NULL-complemented rows
  // even if the join condition contains SQL NULL.
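  // For example, in a semijoin that has degraded into an on-disk hash join, a
  // probe row that already found a match against the in-memory part of the
  // build input has produced its output, so writing it to a chunk file would
  // only risk emitting it a second time.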
  const bool write_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
  if (m_state == State::READING_FIRST_ROW_FROM_HASH_TABLE) {
    const bool found_match = m_hash_map_iterator != m_hash_map_end;

    if ((m_join_type == JoinType::INNER || m_join_type == JoinType::OUTER) ||
        !found_match) {
      if (on_disk_hash_join() && m_current_chunk == -1) {
        if (WriteRowToChunk(thd(), &m_chunk_files_on_disk,
                            false /* write_to_build_chunk */,
                            m_probe_input_tables, m_join_conditions,
                            kChunkPartitioningHashSeed, found_match,
                            write_rows_with_null_in_join_key,
                            &m_temporary_row_and_join_key_buffer)) {
          return true;
        }
      }

      if (m_write_to_probe_row_saving &&
          m_probe_row_saving_write_file.WriteRowToChunk(
              &m_temporary_row_and_join_key_buffer,
              found_match || m_probe_row_match_flag)) {
        return true;
      }
    }
  }

  return false;
}

bool HashJoinIterator::JoinedRowPassesExtraConditions() const {
  if (m_extra_condition != nullptr) {
    return m_extra_condition->val_int() != 0;
  }

  return true;
}

int HashJoinIterator::ReadNextJoinedRowFromHashTable() {
  int res;
  bool passes_extra_conditions = false;
  do {
    res = ReadJoinedRow();

    // ReadJoinedRow() can only return 0 (row is ready) or -1 (EOF).
    DBUG_ASSERT(res == 0 || res == -1);

    // Evaluate any extra conditions that are attached to this iterator before
    // we return a row.
    if (res == 0) {
      passes_extra_conditions = JoinedRowPassesExtraConditions();
      if (thd()->is_error()) {
        // Evaluation of extra conditions raised an error, so abort the join.
        return 1;
      }

      if (!passes_extra_conditions) {
        // Advance to the next matching row in the hash table. Note that the
        // iterator stays in the state READING_FIRST_ROW_FROM_HASH_TABLE even
        // though we are not actually reading the first row anymore. This is
        // because WriteProbeRowToDiskIfApplicable() needs to know if this is
        // the first row that matches both the join condition and any extra
        // conditions; only unmatched rows will be written to disk.
        ++m_hash_map_iterator;
      }
    }
  } while (res == 0 && !passes_extra_conditions);

  // The row passed all extra conditions (or we are out of rows in the hash
  // table), so we can now write the row to disk.
  // Inner and outer joins: Write out all rows from the probe input (given that
  //   we have degraded into an on-disk hash join).
  // Semijoin and antijoin: Write out rows that do not have any matching row in
  //   the hash table.
  if (WriteProbeRowToDiskIfApplicable()) {
    return 1;
  }

  if (res == -1) {
    // If we did not find a matching row in the hash table, antijoin and outer
    // join should output the last row read from the probe input together with
    // a NULL-complemented row from the build input. However, in case of
    // on-disk antijoin, a row from the probe input can match a row from the
    // build input that has already been written out to disk. So for on-disk
    // antijoin, we cannot output any rows until we have started reading from
    // chunk files.
    //
    // On-disk outer join is a bit more tricky; we can only output a
    // NULL-complemented row if the probe row did not match anything from the
    // build input while doing any of the probe phases. We can have multiple
    // probe phases if e.g. a build chunk file is too big to fit in memory; we
    // would have to read the build chunk in multiple smaller chunks while doing
    // a probe phase for each of these smaller chunks. To keep track of this,
    // each probe row is prefixed with a match flag in the chunk files.
    bool return_null_complemented_row = false;
    if ((on_disk_hash_join() && m_current_chunk == -1) ||
        m_write_to_probe_row_saving) {
      return_null_complemented_row = false;
    } else if (m_join_type == JoinType::ANTI) {
      return_null_complemented_row = true;
    } else if (m_join_type == JoinType::OUTER &&
               m_state == State::READING_FIRST_ROW_FROM_HASH_TABLE &&
               !m_probe_row_match_flag) {
      return_null_complemented_row = true;
    }

    SetReadingProbeRowState();

    if (return_null_complemented_row) {
      m_build_input->SetNullRowFlag(true);
      return 0;
    }
    return -1;
  }

  // We have a matching row ready.
  switch (m_join_type) {
    case JoinType::SEMI:
      // Semijoin should return the first matching row, and then go to the next
      // row from the probe input.
      SetReadingProbeRowState();
      break;
    case JoinType::ANTI:
      // Antijoin should immediately go to the next row from the probe input,
      // without returning the matching row.
      SetReadingProbeRowState();
      return -1;  // Read the next row.
    case JoinType::OUTER:
    case JoinType::INNER:
      // Inner and outer joins should return all matching rows from the hash
      // table before moving to the next row from the probe input.
      m_state = State::READING_FROM_HASH_TABLE;
      break;
  }

  ++m_hash_map_iterator;
  return 0;
}

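// Read() drives the hash join state machine: depending on m_state, each
// iteration either loads the next pair of chunk files, reads a probe row
// (from the probe iterator, a probe chunk file, or the probe row saving
// file), or returns the next matching row from the hash table.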
int HashJoinIterator::Read() {
  for (;;) {
    if (thd()->killed) {  // Aborted by user.
      thd()->send_kill_message();
      return 1;
    }

    switch (m_state) {
      case State::LOADING_NEXT_CHUNK_PAIR:
        if (ReadNextHashJoinChunk()) {
          return 1;
        }
        break;
      case State::READING_ROW_FROM_PROBE_ITERATOR:
        if (ReadRowFromProbeIterator()) {
          return 1;
        }
        break;
      case State::READING_ROW_FROM_PROBE_CHUNK_FILE:
        if (ReadRowFromProbeChunkFile()) {
          return 1;
        }
        break;
      case State::READING_ROW_FROM_PROBE_ROW_SAVING_FILE:
        if (ReadRowFromProbeRowSavingFile()) {
          return 1;
        }
        break;
      case State::READING_FIRST_ROW_FROM_HASH_TABLE:
      case State::READING_FROM_HASH_TABLE: {
        const int res = ReadNextJoinedRowFromHashTable();
        if (res == 0) {
          // A joined row is ready, so send it to the client.
          return 0;
        }

        if (res == -1) {
          // No more matching rows in the hash table, or antijoin found a
          // matching row. Read a new row from the probe input.
          continue;
        }

        // An error occurred, so abort the join.
        DBUG_ASSERT(res == 1);
        return res;
      }
      case State::END_OF_ROWS:
        return -1;
    }
  }

  // Unreachable.
  DBUG_ASSERT(false);
  return 1;
}

std::vector<std::string> HashJoinIterator::DebugString() const {
  std::string ret;

  switch (m_join_type) {
    case JoinType::INNER:
      ret += "Inner hash join";
      break;
    case JoinType::SEMI:
      ret += "Hash semijoin";
      break;
    case JoinType::ANTI:
      ret += "Hash antijoin";
      break;
    case JoinType::OUTER:
      ret += "Left hash join";
      break;
    default:
      DBUG_ASSERT(false);
      ret += "not implemented";
      break;
  }

  if (m_join_conditions.empty()) {
    ret.append(" (no condition)");
  } else {
    for (const HashJoinCondition &join_condition : m_join_conditions) {
      if (join_condition.join_condition() !=
          m_join_conditions[0].join_condition()) {
        ret.push_back(',');
      }
      if (!join_condition.store_full_sort_key()) {
        ret.append(" (<hash>(" + ItemToString(join_condition.left_extractor()) +
                   ")=<hash>(" +
                   ItemToString(join_condition.right_extractor()) + "))");
      } else {
        ret.append(" " + ItemToString(join_condition.join_condition()));
      }
    }
  }

  if (m_extra_condition != nullptr) {
    ret.append(", extra conditions: " + ItemToString(m_extra_condition));
  }

  return {ret};
}

bool HashJoinIterator::InitWritingToProbeRowSavingFile() {
  m_write_to_probe_row_saving = true;
  return m_probe_row_saving_write_file.Init(m_probe_input_tables,
                                            m_join_type == JoinType::OUTER);
}

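// Swap the probe row saving write file into the read position. Note that the
// write file is left in a moved-from state, so
// InitWritingToProbeRowSavingFile() must be called again before any further
// writes to it.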
bool HashJoinIterator::InitReadingFromProbeRowSavingFile() {
  m_probe_row_saving_read_file = std::move(m_probe_row_saving_write_file);
  m_probe_row_saving_read_file_current_row = 0;
  m_read_from_probe_row_saving = true;
  return m_probe_row_saving_read_file.Rewind();
}

void HashJoinIterator::SetReadingProbeRowState() {
  switch (m_hash_join_type) {
    case HashJoinType::IN_MEMORY:
      m_state = State::READING_ROW_FROM_PROBE_ITERATOR;
      break;
    case HashJoinType::IN_MEMORY_WITH_HASH_TABLE_REFILL:
      if (m_join_type == JoinType::INNER) {
        // As inner joins do not need probe row match flags, probe row saving
        // will never be activated for inner joins.
        m_state = State::READING_ROW_FROM_PROBE_ITERATOR;
      } else {
        m_state = State::READING_ROW_FROM_PROBE_ROW_SAVING_FILE;
      }
      break;
    case HashJoinType::SPILL_TO_DISK:
      if (m_read_from_probe_row_saving) {
        // Probe row saving may be activated if a build chunk did not fit in
        // memory.
        m_state = State::READING_ROW_FROM_PROBE_ROW_SAVING_FILE;
        return;
      }
      m_state = State::READING_ROW_FROM_PROBE_CHUNK_FILE;
      break;
  }
}