//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//

#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_compaction_filter.h"
#include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_table_properties_collector.h"
#include "rocksdb/convenience.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/object_registry.h"
#include "util/cast_util.h"
#include "utilities/backupable/backupable_db_impl.h"
#include "utilities/fault_injection_fs.h"

namespace ROCKSDB_NAMESPACE {

namespace {

std::shared_ptr<const FilterPolicy> CreateFilterPolicy() {
  if (FLAGS_bloom_bits < 0) {
    return BlockBasedTableOptions().filter_policy;
  }
  const FilterPolicy* new_policy;
  if (FLAGS_use_block_based_filter) {
    if (FLAGS_ribbon_starting_level < 999) {
      fprintf(
          stderr,
          "Cannot combine use_block_based_filter and ribbon_starting_level\n");
      exit(1);
    } else {
      new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, true);
    }
  } else if (FLAGS_ribbon_starting_level >= 999) {
    // Use Bloom API
    new_policy = NewBloomFilterPolicy(FLAGS_bloom_bits, false);
  } else {
    new_policy = NewRibbonFilterPolicy(
        FLAGS_bloom_bits, /* bloom_before_level */ FLAGS_ribbon_starting_level);
  }
  return std::shared_ptr<const FilterPolicy>(new_policy);
}
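
// Illustrative sketch (editor's note, not exercised by this test): the policy
// created above is typically plugged into the table factory via the standard
// RocksDB public API, e.g.:
//
//   BlockBasedTableOptions table_opts;
//   table_opts.filter_policy = CreateFilterPolicy();
//   Options opts;
//   opts.table_factory.reset(NewBlockBasedTableFactory(table_opts));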

}  // namespace

StressTest::StressTest()
    : cache_(NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)),
      compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
      filter_policy_(CreateFilterPolicy()),
      db_(nullptr),
#ifndef ROCKSDB_LITE
      txn_db_(nullptr),
#endif
      clock_(db_stress_env->GetSystemClock().get()),
      new_column_family_name_(1),
      num_times_reopened_(0),
      db_preload_finished_(false),
      cmp_db_(nullptr),
      is_db_stopped_(false) {
  if (FLAGS_destroy_db_initially) {
    std::vector<std::string> files;
    db_stress_env->GetChildren(FLAGS_db, &files);
    for (unsigned int i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }

    Options options;
    options.env = db_stress_env;
    // Remove files without preserving manifest files
#ifndef ROCKSDB_LITE
    const Status s = !FLAGS_use_blob_db
                         ? DestroyDB(FLAGS_db, options)
                         : blob_db::DestroyBlobDB(FLAGS_db, options,
                                                  blob_db::BlobDBOptions());
#else
    const Status s = DestroyDB(FLAGS_db, options);
#endif  // !ROCKSDB_LITE

    if (!s.ok()) {
      fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str());
      exit(1);
    }
  }
}

StressTest::~StressTest() {
  for (auto cf : column_families_) {
    delete cf;
  }
  column_families_.clear();
  delete db_;

  assert(secondaries_.size() == secondary_cfh_lists_.size());
  size_t n = secondaries_.size();
  for (size_t i = 0; i != n; ++i) {
    for (auto* cf : secondary_cfh_lists_[i]) {
      delete cf;
    }
    secondary_cfh_lists_[i].clear();
    delete secondaries_[i];
  }
  secondaries_.clear();

  for (auto* cf : cmp_cfhs_) {
    delete cf;
  }
  cmp_cfhs_.clear();
  delete cmp_db_;
}

std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
                                            int32_t num_shard_bits) {
  ConfigOptions config_options;
  if (capacity <= 0) {
    return nullptr;
  }
  if (FLAGS_use_clock_cache) {
    auto cache = NewClockCache((size_t)capacity);
    if (!cache) {
      fprintf(stderr, "Clock cache not supported.\n");
      exit(1);
    }
    return cache;
  } else {
    LRUCacheOptions opts;
    opts.capacity = capacity;
    opts.num_shard_bits = num_shard_bits;
#ifndef ROCKSDB_LITE
    std::shared_ptr<SecondaryCache> secondary_cache;
    if (!FLAGS_secondary_cache_uri.empty()) {
      Status s = SecondaryCache::CreateFromString(
          config_options, FLAGS_secondary_cache_uri, &secondary_cache);
      if (secondary_cache == nullptr) {
        fprintf(stderr,
                "No secondary cache registered matching string: %s status=%s\n",
                FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
        exit(1);
      }
      opts.secondary_cache = secondary_cache;
    }
#endif
    return NewLRUCache(opts);
  }
}
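
// Illustrative sketch (editor's note, not part of the test): the cache
// returned here is normally installed as the block cache; capacity and shard
// count below are arbitrary example values:
//
//   BlockBasedTableOptions table_opts;
//   table_opts.block_cache = NewCache(/*capacity=*/32 << 20,
//                                     /*num_shard_bits=*/6);
//   options.table_factory.reset(NewBlockBasedTableFactory(table_opts));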

std::vector<std::string> StressTest::GetBlobCompressionTags() {
  std::vector<std::string> compression_tags{"kNoCompression"};

  if (Snappy_Supported()) {
    compression_tags.emplace_back("kSnappyCompression");
  }
  if (LZ4_Supported()) {
    compression_tags.emplace_back("kLZ4Compression");
  }
  if (ZSTD_Supported()) {
    compression_tags.emplace_back("kZSTD");
  }

  return compression_tags;
}

bool StressTest::BuildOptionsTable() {
  if (FLAGS_set_options_one_in <= 0) {
    return true;
  }

  std::unordered_map<std::string, std::vector<std::string>> options_tbl = {
      {"write_buffer_size",
       {ToString(options_.write_buffer_size),
        ToString(options_.write_buffer_size * 2),
        ToString(options_.write_buffer_size * 4)}},
      {"max_write_buffer_number",
       {ToString(options_.max_write_buffer_number),
        ToString(options_.max_write_buffer_number * 2),
        ToString(options_.max_write_buffer_number * 4)}},
      {"arena_block_size",
       {
           ToString(options_.arena_block_size),
           ToString(options_.write_buffer_size / 4),
           ToString(options_.write_buffer_size / 8),
       }},
      {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
      {"max_successive_merges", {"0", "2", "4"}},
      {"inplace_update_num_locks", {"100", "200", "300"}},
      // TODO(ljin): enable test for this option
      // {"disable_auto_compactions", {"100", "200", "300"}},
      {"soft_rate_limit", {"0", "0.5", "0.9"}},
      {"hard_rate_limit", {"0", "1.1", "2.0"}},
      {"level0_file_num_compaction_trigger",
       {
           ToString(options_.level0_file_num_compaction_trigger),
           ToString(options_.level0_file_num_compaction_trigger + 2),
           ToString(options_.level0_file_num_compaction_trigger + 4),
       }},
      {"level0_slowdown_writes_trigger",
       {
           ToString(options_.level0_slowdown_writes_trigger),
           ToString(options_.level0_slowdown_writes_trigger + 2),
           ToString(options_.level0_slowdown_writes_trigger + 4),
       }},
      {"level0_stop_writes_trigger",
       {
           ToString(options_.level0_stop_writes_trigger),
           ToString(options_.level0_stop_writes_trigger + 2),
           ToString(options_.level0_stop_writes_trigger + 4),
       }},
      {"max_compaction_bytes",
       {
           ToString(options_.target_file_size_base * 5),
           ToString(options_.target_file_size_base * 15),
           ToString(options_.target_file_size_base * 100),
       }},
      {"target_file_size_base",
       {
           ToString(options_.target_file_size_base),
           ToString(options_.target_file_size_base * 2),
           ToString(options_.target_file_size_base * 4),
       }},
      {"target_file_size_multiplier",
       {
           ToString(options_.target_file_size_multiplier),
           "1",
           "2",
       }},
      {"max_bytes_for_level_base",
       {
           ToString(options_.max_bytes_for_level_base / 2),
           ToString(options_.max_bytes_for_level_base),
           ToString(options_.max_bytes_for_level_base * 2),
       }},
      {"max_bytes_for_level_multiplier",
       {
           ToString(options_.max_bytes_for_level_multiplier),
           "1",
           "2",
       }},
      {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
  };

  if (FLAGS_allow_setting_blob_options_dynamically) {
    options_tbl.emplace("enable_blob_files",
                        std::vector<std::string>{"false", "true"});
    options_tbl.emplace("min_blob_size",
                        std::vector<std::string>{"0", "8", "16"});
    options_tbl.emplace("blob_file_size",
                        std::vector<std::string>{"1M", "16M", "256M", "1G"});
    options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
    options_tbl.emplace("enable_blob_garbage_collection",
                        std::vector<std::string>{"false", "true"});
    options_tbl.emplace(
        "blob_garbage_collection_age_cutoff",
        std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
    options_tbl.emplace("blob_garbage_collection_force_threshold",
                        std::vector<std::string>{"0.5", "0.75", "1.0"});
  }

  options_table_ = std::move(options_tbl);

  for (const auto& iter : options_table_) {
    options_index_.push_back(iter.first);
  }
  return true;
}
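
// Illustrative sketch (editor's note): entries from the table above are
// eventually applied through DB::SetOptions(), which takes string key/value
// pairs, e.g.:
//
//   std::unordered_map<std::string, std::string> kv = {
//       {"write_buffer_size", "131072"}};
//   Status s = db_->SetOptions(column_families_[0], kv);
//
// See StressTest::SetOptions() below for how a random entry is picked.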

void StressTest::InitDb() {
  uint64_t now = clock_->NowMicros();
  fprintf(stdout, "%s Initializing db_stress\n",
          clock_->TimeToString(now / 1000000).c_str());
  PrintEnv();
  Open();
  BuildOptionsTable();
}

void StressTest::FinishInitDb(SharedState* shared) {
  if (FLAGS_read_only) {
    uint64_t now = clock_->NowMicros();
    fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
            clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
    PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
  }
  if (FLAGS_enable_compaction_filter) {
    auto* compaction_filter_factory =
        reinterpret_cast<DbStressCompactionFilterFactory*>(
            options_.compaction_filter_factory.get());
    assert(compaction_filter_factory);
    compaction_filter_factory->SetSharedState(shared);
    fprintf(stdout, "Compaction filter factory: %s\n",
            compaction_filter_factory->Name());
  }
}

bool StressTest::VerifySecondaries() {
#ifndef ROCKSDB_LITE
  if (FLAGS_test_secondary) {
    uint64_t now = clock_->NowMicros();
    fprintf(stdout, "%s Start to verify secondaries against primary\n",
            clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
  }
  for (size_t k = 0; k != secondaries_.size(); ++k) {
    Status s = secondaries_[k]->TryCatchUpWithPrimary();
    if (!s.ok()) {
      fprintf(stderr, "Secondary failed to catch up with primary\n");
      return false;
    }
    ReadOptions ropts;
    ropts.total_order_seek = true;
    // Verify only the default column family since the primary may have
    // dropped other column families after most recent reopen.
    std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
    std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
    for (iter1->SeekToFirst(), iter2->SeekToFirst();
         iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
      if (iter1->key().compare(iter2->key()) != 0 ||
          iter1->value().compare(iter2->value())) {
        fprintf(stderr,
                "Secondary %d contains different data from "
                "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
                static_cast<int>(k),
                iter1->key().ToString(/*hex=*/true).c_str(),
                iter1->value().ToString(/*hex=*/true).c_str(),
                iter2->key().ToString(/*hex=*/true).c_str(),
                iter2->value().ToString(/*hex=*/true).c_str());
        return false;
      }
    }
    if (iter1->Valid() && !iter2->Valid()) {
      fprintf(stderr,
              "Secondary %d record count is smaller than that of primary\n",
              static_cast<int>(k));
      return false;
    } else if (!iter1->Valid() && iter2->Valid()) {
      fprintf(stderr,
              "Secondary %d record count is larger than that of primary\n",
              static_cast<int>(k));
      return false;
    }
  }
  if (FLAGS_test_secondary) {
    uint64_t now = clock_->NowMicros();
    fprintf(stdout, "%s Verification of secondaries succeeded\n",
            clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
  }
#endif  // ROCKSDB_LITE
  return true;
}
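
// Illustrative sketch (editor's note): a secondary instance like the ones
// verified above is opened with DB::OpenAsSecondary(); the paths below are
// hypothetical examples:
//
//   DB* secondary = nullptr;
//   Status s = DB::OpenAsSecondary(options, "/path/to/primary",
//                                  "/path/to/secondary", &secondary);
//   if (s.ok()) {
//     s = secondary->TryCatchUpWithPrimary();
//   }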

Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
                              ThreadState::SnapshotState& snap_state) {
  Status s;
  if (cf->GetName() != snap_state.cf_at_name) {
    return s;
  }
  ReadOptions ropt;
  ropt.snapshot = snap_state.snapshot;
  Slice ts;
  if (!snap_state.timestamp.empty()) {
    ts = snap_state.timestamp;
    ropt.timestamp = &ts;
  }
  PinnableSlice exp_v(&snap_state.value);
  exp_v.PinSelf();
  PinnableSlice v;
  s = db->Get(ropt, cf, snap_state.key, &v);
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }
  if (snap_state.status != s) {
    return Status::Corruption(
        "The snapshot gave inconsistent results for key " +
        ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
        " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
        ") vs. (" + s.ToString() + ")");
  }
  if (s.ok()) {
    if (exp_v != v) {
      return Status::Corruption("The snapshot gave inconsistent values: (" +
                                exp_v.ToString() + ") vs. (" + v.ToString() +
                                ")");
    }
  }
  if (snap_state.key_vec != nullptr) {
    // When `prefix_extractor` is set, seeking to beginning and scanning
    // across prefixes are only supported with `total_order_seek` set.
    ropt.total_order_seek = true;
    std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
    std::unique_ptr<std::vector<bool>> tmp_bitvec(
        new std::vector<bool>(FLAGS_max_key));
    for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
      uint64_t key_val;
      if (GetIntVal(iterator->key().ToString(), &key_val)) {
        (*tmp_bitvec.get())[key_val] = true;
      }
    }
    if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
                    tmp_bitvec.get()->begin())) {
      return Status::Corruption("Found inconsistent keys at this snapshot");
    }
  }
  return Status::OK();
}
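
// Illustrative sketch (editor's note): the snapshot-pinned read pattern that
// AssertSame() relies on, using the standard snapshot API:
//
//   const Snapshot* snap = db->GetSnapshot();
//   ReadOptions ropt;
//   ropt.snapshot = snap;        // all reads see the state at GetSnapshot()
//   std::string value;
//   Status s = db->Get(ropt, db->DefaultColumnFamily(), "key", &value);
//   db->ReleaseSnapshot(snap);   // snapshots must always be released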

void StressTest::VerificationAbort(SharedState* shared, std::string msg,
                                   Status s) const {
  fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(),
          s.ToString().c_str());
  shared->SetVerificationFailure();
}

void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
                                   int64_t key) const {
  auto key_str = Key(key);
  Slice key_slice = key_str;
  fprintf(stderr,
          "Verification failed for column family %d key %s (%" PRIi64 "): %s\n",
          cf, key_slice.ToString(true).c_str(), key, msg.c_str());
  shared->SetVerificationFailure();
}

void StressTest::PrintStatistics() {
  if (dbstats) {
    fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
  }
  if (dbstats_secondaries) {
    fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
            dbstats_secondaries->ToString().c_str());
  }
}

// Currently PreloadDb has to be single-threaded.
void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
                                              SharedState* shared) {
  WriteOptions write_opts;
  write_opts.disableWAL = FLAGS_disable_wal;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  char value[100];
  int cf_idx = 0;
  Status s;
  for (auto cfh : column_families_) {
    for (int64_t k = 0; k != number_of_keys; ++k) {
      std::string key_str = Key(k);
      Slice key = key_str;
      size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
      Slice v(value, sz);
      shared->Put(cf_idx, k, 0, true /* pending */);

      if (FLAGS_use_merge) {
        if (!FLAGS_use_txn) {
          s = db_->Merge(write_opts, cfh, key, v);
        } else {
#ifndef ROCKSDB_LITE
          Transaction* txn;
          s = NewTxn(write_opts, &txn);
          if (s.ok()) {
            s = txn->Merge(cfh, key, v);
            if (s.ok()) {
              s = CommitTxn(txn);
            }
          }
#endif
        }
      } else {
        if (!FLAGS_use_txn) {
          std::string ts_str;
          Slice ts;
          if (FLAGS_user_timestamp_size > 0) {
            ts_str = NowNanosStr();
            ts = ts_str;
            write_opts.timestamp = &ts;
          }
          s = db_->Put(write_opts, cfh, key, v);
        } else {
#ifndef ROCKSDB_LITE
          Transaction* txn;
          s = NewTxn(write_opts, &txn);
          if (s.ok()) {
            s = txn->Put(cfh, key, v);
            if (s.ok()) {
              s = CommitTxn(txn);
            }
          }
#endif
        }
      }

      shared->Put(cf_idx, k, 0, false /* pending */);
      if (!s.ok()) {
        break;
      }
    }
    if (!s.ok()) {
      break;
    }
    ++cf_idx;
  }
  if (s.ok()) {
    s = db_->Flush(FlushOptions(), column_families_);
  }
  if (s.ok()) {
    for (auto cf : column_families_) {
      delete cf;
    }
    column_families_.clear();
    delete db_;
    db_ = nullptr;
#ifndef ROCKSDB_LITE
    txn_db_ = nullptr;
#endif

    db_preload_finished_.store(true);
    auto now = clock_->NowMicros();
    fprintf(stdout, "%s Reopening database in read-only\n",
            clock_->TimeToString(now / 1000000).c_str());
    // Reopen as read-only, can ignore all options related to updates
    Open();
  } else {
    fprintf(stderr, "Failed to preload db\n");
    exit(1);
  }
}

Status StressTest::SetOptions(ThreadState* thread) {
  assert(FLAGS_set_options_one_in > 0);
  std::unordered_map<std::string, std::string> opts;
  std::string name =
      options_index_[thread->rand.Next() % options_index_.size()];
  int value_idx = thread->rand.Next() % options_table_[name].size();
  if (name == "soft_rate_limit" || name == "hard_rate_limit") {
    opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx];
    opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx];
  } else if (name == "level0_file_num_compaction_trigger" ||
             name == "level0_slowdown_writes_trigger" ||
             name == "level0_stop_writes_trigger") {
    opts["level0_file_num_compaction_trigger"] =
        options_table_["level0_file_num_compaction_trigger"][value_idx];
    opts["level0_slowdown_writes_trigger"] =
        options_table_["level0_slowdown_writes_trigger"][value_idx];
    opts["level0_stop_writes_trigger"] =
        options_table_["level0_stop_writes_trigger"][value_idx];
  } else {
    opts[name] = options_table_[name][value_idx];
  }

  int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
  auto cfh = column_families_[rand_cf_idx];
  return db_->SetOptions(cfh, opts);
}

#ifndef ROCKSDB_LITE
Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
  }
  static std::atomic<uint64_t> txn_id = {0};
  TransactionOptions txn_options;
  txn_options.lock_timeout = 600000;  // 10 min
  txn_options.deadlock_detect = true;
  *txn = txn_db_->BeginTransaction(write_opts, txn_options);
  auto istr = std::to_string(txn_id.fetch_add(1));
  Status s = (*txn)->SetName("xid" + istr);
  return s;
}

Status StressTest::CommitTxn(Transaction* txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
  }
  Status s = txn->Prepare();
  if (s.ok()) {
    s = txn->Commit();
  }
  delete txn;
  return s;
}

Status StressTest::RollbackTxn(Transaction* txn) {
  if (!FLAGS_use_txn) {
    return Status::InvalidArgument(
        "RollbackTxn when FLAGS_use_txn is not"
        " set");
  }
  Status s = txn->Rollback();
  delete txn;
  return s;
}
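
// Illustrative sketch (editor's note): the two-phase commit lifecycle the
// helpers above implement, end to end; cfh and write_opts are assumed to be
// set up by the caller:
//
//   Transaction* txn = nullptr;
//   Status s = NewTxn(write_opts, &txn);   // BeginTransaction + SetName
//   if (s.ok()) {
//     s = txn->Put(cfh, "key", "value");
//     s = s.ok() ? CommitTxn(txn)          // Prepare, then Commit; frees txn
//                : RollbackTxn(txn);       // Rollback; also frees txn
//   }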
#endif

void StressTest::OperateDb(ThreadState* thread) {
  ReadOptions read_opts(FLAGS_verify_checksum, true);
  WriteOptions write_opts;
  auto shared = thread->shared;
  char value[100];
  std::string from_db;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  write_opts.disableWAL = FLAGS_disable_wal;
  const int prefixBound = static_cast<int>(FLAGS_readpercent) +
                          static_cast<int>(FLAGS_prefixpercent);
  const int writeBound = prefixBound + static_cast<int>(FLAGS_writepercent);
  const int delBound = writeBound + static_cast<int>(FLAGS_delpercent);
  const int delRangeBound = delBound + static_cast<int>(FLAGS_delrangepercent);
  const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
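  // Editor's note: these cumulative bounds partition [0, 100) into bands;
  // a uniform draw (prob_op below) selects the operation:
  //   [0, readpercent)           -> Get / MultiGet
  //   [readpercent, prefixBound) -> prefix scan
  //   [prefixBound, writeBound)  -> Put
  //   [writeBound, delBound)     -> Delete
  //   [delBound, delRangeBound)  -> DeleteRange
  //   [delRangeBound, 100)       -> iterate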

#ifndef NDEBUG
  if (FLAGS_read_fault_one_in) {
    fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
                                                   FLAGS_read_fault_one_in);
  }
  if (FLAGS_write_fault_one_in) {
    IOStatus error_msg;
    if (FLAGS_injest_error_severity <= 1 || FLAGS_injest_error_severity > 2) {
      error_msg = IOStatus::IOError("Retryable IO Error");
      error_msg.SetRetryable(true);
    } else if (FLAGS_injest_error_severity == 2) {
      // Ingest the fatal error
      error_msg = IOStatus::IOError("Fatal IO Error");
      error_msg.SetDataLoss(true);
    }
    std::vector<FileType> types = {FileType::kTableFile,
                                   FileType::kDescriptorFile,
                                   FileType::kCurrentFile};
    fault_fs_guard->SetRandomWriteError(
        thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg,
        /*inject_for_all_file_types=*/false, types);
  }
#endif  // NDEBUG
  thread->stats.Start();
  for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
    if (thread->shared->HasVerificationFailedYet() ||
        thread->shared->ShouldStopTest()) {
      break;
    }
    if (open_cnt != 0) {
      thread->stats.FinishedSingleOp();
      MutexLock l(thread->shared->GetMutex());
      while (!thread->snapshot_queue.empty()) {
        db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
        delete thread->snapshot_queue.front().second.key_vec;
        thread->snapshot_queue.pop();
      }
      thread->shared->IncVotedReopen();
      if (thread->shared->AllVotedReopen()) {
        thread->shared->GetStressTest()->Reopen(thread);
        thread->shared->GetCondVar()->SignalAll();
      } else {
        thread->shared->GetCondVar()->Wait();
      }
      // Commenting this out as we don't want to reset stats on each open.
      // thread->stats.Start();
    }

    for (uint64_t i = 0; i < ops_per_open; i++) {
      if (thread->shared->HasVerificationFailedYet()) {
        break;
      }

      // Change Options
      if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) {
        SetOptions(thread);
      }

      if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) {
        // Toggle in-place update support. (The original self-XOR always
        // cleared the flag, which defeats the purpose of this option.)
        options_.inplace_update_support = !options_.inplace_update_support;
      }

      if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
          thread->rand.OneIn(FLAGS_verify_db_one_in)) {
        ContinuouslyVerifyDb(thread);
        if (thread->shared->ShouldStopTest()) {
          break;
        }
      }

      MaybeClearOneColumnFamily(thread);

      if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
        Status s = db_->SyncWAL();
        if (!s.ok() && !s.IsNotSupported()) {
          fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
        }
      }

      int rand_column_family = thread->rand.Next() % FLAGS_column_families;
      ColumnFamilyHandle* column_family = column_families_[rand_column_family];

      if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
        TestCompactFiles(thread, column_family);
      }

      int64_t rand_key = GenerateOneKey(thread, i);
      std::string keystr = Key(rand_key);
      Slice key = keystr;
      std::unique_ptr<MutexLock> lock;
      if (ShouldAcquireMutexOnKey()) {
        lock.reset(new MutexLock(
            shared->GetMutexForKey(rand_column_family, rand_key)));
      }

      if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
        TestCompactRange(thread, rand_key, key, column_family);
        if (thread->shared->HasVerificationFailedYet()) {
          break;
        }
      }

      std::vector<int> rand_column_families =
          GenerateColumnFamilies(FLAGS_column_families, rand_column_family);

      if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
        Status status = TestFlush(rand_column_families);
        if (!status.ok()) {
          fprintf(stdout, "Unable to perform Flush(): %s\n",
                  status.ToString().c_str());
        }
      }

#ifndef ROCKSDB_LITE
      // Verify GetLiveFiles with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) &&
          !FLAGS_write_fault_one_in) {
        Status status = VerifyGetLiveFiles();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
        }
      }

      // Verify GetSortedWalFiles with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
        Status status = VerifyGetSortedWalFiles();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK",
                            status);
        }
      }

      // Verify GetCurrentWalFile with a 1 in N chance.
      if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) {
        Status status = VerifyGetCurrentWalFile();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK",
                            status);
        }
      }
#endif  // !ROCKSDB_LITE

      if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
        Status status = TestPauseBackground(thread);
        if (!status.ok()) {
          VerificationAbort(
              shared, "Pause/ContinueBackgroundWork status not OK", status);
        }
      }

#ifndef ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
        Status status = db_->VerifyChecksum();
        if (!status.ok()) {
          VerificationAbort(shared, "VerifyChecksum status not OK", status);
        }
      }

      if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) {
        TestGetProperty(thread);
      }
#endif

      std::vector<int64_t> rand_keys = GenerateKeys(rand_key);

      if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
        TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
      }

      if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
        // Beyond a certain DB size threshold, this test becomes heavier than
        // it's worth.
        uint64_t total_size = 0;
        if (FLAGS_backup_max_size > 0) {
          std::vector<FileAttributes> files;
          db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files);
          for (auto& file : files) {
            total_size += file.size_bytes;
          }
        }

        if (total_size <= FLAGS_backup_max_size) {
          Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
          if (!s.ok()) {
            VerificationAbort(shared, "Backup/restore gave inconsistent state",
                              s);
          }
        }
      }

      if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
        Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
        }
      }

#ifndef ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) {
        Status s =
            TestApproximateSize(thread, i, rand_column_families, rand_keys);
        if (!s.ok()) {
          VerificationAbort(shared, "ApproximateSize Failed", s);
        }
      }
#endif  // !ROCKSDB_LITE
      if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
        TestAcquireSnapshot(thread, rand_column_family, keystr, i);
      }

      /*always*/ {
        Status s = MaybeReleaseSnapshots(thread, i);
        if (!s.ok()) {
          VerificationAbort(shared, "Snapshot gave inconsistent state", s);
        }
      }

      // Assign timestamps if necessary.
      std::string read_ts_str;
      std::string write_ts_str;
      Slice read_ts;
      Slice write_ts;
      if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) {
        read_ts_str = GenerateTimestampForRead();
        read_ts = read_ts_str;
        read_opts.timestamp = &read_ts;
        write_ts_str = NowNanosStr();
        write_ts = write_ts_str;
        write_opts.timestamp = &write_ts;
      }

      int prob_op = thread->rand.Uniform(100);
      // Reset this in case we pick something other than a read op. We don't
      // want to use a stale value when deciding at the beginning of the loop
      // whether to vote to reopen
      if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
        assert(0 <= prob_op);
        // OPERATION read
        if (FLAGS_use_multiget) {
          // Leave room for one more iteration of the loop with a single key
          // batch. This is to ensure that each thread does exactly the same
          // number of ops
          int multiget_batch_size = static_cast<int>(
              std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
                       FLAGS_ops_per_thread - i - 1));
          // If it's the last iteration, ensure that multiget_batch_size is
          // at least 1
          multiget_batch_size = std::max(multiget_batch_size, 1);
          rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
          TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
          i += multiget_batch_size - 1;
        } else {
          TestGet(thread, read_opts, rand_column_families, rand_keys);
        }
      } else if (prob_op < prefixBound) {
        assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
        // OPERATION prefix scan
        // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
        // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
        // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
        // prefix
        TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
      } else if (prob_op < writeBound) {
        assert(prefixBound <= prob_op);
        // OPERATION write
        TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
                value, lock);
      } else if (prob_op < delBound) {
        assert(writeBound <= prob_op);
        // OPERATION delete
        TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
      } else if (prob_op < delRangeBound) {
        assert(delBound <= prob_op);
        // OPERATION delete range
        TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
                        lock);
      } else {
        assert(delRangeBound <= prob_op);
        // OPERATION iterate
        int num_seeks = static_cast<int>(
            std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
                     FLAGS_ops_per_thread - i - 1));
        rand_keys = GenerateNKeys(thread, num_seeks, i);
        i += num_seeks - 1;
        TestIterate(thread, read_opts, rand_column_families, rand_keys);
      }
      thread->stats.FinishedSingleOp();
#ifndef ROCKSDB_LITE
      uint32_t tid = thread->tid;
      assert(secondaries_.empty() ||
             static_cast<size_t>(tid) < secondaries_.size());
      if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
        Status s = secondaries_[tid]->TryCatchUpWithPrimary();
        if (!s.ok()) {
          VerificationAbort(shared, "Secondary instance failed to catch up", s);
          break;
        }
      }
#endif
    }
  }
  while (!thread->snapshot_queue.empty()) {
    db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
    delete thread->snapshot_queue.front().second.key_vec;
    thread->snapshot_queue.pop();
  }

  thread->stats.Stop();
}

#ifndef ROCKSDB_LITE
// Generate a list of keys close to the boundaries of SST file keys.
// If there isn't any SST file in the DB, return an empty list.
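// Editor's note, a hypothetical example of the perturbation below: given a
// boundary key "user0005", the 1-in-3 branch decrements the last byte to
// yield "user0004", and the subsequent 1-in-2 branch increments it to
// "user0006"; otherwise the boundary key is used as-is.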
std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread,
                                                     DB* db,
                                                     ColumnFamilyHandle* cfh,
                                                     size_t num_keys) {
  ColumnFamilyMetaData cfmd;
  db->GetColumnFamilyMetaData(cfh, &cfmd);
  std::vector<std::string> boundaries;
  for (const LevelMetaData& lmd : cfmd.levels) {
    for (const SstFileMetaData& sfmd : lmd.files) {
      // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey
      // have timestamps.
      const auto& skey = sfmd.smallestkey;
      const auto& lkey = sfmd.largestkey;
      assert(skey.size() >= FLAGS_user_timestamp_size);
      assert(lkey.size() >= FLAGS_user_timestamp_size);
      boundaries.push_back(
          skey.substr(0, skey.size() - FLAGS_user_timestamp_size));
      boundaries.push_back(
          lkey.substr(0, lkey.size() - FLAGS_user_timestamp_size));
    }
  }
  if (boundaries.empty()) {
    return {};
  }

  std::vector<std::string> ret;
  for (size_t j = 0; j < num_keys; j++) {
    std::string k =
        boundaries[thread->rand.Uniform(static_cast<int>(boundaries.size()))];
    if (thread->rand.OneIn(3)) {
      // Decrement the key by one (byte-wise, with borrow)
      for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
        uint8_t cur = k[i];
        if (cur > 0) {
          k[i] = static_cast<char>(cur - 1);
          break;
        } else if (i > 0) {
          k[i] = 0xFFu;
        }
      }
    } else if (thread->rand.OneIn(2)) {
      // Increment the key by one (byte-wise, with carry)
      for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
        uint8_t cur = k[i];
        if (cur < 255) {
          k[i] = static_cast<char>(cur + 1);
          break;
        } else if (i > 0) {
          k[i] = 0x00;
        }
      }
    }
    ret.push_back(k);
  }
  return ret;
}
#endif  // !ROCKSDB_LITE

// Given a key K, this creates an iterator which scans to K and then
// does a random sequence of Next/Prev operations.
Status StressTest::TestIterate(ThreadState* thread,
                               const ReadOptions& read_opts,
                               const std::vector<int>& rand_column_families,
                               const std::vector<int64_t>& rand_keys) {
  Status s;
  const Snapshot* snapshot = db_->GetSnapshot();
  ReadOptions readoptionscopy = read_opts;
  readoptionscopy.snapshot = snapshot;

  bool expect_total_order = false;
  if (thread->rand.OneIn(16)) {
    // When prefix extractor is used, it's useful to cover total order seek.
    readoptionscopy.total_order_seek = true;
    expect_total_order = true;
  } else if (thread->rand.OneIn(4)) {
    readoptionscopy.total_order_seek = false;
    readoptionscopy.auto_prefix_mode = true;
    expect_total_order = true;
  } else if (options_.prefix_extractor.get() == nullptr) {
    expect_total_order = true;
  }

  std::string upper_bound_str;
  Slice upper_bound;
  if (thread->rand.OneIn(16)) {
    // With a 1/16 chance, set an iterator upper bound.
    int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    upper_bound_str = Key(rand_upper_key);
    upper_bound = Slice(upper_bound_str);
    // upper_bound can be smaller than the seek key, but the query itself
    // should not crash either way.
    readoptionscopy.iterate_upper_bound = &upper_bound;
  }
  std::string lower_bound_str;
  Slice lower_bound;
  if (thread->rand.OneIn(16)) {
    // With a 1/16 chance, enable an iterator lower bound.
    int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
    lower_bound_str = Key(rand_lower_key);
    lower_bound = Slice(lower_bound_str);
    // lower_bound can be greater than the seek key, but the query itself
    // should not crash either way.
    readoptionscopy.iterate_lower_bound = &lower_bound;
  }

  auto cfh = column_families_[rand_column_families[0]];
  std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh));

  std::vector<std::string> key_str;
  if (thread->rand.OneIn(16)) {
    // Generate keys close to lower or upper bound of SST files.
    key_str = GetWhiteBoxKeys(thread, db_, cfh, rand_keys.size());
  }
  if (key_str.empty()) {
    // If key strings were not generated using white-box keys, use the
    // randomized keys passed in.
    for (int64_t rkey : rand_keys) {
      key_str.push_back(Key(rkey));
    }
  }

  std::string op_logs;
  const size_t kOpLogsLimit = 10000;

  for (const std::string& skey : key_str) {
    if (op_logs.size() > kOpLogsLimit) {
      // Shouldn't take too much memory for the history log. Clear it.
      op_logs = "(cleared...)\n";
    }

    Slice key = skey;

    if (readoptionscopy.iterate_upper_bound != nullptr &&
        thread->rand.OneIn(2)) {
      // With a 1/2 chance, change the upper bound.
      // It is possible that it is changed before first use, but there is no
      // problem with that.
      int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
      upper_bound_str = Key(rand_upper_key);
      upper_bound = Slice(upper_bound_str);
    } else if (readoptionscopy.iterate_lower_bound != nullptr &&
               thread->rand.OneIn(4)) {
      // With a 1/4 chance, change the lower bound.
      // It is possible that it is changed before first use, but there is no
      // problem with that.
      int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
      lower_bound_str = Key(rand_lower_key);
      lower_bound = Slice(lower_bound_str);
    }

    // Record some options in op_logs.
    op_logs += "total_order_seek: ";
    op_logs += (readoptionscopy.total_order_seek ? "1 " : "0 ");
    op_logs += "auto_prefix_mode: ";
    op_logs += (readoptionscopy.auto_prefix_mode ? "1 " : "0 ");
    if (readoptionscopy.iterate_upper_bound != nullptr) {
      op_logs += "ub: " + upper_bound.ToString(true) + " ";
    }
    if (readoptionscopy.iterate_lower_bound != nullptr) {
      op_logs += "lb: " + lower_bound.ToString(true) + " ";
    }

    // Set up a control iterator that does the same without bounds and with
    // total order seek, and compare the results. This is to identify bugs
    // related to bounds, the prefix extractor, or reseeking. Sometimes we
    // are comparing iterators with the same set-up, and it doesn't hurt to
    // check that they are equal.
    ReadOptions cmp_ro;
    cmp_ro.timestamp = readoptionscopy.timestamp;
    cmp_ro.snapshot = snapshot;
    cmp_ro.total_order_seek = true;
    ColumnFamilyHandle* cmp_cfh =
        GetControlCfh(thread, rand_column_families[0]);
    std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
    bool diverged = false;

    bool support_seek_first_or_last = expect_total_order;

    LastIterateOp last_op;
    if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToFirst();
      cmp_iter->SeekToFirst();
      last_op = kLastOpSeekToFirst;
      op_logs += "STF ";
    } else if (support_seek_first_or_last && thread->rand.OneIn(100)) {
      iter->SeekToLast();
      cmp_iter->SeekToLast();
      last_op = kLastOpSeekToLast;
      op_logs += "STL ";
    } else if (thread->rand.OneIn(8)) {
      iter->SeekForPrev(key);
      cmp_iter->SeekForPrev(key);
      last_op = kLastOpSeekForPrev;
      op_logs += "SFP " + key.ToString(true) + " ";
    } else {
      iter->Seek(key);
      cmp_iter->Seek(key);
      last_op = kLastOpSeek;
      op_logs += "S " + key.ToString(true) + " ";
    }
    VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(), cmp_iter.get(),
                   last_op, key, op_logs, &diverged);

    bool no_reverse =
        (FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
    for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
      if (no_reverse || thread->rand.OneIn(2)) {
        iter->Next();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Next();
        }
        op_logs += "N";
      } else {
        iter->Prev();
        if (!diverged) {
          assert(cmp_iter->Valid());
          cmp_iter->Prev();
        }
        op_logs += "P";
      }
      last_op = kLastOpNextOrPrev;
      VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(),
                     cmp_iter.get(), last_op, key, op_logs, &diverged);
    }

    if (s.ok()) {
      thread->stats.AddIterations(1);
    } else {
      fprintf(stderr, "TestIterate error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
      break;
    }

    op_logs += "; ";
  }

  db_->ReleaseSnapshot(snapshot);

  return s;
}

#ifndef ROCKSDB_LITE
// Test the return status of GetLiveFiles.
Status StressTest::VerifyGetLiveFiles() const {
  std::vector<std::string> live_file;
  uint64_t manifest_size = 0;
  return db_->GetLiveFiles(live_file, &manifest_size);
}

// Test the return status of GetSortedWalFiles.
Status StressTest::VerifyGetSortedWalFiles() const {
  VectorLogPtr log_ptr;
  return db_->GetSortedWalFiles(log_ptr);
}

// Test the return status of GetCurrentWalFile.
Status StressTest::VerifyGetCurrentWalFile() const {
  std::unique_ptr<LogFile> cur_wal_file;
  return db_->GetCurrentWalFile(&cur_wal_file);
}
#endif  // !ROCKSDB_LITE

// Compare the two iterators, iter and cmp_iter, which should be at the same
// position, unless iter may have become invalid or undefined because of
// upper or lower bounds, or the prefix extractor.
// Flags a verification failure if the check fails.
// *diverged is set to true once the two iterators have diverged.
void StressTest::VerifyIterator(ThreadState* thread,
                                ColumnFamilyHandle* cmp_cfh,
                                const ReadOptions& ro, Iterator* iter,
                                Iterator* cmp_iter, LastIterateOp op,
                                const Slice& seek_key,
                                const std::string& op_logs, bool* diverged) {
  if (*diverged) {
    return;
  }

  if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
    // SeekToFirst() with lower bound is not well defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekToLast && ro.iterate_upper_bound != nullptr) {
    // SeekToLast() with upper bound is not well defined.
    *diverged = true;
    return;
  } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
             (options_.comparator->CompareWithoutTimestamp(
                  *ro.iterate_lower_bound, /*a_has_ts=*/false, seek_key,
                  /*b_has_ts=*/false) >= 0 ||
              (ro.iterate_upper_bound != nullptr &&
               options_.comparator->CompareWithoutTimestamp(
                   *ro.iterate_lower_bound, /*a_has_ts=*/false,
                   *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
    // Lower bound behavior is not well defined if it is larger than the
    // seek key or upper bound. Disable the check for now.
    *diverged = true;
    return;
  } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
             (options_.comparator->CompareWithoutTimestamp(
                  *ro.iterate_upper_bound, /*a_has_ts=*/false, seek_key,
                  /*b_has_ts=*/false) <= 0 ||
              (ro.iterate_lower_bound != nullptr &&
               options_.comparator->CompareWithoutTimestamp(
                   *ro.iterate_lower_bound, /*a_has_ts=*/false,
                   *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
    // Upper bound behavior is not well defined if it is smaller than the
    // seek key or lower bound. Disable the check for now.
    *diverged = true;
    return;
  }

  const SliceTransform* pe = (ro.total_order_seek || ro.auto_prefix_mode)
                                 ? nullptr
                                 : options_.prefix_extractor.get();
  const Comparator* cmp = options_.comparator;

  if (iter->Valid() && !cmp_iter->Valid()) {
    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix-seeking a key that is not in the domain is undefined.
        // Skip checking for this scenario.
        *diverged = true;
        return;
      } else if (!pe->InDomain(iter->key())) {
        // The iterator is out of range if its key is no longer in the
        // extractor's domain.
        *diverged = true;
        return;
      } else if (pe->Transform(iter->key()) != pe->Transform(seek_key)) {
        *diverged = true;
        return;
      }
    }
    fprintf(stderr,
            "Control iterator is invalid but iterator has key %s "
            "%s\n",
            iter->key().ToString(true).c_str(), op_logs.c_str());

    *diverged = true;
  } else if (cmp_iter->Valid()) {
    // Iterator is not valid. It can be legitimate if it has already gone
    // out of the upper or lower bound, or been filtered out by the prefix
    // iterator.
    const Slice& total_order_key = cmp_iter->key();

    if (pe != nullptr) {
      if (!pe->InDomain(seek_key)) {
        // Prefix-seeking a key that is not in the domain is undefined.
        // Skip checking for this scenario.
        *diverged = true;
        return;
      }

      if (!pe->InDomain(total_order_key) ||
          pe->Transform(total_order_key) != pe->Transform(seek_key)) {
        // If the prefix is exhausted, the only thing left to check is that
        // the iterator doesn't return a position within the prefix.
        // Either way, checking can stop here.
        *diverged = true;
        if (!iter->Valid() || !pe->InDomain(iter->key()) ||
            pe->Transform(iter->key()) != pe->Transform(seek_key)) {
          return;
        }
        fprintf(stderr,
                "Iterator stays in prefix but control doesn't"
                " iterator key %s control iterator key %s %s\n",
                iter->key().ToString(true).c_str(),
                cmp_iter->key().ToString(true).c_str(), op_logs.c_str());
      }
    }
    // Check upper or lower bounds.
    if (!*diverged) {
      if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
          (!iter->Valid() &&
           (ro.iterate_upper_bound == nullptr ||
            cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
                                         *ro.iterate_upper_bound,
                                         /*b_has_ts=*/false) < 0) &&
           (ro.iterate_lower_bound == nullptr ||
            cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
                                         *ro.iterate_lower_bound,
                                         /*b_has_ts=*/false) > 0))) {
        fprintf(stderr,
                "Iterator diverged from control iterator which"
                " has value %s %s\n",
                total_order_key.ToString(true).c_str(), op_logs.c_str());
        if (iter->Valid()) {
          fprintf(stderr, "iterator has value %s\n",
                  iter->key().ToString(true).c_str());
        } else {
          fprintf(stderr, "iterator is not valid\n");
        }
        *diverged = true;
      }
    }
  }
  if (*diverged) {
    fprintf(stderr, "Control CF %s\n", cmp_cfh->GetName().c_str());
    thread->stats.AddErrors(1);
    // Fail fast to preserve the DB state.
    thread->shared->SetVerificationFailure();
  }
}
1333 
1334 #ifdef ROCKSDB_LITE
TestBackupRestore(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &)1335 Status StressTest::TestBackupRestore(
1336     ThreadState* /* thread */,
1337     const std::vector<int>& /* rand_column_families */,
1338     const std::vector<int64_t>& /* rand_keys */) {
1339   assert(false);
1340   fprintf(stderr,
1341           "RocksDB lite does not support "
1342           "TestBackupRestore\n");
1343   std::terminate();
1344 }
1345 
TestCheckpoint(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &)1346 Status StressTest::TestCheckpoint(
1347     ThreadState* /* thread */,
1348     const std::vector<int>& /* rand_column_families */,
1349     const std::vector<int64_t>& /* rand_keys */) {
1350   assert(false);
1351   fprintf(stderr,
1352           "RocksDB lite does not support "
1353           "TestCheckpoint\n");
1354   std::terminate();
1355 }
1356 
TestCompactFiles(ThreadState *,ColumnFamilyHandle *)1357 void StressTest::TestCompactFiles(ThreadState* /* thread */,
1358                                   ColumnFamilyHandle* /* column_family */) {
1359   assert(false);
1360   fprintf(stderr,
1361           "RocksDB lite does not support "
1362           "CompactFiles\n");
1363   std::terminate();
1364 }
1365 #else   // ROCKSDB_LITE
TestBackupRestore(ThreadState * thread,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)1366 Status StressTest::TestBackupRestore(
1367     ThreadState* thread, const std::vector<int>& rand_column_families,
1368     const std::vector<int64_t>& rand_keys) {
1369   std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
1370   std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
1371   BackupableDBOptions backup_opts(backup_dir);
1372   // For debugging, get info_log from live options
1373   backup_opts.info_log = db_->GetDBOptions().info_log.get();
1374   if (thread->rand.OneIn(10)) {
1375     backup_opts.share_table_files = false;
1376   } else {
1377     backup_opts.share_table_files = true;
1378     if (thread->rand.OneIn(5)) {
1379       backup_opts.share_files_with_checksum = false;
1380     } else {
1381       backup_opts.share_files_with_checksum = true;
1382       if (thread->rand.OneIn(2)) {
1383         // old
1384         backup_opts.share_files_with_checksum_naming =
1385             BackupableDBOptions::kLegacyCrc32cAndFileSize;
1386       } else {
1387         // new
1388         backup_opts.share_files_with_checksum_naming =
1389             BackupableDBOptions::kUseDbSessionId;
1390       }
1391       if (thread->rand.OneIn(2)) {
1392         backup_opts.share_files_with_checksum_naming =
1393             backup_opts.share_files_with_checksum_naming |
1394             BackupableDBOptions::kFlagIncludeFileSize;
1395       }
1396     }
1397   }
1398   BackupEngine* backup_engine = nullptr;
1399   std::string from = "a backup/restore operation";
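  // `from` records the first step that failed so the error report at the
  // bottom of this function can pinpoint the offending API call.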
1400   Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
1401   if (!s.ok()) {
1402     from = "BackupEngine::Open";
1403   }
1404   if (s.ok()) {
1405     if (thread->rand.OneIn(2)) {
1406       TEST_FutureSchemaVersion2Options test_opts;
1407       test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
1408       test_opts.file_sizes = thread->rand.OneIn(2) == 0;
1409       TEST_EnableWriteFutureSchemaVersion2(backup_engine, test_opts);
1410     }
1411     s = backup_engine->CreateNewBackup(db_);
1412     if (!s.ok()) {
1413       from = "BackupEngine::CreateNewBackup";
1414     }
1415   }
1416   if (s.ok()) {
1417     delete backup_engine;
1418     backup_engine = nullptr;
1419     s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
1420     if (!s.ok()) {
1421       from = "BackupEngine::Open (again)";
1422     }
1423   }
1424   std::vector<BackupInfo> backup_info;
1425   // If inplace_not_restore, we verify the backup by opening it as a
1426   // read-only DB. If !inplace_not_restore, we restore it to a temporary
1427   // directory for verification.
1428   bool inplace_not_restore = thread->rand.OneIn(3);
1429   if (s.ok()) {
1430     backup_engine->GetBackupInfo(&backup_info,
1431                                  /*include_file_details*/ inplace_not_restore);
1432     if (backup_info.empty()) {
1433       s = Status::NotFound("no backups found");
1434       from = "BackupEngine::GetBackupInfo";
1435     }
1436   }
1437   if (s.ok() && thread->rand.OneIn(2)) {
1438     s = backup_engine->VerifyBackup(
1439         backup_info.front().backup_id,
1440         thread->rand.OneIn(2) /* verify_with_checksum */);
1441     if (!s.ok()) {
1442       from = "BackupEngine::VerifyBackup";
1443     }
1444   }
1445   const bool allow_persistent = thread->tid == 0;  // keep persistent backups few
1446   bool from_latest = false;
1447   int count = static_cast<int>(backup_info.size());
1448   if (s.ok() && !inplace_not_restore) {
1449     if (count > 1) {
1450       s = backup_engine->RestoreDBFromBackup(
1451           RestoreOptions(), backup_info[thread->rand.Uniform(count)].backup_id,
1452           restore_dir /* db_dir */, restore_dir /* wal_dir */);
1453       if (!s.ok()) {
1454         from = "BackupEngine::RestoreDBFromBackup";
1455       }
1456     } else {
1457       from_latest = true;
1458       s = backup_engine->RestoreDBFromLatestBackup(RestoreOptions(),
1459                                                    restore_dir /* db_dir */,
1460                                                    restore_dir /* wal_dir */);
1461       if (!s.ok()) {
1462         from = "BackupEngine::RestoreDBFromLatestBackup";
1463       }
1464     }
1465   }
1466   if (s.ok() && !inplace_not_restore) {
1467     // Purge early if restoring, to ensure the restored directory doesn't
1468     // have some secret dependency on the backup directory.
1469     uint32_t to_keep = 0;
1470     if (allow_persistent) {
1471       // allow one thread to keep up to 2 backups
1472       to_keep = thread->rand.Uniform(3);
1473     }
1474     s = backup_engine->PurgeOldBackups(to_keep);
1475     if (!s.ok()) {
1476       from = "BackupEngine::PurgeOldBackups";
1477     }
1478   }
1479   DB* restored_db = nullptr;
1480   std::vector<ColumnFamilyHandle*> restored_cf_handles;
1481   // Not yet implemented: opening restored BlobDB or TransactionDB
1482   if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
1483     Options restore_options(options_);
1484     restore_options.listeners.clear();
1485     // Avoid dangling/shared file descriptors, for reliable destroy
1486     restore_options.sst_file_manager = nullptr;
1487     std::vector<ColumnFamilyDescriptor> cf_descriptors;
1488     // TODO(ajkr): `column_family_names_` is not safe to access here when
1489     // `clear_column_family_one_in != 0`. But we can't easily switch to
1490     // `ListColumnFamilies` to get names because it won't necessarily give
1491     // the same order as `column_family_names_`.
1492     assert(FLAGS_clear_column_family_one_in == 0);
1493     for (auto name : column_family_names_) {
1494       cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
1495     }
1496     if (inplace_not_restore) {
1497       BackupInfo& info = backup_info[thread->rand.Uniform(count)];
1498       restore_options.env = info.env_for_open.get();
1499       s = DB::OpenForReadOnly(DBOptions(restore_options), info.name_for_open,
1500                               cf_descriptors, &restored_cf_handles,
1501                               &restored_db);
1502       if (!s.ok()) {
1503         from = "DB::OpenForReadOnly in backup/restore";
1504       }
1505     } else {
1506       s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
1507                    &restored_cf_handles, &restored_db);
1508       if (!s.ok()) {
1509         from = "DB::Open in backup/restore";
1510       }
1511     }
1512   }
1513   // Note the column families chosen by `rand_column_families` cannot be
1514   // dropped while the locks for `rand_keys` are held. So we should not have
1515   // to worry about accessing those column families throughout this function.
1516   //
1517   // For simplicity, currently only verifies existence/non-existence of a
1518   // single key
1519   for (size_t i = 0; restored_db && s.ok() && i < rand_column_families.size();
1520        ++i) {
1521     std::string key_str = Key(rand_keys[0]);
1522     Slice key = key_str;
1523     std::string restored_value;
1524     ReadOptions read_opts;
1525     std::string ts_str;
1526     Slice ts;
1527     if (FLAGS_user_timestamp_size > 0) {
1528       ts_str = GenerateTimestampForRead();
1529       ts = ts_str;
1530       read_opts.timestamp = &ts;
1531     }
1532     Status get_status = restored_db->Get(
1533         read_opts, restored_cf_handles[rand_column_families[i]], key,
1534         &restored_value);
1535     bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1536     if (get_status.ok()) {
1537       if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
1538         s = Status::Corruption("key exists in restore but not in original db");
1539       }
1540     } else if (get_status.IsNotFound()) {
1541       if (exists && from_latest && ShouldAcquireMutexOnKey()) {
1542         s = Status::Corruption("key exists in original db but not in restore");
1543       }
1544     } else {
1545       s = get_status;
1546       if (!s.ok()) {
1547         from = "DB::Get in backup/restore";
1548       }
1549     }
1550   }
1551   if (restored_db != nullptr) {
1552     for (auto* cf_handle : restored_cf_handles) {
1553       restored_db->DestroyColumnFamilyHandle(cf_handle);
1554     }
1555     delete restored_db;
1556     restored_db = nullptr;
1557   }
1558   if (s.ok() && inplace_not_restore) {
1559     // Purge late if inplace open read-only
1560     uint32_t to_keep = 0;
1561     if (allow_persistent) {
1562       // allow one thread to keep up to 2 backups
1563       to_keep = thread->rand.Uniform(3);
1564     }
1565     s = backup_engine->PurgeOldBackups(to_keep);
1566     if (!s.ok()) {
1567       from = "BackupEngine::PurgeOldBackups";
1568     }
1569   }
1570   if (backup_engine != nullptr) {
1571     delete backup_engine;
1572     backup_engine = nullptr;
1573   }
1574   if (s.ok()) {
1575     // Preserve directories on failure, or allowed persistent backup
1576     if (!allow_persistent) {
1577       s = DestroyDir(db_stress_env, backup_dir);
1578       if (!s.ok()) {
1579         from = "Destroy backup dir";
1580       }
1581     }
1582   }
1583   if (s.ok()) {
1584     s = DestroyDir(db_stress_env, restore_dir);
1585     if (!s.ok()) {
1586       from = "Destroy restore dir";
1587     }
1588   }
1589   if (!s.ok()) {
1590     fprintf(stderr, "Failure in %s with: %s\n", from.c_str(),
1591             s.ToString().c_str());
1592   }
1593   return s;
1594 }
1595 
1596 Status StressTest::TestApproximateSize(
1597     ThreadState* thread, uint64_t iteration,
1598     const std::vector<int>& rand_column_families,
1599     const std::vector<int64_t>& rand_keys) {
1600   // rand_keys likely only has one key. Just use the first one.
1601   assert(!rand_keys.empty());
1602   assert(!rand_column_families.empty());
1603   int64_t key1 = rand_keys[0];
1604   int64_t key2;
1605   if (thread->rand.OneIn(2)) {
1606     // Two totally random keys. This tends to cover large ranges.
1607     key2 = GenerateOneKey(thread, iteration);
1608     if (key2 < key1) {
1609       std::swap(key1, key2);
1610     }
1611   } else {
1612     // Unless users pass a very large FLAGS_max_key, we should not worry
1613     // about overflow. This is for testing, so we skip the overflow check
1614     // for simplicity.
1615     key2 = key1 + static_cast<int64_t>(thread->rand.Uniform(1000));
1616   }
1617   std::string key1_str = Key(key1);
1618   std::string key2_str = Key(key2);
1619   Range range{Slice(key1_str), Slice(key2_str)};
1620   SizeApproximationOptions sao;
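  // Note: `include_memtabtles` is the field's actual (typo'd) spelling in
  // the SizeApproximationOptions API.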
1621   sao.include_memtabtles = thread->rand.OneIn(2);
1622   if (sao.include_memtabtles) {
1623     sao.include_files = thread->rand.OneIn(2);
1624   }
1625   if (thread->rand.OneIn(2)) {
1626     if (thread->rand.OneIn(2)) {
1627       sao.files_size_error_margin = 0.0;
1628     } else {
1629       sao.files_size_error_margin =
1630           static_cast<double>(thread->rand.Uniform(3));
1631     }
1632   }
1633   uint64_t result;
1634   return db_->GetApproximateSizes(
1635       sao, column_families_[rand_column_families[0]], &range, 1, &result);
1636 }
1637 
1638 Status StressTest::TestCheckpoint(ThreadState* thread,
1639                                   const std::vector<int>& rand_column_families,
1640                                   const std::vector<int64_t>& rand_keys) {
1641   std::string checkpoint_dir =
1642       FLAGS_db + "/.checkpoint" + ToString(thread->tid);
1643   Options tmp_opts(options_);
1644   tmp_opts.listeners.clear();
1645   tmp_opts.env = db_stress_env;
1646 
1647   DestroyDB(checkpoint_dir, tmp_opts);
1648 
1649   if (db_stress_env->FileExists(checkpoint_dir).ok()) {
1650     // The directory may still exist, e.g. because a trash file was left
1651     // behind, so try to delete its files one by one.
1652     Status my_s = DestroyDir(db_stress_env, checkpoint_dir);
1653     if (!my_s.ok()) {
1654       fprintf(stderr, "Failed to destroy directory before checkpoint: %s\n",
1655               my_s.ToString().c_str());
1656     }
1657   }
1658 
1659   Checkpoint* checkpoint = nullptr;
1660   Status s = Checkpoint::Create(db_, &checkpoint);
1661   if (s.ok()) {
1662     s = checkpoint->CreateCheckpoint(checkpoint_dir);
1663     if (!s.ok()) {
1664       fprintf(stderr, "Failed to create checkpoint in %s\n",
1665               checkpoint_dir.c_str());
1666       std::vector<std::string> files;
1667       Status my_s = db_stress_env->GetChildren(checkpoint_dir, &files);
1668       if (my_s.ok()) {
1669         for (const auto& f : files) {
1670           fprintf(stderr, " %s\n", f.c_str());
1671         }
1672       } else {
1673         fprintf(stderr, "Failed to list files under the checkpoint directory: %s\n",
1674                 my_s.ToString().c_str());
1675       }
1676     }
1677   }
1678   delete checkpoint;
1679   checkpoint = nullptr;
1680   std::vector<ColumnFamilyHandle*> cf_handles;
1681   DB* checkpoint_db = nullptr;
1682   if (s.ok()) {
1683     Options options(options_);
1684     options.listeners.clear();
1685     std::vector<ColumnFamilyDescriptor> cf_descs;
1686     // TODO(ajkr): `column_family_names_` is not safe to access here when
1687     // `clear_column_family_one_in != 0`. But we can't easily switch to
1688     // `ListColumnFamilies` to get names because it won't necessarily give
1689     // the same order as `column_family_names_`.
1690     assert(FLAGS_clear_column_family_one_in == 0);
1691     if (FLAGS_clear_column_family_one_in == 0) {
1692       for (const auto& name : column_family_names_) {
1693         cf_descs.emplace_back(name, ColumnFamilyOptions(options));
1694       }
1695       s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
1696                               &cf_handles, &checkpoint_db);
1697     }
1698   }
1699   if (checkpoint_db != nullptr) {
1700     // Note the column families chosen by `rand_column_families` cannot be
1701     // dropped while the locks for `rand_keys` are held. So we should not have
1702     // to worry about accessing those column families throughout this function.
1703     for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
1704       std::string key_str = Key(rand_keys[0]);
1705       Slice key = key_str;
1706       std::string value;
1707       Status get_status = checkpoint_db->Get(
1708           ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
1709       bool exists =
1710           thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1711       if (get_status.ok()) {
1712         if (!exists && ShouldAcquireMutexOnKey()) {
1713           s = Status::Corruption(
1714               "key exists in checkpoint but not in original db");
1715         }
1716       } else if (get_status.IsNotFound()) {
1717         if (exists && ShouldAcquireMutexOnKey()) {
1718           s = Status::Corruption(
1719               "key exists in original db but not in checkpoint");
1720         }
1721       } else {
1722         s = get_status;
1723       }
1724     }
1725     for (auto cfh : cf_handles) {
1726       delete cfh;
1727     }
1728     cf_handles.clear();
1729     delete checkpoint_db;
1730     checkpoint_db = nullptr;
1731   }
1732 
1733   if (!s.ok()) {
1734     fprintf(stderr, "A checkpoint operation failed with: %s\n",
1735             s.ToString().c_str());
1736   } else {
1737     DestroyDB(checkpoint_dir, tmp_opts);
1738   }
1739   return s;
1740 }
1741 
1742 void StressTest::TestGetProperty(ThreadState* thread) const {
1743   std::unordered_set<std::string> levelPropertyNames = {
1744       DB::Properties::kAggregatedTablePropertiesAtLevel,
1745       DB::Properties::kCompressionRatioAtLevelPrefix,
1746       DB::Properties::kNumFilesAtLevelPrefix,
1747   };
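  // The properties below (plus the level properties merged in afterwards)
  // may legitimately fail a plain GetProperty() call: the level properties
  // need a numeric suffix appended, and the others may be unavailable
  // depending on configuration. They are excluded from the must-succeed
  // checks below.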
1748   std::unordered_set<std::string> unknownPropertyNames = {
1749       DB::Properties::kEstimateOldestKeyTime,
1750       DB::Properties::kOptionsStatistics,
1751       DB::Properties::
1752           kLiveSstFilesSizeAtTemperature,  // similar to levelPropertyNames, it
1753                                            // requires a number suffix
1754   };
1755   unknownPropertyNames.insert(levelPropertyNames.begin(),
1756                               levelPropertyNames.end());
1757 
1758   std::string prop;
1759   for (const auto& ppt_name_and_info : InternalStats::ppt_name_to_info) {
1760     bool res = db_->GetProperty(ppt_name_and_info.first, &prop);
1761     if (unknownPropertyNames.find(ppt_name_and_info.first) ==
1762         unknownPropertyNames.end()) {
1763       if (!res) {
1764         fprintf(stderr, "Failed to get DB property: %s\n",
1765                 ppt_name_and_info.first.c_str());
1766         thread->shared->SetVerificationFailure();
1767       }
1768       if (ppt_name_and_info.second.handle_int != nullptr) {
1769         uint64_t prop_int;
1770         if (!db_->GetIntProperty(ppt_name_and_info.first, &prop_int)) {
1771           fprintf(stderr, "Failed to get Int property: %s\n",
1772                   ppt_name_and_info.first.c_str());
1773           thread->shared->SetVerificationFailure();
1774         }
1775       }
1776       if (ppt_name_and_info.second.handle_map != nullptr) {
1777         std::map<std::string, std::string> prop_map;
1778         if (!db_->GetMapProperty(ppt_name_and_info.first, &prop_map)) {
1779           fprintf(stderr, "Failed to get Map property: %s\n",
1780                   ppt_name_and_info.first.c_str());
1781           thread->shared->SetVerificationFailure();
1782         }
1783       }
1784     }
1785   }
1786 
1787   ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
1788   db_->GetColumnFamilyMetaData(&cf_meta_data);
1789   int level_size = static_cast<int>(cf_meta_data.levels.size());
1790   for (int level = 0; level < level_size; level++) {
1791     for (const auto& ppt_name : levelPropertyNames) {
1792       bool res = db_->GetProperty(ppt_name + std::to_string(level), &prop);
1793       if (!res) {
1794         fprintf(stderr, "Failed to get DB property: %s\n",
1795                 (ppt_name + std::to_string(level)).c_str());
1796         thread->shared->SetVerificationFailure();
1797       }
1798     }
1799   }
1800 
1801   // Test for an invalid property name
1802   if (thread->rand.OneIn(100)) {
1803     if (db_->GetProperty("rocksdb.invalid_property_name", &prop)) {
1804       fprintf(stderr, "GetProperty did not return false for an invalid property name\n");
1805       thread->shared->SetVerificationFailure();
1806     }
1807   }
1808 }
1809 
1810 void StressTest::TestCompactFiles(ThreadState* thread,
1811                                   ColumnFamilyHandle* column_family) {
1812   ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
1813   db_->GetColumnFamilyMetaData(column_family, &cf_meta_data);
1814 
1815   // Randomly compact up to three consecutive files from a level
1816   const int kMaxRetry = 3;
1817   for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
1818     size_t random_level =
1819         thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size()));
1820 
1821     const auto& files = cf_meta_data.levels[random_level].files;
1822     if (files.size() > 0) {
1823       size_t random_file_index =
1824           thread->rand.Uniform(static_cast<int>(files.size()));
1825       if (files[random_file_index].being_compacted) {
1826         // Retry as the selected file is currently being compacted
1827         continue;
1828       }
1829 
1830       std::vector<std::string> input_files;
1831       input_files.push_back(files[random_file_index].name);
1832       if (random_file_index > 0 &&
1833           !files[random_file_index - 1].being_compacted) {
1834         input_files.push_back(files[random_file_index - 1].name);
1835       }
1836       if (random_file_index + 1 < files.size() &&
1837           !files[random_file_index + 1].being_compacted) {
1838         input_files.push_back(files[random_file_index + 1].name);
1839       }
1840 
1841       size_t output_level =
1842           std::min(random_level + 1, cf_meta_data.levels.size() - 1);
1843       auto s = db_->CompactFiles(CompactionOptions(), column_family,
1844                                  input_files, static_cast<int>(output_level));
1845       if (!s.ok()) {
1846         fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
1847                 s.ToString().c_str());
1848         thread->stats.AddNumCompactFilesFailed(1);
1849       } else {
1850         thread->stats.AddNumCompactFilesSucceed(1);
1851       }
1852       break;
1853     }
1854   }
1855 }
1856 #endif  // ROCKSDB_LITE
1857 
1858 Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
1859   FlushOptions flush_opts;
1860   std::vector<ColumnFamilyHandle*> cfhs;
1861   std::for_each(rand_column_families.begin(), rand_column_families.end(),
1862                 [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
1863   return db_->Flush(flush_opts, cfhs);
1864 }
1865 
1866 Status StressTest::TestPauseBackground(ThreadState* thread) {
1867   Status status = db_->PauseBackgroundWork();
1868   if (!status.ok()) {
1869     return status;
1870   }
1871   // To avoid stalling/deadlocking ourselves in this thread, just
1872   // sleep here during pause and let other threads do db operations.
1873   // Sleep up to ~16 seconds (2**24 microseconds), but very skewed
1874   // toward short pause. (1 chance in 25 of pausing >= 1s;
1875   // 1 chance in 625 of pausing full 16s.)
1876   int pwr2_micros =
1877       std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
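  // Taking the min of two Uniform(25) draws skews the exponent low:
  // pausing >= 1s requires both draws >= 20 (2^20 us ~= 1s), probability
  // (5/25)^2 = 1/25; the full 16s pause requires both draws == 24,
  // probability (1/25)^2 = 1/625.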
1878   clock_->SleepForMicroseconds(1 << pwr2_micros);
1879   return db_->ContinueBackgroundWork();
1880 }
1881 
1882 void StressTest::TestAcquireSnapshot(ThreadState* thread,
1883                                      int rand_column_family,
1884                                      const std::string& keystr, uint64_t i) {
1885   Slice key = keystr;
1886   ColumnFamilyHandle* column_family = column_families_[rand_column_family];
1887   ReadOptions ropt;
1888 #ifndef ROCKSDB_LITE
1889   auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
1890   const bool ww_snapshot = thread->rand.OneIn(10);
1891   const Snapshot* snapshot =
1892       ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
1893                   : db_->GetSnapshot();
1894 #else
1895   const Snapshot* snapshot = db_->GetSnapshot();
1896 #endif  // !ROCKSDB_LITE
1897   ropt.snapshot = snapshot;
1898 
1899   // Ideally, we want snapshot taking and timestamp generation to be atomic
1900   // here, so that the snapshot corresponds to the timestamp. However, that
1901   // is not possible with the current GetSnapshot() API.
1902   std::string ts_str;
1903   Slice ts;
1904   if (FLAGS_user_timestamp_size > 0) {
1905     ts_str = GenerateTimestampForRead();
1906     ts = ts_str;
1907     ropt.timestamp = &ts;
1908   }
1909 
1910   std::string value_at;
1911   // When taking a snapshot, we also read a key from that snapshot. We
1912   // will later read the same key before releasing the snapshot and
1913   // verify that the results are the same.
1914   auto status_at = db_->Get(ropt, column_family, key, &value_at);
1915   std::vector<bool>* key_vec = nullptr;
1916 
1917   if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
1918     key_vec = new std::vector<bool>(FLAGS_max_key);
1919     // When `prefix_extractor` is set, seeking to beginning and scanning
1920     // across prefixes are only supported with `total_order_seek` set.
1921     ropt.total_order_seek = true;
1922     std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
1923     for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
1924       uint64_t key_val;
1925       if (GetIntVal(iterator->key().ToString(), &key_val)) {
1926         (*key_vec)[key_val] = true;
1927       }
1928     }
1929   }
1930 
1931   ThreadState::SnapshotState snap_state = {snapshot,
1932                                            rand_column_family,
1933                                            column_family->GetName(),
1934                                            keystr,
1935                                            status_at,
1936                                            value_at,
1937                                            key_vec,
1938                                            ts_str};
1939   uint64_t hold_for = FLAGS_snapshot_hold_ops;
1940   if (FLAGS_long_running_snapshots) {
1941     // Hold 10% of snapshots for 10x longer
1942     if (thread->rand.OneIn(10)) {
1943       assert(hold_for < port::kMaxInt64 / 10);
1944       hold_for *= 10;
1945       // Hold 1% of snapshots for 100x longer
1946       if (thread->rand.OneIn(10)) {
1947         assert(hold_for < port::kMaxInt64 / 10);
1948         hold_for *= 10;
1949       }
1950     }
1951   }
1952   uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
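  // Clamping to the thread's last operation ensures the snapshot is
  // scheduled for release (via MaybeReleaseSnapshots) before the thread
  // finishes.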
1953   thread->snapshot_queue.emplace(release_at, snap_state);
1954 }
1955 
1956 Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) {
1957   while (!thread->snapshot_queue.empty() &&
1958          i >= thread->snapshot_queue.front().first) {
1959     auto snap_state = thread->snapshot_queue.front().second;
1960     assert(snap_state.snapshot);
1961     // Note: this is unsafe as the cf might be dropped concurrently. But
1962     // it is ok since unclean cf drop is currently not supported by write
1963     // prepared transactions.
1964     Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
1965     db_->ReleaseSnapshot(snap_state.snapshot);
1966     delete snap_state.key_vec;
1967     thread->snapshot_queue.pop();
1968     if (!s.ok()) {
1969       return s;
1970     }
1971   }
1972   return Status::OK();
1973 }
1974 
1975 void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
1976                                   const Slice& start_key,
1977                                   ColumnFamilyHandle* column_family) {
1978   int64_t end_key_num;
1979   if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
1980     end_key_num = port::kMaxInt64;
1981   } else {
1982     end_key_num = FLAGS_compact_range_width + rand_key;
1983   }
1984   std::string end_key_buf = Key(end_key_num);
1985   Slice end_key(end_key_buf);
1986 
1987   CompactRangeOptions cro;
1988   cro.exclusive_manual_compaction = static_cast<bool>(thread->rand.Next() % 2);
1989   cro.change_level = static_cast<bool>(thread->rand.Next() % 2);
1990   std::vector<BottommostLevelCompaction> bottom_level_styles = {
1991       BottommostLevelCompaction::kSkip,
1992       BottommostLevelCompaction::kIfHaveCompactionFilter,
1993       BottommostLevelCompaction::kForce,
1994       BottommostLevelCompaction::kForceOptimized};
1995   cro.bottommost_level_compaction =
1996       bottom_level_styles[thread->rand.Next() %
1997                           static_cast<uint32_t>(bottom_level_styles.size())];
1998   cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
1999   cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
2000 
2001   const Snapshot* pre_snapshot = nullptr;
2002   uint32_t pre_hash = 0;
2003   if (thread->rand.OneIn(2)) {
2004     // Do some validation by taking a snapshot and comparing the data
2005     // before and after the compaction
2006     pre_snapshot = db_->GetSnapshot();
2007     pre_hash =
2008         GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
2009   }
2010 
2011   Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);
2012 
2013   if (!status.ok()) {
2014     fprintf(stdout, "Unable to perform CompactRange(): %s\n",
2015             status.ToString().c_str());
2016   }
2017 
2018   if (pre_snapshot != nullptr) {
2019     uint32_t post_hash =
2020         GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
2021     if (pre_hash != post_hash) {
2022       fprintf(stderr,
2023               "Data hash different before and after compact range "
2024               "start_key %s end_key %s\n",
2025               start_key.ToString(true).c_str(), end_key.ToString(true).c_str());
2026       thread->stats.AddErrors(1);
2027       // Fail fast to preserve the DB state.
2028       thread->shared->SetVerificationFailure();
2029     }
2030     db_->ReleaseSnapshot(pre_snapshot);
2031   }
2032 }
2033 
2034 uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
2035                                   ColumnFamilyHandle* column_family,
2036                                   const Slice& start_key,
2037                                   const Slice& end_key) {
2038   const std::string kCrcCalculatorSeparator = ";";
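  // A separator is hashed between every key and value so that entries with
  // shifted byte boundaries (e.g. ("ab", "c") vs ("a", "bc")) cannot
  // produce the same CRC.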
2039   uint32_t crc = 0;
2040   ReadOptions ro;
2041   ro.snapshot = snapshot;
2042   ro.total_order_seek = true;
2043   std::string ts_str;
2044   Slice ts;
2045   if (FLAGS_user_timestamp_size > 0) {
2046     ts_str = GenerateTimestampForRead();
2047     ts = ts_str;
2048     ro.timestamp = &ts;
2049   }
2050   std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
2051   for (it->Seek(start_key);
2052        it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0;
2053        it->Next()) {
2054     crc = crc32c::Extend(crc, it->key().data(), it->key().size());
2055     crc = crc32c::Extend(crc, kCrcCalculatorSeparator.data(), 1);
2056     crc = crc32c::Extend(crc, it->value().data(), it->value().size());
2057     crc = crc32c::Extend(crc, kCrcCalculatorSeparator.data(), 1);
2058   }
2059   if (!it->status().ok()) {
2060     fprintf(stderr, "Iterator non-OK when calculating range CRC: %s\n",
2061             it->status().ToString().c_str());
2062     thread->stats.AddErrors(1);
2063     // Fail fast to preserve the DB state.
2064     thread->shared->SetVerificationFailure();
2065   }
2066   return crc;
2067 }
2068 
2069 void StressTest::PrintEnv() const {
2070   fprintf(stdout, "RocksDB version           : %d.%d\n", kMajorVersion,
2071           kMinorVersion);
2072   fprintf(stdout, "Format version            : %d\n", FLAGS_format_version);
2073   fprintf(stdout, "TransactionDB             : %s\n",
2074           FLAGS_use_txn ? "true" : "false");
2075 #ifndef ROCKSDB_LITE
2076   fprintf(stdout, "Stacked BlobDB            : %s\n",
2077           FLAGS_use_blob_db ? "true" : "false");
2078 #endif  // !ROCKSDB_LITE
2079   fprintf(stdout, "Read only mode            : %s\n",
2080           FLAGS_read_only ? "true" : "false");
2081   fprintf(stdout, "Atomic flush              : %s\n",
2082           FLAGS_atomic_flush ? "true" : "false");
2083   fprintf(stdout, "Column families           : %d\n", FLAGS_column_families);
2084   if (!FLAGS_test_batches_snapshots) {
2085     fprintf(stdout, "Clear CFs one in          : %d\n",
2086             FLAGS_clear_column_family_one_in);
2087   }
2088   fprintf(stdout, "Number of threads         : %d\n", FLAGS_threads);
2089   fprintf(stdout, "Ops per thread            : %lu\n",
2090           (unsigned long)FLAGS_ops_per_thread);
2091   std::string ttl_state("unused");
2092   if (FLAGS_ttl > 0) {
2093     ttl_state = ToString(FLAGS_ttl);
2094   }
2095   fprintf(stdout, "Time to live(sec)         : %s\n", ttl_state.c_str());
2096   fprintf(stdout, "Read percentage           : %d%%\n", FLAGS_readpercent);
2097   fprintf(stdout, "Prefix percentage         : %d%%\n", FLAGS_prefixpercent);
2098   fprintf(stdout, "Write percentage          : %d%%\n", FLAGS_writepercent);
2099   fprintf(stdout, "Delete percentage         : %d%%\n", FLAGS_delpercent);
2100   fprintf(stdout, "Delete range percentage   : %d%%\n", FLAGS_delrangepercent);
2101   fprintf(stdout, "No overwrite percentage   : %d%%\n",
2102           FLAGS_nooverwritepercent);
2103   fprintf(stdout, "Iterate percentage        : %d%%\n", FLAGS_iterpercent);
2104   fprintf(stdout, "DB-write-buffer-size      : %" PRIu64 "\n",
2105           FLAGS_db_write_buffer_size);
2106   fprintf(stdout, "Write-buffer-size         : %d\n", FLAGS_write_buffer_size);
2107   fprintf(stdout, "Iterations                : %lu\n",
2108           (unsigned long)FLAGS_num_iterations);
2109   fprintf(stdout, "Max key                   : %lu\n",
2110           (unsigned long)FLAGS_max_key);
2111   fprintf(stdout, "Ratio #ops/#keys          : %f\n",
2112           (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
2113   fprintf(stdout, "Num times DB reopens      : %d\n", FLAGS_reopen);
2114   fprintf(stdout, "Batches/snapshots         : %d\n",
2115           FLAGS_test_batches_snapshots);
2116   fprintf(stdout, "Do update in place        : %d\n", FLAGS_in_place_update);
2117   fprintf(stdout, "Num keys per lock         : %d\n",
2118           1 << FLAGS_log2_keys_per_lock);
2119   std::string compression = CompressionTypeToString(compression_type_e);
2120   fprintf(stdout, "Compression               : %s\n", compression.c_str());
2121   std::string bottommost_compression =
2122       CompressionTypeToString(bottommost_compression_type_e);
2123   fprintf(stdout, "Bottommost Compression    : %s\n",
2124           bottommost_compression.c_str());
2125   std::string checksum = ChecksumTypeToString(checksum_type_e);
2126   fprintf(stdout, "Checksum type             : %s\n", checksum.c_str());
2127   fprintf(stdout, "File checksum impl        : %s\n",
2128           FLAGS_file_checksum_impl.c_str());
2129   fprintf(stdout, "Bloom bits / key          : %s\n",
2130           FormatDoubleParam(FLAGS_bloom_bits).c_str());
2131   fprintf(stdout, "Max subcompactions        : %" PRIu64 "\n",
2132           FLAGS_subcompactions);
2133   fprintf(stdout, "Use MultiGet              : %s\n",
2134           FLAGS_use_multiget ? "true" : "false");
2135 
2136   const char* memtablerep = "";
2137   switch (FLAGS_rep_factory) {
2138     case kSkipList:
2139       memtablerep = "skip_list";
2140       break;
2141     case kHashSkipList:
2142       memtablerep = "prefix_hash";
2143       break;
2144     case kVectorRep:
2145       memtablerep = "vector";
2146       break;
2147   }
2148 
2149   fprintf(stdout, "Memtablerep               : %s\n", memtablerep);
2150 
2151 #ifndef NDEBUG
2152   KillPoint* kp = KillPoint::GetInstance();
2153   fprintf(stdout, "Test kill odd             : %d\n", kp->rocksdb_kill_odds);
2154   if (!kp->rocksdb_kill_exclude_prefixes.empty()) {
2155     fprintf(stdout, "Skipping kill points prefixes:\n");
2156     for (auto& p : kp->rocksdb_kill_exclude_prefixes) {
2157       fprintf(stdout, "  %s\n", p.c_str());
2158     }
2159   }
2160 #endif
2161   fprintf(stdout, "Periodic Compaction Secs  : %" PRIu64 "\n",
2162           FLAGS_periodic_compaction_seconds);
2163   fprintf(stdout, "Compaction TTL            : %" PRIu64 "\n",
2164           FLAGS_compaction_ttl);
2165   fprintf(stdout, "Background Purge          : %d\n",
2166           static_cast<int>(FLAGS_avoid_unnecessary_blocking_io));
2167   fprintf(stdout, "Write DB ID to manifest   : %d\n",
2168           static_cast<int>(FLAGS_write_dbid_to_manifest));
2169   fprintf(stdout, "Max Write Batch Group Size: %" PRIu64 "\n",
2170           FLAGS_max_write_batch_group_size_bytes);
2171   fprintf(stdout, "Use dynamic level         : %d\n",
2172           static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
2173   fprintf(stdout, "Read fault one in         : %d\n", FLAGS_read_fault_one_in);
2174   fprintf(stdout, "Write fault one in        : %d\n", FLAGS_write_fault_one_in);
2175   fprintf(stdout, "Open metadata write fault one in:\n");
2176   fprintf(stdout, "                            %d\n",
2177           FLAGS_open_metadata_write_fault_one_in);
2178   fprintf(stdout, "Sync fault injection      : %d\n", FLAGS_sync_fault_injection);
2179   fprintf(stdout, "Best efforts recovery     : %d\n",
2180           static_cast<int>(FLAGS_best_efforts_recovery));
2181   fprintf(stdout, "Fail if OPTIONS file error: %d\n",
2182           static_cast<int>(FLAGS_fail_if_options_file_error));
2183   fprintf(stdout, "User timestamp size bytes : %d\n",
2184           static_cast<int>(FLAGS_user_timestamp_size));
2185 
2186   fprintf(stdout, "------------------------------------------------\n");
2187 }
2188 
2189 void StressTest::Open() {
2190   assert(db_ == nullptr);
2191 #ifndef ROCKSDB_LITE
2192   assert(txn_db_ == nullptr);
2193 #endif
2194   if (FLAGS_options_file.empty()) {
2195     BlockBasedTableOptions block_based_options;
2196     block_based_options.block_cache = cache_;
2197     block_based_options.cache_index_and_filter_blocks =
2198         FLAGS_cache_index_and_filter_blocks;
2199     block_based_options.metadata_cache_options.top_level_index_pinning =
2200         static_cast<PinningTier>(FLAGS_top_level_index_pinning);
2201     block_based_options.metadata_cache_options.partition_pinning =
2202         static_cast<PinningTier>(FLAGS_partition_pinning);
2203     block_based_options.metadata_cache_options.unpartitioned_pinning =
2204         static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
2205     block_based_options.block_cache_compressed = compressed_cache_;
2206     block_based_options.checksum = checksum_type_e;
2207     block_based_options.block_size = FLAGS_block_size;
2208     block_based_options.format_version =
2209         static_cast<uint32_t>(FLAGS_format_version);
2210     block_based_options.index_block_restart_interval =
2211         static_cast<int32_t>(FLAGS_index_block_restart_interval);
2212     block_based_options.filter_policy = filter_policy_;
2213     block_based_options.partition_filters = FLAGS_partition_filters;
2214     block_based_options.optimize_filters_for_memory =
2215         FLAGS_optimize_filters_for_memory;
2216     block_based_options.index_type =
2217         static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
2218     options_.table_factory.reset(
2219         NewBlockBasedTableFactory(block_based_options));
2220     options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
2221     options_.write_buffer_size = FLAGS_write_buffer_size;
2222     options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
2223     options_.min_write_buffer_number_to_merge =
2224         FLAGS_min_write_buffer_number_to_merge;
2225     options_.max_write_buffer_number_to_maintain =
2226         FLAGS_max_write_buffer_number_to_maintain;
2227     options_.max_write_buffer_size_to_maintain =
2228         FLAGS_max_write_buffer_size_to_maintain;
2229     options_.memtable_prefix_bloom_size_ratio =
2230         FLAGS_memtable_prefix_bloom_size_ratio;
2231     options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
2232     options_.max_background_compactions = FLAGS_max_background_compactions;
2233     options_.max_background_flushes = FLAGS_max_background_flushes;
2234     options_.compaction_style =
2235         static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
2236     if (FLAGS_prefix_size >= 0) {
2237       options_.prefix_extractor.reset(
2238           NewFixedPrefixTransform(FLAGS_prefix_size));
2239     }
2240     options_.max_open_files = FLAGS_open_files;
2241     options_.statistics = dbstats;
2242     options_.env = db_stress_env;
2243     options_.use_fsync = FLAGS_use_fsync;
2244     options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
2245     options_.allow_mmap_reads = FLAGS_mmap_read;
2246     options_.allow_mmap_writes = FLAGS_mmap_write;
2247     options_.use_direct_reads = FLAGS_use_direct_reads;
2248     options_.use_direct_io_for_flush_and_compaction =
2249         FLAGS_use_direct_io_for_flush_and_compaction;
2250     options_.recycle_log_file_num =
2251         static_cast<size_t>(FLAGS_recycle_log_file_num);
2252     options_.target_file_size_base = FLAGS_target_file_size_base;
2253     options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
2254     options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
2255     options_.max_bytes_for_level_multiplier =
2256         FLAGS_max_bytes_for_level_multiplier;
2257     options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
2258     options_.level0_slowdown_writes_trigger =
2259         FLAGS_level0_slowdown_writes_trigger;
2260     options_.level0_file_num_compaction_trigger =
2261         FLAGS_level0_file_num_compaction_trigger;
2262     options_.compression = compression_type_e;
2263     options_.bottommost_compression = bottommost_compression_type_e;
2264     options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
2265     options_.compression_opts.zstd_max_train_bytes =
2266         FLAGS_compression_zstd_max_train_bytes;
2267     options_.compression_opts.parallel_threads =
2268         FLAGS_compression_parallel_threads;
2269     options_.compression_opts.max_dict_buffer_bytes =
2270         FLAGS_compression_max_dict_buffer_bytes;
2271     options_.create_if_missing = true;
2272     options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
2273     options_.inplace_update_support = FLAGS_in_place_update;
2274     options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
2275     options_.allow_concurrent_memtable_write =
2276         FLAGS_allow_concurrent_memtable_write;
2277     options_.experimental_mempurge_threshold =
2278         FLAGS_experimental_mempurge_threshold;
2279     options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
2280     options_.ttl = FLAGS_compaction_ttl;
2281     options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
2282     options_.enable_write_thread_adaptive_yield =
2283         FLAGS_enable_write_thread_adaptive_yield;
2284     options_.compaction_options_universal.size_ratio =
2285         FLAGS_universal_size_ratio;
2286     options_.compaction_options_universal.min_merge_width =
2287         FLAGS_universal_min_merge_width;
2288     options_.compaction_options_universal.max_merge_width =
2289         FLAGS_universal_max_merge_width;
2290     options_.compaction_options_universal.max_size_amplification_percent =
2291         FLAGS_universal_max_size_amplification_percent;
2292     options_.atomic_flush = FLAGS_atomic_flush;
2293     options_.avoid_unnecessary_blocking_io =
2294         FLAGS_avoid_unnecessary_blocking_io;
2295     options_.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
2296     options_.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
2297     options_.max_write_batch_group_size_bytes =
2298         FLAGS_max_write_batch_group_size_bytes;
2299     options_.level_compaction_dynamic_level_bytes =
2300         FLAGS_level_compaction_dynamic_level_bytes;
2301     options_.file_checksum_gen_factory =
2302         GetFileChecksumImpl(FLAGS_file_checksum_impl);
2303     options_.track_and_verify_wals_in_manifest = true;
2304 
2305     // Integrated BlobDB
2306     options_.enable_blob_files = FLAGS_enable_blob_files;
2307     options_.min_blob_size = FLAGS_min_blob_size;
2308     options_.blob_file_size = FLAGS_blob_file_size;
2309     options_.blob_compression_type =
2310         StringToCompressionType(FLAGS_blob_compression_type.c_str());
2311     options_.enable_blob_garbage_collection =
2312         FLAGS_enable_blob_garbage_collection;
2313     options_.blob_garbage_collection_age_cutoff =
2314         FLAGS_blob_garbage_collection_age_cutoff;
2315     options_.blob_garbage_collection_force_threshold =
2316         FLAGS_blob_garbage_collection_force_threshold;
2317   } else {
2318 #ifdef ROCKSDB_LITE
2319     fprintf(stderr, "--options_file not supported in lite mode\n");
2320     exit(1);
2321 #else
2322     DBOptions db_options;
2323     std::vector<ColumnFamilyDescriptor> cf_descriptors;
2324     Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
2325                                    &db_options, &cf_descriptors);
2326     db_options.env = new DbStressEnvWrapper(db_stress_env);
2327     if (!s.ok()) {
2328       fprintf(stderr, "Unable to load options file %s --- %s\n",
2329               FLAGS_options_file.c_str(), s.ToString().c_str());
2330       exit(1);
2331     }
2332     options_ = Options(db_options, cf_descriptors[0].options);
2333 #endif  // ROCKSDB_LITE
2334   }
2335 
2336   if (FLAGS_rate_limiter_bytes_per_sec > 0) {
2337     options_.rate_limiter.reset(NewGenericRateLimiter(
2338         FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
2339         10 /* fairness */,
2340         FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
2341                                   : RateLimiter::Mode::kWritesOnly));
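    // Assumption: compaction reads are only charged to the rate limiter when
    // they go through table readers newly created for compaction inputs, so
    // opt into those readers when limiting background reads.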
2342     if (FLAGS_rate_limit_bg_reads) {
2343       options_.new_table_reader_for_compaction_inputs = true;
2344     }
2345   }
2346   if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
2347       FLAGS_sst_file_manager_bytes_per_truncate > 0) {
2348     Status status;
2349     options_.sst_file_manager.reset(NewSstFileManager(
2350         db_stress_env, options_.info_log, "" /* trash_dir */,
2351         static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
2352         true /* delete_existing_trash */, &status,
2353         0.25 /* max_trash_db_ratio */,
2354         FLAGS_sst_file_manager_bytes_per_truncate));
2355     if (!status.ok()) {
2356       fprintf(stderr, "SstFileManager creation failed: %s\n",
2357               status.ToString().c_str());
2358       exit(1);
2359     }
2360   }
2361 
2362   if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
2363     fprintf(stderr,
2364             "prefix_size cannot be zero if memtablerep == prefix_hash\n");
2365     exit(1);
2366   }
2367   if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
2368     fprintf(stderr,
2369             "WARNING: prefix_size is non-zero but "
2370             "memtablerep != prefix_hash\n");
2371   }
2372   switch (FLAGS_rep_factory) {
2373     case kSkipList:
2374       // no need to do anything
2375       break;
2376 #ifndef ROCKSDB_LITE
2377     case kHashSkipList:
2378       options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
2379       break;
2380     case kVectorRep:
2381       options_.memtable_factory.reset(new VectorRepFactory());
2382       break;
2383 #else
2384     default:
2385       fprintf(stderr,
2386               "RocksDB lite only supports the skip list memtable; ignoring "
2387               "--rep_factory\n");
2388 #endif  // ROCKSDB_LITE
2389   }
2390 
2391   if (FLAGS_use_full_merge_v1) {
2392     options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
2393   } else {
2394     options_.merge_operator = MergeOperators::CreatePutOperator();
2395   }
2396   if (FLAGS_enable_compaction_filter) {
2397     options_.compaction_filter_factory =
2398         std::make_shared<DbStressCompactionFilterFactory>();
2399   }
2400   options_.table_properties_collector_factories.emplace_back(
2401       std::make_shared<DbStressTablePropertiesCollectorFactory>());
2402 
2403   options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
2404   options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
2405   options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;
2406 
2407   if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
2408        FLAGS_allow_setting_blob_options_dynamically) &&
2409       FLAGS_best_efforts_recovery) {
2410     fprintf(stderr,
2411             "Integrated BlobDB is currently incompatible with best-effort "
2412             "recovery\n");
2413     exit(1);
2414   }
2415 
2416   if (options_.enable_blob_files) {
2417     fprintf(stdout,
2418             "Integrated BlobDB: blob files enabled, min blob size %" PRIu64
2419             ", blob file size %" PRIu64 ", blob compression type %s\n",
2420             options_.min_blob_size, options_.blob_file_size,
2421             CompressionTypeToString(options_.blob_compression_type).c_str());
2422   }
2423 
2424   if (options_.enable_blob_garbage_collection) {
2425     fprintf(
2426         stdout,
2427         "Integrated BlobDB: blob GC enabled, cutoff %f, force threshold %f\n",
2428         options_.blob_garbage_collection_age_cutoff,
2429         options_.blob_garbage_collection_force_threshold);
2430   }
2431 
2432   fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2433 
2434   Status s;
2435 
2436   if (FLAGS_user_timestamp_size > 0) {
2437     CheckAndSetOptionsForUserTimestamp();
2438   }
2439 
2440   if (FLAGS_ttl == -1) {
2441     std::vector<std::string> existing_column_families;
2442     s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
2443                                &existing_column_families);  // ignore errors
2444     if (!s.ok()) {
2445       // DB doesn't exist
2446       assert(existing_column_families.empty());
2447       assert(column_family_names_.empty());
2448       column_family_names_.push_back(kDefaultColumnFamilyName);
2449     } else if (column_family_names_.empty()) {
2450       // this is the first call to the function Open()
2451       column_family_names_ = existing_column_families;
2452     } else {
2453       // this is a reopen. just assert that existing column_family_names are
2454       // equivalent to what we remember
2455       auto sorted_cfn = column_family_names_;
2456       std::sort(sorted_cfn.begin(), sorted_cfn.end());
2457       std::sort(existing_column_families.begin(),
2458                 existing_column_families.end());
2459       if (sorted_cfn != existing_column_families) {
2460         fprintf(stderr, "Expected column families differ from the existing:\n");
2461         fprintf(stderr, "Expected: {");
2462         for (auto cf : sorted_cfn) {
2463           fprintf(stderr, "%s ", cf.c_str());
2464         }
2465         fprintf(stderr, "}\n");
2466         fprintf(stderr, "Existing: {");
2467         for (auto cf : existing_column_families) {
2468           fprintf(stderr, "%s ", cf.c_str());
2469         }
2470         fprintf(stderr, "}\n");
2471       }
2472       assert(sorted_cfn == existing_column_families);
2473     }
2474     std::vector<ColumnFamilyDescriptor> cf_descriptors;
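    // Non-default CF names are stringified integers (see below); remember
    // the largest one so newly created CFs receive fresh, unused names.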
2475     for (auto name : column_family_names_) {
2476       if (name != kDefaultColumnFamilyName) {
2477         new_column_family_name_ =
2478             std::max(new_column_family_name_.load(), std::stoi(name) + 1);
2479       }
2480       cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2481     }
2482     while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
2483       std::string name = ToString(new_column_family_name_.load());
2484       new_column_family_name_++;
2485       cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2486       column_family_names_.push_back(name);
2487     }
2488     options_.listeners.clear();
2489 #ifndef ROCKSDB_LITE
2490     options_.listeners.emplace_back(
2491         new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
2492 #endif  // !ROCKSDB_LITE
2493     options_.create_missing_column_families = true;
2494     if (!FLAGS_use_txn) {
2495 #ifndef NDEBUG
2496       // Determine whether we need to ingest file metadata write failures
2497       // during DB reopen. If so, enable it.
2498       // Only ingest metadata error if it is reopening, as initial open
2499       // failure doesn't need to be handled.
2500       // TODO: transaction DBs are not covered by this fault test either.
2501       bool ingest_meta_error = false;
2502       bool ingest_write_error = false;
2503       bool ingest_read_error = false;
2504       if ((FLAGS_open_metadata_write_fault_one_in ||
2505            FLAGS_open_write_fault_one_in || FLAGS_open_read_fault_one_in) &&
2506           fault_fs_guard
2507               ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
2508               .ok()) {
2509         if (!FLAGS_sync) {
2510           // When db_stress is not in sync mode, we expect all WAL writes
2511           // to be durable. Buffering unsynced writes would cause false
2512           // positives in crash tests. Until we figure out a way to
2513           // solve this, exclude the WAL from failure injection.
2514           fault_fs_guard->SetSkipDirectWritableTypes({kWalFile});
2515         }
2516         ingest_meta_error = FLAGS_open_metadata_write_fault_one_in;
2517         ingest_write_error = FLAGS_open_write_fault_one_in;
2518         ingest_read_error = FLAGS_open_read_fault_one_in;
2519         if (ingest_meta_error) {
2520           fault_fs_guard->EnableMetadataWriteErrorInjection();
2521           fault_fs_guard->SetRandomMetadataWriteError(
2522               FLAGS_open_metadata_write_fault_one_in);
2523         }
2524         if (ingest_write_error) {
2525           fault_fs_guard->SetFilesystemDirectWritable(false);
2526           fault_fs_guard->EnableWriteErrorInjection();
2527           fault_fs_guard->SetRandomWriteError(
2528               static_cast<uint32_t>(FLAGS_seed), FLAGS_open_write_fault_one_in,
2529               IOStatus::IOError("Injected Open Error"),
2530               /*inject_for_all_file_types=*/true, /*types=*/{});
2531         }
2532         if (ingest_read_error) {
2533           fault_fs_guard->SetRandomReadError(FLAGS_open_read_fault_one_in);
2534         }
2535       }
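      // Open the DB in a retry loop. If injected errors make the open fail,
      // the recovery path below disables injection, optionally drops
      // unsynced file data to mimic a crash, and retries.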
2536       while (true) {
2537 #endif  // NDEBUG
2538 #ifndef ROCKSDB_LITE
2539         // StackableDB-based BlobDB
2540         if (FLAGS_use_blob_db) {
2541           blob_db::BlobDBOptions blob_db_options;
2542           blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
2543           blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
2544           blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
2545           blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
2546           blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
2547 
2548           blob_db::BlobDB* blob_db = nullptr;
2549           s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
2550                                     cf_descriptors, &column_families_,
2551                                     &blob_db);
2552           if (s.ok()) {
2553             db_ = blob_db;
2554           }
2555         } else
2556 #endif  // !ROCKSDB_LITE
2557         {
2558           if (db_preload_finished_.load() && FLAGS_read_only) {
2559             s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
2560                                     cf_descriptors, &column_families_, &db_);
2561           } else {
2562             s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
2563                          &column_families_, &db_);
2564           }
2565         }
2566 
2567 #ifndef NDEBUG
2568         if (ingest_meta_error || ingest_write_error || ingest_read_error) {
2569           fault_fs_guard->SetFilesystemDirectWritable(true);
2570           fault_fs_guard->DisableMetadataWriteErrorInjection();
2571           fault_fs_guard->DisableWriteErrorInjection();
2572           fault_fs_guard->SetSkipDirectWritableTypes({});
2573           fault_fs_guard->SetRandomReadError(0);
2574           if (s.ok()) {
2575             // Ingested errors might happen in background compactions. We
2576             // wait for all compactions to finish to make sure DB is in
2577             // clean state before executing queries.
2578             s = static_cast_with_check<DBImpl>(db_->GetRootDB())
2579                     ->TEST_WaitForCompact(true);
2580             if (!s.ok()) {
2581               for (auto cf : column_families_) {
2582                 delete cf;
2583               }
2584               column_families_.clear();
2585               delete db_;
2586               db_ = nullptr;
2587             }
2588           }
2589           if (!s.ok()) {
2590             // After failing to open the DB due to an injected IO error, a
2591             // retry should open it successfully with correct data as long
2592             // as no further IO errors show up.
2593             ingest_meta_error = false;
2594             ingest_write_error = false;
2595             ingest_read_error = false;
2596 
2597             Random rand(static_cast<uint32_t>(FLAGS_seed));
2598             if (rand.OneIn(2)) {
2599               fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
2600                                                                  nullptr);
2601             }
2602             if (rand.OneIn(3)) {
2603               fault_fs_guard->DropUnsyncedFileData();
2604             } else if (rand.OneIn(2)) {
2605               fault_fs_guard->DropRandomUnsyncedFileData(&rand);
2606             }
2607             continue;
2608           }
2609         }
2610         break;
2611       }
2612 #endif  // NDEBUG
2613     } else {
2614 #ifndef ROCKSDB_LITE
2615       TransactionDBOptions txn_db_options;
2616       assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
      txn_db_options.write_policy =
          static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
      if (FLAGS_unordered_write) {
        assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED);
        options_.unordered_write = true;
        options_.two_write_queues = true;
        txn_db_options.skip_concurrency_control = true;
      }
      s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
                              cf_descriptors, &column_families_, &txn_db_);
      if (!s.ok()) {
        fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
                s.ToString().c_str());
        fflush(stderr);
      }
      assert(s.ok());
      db_ = txn_db_;
      // After a crash, randomly commit or roll back each recovered prepared
      // transaction.
      std::vector<Transaction*> trans;
      txn_db_->GetAllPreparedTransactions(&trans);
      Random rand(static_cast<uint32_t>(FLAGS_seed));
      for (auto txn : trans) {
        if (rand.OneIn(2)) {
          s = txn->Commit();
          assert(s.ok());
        } else {
          s = txn->Rollback();
          assert(s.ok());
        }
        delete txn;
      }
      trans.clear();
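      // Query again to confirm that every recovered prepared transaction was
      // resolved above.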
      txn_db_->GetAllPreparedTransactions(&trans);
      assert(trans.size() == 0);
#endif
    }
    assert(!s.ok() || column_families_.size() ==
                          static_cast<size_t>(FLAGS_column_families));

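    // When testing the secondary instance feature, open one read-only
    // secondary per stress-test thread, all tailing the primary at FLAGS_db.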
    if (s.ok() && FLAGS_test_secondary) {
#ifndef ROCKSDB_LITE
      secondaries_.resize(FLAGS_threads);
      std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
      secondary_cfh_lists_.clear();
      secondary_cfh_lists_.resize(FLAGS_threads);
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.statistics = dbstats_secondaries;
      tmp_opts.env = db_stress_env;
      for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
        const std::string secondary_path =
            FLAGS_secondaries_base + "/" + std::to_string(i);
        s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                                cf_descriptors, &secondary_cfh_lists_[i],
                                &secondaries_[i]);
        if (!s.ok()) {
          break;
        }
      }
#else
      fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
      exit(1);
#endif
    }
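    // For continuous verification, open one extra secondary (cmp_db_) whose
    // state is cross-checked against the primary while the test runs.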
    if (s.ok() && FLAGS_continuous_verification_interval > 0 && !cmp_db_) {
      Options tmp_opts;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      tmp_opts.env = db_stress_env;
      std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
      s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                              cf_descriptors, &cmp_cfhs_, &cmp_db_);
      assert(!s.ok() ||
             cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
    }
  } else {
#ifndef ROCKSDB_LITE
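    // TTL mode: wrap the DB in DBWithTTL so that entries older than FLAGS_ttl
    // seconds become eligible for removal during compaction.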
    DBWithTTL* db_with_ttl;
    s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
    db_ = db_with_ttl;
    if (FLAGS_test_secondary) {
      secondaries_.resize(FLAGS_threads);
      std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
      Options tmp_opts;
      tmp_opts.env = options_.env;
      // TODO(yanqin) support max_open_files != -1 for secondary instance.
      tmp_opts.max_open_files = -1;
      for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
        const std::string secondary_path =
            FLAGS_secondaries_base + "/" + std::to_string(i);
        s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
                                &secondaries_[i]);
        if (!s.ok()) {
          break;
        }
      }
    }
#else
    fprintf(stderr, "TTL is not supported in RocksDBLite\n");
    exit(1);
#endif
  }
  if (!s.ok()) {
    fprintf(stderr, "open error: %s\n", s.ToString().c_str());
    exit(1);
  }
}

void StressTest::Reopen(ThreadState* thread) {
#ifndef ROCKSDB_LITE
  // BG jobs in WritePrepared must be canceled first because i) they can
  // access the db via a callback, and ii) they hold on to a snapshot that
  // the upcoming ::Close() would complain about.
  const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
  bool bg_canceled = false;
  if (write_prepared || thread->rand.OneIn(2)) {
    const bool wait =
        write_prepared || static_cast<bool>(thread->rand.OneIn(2));
    CancelAllBackgroundWork(db_, wait);
    bg_canceled = wait;
  }
  assert(!write_prepared || bg_canceled);
  (void) bg_canceled;
#else
  (void) thread;
#endif

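  // All column family handles have to be released before the DB itself is
  // closed and deleted.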
  for (auto cf : column_families_) {
    delete cf;
  }
  column_families_.clear();

#ifndef ROCKSDB_LITE
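  // Half of the time, close the DB explicitly before deleting it so that
  // both the explicit Close() path and the close-in-destructor path get
  // exercised over many reopens.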
  if (thread->rand.OneIn(2)) {
    Status s = db_->Close();
    if (!s.ok()) {
      fprintf(stderr, "Non-ok close status: %s\n", s.ToString().c_str());
      fflush(stderr);
    }
    assert(s.ok());
  }
#endif
  delete db_;
  db_ = nullptr;
#ifndef ROCKSDB_LITE
  txn_db_ = nullptr;
#endif

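  // Tear down all secondary instances along with their column family
  // handles; the Open() call below recreates them if the relevant flags are
  // set.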
  assert(secondaries_.size() == secondary_cfh_lists_.size());
  size_t n = secondaries_.size();
  for (size_t i = 0; i != n; ++i) {
    for (auto* cf : secondary_cfh_lists_[i]) {
      delete cf;
    }
    secondary_cfh_lists_[i].clear();
    delete secondaries_[i];
  }
  secondaries_.clear();

  num_times_reopened_++;
  auto now = clock_->NowMicros();
  fprintf(stdout, "%s Reopening database for the %dth time\n",
          clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
  Open();
}

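// Verifies that the current flag combination is compatible with user-defined
// timestamps and, if so, installs the built-in comparator that treats the
// trailing 8 bytes of every user key as a 64-bit timestamp.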
void StressTest::CheckAndSetOptionsForUserTimestamp() {
  assert(FLAGS_user_timestamp_size > 0);
  const Comparator* const cmp = test::ComparatorWithU64Ts();
  assert(cmp);
  if (FLAGS_user_timestamp_size != cmp->timestamp_size()) {
    fprintf(stderr,
            "Only -user_timestamp_size=%d is supported in stress test.\n",
            static_cast<int>(cmp->timestamp_size()));
    exit(1);
  }
  if (FLAGS_use_merge || FLAGS_use_full_merge_v1) {
    fprintf(stderr, "Merge does not support timestamp yet.\n");
    exit(1);
  }
  if (FLAGS_delrangepercent > 0) {
    fprintf(stderr, "DeleteRange does not support timestamp yet.\n");
    exit(1);
  }
  if (FLAGS_use_txn) {
    fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
    exit(1);
  }
  if (FLAGS_read_only) {
    fprintf(stderr, "Read-only mode does not support timestamp yet.\n");
    exit(1);
  }
  if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 ||
      FLAGS_continuous_verification_interval > 0) {
    fprintf(stderr, "Secondary instance does not support timestamp.\n");
    exit(1);
  }
  if (FLAGS_checkpoint_one_in > 0) {
    fprintf(stderr,
            "-checkpoint_one_in=%d requires "
            "DBImplReadOnly, which is not supported with timestamp\n",
            FLAGS_checkpoint_one_in);
    exit(1);
  }
#ifndef ROCKSDB_LITE
  if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
    fprintf(stderr, "BlobDB not supported with timestamp.\n");
    exit(1);
  }
#endif  // !ROCKSDB_LITE
  if (FLAGS_enable_compaction_filter) {
    fprintf(stderr, "CompactionFilter not supported with timestamp.\n");
    exit(1);
  }
  if (FLAGS_test_cf_consistency || FLAGS_test_batches_snapshots) {
    fprintf(stderr,
            "Due to the per-key ts-seq ordering constraint, only the "
            "(default) non-batched test is supported with timestamp.\n");
    exit(1);
  }
  if (FLAGS_ingest_external_file_one_in > 0) {
    fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
    exit(1);
  }
  options_.comparator = cmp;
}
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS