1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifdef GFLAGS
11 #include "db_stress_tool/db_stress_common.h"
12
13 namespace ROCKSDB_NAMESPACE {
14 class NonBatchedOpsStressTest : public StressTest {
15 public:
NonBatchedOpsStressTest()16 NonBatchedOpsStressTest() {}
17
~NonBatchedOpsStressTest()18 virtual ~NonBatchedOpsStressTest() {}
19
VerifyDb(ThreadState * thread) const20 void VerifyDb(ThreadState* thread) const override {
21 ReadOptions options(FLAGS_verify_checksum, true);
22 auto shared = thread->shared;
23 const int64_t max_key = shared->GetMaxKey();
24 const int64_t keys_per_thread = max_key / shared->GetNumThreads();
25 int64_t start = keys_per_thread * thread->tid;
26 int64_t end = start + keys_per_thread;
27 uint64_t prefix_to_use =
28 (FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
29 if (thread->tid == shared->GetNumThreads() - 1) {
30 end = max_key;
31 }
32 for (size_t cf = 0; cf < column_families_.size(); ++cf) {
33 if (thread->shared->HasVerificationFailedYet()) {
34 break;
35 }
36 if (!thread->rand.OneIn(2)) {
37 // Use iterator to verify this range
38 Slice prefix;
39 std::string seek_key = Key(start);
40 std::unique_ptr<Iterator> iter(
41 db_->NewIterator(options, column_families_[cf]));
42 iter->Seek(seek_key);
43 prefix = Slice(seek_key.data(), prefix_to_use);
44 for (auto i = start; i < end; i++) {
45 if (thread->shared->HasVerificationFailedYet()) {
46 break;
47 }
48 std::string from_db;
49 std::string keystr = Key(i);
50 Slice k = keystr;
51 Slice pfx = Slice(keystr.data(), prefix_to_use);
52 // Reseek when the prefix changes
53 if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
54 iter->Seek(k);
55 seek_key = keystr;
56 prefix = Slice(seek_key.data(), prefix_to_use);
57 }
58 Status s = iter->status();
59 if (iter->Valid()) {
60 Slice iter_key = iter->key();
61 if (iter->key().compare(k) > 0) {
62 s = Status::NotFound(Slice());
63 } else if (iter->key().compare(k) == 0) {
64 from_db = iter->value().ToString();
65 iter->Next();
66 } else if (iter_key.compare(k) < 0) {
67 VerificationAbort(shared, "An out of range key was found",
68 static_cast<int>(cf), i);
69 }
70 } else {
71 // The iterator found no value for the key in question, so do not
72 // move to the next item in the iterator
73 s = Status::NotFound();
74 }
75 VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
76 true);
77 if (from_db.length()) {
78 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
79 from_db.data(), from_db.length());
80 }
81 }
82 } else {
83 // Use Get to verify this range
84 for (auto i = start; i < end; i++) {
85 if (thread->shared->HasVerificationFailedYet()) {
86 break;
87 }
88 std::string from_db;
89 std::string keystr = Key(i);
90 Slice k = keystr;
91 Status s = db_->Get(options, column_families_[cf], k, &from_db);
92 VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
93 true);
94 if (from_db.length()) {
95 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
96 from_db.data(), from_db.length());
97 }
98 }
99 }
100 }
101 }
102
MaybeClearOneColumnFamily(ThreadState * thread)103 void MaybeClearOneColumnFamily(ThreadState* thread) override {
104 if (FLAGS_column_families > 1) {
105 if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
106 // drop column family and then create it again (can't drop default)
107 int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
108 std::string new_name = ToString(new_column_family_name_.fetch_add(1));
109 {
110 MutexLock l(thread->shared->GetMutex());
111 fprintf(
112 stdout,
113 "[CF %d] Dropping and recreating column family. new name: %s\n",
114 cf, new_name.c_str());
115 }
116 thread->shared->LockColumnFamily(cf);
117 Status s = db_->DropColumnFamily(column_families_[cf]);
118 delete column_families_[cf];
119 if (!s.ok()) {
120 fprintf(stderr, "dropping column family error: %s\n",
121 s.ToString().c_str());
122 std::terminate();
123 }
124 s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
125 &column_families_[cf]);
126 column_family_names_[cf] = new_name;
127 thread->shared->ClearColumnFamily(cf);
128 if (!s.ok()) {
129 fprintf(stderr, "creating column family error: %s\n",
130 s.ToString().c_str());
131 std::terminate();
132 }
133 thread->shared->UnlockColumnFamily(cf);
134 }
135 }
136 }
137
ShouldAcquireMutexOnKey() const138 bool ShouldAcquireMutexOnKey() const override { return true; }
139
TestGet(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)140 Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
141 const std::vector<int>& rand_column_families,
142 const std::vector<int64_t>& rand_keys) override {
143 auto cfh = column_families_[rand_column_families[0]];
144 std::string key_str = Key(rand_keys[0]);
145 Slice key = key_str;
146 std::string from_db;
147 Status s = db_->Get(read_opts, cfh, key, &from_db);
148 if (s.ok()) {
149 // found case
150 thread->stats.AddGets(1, 1);
151 } else if (s.IsNotFound()) {
152 // not found case
153 thread->stats.AddGets(1, 0);
154 } else {
155 // errors case
156 fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
157 thread->stats.AddErrors(1);
158 }
159 return s;
160 }
161
TestMultiGet(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)162 std::vector<Status> TestMultiGet(
163 ThreadState* thread, const ReadOptions& read_opts,
164 const std::vector<int>& rand_column_families,
165 const std::vector<int64_t>& rand_keys) override {
166 size_t num_keys = rand_keys.size();
167 std::vector<std::string> key_str;
168 std::vector<Slice> keys;
169 key_str.reserve(num_keys);
170 keys.reserve(num_keys);
171 std::vector<PinnableSlice> values(num_keys);
172 std::vector<Status> statuses(num_keys);
173 ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
174
175 // To appease clang analyzer
176 const bool use_txn = FLAGS_use_txn;
177
178 // Create a transaction in order to write some data. The purpose is to
179 // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
180 // will be rolled back once MultiGet returns.
181 #ifndef ROCKSDB_LITE
182 Transaction* txn = nullptr;
183 if (use_txn) {
184 WriteOptions wo;
185 Status s = NewTxn(wo, &txn);
186 if (!s.ok()) {
187 fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
188 std::terminate();
189 }
190 }
191 #endif
192 for (size_t i = 0; i < num_keys; ++i) {
193 key_str.emplace_back(Key(rand_keys[i]));
194 keys.emplace_back(key_str.back());
195 #ifndef ROCKSDB_LITE
196 if (use_txn) {
197 // With a 1 in 10 probability, insert the just added key in the batch
198 // into the transaction. This will create an overlap with the MultiGet
199 // keys and exercise some corner cases in the code
200 if (thread->rand.OneIn(10)) {
201 int op = thread->rand.Uniform(2);
202 Status s;
203 switch (op) {
204 case 0:
205 case 1: {
206 uint32_t value_base =
207 thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
208 char value[100];
209 size_t sz = GenerateValue(value_base, value, sizeof(value));
210 Slice v(value, sz);
211 if (op == 0) {
212 s = txn->Put(cfh, keys.back(), v);
213 } else {
214 s = txn->Merge(cfh, keys.back(), v);
215 }
216 break;
217 }
218 case 2:
219 s = txn->Delete(cfh, keys.back());
220 break;
221 default:
222 assert(false);
223 }
224 if (!s.ok()) {
225 fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
226 std::terminate();
227 }
228 }
229 }
230 #endif
231 }
232
233 if (!use_txn) {
234 db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
235 statuses.data());
236 } else {
237 #ifndef ROCKSDB_LITE
238 txn->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
239 statuses.data());
240 RollbackTxn(txn);
241 #endif
242 }
243
244 for (const auto& s : statuses) {
245 if (s.ok()) {
246 // found case
247 thread->stats.AddGets(1, 1);
248 } else if (s.IsNotFound()) {
249 // not found case
250 thread->stats.AddGets(1, 0);
251 } else if (s.IsMergeInProgress() && use_txn) {
252 // With txn this is sometimes expected.
253 thread->stats.AddGets(1, 1);
254 } else {
255 // errors case
256 fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
257 thread->stats.AddErrors(1);
258 }
259 }
260 return statuses;
261 }
262
TestPrefixScan(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)263 Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
264 const std::vector<int>& rand_column_families,
265 const std::vector<int64_t>& rand_keys) override {
266 auto cfh = column_families_[rand_column_families[0]];
267 std::string key_str = Key(rand_keys[0]);
268 Slice key = key_str;
269 Slice prefix = Slice(key.data(), FLAGS_prefix_size);
270
271 std::string upper_bound;
272 Slice ub_slice;
273 ReadOptions ro_copy = read_opts;
274 // Get the next prefix first and then see if we want to set upper bound.
275 // We'll use the next prefix in an assertion later on
276 if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
277 // For half of the time, set the upper bound to the next prefix
278 ub_slice = Slice(upper_bound);
279 ro_copy.iterate_upper_bound = &ub_slice;
280 }
281
282 Iterator* iter = db_->NewIterator(ro_copy, cfh);
283 unsigned long count = 0;
284 for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
285 iter->Next()) {
286 ++count;
287 }
288
289 assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
290
291 Status s = iter->status();
292 if (iter->status().ok()) {
293 thread->stats.AddPrefixes(1, count);
294 } else {
295 fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
296 thread->stats.AddErrors(1);
297 }
298 delete iter;
299 return s;
300 }
301
TestPut(ThreadState * thread,WriteOptions & write_opts,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,char (& value)[100],std::unique_ptr<MutexLock> & lock)302 Status TestPut(ThreadState* thread, WriteOptions& write_opts,
303 const ReadOptions& read_opts,
304 const std::vector<int>& rand_column_families,
305 const std::vector<int64_t>& rand_keys, char (&value)[100],
306 std::unique_ptr<MutexLock>& lock) override {
307 auto shared = thread->shared;
308 int64_t max_key = shared->GetMaxKey();
309 int64_t rand_key = rand_keys[0];
310 int rand_column_family = rand_column_families[0];
311 while (!shared->AllowsOverwrite(rand_key) &&
312 (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
313 lock.reset();
314 rand_key = thread->rand.Next() % max_key;
315 rand_column_family = thread->rand.Next() % FLAGS_column_families;
316 lock.reset(
317 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
318 }
319
320 std::string key_str = Key(rand_key);
321 Slice key = key_str;
322 ColumnFamilyHandle* cfh = column_families_[rand_column_family];
323
324 if (FLAGS_verify_before_write) {
325 std::string key_str2 = Key(rand_key);
326 Slice k = key_str2;
327 std::string from_db;
328 Status s = db_->Get(read_opts, cfh, k, &from_db);
329 if (!VerifyValue(rand_column_family, rand_key, read_opts, shared, from_db,
330 s, true)) {
331 return s;
332 }
333 }
334 uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
335 size_t sz = GenerateValue(value_base, value, sizeof(value));
336 Slice v(value, sz);
337 shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
338 Status s;
339 if (FLAGS_use_merge) {
340 if (!FLAGS_use_txn) {
341 s = db_->Merge(write_opts, cfh, key, v);
342 } else {
343 #ifndef ROCKSDB_LITE
344 Transaction* txn;
345 s = NewTxn(write_opts, &txn);
346 if (s.ok()) {
347 s = txn->Merge(cfh, key, v);
348 if (s.ok()) {
349 s = CommitTxn(txn);
350 }
351 }
352 #endif
353 }
354 } else {
355 if (!FLAGS_use_txn) {
356 s = db_->Put(write_opts, cfh, key, v);
357 } else {
358 #ifndef ROCKSDB_LITE
359 Transaction* txn;
360 s = NewTxn(write_opts, &txn);
361 if (s.ok()) {
362 s = txn->Put(cfh, key, v);
363 if (s.ok()) {
364 s = CommitTxn(txn);
365 }
366 }
367 #endif
368 }
369 }
370 shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
371 if (!s.ok()) {
372 fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
373 std::terminate();
374 }
375 thread->stats.AddBytesForWrites(1, sz);
376 PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
377 sz);
378 return s;
379 }
380
TestDelete(ThreadState * thread,WriteOptions & write_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> & lock)381 Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
382 const std::vector<int>& rand_column_families,
383 const std::vector<int64_t>& rand_keys,
384 std::unique_ptr<MutexLock>& lock) override {
385 int64_t rand_key = rand_keys[0];
386 int rand_column_family = rand_column_families[0];
387 auto shared = thread->shared;
388 int64_t max_key = shared->GetMaxKey();
389
390 // OPERATION delete
391 // If the chosen key does not allow overwrite and it does not exist,
392 // choose another key.
393 while (!shared->AllowsOverwrite(rand_key) &&
394 !shared->Exists(rand_column_family, rand_key)) {
395 lock.reset();
396 rand_key = thread->rand.Next() % max_key;
397 rand_column_family = thread->rand.Next() % FLAGS_column_families;
398 lock.reset(
399 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
400 }
401
402 std::string key_str = Key(rand_key);
403 Slice key = key_str;
404 auto cfh = column_families_[rand_column_family];
405
406 // Use delete if the key may be overwritten and a single deletion
407 // otherwise.
408 Status s;
409 if (shared->AllowsOverwrite(rand_key)) {
410 shared->Delete(rand_column_family, rand_key, true /* pending */);
411 if (!FLAGS_use_txn) {
412 s = db_->Delete(write_opts, cfh, key);
413 } else {
414 #ifndef ROCKSDB_LITE
415 Transaction* txn;
416 s = NewTxn(write_opts, &txn);
417 if (s.ok()) {
418 s = txn->Delete(cfh, key);
419 if (s.ok()) {
420 s = CommitTxn(txn);
421 }
422 }
423 #endif
424 }
425 shared->Delete(rand_column_family, rand_key, false /* pending */);
426 thread->stats.AddDeletes(1);
427 if (!s.ok()) {
428 fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
429 std::terminate();
430 }
431 } else {
432 shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
433 if (!FLAGS_use_txn) {
434 s = db_->SingleDelete(write_opts, cfh, key);
435 } else {
436 #ifndef ROCKSDB_LITE
437 Transaction* txn;
438 s = NewTxn(write_opts, &txn);
439 if (s.ok()) {
440 s = txn->SingleDelete(cfh, key);
441 if (s.ok()) {
442 s = CommitTxn(txn);
443 }
444 }
445 #endif
446 }
447 shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
448 thread->stats.AddSingleDeletes(1);
449 if (!s.ok()) {
450 fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
451 std::terminate();
452 }
453 }
454 return s;
455 }
456
TestDeleteRange(ThreadState * thread,WriteOptions & write_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> & lock)457 Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
458 const std::vector<int>& rand_column_families,
459 const std::vector<int64_t>& rand_keys,
460 std::unique_ptr<MutexLock>& lock) override {
461 // OPERATION delete range
462 std::vector<std::unique_ptr<MutexLock>> range_locks;
463 // delete range does not respect disallowed overwrites. the keys for
464 // which overwrites are disallowed are randomly distributed so it
465 // could be expensive to find a range where each key allows
466 // overwrites.
467 int64_t rand_key = rand_keys[0];
468 int rand_column_family = rand_column_families[0];
469 auto shared = thread->shared;
470 int64_t max_key = shared->GetMaxKey();
471 if (rand_key > max_key - FLAGS_range_deletion_width) {
472 lock.reset();
473 rand_key =
474 thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
475 range_locks.emplace_back(
476 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
477 } else {
478 range_locks.emplace_back(std::move(lock));
479 }
480 for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
481 if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
482 range_locks.emplace_back(new MutexLock(
483 shared->GetMutexForKey(rand_column_family, rand_key + j)));
484 }
485 }
486 shared->DeleteRange(rand_column_family, rand_key,
487 rand_key + FLAGS_range_deletion_width,
488 true /* pending */);
489
490 std::string keystr = Key(rand_key);
491 Slice key = keystr;
492 auto cfh = column_families_[rand_column_family];
493 std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
494 Slice end_key = end_keystr;
495 Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
496 if (!s.ok()) {
497 fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
498 std::terminate();
499 }
500 int covered = shared->DeleteRange(rand_column_family, rand_key,
501 rand_key + FLAGS_range_deletion_width,
502 false /* pending */);
503 thread->stats.AddRangeDeletions(1);
504 thread->stats.AddCoveredByRangeDeletions(covered);
505 return s;
506 }
507
508 #ifdef ROCKSDB_LITE
TestIngestExternalFile(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &,std::unique_ptr<MutexLock> &)509 void TestIngestExternalFile(
510 ThreadState* /* thread */,
511 const std::vector<int>& /* rand_column_families */,
512 const std::vector<int64_t>& /* rand_keys */,
513 std::unique_ptr<MutexLock>& /* lock */) override {
514 assert(false);
515 fprintf(stderr,
516 "RocksDB lite does not support "
517 "TestIngestExternalFile\n");
518 std::terminate();
519 }
520 #else
TestIngestExternalFile(ThreadState * thread,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> & lock)521 void TestIngestExternalFile(ThreadState* thread,
522 const std::vector<int>& rand_column_families,
523 const std::vector<int64_t>& rand_keys,
524 std::unique_ptr<MutexLock>& lock) override {
525 const std::string sst_filename =
526 FLAGS_db + "/." + ToString(thread->tid) + ".sst";
527 Status s;
528 if (db_stress_env->FileExists(sst_filename).ok()) {
529 // Maybe we terminated abnormally before, so cleanup to give this file
530 // ingestion a clean slate
531 s = db_stress_env->DeleteFile(sst_filename);
532 }
533
534 SstFileWriter sst_file_writer(EnvOptions(options_), options_);
535 if (s.ok()) {
536 s = sst_file_writer.Open(sst_filename);
537 }
538 int64_t key_base = rand_keys[0];
539 int column_family = rand_column_families[0];
540 std::vector<std::unique_ptr<MutexLock>> range_locks;
541 std::vector<uint32_t> values;
542 SharedState* shared = thread->shared;
543
544 // Grab locks, set pending state on expected values, and add keys
545 for (int64_t key = key_base;
546 s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width,
547 shared->GetMaxKey());
548 ++key) {
549 if (key == key_base) {
550 range_locks.emplace_back(std::move(lock));
551 } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
552 range_locks.emplace_back(
553 new MutexLock(shared->GetMutexForKey(column_family, key)));
554 }
555
556 uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
557 values.push_back(value_base);
558 shared->Put(column_family, key, value_base, true /* pending */);
559
560 char value[100];
561 size_t value_len = GenerateValue(value_base, value, sizeof(value));
562 auto key_str = Key(key);
563 s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
564 }
565
566 if (s.ok()) {
567 s = sst_file_writer.Finish();
568 }
569 if (s.ok()) {
570 s = db_->IngestExternalFile(column_families_[column_family],
571 {sst_filename}, IngestExternalFileOptions());
572 }
573 if (!s.ok()) {
574 fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
575 std::terminate();
576 }
577 int64_t key = key_base;
578 for (int32_t value : values) {
579 shared->Put(column_family, key, value, false /* pending */);
580 ++key;
581 }
582 }
583 #endif // ROCKSDB_LITE
584
VerifyValue(int cf,int64_t key,const ReadOptions &,SharedState * shared,const std::string & value_from_db,const Status & s,bool strict=false) const585 bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
586 SharedState* shared, const std::string& value_from_db,
587 const Status& s, bool strict = false) const {
588 if (shared->HasVerificationFailedYet()) {
589 return false;
590 }
591 // compare value_from_db with the value in the shared state
592 char value[kValueMaxLen];
593 uint32_t value_base = shared->Get(cf, key);
594 if (value_base == SharedState::UNKNOWN_SENTINEL) {
595 return true;
596 }
597 if (value_base == SharedState::DELETION_SENTINEL && !strict) {
598 return true;
599 }
600
601 if (s.ok()) {
602 if (value_base == SharedState::DELETION_SENTINEL) {
603 VerificationAbort(shared, "Unexpected value found", cf, key);
604 return false;
605 }
606 size_t sz = GenerateValue(value_base, value, sizeof(value));
607 if (value_from_db.length() != sz) {
608 VerificationAbort(shared, "Length of value read is not equal", cf, key);
609 return false;
610 }
611 if (memcmp(value_from_db.data(), value, sz) != 0) {
612 VerificationAbort(shared, "Contents of value read don't match", cf,
613 key);
614 return false;
615 }
616 } else {
617 if (value_base != SharedState::DELETION_SENTINEL) {
618 VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
619 return false;
620 }
621 }
622 return true;
623 }
624 };
625
CreateNonBatchedOpsStressTest()626 StressTest* CreateNonBatchedOpsStressTest() {
627 return new NonBatchedOpsStressTest();
628 }
629
630 } // namespace ROCKSDB_NAMESPACE
631 #endif // GFLAGS
632