//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifdef GFLAGS
#pragma once

#include "db_stress_tool/db_stress_stat.h"
#include "db_stress_tool/expected_state.h"
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
#include "test_util/sync_point.h"
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
#include "util/gflags_compat.h"

DECLARE_uint64(seed);
DECLARE_int64(max_key);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_int32(threads);
DECLARE_int32(column_families);
DECLARE_int32(nooverwritepercent);
DECLARE_string(expected_values_dir);
DECLARE_int32(clear_column_family_one_in);
DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
DECLARE_int32(read_fault_one_in);
DECLARE_int32(write_fault_one_in);
DECLARE_int32(open_metadata_write_fault_one_in);
DECLARE_int32(open_write_fault_one_in);
DECLARE_int32(open_read_fault_one_in);

DECLARE_int32(injest_error_severity);

namespace ROCKSDB_NAMESPACE {
class StressTest;

// State shared by all concurrent executions of the same benchmark.
class SharedState {
 public:
  // indicates a key may have any value (or not be present) as an operation on
  // it is incomplete.
  static const uint32_t UNKNOWN_SENTINEL;
  // indicates a key should definitely be deleted
  static const uint32_t DELETION_SENTINEL;

  // Errors when reading filter blocks are ignored, so we use a thread
  // local variable updated via sync points to keep track of errors injected
  // while reading filter blocks in order to ignore the Get/MultiGet result
  // for those calls
#if defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
#if defined(OS_SOLARIS)
  static __thread bool ignore_read_error;
#else
  static thread_local bool ignore_read_error;
#endif  // OS_SOLARIS
#else
  static bool ignore_read_error;
#endif  // ROCKSDB_SUPPORT_THREAD_LOCAL

  SharedState(Env* /*env*/, StressTest* stress_test)
      : cv_(&mu_),
        seed_(static_cast<uint32_t>(FLAGS_seed)),
        max_key_(FLAGS_max_key),
        log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
        num_threads_(FLAGS_threads),
        num_initialized_(0),
        num_populated_(0),
        vote_reopen_(0),
        num_done_(0),
        start_(false),
        start_verify_(false),
        num_bg_threads_(0),
        should_stop_bg_thread_(false),
        bg_thread_finished_(0),
        stress_test_(stress_test),
        verification_failure_(false),
        should_stop_test_(false),
        no_overwrite_ids_(FLAGS_column_families),
        expected_state_manager_(nullptr),
        printing_verification_results_(false) {
    // Pick random keys in each column family that will not experience
    // overwrite

    fprintf(stdout, "Choosing random keys with no overwrite\n");
    Random64 rnd(seed_);
    // Start with the identity permutation. Each iteration of the for loop
    // below continues the shuffle from where the previous iteration left off.
    int64_t* permutation = new int64_t[max_key_];
    for (int64_t i = 0; i < max_key_; i++) {
      permutation[i] = i;
    }
    // Now do the Knuth shuffle
    int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
    // Only need to figure out first num_no_overwrite_keys of permutation
    no_overwrite_ids_.reserve(num_no_overwrite_keys);
    for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
      int64_t rand_index = i + rnd.Next() % (max_key_ - i);
      // Swap i and rand_index;
      int64_t temp = permutation[i];
      permutation[i] = permutation[rand_index];
      permutation[rand_index] = temp;
      // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
      // permutation
      no_overwrite_ids_.insert(permutation[i]);
    }
    delete[] permutation;

    Status status;
    // TODO: We should introduce a way to explicitly disable verification
    // during shutdown. When that is disabled and FLAGS_expected_values_dir
    // is empty (disabling verification at startup), we can skip tracking
    // expected state. Only then should we permit bypassing the below feature
    // compatibility checks.
    if (!FLAGS_expected_values_dir.empty()) {
      if (!std::atomic<uint32_t>{}.is_lock_free()) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_dir on platforms without lock-free "
            "std::atomic<uint32_t>");
      }
      if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_dir when "
            "--clear_column_family_one_in is greater than zero.");
      }
    }
    if (status.ok()) {
      if (FLAGS_expected_values_dir.empty()) {
        expected_state_manager_.reset(
            new AnonExpectedStateManager(FLAGS_max_key, FLAGS_column_families));
      } else {
        expected_state_manager_.reset(new FileExpectedStateManager(
            FLAGS_max_key, FLAGS_column_families, FLAGS_expected_values_dir));
      }
      status = expected_state_manager_->Open();
    }
    if (!status.ok()) {
      fprintf(stderr, "Failed setting up expected state with error: %s\n",
              status.ToString().c_str());
      exit(1);
    }

    if (FLAGS_test_batches_snapshots) {
      fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
      return;
    }

    long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
    if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
      num_locks++;
    }
    fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
    key_locks_.resize(FLAGS_column_families);

    for (int i = 0; i < FLAGS_column_families; ++i) {
      key_locks_[i].resize(num_locks);
      for (auto& ptr : key_locks_[i]) {
        ptr.reset(new port::Mutex);
      }
    }
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      ++num_bg_threads_;
      fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
    }
    if (FLAGS_continuous_verification_interval > 0) {
      ++num_bg_threads_;
      fprintf(stdout, "Starting continuous_verification_thread\n");
    }
#ifndef NDEBUG
    if (FLAGS_read_fault_one_in) {
      SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError",
                                            IgnoreReadErrorCallback);
      SyncPoint::GetInstance()->EnableProcessing();
    }
#endif  // NDEBUG
  }

  ~SharedState() {
#ifndef NDEBUG
    if (FLAGS_read_fault_one_in) {
      SyncPoint::GetInstance()->ClearAllCallBacks();
      SyncPoint::GetInstance()->DisableProcessing();
    }
#endif
  }

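  // Mutex and condition variable used by the worker threads to coordinate on
  // the shared counters and flags below.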
  port::Mutex* GetMutex() { return &mu_; }

  port::CondVar* GetCondVar() { return &cv_; }

  StressTest* GetStressTest() const { return stress_test_; }

  int64_t GetMaxKey() const { return max_key_; }

  uint32_t GetNumThreads() const { return num_threads_; }

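  // The progress counters below are plain integers rather than atomics, so
  // callers are expected to hold the shared mutex while updating or reading
  // them.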
  void IncInitialized() { num_initialized_++; }

  void IncOperated() { num_populated_++; }

  void IncDone() { num_done_++; }

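  // Reopen voting: the counter wraps modulo the number of threads, so
  // AllVotedReopen() returns true once every thread has called
  // IncVotedReopen().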
  void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; }

  bool AllInitialized() const { return num_initialized_ >= num_threads_; }

  bool AllOperated() const { return num_populated_ >= num_threads_; }

  bool AllDone() const { return num_done_ >= num_threads_; }

  bool AllVotedReopen() { return (vote_reopen_ == 0); }

  void SetStart() { start_ = true; }

  void SetStartVerify() { start_verify_ = true; }

  bool Started() const { return start_; }

  bool VerifyStarted() const { return start_verify_; }

  void SetVerificationFailure() { verification_failure_.store(true); }

  bool HasVerificationFailedYet() const { return verification_failure_.load(); }

  void SetShouldStopTest() { should_stop_test_.store(true); }

  bool ShouldStopTest() const { return should_stop_test_.load(); }

  // Returns a lock covering `key` in `cf`.
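  // Locks are striped: each mutex covers a contiguous range of
  // 2^log2_keys_per_lock_ keys within the column family.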
  port::Mutex* GetMutexForKey(int cf, int64_t key) {
    return key_locks_[cf][key >> log2_keys_per_lock_].get();
  }

  // Acquires locks for all keys in `cf`.
  void LockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Lock();
    }
  }

  // Releases locks for all keys in `cf`.
  void UnlockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Unlock();
    }
  }

  // Requires external locking covering all keys in `cf`.
  void ClearColumnFamily(int cf) {
    return expected_state_manager_->ClearColumnFamily(cf);
  }

  // @param pending True if the update may have started but is not yet
  //    guaranteed finished. This is useful for crash-recovery testing when the
  //    process may crash before updating the expected values array.
  //
  // Requires external locking covering `key` in `cf`.
  void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
    return expected_state_manager_->Put(cf, key, value_base, pending);
  }

  // Requires external locking covering `key` in `cf`.
  uint32_t Get(int cf, int64_t key) const {
    return expected_state_manager_->Get(cf, key);
  }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  //
  // Requires external locking covering `key` in `cf`.
  bool Delete(int cf, int64_t key, bool pending) {
    return expected_state_manager_->Delete(cf, key, pending);
  }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  //
  // Requires external locking covering `key` in `cf`.
  bool SingleDelete(int cf, int64_t key, bool pending) {
    return expected_state_manager_->Delete(cf, key, pending);
  }

  // @param pending See comment above Put()
  // Returns number of keys deleted by the call.
  //
  // Requires external locking covering keys in `[begin_key, end_key)` in `cf`.
  int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
    return expected_state_manager_->DeleteRange(cf, begin_key, end_key,
                                                pending);
  }

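  // Returns true if `key` may be overwritten, i.e. it was not selected as one
  // of the no-overwrite keys in the constructor.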
  bool AllowsOverwrite(int64_t key) {
    return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
  }

  // Requires external locking covering `key` in `cf`.
  bool Exists(int cf, int64_t key) {
    return expected_state_manager_->Exists(cf, key);
  }

  uint32_t GetSeed() const { return seed_; }

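  // Coordination with the background threads counted in num_bg_threads_ (the
  // compaction thread pool adjuster and the continuous verification thread
  // announced in the constructor when their intervals are positive).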
  void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }

  bool ShouldStopBgThread() { return should_stop_bg_thread_; }

  void IncBgThreadsFinished() { ++bg_thread_finished_; }

  bool BgThreadsFinished() const {
    return bg_thread_finished_ == num_bg_threads_;
  }

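  // Startup verification is enabled only when --expected_values_dir is set,
  // since only then can expected values survive from a previous run.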
  bool ShouldVerifyAtBeginning() const {
    return !FLAGS_expected_values_dir.empty();
  }

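  // Atomically claims the right to print verification results. Returns true
  // if another thread is already printing; otherwise marks printing as in
  // progress and returns false, and the caller is expected to call
  // FinishPrintingVerificationResults() when done.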
  bool PrintingVerificationResults() {
    bool tmp = false;
    return !printing_verification_results_.compare_exchange_strong(
        tmp, true, std::memory_order_relaxed);
  }

  void FinishPrintingVerificationResults() {
    printing_verification_results_.store(false, std::memory_order_relaxed);
  }

 private:
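  // Sync point callback installed in the constructor when --read_fault_one_in
  // is set; marks an injected read error so the Get/MultiGet result for the
  // affected call can be ignored.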
  static void IgnoreReadErrorCallback(void*) {
    ignore_read_error = true;
  }

  port::Mutex mu_;
  port::CondVar cv_;
  const uint32_t seed_;
  const int64_t max_key_;
  const uint32_t log2_keys_per_lock_;
  const int num_threads_;
  long num_initialized_;
  long num_populated_;
  long vote_reopen_;
  long num_done_;
  bool start_;
  bool start_verify_;
  int num_bg_threads_;
  bool should_stop_bg_thread_;
  int bg_thread_finished_;
  StressTest* stress_test_;
  std::atomic<bool> verification_failure_;
  std::atomic<bool> should_stop_test_;

  // Keys that should not be overwritten
  std::unordered_set<size_t> no_overwrite_ids_;

  std::unique_ptr<ExpectedStateManager> expected_state_manager_;
  // port::Mutex is not copyable, and the container may need to copy or move
  // its elements depending on the implementation, so each lock is owned
  // through a unique_ptr.
  std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_;
  std::atomic<bool> printing_verification_results_;
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;  // 0..n-1
  Random rand;   // Has different seeds for different threads
  SharedState* shared;
  Stats stats;
  struct SnapshotState {
    const Snapshot* snapshot;
    // The cf from which we did a Get at this snapshot
    int cf_at;
    // The name of the cf at the time that we did a read
    std::string cf_at_name;
    // The key with which we did a Get at this snapshot
    std::string key;
    // The status of the Get
    Status status;
    // The value of the Get
    std::string value;
    // optional state of all keys in the db
    std::vector<bool>* key_vec;

    std::string timestamp;
  };
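  // Pending snapshots, each paired with the operation count at which it is
  // scheduled to be verified and released.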
  std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;

  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
};
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS