1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifdef GFLAGS
11 #include "db_stress_tool/db_stress_common.h"
12 #include "file/file_util.h"
13 
14 namespace ROCKSDB_NAMESPACE {
15 class CfConsistencyStressTest : public StressTest {
16  public:
CfConsistencyStressTest()17   CfConsistencyStressTest() : batch_id_(0) {}
18 
~CfConsistencyStressTest()19   ~CfConsistencyStressTest() override {}
20 
TestPut(ThreadState * thread,WriteOptions & write_opts,const ReadOptions &,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,char (& value)[100],std::unique_ptr<MutexLock> &)21   Status TestPut(ThreadState* thread, WriteOptions& write_opts,
22                  const ReadOptions& /* read_opts */,
23                  const std::vector<int>& rand_column_families,
24                  const std::vector<int64_t>& rand_keys, char (&value)[100],
25                  std::unique_ptr<MutexLock>& /* lock */) override {
26     std::string key_str = Key(rand_keys[0]);
27     Slice key = key_str;
28     uint64_t value_base = batch_id_.fetch_add(1);
29     size_t sz =
30         GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
31     Slice v(value, sz);
32     WriteBatch batch;
33     for (auto cf : rand_column_families) {
34       ColumnFamilyHandle* cfh = column_families_[cf];
35       if (FLAGS_use_merge) {
36         batch.Merge(cfh, key, v);
37       } else { /* !FLAGS_use_merge */
38         batch.Put(cfh, key, v);
39       }
40     }
41     Status s = db_->Write(write_opts, &batch);
42     if (!s.ok()) {
43       fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
44       thread->stats.AddErrors(1);
45     } else {
46       auto num = static_cast<long>(rand_column_families.size());
47       thread->stats.AddBytesForWrites(num, (sz + 1) * num);
48     }
49 
50     return s;
51   }
52 
TestDelete(ThreadState * thread,WriteOptions & write_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> &)53   Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
54                     const std::vector<int>& rand_column_families,
55                     const std::vector<int64_t>& rand_keys,
56                     std::unique_ptr<MutexLock>& /* lock */) override {
57     std::string key_str = Key(rand_keys[0]);
58     Slice key = key_str;
59     WriteBatch batch;
60     for (auto cf : rand_column_families) {
61       ColumnFamilyHandle* cfh = column_families_[cf];
62       batch.Delete(cfh, key);
63     }
64     Status s = db_->Write(write_opts, &batch);
65     if (!s.ok()) {
66       fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
67       thread->stats.AddErrors(1);
68     } else {
69       thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
70     }
71     return s;
72   }
73 
TestDeleteRange(ThreadState * thread,WriteOptions & write_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> &)74   Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
75                          const std::vector<int>& rand_column_families,
76                          const std::vector<int64_t>& rand_keys,
77                          std::unique_ptr<MutexLock>& /* lock */) override {
78     int64_t rand_key = rand_keys[0];
79     auto shared = thread->shared;
80     int64_t max_key = shared->GetMaxKey();
81     if (rand_key > max_key - FLAGS_range_deletion_width) {
82       rand_key =
83           thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
84     }
85     std::string key_str = Key(rand_key);
86     Slice key = key_str;
87     std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
88     Slice end_key = end_key_str;
89     WriteBatch batch;
90     for (auto cf : rand_column_families) {
91       ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
92       batch.DeleteRange(cfh, key, end_key);
93     }
94     Status s = db_->Write(write_opts, &batch);
95     if (!s.ok()) {
96       fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
97       thread->stats.AddErrors(1);
98     } else {
99       thread->stats.AddRangeDeletions(
100           static_cast<long>(rand_column_families.size()));
101     }
102     return s;
103   }
104 
TestIngestExternalFile(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &,std::unique_ptr<MutexLock> &)105   void TestIngestExternalFile(
106       ThreadState* /* thread */,
107       const std::vector<int>& /* rand_column_families */,
108       const std::vector<int64_t>& /* rand_keys */,
109       std::unique_ptr<MutexLock>& /* lock */) override {
110     assert(false);
111     fprintf(stderr,
112             "CfConsistencyStressTest does not support TestIngestExternalFile "
113             "because it's not possible to verify the result\n");
114     std::terminate();
115   }
116 
TestGet(ThreadState * thread,const ReadOptions & readoptions,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)117   Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
118                  const std::vector<int>& rand_column_families,
119                  const std::vector<int64_t>& rand_keys) override {
120     std::string key_str = Key(rand_keys[0]);
121     Slice key = key_str;
122     Status s;
123     bool is_consistent = true;
124 
125     if (thread->rand.OneIn(2)) {
126       // 1/2 chance, does a random read from random CF
127       auto cfh =
128           column_families_[rand_column_families[thread->rand.Next() %
129                                                 rand_column_families.size()]];
130       std::string from_db;
131       s = db_->Get(readoptions, cfh, key, &from_db);
132     } else {
133       // 1/2 chance, comparing one key is the same across all CFs
134       const Snapshot* snapshot = db_->GetSnapshot();
135       ReadOptions readoptionscopy = readoptions;
136       readoptionscopy.snapshot = snapshot;
137 
138       std::string value0;
139       s = db_->Get(readoptionscopy, column_families_[rand_column_families[0]],
140                    key, &value0);
141       if (s.ok() || s.IsNotFound()) {
142         bool found = s.ok();
143         for (size_t i = 1; i < rand_column_families.size(); i++) {
144           std::string value1;
145           s = db_->Get(readoptionscopy,
146                        column_families_[rand_column_families[i]], key, &value1);
147           if (!s.ok() && !s.IsNotFound()) {
148             break;
149           }
150           if (!found && s.ok()) {
151             fprintf(stderr, "Get() return different results with key %s\n",
152                     Slice(key_str).ToString(true).c_str());
153             fprintf(stderr, "CF %s is not found\n",
154                     column_family_names_[0].c_str());
155             fprintf(stderr, "CF %s returns value %s\n",
156                     column_family_names_[i].c_str(),
157                     Slice(value1).ToString(true).c_str());
158             is_consistent = false;
159           } else if (found && s.IsNotFound()) {
160             fprintf(stderr, "Get() return different results with key %s\n",
161                     Slice(key_str).ToString(true).c_str());
162             fprintf(stderr, "CF %s returns value %s\n",
163                     column_family_names_[0].c_str(),
164                     Slice(value0).ToString(true).c_str());
165             fprintf(stderr, "CF %s is not found\n",
166                     column_family_names_[i].c_str());
167             is_consistent = false;
168           } else if (s.ok() && value0 != value1) {
169             fprintf(stderr, "Get() return different results with key %s\n",
170                     Slice(key_str).ToString(true).c_str());
171             fprintf(stderr, "CF %s returns value %s\n",
172                     column_family_names_[0].c_str(),
173                     Slice(value0).ToString(true).c_str());
174             fprintf(stderr, "CF %s returns value %s\n",
175                     column_family_names_[i].c_str(),
176                     Slice(value1).ToString(true).c_str());
177             is_consistent = false;
178           }
179           if (!is_consistent) {
180             break;
181           }
182         }
183       }
184 
185       db_->ReleaseSnapshot(snapshot);
186     }
187     if (!is_consistent) {
188       fprintf(stderr, "TestGet error: is_consistent is false\n");
189       thread->stats.AddErrors(1);
190       // Fail fast to preserve the DB state.
191       thread->shared->SetVerificationFailure();
192     } else if (s.ok()) {
193       thread->stats.AddGets(1, 1);
194     } else if (s.IsNotFound()) {
195       thread->stats.AddGets(1, 0);
196     } else {
197       fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
198       thread->stats.AddErrors(1);
199     }
200     return s;
201   }
202 
TestMultiGet(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)203   std::vector<Status> TestMultiGet(
204       ThreadState* thread, const ReadOptions& read_opts,
205       const std::vector<int>& rand_column_families,
206       const std::vector<int64_t>& rand_keys) override {
207     size_t num_keys = rand_keys.size();
208     std::vector<std::string> key_str;
209     std::vector<Slice> keys;
210     keys.reserve(num_keys);
211     key_str.reserve(num_keys);
212     std::vector<PinnableSlice> values(num_keys);
213     std::vector<Status> statuses(num_keys);
214     ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
215 
216     for (size_t i = 0; i < num_keys; ++i) {
217       key_str.emplace_back(Key(rand_keys[i]));
218       keys.emplace_back(key_str.back());
219     }
220     db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
221                   statuses.data());
222     for (auto s : statuses) {
223       if (s.ok()) {
224         // found case
225         thread->stats.AddGets(1, 1);
226       } else if (s.IsNotFound()) {
227         // not found case
228         thread->stats.AddGets(1, 0);
229       } else {
230         // errors case
231         fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
232         thread->stats.AddErrors(1);
233       }
234     }
235     return statuses;
236   }
237 
TestPrefixScan(ThreadState * thread,const ReadOptions & readoptions,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)238   Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
239                         const std::vector<int>& rand_column_families,
240                         const std::vector<int64_t>& rand_keys) override {
241     size_t prefix_to_use =
242         (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
243 
244     std::string key_str = Key(rand_keys[0]);
245     Slice key = key_str;
246     Slice prefix = Slice(key.data(), prefix_to_use);
247 
248     std::string upper_bound;
249     Slice ub_slice;
250     ReadOptions ro_copy = readoptions;
251     // Get the next prefix first and then see if we want to set upper bound.
252     // We'll use the next prefix in an assertion later on
253     if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
254       ub_slice = Slice(upper_bound);
255       ro_copy.iterate_upper_bound = &ub_slice;
256     }
257     auto cfh =
258         column_families_[rand_column_families[thread->rand.Next() %
259                                               rand_column_families.size()]];
260     Iterator* iter = db_->NewIterator(ro_copy, cfh);
261     unsigned long count = 0;
262     for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
263          iter->Next()) {
264       ++count;
265     }
266     assert(prefix_to_use == 0 ||
267            count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
268     Status s = iter->status();
269     if (s.ok()) {
270       thread->stats.AddPrefixes(1, count);
271     } else {
272       fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
273       thread->stats.AddErrors(1);
274     }
275     delete iter;
276     return s;
277   }
278 
GetControlCfh(ThreadState * thread,int)279   ColumnFamilyHandle* GetControlCfh(ThreadState* thread,
280                                     int /*column_family_id*/
281                                     ) override {
282     // All column families should contain the same data. Randomly pick one.
283     return column_families_[thread->rand.Next() % column_families_.size()];
284   }
285 
VerifyDb(ThreadState * thread) const286   void VerifyDb(ThreadState* thread) const override {
287     ReadOptions options(FLAGS_verify_checksum, true);
288     // We must set total_order_seek to true because we are doing a SeekToFirst
289     // on a column family whose memtables may support (by default) prefix-based
290     // iterator. In this case, NewIterator with options.total_order_seek being
291     // false returns a prefix-based iterator. Calling SeekToFirst using this
292     // iterator causes the iterator to become invalid. That means we cannot
293     // iterate the memtable using this iterator any more, although the memtable
294     // contains the most up-to-date key-values.
295     options.total_order_seek = true;
296     const auto ss_deleter = [this](const Snapshot* ss) {
297       db_->ReleaseSnapshot(ss);
298     };
299     std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
300         db_->GetSnapshot(), ss_deleter);
301     options.snapshot = snapshot_guard.get();
302     assert(thread != nullptr);
303     auto shared = thread->shared;
304     std::vector<std::unique_ptr<Iterator>> iters(column_families_.size());
305     for (size_t i = 0; i != column_families_.size(); ++i) {
306       iters[i].reset(db_->NewIterator(options, column_families_[i]));
307     }
308     for (auto& iter : iters) {
309       iter->SeekToFirst();
310     }
311     size_t num = column_families_.size();
312     assert(num == iters.size());
313     std::vector<Status> statuses(num, Status::OK());
314     do {
315       if (shared->HasVerificationFailedYet()) {
316         break;
317       }
318       size_t valid_cnt = 0;
319       size_t idx = 0;
320       for (auto& iter : iters) {
321         if (iter->Valid()) {
322           ++valid_cnt;
323         } else {
324           statuses[idx] = iter->status();
325         }
326         ++idx;
327       }
328       if (valid_cnt == 0) {
329         Status status;
330         for (size_t i = 0; i != num; ++i) {
331           const auto& s = statuses[i];
332           if (!s.ok()) {
333             status = s;
334             fprintf(stderr, "Iterator on cf %s has error: %s\n",
335                     column_families_[i]->GetName().c_str(),
336                     s.ToString().c_str());
337             shared->SetVerificationFailure();
338           }
339         }
340         break;
341       } else if (valid_cnt != iters.size()) {
342         shared->SetVerificationFailure();
343         for (size_t i = 0; i != num; ++i) {
344           if (!iters[i]->Valid()) {
345             if (statuses[i].ok()) {
346               fprintf(stderr, "Finished scanning cf %s\n",
347                       column_families_[i]->GetName().c_str());
348             } else {
349               fprintf(stderr, "Iterator on cf %s has error: %s\n",
350                       column_families_[i]->GetName().c_str(),
351                       statuses[i].ToString().c_str());
352             }
353           } else {
354             fprintf(stderr, "cf %s has remaining data to scan\n",
355                     column_families_[i]->GetName().c_str());
356           }
357         }
358         break;
359       }
360       if (shared->HasVerificationFailedYet()) {
361         break;
362       }
363       // If the program reaches here, then all column families' iterators are
364       // still valid.
365       if (shared->PrintingVerificationResults()) {
366         continue;
367       }
368       Slice key;
369       Slice value;
370       int num_mismatched_cfs = 0;
371       for (size_t i = 0; i != num; ++i) {
372         if (i == 0) {
373           key = iters[i]->key();
374           value = iters[i]->value();
375         } else {
376           int cmp = key.compare(iters[i]->key());
377           if (cmp != 0) {
378             ++num_mismatched_cfs;
379             if (1 == num_mismatched_cfs) {
380               fprintf(stderr, "Verification failed\n");
381               fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
382                       db_->GetLatestSequenceNumber());
383               fprintf(stderr, "[%s] %s => %s\n",
384                       column_families_[0]->GetName().c_str(),
385                       key.ToString(true /* hex */).c_str(),
386                       value.ToString(true /* hex */).c_str());
387             }
388             fprintf(stderr, "[%s] %s => %s\n",
389                     column_families_[i]->GetName().c_str(),
390                     iters[i]->key().ToString(true /* hex */).c_str(),
391                     iters[i]->value().ToString(true /* hex */).c_str());
392 #ifndef ROCKSDB_LITE
393             Slice begin_key;
394             Slice end_key;
395             if (cmp < 0) {
396               begin_key = key;
397               end_key = iters[i]->key();
398             } else {
399               begin_key = iters[i]->key();
400               end_key = key;
401             }
402             std::vector<KeyVersion> versions;
403             const size_t kMaxNumIKeys = 8;
404             const auto print_key_versions = [&](ColumnFamilyHandle* cfh) {
405               Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
406                                            kMaxNumIKeys, &versions);
407               if (!s.ok()) {
408                 fprintf(stderr, "%s\n", s.ToString().c_str());
409                 return;
410               }
411               assert(nullptr != cfh);
412               fprintf(stderr,
413                       "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
414                       ")\n",
415                       cfh->GetName().c_str(),
416                       begin_key.ToString(true /* hex */).c_str(),
417                       end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);
418               for (const KeyVersion& kv : versions) {
419                 fprintf(stderr, "  key %s seq %" PRIu64 " type %d\n",
420                         Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
421                         kv.type);
422               }
423             };
424             if (1 == num_mismatched_cfs) {
425               print_key_versions(column_families_[0]);
426             }
427             print_key_versions(column_families_[i]);
428 #endif  // ROCKSDB_LITE
429             shared->SetVerificationFailure();
430           }
431         }
432       }
433       shared->FinishPrintingVerificationResults();
434       for (auto& iter : iters) {
435         iter->Next();
436       }
437     } while (true);
438   }
439 
440 #ifndef ROCKSDB_LITE
ContinuouslyVerifyDb(ThreadState * thread) const441   void ContinuouslyVerifyDb(ThreadState* thread) const override {
442     assert(thread);
443     Status status;
444 
445     DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
446     const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
447     const auto ss_deleter = [&](const Snapshot* ss) {
448       db_ptr->ReleaseSnapshot(ss);
449     };
450     std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
451         db_ptr->GetSnapshot(), ss_deleter);
452     if (cmp_db_) {
453       status = cmp_db_->TryCatchUpWithPrimary();
454     }
455     SharedState* shared = thread->shared;
456     assert(shared);
457     if (!status.ok()) {
458       shared->SetShouldStopTest();
459       return;
460     }
461     assert(cmp_db_ || snapshot_guard.get());
462     const auto checksum_column_family = [](Iterator* iter,
463                                            uint32_t* checksum) -> Status {
464       assert(nullptr != checksum);
465       uint32_t ret = 0;
466       for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
467         ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
468         ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
469       }
470       *checksum = ret;
471       return iter->status();
472     };
473     ReadOptions ropts;
474     ropts.total_order_seek = true;
475     ropts.snapshot = snapshot_guard.get();
476     uint32_t crc = 0;
477     {
478       // Compute crc for all key-values of default column family.
479       std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
480       status = checksum_column_family(it.get(), &crc);
481     }
482     uint32_t tmp_crc = 0;
483     if (status.ok()) {
484       for (ColumnFamilyHandle* cfh : cfhs) {
485         if (cfh == db_ptr->DefaultColumnFamily()) {
486           continue;
487         }
488         std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
489         status = checksum_column_family(it.get(), &tmp_crc);
490         if (!status.ok() || tmp_crc != crc) {
491           break;
492         }
493       }
494     }
495     if (!status.ok() || tmp_crc != crc) {
496       shared->SetShouldStopTest();
497     }
498   }
499 #endif  // !ROCKSDB_LITE
500 
GenerateColumnFamilies(const int,int) const501   std::vector<int> GenerateColumnFamilies(
502       const int /* num_column_families */,
503       int /* rand_column_family */) const override {
504     std::vector<int> ret;
505     int num = static_cast<int>(column_families_.size());
506     int k = 0;
507     std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
508     return ret;
509   }
510 
511  private:
  // Source of per-batch ids: TestPut() takes the next value with fetch_add(1)
  // and uses it to seed GenerateValue().
  std::atomic<int64_t> batch_id_;
513 };
514 
CreateCfConsistencyStressTest()515 StressTest* CreateCfConsistencyStressTest() {
516   return new CfConsistencyStressTest();
517 }
518 
519 }  // namespace ROCKSDB_NAMESPACE
520 #endif  // GFLAGS
521