/*
   Copyright (c) 2016, Facebook, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16
17 /* This C++ file's header file */
18 #include "./rdb_index_merge.h"
19
20 /* MyRocks header files */
21 #include "./ha_rocksdb.h"
22 #include "./rdb_datadic.h"
23
24 namespace myrocks {
25
/*
  Rdb_index_merge drives an external merge sort of index records during
  inplace index creation: records are buffered in memory, flushed to a
  temporary file as sorted chunks, then n-way merged back.

  @param tmpfile_path             directory for the on-disk merge file, or
                                  nullptr to fall back to mysql_tmpfile's
                                  default (see merge_file_create())
  @param merge_buf_size           size in bytes of each in-memory sort buffer
                                  and of each sorted chunk written to disk
  @param merge_combine_read_size  total read budget shared across all chunk
                                  buffers during the merge phase
  @param comparator               RocksDB comparator that defines sort order

  Buffers are allocated lazily in init(), not here.
*/
Rdb_index_merge::Rdb_index_merge(const char *const tmpfile_path,
                                 const ulonglong &merge_buf_size,
                                 const ulonglong &merge_combine_read_size,
                                 const rocksdb::Comparator *const comparator)
    : m_tmpfile_path(tmpfile_path), m_merge_buf_size(merge_buf_size),
      m_merge_combine_read_size(merge_combine_read_size),
      m_comparator(comparator), m_rec_buf_unsorted(nullptr),
      m_output_buf(nullptr) {}
34
~Rdb_index_merge()35 Rdb_index_merge::~Rdb_index_merge() {
36 /*
37 Close tmp file, we don't need to worry about deletion, mysql handles it.
38 */
39 my_close(m_merge_file.fd, MYF(MY_WME));
40 }
41
init()42 int Rdb_index_merge::init() {
43 /*
44 Create a temporary merge file on disk to store sorted chunks during
45 inplace index creation.
46 */
47 if (merge_file_create()) {
48 return HA_ERR_INTERNAL_ERROR;
49 }
50
51 /*
52 Then, allocate buffer to store unsorted records before they are written
53 to disk. They will be written to disk sorted. A sorted tree is used to
54 keep track of the offset of each record within the unsorted buffer.
55 */
56 m_rec_buf_unsorted =
57 std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
58
59 /*
60 Allocate output buffer that will contain sorted block that is written to
61 disk.
62 */
63 m_output_buf =
64 std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
65
66 return HA_EXIT_SUCCESS;
67 }
68
69 /**
70 Create a merge file in the given location.
71 */
merge_file_create()72 int Rdb_index_merge::merge_file_create() {
73 DBUG_ASSERT(m_merge_file.fd == -1);
74
75 int fd;
76 /* If no path set for tmpfile, use mysql_tmpdir by default */
77 if (m_tmpfile_path == nullptr) {
78 fd = mysql_tmpfile("myrocks");
79 } else {
80 char filename[FN_REFLEN];
81 fd = create_temp_file(filename, m_tmpfile_path, "myrocks",
82 O_CREAT | O_EXCL | O_RDWR | O_TEMPORARY,
83 MYF(MY_WME));
84 if (fd >= 0) {
85 #ifndef __WIN__
86 /*
87 This can be removed once the following bug is fixed:
88 Bug #28903 create_temp_file() doesn't honor O_TEMPORARY option
89 (file not removed) (Unix)
90 */
91 unlink(filename);
92 #endif /* !__WIN__ */
93 }
94 }
95
96 if (fd < 0) {
97 return HA_ERR_INTERNAL_ERROR;
98 }
99 m_merge_file.fd = fd;
100 m_merge_file.num_sort_buffers = 0;
101
102 return HA_EXIT_SUCCESS;
103 }
104
105 /**
106 Add record to offset tree (and unsorted merge buffer) in preparation for
107 writing out to disk in sorted chunks.
108
109 If buffer in memory is full, write the buffer out to disk sorted using the
110 offset tree, and clear the tree. (Happens in merge_buf_write)
111 */
add(const rocksdb::Slice & key,const rocksdb::Slice & val)112 int Rdb_index_merge::add(const rocksdb::Slice &key, const rocksdb::Slice &val) {
113 /* Adding a record after heap is already created results in error */
114 DBUG_ASSERT(m_merge_min_heap.empty());
115
116 /*
117 Check if sort buffer is going to be out of space, if so write it
118 out to disk in sorted order using offset tree.
119 */
120 const uint total_offset = RDB_MERGE_CHUNK_LEN +
121 m_rec_buf_unsorted->curr_offset +
122 RDB_MERGE_KEY_DELIMITER + RDB_MERGE_VAL_DELIMITER +
123 key.size() + val.size();
124 if (total_offset >= m_rec_buf_unsorted->total_size) {
125 /*
126 If the offset tree is empty here, that means that the proposed key to
127 add is too large for the buffer.
128 */
129 if (m_offset_tree.empty()) {
130 // NO_LINT_DEBUG
131 sql_print_error("Sort buffer size is too small to process merge. "
132 "Please set merge buffer size to a higher value.");
133 return HA_ERR_INTERNAL_ERROR;
134 }
135
136 if (merge_buf_write()) {
137 // NO_LINT_DEBUG
138 sql_print_error("Error writing sort buffer to disk.");
139 return HA_ERR_INTERNAL_ERROR;
140 }
141 }
142
143 const ulonglong rec_offset = m_rec_buf_unsorted->curr_offset;
144
145 /*
146 Store key and value in temporary unsorted in memory buffer pointed to by
147 offset tree.
148 */
149 m_rec_buf_unsorted->store_key_value(key, val);
150
151 /* Find sort order of the new record */
152 m_offset_tree.emplace(m_rec_buf_unsorted->block.get() + rec_offset,
153 m_comparator);
154
155 return HA_EXIT_SUCCESS;
156 }
157
/**
  Sort + write merge buffer chunk out to disk.

  Reads every record of the unsorted buffer in the order imposed by
  m_offset_tree (ordered by m_comparator), packs them into m_output_buf,
  and appends the buffer as the next chunk of the merge file. Chunks are
  written at fixed m_merge_buf_size intervals; the first 8 bytes of each
  chunk record its actual payload length.

  @return HA_EXIT_SUCCESS, or HA_ERR_INTERNAL_ERROR on seek/write/sync
          failure.
*/
int Rdb_index_merge::merge_buf_write() {
  DBUG_ASSERT(m_merge_file.fd != -1);
  DBUG_ASSERT(m_rec_buf_unsorted != nullptr);
  DBUG_ASSERT(m_output_buf != nullptr);
  DBUG_ASSERT(!m_offset_tree.empty());

  /* Write actual chunk size to first 8 bytes of the merge buffer */
  merge_store_uint64(m_output_buf->block.get(),
                     m_rec_buf_unsorted->curr_offset + RDB_MERGE_CHUNK_LEN);
  m_output_buf->curr_offset += RDB_MERGE_CHUNK_LEN;

  /*
    Iterate through the offset tree.  Should be ordered by the secondary key
    at this point.
  */
  for (const auto &rec : m_offset_tree) {
    DBUG_ASSERT(m_output_buf->curr_offset <= m_merge_buf_size);

    /* Read record from offset (should never fail) */
    rocksdb::Slice key;
    rocksdb::Slice val;
    merge_read_rec(rec.block, &key, &val);

    /* Store key and value into sorted output buffer */
    m_output_buf->store_key_value(key, val);
  }

  DBUG_ASSERT(m_output_buf->curr_offset <= m_output_buf->total_size);

  /*
    Write output buffer to disk.

    Need to position cursor to the chunk it needs to be at on filesystem
    then write into the respective merge buffer.

    NOTE: the full total_size bytes are written below (not just curr_offset)
    so that every chunk on disk occupies exactly m_merge_buf_size bytes and
    the seek arithmetic here stays valid; the 8-byte header written above
    tells readers how much of the chunk is meaningful.
  */
  if (my_seek(m_merge_file.fd, m_merge_file.num_sort_buffers * m_merge_buf_size,
              SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return HA_ERR_INTERNAL_ERROR;
  }

  /*
    Add a file sync call here to flush the data out. Otherwise, the filesystem
    cache can flush out all of the files at the same time, causing a write
    burst.
  */
  if (my_write(m_merge_file.fd, m_output_buf->block.get(),
               m_output_buf->total_size, MYF(MY_WME | MY_NABP)) ||
      mysql_file_sync(m_merge_file.fd, MYF(MY_WME))) {
    // NO_LINT_DEBUG
    sql_print_error("Error writing sorted merge buffer to disk.");
    return HA_ERR_INTERNAL_ERROR;
  }

  /* Increment merge file offset to track number of merge buffers written */
  m_merge_file.num_sort_buffers += 1;

  /* Reset everything for next run */
  merge_reset();

  return HA_EXIT_SUCCESS;
}
224
225 /**
226 Prepare n-way merge of n sorted buffers on disk, using a heap sorted by
227 secondary key records.
228 */
merge_heap_prepare()229 int Rdb_index_merge::merge_heap_prepare() {
230 DBUG_ASSERT(m_merge_min_heap.empty());
231
232 /*
233 If the offset tree is not empty, there are still some records that need to
234 be written to disk. Write them out now.
235 */
236 if (!m_offset_tree.empty() && merge_buf_write()) {
237 return HA_ERR_INTERNAL_ERROR;
238 }
239
240 DBUG_ASSERT(m_merge_file.num_sort_buffers > 0);
241
242 /*
243 For an n-way merge, we need to read chunks of each merge file
244 simultaneously.
245 */
246 ulonglong chunk_size =
247 m_merge_combine_read_size / m_merge_file.num_sort_buffers;
248 if (chunk_size >= m_merge_buf_size) {
249 chunk_size = m_merge_buf_size;
250 }
251
252 /* Allocate buffers for each chunk */
253 for (ulonglong i = 0; i < m_merge_file.num_sort_buffers; i++) {
254 const auto entry = std::make_shared<merge_heap_entry>(m_comparator);
255
256 /*
257 Read chunk_size bytes from each chunk on disk, and place inside
258 respective chunk buffer.
259 */
260 const size_t total_size =
261 entry->prepare(m_merge_file.fd, i * m_merge_buf_size, chunk_size);
262
263 if (total_size == (size_t)-1) {
264 return HA_ERR_INTERNAL_ERROR;
265 }
266
267 /* Can reach this condition if an index was added on table w/ no rows */
268 if (total_size - RDB_MERGE_CHUNK_LEN == 0) {
269 break;
270 }
271
272 /* Read the first record from each buffer to initially populate the heap */
273 if (entry->read_rec(&entry->key, &entry->val)) {
274 // NO_LINT_DEBUG
275 sql_print_error("Chunk size is too small to process merge.");
276 return HA_ERR_INTERNAL_ERROR;
277 }
278
279 m_merge_min_heap.push(std::move(entry));
280 }
281
282 return HA_EXIT_SUCCESS;
283 }
284
/**
  Create and/or iterate through keys in the merge heap.

  @param[out] key  set to the next key in overall sorted order
  @param[out] val  set to the matching value

  @return HA_EXIT_SUCCESS when a record was produced, -1 when the merge is
          exhausted, or the error from merge_heap_prepare().
*/
int Rdb_index_merge::next(rocksdb::Slice *const key,
                          rocksdb::Slice *const val) {
  /*
    If table fits in one sort buffer, we can optimize by writing
    the sort buffer directly through to the sstfilewriter instead of
    needing to create tmp files/heap to merge the sort buffers.

    If there are no sort buffer records (alters on empty tables),
    also exit here.
  */
  if (m_merge_file.num_sort_buffers == 0) {
    if (m_offset_tree.empty()) {
      return -1;
    }

    /* Smallest entry in the tree is the next record in sorted order. */
    const auto rec = m_offset_tree.begin();

    /* Read record from offset */
    merge_read_rec(rec->block, key, val);

    /* Consume the record so the next call yields the following one. */
    m_offset_tree.erase(rec);
    return HA_EXIT_SUCCESS;
  }

  int res;

  /*
    If heap and heap chunk info are empty, we must be beginning the merge phase
    of the external sort. Populate the heap with initial values from each
    disk chunk.
  */
  if (m_merge_min_heap.empty()) {
    if ((res = merge_heap_prepare())) {
      // NO_LINT_DEBUG
      sql_print_error("Error during preparation of heap.");
      return res;
    }

    /*
      Return the first top record without popping, as we haven't put this
      inside the SST file yet.
    */
    merge_heap_top(key, val);
    return HA_EXIT_SUCCESS;
  }

  DBUG_ASSERT(!m_merge_min_heap.empty());
  return merge_heap_pop_and_get_next(key, val);
}
337
338 /**
339 Get current top record from the heap.
340 */
merge_heap_top(rocksdb::Slice * const key,rocksdb::Slice * const val)341 void Rdb_index_merge::merge_heap_top(rocksdb::Slice *const key,
342 rocksdb::Slice *const val) {
343 DBUG_ASSERT(!m_merge_min_heap.empty());
344
345 const std::shared_ptr<merge_heap_entry> &entry = m_merge_min_heap.top();
346 *key = entry->key;
347 *val = entry->val;
348 }
349
350 /**
351 Pops the top record, and uses it to read next record from the
352 corresponding sort buffer and push onto the heap.
353
354 Returns -1 when there are no more records in the heap.
355 */
merge_heap_pop_and_get_next(rocksdb::Slice * const key,rocksdb::Slice * const val)356 int Rdb_index_merge::merge_heap_pop_and_get_next(rocksdb::Slice *const key,
357 rocksdb::Slice *const val) {
358 /*
359 Make a new reference to shared ptr so it doesn't get destroyed
360 during pop(). We are going to push this entry back onto the heap.
361 */
362 const std::shared_ptr<merge_heap_entry> entry = m_merge_min_heap.top();
363 m_merge_min_heap.pop();
364
365 /*
366 We are finished w/ current chunk if:
367 current_offset + disk_offset == total_size
368
369 Return without adding entry back onto heap.
370 If heap is also empty, we must be finished with merge.
371 */
372 if (entry->chunk_info->is_chunk_finished()) {
373 if (m_merge_min_heap.empty()) {
374 return -1;
375 }
376
377 merge_heap_top(key, val);
378 return HA_EXIT_SUCCESS;
379 }
380
381 /*
382 Make sure we haven't reached the end of the chunk.
383 */
384 DBUG_ASSERT(!entry->chunk_info->is_chunk_finished());
385
386 /*
387 If merge_read_rec fails, it means the either the chunk was cut off
388 or we've reached the end of the respective chunk.
389 */
390 if (entry->read_rec(&entry->key, &entry->val)) {
391 if (entry->read_next_chunk_from_disk(m_merge_file.fd)) {
392 return HA_ERR_INTERNAL_ERROR;
393 }
394
395 /* Try reading record again, should never fail. */
396 if (entry->read_rec(&entry->key, &entry->val)) {
397 return HA_ERR_INTERNAL_ERROR;
398 }
399 }
400
401 /* Push entry back on to the heap w/ updated buffer + offset ptr */
402 m_merge_min_heap.push(std::move(entry));
403
404 /* Return the current top record on heap */
405 merge_heap_top(key, val);
406 return HA_EXIT_SUCCESS;
407 }
408
read_next_chunk_from_disk(File fd)409 int Rdb_index_merge::merge_heap_entry::read_next_chunk_from_disk(File fd) {
410 if (chunk_info->read_next_chunk_from_disk(fd)) {
411 return HA_EXIT_FAILURE;
412 }
413
414 block = chunk_info->block.get();
415 return HA_EXIT_SUCCESS;
416 }
417
read_next_chunk_from_disk(File fd)418 int Rdb_index_merge::merge_buf_info::read_next_chunk_from_disk(File fd) {
419 disk_curr_offset += curr_offset;
420
421 if (my_seek(fd, disk_curr_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
422 // NO_LINT_DEBUG
423 sql_print_error("Error seeking to location in merge file on disk.");
424 return HA_EXIT_FAILURE;
425 }
426
427 /* Overwrite the old block */
428 const size_t bytes_read = my_read(fd, block.get(), block_len, MYF(MY_WME));
429 if (bytes_read == (size_t)-1) {
430 // NO_LINT_DEBUG
431 sql_print_error("Error reading merge file from disk.");
432 return HA_EXIT_FAILURE;
433 }
434
435 curr_offset = 0;
436 return HA_EXIT_SUCCESS;
437 }
438
439 /**
440 Get records from offset within sort buffer and compare them.
441 Sort by least to greatest.
442 */
merge_record_compare(const uchar * const a_block,const uchar * const b_block,const rocksdb::Comparator * const comparator)443 int Rdb_index_merge::merge_record_compare(
444 const uchar *const a_block, const uchar *const b_block,
445 const rocksdb::Comparator *const comparator) {
446 return comparator->Compare(as_slice(a_block), as_slice(b_block));
447 }
448
449 /**
450 Given an offset in a merge sort buffer, read out the keys + values.
451 After this, block will point to the next record in the buffer.
452 **/
merge_read_rec(const uchar * const block,rocksdb::Slice * const key,rocksdb::Slice * const val)453 void Rdb_index_merge::merge_read_rec(const uchar *const block,
454 rocksdb::Slice *const key,
455 rocksdb::Slice *const val) {
456 /* Read key at block offset into key slice and the value into value slice*/
457 read_slice(key, block);
458 read_slice(val, block + RDB_MERGE_REC_DELIMITER + key->size());
459 }
460
read_slice(rocksdb::Slice * slice,const uchar * block_ptr)461 void Rdb_index_merge::read_slice(rocksdb::Slice *slice,
462 const uchar *block_ptr) {
463 uint64 slice_len;
464 merge_read_uint64(&block_ptr, &slice_len);
465
466 *slice = rocksdb::Slice(reinterpret_cast<const char *>(block_ptr), slice_len);
467 }
468
/**
  Read the record at this entry's current position in its chunk buffer into
  key/val, advancing both the raw block pointer and the chunk's curr_offset
  past the consumed bytes.

  If the key reads but the value does not fit in the remaining buffered
  bytes (record truncated at the chunk-buffer boundary), the offset and
  block pointer are rolled back so the caller can refill the buffer from
  disk and re-read the whole record.

  @return HA_EXIT_SUCCESS, or HA_EXIT_FAILURE when the buffered chunk is
          exhausted or truncated mid-record.
*/
int Rdb_index_merge::merge_heap_entry::read_rec(rocksdb::Slice *const key,
                                                rocksdb::Slice *const val) {
  const uchar *block_ptr = block;
  /* Saved so a partially-read record can be rolled back below. */
  const auto orig_offset = chunk_info->curr_offset;
  const auto orig_block = block;

  /* Read key at block offset into key slice and the value into value slice*/
  if (read_slice(key, &block_ptr) != 0) {
    return HA_EXIT_FAILURE;
  }

  /* Advance by however many bytes read_slice consumed (delimiter + data). */
  chunk_info->curr_offset += (uintptr_t)block_ptr - (uintptr_t)block;
  block += (uintptr_t)block_ptr - (uintptr_t)block;

  if (read_slice(val, &block_ptr) != 0) {
    /* Key was read but value is cut off: undo so the record is re-read. */
    chunk_info->curr_offset = orig_offset;
    block = orig_block;
    return HA_EXIT_FAILURE;
  }

  chunk_info->curr_offset += (uintptr_t)block_ptr - (uintptr_t)block;
  block += (uintptr_t)block_ptr - (uintptr_t)block;

  return HA_EXIT_SUCCESS;
}
494
/**
  Bounds-checked read of one length-delimited slice from *block_ptr.

  On success *block_ptr ends up past the slice data (merge_read_uint64
  already advanced it past the 8-byte length delimiter). On failure
  *block_ptr may have been advanced past the delimiter only; the caller
  (read_rec) handles rollback via its saved offsets.

  @return HA_EXIT_SUCCESS, or HA_EXIT_FAILURE when the chunk's remaining
          bytes cannot hold the delimiter plus the announced slice length.
*/
int Rdb_index_merge::merge_heap_entry::read_slice(rocksdb::Slice *const slice,
                                                  const uchar **block_ptr) {
  if (!chunk_info->has_space(RDB_MERGE_REC_DELIMITER)) {
    return HA_EXIT_FAILURE;
  }

  uint64 slice_len;
  merge_read_uint64(block_ptr, &slice_len);
  if (!chunk_info->has_space(RDB_MERGE_REC_DELIMITER + slice_len)) {
    return HA_EXIT_FAILURE;
  }

  *slice =
      rocksdb::Slice(reinterpret_cast<const char *>(*block_ptr), slice_len);
  *block_ptr += slice_len;
  return HA_EXIT_SUCCESS;
}
512
prepare(File fd,ulonglong f_offset,ulonglong chunk_size)513 size_t Rdb_index_merge::merge_heap_entry::prepare(File fd, ulonglong f_offset,
514 ulonglong chunk_size) {
515 chunk_info = std::make_shared<merge_buf_info>(chunk_size);
516 const size_t res = chunk_info->prepare(fd, f_offset);
517 if (res != (size_t)-1) {
518 block = chunk_info->block.get() + RDB_MERGE_CHUNK_LEN;
519 }
520
521 return res;
522 }
523
/**
  Seek to f_offset in the merge file and fill this buffer with one chunk.

  The first 8 bytes of each chunk on disk hold the chunk's actual length;
  total_size is overwritten with that value below, so from here on
  has_space() reflects the real chunk size rather than the allocated size.

  @return the chunk's actual size in bytes, or (size_t)-1 on I/O error.
*/
size_t Rdb_index_merge::merge_buf_info::prepare(File fd, ulonglong f_offset) {
  disk_start_offset = f_offset;
  disk_curr_offset = f_offset;

  /*
    Need to position cursor to the chunk it needs to be at on filesystem
    then read 'chunk_size' bytes into the respective chunk buffer.
  */
  if (my_seek(fd, f_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return (size_t)-1;
  }

  const size_t bytes_read = my_read(fd, block.get(), total_size, MYF(MY_WME));
  if (bytes_read == (size_t)-1) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk.");
    return (size_t)-1;
  }

  /*
    Read the first 8 bytes of each chunk, this gives us the actual
    size of each chunk.
  */
  const uchar *block_ptr = block.get();
  merge_read_uint64(&block_ptr, &total_size);
  curr_offset += RDB_MERGE_CHUNK_LEN;
  return total_size;
}
554
555 /* Store key and value w/ their respective delimiters at the given offset */
store_key_value(const rocksdb::Slice & key,const rocksdb::Slice & val)556 void Rdb_index_merge::merge_buf_info::store_key_value(
557 const rocksdb::Slice &key, const rocksdb::Slice &val) {
558 store_slice(key);
559 store_slice(val);
560 }
561
store_slice(const rocksdb::Slice & slice)562 void Rdb_index_merge::merge_buf_info::store_slice(const rocksdb::Slice &slice) {
563 /* Store length delimiter */
564 merge_store_uint64(&block[curr_offset], slice.size());
565
566 /* Store slice data */
567 memcpy(&block[curr_offset + RDB_MERGE_REC_DELIMITER], slice.data(),
568 slice.size());
569
570 curr_offset += slice.size() + RDB_MERGE_REC_DELIMITER;
571 }
572
merge_reset()573 void Rdb_index_merge::merge_reset() {
574 /*
575 Either error, or all values in the sort buffer have been written to disk,
576 so we need to clear the offset tree.
577 */
578 m_offset_tree.clear();
579
580 /* Reset sort buffer block */
581 if (m_rec_buf_unsorted && m_rec_buf_unsorted->block) {
582 m_rec_buf_unsorted->curr_offset = 0;
583 }
584
585 /* Reset output buf */
586 if (m_output_buf && m_output_buf->block) {
587 m_output_buf->curr_offset = 0;
588 }
589 }
590
591 } // namespace myrocks
592