1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10 
11 #ifdef GFLAGS
12 #include "db_stress_tool/db_stress_common.h"
13 
14 namespace ROCKSDB_NAMESPACE {
ThreadBody(void * v)15 void ThreadBody(void* v) {
16   ThreadState* thread = reinterpret_cast<ThreadState*>(v);
17   SharedState* shared = thread->shared;
18 
19   if (shared->ShouldVerifyAtBeginning()) {
20     thread->shared->GetStressTest()->VerifyDb(thread);
21   }
22   {
23     MutexLock l(shared->GetMutex());
24     shared->IncInitialized();
25     if (shared->AllInitialized()) {
26       shared->GetCondVar()->SignalAll();
27     }
28     while (!shared->Started()) {
29       shared->GetCondVar()->Wait();
30     }
31   }
32   thread->shared->GetStressTest()->OperateDb(thread);
33 
34   {
35     MutexLock l(shared->GetMutex());
36     shared->IncOperated();
37     if (shared->AllOperated()) {
38       shared->GetCondVar()->SignalAll();
39     }
40     while (!shared->VerifyStarted()) {
41       shared->GetCondVar()->Wait();
42     }
43   }
44 
45   thread->shared->GetStressTest()->VerifyDb(thread);
46 
47   {
48     MutexLock l(shared->GetMutex());
49     shared->IncDone();
50     if (shared->AllDone()) {
51       shared->GetCondVar()->SignalAll();
52     }
53   }
54 }
55 
RunStressTest(StressTest * stress)56 bool RunStressTest(StressTest* stress) {
57   stress->InitDb();
58 
59   SharedState shared(db_stress_env, stress);
60   if (FLAGS_read_only) {
61     stress->InitReadonlyDb(&shared);
62   }
63 
64   uint32_t n = shared.GetNumThreads();
65 
66   uint64_t now = db_stress_env->NowMicros();
67   fprintf(stdout, "%s Initializing worker threads\n",
68           db_stress_env->TimeToString(now / 1000000).c_str());
69   std::vector<ThreadState*> threads(n);
70   for (uint32_t i = 0; i < n; i++) {
71     threads[i] = new ThreadState(i, &shared);
72     db_stress_env->StartThread(ThreadBody, threads[i]);
73   }
74   ThreadState bg_thread(0, &shared);
75   if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
76     db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
77   }
78   ThreadState continuous_verification_thread(0, &shared);
79   if (FLAGS_continuous_verification_interval > 0) {
80     db_stress_env->StartThread(DbVerificationThread,
81                                &continuous_verification_thread);
82   }
83 
84   // Each thread goes through the following states:
85   // initializing -> wait for others to init -> read/populate/depopulate
86   // wait for others to operate -> verify -> done
87 
88   {
89     MutexLock l(shared.GetMutex());
90     while (!shared.AllInitialized()) {
91       shared.GetCondVar()->Wait();
92     }
93     if (shared.ShouldVerifyAtBeginning()) {
94       if (shared.HasVerificationFailedYet()) {
95         fprintf(stderr, "Crash-recovery verification failed :(\n");
96       } else {
97         fprintf(stdout, "Crash-recovery verification passed :)\n");
98       }
99     }
100 
101     now = db_stress_env->NowMicros();
102     fprintf(stdout, "%s Starting database operations\n",
103             db_stress_env->TimeToString(now / 1000000).c_str());
104 
105     shared.SetStart();
106     shared.GetCondVar()->SignalAll();
107     while (!shared.AllOperated()) {
108       shared.GetCondVar()->Wait();
109     }
110 
111     now = db_stress_env->NowMicros();
112     if (FLAGS_test_batches_snapshots) {
113       fprintf(stdout, "%s Limited verification already done during gets\n",
114               db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
115     } else {
116       fprintf(stdout, "%s Starting verification\n",
117               db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
118     }
119 
120     shared.SetStartVerify();
121     shared.GetCondVar()->SignalAll();
122     while (!shared.AllDone()) {
123       shared.GetCondVar()->Wait();
124     }
125   }
126 
127   for (unsigned int i = 1; i < n; i++) {
128     threads[0]->stats.Merge(threads[i]->stats);
129   }
130   threads[0]->stats.Report("Stress Test");
131 
132   for (unsigned int i = 0; i < n; i++) {
133     delete threads[i];
134     threads[i] = nullptr;
135   }
136   now = db_stress_env->NowMicros();
137   if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
138     fprintf(stdout, "%s Verification successful\n",
139             db_stress_env->TimeToString(now / 1000000).c_str());
140   }
141   stress->PrintStatistics();
142 
143   if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
144       FLAGS_continuous_verification_interval > 0) {
145     MutexLock l(shared.GetMutex());
146     shared.SetShouldStopBgThread();
147     while (!shared.BgThreadsFinished()) {
148       shared.GetCondVar()->Wait();
149     }
150   }
151 
152   if (!stress->VerifySecondaries()) {
153     return false;
154   }
155 
156   if (shared.HasVerificationFailedYet()) {
157     fprintf(stderr, "Verification failed :(\n");
158     return false;
159   }
160   return true;
161 }
162 }  // namespace ROCKSDB_NAMESPACE
163 #endif  // GFLAGS
164