1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 //
10
11 #ifdef GFLAGS
12 #include "db_stress_tool/db_stress_common.h"
13
14 namespace ROCKSDB_NAMESPACE {
ThreadBody(void * v)15 void ThreadBody(void* v) {
16 ThreadState* thread = reinterpret_cast<ThreadState*>(v);
17 SharedState* shared = thread->shared;
18
19 if (shared->ShouldVerifyAtBeginning()) {
20 thread->shared->GetStressTest()->VerifyDb(thread);
21 }
22 {
23 MutexLock l(shared->GetMutex());
24 shared->IncInitialized();
25 if (shared->AllInitialized()) {
26 shared->GetCondVar()->SignalAll();
27 }
28 while (!shared->Started()) {
29 shared->GetCondVar()->Wait();
30 }
31 }
32 thread->shared->GetStressTest()->OperateDb(thread);
33
34 {
35 MutexLock l(shared->GetMutex());
36 shared->IncOperated();
37 if (shared->AllOperated()) {
38 shared->GetCondVar()->SignalAll();
39 }
40 while (!shared->VerifyStarted()) {
41 shared->GetCondVar()->Wait();
42 }
43 }
44
45 thread->shared->GetStressTest()->VerifyDb(thread);
46
47 {
48 MutexLock l(shared->GetMutex());
49 shared->IncDone();
50 if (shared->AllDone()) {
51 shared->GetCondVar()->SignalAll();
52 }
53 }
54 }
55
RunStressTest(StressTest * stress)56 bool RunStressTest(StressTest* stress) {
57 stress->InitDb();
58
59 SharedState shared(db_stress_env, stress);
60 if (FLAGS_read_only) {
61 stress->InitReadonlyDb(&shared);
62 }
63
64 uint32_t n = shared.GetNumThreads();
65
66 uint64_t now = db_stress_env->NowMicros();
67 fprintf(stdout, "%s Initializing worker threads\n",
68 db_stress_env->TimeToString(now / 1000000).c_str());
69 std::vector<ThreadState*> threads(n);
70 for (uint32_t i = 0; i < n; i++) {
71 threads[i] = new ThreadState(i, &shared);
72 db_stress_env->StartThread(ThreadBody, threads[i]);
73 }
74 ThreadState bg_thread(0, &shared);
75 if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
76 db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
77 }
78 ThreadState continuous_verification_thread(0, &shared);
79 if (FLAGS_continuous_verification_interval > 0) {
80 db_stress_env->StartThread(DbVerificationThread,
81 &continuous_verification_thread);
82 }
83
84 // Each thread goes through the following states:
85 // initializing -> wait for others to init -> read/populate/depopulate
86 // wait for others to operate -> verify -> done
87
88 {
89 MutexLock l(shared.GetMutex());
90 while (!shared.AllInitialized()) {
91 shared.GetCondVar()->Wait();
92 }
93 if (shared.ShouldVerifyAtBeginning()) {
94 if (shared.HasVerificationFailedYet()) {
95 fprintf(stderr, "Crash-recovery verification failed :(\n");
96 } else {
97 fprintf(stdout, "Crash-recovery verification passed :)\n");
98 }
99 }
100
101 now = db_stress_env->NowMicros();
102 fprintf(stdout, "%s Starting database operations\n",
103 db_stress_env->TimeToString(now / 1000000).c_str());
104
105 shared.SetStart();
106 shared.GetCondVar()->SignalAll();
107 while (!shared.AllOperated()) {
108 shared.GetCondVar()->Wait();
109 }
110
111 now = db_stress_env->NowMicros();
112 if (FLAGS_test_batches_snapshots) {
113 fprintf(stdout, "%s Limited verification already done during gets\n",
114 db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
115 } else {
116 fprintf(stdout, "%s Starting verification\n",
117 db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
118 }
119
120 shared.SetStartVerify();
121 shared.GetCondVar()->SignalAll();
122 while (!shared.AllDone()) {
123 shared.GetCondVar()->Wait();
124 }
125 }
126
127 for (unsigned int i = 1; i < n; i++) {
128 threads[0]->stats.Merge(threads[i]->stats);
129 }
130 threads[0]->stats.Report("Stress Test");
131
132 for (unsigned int i = 0; i < n; i++) {
133 delete threads[i];
134 threads[i] = nullptr;
135 }
136 now = db_stress_env->NowMicros();
137 if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
138 fprintf(stdout, "%s Verification successful\n",
139 db_stress_env->TimeToString(now / 1000000).c_str());
140 }
141 stress->PrintStatistics();
142
143 if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
144 FLAGS_continuous_verification_interval > 0) {
145 MutexLock l(shared.GetMutex());
146 shared.SetShouldStopBgThread();
147 while (!shared.BgThreadsFinished()) {
148 shared.GetCondVar()->Wait();
149 }
150 }
151
152 if (!stress->VerifySecondaries()) {
153 return false;
154 }
155
156 if (shared.HasVerificationFailedYet()) {
157 fprintf(stderr, "Verification failed :(\n");
158 return false;
159 }
160 return true;
161 }
162 } // namespace ROCKSDB_NAMESPACE
163 #endif // GFLAGS
164