1 /*
2    Copyright (c) 2016, Facebook, Inc.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16 
17 /* For PRIu64 use below: */
18 #define __STDC_FORMAT_MACROS
19 
20 #include <my_global.h>
21 
22 /* This C++ file's header file */
23 #include "./rdb_sst_info.h"
24 
25 #include <inttypes.h>
26 
27 /* C++ standard header files */
28 #include <cstdio>
29 #include <string>
30 #include <utility>
31 #include <vector>
32 
33 /* MySQL header files */
34 #include <mysqld_error.h>
35 #include "../sql/log.h"
36 #include "./my_dir.h"
37 
38 /* RocksDB header files */
39 #include "rocksdb/db.h"
40 #include "rocksdb/options.h"
41 
42 /* MyRocks header files */
43 #include "./ha_rocksdb.h"
44 #include "./ha_rocksdb_proto.h"
45 #include "./rdb_cf_options.h"
46 #include "./rdb_psi.h"
47 
48 namespace myrocks {
49 
Rdb_sst_file(rocksdb::DB * const db,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const std::string & name,const bool tracing)50 Rdb_sst_file_ordered::Rdb_sst_file::Rdb_sst_file(
51     rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
52     const rocksdb::DBOptions &db_options, const std::string &name,
53     const bool tracing)
54     : m_db(db),
55       m_cf(cf),
56       m_db_options(db_options),
57       m_sst_file_writer(nullptr),
58       m_name(name),
59       m_tracing(tracing),
60       m_comparator(cf->GetComparator()) {
61   DBUG_ASSERT(db != nullptr);
62   DBUG_ASSERT(cf != nullptr);
63 }
64 
~Rdb_sst_file()65 Rdb_sst_file_ordered::Rdb_sst_file::~Rdb_sst_file() {
66   // Make sure we clean up
67   delete m_sst_file_writer;
68   m_sst_file_writer = nullptr;
69 }
70 
open()71 rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() {
72   DBUG_ASSERT(m_sst_file_writer == nullptr);
73 
74   rocksdb::ColumnFamilyDescriptor cf_descr;
75 
76   rocksdb::Status s = m_cf->GetDescriptor(&cf_descr);
77   if (!s.ok()) {
78     return s;
79   }
80 
81   // Create an sst file writer with the current options and comparator
82   const rocksdb::EnvOptions env_options(m_db_options);
83   const rocksdb::Options options(m_db_options, cf_descr.options);
84 
85   m_sst_file_writer =
86       new rocksdb::SstFileWriter(env_options, options, m_comparator, m_cf, true,
87                                  rocksdb::Env::IOPriority::IO_TOTAL,
88                                  cf_descr.options.optimize_filters_for_hits);
89 
90   s = m_sst_file_writer->Open(m_name);
91   if (m_tracing) {
92     // NO_LINT_DEBUG
93     sql_print_information("SST Tracing: Open(%s) returned %s", m_name.c_str(),
94                           s.ok() ? "ok" : "not ok");
95   }
96 
97   if (!s.ok()) {
98     delete m_sst_file_writer;
99     m_sst_file_writer = nullptr;
100   }
101 
102   return s;
103 }
104 
/*
  Append one key/value pair to the SST file through the writer created by
  open().

  The call to SstFileWriter::Add() needs deprecation warnings silenced
  (-Wdeprecated-declarations on GCC-compatible compilers, C4996 on MSVC).
  The suppression is pushed before and popped after this one definition so
  that it does not leak into the remainder of the translation unit.

  @param key    key to add
  @param value  value to add
  @return the status reported by the SST file writer
*/
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4996)
#endif
rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::put(
    const rocksdb::Slice &key, const rocksdb::Slice &value) {
  DBUG_ASSERT(m_sst_file_writer != nullptr);

  // Add the specified key/value to the sst file writer
  return m_sst_file_writer->Add(key, value);
}
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
119 
generateKey(const std::string & key)120 std::string Rdb_sst_file_ordered::Rdb_sst_file::generateKey(
121     const std::string &key) {
122   static char const hexdigit[] = {'0', '1', '2', '3', '4', '5', '6', '7',
123                                   '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
124 
125   std::string res;
126 
127   res.reserve(key.size() * 2);
128 
129   for (auto ch : key) {
130     res += hexdigit[((uint8_t)ch) >> 4];
131     res += hexdigit[((uint8_t)ch) & 0x0F];
132   }
133 
134   return res;
135 }
136 
137 // This function is run by the background thread
commit()138 rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::commit() {
139   DBUG_ASSERT(m_sst_file_writer != nullptr);
140 
141   rocksdb::Status s;
142   rocksdb::ExternalSstFileInfo fileinfo;  /// Finish may should be modified
143 
144   // Close out the sst file
145   s = m_sst_file_writer->Finish(&fileinfo);
146   if (m_tracing) {
147     // NO_LINT_DEBUG
148     sql_print_information("SST Tracing: Finish returned %s",
149                           s.ok() ? "ok" : "not ok");
150   }
151 
152   if (s.ok()) {
153     if (m_tracing) {
154       // NO_LINT_DEBUG
155       sql_print_information(
156           "SST Tracing: Adding file %s, smallest key: %s, "
157           "largest key: %s, file size: %" PRIu64
158           ", "
159           "num_entries: %" PRIu64,
160           fileinfo.file_path.c_str(),
161           generateKey(fileinfo.smallest_key).c_str(),
162           generateKey(fileinfo.largest_key).c_str(), fileinfo.file_size,
163           fileinfo.num_entries);
164     }
165   }
166 
167   delete m_sst_file_writer;
168   m_sst_file_writer = nullptr;
169 
170   return s;
171 }
172 
push(const rocksdb::Slice & key,const rocksdb::Slice & value)173 void Rdb_sst_file_ordered::Rdb_sst_stack::push(const rocksdb::Slice &key,
174                                                const rocksdb::Slice &value) {
175   if (m_buffer == nullptr) {
176     m_buffer = new char[m_buffer_size];
177   }
178 
179   // Put the actual key and value data unto our stack
180   size_t key_offset = m_offset;
181   memcpy(m_buffer + m_offset, key.data(), key.size());
182   m_offset += key.size();
183   memcpy(m_buffer + m_offset, value.data(), value.size());
184   m_offset += value.size();
185 
186   // Push just the offset, the key length and the value length onto the stack
187   m_stack.push(std::make_tuple(key_offset, key.size(), value.size()));
188 }
189 
190 std::pair<rocksdb::Slice, rocksdb::Slice>
top()191 Rdb_sst_file_ordered::Rdb_sst_stack::top() {
192   size_t offset, key_len, value_len;
193   // Pop the next item off the internal stack
194   std::tie(offset, key_len, value_len) = m_stack.top();
195 
196   // Make slices from the offset (first), key length (second), and value
197   // length (third)
198   DBUG_ASSERT(m_buffer != nullptr);
199   rocksdb::Slice key(m_buffer + offset, key_len);
200   rocksdb::Slice value(m_buffer + offset + key_len, value_len);
201 
202   return std::make_pair(key, value);
203 }
204 
Rdb_sst_file_ordered(rocksdb::DB * const db,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const std::string & name,const bool tracing,size_t max_size)205 Rdb_sst_file_ordered::Rdb_sst_file_ordered(
206     rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
207     const rocksdb::DBOptions &db_options, const std::string &name,
208     const bool tracing, size_t max_size)
209     : m_use_stack(false),
210       m_first(true),
211       m_stack(max_size),
212       m_file(db, cf, db_options, name, tracing) {
213   m_stack.reset();
214 }
215 
apply_first()216 rocksdb::Status Rdb_sst_file_ordered::apply_first() {
217   rocksdb::Slice first_key_slice(m_first_key);
218   rocksdb::Slice first_value_slice(m_first_value);
219   rocksdb::Status s;
220 
221   if (m_use_stack) {
222     // Put the first key onto the stack
223     m_stack.push(first_key_slice, first_value_slice);
224   } else {
225     // Put the first key into the SST
226     s = m_file.put(first_key_slice, first_value_slice);
227     if (!s.ok()) {
228       return s;
229     }
230   }
231 
232   // Clear out the 'first' strings for next key/value
233   m_first_key.clear();
234   m_first_value.clear();
235 
236   return s;
237 }
238 
/*
  Accept the next key/value pair for this SST file.

  The very first pair is only copied and held back.  When the second pair
  arrives, the held key is compared with it: if the held key sorts after
  the new key, all pairs are collected on the stack (to be reversed in
  commit()); otherwise pairs are written directly to the SST file.

  @param key    key to add
  @param value  value to add
  @return OK, or the error from writing to the SST file
*/
rocksdb::Status Rdb_sst_file_ordered::put(const rocksdb::Slice &key,
                                          const rocksdb::Slice &value) {
  rocksdb::Status status;

  if (m_first) {
    // First key of the file: keep a private copy and wait for the next one
    m_first_key = key.ToString();
    m_first_value = value.ToString();
    m_first = false;
    return rocksdb::Status::OK();
  }

  // A non-empty held key means this is the second key.  Its position
  // relative to the held key decides whether data goes straight to the SST
  // or onto the stack to be retrieved later.
  if (!m_first_key.empty()) {
    const rocksdb::Slice held_key(m_first_key);
    m_use_stack = (m_file.compare(held_key, key) > 0);

    // Apply the held key to the stack or SST
    status = apply_first();
    if (!status.ok()) {
      return status;
    }
  }

  // Route the current pair the same way
  if (!m_use_stack) {
    status = m_file.put(key, value);
  } else {
    m_stack.push(key, value);
  }

  return status;
}
275 
commit()276 rocksdb::Status Rdb_sst_file_ordered::commit() {
277   rocksdb::Status s;
278 
279   // Make sure we get the first key if it was the only key given to us.
280   if (!m_first_key.empty()) {
281     s = apply_first();
282     if (!s.ok()) {
283       return s;
284     }
285   }
286 
287   if (m_use_stack) {
288     rocksdb::Slice key;
289     rocksdb::Slice value;
290 
291     // We are ready to commit, pull each entry off the stack (which reverses
292     // the original data) and send it to the SST file.
293     while (!m_stack.empty()) {
294       std::tie(key, value) = m_stack.top();
295       s = m_file.put(key, value);
296       if (!s.ok()) {
297         return s;
298       }
299 
300       m_stack.pop();
301     }
302 
303     // We have pulled everything off the stack, reset for the next time
304     m_stack.reset();
305     m_use_stack = false;
306   }
307 
308   // reset m_first
309   m_first = true;
310 
311   return m_file.commit();
312 }
313 
Rdb_sst_info(rocksdb::DB * const db,const std::string & tablename,const std::string & indexname,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const bool tracing)314 Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,
315                            const std::string &indexname,
316                            rocksdb::ColumnFamilyHandle *const cf,
317                            const rocksdb::DBOptions &db_options,
318                            const bool tracing)
319     : m_db(db),
320       m_cf(cf),
321       m_db_options(db_options),
322       m_curr_size(0),
323       m_sst_count(0),
324       m_background_error(HA_EXIT_SUCCESS),
325       m_done(false),
326       m_sst_file(nullptr),
327       m_tracing(tracing),
328       m_print_client_error(true) {
329   m_prefix = db->GetName() + "/";
330 
331   std::string normalized_table;
332   if (rdb_normalize_tablename(tablename.c_str(), &normalized_table)) {
333     // We failed to get a normalized table name.  This should never happen,
334     // but handle it anyway.
335     m_prefix += "fallback_" +
336                 std::to_string(reinterpret_cast<intptr_t>(
337                     reinterpret_cast<void *>(this))) +
338                 "_" + indexname + "_";
339   } else {
340     m_prefix += normalized_table + "_" + indexname + "_";
341   }
342 
343   // Unique filename generated to prevent collisions when the same table
344   // is loaded in parallel
345   m_prefix += std::to_string(m_prefix_counter.fetch_add(1)) + "_";
346 
347   rocksdb::ColumnFamilyDescriptor cf_descr;
348   const rocksdb::Status s = m_cf->GetDescriptor(&cf_descr);
349   if (!s.ok()) {
350     // Default size if we can't get the cf's target size
351     m_max_size = 64 * 1024 * 1024;
352   } else {
353     // Set the maximum size to 3 times the cf's target size
354     m_max_size = cf_descr.options.target_file_size_base * 3;
355   }
356   mysql_mutex_init(rdb_sst_commit_key, &m_commit_mutex, MY_MUTEX_INIT_FAST);
357 }
358 
~Rdb_sst_info()359 Rdb_sst_info::~Rdb_sst_info() {
360   DBUG_ASSERT(m_sst_file == nullptr);
361 
362   for (auto sst_file : m_committed_files) {
363     // In case something went wrong attempt to delete the temporary file.
364     // If everything went fine that file will have been renamed and this
365     // function call will fail.
366     std::remove(sst_file.c_str());
367   }
368   m_committed_files.clear();
369 
370   mysql_mutex_destroy(&m_commit_mutex);
371 }
372 
open_new_sst_file()373 int Rdb_sst_info::open_new_sst_file() {
374   DBUG_ASSERT(m_sst_file == nullptr);
375 
376   // Create the new sst file's name
377   const std::string name = m_prefix + std::to_string(m_sst_count++) + m_suffix;
378 
379   // Create the new sst file object
380   m_sst_file = new Rdb_sst_file_ordered(m_db, m_cf, m_db_options, name,
381                                         m_tracing, m_max_size);
382 
383   // Open the sst file
384   const rocksdb::Status s = m_sst_file->open();
385   if (!s.ok()) {
386     set_error_msg(m_sst_file->get_name(), s);
387     delete m_sst_file;
388     m_sst_file = nullptr;
389     return HA_ERR_ROCKSDB_BULK_LOAD;
390   }
391 
392   m_curr_size = 0;
393 
394   return HA_EXIT_SUCCESS;
395 }
396 
commit_sst_file(Rdb_sst_file_ordered * sst_file)397 void Rdb_sst_info::commit_sst_file(Rdb_sst_file_ordered *sst_file) {
398   const rocksdb::Status s = sst_file->commit();
399   if (!s.ok()) {
400     set_error_msg(sst_file->get_name(), s);
401     set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
402   }
403 
404   m_committed_files.push_back(sst_file->get_name());
405 
406   delete sst_file;
407 }
408 
close_curr_sst_file()409 void Rdb_sst_info::close_curr_sst_file() {
410   DBUG_ASSERT(m_sst_file != nullptr);
411   DBUG_ASSERT(m_curr_size > 0);
412 
413   commit_sst_file(m_sst_file);
414 
415   // Reset for next sst file
416   m_sst_file = nullptr;
417   m_curr_size = 0;
418 }
419 
put(const rocksdb::Slice & key,const rocksdb::Slice & value)420 int Rdb_sst_info::put(const rocksdb::Slice &key, const rocksdb::Slice &value) {
421   int rc;
422 
423   DBUG_ASSERT(!m_done);
424 
425   if (m_curr_size + key.size() + value.size() >= m_max_size) {
426     // The current sst file has reached its maximum, close it out
427     close_curr_sst_file();
428 
429     // While we are here, check to see if we have had any errors from the
430     // background thread - we don't want to wait for the end to report them
431     if (have_background_error()) {
432       return get_and_reset_background_error();
433     }
434   }
435 
436   if (m_curr_size == 0) {
437     // We don't have an sst file open - open one
438     rc = open_new_sst_file();
439     if (rc != 0) {
440       return rc;
441     }
442   }
443 
444   DBUG_ASSERT(m_sst_file != nullptr);
445 
446   // Add the key/value to the current sst file
447   const rocksdb::Status s = m_sst_file->put(key, value);
448   if (!s.ok()) {
449     set_error_msg(m_sst_file->get_name(), s);
450     return HA_ERR_ROCKSDB_BULK_LOAD;
451   }
452 
453   m_curr_size += key.size() + value.size();
454 
455   return HA_EXIT_SUCCESS;
456 }
457 
458 /*
459   Finish the current work and return the list of SST files ready to be
460   ingested. This function need to be idempotent and atomic
461  */
finish(Rdb_sst_commit_info * commit_info,bool print_client_error)462 int Rdb_sst_info::finish(Rdb_sst_commit_info *commit_info,
463                          bool print_client_error) {
464   int ret = HA_EXIT_SUCCESS;
465 
466   // Both the transaction clean up and the ha_rocksdb handler have
467   // references to this Rdb_sst_info and both can call commit, so
468   // synchronize on the object here.
469   // This also means in such case the bulk loading operation stop being truly
470   // atomic, and we should consider fixing this in the future
471   RDB_MUTEX_LOCK_CHECK(m_commit_mutex);
472 
473   if (is_done()) {
474     RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
475     return ret;
476   }
477 
478   m_print_client_error = print_client_error;
479 
480   if (m_curr_size > 0) {
481     // Close out any existing files
482     close_curr_sst_file();
483   }
484 
485   // This checks out the list of files so that the caller can collect/group
486   // them and ingest them all in one go, and any racing calls to commit
487   // won't see them at all
488   commit_info->init(m_cf, std::move(m_committed_files));
489   DBUG_ASSERT(m_committed_files.size() == 0);
490 
491   m_done = true;
492   RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
493 
494   // Did we get any errors?
495   if (have_background_error()) {
496     ret = get_and_reset_background_error();
497   }
498 
499   m_print_client_error = true;
500   return ret;
501 }
502 
set_error_msg(const std::string & sst_file_name,const rocksdb::Status & s)503 void Rdb_sst_info::set_error_msg(const std::string &sst_file_name,
504                                  const rocksdb::Status &s) {
505   if (!m_print_client_error) return;
506 
507   report_error_msg(s, sst_file_name.c_str());
508 }
509 
report_error_msg(const rocksdb::Status & s,const char * sst_file_name)510 void Rdb_sst_info::report_error_msg(const rocksdb::Status &s,
511                                     const char *sst_file_name) {
512   if (s.IsInvalidArgument() &&
513       strcmp(s.getState(), "Keys must be added in strict ascending order.") == 0) {
514     my_printf_error(ER_KEYS_OUT_OF_ORDER,
515                     "Rows must be inserted in primary key order "
516                     "during bulk load operation",
517                     MYF(0));
518   } else if (s.IsInvalidArgument() &&
519              strcmp(s.getState(), "Global seqno is required, but disabled") ==
520                  0) {
521     my_printf_error(ER_OVERLAPPING_KEYS,
522                     "Rows inserted during bulk load "
523                     "must not overlap existing rows",
524                     MYF(0));
525   } else {
526     my_printf_error(ER_UNKNOWN_ERROR, "[%s] bulk load error: %s", MYF(0),
527                     sst_file_name, s.ToString().c_str());
528   }
529 }
530 
init(const rocksdb::DB * const db)531 void Rdb_sst_info::init(const rocksdb::DB *const db) {
532   const std::string path = db->GetName() + FN_DIRSEP;
533   struct st_my_dir *const dir_info = my_dir(path.c_str(), MYF(MY_DONT_SORT));
534 
535   // Access the directory
536   if (dir_info == nullptr) {
537     // NO_LINT_DEBUG
538     sql_print_warning("RocksDB: Could not access database directory: %s",
539                       path.c_str());
540     return;
541   }
542 
543   // Scan through the files in the directory
544   const struct fileinfo *file_info = dir_info->dir_entry;
545   for (uint ii= 0; ii < dir_info->number_of_files; ii++, file_info++) {
546     // find any files ending with m_suffix ...
547     const std::string name = file_info->name;
548     const size_t pos = name.find(m_suffix);
549     if (pos != std::string::npos && name.size() - pos == m_suffix.size()) {
550       // ... and remove them
551       const std::string fullname = path + name;
552       my_delete(fullname.c_str(), MYF(0));
553     }
554   }
555 
556   // Release the directory entry
557   my_dirend(dir_info);
558 }
559 
560 std::atomic<uint64_t> Rdb_sst_info::m_prefix_counter(0);
561 std::string Rdb_sst_info::m_suffix = ".bulk_load.tmp";
562 }  // namespace myrocks
563