//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
#ifdef GFLAGS
#pragma once

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <queue>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "db_stress_tool/db_stress_stat.h"
#include "util/gflags_compat.h"

DECLARE_uint64(seed);
DECLARE_int64(max_key);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_int32(threads);
DECLARE_int32(column_families);
DECLARE_int32(nooverwritepercent);
DECLARE_string(expected_values_path);
DECLARE_int32(clear_column_family_one_in);
DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
27 
namespace ROCKSDB_NAMESPACE {
class StressTest;
31 // State shared by all concurrent executions of the same benchmark.
32 class SharedState {
33  public:
34   // indicates a key may have any value (or not be present) as an operation on
35   // it is incomplete.
36   static const uint32_t UNKNOWN_SENTINEL;
37   // indicates a key should definitely be deleted
38   static const uint32_t DELETION_SENTINEL;
39 
SharedState(Env * env,StressTest * stress_test)40   SharedState(Env* env, StressTest* stress_test)
41       : cv_(&mu_),
42         seed_(static_cast<uint32_t>(FLAGS_seed)),
43         max_key_(FLAGS_max_key),
44         log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
45         num_threads_(FLAGS_threads),
46         num_initialized_(0),
47         num_populated_(0),
48         vote_reopen_(0),
49         num_done_(0),
50         start_(false),
51         start_verify_(false),
52         num_bg_threads_(0),
53         should_stop_bg_thread_(false),
54         bg_thread_finished_(0),
55         stress_test_(stress_test),
56         verification_failure_(false),
57         should_stop_test_(false),
58         no_overwrite_ids_(FLAGS_column_families),
59         values_(nullptr),
60         printing_verification_results_(false) {
61     // Pick random keys in each column family that will not experience
62     // overwrite
63 
64     fprintf(stdout, "Choosing random keys with no overwrite\n");
65     Random64 rnd(seed_);
66     // Start with the identity permutation. Subsequent iterations of
67     // for loop below will start with perm of previous for loop
68     int64_t* permutation = new int64_t[max_key_];
69     for (int64_t i = 0; i < max_key_; i++) {
70       permutation[i] = i;
71     }
72     // Now do the Knuth shuffle
73     int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
74     // Only need to figure out first num_no_overwrite_keys of permutation
75     no_overwrite_ids_.reserve(num_no_overwrite_keys);
76     for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
77       int64_t rand_index = i + rnd.Next() % (max_key_ - i);
78       // Swap i and rand_index;
79       int64_t temp = permutation[i];
80       permutation[i] = permutation[rand_index];
81       permutation[rand_index] = temp;
82       // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
83       // permutation
84       no_overwrite_ids_.insert(permutation[i]);
85     }
86     delete[] permutation;
87 
88     size_t expected_values_size =
89         sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
90     bool values_init_needed = false;
91     Status status;
92     if (!FLAGS_expected_values_path.empty()) {
93       if (!std::atomic<uint32_t>{}.is_lock_free()) {
94         status = Status::InvalidArgument(
95             "Cannot use --expected_values_path on platforms without lock-free "
96             "std::atomic<uint32_t>");
97       }
98       if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
99         status = Status::InvalidArgument(
100             "Cannot use --expected_values_path on when "
101             "--clear_column_family_one_in is greater than zero.");
102       }
103       uint64_t size = 0;
104       if (status.ok()) {
105         status = env->GetFileSize(FLAGS_expected_values_path, &size);
106       }
107       std::unique_ptr<WritableFile> wfile;
108       if (status.ok() && size == 0) {
109         const EnvOptions soptions;
110         status =
111             env->NewWritableFile(FLAGS_expected_values_path, &wfile, soptions);
112       }
113       if (status.ok() && size == 0) {
114         std::string buf(expected_values_size, '\0');
115         status = wfile->Append(buf);
116         values_init_needed = true;
117       }
118       if (status.ok()) {
119         status = env->NewMemoryMappedFileBuffer(FLAGS_expected_values_path,
120                                                 &expected_mmap_buffer_);
121       }
122       if (status.ok()) {
123         assert(expected_mmap_buffer_->GetLen() == expected_values_size);
124         values_ = static_cast<std::atomic<uint32_t>*>(
125             expected_mmap_buffer_->GetBase());
126         assert(values_ != nullptr);
127       } else {
128         fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
129                 FLAGS_expected_values_path.c_str(), status.ToString().c_str());
130         assert(values_ == nullptr);
131       }
132     }
133     if (values_ == nullptr) {
134       values_allocation_.reset(
135           new std::atomic<uint32_t>[FLAGS_column_families * max_key_]);
136       values_ = &values_allocation_[0];
137       values_init_needed = true;
138     }
139     assert(values_ != nullptr);
140     if (values_init_needed) {
141       for (int i = 0; i < FLAGS_column_families; ++i) {
142         for (int j = 0; j < max_key_; ++j) {
143           Delete(i, j, false /* pending */);
144         }
145       }
146     }
147 
148     if (FLAGS_test_batches_snapshots) {
149       fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
150       return;
151     }
152 
153     long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
154     if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
155       num_locks++;
156     }
157     fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
158     key_locks_.resize(FLAGS_column_families);
159 
160     for (int i = 0; i < FLAGS_column_families; ++i) {
161       key_locks_[i].resize(num_locks);
162       for (auto& ptr : key_locks_[i]) {
163         ptr.reset(new port::Mutex);
164       }
165     }
166     if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
167       ++num_bg_threads_;
168       fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
169     }
170     if (FLAGS_continuous_verification_interval > 0) {
171       ++num_bg_threads_;
172       fprintf(stdout, "Starting continuous_verification_thread\n");
173     }
174   }
175 
~SharedState()176   ~SharedState() {}
177 
GetMutex()178   port::Mutex* GetMutex() { return &mu_; }
179 
GetCondVar()180   port::CondVar* GetCondVar() { return &cv_; }
181 
GetStressTest()182   StressTest* GetStressTest() const { return stress_test_; }
183 
GetMaxKey()184   int64_t GetMaxKey() const { return max_key_; }
185 
GetNumThreads()186   uint32_t GetNumThreads() const { return num_threads_; }
187 
IncInitialized()188   void IncInitialized() { num_initialized_++; }
189 
IncOperated()190   void IncOperated() { num_populated_++; }
191 
IncDone()192   void IncDone() { num_done_++; }
193 
IncVotedReopen()194   void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; }
195 
AllInitialized()196   bool AllInitialized() const { return num_initialized_ >= num_threads_; }
197 
AllOperated()198   bool AllOperated() const { return num_populated_ >= num_threads_; }
199 
AllDone()200   bool AllDone() const { return num_done_ >= num_threads_; }
201 
AllVotedReopen()202   bool AllVotedReopen() { return (vote_reopen_ == 0); }
203 
SetStart()204   void SetStart() { start_ = true; }
205 
SetStartVerify()206   void SetStartVerify() { start_verify_ = true; }
207 
Started()208   bool Started() const { return start_; }
209 
VerifyStarted()210   bool VerifyStarted() const { return start_verify_; }
211 
SetVerificationFailure()212   void SetVerificationFailure() { verification_failure_.store(true); }
213 
HasVerificationFailedYet()214   bool HasVerificationFailedYet() const { return verification_failure_.load(); }
215 
SetShouldStopTest()216   void SetShouldStopTest() { should_stop_test_.store(true); }
217 
ShouldStopTest()218   bool ShouldStopTest() const { return should_stop_test_.load(); }
219 
GetMutexForKey(int cf,int64_t key)220   port::Mutex* GetMutexForKey(int cf, int64_t key) {
221     return key_locks_[cf][key >> log2_keys_per_lock_].get();
222   }
223 
LockColumnFamily(int cf)224   void LockColumnFamily(int cf) {
225     for (auto& mutex : key_locks_[cf]) {
226       mutex->Lock();
227     }
228   }
229 
UnlockColumnFamily(int cf)230   void UnlockColumnFamily(int cf) {
231     for (auto& mutex : key_locks_[cf]) {
232       mutex->Unlock();
233     }
234   }
235 
Value(int cf,int64_t key)236   std::atomic<uint32_t>& Value(int cf, int64_t key) const {
237     return values_[cf * max_key_ + key];
238   }
239 
ClearColumnFamily(int cf)240   void ClearColumnFamily(int cf) {
241     std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
242               DELETION_SENTINEL);
243   }
244 
245   // @param pending True if the update may have started but is not yet
246   //    guaranteed finished. This is useful for crash-recovery testing when the
247   //    process may crash before updating the expected values array.
Put(int cf,int64_t key,uint32_t value_base,bool pending)248   void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
249     if (!pending) {
250       // prevent expected-value update from reordering before Write
251       std::atomic_thread_fence(std::memory_order_release);
252     }
253     Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
254                          std::memory_order_relaxed);
255     if (pending) {
256       // prevent Write from reordering before expected-value update
257       std::atomic_thread_fence(std::memory_order_release);
258     }
259   }
260 
Get(int cf,int64_t key)261   uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }
262 
263   // @param pending See comment above Put()
264   // Returns true if the key was not yet deleted.
Delete(int cf,int64_t key,bool pending)265   bool Delete(int cf, int64_t key, bool pending) {
266     if (Value(cf, key) == DELETION_SENTINEL) {
267       return false;
268     }
269     Put(cf, key, DELETION_SENTINEL, pending);
270     return true;
271   }
272 
273   // @param pending See comment above Put()
274   // Returns true if the key was not yet deleted.
SingleDelete(int cf,int64_t key,bool pending)275   bool SingleDelete(int cf, int64_t key, bool pending) {
276     return Delete(cf, key, pending);
277   }
278 
279   // @param pending See comment above Put()
280   // Returns number of keys deleted by the call.
DeleteRange(int cf,int64_t begin_key,int64_t end_key,bool pending)281   int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
282     int covered = 0;
283     for (int64_t key = begin_key; key < end_key; ++key) {
284       if (Delete(cf, key, pending)) {
285         ++covered;
286       }
287     }
288     return covered;
289   }
290 
AllowsOverwrite(int64_t key)291   bool AllowsOverwrite(int64_t key) {
292     return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
293   }
294 
Exists(int cf,int64_t key)295   bool Exists(int cf, int64_t key) {
296     // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
297     // is disallowed can't be accidentally added a second time, in which case
298     // SingleDelete wouldn't be able to properly delete the key. It does allow
299     // the case where a SingleDelete might be added which covers nothing, but
300     // that's not a correctness issue.
301     uint32_t expected_value = Value(cf, key).load();
302     return expected_value != DELETION_SENTINEL;
303   }
304 
GetSeed()305   uint32_t GetSeed() const { return seed_; }
306 
SetShouldStopBgThread()307   void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
308 
ShouldStopBgThread()309   bool ShouldStopBgThread() { return should_stop_bg_thread_; }
310 
IncBgThreadsFinished()311   void IncBgThreadsFinished() { ++bg_thread_finished_; }
312 
BgThreadsFinished()313   bool BgThreadsFinished() const {
314     return bg_thread_finished_ == num_bg_threads_;
315   }
316 
ShouldVerifyAtBeginning()317   bool ShouldVerifyAtBeginning() const {
318     return expected_mmap_buffer_.get() != nullptr;
319   }
320 
PrintingVerificationResults()321   bool PrintingVerificationResults() {
322     bool tmp = false;
323     return !printing_verification_results_.compare_exchange_strong(
324         tmp, true, std::memory_order_relaxed);
325   }
326 
FinishPrintingVerificationResults()327   void FinishPrintingVerificationResults() {
328     printing_verification_results_.store(false, std::memory_order_relaxed);
329   }
330 
331  private:
332   port::Mutex mu_;
333   port::CondVar cv_;
334   const uint32_t seed_;
335   const int64_t max_key_;
336   const uint32_t log2_keys_per_lock_;
337   const int num_threads_;
338   long num_initialized_;
339   long num_populated_;
340   long vote_reopen_;
341   long num_done_;
342   bool start_;
343   bool start_verify_;
344   int num_bg_threads_;
345   bool should_stop_bg_thread_;
346   int bg_thread_finished_;
347   StressTest* stress_test_;
348   std::atomic<bool> verification_failure_;
349   std::atomic<bool> should_stop_test_;
350 
351   // Keys that should not be overwritten
352   std::unordered_set<size_t> no_overwrite_ids_;
353 
354   std::atomic<uint32_t>* values_;
355   std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
356   // Has to make it owned by a smart ptr as port::Mutex is not copyable
357   // and storing it in the container may require copying depending on the impl.
358   std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_;
359   std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
360   std::atomic<bool> printing_verification_results_;
361 };
362 
363 // Per-thread state for concurrent executions of the same benchmark.
364 struct ThreadState {
365   uint32_t tid;  // 0..n-1
366   Random rand;   // Has different seeds for different threads
367   SharedState* shared;
368   Stats stats;
369   struct SnapshotState {
370     const Snapshot* snapshot;
371     // The cf from which we did a Get at this snapshot
372     int cf_at;
373     // The name of the cf at the time that we did a read
374     std::string cf_at_name;
375     // The key with which we did a Get at this snapshot
376     std::string key;
377     // The status of the Get
378     Status status;
379     // The value of the Get
380     std::string value;
381     // optional state of all keys in the db
382     std::vector<bool>* key_vec;
383   };
384   std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;
385 
ThreadStateThreadState386   ThreadState(uint32_t index, SharedState* _shared)
387       : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
388 };
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS