1 /*****************************************************************************
2 
3 Copyright (c) 2018, 2020, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 /** @file include/row0pread-adapter.h
28 Parallel read adapter interface.
29 
30 Created 2018-02-28 by Darshan M N. */
31 
32 #ifndef row0pread_adapter_h
33 #define row0pread_adapter_h
34 
35 #include "row0pread.h"
36 #include "ut0counter.h"
37 
38 #include "handler.h"
39 
40 /** Traverse an index in the leaf page block list order and send records to
41 adapter. */
42 class Parallel_reader_adapter {
43   /** Size of the buffer used to store InnoDB records and sent to the adapter*/
44   static constexpr size_t ADAPTER_SEND_BUFFER_SIZE = 2 * 1024 * 1024;
45 
46   /** Forward declaration. */
47   struct Thread_ctx;
48 
49  public:
50   using Load_fn = handler::Load_cbk;
51 
52   using End_fn = handler::Load_end_cbk;
53 
54   using Init_fn = handler::Load_init_cbk;
55 
56   /** Constructor.
57   @param[in]  max_threads       Maximum threads to use for all scan contexts.
58   @param[in]  rowlen            Row length.  */
59   Parallel_reader_adapter(size_t max_threads, ulint rowlen);
60 
61   /** Destructor. */
62   ~Parallel_reader_adapter() = default;
63 
64   /** Add scan context.
65   @param[in]  trx               Transaction used for parallel read.
66   @param[in]  config            (Cluster) Index scan configuration.
67   @param[in]  f                 Callback function.
68   @retval error. */
69   dberr_t add_scan(trx_t *trx, const Parallel_reader::Config &config,
70                    Parallel_reader::F &&f) MY_ATTRIBUTE((warn_unused_result));
71 
72   /** Run the parallel scan.
73   @param[in]  thread_contexts   Context for each of the spawned threads
74   @param[in]  init_fn           Callback called by each parallel load thread
75                                 at the beginning of the parallel load.
76   @param[in]  load_fn           Callback called by each parallel load thread
77                                 when processing of rows is required.
78   @param[in]  end_fn            Callback called by each parallel load thread
79                                 when processing of rows has ended.
80   @return DB_SUCCESS or error code. */
81   dberr_t run(void **thread_contexts, Init_fn init_fn, Load_fn load_fn,
82               End_fn end_fn) MY_ATTRIBUTE((warn_unused_result));
83 
84   /** Convert the record in InnoDB format to MySQL format and send them.
85   @param[in]  reader_ctx  Parallel read context.
86   @return error code */
87   dberr_t process_rows(const Parallel_reader::Ctx *reader_ctx)
88       MY_ATTRIBUTE((warn_unused_result));
89 
90   /** Set up the query processing state cache.
91   @param[in]  prebuilt           The prebuilt cache for the query. */
92   void set(row_prebuilt_t *prebuilt);
93 
94  private:
95   /** Each parallel reader thread's init function.
96   @param[in]  reader_thread_ctx  context info related to the current thread
97   @return DB_SUCCESS or error code. */
98   dberr_t init(Parallel_reader::Thread_ctx *reader_thread_ctx)
99       MY_ATTRIBUTE((warn_unused_result));
100 
101   /** Each parallel reader thread's end function.
102   @param[in]  reader_thread_ctx  context info related to the current thread
103   @return DB_SUCCESS or error code. */
104   dberr_t end(Parallel_reader::Thread_ctx *reader_thread_ctx)
105       MY_ATTRIBUTE((warn_unused_result));
106 
107   /** Send a batch of records.
108   @param[in]  reader_thread_ctx reader threads related thread context info
109   @param[in]  partition_id      partition ID of the index the record belongs to
110   @param[in]  n_recs            Number of records to send.
111   @return DB_SUCCESS or error code. */
112   dberr_t send_batch(Parallel_reader::Thread_ctx *reader_thread_ctx,
113                      size_t partition_id, uint64_t n_recs)
114       MY_ATTRIBUTE((warn_unused_result));
115 
116   /** Get the number of rows buffered but not sent.
117   @param[in]  ctx  adapter related thread context information.
118   @return number of buffered items. */
pending(Thread_ctx * ctx)119   size_t pending(Thread_ctx *ctx) const MY_ATTRIBUTE((warn_unused_result)) {
120     return (ctx->m_n_read - ctx->m_n_sent);
121   }
122 
123   /** Check if the buffer is full.
124   @param[in]  ctx  adapter related thread context information.
125   @return true if the buffer is full. */
is_buffer_full(Thread_ctx * ctx)126   bool is_buffer_full(Thread_ctx *ctx) const
127       MY_ATTRIBUTE((warn_unused_result)) {
128     return ctx->m_n_read > 0 && ctx->m_n_read % m_batch_size == 0;
129   }
130 
131  private:
132   /** Adapter context for each of the spawned threads. We don't know the
133   type of the context it's passed to us as a void *.  */
134   void **m_thread_ctxs{};
135 
136   /** Callback called by each parallel load thread at the
137   beginning of the parallel load for the scan. */
138   Init_fn m_init_fn{};
139 
140   /** Callback called by each parallel load thread when
141   processing of rows is required for the scan. */
142   Load_fn m_load_fn{};
143 
144   /** Callback called by each parallel load thread when
145   processing of rows has ended for the scan. */
146   End_fn m_end_fn{};
147 
148   /** Number of records to be sent across to the caller in a batch. */
149   uint64_t m_batch_size{};
150 
151   /** MySQL row meta data. This is common across partitions. */
152   struct MySQL_row {
153     using Column_meta_data = std::vector<ulong, ut_allocator<ulong>>;
154 
155     /** Column offsets. */
156     Column_meta_data m_offsets{};
157 
158     /** Column null bit masks. */
159     Column_meta_data m_null_bit_mask{};
160 
161     /** Column null bit offsets. */
162     Column_meta_data m_null_bit_offsets{};
163 
164     /** Maximum row length. */
165     ulong m_max_len{};
166   };
167 
168   /** Row meta data per scan context. */
169   MySQL_row m_mysql_row{};
170 
171   /** Callback thread context for each of the spawned threads. */
172   struct Thread_ctx {
173     /** Constructor. */
174     Thread_ctx();
175 
176     /** Destructor. */
177     ~Thread_ctx() = default;
178 
179     /** Number of records read. */
180     size_t m_n_read{};
181 
182     /** Number of records sent to the adapter. */
183     size_t m_n_sent{};
184 
185     /** Buffer to store records to be sent to the adapter. */
186     std::vector<byte, ut_allocator<byte>> m_buffer;
187   };
188 
189   /** Prebuilt to use for conversion to MySQL row format.
190   NOTE: We are sharing this because we don't convert BLOBs yet. There are
191   data members in row_prebuilt_t that cannot be accessed in multi-threaded
192   mode e.g., blob_heap.
193 
194   row_prebuilt_t is designed for single threaded access and to share
195   it among threads is not recommended unless "you know what you are doing".
196   This is very fragile code as it stands.
197 
198   To solve the blob heap issue in prebuilt we use per thread m_blob_heaps.
199   Pass the blob heap to the InnoDB to MySQL row format conversion function. */
200   row_prebuilt_t *m_prebuilt{};
201 
202   /** Parallel reader to use. */
203   Parallel_reader m_parallel_reader;
204 };
205 
206 #endif /* !row0pread_adapter_h */
207