1 /*
2 Copyright (c) 2016, Facebook, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16
17 /* For PRIu64 use below: */
18 #define __STDC_FORMAT_MACROS
19
20 #include <my_global.h>
21
22 /* This C++ file's header file */
23 #include "./rdb_sst_info.h"
24
25 #include <inttypes.h>
26
27 /* C++ standard header files */
28 #include <cstdio>
29 #include <string>
30 #include <utility>
31 #include <vector>
32
33 /* MySQL header files */
34 #include <mysqld_error.h>
35 #include "../sql/log.h"
36 #include "./my_dir.h"
37
38 /* RocksDB header files */
39 #include "rocksdb/db.h"
40 #include "rocksdb/options.h"
41
42 /* MyRocks header files */
43 #include "./ha_rocksdb.h"
44 #include "./ha_rocksdb_proto.h"
45 #include "./rdb_cf_options.h"
46 #include "./rdb_psi.h"
47
48 namespace myrocks {
49
Rdb_sst_file(rocksdb::DB * const db,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const std::string & name,const bool tracing)50 Rdb_sst_file_ordered::Rdb_sst_file::Rdb_sst_file(
51 rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
52 const rocksdb::DBOptions &db_options, const std::string &name,
53 const bool tracing)
54 : m_db(db),
55 m_cf(cf),
56 m_db_options(db_options),
57 m_sst_file_writer(nullptr),
58 m_name(name),
59 m_tracing(tracing),
60 m_comparator(cf->GetComparator()) {
61 DBUG_ASSERT(db != nullptr);
62 DBUG_ASSERT(cf != nullptr);
63 }
64
~Rdb_sst_file()65 Rdb_sst_file_ordered::Rdb_sst_file::~Rdb_sst_file() {
66 // Make sure we clean up
67 delete m_sst_file_writer;
68 m_sst_file_writer = nullptr;
69 }
70
open()71 rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() {
72 DBUG_ASSERT(m_sst_file_writer == nullptr);
73
74 rocksdb::ColumnFamilyDescriptor cf_descr;
75
76 rocksdb::Status s = m_cf->GetDescriptor(&cf_descr);
77 if (!s.ok()) {
78 return s;
79 }
80
81 // Create an sst file writer with the current options and comparator
82 const rocksdb::EnvOptions env_options(m_db_options);
83 const rocksdb::Options options(m_db_options, cf_descr.options);
84
85 m_sst_file_writer =
86 new rocksdb::SstFileWriter(env_options, options, m_comparator, m_cf, true,
87 rocksdb::Env::IOPriority::IO_TOTAL,
88 cf_descr.options.optimize_filters_for_hits);
89
90 s = m_sst_file_writer->Open(m_name);
91 if (m_tracing) {
92 // NO_LINT_DEBUG
93 sql_print_information("SST Tracing: Open(%s) returned %s", m_name.c_str(),
94 s.ok() ? "ok" : "not ok");
95 }
96
97 if (!s.ok()) {
98 delete m_sst_file_writer;
99 m_sst_file_writer = nullptr;
100 }
101
102 return s;
103 }
104
/*
  Append one key/value pair to the SST file being written.

  The writer must already exist, i.e. open() must have succeeded.

  SstFileWriter::Add() triggers a deprecation warning, so that warning is
  suppressed for this function.  The suppression is wrapped around the whole
  definition with a matching push/pop so that it is undone afterwards and
  does not leak into the code that follows in this translation unit.

  @param key    key to add
  @param value  value to store for that key
  @return the status returned by rocksdb::SstFileWriter::Add()
*/
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4996)
#endif
rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::put(
    const rocksdb::Slice &key, const rocksdb::Slice &value) {
  DBUG_ASSERT(m_sst_file_writer != nullptr);

  // Add the specified key/value to the sst file writer
  return m_sst_file_writer->Add(key, value);
}
#ifdef _MSC_VER
#pragma warning(pop)
#endif
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
119
generateKey(const std::string & key)120 std::string Rdb_sst_file_ordered::Rdb_sst_file::generateKey(
121 const std::string &key) {
122 static char const hexdigit[] = {'0', '1', '2', '3', '4', '5', '6', '7',
123 '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
124
125 std::string res;
126
127 res.reserve(key.size() * 2);
128
129 for (auto ch : key) {
130 res += hexdigit[((uint8_t)ch) >> 4];
131 res += hexdigit[((uint8_t)ch) & 0x0F];
132 }
133
134 return res;
135 }
136
137 // This function is run by the background thread
commit()138 rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::commit() {
139 DBUG_ASSERT(m_sst_file_writer != nullptr);
140
141 rocksdb::Status s;
142 rocksdb::ExternalSstFileInfo fileinfo; /// Finish may should be modified
143
144 // Close out the sst file
145 s = m_sst_file_writer->Finish(&fileinfo);
146 if (m_tracing) {
147 // NO_LINT_DEBUG
148 sql_print_information("SST Tracing: Finish returned %s",
149 s.ok() ? "ok" : "not ok");
150 }
151
152 if (s.ok()) {
153 if (m_tracing) {
154 // NO_LINT_DEBUG
155 sql_print_information(
156 "SST Tracing: Adding file %s, smallest key: %s, "
157 "largest key: %s, file size: %" PRIu64
158 ", "
159 "num_entries: %" PRIu64,
160 fileinfo.file_path.c_str(),
161 generateKey(fileinfo.smallest_key).c_str(),
162 generateKey(fileinfo.largest_key).c_str(), fileinfo.file_size,
163 fileinfo.num_entries);
164 }
165 }
166
167 delete m_sst_file_writer;
168 m_sst_file_writer = nullptr;
169
170 return s;
171 }
172
push(const rocksdb::Slice & key,const rocksdb::Slice & value)173 void Rdb_sst_file_ordered::Rdb_sst_stack::push(const rocksdb::Slice &key,
174 const rocksdb::Slice &value) {
175 if (m_buffer == nullptr) {
176 m_buffer = new char[m_buffer_size];
177 }
178
179 // Put the actual key and value data unto our stack
180 size_t key_offset = m_offset;
181 memcpy(m_buffer + m_offset, key.data(), key.size());
182 m_offset += key.size();
183 memcpy(m_buffer + m_offset, value.data(), value.size());
184 m_offset += value.size();
185
186 // Push just the offset, the key length and the value length onto the stack
187 m_stack.push(std::make_tuple(key_offset, key.size(), value.size()));
188 }
189
190 std::pair<rocksdb::Slice, rocksdb::Slice>
top()191 Rdb_sst_file_ordered::Rdb_sst_stack::top() {
192 size_t offset, key_len, value_len;
193 // Pop the next item off the internal stack
194 std::tie(offset, key_len, value_len) = m_stack.top();
195
196 // Make slices from the offset (first), key length (second), and value
197 // length (third)
198 DBUG_ASSERT(m_buffer != nullptr);
199 rocksdb::Slice key(m_buffer + offset, key_len);
200 rocksdb::Slice value(m_buffer + offset + key_len, value_len);
201
202 return std::make_pair(key, value);
203 }
204
Rdb_sst_file_ordered(rocksdb::DB * const db,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const std::string & name,const bool tracing,size_t max_size)205 Rdb_sst_file_ordered::Rdb_sst_file_ordered(
206 rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
207 const rocksdb::DBOptions &db_options, const std::string &name,
208 const bool tracing, size_t max_size)
209 : m_use_stack(false),
210 m_first(true),
211 m_stack(max_size),
212 m_file(db, cf, db_options, name, tracing) {
213 m_stack.reset();
214 }
215
apply_first()216 rocksdb::Status Rdb_sst_file_ordered::apply_first() {
217 rocksdb::Slice first_key_slice(m_first_key);
218 rocksdb::Slice first_value_slice(m_first_value);
219 rocksdb::Status s;
220
221 if (m_use_stack) {
222 // Put the first key onto the stack
223 m_stack.push(first_key_slice, first_value_slice);
224 } else {
225 // Put the first key into the SST
226 s = m_file.put(first_key_slice, first_value_slice);
227 if (!s.ok()) {
228 return s;
229 }
230 }
231
232 // Clear out the 'first' strings for next key/value
233 m_first_key.clear();
234 m_first_value.clear();
235
236 return s;
237 }
238
/*
  Accept the next key/value pair.

  The very first pair is only copied and held back.  When the second pair
  arrives the two keys are compared: if compare(first, second) is positive
  all pairs are collected on the stack (to be written out by commit()),
  otherwise they are written straight to the SST file.

  @param key    key to add
  @param value  value to add
  @return OK when the pair was merely stored, otherwise the status of the
          write to the SST file
*/
rocksdb::Status Rdb_sst_file_ordered::put(const rocksdb::Slice &key,
                                          const rocksdb::Slice &value) {
  if (m_first) {
    // Nothing to compare against yet - just keep a copy for later
    m_first = false;
    m_first_key = key.ToString();
    m_first_value = value.ToString();
    return rocksdb::Status::OK();
  }

  rocksdb::Status s;

  // A non-empty first key means it is still pending, so this must be the
  // second key: decide between stack and SST, then flush the first key
  const bool first_is_pending = !m_first_key.empty();
  if (first_is_pending) {
    const rocksdb::Slice held_key(m_first_key);
    m_use_stack = m_file.compare(held_key, key) > 0;

    s = apply_first();
    if (!s.ok()) {
      return s;
    }
  }

  // Route this pair the same way
  if (!m_use_stack) {
    s = m_file.put(key, value);
  } else {
    m_stack.push(key, value);
  }

  return s;
}
275
commit()276 rocksdb::Status Rdb_sst_file_ordered::commit() {
277 rocksdb::Status s;
278
279 // Make sure we get the first key if it was the only key given to us.
280 if (!m_first_key.empty()) {
281 s = apply_first();
282 if (!s.ok()) {
283 return s;
284 }
285 }
286
287 if (m_use_stack) {
288 rocksdb::Slice key;
289 rocksdb::Slice value;
290
291 // We are ready to commit, pull each entry off the stack (which reverses
292 // the original data) and send it to the SST file.
293 while (!m_stack.empty()) {
294 std::tie(key, value) = m_stack.top();
295 s = m_file.put(key, value);
296 if (!s.ok()) {
297 return s;
298 }
299
300 m_stack.pop();
301 }
302
303 // We have pulled everything off the stack, reset for the next time
304 m_stack.reset();
305 m_use_stack = false;
306 }
307
308 // reset m_first
309 m_first = true;
310
311 return m_file.commit();
312 }
313
Rdb_sst_info(rocksdb::DB * const db,const std::string & tablename,const std::string & indexname,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const bool tracing)314 Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,
315 const std::string &indexname,
316 rocksdb::ColumnFamilyHandle *const cf,
317 const rocksdb::DBOptions &db_options,
318 const bool tracing)
319 : m_db(db),
320 m_cf(cf),
321 m_db_options(db_options),
322 m_curr_size(0),
323 m_sst_count(0),
324 m_background_error(HA_EXIT_SUCCESS),
325 m_done(false),
326 m_sst_file(nullptr),
327 m_tracing(tracing),
328 m_print_client_error(true) {
329 m_prefix = db->GetName() + "/";
330
331 std::string normalized_table;
332 if (rdb_normalize_tablename(tablename.c_str(), &normalized_table)) {
333 // We failed to get a normalized table name. This should never happen,
334 // but handle it anyway.
335 m_prefix += "fallback_" +
336 std::to_string(reinterpret_cast<intptr_t>(
337 reinterpret_cast<void *>(this))) +
338 "_" + indexname + "_";
339 } else {
340 m_prefix += normalized_table + "_" + indexname + "_";
341 }
342
343 // Unique filename generated to prevent collisions when the same table
344 // is loaded in parallel
345 m_prefix += std::to_string(m_prefix_counter.fetch_add(1)) + "_";
346
347 rocksdb::ColumnFamilyDescriptor cf_descr;
348 const rocksdb::Status s = m_cf->GetDescriptor(&cf_descr);
349 if (!s.ok()) {
350 // Default size if we can't get the cf's target size
351 m_max_size = 64 * 1024 * 1024;
352 } else {
353 // Set the maximum size to 3 times the cf's target size
354 m_max_size = cf_descr.options.target_file_size_base * 3;
355 }
356 mysql_mutex_init(rdb_sst_commit_key, &m_commit_mutex, MY_MUTEX_INIT_FAST);
357 }
358
~Rdb_sst_info()359 Rdb_sst_info::~Rdb_sst_info() {
360 DBUG_ASSERT(m_sst_file == nullptr);
361
362 for (auto sst_file : m_committed_files) {
363 // In case something went wrong attempt to delete the temporary file.
364 // If everything went fine that file will have been renamed and this
365 // function call will fail.
366 std::remove(sst_file.c_str());
367 }
368 m_committed_files.clear();
369
370 mysql_mutex_destroy(&m_commit_mutex);
371 }
372
open_new_sst_file()373 int Rdb_sst_info::open_new_sst_file() {
374 DBUG_ASSERT(m_sst_file == nullptr);
375
376 // Create the new sst file's name
377 const std::string name = m_prefix + std::to_string(m_sst_count++) + m_suffix;
378
379 // Create the new sst file object
380 m_sst_file = new Rdb_sst_file_ordered(m_db, m_cf, m_db_options, name,
381 m_tracing, m_max_size);
382
383 // Open the sst file
384 const rocksdb::Status s = m_sst_file->open();
385 if (!s.ok()) {
386 set_error_msg(m_sst_file->get_name(), s);
387 delete m_sst_file;
388 m_sst_file = nullptr;
389 return HA_ERR_ROCKSDB_BULK_LOAD;
390 }
391
392 m_curr_size = 0;
393
394 return HA_EXIT_SUCCESS;
395 }
396
commit_sst_file(Rdb_sst_file_ordered * sst_file)397 void Rdb_sst_info::commit_sst_file(Rdb_sst_file_ordered *sst_file) {
398 const rocksdb::Status s = sst_file->commit();
399 if (!s.ok()) {
400 set_error_msg(sst_file->get_name(), s);
401 set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
402 }
403
404 m_committed_files.push_back(sst_file->get_name());
405
406 delete sst_file;
407 }
408
close_curr_sst_file()409 void Rdb_sst_info::close_curr_sst_file() {
410 DBUG_ASSERT(m_sst_file != nullptr);
411 DBUG_ASSERT(m_curr_size > 0);
412
413 commit_sst_file(m_sst_file);
414
415 // Reset for next sst file
416 m_sst_file = nullptr;
417 m_curr_size = 0;
418 }
419
put(const rocksdb::Slice & key,const rocksdb::Slice & value)420 int Rdb_sst_info::put(const rocksdb::Slice &key, const rocksdb::Slice &value) {
421 int rc;
422
423 DBUG_ASSERT(!m_done);
424
425 if (m_curr_size + key.size() + value.size() >= m_max_size) {
426 // The current sst file has reached its maximum, close it out
427 close_curr_sst_file();
428
429 // While we are here, check to see if we have had any errors from the
430 // background thread - we don't want to wait for the end to report them
431 if (have_background_error()) {
432 return get_and_reset_background_error();
433 }
434 }
435
436 if (m_curr_size == 0) {
437 // We don't have an sst file open - open one
438 rc = open_new_sst_file();
439 if (rc != 0) {
440 return rc;
441 }
442 }
443
444 DBUG_ASSERT(m_sst_file != nullptr);
445
446 // Add the key/value to the current sst file
447 const rocksdb::Status s = m_sst_file->put(key, value);
448 if (!s.ok()) {
449 set_error_msg(m_sst_file->get_name(), s);
450 return HA_ERR_ROCKSDB_BULK_LOAD;
451 }
452
453 m_curr_size += key.size() + value.size();
454
455 return HA_EXIT_SUCCESS;
456 }
457
458 /*
459 Finish the current work and return the list of SST files ready to be
460 ingested. This function need to be idempotent and atomic
461 */
finish(Rdb_sst_commit_info * commit_info,bool print_client_error)462 int Rdb_sst_info::finish(Rdb_sst_commit_info *commit_info,
463 bool print_client_error) {
464 int ret = HA_EXIT_SUCCESS;
465
466 // Both the transaction clean up and the ha_rocksdb handler have
467 // references to this Rdb_sst_info and both can call commit, so
468 // synchronize on the object here.
469 // This also means in such case the bulk loading operation stop being truly
470 // atomic, and we should consider fixing this in the future
471 RDB_MUTEX_LOCK_CHECK(m_commit_mutex);
472
473 if (is_done()) {
474 RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
475 return ret;
476 }
477
478 m_print_client_error = print_client_error;
479
480 if (m_curr_size > 0) {
481 // Close out any existing files
482 close_curr_sst_file();
483 }
484
485 // This checks out the list of files so that the caller can collect/group
486 // them and ingest them all in one go, and any racing calls to commit
487 // won't see them at all
488 commit_info->init(m_cf, std::move(m_committed_files));
489 DBUG_ASSERT(m_committed_files.size() == 0);
490
491 m_done = true;
492 RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
493
494 // Did we get any errors?
495 if (have_background_error()) {
496 ret = get_and_reset_background_error();
497 }
498
499 m_print_client_error = true;
500 return ret;
501 }
502
set_error_msg(const std::string & sst_file_name,const rocksdb::Status & s)503 void Rdb_sst_info::set_error_msg(const std::string &sst_file_name,
504 const rocksdb::Status &s) {
505 if (!m_print_client_error) return;
506
507 report_error_msg(s, sst_file_name.c_str());
508 }
509
report_error_msg(const rocksdb::Status & s,const char * sst_file_name)510 void Rdb_sst_info::report_error_msg(const rocksdb::Status &s,
511 const char *sst_file_name) {
512 if (s.IsInvalidArgument() &&
513 strcmp(s.getState(), "Keys must be added in strict ascending order.") == 0) {
514 my_printf_error(ER_KEYS_OUT_OF_ORDER,
515 "Rows must be inserted in primary key order "
516 "during bulk load operation",
517 MYF(0));
518 } else if (s.IsInvalidArgument() &&
519 strcmp(s.getState(), "Global seqno is required, but disabled") ==
520 0) {
521 my_printf_error(ER_OVERLAPPING_KEYS,
522 "Rows inserted during bulk load "
523 "must not overlap existing rows",
524 MYF(0));
525 } else {
526 my_printf_error(ER_UNKNOWN_ERROR, "[%s] bulk load error: %s", MYF(0),
527 sst_file_name, s.ToString().c_str());
528 }
529 }
530
init(const rocksdb::DB * const db)531 void Rdb_sst_info::init(const rocksdb::DB *const db) {
532 const std::string path = db->GetName() + FN_DIRSEP;
533 struct st_my_dir *const dir_info = my_dir(path.c_str(), MYF(MY_DONT_SORT));
534
535 // Access the directory
536 if (dir_info == nullptr) {
537 // NO_LINT_DEBUG
538 sql_print_warning("RocksDB: Could not access database directory: %s",
539 path.c_str());
540 return;
541 }
542
543 // Scan through the files in the directory
544 const struct fileinfo *file_info = dir_info->dir_entry;
545 for (uint ii= 0; ii < dir_info->number_of_files; ii++, file_info++) {
546 // find any files ending with m_suffix ...
547 const std::string name = file_info->name;
548 const size_t pos = name.find(m_suffix);
549 if (pos != std::string::npos && name.size() - pos == m_suffix.size()) {
550 // ... and remove them
551 const std::string fullname = path + name;
552 my_delete(fullname.c_str(), MYF(0));
553 }
554 }
555
556 // Release the directory entry
557 my_dirend(dir_info);
558 }
559
560 std::atomic<uint64_t> Rdb_sst_info::m_prefix_counter(0);
561 std::string Rdb_sst_info::m_suffix = ".bulk_load.tmp";
562 } // namespace myrocks
563