1 /*****************************************************************************
2
3 Copyright (c) 2018, 2020, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25 *****************************************************************************/
26
27 /** @file row/row0pread-adapter.cc
28 Parallel read adapter interface implementation
29
30 Created 2018-02-28 by Darshan M N */
31
32 #ifndef UNIV_HOTBACKUP
33
34 #include "row0pread-adapter.h"
35 #include "row0sel.h"
36 #include "univ.i"
37
Parallel_reader_adapter(size_t max_threads,ulint rowlen)38 Parallel_reader_adapter::Parallel_reader_adapter(size_t max_threads,
39 ulint rowlen)
40 : m_parallel_reader(max_threads) {
41 m_batch_size = ADAPTER_SEND_BUFFER_SIZE / rowlen;
42 }
43
add_scan(trx_t * trx,const Parallel_reader::Config & config,Parallel_reader::F && f)44 dberr_t Parallel_reader_adapter::add_scan(trx_t *trx,
45 const Parallel_reader::Config &config,
46 Parallel_reader::F &&f) {
47 return (m_parallel_reader.add_scan(trx, config, std::move(f)));
48 }
49
Thread_ctx()50 Parallel_reader_adapter::Thread_ctx::Thread_ctx() {
51 m_buffer.resize(ADAPTER_SEND_BUFFER_SIZE);
52 }
53
set(row_prebuilt_t * prebuilt)54 void Parallel_reader_adapter::set(row_prebuilt_t *prebuilt) {
55 ut_a(prebuilt->n_template > 0);
56 ut_a(m_mysql_row.m_offsets.empty());
57 ut_a(m_mysql_row.m_null_bit_mask.empty());
58 ut_a(m_mysql_row.m_null_bit_offsets.empty());
59
60 /* Partition structure should be the same across all partitions.
61 Therefore MySQL row meta-data is common across all paritions. */
62
63 for (uint i = 0; i < prebuilt->n_template; ++i) {
64 const auto &templt = prebuilt->mysql_template[i];
65
66 m_mysql_row.m_offsets.push_back(
67 static_cast<ulong>(templt.mysql_col_offset));
68 m_mysql_row.m_null_bit_mask.push_back(
69 static_cast<ulong>(templt.mysql_null_bit_mask));
70 m_mysql_row.m_null_bit_offsets.push_back(
71 static_cast<ulong>(templt.mysql_null_byte_offset));
72 }
73
74 ut_a(m_mysql_row.m_max_len == 0);
75 ut_a(prebuilt->mysql_row_len > 0);
76 m_mysql_row.m_max_len = static_cast<ulong>(prebuilt->mysql_row_len);
77
78 m_parallel_reader.set_start_callback(
79 [=](Parallel_reader::Thread_ctx *reader_thread_ctx) {
80 return init(reader_thread_ctx);
81 });
82
83 m_parallel_reader.set_finish_callback(
84 [=](Parallel_reader::Thread_ctx *reader_thread_ctx) {
85 return end(reader_thread_ctx);
86 });
87
88 ut_a(m_prebuilt == nullptr);
89 m_prebuilt = prebuilt;
90 }
91
run(void ** thread_ctxs,Init_fn init_fn,Load_fn load_fn,End_fn end_fn)92 dberr_t Parallel_reader_adapter::run(void **thread_ctxs, Init_fn init_fn,
93 Load_fn load_fn, End_fn end_fn) {
94 m_end_fn = end_fn;
95 m_init_fn = init_fn;
96 m_load_fn = load_fn;
97 m_thread_ctxs = thread_ctxs;
98
99 return m_parallel_reader.run();
100 }
101
init(Parallel_reader::Thread_ctx * reader_thread_ctx)102 dberr_t Parallel_reader_adapter::init(
103 Parallel_reader::Thread_ctx *reader_thread_ctx) {
104 auto thread_ctx = UT_NEW(Thread_ctx(), mem_key_archive);
105
106 if (thread_ctx == nullptr) {
107 return DB_OUT_OF_MEMORY;
108 }
109
110 reader_thread_ctx->set_callback_ctx<Thread_ctx>(thread_ctx);
111
112 /** There are data members in row_prebuilt_t that cannot be accessed in
113 multi-threaded mode e.g., blob_heap.
114
115 row_prebuilt_t is designed for single threaded access and to share
116 it among threads is not recommended unless "you know what you are doing".
117 This is very fragile code as it stands.
118
119 To solve the blob heap issue in prebuilt we request parallel reader thread to
120 use blob heap per thread and we pass this blob heap to the InnoDB to MySQL
121 row format conversion function. */
122 reader_thread_ctx->create_blob_heap();
123
124 auto ret = m_init_fn(m_thread_ctxs[reader_thread_ctx->m_thread_id],
125 static_cast<ulong>(m_mysql_row.m_offsets.size()),
126 m_mysql_row.m_max_len, &m_mysql_row.m_offsets[0],
127 &m_mysql_row.m_null_bit_offsets[0],
128 &m_mysql_row.m_null_bit_mask[0]);
129
130 return (ret ? DB_INTERRUPTED : DB_SUCCESS);
131 }
132
send_batch(Parallel_reader::Thread_ctx * reader_thread_ctx,size_t partition_id,uint64_t n_recs)133 dberr_t Parallel_reader_adapter::send_batch(
134 Parallel_reader::Thread_ctx *reader_thread_ctx, size_t partition_id,
135 uint64_t n_recs) {
136 auto ctx = reader_thread_ctx->get_callback_ctx<Thread_ctx>();
137 const auto thread_id = reader_thread_ctx->m_thread_id;
138
139 const auto start = ctx->m_n_sent % m_batch_size;
140
141 ut_a(n_recs <= m_batch_size);
142 ut_a(start + n_recs <= m_batch_size);
143
144 const auto rec_loc = &ctx->m_buffer[start * m_mysql_row.m_max_len];
145
146 dberr_t err{DB_SUCCESS};
147
148 if (m_load_fn(m_thread_ctxs[thread_id], n_recs, rec_loc, partition_id)) {
149 err = DB_INTERRUPTED;
150 m_parallel_reader.set_error_state(DB_INTERRUPTED);
151 }
152
153 ctx->m_n_sent += n_recs;
154
155 return err;
156 }
157
/** Per-record callback: convert one InnoDB record to MySQL row format into
the thread's staging buffer, flushing a full batch (or the previous
partition's pending rows) to the consumer first when required.
@param[in]  reader_ctx  Parallel reader execution context for this record.
@return DB_SUCCESS; DB_ERROR on conversion failure or if the reader is
already in an error state; or the error from send_batch(). */
dberr_t Parallel_reader_adapter::process_rows(
    const Parallel_reader::Ctx *reader_ctx) {
  auto reader_thread_ctx = reader_ctx->thread_ctx();
  auto ctx = reader_thread_ctx->get_callback_ctx<Thread_ctx>();

  /* Rows are buffered (m_n_read) before being shipped (m_n_sent); the gap
  can never exceed one batch. */
  ut_a(ctx->m_n_read >= ctx->m_n_sent);
  ut_a(ctx->m_n_read - ctx->m_n_sent <= m_batch_size);

  dberr_t err{DB_SUCCESS};

  {
    auto n_pending = pending(ctx);

    /* Start of a new range, send what we have buffered. */
    if ((reader_ctx->m_start && n_pending > 0) || is_buffer_full(ctx)) {
      /* At a range start the pending rows still belong to the previous
      partition; otherwise we flush because the buffer is full and the
      rows belong to the current partition. */
      auto part_id = reader_ctx->m_start
                         ? reader_thread_ctx->m_prev_partition_id
                         : reader_ctx->partition_id();

      err = send_batch(reader_thread_ctx, part_id, n_pending);

      if (err != DB_SUCCESS) {
        return (err);
      }
    }
  }

  mem_heap_t *heap{};
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;

  rec_offs_init(offsets_);

  /* rec_get_offsets() may allocate `heap` when the stack array is too
  small; freed at the bottom of this function. */
  offsets = rec_get_offsets(reader_ctx->m_rec, reader_ctx->index(), offsets,
                            ULINT_UNDEFINED, &heap);

  /* Slot for this record within the circular staging buffer. */
  const auto next_rec = ctx->m_n_read % m_batch_size;

  const auto buffer_loc = &ctx->m_buffer[0] + next_rec * m_mysql_row.m_max_len;

  /* Convert using the per-thread blob heap set up in init() — see the
  note there about row_prebuilt_t not being thread-safe. */
  if (row_sel_store_mysql_rec(buffer_loc, m_prebuilt, reader_ctx->m_rec,
                              nullptr, true, reader_ctx->index(),
                              reader_ctx->index(), offsets, false, nullptr,
                              reader_thread_ctx->m_blob_heap)) {
    ++ctx->m_n_read;

    if (m_parallel_reader.is_error_set()) {
      /* Simply skip sending the records to RAPID in case of an error in the
      parallel reader and return DB_ERROR as the error could have been
      originated from RAPID threads. */
      err = DB_ERROR;
    }
  } else {
    err = DB_ERROR;
  }

  if (heap != nullptr) {
    mem_heap_free(heap);
  }

  return (err);
}
220
end(Parallel_reader::Thread_ctx * reader_thread_ctx)221 dberr_t Parallel_reader_adapter::end(
222 Parallel_reader::Thread_ctx *reader_thread_ctx) {
223 dberr_t err{DB_SUCCESS};
224
225 auto thread_id = reader_thread_ctx->m_thread_id;
226 auto thread_ctx = reader_thread_ctx->get_callback_ctx<Thread_ctx>();
227
228 ut_a(thread_ctx->m_n_sent <= thread_ctx->m_n_read);
229 ut_a(thread_ctx->m_n_read - thread_ctx->m_n_sent <= m_batch_size);
230
231 if (!m_parallel_reader.is_error_set()) {
232 /* It's possible that we might not have sent the records in the buffer
233 when we have reached the end of records and the buffer is not full.
234 Send them now. */
235 size_t n_pending = pending(thread_ctx);
236
237 err = (n_pending != 0)
238 ? send_batch(reader_thread_ctx,
239 reader_thread_ctx->m_prev_partition_id, n_pending)
240 : DB_SUCCESS;
241 }
242
243 m_end_fn(m_thread_ctxs[thread_id]);
244
245 UT_DELETE(thread_ctx);
246 reader_thread_ctx->set_callback_ctx<Thread_ctx>(nullptr);
247
248 return (err);
249 }
250 #endif /* !UNIV_HOTBACKUP */
251