/*
   Copyright (c) 2016, Facebook, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/* This C++ file's header file */
#include "./rdb_index_merge.h"

/* MyRocks header files */
#include "./ha_rocksdb.h"
#include "./rdb_datadic.h"

namespace myrocks {

Rdb_index_merge::Rdb_index_merge(const char *const tmpfile_path,
                                 const ulonglong &merge_buf_size,
                                 const ulonglong &merge_combine_read_size,
                                 const rocksdb::Comparator *const comparator)
    : m_tmpfile_path(tmpfile_path), m_merge_buf_size(merge_buf_size),
      m_merge_combine_read_size(merge_combine_read_size),
      m_comparator(comparator), m_rec_buf_unsorted(nullptr),
      m_output_buf(nullptr) {}
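
/*
  Expected calling pattern, as a rough sketch only: the real driver lives
  in ha_rocksdb's inplace index creation path, and the names below are
  illustrative rather than actual MyRocks identifiers.

    Rdb_index_merge merge(tmpfile_path, buf_size, combine_read_size, cmp);
    if (merge.init()) return error;
    for (each secondary key record)
      if (merge.add(key, val)) return error;
    rocksdb::Slice key, val;
    while (merge.next(&key, &val) == HA_EXIT_SUCCESS)
      // feed each sorted (key, val) pair into the sstfilewriter
*/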

Rdb_index_merge::~Rdb_index_merge() {
  /*
    Close the tmp file. We don't need to worry about deletion;
    MySQL handles it.
  */
  my_close(m_merge_file.fd, MYF(MY_WME));
}

int Rdb_index_merge::init() {
  /*
    Create a temporary merge file on disk to store sorted chunks during
    inplace index creation.
  */
  if (merge_file_create()) {
    return HA_ERR_INTERNAL_ERROR;
  }

  /*
    Then, allocate a buffer to store unsorted records before they are written
    to disk. They will be written to disk sorted. A sorted tree is used to
    keep track of the offset of each record within the unsorted buffer.
  */
  m_rec_buf_unsorted = std::make_shared<merge_buf_info>(m_merge_buf_size);

  /*
    Allocate the output buffer that will contain the sorted block that is
    written to disk.
  */
  m_output_buf = std::make_shared<merge_buf_info>(m_merge_buf_size);

  return HA_EXIT_SUCCESS;
}

/**
  Create a merge file in the given location.
*/
int Rdb_index_merge::merge_file_create() {
  DBUG_ASSERT(m_merge_file.fd == -1);

  int fd;
  /* If no path is set for the tmpfile, use mysql_tmpdir by default */
  if (m_tmpfile_path == nullptr) {
    fd = mysql_tmpfile("myrocks");
  } else {
    char filename[FN_REFLEN];
    fd = create_temp_file(filename, m_tmpfile_path, "myrocks",
                          O_CREAT | O_EXCL | O_RDWR | O_TEMPORARY,
                          MYF(MY_WME));
    if (fd >= 0) {
#ifndef __WIN__
      /*
        This can be removed once the following bug is fixed:
        Bug #28903  create_temp_file() doesn't honor O_TEMPORARY option
                    (file not removed) (Unix)
      */
      unlink(filename);
#endif /* !__WIN__ */
    }
  }

  if (fd < 0) {
    return HA_ERR_INTERNAL_ERROR;
  }

  m_merge_file.fd = fd;
  m_merge_file.num_sort_buffers = 0;

  return HA_EXIT_SUCCESS;
}

/**
  Add a record to the offset tree (and unsorted merge buffer) in preparation
  for writing out to disk in sorted chunks.

  If the in-memory buffer is full, write the buffer out to disk, sorted using
  the offset tree, and clear the tree. (This happens in merge_buf_write.)
*/
int Rdb_index_merge::add(const rocksdb::Slice &key, const rocksdb::Slice &val) {
  /* Adding a record after the heap has been created results in an error */
  DBUG_ASSERT(m_merge_min_heap.empty());

  /*
    Check if the sort buffer is going to run out of space; if so, write it
    out to disk in sorted order using the offset tree.
  */
  const uint total_offset = RDB_MERGE_CHUNK_LEN +
                            m_rec_buf_unsorted->curr_offset +
                            RDB_MERGE_KEY_DELIMITER + RDB_MERGE_VAL_DELIMITER +
                            key.size() + val.size();
  if (total_offset >= m_rec_buf_unsorted->total_size) {
    /*
      If the offset tree is empty here, that means that the proposed key to
      add is too large for the buffer.
    */
    if (m_offset_tree.empty()) {
      // NO_LINT_DEBUG
      sql_print_error("Sort buffer size is too small to process merge. "
                      "Please set merge buffer size to a higher value.");
      return HA_ERR_INTERNAL_ERROR;
    }

    if (merge_buf_write()) {
      // NO_LINT_DEBUG
      sql_print_error("Error writing sort buffer to disk.");
      return HA_ERR_INTERNAL_ERROR;
    }
  }

  const ulonglong rec_offset = m_rec_buf_unsorted->curr_offset;

  /*
    Store the key and value in the temporary unsorted in-memory buffer,
    pointed to by the offset tree.
  */
  m_rec_buf_unsorted->store_key_value(key, val);

  /* Find the sort order of the new record */
  m_offset_tree.emplace(m_rec_buf_unsorted->block.get() + rec_offset,
                        m_comparator);

  return HA_EXIT_SUCCESS;
}

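/*
  On-disk chunk layout produced by merge_buf_write() below (each length
  delimiter is an 8-byte integer written by merge_store_uint64):

    [chunk_size: 8][key_len: 8][key bytes][val_len: 8][val bytes]...

  chunk_size counts the RDB_MERGE_CHUNK_LEN header itself, which is how
  merge_buf_info::prepare() later recovers the true size of each chunk.
*/
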
/**
  Sort + write merge buffer chunk out to disk.
*/
int Rdb_index_merge::merge_buf_write() {
  DBUG_ASSERT(m_merge_file.fd != -1);
  DBUG_ASSERT(m_rec_buf_unsorted != nullptr);
  DBUG_ASSERT(m_output_buf != nullptr);
  DBUG_ASSERT(!m_offset_tree.empty());

  /* Write the actual chunk size to the first 8 bytes of the merge buffer */
  merge_store_uint64(m_output_buf->block.get(),
                     m_rec_buf_unsorted->curr_offset + RDB_MERGE_CHUNK_LEN);
  m_output_buf->curr_offset += RDB_MERGE_CHUNK_LEN;

  /*
    Iterate through the offset tree. It should be ordered by the secondary
    key at this point.
  */
  for (const auto &rec : m_offset_tree) {
    DBUG_ASSERT(m_output_buf->curr_offset <= m_merge_buf_size);

    /* Read the record from its offset (should never fail) */
    rocksdb::Slice key;
    rocksdb::Slice val;
    merge_read_rec(rec.block, &key, &val);

    /* Store the key and value in the sorted output buffer */
    m_output_buf->store_key_value(key, val);
  }

  DBUG_ASSERT(m_output_buf->curr_offset <= m_output_buf->total_size);

  /*
    Write the output buffer to disk.

    We need to position the cursor at the chunk's location on the
    filesystem, then write the buffer there.
  */
  if (my_seek(m_merge_file.fd, m_merge_file.num_sort_buffers * m_merge_buf_size,
              SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return HA_ERR_INTERNAL_ERROR;
  }

  /*
    Sync the file after each write to flush the data out. Otherwise, the
    filesystem cache can flush out all of the files at the same time,
    causing a write burst.
  */
  if (my_write(m_merge_file.fd, m_output_buf->block.get(),
               m_output_buf->total_size, MYF(MY_WME | MY_NABP)) ||
      mysql_file_sync(m_merge_file.fd, MYF(MY_WME))) {
    // NO_LINT_DEBUG
    sql_print_error("Error writing sorted merge buffer to disk.");
    return HA_ERR_INTERNAL_ERROR;
  }

  /* Increment the merge file offset to track the number of merge buffers
     written */
  m_merge_file.num_sort_buffers += 1;

  /* Reset everything for the next run */
  merge_reset();

  return HA_EXIT_SUCCESS;
}

/**
  Prepare an n-way merge of n sorted buffers on disk, using a heap sorted by
  secondary key records.
*/
int Rdb_index_merge::merge_heap_prepare() {
  DBUG_ASSERT(m_merge_min_heap.empty());

  /*
    If the offset tree is not empty, there are still some records that need to
    be written to disk. Write them out now.
  */
  if (!m_offset_tree.empty() && merge_buf_write()) {
    return HA_ERR_INTERNAL_ERROR;
  }

  DBUG_ASSERT(m_merge_file.num_sort_buffers > 0);

  /*
    For an n-way merge, we need to read chunks of each merge file
    simultaneously.
  */
  ulonglong chunk_size =
      m_merge_combine_read_size / m_merge_file.num_sort_buffers;
  if (chunk_size >= m_merge_buf_size) {
    chunk_size = m_merge_buf_size;
  }
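
  /*
    A purely illustrative sizing example: with m_merge_combine_read_size =
    1 GB and 16 sorted chunks on disk, each chunk gets a 64 MB read buffer,
    capped at m_merge_buf_size so a chunk buffer is never larger than the
    chunks themselves.
  */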

  /* Allocate buffers for each chunk */
  for (ulonglong i = 0; i < m_merge_file.num_sort_buffers; i++) {
    auto entry = std::make_shared<merge_heap_entry>(m_comparator);

    /*
      Read chunk_size bytes from each chunk on disk, and place them inside
      the respective chunk buffer.
    */
    const size_t total_size =
        entry->prepare(m_merge_file.fd, i * m_merge_buf_size, chunk_size);

    if (total_size == (size_t)-1) {
      return HA_ERR_INTERNAL_ERROR;
    }

    /* We can reach this condition if an index was added on a table with no
       rows */
    if (total_size - RDB_MERGE_CHUNK_LEN == 0) {
      break;
    }

    /* Read the first record from each buffer to initially populate the heap */
    if (entry->read_rec(&entry->key, &entry->val)) {
      // NO_LINT_DEBUG
      sql_print_error("Chunk size is too small to process merge.");
      return HA_ERR_INTERNAL_ERROR;
    }

    m_merge_min_heap.push(std::move(entry));
  }

  return HA_EXIT_SUCCESS;
}

/**
  Create and/or iterate through keys in the merge heap.
*/
int Rdb_index_merge::next(rocksdb::Slice *const key,
                          rocksdb::Slice *const val) {
  /*
    If the table fits in one sort buffer, we can optimize by writing
    the sort buffer directly through to the sstfilewriter instead of
    needing to create tmp files/heap to merge the sort buffers.

    If there are no sort buffer records (alters on empty tables),
    also exit here.
  */
  if (m_merge_file.num_sort_buffers == 0) {
    if (m_offset_tree.empty()) {
      return -1;
    }

    const auto rec = m_offset_tree.begin();

    /* Read the record from its offset */
    merge_read_rec(rec->block, key, val);

    m_offset_tree.erase(rec);
    return HA_EXIT_SUCCESS;
  }

  int res;

  /*
    If the heap and heap chunk info are empty, we must be beginning the merge
    phase of the external sort. Populate the heap with the initial values from
    each disk chunk.
  */
  if (m_merge_min_heap.empty()) {
    if ((res = merge_heap_prepare())) {
      // NO_LINT_DEBUG
      sql_print_error("Error during preparation of heap.");
      return res;
    }

    /*
      Return the first top record without popping, as we haven't put it
      inside the SST file yet.
    */
    merge_heap_top(key, val);
    return HA_EXIT_SUCCESS;
  }

  DBUG_ASSERT(!m_merge_min_heap.empty());
  return merge_heap_pop_and_get_next(key, val);
}

/**
  Get the current top record from the heap.
*/
void Rdb_index_merge::merge_heap_top(rocksdb::Slice *const key,
                                     rocksdb::Slice *const val) {
  DBUG_ASSERT(!m_merge_min_heap.empty());

  const std::shared_ptr<merge_heap_entry> &entry = m_merge_min_heap.top();
  *key = entry->key;
  *val = entry->val;
}

/**
  Pops the top record, and uses it to read the next record from the
  corresponding sort buffer and push it onto the heap.

  Returns -1 when there are no more records in the heap.
*/
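/*
  For illustration: if three sorted chunks on disk hold keys (1,4,7),
  (2,5,8) and (3,6,9), the top of the min-heap is always the smallest
  unconsumed key, so successive calls return 1,2,3,...,9 in order.
*/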
int Rdb_index_merge::merge_heap_pop_and_get_next(rocksdb::Slice *const key,
                                                 rocksdb::Slice *const val) {
  /*
    Copy the shared_ptr so the entry doesn't get destroyed during pop().
    We are going to push this entry back onto the heap.
  */
  std::shared_ptr<merge_heap_entry> entry = m_merge_min_heap.top();
  m_merge_min_heap.pop();

  /*
    We are finished with the current chunk if:
    current_offset + disk_offset == total_size

    Return without adding the entry back onto the heap.
    If the heap is also empty, we must be finished with the merge.
  */
  if (entry->chunk_info->is_chunk_finished()) {
    if (m_merge_min_heap.empty()) {
      return -1;
    }

    merge_heap_top(key, val);
    return HA_EXIT_SUCCESS;
  }

  /*
    Make sure we haven't reached the end of the chunk.
  */
  DBUG_ASSERT(!entry->chunk_info->is_chunk_finished());

  /*
    If merge_read_rec fails, it means that either the chunk was cut off
    or we've reached the end of the respective chunk.
  */
  if (entry->read_rec(&entry->key, &entry->val)) {
    if (entry->read_next_chunk_from_disk(m_merge_file.fd)) {
      return HA_ERR_INTERNAL_ERROR;
    }

    /* Try reading the record again; this should never fail. */
    if (entry->read_rec(&entry->key, &entry->val)) {
      return HA_ERR_INTERNAL_ERROR;
    }
  }

  /* Push the entry back onto the heap with updated buffer + offset ptr */
  m_merge_min_heap.push(std::move(entry));

  /* Return the current top record on the heap */
  merge_heap_top(key, val);
  return HA_EXIT_SUCCESS;
}

int Rdb_index_merge::merge_heap_entry::read_next_chunk_from_disk(File fd) {
  if (chunk_info->read_next_chunk_from_disk(fd)) {
    return HA_EXIT_FAILURE;
  }

  block = chunk_info->block.get();
  return HA_EXIT_SUCCESS;
}

int Rdb_index_merge::merge_buf_info::read_next_chunk_from_disk(File fd) {
  disk_curr_offset += curr_offset;

  if (my_seek(fd, disk_curr_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return HA_EXIT_FAILURE;
  }

  /* Overwrite the old block */
  const size_t bytes_read = my_read(fd, block.get(), block_len, MYF(MY_WME));
  if (bytes_read == (size_t)-1) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk.");
    return HA_EXIT_FAILURE;
  }

  curr_offset = 0;
  return HA_EXIT_SUCCESS;
}

/**
  Get records from their offsets within the sort buffer and compare them.
  Sort from least to greatest.
*/
int Rdb_index_merge::merge_record_compare(
    const uchar *const a_block, const uchar *const b_block,
    const rocksdb::Comparator *const comparator) {
  return comparator->Compare(as_slice(a_block), as_slice(b_block));
}

/**
  Given an offset in a merge sort buffer, read out the key + value.
  The returned slices point directly into the buffer; nothing is copied.
*/
void Rdb_index_merge::merge_read_rec(const uchar *const block,
                                     rocksdb::Slice *const key,
                                     rocksdb::Slice *const val) {
  /* Read the key at the block offset into the key slice and the value into
     the value slice */
  read_slice(key, block);
  read_slice(val, block + RDB_MERGE_REC_DELIMITER + key->size());
}
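
/*
  read_slice() below mirrors merge_buf_info::store_slice(): it consumes the
  8-byte length delimiter and then wraps the payload bytes in a
  rocksdb::Slice without copying them.
*/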

void Rdb_index_merge::read_slice(rocksdb::Slice *slice,
                                 const uchar *block_ptr) {
  uint64 slice_len;
  merge_read_uint64(&block_ptr, &slice_len);

  *slice = rocksdb::Slice(reinterpret_cast<const char *>(block_ptr), slice_len);
}

int Rdb_index_merge::merge_heap_entry::read_rec(rocksdb::Slice *const key,
                                                rocksdb::Slice *const val) {
  const uchar *block_ptr = block;
  const auto orig_offset = chunk_info->curr_offset;
  const auto orig_block = block;

  /* Read the key at the block offset into the key slice and the value into
     the value slice */
  if (read_slice(key, &block_ptr) != 0) {
    return HA_EXIT_FAILURE;
  }

  chunk_info->curr_offset += (uintptr_t)block_ptr - (uintptr_t)block;
  block += (uintptr_t)block_ptr - (uintptr_t)block;

  if (read_slice(val, &block_ptr) != 0) {
    chunk_info->curr_offset = orig_offset;
    block = orig_block;
    return HA_EXIT_FAILURE;
  }

  chunk_info->curr_offset += (uintptr_t)block_ptr - (uintptr_t)block;
  block += (uintptr_t)block_ptr - (uintptr_t)block;

  return HA_EXIT_SUCCESS;
}

int Rdb_index_merge::merge_heap_entry::read_slice(rocksdb::Slice *const slice,
                                                  const uchar **block_ptr) {
  if (!chunk_info->has_space(RDB_MERGE_REC_DELIMITER)) {
    return HA_EXIT_FAILURE;
  }

  uint64 slice_len;
  merge_read_uint64(block_ptr, &slice_len);
  if (!chunk_info->has_space(RDB_MERGE_REC_DELIMITER + slice_len)) {
    return HA_EXIT_FAILURE;
  }

  *slice =
      rocksdb::Slice(reinterpret_cast<const char *>(*block_ptr), slice_len);
  *block_ptr += slice_len;
  return HA_EXIT_SUCCESS;
}

size_t Rdb_index_merge::merge_heap_entry::prepare(File fd, ulonglong f_offset,
                                                  ulonglong chunk_size) {
  chunk_info = std::make_shared<merge_buf_info>(chunk_size);
  const size_t res = chunk_info->prepare(fd, f_offset);
  if (res != (size_t)-1) {
    block = chunk_info->block.get() + RDB_MERGE_CHUNK_LEN;
  }

  return res;
}

size_t Rdb_index_merge::merge_buf_info::prepare(File fd, ulonglong f_offset) {
  disk_start_offset = f_offset;
  disk_curr_offset = f_offset;

  /*
    We need to position the cursor at the chunk's location on the filesystem,
    then read 'chunk_size' bytes into the respective chunk buffer.
  */
  if (my_seek(fd, f_offset, SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) {
    // NO_LINT_DEBUG
    sql_print_error("Error seeking to location in merge file on disk.");
    return (size_t)-1;
  }

  const size_t bytes_read = my_read(fd, block.get(), total_size, MYF(MY_WME));
  if (bytes_read == (size_t)-1) {
    // NO_LINT_DEBUG
    sql_print_error("Error reading merge file from disk.");
    return (size_t)-1;
  }

  /*
    Read the first 8 bytes of each chunk; this gives us the actual
    size of each chunk.
  */
  const uchar *block_ptr = block.get();
  merge_read_uint64(&block_ptr, &total_size);
  curr_offset += RDB_MERGE_CHUNK_LEN;
  return total_size;
}

/* Store the key and value with their respective delimiters at the given
   offset */
void Rdb_index_merge::merge_buf_info::store_key_value(
    const rocksdb::Slice &key, const rocksdb::Slice &val) {
  store_slice(key);
  store_slice(val);
}

void Rdb_index_merge::merge_buf_info::store_slice(const rocksdb::Slice &slice) {
  /* Store the length delimiter */
  merge_store_uint64(&block[curr_offset], slice.size());

  /* Store the slice data */
  memcpy(&block[curr_offset + RDB_MERGE_REC_DELIMITER], slice.data(),
         slice.size());

  curr_offset += slice.size() + RDB_MERGE_REC_DELIMITER;
}

void Rdb_index_merge::merge_reset() {
  /*
    Either we hit an error, or all values in the sort buffer have been written
    to disk, so we need to clear the offset tree.
  */
  m_offset_tree.clear();

  /* Reset the sort buffer block */
  if (m_rec_buf_unsorted && m_rec_buf_unsorted->block) {
    m_rec_buf_unsorted->curr_offset = 0;
  }

  /* Reset the output buffer */
  if (m_output_buf && m_output_buf->block) {
    m_output_buf->curr_offset = 0;
  }
}

} // namespace myrocks