1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifdef GFLAGS
11 #include "db_stress_tool/db_stress_common.h"
12 
13 namespace ROCKSDB_NAMESPACE {
14 class CfConsistencyStressTest : public StressTest {
15  public:
CfConsistencyStressTest()16   CfConsistencyStressTest() : batch_id_(0) {}
17 
~CfConsistencyStressTest()18   ~CfConsistencyStressTest() override {}
19 
TestPut(ThreadState * thread,WriteOptions & write_opts,const ReadOptions &,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,char (& value)[100],std::unique_ptr<MutexLock> &)20   Status TestPut(ThreadState* thread, WriteOptions& write_opts,
21                  const ReadOptions& /* read_opts */,
22                  const std::vector<int>& rand_column_families,
23                  const std::vector<int64_t>& rand_keys, char (&value)[100],
24                  std::unique_ptr<MutexLock>& /* lock */) override {
25     std::string key_str = Key(rand_keys[0]);
26     Slice key = key_str;
27     uint64_t value_base = batch_id_.fetch_add(1);
28     size_t sz =
29         GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
30     Slice v(value, sz);
31     WriteBatch batch;
32     for (auto cf : rand_column_families) {
33       ColumnFamilyHandle* cfh = column_families_[cf];
34       if (FLAGS_use_merge) {
35         batch.Merge(cfh, key, v);
36       } else { /* !FLAGS_use_merge */
37         batch.Put(cfh, key, v);
38       }
39     }
40     Status s = db_->Write(write_opts, &batch);
41     if (!s.ok()) {
42       fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
43       thread->stats.AddErrors(1);
44     } else {
45       auto num = static_cast<long>(rand_column_families.size());
46       thread->stats.AddBytesForWrites(num, (sz + 1) * num);
47     }
48 
49     return s;
50   }
51 
TestDelete(ThreadState * thread,WriteOptions & write_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> &)52   Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
53                     const std::vector<int>& rand_column_families,
54                     const std::vector<int64_t>& rand_keys,
55                     std::unique_ptr<MutexLock>& /* lock */) override {
56     std::string key_str = Key(rand_keys[0]);
57     Slice key = key_str;
58     WriteBatch batch;
59     for (auto cf : rand_column_families) {
60       ColumnFamilyHandle* cfh = column_families_[cf];
61       batch.Delete(cfh, key);
62     }
63     Status s = db_->Write(write_opts, &batch);
64     if (!s.ok()) {
65       fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
66       thread->stats.AddErrors(1);
67     } else {
68       thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
69     }
70     return s;
71   }
72 
TestDeleteRange(ThreadState * thread,WriteOptions & write_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys,std::unique_ptr<MutexLock> &)73   Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
74                          const std::vector<int>& rand_column_families,
75                          const std::vector<int64_t>& rand_keys,
76                          std::unique_ptr<MutexLock>& /* lock */) override {
77     int64_t rand_key = rand_keys[0];
78     auto shared = thread->shared;
79     int64_t max_key = shared->GetMaxKey();
80     if (rand_key > max_key - FLAGS_range_deletion_width) {
81       rand_key =
82           thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
83     }
84     std::string key_str = Key(rand_key);
85     Slice key = key_str;
86     std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
87     Slice end_key = end_key_str;
88     WriteBatch batch;
89     for (auto cf : rand_column_families) {
90       ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
91       batch.DeleteRange(cfh, key, end_key);
92     }
93     Status s = db_->Write(write_opts, &batch);
94     if (!s.ok()) {
95       fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
96       thread->stats.AddErrors(1);
97     } else {
98       thread->stats.AddRangeDeletions(
99           static_cast<long>(rand_column_families.size()));
100     }
101     return s;
102   }
103 
TestIngestExternalFile(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &,std::unique_ptr<MutexLock> &)104   void TestIngestExternalFile(
105       ThreadState* /* thread */,
106       const std::vector<int>& /* rand_column_families */,
107       const std::vector<int64_t>& /* rand_keys */,
108       std::unique_ptr<MutexLock>& /* lock */) override {
109     assert(false);
110     fprintf(stderr,
111             "CfConsistencyStressTest does not support TestIngestExternalFile "
112             "because it's not possible to verify the result\n");
113     std::terminate();
114   }
115 
TestGet(ThreadState * thread,const ReadOptions & readoptions,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)116   Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
117                  const std::vector<int>& rand_column_families,
118                  const std::vector<int64_t>& rand_keys) override {
119     std::string key_str = Key(rand_keys[0]);
120     Slice key = key_str;
121     Status s;
122     bool is_consistent = true;
123 
124     if (thread->rand.OneIn(2)) {
125       // 1/2 chance, does a random read from random CF
126       auto cfh =
127           column_families_[rand_column_families[thread->rand.Next() %
128                                                 rand_column_families.size()]];
129       std::string from_db;
130       s = db_->Get(readoptions, cfh, key, &from_db);
131     } else {
132       // 1/2 chance, comparing one key is the same across all CFs
133       const Snapshot* snapshot = db_->GetSnapshot();
134       ReadOptions readoptionscopy = readoptions;
135       readoptionscopy.snapshot = snapshot;
136 
137       std::string value0;
138       s = db_->Get(readoptionscopy, column_families_[rand_column_families[0]],
139                    key, &value0);
140       if (s.ok() || s.IsNotFound()) {
141         bool found = s.ok();
142         for (size_t i = 1; i < rand_column_families.size(); i++) {
143           std::string value1;
144           s = db_->Get(readoptionscopy,
145                        column_families_[rand_column_families[i]], key, &value1);
146           if (!s.ok() && !s.IsNotFound()) {
147             break;
148           }
149           if (!found && s.ok()) {
150             fprintf(stderr, "Get() return different results with key %s\n",
151                     Slice(key_str).ToString(true).c_str());
152             fprintf(stderr, "CF %s is not found\n",
153                     column_family_names_[0].c_str());
154             fprintf(stderr, "CF %s returns value %s\n",
155                     column_family_names_[i].c_str(),
156                     Slice(value1).ToString(true).c_str());
157             is_consistent = false;
158           } else if (found && s.IsNotFound()) {
159             fprintf(stderr, "Get() return different results with key %s\n",
160                     Slice(key_str).ToString(true).c_str());
161             fprintf(stderr, "CF %s returns value %s\n",
162                     column_family_names_[0].c_str(),
163                     Slice(value0).ToString(true).c_str());
164             fprintf(stderr, "CF %s is not found\n",
165                     column_family_names_[i].c_str());
166             is_consistent = false;
167           } else if (s.ok() && value0 != value1) {
168             fprintf(stderr, "Get() return different results with key %s\n",
169                     Slice(key_str).ToString(true).c_str());
170             fprintf(stderr, "CF %s returns value %s\n",
171                     column_family_names_[0].c_str(),
172                     Slice(value0).ToString(true).c_str());
173             fprintf(stderr, "CF %s returns value %s\n",
174                     column_family_names_[i].c_str(),
175                     Slice(value1).ToString(true).c_str());
176             is_consistent = false;
177           }
178           if (!is_consistent) {
179             break;
180           }
181         }
182       }
183 
184       db_->ReleaseSnapshot(snapshot);
185     }
186     if (!is_consistent) {
187       fprintf(stderr, "TestGet error: is_consistent is false\n");
188       thread->stats.AddErrors(1);
189       // Fail fast to preserve the DB state.
190       thread->shared->SetVerificationFailure();
191     } else if (s.ok()) {
192       thread->stats.AddGets(1, 1);
193     } else if (s.IsNotFound()) {
194       thread->stats.AddGets(1, 0);
195     } else {
196       fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
197       thread->stats.AddErrors(1);
198     }
199     return s;
200   }
201 
TestMultiGet(ThreadState * thread,const ReadOptions & read_opts,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)202   std::vector<Status> TestMultiGet(
203       ThreadState* thread, const ReadOptions& read_opts,
204       const std::vector<int>& rand_column_families,
205       const std::vector<int64_t>& rand_keys) override {
206     size_t num_keys = rand_keys.size();
207     std::vector<std::string> key_str;
208     std::vector<Slice> keys;
209     keys.reserve(num_keys);
210     key_str.reserve(num_keys);
211     std::vector<PinnableSlice> values(num_keys);
212     std::vector<Status> statuses(num_keys);
213     ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
214 
215     for (size_t i = 0; i < num_keys; ++i) {
216       key_str.emplace_back(Key(rand_keys[i]));
217       keys.emplace_back(key_str.back());
218     }
219     db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
220                   statuses.data());
221     for (auto s : statuses) {
222       if (s.ok()) {
223         // found case
224         thread->stats.AddGets(1, 1);
225       } else if (s.IsNotFound()) {
226         // not found case
227         thread->stats.AddGets(1, 0);
228       } else {
229         // errors case
230         fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
231         thread->stats.AddErrors(1);
232       }
233     }
234     return statuses;
235   }
236 
TestPrefixScan(ThreadState * thread,const ReadOptions & readoptions,const std::vector<int> & rand_column_families,const std::vector<int64_t> & rand_keys)237   Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
238                         const std::vector<int>& rand_column_families,
239                         const std::vector<int64_t>& rand_keys) override {
240     size_t prefix_to_use =
241         (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
242 
243     std::string key_str = Key(rand_keys[0]);
244     Slice key = key_str;
245     Slice prefix = Slice(key.data(), prefix_to_use);
246 
247     std::string upper_bound;
248     Slice ub_slice;
249     ReadOptions ro_copy = readoptions;
250     // Get the next prefix first and then see if we want to set upper bound.
251     // We'll use the next prefix in an assertion later on
252     if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
253       ub_slice = Slice(upper_bound);
254       ro_copy.iterate_upper_bound = &ub_slice;
255     }
256     auto cfh =
257         column_families_[rand_column_families[thread->rand.Next() %
258                                               rand_column_families.size()]];
259     Iterator* iter = db_->NewIterator(ro_copy, cfh);
260     unsigned long count = 0;
261     for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
262          iter->Next()) {
263       ++count;
264     }
265     assert(prefix_to_use == 0 ||
266            count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
267     Status s = iter->status();
268     if (s.ok()) {
269       thread->stats.AddPrefixes(1, count);
270     } else {
271       fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
272       thread->stats.AddErrors(1);
273     }
274     delete iter;
275     return s;
276   }
277 
GetControlCfh(ThreadState * thread,int)278   ColumnFamilyHandle* GetControlCfh(ThreadState* thread,
279                                     int /*column_family_id*/
280                                     ) override {
281     // All column families should contain the same data. Randomly pick one.
282     return column_families_[thread->rand.Next() % column_families_.size()];
283   }
284 
285 #ifdef ROCKSDB_LITE
TestCheckpoint(ThreadState *,const std::vector<int> &,const std::vector<int64_t> &)286   Status TestCheckpoint(ThreadState* /* thread */,
287                         const std::vector<int>& /* rand_column_families */,
288                         const std::vector<int64_t>& /* rand_keys */) override {
289     assert(false);
290     fprintf(stderr,
291             "RocksDB lite does not support "
292             "TestCheckpoint\n");
293     std::terminate();
294   }
295 #else
TestCheckpoint(ThreadState * thread,const std::vector<int> &,const std::vector<int64_t> &)296   Status TestCheckpoint(ThreadState* thread,
297                         const std::vector<int>& /* rand_column_families */,
298                         const std::vector<int64_t>& /* rand_keys */) override {
299     std::string checkpoint_dir =
300         FLAGS_db + "/.checkpoint" + ToString(thread->tid);
301 
302     // We need to clear DB including manifest files, so make a copy
303     Options opt_copy = options_;
304     opt_copy.env = db_stress_env->target();
305     DestroyDB(checkpoint_dir, opt_copy);
306 
307     Checkpoint* checkpoint = nullptr;
308     Status s = Checkpoint::Create(db_, &checkpoint);
309     if (s.ok()) {
310       s = checkpoint->CreateCheckpoint(checkpoint_dir);
311     }
312     std::vector<ColumnFamilyHandle*> cf_handles;
313     DB* checkpoint_db = nullptr;
314     if (s.ok()) {
315       delete checkpoint;
316       checkpoint = nullptr;
317       Options options(options_);
318       options.listeners.clear();
319       std::vector<ColumnFamilyDescriptor> cf_descs;
320       // TODO(ajkr): `column_family_names_` is not safe to access here when
321       // `clear_column_family_one_in != 0`. But we can't easily switch to
322       // `ListColumnFamilies` to get names because it won't necessarily give
323       // the same order as `column_family_names_`.
324       if (FLAGS_clear_column_family_one_in == 0) {
325         for (const auto& name : column_family_names_) {
326           cf_descs.emplace_back(name, ColumnFamilyOptions(options));
327         }
328         s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
329                                 &cf_handles, &checkpoint_db);
330       }
331     }
332     if (checkpoint_db != nullptr) {
333       for (auto cfh : cf_handles) {
334         delete cfh;
335       }
336       cf_handles.clear();
337       delete checkpoint_db;
338       checkpoint_db = nullptr;
339     }
340     DestroyDB(checkpoint_dir, opt_copy);
341     if (!s.ok()) {
342       fprintf(stderr, "A checkpoint operation failed with: %s\n",
343               s.ToString().c_str());
344     }
345     return s;
346   }
347 #endif  // !ROCKSDB_LITE
348 
VerifyDb(ThreadState * thread) const349   void VerifyDb(ThreadState* thread) const override {
350     ReadOptions options(FLAGS_verify_checksum, true);
351     // We must set total_order_seek to true because we are doing a SeekToFirst
352     // on a column family whose memtables may support (by default) prefix-based
353     // iterator. In this case, NewIterator with options.total_order_seek being
354     // false returns a prefix-based iterator. Calling SeekToFirst using this
355     // iterator causes the iterator to become invalid. That means we cannot
356     // iterate the memtable using this iterator any more, although the memtable
357     // contains the most up-to-date key-values.
358     options.total_order_seek = true;
359     const auto ss_deleter = [this](const Snapshot* ss) {
360       db_->ReleaseSnapshot(ss);
361     };
362     std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
363         db_->GetSnapshot(), ss_deleter);
364     options.snapshot = snapshot_guard.get();
365     assert(thread != nullptr);
366     auto shared = thread->shared;
367     std::vector<std::unique_ptr<Iterator>> iters(column_families_.size());
368     for (size_t i = 0; i != column_families_.size(); ++i) {
369       iters[i].reset(db_->NewIterator(options, column_families_[i]));
370     }
371     for (auto& iter : iters) {
372       iter->SeekToFirst();
373     }
374     size_t num = column_families_.size();
375     assert(num == iters.size());
376     std::vector<Status> statuses(num, Status::OK());
377     do {
378       if (shared->HasVerificationFailedYet()) {
379         break;
380       }
381       size_t valid_cnt = 0;
382       size_t idx = 0;
383       for (auto& iter : iters) {
384         if (iter->Valid()) {
385           ++valid_cnt;
386         } else {
387           statuses[idx] = iter->status();
388         }
389         ++idx;
390       }
391       if (valid_cnt == 0) {
392         Status status;
393         for (size_t i = 0; i != num; ++i) {
394           const auto& s = statuses[i];
395           if (!s.ok()) {
396             status = s;
397             fprintf(stderr, "Iterator on cf %s has error: %s\n",
398                     column_families_[i]->GetName().c_str(),
399                     s.ToString().c_str());
400             shared->SetVerificationFailure();
401           }
402         }
403         break;
404       } else if (valid_cnt != iters.size()) {
405         shared->SetVerificationFailure();
406         for (size_t i = 0; i != num; ++i) {
407           if (!iters[i]->Valid()) {
408             if (statuses[i].ok()) {
409               fprintf(stderr, "Finished scanning cf %s\n",
410                       column_families_[i]->GetName().c_str());
411             } else {
412               fprintf(stderr, "Iterator on cf %s has error: %s\n",
413                       column_families_[i]->GetName().c_str(),
414                       statuses[i].ToString().c_str());
415             }
416           } else {
417             fprintf(stderr, "cf %s has remaining data to scan\n",
418                     column_families_[i]->GetName().c_str());
419           }
420         }
421         break;
422       }
423       if (shared->HasVerificationFailedYet()) {
424         break;
425       }
426       // If the program reaches here, then all column families' iterators are
427       // still valid.
428       if (shared->PrintingVerificationResults()) {
429         continue;
430       }
431       Slice key;
432       Slice value;
433       int num_mismatched_cfs = 0;
434       for (size_t i = 0; i != num; ++i) {
435         if (i == 0) {
436           key = iters[i]->key();
437           value = iters[i]->value();
438         } else {
439           int cmp = key.compare(iters[i]->key());
440           if (cmp != 0) {
441             ++num_mismatched_cfs;
442             if (1 == num_mismatched_cfs) {
443               fprintf(stderr, "Verification failed\n");
444               fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
445                       db_->GetLatestSequenceNumber());
446               fprintf(stderr, "[%s] %s => %s\n",
447                       column_families_[0]->GetName().c_str(),
448                       key.ToString(true /* hex */).c_str(),
449                       value.ToString(true /* hex */).c_str());
450             }
451             fprintf(stderr, "[%s] %s => %s\n",
452                     column_families_[i]->GetName().c_str(),
453                     iters[i]->key().ToString(true /* hex */).c_str(),
454                     iters[i]->value().ToString(true /* hex */).c_str());
455 #ifndef ROCKSDB_LITE
456             Slice begin_key;
457             Slice end_key;
458             if (cmp < 0) {
459               begin_key = key;
460               end_key = iters[i]->key();
461             } else {
462               begin_key = iters[i]->key();
463               end_key = key;
464             }
465             std::vector<KeyVersion> versions;
466             const size_t kMaxNumIKeys = 8;
467             const auto print_key_versions = [&](ColumnFamilyHandle* cfh) {
468               Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
469                                            kMaxNumIKeys, &versions);
470               if (!s.ok()) {
471                 fprintf(stderr, "%s\n", s.ToString().c_str());
472                 return;
473               }
474               assert(nullptr != cfh);
475               fprintf(stderr,
476                       "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
477                       ")\n",
478                       cfh->GetName().c_str(),
479                       begin_key.ToString(true /* hex */).c_str(),
480                       end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);
481               for (const KeyVersion& kv : versions) {
482                 fprintf(stderr, "  key %s seq %" PRIu64 " type %d\n",
483                         Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
484                         kv.type);
485               }
486             };
487             if (1 == num_mismatched_cfs) {
488               print_key_versions(column_families_[0]);
489             }
490             print_key_versions(column_families_[i]);
491 #endif  // ROCKSDB_LITE
492             shared->SetVerificationFailure();
493           }
494         }
495       }
496       shared->FinishPrintingVerificationResults();
497       for (auto& iter : iters) {
498         iter->Next();
499       }
500     } while (true);
501   }
502 
503 #ifndef ROCKSDB_LITE
ContinuouslyVerifyDb(ThreadState * thread) const504   void ContinuouslyVerifyDb(ThreadState* thread) const override {
505     assert(thread);
506     Status status;
507 
508     DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
509     const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
510     const auto ss_deleter = [&](const Snapshot* ss) {
511       db_ptr->ReleaseSnapshot(ss);
512     };
513     std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
514         db_ptr->GetSnapshot(), ss_deleter);
515     if (cmp_db_) {
516       status = cmp_db_->TryCatchUpWithPrimary();
517     }
518     SharedState* shared = thread->shared;
519     assert(shared);
520     if (!status.ok()) {
521       shared->SetShouldStopTest();
522       return;
523     }
524     assert(cmp_db_ || snapshot_guard.get());
525     const auto checksum_column_family = [](Iterator* iter,
526                                            uint32_t* checksum) -> Status {
527       assert(nullptr != checksum);
528       uint32_t ret = 0;
529       for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
530         ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
531         ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
532       }
533       *checksum = ret;
534       return iter->status();
535     };
536     ReadOptions ropts;
537     ropts.total_order_seek = true;
538     ropts.snapshot = snapshot_guard.get();
539     uint32_t crc = 0;
540     {
541       // Compute crc for all key-values of default column family.
542       std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
543       status = checksum_column_family(it.get(), &crc);
544     }
545     uint32_t tmp_crc = 0;
546     if (status.ok()) {
547       for (ColumnFamilyHandle* cfh : cfhs) {
548         if (cfh == db_ptr->DefaultColumnFamily()) {
549           continue;
550         }
551         std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
552         status = checksum_column_family(it.get(), &tmp_crc);
553         if (!status.ok() || tmp_crc != crc) {
554           break;
555         }
556       }
557     }
558     if (!status.ok() || tmp_crc != crc) {
559       shared->SetShouldStopTest();
560     }
561   }
562 #endif  // !ROCKSDB_LITE
563 
GenerateColumnFamilies(const int,int) const564   std::vector<int> GenerateColumnFamilies(
565       const int /* num_column_families */,
566       int /* rand_column_family */) const override {
567     std::vector<int> ret;
568     int num = static_cast<int>(column_families_.size());
569     int k = 0;
570     std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
571     return ret;
572   }
573 
574  private:
575   std::atomic<int64_t> batch_id_;
576 };
577 
CreateCfConsistencyStressTest()578 StressTest* CreateCfConsistencyStressTest() {
579   return new CfConsistencyStressTest();
580 }
581 
582 }  // namespace ROCKSDB_NAMESPACE
583 #endif  // GFLAGS
584