1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors 9 10 #ifdef GFLAGS 11 #pragma once 12 13 #include "db_stress_tool/db_stress_stat.h" 14 #include "util/gflags_compat.h" 15 16 DECLARE_uint64(seed); 17 DECLARE_int64(max_key); 18 DECLARE_uint64(log2_keys_per_lock); 19 DECLARE_int32(threads); 20 DECLARE_int32(column_families); 21 DECLARE_int32(nooverwritepercent); 22 DECLARE_string(expected_values_path); 23 DECLARE_int32(clear_column_family_one_in); 24 DECLARE_bool(test_batches_snapshots); 25 DECLARE_int32(compaction_thread_pool_adjust_interval); 26 DECLARE_int32(continuous_verification_interval); 27 28 namespace ROCKSDB_NAMESPACE { 29 class StressTest; 30 31 // State shared by all concurrent executions of the same benchmark. 32 class SharedState { 33 public: 34 // indicates a key may have any value (or not be present) as an operation on 35 // it is incomplete. 36 static const uint32_t UNKNOWN_SENTINEL; 37 // indicates a key should definitely be deleted 38 static const uint32_t DELETION_SENTINEL; 39 SharedState(Env * env,StressTest * stress_test)40 SharedState(Env* env, StressTest* stress_test) 41 : cv_(&mu_), 42 seed_(static_cast<uint32_t>(FLAGS_seed)), 43 max_key_(FLAGS_max_key), 44 log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)), 45 num_threads_(FLAGS_threads), 46 num_initialized_(0), 47 num_populated_(0), 48 vote_reopen_(0), 49 num_done_(0), 50 start_(false), 51 start_verify_(false), 52 num_bg_threads_(0), 53 should_stop_bg_thread_(false), 54 bg_thread_finished_(0), 55 stress_test_(stress_test), 56 verification_failure_(false), 57 should_stop_test_(false), 58 no_overwrite_ids_(FLAGS_column_families), 59 values_(nullptr), 60 printing_verification_results_(false) { 61 // Pick random keys in each column family that will not experience 62 // overwrite 63 64 fprintf(stdout, "Choosing random keys with no overwrite\n"); 65 Random64 rnd(seed_); 66 // Start with the identity permutation. Subsequent iterations of 67 // for loop below will start with perm of previous for loop 68 int64_t* permutation = new int64_t[max_key_]; 69 for (int64_t i = 0; i < max_key_; i++) { 70 permutation[i] = i; 71 } 72 // Now do the Knuth shuffle 73 int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100; 74 // Only need to figure out first num_no_overwrite_keys of permutation 75 no_overwrite_ids_.reserve(num_no_overwrite_keys); 76 for (int64_t i = 0; i < num_no_overwrite_keys; i++) { 77 int64_t rand_index = i + rnd.Next() % (max_key_ - i); 78 // Swap i and rand_index; 79 int64_t temp = permutation[i]; 80 permutation[i] = permutation[rand_index]; 81 permutation[rand_index] = temp; 82 // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of 83 // permutation 84 no_overwrite_ids_.insert(permutation[i]); 85 } 86 delete[] permutation; 87 88 size_t expected_values_size = 89 sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_; 90 bool values_init_needed = false; 91 Status status; 92 if (!FLAGS_expected_values_path.empty()) { 93 if (!std::atomic<uint32_t>{}.is_lock_free()) { 94 status = Status::InvalidArgument( 95 "Cannot use --expected_values_path on platforms without lock-free " 96 "std::atomic<uint32_t>"); 97 } 98 if (status.ok() && FLAGS_clear_column_family_one_in > 0) { 99 status = Status::InvalidArgument( 100 "Cannot use --expected_values_path on when " 101 "--clear_column_family_one_in is greater than zero."); 102 } 103 uint64_t size = 0; 104 if (status.ok()) { 105 status = env->GetFileSize(FLAGS_expected_values_path, &size); 106 } 107 std::unique_ptr<WritableFile> wfile; 108 if (status.ok() && size == 0) { 109 const EnvOptions soptions; 110 status = 111 env->NewWritableFile(FLAGS_expected_values_path, &wfile, soptions); 112 } 113 if (status.ok() && size == 0) { 114 std::string buf(expected_values_size, '\0'); 115 status = wfile->Append(buf); 116 values_init_needed = true; 117 } 118 if (status.ok()) { 119 status = env->NewMemoryMappedFileBuffer(FLAGS_expected_values_path, 120 &expected_mmap_buffer_); 121 } 122 if (status.ok()) { 123 assert(expected_mmap_buffer_->GetLen() == expected_values_size); 124 values_ = static_cast<std::atomic<uint32_t>*>( 125 expected_mmap_buffer_->GetBase()); 126 assert(values_ != nullptr); 127 } else { 128 fprintf(stderr, "Failed opening shared file '%s' with error: %s\n", 129 FLAGS_expected_values_path.c_str(), status.ToString().c_str()); 130 assert(values_ == nullptr); 131 } 132 } 133 if (values_ == nullptr) { 134 values_allocation_.reset( 135 new std::atomic<uint32_t>[FLAGS_column_families * max_key_]); 136 values_ = &values_allocation_[0]; 137 values_init_needed = true; 138 } 139 assert(values_ != nullptr); 140 if (values_init_needed) { 141 for (int i = 0; i < FLAGS_column_families; ++i) { 142 for (int j = 0; j < max_key_; ++j) { 143 Delete(i, j, false /* pending */); 144 } 145 } 146 } 147 148 if (FLAGS_test_batches_snapshots) { 149 fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); 150 return; 151 } 152 153 long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_); 154 if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { 155 num_locks++; 156 } 157 fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families); 158 key_locks_.resize(FLAGS_column_families); 159 160 for (int i = 0; i < FLAGS_column_families; ++i) { 161 key_locks_[i].resize(num_locks); 162 for (auto& ptr : key_locks_[i]) { 163 ptr.reset(new port::Mutex); 164 } 165 } 166 if (FLAGS_compaction_thread_pool_adjust_interval > 0) { 167 ++num_bg_threads_; 168 fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n"); 169 } 170 if (FLAGS_continuous_verification_interval > 0) { 171 ++num_bg_threads_; 172 fprintf(stdout, "Starting continuous_verification_thread\n"); 173 } 174 } 175 ~SharedState()176 ~SharedState() {} 177 GetMutex()178 port::Mutex* GetMutex() { return &mu_; } 179 GetCondVar()180 port::CondVar* GetCondVar() { return &cv_; } 181 GetStressTest()182 StressTest* GetStressTest() const { return stress_test_; } 183 GetMaxKey()184 int64_t GetMaxKey() const { return max_key_; } 185 GetNumThreads()186 uint32_t GetNumThreads() const { return num_threads_; } 187 IncInitialized()188 void IncInitialized() { num_initialized_++; } 189 IncOperated()190 void IncOperated() { num_populated_++; } 191 IncDone()192 void IncDone() { num_done_++; } 193 IncVotedReopen()194 void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; } 195 AllInitialized()196 bool AllInitialized() const { return num_initialized_ >= num_threads_; } 197 AllOperated()198 bool AllOperated() const { return num_populated_ >= num_threads_; } 199 AllDone()200 bool AllDone() const { return num_done_ >= num_threads_; } 201 AllVotedReopen()202 bool AllVotedReopen() { return (vote_reopen_ == 0); } 203 SetStart()204 void SetStart() { start_ = true; } 205 SetStartVerify()206 void SetStartVerify() { start_verify_ = true; } 207 Started()208 bool Started() const { return start_; } 209 VerifyStarted()210 bool VerifyStarted() const { return start_verify_; } 211 SetVerificationFailure()212 void SetVerificationFailure() { verification_failure_.store(true); } 213 HasVerificationFailedYet()214 bool HasVerificationFailedYet() const { return verification_failure_.load(); } 215 SetShouldStopTest()216 void SetShouldStopTest() { should_stop_test_.store(true); } 217 ShouldStopTest()218 bool ShouldStopTest() const { return should_stop_test_.load(); } 219 GetMutexForKey(int cf,int64_t key)220 port::Mutex* GetMutexForKey(int cf, int64_t key) { 221 return key_locks_[cf][key >> log2_keys_per_lock_].get(); 222 } 223 LockColumnFamily(int cf)224 void LockColumnFamily(int cf) { 225 for (auto& mutex : key_locks_[cf]) { 226 mutex->Lock(); 227 } 228 } 229 UnlockColumnFamily(int cf)230 void UnlockColumnFamily(int cf) { 231 for (auto& mutex : key_locks_[cf]) { 232 mutex->Unlock(); 233 } 234 } 235 Value(int cf,int64_t key)236 std::atomic<uint32_t>& Value(int cf, int64_t key) const { 237 return values_[cf * max_key_ + key]; 238 } 239 ClearColumnFamily(int cf)240 void ClearColumnFamily(int cf) { 241 std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), 242 DELETION_SENTINEL); 243 } 244 245 // @param pending True if the update may have started but is not yet 246 // guaranteed finished. This is useful for crash-recovery testing when the 247 // process may crash before updating the expected values array. Put(int cf,int64_t key,uint32_t value_base,bool pending)248 void Put(int cf, int64_t key, uint32_t value_base, bool pending) { 249 if (!pending) { 250 // prevent expected-value update from reordering before Write 251 std::atomic_thread_fence(std::memory_order_release); 252 } 253 Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base, 254 std::memory_order_relaxed); 255 if (pending) { 256 // prevent Write from reordering before expected-value update 257 std::atomic_thread_fence(std::memory_order_release); 258 } 259 } 260 Get(int cf,int64_t key)261 uint32_t Get(int cf, int64_t key) const { return Value(cf, key); } 262 263 // @param pending See comment above Put() 264 // Returns true if the key was not yet deleted. Delete(int cf,int64_t key,bool pending)265 bool Delete(int cf, int64_t key, bool pending) { 266 if (Value(cf, key) == DELETION_SENTINEL) { 267 return false; 268 } 269 Put(cf, key, DELETION_SENTINEL, pending); 270 return true; 271 } 272 273 // @param pending See comment above Put() 274 // Returns true if the key was not yet deleted. SingleDelete(int cf,int64_t key,bool pending)275 bool SingleDelete(int cf, int64_t key, bool pending) { 276 return Delete(cf, key, pending); 277 } 278 279 // @param pending See comment above Put() 280 // Returns number of keys deleted by the call. DeleteRange(int cf,int64_t begin_key,int64_t end_key,bool pending)281 int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) { 282 int covered = 0; 283 for (int64_t key = begin_key; key < end_key; ++key) { 284 if (Delete(cf, key, pending)) { 285 ++covered; 286 } 287 } 288 return covered; 289 } 290 AllowsOverwrite(int64_t key)291 bool AllowsOverwrite(int64_t key) { 292 return no_overwrite_ids_.find(key) == no_overwrite_ids_.end(); 293 } 294 Exists(int cf,int64_t key)295 bool Exists(int cf, int64_t key) { 296 // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite 297 // is disallowed can't be accidentally added a second time, in which case 298 // SingleDelete wouldn't be able to properly delete the key. It does allow 299 // the case where a SingleDelete might be added which covers nothing, but 300 // that's not a correctness issue. 301 uint32_t expected_value = Value(cf, key).load(); 302 return expected_value != DELETION_SENTINEL; 303 } 304 GetSeed()305 uint32_t GetSeed() const { return seed_; } 306 SetShouldStopBgThread()307 void SetShouldStopBgThread() { should_stop_bg_thread_ = true; } 308 ShouldStopBgThread()309 bool ShouldStopBgThread() { return should_stop_bg_thread_; } 310 IncBgThreadsFinished()311 void IncBgThreadsFinished() { ++bg_thread_finished_; } 312 BgThreadsFinished()313 bool BgThreadsFinished() const { 314 return bg_thread_finished_ == num_bg_threads_; 315 } 316 ShouldVerifyAtBeginning()317 bool ShouldVerifyAtBeginning() const { 318 return expected_mmap_buffer_.get() != nullptr; 319 } 320 PrintingVerificationResults()321 bool PrintingVerificationResults() { 322 bool tmp = false; 323 return !printing_verification_results_.compare_exchange_strong( 324 tmp, true, std::memory_order_relaxed); 325 } 326 FinishPrintingVerificationResults()327 void FinishPrintingVerificationResults() { 328 printing_verification_results_.store(false, std::memory_order_relaxed); 329 } 330 331 private: 332 port::Mutex mu_; 333 port::CondVar cv_; 334 const uint32_t seed_; 335 const int64_t max_key_; 336 const uint32_t log2_keys_per_lock_; 337 const int num_threads_; 338 long num_initialized_; 339 long num_populated_; 340 long vote_reopen_; 341 long num_done_; 342 bool start_; 343 bool start_verify_; 344 int num_bg_threads_; 345 bool should_stop_bg_thread_; 346 int bg_thread_finished_; 347 StressTest* stress_test_; 348 std::atomic<bool> verification_failure_; 349 std::atomic<bool> should_stop_test_; 350 351 // Keys that should not be overwritten 352 std::unordered_set<size_t> no_overwrite_ids_; 353 354 std::atomic<uint32_t>* values_; 355 std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_; 356 // Has to make it owned by a smart ptr as port::Mutex is not copyable 357 // and storing it in the container may require copying depending on the impl. 358 std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_; 359 std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_; 360 std::atomic<bool> printing_verification_results_; 361 }; 362 363 // Per-thread state for concurrent executions of the same benchmark. 364 struct ThreadState { 365 uint32_t tid; // 0..n-1 366 Random rand; // Has different seeds for different threads 367 SharedState* shared; 368 Stats stats; 369 struct SnapshotState { 370 const Snapshot* snapshot; 371 // The cf from which we did a Get at this snapshot 372 int cf_at; 373 // The name of the cf at the time that we did a read 374 std::string cf_at_name; 375 // The key with which we did a Get at this snapshot 376 std::string key; 377 // The status of the Get 378 Status status; 379 // The value of the Get 380 std::string value; 381 // optional state of all keys in the db 382 std::vector<bool>* key_vec; 383 }; 384 std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue; 385 ThreadStateThreadState386 ThreadState(uint32_t index, SharedState* _shared) 387 : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {} 388 }; 389 } // namespace ROCKSDB_NAMESPACE 390 #endif // GFLAGS 391