1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 
11 #ifdef GFLAGS
12 #include "db_stress_tool/db_stress_common.h"
13 #include "db_stress_tool/db_stress_compaction_filter.h"
14 #include "db_stress_tool/db_stress_driver.h"
15 #include "db_stress_tool/db_stress_table_properties_collector.h"
16 #include "rocksdb/convenience.h"
17 #include "rocksdb/sst_file_manager.h"
18 #include "rocksdb/types.h"
19 #include "util/cast_util.h"
20 #include "utilities/backupable/backupable_db_impl.h"
21 #include "utilities/fault_injection_fs.h"
22 
23 namespace ROCKSDB_NAMESPACE {
StressTest()24 StressTest::StressTest()
25     : cache_(NewCache(FLAGS_cache_size)),
26       compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
27       filter_policy_(
28           FLAGS_bloom_bits >= 0
29               ? FLAGS_use_ribbon_filter
30                     ? NewExperimentalRibbonFilterPolicy(FLAGS_bloom_bits)
31                     : FLAGS_use_block_based_filter
32                           ? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
33                           : NewBloomFilterPolicy(FLAGS_bloom_bits, false)
34               : nullptr),
35       db_(nullptr),
36 #ifndef ROCKSDB_LITE
37       txn_db_(nullptr),
38 #endif
39       clock_(db_stress_env->GetSystemClock().get()),
40       new_column_family_name_(1),
41       num_times_reopened_(0),
42       db_preload_finished_(false),
43       cmp_db_(nullptr) {
44   if (FLAGS_destroy_db_initially) {
45     std::vector<std::string> files;
46     db_stress_env->GetChildren(FLAGS_db, &files);
47     for (unsigned int i = 0; i < files.size(); i++) {
48       if (Slice(files[i]).starts_with("heap-")) {
49         db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
50       }
51     }
52 
53     Options options;
54     options.env = db_stress_env;
55     // Remove files without preserving manfiest files
56 #ifndef ROCKSDB_LITE
57     const Status s = !FLAGS_use_blob_db
58                          ? DestroyDB(FLAGS_db, options)
59                          : blob_db::DestroyBlobDB(FLAGS_db, options,
60                                                   blob_db::BlobDBOptions());
61 #else
62     const Status s = DestroyDB(FLAGS_db, options);
63 #endif  // !ROCKSDB_LITE
64 
65     if (!s.ok()) {
66       fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str());
67       exit(1);
68     }
69   }
70 }
71 
~StressTest()72 StressTest::~StressTest() {
73   for (auto cf : column_families_) {
74     delete cf;
75   }
76   column_families_.clear();
77   delete db_;
78 
79   assert(secondaries_.size() == secondary_cfh_lists_.size());
80   size_t n = secondaries_.size();
81   for (size_t i = 0; i != n; ++i) {
82     for (auto* cf : secondary_cfh_lists_[i]) {
83       delete cf;
84     }
85     secondary_cfh_lists_[i].clear();
86     delete secondaries_[i];
87   }
88   secondaries_.clear();
89 
90   for (auto* cf : cmp_cfhs_) {
91     delete cf;
92   }
93   cmp_cfhs_.clear();
94   delete cmp_db_;
95 }
96 
NewCache(size_t capacity)97 std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) {
98   if (capacity <= 0) {
99     return nullptr;
100   }
101   if (FLAGS_use_clock_cache) {
102     auto cache = NewClockCache((size_t)capacity);
103     if (!cache) {
104       fprintf(stderr, "Clock cache not supported.");
105       exit(1);
106     }
107     return cache;
108   } else {
109     return NewLRUCache((size_t)capacity);
110   }
111 }
112 
GetBlobCompressionTags()113 std::vector<std::string> StressTest::GetBlobCompressionTags() {
114   std::vector<std::string> compression_tags{"kNoCompression"};
115 
116   if (Snappy_Supported()) {
117     compression_tags.emplace_back("kSnappyCompression");
118   }
119   if (LZ4_Supported()) {
120     compression_tags.emplace_back("kLZ4Compression");
121   }
122   if (ZSTD_Supported()) {
123     compression_tags.emplace_back("kZSTD");
124   }
125 
126   return compression_tags;
127 }
128 
BuildOptionsTable()129 bool StressTest::BuildOptionsTable() {
130   if (FLAGS_set_options_one_in <= 0) {
131     return true;
132   }
133 
134   std::unordered_map<std::string, std::vector<std::string>> options_tbl = {
135       {"write_buffer_size",
136        {ToString(options_.write_buffer_size),
137         ToString(options_.write_buffer_size * 2),
138         ToString(options_.write_buffer_size * 4)}},
139       {"max_write_buffer_number",
140        {ToString(options_.max_write_buffer_number),
141         ToString(options_.max_write_buffer_number * 2),
142         ToString(options_.max_write_buffer_number * 4)}},
143       {"arena_block_size",
144        {
145            ToString(options_.arena_block_size),
146            ToString(options_.write_buffer_size / 4),
147            ToString(options_.write_buffer_size / 8),
148        }},
149       {"memtable_huge_page_size", {"0", ToString(2 * 1024 * 1024)}},
150       {"max_successive_merges", {"0", "2", "4"}},
151       {"inplace_update_num_locks", {"100", "200", "300"}},
152       // TODO(ljin): enable test for this option
153       // {"disable_auto_compactions", {"100", "200", "300"}},
154       {"soft_rate_limit", {"0", "0.5", "0.9"}},
155       {"hard_rate_limit", {"0", "1.1", "2.0"}},
156       {"level0_file_num_compaction_trigger",
157        {
158            ToString(options_.level0_file_num_compaction_trigger),
159            ToString(options_.level0_file_num_compaction_trigger + 2),
160            ToString(options_.level0_file_num_compaction_trigger + 4),
161        }},
162       {"level0_slowdown_writes_trigger",
163        {
164            ToString(options_.level0_slowdown_writes_trigger),
165            ToString(options_.level0_slowdown_writes_trigger + 2),
166            ToString(options_.level0_slowdown_writes_trigger + 4),
167        }},
168       {"level0_stop_writes_trigger",
169        {
170            ToString(options_.level0_stop_writes_trigger),
171            ToString(options_.level0_stop_writes_trigger + 2),
172            ToString(options_.level0_stop_writes_trigger + 4),
173        }},
174       {"max_compaction_bytes",
175        {
176            ToString(options_.target_file_size_base * 5),
177            ToString(options_.target_file_size_base * 15),
178            ToString(options_.target_file_size_base * 100),
179        }},
180       {"target_file_size_base",
181        {
182            ToString(options_.target_file_size_base),
183            ToString(options_.target_file_size_base * 2),
184            ToString(options_.target_file_size_base * 4),
185        }},
186       {"target_file_size_multiplier",
187        {
188            ToString(options_.target_file_size_multiplier),
189            "1",
190            "2",
191        }},
192       {"max_bytes_for_level_base",
193        {
194            ToString(options_.max_bytes_for_level_base / 2),
195            ToString(options_.max_bytes_for_level_base),
196            ToString(options_.max_bytes_for_level_base * 2),
197        }},
198       {"max_bytes_for_level_multiplier",
199        {
200            ToString(options_.max_bytes_for_level_multiplier),
201            "1",
202            "2",
203        }},
204       {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
205   };
206 
207   if (FLAGS_allow_setting_blob_options_dynamically) {
208     options_tbl.emplace("enable_blob_files",
209                         std::vector<std::string>{"false", "true"});
210     options_tbl.emplace("min_blob_size",
211                         std::vector<std::string>{"0", "8", "16"});
212     options_tbl.emplace("blob_file_size",
213                         std::vector<std::string>{"1M", "16M", "256M", "1G"});
214     options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
215     options_tbl.emplace("enable_blob_garbage_collection",
216                         std::vector<std::string>{"false", "true"});
217     options_tbl.emplace(
218         "blob_garbage_collection_age_cutoff",
219         std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
220   }
221 
222   options_table_ = std::move(options_tbl);
223 
224   for (const auto& iter : options_table_) {
225     options_index_.push_back(iter.first);
226   }
227   return true;
228 }
229 
InitDb()230 void StressTest::InitDb() {
231   uint64_t now = clock_->NowMicros();
232   fprintf(stdout, "%s Initializing db_stress\n",
233           clock_->TimeToString(now / 1000000).c_str());
234   PrintEnv();
235   Open();
236   BuildOptionsTable();
237 }
238 
FinishInitDb(SharedState * shared)239 void StressTest::FinishInitDb(SharedState* shared) {
240   if (FLAGS_read_only) {
241     uint64_t now = clock_->NowMicros();
242     fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
243             clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
244     PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
245   }
246   if (FLAGS_enable_compaction_filter) {
247     auto* compaction_filter_factory =
248         reinterpret_cast<DbStressCompactionFilterFactory*>(
249             options_.compaction_filter_factory.get());
250     assert(compaction_filter_factory);
251     compaction_filter_factory->SetSharedState(shared);
252     fprintf(stdout, "Compaction filter factory: %s\n",
253             compaction_filter_factory->Name());
254   }
255 }
256 
VerifySecondaries()257 bool StressTest::VerifySecondaries() {
258 #ifndef ROCKSDB_LITE
259   if (FLAGS_test_secondary) {
260     uint64_t now = clock_->NowMicros();
261     fprintf(stdout, "%s Start to verify secondaries against primary\n",
262             clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
263   }
264   for (size_t k = 0; k != secondaries_.size(); ++k) {
265     Status s = secondaries_[k]->TryCatchUpWithPrimary();
266     if (!s.ok()) {
267       fprintf(stderr, "Secondary failed to catch up with primary\n");
268       return false;
269     }
270     ReadOptions ropts;
271     ropts.total_order_seek = true;
272     // Verify only the default column family since the primary may have
273     // dropped other column families after most recent reopen.
274     std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
275     std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
276     for (iter1->SeekToFirst(), iter2->SeekToFirst();
277          iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
278       if (iter1->key().compare(iter2->key()) != 0 ||
279           iter1->value().compare(iter2->value())) {
280         fprintf(stderr,
281                 "Secondary %d contains different data from "
282                 "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
283                 static_cast<int>(k),
284                 iter1->key().ToString(/*hex=*/true).c_str(),
285                 iter1->value().ToString(/*hex=*/true).c_str(),
286                 iter2->key().ToString(/*hex=*/true).c_str(),
287                 iter2->value().ToString(/*hex=*/true).c_str());
288         return false;
289       }
290     }
291     if (iter1->Valid() && !iter2->Valid()) {
292       fprintf(stderr,
293               "Secondary %d record count is smaller than that of primary\n",
294               static_cast<int>(k));
295       return false;
296     } else if (!iter1->Valid() && iter2->Valid()) {
297       fprintf(stderr,
298               "Secondary %d record count is larger than that of primary\n",
299               static_cast<int>(k));
300       return false;
301     }
302   }
303   if (FLAGS_test_secondary) {
304     uint64_t now = clock_->NowMicros();
305     fprintf(stdout, "%s Verification of secondaries succeeded\n",
306             clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
307   }
308 #endif  // ROCKSDB_LITE
309   return true;
310 }
311 
AssertSame(DB * db,ColumnFamilyHandle * cf,ThreadState::SnapshotState & snap_state)312 Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
313                               ThreadState::SnapshotState& snap_state) {
314   Status s;
315   if (cf->GetName() != snap_state.cf_at_name) {
316     return s;
317   }
318   ReadOptions ropt;
319   ropt.snapshot = snap_state.snapshot;
320   Slice ts;
321   if (!snap_state.timestamp.empty()) {
322     ts = snap_state.timestamp;
323     ropt.timestamp = &ts;
324   }
325   PinnableSlice exp_v(&snap_state.value);
326   exp_v.PinSelf();
327   PinnableSlice v;
328   s = db->Get(ropt, cf, snap_state.key, &v);
329   if (!s.ok() && !s.IsNotFound()) {
330     return s;
331   }
332   if (snap_state.status != s) {
333     return Status::Corruption(
334         "The snapshot gave inconsistent results for key " +
335         ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) +
336         " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() +
337         ") vs. (" + s.ToString() + ")");
338   }
339   if (s.ok()) {
340     if (exp_v != v) {
341       return Status::Corruption("The snapshot gave inconsistent values: (" +
342                                 exp_v.ToString() + ") vs. (" + v.ToString() +
343                                 ")");
344     }
345   }
346   if (snap_state.key_vec != nullptr) {
347     // When `prefix_extractor` is set, seeking to beginning and scanning
348     // across prefixes are only supported with `total_order_seek` set.
349     ropt.total_order_seek = true;
350     std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
351     std::unique_ptr<std::vector<bool>> tmp_bitvec(
352         new std::vector<bool>(FLAGS_max_key));
353     for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
354       uint64_t key_val;
355       if (GetIntVal(iterator->key().ToString(), &key_val)) {
356         (*tmp_bitvec.get())[key_val] = true;
357       }
358     }
359     if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
360                     tmp_bitvec.get()->begin())) {
361       return Status::Corruption("Found inconsistent keys at this snapshot");
362     }
363   }
364   return Status::OK();
365 }
366 
VerificationAbort(SharedState * shared,std::string msg,Status s) const367 void StressTest::VerificationAbort(SharedState* shared, std::string msg,
368                                    Status s) const {
369   fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(),
370           s.ToString().c_str());
371   shared->SetVerificationFailure();
372 }
373 
VerificationAbort(SharedState * shared,std::string msg,int cf,int64_t key) const374 void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
375                                    int64_t key) const {
376   auto key_str = Key(key);
377   Slice key_slice = key_str;
378   fprintf(stderr,
379           "Verification failed for column family %d key %s (%" PRIi64 "): %s\n",
380           cf, key_slice.ToString(true).c_str(), key, msg.c_str());
381   shared->SetVerificationFailure();
382 }
383 
PrintStatistics()384 void StressTest::PrintStatistics() {
385   if (dbstats) {
386     fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
387   }
388   if (dbstats_secondaries) {
389     fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
390             dbstats_secondaries->ToString().c_str());
391   }
392 }
393 
394 // Currently PreloadDb has to be single-threaded.
PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,SharedState * shared)395 void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
396                                               SharedState* shared) {
397   WriteOptions write_opts;
398   write_opts.disableWAL = FLAGS_disable_wal;
399   if (FLAGS_sync) {
400     write_opts.sync = true;
401   }
402   char value[100];
403   int cf_idx = 0;
404   Status s;
405   for (auto cfh : column_families_) {
406     for (int64_t k = 0; k != number_of_keys; ++k) {
407       std::string key_str = Key(k);
408       Slice key = key_str;
409       size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
410       Slice v(value, sz);
411       shared->Put(cf_idx, k, 0, true /* pending */);
412 
413       if (FLAGS_use_merge) {
414         if (!FLAGS_use_txn) {
415           s = db_->Merge(write_opts, cfh, key, v);
416         } else {
417 #ifndef ROCKSDB_LITE
418           Transaction* txn;
419           s = NewTxn(write_opts, &txn);
420           if (s.ok()) {
421             s = txn->Merge(cfh, key, v);
422             if (s.ok()) {
423               s = CommitTxn(txn);
424             }
425           }
426 #endif
427         }
428       } else {
429         if (!FLAGS_use_txn) {
430           std::string ts_str;
431           Slice ts;
432           if (FLAGS_user_timestamp_size > 0) {
433             ts_str = NowNanosStr();
434             ts = ts_str;
435             write_opts.timestamp = &ts;
436           }
437           s = db_->Put(write_opts, cfh, key, v);
438         } else {
439 #ifndef ROCKSDB_LITE
440           Transaction* txn;
441           s = NewTxn(write_opts, &txn);
442           if (s.ok()) {
443             s = txn->Put(cfh, key, v);
444             if (s.ok()) {
445               s = CommitTxn(txn);
446             }
447           }
448 #endif
449         }
450       }
451 
452       shared->Put(cf_idx, k, 0, false /* pending */);
453       if (!s.ok()) {
454         break;
455       }
456     }
457     if (!s.ok()) {
458       break;
459     }
460     ++cf_idx;
461   }
462   if (s.ok()) {
463     s = db_->Flush(FlushOptions(), column_families_);
464   }
465   if (s.ok()) {
466     for (auto cf : column_families_) {
467       delete cf;
468     }
469     column_families_.clear();
470     delete db_;
471     db_ = nullptr;
472 #ifndef ROCKSDB_LITE
473     txn_db_ = nullptr;
474 #endif
475 
476     db_preload_finished_.store(true);
477     auto now = clock_->NowMicros();
478     fprintf(stdout, "%s Reopening database in read-only\n",
479             clock_->TimeToString(now / 1000000).c_str());
480     // Reopen as read-only, can ignore all options related to updates
481     Open();
482   } else {
483     fprintf(stderr, "Failed to preload db");
484     exit(1);
485   }
486 }
487 
SetOptions(ThreadState * thread)488 Status StressTest::SetOptions(ThreadState* thread) {
489   assert(FLAGS_set_options_one_in > 0);
490   std::unordered_map<std::string, std::string> opts;
491   std::string name =
492       options_index_[thread->rand.Next() % options_index_.size()];
493   int value_idx = thread->rand.Next() % options_table_[name].size();
494   if (name == "soft_rate_limit" || name == "hard_rate_limit") {
495     opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx];
496     opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx];
497   } else if (name == "level0_file_num_compaction_trigger" ||
498              name == "level0_slowdown_writes_trigger" ||
499              name == "level0_stop_writes_trigger") {
500     opts["level0_file_num_compaction_trigger"] =
501         options_table_["level0_file_num_compaction_trigger"][value_idx];
502     opts["level0_slowdown_writes_trigger"] =
503         options_table_["level0_slowdown_writes_trigger"][value_idx];
504     opts["level0_stop_writes_trigger"] =
505         options_table_["level0_stop_writes_trigger"][value_idx];
506   } else {
507     opts[name] = options_table_[name][value_idx];
508   }
509 
510   int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
511   auto cfh = column_families_[rand_cf_idx];
512   return db_->SetOptions(cfh, opts);
513 }
514 
515 #ifndef ROCKSDB_LITE
NewTxn(WriteOptions & write_opts,Transaction ** txn)516 Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
517   if (!FLAGS_use_txn) {
518     return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
519   }
520   static std::atomic<uint64_t> txn_id = {0};
521   TransactionOptions txn_options;
522   txn_options.lock_timeout = 600000;  // 10 min
523   txn_options.deadlock_detect = true;
524   *txn = txn_db_->BeginTransaction(write_opts, txn_options);
525   auto istr = std::to_string(txn_id.fetch_add(1));
526   Status s = (*txn)->SetName("xid" + istr);
527   return s;
528 }
529 
CommitTxn(Transaction * txn)530 Status StressTest::CommitTxn(Transaction* txn) {
531   if (!FLAGS_use_txn) {
532     return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set");
533   }
534   Status s = txn->Prepare();
535   if (s.ok()) {
536     s = txn->Commit();
537   }
538   delete txn;
539   return s;
540 }
541 
RollbackTxn(Transaction * txn)542 Status StressTest::RollbackTxn(Transaction* txn) {
543   if (!FLAGS_use_txn) {
544     return Status::InvalidArgument(
545         "RollbackTxn when FLAGS_use_txn is not"
546         " set");
547   }
548   Status s = txn->Rollback();
549   delete txn;
550   return s;
551 }
552 #endif
553 
OperateDb(ThreadState * thread)554 void StressTest::OperateDb(ThreadState* thread) {
555   ReadOptions read_opts(FLAGS_verify_checksum, true);
556   WriteOptions write_opts;
557   auto shared = thread->shared;
558   char value[100];
559   std::string from_db;
560   if (FLAGS_sync) {
561     write_opts.sync = true;
562   }
563   write_opts.disableWAL = FLAGS_disable_wal;
564   const int prefixBound = static_cast<int>(FLAGS_readpercent) +
565                           static_cast<int>(FLAGS_prefixpercent);
566   const int writeBound = prefixBound + static_cast<int>(FLAGS_writepercent);
567   const int delBound = writeBound + static_cast<int>(FLAGS_delpercent);
568   const int delRangeBound = delBound + static_cast<int>(FLAGS_delrangepercent);
569   const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
570 
571 #ifndef NDEBUG
572   if (FLAGS_read_fault_one_in) {
573     fault_fs_guard->SetThreadLocalReadErrorContext(thread->shared->GetSeed(),
574                                             FLAGS_read_fault_one_in);
575   }
576   if (FLAGS_write_fault_one_in) {
577     IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
578     error_msg.SetRetryable(true);
579     std::vector<FileType> types = {FileType::kTableFile,
580                                    FileType::kDescriptorFile,
581                                    FileType::kCurrentFile};
582     fault_fs_guard->SetRandomWriteError(
583         thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg, types);
584   }
585 #endif // NDEBUG
586   thread->stats.Start();
587   for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
588     if (thread->shared->HasVerificationFailedYet() ||
589         thread->shared->ShouldStopTest()) {
590       break;
591     }
592     if (open_cnt != 0) {
593       thread->stats.FinishedSingleOp();
594       MutexLock l(thread->shared->GetMutex());
595       while (!thread->snapshot_queue.empty()) {
596         db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
597         delete thread->snapshot_queue.front().second.key_vec;
598         thread->snapshot_queue.pop();
599       }
600       thread->shared->IncVotedReopen();
601       if (thread->shared->AllVotedReopen()) {
602         thread->shared->GetStressTest()->Reopen(thread);
603         thread->shared->GetCondVar()->SignalAll();
604       } else {
605         thread->shared->GetCondVar()->Wait();
606       }
607       // Commenting this out as we don't want to reset stats on each open.
608       // thread->stats.Start();
609     }
610 
611     for (uint64_t i = 0; i < ops_per_open; i++) {
612       if (thread->shared->HasVerificationFailedYet()) {
613         break;
614       }
615 
616       // Change Options
617       if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) {
618         SetOptions(thread);
619       }
620 
621       if (thread->rand.OneInOpt(FLAGS_set_in_place_one_in)) {
622         options_.inplace_update_support ^= options_.inplace_update_support;
623       }
624 
625       if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
626           thread->rand.OneIn(FLAGS_verify_db_one_in)) {
627         ContinuouslyVerifyDb(thread);
628         if (thread->shared->ShouldStopTest()) {
629           break;
630         }
631       }
632 
633       MaybeClearOneColumnFamily(thread);
634 
635       if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
636         Status s = db_->SyncWAL();
637         if (!s.ok() && !s.IsNotSupported()) {
638           fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
639         }
640       }
641 
642       int rand_column_family = thread->rand.Next() % FLAGS_column_families;
643       ColumnFamilyHandle* column_family = column_families_[rand_column_family];
644 
645       if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
646         TestCompactFiles(thread, column_family);
647       }
648 
649       int64_t rand_key = GenerateOneKey(thread, i);
650       std::string keystr = Key(rand_key);
651       Slice key = keystr;
652       std::unique_ptr<MutexLock> lock;
653       if (ShouldAcquireMutexOnKey()) {
654         lock.reset(new MutexLock(
655             shared->GetMutexForKey(rand_column_family, rand_key)));
656       }
657 
658       if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
659         TestCompactRange(thread, rand_key, key, column_family);
660         if (thread->shared->HasVerificationFailedYet()) {
661           break;
662         }
663       }
664 
665       std::vector<int> rand_column_families =
666           GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
667 
668       if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
669         Status status = TestFlush(rand_column_families);
670         if (!status.ok()) {
671           fprintf(stdout, "Unable to perform Flush(): %s\n",
672                   status.ToString().c_str());
673         }
674       }
675 
676 #ifndef ROCKSDB_LITE
677       // Verify GetLiveFiles with a 1 in N chance.
678       if (thread->rand.OneInOpt(FLAGS_get_live_files_one_in) &&
679           !FLAGS_write_fault_one_in) {
680         Status status = VerifyGetLiveFiles();
681         if (!status.ok()) {
682           VerificationAbort(shared, "VerifyGetLiveFiles status not OK", status);
683         }
684       }
685 
686       // Verify GetSortedWalFiles with a 1 in N chance.
687       if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) {
688         Status status = VerifyGetSortedWalFiles();
689         if (!status.ok()) {
690           VerificationAbort(shared, "VerifyGetSortedWalFiles status not OK",
691                             status);
692         }
693       }
694 
695       // Verify GetCurrentWalFile with a 1 in N chance.
696       if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) {
697         Status status = VerifyGetCurrentWalFile();
698         if (!status.ok()) {
699           VerificationAbort(shared, "VerifyGetCurrentWalFile status not OK",
700                             status);
701         }
702       }
703 #endif  // !ROCKSDB_LITE
704 
705       if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
706         Status status = TestPauseBackground(thread);
707         if (!status.ok()) {
708           VerificationAbort(
709               shared, "Pause/ContinueBackgroundWork status not OK", status);
710         }
711       }
712 
713 #ifndef ROCKSDB_LITE
714       if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) {
715         Status status = db_->VerifyChecksum();
716         if (!status.ok()) {
717           VerificationAbort(shared, "VerifyChecksum status not OK", status);
718         }
719       }
720 
721       if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) {
722         TestGetProperty(thread);
723       }
724 #endif
725 
726       std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
727 
728       if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) {
729         TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
730       }
731 
732       if (thread->rand.OneInOpt(FLAGS_backup_one_in)) {
733         // Beyond a certain DB size threshold, this test becomes heavier than
734         // it's worth.
735         uint64_t total_size = 0;
736         if (FLAGS_backup_max_size > 0) {
737           std::vector<FileAttributes> files;
738           db_stress_env->GetChildrenFileAttributes(FLAGS_db, &files);
739           for (auto& file : files) {
740             total_size += file.size_bytes;
741           }
742         }
743 
744         if (total_size <= FLAGS_backup_max_size) {
745           Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
746           if (!s.ok()) {
747             VerificationAbort(shared, "Backup/restore gave inconsistent state",
748                               s);
749           }
750         }
751       }
752 
753       if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
754         Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
755         if (!s.ok()) {
756           VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
757         }
758       }
759 
760 #ifndef ROCKSDB_LITE
761       if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) {
762         Status s =
763             TestApproximateSize(thread, i, rand_column_families, rand_keys);
764         if (!s.ok()) {
765           VerificationAbort(shared, "ApproximateSize Failed", s);
766         }
767       }
768 #endif  // !ROCKSDB_LITE
769       if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
770         TestAcquireSnapshot(thread, rand_column_family, keystr, i);
771       }
772 
773       /*always*/ {
774         Status s = MaybeReleaseSnapshots(thread, i);
775         if (!s.ok()) {
776           VerificationAbort(shared, "Snapshot gave inconsistent state", s);
777         }
778       }
779 
780       // Assign timestamps if necessary.
781       std::string read_ts_str;
782       std::string write_ts_str;
783       Slice read_ts;
784       Slice write_ts;
785       if (ShouldAcquireMutexOnKey() && FLAGS_user_timestamp_size > 0) {
786         read_ts_str = GenerateTimestampForRead();
787         read_ts = read_ts_str;
788         read_opts.timestamp = &read_ts;
789         write_ts_str = NowNanosStr();
790         write_ts = write_ts_str;
791         write_opts.timestamp = &write_ts;
792       }
793 
794       int prob_op = thread->rand.Uniform(100);
795       // Reset this in case we pick something other than a read op. We don't
796       // want to use a stale value when deciding at the beginning of the loop
797       // whether to vote to reopen
798       if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
799         assert(0 <= prob_op);
800         // OPERATION read
801         if (FLAGS_use_multiget) {
802           // Leave room for one more iteration of the loop with a single key
803           // batch. This is to ensure that each thread does exactly the same
804           // number of ops
805           int multiget_batch_size = static_cast<int>(
806               std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
807                        FLAGS_ops_per_thread - i - 1));
808           // If its the last iteration, ensure that multiget_batch_size is 1
809           multiget_batch_size = std::max(multiget_batch_size, 1);
810           rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
811           TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
812           i += multiget_batch_size - 1;
813         } else {
814           TestGet(thread, read_opts, rand_column_families, rand_keys);
815         }
816       } else if (prob_op < prefixBound) {
817         assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
818         // OPERATION prefix scan
819         // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
820         // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
821         // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
822         // prefix
823         TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
824       } else if (prob_op < writeBound) {
825         assert(prefixBound <= prob_op);
826         // OPERATION write
827         TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
828                 value, lock);
829       } else if (prob_op < delBound) {
830         assert(writeBound <= prob_op);
831         // OPERATION delete
832         TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
833       } else if (prob_op < delRangeBound) {
834         assert(delBound <= prob_op);
835         // OPERATION delete range
836         TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
837                         lock);
838       } else {
839         assert(delRangeBound <= prob_op);
840         // OPERATION iterate
841         int num_seeks = static_cast<int>(
842             std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
843                      FLAGS_ops_per_thread - i - 1));
844         rand_keys = GenerateNKeys(thread, num_seeks, i);
845         i += num_seeks - 1;
846         TestIterate(thread, read_opts, rand_column_families, rand_keys);
847       }
848       thread->stats.FinishedSingleOp();
849 #ifndef ROCKSDB_LITE
850       uint32_t tid = thread->tid;
851       assert(secondaries_.empty() ||
852              static_cast<size_t>(tid) < secondaries_.size());
853       if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
854         Status s = secondaries_[tid]->TryCatchUpWithPrimary();
855         if (!s.ok()) {
856           VerificationAbort(shared, "Secondary instance failed to catch up", s);
857           break;
858         }
859       }
860 #endif
861     }
862   }
863   while (!thread->snapshot_queue.empty()) {
864     db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);
865     delete thread->snapshot_queue.front().second.key_vec;
866     thread->snapshot_queue.pop();
867   }
868 
869   thread->stats.Stop();
870 }
871 
872 #ifndef ROCKSDB_LITE
873 // Generated a list of keys that close to boundaries of SST keys.
874 // If there isn't any SST file in the DB, return empty list.
GetWhiteBoxKeys(ThreadState * thread,DB * db,ColumnFamilyHandle * cfh,size_t num_keys)875 std::vector<std::string> StressTest::GetWhiteBoxKeys(ThreadState* thread,
876                                                      DB* db,
877                                                      ColumnFamilyHandle* cfh,
878                                                      size_t num_keys) {
879   ColumnFamilyMetaData cfmd;
880   db->GetColumnFamilyMetaData(cfh, &cfmd);
881   std::vector<std::string> boundaries;
882   for (const LevelMetaData& lmd : cfmd.levels) {
883     for (const SstFileMetaData& sfmd : lmd.files) {
884       // If FLAGS_user_timestamp_size > 0, then both smallestkey and largestkey
885       // have timestamps.
886       const auto& skey = sfmd.smallestkey;
887       const auto& lkey = sfmd.largestkey;
888       assert(skey.size() >= FLAGS_user_timestamp_size);
889       assert(lkey.size() >= FLAGS_user_timestamp_size);
890       boundaries.push_back(
891           skey.substr(0, skey.size() - FLAGS_user_timestamp_size));
892       boundaries.push_back(
893           lkey.substr(0, lkey.size() - FLAGS_user_timestamp_size));
894     }
895   }
896   if (boundaries.empty()) {
897     return {};
898   }
899 
900   std::vector<std::string> ret;
901   for (size_t j = 0; j < num_keys; j++) {
902     std::string k =
903         boundaries[thread->rand.Uniform(static_cast<int>(boundaries.size()))];
904     if (thread->rand.OneIn(3)) {
905       // Reduce one byte from the string
906       for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
907         uint8_t cur = k[i];
908         if (cur > 0) {
909           k[i] = static_cast<char>(cur - 1);
910           break;
911         } else if (i > 0) {
912           k[i] = 0xFFu;
913         }
914       }
915     } else if (thread->rand.OneIn(2)) {
916       // Add one byte to the string
917       for (int i = static_cast<int>(k.length()) - 1; i >= 0; i--) {
918         uint8_t cur = k[i];
919         if (cur < 255) {
920           k[i] = static_cast<char>(cur + 1);
921           break;
922         } else if (i > 0) {
923           k[i] = 0x00;
924         }
925       }
926     }
927     ret.push_back(k);
928   }
929   return ret;
930 }
931 #endif  // !ROCKSDB_LITE
932 
933 // Given a key K, this creates an iterator which scans to K and then
934 // does a random sequence of Next/Prev operations.
TestIterate(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)935 Status StressTest::TestIterate(ThreadState* thread,
936                                const ReadOptions& read_opts,
937                                const std::vector<int>& rand_column_families,
938                                const std::vector<int64_t>& rand_keys) {
939   Status s;
940   const Snapshot* snapshot = db_->GetSnapshot();
941   ReadOptions readoptionscopy = read_opts;
942   readoptionscopy.snapshot = snapshot;
943 
944   bool expect_total_order = false;
945   if (thread->rand.OneIn(16)) {
946     // When prefix extractor is used, it's useful to cover total order seek.
947     readoptionscopy.total_order_seek = true;
948     expect_total_order = true;
949   } else if (thread->rand.OneIn(4)) {
950     readoptionscopy.total_order_seek = false;
951     readoptionscopy.auto_prefix_mode = true;
952     expect_total_order = true;
953   } else if (options_.prefix_extractor.get() == nullptr) {
954     expect_total_order = true;
955   }
956 
957   std::string upper_bound_str;
958   Slice upper_bound;
959   if (thread->rand.OneIn(16)) {
960     // in 1/16 chance, set a iterator upper bound
961     int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
962     upper_bound_str = Key(rand_upper_key);
963     upper_bound = Slice(upper_bound_str);
964     // uppder_bound can be smaller than seek key, but the query itself
965     // should not crash either.
966     readoptionscopy.iterate_upper_bound = &upper_bound;
967   }
968   std::string lower_bound_str;
969   Slice lower_bound;
970   if (thread->rand.OneIn(16)) {
971     // in 1/16 chance, enable iterator lower bound
972     int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
973     lower_bound_str = Key(rand_lower_key);
974     lower_bound = Slice(lower_bound_str);
975     // uppder_bound can be smaller than seek key, but the query itself
976     // should not crash either.
977     readoptionscopy.iterate_lower_bound = &lower_bound;
978   }
979 
980   auto cfh = column_families_[rand_column_families[0]];
981   std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh));
982 
983   std::vector<std::string> key_str;
984   if (thread->rand.OneIn(16)) {
985     // Generate keys close to lower or upper bound of SST files.
986     key_str = GetWhiteBoxKeys(thread, db_, cfh, rand_keys.size());
987   }
988   if (key_str.empty()) {
989     // If key string is not geneerated using white block keys,
990     // Use randomized key passe in.
991     for (int64_t rkey : rand_keys) {
992       key_str.push_back(Key(rkey));
993     }
994   }
995 
996   std::string op_logs;
997   const size_t kOpLogsLimit = 10000;
998 
999   for (const std::string& skey : key_str) {
1000     if (op_logs.size() > kOpLogsLimit) {
1001       // Shouldn't take too much memory for the history log. Clear it.
1002       op_logs = "(cleared...)\n";
1003     }
1004 
1005     Slice key = skey;
1006 
1007     if (readoptionscopy.iterate_upper_bound != nullptr &&
1008         thread->rand.OneIn(2)) {
1009       // 1/2 chance, change the upper bound.
1010       // It is possible that it is changed without first use, but there is no
1011       // problem with that.
1012       int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
1013       upper_bound_str = Key(rand_upper_key);
1014       upper_bound = Slice(upper_bound_str);
1015     } else if (readoptionscopy.iterate_lower_bound != nullptr &&
1016                thread->rand.OneIn(4)) {
1017       // 1/4 chance, change the lower bound.
1018       // It is possible that it is changed without first use, but there is no
1019       // problem with that.
1020       int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
1021       lower_bound_str = Key(rand_lower_key);
1022       lower_bound = Slice(lower_bound_str);
1023     }
1024 
1025     // Record some options to op_logs;
1026     op_logs += "total_order_seek: ";
1027     op_logs += (readoptionscopy.total_order_seek ? "1 " : "0 ");
1028     op_logs += "auto_prefix_mode: ";
1029     op_logs += (readoptionscopy.auto_prefix_mode ? "1 " : "0 ");
1030     if (readoptionscopy.iterate_upper_bound != nullptr) {
1031       op_logs += "ub: " + upper_bound.ToString(true) + " ";
1032     }
1033     if (readoptionscopy.iterate_lower_bound != nullptr) {
1034       op_logs += "lb: " + lower_bound.ToString(true) + " ";
1035     }
1036 
1037     // Set up an iterator and does the same without bounds and with total
1038     // order seek and compare the results. This is to identify bugs related
1039     // to bounds, prefix extractor or reseeking. Sometimes we are comparing
1040     // iterators with the same set-up, and it doesn't hurt to check them
1041     // to be equal.
1042     ReadOptions cmp_ro;
1043     cmp_ro.timestamp = readoptionscopy.timestamp;
1044     cmp_ro.snapshot = snapshot;
1045     cmp_ro.total_order_seek = true;
1046     ColumnFamilyHandle* cmp_cfh =
1047         GetControlCfh(thread, rand_column_families[0]);
1048     std::unique_ptr<Iterator> cmp_iter(db_->NewIterator(cmp_ro, cmp_cfh));
1049     bool diverged = false;
1050 
1051     bool support_seek_first_or_last = expect_total_order;
1052 
1053     LastIterateOp last_op;
1054     if (support_seek_first_or_last && thread->rand.OneIn(100)) {
1055       iter->SeekToFirst();
1056       cmp_iter->SeekToFirst();
1057       last_op = kLastOpSeekToFirst;
1058       op_logs += "STF ";
1059     } else if (support_seek_first_or_last && thread->rand.OneIn(100)) {
1060       iter->SeekToLast();
1061       cmp_iter->SeekToLast();
1062       last_op = kLastOpSeekToLast;
1063       op_logs += "STL ";
1064     } else if (thread->rand.OneIn(8)) {
1065       iter->SeekForPrev(key);
1066       cmp_iter->SeekForPrev(key);
1067       last_op = kLastOpSeekForPrev;
1068       op_logs += "SFP " + key.ToString(true) + " ";
1069     } else {
1070       iter->Seek(key);
1071       cmp_iter->Seek(key);
1072       last_op = kLastOpSeek;
1073       op_logs += "S " + key.ToString(true) + " ";
1074     }
1075     VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(), cmp_iter.get(),
1076                    last_op, key, op_logs, &diverged);
1077 
1078     bool no_reverse =
1079         (FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
1080     for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
1081       if (no_reverse || thread->rand.OneIn(2)) {
1082         iter->Next();
1083         if (!diverged) {
1084           assert(cmp_iter->Valid());
1085           cmp_iter->Next();
1086         }
1087         op_logs += "N";
1088       } else {
1089         iter->Prev();
1090         if (!diverged) {
1091           assert(cmp_iter->Valid());
1092           cmp_iter->Prev();
1093         }
1094         op_logs += "P";
1095       }
1096       last_op = kLastOpNextOrPrev;
1097       VerifyIterator(thread, cmp_cfh, readoptionscopy, iter.get(),
1098                      cmp_iter.get(), last_op, key, op_logs, &diverged);
1099     }
1100 
1101     if (s.ok()) {
1102       thread->stats.AddIterations(1);
1103     } else {
1104       fprintf(stderr, "TestIterate error: %s\n", s.ToString().c_str());
1105       thread->stats.AddErrors(1);
1106       break;
1107     }
1108 
1109     op_logs += "; ";
1110   }
1111 
1112   db_->ReleaseSnapshot(snapshot);
1113 
1114   return s;
1115 }
1116 
1117 #ifndef ROCKSDB_LITE
1118 // Test the return status of GetLiveFiles.
VerifyGetLiveFiles() const1119 Status StressTest::VerifyGetLiveFiles() const {
1120   std::vector<std::string> live_file;
1121   uint64_t manifest_size = 0;
1122   return db_->GetLiveFiles(live_file, &manifest_size);
1123 }
1124 
1125 // Test the return status of GetSortedWalFiles.
VerifyGetSortedWalFiles() const1126 Status StressTest::VerifyGetSortedWalFiles() const {
1127   VectorLogPtr log_ptr;
1128   return db_->GetSortedWalFiles(log_ptr);
1129 }
1130 
1131 // Test the return status of GetCurrentWalFile.
VerifyGetCurrentWalFile() const1132 Status StressTest::VerifyGetCurrentWalFile() const {
1133   std::unique_ptr<LogFile> cur_wal_file;
1134   return db_->GetCurrentWalFile(&cur_wal_file);
1135 }
1136 #endif  // !ROCKSDB_LITE
1137 
1138 // Compare the two iterator, iter and cmp_iter are in the same position,
1139 // unless iter might be made invalidate or undefined because of
1140 // upper or lower bounds, or prefix extractor.
1141 // Will flag failure if the verification fails.
1142 // diverged = true if the two iterator is already diverged.
1143 // True if verification passed, false if not.
VerifyIterator(ThreadState * thread,ColumnFamilyHandle * cmp_cfh,const ReadOptions & ro,Iterator * iter,Iterator * cmp_iter,LastIterateOp op,const Slice & seek_key,const std::string & op_logs,bool * diverged)1144 void StressTest::VerifyIterator(ThreadState* thread,
1145                                 ColumnFamilyHandle* cmp_cfh,
1146                                 const ReadOptions& ro, Iterator* iter,
1147                                 Iterator* cmp_iter, LastIterateOp op,
1148                                 const Slice& seek_key,
1149                                 const std::string& op_logs, bool* diverged) {
1150   if (*diverged) {
1151     return;
1152   }
1153 
1154   if (op == kLastOpSeekToFirst && ro.iterate_lower_bound != nullptr) {
1155     // SeekToFirst() with lower bound is not well defined.
1156     *diverged = true;
1157     return;
1158   } else if (op == kLastOpSeekToLast && ro.iterate_upper_bound != nullptr) {
1159     // SeekToLast() with higher bound is not well defined.
1160     *diverged = true;
1161     return;
1162   } else if (op == kLastOpSeek && ro.iterate_lower_bound != nullptr &&
1163              (options_.comparator->CompareWithoutTimestamp(
1164                   *ro.iterate_lower_bound, /*a_has_ts=*/false, seek_key,
1165                   /*b_has_ts=*/false) >= 0 ||
1166               (ro.iterate_upper_bound != nullptr &&
1167                options_.comparator->CompareWithoutTimestamp(
1168                    *ro.iterate_lower_bound, /*a_has_ts=*/false,
1169                    *ro.iterate_upper_bound, /*b_has_ts*/ false) >= 0))) {
1170     // Lower bound behavior is not well defined if it is larger than
1171     // seek key or upper bound. Disable the check for now.
1172     *diverged = true;
1173     return;
1174   } else if (op == kLastOpSeekForPrev && ro.iterate_upper_bound != nullptr &&
1175              (options_.comparator->CompareWithoutTimestamp(
1176                   *ro.iterate_upper_bound, /*a_has_ts=*/false, seek_key,
1177                   /*b_has_ts=*/false) <= 0 ||
1178               (ro.iterate_lower_bound != nullptr &&
1179                options_.comparator->CompareWithoutTimestamp(
1180                    *ro.iterate_lower_bound, /*a_has_ts=*/false,
1181                    *ro.iterate_upper_bound, /*b_has_ts=*/false) >= 0))) {
1182     // Uppder bound behavior is not well defined if it is smaller than
1183     // seek key or lower bound. Disable the check for now.
1184     *diverged = true;
1185     return;
1186   }
1187 
1188   const SliceTransform* pe = (ro.total_order_seek || ro.auto_prefix_mode)
1189                                  ? nullptr
1190                                  : options_.prefix_extractor.get();
1191   const Comparator* cmp = options_.comparator;
1192 
1193   if (iter->Valid() && !cmp_iter->Valid()) {
1194     if (pe != nullptr) {
1195       if (!pe->InDomain(seek_key)) {
1196         // Prefix seek a non-in-domain key is undefined. Skip checking for
1197         // this scenario.
1198         *diverged = true;
1199         return;
1200       } else if (!pe->InDomain(iter->key())) {
1201         // out of range is iterator key is not in domain anymore.
1202         *diverged = true;
1203         return;
1204       } else if (pe->Transform(iter->key()) != pe->Transform(seek_key)) {
1205         *diverged = true;
1206         return;
1207       }
1208     }
1209     fprintf(stderr,
1210             "Control interator is invalid but iterator has key %s "
1211             "%s\n",
1212             iter->key().ToString(true).c_str(), op_logs.c_str());
1213 
1214     *diverged = true;
1215   } else if (cmp_iter->Valid()) {
1216     // Iterator is not valid. It can be legimate if it has already been
1217     // out of upper or lower bound, or filtered out by prefix iterator.
1218     const Slice& total_order_key = cmp_iter->key();
1219 
1220     if (pe != nullptr) {
1221       if (!pe->InDomain(seek_key)) {
1222         // Prefix seek a non-in-domain key is undefined. Skip checking for
1223         // this scenario.
1224         *diverged = true;
1225         return;
1226       }
1227 
1228       if (!pe->InDomain(total_order_key) ||
1229           pe->Transform(total_order_key) != pe->Transform(seek_key)) {
1230         // If the prefix is exhausted, the only thing needs to check
1231         // is the iterator isn't return a position in prefix.
1232         // Either way, checking can stop from here.
1233         *diverged = true;
1234         if (!iter->Valid() || !pe->InDomain(iter->key()) ||
1235             pe->Transform(iter->key()) != pe->Transform(seek_key)) {
1236           return;
1237         }
1238         fprintf(stderr,
1239                 "Iterator stays in prefix but contol doesn't"
1240                 " iterator key %s control iterator key %s %s\n",
1241                 iter->key().ToString(true).c_str(),
1242                 cmp_iter->key().ToString(true).c_str(), op_logs.c_str());
1243       }
1244     }
1245     // Check upper or lower bounds.
1246     if (!*diverged) {
1247       if ((iter->Valid() && iter->key() != cmp_iter->key()) ||
1248           (!iter->Valid() &&
1249            (ro.iterate_upper_bound == nullptr ||
1250             cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
1251                                          *ro.iterate_upper_bound,
1252                                          /*b_has_ts=*/false) < 0) &&
1253            (ro.iterate_lower_bound == nullptr ||
1254             cmp->CompareWithoutTimestamp(total_order_key, /*a_has_ts=*/false,
1255                                          *ro.iterate_lower_bound,
1256                                          /*b_has_ts=*/false) > 0))) {
1257         fprintf(stderr,
1258                 "Iterator diverged from control iterator which"
1259                 " has value %s %s\n",
1260                 total_order_key.ToString(true).c_str(), op_logs.c_str());
1261         if (iter->Valid()) {
1262           fprintf(stderr, "iterator has value %s\n",
1263                   iter->key().ToString(true).c_str());
1264         } else {
1265           fprintf(stderr, "iterator is not valid\n");
1266         }
1267         *diverged = true;
1268       }
1269     }
1270   }
1271   if (*diverged) {
1272     fprintf(stderr, "Control CF %s\n", cmp_cfh->GetName().c_str());
1273     thread->stats.AddErrors(1);
1274     // Fail fast to preserve the DB state.
1275     thread->shared->SetVerificationFailure();
1276   }
1277 }
1278 
1279 #ifdef ROCKSDB_LITE
TestBackupRestore(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &)1280 Status StressTest::TestBackupRestore(
1281     ThreadState* /* thread */,
1282     const std::vector<int>& /* rand_column_families */,
1283     const std::vector<int64_t>& /* rand_keys */) {
1284   assert(false);
1285   fprintf(stderr,
1286           "RocksDB lite does not support "
1287           "TestBackupRestore\n");
1288   std::terminate();
1289 }
1290 
TestCheckpoint(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &)1291 Status StressTest::TestCheckpoint(
1292     ThreadState* /* thread */,
1293     const std::vector<int>& /* rand_column_families */,
1294     const std::vector<int64_t>& /* rand_keys */) {
1295   assert(false);
1296   fprintf(stderr,
1297           "RocksDB lite does not support "
1298           "TestCheckpoint\n");
1299   std::terminate();
1300 }
1301 
TestCompactFiles(ThreadState *,ColumnFamilyHandle *)1302 void StressTest::TestCompactFiles(ThreadState* /* thread */,
1303                                   ColumnFamilyHandle* /* column_family */) {
1304   assert(false);
1305   fprintf(stderr,
1306           "RocksDB lite does not support "
1307           "CompactFiles\n");
1308   std::terminate();
1309 }
1310 #else   // ROCKSDB_LITE
TestBackupRestore(ThreadState * thread,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)1311 Status StressTest::TestBackupRestore(
1312     ThreadState* thread, const std::vector<int>& rand_column_families,
1313     const std::vector<int64_t>& rand_keys) {
1314   std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
1315   std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
1316   BackupableDBOptions backup_opts(backup_dir);
1317   // For debugging, get info_log from live options
1318   backup_opts.info_log = db_->GetDBOptions().info_log.get();
1319   assert(backup_opts.info_log);
1320   if (thread->rand.OneIn(10)) {
1321     backup_opts.share_table_files = false;
1322   } else {
1323     backup_opts.share_table_files = true;
1324     if (thread->rand.OneIn(5)) {
1325       backup_opts.share_files_with_checksum = false;
1326     } else {
1327       backup_opts.share_files_with_checksum = true;
1328       if (thread->rand.OneIn(2)) {
1329         // old
1330         backup_opts.share_files_with_checksum_naming =
1331             BackupableDBOptions::kLegacyCrc32cAndFileSize;
1332       } else {
1333         // new
1334         backup_opts.share_files_with_checksum_naming =
1335             BackupableDBOptions::kUseDbSessionId;
1336       }
1337       if (thread->rand.OneIn(2)) {
1338         backup_opts.share_files_with_checksum_naming =
1339             backup_opts.share_files_with_checksum_naming |
1340             BackupableDBOptions::kFlagIncludeFileSize;
1341       }
1342     }
1343   }
1344   BackupEngine* backup_engine = nullptr;
1345   std::string from = "a backup/restore operation";
1346   Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
1347   if (!s.ok()) {
1348     from = "BackupEngine::Open";
1349   }
1350   if (s.ok()) {
1351     if (thread->rand.OneIn(2)) {
1352       TEST_FutureSchemaVersion2Options test_opts;
1353       test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
1354       test_opts.file_sizes = thread->rand.OneIn(2) == 0;
1355       TEST_EnableWriteFutureSchemaVersion2(backup_engine, test_opts);
1356     }
1357     s = backup_engine->CreateNewBackup(db_);
1358     if (!s.ok()) {
1359       from = "BackupEngine::CreateNewBackup";
1360     }
1361   }
1362   if (s.ok()) {
1363     delete backup_engine;
1364     backup_engine = nullptr;
1365     s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
1366     if (!s.ok()) {
1367       from = "BackupEngine::Open (again)";
1368     }
1369   }
1370   std::vector<BackupInfo> backup_info;
1371   // If inplace_not_restore, we verify the backup by opening it as a
1372   // read-only DB. If !inplace_not_restore, we restore it to a temporary
1373   // directory for verification.
1374   bool inplace_not_restore = thread->rand.OneIn(3);
1375   if (s.ok()) {
1376     backup_engine->GetBackupInfo(&backup_info,
1377                                  /*include_file_details*/ inplace_not_restore);
1378     if (backup_info.empty()) {
1379       s = Status::NotFound("no backups found");
1380       from = "BackupEngine::GetBackupInfo";
1381     }
1382   }
1383   if (s.ok() && thread->rand.OneIn(2)) {
1384     s = backup_engine->VerifyBackup(
1385         backup_info.front().backup_id,
1386         thread->rand.OneIn(2) /* verify_with_checksum */);
1387     if (!s.ok()) {
1388       from = "BackupEngine::VerifyBackup";
1389     }
1390   }
1391   const bool allow_persistent = thread->tid == 0;  // not too many
1392   bool from_latest = false;
1393   int count = static_cast<int>(backup_info.size());
1394   if (s.ok() && !inplace_not_restore) {
1395     if (count > 1) {
1396       s = backup_engine->RestoreDBFromBackup(
1397           RestoreOptions(), backup_info[thread->rand.Uniform(count)].backup_id,
1398           restore_dir /* db_dir */, restore_dir /* wal_dir */);
1399       if (!s.ok()) {
1400         from = "BackupEngine::RestoreDBFromBackup";
1401       }
1402     } else {
1403       from_latest = true;
1404       s = backup_engine->RestoreDBFromLatestBackup(RestoreOptions(),
1405                                                    restore_dir /* db_dir */,
1406                                                    restore_dir /* wal_dir */);
1407       if (!s.ok()) {
1408         from = "BackupEngine::RestoreDBFromLatestBackup";
1409       }
1410     }
1411   }
1412   if (s.ok() && !inplace_not_restore) {
1413     // Purge early if restoring, to ensure the restored directory doesn't
1414     // have some secret dependency on the backup directory.
1415     uint32_t to_keep = 0;
1416     if (allow_persistent) {
1417       // allow one thread to keep up to 2 backups
1418       to_keep = thread->rand.Uniform(3);
1419     }
1420     s = backup_engine->PurgeOldBackups(to_keep);
1421     if (!s.ok()) {
1422       from = "BackupEngine::PurgeOldBackups";
1423     }
1424   }
1425   DB* restored_db = nullptr;
1426   std::vector<ColumnFamilyHandle*> restored_cf_handles;
1427   // Not yet implemented: opening restored BlobDB or TransactionDB
1428   if (s.ok() && !FLAGS_use_txn && !FLAGS_use_blob_db) {
1429     Options restore_options(options_);
1430     restore_options.listeners.clear();
1431     // Avoid dangling/shared file descriptors, for reliable destroy
1432     restore_options.sst_file_manager = nullptr;
1433     std::vector<ColumnFamilyDescriptor> cf_descriptors;
1434     // TODO(ajkr): `column_family_names_` is not safe to access here when
1435     // `clear_column_family_one_in != 0`. But we can't easily switch to
1436     // `ListColumnFamilies` to get names because it won't necessarily give
1437     // the same order as `column_family_names_`.
1438     assert(FLAGS_clear_column_family_one_in == 0);
1439     for (auto name : column_family_names_) {
1440       cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
1441     }
1442     if (inplace_not_restore) {
1443       BackupInfo& info = backup_info[thread->rand.Uniform(count)];
1444       restore_options.env = info.env_for_open.get();
1445       s = DB::OpenForReadOnly(DBOptions(restore_options), info.name_for_open,
1446                               cf_descriptors, &restored_cf_handles,
1447                               &restored_db);
1448       if (!s.ok()) {
1449         from = "DB::OpenForReadOnly in backup/restore";
1450       }
1451     } else {
1452       s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
1453                    &restored_cf_handles, &restored_db);
1454       if (!s.ok()) {
1455         from = "DB::Open in backup/restore";
1456       }
1457     }
1458   }
1459   // Note the column families chosen by `rand_column_families` cannot be
1460   // dropped while the locks for `rand_keys` are held. So we should not have
1461   // to worry about accessing those column families throughout this function.
1462   //
1463   // For simplicity, currently only verifies existence/non-existence of a
1464   // single key
1465   for (size_t i = 0; restored_db && s.ok() && i < rand_column_families.size();
1466        ++i) {
1467     std::string key_str = Key(rand_keys[0]);
1468     Slice key = key_str;
1469     std::string restored_value;
1470     ReadOptions read_opts;
1471     std::string ts_str;
1472     Slice ts;
1473     if (FLAGS_user_timestamp_size > 0) {
1474       ts_str = GenerateTimestampForRead();
1475       ts = ts_str;
1476       read_opts.timestamp = &ts;
1477     }
1478     Status get_status = restored_db->Get(
1479         read_opts, restored_cf_handles[rand_column_families[i]], key,
1480         &restored_value);
1481     bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1482     if (get_status.ok()) {
1483       if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
1484         s = Status::Corruption("key exists in restore but not in original db");
1485       }
1486     } else if (get_status.IsNotFound()) {
1487       if (exists && from_latest && ShouldAcquireMutexOnKey()) {
1488         s = Status::Corruption("key exists in original db but not in restore");
1489       }
1490     } else {
1491       s = get_status;
1492       if (!s.ok()) {
1493         from = "DB::Get in backup/restore";
1494       }
1495     }
1496   }
1497   if (restored_db != nullptr) {
1498     for (auto* cf_handle : restored_cf_handles) {
1499       restored_db->DestroyColumnFamilyHandle(cf_handle);
1500     }
1501     delete restored_db;
1502     restored_db = nullptr;
1503   }
1504   if (s.ok() && inplace_not_restore) {
1505     // Purge late if inplace open read-only
1506     uint32_t to_keep = 0;
1507     if (allow_persistent) {
1508       // allow one thread to keep up to 2 backups
1509       to_keep = thread->rand.Uniform(3);
1510     }
1511     s = backup_engine->PurgeOldBackups(to_keep);
1512     if (!s.ok()) {
1513       from = "BackupEngine::PurgeOldBackups";
1514     }
1515   }
1516   if (backup_engine != nullptr) {
1517     delete backup_engine;
1518     backup_engine = nullptr;
1519   }
1520   if (s.ok()) {
1521     // Preserve directories on failure, or allowed persistent backup
1522     if (!allow_persistent) {
1523       s = DestroyDir(db_stress_env, backup_dir);
1524       if (!s.ok()) {
1525         from = "Destroy backup dir";
1526       }
1527     }
1528   }
1529   if (s.ok()) {
1530     s = DestroyDir(db_stress_env, restore_dir);
1531     if (!s.ok()) {
1532       from = "Destroy restore dir";
1533     }
1534   }
1535   if (!s.ok()) {
1536     fprintf(stderr, "Failure in %s with: %s\n", from.c_str(),
1537             s.ToString().c_str());
1538   }
1539   return s;
1540 }
1541 
TestApproximateSize(ThreadState * thread,uint64_t iteration,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)1542 Status StressTest::TestApproximateSize(
1543     ThreadState* thread, uint64_t iteration,
1544     const std::vector<int>& rand_column_families,
1545     const std::vector<int64_t>& rand_keys) {
1546   // rand_keys likely only has one key. Just use the first one.
1547   assert(!rand_keys.empty());
1548   assert(!rand_column_families.empty());
1549   int64_t key1 = rand_keys[0];
1550   int64_t key2;
1551   if (thread->rand.OneIn(2)) {
1552     // Two totally random keys. This tends to cover large ranges.
1553     key2 = GenerateOneKey(thread, iteration);
1554     if (key2 < key1) {
1555       std::swap(key1, key2);
1556     }
1557   } else {
1558     // Unless users pass a very large FLAGS_max_key, it we should not worry
1559     // about overflow. It is for testing, so we skip the overflow checking
1560     // for simplicity.
1561     key2 = key1 + static_cast<int64_t>(thread->rand.Uniform(1000));
1562   }
1563   std::string key1_str = Key(key1);
1564   std::string key2_str = Key(key2);
1565   Range range{Slice(key1_str), Slice(key2_str)};
1566   SizeApproximationOptions sao;
1567   sao.include_memtabtles = thread->rand.OneIn(2);
1568   if (sao.include_memtabtles) {
1569     sao.include_files = thread->rand.OneIn(2);
1570   }
1571   if (thread->rand.OneIn(2)) {
1572     if (thread->rand.OneIn(2)) {
1573       sao.files_size_error_margin = 0.0;
1574     } else {
1575       sao.files_size_error_margin =
1576           static_cast<double>(thread->rand.Uniform(3));
1577     }
1578   }
1579   uint64_t result;
1580   return db_->GetApproximateSizes(
1581       sao, column_families_[rand_column_families[0]], &range, 1, &result);
1582 }
1583 
TestCheckpoint(ThreadState * thread,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)1584 Status StressTest::TestCheckpoint(ThreadState* thread,
1585                                   const std::vector<int>& rand_column_families,
1586                                   const std::vector<int64_t>& rand_keys) {
1587   std::string checkpoint_dir =
1588       FLAGS_db + "/.checkpoint" + ToString(thread->tid);
1589   Options tmp_opts(options_);
1590   tmp_opts.listeners.clear();
1591   tmp_opts.env = db_stress_env;
1592 
1593   DestroyDB(checkpoint_dir, tmp_opts);
1594 
1595   if (db_stress_env->FileExists(checkpoint_dir).ok()) {
1596     // If the directory might still exist, try to delete the files one by one.
1597     // Likely a trash file is still there.
1598     Status my_s = DestroyDir(db_stress_env, checkpoint_dir);
1599     if (!my_s.ok()) {
1600       fprintf(stderr, "Fail to destory directory before checkpoint: %s",
1601               my_s.ToString().c_str());
1602     }
1603   }
1604 
1605   Checkpoint* checkpoint = nullptr;
1606   Status s = Checkpoint::Create(db_, &checkpoint);
1607   if (s.ok()) {
1608     s = checkpoint->CreateCheckpoint(checkpoint_dir);
1609     if (!s.ok()) {
1610       fprintf(stderr, "Fail to create checkpoint to %s\n",
1611               checkpoint_dir.c_str());
1612       std::vector<std::string> files;
1613       Status my_s = db_stress_env->GetChildren(checkpoint_dir, &files);
1614       if (my_s.ok()) {
1615         for (const auto& f : files) {
1616           fprintf(stderr, " %s\n", f.c_str());
1617         }
1618       } else {
1619         fprintf(stderr, "Fail to get files under the directory to %s\n",
1620                 my_s.ToString().c_str());
1621       }
1622     }
1623   }
1624   delete checkpoint;
1625   checkpoint = nullptr;
1626   std::vector<ColumnFamilyHandle*> cf_handles;
1627   DB* checkpoint_db = nullptr;
1628   if (s.ok()) {
1629     Options options(options_);
1630     options.listeners.clear();
1631     std::vector<ColumnFamilyDescriptor> cf_descs;
1632     // TODO(ajkr): `column_family_names_` is not safe to access here when
1633     // `clear_column_family_one_in != 0`. But we can't easily switch to
1634     // `ListColumnFamilies` to get names because it won't necessarily give
1635     // the same order as `column_family_names_`.
1636     assert(FLAGS_clear_column_family_one_in == 0);
1637     if (FLAGS_clear_column_family_one_in == 0) {
1638       for (const auto& name : column_family_names_) {
1639         cf_descs.emplace_back(name, ColumnFamilyOptions(options));
1640       }
1641       s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
1642                               &cf_handles, &checkpoint_db);
1643     }
1644   }
1645   if (checkpoint_db != nullptr) {
1646     // Note the column families chosen by `rand_column_families` cannot be
1647     // dropped while the locks for `rand_keys` are held. So we should not have
1648     // to worry about accessing those column families throughout this function.
1649     for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
1650       std::string key_str = Key(rand_keys[0]);
1651       Slice key = key_str;
1652       std::string value;
1653       Status get_status = checkpoint_db->Get(
1654           ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
1655       bool exists =
1656           thread->shared->Exists(rand_column_families[i], rand_keys[0]);
1657       if (get_status.ok()) {
1658         if (!exists && ShouldAcquireMutexOnKey()) {
1659           s = Status::Corruption(
1660               "key exists in checkpoint but not in original db");
1661         }
1662       } else if (get_status.IsNotFound()) {
1663         if (exists && ShouldAcquireMutexOnKey()) {
1664           s = Status::Corruption(
1665               "key exists in original db but not in checkpoint");
1666         }
1667       } else {
1668         s = get_status;
1669       }
1670     }
1671     for (auto cfh : cf_handles) {
1672       delete cfh;
1673     }
1674     cf_handles.clear();
1675     delete checkpoint_db;
1676     checkpoint_db = nullptr;
1677   }
1678 
1679   if (!s.ok()) {
1680     fprintf(stderr, "A checkpoint operation failed with: %s\n",
1681             s.ToString().c_str());
1682   } else {
1683     DestroyDB(checkpoint_dir, tmp_opts);
1684   }
1685   return s;
1686 }
1687 
TestGetProperty(ThreadState * thread) const1688 void StressTest::TestGetProperty(ThreadState* thread) const {
1689   std::unordered_set<std::string> levelPropertyNames = {
1690       DB::Properties::kAggregatedTablePropertiesAtLevel,
1691       DB::Properties::kCompressionRatioAtLevelPrefix,
1692       DB::Properties::kNumFilesAtLevelPrefix,
1693   };
1694   std::unordered_set<std::string> unknownPropertyNames = {
1695       DB::Properties::kEstimateOldestKeyTime,
1696       DB::Properties::kOptionsStatistics,
1697   };
1698   unknownPropertyNames.insert(levelPropertyNames.begin(),
1699                               levelPropertyNames.end());
1700 
1701   std::string prop;
1702   for (const auto& ppt_name_and_info : InternalStats::ppt_name_to_info) {
1703     bool res = db_->GetProperty(ppt_name_and_info.first, &prop);
1704     if (unknownPropertyNames.find(ppt_name_and_info.first) ==
1705         unknownPropertyNames.end()) {
1706       if (!res) {
1707         fprintf(stderr, "Failed to get DB property: %s\n",
1708                 ppt_name_and_info.first.c_str());
1709         thread->shared->SetVerificationFailure();
1710       }
1711       if (ppt_name_and_info.second.handle_int != nullptr) {
1712         uint64_t prop_int;
1713         if (!db_->GetIntProperty(ppt_name_and_info.first, &prop_int)) {
1714           fprintf(stderr, "Failed to get Int property: %s\n",
1715                   ppt_name_and_info.first.c_str());
1716           thread->shared->SetVerificationFailure();
1717         }
1718       }
1719     }
1720   }
1721 
1722   ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
1723   db_->GetColumnFamilyMetaData(&cf_meta_data);
1724   int level_size = static_cast<int>(cf_meta_data.levels.size());
1725   for (int level = 0; level < level_size; level++) {
1726     for (const auto& ppt_name : levelPropertyNames) {
1727       bool res = db_->GetProperty(ppt_name + std::to_string(level), &prop);
1728       if (!res) {
1729         fprintf(stderr, "Failed to get DB property: %s\n",
1730                 (ppt_name + std::to_string(level)).c_str());
1731         thread->shared->SetVerificationFailure();
1732       }
1733     }
1734   }
1735 
1736   // Test for an invalid property name
1737   if (thread->rand.OneIn(100)) {
1738     if (db_->GetProperty("rocksdb.invalid_property_name", &prop)) {
1739       fprintf(stderr, "Failed to return false for invalid property name\n");
1740       thread->shared->SetVerificationFailure();
1741     }
1742   }
1743 }
1744 
TestCompactFiles(ThreadState * thread,ColumnFamilyHandle * column_family)1745 void StressTest::TestCompactFiles(ThreadState* thread,
1746                                   ColumnFamilyHandle* column_family) {
1747   ROCKSDB_NAMESPACE::ColumnFamilyMetaData cf_meta_data;
1748   db_->GetColumnFamilyMetaData(column_family, &cf_meta_data);
1749 
1750   // Randomly compact up to three consecutive files from a level
1751   const int kMaxRetry = 3;
1752   for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
1753     size_t random_level =
1754         thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size()));
1755 
1756     const auto& files = cf_meta_data.levels[random_level].files;
1757     if (files.size() > 0) {
1758       size_t random_file_index =
1759           thread->rand.Uniform(static_cast<int>(files.size()));
1760       if (files[random_file_index].being_compacted) {
1761         // Retry as the selected file is currently being compacted
1762         continue;
1763       }
1764 
1765       std::vector<std::string> input_files;
1766       input_files.push_back(files[random_file_index].name);
1767       if (random_file_index > 0 &&
1768           !files[random_file_index - 1].being_compacted) {
1769         input_files.push_back(files[random_file_index - 1].name);
1770       }
1771       if (random_file_index + 1 < files.size() &&
1772           !files[random_file_index + 1].being_compacted) {
1773         input_files.push_back(files[random_file_index + 1].name);
1774       }
1775 
1776       size_t output_level =
1777           std::min(random_level + 1, cf_meta_data.levels.size() - 1);
1778       auto s = db_->CompactFiles(CompactionOptions(), column_family,
1779                                  input_files, static_cast<int>(output_level));
1780       if (!s.ok()) {
1781         fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
1782                 s.ToString().c_str());
1783         thread->stats.AddNumCompactFilesFailed(1);
1784       } else {
1785         thread->stats.AddNumCompactFilesSucceed(1);
1786       }
1787       break;
1788     }
1789   }
1790 }
1791 #endif  // ROCKSDB_LITE
1792 
TestFlush(const std::vector<int> & rand_column_families)1793 Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
1794   FlushOptions flush_opts;
1795   std::vector<ColumnFamilyHandle*> cfhs;
1796   std::for_each(rand_column_families.begin(), rand_column_families.end(),
1797                 [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
1798   return db_->Flush(flush_opts, cfhs);
1799 }
1800 
TestPauseBackground(ThreadState * thread)1801 Status StressTest::TestPauseBackground(ThreadState* thread) {
1802   Status status = db_->PauseBackgroundWork();
1803   if (!status.ok()) {
1804     return status;
1805   }
1806   // To avoid stalling/deadlocking ourself in this thread, just
1807   // sleep here during pause and let other threads do db operations.
1808   // Sleep up to ~16 seconds (2**24 microseconds), but very skewed
1809   // toward short pause. (1 chance in 25 of pausing >= 1s;
1810   // 1 chance in 625 of pausing full 16s.)
1811   int pwr2_micros =
1812       std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
1813   clock_->SleepForMicroseconds(1 << pwr2_micros);
1814   return db_->ContinueBackgroundWork();
1815 }
1816 
TestAcquireSnapshot(ThreadState * thread,int rand_column_family,const std::string & keystr,uint64_t i)1817 void StressTest::TestAcquireSnapshot(ThreadState* thread,
1818                                      int rand_column_family,
1819                                      const std::string& keystr, uint64_t i) {
1820   Slice key = keystr;
1821   ColumnFamilyHandle* column_family = column_families_[rand_column_family];
1822   ReadOptions ropt;
1823 #ifndef ROCKSDB_LITE
1824   auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
1825   const bool ww_snapshot = thread->rand.OneIn(10);
1826   const Snapshot* snapshot =
1827       ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
1828                   : db_->GetSnapshot();
1829 #else
1830   const Snapshot* snapshot = db_->GetSnapshot();
1831 #endif  // !ROCKSDB_LITE
1832   ropt.snapshot = snapshot;
1833 
1834   // Ideally, we want snapshot taking and timestamp generation to be atomic
1835   // here, so that the snapshot corresponds to the timestamp. However, it is
1836   // not possible with current GetSnapshot() API.
1837   std::string ts_str;
1838   Slice ts;
1839   if (FLAGS_user_timestamp_size > 0) {
1840     ts_str = GenerateTimestampForRead();
1841     ts = ts_str;
1842     ropt.timestamp = &ts;
1843   }
1844 
1845   std::string value_at;
1846   // When taking a snapshot, we also read a key from that snapshot. We
1847   // will later read the same key before releasing the snapshot and
1848   // verify that the results are the same.
1849   auto status_at = db_->Get(ropt, column_family, key, &value_at);
1850   std::vector<bool>* key_vec = nullptr;
1851 
1852   if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
1853     key_vec = new std::vector<bool>(FLAGS_max_key);
1854     // When `prefix_extractor` is set, seeking to beginning and scanning
1855     // across prefixes are only supported with `total_order_seek` set.
1856     ropt.total_order_seek = true;
1857     std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
1858     for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
1859       uint64_t key_val;
1860       if (GetIntVal(iterator->key().ToString(), &key_val)) {
1861         (*key_vec)[key_val] = true;
1862       }
1863     }
1864   }
1865 
1866   ThreadState::SnapshotState snap_state = {snapshot,
1867                                            rand_column_family,
1868                                            column_family->GetName(),
1869                                            keystr,
1870                                            status_at,
1871                                            value_at,
1872                                            key_vec,
1873                                            ts_str};
1874   uint64_t hold_for = FLAGS_snapshot_hold_ops;
1875   if (FLAGS_long_running_snapshots) {
1876     // Hold 10% of snapshots for 10x more
1877     if (thread->rand.OneIn(10)) {
1878       assert(hold_for < port::kMaxInt64 / 10);
1879       hold_for *= 10;
1880       // Hold 1% of snapshots for 100x more
1881       if (thread->rand.OneIn(10)) {
1882         assert(hold_for < port::kMaxInt64 / 10);
1883         hold_for *= 10;
1884       }
1885     }
1886   }
1887   uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
1888   thread->snapshot_queue.emplace(release_at, snap_state);
1889 }
1890 
MaybeReleaseSnapshots(ThreadState * thread,uint64_t i)1891 Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) {
1892   while (!thread->snapshot_queue.empty() &&
1893          i >= thread->snapshot_queue.front().first) {
1894     auto snap_state = thread->snapshot_queue.front().second;
1895     assert(snap_state.snapshot);
1896     // Note: this is unsafe as the cf might be dropped concurrently. But
1897     // it is ok since unclean cf drop is cunnrently not supported by write
1898     // prepared transactions.
1899     Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
1900     db_->ReleaseSnapshot(snap_state.snapshot);
1901     delete snap_state.key_vec;
1902     thread->snapshot_queue.pop();
1903     if (!s.ok()) {
1904       return s;
1905     }
1906   }
1907   return Status::OK();
1908 }
1909 
TestCompactRange(ThreadState * thread,int64_t rand_key,const Slice & start_key,ColumnFamilyHandle * column_family)1910 void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
1911                                   const Slice& start_key,
1912                                   ColumnFamilyHandle* column_family) {
1913   int64_t end_key_num;
1914   if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) {
1915     end_key_num = port::kMaxInt64;
1916   } else {
1917     end_key_num = FLAGS_compact_range_width + rand_key;
1918   }
1919   std::string end_key_buf = Key(end_key_num);
1920   Slice end_key(end_key_buf);
1921 
1922   CompactRangeOptions cro;
1923   cro.exclusive_manual_compaction = static_cast<bool>(thread->rand.Next() % 2);
1924   cro.change_level = static_cast<bool>(thread->rand.Next() % 2);
1925   std::vector<BottommostLevelCompaction> bottom_level_styles = {
1926       BottommostLevelCompaction::kSkip,
1927       BottommostLevelCompaction::kIfHaveCompactionFilter,
1928       BottommostLevelCompaction::kForce,
1929       BottommostLevelCompaction::kForceOptimized};
1930   cro.bottommost_level_compaction =
1931       bottom_level_styles[thread->rand.Next() %
1932                           static_cast<uint32_t>(bottom_level_styles.size())];
1933   cro.allow_write_stall = static_cast<bool>(thread->rand.Next() % 2);
1934   cro.max_subcompactions = static_cast<uint32_t>(thread->rand.Next() % 4);
1935 
1936   const Snapshot* pre_snapshot = nullptr;
1937   uint32_t pre_hash = 0;
1938   if (thread->rand.OneIn(2)) {
1939     // Do some validation by declaring a snapshot and compare the data before
1940     // and after the compaction
1941     pre_snapshot = db_->GetSnapshot();
1942     pre_hash =
1943         GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
1944   }
1945 
1946   Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);
1947 
1948   if (!status.ok()) {
1949     fprintf(stdout, "Unable to perform CompactRange(): %s\n",
1950             status.ToString().c_str());
1951   }
1952 
1953   if (pre_snapshot != nullptr) {
1954     uint32_t post_hash =
1955         GetRangeHash(thread, pre_snapshot, column_family, start_key, end_key);
1956     if (pre_hash != post_hash) {
1957       fprintf(stderr,
1958               "Data hash different before and after compact range "
1959               "start_key %s end_key %s\n",
1960               start_key.ToString(true).c_str(), end_key.ToString(true).c_str());
1961       thread->stats.AddErrors(1);
1962       // Fail fast to preserve the DB state.
1963       thread->shared->SetVerificationFailure();
1964     }
1965     db_->ReleaseSnapshot(pre_snapshot);
1966   }
1967 }
1968 
GetRangeHash(ThreadState * thread,const Snapshot * snapshot,ColumnFamilyHandle * column_family,const Slice & start_key,const Slice & end_key)1969 uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
1970                                   ColumnFamilyHandle* column_family,
1971                                   const Slice& start_key,
1972                                   const Slice& end_key) {
1973   const std::string kCrcCalculatorSepearator = ";";
1974   uint32_t crc = 0;
1975   ReadOptions ro;
1976   ro.snapshot = snapshot;
1977   ro.total_order_seek = true;
1978   std::string ts_str;
1979   Slice ts;
1980   if (FLAGS_user_timestamp_size > 0) {
1981     ts_str = GenerateTimestampForRead();
1982     ts = ts_str;
1983     ro.timestamp = &ts;
1984   }
1985   std::unique_ptr<Iterator> it(db_->NewIterator(ro, column_family));
1986   for (it->Seek(start_key);
1987        it->Valid() && options_.comparator->Compare(it->key(), end_key) <= 0;
1988        it->Next()) {
1989     crc = crc32c::Extend(crc, it->key().data(), it->key().size());
1990     crc = crc32c::Extend(crc, kCrcCalculatorSepearator.data(), 1);
1991     crc = crc32c::Extend(crc, it->value().data(), it->value().size());
1992     crc = crc32c::Extend(crc, kCrcCalculatorSepearator.data(), 1);
1993   }
1994   if (!it->status().ok()) {
1995     fprintf(stderr, "Iterator non-OK when calculating range CRC: %s\n",
1996             it->status().ToString().c_str());
1997     thread->stats.AddErrors(1);
1998     // Fail fast to preserve the DB state.
1999     thread->shared->SetVerificationFailure();
2000   }
2001   return crc;
2002 }
2003 
PrintEnv() const2004 void StressTest::PrintEnv() const {
2005   fprintf(stdout, "RocksDB version           : %d.%d\n", kMajorVersion,
2006           kMinorVersion);
2007   fprintf(stdout, "Format version            : %d\n", FLAGS_format_version);
2008   fprintf(stdout, "TransactionDB             : %s\n",
2009           FLAGS_use_txn ? "true" : "false");
2010 #ifndef ROCKSDB_LITE
2011   fprintf(stdout, "Stacked BlobDB            : %s\n",
2012           FLAGS_use_blob_db ? "true" : "false");
2013 #endif  // !ROCKSDB_LITE
2014   fprintf(stdout, "Read only mode            : %s\n",
2015           FLAGS_read_only ? "true" : "false");
2016   fprintf(stdout, "Atomic flush              : %s\n",
2017           FLAGS_atomic_flush ? "true" : "false");
2018   fprintf(stdout, "Column families           : %d\n", FLAGS_column_families);
2019   if (!FLAGS_test_batches_snapshots) {
2020     fprintf(stdout, "Clear CFs one in          : %d\n",
2021             FLAGS_clear_column_family_one_in);
2022   }
2023   fprintf(stdout, "Number of threads         : %d\n", FLAGS_threads);
2024   fprintf(stdout, "Ops per thread            : %lu\n",
2025           (unsigned long)FLAGS_ops_per_thread);
2026   std::string ttl_state("unused");
2027   if (FLAGS_ttl > 0) {
2028     ttl_state = NumberToString(FLAGS_ttl);
2029   }
2030   fprintf(stdout, "Time to live(sec)         : %s\n", ttl_state.c_str());
2031   fprintf(stdout, "Read percentage           : %d%%\n", FLAGS_readpercent);
2032   fprintf(stdout, "Prefix percentage         : %d%%\n", FLAGS_prefixpercent);
2033   fprintf(stdout, "Write percentage          : %d%%\n", FLAGS_writepercent);
2034   fprintf(stdout, "Delete percentage         : %d%%\n", FLAGS_delpercent);
2035   fprintf(stdout, "Delete range percentage   : %d%%\n", FLAGS_delrangepercent);
2036   fprintf(stdout, "No overwrite percentage   : %d%%\n",
2037           FLAGS_nooverwritepercent);
2038   fprintf(stdout, "Iterate percentage        : %d%%\n", FLAGS_iterpercent);
2039   fprintf(stdout, "DB-write-buffer-size      : %" PRIu64 "\n",
2040           FLAGS_db_write_buffer_size);
2041   fprintf(stdout, "Write-buffer-size         : %d\n", FLAGS_write_buffer_size);
2042   fprintf(stdout, "Iterations                : %lu\n",
2043           (unsigned long)FLAGS_num_iterations);
2044   fprintf(stdout, "Max key                   : %lu\n",
2045           (unsigned long)FLAGS_max_key);
2046   fprintf(stdout, "Ratio #ops/#keys          : %f\n",
2047           (1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
2048   fprintf(stdout, "Num times DB reopens      : %d\n", FLAGS_reopen);
2049   fprintf(stdout, "Batches/snapshots         : %d\n",
2050           FLAGS_test_batches_snapshots);
2051   fprintf(stdout, "Do update in place        : %d\n", FLAGS_in_place_update);
2052   fprintf(stdout, "Num keys per lock         : %d\n",
2053           1 << FLAGS_log2_keys_per_lock);
2054   std::string compression = CompressionTypeToString(compression_type_e);
2055   fprintf(stdout, "Compression               : %s\n", compression.c_str());
2056   std::string bottommost_compression =
2057       CompressionTypeToString(bottommost_compression_type_e);
2058   fprintf(stdout, "Bottommost Compression    : %s\n",
2059           bottommost_compression.c_str());
2060   std::string checksum = ChecksumTypeToString(checksum_type_e);
2061   fprintf(stdout, "Checksum type             : %s\n", checksum.c_str());
2062   fprintf(stdout, "File checksum impl        : %s\n",
2063           FLAGS_file_checksum_impl.c_str());
2064   fprintf(stdout, "Bloom bits / key          : %s\n",
2065           FormatDoubleParam(FLAGS_bloom_bits).c_str());
2066   fprintf(stdout, "Max subcompactions        : %" PRIu64 "\n",
2067           FLAGS_subcompactions);
2068   fprintf(stdout, "Use MultiGet              : %s\n",
2069           FLAGS_use_multiget ? "true" : "false");
2070 
2071   const char* memtablerep = "";
2072   switch (FLAGS_rep_factory) {
2073     case kSkipList:
2074       memtablerep = "skip_list";
2075       break;
2076     case kHashSkipList:
2077       memtablerep = "prefix_hash";
2078       break;
2079     case kVectorRep:
2080       memtablerep = "vector";
2081       break;
2082   }
2083 
2084   fprintf(stdout, "Memtablerep               : %s\n", memtablerep);
2085 
2086 #ifndef NDEBUG
2087   KillPoint* kp = KillPoint::GetInstance();
2088   fprintf(stdout, "Test kill odd             : %d\n", kp->rocksdb_kill_odds);
2089   if (!kp->rocksdb_kill_exclude_prefixes.empty()) {
2090     fprintf(stdout, "Skipping kill points prefixes:\n");
2091     for (auto& p : kp->rocksdb_kill_exclude_prefixes) {
2092       fprintf(stdout, "  %s\n", p.c_str());
2093     }
2094   }
2095 #endif
2096   fprintf(stdout, "Periodic Compaction Secs  : %" PRIu64 "\n",
2097           FLAGS_periodic_compaction_seconds);
2098   fprintf(stdout, "Compaction TTL            : %" PRIu64 "\n",
2099           FLAGS_compaction_ttl);
2100   fprintf(stdout, "Background Purge          : %d\n",
2101           static_cast<int>(FLAGS_avoid_unnecessary_blocking_io));
2102   fprintf(stdout, "Write DB ID to manifest   : %d\n",
2103           static_cast<int>(FLAGS_write_dbid_to_manifest));
2104   fprintf(stdout, "Max Write Batch Group Size: %" PRIu64 "\n",
2105           FLAGS_max_write_batch_group_size_bytes);
2106   fprintf(stdout, "Use dynamic level         : %d\n",
2107           static_cast<int>(FLAGS_level_compaction_dynamic_level_bytes));
2108   fprintf(stdout, "Read fault one in         : %d\n", FLAGS_read_fault_one_in);
2109   fprintf(stdout, "Write fault one in        : %d\n", FLAGS_write_fault_one_in);
2110   fprintf(stdout, "Open metadata write fault one in:\n");
2111   fprintf(stdout, "                            %d\n",
2112           FLAGS_open_metadata_write_fault_one_in);
2113   fprintf(stdout, "Sync fault injection      : %d\n", FLAGS_sync_fault_injection);
2114   fprintf(stdout, "Best efforts recovery     : %d\n",
2115           static_cast<int>(FLAGS_best_efforts_recovery));
2116   fprintf(stdout, "Fail if OPTIONS file error: %d\n",
2117           static_cast<int>(FLAGS_fail_if_options_file_error));
2118   fprintf(stdout, "User timestamp size bytes : %d\n",
2119           static_cast<int>(FLAGS_user_timestamp_size));
2120 
2121   fprintf(stdout, "------------------------------------------------\n");
2122 }
2123 
Open()2124 void StressTest::Open() {
2125   assert(db_ == nullptr);
2126 #ifndef ROCKSDB_LITE
2127   assert(txn_db_ == nullptr);
2128 #endif
2129   if (FLAGS_options_file.empty()) {
2130     BlockBasedTableOptions block_based_options;
2131     block_based_options.block_cache = cache_;
2132     block_based_options.cache_index_and_filter_blocks =
2133         FLAGS_cache_index_and_filter_blocks;
2134     block_based_options.metadata_cache_options.top_level_index_pinning =
2135         static_cast<PinningTier>(FLAGS_top_level_index_pinning);
2136     block_based_options.metadata_cache_options.partition_pinning =
2137         static_cast<PinningTier>(FLAGS_partition_pinning);
2138     block_based_options.metadata_cache_options.unpartitioned_pinning =
2139         static_cast<PinningTier>(FLAGS_unpartitioned_pinning);
2140     block_based_options.block_cache_compressed = compressed_cache_;
2141     block_based_options.checksum = checksum_type_e;
2142     block_based_options.block_size = FLAGS_block_size;
2143     block_based_options.format_version =
2144         static_cast<uint32_t>(FLAGS_format_version);
2145     block_based_options.index_block_restart_interval =
2146         static_cast<int32_t>(FLAGS_index_block_restart_interval);
2147     block_based_options.filter_policy = filter_policy_;
2148     block_based_options.partition_filters = FLAGS_partition_filters;
2149     block_based_options.optimize_filters_for_memory =
2150         FLAGS_optimize_filters_for_memory;
2151     block_based_options.index_type =
2152         static_cast<BlockBasedTableOptions::IndexType>(FLAGS_index_type);
2153     options_.table_factory.reset(
2154         NewBlockBasedTableFactory(block_based_options));
2155     options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
2156     options_.write_buffer_size = FLAGS_write_buffer_size;
2157     options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
2158     options_.min_write_buffer_number_to_merge =
2159         FLAGS_min_write_buffer_number_to_merge;
2160     options_.max_write_buffer_number_to_maintain =
2161         FLAGS_max_write_buffer_number_to_maintain;
2162     options_.max_write_buffer_size_to_maintain =
2163         FLAGS_max_write_buffer_size_to_maintain;
2164     options_.memtable_prefix_bloom_size_ratio =
2165         FLAGS_memtable_prefix_bloom_size_ratio;
2166     options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
2167     options_.max_background_compactions = FLAGS_max_background_compactions;
2168     options_.max_background_flushes = FLAGS_max_background_flushes;
2169     options_.compaction_style =
2170         static_cast<ROCKSDB_NAMESPACE::CompactionStyle>(FLAGS_compaction_style);
2171     if (FLAGS_prefix_size >= 0) {
2172       options_.prefix_extractor.reset(
2173           NewFixedPrefixTransform(FLAGS_prefix_size));
2174     }
2175     options_.max_open_files = FLAGS_open_files;
2176     options_.statistics = dbstats;
2177     options_.env = db_stress_env;
2178     options_.use_fsync = FLAGS_use_fsync;
2179     options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
2180     options_.allow_mmap_reads = FLAGS_mmap_read;
2181     options_.allow_mmap_writes = FLAGS_mmap_write;
2182     options_.use_direct_reads = FLAGS_use_direct_reads;
2183     options_.use_direct_io_for_flush_and_compaction =
2184         FLAGS_use_direct_io_for_flush_and_compaction;
2185     options_.recycle_log_file_num =
2186         static_cast<size_t>(FLAGS_recycle_log_file_num);
2187     options_.target_file_size_base = FLAGS_target_file_size_base;
2188     options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
2189     options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
2190     options_.max_bytes_for_level_multiplier =
2191         FLAGS_max_bytes_for_level_multiplier;
2192     options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
2193     options_.level0_slowdown_writes_trigger =
2194         FLAGS_level0_slowdown_writes_trigger;
2195     options_.level0_file_num_compaction_trigger =
2196         FLAGS_level0_file_num_compaction_trigger;
2197     options_.compression = compression_type_e;
2198     options_.bottommost_compression = bottommost_compression_type_e;
2199     options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
2200     options_.compression_opts.zstd_max_train_bytes =
2201         FLAGS_compression_zstd_max_train_bytes;
2202     options_.compression_opts.parallel_threads =
2203         FLAGS_compression_parallel_threads;
2204     options_.compression_opts.max_dict_buffer_bytes =
2205         FLAGS_compression_max_dict_buffer_bytes;
2206     options_.create_if_missing = true;
2207     options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
2208     options_.inplace_update_support = FLAGS_in_place_update;
2209     options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
2210     options_.allow_concurrent_memtable_write =
2211         FLAGS_allow_concurrent_memtable_write;
2212     options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
2213     options_.ttl = FLAGS_compaction_ttl;
2214     options_.enable_pipelined_write = FLAGS_enable_pipelined_write;
2215     options_.enable_write_thread_adaptive_yield =
2216         FLAGS_enable_write_thread_adaptive_yield;
2217     options_.compaction_options_universal.size_ratio =
2218         FLAGS_universal_size_ratio;
2219     options_.compaction_options_universal.min_merge_width =
2220         FLAGS_universal_min_merge_width;
2221     options_.compaction_options_universal.max_merge_width =
2222         FLAGS_universal_max_merge_width;
2223     options_.compaction_options_universal.max_size_amplification_percent =
2224         FLAGS_universal_max_size_amplification_percent;
2225     options_.atomic_flush = FLAGS_atomic_flush;
2226     options_.avoid_unnecessary_blocking_io =
2227         FLAGS_avoid_unnecessary_blocking_io;
2228     options_.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
2229     options_.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;
2230     options_.max_write_batch_group_size_bytes =
2231         FLAGS_max_write_batch_group_size_bytes;
2232     options_.level_compaction_dynamic_level_bytes =
2233         FLAGS_level_compaction_dynamic_level_bytes;
2234     options_.file_checksum_gen_factory =
2235         GetFileChecksumImpl(FLAGS_file_checksum_impl);
2236     options_.track_and_verify_wals_in_manifest = true;
2237 
2238     // Integrated BlobDB
2239     options_.enable_blob_files = FLAGS_enable_blob_files;
2240     options_.min_blob_size = FLAGS_min_blob_size;
2241     options_.blob_file_size = FLAGS_blob_file_size;
2242     options_.blob_compression_type =
2243         StringToCompressionType(FLAGS_blob_compression_type.c_str());
2244     options_.enable_blob_garbage_collection =
2245         FLAGS_enable_blob_garbage_collection;
2246     options_.blob_garbage_collection_age_cutoff =
2247         FLAGS_blob_garbage_collection_age_cutoff;
2248   } else {
2249 #ifdef ROCKSDB_LITE
2250     fprintf(stderr, "--options_file not supported in lite mode\n");
2251     exit(1);
2252 #else
2253     DBOptions db_options;
2254     std::vector<ColumnFamilyDescriptor> cf_descriptors;
2255     Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
2256                                    &db_options, &cf_descriptors);
2257     db_options.env = new DbStressEnvWrapper(db_stress_env);
2258     if (!s.ok()) {
2259       fprintf(stderr, "Unable to load options file %s --- %s\n",
2260               FLAGS_options_file.c_str(), s.ToString().c_str());
2261       exit(1);
2262     }
2263     options_ = Options(db_options, cf_descriptors[0].options);
2264 #endif  // ROCKSDB_LITE
2265   }
2266 
2267   if (FLAGS_rate_limiter_bytes_per_sec > 0) {
2268     options_.rate_limiter.reset(NewGenericRateLimiter(
2269         FLAGS_rate_limiter_bytes_per_sec, 1000 /* refill_period_us */,
2270         10 /* fairness */,
2271         FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
2272                                   : RateLimiter::Mode::kWritesOnly));
2273     if (FLAGS_rate_limit_bg_reads) {
2274       options_.new_table_reader_for_compaction_inputs = true;
2275     }
2276   }
2277   if (FLAGS_sst_file_manager_bytes_per_sec > 0 ||
2278       FLAGS_sst_file_manager_bytes_per_truncate > 0) {
2279     Status status;
2280     options_.sst_file_manager.reset(NewSstFileManager(
2281         db_stress_env, options_.info_log, "" /* trash_dir */,
2282         static_cast<int64_t>(FLAGS_sst_file_manager_bytes_per_sec),
2283         true /* delete_existing_trash */, &status,
2284         0.25 /* max_trash_db_ratio */,
2285         FLAGS_sst_file_manager_bytes_per_truncate));
2286     if (!status.ok()) {
2287       fprintf(stderr, "SstFileManager creation failed: %s\n",
2288               status.ToString().c_str());
2289       exit(1);
2290     }
2291   }
2292 
2293   if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
2294     fprintf(stderr,
2295             "prefeix_size cannot be zero if memtablerep == prefix_hash\n");
2296     exit(1);
2297   }
2298   if (FLAGS_prefix_size != 0 && FLAGS_rep_factory != kHashSkipList) {
2299     fprintf(stderr,
2300             "WARNING: prefix_size is non-zero but "
2301             "memtablerep != prefix_hash\n");
2302   }
2303   switch (FLAGS_rep_factory) {
2304     case kSkipList:
2305       // no need to do anything
2306       break;
2307 #ifndef ROCKSDB_LITE
2308     case kHashSkipList:
2309       options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
2310       break;
2311     case kVectorRep:
2312       options_.memtable_factory.reset(new VectorRepFactory());
2313       break;
2314 #else
2315     default:
2316       fprintf(stderr,
2317               "RocksdbLite only supports skip list mem table. Skip "
2318               "--rep_factory\n");
2319 #endif  // ROCKSDB_LITE
2320   }
2321 
2322   if (FLAGS_use_full_merge_v1) {
2323     options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
2324   } else {
2325     options_.merge_operator = MergeOperators::CreatePutOperator();
2326   }
2327   if (FLAGS_enable_compaction_filter) {
2328     options_.compaction_filter_factory =
2329         std::make_shared<DbStressCompactionFilterFactory>();
2330   }
2331   options_.table_properties_collector_factories.emplace_back(
2332       std::make_shared<DbStressTablePropertiesCollectorFactory>());
2333 
2334   options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
2335   options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
2336   options_.fail_if_options_file_error = FLAGS_fail_if_options_file_error;
2337 
2338   if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
2339        FLAGS_allow_setting_blob_options_dynamically) &&
2340       (FLAGS_use_merge || FLAGS_best_efforts_recovery)) {
2341     fprintf(stderr,
2342             "Integrated BlobDB is currently incompatible with Merge, "
2343             "and best-effort recovery\n");
2344     exit(1);
2345   }
2346 
2347   if (options_.enable_blob_files) {
2348     fprintf(stdout,
2349             "Integrated BlobDB: blob files enabled, min blob size %" PRIu64
2350             ", blob file size %" PRIu64 ", blob compression type %s\n",
2351             options_.min_blob_size, options_.blob_file_size,
2352             CompressionTypeToString(options_.blob_compression_type).c_str());
2353   }
2354 
2355   if (options_.enable_blob_garbage_collection) {
2356     fprintf(stdout, "Integrated BlobDB: blob GC enabled, cutoff %f\n",
2357             options_.blob_garbage_collection_age_cutoff);
2358   }
2359 
2360   fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2361 
2362   Status s;
2363 
2364   if (FLAGS_user_timestamp_size > 0) {
2365     CheckAndSetOptionsForUserTimestamp();
2366   }
2367 
2368   if (FLAGS_ttl == -1) {
2369     std::vector<std::string> existing_column_families;
2370     s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
2371                                &existing_column_families);  // ignore errors
2372     if (!s.ok()) {
2373       // DB doesn't exist
2374       assert(existing_column_families.empty());
2375       assert(column_family_names_.empty());
2376       column_family_names_.push_back(kDefaultColumnFamilyName);
2377     } else if (column_family_names_.empty()) {
2378       // this is the first call to the function Open()
2379       column_family_names_ = existing_column_families;
2380     } else {
2381       // this is a reopen. just assert that existing column_family_names are
2382       // equivalent to what we remember
2383       auto sorted_cfn = column_family_names_;
2384       std::sort(sorted_cfn.begin(), sorted_cfn.end());
2385       std::sort(existing_column_families.begin(),
2386                 existing_column_families.end());
2387       if (sorted_cfn != existing_column_families) {
2388         fprintf(stderr, "Expected column families differ from the existing:\n");
2389         fprintf(stderr, "Expected: {");
2390         for (auto cf : sorted_cfn) {
2391           fprintf(stderr, "%s ", cf.c_str());
2392         }
2393         fprintf(stderr, "}\n");
2394         fprintf(stderr, "Existing: {");
2395         for (auto cf : existing_column_families) {
2396           fprintf(stderr, "%s ", cf.c_str());
2397         }
2398         fprintf(stderr, "}\n");
2399       }
2400       assert(sorted_cfn == existing_column_families);
2401     }
2402     std::vector<ColumnFamilyDescriptor> cf_descriptors;
2403     for (auto name : column_family_names_) {
2404       if (name != kDefaultColumnFamilyName) {
2405         new_column_family_name_ =
2406             std::max(new_column_family_name_.load(), std::stoi(name) + 1);
2407       }
2408       cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2409     }
2410     while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
2411       std::string name = ToString(new_column_family_name_.load());
2412       new_column_family_name_++;
2413       cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
2414       column_family_names_.push_back(name);
2415     }
2416     options_.listeners.clear();
2417     options_.listeners.emplace_back(
2418         new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
2419     options_.create_missing_column_families = true;
2420     if (!FLAGS_use_txn) {
2421 #ifndef NDEBUG
2422       // Determine whether we need to ingest file metadata write failures
2423       // during DB reopen. If it does, enable it.
2424       // Only ingest metadata error if it is reopening, as initial open
2425       // failure doesn't need to be handled.
2426       // TODO cover transaction DB is not covered in this fault test too.
2427       bool ingest_meta_error =
2428           FLAGS_open_metadata_write_fault_one_in &&
2429           fault_fs_guard
2430               ->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
2431               .ok();
2432       if (ingest_meta_error) {
2433         fault_fs_guard->EnableMetadataWriteErrorInjection();
2434         fault_fs_guard->SetRandomMetadataWriteError(
2435             FLAGS_open_metadata_write_fault_one_in);
2436       }
2437       while (true) {
2438 #endif  // NDEBUG
2439 #ifndef ROCKSDB_LITE
2440         // StackableDB-based BlobDB
2441         if (FLAGS_use_blob_db) {
2442           blob_db::BlobDBOptions blob_db_options;
2443           blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
2444           blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
2445           blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
2446           blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
2447           blob_db_options.garbage_collection_cutoff = FLAGS_blob_db_gc_cutoff;
2448 
2449           blob_db::BlobDB* blob_db = nullptr;
2450           s = blob_db::BlobDB::Open(options_, blob_db_options, FLAGS_db,
2451                                     cf_descriptors, &column_families_,
2452                                     &blob_db);
2453           if (s.ok()) {
2454             db_ = blob_db;
2455           }
2456         } else
2457 #endif  // !ROCKSDB_LITE
2458         {
2459           if (db_preload_finished_.load() && FLAGS_read_only) {
2460             s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
2461                                     cf_descriptors, &column_families_, &db_);
2462           } else {
2463             s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
2464                          &column_families_, &db_);
2465           }
2466         }
2467 
2468 #ifndef NDEBUG
2469         if (ingest_meta_error) {
2470           fault_fs_guard->DisableMetadataWriteErrorInjection();
2471           if (s.ok()) {
2472             // Ingested errors might happen in background compactions. We
2473             // wait for all compactions to finish to make sure DB is in
2474             // clean state before executing queries.
2475             s = static_cast_with_check<DBImpl>(db_->GetRootDB())
2476                     ->TEST_WaitForCompact(true);
2477             if (!s.ok()) {
2478               delete db_;
2479             }
2480           }
2481           if (!s.ok()) {
2482             // After failure to opening a DB due to IO error, retry should
2483             // successfully open the DB with correct data if no IO error shows
2484             // up.
2485             ingest_meta_error = false;
2486 
2487             Random rand(static_cast<uint32_t>(FLAGS_seed));
2488             if (rand.OneIn(2)) {
2489               fault_fs_guard->DeleteFilesCreatedAfterLastDirSync(IOOptions(),
2490                                                                  nullptr);
2491             }
2492             if (rand.OneIn(3)) {
2493               fault_fs_guard->DropUnsyncedFileData();
2494             } else if (rand.OneIn(2)) {
2495               fault_fs_guard->DropRandomUnsyncedFileData(&rand);
2496             }
2497             continue;
2498           }
2499         }
2500         break;
2501       }
2502 #endif  // NDEBUG
2503     } else {
2504 #ifndef ROCKSDB_LITE
2505       TransactionDBOptions txn_db_options;
2506       assert(FLAGS_txn_write_policy <= TxnDBWritePolicy::WRITE_UNPREPARED);
2507       txn_db_options.write_policy =
2508           static_cast<TxnDBWritePolicy>(FLAGS_txn_write_policy);
2509       if (FLAGS_unordered_write) {
2510         assert(txn_db_options.write_policy == TxnDBWritePolicy::WRITE_PREPARED);
2511         options_.unordered_write = true;
2512         options_.two_write_queues = true;
2513         txn_db_options.skip_concurrency_control = true;
2514       }
2515       s = TransactionDB::Open(options_, txn_db_options, FLAGS_db,
2516                               cf_descriptors, &column_families_, &txn_db_);
2517       if (!s.ok()) {
2518         fprintf(stderr, "Error in opening the TransactionDB [%s]\n",
2519                 s.ToString().c_str());
2520         fflush(stderr);
2521       }
2522       assert(s.ok());
2523       db_ = txn_db_;
2524       // after a crash, rollback to commit recovered transactions
2525       std::vector<Transaction*> trans;
2526       txn_db_->GetAllPreparedTransactions(&trans);
2527       Random rand(static_cast<uint32_t>(FLAGS_seed));
2528       for (auto txn : trans) {
2529         if (rand.OneIn(2)) {
2530           s = txn->Commit();
2531           assert(s.ok());
2532         } else {
2533           s = txn->Rollback();
2534           assert(s.ok());
2535         }
2536         delete txn;
2537       }
2538       trans.clear();
2539       txn_db_->GetAllPreparedTransactions(&trans);
2540       assert(trans.size() == 0);
2541 #endif
2542     }
2543     assert(!s.ok() || column_families_.size() ==
2544                           static_cast<size_t>(FLAGS_column_families));
2545 
2546     if (FLAGS_test_secondary) {
2547 #ifndef ROCKSDB_LITE
2548       secondaries_.resize(FLAGS_threads);
2549       std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
2550       secondary_cfh_lists_.clear();
2551       secondary_cfh_lists_.resize(FLAGS_threads);
2552       Options tmp_opts;
2553       // TODO(yanqin) support max_open_files != -1 for secondary instance.
2554       tmp_opts.max_open_files = -1;
2555       tmp_opts.statistics = dbstats_secondaries;
2556       tmp_opts.env = db_stress_env;
2557       for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
2558         const std::string secondary_path =
2559             FLAGS_secondaries_base + "/" + std::to_string(i);
2560         s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2561                                 cf_descriptors, &secondary_cfh_lists_[i],
2562                                 &secondaries_[i]);
2563         if (!s.ok()) {
2564           break;
2565         }
2566       }
2567       assert(s.ok());
2568 #else
2569       fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
2570       exit(1);
2571 #endif
2572     }
2573     if (FLAGS_continuous_verification_interval > 0 && !cmp_db_) {
2574       Options tmp_opts;
2575       // TODO(yanqin) support max_open_files != -1 for secondary instance.
2576       tmp_opts.max_open_files = -1;
2577       tmp_opts.env = db_stress_env;
2578       std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
2579       s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2580                               cf_descriptors, &cmp_cfhs_, &cmp_db_);
2581       assert(!s.ok() ||
2582              cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
2583     }
2584   } else {
2585 #ifndef ROCKSDB_LITE
2586     DBWithTTL* db_with_ttl;
2587     s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
2588     db_ = db_with_ttl;
2589     if (FLAGS_test_secondary) {
2590       secondaries_.resize(FLAGS_threads);
2591       std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
2592       Options tmp_opts;
2593       tmp_opts.env = options_.env;
2594       // TODO(yanqin) support max_open_files != -1 for secondary instance.
2595       tmp_opts.max_open_files = -1;
2596       for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
2597         const std::string secondary_path =
2598             FLAGS_secondaries_base + "/" + std::to_string(i);
2599         s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
2600                                 &secondaries_[i]);
2601         if (!s.ok()) {
2602           break;
2603         }
2604       }
2605     }
2606 #else
2607     fprintf(stderr, "TTL is not supported in RocksDBLite\n");
2608     exit(1);
2609 #endif
2610   }
2611   if (!s.ok()) {
2612     fprintf(stderr, "open error: %s\n", s.ToString().c_str());
2613     exit(1);
2614   }
2615 }
2616 
Reopen(ThreadState * thread)2617 void StressTest::Reopen(ThreadState* thread) {
2618 #ifndef ROCKSDB_LITE
2619   // BG jobs in WritePrepared must be canceled first because i) they can access
2620   // the db via a callbac ii) they hold on to a snapshot and the upcoming
2621   // ::Close would complain about it.
2622   const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0;
2623   bool bg_canceled = false;
2624   if (write_prepared || thread->rand.OneIn(2)) {
2625     const bool wait =
2626         write_prepared || static_cast<bool>(thread->rand.OneIn(2));
2627     CancelAllBackgroundWork(db_, wait);
2628     bg_canceled = wait;
2629   }
2630   assert(!write_prepared || bg_canceled);
2631   (void) bg_canceled;
2632 #else
2633   (void) thread;
2634 #endif
2635 
2636   for (auto cf : column_families_) {
2637     delete cf;
2638   }
2639   column_families_.clear();
2640 
2641 #ifndef ROCKSDB_LITE
2642   if (thread->rand.OneIn(2)) {
2643     Status s = db_->Close();
2644     if (!s.ok()) {
2645       fprintf(stderr, "Non-ok close status: %s\n", s.ToString().c_str());
2646       fflush(stderr);
2647     }
2648     assert(s.ok());
2649   }
2650 #endif
2651   delete db_;
2652   db_ = nullptr;
2653 #ifndef ROCKSDB_LITE
2654   txn_db_ = nullptr;
2655 #endif
2656 
2657   assert(secondaries_.size() == secondary_cfh_lists_.size());
2658   size_t n = secondaries_.size();
2659   for (size_t i = 0; i != n; ++i) {
2660     for (auto* cf : secondary_cfh_lists_[i]) {
2661       delete cf;
2662     }
2663     secondary_cfh_lists_[i].clear();
2664     delete secondaries_[i];
2665   }
2666   secondaries_.clear();
2667 
2668   num_times_reopened_++;
2669   auto now = clock_->NowMicros();
2670   fprintf(stdout, "%s Reopening database for the %dth time\n",
2671           clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
2672   Open();
2673 }
2674 
CheckAndSetOptionsForUserTimestamp()2675 void StressTest::CheckAndSetOptionsForUserTimestamp() {
2676   assert(FLAGS_user_timestamp_size > 0);
2677   const Comparator* const cmp = test::ComparatorWithU64Ts();
2678   assert(cmp);
2679   if (FLAGS_user_timestamp_size != cmp->timestamp_size()) {
2680     fprintf(stderr,
2681             "Only -user_timestamp_size=%d is supported in stress test.\n",
2682             static_cast<int>(cmp->timestamp_size()));
2683     exit(1);
2684   }
2685   if (FLAGS_nooverwritepercent > 0) {
2686     fprintf(stderr,
2687             "-nooverwritepercent must be 0 because SingleDelete must be "
2688             "disabled.\n");
2689     exit(1);
2690   }
2691   if (FLAGS_use_merge || FLAGS_use_full_merge_v1) {
2692     fprintf(stderr, "Merge does not support timestamp yet.\n");
2693     exit(1);
2694   }
2695   if (FLAGS_delrangepercent > 0) {
2696     fprintf(stderr, "DeleteRange does not support timestamp yet.\n");
2697     exit(1);
2698   }
2699   if (FLAGS_use_txn) {
2700     fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
2701     exit(1);
2702   }
2703   if (FLAGS_read_only) {
2704     fprintf(stderr, "When opened as read-only, timestamp not supported.\n");
2705     exit(1);
2706   }
2707   if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 ||
2708       FLAGS_continuous_verification_interval > 0) {
2709     fprintf(stderr, "Secondary instance does not support timestamp.\n");
2710     exit(1);
2711   }
2712   if (FLAGS_checkpoint_one_in > 0) {
2713     fprintf(stderr,
2714             "-checkpoint_one_in=%d requires "
2715             "DBImplReadOnly, which is not supported with timestamp\n",
2716             FLAGS_checkpoint_one_in);
2717     exit(1);
2718   }
2719 #ifndef ROCKSDB_LITE
2720   if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
2721     fprintf(stderr, "BlobDB not supported with timestamp.\n");
2722     exit(1);
2723   }
2724 #endif  // !ROCKSDB_LITE
2725   if (FLAGS_enable_compaction_filter) {
2726     fprintf(stderr, "CompactionFilter not supported with timestamp.\n");
2727     exit(1);
2728   }
2729   if (FLAGS_test_cf_consistency || FLAGS_test_batches_snapshots) {
2730     fprintf(stderr,
2731             "Due to per-key ts-seq ordering constraint, only the (default) "
2732             "non-batched test is supported with timestamp.\n");
2733     exit(1);
2734   }
2735   if (FLAGS_ingest_external_file_one_in > 0) {
2736     fprintf(stderr, "Bulk loading may not support timestamp yet.\n");
2737     exit(1);
2738   }
2739   options_.comparator = cmp;
2740 }
2741 }  // namespace ROCKSDB_NAMESPACE
2742 #endif  // GFLAGS
2743