1 /* 2 Copyright (c) 2016, Facebook, Inc. 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License as published by 6 the Free Software Foundation; version 2 of the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ 16 17 #pragma once 18 19 /* MySQL header files */ 20 #include "../sql/log.h" 21 #include "./handler.h" /* handler */ 22 #include "./my_global.h" /* ulonglong */ 23 24 /* C++ standard header files */ 25 #include <queue> 26 #include <set> 27 #include <vector> 28 29 /* RocksDB header files */ 30 #include "rocksdb/db.h" 31 32 /* MyRocks header files */ 33 #include "./rdb_comparator.h" 34 35 namespace myrocks { 36 37 /* 38 Length of delimiters used during inplace index creation. 39 */ 40 #define RDB_MERGE_CHUNK_LEN sizeof(size_t) 41 #define RDB_MERGE_REC_DELIMITER sizeof(size_t) 42 #define RDB_MERGE_KEY_DELIMITER RDB_MERGE_REC_DELIMITER 43 #define RDB_MERGE_VAL_DELIMITER RDB_MERGE_REC_DELIMITER 44 45 class Rdb_key_def; 46 class Rdb_tbl_def; 47 48 class Rdb_index_merge { 49 Rdb_index_merge(const Rdb_index_merge &p) = delete; 50 Rdb_index_merge &operator=(const Rdb_index_merge &p) = delete; 51 52 public: 53 /* Information about temporary files used in external merge sort */ 54 struct merge_file_info { 55 File fd = -1; /* file descriptor */ 56 ulong num_sort_buffers; /* number of sort buffers in temp file */ 57 }; 58 59 /* Buffer for sorting in main memory. */ 60 struct merge_buf_info { 61 /* heap memory allocated for main memory sort/merge */ 62 std::unique_ptr<uchar[]> block; 63 const ulonglong 64 block_len; /* amount of data bytes allocated for block above */ 65 ulonglong curr_offset; /* offset of the record pointer for the block */ 66 ulonglong disk_start_offset; /* where the chunk starts on disk */ 67 ulonglong disk_curr_offset; /* current offset on disk */ 68 ulonglong total_size; /* total # of data bytes in chunk */ 69 70 void store_key_value(const rocksdb::Slice &key, const rocksdb::Slice &val) 71 __attribute__((__nonnull__)); 72 73 void store_slice(const rocksdb::Slice &slice) __attribute__((__nonnull__)); 74 75 size_t prepare(File fd, ulonglong f_offset) __attribute__((__nonnull__)); 76 77 int read_next_chunk_from_disk(File fd) 78 __attribute__((__nonnull__, __warn_unused_result__)); 79 is_chunk_finishedmerge_buf_info80 inline bool is_chunk_finished() const { 81 return curr_offset + disk_curr_offset - disk_start_offset == total_size; 82 } 83 has_spacemerge_buf_info84 inline bool has_space(uint64 needed) const { 85 return curr_offset + needed <= block_len; 86 } 87 merge_buf_infomerge_buf_info88 explicit merge_buf_info(const ulonglong merge_block_size) 89 : block(nullptr), block_len(merge_block_size), curr_offset(0), 90 disk_start_offset(0), disk_curr_offset(0), 91 total_size(merge_block_size) { 92 /* Will throw an exception if it runs out of memory here */ 93 block = std::unique_ptr<uchar[]>(new uchar[merge_block_size]); 94 95 /* Initialize entire buffer to 0 to avoid valgrind errors */ 96 memset(block.get(), 0, merge_block_size); 97 } 98 }; 99 100 /* Represents an entry in the heap during merge phase of external sort */ 101 struct merge_heap_entry { 102 std::shared_ptr<merge_buf_info> chunk_info; /* pointer to buffer info */ 103 uchar *block; /* pointer to heap memory where record is stored */ 104 const rocksdb::Comparator *const comparator; 105 rocksdb::Slice key; /* current key pointed to by block ptr */ 106 rocksdb::Slice val; 107 108 size_t prepare(File fd, ulonglong f_offset, ulonglong chunk_size) 109 __attribute__((__nonnull__)); 110 111 int read_next_chunk_from_disk(File fd) 112 __attribute__((__nonnull__, __warn_unused_result__)); 113 114 int read_rec(rocksdb::Slice *const key, rocksdb::Slice *const val) 115 __attribute__((__nonnull__, __warn_unused_result__)); 116 117 int read_slice(rocksdb::Slice *const slice, const uchar **block_ptr) 118 __attribute__((__nonnull__, __warn_unused_result__)); 119 merge_heap_entrymerge_heap_entry120 explicit merge_heap_entry(const rocksdb::Comparator *const comparator) 121 : chunk_info(nullptr), block(nullptr), comparator(comparator) {} 122 }; 123 124 struct merge_heap_comparator { operatormerge_heap_comparator125 bool operator()(const std::shared_ptr<merge_heap_entry> &lhs, 126 const std::shared_ptr<merge_heap_entry> &rhs) { 127 return lhs->comparator->Compare(rhs->key, lhs->key) < 0; 128 } 129 }; 130 131 /* Represents a record in unsorted buffer */ 132 struct merge_record { 133 uchar *block; /* points to offset of key in sort buffer */ 134 const rocksdb::Comparator *const comparator; 135 136 bool operator<(const merge_record &record) const { 137 return merge_record_compare(this->block, record.block, comparator) < 0; 138 } 139 merge_recordmerge_record140 merge_record(uchar *const block, 141 const rocksdb::Comparator *const comparator) 142 : block(block), comparator(comparator) {} 143 }; 144 145 private: 146 const char *m_tmpfile_path; 147 const ulonglong m_merge_buf_size; 148 const ulonglong m_merge_combine_read_size; 149 const rocksdb::Comparator *m_comparator; 150 struct merge_file_info m_merge_file; 151 std::shared_ptr<merge_buf_info> m_rec_buf_unsorted; 152 std::shared_ptr<merge_buf_info> m_output_buf; 153 std::set<merge_record> m_offset_tree; 154 std::priority_queue<std::shared_ptr<merge_heap_entry>, 155 std::vector<std::shared_ptr<merge_heap_entry>>, 156 merge_heap_comparator> 157 m_merge_min_heap; 158 merge_store_uint64(uchar * const dst,uint64 n)159 static inline void merge_store_uint64(uchar *const dst, uint64 n) { 160 memcpy(dst, &n, sizeof(n)); 161 } 162 merge_read_uint64(const uchar ** buf_ptr,uint64 * const dst)163 static inline void merge_read_uint64(const uchar **buf_ptr, 164 uint64 *const dst) { 165 DBUG_ASSERT(buf_ptr != nullptr); 166 memcpy(dst, *buf_ptr, sizeof(uint64)); 167 *buf_ptr += sizeof(uint64); 168 } 169 as_slice(const uchar * block)170 static inline rocksdb::Slice as_slice(const uchar *block) { 171 uint64 len; 172 merge_read_uint64(&block, &len); 173 174 return rocksdb::Slice(reinterpret_cast<const char *>(block), len); 175 } 176 177 static int merge_record_compare(const uchar *a_block, const uchar *b_block, 178 const rocksdb::Comparator *const comparator) 179 __attribute__((__nonnull__, __warn_unused_result__)); 180 181 void merge_read_rec(const uchar *const block, rocksdb::Slice *const key, 182 rocksdb::Slice *const val) __attribute__((__nonnull__)); 183 184 void read_slice(rocksdb::Slice *slice, const uchar *block_ptr) 185 __attribute__((__nonnull__)); 186 187 public: 188 Rdb_index_merge(const char *const tmpfile_path, 189 const ulonglong &merge_buf_size, 190 const ulonglong &merge_combine_read_size, 191 const rocksdb::Comparator *const comparator); 192 ~Rdb_index_merge(); 193 194 int init() __attribute__((__nonnull__, __warn_unused_result__)); 195 196 int merge_file_create() __attribute__((__nonnull__, __warn_unused_result__)); 197 198 int add(const rocksdb::Slice &key, const rocksdb::Slice &val) 199 __attribute__((__nonnull__, __warn_unused_result__)); 200 201 int merge_buf_write() __attribute__((__nonnull__, __warn_unused_result__)); 202 203 int next(rocksdb::Slice *const key, rocksdb::Slice *const val) 204 __attribute__((__nonnull__, __warn_unused_result__)); 205 206 int merge_heap_prepare() __attribute__((__nonnull__, __warn_unused_result__)); 207 208 void merge_heap_top(rocksdb::Slice *key, rocksdb::Slice *val) 209 __attribute__((__nonnull__)); 210 211 int merge_heap_pop_and_get_next(rocksdb::Slice *const key, 212 rocksdb::Slice *const val) 213 __attribute__((__nonnull__, __warn_unused_result__)); 214 215 void merge_reset(); 216 }; 217 218 } // namespace myrocks 219