// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors

#ifdef GFLAGS
#pragma once

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <numeric>
#include <queue>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "db_stress_tool/db_stress_stat.h"
#include "db_stress_tool/expected_state.h"
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
#include "test_util/sync_point.h"
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
#include "util/gflags_compat.h"

DECLARE_uint64(seed);
DECLARE_int64(max_key);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_int32(threads);
DECLARE_int32(column_families);
DECLARE_int32(nooverwritepercent);
DECLARE_string(expected_values_dir);
DECLARE_int32(clear_column_family_one_in);
DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
DECLARE_int32(read_fault_one_in);
DECLARE_int32(write_fault_one_in);
DECLARE_int32(open_metadata_write_fault_one_in);
DECLARE_int32(open_write_fault_one_in);
DECLARE_int32(open_read_fault_one_in);

DECLARE_int32(injest_error_severity);

namespace ROCKSDB_NAMESPACE {
class StressTest;

// State shared by all concurrent executions of the same benchmark.
//
// Owns the expected-state tracker, the per-column-family striped key locks,
// the set of keys that must never be overwritten, and the counters/flags the
// worker threads use to rendezvous. Counters such as num_initialized_ are
// plain (non-atomic) fields; presumably callers guard them with GetMutex()
// -- NOTE(review): confirm against the call sites.
class SharedState {
 public:
  // indicates a key may have any value (or not be present) as an operation on
  // it is incomplete.
  static const uint32_t UNKNOWN_SENTINEL;
  // indicates a key should definitely be deleted
  static const uint32_t DELETION_SENTINEL;

  // Errors when reading filter blocks are ignored, so we use a thread
  // local variable updated via sync points to keep track of errors injected
  // while reading filter blocks in order to ignore the Get/MultiGet result
  // for those calls
#if defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
#if defined(OS_SOLARIS)
  static __thread bool ignore_read_error;
#else
  static thread_local bool ignore_read_error;
#endif  // OS_SOLARIS
#else
  static bool ignore_read_error;
#endif  // ROCKSDB_SUPPORT_THREAD_LOCAL

  // Builds the shared state from the command-line flags:
  //   1. picks the random set of keys that must not be overwritten,
  //   2. opens the expected-state tracker (in-memory when
  //      --expected_values_dir is empty, file-backed otherwise) and calls
  //      exit(1) if that fails,
  //   3. unless --test_batches_snapshots is set, creates the striped key
  //      locks, counts the background threads to expect, and (debug builds
  //      only) hooks the "FaultInjectionIgnoreError" sync point.
  //
  // @param env         Unused.
  // @param stress_test Non-owning pointer returned by GetStressTest().
  SharedState(Env* /*env*/, StressTest* stress_test)
      : cv_(&mu_),
        seed_(static_cast<uint32_t>(FLAGS_seed)),
        max_key_(FLAGS_max_key),
        log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
        num_threads_(FLAGS_threads),
        num_initialized_(0),
        num_populated_(0),
        vote_reopen_(0),
        num_done_(0),
        start_(false),
        start_verify_(false),
        num_bg_threads_(0),
        should_stop_bg_thread_(false),
        bg_thread_finished_(0),
        stress_test_(stress_test),
        verification_failure_(false),
        should_stop_test_(false),
        expected_state_manager_(nullptr),
        printing_verification_results_(false) {
    // Pick random keys in each column family that will not experience
    // overwrite

    fprintf(stdout, "Choosing random keys with no overwrite\n");
    Random64 rnd(seed_);
    // Clamp to [0, max_key_]. Without the clamp, a --nooverwritepercent above
    // 100 drives the shuffle below past the end of the permutation (and into
    // a modulo by zero), and a negative one hands reserve() a huge size_t.
    const int64_t num_no_overwrite_keys = std::max<int64_t>(
        0, std::min<int64_t>(max_key_,
                             (max_key_ * FLAGS_nooverwritepercent) / 100));
    no_overwrite_ids_.reserve(static_cast<size_t>(num_no_overwrite_keys));
    if (num_no_overwrite_keys > 0) {
      // Partial Knuth (Fisher-Yates) shuffle of the identity permutation of
      // [0, max_key_). After step i the prefix permutation[0..i] is fixed, so
      // only the first num_no_overwrite_keys steps are needed; each fixed
      // element becomes a no-overwrite key. The vector releases its storage
      // even if reserve()/insert() throws.
      std::vector<int64_t> permutation(static_cast<size_t>(max_key_));
      std::iota(permutation.begin(), permutation.end(), int64_t{0});
      for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
        int64_t rand_index = i + rnd.Next() % (max_key_ - i);
        std::swap(permutation[i], permutation[rand_index]);
        no_overwrite_ids_.insert(permutation[i]);
      }
    }

    Status status;
    // TODO: We should introduce a way to explicitly disable verification
    // during shutdown. When that is disabled and FLAGS_expected_values_dir
    // is empty (disabling verification at startup), we can skip tracking
    // expected state. Only then should we permit bypassing the below feature
    // compatibility checks.
    if (!FLAGS_expected_values_dir.empty()) {
      if (!std::atomic<uint32_t>{}.is_lock_free()) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_dir on platforms without lock-free "
            "std::atomic<uint32_t>");
      }
      if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_dir on when "
            "--clear_column_family_one_in is greater than zero.");
      }
    }
    if (status.ok()) {
      if (FLAGS_expected_values_dir.empty()) {
        expected_state_manager_.reset(
            new AnonExpectedStateManager(FLAGS_max_key, FLAGS_column_families));
      } else {
        expected_state_manager_.reset(new FileExpectedStateManager(
            FLAGS_max_key, FLAGS_column_families, FLAGS_expected_values_dir));
      }
      status = expected_state_manager_->Open();
    }
    if (!status.ok()) {
      fprintf(stderr, "Failed setting up expected state with error: %s\n",
              status.ToString().c_str());
      exit(1);
    }

    if (FLAGS_test_batches_snapshots) {
      // NOTE(review): this early return also skips the background-thread
      // counting and sync-point registration below; confirm that is intended.
      fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
      return;
    }

    // Each lock covers 2^log2_keys_per_lock_ consecutive keys; round up so a
    // trailing partial stripe still gets its own lock.
    long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
    // Build the mask in 64 bits: `1 << n` on a plain int is undefined
    // behavior once log2_keys_per_lock_ reaches 31.
    if (max_key_ & ((int64_t{1} << log2_keys_per_lock_) - 1)) {
      num_locks++;
    }
    fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
    key_locks_.resize(FLAGS_column_families);

    for (int i = 0; i < FLAGS_column_families; ++i) {
      key_locks_[i].resize(num_locks);
      for (auto& ptr : key_locks_[i]) {
        ptr.reset(new port::Mutex);
      }
    }
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      ++num_bg_threads_;
      fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
    }
    if (FLAGS_continuous_verification_interval > 0) {
      ++num_bg_threads_;
      fprintf(stdout, "Starting continuous_verification_thread\n");
    }
#ifndef NDEBUG
    if (FLAGS_read_fault_one_in) {
      SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError",
                                            IgnoreReadErrorCallback);
      SyncPoint::GetInstance()->EnableProcessing();
    }
