1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifdef GFLAGS
11 #include "db_stress_tool/db_stress_common.h"
12 
13 namespace ROCKSDB_NAMESPACE {
14 class NonBatchedOpsStressTest : public StressTest {
15  public:
NonBatchedOpsStressTest()16   NonBatchedOpsStressTest() {}
17 
~NonBatchedOpsStressTest()18   virtual ~NonBatchedOpsStressTest() {}
19 
VerifyDb(ThreadState * thread) const20   void VerifyDb(ThreadState* thread) const override {
21     ReadOptions options(FLAGS_verify_checksum, true);
22     auto shared = thread->shared;
23     const int64_t max_key = shared->GetMaxKey();
24     const int64_t keys_per_thread = max_key / shared->GetNumThreads();
25     int64_t start = keys_per_thread * thread->tid;
26     int64_t end = start + keys_per_thread;
27     uint64_t prefix_to_use =
28         (FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
29     if (thread->tid == shared->GetNumThreads() - 1) {
30       end = max_key;
31     }
32     for (size_t cf = 0; cf < column_families_.size(); ++cf) {
33       if (thread->shared->HasVerificationFailedYet()) {
34         break;
35       }
36       if (!thread->rand.OneIn(2)) {
37         // Use iterator to verify this range
38         Slice prefix;
39         std::string seek_key = Key(start);
40         std::unique_ptr<Iterator> iter(
41             db_->NewIterator(options, column_families_[cf]));
42         iter->Seek(seek_key);
43         prefix = Slice(seek_key.data(), prefix_to_use);
44         for (auto i = start; i < end; i++) {
45           if (thread->shared->HasVerificationFailedYet()) {
46             break;
47           }
48           std::string from_db;
49           std::string keystr = Key(i);
50           Slice k = keystr;
51           Slice pfx = Slice(keystr.data(), prefix_to_use);
52           // Reseek when the prefix changes
53           if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
54             iter->Seek(k);
55             seek_key = keystr;
56             prefix = Slice(seek_key.data(), prefix_to_use);
57           }
58           Status s = iter->status();
59           if (iter->Valid()) {
60             Slice iter_key = iter->key();
61             if (iter->key().compare(k) > 0) {
62               s = Status::NotFound(Slice());
63             } else if (iter->key().compare(k) == 0) {
64               from_db = iter->value().ToString();
65               iter->Next();
66             } else if (iter_key.compare(k) < 0) {
67               VerificationAbort(shared, "An out of range key was found",
68                                 static_cast<int>(cf), i);
69             }
70           } else {
71             // The iterator found no value for the key in question, so do not
72             // move to the next item in the iterator
73             s = Status::NotFound();
74           }
75           VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
76                       true);
77           if (from_db.length()) {
78             PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
79                           from_db.data(), from_db.length());
80           }
81         }
82       } else {
83         // Use Get to verify this range
84         for (auto i = start; i < end; i++) {
85           if (thread->shared->HasVerificationFailedYet()) {
86             break;
87           }
88           std::string from_db;
89           std::string keystr = Key(i);
90           Slice k = keystr;
91           Status s = db_->Get(options, column_families_[cf], k, &from_db);
92           VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
93                       true);
94           if (from_db.length()) {
95             PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
96                           from_db.data(), from_db.length());
97           }
98         }
99       }
100     }
101   }
102 
MaybeClearOneColumnFamily(ThreadState * thread)103   void MaybeClearOneColumnFamily(ThreadState* thread) override {
104     if (FLAGS_column_families > 1) {
105       if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
106         // drop column family and then create it again (can't drop default)
107         int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
108         std::string new_name = ToString(new_column_family_name_.fetch_add(1));
109         {
110           MutexLock l(thread->shared->GetMutex());
111           fprintf(
112               stdout,
113               "[CF %d] Dropping and recreating column family. new name: %s\n",
114               cf, new_name.c_str());
115         }
116         thread->shared->LockColumnFamily(cf);
117         Status s = db_->DropColumnFamily(column_families_[cf]);
118         delete column_families_[cf];
119         if (!s.ok()) {
120           fprintf(stderr, "dropping column family error: %s\n",
121                   s.ToString().c_str());
122           std::terminate();
123         }
124         s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
125                                     &column_families_[cf]);
126         column_family_names_[cf] = new_name;
127         thread->shared->ClearColumnFamily(cf);
128         if (!s.ok()) {
129           fprintf(stderr, "creating column family error: %s\n",
130                   s.ToString().c_str());
131           std::terminate();
132         }
133         thread->shared->UnlockColumnFamily(cf);
134       }
135     }
136   }
137 
ShouldAcquireMutexOnKey() const138   bool ShouldAcquireMutexOnKey() const override { return true; }
139 
TestGet(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)140   Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
141                  const std::vector<int>& rand_column_families,
142                  const std::vector<int64_t>& rand_keys) override {
143     auto cfh = column_families_[rand_column_families[0]];
144     std::string key_str = Key(rand_keys[0]);
145     Slice key = key_str;
146     std::string from_db;
147     Status s = db_->Get(read_opts, cfh, key, &from_db);
148     if (s.ok()) {
149       // found case
150       thread->stats.AddGets(1, 1);
151     } else if (s.IsNotFound()) {
152       // not found case
153       thread->stats.AddGets(1, 0);
154     } else {
155       // errors case
156       fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
157       thread->stats.AddErrors(1);
158     }
159     return s;
160   }
161 
TestMultiGet(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)162   std::vector<Status> TestMultiGet(
163       ThreadState* thread, const ReadOptions& read_opts,
164       const std::vector<int>& rand_column_families,
165       const std::vector<int64_t>& rand_keys) override {
166     size_t num_keys = rand_keys.size();
167     std::vector<std::string> key_str;
168     std::vector<Slice> keys;
169     key_str.reserve(num_keys);
170     keys.reserve(num_keys);
171     std::vector<PinnableSlice> values(num_keys);
172     std::vector<Status> statuses(num_keys);
173     ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
174 
175     // To appease clang analyzer
176     const bool use_txn = FLAGS_use_txn;
177 
178     // Create a transaction in order to write some data. The purpose is to
179     // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
180     // will be rolled back once MultiGet returns.
181 #ifndef ROCKSDB_LITE
182     Transaction* txn = nullptr;
183     if (use_txn) {
184       WriteOptions wo;
185       Status s = NewTxn(wo, &txn);
186       if (!s.ok()) {
187         fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
188         std::terminate();
189       }
190     }
191 #endif
192     for (size_t i = 0; i < num_keys; ++i) {
193       key_str.emplace_back(Key(rand_keys[i]));
194       keys.emplace_back(key_str.back());
195 #ifndef ROCKSDB_LITE
196       if (use_txn) {
197         // With a 1 in 10 probability, insert the just added key in the batch
198         // into the transaction. This will create an overlap with the MultiGet
199         // keys and exercise some corner cases in the code
200         if (thread->rand.OneIn(10)) {
201           int op = thread->rand.Uniform(2);
202           Status s;
203           switch (op) {
204             case 0:
205             case 1: {
206               uint32_t value_base =
207                   thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
208               char value[100];
209               size_t sz = GenerateValue(value_base, value, sizeof(value));
210               Slice v(value, sz);
211               if (op == 0) {
212                 s = txn->Put(cfh, keys.back(), v);
213               } else {
214                 s = txn->Merge(cfh, keys.back(), v);
215               }
216               break;
217             }
218             case 2:
219               s = txn->Delete(cfh, keys.back());
220               break;
221             default:
222               assert(false);
223           }
224           if (!s.ok()) {
225             fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
226             std::terminate();
227           }
228         }
229       }
230 #endif
231     }
232 
233     if (!use_txn) {
234       db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
235                     statuses.data());
236     } else {
237 #ifndef ROCKSDB_LITE
238       txn->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
239                     statuses.data());
240       RollbackTxn(txn);
241 #endif
242     }
243 
244     for (const auto& s : statuses) {
245       if (s.ok()) {
246         // found case
247         thread->stats.AddGets(1, 1);
248       } else if (s.IsNotFound()) {
249         // not found case
250         thread->stats.AddGets(1, 0);
251       } else if (s.IsMergeInProgress() && use_txn) {
252         // With txn this is sometimes expected.
253         thread->stats.AddGets(1, 1);
254       } else {
255         // errors case
256         fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
257         thread->stats.AddErrors(1);
258       }
259     }
260     return statuses;
261   }
262 
TestPrefixScan(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)263   Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
264                         const std::vector<int>& rand_column_families,
265                         const std::vector<int64_t>& rand_keys) override {
266     auto cfh = column_families_[rand_column_families[0]];
267     std::string key_str = Key(rand_keys[0]);
268     Slice key = key_str;
269     Slice prefix = Slice(key.data(), FLAGS_prefix_size);
270 
271     std::string upper_bound;
272     Slice ub_slice;
273     ReadOptions ro_copy = read_opts;
274     // Get the next prefix first and then see if we want to set upper bound.
275     // We'll use the next prefix in an assertion later on
276     if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
277       // For half of the time, set the upper bound to the next prefix
278       ub_slice = Slice(upper_bound);
279       ro_copy.iterate_upper_bound = &ub_slice;
280     }
281 
282     Iterator* iter = db_->NewIterator(ro_copy, cfh);
283     unsigned long count = 0;
284     for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
285          iter->Next()) {
286       ++count;
287     }
288 
289     assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
290 
291     Status s = iter->status();
292     if (iter->status().ok()) {
293       thread->stats.AddPrefixes(1, count);
294     } else {
295       fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
296       thread->stats.AddErrors(1);
297     }
298     delete iter;
299     return s;
300   }
301 
TestPut(ThreadState * thread,WriteOptions & write_opts,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,char (& value)[100],std::unique_ptr<MutexLock> & lock)302   Status TestPut(ThreadState* thread, WriteOptions& write_opts,
303                  const ReadOptions& read_opts,
304                  const std::vector<int>& rand_column_families,
305                  const std::vector<int64_t>& rand_keys, char (&value)[100],
306                  std::unique_ptr<MutexLock>& lock) override {
307     auto shared = thread->shared;
308     int64_t max_key = shared->GetMaxKey();
309     int64_t rand_key = rand_keys[0];
310     int rand_column_family = rand_column_families[0];
311     while (!shared->AllowsOverwrite(rand_key) &&
312            (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
313       lock.reset();
314       rand_key = thread->rand.Next() % max_key;
315       rand_column_family = thread->rand.Next() % FLAGS_column_families;
316       lock.reset(
317           new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
318     }
319 
320     std::string key_str = Key(rand_key);
321     Slice key = key_str;
322     ColumnFamilyHandle* cfh = column_families_[rand_column_family];
323 
324     if (FLAGS_verify_before_write) {
325       std::string key_str2 = Key(rand_key);
326       Slice k = key_str2;
327       std::string from_db;
328       Status s = db_->Get(read_opts, cfh, k, &from_db);
329       if (!VerifyValue(rand_column_family, rand_key, read_opts, shared, from_db,
330                        s, true)) {
331         return s;
332       }
333     }
334     uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
335     size_t sz = GenerateValue(value_base, value, sizeof(value));
336     Slice v(value, sz);
337     shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
338     Status s;
339     if (FLAGS_use_merge) {
340       if (!FLAGS_use_txn) {
341         s = db_->Merge(write_opts, cfh, key, v);
342       } else {
343 #ifndef ROCKSDB_LITE
344         Transaction* txn;
345         s = NewTxn(write_opts, &txn);
346         if (s.ok()) {
347           s = txn->Merge(cfh, key, v);
348           if (s.ok()) {
349             s = CommitTxn(txn);
350           }
351         }
352 #endif
353       }
354     } else {
355       if (!FLAGS_use_txn) {
356         s = db_->Put(write_opts, cfh, key, v);
357       } else {
358 #ifndef ROCKSDB_LITE
359         Transaction* txn;
360         s = NewTxn(write_opts, &txn);
361         if (s.ok()) {
362           s = txn->Put(cfh, key, v);
363           if (s.ok()) {
364             s = CommitTxn(txn);
365           }
366         }
367 #endif
368       }
369     }
370     shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
371     if (!s.ok()) {
372       fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
373       std::terminate();
374     }
375     thread->stats.AddBytesForWrites(1, sz);
376     PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
377                   sz);
378     return s;
379   }
380 
TestDelete(ThreadState * thread,WriteOptions & write_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> & lock)381   Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
382                     const std::vector<int>& rand_column_families,
383                     const std::vector<int64_t>& rand_keys,
384                     std::unique_ptr<MutexLock>& lock) override {
385     int64_t rand_key = rand_keys[0];
386     int rand_column_family = rand_column_families[0];
387     auto shared = thread->shared;
388     int64_t max_key = shared->GetMaxKey();
389 
390     // OPERATION delete
391     // If the chosen key does not allow overwrite and it does not exist,
392     // choose another key.
393     while (!shared->AllowsOverwrite(rand_key) &&
394            !shared->Exists(rand_column_family, rand_key)) {
395       lock.reset();
396       rand_key = thread->rand.Next() % max_key;
397       rand_column_family = thread->rand.Next() % FLAGS_column_families;
398       lock.reset(
399           new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
400     }
401 
402     std::string key_str = Key(rand_key);
403     Slice key = key_str;
404     auto cfh = column_families_[rand_column_family];
405 
406     // Use delete if the key may be overwritten and a single deletion
407     // otherwise.
408     Status s;
409     if (shared->AllowsOverwrite(rand_key)) {
410       shared->Delete(rand_column_family, rand_key, true /* pending */);
411       if (!FLAGS_use_txn) {
412         s = db_->Delete(write_opts, cfh, key);
413       } else {
414 #ifndef ROCKSDB_LITE
415         Transaction* txn;
416         s = NewTxn(write_opts, &txn);
417         if (s.ok()) {
418           s = txn->Delete(cfh, key);
419           if (s.ok()) {
420             s = CommitTxn(txn);
421           }
422         }
423 #endif
424       }
425       shared->Delete(rand_column_family, rand_key, false /* pending */);
426       thread->stats.AddDeletes(1);
427       if (!s.ok()) {
428         fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
429         std::terminate();
430       }
431     } else {
432       shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
433       if (!FLAGS_use_txn) {
434         s = db_->SingleDelete(write_opts, cfh, key);
435       } else {
436 #ifndef ROCKSDB_LITE
437         Transaction* txn;
438         s = NewTxn(write_opts, &txn);
439         if (s.ok()) {
440           s = txn->SingleDelete(cfh, key);
441           if (s.ok()) {
442             s = CommitTxn(txn);
443           }
444         }
445 #endif
446       }
447       shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
448       thread->stats.AddSingleDeletes(1);
449       if (!s.ok()) {
450         fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
451         std::terminate();
452       }
453     }
454     return s;
455   }
456 
TestDeleteRange(ThreadState * thread,WriteOptions & write_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> & lock)457   Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
458                          const std::vector<int>& rand_column_families,
459                          const std::vector<int64_t>& rand_keys,
460                          std::unique_ptr<MutexLock>& lock) override {
461     // OPERATION delete range
462     std::vector<std::unique_ptr<MutexLock>> range_locks;
463     // delete range does not respect disallowed overwrites. the keys for
464     // which overwrites are disallowed are randomly distributed so it
465     // could be expensive to find a range where each key allows
466     // overwrites.
467     int64_t rand_key = rand_keys[0];
468     int rand_column_family = rand_column_families[0];
469     auto shared = thread->shared;
470     int64_t max_key = shared->GetMaxKey();
471     if (rand_key > max_key - FLAGS_range_deletion_width) {
472       lock.reset();
473       rand_key =
474           thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
475       range_locks.emplace_back(
476           new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
477     } else {
478       range_locks.emplace_back(std::move(lock));
479     }
480     for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
481       if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
482         range_locks.emplace_back(new MutexLock(
483             shared->GetMutexForKey(rand_column_family, rand_key + j)));
484       }
485     }
486     shared->DeleteRange(rand_column_family, rand_key,
487                         rand_key + FLAGS_range_deletion_width,
488                         true /* pending */);
489 
490     std::string keystr = Key(rand_key);
491     Slice key = keystr;
492     auto cfh = column_families_[rand_column_family];
493     std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
494     Slice end_key = end_keystr;
495     Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
496     if (!s.ok()) {
497       fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
498       std::terminate();
499     }
500     int covered = shared->DeleteRange(rand_column_family, rand_key,
501                                       rand_key + FLAGS_range_deletion_width,
502                                       false /* pending */);
503     thread->stats.AddRangeDeletions(1);
504     thread->stats.AddCoveredByRangeDeletions(covered);
505     return s;
506   }
507 
508 #ifdef ROCKSDB_LITE
TestIngestExternalFile(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &,std::unique_ptr<MutexLock> &)509   void TestIngestExternalFile(
510       ThreadState* /* thread */,
511       const std::vector<int>& /* rand_column_families */,
512       const std::vector<int64_t>& /* rand_keys */,
513       std::unique_ptr<MutexLock>& /* lock */) override {
514     assert(false);
515     fprintf(stderr,
516             "RocksDB lite does not support "
517             "TestIngestExternalFile\n");
518     std::terminate();
519   }
520 #else
TestIngestExternalFile(ThreadState * thread,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> & lock)521   void TestIngestExternalFile(ThreadState* thread,
522                               const std::vector<int>& rand_column_families,
523                               const std::vector<int64_t>& rand_keys,
524                               std::unique_ptr<MutexLock>& lock) override {
525     const std::string sst_filename =
526         FLAGS_db + "/." + ToString(thread->tid) + ".sst";
527     Status s;
528     if (db_stress_env->FileExists(sst_filename).ok()) {
529       // Maybe we terminated abnormally before, so cleanup to give this file
530       // ingestion a clean slate
531       s = db_stress_env->DeleteFile(sst_filename);
532     }
533 
534     SstFileWriter sst_file_writer(EnvOptions(options_), options_);
535     if (s.ok()) {
536       s = sst_file_writer.Open(sst_filename);
537     }
538     int64_t key_base = rand_keys[0];
539     int column_family = rand_column_families[0];
540     std::vector<std::unique_ptr<MutexLock>> range_locks;
541     std::vector<uint32_t> values;
542     SharedState* shared = thread->shared;
543 
544     // Grab locks, set pending state on expected values, and add keys
545     for (int64_t key = key_base;
546          s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width,
547                                   shared->GetMaxKey());
548          ++key) {
549       if (key == key_base) {
550         range_locks.emplace_back(std::move(lock));
551       } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
552         range_locks.emplace_back(
553             new MutexLock(shared->GetMutexForKey(column_family, key)));
554       }
555 
556       uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
557       values.push_back(value_base);
558       shared->Put(column_family, key, value_base, true /* pending */);
559 
560       char value[100];
561       size_t value_len = GenerateValue(value_base, value, sizeof(value));
562       auto key_str = Key(key);
563       s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
564     }
565 
566     if (s.ok()) {
567       s = sst_file_writer.Finish();
568     }
569     if (s.ok()) {
570       s = db_->IngestExternalFile(column_families_[column_family],
571                                   {sst_filename}, IngestExternalFileOptions());
572     }
573     if (!s.ok()) {
574       fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
575       std::terminate();
576     }
577     int64_t key = key_base;
578     for (int32_t value : values) {
579       shared->Put(column_family, key, value, false /* pending */);
580       ++key;
581     }
582   }
583 #endif  // ROCKSDB_LITE
584 
VerifyValue(int cf,int64_t key,const ReadOptions &,SharedState * shared,const std::string & value_from_db,const Status & s,bool strict=false) const585   bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
586                    SharedState* shared, const std::string& value_from_db,
587                    const Status& s, bool strict = false) const {
588     if (shared->HasVerificationFailedYet()) {
589       return false;
590     }
591     // compare value_from_db with the value in the shared state
592     char value[kValueMaxLen];
593     uint32_t value_base = shared->Get(cf, key);
594     if (value_base == SharedState::UNKNOWN_SENTINEL) {
595       return true;
596     }
597     if (value_base == SharedState::DELETION_SENTINEL && !strict) {
598       return true;
599     }
600 
601     if (s.ok()) {
602       if (value_base == SharedState::DELETION_SENTINEL) {
603         VerificationAbort(shared, "Unexpected value found", cf, key);
604         return false;
605       }
606       size_t sz = GenerateValue(value_base, value, sizeof(value));
607       if (value_from_db.length() != sz) {
608         VerificationAbort(shared, "Length of value read is not equal", cf, key);
609         return false;
610       }
611       if (memcmp(value_from_db.data(), value, sz) != 0) {
612         VerificationAbort(shared, "Contents of value read don't match", cf,
613                           key);
614         return false;
615       }
616     } else {
617       if (value_base != SharedState::DELETION_SENTINEL) {
618         VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
619         return false;
620       }
621     }
622     return true;
623   }
624 };
625 
CreateNonBatchedOpsStressTest()626 StressTest* CreateNonBatchedOpsStressTest() {
627   return new NonBatchedOpsStressTest();
628 }
629 
630 }  // namespace ROCKSDB_NAMESPACE
631 #endif  // GFLAGS
632