1 /*
2 Copyright (c) 2016, Facebook, Inc.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17 /* This C++ file's header file */
18 #include "./rdb_sst_info.h"
19
20 /* C++ standard header files */
21 #include <cstdio>
22 #include <string>
23 #include <utility>
24 #include <vector>
25
26 /* MySQL header files */
27 #include "log.h"
28 #include "my_dir.h"
29 #include "sql_class.h"
30
31 /* RocksDB header files */
32 #include "rocksdb/db.h"
33 #include "rocksdb/options.h"
34
35 /* MyRocks header files */
36 #include "./ha_rocksdb.h"
37 #include "./ha_rocksdb_proto.h"
38 #include "./rdb_cf_options.h"
39 #include "./rdb_psi.h"
40
41 namespace myrocks {
42
Rdb_sst_file(rocksdb::DB * const db,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const std::string & name,const bool tracing)43 Rdb_sst_file_ordered::Rdb_sst_file::Rdb_sst_file(
44 rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
45 const rocksdb::DBOptions &db_options, const std::string &name,
46 const bool tracing)
47 : m_db(db),
48 m_cf(cf),
49 m_db_options(db_options),
50 m_sst_file_writer(nullptr),
51 m_name(name),
52 m_tracing(tracing),
53 m_comparator(cf->GetComparator()) {
54 assert(db != nullptr);
55 assert(cf != nullptr);
56 }
57
~Rdb_sst_file()58 Rdb_sst_file_ordered::Rdb_sst_file::~Rdb_sst_file() {
59 // Make sure we clean up
60 delete m_sst_file_writer;
61 m_sst_file_writer = nullptr;
62 }
63
open()64 rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::open() {
65 assert(m_sst_file_writer == nullptr);
66
67 rocksdb::ColumnFamilyDescriptor cf_descr;
68
69 rocksdb::Status s = m_cf->GetDescriptor(&cf_descr);
70 if (!s.ok()) {
71 return s;
72 }
73
74 // Create an sst file writer with the current options and comparator
75 const rocksdb::EnvOptions env_options(m_db_options);
76 const rocksdb::Options options(m_db_options, cf_descr.options);
77
78 m_sst_file_writer =
79 new rocksdb::SstFileWriter(env_options, options, m_comparator, m_cf, true,
80 rocksdb::Env::IOPriority::IO_TOTAL,
81 cf_descr.options.optimize_filters_for_hits);
82
83 s = m_sst_file_writer->Open(m_name);
84 if (m_tracing) {
85 // NO_LINT_DEBUG
86 sql_print_information("SST Tracing: Open(%s) returned %s", m_name.c_str(),
87 s.ok() ? "ok" : "not ok");
88 }
89
90 if (!s.ok()) {
91 delete m_sst_file_writer;
92 m_sst_file_writer = nullptr;
93 }
94
95 return s;
96 }
97
/*
  Append one key/value pair to the open SST file writer.

  Uses SstFileWriter::Put() rather than the deprecated Add(), which removes
  the need for the compiler-specific pragmas that silenced the deprecation
  warning.

  @param key    key to append
  @param value  value to append
  @return the status reported by the SST file writer
*/
rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::put(
    const rocksdb::Slice &key, const rocksdb::Slice &value) {
  assert(m_sst_file_writer != nullptr);

  // Add the specified key/value to the sst file writer
  return m_sst_file_writer->Put(key, value);
}
108
generateKey(const std::string & key)109 std::string Rdb_sst_file_ordered::Rdb_sst_file::generateKey(
110 const std::string &key) {
111 static char const hexdigit[] = {'0', '1', '2', '3', '4', '5', '6', '7',
112 '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
113
114 std::string res;
115
116 res.reserve(key.size() * 2);
117
118 for (auto ch : key) {
119 res += hexdigit[((uint8_t)ch) >> 4];
120 res += hexdigit[((uint8_t)ch) & 0x0F];
121 }
122
123 return res;
124 }
125
126 // This function is run by the background thread
commit()127 rocksdb::Status Rdb_sst_file_ordered::Rdb_sst_file::commit() {
128 assert(m_sst_file_writer != nullptr);
129
130 rocksdb::Status s;
131 rocksdb::ExternalSstFileInfo fileinfo; /// Finish may should be modified
132
133 // Close out the sst file
134 s = m_sst_file_writer->Finish(&fileinfo);
135 if (m_tracing) {
136 // NO_LINT_DEBUG
137 sql_print_information("SST Tracing: Finish returned %s",
138 s.ok() ? "ok" : "not ok");
139 }
140
141 if (s.ok()) {
142 if (m_tracing) {
143 // NO_LINT_DEBUG
144 sql_print_information(
145 "SST Tracing: Adding file %s, smallest key: %s, "
146 "largest key: %s, file size: %lu, "
147 "num_entries: %lu",
148 fileinfo.file_path.c_str(),
149 generateKey(fileinfo.smallest_key).c_str(),
150 generateKey(fileinfo.largest_key).c_str(), fileinfo.file_size,
151 fileinfo.num_entries);
152 }
153 }
154
155 delete m_sst_file_writer;
156 m_sst_file_writer = nullptr;
157
158 return s;
159 }
160
push(const rocksdb::Slice & key,const rocksdb::Slice & value)161 void Rdb_sst_file_ordered::Rdb_sst_stack::push(const rocksdb::Slice &key,
162 const rocksdb::Slice &value) {
163 if (m_buffer == nullptr) {
164 m_buffer = new char[m_buffer_size];
165 }
166
167 // Put the actual key and value data unto our stack
168 size_t key_offset = m_offset;
169 memcpy(m_buffer + m_offset, key.data(), key.size());
170 m_offset += key.size();
171 memcpy(m_buffer + m_offset, value.data(), value.size());
172 m_offset += value.size();
173
174 // Push just the offset, the key length and the value length onto the stack
175 m_stack.push(std::make_tuple(key_offset, key.size(), value.size()));
176 }
177
178 std::pair<rocksdb::Slice, rocksdb::Slice>
top()179 Rdb_sst_file_ordered::Rdb_sst_stack::top() {
180 size_t offset, key_len, value_len;
181 // Pop the next item off the internal stack
182 std::tie(offset, key_len, value_len) = m_stack.top();
183
184 // Make slices from the offset (first), key length (second), and value
185 // length (third)
186 assert(m_buffer != nullptr);
187 rocksdb::Slice key(m_buffer + offset, key_len);
188 rocksdb::Slice value(m_buffer + offset + key_len, value_len);
189
190 return std::make_pair(key, value);
191 }
192
Rdb_sst_file_ordered(rocksdb::DB * const db,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const std::string & name,const bool tracing,size_t max_size)193 Rdb_sst_file_ordered::Rdb_sst_file_ordered(
194 rocksdb::DB *const db, rocksdb::ColumnFamilyHandle *const cf,
195 const rocksdb::DBOptions &db_options, const std::string &name,
196 const bool tracing, size_t max_size)
197 : m_use_stack(false),
198 m_first(true),
199 m_stack(max_size),
200 m_file(db, cf, db_options, name, tracing) {
201 m_stack.reset();
202 }
203
apply_first()204 rocksdb::Status Rdb_sst_file_ordered::apply_first() {
205 rocksdb::Slice first_key_slice(m_first_key);
206 rocksdb::Slice first_value_slice(m_first_value);
207 rocksdb::Status s;
208
209 if (m_use_stack) {
210 // Put the first key onto the stack
211 m_stack.push(first_key_slice, first_value_slice);
212 } else {
213 // Put the first key into the SST
214 s = m_file.put(first_key_slice, first_value_slice);
215 if (!s.ok()) {
216 return s;
217 }
218 }
219
220 // Clear out the 'first' strings for next key/value
221 m_first_key.clear();
222 m_first_value.clear();
223
224 return s;
225 }
226
/*
  Accept the next key/value pair.

  The very first pair is only copied and held back.  When the second pair
  arrives it is compared with the first one: if the first key sorts after
  the second, all data is collected on the stack (to be written out in
  reverse by commit()); otherwise data is written straight to the SST file.

  @param key    key to add
  @param value  value to add
  @return OK, or the failing status from writing to the SST file
*/
rocksdb::Status Rdb_sst_file_ordered::put(const rocksdb::Slice &key,
                                          const rocksdb::Slice &value) {
  if (m_first) {
    // First key: just remember a copy until we know the direction
    m_first_key = key.ToString();
    m_first_value = value.ToString();
    m_first = false;
    return rocksdb::Status::OK();
  }

  if (!m_first_key.empty()) {
    // A first key is still pending, so this must be the second key.
    // Its order relative to the first decides between stack and SST.
    const rocksdb::Slice pending_key(m_first_key);
    const int order = m_file.compare(pending_key, key);
    m_use_stack = (order > 0);

    // Now the pending first key can be sent to the stack or SST
    const rocksdb::Status first_status = apply_first();
    if (!first_status.ok()) {
      return first_status;
    }
  }

  if (!m_use_stack) {
    // Ascending data goes straight into the SST
    return m_file.put(key, value);
  }

  // Descending data is parked on the stack
  m_stack.push(key, value);
  return rocksdb::Status::OK();
}
263
commit()264 rocksdb::Status Rdb_sst_file_ordered::commit() {
265 rocksdb::Status s;
266
267 // Make sure we get the first key if it was the only key given to us.
268 if (!m_first_key.empty()) {
269 s = apply_first();
270 if (!s.ok()) {
271 return s;
272 }
273 }
274
275 if (m_use_stack) {
276 rocksdb::Slice key;
277 rocksdb::Slice value;
278
279 // We are ready to commit, pull each entry off the stack (which reverses
280 // the original data) and send it to the SST file.
281 while (!m_stack.empty()) {
282 std::tie(key, value) = m_stack.top();
283 s = m_file.put(key, value);
284 if (!s.ok()) {
285 return s;
286 }
287
288 m_stack.pop();
289 }
290
291 // We have pulled everything off the stack, reset for the next time
292 m_stack.reset();
293 m_use_stack = false;
294 }
295
296 // reset m_first
297 m_first = true;
298
299 return m_file.commit();
300 }
301
Rdb_sst_info(rocksdb::DB * const db,const std::string & tablename,const std::string & indexname,rocksdb::ColumnFamilyHandle * const cf,const rocksdb::DBOptions & db_options,const bool tracing)302 Rdb_sst_info::Rdb_sst_info(rocksdb::DB *const db, const std::string &tablename,
303 const std::string &indexname,
304 rocksdb::ColumnFamilyHandle *const cf,
305 const rocksdb::DBOptions &db_options,
306 const bool tracing)
307 : m_db(db),
308 m_cf(cf),
309 m_db_options(db_options),
310 m_curr_size(0),
311 m_sst_count(0),
312 m_background_error(HA_EXIT_SUCCESS),
313 m_done(false),
314 m_sst_file(nullptr),
315 m_tracing(tracing),
316 m_print_client_error(true) {
317 m_prefix = db->GetName() + "/";
318
319 std::string normalized_table;
320 if (rdb_normalize_tablename(tablename.c_str(), &normalized_table)) {
321 // We failed to get a normalized table name. This should never happen,
322 // but handle it anyway.
323 m_prefix += "fallback_" +
324 std::to_string(reinterpret_cast<intptr_t>(
325 reinterpret_cast<void *>(this))) +
326 "_" + indexname + "_";
327 } else {
328 m_prefix += normalized_table + "_" + indexname + "_";
329 }
330
331 // Unique filename generated to prevent collisions when the same table
332 // is loaded in parallel
333 m_prefix += std::to_string(m_prefix_counter.fetch_add(1)) + "_";
334
335 rocksdb::ColumnFamilyDescriptor cf_descr;
336 const rocksdb::Status s = m_cf->GetDescriptor(&cf_descr);
337 if (!s.ok()) {
338 // Default size if we can't get the cf's target size
339 m_max_size = 64 * 1024 * 1024;
340 } else {
341 // Set the maximum size to 3 times the cf's target size
342 m_max_size = cf_descr.options.target_file_size_base * 3;
343 }
344 mysql_mutex_init(rdb_sst_commit_key, &m_commit_mutex, MY_MUTEX_INIT_FAST);
345 }
346
~Rdb_sst_info()347 Rdb_sst_info::~Rdb_sst_info() {
348 assert(m_sst_file == nullptr);
349
350 for (const auto &sst_file : m_committed_files) {
351 // In case something went wrong attempt to delete the temporary file.
352 // If everything went fine that file will have been renamed and this
353 // function call will fail.
354 std::remove(sst_file.c_str());
355 }
356 m_committed_files.clear();
357
358 mysql_mutex_destroy(&m_commit_mutex);
359 }
360
open_new_sst_file()361 int Rdb_sst_info::open_new_sst_file() {
362 assert(m_sst_file == nullptr);
363
364 // Create the new sst file's name
365 const std::string name = m_prefix + std::to_string(m_sst_count++) + m_suffix;
366
367 // Create the new sst file object
368 m_sst_file = new Rdb_sst_file_ordered(m_db, m_cf, m_db_options, name,
369 m_tracing, m_max_size);
370
371 // Open the sst file
372 const rocksdb::Status s = m_sst_file->open();
373 if (!s.ok()) {
374 set_error_msg(m_sst_file->get_name(), s);
375 delete m_sst_file;
376 m_sst_file = nullptr;
377 return HA_ERR_ROCKSDB_BULK_LOAD;
378 }
379
380 m_curr_size = 0;
381
382 return HA_EXIT_SUCCESS;
383 }
384
commit_sst_file(Rdb_sst_file_ordered * sst_file)385 void Rdb_sst_info::commit_sst_file(Rdb_sst_file_ordered *sst_file) {
386 const rocksdb::Status s = sst_file->commit();
387 if (!s.ok()) {
388 set_error_msg(sst_file->get_name(), s);
389 set_background_error(HA_ERR_ROCKSDB_BULK_LOAD);
390 }
391
392 m_committed_files.push_back(sst_file->get_name());
393
394 delete sst_file;
395 }
396
close_curr_sst_file()397 void Rdb_sst_info::close_curr_sst_file() {
398 assert(m_sst_file != nullptr);
399 assert(m_curr_size > 0);
400
401 commit_sst_file(m_sst_file);
402
403 // Reset for next sst file
404 m_sst_file = nullptr;
405 m_curr_size = 0;
406 }
407
put(const rocksdb::Slice & key,const rocksdb::Slice & value)408 int Rdb_sst_info::put(const rocksdb::Slice &key, const rocksdb::Slice &value) {
409 int rc;
410
411 assert(!m_done);
412
413 if (m_curr_size + key.size() + value.size() >= m_max_size) {
414 // The current sst file has reached its maximum, close it out
415 close_curr_sst_file();
416
417 // While we are here, check to see if we have had any errors from the
418 // background thread - we don't want to wait for the end to report them
419 if (have_background_error()) {
420 return get_and_reset_background_error();
421 }
422 }
423
424 if (m_curr_size == 0) {
425 // We don't have an sst file open - open one
426 rc = open_new_sst_file();
427 if (rc != 0) {
428 return rc;
429 }
430 }
431
432 assert(m_sst_file != nullptr);
433
434 // Add the key/value to the current sst file
435 const rocksdb::Status s = m_sst_file->put(key, value);
436 if (!s.ok()) {
437 set_error_msg(m_sst_file->get_name(), s);
438 return HA_ERR_ROCKSDB_BULK_LOAD;
439 }
440
441 m_curr_size += key.size() + value.size();
442
443 return HA_EXIT_SUCCESS;
444 }
445
446 /*
447 Finish the current work and return the list of SST files ready to be
448 ingested. This function need to be idempotent and atomic
449 */
finish(Rdb_sst_commit_info * commit_info,bool print_client_error)450 int Rdb_sst_info::finish(Rdb_sst_commit_info *commit_info,
451 bool print_client_error) {
452 int ret = HA_EXIT_SUCCESS;
453
454 // Both the transaction clean up and the ha_rocksdb handler have
455 // references to this Rdb_sst_info and both can call commit, so
456 // synchronize on the object here.
457 // This also means in such case the bulk loading operation stop being truly
458 // atomic, and we should consider fixing this in the future
459 RDB_MUTEX_LOCK_CHECK(m_commit_mutex);
460
461 if (is_done()) {
462 RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
463 return ret;
464 }
465
466 m_print_client_error = print_client_error;
467
468 if (m_curr_size > 0) {
469 // Close out any existing files
470 close_curr_sst_file();
471 }
472
473 // This checks out the list of files so that the caller can collect/group
474 // them and ingest them all in one go, and any racing calls to commit
475 // won't see them at all
476 commit_info->init(m_cf, std::move(m_committed_files));
477 assert(m_committed_files.size() == 0);
478
479 m_done = true;
480 RDB_MUTEX_UNLOCK_CHECK(m_commit_mutex);
481
482 // Did we get any errors?
483 if (have_background_error()) {
484 ret = get_and_reset_background_error();
485 }
486
487 m_print_client_error = true;
488 return ret;
489 }
490
set_error_msg(const std::string & sst_file_name,const rocksdb::Status & s)491 void Rdb_sst_info::set_error_msg(const std::string &sst_file_name,
492 const rocksdb::Status &s) {
493 if (!m_print_client_error) return;
494
495 report_error_msg(s, sst_file_name.c_str());
496 }
497
report_error_msg(const rocksdb::Status & s,const char * sst_file_name)498 void Rdb_sst_info::report_error_msg(const rocksdb::Status &s,
499 const char *sst_file_name) {
500 if (s.IsInvalidArgument() &&
501 strcmp(s.getState(), "Keys must be added in strict ascending order.") == 0) {
502 my_printf_error(ER_KEYS_OUT_OF_ORDER,
503 "Rows must be inserted in primary key order "
504 "during bulk load operation",
505 MYF(0));
506 } else if (s.IsInvalidArgument() &&
507 strcmp(s.getState(), "Global seqno is required, but disabled") ==
508 0) {
509 my_printf_error(ER_OVERLAPPING_KEYS,
510 "Rows inserted during bulk load "
511 "must not overlap existing rows",
512 MYF(0));
513 } else {
514 my_printf_error(ER_UNKNOWN_ERROR, "[%s] bulk load error: %s", MYF(0),
515 sst_file_name, s.ToString().c_str());
516 }
517 }
518
init(const rocksdb::DB * const db)519 void Rdb_sst_info::init(const rocksdb::DB *const db) {
520 const std::string path = db->GetName() + FN_DIRSEP;
521 struct st_my_dir *const dir_info = my_dir(path.c_str(), MYF(MY_DONT_SORT));
522
523 // Access the directory
524 if (dir_info == nullptr) {
525 // NO_LINT_DEBUG
526 sql_print_warning("RocksDB: Could not access database directory: %s",
527 path.c_str());
528 return;
529 }
530
531 // Scan through the files in the directory
532 const struct fileinfo *file_info = dir_info->dir_entry;
533 for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
534 // find any files ending with m_suffix ...
535 const std::string name = file_info->name;
536 const size_t pos = name.find(m_suffix);
537 if (pos != std::string::npos && name.size() - pos == m_suffix.size()) {
538 // ... and remove them
539 const std::string fullname = path + name;
540 my_delete(fullname.c_str(), MYF(0));
541 }
542 }
543
544 // Release the directory entry
545 my_dirend(dir_info);
546 }
547
548 std::atomic<uint64_t> Rdb_sst_info::m_prefix_counter(0);
549 std::string Rdb_sst_info::m_suffix = ".bulk_load.tmp";
550 } // namespace myrocks
551