/*****************************************************************************

Copyright (c) 2018, 2020, Oracle and/or its affiliates. All Rights Reserved.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is also distributed with certain software (including but not
limited to OpenSSL) that is licensed under separate terms, as designated in a
particular file or component or in included license documentation. The authors
of MySQL hereby grant you an additional permission to link the program and
your derivative works with the separately licensed software that they have
included with MySQL.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA

*****************************************************************************/

/** @file row/row0pread-adapter.cc
Parallel read adapter interface implementation

Created 2018-02-28 by Darshan M N */

#ifndef UNIV_HOTBACKUP

#include "row0pread-adapter.h"
#include "row0sel.h"
#include "univ.i"

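/** Constructor: size the batch so that one send buffer of
ADAPTER_SEND_BUFFER_SIZE bytes holds m_batch_size rows of rowlen bytes each. */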
Parallel_reader_adapter::Parallel_reader_adapter(size_t max_threads,
                                                 ulint rowlen)
    : m_parallel_reader(max_threads) {
  m_batch_size = ADAPTER_SEND_BUFFER_SIZE / rowlen;
}

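/** Register a scan with the underlying parallel reader, forwarding the
transaction, the scan configuration and the per-row callback. */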
dberr_t Parallel_reader_adapter::add_scan(trx_t *trx,
                                          const Parallel_reader::Config &config,
                                          Parallel_reader::F &&f) {
  return (m_parallel_reader.add_scan(trx, config, std::move(f)));
}

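/** Per-thread callback context: pre-allocate the send buffer. */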
Parallel_reader_adapter::Thread_ctx::Thread_ctx() {
  m_buffer.resize(ADAPTER_SEND_BUFFER_SIZE);
}

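/** Capture the MySQL row meta-data (column offsets, NULL bit masks and NULL
byte offsets, maximum row length) from the prebuilt struct and install the
per-thread start/finish callbacks on the parallel reader. */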
void Parallel_reader_adapter::set(row_prebuilt_t *prebuilt) {
  ut_a(prebuilt->n_template > 0);
  ut_a(m_mysql_row.m_offsets.empty());
  ut_a(m_mysql_row.m_null_bit_mask.empty());
  ut_a(m_mysql_row.m_null_bit_offsets.empty());

  /* Partition structure should be the same across all partitions.
  Therefore MySQL row meta-data is common across all partitions. */

  for (uint i = 0; i < prebuilt->n_template; ++i) {
    const auto &templt = prebuilt->mysql_template[i];

    m_mysql_row.m_offsets.push_back(
        static_cast<ulong>(templt.mysql_col_offset));
    m_mysql_row.m_null_bit_mask.push_back(
        static_cast<ulong>(templt.mysql_null_bit_mask));
    m_mysql_row.m_null_bit_offsets.push_back(
        static_cast<ulong>(templt.mysql_null_byte_offset));
  }

  ut_a(m_mysql_row.m_max_len == 0);
  ut_a(prebuilt->mysql_row_len > 0);
  m_mysql_row.m_max_len = static_cast<ulong>(prebuilt->mysql_row_len);

  m_parallel_reader.set_start_callback(
      [=](Parallel_reader::Thread_ctx *reader_thread_ctx) {
        return init(reader_thread_ctx);
      });

  m_parallel_reader.set_finish_callback(
      [=](Parallel_reader::Thread_ctx *reader_thread_ctx) {
        return end(reader_thread_ctx);
      });

  ut_a(m_prebuilt == nullptr);
  m_prebuilt = prebuilt;
}

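/** Store the caller-supplied callbacks and per-thread contexts, then start
the parallel read. */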
dberr_t Parallel_reader_adapter::run(void **thread_ctxs, Init_fn init_fn,
                                     Load_fn load_fn, End_fn end_fn) {
  m_end_fn = end_fn;
  m_init_fn = init_fn;
  m_load_fn = load_fn;
  m_thread_ctxs = thread_ctxs;

  return m_parallel_reader.run();
}

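/** Start callback, executed in each parallel reader thread: allocate the
per-thread adapter context and blob heap, then invoke the caller's
initialization function with the MySQL row meta-data. */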
dberr_t Parallel_reader_adapter::init(
    Parallel_reader::Thread_ctx *reader_thread_ctx) {
  auto thread_ctx = UT_NEW(Thread_ctx(), mem_key_archive);

  if (thread_ctx == nullptr) {
    return DB_OUT_OF_MEMORY;
  }

  reader_thread_ctx->set_callback_ctx<Thread_ctx>(thread_ctx);

  /** There are data members in row_prebuilt_t that cannot be accessed in
  multi-threaded mode, e.g., blob_heap.

  row_prebuilt_t is designed for single-threaded access, and sharing it among
  threads is not recommended unless you know what you are doing.
  This is very fragile code as it stands.

  To solve the blob heap issue in prebuilt, we request the parallel reader to
  use a blob heap per thread, and we pass this blob heap to the InnoDB to
  MySQL row format conversion function. */
  reader_thread_ctx->create_blob_heap();

  auto ret = m_init_fn(m_thread_ctxs[reader_thread_ctx->m_thread_id],
                       static_cast<ulong>(m_mysql_row.m_offsets.size()),
                       m_mysql_row.m_max_len, &m_mysql_row.m_offsets[0],
                       &m_mysql_row.m_null_bit_offsets[0],
                       &m_mysql_row.m_null_bit_mask[0]);

  return (ret ? DB_INTERRUPTED : DB_SUCCESS);
}

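/** Send the n_recs rows buffered since the last send to the caller via
m_load_fn; set the reader error state if the caller requests a stop. */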
dberr_t Parallel_reader_adapter::send_batch(
    Parallel_reader::Thread_ctx *reader_thread_ctx, size_t partition_id,
    uint64_t n_recs) {
  auto ctx = reader_thread_ctx->get_callback_ctx<Thread_ctx>();
  const auto thread_id = reader_thread_ctx->m_thread_id;

  const auto start = ctx->m_n_sent % m_batch_size;

  ut_a(n_recs <= m_batch_size);
  ut_a(start + n_recs <= m_batch_size);

  const auto rec_loc = &ctx->m_buffer[start * m_mysql_row.m_max_len];

  dberr_t err{DB_SUCCESS};

  if (m_load_fn(m_thread_ctxs[thread_id], n_recs, rec_loc, partition_id)) {
    err = DB_INTERRUPTED;
    m_parallel_reader.set_error_state(DB_INTERRUPTED);
  }

  ctx->m_n_sent += n_recs;

  return err;
}

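/** Per-record callback: flush the buffered rows when a new range starts or
the buffer is full, then convert the current InnoDB record to the MySQL row
format and append it to the thread's buffer. */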
dberr_t Parallel_reader_adapter::process_rows(
    const Parallel_reader::Ctx *reader_ctx) {
  auto reader_thread_ctx = reader_ctx->thread_ctx();
  auto ctx = reader_thread_ctx->get_callback_ctx<Thread_ctx>();

  ut_a(ctx->m_n_read >= ctx->m_n_sent);
  ut_a(ctx->m_n_read - ctx->m_n_sent <= m_batch_size);

  dberr_t err{DB_SUCCESS};

  {
    auto n_pending = pending(ctx);

    /* Start of a new range, send what we have buffered. */
    if ((reader_ctx->m_start && n_pending > 0) || is_buffer_full(ctx)) {
      auto part_id = reader_ctx->m_start
                         ? reader_thread_ctx->m_prev_partition_id
                         : reader_ctx->partition_id();

      err = send_batch(reader_thread_ctx, part_id, n_pending);

      if (err != DB_SUCCESS) {
        return (err);
      }
    }
  }

  mem_heap_t *heap{};
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;

  rec_offs_init(offsets_);

  offsets = rec_get_offsets(reader_ctx->m_rec, reader_ctx->index(), offsets,
                            ULINT_UNDEFINED, &heap);

  const auto next_rec = ctx->m_n_read % m_batch_size;

  const auto buffer_loc = &ctx->m_buffer[0] + next_rec * m_mysql_row.m_max_len;

  if (row_sel_store_mysql_rec(buffer_loc, m_prebuilt, reader_ctx->m_rec,
                              nullptr, true, reader_ctx->index(),
                              reader_ctx->index(), offsets, false, nullptr,
                              reader_thread_ctx->m_blob_heap)) {
    ++ctx->m_n_read;

    if (m_parallel_reader.is_error_set()) {
      /* Simply skip sending the records to RAPID in case of an error in the
      parallel reader, and return DB_ERROR as the error could have originated
      from the RAPID threads. */
      err = DB_ERROR;
    }
  } else {
    err = DB_ERROR;
  }

  if (heap != nullptr) {
    mem_heap_free(heap);
  }

  return (err);
}

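/** Finish callback: send any rows still buffered (unless an error is already
set), notify the caller via m_end_fn, and release the per-thread context. */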
dberr_t Parallel_reader_adapter::end(
    Parallel_reader::Thread_ctx *reader_thread_ctx) {
  dberr_t err{DB_SUCCESS};

  auto thread_id = reader_thread_ctx->m_thread_id;
  auto thread_ctx = reader_thread_ctx->get_callback_ctx<Thread_ctx>();

  ut_a(thread_ctx->m_n_sent <= thread_ctx->m_n_read);
  ut_a(thread_ctx->m_n_read - thread_ctx->m_n_sent <= m_batch_size);

  if (!m_parallel_reader.is_error_set()) {
    /* If we reached the end of the records before the buffer filled up, the
    buffered records may not have been sent yet. Send them now. */
    size_t n_pending = pending(thread_ctx);

    err = (n_pending != 0)
              ? send_batch(reader_thread_ctx,
                           reader_thread_ctx->m_prev_partition_id, n_pending)
              : DB_SUCCESS;
  }

  m_end_fn(m_thread_ctxs[thread_id]);

  UT_DELETE(thread_ctx);
  reader_thread_ctx->set_callback_ctx<Thread_ctx>(nullptr);

  return (err);
}
#endif /* !UNIV_HOTBACKUP */