1 /***************************************************************************** 2 3 Copyright (c) 2018, 2020, Oracle and/or its affiliates. All Rights Reserved. 4 5 This program is free software; you can redistribute it and/or modify it under 6 the terms of the GNU General Public License, version 2.0, as published by the 7 Free Software Foundation. 8 9 This program is also distributed with certain software (including but not 10 limited to OpenSSL) that is licensed under separate terms, as designated in a 11 particular file or component or in included license documentation. The authors 12 of MySQL hereby grant you an additional permission to link the program and 13 your derivative works with the separately licensed software that they have 14 included with MySQL. 15 16 This program is distributed in the hope that it will be useful, but WITHOUT 17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, 19 for more details. 20 21 You should have received a copy of the GNU General Public License along with 22 this program; if not, write to the Free Software Foundation, Inc., 23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 24 25 *****************************************************************************/ 26 27 /** @file include/row0pread-adapter.h 28 Parallel read adapter interface. 29 30 Created 2018-02-28 by Darshan M N. */ 31 32 #ifndef row0pread_adapter_h 33 #define row0pread_adapter_h 34 35 #include "row0pread.h" 36 #include "ut0counter.h" 37 38 #include "handler.h" 39 40 /** Traverse an index in the leaf page block list order and send records to 41 adapter. */ 42 class Parallel_reader_adapter { 43 /** Size of the buffer used to store InnoDB records and sent to the adapter*/ 44 static constexpr size_t ADAPTER_SEND_BUFFER_SIZE = 2 * 1024 * 1024; 45 46 /** Forward declaration. */ 47 struct Thread_ctx; 48 49 public: 50 using Load_fn = handler::Load_cbk; 51 52 using End_fn = handler::Load_end_cbk; 53 54 using Init_fn = handler::Load_init_cbk; 55 56 /** Constructor. 57 @param[in] max_threads Maximum threads to use for all scan contexts. 58 @param[in] rowlen Row length. */ 59 Parallel_reader_adapter(size_t max_threads, ulint rowlen); 60 61 /** Destructor. */ 62 ~Parallel_reader_adapter() = default; 63 64 /** Add scan context. 65 @param[in] trx Transaction used for parallel read. 66 @param[in] config (Cluster) Index scan configuration. 67 @param[in] f Callback function. 68 @retval error. */ 69 dberr_t add_scan(trx_t *trx, const Parallel_reader::Config &config, 70 Parallel_reader::F &&f) MY_ATTRIBUTE((warn_unused_result)); 71 72 /** Run the parallel scan. 73 @param[in] thread_contexts Context for each of the spawned threads 74 @param[in] init_fn Callback called by each parallel load thread 75 at the beginning of the parallel load. 76 @param[in] load_fn Callback called by each parallel load thread 77 when processing of rows is required. 78 @param[in] end_fn Callback called by each parallel load thread 79 when processing of rows has ended. 80 @return DB_SUCCESS or error code. */ 81 dberr_t run(void **thread_contexts, Init_fn init_fn, Load_fn load_fn, 82 End_fn end_fn) MY_ATTRIBUTE((warn_unused_result)); 83 84 /** Convert the record in InnoDB format to MySQL format and send them. 85 @param[in] reader_ctx Parallel read context. 86 @return error code */ 87 dberr_t process_rows(const Parallel_reader::Ctx *reader_ctx) 88 MY_ATTRIBUTE((warn_unused_result)); 89 90 /** Set up the query processing state cache. 91 @param[in] prebuilt The prebuilt cache for the query. */ 92 void set(row_prebuilt_t *prebuilt); 93 94 private: 95 /** Each parallel reader thread's init function. 96 @param[in] reader_thread_ctx context info related to the current thread 97 @return DB_SUCCESS or error code. */ 98 dberr_t init(Parallel_reader::Thread_ctx *reader_thread_ctx) 99 MY_ATTRIBUTE((warn_unused_result)); 100 101 /** Each parallel reader thread's end function. 102 @param[in] reader_thread_ctx context info related to the current thread 103 @return DB_SUCCESS or error code. */ 104 dberr_t end(Parallel_reader::Thread_ctx *reader_thread_ctx) 105 MY_ATTRIBUTE((warn_unused_result)); 106 107 /** Send a batch of records. 108 @param[in] reader_thread_ctx reader threads related thread context info 109 @param[in] partition_id partition ID of the index the record belongs to 110 @param[in] n_recs Number of records to send. 111 @return DB_SUCCESS or error code. */ 112 dberr_t send_batch(Parallel_reader::Thread_ctx *reader_thread_ctx, 113 size_t partition_id, uint64_t n_recs) 114 MY_ATTRIBUTE((warn_unused_result)); 115 116 /** Get the number of rows buffered but not sent. 117 @param[in] ctx adapter related thread context information. 118 @return number of buffered items. */ pending(Thread_ctx * ctx)119 size_t pending(Thread_ctx *ctx) const MY_ATTRIBUTE((warn_unused_result)) { 120 return (ctx->m_n_read - ctx->m_n_sent); 121 } 122 123 /** Check if the buffer is full. 124 @param[in] ctx adapter related thread context information. 125 @return true if the buffer is full. */ is_buffer_full(Thread_ctx * ctx)126 bool is_buffer_full(Thread_ctx *ctx) const 127 MY_ATTRIBUTE((warn_unused_result)) { 128 return ctx->m_n_read > 0 && ctx->m_n_read % m_batch_size == 0; 129 } 130 131 private: 132 /** Adapter context for each of the spawned threads. We don't know the 133 type of the context it's passed to us as a void *. */ 134 void **m_thread_ctxs{}; 135 136 /** Callback called by each parallel load thread at the 137 beginning of the parallel load for the scan. */ 138 Init_fn m_init_fn{}; 139 140 /** Callback called by each parallel load thread when 141 processing of rows is required for the scan. */ 142 Load_fn m_load_fn{}; 143 144 /** Callback called by each parallel load thread when 145 processing of rows has ended for the scan. */ 146 End_fn m_end_fn{}; 147 148 /** Number of records to be sent across to the caller in a batch. */ 149 uint64_t m_batch_size{}; 150 151 /** MySQL row meta data. This is common across partitions. */ 152 struct MySQL_row { 153 using Column_meta_data = std::vector<ulong, ut_allocator<ulong>>; 154 155 /** Column offsets. */ 156 Column_meta_data m_offsets{}; 157 158 /** Column null bit masks. */ 159 Column_meta_data m_null_bit_mask{}; 160 161 /** Column null bit offsets. */ 162 Column_meta_data m_null_bit_offsets{}; 163 164 /** Maximum row length. */ 165 ulong m_max_len{}; 166 }; 167 168 /** Row meta data per scan context. */ 169 MySQL_row m_mysql_row{}; 170 171 /** Callback thread context for each of the spawned threads. */ 172 struct Thread_ctx { 173 /** Constructor. */ 174 Thread_ctx(); 175 176 /** Destructor. */ 177 ~Thread_ctx() = default; 178 179 /** Number of records read. */ 180 size_t m_n_read{}; 181 182 /** Number of records sent to the adapter. */ 183 size_t m_n_sent{}; 184 185 /** Buffer to store records to be sent to the adapter. */ 186 std::vector<byte, ut_allocator<byte>> m_buffer; 187 }; 188 189 /** Prebuilt to use for conversion to MySQL row format. 190 NOTE: We are sharing this because we don't convert BLOBs yet. There are 191 data members in row_prebuilt_t that cannot be accessed in multi-threaded 192 mode e.g., blob_heap. 193 194 row_prebuilt_t is designed for single threaded access and to share 195 it among threads is not recommended unless "you know what you are doing". 196 This is very fragile code as it stands. 197 198 To solve the blob heap issue in prebuilt we use per thread m_blob_heaps. 199 Pass the blob heap to the InnoDB to MySQL row format conversion function. */ 200 row_prebuilt_t *m_prebuilt{}; 201 202 /** Parallel reader to use. */ 203 Parallel_reader m_parallel_reader; 204 }; 205 206 #endif /* !row0pread_adapter_h */ 207