1 /*
2 Copyright (c) 2016, Facebook, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17 /* This C++ file's header file */
18 #include "./rdb_index_merge.h"
19
20 /* MySQL header files */
21 #include "mysql/plugin.h"
22 #include "mysql/psi/mysql_file.h"
23
24 /* MyRocks header files */
25 #include "./ha_rocksdb.h"
26 #include "./rdb_datadic.h"
27
28 namespace myrocks {
29
Rdb_index_merge(const char * const tmpfile_path,const ulonglong merge_buf_size,const ulonglong merge_combine_read_size,const ulonglong merge_tmp_file_removal_delay,rocksdb::ColumnFamilyHandle * cf)30 Rdb_index_merge::Rdb_index_merge(const char *const tmpfile_path,
31 const ulonglong merge_buf_size,
32 const ulonglong merge_combine_read_size,
33 const ulonglong merge_tmp_file_removal_delay,
34 rocksdb::ColumnFamilyHandle *cf)
35 : m_tmpfile_path(tmpfile_path),
36 m_merge_buf_size(merge_buf_size),
37 m_merge_combine_read_size(merge_combine_read_size),
38 m_merge_tmp_file_removal_delay(merge_tmp_file_removal_delay),
39 m_cf_handle(cf),
40 m_rec_buf_unsorted(nullptr),
41 m_output_buf(nullptr) {}
42
~Rdb_index_merge()43 Rdb_index_merge::~Rdb_index_merge() {
44 /*
45 If merge_tmp_file_removal_delay is set, sleep between calls to chsize.
46
47 This helps mitigate potential trim stalls on flash when large files are
48 being deleted too quickly.
49 */
50 if (m_merge_tmp_file_removal_delay > 0) {
51 uint64 curr_size = m_merge_buf_size * m_merge_file.m_num_sort_buffers;
52 for (uint i = 0; i < m_merge_file.m_num_sort_buffers; i++) {
53 if (my_chsize(m_merge_file.m_fd, curr_size, 0, MYF(MY_WME))) {
54 // NO_LINT_DEBUG
55 sql_print_error("Error truncating file during fast index creation.");
56 }
57
58 my_sleep(m_merge_tmp_file_removal_delay * 1000);
59 // Not aborting on fsync error since the tmp file is not used anymore
60 if (mysql_file_sync(m_merge_file.m_fd, MYF(MY_WME))) {
61 // NO_LINT_DEBUG
62 sql_print_error("Error flushing truncated MyRocks merge buffer.");
63 }
64 curr_size -= m_merge_buf_size;
65 }
66 }
67
68 /*
69 Close file descriptor, we don't need to worry about deletion,
70 mysql handles it.
71 */
72 my_close(m_merge_file.m_fd, MYF(MY_WME));
73 }
74
init()75 int Rdb_index_merge::init() {
76 /*
77 Create a temporary merge file on disk to store sorted chunks during
78 inplace index creation.
79 */
80 if (merge_file_create()) {
81 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
82 }
83
84 /*
85 Then, allocate buffer to store unsorted records before they are written
86 to disk. They will be written to disk sorted. A sorted tree is used to
87 keep track of the offset of each record within the unsorted buffer.
88 */
89 m_rec_buf_unsorted =
90 std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
91
92 /*
93 Allocate output buffer that will contain sorted block that is written to
94 disk.
95 */
96 m_output_buf =
97 std::shared_ptr<merge_buf_info>(new merge_buf_info(m_merge_buf_size));
98
99 return HA_EXIT_SUCCESS;
100 }
101
102 /**
103 Create a merge file in the given location.
104 */
merge_file_create()105 int Rdb_index_merge::merge_file_create() {
106 assert(m_merge_file.m_fd == -1);
107
108 int fd;
109 /* If no path set for tmpfile, use mysql_tmpdir by default */
110 if (m_tmpfile_path == nullptr) {
111 fd = mysql_tmpfile("myrocks");
112 } else {
113 char filename[FN_REFLEN];
114 fd = create_temp_file(filename, m_tmpfile_path, "myrocks",
115 O_CREAT | O_EXCL | O_RDWR, MYF(MY_WME));
116 if (fd >= 0) {
117 #ifndef __WIN__
118 /*
119 This can be removed once the following bug is fixed:
120 Bug #28903 create_temp_file() doesn't honor O_TEMPORARY option
121 (file not removed) (Unix)
122 */
123 unlink(filename);
124 #endif /* !__WIN__ */
125 }
126 }
127
128 if (fd < 0) {
129 // NO_LINT_DEBUG
130 sql_print_error("Failed to create temp file during fast index creation.");
131 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
132 }
133
134 m_merge_file.m_fd = fd;
135 m_merge_file.m_num_sort_buffers = 0;
136
137 return HA_EXIT_SUCCESS;
138 }
139
140 /**
141 Add record to offset tree (and unsorted merge buffer) in preparation for
142 writing out to disk in sorted chunks.
143
144 If buffer in memory is full, write the buffer out to disk sorted using the
145 offset tree, and clear the tree. (Happens in merge_buf_write)
146 */
add(const rocksdb::Slice & key,const rocksdb::Slice & val)147 int Rdb_index_merge::add(const rocksdb::Slice &key, const rocksdb::Slice &val) {
148 /* Adding a record after heap is already created results in error */
149 assert(m_merge_min_heap.empty());
150
151 /*
152 Check if sort buffer is going to be out of space, if so write it
153 out to disk in sorted order using offset tree.
154 */
155 const uint total_offset = RDB_MERGE_CHUNK_LEN +
156 m_rec_buf_unsorted->m_curr_offset +
157 RDB_MERGE_KEY_DELIMITER + RDB_MERGE_VAL_DELIMITER +
158 key.size() + val.size();
159 if (total_offset >= m_rec_buf_unsorted->m_total_size) {
160 /*
161 If the offset tree is empty here, that means that the proposed key to
162 add is too large for the buffer.
163 */
164 if (m_offset_tree.empty()) {
165 // NO_LINT_DEBUG
166 sql_print_error(
167 "Sort buffer size is too small to process merge. "
168 "Please set merge buffer size to a higher value.");
169 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
170 }
171
172 if (merge_buf_write()) {
173 // NO_LINT_DEBUG
174 sql_print_error("Error writing sort buffer to disk.");
175 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
176 }
177 }
178
179 const ulonglong rec_offset = m_rec_buf_unsorted->m_curr_offset;
180
181 /*
182 Store key and value in temporary unsorted in memory buffer pointed to by
183 offset tree.
184 */
185 m_rec_buf_unsorted->store_key_value(key, val);
186
187 /* Find sort order of the new record */
188 auto res =
189 m_offset_tree.emplace(m_rec_buf_unsorted->m_block.get() + rec_offset,
190 m_cf_handle->GetComparator());
191 if (!res.second) {
192 my_printf_error(ER_DUP_ENTRY,
193 "Failed to insert the record: the key already exists",
194 MYF(0));
195 return ER_DUP_ENTRY;
196 }
197
198 return HA_EXIT_SUCCESS;
199 }
200
201 /**
202 Sort + write merge buffer chunk out to disk.
203 */
merge_buf_write()204 int Rdb_index_merge::merge_buf_write() {
205 assert(m_merge_file.m_fd != -1);
206 assert(m_rec_buf_unsorted != nullptr);
207 assert(m_output_buf != nullptr);
208 assert(!m_offset_tree.empty());
209
210 /* Write actual chunk size to first 8 bytes of the merge buffer */
211 merge_store_uint64(m_output_buf->m_block.get(),
212 m_rec_buf_unsorted->m_curr_offset + RDB_MERGE_CHUNK_LEN);
213 m_output_buf->m_curr_offset += RDB_MERGE_CHUNK_LEN;
214
215 /*
216 Iterate through the offset tree. Should be ordered by the secondary key
217 at this point.
218 */
219 for (const auto &rec : m_offset_tree) {
220 assert(m_output_buf->m_curr_offset <= m_merge_buf_size);
221
222 /* Read record from offset (should never fail) */
223 rocksdb::Slice key;
224 rocksdb::Slice val;
225 merge_read_rec(rec.m_block, &key, &val);
226
227 /* Store key and value into sorted output buffer */
228 m_output_buf->store_key_value(key, val);
229 }
230
231 assert(m_output_buf->m_curr_offset <= m_output_buf->m_total_size);
232
233 /*
234 Write output buffer to disk.
235
236 Need to position cursor to the chunk it needs to be at on filesystem
237 then write into the respective merge buffer.
238 */
239 if (my_seek(m_merge_file.m_fd,
240 m_merge_file.m_num_sort_buffers * m_merge_buf_size, SEEK_SET,
241 MYF(0)) == MY_FILEPOS_ERROR) {
242 // NO_LINT_DEBUG
243 sql_print_error("Error seeking to location in merge file on disk.");
244 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
245 }
246
247 /*
248 Add a file sync call here to flush the data out. Otherwise, the filesystem
249 cache can flush out all of the files at the same time, causing a write
250 burst.
251 */
252 if (my_write(m_merge_file.m_fd, m_output_buf->m_block.get(),
253 m_output_buf->m_total_size, MYF(MY_WME | MY_NABP)) ||
254 mysql_file_sync(m_merge_file.m_fd, MYF(MY_WME))) {
255 // NO_LINT_DEBUG
256 sql_print_error("Error writing sorted merge buffer to disk.");
257 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
258 }
259
260 /* Increment merge file offset to track number of merge buffers written */
261 m_merge_file.m_num_sort_buffers += 1;
262
263 /* Reset everything for next run */
264 merge_reset();
265
266 return HA_EXIT_SUCCESS;
267 }
268
269 /**
270 Prepare n-way merge of n sorted buffers on disk, using a heap sorted by
271 secondary key records.
272 */
merge_heap_prepare()273 int Rdb_index_merge::merge_heap_prepare() {
274 assert(m_merge_min_heap.empty());
275
276 /*
277 If the offset tree is not empty, there are still some records that need to
278 be written to disk. Write them out now.
279 */
280 if (!m_offset_tree.empty() && merge_buf_write()) {
281 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
282 }
283
284 assert(m_merge_file.m_num_sort_buffers > 0);
285
286 /*
287 For an n-way merge, we need to read chunks of each merge file
288 simultaneously.
289 */
290 ulonglong chunk_size =
291 m_merge_combine_read_size / m_merge_file.m_num_sort_buffers;
292 if (chunk_size >= m_merge_buf_size) {
293 chunk_size = m_merge_buf_size;
294 }
295
296 /* Allocate buffers for each chunk */
297 for (ulonglong i = 0; i < m_merge_file.m_num_sort_buffers; i++) {
298 const auto entry =
299 std::make_shared<merge_heap_entry>(m_cf_handle->GetComparator());
300
301 /*
302 Read chunk_size bytes from each chunk on disk, and place inside
303 respective chunk buffer.
304 */
305 const size_t total_size =
306 entry->prepare(m_merge_file.m_fd, i * m_merge_buf_size, chunk_size);
307
308 if (total_size == (size_t)-1) {
309 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
310 }
311
312 /* Can reach this condition if an index was added on table w/ no rows */
313 if (total_size - RDB_MERGE_CHUNK_LEN == 0) {
314 break;
315 }
316
317 /* Read the first record from each buffer to initially populate the heap */
318 if (entry->read_rec(&entry->m_key, &entry->m_val)) {
319 // NO_LINT_DEBUG
320 sql_print_error("Chunk size is too small to process merge.");
321 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
322 }
323
324 m_merge_min_heap.push(std::move(entry));
325 }
326
327 return HA_EXIT_SUCCESS;
328 }
329
330 /**
331 Create and/or iterate through keys in the merge heap.
332 */
next(rocksdb::Slice * const key,rocksdb::Slice * const val)333 int Rdb_index_merge::next(rocksdb::Slice *const key,
334 rocksdb::Slice *const val) {
335 /*
336 If table fits in one sort buffer, we can optimize by writing
337 the sort buffer directly through to the sstfilewriter instead of
338 needing to create tmp files/heap to merge the sort buffers.
339
340 If there are no sort buffer records (alters on empty tables),
341 also exit here.
342 */
343 if (m_merge_file.m_num_sort_buffers == 0) {
344 if (m_offset_tree.empty()) {
345 return -1;
346 }
347
348 const auto rec = m_offset_tree.begin();
349
350 /* Read record from offset */
351 merge_read_rec(rec->m_block, key, val);
352
353 m_offset_tree.erase(rec);
354 return HA_EXIT_SUCCESS;
355 }
356
357 int res;
358
359 /*
360 If heap and heap chunk info are empty, we must be beginning the merge phase
361 of the external sort. Populate the heap with initial values from each
362 disk chunk.
363 */
364 if (m_merge_min_heap.empty()) {
365 if ((res = merge_heap_prepare())) {
366 // NO_LINT_DEBUG
367 sql_print_error("Error during preparation of heap.");
368 return res;
369 }
370
371 /*
372 Return the first top record without popping, as we haven't put this
373 inside the SST file yet.
374 */
375 merge_heap_top(key, val);
376 return HA_EXIT_SUCCESS;
377 }
378
379 assert(!m_merge_min_heap.empty());
380 return merge_heap_pop_and_get_next(key, val);
381 }
382
383 /**
384 Get current top record from the heap.
385 */
merge_heap_top(rocksdb::Slice * const key,rocksdb::Slice * const val)386 void Rdb_index_merge::merge_heap_top(rocksdb::Slice *const key,
387 rocksdb::Slice *const val) {
388 assert(!m_merge_min_heap.empty());
389
390 const std::shared_ptr<merge_heap_entry> &entry = m_merge_min_heap.top();
391 *key = entry->m_key;
392 *val = entry->m_val;
393 }
394
395 /**
396 Pops the top record, and uses it to read next record from the
397 corresponding sort buffer and push onto the heap.
398
399 Returns -1 when there are no more records in the heap.
400 */
merge_heap_pop_and_get_next(rocksdb::Slice * const key,rocksdb::Slice * const val)401 int Rdb_index_merge::merge_heap_pop_and_get_next(rocksdb::Slice *const key,
402 rocksdb::Slice *const val) {
403 /*
404 Make a new reference to shared ptr so it doesn't get destroyed
405 during pop(). We are going to push this entry back onto the heap.
406 */
407 const std::shared_ptr<merge_heap_entry> entry = m_merge_min_heap.top();
408 m_merge_min_heap.pop();
409
410 /*
411 We are finished w/ current chunk if:
412 current_offset + disk_offset == m_total_size
413
414 Return without adding entry back onto heap.
415 If heap is also empty, we must be finished with merge.
416 */
417 if (entry->m_chunk_info->is_chunk_finished()) {
418 if (m_merge_min_heap.empty()) {
419 return -1;
420 }
421
422 merge_heap_top(key, val);
423 return HA_EXIT_SUCCESS;
424 }
425
426 /*
427 Make sure we haven't reached the end of the chunk.
428 */
429 assert(!entry->m_chunk_info->is_chunk_finished());
430
431 /*
432 If merge_read_rec fails, it means the either the chunk was cut off
433 or we've reached the end of the respective chunk.
434 */
435 if (entry->read_rec(&entry->m_key, &entry->m_val)) {
436 if (entry->read_next_chunk_from_disk(m_merge_file.m_fd)) {
437 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
438 }
439
440 /* Try reading record again, should never fail. */
441 if (entry->read_rec(&entry->m_key, &entry->m_val)) {
442 return HA_ERR_ROCKSDB_MERGE_FILE_ERR;
443 }
444 }
445
446 /* Push entry back on to the heap w/ updated buffer + offset ptr */
447 m_merge_min_heap.push(std::move(entry));
448
449 /* Return the current top record on heap */
450 merge_heap_top(key, val);
451 return HA_EXIT_SUCCESS;
452 }
453
read_next_chunk_from_disk(File fd)454 int Rdb_index_merge::merge_heap_entry::read_next_chunk_from_disk(File fd) {
455 if (m_chunk_info->read_next_chunk_from_disk(fd)) {
456 return HA_EXIT_FAILURE;
457 }
458
459 m_block = m_chunk_info->m_block.get();
460 return HA_EXIT_SUCCESS;
461 }
462
read_next_chunk_from_disk(File fd)463 int Rdb_index_merge::merge_buf_info::read_next_chunk_from_disk(File fd) {
464 m_disk_curr_offset += m_curr_offset;
465
466 if (my_seek(fd, m_disk_curr_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
467 // NO_LINT_DEBUG
468 sql_print_error("Error seeking to location in merge file on disk.");
469 return HA_EXIT_FAILURE;
470 }
471
472 /* Overwrite the old block */
473 const size_t bytes_read =
474 my_read(fd, m_block.get(), m_block_len, MYF(MY_WME));
475 if (bytes_read == (size_t)-1) {
476 // NO_LINT_DEBUG
477 sql_print_error("Error reading merge file from disk.");
478 return HA_EXIT_FAILURE;
479 }
480
481 m_curr_offset = 0;
482 return HA_EXIT_SUCCESS;
483 }
484
485 /**
486 Get records from offset within sort buffer and compare them.
487 Sort by least to greatest.
488 */
merge_record_compare(const uchar * const a_block,const uchar * const b_block,const rocksdb::Comparator * const comparator)489 int Rdb_index_merge::merge_record_compare(
490 const uchar *const a_block, const uchar *const b_block,
491 const rocksdb::Comparator *const comparator) {
492 return comparator->Compare(as_slice(a_block), as_slice(b_block));
493 }
494
495 /**
496 Given an offset in a merge sort buffer, read out the keys + values.
497 After this, block will point to the next record in the buffer.
498 **/
merge_read_rec(const uchar * const block,rocksdb::Slice * const key,rocksdb::Slice * const val)499 void Rdb_index_merge::merge_read_rec(const uchar *const block,
500 rocksdb::Slice *const key,
501 rocksdb::Slice *const val) {
502 /* Read key at block offset into key slice and the value into value slice*/
503 read_slice(key, block);
504 read_slice(val, block + RDB_MERGE_REC_DELIMITER + key->size());
505 }
506
read_slice(rocksdb::Slice * slice,const uchar * block_ptr)507 void Rdb_index_merge::read_slice(rocksdb::Slice *slice,
508 const uchar *block_ptr) {
509 uint64 slice_len;
510 merge_read_uint64(&block_ptr, &slice_len);
511
512 *slice = rocksdb::Slice(reinterpret_cast<const char *>(block_ptr), slice_len);
513 }
514
read_rec(rocksdb::Slice * const key,rocksdb::Slice * const val)515 int Rdb_index_merge::merge_heap_entry::read_rec(rocksdb::Slice *const key,
516 rocksdb::Slice *const val) {
517 const uchar *block_ptr = m_block;
518 const auto orig_offset = m_chunk_info->m_curr_offset;
519 const auto orig_block = m_block;
520
521 /* Read key at block offset into key slice and the value into value slice*/
522 if (read_slice(key, &block_ptr) != 0) {
523 return HA_EXIT_FAILURE;
524 }
525
526 m_chunk_info->m_curr_offset += (uintptr_t)block_ptr - (uintptr_t)m_block;
527 m_block += (uintptr_t)block_ptr - (uintptr_t)m_block;
528
529 if (read_slice(val, &block_ptr) != 0) {
530 m_chunk_info->m_curr_offset = orig_offset;
531 m_block = orig_block;
532 return HA_EXIT_FAILURE;
533 }
534
535 m_chunk_info->m_curr_offset += (uintptr_t)block_ptr - (uintptr_t)m_block;
536 m_block += (uintptr_t)block_ptr - (uintptr_t)m_block;
537
538 return HA_EXIT_SUCCESS;
539 }
540
read_slice(rocksdb::Slice * const slice,const uchar ** block_ptr)541 int Rdb_index_merge::merge_heap_entry::read_slice(rocksdb::Slice *const slice,
542 const uchar **block_ptr) {
543 if (!m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER)) {
544 return HA_EXIT_FAILURE;
545 }
546
547 uint64 slice_len;
548 merge_read_uint64(block_ptr, &slice_len);
549 if (!m_chunk_info->has_space(RDB_MERGE_REC_DELIMITER + slice_len)) {
550 return HA_EXIT_FAILURE;
551 }
552
553 *slice =
554 rocksdb::Slice(reinterpret_cast<const char *>(*block_ptr), slice_len);
555 *block_ptr += slice_len;
556 return HA_EXIT_SUCCESS;
557 }
558
prepare(File fd,ulonglong f_offset,ulonglong chunk_size)559 size_t Rdb_index_merge::merge_heap_entry::prepare(File fd, ulonglong f_offset,
560 ulonglong chunk_size) {
561 m_chunk_info = std::make_shared<merge_buf_info>(chunk_size);
562 const size_t res = m_chunk_info->prepare(fd, f_offset);
563 if (res != (size_t)-1) {
564 m_block = m_chunk_info->m_block.get() + RDB_MERGE_CHUNK_LEN;
565 }
566
567 return res;
568 }
569
prepare(File fd,ulonglong f_offset)570 size_t Rdb_index_merge::merge_buf_info::prepare(File fd, ulonglong f_offset) {
571 m_disk_start_offset = f_offset;
572 m_disk_curr_offset = f_offset;
573
574 /*
575 Need to position cursor to the chunk it needs to be at on filesystem
576 then read 'chunk_size' bytes into the respective chunk buffer.
577 */
578 if (my_seek(fd, f_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
579 // NO_LINT_DEBUG
580 sql_print_error("Error seeking to location in merge file on disk.");
581 return (size_t)-1;
582 }
583
584 const size_t bytes_read =
585 my_read(fd, m_block.get(), m_total_size, MYF(MY_WME));
586 if (bytes_read == (size_t)-1) {
587 // NO_LINT_DEBUG
588 sql_print_error("Error reading merge file from disk.");
589 return (size_t)-1;
590 }
591
592 /*
593 Read the first 8 bytes of each chunk, this gives us the actual
594 size of each chunk.
595 */
596 const uchar *block_ptr = m_block.get();
597 merge_read_uint64(&block_ptr, &m_total_size);
598 m_curr_offset += RDB_MERGE_CHUNK_LEN;
599 return m_total_size;
600 }
601
602 /* Store key and value w/ their respective delimiters at the given offset */
store_key_value(const rocksdb::Slice & key,const rocksdb::Slice & val)603 void Rdb_index_merge::merge_buf_info::store_key_value(
604 const rocksdb::Slice &key, const rocksdb::Slice &val) {
605 store_slice(key);
606 store_slice(val);
607 }
608
store_slice(const rocksdb::Slice & slice)609 void Rdb_index_merge::merge_buf_info::store_slice(const rocksdb::Slice &slice) {
610 /* Store length delimiter */
611 merge_store_uint64(&m_block[m_curr_offset], slice.size());
612
613 /* Store slice data */
614 memcpy(&m_block[m_curr_offset + RDB_MERGE_REC_DELIMITER], slice.data(),
615 slice.size());
616
617 m_curr_offset += slice.size() + RDB_MERGE_REC_DELIMITER;
618 }
619
merge_reset()620 void Rdb_index_merge::merge_reset() {
621 /*
622 Either error, or all values in the sort buffer have been written to disk,
623 so we need to clear the offset tree.
624 */
625 m_offset_tree.clear();
626
627 /* Reset sort buffer block */
628 if (m_rec_buf_unsorted && m_rec_buf_unsorted->m_block) {
629 m_rec_buf_unsorted->m_curr_offset = 0;
630 }
631
632 /* Reset output buf */
633 if (m_output_buf && m_output_buf->m_block) {
634 m_output_buf->m_curr_offset = 0;
635 }
636 }
637
638 } // namespace myrocks
639