1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10
11 #ifdef GFLAGS
12 #include "db_stress_tool/db_stress_common.h"
13 #include "db_stress_tool/db_stress_compaction_filter.h"
14 #include "db_stress_tool/db_stress_driver.h"
15 #include "db_stress_tool/db_stress_table_properties_collector.h"
16 #include "rocksdb/convenience.h"
17 #include "rocksdb/sst_file_manager.h"
18 #include "rocksdb/types.h"
19 #include "util/cast_util.h"
20 #include "utilities/backupable/backupable_db_impl.h"
21 #include "utilities/fault_injection_fs.h"
22
23 namespace ROCKSDB_NAMESPACE {
24 StressTest::StressTest()
25 : cache_(NewCache(FLAGS_cache_size)),
26 compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
27 filter_policy_(
28 FLAGS_bloom_bits >= 0
29 ? FLAGS_use_ribbon_filter
30 ? NewExperimentalRibbonFilterPolicy(FLAGS_bloom_bits)
31 : FLAGS_use_block_based_filter
32 ? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
33 : NewBloomFilterPolicy(FLAGS_bloom_bits, false)
34 : nullptr),
35 db_(nullptr),
36 #ifndef ROCKSDB_LITE
37 txn_db_(nullptr),
38 #endif
39 clock_(db_stress_env->GetSystemClock().get()),
40 new_column_family_name_(1),
41 num_times_reopened_(0),
42 db_preload_finished_(false),
43 cmp_db_(nullptr) {
44 if (FLAGS_destroy_db_initially) {
45 std::vector<std::string> files;
46 db_stress_env->GetChildren(FLAGS_db, &files);
47 for (unsigned int i = 0; i < files.size(); i++) {
48 if (Slice(files[i]).starts_with("heap-")) {
49 db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
50 }
51 }
52
53 Options options;
54 options.env = db_stress_env;
55   // Remove files without preserving manifest files
56 #ifndef ROCKSDB_LITE
57 const Status s = !FLAGS_use_blob_db
58 ? DestroyDB(FLAGS_db, options)
59 : blob_db::DestroyBlobDB(FLAGS_db, options,
60 blob_db::BlobDBOptions());
61 #else
62 const Status s = DestroyDB(FLAGS_db, options);
63 #endif // !ROCKSDB_LITE
64
65 if (!s.ok()) {
66 fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str());
67 exit(1);
68 }
69 }
70 }
71
72 StressTest::~StressTest() {
73 for (auto cf : column_families_) {
74 delete cf;
75 }
76 column_families_.clear();
77 delete db_;
78
79 assert(secondaries_.size() == secondary_cfh_lists_.size());
80 size_t n = secondaries_.size();
81 for (size_t i = 0; i != n; ++i) {
82 for (auto* cf : secondary_cfh_lists_[i]) {
83 delete cf;
84 }
85 secondary_cfh_lists_[i].clear();
86 delete secondaries_[i];
87 }
88 secondaries_.clear();
89
90 for (auto* cf : cmp_cfhs_) {
91 delete cf;
92 }
93 cmp_cfhs_.clear();
94 delete cmp_db_;
95 }
96
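// Build the block cache for the test. A zero capacity disables the cache;
// otherwise either a clock cache or an LRU cache is created, depending on
// FLAGS_use_clock_cache.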
97 std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) {
98 if (capacity <= 0) {
99 return nullptr;
100 }
101 if (FLAGS_use_clock_cache) {
102 auto cache = NewClockCache((size_t)capacity);
103 if (!cache) {
104       fprintf(stderr, "Clock cache not supported.\n");
105 exit(1);
106 }
107 return cache;
108 } else {
109 return NewLRUCache((size_t)capacity);
110 }
111 }
112
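// Compression types supported by this build, used as candidate values when
// blob_compression_type is changed dynamically via SetOptions().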
113 std::vector<std::string> StressTest::GetBlobCompressionTags() {
114 std::vector<std::string> compression_tags{"kNoCompression"};
115
116 if (Snappy_Supported()) {
117 compression_tags.emplace_back("kSnappyCompression");
118 }
119 if (LZ4_Supported()) {
120 compression_tags.emplace_back("kLZ4Compression");
121 }
122 if (ZSTD_Supported()) {
123 compression_tags.emplace_back("kZSTD");
124 }
125
126 return compression_tags;
127 }
128
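// Populate options_table_ with candidate values for the mutable options that
// SetOptions() may toggle at runtime, e.g. the "write_buffer_size" entry
// offers 1x, 2x and 4x of the currently configured value.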
129 bool StressTest::BuildOptionsTable() {
130 if (FLAGS_set_options_one_in <= 0) {
131 return true;
132 }
133
134 std::unordered_map<std::string, std::vector<std::string>> options_tbl = {
135 {"write_buffer_size",
136 {ToString(options_.write_buffer_size),
137 ToString(options_.write_buffer_size * 2),
138 ToString(options_.write_buffer_size * 4)}},
139 {"max_write_buffer_number",
140 {ToString(options_.max_write_buffer_number),
141 ToString(options_.max_write_buffer_number * 2),
142 ToString(options_.max_write_buffer_number * 4)}},
143 {"arena_block_size",
144 {
145 ToString(options_.arena_block_size),
146 ToString(options_.write_buffer_size / 4),
147 ToString(options_.write_buffer_size / 8),
148 }},
149 {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
150 {"max_successive_merges", {"0", "2", "4"}},
151 {"inplace_update_num_locks", {"100", "200", "300"}},
152 // TODO(ljin): enable test for this option
153 // {"disable_auto_compactions", {"100", "200", "300"}},
154 {"soft_rate_limit", {"0", "0.5", "0.9"}},
155 {"hard_rate_limit", {"0", "1.1", "2.0"}},
156 {"level0_file_num_compaction_trigger",
157 {
158 ToString(options_.level0_file_num_compaction_trigger),
159 ToString(options_.level0_file_num_compaction_trigger + 2),
160 ToString(options_.level0_file_num_compaction_trigger + 4),
161 }},
162 {"level0_slowdown_writes_trigger",
163 {
164 ToString(options_.level0_slowdown_writes_trigger),
165 ToString(options_.level0_slowdown_writes_trigger + 2),
166 ToString(options_.level0_slowdown_writes_trigger + 4),
167 }},
168 {"level0_stop_writes_trigger",
169 {
170 ToString(options_.level0_stop_writes_trigger),
171 ToString(options_.level0_stop_writes_trigger + 2),
172 ToString(options_.level0_stop_writes_trigger + 4),
173 }},
174 {"max_compaction_bytes",
175 {
176 ToString(options_.target_file_size_base * 5),
177 ToString(options_.target_file_size_base * 15),
178 ToString(options_.target_file_size_base * 100),
179 }},
180 {"target_file_size_base",
181 {
182 ToString(options_.target_file_size_base),
183 ToString(options_.target_file_size_base * 2),
184 ToString(options_.target_file_size_base * 4),
185 }},
186 {"target_file_size_multiplier",
187 {
188 ToString(options_.target_file_size_multiplier),
189 "1",
190 "2",
191 }},
192 {"max_bytes_for_level_base",
193 {
194 ToString(options_.max_bytes_for_level_base / 2),
195 ToString(options_.max_bytes_for_level_base),
196 ToString(options_.max_bytes_for_level_base * 2),
197 }},
198 {"max_bytes_for_level_multiplier",
199 {
200 ToString(options_.max_bytes_for_level_multiplier),
201 "1",
202 "2",
203 }},
204 {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
205 };
206
207 if (FLAGS_allow_setting_blob_options_dynamically) {
208 options_tbl.emplace("enable_blob_files",
209 std::vector<std::string>{"false", "true"});
210 options_tbl.emplace("min_blob_size",
211 std::vector<std::string>{"0", "8", "16"});
212 options_tbl.emplace("blob_file_size",
213 std::vector<std::string>{"1M", "16M", "256M", "1G"});
214 options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
215 options_tbl.emplace("enable_blob_garbage_collection",
216 std::vector<std::string>{"false", "true"});
217 options_tbl.emplace(
218 "blob_garbage_collection_age_cutoff",
219 std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
220 }
221
222 options_table_ = std::move(options_tbl);
223
224 for (const auto& iter : options_table_) {
225 options_index_.push_back(iter.first);
226 }
227 return true;
228 }
229
230 void StressTest::InitDb() {
231 uint64_t now = clock_->NowMicros();
232 fprintf(stdout, "%s Initializing db_stress\n",
233 clock_->TimeToString(now / 1000000).c_str());
234 PrintEnv();
235 Open();
236 BuildOptionsTable();
237 }
238
239 void StressTest::FinishInitDb(SharedState* shared) {
240 if (FLAGS_read_only) {
241 uint64_t now = clock_->NowMicros();
242 fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
243 clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
244 PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
245 }
246 if (FLAGS_enable_compaction_filter) {
247 auto* compaction_filter_factory =
248 reinterpret_cast<DbStressCompactionFilterFactory*>(
249 options_.compaction_filter_factory.get());
250 assert(compaction_filter_factory);
251 compaction_filter_factory->SetSharedState(shared);
252 fprintf(stdout, "Compaction filter factory: %s\n",
253 compaction_filter_factory->Name());
254 }
255 }
256
257 bool StressTest::VerifySecondaries() {
258 #ifndef ROCKSDB_LITE
259 if (FLAGS_test_secondary) {
260 uint64_t now = clock_->NowMicros();
261 fprintf(stdout, "%s Start to verify secondaries against primary\n",
262 clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
263 }
264 for (size_t k = 0; k != secondaries_.size(); ++k) {
265 Status s = secondaries_[k]->TryCatchUpWithPrimary();
266 if (!s.ok()) {
267 fprintf(stderr, "Secondary failed to catch up with primary\n");
268 return false;
269 }
270 ReadOptions ropts;
271 ropts.total_order_seek = true;
272 // Verify only the default column family since the primary may have
273 // dropped other column families after most recent reopen.
274 std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
275 std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
276 for (iter1->SeekToFirst(), iter2->SeekToFirst();
277 iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
278 if (iter1->key().compare(iter2->key()) != 0 ||
279 iter1->value().compare(iter2->value())) {
280 fprintf(stderr,
281 "Secondary %d contains different data from "
282 "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
283 static_cast<int>(k),
284 iter1->key().ToString(/*hex=*/true).c_str(),
285 iter1->value().ToString(/*hex=*/true).c_str(),
286 iter2->key().ToString(/*hex=*/true).c_str(),
287 iter2->value().ToString(/*hex=*/true).c_str());
288 return false;
289 }
290 }
291 if (iter1->Valid() && !iter2->Valid()) {
292 fprintf(stderr,
293 "Secondary %d record count is smaller than that of primary\n",
294 static_cast<int>(k));
295 return false;
296 } else if (!iter1->Valid() && iter2->Valid()) {
297 fprintf(stderr,
298 "Secondary %d record count is larger than that of primary\n",
299 static_cast<int>(k));
300 return false;
301 }
302 }
303 if (FLAGS_test_secondary) {
304 uint64_t now = clock_->NowMicros();
305 fprintf(stdout, "%s Verification of secondaries succeeded\n",
306 clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
307 }
308 #endif // ROCKSDB_LITE
309 return true;
310 }
311
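// Re-read snap_state.key under the recorded snapshot (and timestamp, if any)
// and check that the status and value match what was observed when the
// snapshot was taken. If key_vec is set, also re-scan the key space and
// compare key existence against the recorded bit vector.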
312 Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
313 ThreadState::SnapshotState& snap_state) {
314 Status s;
315 if (cf->GetName() != snap_state.cf_at_name) {
316 return s;
317 }
318 ReadOptions ropt;
319 ropt.snapshot = snap_state.snapshot;
320 Slice ts;
321 if (!snap_state.timestamp.empty()) {
322 ts = snap_state.timestamp;
323 ropt.timestamp = &ts;
324 }
325 PinnableSlice exp_v(&snap_state.value);
326 exp_v.PinSelf();
327 PinnableSlice v;
328 s = db->Get(ropt, cf, snap_state.key, &v);
329 if (!s.ok() && !s.IsNotFound()) {
330 return s;
331 }
332 if (snap_state.status != s) {
333 return Status::Corruption(
334 "The snapshot gave inconsistent results for key " +
335 ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
336 " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
337 ") vs. (" + s.ToString() + ")");
338 }
339 if (s.ok()) {
340 if (exp_v != v) {
341 return Status::Corruption("The snapshot gave inconsistent values: (" +
342 exp_v.ToString() + ") vs. (" + v.ToString() +
343 ")");
344 }
345 }
346 if (snap_state.key_vec != nullptr) {
347 // When `prefix_extractor` is set, seeking to beginning and scanning
348 // across prefixes are only supported with `total_order_seek` set.
349 ropt.total_order_seek = true;
350 std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
351 std::unique_ptr<std::vector<bool>> tmp_bitvec(
352 new std::vector<bool>(FLAGS_max_key));
353 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
354 uint64_t key_val;
355 if (GetIntVal(iterator->key().ToString(), &key_val)) {
356 (*tmp_bitvec.get())[key_val] = true;
357 }
358 }
359 if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
360 tmp_bitvec.get()->begin())) {
361 return Status::Corruption("Found inconsistent keys at this snapshot");
362 }
363 }
364 return Status::OK();
365 }
366
367 void StressTest::VerificationAbort(SharedState* shared, std::string msg,
368 Status s) const {
369 fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(),
370 s.ToString().c_str());
371 shared->SetVerificationFailure();
372 }
373
374 void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
375 int64_t key) const {
376 auto key_str = Key(key);
377 Slice key_slice = key_str;
378 fprintf(stderr,
379 "Verification failed for column family %d key %s (%" PRIi64 "): %s\n",
380 cf, key_slice.ToString(true).c_str(), key, msg.c_str());
381 shared->SetVerificationFailure();
382 }
383
384 void StressTest::PrintStatistics() {
385 if (dbstats) {
386 fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
387 }
388 if (dbstats_secondaries) {
389 fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
390 dbstats_secondaries->ToString().c_str());
391 }
392 }
393
394 // Currently PreloadDb has to be single-threaded.
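// It writes `number_of_keys` KVs to every column family (via Put or Merge,
// optionally wrapped in a transaction), flushes, closes the DB and reopens it
// as read-only.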
395 void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
396 SharedState* shared) {
397 WriteOptions write_opts;
398 write_opts.disableWAL = FLAGS_disable_wal;
399 if (FLAGS_sync) {
400 write_opts.sync = true;
401 }
402 char value[100];
403 int cf_idx = 0;
404 Status s;
405 for (auto cfh : column_families_) {
406 for (int64_t k = 0; k != number_of_keys; ++k) {
407 std::string key_str = Key(k);
408 Slice key = key_str;
409 size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
410 Slice v(value, sz);
411 shared->Put(cf_idx, k, 0, true /* pending */);
412
413 if (FLAGS_use_merge) {
414 if (!FLAGS_use_txn) {
415 s = db_->Merge(write_opts, cfh, key, v);
416 } else {
417 #ifndef ROCKSDB_LITE
418 Transaction* txn;
419 s = NewTxn(write_opts, &txn);
420 if (s.ok()) {
421 s = txn->Merge(cfh, key, v);
422 if (s.ok()) {
423 s = CommitTxn(txn);
424 }
425 }
426 #endif
427 }
428 } else {
429 if (!FLAGS_use_txn) {
430 std::string ts_str;
431 Slice ts;
432 if (FLAGS_user_timestamp_size > 0) {
433 ts_str = NowNanosStr();
434 ts = ts_str;
435 write_opts.timestamp = &ts;
436 }
437 s = db_->Put(write_opts, cfh, key, v);
438 } else {
439 #ifndef ROCKSDB_LITE
440 Transaction* txn;
441 s = NewTxn(write_opts, &txn);
442 if (s.ok()) {
443 s = txn->Put(cfh, key, v);
444 if (s.ok()) {
445 s = CommitTxn(txn);
446 }
447 }
448 #endif
449 }
450 }
451
452 shared->Put(cf_idx, k, 0, false /* pending */);
453 if (!s.ok()) {
454 break;
455 }
456 }
457 if (!s.ok()) {
458 break;
459 }
460 ++cf_idx;
461 }
462 if (s.ok()) {
463 s = db_->Flush(FlushOptions(), column_families_);
464 }
465 if (s.ok()) {
466 for (auto cf : column_families_) {
467 delete cf;
468 }
469 column_families_.clear();
470 delete db_;
471 db_ = nullptr;
472 #ifndef ROCKSDB_LITE
473 txn_db_ = nullptr;
474 #endif
475
476 db_preload_finished_.store(true);
477 auto now = clock_->NowMicros();
478 fprintf(stdout, "%s Reopening database in read-only\n",
479 clock_->TimeToString(now / 1000000).c_str());
480 // Reopen as read-only, can ignore all options related to updates
481 Open();
482 } else {
483     fprintf(stderr, "Failed to preload db\n");
484 exit(1);
485 }
486 }
487
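// Pick a random mutable option from options_table_ and apply one of its
// candidate values to a random column family via DB::SetOptions(). Options
// that must stay consistent with each other (the rate limits and the level0
// triggers) are always changed together.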
488 Status StressTest::SetOptions(ThreadState* thread) {
489 assert(FLAGS_set_options_one_in > 0);
490 std::unordered_map<std::string, std::string> opts;
491 std::string name =
492 options_index_[thread->rand.Next() % options_index_.size()];
493 int value_idx = thread->rand.Next() % options_table_[name].size();
494 if (name == "soft_rate_limit" || name == "hard_rate_limit") {
495 opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx];
496 opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx];
497 } else if (name == "level0_file_num_compaction_trigger" ||
498 name == "level0_slowdown_writes_trigger" ||
499 name == "level0_stop_writes_trigger") {
500 opts["level0_file_num_compaction_trigger"] =
501 options_table_["level0_file_num_compaction_trigger"][value_idx];
502 opts["level0_slowdown_writes_trigger"] =
503 options_table_["level0_slowdown_writes_trigger"][value_idx];
504 opts["level0_stop_writes_trigger"] =
505 options_table_["level0_stop_writes_trigger"][value_idx];
506 } else {
507 opts[name] = options_table_[name][value_idx];
508 }
509
510 int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
511 auto cfh = column_families_[rand_cf_idx];
512 return db_->SetOptions(cfh, opts);
513 }
514
515 #ifndef ROCKSDB_LITE
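// Transaction helpers, only used when FLAGS_use_txn is set: every transaction
// gets a unique "xid<N>" name, and CommitTxn() performs a two-phase commit
// (Prepare followed by Commit).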
516 Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
517 if (!FLAGS_use_txn) {
518 return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
519 }
520 static std::atomic<uint64_t> txn_id = {0};
521 TransactionOptions txn_options;
522 txn_options.lock_timeout = 600000; // 10 min
523 txn_options.deadlock_detect = true;
524 *txn = txn_db_->BeginTransaction(write_opts, txn_options);
525 auto istr = std::to_string(txn_id.fetch_add(1));
526 Status s = (*txn)->SetName("xid" + istr);
527 return s;
528 }
529
530 Status StressTest::CommitTxn(Transaction* txn) {
531 if (!FLAGS_use_txn) {
532 return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
533 }
534 Status s = txn->Prepare();
535 if (s.ok()) {
536 s = txn->Commit();
537 }
538 delete txn;
539 return s;
540 }
541
542 Status StressTest::RollbackTxn(Transaction* txn) {
543 if (!FLAGS_use_txn) {
544 return Status::InvalidArgument(
545 "RollbackTxn when FLAGS_use_txn is not"
546 " set");
547 }
548 Status s = txn->Rollback();
549 delete txn;
550 return s;
551 }
552 #endif
553
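// Main per-thread loop: each iteration picks a random key and column family
// and performs a read, prefix scan, write, delete, delete-range or iteration
// according to the FLAGS_*percent distribution, interleaved with occasional
// maintenance operations (flush, compaction, backup, checkpoint, WAL sync,
// verification, etc.).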
554 void StressTest::OperateDb(ThreadState* thread) {
555 ReadOptions read_opts(FLAGS_verify_checksum, true);
556 WriteOptions write_opts;
557 auto shared = thread->shared;
558 char value[100];
559 std::string from_db;
560 if (FLAGS_sync) {
561 write_opts.sync = true;
562 }
563 write_opts.disableWAL = FLAGS_disable_wal;
564 const int prefixBound = static_cast<int>(FLAGS_readpercent) +
565 static_cast<int>(FLAGS_prefixpercent);
566 const int writeBound = prefixBound + static_cast<int>(FLAGS_writepercent);
567 const int delBound = writeBound + static_cast<int>(FLAGS_delpercent);
568 const int delRangeBound = delBound + static_cast<int>(FLAGS_delrangepercent);
569 const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
570
571 #ifndef NDEBUG
572 if (FLAGS_read_fault_one_in) {
573 fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
574 FLAGS_read_fault_one_in);
575 }
576 if (FLAGS_write_fault_one_in) {
577 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
578 error_msg.SetRetryable(true);
579 std::vector<FileType> types = {FileType::kTableFile,
580 FileType::kDescriptorFile,
581 FileType::kCurrentFile};
582 fault_fs_guard->SetRandomWriteError(
583 thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg, types);
584 }
585 #endif // NDEBUG
586 thread->stats.Start();
587 for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
588 if (thread->shared->HasVerificationFailedYet() ||
589 thread->shared->ShouldStopTest()) {
590 break;
591 }
592 if (open_cnt != 0) {
593 thread->stats.FinishedSingleOp();
594 MutexLock l(thread->shared->GetMutex());
595 while (!thread->snapshot_queue.empty()) {
596 db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
597 delete thread->snapshot_queue.front().second.key_vec;
598 thread->snapshot_queue.pop();
599 }
600 thread->shared->IncVotedReopen();
601 if (thread->shared->AllVotedReopen()) {
602 thread->shared->GetStressTest()->Reopen(thread);
603 thread->shared->GetCondVar()->SignalAll();
604 } else {
605 thread->shared->GetCondVar()->Wait();
606 }
607 // Commenting this out as we don't want to reset stats on each open.
608 // thread->stats.Start();
609 }
610
611 for (uint64_t i = 0; i < ops_per_open; i++) {
612 if (thread->shared->HasVerificationFailedYet()) {
613 break;
614 }
615
616 // Change Options
617 if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) {
618 SetOptions(thread);
619 }
620
621 if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) {
622 options_.inplace_update_support ^= options_.inplace_update_support;
623 }
624
625 if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
626 thread->rand.OneIn(FLAGS_verify_db_one_in)) {
627 ContinuouslyVerifyDb(thread);
628 if (thread->shared->ShouldStopTest()) {
629 break;
630 }
631 }
632
633 MaybeClearOneColumnFamily(thread);
634
635 if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
636 Status s = db_->SyncWAL();
637 if (!s.ok() && !s.IsNotSupported()) {
638 fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
639 }
640 }
641
642 int rand_column_family = thread->rand.Next() % FLAGS_column_families;
643 ColumnFamilyHandle* column_family = column_families_[rand_column_family];
644
645 if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
646 TestCompactFiles(thread, column_family);
647 }
648
649 int64_t rand_key = GenerateOneKey(thread, i);
650 std::string keystr = Key(rand_key);
651 Slice key = keystr;
652 std::unique_ptr<MutexLock> lock;
653 if (ShouldAcquireMutexOnKey()) {
654 lock.reset(new MutexLock(
655 shared->GetMutexForKey(rand_column_family, rand_key)));
656 }
657
658 if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
659 TestCompactRange(thread, rand_key, key, column_family);
660 if (thread->shared->HasVerificationFailedYet()) {
661 break;
662 }
663 }
664
665 std::vector<int> rand_column_families =
666 GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
667
668 if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
669 Status status = TestFlush(rand_column_families);
670 if (!status.ok()) {
671 fprintf(stdout, "Unable to perform Flush(): %s\n",
672 status.ToString().c_str());
673 }
674 }
675
676 #ifndef ROCKSDB_LITE
677 // Verify GetLiveFiles with a 1 in N chance.
678 if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) &&
679 !FLAGS_write_fault_one_in) {
680 Status status = VerifyGetLiveFiles();
681 if (!status.ok()) {
682 VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
683 }
684 }
685
686 // Verify GetSortedWalFiles with a 1 in N chance.
687 if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
688 Status status = VerifyGetSortedWalFiles();
689 if (!status.ok()) {
690 VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK",
691 status);
692 }
693 }
694
695 // Verify GetCurrentWalFile with a 1 in N chance.
696 if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) {
697 Status status = VerifyGetCurrentWalFile();
698 if (!status.ok()) {
699 VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK",
700 status);
701 }
702 }
703 #endif // !ROCKSDB_LITE
704
705 if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
706 Status status = TestPauseBackground(thread);
707 if (!status.ok()) {
708 VerificationAbort(
709 shared, "Pause/ContinueBackgroundWork status not OK", status);
710 }
711 }
712
713 #ifndef ROCKSDB_LITE
714 if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
715 Status status = db_->VerifyChecksum();
716 if (!status.ok()) {
717 VerificationAbort(shared, "VerifyChecksum status not OK", status);
718 }
719 }
720
721 if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) {
722 TestGetProperty(thread);
723 }
724 #endif
725
726 std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
727
728 if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
729 TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
730 }
731
732 if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
733 // Beyond a certain DB size threshold, this test becomes heavier than
734 // it's worth.
735 uint64_t total_size = 0;
736 if (FLAGS_backup_max_size > 0) {
737 std::vector<FileAttributes> files;
738 db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files);
739 for (auto& file : files) {
740 total_size += file.size_bytes;
741 }
742 }
743
744 if (total_size <= FLAGS_backup_max_size) {
745 Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
746 if (!s.ok()) {
747 VerificationAbort(shared, "Backup/restore gave inconsistent state",
748 s);
749 }
750 }
751 }
752
753 if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
754 Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
755 if (!s.ok()) {
756 VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
757 }
758 }
759
760 #ifndef ROCKSDB_LITE
761 if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) {
762 Status s =
763 TestApproximateSize(thread, i, rand_column_families, rand_keys);
764 if (!s.ok()) {
765 VerificationAbort(shared, "ApproximateSize Failed", s);
766 }
767 }
768 #endif // !ROCKSDB_LITE
769 if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
770 TestAcquireSnapshot(thread, rand_column_family, keystr, i);
771 }
772
773 /*always*/ {
774 Status s = MaybeReleaseSnapshots(thread, i);
775 if (!s.ok()) {
776 VerificationAbort(shared, "Snapshot gave inconsistent state", s);
777 }
778 }
779
780 // Assign timestamps if necessary.
781 std::string read_ts_str;
782 std::string write_ts_str;
783 Slice read_ts;
784 Slice write_ts;
785 if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) {
786 read_ts_str = GenerateTimestampForRead();
787 read_ts = read_ts_str;
788 read_opts.timestamp = &read_ts;
789 write_ts_str = NowNanosStr();
790 write_ts = write_ts_str;
791 write_opts.timestamp = &write_ts;
792 }
793
794 int prob_op = thread->rand.Uniform(100);
795 // Reset this in case we pick something other than a read op. We don't
796 // want to use a stale value when deciding at the beginning of the loop
797 // whether to vote to reopen
798 if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
799 assert(0 <= prob_op);
800 // OPERATION read
801 if (FLAGS_use_multiget) {
802 // Leave room for one more iteration of the loop with a single key
803 // batch. This is to ensure that each thread does exactly the same
804 // number of ops
805 int multiget_batch_size = static_cast<int>(
806 std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
807 FLAGS_ops_per_thread - i - 1));
808           // If it's the last iteration, ensure that multiget_batch_size is 1
809 multiget_batch_size = std::max(multiget_batch_size, 1);
810 rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
811 TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
812 i += multiget_batch_size - 1;
813 } else {
814 TestGet(thread, read_opts, rand_column_families, rand_keys);
815 }
816 } else if (prob_op < prefixBound) {
817 assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
818 // OPERATION prefix scan
819 // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
820 // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
821 // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
822 // prefix
823 TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
824 } else if (prob_op < writeBound) {
825 assert(prefixBound <= prob_op);
826 // OPERATION write
827 TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
828 value, lock);
829 } else if (prob_op < delBound) {
830 assert(writeBound <= prob_op);
831 // OPERATION delete
832 TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
833 } else if (prob_op < delRangeBound) {
834 assert(delBound <= prob_op);
835 // OPERATION delete range
836 TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
837 lock);
838 } else {
839 assert(delRangeBound <= prob_op);
840 // OPERATION iterate
841 int num_seeks = static_cast<int>(
842 std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
843 FLAGS_ops_per_thread - i - 1));
844 rand_keys = GenerateNKeys(thread, num_seeks, i);
845 i += num_seeks - 1;
846 TestIterate(thread, read_opts, rand_column_families, rand_keys);
847 }
848 thread->stats.FinishedSingleOp();
849 #ifndef ROCKSDB_LITE
850 uint32_t tid = thread->tid;
851 assert(secondaries_.empty() ||
852 static_cast<size_t>(tid) < secondaries_.size());
853 if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
854 Status s = secondaries_[tid]->TryCatchUpWithPrimary();
855 if (!s.ok()) {
856 VerificationAbort(shared, "Secondary instance failed to catch up", s);
857 break;
858 }
859 }
860 #endif
861 }
862 }
863 while (!thread->snapshot_queue.empty()) {
864 db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
865 delete thread->snapshot_queue.front().second.key_vec;
866 thread->snapshot_queue.pop();
867 }
868
869 thread->stats.Stop();
870 }
871
872 #ifndef ROCKSDB_LITE
873 // Generate a list of keys that are close to the boundaries of SST file keys.
874 // If there isn't any SST file in the DB, return an empty list.
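// The keys are taken from the smallest/largest keys of SST files (with any
// user timestamp stripped) and then, with some probability, perturbed by
// adding or subtracting one.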
875 std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread,
876 DB* db,
877 ColumnFamilyHandle* cfh,
878 size_t num_keys) {
879 ColumnFamilyMetaData cfmd;
880 db->GetColumnFamilyMetaData(cfh, &cfmd);
881 std::vector<std::string> boundaries;
882 for (const LevelMetaData& lmd : cfmd.levels) {
883 for (const SstFileMetaData& sfmd : lmd.files) {
884 // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey
885 // have timestamps.
886 const auto& skey = sfmd.smallestkey;
887 const auto& lkey = sfmd.largestkey;
888 assert(skey.size() >= FLAGS_user_timestamp_size);
889 assert(lkey.size() >= FLAGS_user_timestamp_size);
890 boundaries.push_back(
891 skey.substr(0, skey.size() - FLAGS_user_timestamp_size));
892 boundaries.push_back(
893 lkey.substr(0, lkey.size() - FLAGS_user_timestamp_size));
894 }
895 }
896 if (boundaries.empty()) {
897 return {};
898 }
899
900 std::vector<std::string> ret;
901 for (size_t j = 0; j < num_keys; j++) {
902 std::string k =
903 boundaries[thread->rand.Uniform(static_cast<int>(boundaries.size()))];
904 if (thread->rand.OneIn(3)) {
905       // Subtract one from the key (borrowing across bytes)
906 for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
907 uint8_t cur = k[i];
908 if (cur > 0) {
909 k[i] = static_cast<char>(cur - 1);
910 break;
911 } else if (i > 0) {
912 k[i] = 0xFFu;
913 }
914 }
915 } else if (thread->rand.OneIn(2)) {
916       // Add one to the key (carrying across bytes)
917 for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
918 uint8_t cur = k[i];
919 if (cur < 255) {
920 k[i] = static_cast<char>(cur + 1);
921 break;
922 } else if (i > 0) {
923 k[i] = 0x00;
924 }
925 }
926 }
927 ret.push_back(k);
928 }
929 return ret;
930 }
931 #endif // !ROCKSDB_LITE
932
933 // Given a key K, this creates an iterator which scans to K and then
934 // does a random sequence of Next/Prev operations.
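// Some iterators additionally get randomized upper/lower bounds, total order
// seek or auto prefix mode, and every position is cross-checked against a
// control iterator opened with total_order_seek on the same snapshot (see
// VerifyIterator()).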
935 Status StressTest::TestIterate(ThreadState* thread,
936 const ReadOptions& read_opts,
937 const std::vector<int>& rand_column_families,
938 const std::vector<int64_t>& rand_keys) {
939 Status s;
940 const Snapshot* snapshot = db_->GetSnapshot();
941 ReadOptions readoptionscopy = read_opts;
942 readoptionscopy.snapshot = snapshot;
943
944 bool expect_total_order = false;
945 if (thread->rand.OneIn(16)) {
946 // When prefix extractor is used, it's useful to cover total order seek.
947 readoptionscopy.total_order_seek = true;
948 expect_total_order = true;
949 } else if (thread->rand.OneIn(4)) {
950 readoptionscopy.total_order_seek = false;
951 readoptionscopy.auto_prefix_mode = true;
952 expect_total_order = true;
953 } else if (options_.prefix_extractor.get() == nullptr) {
954 expect_total_order = true;
955 }
956
957 std::string upper_bound_str;
958 Slice upper_bound;
959 if (thread->rand.OneIn(16)) {
960       // with a 1/16 chance, set an iterator upper bound
961 int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
962 upper_bound_str = Key(rand_upper_key);
963 upper_bound = Slice(upper_bound_str);
964       // upper_bound can be smaller than the seek key, but the query itself
965       // should not crash either.
966 readoptionscopy.iterate_upper_bound = &upper_bound;
967 }
968 std::string lower_bound_str;
969 Slice lower_bound;
970 if (thread->rand.OneIn(16)) {
971       // with a 1/16 chance, enable an iterator lower bound
972 int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
973 lower_bound_str = Key(rand_lower_key);
974 lower_bound = Slice(lower_bound_str);
975       // lower_bound can be larger than the seek key, but the query itself
976       // should not crash either.
977 readoptionscopy.iterate_lower_bound = &lower_bound;
978 }
979
980 auto cfh = column_families_[rand_column_families[0]];
981 std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh));
982
983 std::vector<std::string> key_str;
984 if (thread->rand.OneIn(16)) {
985 // Generate keys close to lower or upper bound of SST files.
986 key_str = GetWhiteBoxKeys(thread, db_, cfh, rand_keys.size());
987 }
988 if (key_str.empty()) {
989     // If key strings were not generated from white-box keys,
990     // use the randomized keys passed in.
991 for (int64_t rkey : rand_keys) {
992 key_str.push_back(Key(rkey));
993 }
994 }
995
996 std::string op_logs;
997 const size_t kOpLogsLimit = 10000;
998
999 for (const std::string& skey : key_str) {
1000 if (op_logs.size() > kOpLogsLimit) {
1001 // Shouldn't take too much memory for the history log. Clear it.
1002 op_logs = "(cleared...)\n";
1003 }
1004
1005 Slice key = skey;
1006
1007 if (readoptionscopy.iterate_upper_bound != nullptr &&
1008 thread->rand.OneIn(2)) {
1009 // 1/2 chance, change the upper bound.
1010 // It is possible that it is changed without first use, but there is no
1011 // problem with that.
1012 int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
1013 upper_bound_str = Key(rand_upper_key);
1014 upper_bound = Slice(upper_bound_str);
1015 } else if (readoptionscopy.iterate_lower_bound != nullptr &&
1016 thread->rand.OneIn(4)) {
1017 // 1/4 chance, change the lower bound.
1018 // It is possible that it is changed without first use, but there is no
1019 // problem with that.
1020 int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
1021 lower_bound_str = Key(rand_lower_key);
1022 lower_bound = Slice(lower_bound_str);
1023 }
1024
1025 // Record some options to op_logs;
1026 op_logs += "total_order_seek: ";
1027 op_logs += (readoptionscopy.total_order_seek ? "1 " : "0 ");
1028 op_logs += "auto_prefix_mode: ";
1029 op_logs += (readoptionscopy.auto_prefix_mode ? "1 " : "0 ");
1030 if (readoptionscopy.iterate_upper_bound != nullptr) {
1031 op_logs += "ub: " + upper_bound.ToString(true) + " ";
1032 }
1033 if (readoptionscopy.iterate_lower_bound != nullptr) {
1034 op_logs += "lb: " + lower_bound.ToString(true) + " ";
1035 }
1036
1037     // Set up a control iterator that does the same seek without bounds and
1038     // with total order seek, and compare the results. This helps identify
1039     // bugs related to bounds, the prefix extractor, or reseeking. Sometimes
1040     // we compare iterators with the same set-up, and it doesn't hurt to
1041     // check that they are equal.
1042 ReadOptions cmp_ro;
1043 cmp_ro.timestamp = readoptionscopy.timestamp;
1044 cmp_ro.snapshot = snapshot;
1045 cmp_ro.total_order_seek = true;
1046 ColumnFamilyHandle* cmp_cfh =
1047 GetControlCfh(thread, rand_column_families[0]);
1048 std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
1049 bool diverged = false;
1050
1051 bool support_seek_first_or_last = expect_total_order;
1052
1053 LastIterateOp last_op;
1054 if (support_seek_first_or_last && thread->rand.OneIn(100)) {
1055 iter->SeekToFirst();
1056 cmp_iter->SeekToFirst();
1057 last_op = kLastOpSeekToFirst;
1058 op_logs += "STF ";
1059 } else if (support_seek_first_or_last && thread->rand.OneIn(100)) {
1060 iter->SeekToLast();
1061 cmp_iter->SeekToLast();
1062 last_op = kLastOpSeekToLast;
1063 op_logs += "STL ";
1064 } else if (thread->rand.OneIn(8)) {
1065 iter->SeekForPrev(key);
1066 cmp_iter->SeekForPrev(key);
1067 last_op = kLastOpSeekForPrev;
1068 op_logs += "SFP " + key.ToString(true) + " ";
1069 } else {
1070 iter->Seek(key);
1071 cmp_iter->Seek(key);
1072 last_op = kLastOpSeek;
1073 op_logs += "S " + key.ToString(true) + " ";
1074 }
1075 VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(), cmp_iter.get(),
1076 last_op, key, op_logs, &diverged);
1077
1078 bool no_reverse =
1079 (FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
1080 for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
1081 if (no_reverse || thread->rand.OneIn(2)) {
1082 iter->Next();
1083 if (!diverged) {
1084 assert(cmp_iter->Valid());
1085 cmp_iter->Next();
1086 }
1087 op_logs += "N";
1088 } else {
1089 iter->Prev();
1090 if (!diverged) {
1091 assert(cmp_iter->Valid());
1092 cmp_iter->Prev();
1093 }
1094 op_logs += "P";
1095 }
1096 last_op = kLastOpNextOrPrev;
1097 VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(),
1098 cmp_iter.get(), last_op, key, op_logs, &diverged);
1099 }
1100
1101 if (s.ok()) {
1102 thread->stats.AddIterations(1);
1103 } else {
1104 fprintf(stderr, "TestIterate error: %s\n", s.ToString().c_str());
1105 thread->stats.AddErrors(1);
1106 break;
1107 }
1108
1109 op_logs += "; ";
1110 }
1111
1112 db_->ReleaseSnapshot(snapshot);
1113
1114 return s;
1115 }
1116
1117 #ifndef ROCKSDB_LITE
1118 // Test the return status of GetLiveFiles.
1119 Status StressTest::VerifyGetLiveFiles() const {
1120 std::vector<std::string> live_file;
1121 uint64_t manifest_size = 0;
1122 return db_->GetLiveFiles(live_file, &manifest_size);
1123 }
1124
1125 // Test the return status of GetSortedWalFiles.
1126 Status StressTest::VerifyGetSortedWalFiles() const {
1127 VectorLogPtr log_ptr;
1128 return db_->GetSortedWalFiles(log_ptr);
1129 }
1130
1131 // Test the return status of GetCurrentWalFile.
1132 Status StressTest::VerifyGetCurrentWalFile() const {
1133 std::unique_ptr<LogFile> cur_wal_file;
1134 return db_->GetCurrentWalFile(&cur_wal_file);
1135 }
1136 #endif // !ROCKSDB_LITE
1137
1138 // Compare the two iterators. iter and cmp_iter should be at the same
1139 // position, unless iter may have been invalidated or made undefined by the
1140 // upper or lower bounds, or by the prefix extractor.
1141 // Flags a verification failure if the check fails.
1142 // *diverged is set to true once the two iterators have diverged, or once
1143 // further comparison is no longer meaningful.
1144 void StressTest::VerifyIterator(ThreadState* thread,
1145 ColumnFamilyHandle* cmp_cfh,
1146 const ReadOptions& ro, Iterator* iter,
1147 Iterator* cmp_iter, LastIterateOp op,
1148 const Slice& seek_key,
1149 const std::string& op_logs, bool* diverged) {
1150 if (*diverged) {
1151 return;
1152 }
1153
1154 if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
1155 // SeekToFirst() with lower bound is not well defined.
1156 *diverged = true;
1157 return;
1158 } else if (op == kLastOpSeekToLast && ro.iterate_upper_bound != nullptr) {
1159     // SeekToLast() with an upper bound is not well defined.
1160 *diverged = true;
1161 return;
1162 } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
1163 (options_.comparator->CompareWithoutTimestamp(
1164 *ro.iterate_lower_bound, /*a_has_ts=*/false, seek_key,
1165 /*b_has_ts=*/false) >= 0 ||
1166 (ro.iterate_upper_bound != nullptr &&
1167 options_.comparator->CompareWithoutTimestamp(
1168 *ro.iterate_lower_bound, /*a_has_ts=*/false,
1169 *ro.iterate_upper_bound, /*b_has_ts*/ false) >= 0))) {
1170 // Lower bound behavior is not well defined if it is larger than
1171 // seek key or upper bound. Disable the check for now.
1172 *diverged = true;
1173 return;
1174 } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
1175 (options_.comparator->CompareWithoutTimestamp(
1176 *ro.iterate_upper_bound, /*a_has_ts=*/false, seek_key,
1177 /*b_has_ts=*/false) <= 0 ||
1178 (ro.iterate_lower_bound != nullptr &&
1179 options_.comparator->CompareWithoutTimestamp(
1180 *ro.iterate_lower_bound, /*a_has_ts=*/false,
1181 *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
1182     // Upper bound behavior is not well defined if it is smaller than the
1183     // seek key or the lower bound. Disable the check for now.
1184 *diverged = true;
1185 return;
1186 }
1187
1188 const SliceTransform* pe = (ro.total_order_seek || ro.auto_prefix_mode)
1189 ? nullptr
1190 : options_.prefix_extractor.get();
1191 const Comparator* cmp = options_.comparator;
1192
1193 if (iter->Valid() && !cmp_iter->Valid()) {
1194 if (pe != nullptr) {
1195 if (!pe->InDomain(seek_key)) {
1196         // Prefix seek on a non-in-domain key is undefined. Skip checking
1197         // for this scenario.
1198 *diverged = true;
1199 return;
1200 } else if (!pe->InDomain(iter->key())) {
1201         // Out of range if the iterator key is not in the domain anymore.
1202 *diverged = true;
1203 return;
1204 } else if (pe->Transform(iter->key()) != pe->Transform(seek_key)) {
1205 *diverged = true;
1206 return;
1207 }
1208 }
1209 fprintf(stderr,
1210             "Control iterator is invalid but iterator has key %s "
1211 "%s\n",
1212 iter->key().ToString(true).c_str(), op_logs.c_str());
1213
1214 *diverged = true;
1215 } else if (cmp_iter->Valid()) {
1216     // Iterator is not valid. That can be legitimate if it has already moved
1217     // out of the upper or lower bound, or was filtered out by the prefix iterator.
1218 const Slice& total_order_key = cmp_iter->key();
1219
1220 if (pe != nullptr) {
1221         // Prefix seek on a non-in-domain key is undefined. Skip checking
1222         // for this scenario.
1223 // this scenario.
1224 *diverged = true;
1225 return;
1226 }
1227
1228 if (!pe->InDomain(total_order_key) ||
1229 pe->Transform(total_order_key) != pe->Transform(seek_key)) {
1230         // If the prefix is exhausted, the only thing that needs checking
1231         // is that the iterator doesn't return a position within the prefix.
1232         // Either way, checking can stop here.
1233 *diverged = true;
1234 if (!iter->Valid() || !pe->InDomain(iter->key()) ||
1235 pe->Transform(iter->key()) != pe->Transform(seek_key)) {
1236 return;
1237 }
1238 fprintf(stderr,
1239                 "Iterator stays in prefix but control doesn't"
1240 " iterator key %s control iterator key %s %s\n",
1241 iter->key().ToString(true).c_str(),
1242 cmp_iter->key().ToString(true).c_str(), op_logs.c_str());
1243 }
1244 }
1245 // Check upper or lower bounds.
1246 if (!*diverged) {
1247 if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
1248 (!iter->Valid() &&
1249 (ro.iterate_upper_bound == nullptr ||
1250 cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
1251 *ro.iterate_upper_bound,
1252 /*b_has_ts=*/false) < 0) &&
1253 (ro.iterate_lower_bound == nullptr ||
1254 cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
1255 *ro.iterate_lower_bound,
1256 /*b_has_ts=*/false) > 0))) {
1257 fprintf(stderr,
1258 "Iterator diverged from control iterator which"
1259 " has value %s %s\n",
1260 total_order_key.ToString(true).c_str(), op_logs.c_str());
1261 if (iter->Valid()) {
1262 fprintf(stderr, "iterator has value %s\n",
1263 iter->key().ToString(true).c_str());
1264 } else {
1265 fprintf(stderr, "iterator is not valid\n");
1266 }
1267 *diverged = true;
1268 }
1269 }
1270 }
1271 if (*diverged) {
1272 fprintf(stderr, "Control CF %s\n", cmp_cfh->GetName().c_str());
1273 thread->stats.AddErrors(1);
1274 // Fail fast to preserve the DB state.
1275 thread->shared->SetVerificationFailure();
1276 }
1277 }
1278
1279 #ifdef ROCKSDB_LITE
1280 Status StressTest::TestBackupRestore(
1281 ThreadState* /* thread */,
1282 const std::vector<int>& /* rand_column_families */,
1283 const std::vector<int64_t>& /* rand_keys */) {
1284 assert(false);
1285 fprintf(stderr,
1286 "RocksDB lite does not support "
1287 "TestBackupRestore\n");
1288 std::terminate();
1289 }
1290
1291 Status StressTest::TestCheckpoint(
1292 ThreadState* /* thread */,
1293 const std::vector<int>& /* rand_column_families */,
1294 const std::vector<int64_t>& /* rand_keys */) {
1295 assert(false);
1296 fprintf(stderr,
1297 "RocksDB lite does not support "
1298 "TestCheckpoint\n");
1299 std::terminate();
1300 }
1301
1302 void StressTest::TestCompactFiles(ThreadState* /* thread */,
1303 ColumnFamilyHandle* /* column_family */) {
1304 assert(false);
1305 fprintf(stderr,
1306 "RocksDB lite does not support "
1307 "CompactFiles\n");
1308 std::terminate();
1309 }
1310 #else // ROCKSDB_LITE
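// Create a backup of the current DB under <db>/.backup<tid>, optionally
// verify it, then either restore it into <db>/.restore<tid> or open it in
// place as read-only, and spot-check a single key per chosen column family
// against the expected state.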
1311 Status StressTest::TestBackupRestore(
1312 ThreadState* thread, const std::vector<int>& rand_column_families,
1313 const std::vector<int64_t>& rand_keys) {
1314 std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
1315 std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
1316 BackupableDBOptions backup_opts(backup_dir);
1317 // For debugging, get info_log from live options
1318 backup_opts.info_log = db_->GetDBOptions().info_log.get();
1319 assert(backup_opts.info_log);
1320 if (thread->rand.OneIn(10)) {
1321 backup_opts.share_table_files = false;
1322 } else {
1323 backup_opts.share_table_files = true;
1324 if (thread->rand.OneIn(5)) {
1325 backup_opts.share_files_with_checksum = false;
1326 } else {
1327 backup_opts.share_files_with_checksum = true;
1328 if (thread->rand.OneIn(2)) {
1329 // old
1330 backup_opts.share_files_with_checksum_naming =
1331 BackupableDBOptions::kLegacyCrc32cAndFileSize;
1332 } else {
1333 // new
1334 backup_opts.share_files_with_checksum_naming =
1335 BackupableDBOptions::kUseDbSessionId;
1336 }
1337 if (thread->rand.OneIn(2)) {
1338 backup_opts.share_files_with_checksum_naming =
1339 backup_opts.share_files_with_checksum_naming |
1340 BackupableDBOptions::kFlagIncludeFileSize;
1341 }
1342 }
1343 }
1344 BackupEngine* backup_engine = nullptr;
1345 std::string from = "a backup/restore operation";
1346 Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
1347 if (!s.ok()) {
1348 from = "BackupEngine::Open";
1349 }
1350 if (s.ok()) {
1351 if (thread->rand.OneIn(2)) {
1352 TEST_FutureSchemaVersion2Options test_opts;
1353 test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
1354 test_opts.file_sizes = thread->rand.OneIn(2) == 0;
1355 TEST_EnableWriteFutureSchemaVersion2(backup_engine, test_opts);
1356 }
1357 s = backup_engine->CreateNewBackup(db_);
1358 if (!s.ok()) {
1359 from = "BackupEngine::CreateNewBackup";
1360 }
1361 }
1362 if (s.ok()) {
1363 delete backup_engine;
1364 backup_engine = nullptr;
1365 s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
1366 if (!s.ok()) {
1367 from = "BackupEngine::Open (again)";
1368 }
1369 }
1370 std::vector<BackupInfo> backup_info;
1371 // If inplace_not_restore, we verify the backup by opening it as a
1372 // read-only DB. If !inplace_not_restore, we restore it to a temporary
1373 // directory for verification.
1374 bool inplace_not_restore = thread->rand.OneIn(3);
1375 if (s.ok()) {
1376 backup_engine->GetBackupInfo(&backup_info,
1377 /*include_file_details*/ inplace_not_restore);
1378 if (backup_info.empty()) {
1379 s = Status::NotFound("no backups found");
1380 from = "BackupEngine::GetBackupInfo";
1381 }
1382 }
1383 if (s.ok() && thread->rand.OneIn(2)) {
1384 s = backup_engine->VerifyBackup(
1385 backup_info.front().backup_id,
1386 thread->rand.OneIn(2) /* verify_with_checksum */);
1387 if (!s.ok()) {
1388 from = "BackupEngine::VerifyBackup";
1389 }
1390 }
1391 const bool allow_persistent = thread->tid == 0; // not too many
1392 bool from_latest = false;
1393 int count = static_cast<int>(backup_info.size());
1394 if (s.ok() && !inplace_not_restore) {
1395 if (count > 1) {
1396 s = backup_engine->RestoreDBFromBackup(
1397 RestoreOptions(), backup_info[thread->rand.Uniform(count)].backup_id,
1398 restore_dir /* db_dir */, restore_dir /* wal_dir */);
1399 if (!s.ok()) {
1400 from = "BackupEngine::RestoreDBFromBackup";
1401 }
1402 } else {
1403 from_latest = true;
1404 s = backup_engine->RestoreDBFromLatestBackup(RestoreOptions(),
1405 restore_dir /* db_dir */,
1406 restore_dir /* wal_dir */);
1407 if (!s.ok()) {
1408 from = "BackupEngine::RestoreDBFromLatestBackup";
1409 }
1410 }
1411 }
1412 if (s.ok() && !inplace_not_restore) {
1413 // Purge early if restoring, to ensure the restored directory doesn't
1414 // have some secret dependency on the backup directory.
1415 uint32_t to_keep = 0;
1416 if (allow_persistent) {
1417 // allow one thread to keep up to 2 backups
1418 to_keep = thread->rand.Uniform(3);
1419 }
1420 s = backup_engine->PurgeOldBackups(to_keep);
1421 if (!s.ok()) {
1422 from = "BackupEngine::PurgeOldBackups";
1423 }
1424 }
1425 DB* restored_db = nullptr;
1426 std::vector<ColumnFamilyHandle*> restored_cf_handles;
1427 // Not yet implemented: opening restored BlobDB or TransactionDB
1428 if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
1429 Options restore_options(options_);
1430 restore_options.listeners.clear();
1431 // Avoid dangling/shared file descriptors, for reliable destroy
1432 restore_options.sst_file_manager = nullptr;
1433 std::vector<ColumnFamilyDescriptor> cf_descriptors;
1434 // TODO(ajkr): `column_family_names_` is not safe to access here when
1435 // `clear_column_family_one_in != 0`. But we can't easily switch to
1436 // `ListColumnFamilies` to get names because it won't necessarily give
1437 // the same order as `column_family_names_`.
1438 assert(FLAGS_clear_column_family_one_in == 0);
1439 for (auto name : column_family_names_) {
1440 cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
1441 }
1442 if (inplace_not_restore) {
1443 BackupInfo& info = backup_info[thread->rand.Uniform(count)];
1444 restore_options.env = info.env_for_open.get();
1445 s = DB::OpenForReadOnly(DBOptions(restore_options), info.name_for_open,
1446 cf_descriptors, &restored_cf_handles,
1447 &restored_db);
1448 if (!s.ok()) {
1449 from = "DB::OpenForReadOnly in backup/restore";
1450 }
1451 } else {
1452 s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
1453 &restored_cf_handles, &restored_db);
1454 if (!s.ok()) {
1455 from = "DB::Open in backup/restore";
1456 }
1457 }
1458 }
1459 // Note the column families chosen by `rand_column_families` cannot be
1460 // dropped while the locks for `rand_keys` are held. So we should not have
1461 // to worry about accessing those column families throughout this function.
1462 //
1463 // For simplicity, currently only verifies existence/non-existence of a
1464 // single key
1465 for (size_t i = 0; restored_db && s.ok() && i < rand_column_families.size();
1466 ++i) {
1467 std::string key_str = Key(rand_keys[0]);
1468 Slice key = key_str;
1469 std::string restored_value;
1470 ReadOptions read_opts;
1471 std::string ts_str;
1472 Slice ts;
1473 if (FLAGS_user_timestamp_size > 0) {
1474 ts_str = GenerateTimestampForRead();
1475 ts = ts_str;
1476 read_opts.timestamp = &ts;
1477 }
1478 Status get_status = restored_db->Get(
1479 read_opts, restored_cf_handles[rand_column_families[i]], key,
1480 &restored_value);
1481 bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1482 if (get_status.ok()) {
1483 if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
1484 s = Status::Corruption("key exists in restore but not in original db");
1485 }
1486 } else if (get_status.IsNotFound()) {
1487 if (exists && from_latest && ShouldAcquireMutexOnKey()) {
1488 s = Status::Corruption("key exists in original db but not in restore");
1489 }
1490 } else {
1491 s = get_status;
1492 if (!s.ok()) {
1493 from = "DB::Get in backup/restore";
1494 }
1495 }
1496 }
1497 if (restored_db != nullptr) {
1498 for (auto* cf_handle : restored_cf_handles) {
1499 restored_db->DestroyColumnFamilyHandle(cf_handle);
1500 }
1501 delete restored_db;
1502 restored_db = nullptr;
1503 }
1504 if (s.ok() && inplace_not_restore) {
1505 // Purge late if inplace open read-only
1506 uint32_t to_keep = 0;
1507 if (allow_persistent) {
1508 // allow one thread to keep up to 2 backups
1509 to_keep = thread->rand.Uniform(3);
1510 }
1511 s = backup_engine->PurgeOldBackups(to_keep);
1512 if (!s.ok()) {
1513 from = "BackupEngine::PurgeOldBackups";
1514 }
1515 }
1516 if (backup_engine != nullptr) {
1517 delete backup_engine;
1518 backup_engine = nullptr;
1519 }
1520 if (s.ok()) {
1521 // Preserve directories on failure, or allowed persistent backup
1522 if (!allow_persistent) {
1523 s = DestroyDir(db_stress_env, backup_dir);
1524 if (!s.ok()) {
1525 from = "Destroy backup dir";
1526 }
1527 }
1528 }
1529 if (s.ok()) {
1530 s = DestroyDir(db_stress_env, restore_dir);
1531 if (!s.ok()) {
1532 from = "Destroy restore dir";
1533 }
1534 }
1535 if (!s.ok()) {
1536 fprintf(stderr, "Failure in %s with: %s\n", from.c_str(),
1537 s.ToString().c_str());
1538 }
1539 return s;
1540 }
1541
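// Call GetApproximateSizes() on a random key range with randomized
// SizeApproximationOptions; only the returned status is checked.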
1542 Status StressTest::TestApproximateSize(
1543 ThreadState* thread, uint64_t iteration,
1544 const std::vector<int>& rand_column_families,
1545 const std::vector<int64_t>& rand_keys) {
1546 // rand_keys likely only has one key. Just use the first one.
1547 assert(!rand_keys.empty());
1548 assert(!rand_column_families.empty());
1549 int64_t key1 = rand_keys[0];
1550 int64_t key2;
1551 if (thread->rand.OneIn(2)) {
1552 // Two totally random keys. This tends to cover large ranges.
1553 key2 = GenerateOneKey(thread, iteration);
1554 if (key2 < key1) {
1555 std::swap(key1, key2);
1556 }
1557 } else {
1558     // Unless users pass a very large FLAGS_max_key, we should not worry
1559     // about overflow. This is for testing, so we skip overflow checking
1560     // for simplicity.
1561 key2 = key1 + static_cast<int64_t>(thread->rand.Uniform(1000));
1562 }
1563 std::string key1_str = Key(key1);
1564 std::string key2_str = Key(key2);
1565 Range range{Slice(key1_str), Slice(key2_str)};
1566 SizeApproximationOptions sao;
1567 sao.include_memtabtles = thread->rand.OneIn(2);
1568 if (sao.include_memtabtles) {
1569 sao.include_files = thread->rand.OneIn(2);
1570 }
1571 if (thread->rand.OneIn(2)) {
1572 if (thread->rand.OneIn(2)) {
1573 sao.files_size_error_margin = 0.0;
1574 } else {
1575 sao.files_size_error_margin =
1576 static_cast<double>(thread->rand.Uniform(3));
1577 }
1578 }
1579 uint64_t result;
1580 return db_->GetApproximateSizes(
1581 sao, column_families_[rand_column_families[0]], &range, 1, &result);
1582 }
1583
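// Create a checkpoint under <db>/.checkpoint<tid>, reopen it as read-only,
// spot-check a single key per chosen column family against the expected
// state, and destroy the checkpoint directory on success.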
1584 Status StressTest::TestCheckpoint(ThreadState* thread,
1585 const std::vector<int>& rand_column_families,
1586 const std::vector<int64_t>& rand_keys) {
1587 std::string checkpoint_dir =
1588 FLAGS_db + "/.checkpoint" + ToString(thread->tid);
1589 Options tmp_opts(options_);
1590 tmp_opts.listeners.clear();
1591 tmp_opts.env = db_stress_env;
1592
1593 DestroyDB(checkpoint_dir, tmp_opts);
1594
1595 if (db_stress_env->FileExists(checkpoint_dir).ok()) {
1596 // If the directory might still exist, try to delete the files one by one.
1597 // Likely a trash file is still there.
1598 Status my_s = DestroyDir(db_stress_env, checkpoint_dir);
1599 if (!my_s.ok()) {
1600       fprintf(stderr, "Fail to destroy directory before checkpoint: %s",
1601 my_s.ToString().c_str());
1602 }
1603 }
1604
1605 Checkpoint* checkpoint = nullptr;
1606 Status s = Checkpoint::Create(db_, &checkpoint);
1607 if (s.ok()) {
1608 s = checkpoint->CreateCheckpoint(checkpoint_dir);
1609 if (!s.ok()) {
1610 fprintf(stderr, "Fail to create checkpoint to %s\n",
1611 checkpoint_dir.c_str());
1612 std::vector<std::string> files;
1613 Status my_s = db_stress_env->GetChildren(checkpoint_dir, &files);
1614 if (my_s.ok()) {
1615 for (const auto& f : files) {
1616 fprintf(stderr, " %s\n", f.c_str());
1617 }
1618 } else {
1619 fprintf(stderr, "Fail to get files under the directory to %s\n",
1620 my_s.ToString().c_str());
1621 }
1622 }
1623 }
1624 delete checkpoint;
1625 checkpoint = nullptr;
1626 std::vector<ColumnFamilyHandle*> cf_handles;
1627 DB* checkpoint_db = nullptr;
1628 if (s.ok()) {
1629 Options options(options_);
1630 options.listeners.clear();
1631 std::vector<ColumnFamilyDescriptor> cf_descs;
1632 // TODO(ajkr): `column_family_names_` is not safe to access here when
1633 // `clear_column_family_one_in != 0`. But we can't easily switch to
1634 // `ListColumnFamilies` to get names because it won't necessarily give
1635 // the same order as `column_family_names_`.
1636 assert(FLAGS_clear_column_family_one_in == 0);
1637 if (FLAGS_clear_column_family_one_in == 0) {
1638 for (const auto& name : column_family_names_) {
1639 cf_descs.emplace_back(name, ColumnFamilyOptions(options));
1640 }
1641 s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
1642 &cf_handles, &checkpoint_db);
1643 }
1644 }
1645 if (checkpoint_db != nullptr) {
1646 // Note the column families chosen by `rand_column_families` cannot be
1647 // dropped while the locks for `rand_keys` are held. So we should not have
1648 // to worry about accessing those column families throughout this function.
1649 for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
1650 std::string key_str = Key(rand_keys[0]);
1651 Slice key = key_str;
1652 std::string value;
1653 Status get_status = checkpoint_db->Get(
1654 ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
1655 bool exists =
1656 thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1657 if (get_status.ok()) {
1658 if (!exists && ShouldAcquireMutexOnKey()) {
1659 s = Status::Corruption(
1660 "key exists in checkpoint but not in original db");
1661 }
1662 } else if (get_status.IsNotFound()) {
1663 if (exists && ShouldAcquireMutexOnKey()) {
1664 s = Status::Corruption(
1665 "key exists in original db but not in checkpoint");
1666 }
1667 } else {
1668 s = get_status;
1669 }
1670 }
1671 for (auto cfh : cf_handles) {
1672 delete cfh;
1673 }
1674 cf_handles.clear();
1675 delete checkpoint_db;
1676 checkpoint_db = nullptr;
1677 }
1678
1679 if (!s.ok()) {
1680 fprintf(stderr, "A checkpoint operation failed with: %s\n",
1681 s.ToString().c_str());
1682 } else {
1683 DestroyDB(checkpoint_dir, tmp_opts);
1684 }
1685 return s;
1686 }
1687
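// Queries every known DB property (plus per-level properties and one invalid
// name) and flags a verification failure if a property that should be
// supported cannot be retrieved, or if an invalid name unexpectedly succeeds.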
1688 void StressTest::TestGetProperty(ThreadState* thread) const {
1689 std::unordered_set<std::string> levelPropertyNames = {
1690 DB::Properties::kAggregatedTablePropertiesAtLevel,
1691 DB::Properties::kCompressionRatioAtLevelPrefix,
1692 DB::Properties::kNumFilesAtLevelPrefix,
1693 };
1694 std::unordered_set<std::string> unknownPropertyNames = {
1695 DB::Properties::kEstimateOldestKeyTime,
1696 DB::Properties::kOptionsStatistics,
1697 };
1698 unknownPropertyNames.insert(levelPropertyNames.begin(),
1699 levelPropertyNames.end());
1700
1701 std::string prop;
1702 for (const auto& ppt_name_and_info : InternalStats::ppt_name_to_info) {
1703 bool res = db_->GetProperty(ppt_name_and_info.first, &prop);
1704 if (unknownPropertyNames.find(ppt_name_and_info.first) ==
1705 unknownPropertyNames.end()) {
1706 if (!res) {
1707 fprintf(stderr, "Failed to get DB property: %s\n",
1708 ppt_name_and_info.first.c_str());
1709 thread->shared->SetVerificationFailure();
1710 }
1711 if (ppt_name_and_info.second.handle_int != nullptr) {
1712 uint64_t prop_int;
1713 if (!db_->GetIntProperty(ppt_name_and_info.first, &prop_int)) {
1714 fprintf(stderr, "Failed to get Int property: %s\n",
1715 ppt_name_and_info.first.c_str());
1716 thread->shared->SetVerificationFailure();
1717 }
1718 }
1719 }
1720 }
1721
1722 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
1723 db_->GetColumnFamilyMetaData(&cf_meta_data);
1724 int level_size = static_cast<int>(cf_meta_data.levels.size());
1725 for (int level = 0; level < level_size; level++) {
1726 for (const auto& ppt_name : levelPropertyNames) {
1727 bool res = db_->GetProperty(ppt_name + std::to_string(level), &prop);
1728 if (!res) {
1729 fprintf(stderr, "Failed to get DB property: %s\n",
1730 (ppt_name + std::to_string(level)).c_str());
1731 thread->shared->SetVerificationFailure();
1732 }
1733 }
1734 }
1735
1736 // Test for an invalid property name
1737 if (thread->rand.OneIn(100)) {
1738 if (db_->GetProperty("rocksdb.invalid_property_name", &prop)) {
1739 fprintf(stderr, "Failed to return false for invalid property name\n");
1740 thread->shared->SetVerificationFailure();
1741 }
1742 }
1743 }
1744
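// Picks a random level and up to three consecutive files that are not already
// being compacted, then manually compacts them one level down (or within the
// last level) via DB::CompactFiles(), recording success/failure stats.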
1745 void StressTest::TestCompactFiles(ThreadState* thread,
1746 ColumnFamilyHandle* column_family) {
1747 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
1748 db_->GetColumnFamilyMetaData(column_family, &cf_meta_data);
1749
1750 // Randomly compact up to three consecutive files from a level
1751 const int kMaxRetry = 3;
1752 for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
1753 size_t random_level =
1754 thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size()));
1755
1756 const auto& files = cf_meta_data.levels[random_level].files;
1757 if (files.size() > 0) {
1758 size_t random_file_index =
1759 thread->rand.Uniform(static_cast<int>(files.size()));
1760 if (files[random_file_index].being_compacted) {
1761 // Retry as the selected file is currently being compacted
1762 continue;
1763 }
1764
1765 std::vector<std::string> input_files;
1766 input_files.push_back(files[random_file_index].name);
1767 if (random_file_index > 0 &&
1768 !files[random_file_index - 1].being_compacted) {
1769 input_files.push_back(files[random_file_index - 1].name);
1770 }
1771 if (random_file_index + 1 < files.size() &&
1772 !files[random_file_index + 1].being_compacted) {
1773 input_files.push_back(files[random_file_index + 1].name);
1774 }
1775
1776 size_t output_level =
1777 std::min(random_level + 1, cf_meta_data.levels.size() - 1);
1778 auto s = db_->CompactFiles(CompactionOptions(), column_family,
1779 input_files, static_cast<int>(output_level));
1780 if (!s.ok()) {
1781 fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
1782 s.ToString().c_str());
1783 thread->stats.AddNumCompactFilesFailed(1);
1784 } else {
1785 thread->stats.AddNumCompactFilesSucceed(1);
1786 }
1787 break;
1788 }
1789 }
1790 }
1791 #endif // ROCKSDB_LITE
1792
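// Flushes the memtables of the randomly chosen column families.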
1793 Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
1794 FlushOptions flush_opts;
1795 std::vector<ColumnFamilyHandle*> cfhs;
1796 std::for_each(rand_column_families.begin(), rand_column_families.end(),
1797 [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
1798 return db_->Flush(flush_opts, cfhs);
1799 }
1800
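// Pauses background work, sleeps for a randomized duration (heavily skewed
// toward short pauses) while other threads keep operating, then resumes it.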
1801 Status StressTest::TestPauseBackground(ThreadState* thread) {
1802 Status status = db_->PauseBackgroundWork();
1803 if (!status.ok()) {
1804 return status;
1805 }
1806 // To avoid stalling/deadlocking ourself in this thread, just
1807 // sleep here during pause and let other threads do db operations.
1808 // Sleep up to ~16 seconds (2**24 microseconds), but very skewed
1809 // toward short pause. (1 chance in 25 of pausing >= 1s;
1810 // 1 chance in 625 of pausing full 16s.)
1811 int pwr2_micros =
1812 std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
1813 clock_->SleepForMicroseconds(1 << pwr2_micros);
1814 return db_->ContinueBackgroundWork();
1815 }
1816
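// Takes a snapshot (occasionally a write-conflict-boundary snapshot), reads
// one key under it, optionally records the full key set visible to the
// snapshot, and queues the snapshot so it can be re-verified and released
// later.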
1817 void StressTest::TestAcquireSnapshot(ThreadState* thread,
1818 int rand_column_family,
1819 const std::string& keystr, uint64_t i) {
1820 Slice key = keystr;
1821 ColumnFamilyHandle* column_family = column_families_[rand_column_family];
1822 ReadOptions ropt;
1823 #ifndef ROCKSDB_LITE
1824 auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
1825 const bool ww_snapshot = thread->rand.OneIn(10);
1826 const Snapshot* snapshot =
1827 ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
1828 : db_->GetSnapshot();
1829 #else
1830 const Snapshot* snapshot = db_->GetSnapshot();
1831 #endif // !ROCKSDB_LITE
1832 ropt.snapshot = snapshot;
1833
1834 // Ideally, we want snapshot taking and timestamp generation to be atomic
1835 // here, so that the snapshot corresponds to the timestamp. However, it is
1836 // not possible with current GetSnapshot() API.
1837 std::string ts_str;
1838 Slice ts;
1839 if (FLAGS_user_timestamp_size > 0) {
1840 ts_str = GenerateTimestampForRead();
1841 ts = ts_str;
1842 ropt.timestamp = &ts;
1843 }
1844
1845 std::string value_at;
1846 // When taking a snapshot, we also read a key from that snapshot. We
1847 // will later read the same key before releasing the snapshot and
1848 // verify that the results are the same.
1849 auto status_at = db_->Get(ropt, column_family, key, &value_at);
1850 std::vector<bool>* key_vec = nullptr;
1851
1852 if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
1853 key_vec = new std::vector<bool>(FLAGS_max_key);
1854 // When `prefix_extractor` is set, seeking to beginning and scanning
1855 // across prefixes are only supported with `total_order_seek` set.
1856 ropt.total_order_seek = true;
1857 std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
1858 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
1859 uint64_t key_val;
1860 if (GetIntVal(iterator->key().ToString(), &key_val)) {
1861 (*key_vec)[key_val] = true;
1862 }
1863 }
1864 }
1865
1866 ThreadState::SnapshotState snap_state = {snapshot,
1867 rand_column_family,
1868 column_family->GetName(),
1869 keystr,
1870 status_at,
1871 value_at,
1872 key_vec,
1873 ts_str};
1874 uint64_t hold_for = FLAGS_snapshot_hold_ops;
1875 if (FLAGS_long_running_snapshots) {
1876 // Hold 10% of snapshots 10x longer
1877 if (thread->rand.OneIn(10)) {
1878 assert(hold_for < port::kMaxInt64 / 10);
1879 hold_for *= 10;
1880 // Hold 1% of snapshots 100x longer
1881 if (thread->rand.OneIn(10)) {
1882 assert(hold_for < port::kMaxInt64 / 10);
1883 hold_for *= 10;
1884 }
1885 }
1886 }
1887 uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
1888 thread->snapshot_queue.emplace(release_at, snap_state);
1889 }
1890
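// Releases any queued snapshots whose scheduled release point has been
// reached, first re-reading the saved key and verifying the result still
// matches what was observed when the snapshot was taken.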
1891 Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) {
1892 while (!thread->snapshot_queue.empty() &&
1893 i >= thread->snapshot_queue.front().first) {
1894 auto snap_state = thread->snapshot_queue.front().second;
1895 assert(snap_state.snapshot);
1896 // Note: this is unsafe as the cf might be dropped concurrently. But
1897 // it is ok since unclean cf drop is currently not supported by write
1898 // prepared transactions.
1899 Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
1900 db_->ReleaseSnapshot(snap_state.snapshot);
1901 delete snap_state.key_vec;
1902 thread->snapshot_queue.pop();
1903 if (!s.ok()) {
1904 return s;
1905 }
1906 }
1907 return Status::OK();
1908 }
1909
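// Runs a manual CompactRange() over a bounded key range with randomized
// options; half of the time it also hashes the range under a snapshot before
// and after the compaction and reports a verification failure on mismatch.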
1910 void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
1911 const Slice& start_key,
1912 ColumnFamilyHandle* column_family) {
1913 int64_t end_key_num;
1914 if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
1915 end_key_num = port::kMaxInt64;
1916 } else {
1917 end_key_num = FLAGS_compact_range_width + rand_key;
1918 }
1919 std::string end_key_buf = Key(end_key_num);
1920 Slice end_key(end_key_buf);
1921
1922 CompactRangeOptions cro;
1923 cro.exclusive_manual_compaction = static_cast<bool>(thread->rand.Next() % 2);
1924 cro.change_level = static_cast<bool>(thread->rand.Next() % 2);
1925 std::vector<BottommostLevelCompaction> bottom_level_styles = {
1926 BottommostLevelCompaction::kSkip,
1927 BottommostLevelCompaction::kIfHaveCompactionFilter,
1928 BottommostLevelCompaction::kForce,
1929 BottommostLevelCompaction::kForceOptimized};
1930 cro.bottommost_level_compaction =
1931 bottom_level_styles[thread->rand.Next() %
1932 static_cast<uint32_t>(bottom_level_styles.size())];
1933 cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
1934 cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
1935
1936 const Snapshot* pre_snapshot = nullptr;
1937 uint32_t pre_hash = 0;
1938 if (thread->rand.OneIn(2)) {
1939 // Do some validation by taking a snapshot and comparing the data before
1940 // and after the compaction
1941 pre_snapshot = db_->GetSnapshot();
1942 pre_hash =
1943 GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
1944 }
1945
1946 Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);
1947
1948 if (!status.ok()) {
1949 fprintf(stdout, "Unable to perform CompactRange(): %s\n",
1950 status.ToString().c_str());
1951 }
1952
1953 if (pre_snapshot != nullptr) {
1954 uint32_t post_hash =
1955 GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
1956 if (pre_hash != post_hash) {
1957 fprintf(stderr,
1958 "Data hash different before and after compact range "
1959 "start_key %s end_key %s\n",
1960 start_key.ToString(true).c_str(), end_key.ToString(true).c_str());
1961 thread->stats.AddErrors(1);
1962 // Fail fast to preserve the DB state.
1963 thread->shared->SetVerificationFailure();
1964 }
1965 db_->ReleaseSnapshot(pre_snapshot);
1966 }
1967 }
1968
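// Computes a CRC32C over all key/value pairs in [start_key, end_key] as seen
// through the given snapshot; used to check that manual compaction does not
// change the visible data.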
1969 uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
1970 ColumnFamilyHandle* column_family,
1971 const Slice& start_key,
1972 const Slice& end_key) {
1973 const std::string kCrcCalculatorSeparator = ";";
1974 uint32_t crc = 0;
1975 ReadOptions ro;
1976 ro.snapshot = snapshot;
1977 ro.total_order_seek = true;
1978 std::string ts_str;
1979 Slice ts;
1980 if (FLAGS_user_timestamp_size > 0) {
1981 ts_str = GenerateTimestampForRead();
1982 ts = ts_str;
1983 ro.timestamp = &ts;
1984 }
1985 std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
1986 for (it->Seek(start_key);
1987 it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0;
1988 it->Next()) {
1989 crc = crc32c::Extend(crc, it->key().data(), it->key().size());
1990 crc = crc32c::Extend(crc, kCrcCalculatorSeparator.data(), 1);
1991 crc = crc32c::Extend(crc, it->value().data(), it->value().size());
1992 crc = crc32c::Extend(crc, kCrcCalculatorSeparator.data(), 1);
1993 }
1994 if (!it->status().ok()) {
1995 fprintf(stderr, "Iterator non-OK when calculating range CRC: %s\n",
1996 it->status().ToString().c_str());
1997 thread->stats.AddErrors(1);
1998 // Fail fast to preserve the DB state.
1999 thread->shared->SetVerificationFailure();
2000 }
2001 return crc;
2002 }
2003
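// Prints the effective stress test configuration to stdout.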
2004 void StressTest::PrintEnv() const {
2005 fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
2006 kMinorVersion);
2007 fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
2008 fprintf(stdout, "TransactionDB : %s\n",
2009 FLAGS_use_txn ? "true" : "false");
2010 #ifndef ROCKSDB_LITE
2011 fprintf(stdout, "Stacked BlobDB : %s\n",
2012 FLAGS_use_blob_db ? "true" : "false");
2013 #endif // !ROCKSDB_LITE
2014 fprintf(stdout, "Read only mode : %s\n",
2015 FLAGS_read_only ? "true" : "false");
2016 fprintf(stdout, "Atomic flush : %s\n",
2017 FLAGS_atomic_flush ? "true" : "false");
2018 fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
2019 if (!FLAGS_test_batches_snapshots) {
2020 fprintf(stdout, "Clear CFs one in : %d\n",
2021 FLAGS_clear_column_family_one_in);
2022 }
2023 fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
2024 fprintf(stdout, "Ops per thread : %lu\n",
2025 (unsigned long)FLAGS_ops_per_thread);
2026 std::string ttl_state("unused");
2027 if (FLAGS_ttl > 0) {
2028 ttl_state = NumberToString(FLAGS_ttl);
2029 }
2030 fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
2031 fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
2032 fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
2033 fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
2034 fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
2035 fprintf(stdout, "Delete range percentage : %d%%\n", FLAGS_delrangepercent);
2036 fprintf(stdout, "No overwrite percentage : %d%%\n",
2037 FLAGS_nooverwritepercent);
2038 fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
2039 fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
2040 FLAGS_db_write_buffer_size);
2041 fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
2042 fprintf(stdout, "Iterations : %lu\n",
2043 (unsigned long)FLAGS_num_iterations);
2044 fprintf(stdout, "Max key : %lu\n",
2045 (unsigned long)FLAGS_max_key);
2046 fprintf(stdout, "Ratio #ops/#keys : %f\n",
2047 (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
2048 fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen);
2049 fprintf(stdout, "Batches/snapshots : %d\n",
2050 FLAGS_test_batches_snapshots);
2051 fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update);
2052 fprintf(stdout, "Num keys per lock : %d\n",
2053 1 << FLAGS_log2_keys_per_lock);
2054 std::string compression = CompressionTypeToString(compression_type_e);
2055 fprintf(stdout, "Compression : %s\n", compression.c_str());
2056 std::string bottommost_compression =
2057 CompressionTypeToString(bottommost_compression_type_e);
2058 fprintf(stdout, "Bottommost Compression : %s\n",
2059 bottommost_compression.c_str());
2060 std::string checksum = ChecksumTypeToString(checksum_type_e);
2061 fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
2062 fprintf(stdout, "File checksum impl : %s\n",
2063 FLAGS_file_checksum_impl.c_str());
2064 fprintf(stdout, "Bloom bits / key : %s\n",
2065 FormatDoubleParam(FLAGS_bloom_bits).c_str());
2066 fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
2067 FLAGS_subcompactions);
2068 fprintf(stdout, "Use MultiGet : %s\n",
2069 FLAGS_use_multiget ? "true" : "false");
2070
2071 const char* memtablerep = "";
2072 switch (FLAGS_rep_factory) {
2073 case kSkipList:
2074 memtablerep = "skip_list";
2075 break;
2076 case kHashSkipList:
2077 memtablerep = "prefix_hash";
2078 break;
2079 case kVectorRep:
2080 memtablerep = "vector";
2081 break;
2082 }
2083
2084 fprintf(stdout, "Memtablerep : %s\n", memtablerep);
2085
2086 #ifndef NDEBUG
2087 KillPoint* kp = KillPoint::GetInstance();
2088 fprintf(stdout, "Test kill odd : %d\n", kp->rocksdb_kill_odds);
2089 if (!kp->rocksdb_kill_exclude_prefixes.empty()) {
2090 fprintf(stdout, "Skipping kill points prefixes:\n");
2091 for (auto& p : kp->rocksdb_kill_exclude_prefixes) {
2092 fprintf(stdout, " %s\n", p.c_str());
2093 }
2094 }
2095 #endif
2096 fprintf(stdout, "Periodic Compaction Secs : %" PRIu64 "\n",
2097 FLAGS_periodic_compaction_seconds);
2098 fprintf(stdout, "Compaction TTL : %" PRIu64 "\n",
2099 FLAGS_compaction_ttl);
2100 fprintf(stdout, "Background Purge : %d\n",
2101 static_cast<int>(FLAGS_avoid_unnecessary_blocking_io));
2102 fprintf(stdout, "Write DB ID to manifest : %d\n",
2103 static_cast<int>(FLAGS_write_dbid_to_manifest));
2104 fprintf(stdout, "Max Write Batch Group Size: %" PRIu64 "\n",
2105 FLAGS_max_write_batch_group_size_bytes);
2106 fprintf(stdout, "Use dynamic level : %d\n",
2107 static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
2108 fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
2109 fprintf(stdout, "Write fault one in : %d\n", FLAGS_write_fault_one_in);
2110 fprintf(stdout, "Open metadata write fault one in:\n");
2111 fprintf(stdout, " %d\n",
2112 FLAGS_open_metadata_write_fault_one_in);
2113 fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
2114 fprintf(stdout, "Best efforts recovery : %d\n",
2115 static_cast<int>(FLAGS_best_efforts_recovery));
2116 fprintf(stdout, "Fail if OPTIONS file error: %d\n",
2117 static_cast<int>(FLAGS_fail_if_options_file_error));
2118 fprintf(stdout, "User timestamp size bytes : %d\n",
2119 static_cast<int>(FLAGS_user_timestamp_size));
2120
2121 fprintf(stdout, "------------------------------------------------\n");
2122 }
2123
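// Builds Options from the command-line flags (or from --options_file), then
// opens the DB in the requested mode: plain/TTL/read-only DB, stacked BlobDB,
// or TransactionDB, optionally with metadata-write fault injection on reopen,
// plus secondary instances used for verification.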
2124 void StressTest::Open() {
2125 assert(db_ == nullptr);
2126 #ifndef ROCKSDB_LITE
2127 assert(txn_db_ == nullptr);
2128 #endif
2129 if (FLAGS_options_file.empty()) {
2130 BlockBasedTableOptions block_based_options;
2131 block_based_options.block_cache = cache_;
2132 block_based_options.cache_index_and_filter_blocks =
2133 FLAGS_cache_index_and_filter_blocks;
2134 block_based_options.metadata_cache_options.top_level_index_pinning =
2135 static_cast<PinningTier>(FLAGS_top_level_index_pinning);
2136 block_based_options.metadata_cache_options.partition_pinning =
2137 static_cast<PinningTier>(FLAGS_partition_pinning);
2138 block_based_options.metadata_cache_options.unpartitioned_pinning =
2139 static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
2140 block_based_options.block_cache_compressed = compressed_cache_;
2141 block_based_options.checksum = checksum_type_e;
2142 block_based_options.block_size = FLAGS_block_size;
2143 block_based_options.format_version =
2144 static_cast<uint32_t>(FLAGS_format_version);
2145 block_based_options.index_block_restart_interval =
2146 static_cast<int32_t>(FLAGS_index_block_restart_interval);
2147 block_based_options.filter_policy = filter_policy_;
2148 block_based_options.partition_filters = FLAGS_partition_filters;
2149 block_based_options.optimize_filters_for_memory =
2150 FLAGS_optimize_filters_for_memory;
2151 block_based_options.index_type =
2152 static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
2153 options_.table_factory.reset(
2154 NewBlockBasedTableFactory(block_based_options));
2155 options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
2156 options_.write_buffer_size = FLAGS_write_buffer_size;
2157 options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
2158 options_.min_write_buffer_number_to_merge =
2159 FLAGS_min_write_buffer_number_to_merge;
2160 options_.max_write_buffer_number_to_maintain =
2161 FLAGS_max_write_buffer_number_to_maintain;
2162 options_.max_write_buffer_size_to_maintain =
2163 FLAGS_max_write_buffer_size_to_maintain;
2164 options_.memtable_prefix_bloom_size_ratio =
2165 FLAGS_memtable_prefix_bloom_size_ratio;
2166 options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
2167 options_.max_background_compactions = FLAGS_max_background_compactions;
2168 options_.max_background_flushes = FLAGS_max_background_flushes;
2169 options_.compaction_style =
2170 static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
2171 if (FLAGS_prefix_size >= 0) {
2172 options_.prefix_extractor.reset(
2173 NewFixedPrefixTransform(FLAGS_prefix_size));
2174 }
2175 options_.max_open_files = FLAGS_open_files;
2176 options_.statistics = dbstats;
2177 options_.env = db_stress_env;
2178 options_.use_fsync = FLAGS_use_fsync;
2179 options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
2180 options_.allow_mmap_reads = FLAGS_mmap_read;
2181 options_.allow_mmap_writes = FLAGS_mmap_write;
2182 options_.use_direct_reads = FLAGS_use_direct_reads;
2183 options_.use_direct_io_for_flush_and_compaction =
2184 FLAGS_use_direct_io_for_flush_and_compaction;
2185 options_.recycle_log_file_num =
2186 static_cast<size_t>(FLAGS_recycle_log_file_num);
2187 options_.target_file_size_base = FLAGS_target_file_size_base;
2188 options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
2189 options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
2190 options_.max_bytes_for_level_multiplier =
2191 FLAGS_max_bytes_for_level_multiplier;
2192 options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
2193 options_.level0_slowdown_writes_trigger =
2194 FLAGS_level0_slowdown_writes_trigger;
2195 options_.level0_file_num_compaction_trigger =
2196 FLAGS_level0_file_num_compaction_trigger;
2197 options_.compression = compression_type_e;
2198 options_.bottommost_compression = bottommost_compression_type_e;
2199 options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
2200 options_.compression_opts.zstd_max_train_bytes =
2201 FLAGS_compression_zstd_max_train_bytes;
2202 options_.compression_opts.parallel_threads =
2203 FLAGS_compression_parallel_threads;
2204 options_.compression_opts.max_dict_buffer_bytes =
2205 FLAGS_compression_max_dict_buffer_bytes;
2206 options_.create_if_missing = true;
2207 options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
2208 options_.inplace_update_support = FLAGS_in_place_update;
2209 options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
2210 options_.allow_concurrent_memtable_write =
2211 FLAGS_allow_concurrent_memtable_write;
2212 options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
2213 options_.ttl = FLAGS_compaction_ttl;
2214 options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
2215 options_.enable_write_thread_adaptive_yield =
2216 FLAGS_enable_write_thread_adaptive_yield;
2217 options_.compaction_options_universal.size_ratio =
2218 FLAGS_universal_size_ratio;
2219 options_.compaction_options_universal.min_merge_width =
2220 FLAGS_universal_min_merge_width;
2221 options_.compaction_options_universal.max_merge_width =
2222 FLAGS_universal_max_merge_width;
2223 options_.compaction_options_universal.max_size_amplification_percent =
2224 FLAGS_universal_max_size_amplification_percent;
2225 options_.atomic_flush = FLAGS_atomic_flush;
2226 options_.avoid_unnecessary_blocking_io =
2227 FLAGS_avoid_unnecessary_blocking_io;
2228 options_.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
2229 options_.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
2230 options_.max_write_batch_group_size_bytes =
2231 FLAGS_max_write_batch_group_size_bytes;
2232 options_.level_compaction_dynamic_level_bytes =
2233 FLAGS_level_compaction_dynamic_level_bytes;
2234 options_.file_checksum_gen_factory =
2235 GetFileChecksumImpl(FLAGS_file_checksum_impl);
2236 options_.track_and_verify_wals_in_manifest = true;
2237
2238 // Integrated BlobDB
2239 options_.enable_blob_files = FLAGS_enable_blob_files;
2240 options_.min_blob_size = FLAGS_min_blob_size;
2241 options_.blob_file_size = FLAGS_blob_file_size;
2242 options_.blob_compression_type =
2243 StringToCompressionType(FLAGS_blob_compression_type.c_str());
2244 options_.enable_blob_garbage_collection =
2245 FLAGS_enable_blob_garbage_collection;
2246 options_.blob_garbage_collection_age_cutoff =
2247 FLAGS_blob_garbage_collection_age_cutoff;
2248 } else {
2249 #ifdef ROCKSDB_LITE
2250 fprintf(stderr, "--options_file not supported in lite mode\n");
2251 exit(1);
2252 #else
2253 DBOptions db_options;
2254 std::vector<ColumnFamilyDescriptor> cf_descriptors;
2255 Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
2256 &db_options, &cf_descriptors);
2257 db_options.env = new DbStressEnvWrapper(db_stress_env);
2258 if (!s.ok()) {
2259 fprintf(stderr, "Unable to load options file %s --- %s\n",
2260 FLAGS_options_file.c_str(), s.ToString().c_str());
2261 exit(1);
2262 }
2263 options_ = Options(db_options, cf_descriptors[0].options);
2264 #endif // ROCKSDB_LITE
2265 }
2266
2267 if (FLAGS_rate_limiter_bytes_per_sec > 0) {
2268 options_.rate_limiter.reset(NewGenericRateLimiter(
2269 FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
2270 10 /* fairness */,
2271 FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
2272 : RateLimiter::Mode::kWritesOnly));
2273 if (FLAGS_rate_limit_bg_reads) {
2274 options_.new_table_reader_for_compaction_inputs = true;
2275 }
2276 }
2277 if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
2278 FLAGS_sst_file_manager_bytes_per_truncate > 0) {
2279 Status status;
2280 options_.sst_file_manager.reset(NewSstFileManager(
2281 db_stress_env, options_.info_log, "" /* trash_dir */,
2282 static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
2283 true /* delete_existing_trash */, &status,
2284 0.25 /* max_trash_db_ratio */,
2285 FLAGS_sst_file_manager_bytes_per_truncate));
2286 if (!status.ok()) {
2287 fprintf(stderr, "SstFileManager creation failed: %s\n",
2288 status.ToString().c_str());
2289 exit(1);
2290 }
2291 }
2292
2293 if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
2294 fprintf(stderr,
2295 "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
2296 exit(1);
2297 }
2298 if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
2299 fprintf(stderr,
2300 "WARNING: prefix_size is non-zero but "
2301 "memtablerep != prefix_hash\n");
2302 }
2303 switch (FLAGS_rep_factory) {
2304 case kSkipList:
2305 // no need to do anything
2306 break;
2307 #ifndef ROCKSDB_LITE
2308 case kHashSkipList:
2309 options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
2310 break;
2311 case kVectorRep:
2312 options_.memtable_factory.reset(new VectorRepFactory());
2313 break;
2314 #else
2315 default:
2316 fprintf(stderr,
2317 "RocksdbLite only supports skip list mem table. Skip "
2318 "--rep_factory\n");
2319 #endif // ROCKSDB_LITE
2320 }
2321
2322 if (FLAGS_use_full_merge_v1) {
2323 options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
2324 } else {
2325 options_.merge_operator = MergeOperators::CreatePutOperator();
2326 }
2327 if (FLAGS_enable_compaction_filter) {
2328 options_.compaction_filter_factory =
2329 std::make_shared<DbStressCompactionFilterFactory>();
2330 }
2331 options_.table_properties_collector_factories.emplace_back(
2332 std::make_shared<DbStressTablePropertiesCollectorFactory>());
2333
2334 options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
2335 options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
2336 options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;
2337
2338 if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
2339 FLAGS_allow_setting_blob_options_dynamically) &&
2340 (FLAGS_use_merge || FLAGS_best_efforts_recovery)) {
2341 fprintf(stderr,
2342 "Integrated BlobDB is currently incompatible with Merge, "
2343 "and best-effort recovery\n");
2344 exit(1);
2345 }
2346
2347 if (options_.enable_blob_files) {
2348 fprintf(stdout,
2349 "Integrated BlobDB: blob files enabled, min blob size %" PRIu64
2350 ", blob file size %" PRIu64 ", blob compression type %s\n",
2351 options_.min_blob_size, options_.blob_file_size,
2352 CompressionTypeToString(options_.blob_compression_type).c_str());
2353 }
2354
2355 if (options_.enable_blob_garbage_collection) {
2356 fprintf(stdout, "Integrated BlobDB: blob GC enabled, cutoff %f\n",
2357 options_.blob_garbage_collection_age_cutoff);
2358 }
2359
2360 fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2361
2362 Status s;
2363
2364 if (FLAGS_user_timestamp_size > 0) {
2365 CheckAndSetOptionsForUserTimestamp();
2366 }
2367
2368 if (FLAGS_ttl == -1) {
2369 std::vector<std::string> existing_column_families;
2370 s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
2371 &existing_column_families); // ignore errors
2372 if (!s.ok()) {
2373 // DB doesn't exist
2374 assert(existing_column_families.empty());
2375 assert(column_family_names_.empty());
2376 column_family_names_.push_back(kDefaultColumnFamilyName);
2377 } else if (column_family_names_.empty()) {
2378 // this is the first call to the function Open()
2379 column_family_names_ = existing_column_families;
2380 } else {
2381 // this is a reopen. just assert that existing column_family_names are
2382 // equivalent to what we remember
2383 auto sorted_cfn = column_family_names_;
2384 std::sort(sorted_cfn.begin(), sorted_cfn.end());
2385 std::sort(existing_column_families.begin(),
2386 existing_column_families.end());
2387 if (sorted_cfn != existing_column_families) {
2388 fprintf(stderr, "Expected column families differ from the existing:\n");
2389 fprintf(stderr, "Expected: {");
2390 for (auto cf : sorted_cfn) {
2391 fprintf(stderr, "%s ", cf.c_str());
2392 }
2393 fprintf(stderr, "}\n");
2394 fprintf(stderr, "Existing: {");
2395 for (auto cf : existing_column_families) {
2396 fprintf(stderr, "%s ", cf.c_str());
2397 }
2398 fprintf(stderr, "}\n");
2399 }
2400 assert(sorted_cfn == existing_column_families);
2401 }
2402 std::vector<ColumnFamilyDescriptor> cf_descriptors;
2403 for (auto name : column_family_names_) {
2404 if (name != kDefaultColumnFamilyName) {
2405 new_column_family_name_ =
2406 std::max(new_column_family_name_.load(), std::stoi(name) + 1);
2407 }
2408 cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2409 }
2410 while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
2411 std::string name = ToString(new_column_family_name_.load());
2412 new_column_family_name_++;
2413 cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2414 column_family_names_.push_back(name);
2415 }
2416 options_.listeners.clear();
2417 options_.listeners.emplace_back(
2418 new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
2419 options_.create_missing_column_families = true;
2420 if (!FLAGS_use_txn) {
2421 #ifndef NDEBUG
2422 // Determine whether we need to inject file metadata write failures
2423 // during DB reopen; if so, enable the error injection.
2424 // Only inject metadata errors when reopening, since an initial open
2425 // failure does not need to be handled.
2426 // TODO: the transaction DB path is not yet covered by this fault test.
2427 bool ingest_meta_error =
2428 FLAGS_open_metadata_write_fault_one_in &&
2429 fault_fs_guard
2430 ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
2431 .ok();
2432 if (ingest_meta_error) {
2433 fault_fs_guard->EnableMetadataWriteErrorInjection();
2434 fault_fs_guard->SetRandomMetadataWriteError(
2435 FLAGS_open_metadata_write_fault_one_in);
2436 }
2437 while (true) {
2438 #endif // NDEBUG
2439 #ifndef ROCKSDB_LITE
2440 // StackableDB-based BlobDB
2441 if (FLAGS_use_blob_db) {
2442 blob_db::BlobDBOptions blob_db_options;
2443 blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
2444 blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
2445 blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
2446 blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
2447 blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
2448
2449 blob_db::BlobDB* blob_db = nullptr;
2450 s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
2451 cf_descriptors, &column_families_,
2452 &blob_db);
2453 if (s.ok()) {
2454 db_ = blob_db;
2455 }
2456 } else
2457 #endif // !ROCKSDB_LITE
2458 {
2459 if (db_preload_finished_.load() && FLAGS_read_only) {
2460 s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
2461 cf_descriptors, &column_families_, &db_);
2462 } else {
2463 s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
2464 &column_families_, &db_);
2465 }
2466 }
2467
2468 #ifndef NDEBUG
2469 if (ingest_meta_error) {
2470 fault_fs_guard->DisableMetadataWriteErrorInjection();
2471 if (s.ok()) {
2472 // Ingested errors might happen in background compactions. We
2473 // wait for all compactions to finish to make sure the DB is in a
2474 // clean state before executing queries.
2475 s = static_cast_with_check<DBImpl>(db_->GetRootDB())
2476 ->TEST_WaitForCompact(true);
2477 if (!s.ok()) {
2478 delete db_;
2479 }
2480 }
2481 if (!s.ok()) {
2482 // After failing to open the DB due to an IO error, a retry should
2483 // successfully open the DB with correct data as long as no further
2484 // IO error shows up.
2485 ingest_meta_error = false;
2486
2487 Random rand(static_cast<uint32_t>(FLAGS_seed));
2488 if (rand.OneIn(2)) {
2489 fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
2490 nullptr);
2491 }
2492 if (rand.OneIn(3)) {
2493 fault_fs_guard->DropUnsyncedFileData();
2494 } else if (rand.OneIn(2)) {
2495 fault_fs_guard->DropRandomUnsyncedFileData(&rand);
2496 }
2497 continue;
2498 }
2499 }
2500 break;
2501 }
2502 #endif // NDEBUG
2503 } else {
2504 #ifndef ROCKSDB_LITE
2505 TransactionDBOptions txn_db_options;
2506 assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
2507 txn_db_options.write_policy =
2508 static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
2509 if (FLAGS_unordered_write) {
2510 assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED);
2511 options_.unordered_write = true;
2512 options_.two_write_queues = true;
2513 txn_db_options.skip_concurrency_control = true;
2514 }
2515 s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
2516 cf_descriptors, &column_families_, &txn_db_);
2517 if (!s.ok()) {
2518 fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
2519 s.ToString().c_str());
2520 fflush(stderr);
2521 }
2522 assert(s.ok());
2523 db_ = txn_db_;
2524 // After a crash, randomly commit or roll back recovered prepared transactions
2525 std::vector<Transaction*> trans;
2526 txn_db_->GetAllPreparedTransactions(&trans);
2527 Random rand(static_cast<uint32_t>(FLAGS_seed));
2528 for (auto txn : trans) {
2529 if (rand.OneIn(2)) {
2530 s = txn->Commit();
2531 assert(s.ok());
2532 } else {
2533 s = txn->Rollback();
2534 assert(s.ok());
2535 }
2536 delete txn;
2537 }
2538 trans.clear();
2539 txn_db_->GetAllPreparedTransactions(&trans);
2540 assert(trans.size() == 0);
2541 #endif
2542 }
2543 assert(!s.ok() || column_families_.size() ==
2544 static_cast<size_t>(FLAGS_column_families));
2545
2546 if (FLAGS_test_secondary) {
2547 #ifndef ROCKSDB_LITE
2548 secondaries_.resize(FLAGS_threads);
2549 std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
2550 secondary_cfh_lists_.clear();
2551 secondary_cfh_lists_.resize(FLAGS_threads);
2552 Options tmp_opts;
2553 // TODO(yanqin) support max_open_files != -1 for secondary instance.
2554 tmp_opts.max_open_files = -1;
2555 tmp_opts.statistics = dbstats_secondaries;
2556 tmp_opts.env = db_stress_env;
2557 for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
2558 const std::string secondary_path =
2559 FLAGS_secondaries_base + "/" + std::to_string(i);
2560 s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2561 cf_descriptors, &secondary_cfh_lists_[i],
2562 &secondaries_[i]);
2563 if (!s.ok()) {
2564 break;
2565 }
2566 }
2567 assert(s.ok());
2568 #else
2569 fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
2570 exit(1);
2571 #endif
2572 }
2573 if (FLAGS_continuous_verification_interval > 0 && !cmp_db_) {
2574 Options tmp_opts;
2575 // TODO(yanqin) support max_open_files != -1 for secondary instance.
2576 tmp_opts.max_open_files = -1;
2577 tmp_opts.env = db_stress_env;
2578 std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
2579 s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2580 cf_descriptors, &cmp_cfhs_, &cmp_db_);
2581 assert(!s.ok() ||
2582 cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
2583 }
2584 } else {
2585 #ifndef ROCKSDB_LITE
2586 DBWithTTL* db_with_ttl;
2587 s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
2588 db_ = db_with_ttl;
2589 if (FLAGS_test_secondary) {
2590 secondaries_.resize(FLAGS_threads);
2591 std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
2592 Options tmp_opts;
2593 tmp_opts.env = options_.env;
2594 // TODO(yanqin) support max_open_files != -1 for secondary instance.
2595 tmp_opts.max_open_files = -1;
2596 for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
2597 const std::string secondary_path =
2598 FLAGS_secondaries_base + "/" + std::to_string(i);
2599 s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2600 &secondaries_[i]);
2601 if (!s.ok()) {
2602 break;
2603 }
2604 }
2605 }
2606 #else
2607 fprintf(stderr, "TTL is not supported in RocksDBLite\n");
2608 exit(1);
2609 #endif
2610 }
2611 if (!s.ok()) {
2612 fprintf(stderr, "open error: %s\n", s.ToString().c_str());
2613 exit(1);
2614 }
2615 }
2616
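// Closes the current DB (canceling background work first when required by
// WritePrepared transactions), tears down column family handles and
// secondaries, and then re-opens everything via Open().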
2617 void StressTest::Reopen(ThreadState* thread) {
2618 #ifndef ROCKSDB_LITE
2619 // BG jobs in WritePrepared must be canceled first because i) they can access
2620 // the db via a callback, and ii) they hold on to a snapshot and the upcoming
2621 // ::Close would complain about it.
2622 const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
2623 bool bg_canceled = false;
2624 if (write_prepared || thread->rand.OneIn(2)) {
2625 const bool wait =
2626 write_prepared || static_cast<bool>(thread->rand.OneIn(2));
2627 CancelAllBackgroundWork(db_, wait);
2628 bg_canceled = wait;
2629 }
2630 assert(!write_prepared || bg_canceled);
2631 (void) bg_canceled;
2632 #else
2633 (void) thread;
2634 #endif
2635
2636 for (auto cf : column_families_) {
2637 delete cf;
2638 }
2639 column_families_.clear();
2640
2641 #ifndef ROCKSDB_LITE
2642 if (thread->rand.OneIn(2)) {
2643 Status s = db_->Close();
2644 if (!s.ok()) {
2645 fprintf(stderr, "Non-ok close status: %s\n", s.ToString().c_str());
2646 fflush(stderr);
2647 }
2648 assert(s.ok());
2649 }
2650 #endif
2651 delete db_;
2652 db_ = nullptr;
2653 #ifndef ROCKSDB_LITE
2654 txn_db_ = nullptr;
2655 #endif
2656
2657 assert(secondaries_.size() == secondary_cfh_lists_.size());
2658 size_t n = secondaries_.size();
2659 for (size_t i = 0; i != n; ++i) {
2660 for (auto* cf : secondary_cfh_lists_[i]) {
2661 delete cf;
2662 }
2663 secondary_cfh_lists_[i].clear();
2664 delete secondaries_[i];
2665 }
2666 secondaries_.clear();
2667
2668 num_times_reopened_++;
2669 auto now = clock_->NowMicros();
2670 fprintf(stdout, "%s Reopening database for the %dth time\n",
2671 clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
2672 Open();
2673 }
2674
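// Validates that the flag combination is compatible with user-defined
// timestamps and installs the 64-bit timestamp-aware comparator.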
2675 void StressTest::CheckAndSetOptionsForUserTimestamp() {
2676 assert(FLAGS_user_timestamp_size > 0);
2677 const Comparator* const cmp = test::ComparatorWithU64Ts();
2678 assert(cmp);
2679 if (FLAGS_user_timestamp_size != cmp->timestamp_size()) {
2680 fprintf(stderr,
2681 "Only -user_timestamp_size=%d is supported in stress test.\n",
2682 static_cast<int>(cmp->timestamp_size()));
2683 exit(1);
2684 }
2685 if (FLAGS_nooverwritepercent > 0) {
2686 fprintf(stderr,
2687 "-nooverwritepercent must be 0 because SingleDelete must be "
2688 "disabled.\n");
2689 exit(1);
2690 }
2691 if (FLAGS_use_merge || FLAGS_use_full_merge_v1) {
2692 fprintf(stderr, "Merge does not support timestamp yet.\n");
2693 exit(1);
2694 }
2695 if (FLAGS_delrangepercent > 0) {
2696 fprintf(stderr, "DeleteRange does not support timestamp yet.\n");
2697 exit(1);
2698 }
2699 if (FLAGS_use_txn) {
2700 fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
2701 exit(1);
2702 }
2703 if (FLAGS_read_only) {
2704 fprintf(stderr, "When opened as read-only, timestamp not supported.\n");
2705 exit(1);
2706 }
2707 if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 ||
2708 FLAGS_continuous_verification_interval > 0) {
2709 fprintf(stderr, "Secondary instance does not support timestamp.\n");
2710 exit(1);
2711 }
2712 if (FLAGS_checkpoint_one_in > 0) {
2713 fprintf(stderr,
2714 "-checkpoint_one_in=%d requires "
2715 "DBImplReadOnly, which is not supported with timestamp\n",
2716 FLAGS_checkpoint_one_in);
2717 exit(1);
2718 }
2719 #ifndef ROCKSDB_LITE
2720 if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
2721 fprintf(stderr, "BlobDB not supported with timestamp.\n");
2722 exit(1);
2723 }
2724 #endif // !ROCKSDB_LITE
2725 if (FLAGS_enable_compaction_filter) {
2726 fprintf(stderr, "CompactionFilter not supported with timestamp.\n");
2727 exit(1);
2728 }
2729 if (FLAGS_test_cf_consistency || FLAGS_test_batches_snapshots) {
2730 fprintf(stderr,
2731 "Due to per-key ts-seq ordering constraint, only the (default) "
2732 "non-batched test is supported with timestamp.\n");
2733 exit(1);
2734 }
2735 if (FLAGS_ingest_external_file_one_in > 0) {
2736 fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
2737 exit(1);
2738 }
2739 options_.comparator = cmp;
2740 }
2741 } // namespace ROCKSDB_NAMESPACE
2742 #endif // GFLAGS
2743