/* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include "sql/bka_iterator.h"

#include <math.h>
#include <string.h>
#include <sys/types.h>
#include <algorithm>
#include <iterator>
#include <new>
#include <string>
#include <utility>
#include <vector>

#include "my_alloc.h"
#include "my_base.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_sys.h"
#include "mysqld_error.h"
#include "sql/handler.h"
#include "sql/hash_join_buffer.h"
#include "sql/hash_join_iterator.h"
#include "sql/item.h"
#include "sql/key.h"
#include "sql/psi_memory_key.h"
#include "sql/row_iterator.h"
#include "sql/sql_executor.h"
#include "sql/sql_opt_exec_shared.h"
#include "sql/table.h"

using hash_join_buffer::BufferRow;
using hash_join_buffer::TableCollection;
using std::string;
using std::vector;
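// BKA (batched key access) is executed by a pair of cooperating iterators:
// a BKAIterator that buffers outer rows in batches, and a
// MultiRangeRowIterator somewhere below its inner input, which looks up all
// the buffered keys at once through the handler's multi-range read (MRR)
// interface. Illustrative shape of the iterator tree (a sketch only; the
// executor builds the actual tree):
//
//   BKAIterator
//     outer input: e.g. a table scan on t1
//     inner input: MultiRangeRowIterator doing index lookups into t2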
static bool NeedMatchFlags(JoinType join_type) {
  return join_type == JoinType::OUTER || join_type == JoinType::SEMI ||
         join_type == JoinType::ANTI;
}

static size_t BytesNeededForMatchFlags(size_t rows) {
  // One bit per row.
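  // For example, 100 rows need (100 + 7) / 8 = 13 bytes.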
  return (rows + 7) / 8;
}

BKAIterator::BKAIterator(THD *thd, JOIN *join,
                         unique_ptr_destroy_only<RowIterator> outer_input,
                         qep_tab_map outer_input_tables,
                         unique_ptr_destroy_only<RowIterator> inner_input,
                         size_t max_memory_available,
                         size_t mrr_bytes_needed_for_single_inner_row,
                         float expected_inner_rows_per_outer_row,
                         MultiRangeRowIterator *mrr_iterator,
                         JoinType join_type)
    : RowIterator(thd),
      m_outer_input(move(outer_input)),
      m_inner_input(move(inner_input)),
      m_mem_root(key_memory_hash_join, 16384 /* 16 kB */),
      m_rows(&m_mem_root),
      m_outer_input_tables(join, outer_input_tables),
      m_max_memory_available(max_memory_available),
      m_mrr_bytes_needed_for_single_inner_row(
          mrr_bytes_needed_for_single_inner_row),
      m_mrr_iterator(mrr_iterator),
      m_join_type(join_type) {
  DBUG_ASSERT(m_outer_input != nullptr);
  DBUG_ASSERT(m_inner_input != nullptr);
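  // Budget the MRR buffer per outer row; assume at least one inner row per
  // outer row, so that a pessimistically low estimate cannot leave the
  // buffer undersized.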
  m_mrr_bytes_needed_per_row =
      lrint(mrr_bytes_needed_for_single_inner_row *
            std::max(expected_inner_rows_per_outer_row, 1.0f));
  // Mark that this iterator will provide the row ID, so that iterators above
  // this one do not call position(). See QEP_TAB::rowid_status for more
  // details.
  for (const hash_join_buffer::Table &it : m_outer_input_tables.tables()) {
    if (it.qep_tab->rowid_status == NEED_TO_CALL_POSITION_FOR_ROWID) {
      it.qep_tab->rowid_status = ROWID_PROVIDED_BY_ITERATOR_READ_CALL;
    }
  }

  m_mrr_iterator->set_outer_input_tables(join, outer_input_tables);
  m_mrr_iterator->set_join_type(join_type);
}

bool BKAIterator::Init() {
  if (!m_outer_input_tables.has_blob_column()) {
    size_t upper_row_size =
        hash_join_buffer::ComputeRowSizeUpperBound(m_outer_input_tables);
    if (m_outer_row_buffer.reserve(upper_row_size)) {
      my_error(ER_OUTOFMEMORY, MYF(0), upper_row_size);
      return true;
    }
  }

  BeginNewBatch();
  m_end_of_outer_rows = false;
  m_has_row_from_previous_batch = false;

  return m_outer_input->Init();
}

void BKAIterator::BeginNewBatch() {
  m_mem_root.ClearForReuse();
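  // m_rows' backing store was allocated on m_mem_root, which was just
  // cleared, so reconstruct the array in place before reusing it.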
  new (&m_rows) Mem_root_array<BufferRow>(&m_mem_root);
  m_bytes_used = 0;
  m_state = State::NEED_OUTER_ROWS;
}

int BKAIterator::ReadOuterRows() {
  for (;;) {
    if (m_has_row_from_previous_batch) {
      // The outer row will be in m_outer_row_buffer already. Load it back
      // into the global table buffers; MultiRangeRowIterator has loaded other
      // rows into them, and in case we are reading from a join, Read() may
      // not update all of the tables.
      m_has_row_from_previous_batch = false;
      hash_join_buffer::LoadIntoTableBuffers(
          m_outer_input_tables,
          hash_join_buffer::Key(
              pointer_cast<const uchar *>(m_outer_row_buffer.ptr()),
              m_outer_row_buffer.length()));
    } else {
      int result = m_outer_input->Read();
      if (result == 1) {
        // Error.
        return 1;
      }
      if (result == -1) {
        // EOF.
        m_end_of_outer_rows = true;
        break;
      }
      RequestRowId(m_outer_input_tables.tables());

      // Save the contents of all columns marked for reading.
      if (StoreFromTableBuffers(m_outer_input_tables, &m_outer_row_buffer)) {
        return 1;
      }
    }

    // See if we have room for this row, and the associated number of MRR
    // rows, without going over our total RAM budget. (We ignore the budget
    // if the buffer is empty; at least a single row must be allowed at all
    // times.)
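    // Each buffered row costs its serialized length, a BufferRow entry and
    // m_mrr_bytes_needed_per_row bytes of future MRR buffer space.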
    const size_t row_size = m_outer_row_buffer.length();
    size_t total_bytes_needed_after_this_row =
        m_bytes_used + row_size +
        (m_mrr_bytes_needed_per_row + sizeof(m_rows[0])) * (m_rows.size() + 1);

    if (NeedMatchFlags(m_join_type)) {
      total_bytes_needed_after_this_row +=
          BytesNeededForMatchFlags(m_rows.size() + 1);
    }

    if (!m_rows.empty() &&
        total_bytes_needed_after_this_row > m_max_memory_available) {
      // Out of memory, so end the batch and send it.
      // This row will be dealt with in the next batch.
      m_has_row_from_previous_batch = true;
      break;
    }

    uchar *row = m_mem_root.ArrayAlloc<uchar>(row_size);
    if (row == nullptr) {
      return 1;
    }
    memcpy(row, m_outer_row_buffer.ptr(), row_size);

    m_rows.push_back(BufferRow(row, row_size));
    m_bytes_used += row_size;
  }

  // If we had no rows at all, we're done.
  if (m_rows.empty()) {
    DBUG_ASSERT(!m_has_row_from_previous_batch);
    m_state = State::END_OF_ROWS;
    return -1;
  }

  // Figure out how much RAM we need to allocate for the MRR row buffer,
  // given to the handler for holding inner rows.
  size_t mrr_buffer_size = m_mrr_bytes_needed_per_row * m_rows.size();
  if (m_bytes_used + mrr_buffer_size >= m_max_memory_available) {
    // Even if it will take us over budget, DS-MRR needs space for at least
    // one row to work.
    DBUG_ASSERT(m_rows.size() ==
                1);  // Otherwise, we would have stopped reading rows earlier.
    if (m_bytes_used + m_mrr_bytes_needed_for_single_inner_row >=
        m_max_memory_available) {
      mrr_buffer_size = m_mrr_bytes_needed_for_single_inner_row;
    } else {
      mrr_buffer_size = m_max_memory_available - m_bytes_used;
    }
  } else {
    // We're under budget. Heuristically, increase it to get some
    // extra headroom if the estimate is pessimistic.
    mrr_buffer_size = std::min(mrr_buffer_size * 2 + 16384,
                               m_max_memory_available - m_bytes_used);
  }
  DBUG_ASSERT(mrr_buffer_size >= m_mrr_bytes_needed_for_single_inner_row);

  // Ask the MRR iterator to do the actual read.
  m_mrr_iterator->set_rows(m_rows.begin(), m_rows.end());
  m_mrr_iterator->set_mrr_buffer(m_mem_root.ArrayAlloc<uchar>(mrr_buffer_size),
                                 mrr_buffer_size);
  if (NeedMatchFlags(m_join_type)) {
    const size_t bytes_needed = BytesNeededForMatchFlags(m_rows.size());
    m_mrr_iterator->set_match_flag_buffer(
        m_mem_root.ArrayAlloc<uchar>(bytes_needed));
  }
  if (m_inner_input->Init()) {
    return 1;
  }

  // Probe the rows we've got using MRR.
  m_state = State::RETURNING_JOINED_ROWS;
  m_mrr_iterator->SetNullRowFlag(false);
  return 0;
}

void BKAIterator::BatchFinished() {
  // End of joined rows; start reading the next batch if there are
  // more outer rows.
  if (m_end_of_outer_rows) {
    m_state = State::END_OF_ROWS;
  } else {
    BeginNewBatch();
    DBUG_ASSERT(m_state == State::NEED_OUTER_ROWS);
  }
}

int BKAIterator::MakeNullComplementedRow() {
  // Find the next row that hasn't been matched to anything yet.
  while (m_current_pos != m_rows.end()) {
    if (m_mrr_iterator->RowHasBeenRead(m_current_pos)) {
      ++m_current_pos;
    } else {
      // Return a NULL-complemented row. (Our table already has the NULL flag
      // set.)
      hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables,
                                             m_current_pos->data());
      ++m_current_pos;
      return 0;
    }
  }

  // No more NULL-complemented rows to return.
  m_mrr_iterator->SetNullRowFlag(false);
  return -1;
}

int BKAIterator::Read() {
  for (;;) {  // Termination condition within loop.
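    // State machine: NEED_OUTER_ROWS buffers a batch and hands it to MRR,
    // RETURNING_JOINED_ROWS streams matches from the inner input, and outer
    // joins/antijoins then visit RETURNING_NULL_COMPLEMENTED_ROWS for the
    // unmatched outer rows, before starting the next batch (or ending in
    // END_OF_ROWS).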
    switch (m_state) {
      case State::END_OF_ROWS:
        return -1;
      case State::NEED_OUTER_ROWS: {
        int err = ReadOuterRows();
        if (err != 0) {
          return err;
        }
        break;
      }
      case State::RETURNING_JOINED_ROWS: {
        int err = m_inner_input->Read();
        if (err != -1) {
          if (err == 0) {
            m_mrr_iterator->MarkLastRowAsRead();
            if (m_join_type == JoinType::ANTI) {
              break;
            }
          }

          // A row or an error; pass it through (unless we are an antijoin).
          return err;
        }

        // No more joined rows in this batch. Go to the next batch -- but
        // if we're an outer join or antijoin, first create NULL-complemented
        // rows for the ones in this batch that we didn't match to anything.
        if (m_join_type == JoinType::OUTER || m_join_type == JoinType::ANTI) {
          m_state = State::RETURNING_NULL_COMPLEMENTED_ROWS;
          m_current_pos = m_rows.begin();
          m_mrr_iterator->SetNullRowFlag(true);
        } else {
          BatchFinished();
          break;
        }
      }
      // Fall through.
      case State::RETURNING_NULL_COMPLEMENTED_ROWS: {
        int err = MakeNullComplementedRow();
        if (err != -1) {
          return err;
        }

        BatchFinished();
        break;
      }
    }
  }
}

vector<string> BKAIterator::DebugString() const {
  switch (m_join_type) {
    case JoinType::INNER:
      return {"Batched key access inner join"};
    case JoinType::SEMI:
      return {"Batched key access semijoin"};
    case JoinType::OUTER:
      return {"Batched key access left join"};
    case JoinType::ANTI:
      return {"Batched key access antijoin"};
    default:
      DBUG_ASSERT(false);
      return {"Batched key access unknown join"};
  }
}

MultiRangeRowIterator::MultiRangeRowIterator(THD *thd, Item *cache_idx_cond,
                                             TABLE *table,
                                             bool keep_current_rowid,
                                             TABLE_REF *ref, int mrr_flags)
    : TableRowIterator(thd, table),
      m_cache_idx_cond(cache_idx_cond),
      m_keep_current_rowid(keep_current_rowid),
      m_table(table),
      m_file(table->file),
      m_ref(ref),
      m_mrr_flags(mrr_flags) {}

void MultiRangeRowIterator::set_outer_input_tables(
    JOIN *join, qep_tab_map outer_input_tables) {
  m_outer_input_tables = TableCollection(join, outer_input_tables);
}

bool MultiRangeRowIterator::Init() {
  /*
    Prepare to iterate over keys from the join buffer and to get
    matching candidates obtained with MRR handler functions.
  */
  if (!m_file->inited) {
    const int error = m_file->ha_index_init(m_ref->key, true);
    if (error) {
      m_file->print_error(error, MYF(0));
      return error;
    }
  }
  RANGE_SEQ_IF seq_funcs = {MultiRangeRowIterator::MrrInitCallbackThunk,
                            MultiRangeRowIterator::MrrNextCallbackThunk,
                            nullptr, nullptr};
  if (m_cache_idx_cond != nullptr) {
    seq_funcs.skip_index_tuple =
        MultiRangeRowIterator::MrrSkipIndexTupleCallbackThunk;
  }
  if (m_join_type == JoinType::SEMI || m_join_type == JoinType::ANTI) {
    seq_funcs.skip_record = MultiRangeRowIterator::MrrSkipRecordCallbackThunk;
  }
  if (m_match_flag_buffer != nullptr) {
    DBUG_ASSERT(NeedMatchFlags(m_join_type));

    // Reset all the match flags.
    memset(m_match_flag_buffer, 0,
           BytesNeededForMatchFlags(std::distance(m_begin, m_end)));
  } else {
    DBUG_ASSERT(!NeedMatchFlags(m_join_type));
  }

  /*
    We don't send a set of rows directly to MRR; instead, we give it a set
    of function pointers to iterate over the rows, and a pointer to ourselves.
    The handler will call our callbacks as follows:

    1. MrrInitCallback at the start, to initialize iteration.
    2. MrrNextCallback is called to yield ranges to scan, until it returns 1.
    3. If we have dependent index conditions (see the comment on
       m_cache_idx_cond), MrrSkipIndexTuple will be called back for each
       range that returned an inner row, and can choose to discard the row
       there and then if it doesn't match the dependent index condition.
  */
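  // In addition, for semijoins and antijoins, the skip_record callback set
  // up above lets the handler skip ranges whose outer row has already found
  // a match.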
  return m_file->multi_range_read_init(&seq_funcs, this,
                                       std::distance(m_begin, m_end),
                                       m_mrr_flags, &m_mrr_buffer);
}

range_seq_t MultiRangeRowIterator::MrrInitCallback(uint, uint) {
  m_current_pos = m_begin;
  return this;
}

uint MultiRangeRowIterator::MrrNextCallback(KEY_MULTI_RANGE *range) {
  // Load the next row from the buffer, if there is one.
  //
  // NULL values will never match in an inner join. The optimizer will often
  // set up a NULL filter for inner joins, but not in all cases, so we must
  // skip such rows by checking impossible_null_ref(). Thus, we iterate
  // until we have a row that is not NULL-filtered. The typical case is
  // that this happens immediately.
  //
  // TODO(sgunders): Consider whether it would be possible to put this check
  // before putting the rows into the buffer. That would require evaluating
  // any items twice, though.
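  // (For instance, given t1 JOIN t2 ON t1.a = t2.a, an outer row where t1.a
  // is NULL can never match anything, so there is no point in asking the
  // handler for it.)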
  for (;;) {
    if (m_current_pos == m_end) {
      return 1;
    }

    hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables,
                                           *m_current_pos);

    construct_lookup_ref(thd(), table(), m_ref);
    if (!m_ref->impossible_null_ref()) {
      break;
    }
    ++m_current_pos;
  }

  // Set up a range consisting of a single key, so the only difference
  // between start and end is the flags. They signify that the range starts
  // at the row in question, and ends right after it (exclusive).

  range->range_flag = EQ_RANGE;
  range->ptr = const_cast<char *>(pointer_cast<const char *>(m_current_pos));

  range->start_key.key = m_ref->key_buff;
  range->start_key.keypart_map = (1 << m_ref->key_parts) - 1;  // All keyparts.
  range->start_key.length = m_ref->key_length;
  range->start_key.flag = HA_READ_KEY_EXACT;

  range->end_key = range->start_key;
  range->end_key.flag = HA_READ_AFTER_KEY;

  ++m_current_pos;
  return 0;
}

bool MultiRangeRowIterator::MrrSkipIndexTuple(char *range_info) {
  BufferRow *rec_ptr = pointer_cast<BufferRow *>(range_info);

  // The index condition depends on fields from the outer tables (or we would
  // not be called), so we need to load the relevant rows before checking it.
  // range_info tells us which outer row we are talking about; it corresponds
  // to range->ptr in MrrNextCallback(), and points to the serialized outer
  // row in BKAIterator's m_rows array.
  hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables, rec_ptr->data());

  // Skip this tuple if the index condition is false.
  return !m_cache_idx_cond->val_int();
}

bool MultiRangeRowIterator::MrrSkipRecord(char *range_info) {
  BufferRow *rec_ptr = pointer_cast<BufferRow *>(range_info);
  return RowHasBeenRead(rec_ptr);
}

int MultiRangeRowIterator::Read() {
  // Read a row from the MRR buffer. rec_ptr tells us which outer row the
  // inner row belongs to; it corresponds to range->ptr in MrrNextCallback(),
  // and points to the serialized outer row in BKAIterator's m_rows array.
  BufferRow *rec_ptr = nullptr;
  do {
    int error =
        m_file->ha_multi_range_read_next(pointer_cast<char **>(&rec_ptr));
    if (error != 0) {
      return HandleError(error);
    }

    // NDB never calls mrr_funcs.skip_record(), so we need to recheck here.
    // See bug #30594210.
  } while (m_join_type == JoinType::SEMI && RowHasBeenRead(rec_ptr));

  hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables, rec_ptr->data());

  m_last_row_returned = rec_ptr;

  if (m_keep_current_rowid) {
    m_file->position(m_table->record[0]);
  }

  return 0;
}

vector<string> MultiRangeRowIterator::DebugString() const {
  const KEY *key = &table()->key_info[m_ref->key];
  string str = string("Multi-range index lookup on ") + table()->alias +
               " using " + key->name + " (" +
               RefToString(*m_ref, key, /*include_nulls=*/false) + ")";
  if (table()->file->pushed_idx_cond != nullptr) {
    str += ", with index condition: " +
           ItemToString(table()->file->pushed_idx_cond);
  }
  if (m_cache_idx_cond != nullptr) {
    str +=
        ", with dependent index condition: " + ItemToString(m_cache_idx_cond);
  }
  str += table()->file->explain_extra();
  return {str};
}