/* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#include "sql/bka_iterator.h"

#include <math.h>
#include <string.h>
#include <sys/types.h>
#include <algorithm>
#include <iterator>
#include <new>
#include <string>
#include <utility>
#include <vector>

#include "my_alloc.h"
#include "my_base.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_sys.h"
#include "mysqld_error.h"
#include "sql/handler.h"
#include "sql/hash_join_buffer.h"
#include "sql/hash_join_iterator.h"
#include "sql/item.h"
#include "sql/key.h"
#include "sql/psi_memory_key.h"
#include "sql/row_iterator.h"
#include "sql/sql_executor.h"
#include "sql/sql_opt_exec_shared.h"
#include "sql/table.h"

using hash_join_buffer::BufferRow;
using hash_join_buffer::TableCollection;
using std::string;
using std::vector;

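// Returns whether we need to keep track, for each buffered outer row, of
// whether it has matched at least one inner row: outer joins and antijoins
// need the flags to emit NULL-complemented rows for unmatched outer rows,
// and semijoins use them to skip outer rows that already have a match.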
static bool NeedMatchFlags(JoinType join_type) {
  return join_type == JoinType::OUTER || join_type == JoinType::SEMI ||
         join_type == JoinType::ANTI;
}

static size_t BytesNeededForMatchFlags(size_t rows) {
  // One bit per row.
  return (rows + 7) / 8;
}

BKAIterator::BKAIterator(THD *thd, JOIN *join,
                         unique_ptr_destroy_only<RowIterator> outer_input,
                         qep_tab_map outer_input_tables,
                         unique_ptr_destroy_only<RowIterator> inner_input,
                         size_t max_memory_available,
                         size_t mrr_bytes_needed_for_single_inner_row,
                         float expected_inner_rows_per_outer_row,
                         MultiRangeRowIterator *mrr_iterator,
                         JoinType join_type)
    : RowIterator(thd),
      m_outer_input(move(outer_input)),
      m_inner_input(move(inner_input)),
      m_mem_root(key_memory_hash_join, 16384 /* 16 kB */),
      m_rows(&m_mem_root),
      m_outer_input_tables(join, outer_input_tables),
      m_max_memory_available(max_memory_available),
      m_mrr_bytes_needed_for_single_inner_row(
          mrr_bytes_needed_for_single_inner_row),
      m_mrr_iterator(mrr_iterator),
      m_join_type(join_type) {
  DBUG_ASSERT(m_outer_input != nullptr);
  DBUG_ASSERT(m_inner_input != nullptr);

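  // Reserve MRR buffer space per outer row: enough bytes for one inner row,
  // scaled by the expected fan-out (but never less than one inner row).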
  m_mrr_bytes_needed_per_row =
      lrint(mrr_bytes_needed_for_single_inner_row *
            std::max(expected_inner_rows_per_outer_row, 1.0f));

  // Mark that this iterator will provide the row ID, so that iterators above
  // this one do not call position(). See QEP_TAB::rowid_status for more
  // details.
  for (const hash_join_buffer::Table &it : m_outer_input_tables.tables()) {
    if (it.qep_tab->rowid_status == NEED_TO_CALL_POSITION_FOR_ROWID) {
      it.qep_tab->rowid_status = ROWID_PROVIDED_BY_ITERATOR_READ_CALL;
    }
  }

  m_mrr_iterator->set_outer_input_tables(join, outer_input_tables);
  m_mrr_iterator->set_join_type(join_type);
}

bool BKAIterator::Init() {
  if (!m_outer_input_tables.has_blob_column()) {
    size_t upper_row_size =
        hash_join_buffer::ComputeRowSizeUpperBound(m_outer_input_tables);
    if (m_outer_row_buffer.reserve(upper_row_size)) {
      my_error(ER_OUTOFMEMORY, MYF(0), upper_row_size);
      return true;
    }
  }

  BeginNewBatch();
  m_end_of_outer_rows = false;
  m_has_row_from_previous_batch = false;

  return m_outer_input->Init();
}

void BKAIterator::BeginNewBatch() {
  m_mem_root.ClearForReuse();
  new (&m_rows) Mem_root_array<BufferRow>(&m_mem_root);
  m_bytes_used = 0;
  m_state = State::NEED_OUTER_ROWS;
}

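// Reads outer rows into the in-memory batch until the batch is full or the
// outer input is exhausted, then hands the batch to the MRR iterator.
// Returns 1 on error, -1 if there were no outer rows at all (end of rows),
// and 0 when a batch is ready for probing.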
int BKAIterator::ReadOuterRows() {
  for (;;) {
    if (m_has_row_from_previous_batch) {
      // The outer row will be in m_outer_row_buffer already. Load it back
      // into the global table buffers; MultiRangeRowIterator has loaded other
      // rows into them, and in case we are reading from a join, Read() may
      // not update all of the tables.
      m_has_row_from_previous_batch = false;
      hash_join_buffer::LoadIntoTableBuffers(
          m_outer_input_tables,
          hash_join_buffer::Key(
              pointer_cast<const uchar *>(m_outer_row_buffer.ptr()),
              m_outer_row_buffer.length()));
    } else {
      int result = m_outer_input->Read();
      if (result == 1) {
        // Error.
        return 1;
      }
      if (result == -1) {
        // EOF.
        m_end_of_outer_rows = true;
        break;
      }
      RequestRowId(m_outer_input_tables.tables());

      // Save the contents of all columns marked for reading.
      if (StoreFromTableBuffers(m_outer_input_tables, &m_outer_row_buffer)) {
        return 1;
      }
    }

    // See if we have room for this row, and the associated number of MRR
    // rows, without going over our total RAM budget. (We ignore the budget
    // if the buffer is empty; at least a single row must be allowed at all
    // times.)
    const size_t row_size = m_outer_row_buffer.length();
    size_t total_bytes_needed_after_this_row =
        m_bytes_used + row_size +
        (m_mrr_bytes_needed_per_row + sizeof(m_rows[0])) * (m_rows.size() + 1);

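    // Illustrative accounting (hypothetical numbers): with 64-byte outer
    // rows, m_mrr_bytes_needed_per_row = 200 and sizeof(m_rows[0]) = 16,
    // each buffered outer row charges 64 + 200 + 16 = 280 bytes against
    // m_max_memory_available, plus one match-flag bit when required: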
    if (NeedMatchFlags(m_join_type)) {
      total_bytes_needed_after_this_row +=
          BytesNeededForMatchFlags(m_rows.size() + 1);
    }

    if (!m_rows.empty() &&
        total_bytes_needed_after_this_row > m_max_memory_available) {
      // Out of memory, so end the batch and send it.
      // This row will be dealt with in the next batch.
      m_has_row_from_previous_batch = true;
      break;
    }

    uchar *row = m_mem_root.ArrayAlloc<uchar>(row_size);
    if (row == nullptr) {
      return 1;
    }
    memcpy(row, m_outer_row_buffer.ptr(), row_size);

    m_rows.push_back(BufferRow(row, row_size));
    m_bytes_used += row_size;
  }

  // If we had no rows at all, we're done.
  if (m_rows.empty()) {
    DBUG_ASSERT(!m_has_row_from_previous_batch);
    m_state = State::END_OF_ROWS;
    return -1;
  }

  // Figure out how much RAM we need to allocate for the MRR row buffer,
  // given to the handler for holding inner rows.
  size_t mrr_buffer_size = m_mrr_bytes_needed_per_row * m_rows.size();
  if (m_bytes_used + mrr_buffer_size >= m_max_memory_available) {
    // Even if it will take us over budget, DS-MRR needs space for at least
    // one row to work.
    DBUG_ASSERT(m_rows.size() ==
                1);  // Otherwise, we would have stopped reading rows earlier.
    if (m_bytes_used + m_mrr_bytes_needed_for_single_inner_row >=
        m_max_memory_available) {
      mrr_buffer_size = m_mrr_bytes_needed_for_single_inner_row;
    } else {
      mrr_buffer_size = m_max_memory_available - m_bytes_used;
    }
  } else {
    // We're under budget. Heuristically, increase it to get some
    // extra headroom if the estimate is pessimistic.
    mrr_buffer_size = std::min(mrr_buffer_size * 2 + 16384,
                               m_max_memory_available - m_bytes_used);
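    // (For example, a 100 kB estimate would grow to 2 * 100 kB + 16 kB =
    // 216 kB, budget permitting.)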
  }
  DBUG_ASSERT(mrr_buffer_size >= m_mrr_bytes_needed_for_single_inner_row);

  // Ask the MRR iterator to do the actual read.
  m_mrr_iterator->set_rows(m_rows.begin(), m_rows.end());
  m_mrr_iterator->set_mrr_buffer(m_mem_root.ArrayAlloc<uchar>(mrr_buffer_size),
                                 mrr_buffer_size);
  if (NeedMatchFlags(m_join_type)) {
    const size_t bytes_needed = BytesNeededForMatchFlags(m_rows.size());
    m_mrr_iterator->set_match_flag_buffer(
        m_mem_root.ArrayAlloc<uchar>(bytes_needed));
  }
  if (m_inner_input->Init()) {
    return 1;
  }

  // Probe the rows we've got using MRR.
  m_state = State::RETURNING_JOINED_ROWS;
  m_mrr_iterator->SetNullRowFlag(false);
  return 0;
}

void BKAIterator::BatchFinished() {
  // End of joined rows; start reading the next batch if there are
  // more outer rows.
  if (m_end_of_outer_rows) {
    m_state = State::END_OF_ROWS;
  } else {
    BeginNewBatch();
    DBUG_ASSERT(m_state == State::NEED_OUTER_ROWS);
  }
}

int BKAIterator::MakeNullComplementedRow() {
  // Find the next row that hasn't been matched to anything yet.
  while (m_current_pos != m_rows.end()) {
    if (m_mrr_iterator->RowHasBeenRead(m_current_pos)) {
      ++m_current_pos;
    } else {
      // Return a NULL-complemented row. (Our table already has the NULL flag
      // set.)
      hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables,
                                             m_current_pos->data());
      ++m_current_pos;
      return 0;
    }
  }

  // No more NULL-complemented rows to return.
  m_mrr_iterator->SetNullRowFlag(false);
  return -1;
}

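// Read() is a small state machine: NEED_OUTER_ROWS fills a new batch,
// RETURNING_JOINED_ROWS streams the MRR matches, and for outer joins and
// antijoins, RETURNING_NULL_COMPLEMENTED_ROWS emits the unmatched outer
// rows before the next batch begins (or we reach END_OF_ROWS once the
// outer input is exhausted).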
int BKAIterator::Read() {
  for (;;) {  // Termination condition within loop.
    switch (m_state) {
      case State::END_OF_ROWS:
        return -1;
      case State::NEED_OUTER_ROWS: {
        int err = ReadOuterRows();
        if (err != 0) {
          return err;
        }
        break;
      }
      case State::RETURNING_JOINED_ROWS: {
        int err = m_inner_input->Read();
        if (err != -1) {
          if (err == 0) {
            m_mrr_iterator->MarkLastRowAsRead();
            if (m_join_type == JoinType::ANTI) {
              break;
            }
          }

          // A row or an error; pass it through (unless we are an antijoin).
          return err;
        }

        // No more joined rows in this batch. Go to the next batch -- but
        // if we're an outer join or antijoin, first create NULL-complemented
        // rows for the ones in this batch that we didn't match to anything.
        if (m_join_type == JoinType::OUTER || m_join_type == JoinType::ANTI) {
          m_state = State::RETURNING_NULL_COMPLEMENTED_ROWS;
          m_current_pos = m_rows.begin();
          m_mrr_iterator->SetNullRowFlag(true);
        } else {
          BatchFinished();
          break;
        }
      }
      // Fall through.
      case State::RETURNING_NULL_COMPLEMENTED_ROWS: {
        int err = MakeNullComplementedRow();
        if (err != -1) {
          return err;
        }

        BatchFinished();
        break;
      }
    }
  }
}

vector<string> BKAIterator::DebugString() const {
  switch (m_join_type) {
    case JoinType::INNER:
      return {"Batched key access inner join"};
    case JoinType::SEMI:
      return {"Batched key access semijoin"};
    case JoinType::OUTER:
      return {"Batched key access left join"};
    case JoinType::ANTI:
      return {"Batched key access antijoin"};
    default:
      DBUG_ASSERT(false);
      return {"Batched key access unknown join"};
  }
}

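// MultiRangeRowIterator does the inner-side work on BKAIterator's behalf:
// BKAIterator hands it the batch of buffered outer rows (set_rows()), an
// MRR buffer and, if needed, a match-flag buffer, and this iterator then
// drives the handler's multi-range read to produce the joined rows.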
MultiRangeRowIterator::MultiRangeRowIterator(THD *thd, Item *cache_idx_cond,
                                             TABLE *table,
                                             bool keep_current_rowid,
                                             TABLE_REF *ref, int mrr_flags)
    : TableRowIterator(thd, table),
      m_cache_idx_cond(cache_idx_cond),
      m_keep_current_rowid(keep_current_rowid),
      m_table(table),
      m_file(table->file),
      m_ref(ref),
      m_mrr_flags(mrr_flags) {}

void MultiRangeRowIterator::set_outer_input_tables(
    JOIN *join, qep_tab_map outer_input_tables) {
  m_outer_input_tables = TableCollection(join, outer_input_tables);
}

bool MultiRangeRowIterator::Init() {
  /*
    Prepare to iterate over keys from the join buffer and to get
    matching candidates obtained with MRR handler functions.
   */
  if (!m_file->inited) {
    const int error = m_file->ha_index_init(m_ref->key, true);
    if (error) {
      m_file->print_error(error, MYF(0));
      return error;
    }
  }
  RANGE_SEQ_IF seq_funcs = {MultiRangeRowIterator::MrrInitCallbackThunk,
                            MultiRangeRowIterator::MrrNextCallbackThunk,
                            nullptr, nullptr};
  if (m_cache_idx_cond != nullptr) {
    seq_funcs.skip_index_tuple =
        MultiRangeRowIterator::MrrSkipIndexTupleCallbackThunk;
  }
  if (m_join_type == JoinType::SEMI || m_join_type == JoinType::ANTI) {
    seq_funcs.skip_record = MultiRangeRowIterator::MrrSkipRecordCallbackThunk;
  }
  if (m_match_flag_buffer != nullptr) {
    DBUG_ASSERT(NeedMatchFlags(m_join_type));

    // Reset all the match flags.
    memset(m_match_flag_buffer, 0,
           BytesNeededForMatchFlags(std::distance(m_begin, m_end)));
  } else {
    DBUG_ASSERT(!NeedMatchFlags(m_join_type));
  }

  /**
    We don't send a set of rows directly to MRR; instead, we give it a set
    of function pointers to iterate over the rows, and a pointer to ourselves.
    The handler will call our callbacks as follows:

     1. MrrInitCallback at the start, to initialize iteration.
     2. MrrNextCallback is called to yield ranges to scan, until it returns 1.
     3. If we have dependent index conditions (see the comment on
        m_cache_idx_cond), MrrSkipIndexTuple will be called back for each
        range that returned an inner row, and can choose to discard the row
        there and then if it doesn't match the dependent index condition.
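     4. For semi- and antijoins, MrrSkipRecord may additionally be called
        back (via skip_record above), letting us skip candidate rows whose
        outer row already has its match flag set.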
   */
  return m_file->multi_range_read_init(&seq_funcs, this,
                                       std::distance(m_begin, m_end),
                                       m_mrr_flags, &m_mrr_buffer);
}

range_seq_t MultiRangeRowIterator::MrrInitCallback(uint, uint) {
  m_current_pos = m_begin;
  return this;
}

uint MultiRangeRowIterator::MrrNextCallback(KEY_MULTI_RANGE *range) {
  // Load the next row from the buffer, if there is one.
  //
  // NULL values will never match in an inner join. The optimizer will often
  // set up a NULL filter for inner joins, but not in all cases, so we must
  // skip such rows by checking impossible_null_ref(). Thus, we iterate
  // until we have a row that is not NULL-filtered. The typical case is
  // that this happens immediately.
  //
  // TODO(sgunders): Consider whether it would be possible to put this check
  // before putting the rows into the buffer. That would require evaluating
  // any items twice, though.
  for (;;) {
    if (m_current_pos == m_end) {
      return 1;
    }

    hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables,
                                           *m_current_pos);

    construct_lookup_ref(thd(), table(), m_ref);
    if (!m_ref->impossible_null_ref()) {
      break;
    }
    ++m_current_pos;
  }

  // Set up a range consisting of a single key, so the only difference
  // between start and end is the flags. They signify that the range starts
  // at the row in question, and ends right after it (exclusive).

  range->range_flag = EQ_RANGE;
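  // range->ptr is the per-range cookie that the handler later hands back to
  // us as range_info in the skip callbacks and in ha_multi_range_read_next();
  // it identifies which buffered outer row this range was generated from.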
  range->ptr = const_cast<char *>(pointer_cast<const char *>(m_current_pos));

  range->start_key.key = m_ref->key_buff;
  range->start_key.keypart_map = (1 << m_ref->key_parts) - 1;  // All keyparts.
  range->start_key.length = m_ref->key_length;
  range->start_key.flag = HA_READ_KEY_EXACT;

  range->end_key = range->start_key;
  range->end_key.flag = HA_READ_AFTER_KEY;

  ++m_current_pos;
  return 0;
}

bool MultiRangeRowIterator::MrrSkipIndexTuple(char *range_info) {
  BufferRow *rec_ptr = pointer_cast<BufferRow *>(range_info);

  // The index condition depends on fields from the outer tables (or we would
  // not be called), so we need to load the relevant rows before checking it.
  // range_info tells us which outer row we are talking about; it corresponds
  // to range->ptr in MrrNextCallback(), and points to the serialized outer
  // row in BKAIterator's m_rows array.
  hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables, rec_ptr->data());

  // Skip this tuple if the index condition is false.
  return !m_cache_idx_cond->val_int();
}

bool MultiRangeRowIterator::MrrSkipRecord(char *range_info) {
  BufferRow *rec_ptr = pointer_cast<BufferRow *>(range_info);
  return RowHasBeenRead(rec_ptr);
}

int MultiRangeRowIterator::Read() {
  // Read a row from the MRR buffer. rec_ptr tells us which outer row
  // this corresponds to; it corresponds to range->ptr in MrrNextCallback(),
  // and points to the serialized outer row in BKAIterator's m_rows array.
  BufferRow *rec_ptr = nullptr;
  do {
    int error =
        m_file->ha_multi_range_read_next(pointer_cast<char **>(&rec_ptr));
    if (error != 0) {
      return HandleError(error);
    }

    // NDB never calls mrr_funcs.skip_record(), so we need to recheck here.
    // See bug #30594210.
  } while (m_join_type == JoinType::SEMI && RowHasBeenRead(rec_ptr));

  hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables, rec_ptr->data());

  m_last_row_returned = rec_ptr;

  if (m_keep_current_rowid) {
    m_file->position(m_table->record[0]);
  }

  return 0;
}

vector<string> MultiRangeRowIterator::DebugString() const {
  const KEY *key = &table()->key_info[m_ref->key];
  string str = string("Multi-range index lookup on ") + table()->alias +
               " using " + key->name + " (" +
               RefToString(*m_ref, key, /*include_nulls=*/false) + ")";
  if (table()->file->pushed_idx_cond != nullptr) {
    str += ", with index condition: " +
           ItemToString(table()->file->pushed_idx_cond);
  }
  if (m_cache_idx_cond != nullptr) {
    str +=
        ", with dependent index condition: " + ItemToString(m_cache_idx_cond);
  }
  str += table()->file->explain_extra();
  return {str};
}