1 /* Copyright (c) 2019, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/hash_join_chunk.h"
24 
25 #include <cstring>
26 
27 #include "my_base.h"
28 #include "my_dbug.h"
29 #include "my_sys.h"
30 #include "scope_guard.h"
31 #include "sql/hash_join_buffer.h"
32 #include "sql/mysqld.h"
33 #include "sql/sql_base.h"
34 #include "sql/sql_const.h"
35 
HashJoinChunk(HashJoinChunk && other)36 HashJoinChunk::HashJoinChunk(HashJoinChunk &&other)
37     : m_tables(std::move(other.m_tables)),
38       m_num_rows(other.m_num_rows),
39       m_file(other.m_file),
40       m_uses_match_flags(other.m_uses_match_flags) {
41   setup_io_cache(&m_file);
42   // Reset the IO_CACHE structure so that the destructor doesn't close/clear the
43   // file contents and it's buffers.
44   new (&other.m_file) IO_CACHE();
45 }
46 
operator =(HashJoinChunk && other)47 HashJoinChunk &HashJoinChunk::operator=(HashJoinChunk &&other) {
48   m_tables = std::move(other.m_tables);
49   m_num_rows = other.m_num_rows;
50   m_uses_match_flags = other.m_uses_match_flags;
51 
52   // Since the file we are replacing will become unreachable, free all resources
53   // used by it.
54   close_cached_file(&m_file);
55   m_file = other.m_file;
56   setup_io_cache(&m_file);
57 
58   // Reset the IO_CACHE structure so that the destructor doesn't close/clear the
59   // file contents and it's buffers.
60   new (&other.m_file) IO_CACHE();
61   return *this;
62 }
63 
~HashJoinChunk()64 HashJoinChunk::~HashJoinChunk() { close_cached_file(&m_file); }
65 
Init(const hash_join_buffer::TableCollection & tables,bool uses_match_flags)66 bool HashJoinChunk::Init(const hash_join_buffer::TableCollection &tables,
67                          bool uses_match_flags) {
68   m_tables = tables;
69   m_file.file_key = key_file_hash_join;
70   m_num_rows = 0;
71   m_uses_match_flags = uses_match_flags;
72   return open_cached_file(&m_file, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
73                           MYF(MY_WME));
74 }
75 
Rewind()76 bool HashJoinChunk::Rewind() {
77   if (my_b_flush_io_cache(&m_file, /*need_append_buffer_lock=*/0) == -1 ||
78       reinit_io_cache(&m_file, READ_CACHE, 0, false, false)) {
79     my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
80     return true;
81   }
82 
83   return false;
84 }
85 
WriteRowToChunk(String * buffer,bool matched)86 bool HashJoinChunk::WriteRowToChunk(String *buffer, bool matched) {
87   if (hash_join_buffer::StoreFromTableBuffers(m_tables, buffer)) {
88     my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
89              ComputeRowSizeUpperBound(m_tables));
90     return true;
91   }
92 
93   if (m_uses_match_flags) {
94     if (my_b_write(&m_file, pointer_cast<const uchar *>(&matched),
95                    sizeof(matched)) != 0) {
96       my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
97       return true;
98     }
99   }
100 
101   // Write out the length of the data.
102   size_t data_length = buffer->length();
103   if (my_b_write(&m_file, pointer_cast<const uchar *>(&data_length),
104                  sizeof(data_length)) != 0) {
105     my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
106     return true;
107   }
108 
109   // ... and then write the actual data.
110   if (my_b_write(&m_file, pointer_cast<uchar *>(buffer->ptr()), data_length) !=
111       0) {
112     my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
113     return true;
114   }
115   m_num_rows++;
116   return false;
117 }
118 
LoadRowFromChunk(String * buffer,bool * matched)119 bool HashJoinChunk::LoadRowFromChunk(String *buffer, bool *matched) {
120   if (m_uses_match_flags) {
121     if (my_b_read(&m_file, pointer_cast<uchar *>(matched), sizeof(*matched)) !=
122         0) {
123       my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
124       return true;
125     }
126   }
127 
128   // Read the length of the row.
129   size_t row_length;
130   if (my_b_read(&m_file, pointer_cast<uchar *>(&row_length),
131                 sizeof(row_length)) != 0) {
132     my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
133     return true;
134   }
135 
136   // Read the actual data of the row.
137   if (buffer->reserve(row_length)) {
138     my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), row_length);
139     return true;
140   }
141 
142   buffer->length(row_length);
143   if (my_b_read(&m_file, pointer_cast<uchar *>(buffer->ptr()), row_length) !=
144       0) {
145     my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));
146     return true;
147   }
148 
149   hash_join_buffer::LoadIntoTableBuffers(
150       m_tables, {pointer_cast<const uchar *>(buffer->ptr()), buffer->length()});
151 
152   return false;
153 }
154