1 /* 2 Copyright (c) 2016, Facebook, Inc. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License as published by 6 the Free Software Foundation; version 2 of the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ 16 17 #pragma once 18 19 /* MySQL header files */ 20 #include "../sql/log.h" 21 #include "./handler.h" /* handler */ 22 #include "./my_global.h" /* ulonglong */ 23 24 /* C++ standard header files */ 25 #include <queue> 26 #include <set> 27 #include <vector> 28 29 /* RocksDB header files */ 30 #include "rocksdb/db.h" 31 32 /* MyRocks header files */ 33 #include "./rdb_comparator.h" 34 35 namespace myrocks { 36 37 /* 38 Length of delimiters used during inplace index creation. 39 */ 40 #define RDB_MERGE_CHUNK_LEN sizeof(size_t) 41 #define RDB_MERGE_REC_DELIMITER sizeof(size_t) 42 #define RDB_MERGE_KEY_DELIMITER RDB_MERGE_REC_DELIMITER 43 #define RDB_MERGE_VAL_DELIMITER RDB_MERGE_REC_DELIMITER 44 45 class Rdb_key_def; 46 class Rdb_tbl_def; 47 48 class Rdb_index_merge { 49 Rdb_index_merge(const Rdb_index_merge &p) = delete; 50 Rdb_index_merge &operator=(const Rdb_index_merge &p) = delete; 51 52 public: 53 /* Information about temporary files used in external merge sort */ 54 struct merge_file_info { 55 File m_fd = -1; /* file descriptor */ 56 ulong m_num_sort_buffers = 0; /* number of sort buffers in temp file */ 57 }; 58 59 /* Buffer for sorting in main memory. */ 60 struct merge_buf_info { 61 /* heap memory allocated for main memory sort/merge */ 62 std::unique_ptr<uchar[]> m_block; 63 const ulonglong 64 m_block_len; /* amount of data bytes allocated for block above */ 65 ulonglong m_curr_offset; /* offset of the record pointer for the block */ 66 ulonglong m_disk_start_offset; /* where the chunk starts on disk */ 67 ulonglong m_disk_curr_offset; /* current offset on disk */ 68 ulonglong m_total_size; /* total # of data bytes in chunk */ 69 70 void store_key_value(const rocksdb::Slice &key, const rocksdb::Slice &val) 71 MY_ATTRIBUTE((__nonnull__)); 72 73 void store_slice(const rocksdb::Slice &slice) MY_ATTRIBUTE((__nonnull__)); 74 75 size_t prepare(File fd, ulonglong f_offset) MY_ATTRIBUTE((__nonnull__)); 76 77 int read_next_chunk_from_disk(File fd) 78 MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 79 is_chunk_finishedmerge_buf_info80 inline bool is_chunk_finished() const { 81 return m_curr_offset + m_disk_curr_offset - m_disk_start_offset == 82 m_total_size; 83 } 84 has_spacemerge_buf_info85 inline bool has_space(uint64 needed) const { 86 return m_curr_offset + needed <= m_block_len; 87 } 88 merge_buf_infomerge_buf_info89 explicit merge_buf_info(const ulonglong merge_block_size) 90 : m_block(nullptr), 91 m_block_len(merge_block_size), 92 m_curr_offset(0), 93 m_disk_start_offset(0), 94 m_disk_curr_offset(0), 95 m_total_size(merge_block_size) { 96 /* Will throw an exception if it runs out of memory here */ 97 m_block = std::unique_ptr<uchar[]>(new uchar[merge_block_size]); 98 99 /* Initialize entire buffer to 0 to avoid valgrind errors */ 100 memset(m_block.get(), 0, merge_block_size); 101 } 102 }; 103 104 /* Represents an entry in the heap during merge phase of external sort */ 105 struct merge_heap_entry { 106 std::shared_ptr<merge_buf_info> m_chunk_info; /* pointer to buffer info */ 107 uchar *m_block; /* pointer to heap memory where record is stored */ 108 const rocksdb::Comparator *const m_comparator; 109 rocksdb::Slice m_key; /* current key pointed to by block ptr */ 110 rocksdb::Slice m_val; 111 112 size_t prepare(File fd, ulonglong f_offset, ulonglong chunk_size) 113 MY_ATTRIBUTE((__nonnull__)); 114 115 int read_next_chunk_from_disk(File fd) 116 MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 117 118 int read_rec(rocksdb::Slice *const key, rocksdb::Slice *const val) 119 MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 120 121 int read_slice(rocksdb::Slice *const slice, const uchar **block_ptr) 122 MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 123 merge_heap_entrymerge_heap_entry124 explicit merge_heap_entry(const rocksdb::Comparator *const comparator) 125 : m_chunk_info(nullptr), m_block(nullptr), m_comparator(comparator) {} 126 }; 127 128 struct merge_heap_comparator { operatormerge_heap_comparator129 bool operator()(const std::shared_ptr<merge_heap_entry> &lhs, 130 const std::shared_ptr<merge_heap_entry> &rhs) { 131 return lhs->m_comparator->Compare(rhs->m_key, lhs->m_key) < 0; 132 } 133 }; 134 135 /* Represents a record in unsorted buffer */ 136 struct merge_record { 137 uchar *m_block; /* points to offset of key in sort buffer */ 138 const rocksdb::Comparator *const m_comparator; 139 140 bool operator<(const merge_record &record) const { 141 return merge_record_compare(this->m_block, record.m_block, m_comparator) < 142 0; 143 } 144 merge_recordmerge_record145 merge_record(uchar *const block, 146 const rocksdb::Comparator *const comparator) 147 : m_block(block), m_comparator(comparator) {} 148 }; 149 150 private: 151 const char *m_tmpfile_path; 152 const ulonglong m_merge_buf_size; 153 const ulonglong m_merge_combine_read_size; 154 const ulonglong m_merge_tmp_file_removal_delay; 155 rocksdb::ColumnFamilyHandle *m_cf_handle; 156 struct merge_file_info m_merge_file; 157 std::shared_ptr<merge_buf_info> m_rec_buf_unsorted; 158 std::shared_ptr<merge_buf_info> m_output_buf; 159 std::set<merge_record> m_offset_tree; 160 std::priority_queue<std::shared_ptr<merge_heap_entry>, 161 std::vector<std::shared_ptr<merge_heap_entry>>, 162 merge_heap_comparator> 163 m_merge_min_heap; 164 merge_store_uint64(uchar * const dst,uint64 n)165 static inline void merge_store_uint64(uchar *const dst, uint64 n) { 166 memcpy(dst, &n, sizeof(n)); 167 } 168 merge_read_uint64(const uchar ** buf_ptr,uint64 * const dst)169 static inline void merge_read_uint64(const uchar **buf_ptr, 170 uint64 *const dst) { 171 DBUG_ASSERT(buf_ptr != nullptr); 172 memcpy(dst, *buf_ptr, sizeof(uint64)); 173 *buf_ptr += sizeof(uint64); 174 } 175 as_slice(const uchar * block)176 static inline rocksdb::Slice as_slice(const uchar *block) { 177 uint64 len; 178 merge_read_uint64(&block, &len); 179 180 return rocksdb::Slice(reinterpret_cast<const char *>(block), len); 181 } 182 183 static int merge_record_compare(const uchar *a_block, const uchar *b_block, 184 const rocksdb::Comparator *const comparator) 185 MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 186 187 void merge_read_rec(const uchar *const block, rocksdb::Slice *const key, 188 rocksdb::Slice *const val) MY_ATTRIBUTE((__nonnull__)); 189 190 void read_slice(rocksdb::Slice *slice, const uchar *block_ptr) 191 MY_ATTRIBUTE((__nonnull__)); 192 193 public: 194 Rdb_index_merge(const char *const tmpfile_path, 195 const ulonglong merge_buf_size, 196 const ulonglong merge_combine_read_size, 197 const ulonglong merge_tmp_file_removal_delay, 198 rocksdb::ColumnFamilyHandle *cf); 199 ~Rdb_index_merge(); 200 201 int init() MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 202 203 int merge_file_create() MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 204 205 int add(const rocksdb::Slice &key, const rocksdb::Slice &val) 206 MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 207 208 int merge_buf_write() MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 209 210 int next(rocksdb::Slice *const key, rocksdb::Slice *const val) 211 MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 212 213 int merge_heap_prepare() MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 214 215 void merge_heap_top(rocksdb::Slice *key, rocksdb::Slice *val) 216 MY_ATTRIBUTE((__nonnull__)); 217 218 int merge_heap_pop_and_get_next(rocksdb::Slice *const key, 219 rocksdb::Slice *const val) 220 MY_ATTRIBUTE((__nonnull__, __warn_unused_result__)); 221 222 void merge_reset(); 223 get_cf()224 rocksdb::ColumnFamilyHandle *get_cf() const { return m_cf_handle; } 225 }; 226 227 } // namespace myrocks 228