1 /*
2    Copyright (c) 2016, Facebook, Inc.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16 
17 /* This C++ file's header file */
18 #include "./rdb_index_merge.h"
19 
20 /* MySQL header files */
21 #include "mysql/plugin.h"
22 #include "mysql/psi/mysql_file.h"
23 
24 /* MyRocks header files */
25 #include "./ha_rocksdb.h"
26 #include "./rdb_datadic.h"
27 
28 namespace myrocks {
29 
/*
  Set up the external-merge-sort state used during fast (inplace) index
  creation. No resources are acquired here; the temp file and buffers are
  allocated later in init().

  @param tmpfile_path   directory for the on-disk merge file; nullptr means
                        use the server default (see merge_file_create())
  @param merge_buf_size size in bytes of one sort buffer / on-disk chunk
  @param merge_combine_read_size
                        total read budget (bytes) shared across all chunks
                        during the merge phase (see merge_heap_prepare())
  @param merge_tmp_file_removal_delay
                        delay in milliseconds between truncation steps when
                        tearing down the temp file (0 = delete immediately;
                        see the destructor)
  @param cf             column family whose comparator defines the sort order
*/
Rdb_index_merge::Rdb_index_merge(const char *const tmpfile_path,
                                 const ulonglong merge_buf_size,
                                 const ulonglong merge_combine_read_size,
                                 const ulonglong merge_tmp_file_removal_delay,
                                 rocksdb::ColumnFamilyHandle *cf)
    : m_tmpfile_path(tmpfile_path),
      m_merge_buf_size(merge_buf_size),
      m_merge_combine_read_size(merge_combine_read_size),
      m_merge_tmp_file_removal_delay(merge_tmp_file_removal_delay),
      m_cf_handle(cf),
      m_rec_buf_unsorted(nullptr),
      m_output_buf(nullptr) {}
42 
Rdb_index_merge::~Rdb_index_merge() {
  /*
    If merge_tmp_file_removal_delay is set, sleep between calls to chsize.

    This helps mitigate potential trim stalls on flash when large files are
    being deleted too quickly.
  */
  if (m_merge_tmp_file_removal_delay > 0) {
    /* Start at the file's nominal size: one fixed-size chunk per buffer. */
    uint64 curr_size = m_merge_buf_size * m_merge_file.m_num_sort_buffers;
    for (uint i = 0; i < m_merge_file.m_num_sort_buffers; i++) {
      /*
        Shrink the file by one chunk per iteration. Note the first call
        truncates to the full size (a no-op) and the last leaves one chunk;
        the remaining bytes are reclaimed when the fd is closed below.
      */
      if (my_chsize(m_merge_file.m_fd, curr_size, 0, MYF(MY_WME))) {
        // NO_LINT_DEBUG
        sql_print_error("Error truncating file during fast index creation.");
      }

      /* m_merge_tmp_file_removal_delay is in ms; my_sleep() takes usec. */
      my_sleep(m_merge_tmp_file_removal_delay * 1000);
      // Not aborting on fsync error since the tmp file is not used anymore
      if (mysql_file_sync(m_merge_file.m_fd, MYF(MY_WME))) {
        // NO_LINT_DEBUG
        sql_print_error("Error flushing truncated MyRocks merge buffer.");
      }
      curr_size -= m_merge_buf_size;
    }
  }

  /*
    Close file descriptor, we don't need to worry about deletion,
    mysql handles it.
  */
  my_close(m_merge_file.m_fd, MYF(MY_WME));
}
74 
init()75 int Rdb_index_merge::init() {
76   /*
77     Create a temporary merge file on disk to store sorted chunks during
78     inplace index creation.
79   */
80   if (merge_file_create()) {
81     return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
82   }
83 
84   /*
85     Then, allocate buffer to store unsorted records before they are written
86     to disk. They will be written to disk sorted. A sorted tree is used to
87     keep track of the offset of each record within the unsorted buffer.
88   */
89   m_rec_buf_unsorted =
90       std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
91 
92   /*
93     Allocate output buffer that will contain sorted block that is written to
94     disk.
95   */
96   m_output_buf =
97       std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
98 
99   return HA_EXIT_SUCCESS;
100 }
101 
102 /**
103   Create a merge file in the given location.
104 */
int Rdb_index_merge::merge_file_create() {
  /* Must not already have an open merge file. */
  assert(m_merge_file.m_fd == -1);

  int fd;
  /* If no path set for tmpfile, use mysql_tmpdir by default */
  if (m_tmpfile_path == nullptr) {
    fd = mysql_tmpfile("myrocks");
  } else {
    /* Caller supplied a directory: create the temp file there ourselves. */
    char filename[FN_REFLEN];
    fd = create_temp_file(filename, m_tmpfile_path, "myrocks",
                          O_CREAT | O_EXCL | O_RDWR, MYF(MY_WME));
    if (fd >= 0) {
#ifndef __WIN__
      /*
        This can be removed once the following bug is fixed:
        Bug #28903  create_temp_file() doesn't honor O_TEMPORARY option
                    (file not removed) (Unix)

        Unlinking while the fd stays open means the file is reclaimed by the
        OS automatically when the fd is closed.
      */
      unlink(filename);
#endif /* !__WIN__ */
    }
  }

  if (fd < 0) {
    // NO_LINT_DEBUG
    sql_print_error("Failed to create temp file during fast index creation.");
    return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
  }

  /* Record the new fd and start with zero sorted chunks written. */
  m_merge_file.m_fd = fd;
  m_merge_file.m_num_sort_buffers = 0;

  return HA_EXIT_SUCCESS;
}
139 
140 /**
141   Add record to offset tree (and unsorted merge buffer) in preparation for
142   writing out to disk in sorted chunks.
143 
144   If buffer in memory is full, write the buffer out to disk sorted using the
145   offset tree, and clear the tree. (Happens in merge_buf_write)
146 */
add(const rocksdb::Slice & key,const rocksdb::Slice & val)147 int Rdb_index_merge::add(const rocksdb::Slice &key, const rocksdb::Slice &val) {
148   /* Adding a record after heap is already created results in error */
149   assert(m_merge_min_heap.empty());
150 
151   /*
152     Check if sort buffer is going to be out of space, if so write it
153     out to disk in sorted order using offset tree.
154   */
155   const uint total_offset = RDB_MERGE_CHUNK_LEN +
156                             m_rec_buf_unsorted->m_curr_offset +
157                             RDB_MERGE_KEY_DELIMITER + RDB_MERGE_VAL_DELIMITER +
158                             key.size() + val.size();
159   if (total_offset >= m_rec_buf_unsorted->m_total_size) {
160     /*
161       If the offset tree is empty here, that means that the proposed key to
162       add is too large for the buffer.
163     */
164     if (m_offset_tree.empty()) {
165       // NO_LINT_DEBUG
166       sql_print_error(
167           "Sort buffer size is too small to process merge. "
168           "Please set merge buffer size to a higher value.");
169       return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
170     }
171 
172     if (merge_buf_write()) {
173       // NO_LINT_DEBUG
174       sql_print_error("Error writing sort buffer to disk.");
175       return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
176     }
177   }
178 
179   const ulonglong rec_offset = m_rec_buf_unsorted->m_curr_offset;
180 
181   /*
182     Store key and value in temporary unsorted in memory buffer pointed to by
183     offset tree.
184   */
185   m_rec_buf_unsorted->store_key_value(key, val);
186 
187   /* Find sort order of the new record */
188   auto res =
189       m_offset_tree.emplace(m_rec_buf_unsorted->m_block.get() + rec_offset,
190                             m_cf_handle->GetComparator());
191   if (!res.second) {
192     my_printf_error(ER_DUP_ENTRY,
193                     "Failed to insert the record: the key already exists",
194                     MYF(0));
195     return ER_DUP_ENTRY;
196   }
197 
198   return HA_EXIT_SUCCESS;
199 }
200 
201 /**
202   Sort + write merge buffer chunk out to disk.
203 */
int Rdb_index_merge::merge_buf_write() {
  assert(m_merge_file.m_fd != -1);
  assert(m_rec_buf_unsorted != nullptr);
  assert(m_output_buf != nullptr);
  assert(!m_offset_tree.empty());

  /* Write actual chunk size to first 8 bytes of the merge buffer */
  merge_store_uint64(m_output_buf->m_block.get(),
                     m_rec_buf_unsorted->m_curr_offset + RDB_MERGE_CHUNK_LEN);
  m_output_buf->m_curr_offset += RDB_MERGE_CHUNK_LEN;

  /*
    Iterate through the offset tree.  Should be ordered by the secondary key
    at this point.
  */
  for (const auto &rec : m_offset_tree) {
    assert(m_output_buf->m_curr_offset <= m_merge_buf_size);

    /* Read record from offset (should never fail) */
    rocksdb::Slice key;
    rocksdb::Slice val;
    merge_read_rec(rec.m_block, &key, &val);

    /* Store key and value into sorted output buffer */
    m_output_buf->store_key_value(key, val);
  }

  assert(m_output_buf->m_curr_offset <= m_output_buf->m_total_size);

  /*
    Write output buffer to disk.

    Need to position cursor to the chunk it needs to be at on filesystem
    then write into the respective merge buffer.

    Chunks occupy fixed-size slots of m_merge_buf_size bytes, so the slot
    index is simply the number of chunks written so far.
  */
  if (my_seek(m_merge_file.m_fd,
              m_merge_file.m_num_sort_buffers * m_merge_buf_size, SEEK_SET,
              MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
  }

  /*
    Add a file sync call here to flush the data out. Otherwise, the filesystem
    cache can flush out all of the files at the same time, causing a write
    burst.

    Note the whole m_total_size-byte slot is written (not just
    m_curr_offset bytes) — readers rely on the fixed slot layout and use the
    chunk-length header to find the payload end.
  */
  if (my_write(m_merge_file.m_fd, m_output_buf->m_block.get(),
               m_output_buf->m_total_size, MYF(MY_WME | MY_NABP)) ||
      mysql_file_sync(m_merge_file.m_fd, MYF(MY_WME))) {
    // NO_LINT_DEBUG
    sql_print_error("Error writing sorted merge buffer to disk.");
    return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
  }

  /* Increment merge file offset to track number of merge buffers written */
  m_merge_file.m_num_sort_buffers += 1;

  /* Reset everything for next run */
  merge_reset();

  return HA_EXIT_SUCCESS;
}
268 
269 /**
270   Prepare n-way merge of n sorted buffers on disk, using a heap sorted by
271   secondary key records.
272 */
merge_heap_prepare()273 int Rdb_index_merge::merge_heap_prepare() {
274   assert(m_merge_min_heap.empty());
275 
276   /*
277     If the offset tree is not empty, there are still some records that need to
278     be written to disk. Write them out now.
279   */
280   if (!m_offset_tree.empty() && merge_buf_write()) {
281     return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
282   }
283 
284   assert(m_merge_file.m_num_sort_buffers > 0);
285 
286   /*
287     For an n-way merge, we need to read chunks of each merge file
288     simultaneously.
289   */
290   ulonglong chunk_size =
291       m_merge_combine_read_size / m_merge_file.m_num_sort_buffers;
292   if (chunk_size >= m_merge_buf_size) {
293     chunk_size = m_merge_buf_size;
294   }
295 
296   /* Allocate buffers for each chunk */
297   for (ulonglong i = 0; i < m_merge_file.m_num_sort_buffers; i++) {
298     const auto entry =
299         std::make_shared<merge_heap_entry>(m_cf_handle->GetComparator());
300 
301     /*
302       Read chunk_size bytes from each chunk on disk, and place inside
303       respective chunk buffer.
304     */
305     const size_t total_size =
306         entry->prepare(m_merge_file.m_fd, i * m_merge_buf_size, chunk_size);
307 
308     if (total_size == (size_t)-1) {
309       return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
310     }
311 
312     /* Can reach this condition if an index was added on table w/ no rows */
313     if (total_size - RDB_MERGE_CHUNK_LEN == 0) {
314       break;
315     }
316 
317     /* Read the first record from each buffer to initially populate the heap */
318     if (entry->read_rec(&entry->m_key, &entry->m_val)) {
319       // NO_LINT_DEBUG
320       sql_print_error("Chunk size is too small to process merge.");
321       return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
322     }
323 
324     m_merge_min_heap.push(std::move(entry));
325   }
326 
327   return HA_EXIT_SUCCESS;
328 }
329 
330 /**
331   Create and/or iterate through keys in the merge heap.
332 */
int Rdb_index_merge::next(rocksdb::Slice *const key,
                          rocksdb::Slice *const val) {
  /*
    If table fits in one sort buffer, we can optimize by writing
    the sort buffer directly through to the sstfilewriter instead of
    needing to create tmp files/heap to merge the sort buffers.

    If there are no sort buffer records (alters on empty tables),
    also exit here.
  */
  if (m_merge_file.m_num_sort_buffers == 0) {
    if (m_offset_tree.empty()) {
      /* -1 signals end of records to the caller. */
      return -1;
    }

    /* Consume records straight out of the in-memory tree, in sorted order. */
    const auto rec = m_offset_tree.begin();

    /* Read record from offset */
    merge_read_rec(rec->m_block, key, val);

    m_offset_tree.erase(rec);
    return HA_EXIT_SUCCESS;
  }

  int res;

  /*
    If heap and heap chunk info are empty, we must be beginning the merge phase
    of the external sort. Populate the heap with initial values from each
    disk chunk.
  */
  if (m_merge_min_heap.empty()) {
    if ((res = merge_heap_prepare())) {
      // NO_LINT_DEBUG
      sql_print_error("Error during preparation of heap.");
      return res;
    }

    /*
      Return the first top record without popping, as we haven't put this
      inside the SST file yet.
    */
    merge_heap_top(key, val);
    return HA_EXIT_SUCCESS;
  }

  /* Steady state: pop the record we handed out last time and fetch the next. */
  assert(!m_merge_min_heap.empty());
  return merge_heap_pop_and_get_next(key, val);
}
382 
383 /**
384   Get current top record from the heap.
385 */
merge_heap_top(rocksdb::Slice * const key,rocksdb::Slice * const val)386 void Rdb_index_merge::merge_heap_top(rocksdb::Slice *const key,
387                                      rocksdb::Slice *const val) {
388   assert(!m_merge_min_heap.empty());
389 
390   const std::shared_ptr<merge_heap_entry> &entry = m_merge_min_heap.top();
391   *key = entry->m_key;
392   *val = entry->m_val;
393 }
394 
395 /**
396   Pops the top record, and uses it to read next record from the
397   corresponding sort buffer and push onto the heap.
398 
399   Returns -1 when there are no more records in the heap.
400 */
merge_heap_pop_and_get_next(rocksdb::Slice * const key,rocksdb::Slice * const val)401 int Rdb_index_merge::merge_heap_pop_and_get_next(rocksdb::Slice *const key,
402                                                  rocksdb::Slice *const val) {
403   /*
404     Make a new reference to shared ptr so it doesn't get destroyed
405     during pop(). We are going to push this entry back onto the heap.
406   */
407   const std::shared_ptr<merge_heap_entry> entry = m_merge_min_heap.top();
408   m_merge_min_heap.pop();
409 
410   /*
411     We are finished w/ current chunk if:
412     current_offset + disk_offset == m_total_size
413 
414     Return without adding entry back onto heap.
415     If heap is also empty, we must be finished with merge.
416   */
417   if (entry->m_chunk_info->is_chunk_finished()) {
418     if (m_merge_min_heap.empty()) {
419       return -1;
420     }
421 
422     merge_heap_top(key, val);
423     return HA_EXIT_SUCCESS;
424   }
425 
426   /*
427     Make sure we haven't reached the end of the chunk.
428   */
429   assert(!entry->m_chunk_info->is_chunk_finished());
430 
431   /*
432     If merge_read_rec fails, it means the either the chunk was cut off
433     or we've reached the end of the respective chunk.
434   */
435   if (entry->read_rec(&entry->m_key, &entry->m_val)) {
436     if (entry->read_next_chunk_from_disk(m_merge_file.m_fd)) {
437       return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
438     }
439 
440     /* Try reading record again, should never fail. */
441     if (entry->read_rec(&entry->m_key, &entry->m_val)) {
442       return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
443     }
444   }
445 
446   /* Push entry back on to the heap w/ updated buffer + offset ptr */
447   m_merge_min_heap.push(std::move(entry));
448 
449   /* Return the current top record on heap */
450   merge_heap_top(key, val);
451   return HA_EXIT_SUCCESS;
452 }
453 
read_next_chunk_from_disk(File fd)454 int Rdb_index_merge::merge_heap_entry::read_next_chunk_from_disk(File fd) {
455   if (m_chunk_info->read_next_chunk_from_disk(fd)) {
456     return HA_EXIT_FAILURE;
457   }
458 
459   m_block = m_chunk_info->m_block.get();
460   return HA_EXIT_SUCCESS;
461 }
462 
int Rdb_index_merge::merge_buf_info::read_next_chunk_from_disk(File fd) {
  /* Advance the on-disk position past the portion already consumed. */
  m_disk_curr_offset += m_curr_offset;

  if (my_seek(fd, m_disk_curr_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return HA_EXIT_FAILURE;
  }

  /* Overwrite the old block */
  const size_t bytes_read =
      my_read(fd, m_block.get(), m_block_len, MYF(MY_WME));
  /*
    Only hard read errors are treated as failure; a short read is accepted
    here — presumably because the last chunk of the file may be smaller
    than m_block_len (NOTE(review): verify against merge file layout).
  */
  if (bytes_read == (size_t)-1) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk.");
    return HA_EXIT_FAILURE;
  }

  /* Restart in-memory consumption from the top of the refreshed block. */
  m_curr_offset = 0;
  return HA_EXIT_SUCCESS;
}
484 
485 /**
486   Get records from offset within sort buffer and compare them.
487   Sort by least to greatest.
488 */
merge_record_compare(const uchar * const a_block,const uchar * const b_block,const rocksdb::Comparator * const comparator)489 int Rdb_index_merge::merge_record_compare(
490     const uchar *const a_block, const uchar *const b_block,
491     const rocksdb::Comparator *const comparator) {
492   return comparator->Compare(as_slice(a_block), as_slice(b_block));
493 }
494 
495 /**
496   Given an offset in a merge sort buffer, read out the keys + values.
497   After this, block will point to the next record in the buffer.
498 **/
merge_read_rec(const uchar * const block,rocksdb::Slice * const key,rocksdb::Slice * const val)499 void Rdb_index_merge::merge_read_rec(const uchar *const block,
500                                      rocksdb::Slice *const key,
501                                      rocksdb::Slice *const val) {
502   /* Read key at block offset into key slice and the value into value slice*/
503   read_slice(key, block);
504   read_slice(val, block + RDB_MERGE_REC_DELIMITER + key->size());
505 }
506 
read_slice(rocksdb::Slice * slice,const uchar * block_ptr)507 void Rdb_index_merge::read_slice(rocksdb::Slice *slice,
508                                  const uchar *block_ptr) {
509   uint64 slice_len;
510   merge_read_uint64(&block_ptr, &slice_len);
511 
512   *slice = rocksdb::Slice(reinterpret_cast<const char *>(block_ptr), slice_len);
513 }
514 
int Rdb_index_merge::merge_heap_entry::read_rec(rocksdb::Slice *const key,
                                                rocksdb::Slice *const val) {
  const uchar *block_ptr = m_block;
  /* Snapshot state so a partial (key-only) read can be fully rolled back. */
  const auto orig_offset = m_chunk_info->m_curr_offset;
  const auto orig_block = m_block;

  /* Read key at block offset into key slice and the value into value slice*/
  if (read_slice(key, &block_ptr) != 0) {
    return HA_EXIT_FAILURE;
  }

  /* Commit the key's consumed bytes (delimiter + payload) to both cursors. */
  m_chunk_info->m_curr_offset += (uintptr_t)block_ptr - (uintptr_t)m_block;
  m_block += (uintptr_t)block_ptr - (uintptr_t)m_block;

  if (read_slice(val, &block_ptr) != 0) {
    /* Value didn't fit in the remaining chunk: undo the key consumption. */
    m_chunk_info->m_curr_offset = orig_offset;
    m_block = orig_block;
    return HA_EXIT_FAILURE;
  }

  /* Commit the value's consumed bytes as well. */
  m_chunk_info->m_curr_offset += (uintptr_t)block_ptr - (uintptr_t)m_block;
  m_block += (uintptr_t)block_ptr - (uintptr_t)m_block;

  return HA_EXIT_SUCCESS;
}
540 
int Rdb_index_merge::merge_heap_entry::read_slice(rocksdb::Slice *const slice,
                                                  const uchar **block_ptr) {
  /* Bail out if even the 8-byte length delimiter won't fit in the chunk. */
  if (!m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER)) {
    return HA_EXIT_FAILURE;
  }

  /* Read the length; this also advances *block_ptr past the delimiter. */
  uint64 slice_len;
  merge_read_uint64(block_ptr, &slice_len);
  /* Then verify that delimiter + payload fit within the chunk remainder. */
  if (!m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER + slice_len)) {
    return HA_EXIT_FAILURE;
  }

  *slice =
      rocksdb::Slice(reinterpret_cast<const char *>(*block_ptr), slice_len);
  /* Advance past the payload; the caller commits the offsets (read_rec). */
  *block_ptr += slice_len;
  return HA_EXIT_SUCCESS;
}
558 
prepare(File fd,ulonglong f_offset,ulonglong chunk_size)559 size_t Rdb_index_merge::merge_heap_entry::prepare(File fd, ulonglong f_offset,
560                                                   ulonglong chunk_size) {
561   m_chunk_info = std::make_shared<merge_buf_info>(chunk_size);
562   const size_t res = m_chunk_info->prepare(fd, f_offset);
563   if (res != (size_t)-1) {
564     m_block = m_chunk_info->m_block.get() + RDB_MERGE_CHUNK_LEN;
565   }
566 
567   return res;
568 }
569 
size_t Rdb_index_merge::merge_buf_info::prepare(File fd, ulonglong f_offset) {
  /* Remember where this chunk lives in the merge file. */
  m_disk_start_offset = f_offset;
  m_disk_curr_offset = f_offset;

  /*
    Need to position cursor to the chunk it needs to be at on filesystem
    then read 'chunk_size' bytes into the respective chunk buffer.
  */
  if (my_seek(fd, f_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return (size_t)-1;
  }

  const size_t bytes_read =
      my_read(fd, m_block.get(), m_total_size, MYF(MY_WME));
  if (bytes_read == (size_t)-1) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk.");
    return (size_t)-1;
  }

  /*
    Read the first 8 bytes of each chunk, this gives us the actual
    size of each chunk. Note this overwrites m_total_size with the payload
    size recorded by merge_buf_write(), and leaves m_curr_offset pointing
    at the first record past the header.
  */
  const uchar *block_ptr = m_block.get();
  merge_read_uint64(&block_ptr, &m_total_size);
  m_curr_offset += RDB_MERGE_CHUNK_LEN;
  return m_total_size;
}
601 
602 /* Store key and value w/ their respective delimiters at the given offset */
/* Store key and value w/ their respective delimiters at the given offset */
void Rdb_index_merge::merge_buf_info::store_key_value(
    const rocksdb::Slice &key, const rocksdb::Slice &val) {
  /* Order matters: readers (merge_read_rec) expect key first, then value. */
  store_slice(key);
  store_slice(val);
}
608 
store_slice(const rocksdb::Slice & slice)609 void Rdb_index_merge::merge_buf_info::store_slice(const rocksdb::Slice &slice) {
610   /* Store length delimiter */
611   merge_store_uint64(&m_block[m_curr_offset], slice.size());
612 
613   /* Store slice data */
614   memcpy(&m_block[m_curr_offset + RDB_MERGE_REC_DELIMITER], slice.data(),
615          slice.size());
616 
617   m_curr_offset += slice.size() + RDB_MERGE_REC_DELIMITER;
618 }
619 
merge_reset()620 void Rdb_index_merge::merge_reset() {
621   /*
622     Either error, or all values in the sort buffer have been written to disk,
623     so we need to clear the offset tree.
624   */
625   m_offset_tree.clear();
626 
627   /* Reset sort buffer block */
628   if (m_rec_buf_unsorted && m_rec_buf_unsorted->m_block) {
629     m_rec_buf_unsorted->m_curr_offset = 0;
630   }
631 
632   /* Reset output buf */
633   if (m_output_buf && m_output_buf->m_block) {
634     m_output_buf->m_curr_offset = 0;
635   }
636 }
637 
638 }  // namespace myrocks
639