1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10
11 #ifdef GFLAGS
12 #include "db_stress_tool/db_stress_common.h"
13 #include "db_stress_tool/db_stress_compaction_filter.h"
14 #include "db_stress_tool/db_stress_driver.h"
15 #include "db_stress_tool/db_stress_table_properties_collector.h"
16 #include "rocksdb/convenience.h"
17 #include "rocksdb/filter_policy.h"
18 #include "rocksdb/secondary_cache.h"
19 #include "rocksdb/sst_file_manager.h"
20 #include "rocksdb/types.h"
21 #include "rocksdb/utilities/object_registry.h"
22 #include "util/cast_util.h"
23 #include "utilities/backupable/backupable_db_impl.h"
24 #include "utilities/fault_injection_fs.h"
25
26 namespace ROCKSDB_NAMESPACE {
27
28 namespace {
29
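// Builds the filter policy for block-based tables from --bloom_bits,
// --use_block_based_filter and --ribbon_starting_level. A minimal sketch of
// the mapping implemented below:
//   bloom_bits < 0                 -> keep the BlockBasedTableOptions default
//   use_block_based_filter         -> NewBloomFilterPolicy(bits, true); only
//                                     valid while ribbon_starting_level stays
//                                     at its disabled value (>= 999)
//   ribbon_starting_level >= 999   -> full Bloom filters on all levels
//   otherwise                      -> NewRibbonFilterPolicy(bits,
//                                     /*bloom_before_level=*/ribbon_starting_level)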
30 std::shared_ptr<const FilterPolicy> CreateFilterPolicy() {
31 if (FLAGS_bloom_bits < 0) {
32 return BlockBasedTableOptions().filter_policy;
33 }
34 const FilterPolicy* new_policy;
35 if (FLAGS_use_block_based_filter) {
36 if (FLAGS_ribbon_starting_level < 999) {
37 fprintf(
38 stderr,
39 "Cannot combine use_block_based_filter and ribbon_starting_level\n");
40 exit(1);
41 } else {
42 new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, true);
43 }
44 } else if (FLAGS_ribbon_starting_level >= 999) {
45 // Use Bloom API
46 new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false);
47 } else {
48 new_policy = NewRibbonFilterPolicy(
49 FLAGS_bloom_bits, /* bloom_before_level */ FLAGS_ribbon_starting_level);
50 }
51 return std::shared_ptr<const FilterPolicy>(new_policy);
52 }
53
54 } // namespace
55
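// The constructor wires up the shared block cache, compressed cache and
// filter policy from the command-line flags and, when --destroy_db_initially
// is set, removes leftover "heap-" files and destroys any existing DB (or
// BlobDB) under --db before the test starts.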
56 StressTest::StressTest()
57 : cache_(NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)),
58 compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
59 filter_policy_(CreateFilterPolicy()),
60 db_(nullptr),
61 #ifndef ROCKSDB_LITE
62 txn_db_(nullptr),
63 #endif
64 clock_(db_stress_env->GetSystemClock().get()),
65 new_column_family_name_(1),
66 num_times_reopened_(0),
67 db_preload_finished_(false),
68 cmp_db_(nullptr),
69 is_db_stopped_(false) {
70 if (FLAGS_destroy_db_initially) {
71 std::vector<std::string> files;
72 db_stress_env->GetChildren(FLAGS_db, &files);
73 for (unsigned int i = 0; i < files.size(); i++) {
74 if (Slice(files[i]).starts_with("heap-")) {
75 db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
76 }
77 }
78
79 Options options;
80 options.env = db_stress_env;
81 // Remove files without preserving manifest files
82 #ifndef ROCKSDB_LITE
83 const Status s = !FLAGS_use_blob_db
84 ? DestroyDB(FLAGS_db, options)
85 : blob_db::DestroyBlobDB(FLAGS_db, options,
86 blob_db::BlobDBOptions());
87 #else
88 const Status s = DestroyDB(FLAGS_db, options);
89 #endif // !ROCKSDB_LITE
90
91 if (!s.ok()) {
92 fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str());
93 exit(1);
94 }
95 }
96 }
97
98 StressTest::~StressTest() {
99 for (auto cf : column_families_) {
100 delete cf;
101 }
102 column_families_.clear();
103 delete db_;
104
105 assert(secondaries_.size() == secondary_cfh_lists_.size());
106 size_t n = secondaries_.size();
107 for (size_t i = 0; i != n; ++i) {
108 for (auto* cf : secondary_cfh_lists_[i]) {
109 delete cf;
110 }
111 secondary_cfh_lists_[i].clear();
112 delete secondaries_[i];
113 }
114 secondaries_.clear();
115
116 for (auto* cf : cmp_cfhs_) {
117 delete cf;
118 }
119 cmp_cfhs_.clear();
120 delete cmp_db_;
121 }
122
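// Builds the block cache from --cache_size / --cache_numshardbits. A sketch
// of the behavior below: a non-positive capacity disables the cache,
// --use_clock_cache selects ClockCache (and aborts if unsupported in this
// build), and otherwise an LRUCache is used, optionally backed by a
// SecondaryCache created from --secondary_cache_uri via the object registry.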
123 std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
124 int32_t num_shard_bits) {
125 ConfigOptions config_options;
126 if (capacity <= 0) {
127 return nullptr;
128 }
129 if (FLAGS_use_clock_cache) {
130 auto cache = NewClockCache((size_t)capacity);
131 if (!cache) {
132 fprintf(stderr, "Clock cache not supported.");
133 exit(1);
134 }
135 return cache;
136 } else {
137 LRUCacheOptions opts;
138 opts.capacity = capacity;
139 opts.num_shard_bits = num_shard_bits;
140 #ifndef ROCKSDB_LITE
141 std::shared_ptr<SecondaryCache> secondary_cache;
142 if (!FLAGS_secondary_cache_uri.empty()) {
143 Status s = SecondaryCache::CreateFromString(
144 config_options, FLAGS_secondary_cache_uri, &secondary_cache);
145 if (secondary_cache == nullptr) {
146 fprintf(stderr,
147 "No secondary cache registered matching string: %s status=%s\n",
148 FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
149 exit(1);
150 }
151 opts.secondary_cache = secondary_cache;
152 }
153 #endif
154 return NewLRUCache(opts);
155 }
156 }
157
158 std::vector<std::string> StressTest::GetBlobCompressionTags() {
159 std::vector<std::string> compression_tags{"kNoCompression"};
160
161 if (Snappy_Supported()) {
162 compression_tags.emplace_back("kSnappyCompression");
163 }
164 if (LZ4_Supported()) {
165 compression_tags.emplace_back("kLZ4Compression");
166 }
167 if (ZSTD_Supported()) {
168 compression_tags.emplace_back("kZSTD");
169 }
170
171 return compression_tags;
172 }
173
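// Builds the table of dynamically settable options exercised by SetOptions()
// when --set_options_one_in > 0. Each entry maps an option name to a small
// set of candidate values derived from the currently configured options_,
// e.g. {"write_buffer_size", {1x, 2x, 4x the configured size}}.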
174 bool StressTest::BuildOptionsTable() {
175 if (FLAGS_set_options_one_in <= 0) {
176 return true;
177 }
178
179 std::unordered_map<std::string, std::vector<std::string>> options_tbl = {
180 {"write_buffer_size",
181 {ToString(options_.write_buffer_size),
182 ToString(options_.write_buffer_size * 2),
183 ToString(options_.write_buffer_size * 4)}},
184 {"max_write_buffer_number",
185 {ToString(options_.max_write_buffer_number),
186 ToString(options_.max_write_buffer_number * 2),
187 ToString(options_.max_write_buffer_number * 4)}},
188 {"arena_block_size",
189 {
190 ToString(options_.arena_block_size),
191 ToString(options_.write_buffer_size / 4),
192 ToString(options_.write_buffer_size / 8),
193 }},
194 {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
195 {"max_successive_merges", {"0", "2", "4"}},
196 {"inplace_update_num_locks", {"100", "200", "300"}},
197 // TODO(ljin): enable test for this option
198 // {"disable_auto_compactions", {"100", "200", "300"}},
199 {"soft_rate_limit", {"0", "0.5", "0.9"}},
200 {"hard_rate_limit", {"0", "1.1", "2.0"}},
201 {"level0_file_num_compaction_trigger",
202 {
203 ToString(options_.level0_file_num_compaction_trigger),
204 ToString(options_.level0_file_num_compaction_trigger + 2),
205 ToString(options_.level0_file_num_compaction_trigger + 4),
206 }},
207 {"level0_slowdown_writes_trigger",
208 {
209 ToString(options_.level0_slowdown_writes_trigger),
210 ToString(options_.level0_slowdown_writes_trigger + 2),
211 ToString(options_.level0_slowdown_writes_trigger + 4),
212 }},
213 {"level0_stop_writes_trigger",
214 {
215 ToString(options_.level0_stop_writes_trigger),
216 ToString(options_.level0_stop_writes_trigger + 2),
217 ToString(options_.level0_stop_writes_trigger + 4),
218 }},
219 {"max_compaction_bytes",
220 {
221 ToString(options_.target_file_size_base * 5),
222 ToString(options_.target_file_size_base * 15),
223 ToString(options_.target_file_size_base * 100),
224 }},
225 {"target_file_size_base",
226 {
227 ToString(options_.target_file_size_base),
228 ToString(options_.target_file_size_base * 2),
229 ToString(options_.target_file_size_base * 4),
230 }},
231 {"target_file_size_multiplier",
232 {
233 ToString(options_.target_file_size_multiplier),
234 "1",
235 "2",
236 }},
237 {"max_bytes_for_level_base",
238 {
239 ToString(options_.max_bytes_for_level_base / 2),
240 ToString(options_.max_bytes_for_level_base),
241 ToString(options_.max_bytes_for_level_base * 2),
242 }},
243 {"max_bytes_for_level_multiplier",
244 {
245 ToString(options_.max_bytes_for_level_multiplier),
246 "1",
247 "2",
248 }},
249 {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
250 };
251
252 if (FLAGS_allow_setting_blob_options_dynamically) {
253 options_tbl.emplace("enable_blob_files",
254 std::vector<std::string>{"false", "true"});
255 options_tbl.emplace("min_blob_size",
256 std::vector<std::string>{"0", "8", "16"});
257 options_tbl.emplace("blob_file_size",
258 std::vector<std::string>{"1M", "16M", "256M", "1G"});
259 options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
260 options_tbl.emplace("enable_blob_garbage_collection",
261 std::vector<std::string>{"false", "true"});
262 options_tbl.emplace(
263 "blob_garbage_collection_age_cutoff",
264 std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
265 options_tbl.emplace("blob_garbage_collection_force_threshold",
266 std::vector<std::string>{"0.5", "0.75", "1.0"});
267 }
268
269 options_table_ = std::move(options_tbl);
270
271 for (const auto& iter : options_table_) {
272 options_index_.push_back(iter.first);
273 }
274 return true;
275 }
276
277 void StressTest::InitDb() {
278 uint64_t now = clock_->NowMicros();
279 fprintf(stdout, "%s Initializing db_stress\n",
280 clock_->TimeToString(now / 1000000).c_str());
281 PrintEnv();
282 Open();
283 BuildOptionsTable();
284 }
285
286 void StressTest::FinishInitDb(SharedState* shared) {
287 if (FLAGS_read_only) {
288 uint64_t now = clock_->NowMicros();
289 fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
290 clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
291 PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
292 }
293 if (FLAGS_enable_compaction_filter) {
294 auto* compaction_filter_factory =
295 reinterpret_cast<DbStressCompactionFilterFactory*>(
296 options_.compaction_filter_factory.get());
297 assert(compaction_filter_factory);
298 compaction_filter_factory->SetSharedState(shared);
299 fprintf(stdout, "Compaction filter factory: %s\n",
300 compaction_filter_factory->Name());
301 }
302 }
303
304 bool StressTest::VerifySecondaries() {
305 #ifndef ROCKSDB_LITE
306 if (FLAGS_test_secondary) {
307 uint64_t now = clock_->NowMicros();
308 fprintf(stdout, "%s Start to verify secondaries against primary\n",
309 clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
310 }
311 for (size_t k = 0; k != secondaries_.size(); ++k) {
312 Status s = secondaries_[k]->TryCatchUpWithPrimary();
313 if (!s.ok()) {
314 fprintf(stderr, "Secondary failed to catch up with primary\n");
315 return false;
316 }
317 ReadOptions ropts;
318 ropts.total_order_seek = true;
319 // Verify only the default column family since the primary may have
320 // dropped other column families after most recent reopen.
321 std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
322 std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
323 for (iter1->SeekToFirst(), iter2->SeekToFirst();
324 iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
325 if (iter1->key().compare(iter2->key()) != 0 ||
326 iter1->value().compare(iter2->value())) {
327 fprintf(stderr,
328 "Secondary %d contains different data from "
329 "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
330 static_cast<int>(k),
331 iter1->key().ToString(/*hex=*/true).c_str(),
332 iter1->value().ToString(/*hex=*/true).c_str(),
333 iter2->key().ToString(/*hex=*/true).c_str(),
334 iter2->value().ToString(/*hex=*/true).c_str());
335 return false;
336 }
337 }
338 if (iter1->Valid() && !iter2->Valid()) {
339 fprintf(stderr,
340 "Secondary %d record count is smaller than that of primary\n",
341 static_cast<int>(k));
342 return false;
343 } else if (!iter1->Valid() && iter2->Valid()) {
344 fprintf(stderr,
345 "Secondary %d record count is larger than that of primary\n",
346 static_cast<int>(k));
347 return false;
348 }
349 }
350 if (FLAGS_test_secondary) {
351 uint64_t now = clock_->NowMicros();
352 fprintf(stdout, "%s Verification of secondaries succeeded\n",
353 clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
354 }
355 #endif // ROCKSDB_LITE
356 return true;
357 }
358
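// Checks that a read through `db` at the saved snapshot still matches the
// state recorded in `snap_state`: the same Get() status and value for the
// saved key and, when a key bitmap was captured, the same set of existing
// keys under a total-order scan. Returns Status::Corruption on any mismatch.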
359 Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
360 ThreadState::SnapshotState& snap_state) {
361 Status s;
362 if (cf->GetName() != snap_state.cf_at_name) {
363 return s;
364 }
365 ReadOptions ropt;
366 ropt.snapshot = snap_state.snapshot;
367 Slice ts;
368 if (!snap_state.timestamp.empty()) {
369 ts = snap_state.timestamp;
370 ropt.timestamp = &ts;
371 }
372 PinnableSlice exp_v(&snap_state.value);
373 exp_v.PinSelf();
374 PinnableSlice v;
375 s = db->Get(ropt, cf, snap_state.key, &v);
376 if (!s.ok() && !s.IsNotFound()) {
377 return s;
378 }
379 if (snap_state.status != s) {
380 return Status::Corruption(
381 "The snapshot gave inconsistent results for key " +
382 ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
383 " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
384 ") vs. (" + s.ToString() + ")");
385 }
386 if (s.ok()) {
387 if (exp_v != v) {
388 return Status::Corruption("The snapshot gave inconsistent values: (" +
389 exp_v.ToString() + ") vs. (" + v.ToString() +
390 ")");
391 }
392 }
393 if (snap_state.key_vec != nullptr) {
394 // When `prefix_extractor` is set, seeking to beginning and scanning
395 // across prefixes are only supported with `total_order_seek` set.
396 ropt.total_order_seek = true;
397 std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
398 std::unique_ptr<std::vector<bool>> tmp_bitvec(
399 new std::vector<bool>(FLAGS_max_key));
400 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
401 uint64_t key_val;
402 if (GetIntVal(iterator->key().ToString(), &key_val)) {
403 (*tmp_bitvec.get())[key_val] = true;
404 }
405 }
406 if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
407 tmp_bitvec.get()->begin())) {
408 return Status::Corruption("Found inconsistent keys at this snapshot");
409 }
410 }
411 return Status::OK();
412 }
413
414 void StressTest::VerificationAbort(SharedState* shared, std::string msg,
415 Status s) const {
416 fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(),
417 s.ToString().c_str());
418 shared->SetVerificationFailure();
419 }
420
421 void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
422 int64_t key) const {
423 auto key_str = Key(key);
424 Slice key_slice = key_str;
425 fprintf(stderr,
426 "Verification failed for column family %d key %s (%" PRIi64 "): %s\n",
427 cf, key_slice.ToString(true).c_str(), key, msg.c_str());
428 shared->SetVerificationFailure();
429 }
430
431 void StressTest::PrintStatistics() {
432 if (dbstats) {
433 fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
434 }
435 if (dbstats_secondaries) {
436 fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
437 dbstats_secondaries->ToString().c_str());
438 }
439 }
440
441 // Currently PreloadDb has to be single-threaded.
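// Writes one value for every key in every column family (via Put or Merge,
// optionally inside a transaction), marks each key as present in
// SharedState, flushes, and then closes and reopens the DB so the rest of a
// -read_only run operates on the preloaded data.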
442 void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
443 SharedState* shared) {
444 WriteOptions write_opts;
445 write_opts.disableWAL = FLAGS_disable_wal;
446 if (FLAGS_sync) {
447 write_opts.sync = true;
448 }
449 char value[100];
450 int cf_idx = 0;
451 Status s;
452 for (auto cfh : column_families_) {
453 for (int64_t k = 0; k != number_of_keys; ++k) {
454 std::string key_str = Key(k);
455 Slice key = key_str;
456 size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
457 Slice v(value, sz);
458 shared->Put(cf_idx, k, 0, true /* pending */);
459
460 if (FLAGS_use_merge) {
461 if (!FLAGS_use_txn) {
462 s = db_->Merge(write_opts, cfh, key, v);
463 } else {
464 #ifndef ROCKSDB_LITE
465 Transaction* txn;
466 s = NewTxn(write_opts, &txn);
467 if (s.ok()) {
468 s = txn->Merge(cfh, key, v);
469 if (s.ok()) {
470 s = CommitTxn(txn);
471 }
472 }
473 #endif
474 }
475 } else {
476 if (!FLAGS_use_txn) {
477 std::string ts_str;
478 Slice ts;
479 if (FLAGS_user_timestamp_size > 0) {
480 ts_str = NowNanosStr();
481 ts = ts_str;
482 write_opts.timestamp = &ts;
483 }
484 s = db_->Put(write_opts, cfh, key, v);
485 } else {
486 #ifndef ROCKSDB_LITE
487 Transaction* txn;
488 s = NewTxn(write_opts, &txn);
489 if (s.ok()) {
490 s = txn->Put(cfh, key, v);
491 if (s.ok()) {
492 s = CommitTxn(txn);
493 }
494 }
495 #endif
496 }
497 }
498
499 shared->Put(cf_idx, k, 0, false /* pending */);
500 if (!s.ok()) {
501 break;
502 }
503 }
504 if (!s.ok()) {
505 break;
506 }
507 ++cf_idx;
508 }
509 if (s.ok()) {
510 s = db_->Flush(FlushOptions(), column_families_);
511 }
512 if (s.ok()) {
513 for (auto cf : column_families_) {
514 delete cf;
515 }
516 column_families_.clear();
517 delete db_;
518 db_ = nullptr;
519 #ifndef ROCKSDB_LITE
520 txn_db_ = nullptr;
521 #endif
522
523 db_preload_finished_.store(true);
524 auto now = clock_->NowMicros();
525 fprintf(stdout, "%s Reopening database in read-only\n",
526 clock_->TimeToString(now / 1000000).c_str());
527 // Reopen as read-only, can ignore all options related to updates
528 Open();
529 } else {
530 fprintf(stderr, "Failed to preload db");
531 exit(1);
532 }
533 }
534
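// Picks a random entry from options_table_ and applies it to a random column
// family through DB::SetOptions(). Options that must stay mutually
// consistent (the rate limits and the level0 triggers) are changed together,
// using the same value index for each member of the group.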
535 Status StressTest::SetOptions(ThreadState* thread) {
536 assert(FLAGS_set_options_one_in > 0);
537 std::unordered_map<std::string, std::string> opts;
538 std::string name =
539 options_index_[thread->rand.Next() % options_index_.size()];
540 int value_idx = thread->rand.Next() % options_table_[name].size();
541 if (name == "soft_rate_limit" || name == "hard_rate_limit") {
542 opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx];
543 opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx];
544 } else if (name == "level0_file_num_compaction_trigger" ||
545 name == "level0_slowdown_writes_trigger" ||
546 name == "level0_stop_writes_trigger") {
547 opts["level0_file_num_compaction_trigger"] =
548 options_table_["level0_file_num_compaction_trigger"][value_idx];
549 opts["level0_slowdown_writes_trigger"] =
550 options_table_["level0_slowdown_writes_trigger"][value_idx];
551 opts["level0_stop_writes_trigger"] =
552 options_table_["level0_stop_writes_trigger"][value_idx];
553 } else {
554 opts[name] = options_table_[name][value_idx];
555 }
556
557 int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
558 auto cfh = column_families_[rand_cf_idx];
559 return db_->SetOptions(cfh, opts);
560 }
561
562 #ifndef ROCKSDB_LITE
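// Transaction helpers used when --use_txn is set. A typical write path in
// this file is NewTxn() -> txn->Put()/Merge() -> CommitTxn(), where
// CommitTxn() runs a two-phase commit (Prepare() then Commit()). Each
// transaction is given a unique name of the form "xid<N>" from a
// process-wide counter before it is prepared.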
563 Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
564 if (!FLAGS_use_txn) {
565 return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
566 }
567 static std::atomic<uint64_t> txn_id = {0};
568 TransactionOptions txn_options;
569 txn_options.lock_timeout = 600000; // 10 min
570 txn_options.deadlock_detect = true;
571 *txn = txn_db_->BeginTransaction(write_opts, txn_options);
572 auto istr = std::to_string(txn_id.fetch_add(1));
573 Status s = (*txn)->SetName("xid" + istr);
574 return s;
575 }
576
577 Status StressTest::CommitTxn(Transaction* txn) {
578 if (!FLAGS_use_txn) {
579 return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
580 }
581 Status s = txn->Prepare();
582 if (s.ok()) {
583 s = txn->Commit();
584 }
585 delete txn;
586 return s;
587 }
588
589 Status StressTest::RollbackTxn(Transaction* txn) {
590 if (!FLAGS_use_txn) {
591 return Status::InvalidArgument(
592 "RollbackTxn when FLAGS_use_txn is not"
593 " set");
594 }
595 Status s = txn->Rollback();
596 delete txn;
597 return s;
598 }
599 #endif
600
601 void StressTest::OperateDb(ThreadState* thread) {
602 ReadOptions read_opts(FLAGS_verify_checksum, true);
603 WriteOptions write_opts;
604 auto shared = thread->shared;
605 char value[100];
606 std::string from_db;
607 if (FLAGS_sync) {
608 write_opts.sync = true;
609 }
610 write_opts.disableWAL = FLAGS_disable_wal;
611 const int prefixBound = static_cast<int>(FLAGS_readpercent) +
612 static_cast<int>(FLAGS_prefixpercent);
613 const int writeBound = prefixBound + static_cast<int>(FLAGS_writepercent);
614 const int delBound = writeBound + static_cast<int>(FLAGS_delpercent);
615 const int delRangeBound = delBound + static_cast<int>(FLAGS_delrangepercent);
616 const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
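// Operation mix: each iteration draws prob_op uniformly from [0, 100) and
// maps it onto consecutive ranges delimited by the bounds computed above:
// [0, readpercent) read, [readpercent, prefixBound) prefix scan,
// [prefixBound, writeBound) write, [writeBound, delBound) delete,
// [delBound, delRangeBound) delete-range, and [delRangeBound, 100) iterate.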
617
618 #ifndef NDEBUG
619 if (FLAGS_read_fault_one_in) {
620 fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
621 FLAGS_read_fault_one_in);
622 }
623 if (FLAGS_write_fault_one_in) {
624 IOStatus error_msg;
625 if (FLAGS_injest_error_severity <= 1 || FLAGS_injest_error_severity > 2) {
626 error_msg = IOStatus::IOError("Retryable IO Error");
627 error_msg.SetRetryable(true);
628 } else if (FLAGS_injest_error_severity == 2) {
629 // Ingest the fatal error
630 error_msg = IOStatus::IOError("Fatal IO Error");
631 error_msg.SetDataLoss(true);
632 }
633 std::vector<FileType> types = {FileType::kTableFile,
634 FileType::kDescriptorFile,
635 FileType::kCurrentFile};
636 fault_fs_guard->SetRandomWriteError(
637 thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg,
638 /*inject_for_all_file_types=*/false, types);
639 }
640 #endif // NDEBUG
641 thread->stats.Start();
642 for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
643 if (thread->shared->HasVerificationFailedYet() ||
644 thread->shared->ShouldStopTest()) {
645 break;
646 }
647 if (open_cnt != 0) {
648 thread->stats.FinishedSingleOp();
649 MutexLock l(thread->shared->GetMutex());
650 while (!thread->snapshot_queue.empty()) {
651 db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
652 delete thread->snapshot_queue.front().second.key_vec;
653 thread->snapshot_queue.pop();
654 }
655 thread->shared->IncVotedReopen();
656 if (thread->shared->AllVotedReopen()) {
657 thread->shared->GetStressTest()->Reopen(thread);
658 thread->shared->GetCondVar()->SignalAll();
659 } else {
660 thread->shared->GetCondVar()->Wait();
661 }
662 // Commenting this out as we don't want to reset stats on each open.
663 // thread->stats.Start();
664 }
665
666 for (uint64_t i = 0; i < ops_per_open; i++) {
667 if (thread->shared->HasVerificationFailedYet()) {
668 break;
669 }
670
671 // Change Options
672 if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) {
673 SetOptions(thread);
674 }
675
676 if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) {
677 options_.inplace_update_support ^= options_.inplace_update_support;
678 }
679
680 if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
681 thread->rand.OneIn(FLAGS_verify_db_one_in)) {
682 ContinuouslyVerifyDb(thread);
683 if (thread->shared->ShouldStopTest()) {
684 break;
685 }
686 }
687
688 MaybeClearOneColumnFamily(thread);
689
690 if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
691 Status s = db_->SyncWAL();
692 if (!s.ok() && !s.IsNotSupported()) {
693 fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
694 }
695 }
696
697 int rand_column_family = thread->rand.Next() % FLAGS_column_families;
698 ColumnFamilyHandle* column_family = column_families_[rand_column_family];
699
700 if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
701 TestCompactFiles(thread, column_family);
702 }
703
704 int64_t rand_key = GenerateOneKey(thread, i);
705 std::string keystr = Key(rand_key);
706 Slice key = keystr;
707 std::unique_ptr<MutexLock> lock;
708 if (ShouldAcquireMutexOnKey()) {
709 lock.reset(new MutexLock(
710 shared->GetMutexForKey(rand_column_family, rand_key)));
711 }
712
713 if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
714 TestCompactRange(thread, rand_key, key, column_family);
715 if (thread->shared->HasVerificationFailedYet()) {
716 break;
717 }
718 }
719
720 std::vector<int> rand_column_families =
721 GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
722
723 if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
724 Status status = TestFlush(rand_column_families);
725 if (!status.ok()) {
726 fprintf(stdout, "Unable to perform Flush(): %s\n",
727 status.ToString().c_str());
728 }
729 }
730
731 #ifndef ROCKSDB_LITE
732 // Verify GetLiveFiles with a 1 in N chance.
733 if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) &&
734 !FLAGS_write_fault_one_in) {
735 Status status = VerifyGetLiveFiles();
736 if (!status.ok()) {
737 VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
738 }
739 }
740
741 // Verify GetSortedWalFiles with a 1 in N chance.
742 if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
743 Status status = VerifyGetSortedWalFiles();
744 if (!status.ok()) {
745 VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK",
746 status);
747 }
748 }
749
750 // Verify GetCurrentWalFile with a 1 in N chance.
751 if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) {
752 Status status = VerifyGetCurrentWalFile();
753 if (!status.ok()) {
754 VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK",
755 status);
756 }
757 }
758 #endif // !ROCKSDB_LITE
759
760 if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
761 Status status = TestPauseBackground(thread);
762 if (!status.ok()) {
763 VerificationAbort(
764 shared, "Pause/ContinueBackgroundWork status not OK", status);
765 }
766 }
767
768 #ifndef ROCKSDB_LITE
769 if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
770 Status status = db_->VerifyChecksum();
771 if (!status.ok()) {
772 VerificationAbort(shared, "VerifyChecksum status not OK", status);
773 }
774 }
775
776 if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) {
777 TestGetProperty(thread);
778 }
779 #endif
780
781 std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
782
783 if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
784 TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
785 }
786
787 if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
788 // Beyond a certain DB size threshold, this test becomes heavier than
789 // it's worth.
790 uint64_t total_size = 0;
791 if (FLAGS_backup_max_size > 0) {
792 std::vector<FileAttributes> files;
793 db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files);
794 for (auto& file : files) {
795 total_size += file.size_bytes;
796 }
797 }
798
799 if (total_size <= FLAGS_backup_max_size) {
800 Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
801 if (!s.ok()) {
802 VerificationAbort(shared, "Backup/restore gave inconsistent state",
803 s);
804 }
805 }
806 }
807
808 if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
809 Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
810 if (!s.ok()) {
811 VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
812 }
813 }
814
815 #ifndef ROCKSDB_LITE
816 if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) {
817 Status s =
818 TestApproximateSize(thread, i, rand_column_families, rand_keys);
819 if (!s.ok()) {
820 VerificationAbort(shared, "ApproximateSize Failed", s);
821 }
822 }
823 #endif // !ROCKSDB_LITE
824 if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
825 TestAcquireSnapshot(thread, rand_column_family, keystr, i);
826 }
827
828 /*always*/ {
829 Status s = MaybeReleaseSnapshots(thread, i);
830 if (!s.ok()) {
831 VerificationAbort(shared, "Snapshot gave inconsistent state", s);
832 }
833 }
834
835 // Assign timestamps if necessary.
836 std::string read_ts_str;
837 std::string write_ts_str;
838 Slice read_ts;
839 Slice write_ts;
840 if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) {
841 read_ts_str = GenerateTimestampForRead();
842 read_ts = read_ts_str;
843 read_opts.timestamp = &read_ts;
844 write_ts_str = NowNanosStr();
845 write_ts = write_ts_str;
846 write_opts.timestamp = &write_ts;
847 }
848
849 int prob_op = thread->rand.Uniform(100);
850 // Reset this in case we pick something other than a read op. We don't
851 // want to use a stale value when deciding at the beginning of the loop
852 // whether to vote to reopen
853 if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
854 assert(0 <= prob_op);
855 // OPERATION read
856 if (FLAGS_use_multiget) {
857 // Leave room for one more iteration of the loop with a single key
858 // batch. This is to ensure that each thread does exactly the same
859 // number of ops
860 int multiget_batch_size = static_cast<int>(
861 std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
862 FLAGS_ops_per_thread - i - 1));
863 // If it's the last iteration, ensure that multiget_batch_size is 1
864 multiget_batch_size = std::max(multiget_batch_size, 1);
865 rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
866 TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
867 i += multiget_batch_size - 1;
868 } else {
869 TestGet(thread, read_opts, rand_column_families, rand_keys);
870 }
871 } else if (prob_op < prefixBound) {
872 assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
873 // OPERATION prefix scan
874 // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
875 // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
876 // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
877 // prefix
878 TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
879 } else if (prob_op < writeBound) {
880 assert(prefixBound <= prob_op);
881 // OPERATION write
882 TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
883 value, lock);
884 } else if (prob_op < delBound) {
885 assert(writeBound <= prob_op);
886 // OPERATION delete
887 TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
888 } else if (prob_op < delRangeBound) {
889 assert(delBound <= prob_op);
890 // OPERATION delete range
891 TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
892 lock);
893 } else {
894 assert(delRangeBound <= prob_op);
895 // OPERATION iterate
896 int num_seeks = static_cast<int>(
897 std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
898 FLAGS_ops_per_thread - i - 1));
899 rand_keys = GenerateNKeys(thread, num_seeks, i);
900 i += num_seeks - 1;
901 TestIterate(thread, read_opts, rand_column_families, rand_keys);
902 }
903 thread->stats.FinishedSingleOp();
904 #ifndef ROCKSDB_LITE
905 uint32_t tid = thread->tid;
906 assert(secondaries_.empty() ||
907 static_cast<size_t>(tid) < secondaries_.size());
908 if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
909 Status s = secondaries_[tid]->TryCatchUpWithPrimary();
910 if (!s.ok()) {
911 VerificationAbort(shared, "Secondary instance failed to catch up", s);
912 break;
913 }
914 }
915 #endif
916 }
917 }
918 while (!thread->snapshot_queue.empty()) {
919 db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
920 delete thread->snapshot_queue.front().second.key_vec;
921 thread->snapshot_queue.pop();
922 }
923
924 thread->stats.Stop();
925 }
926
927 #ifndef ROCKSDB_LITE
928 // Generate a list of keys that are close to the boundaries of SST file keys.
929 // If there isn't any SST file in the DB, return an empty list.
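// With probability 1/3 the chosen boundary key is decremented by one byte
// value (borrowing from earlier bytes on underflow), and with probability
// 1/3 it is incremented (carrying on overflow), so the returned keys land
// just inside or just outside SST file key ranges.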
930 std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread,
931 DB* db,
932 ColumnFamilyHandle* cfh,
933 size_t num_keys) {
934 ColumnFamilyMetaData cfmd;
935 db->GetColumnFamilyMetaData(cfh, &cfmd);
936 std::vector<std::string> boundaries;
937 for (const LevelMetaData& lmd : cfmd.levels) {
938 for (const SstFileMetaData& sfmd : lmd.files) {
939 // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey
940 // have timestamps.
941 const auto& skey = sfmd.smallestkey;
942 const auto& lkey = sfmd.largestkey;
943 assert(skey.size() >= FLAGS_user_timestamp_size);
944 assert(lkey.size() >= FLAGS_user_timestamp_size);
945 boundaries.push_back(
946 skey.substr(0, skey.size() - FLAGS_user_timestamp_size));
947 boundaries.push_back(
948 lkey.substr(0, lkey.size() - FLAGS_user_timestamp_size));
949 }
950 }
951 if (boundaries.empty()) {
952 return {};
953 }
954
955 std::vector<std::string> ret;
956 for (size_t j = 0; j < num_keys; j++) {
957 std::string k =
958 boundaries[thread->rand.Uniform(static_cast<int>(boundaries.size()))];
959 if (thread->rand.OneIn(3)) {
960 // Reduce one byte from the string
961 for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
962 uint8_t cur = k[i];
963 if (cur > 0) {
964 k[i] = static_cast<char>(cur - 1);
965 break;
966 } else if (i > 0) {
967 k[i] = 0xFFu;
968 }
969 }
970 } else if (thread->rand.OneIn(2)) {
971 // Add one byte to the string
972 for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
973 uint8_t cur = k[i];
974 if (cur < 255) {
975 k[i] = static_cast<char>(cur + 1);
976 break;
977 } else if (i > 0) {
978 k[i] = 0x00;
979 }
980 }
981 }
982 ret.push_back(k);
983 }
984 return ret;
985 }
986 #endif // !ROCKSDB_LITE
987
988 // Given a key K, this creates an iterator which scans to K and then
989 // does a random sequence of Next/Prev operations.
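// Verification strategy (sketch): alongside the iterator under test, a
// "control" iterator is opened with total_order_seek and no bounds on a
// control column family, the same Seek and Next/Prev sequence is applied to
// both, and VerifyIterator() checks after every step that they agree unless
// bounds or the prefix extractor make the comparison undefined.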
990 Status StressTest::TestIterate(ThreadState* thread,
991 const ReadOptions& read_opts,
992 const std::vector<int>& rand_column_families,
993 const std::vector<int64_t>& rand_keys) {
994 Status s;
995 const Snapshot* snapshot = db_->GetSnapshot();
996 ReadOptions readoptionscopy = read_opts;
997 readoptionscopy.snapshot = snapshot;
998
999 bool expect_total_order = false;
1000 if (thread->rand.OneIn(16)) {
1001 // When prefix extractor is used, it's useful to cover total order seek.
1002 readoptionscopy.total_order_seek = true;
1003 expect_total_order = true;
1004 } else if (thread->rand.OneIn(4)) {
1005 readoptionscopy.total_order_seek = false;
1006 readoptionscopy.auto_prefix_mode = true;
1007 expect_total_order = true;
1008 } else if (options_.prefix_extractor.get() == nullptr) {
1009 expect_total_order = true;
1010 }
1011
1012 std::string upper_bound_str;
1013 Slice upper_bound;
1014 if (thread->rand.OneIn(16)) {
1015 // in 1/16 chance, set an iterator upper bound
1016 int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
1017 upper_bound_str = Key(rand_upper_key);
1018 upper_bound = Slice(upper_bound_str);
1019 // upper_bound can be smaller than the seek key, but the query itself
1020 // should not crash either.
1021 readoptionscopy.iterate_upper_bound = &upper_bound;
1022 }
1023 std::string lower_bound_str;
1024 Slice lower_bound;
1025 if (thread->rand.OneIn(16)) {
1026 // in 1/16 chance, enable iterator lower bound
1027 int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
1028 lower_bound_str = Key(rand_lower_key);
1029 lower_bound = Slice(lower_bound_str);
1030 // lower_bound can be greater than the seek key, but the query itself
1031 // should not crash either.
1032 readoptionscopy.iterate_lower_bound = &lower_bound;
1033 }
1034
1035 auto cfh = column_families_[rand_column_families[0]];
1036 std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh));
1037
1038 std::vector<std::string> key_str;
1039 if (thread->rand.OneIn(16)) {
1040 // Generate keys close to lower or upper bound of SST files.
1041 key_str = GetWhiteBoxKeys(thread, db_, cfh, rand_keys.size());
1042 }
1043 if (key_str.empty()) {
1044 // If key strings were not generated using white-box keys,
1045 // use the randomized keys passed in.
1046 for (int64_t rkey : rand_keys) {
1047 key_str.push_back(Key(rkey));
1048 }
1049 }
1050
1051 std::string op_logs;
1052 const size_t kOpLogsLimit = 10000;
1053
1054 for (const std::string& skey : key_str) {
1055 if (op_logs.size() > kOpLogsLimit) {
1056 // Shouldn't take too much memory for the history log. Clear it.
1057 op_logs = "(cleared...)\n";
1058 }
1059
1060 Slice key = skey;
1061
1062 if (readoptionscopy.iterate_upper_bound != nullptr &&
1063 thread->rand.OneIn(2)) {
1064 // 1/2 chance, change the upper bound.
1065 // It is possible that it is changed without first use, but there is no
1066 // problem with that.
1067 int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
1068 upper_bound_str = Key(rand_upper_key);
1069 upper_bound = Slice(upper_bound_str);
1070 } else if (readoptionscopy.iterate_lower_bound != nullptr &&
1071 thread->rand.OneIn(4)) {
1072 // 1/4 chance, change the lower bound.
1073 // It is possible that it is changed without first use, but there is no
1074 // problem with that.
1075 int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
1076 lower_bound_str = Key(rand_lower_key);
1077 lower_bound = Slice(lower_bound_str);
1078 }
1079
1080 // Record some options to op_logs;
1081 op_logs += "total_order_seek: ";
1082 op_logs += (readoptionscopy.total_order_seek ? "1 " : "0 ");
1083 op_logs += "auto_prefix_mode: ";
1084 op_logs += (readoptionscopy.auto_prefix_mode ? "1 " : "0 ");
1085 if (readoptionscopy.iterate_upper_bound != nullptr) {
1086 op_logs += "ub: " + upper_bound.ToString(true) + " ";
1087 }
1088 if (readoptionscopy.iterate_lower_bound != nullptr) {
1089 op_logs += "lb: " + lower_bound.ToString(true) + " ";
1090 }
1091
1092 // Set up an iterator that does the same scan without bounds and with
1093 // total order seek, and compare the results. This is to identify bugs
1094 // related to bounds, the prefix extractor, or reseeking. Sometimes we
1095 // compare iterators with the same set-up, and it doesn't hurt to check
1096 // that they are equal.
1097 ReadOptions cmp_ro;
1098 cmp_ro.timestamp = readoptionscopy.timestamp;
1099 cmp_ro.snapshot = snapshot;
1100 cmp_ro.total_order_seek = true;
1101 ColumnFamilyHandle* cmp_cfh =
1102 GetControlCfh(thread, rand_column_families[0]);
1103 std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
1104 bool diverged = false;
1105
1106 bool support_seek_first_or_last = expect_total_order;
1107
1108 LastIterateOp last_op;
1109 if (support_seek_first_or_last && thread->rand.OneIn(100)) {
1110 iter->SeekToFirst();
1111 cmp_iter->SeekToFirst();
1112 last_op = kLastOpSeekToFirst;
1113 op_logs += "STF ";
1114 } else if (support_seek_first_or_last && thread->rand.OneIn(100)) {
1115 iter->SeekToLast();
1116 cmp_iter->SeekToLast();
1117 last_op = kLastOpSeekToLast;
1118 op_logs += "STL ";
1119 } else if (thread->rand.OneIn(8)) {
1120 iter->SeekForPrev(key);
1121 cmp_iter->SeekForPrev(key);
1122 last_op = kLastOpSeekForPrev;
1123 op_logs += "SFP " + key.ToString(true) + " ";
1124 } else {
1125 iter->Seek(key);
1126 cmp_iter->Seek(key);
1127 last_op = kLastOpSeek;
1128 op_logs += "S " + key.ToString(true) + " ";
1129 }
1130 VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(), cmp_iter.get(),
1131 last_op, key, op_logs, &diverged);
1132
1133 bool no_reverse =
1134 (FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
1135 for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
1136 if (no_reverse || thread->rand.OneIn(2)) {
1137 iter->Next();
1138 if (!diverged) {
1139 assert(cmp_iter->Valid());
1140 cmp_iter->Next();
1141 }
1142 op_logs += "N";
1143 } else {
1144 iter->Prev();
1145 if (!diverged) {
1146 assert(cmp_iter->Valid());
1147 cmp_iter->Prev();
1148 }
1149 op_logs += "P";
1150 }
1151 last_op = kLastOpNextOrPrev;
1152 VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(),
1153 cmp_iter.get(), last_op, key, op_logs, &diverged);
1154 }
1155
1156 if (s.ok()) {
1157 thread->stats.AddIterations(1);
1158 } else {
1159 fprintf(stderr, "TestIterate error: %s\n", s.ToString().c_str());
1160 thread->stats.AddErrors(1);
1161 break;
1162 }
1163
1164 op_logs += "; ";
1165 }
1166
1167 db_->ReleaseSnapshot(snapshot);
1168
1169 return s;
1170 }
1171
1172 #ifndef ROCKSDB_LITE
1173 // Test the return status of GetLiveFiles.
1174 Status StressTest::VerifyGetLiveFiles() const {
1175 std::vector<std::string> live_file;
1176 uint64_t manifest_size = 0;
1177 return db_->GetLiveFiles(live_file, &manifest_size);
1178 }
1179
1180 // Test the return status of GetSortedWalFiles.
1181 Status StressTest::VerifyGetSortedWalFiles() const {
1182 VectorLogPtr log_ptr;
1183 return db_->GetSortedWalFiles(log_ptr);
1184 }
1185
1186 // Test the return status of GetCurrentWalFile.
1187 Status StressTest::VerifyGetCurrentWalFile() const {
1188 std::unique_ptr<LogFile> cur_wal_file;
1189 return db_->GetCurrentWalFile(&cur_wal_file);
1190 }
1191 #endif // !ROCKSDB_LITE
1192
1193 // Compare the two iterators. iter and cmp_iter should be at the same
1194 // position, unless iter may have been invalidated or made undefined by
1195 // upper or lower bounds, or by the prefix extractor.
1196 // Flags a verification failure if the comparison fails.
1197 // *diverged is set to true once the two iterators have diverged and
1198 // should no longer be compared.
1199 void StressTest::VerifyIterator(ThreadState* thread,
1200 ColumnFamilyHandle* cmp_cfh,
1201 const ReadOptions& ro, Iterator* iter,
1202 Iterator* cmp_iter, LastIterateOp op,
1203 const Slice& seek_key,
1204 const std::string& op_logs, bool* diverged) {
1205 if (*diverged) {
1206 return;
1207 }
1208
1209 if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
1210 // SeekToFirst() with lower bound is not well defined.
1211 *diverged = true;
1212 return;
1213 } else if (op == kLastOpSeekToLast && ro.iterate_upper_bound != nullptr) {
1214 // SeekToLast() with an upper bound is not well defined.
1215 *diverged = true;
1216 return;
1217 } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
1218 (options_.comparator->CompareWithoutTimestamp(
1219 *ro.iterate_lower_bound, /*a_has_ts=*/false, seek_key,
1220 /*b_has_ts=*/false) >= 0 ||
1221 (ro.iterate_upper_bound != nullptr &&
1222 options_.comparator->CompareWithoutTimestamp(
1223 *ro.iterate_lower_bound, /*a_has_ts=*/false,
1224 *ro.iterate_upper_bound, /*b_has_ts*/ false) >= 0))) {
1225 // Lower bound behavior is not well defined if it is larger than
1226 // seek key or upper bound. Disable the check for now.
1227 *diverged = true;
1228 return;
1229 } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
1230 (options_.comparator->CompareWithoutTimestamp(
1231 *ro.iterate_upper_bound, /*a_has_ts=*/false, seek_key,
1232 /*b_has_ts=*/false) <= 0 ||
1233 (ro.iterate_lower_bound != nullptr &&
1234 options_.comparator->CompareWithoutTimestamp(
1235 *ro.iterate_lower_bound, /*a_has_ts=*/false,
1236 *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
1237 // Upper bound behavior is not well defined if it is smaller than the
1238 // seek key or the lower bound. Disable the check for now.
1239 *diverged = true;
1240 return;
1241 }
1242
1243 const SliceTransform* pe = (ro.total_order_seek || ro.auto_prefix_mode)
1244 ? nullptr
1245 : options_.prefix_extractor.get();
1246 const Comparator* cmp = options_.comparator;
1247
1248 if (iter->Valid() && !cmp_iter->Valid()) {
1249 if (pe != nullptr) {
1250 if (!pe->InDomain(seek_key)) {
1251 // Prefix-seeking a key that is not in the prefix extractor's domain is
1252 // undefined. Skip checking for this scenario.
1253 *diverged = true;
1254 return;
1255 } else if (!pe->InDomain(iter->key())) {
1256 // Out of range: the iterator key is no longer in the domain.
1257 *diverged = true;
1258 return;
1259 } else if (pe->Transform(iter->key()) != pe->Transform(seek_key)) {
1260 *diverged = true;
1261 return;
1262 }
1263 }
1264 fprintf(stderr,
1265 "Control interator is invalid but iterator has key %s "
1266 "%s\n",
1267 iter->key().ToString(true).c_str(), op_logs.c_str());
1268
1269 *diverged = true;
1270 } else if (cmp_iter->Valid()) {
1271 // Iterator is not valid. It can be legitimate if it has already gone out
1272 // of the upper or lower bound, or been filtered out by the prefix extractor.
1273 const Slice& total_order_key = cmp_iter->key();
1274
1275 if (pe != nullptr) {
1276 if (!pe->InDomain(seek_key)) {
1277 // Prefix-seeking a key that is not in the prefix extractor's domain is
1278 // undefined. Skip checking for this scenario.
1279 *diverged = true;
1280 return;
1281 }
1282
1283 if (!pe->InDomain(total_order_key) ||
1284 pe->Transform(total_order_key) != pe->Transform(seek_key)) {
1285 // If the prefix is exhausted, the only thing that needs checking is
1286 // that the iterator doesn't return a position within the prefix.
1287 // Either way, checking can stop here.
1288 *diverged = true;
1289 if (!iter->Valid() || !pe->InDomain(iter->key()) ||
1290 pe->Transform(iter->key()) != pe->Transform(seek_key)) {
1291 return;
1292 }
1293 fprintf(stderr,
1294 "Iterator stays in prefix but contol doesn't"
1295 " iterator key %s control iterator key %s %s\n",
1296 iter->key().ToString(true).c_str(),
1297 cmp_iter->key().ToString(true).c_str(), op_logs.c_str());
1298 }
1299 }
1300 // Check upper or lower bounds.
1301 if (!*diverged) {
1302 if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
1303 (!iter->Valid() &&
1304 (ro.iterate_upper_bound == nullptr ||
1305 cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
1306 *ro.iterate_upper_bound,
1307 /*b_has_ts=*/false) < 0) &&
1308 (ro.iterate_lower_bound == nullptr ||
1309 cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
1310 *ro.iterate_lower_bound,
1311 /*b_has_ts=*/false) > 0))) {
1312 fprintf(stderr,
1313 "Iterator diverged from control iterator which"
1314 " has value %s %s\n",
1315 total_order_key.ToString(true).c_str(), op_logs.c_str());
1316 if (iter->Valid()) {
1317 fprintf(stderr, "iterator has value %s\n",
1318 iter->key().ToString(true).c_str());
1319 } else {
1320 fprintf(stderr, "iterator is not valid\n");
1321 }
1322 *diverged = true;
1323 }
1324 }
1325 }
1326 if (*diverged) {
1327 fprintf(stderr, "Control CF %s\n", cmp_cfh->GetName().c_str());
1328 thread->stats.AddErrors(1);
1329 // Fail fast to preserve the DB state.
1330 thread->shared->SetVerificationFailure();
1331 }
1332 }
1333
1334 #ifdef ROCKSDB_LITE
1335 Status StressTest::TestBackupRestore(
1336 ThreadState* /* thread */,
1337 const std::vector<int>& /* rand_column_families */,
1338 const std::vector<int64_t>& /* rand_keys */) {
1339 assert(false);
1340 fprintf(stderr,
1341 "RocksDB lite does not support "
1342 "TestBackupRestore\n");
1343 std::terminate();
1344 }
1345
1346 Status StressTest::TestCheckpoint(
1347 ThreadState* /* thread */,
1348 const std::vector<int>& /* rand_column_families */,
1349 const std::vector<int64_t>& /* rand_keys */) {
1350 assert(false);
1351 fprintf(stderr,
1352 "RocksDB lite does not support "
1353 "TestCheckpoint\n");
1354 std::terminate();
1355 }
1356
1357 void StressTest::TestCompactFiles(ThreadState* /* thread */,
1358 ColumnFamilyHandle* /* column_family */) {
1359 assert(false);
1360 fprintf(stderr,
1361 "RocksDB lite does not support "
1362 "CompactFiles\n");
1363 std::terminate();
1364 }
1365 #else // ROCKSDB_LITE
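// End-to-end backup/restore check (a sketch of the flow below): open a
// BackupEngine in a per-thread ".backup<tid>" directory with randomized
// sharing/naming options, create a backup, reopen the engine, optionally
// VerifyBackup(), then either restore into ".restore<tid>" or open the
// backup in place read-only, spot-check one key against the expected state
// in SharedState, and finally purge old backups and clean up directories.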
1366 Status StressTest::TestBackupRestore(
1367 ThreadState* thread, const std::vector<int>& rand_column_families,
1368 const std::vector<int64_t>& rand_keys) {
1369 std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
1370 std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
1371 BackupableDBOptions backup_opts(backup_dir);
1372 // For debugging, get info_log from live options
1373 backup_opts.info_log = db_->GetDBOptions().info_log.get();
1374 if (thread->rand.OneIn(10)) {
1375 backup_opts.share_table_files = false;
1376 } else {
1377 backup_opts.share_table_files = true;
1378 if (thread->rand.OneIn(5)) {
1379 backup_opts.share_files_with_checksum = false;
1380 } else {
1381 backup_opts.share_files_with_checksum = true;
1382 if (thread->rand.OneIn(2)) {
1383 // old
1384 backup_opts.share_files_with_checksum_naming =
1385 BackupableDBOptions::kLegacyCrc32cAndFileSize;
1386 } else {
1387 // new
1388 backup_opts.share_files_with_checksum_naming =
1389 BackupableDBOptions::kUseDbSessionId;
1390 }
1391 if (thread->rand.OneIn(2)) {
1392 backup_opts.share_files_with_checksum_naming =
1393 backup_opts.share_files_with_checksum_naming |
1394 BackupableDBOptions::kFlagIncludeFileSize;
1395 }
1396 }
1397 }
1398 BackupEngine* backup_engine = nullptr;
1399 std::string from = "a backup/restore operation";
1400 Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
1401 if (!s.ok()) {
1402 from = "BackupEngine::Open";
1403 }
1404 if (s.ok()) {
1405 if (thread->rand.OneIn(2)) {
1406 TEST_FutureSchemaVersion2Options test_opts;
1407 test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
1408 test_opts.file_sizes = thread->rand.OneIn(2) == 0;
1409 TEST_EnableWriteFutureSchemaVersion2(backup_engine, test_opts);
1410 }
1411 s = backup_engine->CreateNewBackup(db_);
1412 if (!s.ok()) {
1413 from = "BackupEngine::CreateNewBackup";
1414 }
1415 }
1416 if (s.ok()) {
1417 delete backup_engine;
1418 backup_engine = nullptr;
1419 s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
1420 if (!s.ok()) {
1421 from = "BackupEngine::Open (again)";
1422 }
1423 }
1424 std::vector<BackupInfo> backup_info;
1425 // If inplace_not_restore, we verify the backup by opening it as a
1426 // read-only DB. If !inplace_not_restore, we restore it to a temporary
1427 // directory for verification.
1428 bool inplace_not_restore = thread->rand.OneIn(3);
1429 if (s.ok()) {
1430 backup_engine->GetBackupInfo(&backup_info,
1431 /*include_file_details*/ inplace_not_restore);
1432 if (backup_info.empty()) {
1433 s = Status::NotFound("no backups found");
1434 from = "BackupEngine::GetBackupInfo";
1435 }
1436 }
1437 if (s.ok() && thread->rand.OneIn(2)) {
1438 s = backup_engine->VerifyBackup(
1439 backup_info.front().backup_id,
1440 thread->rand.OneIn(2) /* verify_with_checksum */);
1441 if (!s.ok()) {
1442 from = "BackupEngine::VerifyBackup";
1443 }
1444 }
1445 const bool allow_persistent = thread->tid == 0; // not too many
1446 bool from_latest = false;
1447 int count = static_cast<int>(backup_info.size());
1448 if (s.ok() && !inplace_not_restore) {
1449 if (count > 1) {
1450 s = backup_engine->RestoreDBFromBackup(
1451 RestoreOptions(), backup_info[thread->rand.Uniform(count)].backup_id,
1452 restore_dir /* db_dir */, restore_dir /* wal_dir */);
1453 if (!s.ok()) {
1454 from = "BackupEngine::RestoreDBFromBackup";
1455 }
1456 } else {
1457 from_latest = true;
1458 s = backup_engine->RestoreDBFromLatestBackup(RestoreOptions(),
1459 restore_dir /* db_dir */,
1460 restore_dir /* wal_dir */);
1461 if (!s.ok()) {
1462 from = "BackupEngine::RestoreDBFromLatestBackup";
1463 }
1464 }
1465 }
1466 if (s.ok() && !inplace_not_restore) {
1467 // Purge early if restoring, to ensure the restored directory doesn't
1468 // have some secret dependency on the backup directory.
1469 uint32_t to_keep = 0;
1470 if (allow_persistent) {
1471 // allow one thread to keep up to 2 backups
1472 to_keep = thread->rand.Uniform(3);
1473 }
1474 s = backup_engine->PurgeOldBackups(to_keep);
1475 if (!s.ok()) {
1476 from = "BackupEngine::PurgeOldBackups";
1477 }
1478 }
1479 DB* restored_db = nullptr;
1480 std::vector<ColumnFamilyHandle*> restored_cf_handles;
1481 // Not yet implemented: opening restored BlobDB or TransactionDB
1482 if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
1483 Options restore_options(options_);
1484 restore_options.listeners.clear();
1485 // Avoid dangling/shared file descriptors, for reliable destroy
1486 restore_options.sst_file_manager = nullptr;
1487 std::vector<ColumnFamilyDescriptor> cf_descriptors;
1488 // TODO(ajkr): `column_family_names_` is not safe to access here when
1489 // `clear_column_family_one_in != 0`. But we can't easily switch to
1490 // `ListColumnFamilies` to get names because it won't necessarily give
1491 // the same order as `column_family_names_`.
1492 assert(FLAGS_clear_column_family_one_in == 0);
1493 for (auto name : column_family_names_) {
1494 cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
1495 }
1496 if (inplace_not_restore) {
1497 BackupInfo& info = backup_info[thread->rand.Uniform(count)];
1498 restore_options.env = info.env_for_open.get();
1499 s = DB::OpenForReadOnly(DBOptions(restore_options), info.name_for_open,
1500 cf_descriptors, &restored_cf_handles,
1501 &restored_db);
1502 if (!s.ok()) {
1503 from = "DB::OpenForReadOnly in backup/restore";
1504 }
1505 } else {
1506 s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
1507 &restored_cf_handles, &restored_db);
1508 if (!s.ok()) {
1509 from = "DB::Open in backup/restore";
1510 }
1511 }
1512 }
1513 // Note the column families chosen by `rand_column_families` cannot be
1514 // dropped while the locks for `rand_keys` are held. So we should not have
1515 // to worry about accessing those column families throughout this function.
1516 //
1517 // For simplicity, currently only verifies existence/non-existence of a
1518 // single key
1519 for (size_t i = 0; restored_db && s.ok() && i < rand_column_families.size();
1520 ++i) {
1521 std::string key_str = Key(rand_keys[0]);
1522 Slice key = key_str;
1523 std::string restored_value;
1524 ReadOptions read_opts;
1525 std::string ts_str;
1526 Slice ts;
1527 if (FLAGS_user_timestamp_size > 0) {
1528 ts_str = GenerateTimestampForRead();
1529 ts = ts_str;
1530 read_opts.timestamp = &ts;
1531 }
1532 Status get_status = restored_db->Get(
1533 read_opts, restored_cf_handles[rand_column_families[i]], key,
1534 &restored_value);
1535 bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1536 if (get_status.ok()) {
1537 if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
1538 s = Status::Corruption("key exists in restore but not in original db");
1539 }
1540 } else if (get_status.IsNotFound()) {
1541 if (exists && from_latest && ShouldAcquireMutexOnKey()) {
1542 s = Status::Corruption("key exists in original db but not in restore");
1543 }
1544 } else {
1545 s = get_status;
1546 if (!s.ok()) {
1547 from = "DB::Get in backup/restore";
1548 }
1549 }
1550 }
1551 if (restored_db != nullptr) {
1552 for (auto* cf_handle : restored_cf_handles) {
1553 restored_db->DestroyColumnFamilyHandle(cf_handle);
1554 }
1555 delete restored_db;
1556 restored_db = nullptr;
1557 }
1558 if (s.ok() && inplace_not_restore) {
1559 // Purge late if inplace open read-only
1560 uint32_t to_keep = 0;
1561 if (allow_persistent) {
1562 // allow one thread to keep up to 2 backups
1563 to_keep = thread->rand.Uniform(3);
1564 }
1565 s = backup_engine->PurgeOldBackups(to_keep);
1566 if (!s.ok()) {
1567 from = "BackupEngine::PurgeOldBackups";
1568 }
1569 }
1570 if (backup_engine != nullptr) {
1571 delete backup_engine;
1572 backup_engine = nullptr;
1573 }
1574 if (s.ok()) {
1575 // Preserve directories on failure, or allowed persistent backup
1576 if (!allow_persistent) {
1577 s = DestroyDir(db_stress_env, backup_dir);
1578 if (!s.ok()) {
1579 from = "Destroy backup dir";
1580 }
1581 }
1582 }
1583 if (s.ok()) {
1584 s = DestroyDir(db_stress_env, restore_dir);
1585 if (!s.ok()) {
1586 from = "Destroy restore dir";
1587 }
1588 }
1589 if (!s.ok()) {
1590 fprintf(stderr, "Failure in %s with: %s\n", from.c_str(),
1591 s.ToString().c_str());
1592 }
1593 return s;
1594 }
1595
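// Exercises DB::GetApproximateSizes() over a random key range: either two
// independently generated keys (covering large ranges) or a narrow range of
// at most 1000 keys, with randomized SizeApproximationOptions (note that
// include_memtabtles, as used below, is the field's actual spelling in the
// public API).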
1596 Status StressTest::TestApproximateSize(
1597 ThreadState* thread, uint64_t iteration,
1598 const std::vector<int>& rand_column_families,
1599 const std::vector<int64_t>& rand_keys) {
1600 // rand_keys likely only has one key. Just use the first one.
1601 assert(!rand_keys.empty());
1602 assert(!rand_column_families.empty());
1603 int64_t key1 = rand_keys[0];
1604 int64_t key2;
1605 if (thread->rand.OneIn(2)) {
1606 // Two totally random keys. This tends to cover large ranges.
1607 key2 = GenerateOneKey(thread, iteration);
1608 if (key2 < key1) {
1609 std::swap(key1, key2);
1610 }
1611 } else {
1612 // Unless users pass a very large FLAGS_max_key, we should not worry
1613 // about overflow. This is for testing, so we skip the overflow check
1614 // for simplicity.
1615 key2 = key1 + static_cast<int64_t>(thread->rand.Uniform(1000));
1616 }
1617 std::string key1_str = Key(key1);
1618 std::string key2_str = Key(key2);
1619 Range range{Slice(key1_str), Slice(key2_str)};
1620 SizeApproximationOptions sao;
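// include_files defaults to true, so randomizing it only when memtables are
// included should keep at least one size source enabled; GetApproximateSizes
// is expected to reject the case where both are disabled.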
1621 sao.include_memtabtles = thread->rand.OneIn(2);
1622 if (sao.include_memtabtles) {
1623 sao.include_files = thread->rand.OneIn(2);
1624 }
1625 if (thread->rand.OneIn(2)) {
1626 if (thread->rand.OneIn(2)) {
1627 sao.files_size_error_margin = 0.0;
1628 } else {
1629 sao.files_size_error_margin =
1630 static_cast<double>(thread->rand.Uniform(3));
1631 }
1632 }
1633 uint64_t result;
1634 return db_->GetApproximateSizes(
1635 sao, column_families_[rand_column_families[0]], &range, 1, &result);
1636 }
1637
TestCheckpoint(ThreadState * thread,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)1638 Status StressTest::TestCheckpoint(ThreadState* thread,
1639 const std::vector<int>& rand_column_families,
1640 const std::vector<int64_t>& rand_keys) {
1641 std::string checkpoint_dir =
1642 FLAGS_db + "/.checkpoint" + ToString(thread->tid);
1643 Options tmp_opts(options_);
1644 tmp_opts.listeners.clear();
1645 tmp_opts.env = db_stress_env;
1646
1647 DestroyDB(checkpoint_dir, tmp_opts);
1648
1649 if (db_stress_env->FileExists(checkpoint_dir).ok()) {
1650 // If the directory still exists, try to delete its files one by one.
1651 // Most likely a trash file is still there.
1652 Status my_s = DestroyDir(db_stress_env, checkpoint_dir);
1653 if (!my_s.ok()) {
1654 fprintf(stderr, "Fail to destroy directory before checkpoint: %s\n",
1655 my_s.ToString().c_str());
1656 }
1657 }
1658
1659 Checkpoint* checkpoint = nullptr;
1660 Status s = Checkpoint::Create(db_, &checkpoint);
1661 if (s.ok()) {
1662 s = checkpoint->CreateCheckpoint(checkpoint_dir);
1663 if (!s.ok()) {
1664 fprintf(stderr, "Fail to create checkpoint to %s\n",
1665 checkpoint_dir.c_str());
1666 std::vector<std::string> files;
1667 Status my_s = db_stress_env->GetChildren(checkpoint_dir, &files);
1668 if (my_s.ok()) {
1669 for (const auto& f : files) {
1670 fprintf(stderr, " %s\n", f.c_str());
1671 }
1672 } else {
1673 fprintf(stderr, "Fail to get files under the checkpoint directory: %s\n",
1674 my_s.ToString().c_str());
1675 }
1676 }
1677 }
1678 delete checkpoint;
1679 checkpoint = nullptr;
1680 std::vector<ColumnFamilyHandle*> cf_handles;
1681 DB* checkpoint_db = nullptr;
1682 if (s.ok()) {
1683 Options options(options_);
1684 options.listeners.clear();
1685 std::vector<ColumnFamilyDescriptor> cf_descs;
1686 // TODO(ajkr): `column_family_names_` is not safe to access here when
1687 // `clear_column_family_one_in != 0`. But we can't easily switch to
1688 // `ListColumnFamilies` to get names because it won't necessarily give
1689 // the same order as `column_family_names_`.
1690 assert(FLAGS_clear_column_family_one_in == 0);
1691 if (FLAGS_clear_column_family_one_in == 0) {
1692 for (const auto& name : column_family_names_) {
1693 cf_descs.emplace_back(name, ColumnFamilyOptions(options));
1694 }
1695 s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
1696 &cf_handles, &checkpoint_db);
1697 }
1698 }
1699 if (checkpoint_db != nullptr) {
1700 // Note the column families chosen by `rand_column_families` cannot be
1701 // dropped while the locks for `rand_keys` are held. So we should not have
1702 // to worry about accessing those column families throughout this function.
1703 for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
1704 std::string key_str = Key(rand_keys[0]);
1705 Slice key = key_str;
1706 std::string value;
1707 Status get_status = checkpoint_db->Get(
1708 ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
1709 bool exists =
1710 thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1711 if (get_status.ok()) {
1712 if (!exists && ShouldAcquireMutexOnKey()) {
1713 s = Status::Corruption(
1714 "key exists in checkpoint but not in original db");
1715 }
1716 } else if (get_status.IsNotFound()) {
1717 if (exists && ShouldAcquireMutexOnKey()) {
1718 s = Status::Corruption(
1719 "key exists in original db but not in checkpoint");
1720 }
1721 } else {
1722 s = get_status;
1723 }
1724 }
1725 for (auto cfh : cf_handles) {
1726 delete cfh;
1727 }
1728 cf_handles.clear();
1729 delete checkpoint_db;
1730 checkpoint_db = nullptr;
1731 }
1732
1733 if (!s.ok()) {
1734 fprintf(stderr, "A checkpoint operation failed with: %s\n",
1735 s.ToString().c_str());
1736 } else {
1737 DestroyDB(checkpoint_dir, tmp_opts);
1738 }
1739 return s;
1740 }
1741
TestGetProperty(ThreadState * thread) const1742 void StressTest::TestGetProperty(ThreadState* thread) const {
1743 std::unordered_set<std::string> levelPropertyNames = {
1744 DB::Properties::kAggregatedTablePropertiesAtLevel,
1745 DB::Properties::kCompressionRatioAtLevelPrefix,
1746 DB::Properties::kNumFilesAtLevelPrefix,
1747 };
1748 std::unordered_set<std::string> unknownPropertyNames = {
1749 DB::Properties::kEstimateOldestKeyTime,
1750 DB::Properties::kOptionsStatistics,
1751 DB::Properties::
1752 kLiveSstFilesSizeAtTemperature, // similar to levelPropertyNames, it
1753 // requires a number suffix
1754 };
1755 unknownPropertyNames.insert(levelPropertyNames.begin(),
1756 levelPropertyNames.end());
1757
1758 std::string prop;
1759 for (const auto& ppt_name_and_info : InternalStats::ppt_name_to_info) {
1760 bool res = db_->GetProperty(ppt_name_and_info.first, &prop);
1761 if (unknownPropertyNames.find(ppt_name_and_info.first) ==
1762 unknownPropertyNames.end()) {
1763 if (!res) {
1764 fprintf(stderr, "Failed to get DB property: %s\n",
1765 ppt_name_and_info.first.c_str());
1766 thread->shared->SetVerificationFailure();
1767 }
1768 if (ppt_name_and_info.second.handle_int != nullptr) {
1769 uint64_t prop_int;
1770 if (!db_->GetIntProperty(ppt_name_and_info.first, &prop_int)) {
1771 fprintf(stderr, "Failed to get Int property: %s\n",
1772 ppt_name_and_info.first.c_str());
1773 thread->shared->SetVerificationFailure();
1774 }
1775 }
1776 if (ppt_name_and_info.second.handle_map != nullptr) {
1777 std::map<std::string, std::string> prop_map;
1778 if (!db_->GetMapProperty(ppt_name_and_info.first, &prop_map)) {
1779 fprintf(stderr, "Failed to get Map property: %s\n",
1780 ppt_name_and_info.first.c_str());
1781 thread->shared->SetVerificationFailure();
1782 }
1783 }
1784 }
1785 }
1786
1787 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
1788 db_->GetColumnFamilyMetaData(&cf_meta_data);
1789 int level_size = static_cast<int>(cf_meta_data.levels.size());
1790 for (int level = 0; level < level_size; level++) {
1791 for (const auto& ppt_name : levelPropertyNames) {
1792 bool res = db_->GetProperty(ppt_name + std::to_string(level), &prop);
1793 if (!res) {
1794 fprintf(stderr, "Failed to get DB property: %s\n",
1795 (ppt_name + std::to_string(level)).c_str());
1796 thread->shared->SetVerificationFailure();
1797 }
1798 }
1799 }
1800
1801 // Test for an invalid property name
1802 if (thread->rand.OneIn(100)) {
1803 if (db_->GetProperty("rocksdb.invalid_property_name", &prop)) {
1804 fprintf(stderr, "Failed to return false for invalid property name\n");
1805 thread->shared->SetVerificationFailure();
1806 }
1807 }
1808 }
1809
TestCompactFiles(ThreadState * thread,ColumnFamilyHandle * column_family)1810 void StressTest::TestCompactFiles(ThreadState* thread,
1811 ColumnFamilyHandle* column_family) {
1812 ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
1813 db_->GetColumnFamilyMetaData(column_family, &cf_meta_data);
1814
1815 // Randomly compact up to three consecutive files from a level
1816 const int kMaxRetry = 3;
1817 for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
1818 size_t random_level =
1819 thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size()));
1820
1821 const auto& files = cf_meta_data.levels[random_level].files;
1822 if (files.size() > 0) {
1823 size_t random_file_index =
1824 thread->rand.Uniform(static_cast<int>(files.size()));
1825 if (files[random_file_index].being_compacted) {
1826 // Retry as the selected file is currently being compacted
1827 continue;
1828 }
1829
1830 std::vector<std::string> input_files;
1831 input_files.push_back(files[random_file_index].name);
1832 if (random_file_index > 0 &&
1833 !files[random_file_index - 1].being_compacted) {
1834 input_files.push_back(files[random_file_index - 1].name);
1835 }
1836 if (random_file_index + 1 < files.size() &&
1837 !files[random_file_index + 1].being_compacted) {
1838 input_files.push_back(files[random_file_index + 1].name);
1839 }
1840
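// Compact into the next level, or keep the output in the current level when
// it is already the bottommost one.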
1841 size_t output_level =
1842 std::min(random_level + 1, cf_meta_data.levels.size() - 1);
1843 auto s = db_->CompactFiles(CompactionOptions(), column_family,
1844 input_files, static_cast<int>(output_level));
1845 if (!s.ok()) {
1846 fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
1847 s.ToString().c_str());
1848 thread->stats.AddNumCompactFilesFailed(1);
1849 } else {
1850 thread->stats.AddNumCompactFilesSucceed(1);
1851 }
1852 break;
1853 }
1854 }
1855 }
1856 #endif // ROCKSDB_LITE
1857
TestFlush(const std::vector<int> & rand_column_families)1858 Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
1859 FlushOptions flush_opts;
1860 std::vector<ColumnFamilyHandle*> cfhs;
1861 std::for_each(rand_column_families.begin(), rand_column_families.end(),
1862 [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
1863 return db_->Flush(flush_opts, cfhs);
1864 }
1865
TestPauseBackground(ThreadState * thread)1866 Status StressTest::TestPauseBackground(ThreadState* thread) {
1867 Status status = db_->PauseBackgroundWork();
1868 if (!status.ok()) {
1869 return status;
1870 }
1871 // To avoid stalling/deadlocking ourselves in this thread, just
1872 // sleep here during pause and let other threads do db operations.
1873 // Sleep up to ~16 seconds (2**24 microseconds), but very skewed
1874 // toward short pause. (1 chance in 25 of pausing >= 1s;
1875 // 1 chance in 625 of pausing full 16s.)
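// (Derivation: taking the min of two Uniform(25) draws skews small;
// P(pause >= 1s) = P(both draws >= 20) = (1/5)^2 = 1/25, and
// P(pause == 16s) = P(both draws == 24) = (1/25)^2 = 1/625.)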
1876 int pwr2_micros =
1877 std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
1878 clock_->SleepForMicroseconds(1 << pwr2_micros);
1879 return db_->ContinueBackgroundWork();
1880 }
1881
TestAcquireSnapshot(ThreadState * thread,int rand_column_family,const std::string & keystr,uint64_t i)1882 void StressTest::TestAcquireSnapshot(ThreadState* thread,
1883 int rand_column_family,
1884 const std::string& keystr, uint64_t i) {
1885 Slice key = keystr;
1886 ColumnFamilyHandle* column_family = column_families_[rand_column_family];
1887 ReadOptions ropt;
1888 #ifndef ROCKSDB_LITE
1889 auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
1890 const bool ww_snapshot = thread->rand.OneIn(10);
1891 const Snapshot* snapshot =
1892 ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
1893 : db_->GetSnapshot();
1894 #else
1895 const Snapshot* snapshot = db_->GetSnapshot();
1896 #endif // !ROCKSDB_LITE
1897 ropt.snapshot = snapshot;
1898
1899 // Ideally, we want snapshot taking and timestamp generation to be atomic
1900 // here, so that the snapshot corresponds to the timestamp. However, it is
1901 // not possible with the current GetSnapshot() API.
1902 std::string ts_str;
1903 Slice ts;
1904 if (FLAGS_user_timestamp_size > 0) {
1905 ts_str = GenerateTimestampForRead();
1906 ts = ts_str;
1907 ropt.timestamp = &ts;
1908 }
1909
1910 std::string value_at;
1911 // When taking a snapshot, we also read a key from that snapshot. We
1912 // will later read the same key before releasing the snapshot and
1913 // verify that the results are the same.
1914 auto status_at = db_->Get(ropt, column_family, key, &value_at);
1915 std::vector<bool>* key_vec = nullptr;
1916
1917 if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
1918 key_vec = new std::vector<bool>(FLAGS_max_key);
1919 // When `prefix_extractor` is set, seeking to beginning and scanning
1920 // across prefixes are only supported with `total_order_seek` set.
1921 ropt.total_order_seek = true;
1922 std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
1923 for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
1924 uint64_t key_val;
1925 if (GetIntVal(iterator->key().ToString(), &key_val)) {
1926 (*key_vec)[key_val] = true;
1927 }
1928 }
1929 }
1930
1931 ThreadState::SnapshotState snap_state = {snapshot,
1932 rand_column_family,
1933 column_family->GetName(),
1934 keystr,
1935 status_at,
1936 value_at,
1937 key_vec,
1938 ts_str};
1939 uint64_t hold_for = FLAGS_snapshot_hold_ops;
1940 if (FLAGS_long_running_snapshots) {
1941 // Hold 10% of snapshots for 10x longer
1942 if (thread->rand.OneIn(10)) {
1943 assert(hold_for < port::kMaxInt64 / 10);
1944 hold_for *= 10;
1945 // Hold 1% of snapshots for 100x longer
1946 if (thread->rand.OneIn(10)) {
1947 assert(hold_for < port::kMaxInt64 / 10);
1948 hold_for *= 10;
1949 }
1950 }
1951 }
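// Cap the release point at the thread's last operation so the snapshot should
// be verified and released before the thread finishes.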
1952 uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
1953 thread->snapshot_queue.emplace(release_at, snap_state);
1954 }
1955
MaybeReleaseSnapshots(ThreadState * thread,uint64_t i)1956 Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) {
1957 while (!thread->snapshot_queue.empty() &&
1958 i >= thread->snapshot_queue.front().first) {
1959 auto snap_state = thread->snapshot_queue.front().second;
1960 assert(snap_state.snapshot);
1961 // Note: this is unsafe as the cf might be dropped concurrently. But
1962 // it is ok since unclean cf drop is currently not supported by write
1963 // prepared transactions.
1964 Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
1965 db_->ReleaseSnapshot(snap_state.snapshot);
1966 delete snap_state.key_vec;
1967 thread->snapshot_queue.pop();
1968 if (!s.ok()) {
1969 return s;
1970 }
1971 }
1972 return Status::OK();
1973 }
1974
TestCompactRange(ThreadState * thread,int64_t rand_key,const Slice & start_key,ColumnFamilyHandle * column_family)1975 void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
1976 const Slice& start_key,
1977 ColumnFamilyHandle* column_family) {
1978 int64_t end_key_num;
1979 if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
1980 end_key_num = port::kMaxInt64;
1981 } else {
1982 end_key_num = FLAGS_compact_range_width + rand_key;
1983 }
1984 std::string end_key_buf = Key(end_key_num);
1985 Slice end_key(end_key_buf);
1986
1987 CompactRangeOptions cro;
1988 cro.exclusive_manual_compaction = static_cast<bool>(thread->rand.Next() % 2);
1989 cro.change_level = static_cast<bool>(thread->rand.Next() % 2);
1990 std::vector<BottommostLevelCompaction> bottom_level_styles = {
1991 BottommostLevelCompaction::kSkip,
1992 BottommostLevelCompaction::kIfHaveCompactionFilter,
1993 BottommostLevelCompaction::kForce,
1994 BottommostLevelCompaction::kForceOptimized};
1995 cro.bottommost_level_compaction =
1996 bottom_level_styles[thread->rand.Next() %
1997 static_cast<uint32_t>(bottom_level_styles.size())];
1998 cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
1999 cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
2000
2001 const Snapshot* pre_snapshot = nullptr;
2002 uint32_t pre_hash = 0;
2003 if (thread->rand.OneIn(2)) {
2004 // Do some validation by taking a snapshot and comparing the data before
2005 // and after the compaction
2006 pre_snapshot = db_->GetSnapshot();
2007 pre_hash =
2008 GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
2009 }
2010
2011 Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);
2012
2013 if (!status.ok()) {
2014 fprintf(stdout, "Unable to perform CompactRange(): %s\n",
2015 status.ToString().c_str());
2016 }
2017
2018 if (pre_snapshot != nullptr) {
2019 uint32_t post_hash =
2020 GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
2021 if (pre_hash != post_hash) {
2022 fprintf(stderr,
2023 "Data hash different before and after compact range "
2024 "start_key %s end_key %s\n",
2025 start_key.ToString(true).c_str(), end_key.ToString(true).c_str());
2026 thread->stats.AddErrors(1);
2027 // Fail fast to preserve the DB state.
2028 thread->shared->SetVerificationFailure();
2029 }
2030 db_->ReleaseSnapshot(pre_snapshot);
2031 }
2032 }
2033
GetRangeHash(ThreadState * thread,const Snapshot * snapshot,ColumnFamilyHandle * column_family,const Slice & start_key,const Slice & end_key)2034 uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
2035 ColumnFamilyHandle* column_family,
2036 const Slice& start_key,
2037 const Slice& end_key) {
2038 const std::string kCrcCalculatorSepearator = ";";
2039 uint32_t crc = 0;
2040 ReadOptions ro;
2041 ro.snapshot = snapshot;
2042 ro.total_order_seek = true;
2043 std::string ts_str;
2044 Slice ts;
2045 if (FLAGS_user_timestamp_size > 0) {
2046 ts_str = GenerateTimestampForRead();
2047 ts = ts_str;
2048 ro.timestamp = &ts;
2049 }
2050 std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
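// A separator is hashed in after each key and each value so the CRC stays
// sensitive to where key bytes end and value bytes begin.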
2051 for (it->Seek(start_key);
2052 it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0;
2053 it->Next()) {
2054 crc = crc32c::Extend(crc, it->key().data(), it->key().size());
2055 crc = crc32c::Extend(crc, kCrcCalculatorSepearator.data(), 1);
2056 crc = crc32c::Extend(crc, it->value().data(), it->value().size());
2057 crc = crc32c::Extend(crc, kCrcCalculatorSepearator.data(), 1);
2058 }
2059 if (!it->status().ok()) {
2060 fprintf(stderr, "Iterator non-OK when calculating range CRC: %s\n",
2061 it->status().ToString().c_str());
2062 thread->stats.AddErrors(1);
2063 // Fail fast to preserve the DB state.
2064 thread->shared->SetVerificationFailure();
2065 }
2066 return crc;
2067 }
2068
PrintEnv() const2069 void StressTest::PrintEnv() const {
2070 fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
2071 kMinorVersion);
2072 fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
2073 fprintf(stdout, "TransactionDB : %s\n",
2074 FLAGS_use_txn ? "true" : "false");
2075 #ifndef ROCKSDB_LITE
2076 fprintf(stdout, "Stacked BlobDB : %s\n",
2077 FLAGS_use_blob_db ? "true" : "false");
2078 #endif // !ROCKSDB_LITE
2079 fprintf(stdout, "Read only mode : %s\n",
2080 FLAGS_read_only ? "true" : "false");
2081 fprintf(stdout, "Atomic flush : %s\n",
2082 FLAGS_atomic_flush ? "true" : "false");
2083 fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
2084 if (!FLAGS_test_batches_snapshots) {
2085 fprintf(stdout, "Clear CFs one in : %d\n",
2086 FLAGS_clear_column_family_one_in);
2087 }
2088 fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
2089 fprintf(stdout, "Ops per thread : %lu\n",
2090 (unsigned long)FLAGS_ops_per_thread);
2091 std::string ttl_state("unused");
2092 if (FLAGS_ttl > 0) {
2093 ttl_state = ToString(FLAGS_ttl);
2094 }
2095 fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
2096 fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
2097 fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
2098 fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
2099 fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
2100 fprintf(stdout, "Delete range percentage : %d%%\n", FLAGS_delrangepercent);
2101 fprintf(stdout, "No overwrite percentage : %d%%\n",
2102 FLAGS_nooverwritepercent);
2103 fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
2104 fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
2105 FLAGS_db_write_buffer_size);
2106 fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
2107 fprintf(stdout, "Iterations : %lu\n",
2108 (unsigned long)FLAGS_num_iterations);
2109 fprintf(stdout, "Max key : %lu\n",
2110 (unsigned long)FLAGS_max_key);
2111 fprintf(stdout, "Ratio #ops/#keys : %f\n",
2112 (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
2113 fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen);
2114 fprintf(stdout, "Batches/snapshots : %d\n",
2115 FLAGS_test_batches_snapshots);
2116 fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update);
2117 fprintf(stdout, "Num keys per lock : %d\n",
2118 1 << FLAGS_log2_keys_per_lock);
2119 std::string compression = CompressionTypeToString(compression_type_e);
2120 fprintf(stdout, "Compression : %s\n", compression.c_str());
2121 std::string bottommost_compression =
2122 CompressionTypeToString(bottommost_compression_type_e);
2123 fprintf(stdout, "Bottommost Compression : %s\n",
2124 bottommost_compression.c_str());
2125 std::string checksum = ChecksumTypeToString(checksum_type_e);
2126 fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
2127 fprintf(stdout, "File checksum impl : %s\n",
2128 FLAGS_file_checksum_impl.c_str());
2129 fprintf(stdout, "Bloom bits / key : %s\n",
2130 FormatDoubleParam(FLAGS_bloom_bits).c_str());
2131 fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
2132 FLAGS_subcompactions);
2133 fprintf(stdout, "Use MultiGet : %s\n",
2134 FLAGS_use_multiget ? "true" : "false");
2135
2136 const char* memtablerep = "";
2137 switch (FLAGS_rep_factory) {
2138 case kSkipList:
2139 memtablerep = "skip_list";
2140 break;
2141 case kHashSkipList:
2142 memtablerep = "prefix_hash";
2143 break;
2144 case kVectorRep:
2145 memtablerep = "vector";
2146 break;
2147 }
2148
2149 fprintf(stdout, "Memtablerep : %s\n", memtablerep);
2150
2151 #ifndef NDEBUG
2152 KillPoint* kp = KillPoint::GetInstance();
2153 fprintf(stdout, "Test kill odd : %d\n", kp->rocksdb_kill_odds);
2154 if (!kp->rocksdb_kill_exclude_prefixes.empty()) {
2155 fprintf(stdout, "Skipping kill points prefixes:\n");
2156 for (auto& p : kp->rocksdb_kill_exclude_prefixes) {
2157 fprintf(stdout, " %s\n", p.c_str());
2158 }
2159 }
2160 #endif
2161 fprintf(stdout, "Periodic Compaction Secs : %" PRIu64 "\n",
2162 FLAGS_periodic_compaction_seconds);
2163 fprintf(stdout, "Compaction TTL : %" PRIu64 "\n",
2164 FLAGS_compaction_ttl);
2165 fprintf(stdout, "Background Purge : %d\n",
2166 static_cast<int>(FLAGS_avoid_unnecessary_blocking_io));
2167 fprintf(stdout, "Write DB ID to manifest : %d\n",
2168 static_cast<int>(FLAGS_write_dbid_to_manifest));
2169 fprintf(stdout, "Max Write Batch Group Size: %" PRIu64 "\n",
2170 FLAGS_max_write_batch_group_size_bytes);
2171 fprintf(stdout, "Use dynamic level : %d\n",
2172 static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
2173 fprintf(stdout, "Read fault one in : %d\n", FLAGS_read_fault_one_in);
2174 fprintf(stdout, "Write fault one in : %d\n", FLAGS_write_fault_one_in);
2175 fprintf(stdout, "Open metadata write fault one in:\n");
2176 fprintf(stdout, " %d\n",
2177 FLAGS_open_metadata_write_fault_one_in);
2178 fprintf(stdout, "Sync fault injection : %d\n", FLAGS_sync_fault_injection);
2179 fprintf(stdout, "Best efforts recovery : %d\n",
2180 static_cast<int>(FLAGS_best_efforts_recovery));
2181 fprintf(stdout, "Fail if OPTIONS file error: %d\n",
2182 static_cast<int>(FLAGS_fail_if_options_file_error));
2183 fprintf(stdout, "User timestamp size bytes : %d\n",
2184 static_cast<int>(FLAGS_user_timestamp_size));
2185
2186 fprintf(stdout, "------------------------------------------------\n");
2187 }
2188
Open()2189 void StressTest::Open() {
2190 assert(db_ == nullptr);
2191 #ifndef ROCKSDB_LITE
2192 assert(txn_db_ == nullptr);
2193 #endif
2194 if (FLAGS_options_file.empty()) {
2195 BlockBasedTableOptions block_based_options;
2196 block_based_options.block_cache = cache_;
2197 block_based_options.cache_index_and_filter_blocks =
2198 FLAGS_cache_index_and_filter_blocks;
2199 block_based_options.metadata_cache_options.top_level_index_pinning =
2200 static_cast<PinningTier>(FLAGS_top_level_index_pinning);
2201 block_based_options.metadata_cache_options.partition_pinning =
2202 static_cast<PinningTier>(FLAGS_partition_pinning);
2203 block_based_options.metadata_cache_options.unpartitioned_pinning =
2204 static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
2205 block_based_options.block_cache_compressed = compressed_cache_;
2206 block_based_options.checksum = checksum_type_e;
2207 block_based_options.block_size = FLAGS_block_size;
2208 block_based_options.format_version =
2209 static_cast<uint32_t>(FLAGS_format_version);
2210 block_based_options.index_block_restart_interval =
2211 static_cast<int32_t>(FLAGS_index_block_restart_interval);
2212 block_based_options.filter_policy = filter_policy_;
2213 block_based_options.partition_filters = FLAGS_partition_filters;
2214 block_based_options.optimize_filters_for_memory =
2215 FLAGS_optimize_filters_for_memory;
2216 block_based_options.index_type =
2217 static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
2218 options_.table_factory.reset(
2219 NewBlockBasedTableFactory(block_based_options));
2220 options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
2221 options_.write_buffer_size = FLAGS_write_buffer_size;
2222 options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
2223 options_.min_write_buffer_number_to_merge =
2224 FLAGS_min_write_buffer_number_to_merge;
2225 options_.max_write_buffer_number_to_maintain =
2226 FLAGS_max_write_buffer_number_to_maintain;
2227 options_.max_write_buffer_size_to_maintain =
2228 FLAGS_max_write_buffer_size_to_maintain;
2229 options_.memtable_prefix_bloom_size_ratio =
2230 FLAGS_memtable_prefix_bloom_size_ratio;
2231 options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
2232 options_.max_background_compactions = FLAGS_max_background_compactions;
2233 options_.max_background_flushes = FLAGS_max_background_flushes;
2234 options_.compaction_style =
2235 static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
2236 if (FLAGS_prefix_size >= 0) {
2237 options_.prefix_extractor.reset(
2238 NewFixedPrefixTransform(FLAGS_prefix_size));
2239 }
2240 options_.max_open_files = FLAGS_open_files;
2241 options_.statistics = dbstats;
2242 options_.env = db_stress_env;
2243 options_.use_fsync = FLAGS_use_fsync;
2244 options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
2245 options_.allow_mmap_reads = FLAGS_mmap_read;
2246 options_.allow_mmap_writes = FLAGS_mmap_write;
2247 options_.use_direct_reads = FLAGS_use_direct_reads;
2248 options_.use_direct_io_for_flush_and_compaction =
2249 FLAGS_use_direct_io_for_flush_and_compaction;
2250 options_.recycle_log_file_num =
2251 static_cast<size_t>(FLAGS_recycle_log_file_num);
2252 options_.target_file_size_base = FLAGS_target_file_size_base;
2253 options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
2254 options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
2255 options_.max_bytes_for_level_multiplier =
2256 FLAGS_max_bytes_for_level_multiplier;
2257 options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
2258 options_.level0_slowdown_writes_trigger =
2259 FLAGS_level0_slowdown_writes_trigger;
2260 options_.level0_file_num_compaction_trigger =
2261 FLAGS_level0_file_num_compaction_trigger;
2262 options_.compression = compression_type_e;
2263 options_.bottommost_compression = bottommost_compression_type_e;
2264 options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
2265 options_.compression_opts.zstd_max_train_bytes =
2266 FLAGS_compression_zstd_max_train_bytes;
2267 options_.compression_opts.parallel_threads =
2268 FLAGS_compression_parallel_threads;
2269 options_.compression_opts.max_dict_buffer_bytes =
2270 FLAGS_compression_max_dict_buffer_bytes;
2271 options_.create_if_missing = true;
2272 options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
2273 options_.inplace_update_support = FLAGS_in_place_update;
2274 options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
2275 options_.allow_concurrent_memtable_write =
2276 FLAGS_allow_concurrent_memtable_write;
2277 options_.experimental_mempurge_threshold =
2278 FLAGS_experimental_mempurge_threshold;
2279 options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
2280 options_.ttl = FLAGS_compaction_ttl;
2281 options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
2282 options_.enable_write_thread_adaptive_yield =
2283 FLAGS_enable_write_thread_adaptive_yield;
2284 options_.compaction_options_universal.size_ratio =
2285 FLAGS_universal_size_ratio;
2286 options_.compaction_options_universal.min_merge_width =
2287 FLAGS_universal_min_merge_width;
2288 options_.compaction_options_universal.max_merge_width =
2289 FLAGS_universal_max_merge_width;
2290 options_.compaction_options_universal.max_size_amplification_percent =
2291 FLAGS_universal_max_size_amplification_percent;
2292 options_.atomic_flush = FLAGS_atomic_flush;
2293 options_.avoid_unnecessary_blocking_io =
2294 FLAGS_avoid_unnecessary_blocking_io;
2295 options_.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
2296 options_.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
2297 options_.max_write_batch_group_size_bytes =
2298 FLAGS_max_write_batch_group_size_bytes;
2299 options_.level_compaction_dynamic_level_bytes =
2300 FLAGS_level_compaction_dynamic_level_bytes;
2301 options_.file_checksum_gen_factory =
2302 GetFileChecksumImpl(FLAGS_file_checksum_impl);
2303 options_.track_and_verify_wals_in_manifest = true;
2304
2305 // Integrated BlobDB
2306 options_.enable_blob_files = FLAGS_enable_blob_files;
2307 options_.min_blob_size = FLAGS_min_blob_size;
2308 options_.blob_file_size = FLAGS_blob_file_size;
2309 options_.blob_compression_type =
2310 StringToCompressionType(FLAGS_blob_compression_type.c_str());
2311 options_.enable_blob_garbage_collection =
2312 FLAGS_enable_blob_garbage_collection;
2313 options_.blob_garbage_collection_age_cutoff =
2314 FLAGS_blob_garbage_collection_age_cutoff;
2315 options_.blob_garbage_collection_force_threshold =
2316 FLAGS_blob_garbage_collection_force_threshold;
2317 } else {
2318 #ifdef ROCKSDB_LITE
2319 fprintf(stderr, "--options_file not supported in lite mode\n");
2320 exit(1);
2321 #else
2322 DBOptions db_options;
2323 std::vector<ColumnFamilyDescriptor> cf_descriptors;
2324 Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
2325 &db_options, &cf_descriptors);
2326 db_options.env = new DbStressEnvWrapper(db_stress_env);
2327 if (!s.ok()) {
2328 fprintf(stderr, "Unable to load options file %s --- %s\n",
2329 FLAGS_options_file.c_str(), s.ToString().c_str());
2330 exit(1);
2331 }
2332 options_ = Options(db_options, cf_descriptors[0].options);
2333 #endif // ROCKSDB_LITE
2334 }
2335
2336 if (FLAGS_rate_limiter_bytes_per_sec > 0) {
2337 options_.rate_limiter.reset(NewGenericRateLimiter(
2338 FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
2339 10 /* fairness */,
2340 FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
2341 : RateLimiter::Mode::kWritesOnly));
2342 if (FLAGS_rate_limit_bg_reads) {
2343 options_.new_table_reader_for_compaction_inputs = true;
2344 }
2345 }
2346 if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
2347 FLAGS_sst_file_manager_bytes_per_truncate > 0) {
2348 Status status;
2349 options_.sst_file_manager.reset(NewSstFileManager(
2350 db_stress_env, options_.info_log, "" /* trash_dir */,
2351 static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
2352 true /* delete_existing_trash */, &status,
2353 0.25 /* max_trash_db_ratio */,
2354 FLAGS_sst_file_manager_bytes_per_truncate));
2355 if (!status.ok()) {
2356 fprintf(stderr, "SstFileManager creation failed: %s\n",
2357 status.ToString().c_str());
2358 exit(1);
2359 }
2360 }
2361
2362 if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
2363 fprintf(stderr,
2364 "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
2365 exit(1);
2366 }
2367 if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
2368 fprintf(stderr,
2369 "WARNING: prefix_size is non-zero but "
2370 "memtablerep != prefix_hash\n");
2371 }
2372 switch (FLAGS_rep_factory) {
2373 case kSkipList:
2374 // no need to do anything
2375 break;
2376 #ifndef ROCKSDB_LITE
2377 case kHashSkipList:
2378 options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
2379 break;
2380 case kVectorRep:
2381 options_.memtable_factory.reset(new VectorRepFactory());
2382 break;
2383 #else
2384 default:
2385 fprintf(stderr,
2386 "RocksdbLite only supports skip list mem table. Skip "
2387 "--rep_factory\n");
2388 #endif // ROCKSDB_LITE
2389 }
2390
2391 if (FLAGS_use_full_merge_v1) {
2392 options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
2393 } else {
2394 options_.merge_operator = MergeOperators::CreatePutOperator();
2395 }
2396 if (FLAGS_enable_compaction_filter) {
2397 options_.compaction_filter_factory =
2398 std::make_shared<DbStressCompactionFilterFactory>();
2399 }
2400 options_.table_properties_collector_factories.emplace_back(
2401 std::make_shared<DbStressTablePropertiesCollectorFactory>());
2402
2403 options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
2404 options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
2405 options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;
2406
2407 if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
2408 FLAGS_allow_setting_blob_options_dynamically) &&
2409 FLAGS_best_efforts_recovery) {
2410 fprintf(stderr,
2411 "Integrated BlobDB is currently incompatible with best-effort "
2412 "recovery\n");
2413 exit(1);
2414 }
2415
2416 if (options_.enable_blob_files) {
2417 fprintf(stdout,
2418 "Integrated BlobDB: blob files enabled, min blob size %" PRIu64
2419 ", blob file size %" PRIu64 ", blob compression type %s\n",
2420 options_.min_blob_size, options_.blob_file_size,
2421 CompressionTypeToString(options_.blob_compression_type).c_str());
2422 }
2423
2424 if (options_.enable_blob_garbage_collection) {
2425 fprintf(
2426 stdout,
2427 "Integrated BlobDB: blob GC enabled, cutoff %f, force threshold %f\n",
2428 options_.blob_garbage_collection_age_cutoff,
2429 options_.blob_garbage_collection_force_threshold);
2430 }
2431
2432 fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2433
2434 Status s;
2435
2436 if (FLAGS_user_timestamp_size > 0) {
2437 CheckAndSetOptionsForUserTimestamp();
2438 }
2439
2440 if (FLAGS_ttl == -1) {
2441 std::vector<std::string> existing_column_families;
2442 s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
2443 &existing_column_families); // ignore errors
2444 if (!s.ok()) {
2445 // DB doesn't exist
2446 assert(existing_column_families.empty());
2447 assert(column_family_names_.empty());
2448 column_family_names_.push_back(kDefaultColumnFamilyName);
2449 } else if (column_family_names_.empty()) {
2450 // this is the first call to the function Open()
2451 column_family_names_ = existing_column_families;
2452 } else {
2453 // this is a reopen. just assert that existing column_family_names are
2454 // equivalent to what we remember
2455 auto sorted_cfn = column_family_names_;
2456 std::sort(sorted_cfn.begin(), sorted_cfn.end());
2457 std::sort(existing_column_families.begin(),
2458 existing_column_families.end());
2459 if (sorted_cfn != existing_column_families) {
2460 fprintf(stderr, "Expected column families differ from the existing:\n");
2461 fprintf(stderr, "Expected: {");
2462 for (auto cf : sorted_cfn) {
2463 fprintf(stderr, "%s ", cf.c_str());
2464 }
2465 fprintf(stderr, "}\n");
2466 fprintf(stderr, "Existing: {");
2467 for (auto cf : existing_column_families) {
2468 fprintf(stderr, "%s ", cf.c_str());
2469 }
2470 fprintf(stderr, "}\n");
2471 }
2472 assert(sorted_cfn == existing_column_families);
2473 }
2474 std::vector<ColumnFamilyDescriptor> cf_descriptors;
2475 for (auto name : column_family_names_) {
2476 if (name != kDefaultColumnFamilyName) {
2477 new_column_family_name_ =
2478 std::max(new_column_family_name_.load(), std::stoi(name) + 1);
2479 }
2480 cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2481 }
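// Additional column families are named with consecutive integers; the
// std::stoi above relies on this to restore the naming counter on reopen.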
2482 while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
2483 std::string name = ToString(new_column_family_name_.load());
2484 new_column_family_name_++;
2485 cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2486 column_family_names_.push_back(name);
2487 }
2488 options_.listeners.clear();
2489 #ifndef ROCKSDB_LITE
2490 options_.listeners.emplace_back(
2491 new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
2492 #endif // !ROCKSDB_LITE
2493 options_.create_missing_column_families = true;
2494 if (!FLAGS_use_txn) {
2495 #ifndef NDEBUG
2496 // Determine whether we need to ingest file metadata write failures
2497 // during DB reopen. If so, enable it.
2498 // Only ingest metadata errors when reopening, since an initial open
2499 // failure does not need to be handled.
2500 // TODO: transaction DB is not yet covered by this fault test.
2501 bool ingest_meta_error = false;
2502 bool ingest_write_error = false;
2503 bool ingest_read_error = false;
2504 if ((FLAGS_open_metadata_write_fault_one_in ||
2505 FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) &&
2506 fault_fs_guard
2507 ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
2508 .ok()) {
2509 if (!FLAGS_sync) {
2510 // When db_stress is not in sync mode, we expect all writes to the
2511 // WAL to be durable. Buffering unsynced writes would cause false
2512 // positives in crash tests. Until we figure out a way to solve it,
2513 // skip WAL from failure injection.
2514 fault_fs_guard->SetSkipDirectWritableTypes({kWalFile});
2515 }
2516 ingest_meta_error = FLAGS_open_metadata_write_fault_one_in;
2517 ingest_write_error = FLAGS_open_write_fault_one_in;
2518 ingest_read_error = FLAGS_open_read_fault_one_in;
2519 if (ingest_meta_error) {
2520 fault_fs_guard->EnableMetadataWriteErrorInjection();
2521 fault_fs_guard->SetRandomMetadataWriteError(
2522 FLAGS_open_metadata_write_fault_one_in);
2523 }
2524 if (ingest_write_error) {
2525 fault_fs_guard->SetFilesystemDirectWritable(false);
2526 fault_fs_guard->EnableWriteErrorInjection();
2527 fault_fs_guard->SetRandomWriteError(
2528 static_cast<uint32_t>(FLAGS_seed), FLAGS_open_write_fault_one_in,
2529 IOStatus::IOError("Injected Open Error"),
2530 /*inject_for_all_file_types=*/true, /*types=*/{});
2531 }
2532 if (ingest_read_error) {
2533 fault_fs_guard->SetRandomReadError(FLAGS_open_read_fault_one_in);
2534 }
2535 }
2536 while (true) {
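// If the open below fails due to injected errors, error injection is
// disabled, unsynced data may be dropped to simulate a crash, and the
// open is retried (see the error handling further down).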
2537 #endif // NDEBUG
2538 #ifndef ROCKSDB_LITE
2539 // StackableDB-based BlobDB
2540 if (FLAGS_use_blob_db) {
2541 blob_db::BlobDBOptions blob_db_options;
2542 blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
2543 blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
2544 blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
2545 blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
2546 blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
2547
2548 blob_db::BlobDB* blob_db = nullptr;
2549 s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
2550 cf_descriptors, &column_families_,
2551 &blob_db);
2552 if (s.ok()) {
2553 db_ = blob_db;
2554 }
2555 } else
2556 #endif // !ROCKSDB_LITE
2557 {
2558 if (db_preload_finished_.load() && FLAGS_read_only) {
2559 s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
2560 cf_descriptors, &column_families_, &db_);
2561 } else {
2562 s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
2563 &column_families_, &db_);
2564 }
2565 }
2566
2567 #ifndef NDEBUG
2568 if (ingest_meta_error || ingest_write_error || ingest_read_error) {
2569 fault_fs_guard->SetFilesystemDirectWritable(true);
2570 fault_fs_guard->DisableMetadataWriteErrorInjection();
2571 fault_fs_guard->DisableWriteErrorInjection();
2572 fault_fs_guard->SetSkipDirectWritableTypes({});
2573 fault_fs_guard->SetRandomReadError(0);
2574 if (s.ok()) {
2575 // Ingested errors might happen in background compactions. We
2576 // wait for all compactions to finish to make sure the DB is in a
2577 // clean state before executing queries.
2578 s = static_cast_with_check<DBImpl>(db_->GetRootDB())
2579 ->TEST_WaitForCompact(true);
2580 if (!s.ok()) {
2581 for (auto cf : column_families_) {
2582 delete cf;
2583 }
2584 column_families_.clear();
2585 delete db_;
2586 db_ = nullptr;
2587 }
2588 }
2589 if (!s.ok()) {
2590 // After failing to open the DB due to an injected IO error, a retry
2591 // should successfully open the DB with correct data as long as no
2592 // further IO error shows up.
2593 ingest_meta_error = false;
2594 ingest_write_error = false;
2595 ingest_read_error = false;
2596
2597 Random rand(static_cast<uint32_t>(FLAGS_seed));
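// Randomly drop unsynced data or files created after the last directory
// sync to simulate the different states a crash could have left behind
// before retrying the open.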
2598 if (rand.OneIn(2)) {
2599 fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
2600 nullptr);
2601 }
2602 if (rand.OneIn(3)) {
2603 fault_fs_guard->DropUnsyncedFileData();
2604 } else if (rand.OneIn(2)) {
2605 fault_fs_guard->DropRandomUnsyncedFileData(&rand);
2606 }
2607 continue;
2608 }
2609 }
2610 break;
2611 }
2612 #endif // NDEBUG
2613 } else {
2614 #ifndef ROCKSDB_LITE
2615 TransactionDBOptions txn_db_options;
2616 assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
2617 txn_db_options.write_policy =
2618 static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
2619 if (FLAGS_unordered_write) {
2620 assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED);
2621 options_.unordered_write = true;
2622 options_.two_write_queues = true;
2623 txn_db_options.skip_concurrency_control = true;
2624 }
2625 s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
2626 cf_descriptors, &column_families_, &txn_db_);
2627 if (!s.ok()) {
2628 fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
2629 s.ToString().c_str());
2630 fflush(stderr);
2631 }
2632 assert(s.ok());
2633 db_ = txn_db_;
2634 // After a crash, randomly commit or roll back the recovered prepared transactions.
2635 std::vector<Transaction*> trans;
2636 txn_db_->GetAllPreparedTransactions(&trans);
2637 Random rand(static_cast<uint32_t>(FLAGS_seed));
2638 for (auto txn : trans) {
2639 if (rand.OneIn(2)) {
2640 s = txn->Commit();
2641 assert(s.ok());
2642 } else {
2643 s = txn->Rollback();
2644 assert(s.ok());
2645 }
2646 delete txn;
2647 }
2648 trans.clear();
2649 txn_db_->GetAllPreparedTransactions(&trans);
2650 assert(trans.size() == 0);
2651 #endif
2652 }
2653 assert(!s.ok() || column_families_.size() ==
2654 static_cast<size_t>(FLAGS_column_families));
2655
2656 if (s.ok() && FLAGS_test_secondary) {
2657 #ifndef ROCKSDB_LITE
2658 secondaries_.resize(FLAGS_threads);
2659 std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
2660 secondary_cfh_lists_.clear();
2661 secondary_cfh_lists_.resize(FLAGS_threads);
2662 Options tmp_opts;
2663 // TODO(yanqin) support max_open_files != -1 for secondary instance.
2664 tmp_opts.max_open_files = -1;
2665 tmp_opts.statistics = dbstats_secondaries;
2666 tmp_opts.env = db_stress_env;
2667 for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
2668 const std::string secondary_path =
2669 FLAGS_secondaries_base + "/" + std::to_string(i);
2670 s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2671 cf_descriptors, &secondary_cfh_lists_[i],
2672 &secondaries_[i]);
2673 if (!s.ok()) {
2674 break;
2675 }
2676 }
2677 #else
2678 fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
2679 exit(1);
2680 #endif
2681 }
2682 if (s.ok() && FLAGS_continuous_verification_interval > 0 && !cmp_db_) {
2683 Options tmp_opts;
2684 // TODO(yanqin) support max_open_files != -1 for secondary instance.
2685 tmp_opts.max_open_files = -1;
2686 tmp_opts.env = db_stress_env;
2687 std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
2688 s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2689 cf_descriptors, &cmp_cfhs_, &cmp_db_);
2690 assert(!s.ok() ||
2691 cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
2692 }
2693 } else {
2694 #ifndef ROCKSDB_LITE
2695 DBWithTTL* db_with_ttl;
2696 s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
2697 db_ = db_with_ttl;
2698 if (FLAGS_test_secondary) {
2699 secondaries_.resize(FLAGS_threads);
2700 std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
2701 Options tmp_opts;
2702 tmp_opts.env = options_.env;
2703 // TODO(yanqin) support max_open_files != -1 for secondary instance.
2704 tmp_opts.max_open_files = -1;
2705 for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
2706 const std::string secondary_path =
2707 FLAGS_secondaries_base + "/" + std::to_string(i);
2708 s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2709 &secondaries_[i]);
2710 if (!s.ok()) {
2711 break;
2712 }
2713 }
2714 }
2715 #else
2716 fprintf(stderr, "TTL is not supported in RocksDBLite\n");
2717 exit(1);
2718 #endif
2719 }
2720 if (!s.ok()) {
2721 fprintf(stderr, "open error: %s\n", s.ToString().c_str());
2722 exit(1);
2723 }
2724 }
2725
Reopen(ThreadState * thread)2726 void StressTest::Reopen(ThreadState* thread) {
2727 #ifndef ROCKSDB_LITE
2728 // BG jobs in WritePrepared must be canceled first because i) they can access
2729 // the db via a callback, and ii) they hold on to a snapshot and the upcoming
2730 // ::Close would complain about it.
2731 const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
2732 bool bg_canceled = false;
2733 if (write_prepared || thread->rand.OneIn(2)) {
2734 const bool wait =
2735 write_prepared || static_cast<bool>(thread->rand.OneIn(2));
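// CancelAllBackgroundWork with wait=true blocks until scheduled and running
// jobs have finished, which is what write-prepared requires here.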
2736 CancelAllBackgroundWork(db_, wait);
2737 bg_canceled = wait;
2738 }
2739 assert(!write_prepared || bg_canceled);
2740 (void) bg_canceled;
2741 #else
2742 (void) thread;
2743 #endif
2744
2745 for (auto cf : column_families_) {
2746 delete cf;
2747 }
2748 column_families_.clear();
2749
2750 #ifndef ROCKSDB_LITE
2751 if (thread->rand.OneIn(2)) {
2752 Status s = db_->Close();
2753 if (!s.ok()) {
2754 fprintf(stderr, "Non-ok close status: %s\n", s.ToString().c_str());
2755 fflush(stderr);
2756 }
2757 assert(s.ok());
2758 }
2759 #endif
2760 delete db_;
2761 db_ = nullptr;
2762 #ifndef ROCKSDB_LITE
2763 txn_db_ = nullptr;
2764 #endif
2765
2766 assert(secondaries_.size() == secondary_cfh_lists_.size());
2767 size_t n = secondaries_.size();
2768 for (size_t i = 0; i != n; ++i) {
2769 for (auto* cf : secondary_cfh_lists_[i]) {
2770 delete cf;
2771 }
2772 secondary_cfh_lists_[i].clear();
2773 delete secondaries_[i];
2774 }
2775 secondaries_.clear();
2776
2777 num_times_reopened_++;
2778 auto now = clock_->NowMicros();
2779 fprintf(stdout, "%s Reopening database for the %dth time\n",
2780 clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
2781 Open();
2782 }
2783
CheckAndSetOptionsForUserTimestamp()2784 void StressTest::CheckAndSetOptionsForUserTimestamp() {
2785 assert(FLAGS_user_timestamp_size > 0);
2786 const Comparator* const cmp = test::ComparatorWithU64Ts();
2787 assert(cmp);
2788 if (FLAGS_user_timestamp_size != cmp->timestamp_size()) {
2789 fprintf(stderr,
2790 "Only -user_timestamp_size=%d is supported in stress test.\n",
2791 static_cast<int>(cmp->timestamp_size()));
2792 exit(1);
2793 }
2794 if (FLAGS_use_merge || FLAGS_use_full_merge_v1) {
2795 fprintf(stderr, "Merge does not support timestamp yet.\n");
2796 exit(1);
2797 }
2798 if (FLAGS_delrangepercent > 0) {
2799 fprintf(stderr, "DeleteRange does not support timestamp yet.\n");
2800 exit(1);
2801 }
2802 if (FLAGS_use_txn) {
2803 fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
2804 exit(1);
2805 }
2806 if (FLAGS_read_only) {
2807 fprintf(stderr, "When opened as read-only, timestamp not supported.\n");
2808 exit(1);
2809 }
2810 if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 ||
2811 FLAGS_continuous_verification_interval > 0) {
2812 fprintf(stderr, "Secondary instance does not support timestamp.\n");
2813 exit(1);
2814 }
2815 if (FLAGS_checkpoint_one_in > 0) {
2816 fprintf(stderr,
2817 "-checkpoint_one_in=%d requires "
2818 "DBImplReadOnly, which is not supported with timestamp\n",
2819 FLAGS_checkpoint_one_in);
2820 exit(1);
2821 }
2822 #ifndef ROCKSDB_LITE
2823 if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
2824 fprintf(stderr, "BlobDB not supported with timestamp.\n");
2825 exit(1);
2826 }
2827 #endif // !ROCKSDB_LITE
2828 if (FLAGS_enable_compaction_filter) {
2829 fprintf(stderr, "CompactionFilter not supported with timestamp.\n");
2830 exit(1);
2831 }
2832 if (FLAGS_test_cf_consistency || FLAGS_test_batches_snapshots) {
2833 fprintf(stderr,
2834 "Due to per-key ts-seq ordering constraint, only the (default) "
2835 "non-batched test is supported with timestamp.\n");
2836 exit(1);
2837 }
2838 if (FLAGS_ingest_external_file_one_in > 0) {
2839 fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
2840 exit(1);
2841 }
2842 options_.comparator = cmp;
2843 }
2844 } // namespace ROCKSDB_NAMESPACE
2845 #endif // GFLAGS
2846