#endif  // NDEBUG
  }

  // In debug builds, undoes the sync-point setup done by the constructor
  // when --read_fault_one_in is set (clears all callbacks, stops processing).
  ~SharedState() {
#ifndef NDEBUG
    if (FLAGS_read_fault_one_in) {
      SyncPoint::GetInstance()->ClearAllCallBacks();
      SyncPoint::GetInstance()->DisableProcessing();
    }
#endif
  }

  port::Mutex* GetMutex() { return &mu_; }

  port::CondVar* GetCondVar() { return &cv_; }

  StressTest* GetStressTest() const { return stress_test_; }

  int64_t GetMaxKey() const { return max_key_; }

  uint32_t GetNumThreads() const { return num_threads_; }

  void IncInitialized() { num_initialized_++; }

  void IncOperated() { num_populated_++; }

  void IncDone() { num_done_++; }

  // Votes wrap modulo the thread count, so the counter returns to zero once
  // every thread has voted.
  void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; }

  bool AllInitialized() const { return num_initialized_ >= num_threads_; }

  bool AllOperated() const { return num_populated_ >= num_threads_; }

  bool AllDone() const { return num_done_ >= num_threads_; }

  // True when the vote counter is at zero -- initially, and again after every
  // num_threads_ calls to IncVotedReopen().
  bool AllVotedReopen() const { return (vote_reopen_ == 0); }

  void SetStart() { start_ = true; }

  void SetStartVerify() { start_verify_ = true; }

  bool Started() const { return start_; }

  bool VerifyStarted() const { return start_verify_; }

  void SetVerificationFailure() { verification_failure_.store(true); }

  bool HasVerificationFailedYet() const { return verification_failure_.load(); }

  void SetShouldStopTest() { should_stop_test_.store(true); }

  bool ShouldStopTest() const { return should_stop_test_.load(); }

  // Returns a lock covering `key` in `cf`.
  // key_locks_ is empty when --test_batches_snapshots is set, so this must
  // not be called in that mode.
  port::Mutex* GetMutexForKey(int cf, int64_t key) {
    return key_locks_[cf][key >> log2_keys_per_lock_].get();
  }

  // Acquires locks for all keys in `cf`.
  void LockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Lock();
    }
  }

  // Releases locks for all keys in `cf`.
  void UnlockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Unlock();
    }
  }

  // Requires external locking covering all keys in `cf`.
  void ClearColumnFamily(int cf) {
    return expected_state_manager_->ClearColumnFamily(cf);
  }

  // @param pending True if the update may have started but is not yet
  // guaranteed finished. This is useful for crash-recovery testing when the
  // process may crash before updating the expected values array.
  //
  // Requires external locking covering `key` in `cf`.
  void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
    return expected_state_manager_->Put(cf, key, value_base, pending);
  }

  // Requires external locking covering `key` in `cf`.
  uint32_t Get(int cf, int64_t key) const {
    return expected_state_manager_->Get(cf, key);
  }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  //
  // Requires external locking covering `key` in `cf`.
  bool Delete(int cf, int64_t key, bool pending) {
    return expected_state_manager_->Delete(cf, key, pending);
  }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  // Recorded in the expected state exactly like Delete().
  //
  // Requires external locking covering `key` in `cf`.
  bool SingleDelete(int cf, int64_t key, bool pending) {
    return expected_state_manager_->Delete(cf, key, pending);
  }

  // @param pending See comment above Put()
  // Returns number of keys deleted by the call.
  //
  // Requires external locking covering keys in `[begin_key, end_key)` in `cf`.
  int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
    return expected_state_manager_->DeleteRange(cf, begin_key, end_key,
                                                pending);
  }

  // Returns false iff `key` was chosen as a no-overwrite key by the
  // constructor. The same key set applies to every column family.
  bool AllowsOverwrite(int64_t key) const {
    return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
  }

  // Requires external locking covering `key` in `cf`.
  bool Exists(int cf, int64_t key) const {
    return expected_state_manager_->Exists(cf, key);
  }

  uint32_t GetSeed() const { return seed_; }

  void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }

  bool ShouldStopBgThread() const { return should_stop_bg_thread_; }

  void IncBgThreadsFinished() { ++bg_thread_finished_; }

  // True once as many background threads have reported finishing as the
  // constructor counted.
  bool BgThreadsFinished() const {
    return bg_thread_finished_ == num_bg_threads_;
  }

  bool ShouldVerifyAtBeginning() const {
    return !FLAGS_expected_values_dir.empty();
  }

  // Tries to claim the "printing verification results" flag.
  // Returns true if another caller already holds it (claim failed), false if
  // this call claimed it. Release it with FinishPrintingVerificationResults().
  bool PrintingVerificationResults() {
    bool tmp = false;
    return !printing_verification_results_.compare_exchange_strong(
        tmp, true, std::memory_order_relaxed);
  }

  void FinishPrintingVerificationResults() {
    printing_verification_results_.store(false, std::memory_order_relaxed);
  }

 private:
  // Sync-point callback: marks the current read as having hit an injected
  // (ignorable) error.
  static void IgnoreReadErrorCallback(void*) {
    ignore_read_error = true;
  }

  port::Mutex mu_;
  port::CondVar cv_;
  const uint32_t seed_;
  const int64_t max_key_;
  const uint32_t log2_keys_per_lock_;
  const int num_threads_;
  long num_initialized_;
  long num_populated_;
  long vote_reopen_;
  long num_done_;
  bool start_;
  bool start_verify_;
  int num_bg_threads_;
  bool should_stop_bg_thread_;
  int bg_thread_finished_;
  StressTest* stress_test_;
  std::atomic<bool> verification_failure_;
  std::atomic<bool> should_stop_test_;

  // Keys that should not be overwritten
  std::unordered_set<int64_t> no_overwrite_ids_;

  std::unique_ptr<ExpectedStateManager> expected_state_manager_;
  // Has to make it owned by a smart ptr as port::Mutex is not copyable
  // and storing it in the container may require copying depending on the impl.
  std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_;
  std::atomic<bool> printing_verification_results_;
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;  // 0..n-1
  Random rand;   // Has different seeds for different threads
  SharedState* shared;
  Stats stats;
  // A read recorded at a snapshot. The fields are left uninitialized here;
  // presumably call sites fill them via brace-initialization -- confirm
  // before adding default member initializers.
  struct SnapshotState {
    const Snapshot* snapshot;
    // The cf from which we did a Get at this snapshot
    int cf_at;
    // The name of the cf at the time that we did a read
    std::string cf_at_name;
    // The key with which we did a Get at this snapshot
    std::string key;
    // The status of the Get
    Status status;
    // The value of the Get
    std::string value;
    // optional state of all keys in the db
    std::vector<bool>* key_vec;

    std::string timestamp;
  };
  std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;

  // The per-thread RNG is seeded with 1000 + index + the shared seed.
  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
};
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS