1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/blob/blob_index.h"
7 #include "db/db_impl/db_impl.h"
8 #include "db/db_test_util.h"
9 #include "db/dbformat.h"
10 #include "db/version_set.h"
11 #include "db/write_batch_internal.h"
12 #include "file/filename.h"
13 #include "memtable/hash_linklist_rep.h"
14 #include "monitoring/statistics.h"
15 #include "rocksdb/cache.h"
16 #include "rocksdb/compaction_filter.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/env.h"
19 #include "rocksdb/filter_policy.h"
20 #include "rocksdb/options.h"
21 #include "rocksdb/perf_context.h"
22 #include "rocksdb/slice.h"
23 #include "rocksdb/slice_transform.h"
24 #include "rocksdb/table.h"
25 #include "rocksdb/table_properties.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "test_util/testutil.h"
29 #include "util/hash.h"
30 #include "util/mutexlock.h"
31 #include "util/rate_limiter.h"
32 #include "util/string_util.h"
33 #include "utilities/merge_operators.h"
34 
35 #ifndef ROCKSDB_LITE
36 
37 namespace ROCKSDB_NAMESPACE {
38 
39 class EventListenerTest : public DBTestBase {
40  public:
EventListenerTest()41   EventListenerTest() : DBTestBase("/listener_test", /*env_do_fsync=*/true) {}
42 
BlobStr(uint64_t blob_file_number,uint64_t offset,uint64_t size)43   static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
44                              uint64_t size) {
45     std::string blob_index;
46     BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
47                           kNoCompression);
48     return blob_index;
49   }
50 
51   const size_t k110KB = 110 << 10;
52 };
53 
54 struct TestPropertiesCollector
55     : public ROCKSDB_NAMESPACE::TablePropertiesCollector {
AddUserKeyROCKSDB_NAMESPACE::TestPropertiesCollector56   ROCKSDB_NAMESPACE::Status AddUserKey(
57       const ROCKSDB_NAMESPACE::Slice& /*key*/,
58       const ROCKSDB_NAMESPACE::Slice& /*value*/,
59       ROCKSDB_NAMESPACE::EntryType /*type*/,
60       ROCKSDB_NAMESPACE::SequenceNumber /*seq*/,
61       uint64_t /*file_size*/) override {
62     return Status::OK();
63   }
FinishROCKSDB_NAMESPACE::TestPropertiesCollector64   ROCKSDB_NAMESPACE::Status Finish(
65       ROCKSDB_NAMESPACE::UserCollectedProperties* properties) override {
66     properties->insert({"0", "1"});
67     return Status::OK();
68   }
69 
NameROCKSDB_NAMESPACE::TestPropertiesCollector70   const char* Name() const override { return "TestTablePropertiesCollector"; }
71 
GetReadablePropertiesROCKSDB_NAMESPACE::TestPropertiesCollector72   ROCKSDB_NAMESPACE::UserCollectedProperties GetReadableProperties()
73       const override {
74     ROCKSDB_NAMESPACE::UserCollectedProperties ret;
75     ret["2"] = "3";
76     return ret;
77   }
78 };
79 
80 class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
81  public:
CreateTablePropertiesCollector(TablePropertiesCollectorFactory::Context)82   TablePropertiesCollector* CreateTablePropertiesCollector(
83       TablePropertiesCollectorFactory::Context /*context*/) override {
84     return new TestPropertiesCollector;
85   }
Name() const86   const char* Name() const override { return "TestTablePropertiesCollector"; }
87 };
88 
89 class TestCompactionListener : public EventListener {
90  public:
TestCompactionListener(EventListenerTest * test)91   explicit TestCompactionListener(EventListenerTest* test) : test_(test) {}
92 
OnCompactionCompleted(DB * db,const CompactionJobInfo & ci)93   void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
94     std::lock_guard<std::mutex> lock(mutex_);
95     compacted_dbs_.push_back(db);
96     ASSERT_GT(ci.input_files.size(), 0U);
97     ASSERT_EQ(ci.input_files.size(), ci.input_file_infos.size());
98 
99     for (size_t i = 0; i < ci.input_file_infos.size(); ++i) {
100       ASSERT_EQ(ci.input_file_infos[i].level, ci.base_input_level);
101       ASSERT_EQ(ci.input_file_infos[i].file_number,
102                 TableFileNameToNumber(ci.input_files[i]));
103     }
104 
105     ASSERT_GT(ci.output_files.size(), 0U);
106     ASSERT_EQ(ci.output_files.size(), ci.output_file_infos.size());
107 
108     ASSERT_TRUE(test_);
109     ASSERT_EQ(test_->db_, db);
110 
111     std::vector<std::vector<FileMetaData>> files_by_level;
112     test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[ci.cf_id],
113                                            &files_by_level);
114     ASSERT_GT(files_by_level.size(), ci.output_level);
115 
116     for (size_t i = 0; i < ci.output_file_infos.size(); ++i) {
117       ASSERT_EQ(ci.output_file_infos[i].level, ci.output_level);
118       ASSERT_EQ(ci.output_file_infos[i].file_number,
119                 TableFileNameToNumber(ci.output_files[i]));
120 
121       auto it = std::find_if(
122           files_by_level[ci.output_level].begin(),
123           files_by_level[ci.output_level].end(), [&](const FileMetaData& meta) {
124             return meta.fd.GetNumber() == ci.output_file_infos[i].file_number;
125           });
126       ASSERT_NE(it, files_by_level[ci.output_level].end());
127 
128       ASSERT_EQ(ci.output_file_infos[i].oldest_blob_file_number,
129                 it->oldest_blob_file_number);
130     }
131 
132     ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
133     ASSERT_GT(ci.thread_id, 0U);
134 
135     for (auto fl : {ci.input_files, ci.output_files}) {
136       for (auto fn : fl) {
137         auto it = ci.table_properties.find(fn);
138         ASSERT_NE(it, ci.table_properties.end());
139         auto tp = it->second;
140         ASSERT_TRUE(tp != nullptr);
141         ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
142       }
143     }
144   }
145 
146   EventListenerTest* test_;
147   std::vector<DB*> compacted_dbs_;
148   std::mutex mutex_;
149 };
150 
TEST_F(EventListenerTest,OnSingleDBCompactionTest)151 TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
152   const int kTestKeySize = 16;
153   const int kTestValueSize = 984;
154   const int kEntrySize = kTestKeySize + kTestValueSize;
155   const int kEntriesPerBuffer = 100;
156   const int kNumL0Files = 4;
157 
158   Options options;
159   options.env = CurrentOptions().env;
160   options.create_if_missing = true;
161   options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
162   options.compaction_style = kCompactionStyleLevel;
163   options.target_file_size_base = options.write_buffer_size;
164   options.max_bytes_for_level_base = options.target_file_size_base * 2;
165   options.max_bytes_for_level_multiplier = 2;
166   options.compression = kNoCompression;
167 #ifdef ROCKSDB_USING_THREAD_STATUS
168   options.enable_thread_tracking = true;
169 #endif  // ROCKSDB_USING_THREAD_STATUS
170   options.level0_file_num_compaction_trigger = kNumL0Files;
171   options.table_properties_collector_factories.push_back(
172       std::make_shared<TestPropertiesCollectorFactory>());
173 
174   TestCompactionListener* listener = new TestCompactionListener(this);
175   options.listeners.emplace_back(listener);
176   std::vector<std::string> cf_names = {
177       "pikachu", "ilya", "muromec", "dobrynia",
178       "nikitich", "alyosha", "popovich"};
179   CreateAndReopenWithCF(cf_names, options);
180   ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
181 
182   WriteBatch batch;
183   ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
184                                              BlobStr(123, 0, 1 << 10)));
185   ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
186 
187   ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
188   ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
189   ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
190   ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
191   ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
192   ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
193   for (int i = 1; i < 8; ++i) {
194     ASSERT_OK(Flush(i));
195     ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
196     ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
197                                      nullptr, nullptr));
198     ASSERT_OK(dbfull()->TEST_WaitForCompact());
199   }
200 
201   ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
202   for (size_t i = 0; i < cf_names.size(); ++i) {
203     ASSERT_EQ(listener->compacted_dbs_[i], db_);
204   }
205 }
206 
207 // This simple Listener can only handle one flush at a time.
208 class TestFlushListener : public EventListener {
209  public:
TestFlushListener(Env * env,EventListenerTest * test)210   TestFlushListener(Env* env, EventListenerTest* test)
211       : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
212     db_closed = false;
213   }
214 
~TestFlushListener()215   virtual ~TestFlushListener() {
216     prev_fc_info_.status.PermitUncheckedError();  // Ignore the status
217   }
OnTableFileCreated(const TableFileCreationInfo & info)218   void OnTableFileCreated(
219       const TableFileCreationInfo& info) override {
220     // remember the info for later checking the FlushJobInfo.
221     prev_fc_info_ = info;
222     ASSERT_GT(info.db_name.size(), 0U);
223     ASSERT_GT(info.cf_name.size(), 0U);
224     ASSERT_GT(info.file_path.size(), 0U);
225     ASSERT_GT(info.job_id, 0);
226     ASSERT_GT(info.table_properties.data_size, 0U);
227     ASSERT_GT(info.table_properties.raw_key_size, 0U);
228     ASSERT_GT(info.table_properties.raw_value_size, 0U);
229     ASSERT_GT(info.table_properties.num_data_blocks, 0U);
230     ASSERT_GT(info.table_properties.num_entries, 0U);
231     ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
232     ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
233 
234 #ifdef ROCKSDB_USING_THREAD_STATUS
235     // Verify the id of the current thread that created this table
236     // file matches the id of any active flush or compaction thread.
237     uint64_t thread_id = env_->GetThreadID();
238     std::vector<ThreadStatus> thread_list;
239     ASSERT_OK(env_->GetThreadList(&thread_list));
240     bool found_match = false;
241     for (auto thread_status : thread_list) {
242       if (thread_status.operation_type == ThreadStatus::OP_FLUSH ||
243           thread_status.operation_type == ThreadStatus::OP_COMPACTION) {
244         if (thread_id == thread_status.thread_id) {
245           found_match = true;
246           break;
247         }
248       }
249     }
250     ASSERT_TRUE(found_match);
251 #endif  // ROCKSDB_USING_THREAD_STATUS
252   }
253 
OnFlushCompleted(DB * db,const FlushJobInfo & info)254   void OnFlushCompleted(
255       DB* db, const FlushJobInfo& info) override {
256     flushed_dbs_.push_back(db);
257     flushed_column_family_names_.push_back(info.cf_name);
258     if (info.triggered_writes_slowdown) {
259       slowdown_count++;
260     }
261     if (info.triggered_writes_stop) {
262       stop_count++;
263     }
264     // verify whether the previously created file matches the flushed file.
265     ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
266     ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
267     ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
268     ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
269     ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
270 
271     // Note: the following chunk relies on the notification pertaining to the
272     // database pointed to by DBTestBase::db_, and is thus bypassed when
273     // that assumption does not hold (see the test case MultiDBMultiListeners
274     // below).
275     ASSERT_TRUE(test_);
276     if (db == test_->db_) {
277       std::vector<std::vector<FileMetaData>> files_by_level;
278       test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[info.cf_id],
279                                              &files_by_level);
280 
281       ASSERT_FALSE(files_by_level.empty());
282       auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
283                              [&](const FileMetaData& meta) {
284                                return meta.fd.GetNumber() == info.file_number;
285                              });
286       ASSERT_NE(it, files_by_level[0].end());
287       ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
288     }
289 
290     ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
291     ASSERT_GT(info.thread_id, 0U);
292     ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
293               "1");
294   }
295 
296   std::vector<std::string> flushed_column_family_names_;
297   std::vector<DB*> flushed_dbs_;
298   int slowdown_count;
299   int stop_count;
300   bool db_closing;
301   std::atomic_bool db_closed;
302   TableFileCreationInfo prev_fc_info_;
303 
304  protected:
305   Env* env_;
306   EventListenerTest* test_;
307 };
308 
// Flushes each non-default column family of one DB in turn and checks that
// the listener saw exactly one flush per family, in flush order.
TEST_F(EventListenerTest, OnSingleDBFlushTest) {
  const size_t kValueLen = 90000;

  Options options;
  options.env = CurrentOptions().env;
  options.write_buffer_size = k110KB;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  auto* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  const std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(cf_names, options);

  // Column family i + 1 gets one key equal to its own name, whose value is
  // the name's first character repeated kValueLen times.
  for (size_t idx = 0; idx < cf_names.size(); ++idx) {
    const std::string& name = cf_names[idx];
    const int cf = static_cast<int>(idx) + 1;
    ASSERT_OK(Put(cf, name, std::string(kValueLen, name.front())));

    if (idx == 0) {
      // The first family additionally receives a blob index entry.
      WriteBatch batch;
      ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
                                                 BlobStr(456, 0, 1 << 10)));
      ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
    }
  }

  // After the n-th flush the listener must have recorded n notifications.
  const int num_cfs = static_cast<int>(cf_names.size());
  for (int cf = 1; cf <= num_cfs; ++cf) {
    ASSERT_OK(Flush(cf));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_EQ(listener->flushed_dbs_.size(), cf);
    ASSERT_EQ(listener->flushed_column_family_names_.size(), cf);
  }

  // make sure callback functions are called in the right order
  for (size_t idx = 0; idx < cf_names.size(); ++idx) {
    ASSERT_EQ(listener->flushed_dbs_[idx], db_);
    ASSERT_EQ(listener->flushed_column_family_names_[idx], cf_names[idx]);
  }
}
351 
// Writes one large value to each non-default column family, flushes the
// families one by one and checks that flush notifications arrive in order.
TEST_F(EventListenerTest, MultiCF) {
  const size_t kValueLen = 90000;

  Options options;
  options.env = CurrentOptions().env;
  options.write_buffer_size = k110KB;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  auto* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  const std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(cf_names, options);

  // Column family i + 1 gets one key equal to its own name, whose value is
  // the name's first character repeated kValueLen times.
  for (size_t idx = 0; idx < cf_names.size(); ++idx) {
    const std::string& name = cf_names[idx];
    ASSERT_OK(Put(static_cast<int>(idx) + 1, name,
                  std::string(kValueLen, name.front())));
  }

  // After the n-th flush the listener must have recorded n notifications.
  const int num_cfs = static_cast<int>(cf_names.size());
  for (int cf = 1; cf <= num_cfs; ++cf) {
    ASSERT_OK(Flush(cf));
    ASSERT_EQ(listener->flushed_dbs_.size(), cf);
    ASSERT_EQ(listener->flushed_column_family_names_.size(), cf);
  }

  // make sure callback functions are called in the right order
  for (size_t idx = 0; idx < cf_names.size(); ++idx) {
    ASSERT_EQ(listener->flushed_dbs_[idx], db_);
    ASSERT_EQ(listener->flushed_column_family_names_[idx], cf_names[idx]);
  }
}
387 
TEST_F(EventListenerTest,MultiDBMultiListeners)388 TEST_F(EventListenerTest, MultiDBMultiListeners) {
389   Options options;
390   options.env = CurrentOptions().env;
391 #ifdef ROCKSDB_USING_THREAD_STATUS
392   options.enable_thread_tracking = true;
393 #endif  // ROCKSDB_USING_THREAD_STATUS
394   options.table_properties_collector_factories.push_back(
395       std::make_shared<TestPropertiesCollectorFactory>());
396   std::vector<TestFlushListener*> listeners;
397   const int kNumDBs = 5;
398   const int kNumListeners = 10;
399   for (int i = 0; i < kNumListeners; ++i) {
400     listeners.emplace_back(new TestFlushListener(options.env, this));
401   }
402 
403   std::vector<std::string> cf_names = {
404       "pikachu", "ilya", "muromec", "dobrynia",
405       "nikitich", "alyosha", "popovich"};
406 
407   options.create_if_missing = true;
408   for (int i = 0; i < kNumListeners; ++i) {
409     options.listeners.emplace_back(listeners[i]);
410   }
411   DBOptions db_opts(options);
412   ColumnFamilyOptions cf_opts(options);
413 
414   std::vector<DB*> dbs;
415   std::vector<std::vector<ColumnFamilyHandle *>> vec_handles;
416 
417   for (int d = 0; d < kNumDBs; ++d) {
418     ASSERT_OK(DestroyDB(dbname_ + ToString(d), options));
419     DB* db;
420     std::vector<ColumnFamilyHandle*> handles;
421     ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db));
422     for (size_t c = 0; c < cf_names.size(); ++c) {
423       ColumnFamilyHandle* handle;
424       ASSERT_OK(db->CreateColumnFamily(cf_opts, cf_names[c], &handle));
425       handles.push_back(handle);
426     }
427 
428     vec_handles.push_back(std::move(handles));
429     dbs.push_back(db);
430   }
431 
432   for (int d = 0; d < kNumDBs; ++d) {
433     for (size_t c = 0; c < cf_names.size(); ++c) {
434       ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c],
435                 cf_names[c], cf_names[c]));
436     }
437   }
438 
439   for (size_t c = 0; c < cf_names.size(); ++c) {
440     for (int d = 0; d < kNumDBs; ++d) {
441       ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
442       ASSERT_OK(
443           static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable());
444     }
445   }
446 
447   for (auto* listener : listeners) {
448     int pos = 0;
449     for (size_t c = 0; c < cf_names.size(); ++c) {
450       for (int d = 0; d < kNumDBs; ++d) {
451         ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]);
452         ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]);
453         pos++;
454       }
455     }
456   }
457 
458 
459   for (auto handles : vec_handles) {
460     for (auto h : handles) {
461       delete h;
462     }
463     handles.clear();
464   }
465   vec_handles.clear();
466 
467   for (auto db : dbs) {
468     delete db;
469   }
470 }
471 
// With compaction disabled, keeps flushing small memtables so the number of
// files only grows, and checks that most flushes report a write slowdown.
TEST_F(EventListenerTest, DisableBGCompaction) {
  const int kCompactionTrigger = 1;
  const int kSlowdownTrigger = 5;
  const int kStopTrigger = 100;
  const int kTargetFileCount = kSlowdownTrigger * 10;

  Options options;
  options.env = CurrentOptions().env;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  auto* listener = new TestFlushListener(options.env, this);
  options.level0_file_num_compaction_trigger = kCompactionTrigger;
  options.level0_slowdown_writes_trigger = kSlowdownTrigger;
  options.level0_stop_writes_trigger = kStopTrigger;
  options.max_write_buffer_number = 10;
  options.listeners.emplace_back(listener);
  // BG compaction is disabled, so the number of L0 files simply keeps
  // increasing in this test.
  options.compaction_style = kCompactionStyleNone;
  options.compression = kNoCompression;
  options.write_buffer_size = 100000;  // Small write buffer
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  CreateAndReopenWithCF({"pikachu"}, options);
  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);

  // Write one key and flush, repeatedly, until the column family holds
  // kTargetFileCount files. Flushes are allowed to proceed during a stall.
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  int next_key = 0;
  while (static_cast<int>(cf_meta.file_count) < kTargetFileCount) {
    ASSERT_OK(
        Put(1, ToString(next_key), std::string(10000, 'x'), WriteOptions()));
    ASSERT_OK(db_->Flush(flush_opts, handles_[1]));
    db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
    ++next_key;
  }
  ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
}
510 
511 class TestCompactionReasonListener : public EventListener {
512  public:
OnCompactionCompleted(DB *,const CompactionJobInfo & ci)513   void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
514     std::lock_guard<std::mutex> lock(mutex_);
515     compaction_reasons_.push_back(ci.compaction_reason);
516   }
517 
518   std::vector<CompactionReason> compaction_reasons_;
519   std::mutex mutex_;
520 };
521 
TEST_F(EventListenerTest,CompactionReasonLevel)522 TEST_F(EventListenerTest, CompactionReasonLevel) {
523   Options options;
524   options.env = CurrentOptions().env;
525   options.create_if_missing = true;
526   options.memtable_factory.reset(
527       new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
528 
529   TestCompactionReasonListener* listener = new TestCompactionReasonListener();
530   options.listeners.emplace_back(listener);
531 
532   options.level0_file_num_compaction_trigger = 4;
533   options.compaction_style = kCompactionStyleLevel;
534 
535   DestroyAndReopen(options);
536   Random rnd(301);
537 
538   // Write 4 files in L0
539   for (int i = 0; i < 4; i++) {
540     GenerateNewRandomFile(&rnd);
541   }
542   ASSERT_OK(dbfull()->TEST_WaitForCompact());
543 
544   ASSERT_EQ(listener->compaction_reasons_.size(), 1);
545   ASSERT_EQ(listener->compaction_reasons_[0],
546             CompactionReason::kLevelL0FilesNum);
547 
548   DestroyAndReopen(options);
549 
550   // Write 3 non-overlapping files in L0
551   for (int k = 1; k <= 30; k++) {
552     ASSERT_OK(Put(Key(k), Key(k)));
553     if (k % 10 == 0) {
554       Flush();
555     }
556   }
557 
558   // Do a trivial move from L0 -> L1
559   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
560 
561   options.max_bytes_for_level_base = 1;
562   Close();
563   listener->compaction_reasons_.clear();
564   Reopen(options);
565 
566   ASSERT_OK(dbfull()->TEST_WaitForCompact());
567   ASSERT_GT(listener->compaction_reasons_.size(), 1);
568 
569   for (auto compaction_reason : listener->compaction_reasons_) {
570     ASSERT_EQ(compaction_reason, CompactionReason::kLevelMaxLevelSize);
571   }
572 
573   options.disable_auto_compactions = true;
574   Close();
575   listener->compaction_reasons_.clear();
576   Reopen(options);
577 
578   ASSERT_OK(Put("key", "value"));
579   CompactRangeOptions cro;
580   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
581   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
582   ASSERT_GT(listener->compaction_reasons_.size(), 0);
583   for (auto compaction_reason : listener->compaction_reasons_) {
584     ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
585   }
586 }
587 
TEST_F(EventListenerTest,CompactionReasonUniversal)588 TEST_F(EventListenerTest, CompactionReasonUniversal) {
589   Options options;
590   options.env = CurrentOptions().env;
591   options.create_if_missing = true;
592   options.memtable_factory.reset(
593       new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));
594 
595   TestCompactionReasonListener* listener = new TestCompactionReasonListener();
596   options.listeners.emplace_back(listener);
597 
598   options.compaction_style = kCompactionStyleUniversal;
599 
600   Random rnd(301);
601 
602   options.level0_file_num_compaction_trigger = 8;
603   options.compaction_options_universal.max_size_amplification_percent = 100000;
604   options.compaction_options_universal.size_ratio = 100000;
605   DestroyAndReopen(options);
606   listener->compaction_reasons_.clear();
607 
608   // Write 8 files in L0
609   for (int i = 0; i < 8; i++) {
610     GenerateNewRandomFile(&rnd);
611   }
612   ASSERT_OK(dbfull()->TEST_WaitForCompact());
613 
614   ASSERT_GT(listener->compaction_reasons_.size(), 0);
615   for (auto compaction_reason : listener->compaction_reasons_) {
616     ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeRatio);
617   }
618 
619   options.level0_file_num_compaction_trigger = 8;
620   options.compaction_options_universal.max_size_amplification_percent = 1;
621   options.compaction_options_universal.size_ratio = 100000;
622 
623   DestroyAndReopen(options);
624   listener->compaction_reasons_.clear();
625 
626   // Write 8 files in L0
627   for (int i = 0; i < 8; i++) {
628     GenerateNewRandomFile(&rnd);
629   }
630   ASSERT_OK(dbfull()->TEST_WaitForCompact());
631 
632   ASSERT_GT(listener->compaction_reasons_.size(), 0);
633   for (auto compaction_reason : listener->compaction_reasons_) {
634     ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeAmplification);
635   }
636 
637   options.disable_auto_compactions = true;
638   Close();
639   listener->compaction_reasons_.clear();
640   Reopen(options);
641 
642   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
643 
644   ASSERT_GT(listener->compaction_reasons_.size(), 0);
645   for (auto compaction_reason : listener->compaction_reasons_) {
646     ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
647   }
648 }
649 
// Under FIFO compaction with a 1-byte total size limit, every reported
// compaction must carry the kFIFOMaxSize reason.
TEST_F(EventListenerTest, CompactionReasonFIFO) {
  const int kNumL0Files = 4;

  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  auto* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.compaction_style = kCompactionStyleFIFO;
  options.compaction_options_fifo.max_table_files_size = 1;

  DestroyAndReopen(options);
  Random rnd(301);

  // Produce kNumL0Files files in L0.
  for (int file = 0; file < kNumL0Files; ++file) {
    GenerateNewRandomFile(&rnd);
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (const CompactionReason reason : listener->compaction_reasons_) {
    ASSERT_EQ(reason, CompactionReason::kFIFOMaxSize);
  }
}
678 
679 class TableFileCreationListener : public EventListener {
680  public:
681   class TestEnv : public EnvWrapper {
682    public:
TestEnv(Env * t)683     explicit TestEnv(Env* t) : EnvWrapper(t) {}
684 
SetStatus(Status s)685     void SetStatus(Status s) { status_ = s; }
686 
NewWritableFile(const std::string & fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)687     Status NewWritableFile(const std::string& fname,
688                            std::unique_ptr<WritableFile>* result,
689                            const EnvOptions& options) override {
690       if (fname.size() > 4 && fname.substr(fname.size() - 4) == ".sst") {
691         if (!status_.ok()) {
692           return status_;
693         }
694       }
695       return target()->NewWritableFile(fname, result, options);
696     }
697 
698    private:
699     Status status_;
700   };
701 
TableFileCreationListener()702   TableFileCreationListener() {
703     for (int i = 0; i < 2; i++) {
704       started_[i] = finished_[i] = failure_[i] = 0;
705     }
706   }
707 
Index(TableFileCreationReason reason)708   int Index(TableFileCreationReason reason) {
709     int idx;
710     switch (reason) {
711       case TableFileCreationReason::kFlush:
712         idx = 0;
713         break;
714       case TableFileCreationReason::kCompaction:
715         idx = 1;
716         break;
717       default:
718         idx = -1;
719     }
720     return idx;
721   }
722 
CheckAndResetCounters(int flush_started,int flush_finished,int flush_failure,int compaction_started,int compaction_finished,int compaction_failure)723   void CheckAndResetCounters(int flush_started, int flush_finished,
724                              int flush_failure, int compaction_started,
725                              int compaction_finished, int compaction_failure) {
726     ASSERT_EQ(started_[0], flush_started);
727     ASSERT_EQ(finished_[0], flush_finished);
728     ASSERT_EQ(failure_[0], flush_failure);
729     ASSERT_EQ(started_[1], compaction_started);
730     ASSERT_EQ(finished_[1], compaction_finished);
731     ASSERT_EQ(failure_[1], compaction_failure);
732     for (int i = 0; i < 2; i++) {
733       started_[i] = finished_[i] = failure_[i] = 0;
734     }
735   }
736 
OnTableFileCreationStarted(const TableFileCreationBriefInfo & info)737   void OnTableFileCreationStarted(
738       const TableFileCreationBriefInfo& info) override {
739     int idx = Index(info.reason);
740     if (idx >= 0) {
741       started_[idx]++;
742     }
743     ASSERT_GT(info.db_name.size(), 0U);
744     ASSERT_GT(info.cf_name.size(), 0U);
745     ASSERT_GT(info.file_path.size(), 0U);
746     ASSERT_GT(info.job_id, 0);
747   }
748 
OnTableFileCreated(const TableFileCreationInfo & info)749   void OnTableFileCreated(const TableFileCreationInfo& info) override {
750     int idx = Index(info.reason);
751     if (idx >= 0) {
752       finished_[idx]++;
753     }
754     ASSERT_GT(info.db_name.size(), 0U);
755     ASSERT_GT(info.cf_name.size(), 0U);
756     ASSERT_GT(info.file_path.size(), 0U);
757     ASSERT_GT(info.job_id, 0);
758     ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
759     ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
760     if (info.status.ok()) {
761       ASSERT_GT(info.table_properties.data_size, 0U);
762       ASSERT_GT(info.table_properties.raw_key_size, 0U);
763       ASSERT_GT(info.table_properties.raw_value_size, 0U);
764       ASSERT_GT(info.table_properties.num_data_blocks, 0U);
765       ASSERT_GT(info.table_properties.num_entries, 0U);
766     } else {
767       if (idx >= 0) {
768         failure_[idx]++;
769       }
770     }
771   }
772 
773   int started_[2];
774   int finished_[2];
775   int failure_[2];
776 };
777 
// Drives flushes and manual compactions, some of them with SST creation
// forced to fail, and checks the started / finished / failed counters kept by
// TableFileCreationListener after each step.
TEST_F(EventListenerTest, TableFileCreationListenersTest) {
  using FaultEnv = TableFileCreationListener::TestEnv;

  auto listener = std::make_shared<TableFileCreationListener>();
  Options options;
  std::unique_ptr<FaultEnv> fault_env(new FaultEnv(CurrentOptions().env));
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  options.env = fault_env.get();
  DestroyAndReopen(options);

  // Step 1: a normal flush.
  ASSERT_OK(Put("foo", "aaa"));
  ASSERT_OK(Put("bar", "bbb"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);

  // Step 2: a flush while SST creation is made to fail.
  ASSERT_OK(Put("foo", "aaa1"));
  ASSERT_OK(Put("bar", "bbb1"));
  fault_env->SetStatus(Status::NotSupported("not supported"));
  ASSERT_NOK(Flush());
  listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
  fault_env->SetStatus(Status::OK());

  // Step 3: reopen, then flush successfully again.
  Reopen(options);
  ASSERT_OK(Put("foo", "aaa2"));
  ASSERT_OK(Put("bar", "bbb2"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);

  // Step 4: a successful manual compaction over ["a", "z"].
  const Slice range_begin = "a";
  const Slice range_end = "z";
  ASSERT_OK(
      dbfull()->CompactRange(CompactRangeOptions(), &range_begin, &range_end));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);

  // Step 5: a successful flush followed by a compaction while SST creation
  // is made to fail.
  ASSERT_OK(Put("foo", "aaa3"));
  ASSERT_OK(Put("bar", "bbb3"));
  ASSERT_OK(Flush());
  fault_env->SetStatus(Status::NotSupported("not supported"));
  ASSERT_NOK(
      dbfull()->CompactRange(CompactRangeOptions(), &range_begin, &range_end));
  ASSERT_NOK(dbfull()->TEST_WaitForCompact());
  listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
  Close();
}
824 
825 class MemTableSealedListener : public EventListener {
826 private:
827   SequenceNumber latest_seq_number_;
828 public:
MemTableSealedListener()829   MemTableSealedListener() {}
OnMemTableSealed(const MemTableInfo & info)830   void OnMemTableSealed(const MemTableInfo& info) override {
831     latest_seq_number_ = info.first_seqno;
832   }
833 
OnFlushCompleted(DB *,const FlushJobInfo & flush_job_info)834   void OnFlushCompleted(DB* /*db*/,
835     const FlushJobInfo& flush_job_info) override {
836     ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
837   }
838 };
839 
// Runs ten write-then-flush rounds with a MemTableSealedListener installed.
// The checks themselves live in the listener's callbacks.
TEST_F(EventListenerTest, MemTableSealedListenerTest) {
  auto sealed_listener = std::make_shared<MemTableSealedListener>();
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.listeners.push_back(sealed_listener);
  DestroyAndReopen(options);

  for (unsigned int round = 0; round != 10; ++round) {
    // Each round writes two keys made unique by the round number.
    const std::string suffix = std::to_string(round);
    const std::string foo_key = "foo" + suffix;
    const std::string bar_key = "bar" + suffix;
    ASSERT_OK(Put(foo_key, "aaa"));
    ASSERT_OK(Put(bar_key, "bbb"));

    ASSERT_OK(Flush());
  }
}
856 
857 class ColumnFamilyHandleDeletionStartedListener : public EventListener {
858  private:
859   std::vector<std::string> cfs_;
860   int counter;
861 
862  public:
ColumnFamilyHandleDeletionStartedListener(const std::vector<std::string> & cfs)863   explicit ColumnFamilyHandleDeletionStartedListener(
864       const std::vector<std::string>& cfs)
865       : cfs_(cfs), counter(0) {
866     cfs_.insert(cfs_.begin(), kDefaultColumnFamilyName);
867   }
OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle * handle)868   void OnColumnFamilyHandleDeletionStarted(
869       ColumnFamilyHandle* handle) override {
870     ASSERT_EQ(cfs_[handle->GetID()], handle->GetName());
871     counter++;
872   }
getCounter()873   int getCounter() { return counter; }
874 };
875 
// Creates three extra column families, deletes their handles, and checks that
// the listener was notified once per deleted handle.
TEST_F(EventListenerTest, ColumnFamilyHandleDeletionStartedListenerTest) {
  const std::vector<std::string> cf_names{"pikachu", "eevee", "Mewtwo"};
  auto deletion_listener =
      std::make_shared<ColumnFamilyHandleDeletionStartedListener>(cf_names);

  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.listeners.push_back(deletion_listener);
  CreateAndReopenWithCF(cf_names, options);
  ASSERT_EQ(handles_.size(), 4);

  // Delete the non-default handles from the back (3, 2, 1); handle 0 stays.
  for (size_t pos = 3; pos >= 1; --pos) {
    delete handles_[pos];
  }
  // Drop the now-dangling pointers so only handle 0 remains in handles_.
  handles_.resize(1);

  ASSERT_EQ(deletion_listener->getCounter(), 3);
}
892 
893 class BackgroundErrorListener : public EventListener {
894  private:
895   SpecialEnv* env_;
896   int counter_;
897 
898  public:
BackgroundErrorListener(SpecialEnv * env)899   BackgroundErrorListener(SpecialEnv* env) : env_(env), counter_(0) {}
900 
OnBackgroundError(BackgroundErrorReason,Status * bg_error)901   void OnBackgroundError(BackgroundErrorReason /*reason*/,
902                          Status* bg_error) override {
903     if (counter_ == 0) {
904       // suppress the first error and disable write-dropping such that a retry
905       // can succeed.
906       *bg_error = Status::OK();
907       env_->drop_writes_.store(false, std::memory_order_release);
908       env_->SetMockSleep(false);
909     }
910     ++counter_;
911   }
912 
counter()913   int counter() { return counter_; }
914 };
915 
// Checks that a listener can clear the background error raised by a failed
// flush: the first flush fails while the env drops writes, the listener is
// notified exactly once, and a later flush leaves one table file in level 0.
TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
  auto listener = std::make_shared<BackgroundErrorListener>(env_);
  Options options;
  options.create_if_missing = true;
  options.env = env_;
  options.listeners.push_back(listener);
  // NOTE(review): presumably SpecialSkipListFactory(1) makes a memtable count
  // as full after a single entry, so the second Put below triggers a flush —
  // confirm against its definition.
  options.memtable_factory.reset(new SpecialSkipListFactory(1));
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // the usual TEST_WaitForFlushMemTable() doesn't work for failed flushes, so
  // forge a custom one for the failed flush case.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkFlush:done",
        "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Make the env drop writes so that the upcoming flush fails.
  env_->drop_writes_.store(true, std::memory_order_release);
  env_->SetMockSleep();

  ASSERT_OK(Put("key0", "val"));
  ASSERT_OK(Put("key1", "val"));
  // Blocks until the background flush work has reached its "done" sync point.
  TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
  // The listener saw exactly one error; its first-error handling has cleared
  // the error and turned write-dropping off again.
  ASSERT_EQ(1, listener->counter());
  ASSERT_OK(Put("key2", "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
944 
// Checks that a listener can clear the background error raised by a failed
// compaction: two level-0 files are built with auto compaction disabled, the
// env is made to drop writes, compaction is re-enabled, and the listener is
// expected to be notified exactly once.
TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
  auto listener = std::make_shared<BackgroundErrorListener>(env_);
  Options options;
  options.create_if_missing = true;
  // Keep compaction from running while the level-0 files are being set up.
  options.disable_auto_compactions = true;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.push_back(listener);
  // NOTE(review): presumably SpecialSkipListFactory(2) makes a memtable count
  // as full after two entries — confirm against its definition.
  options.memtable_factory.reset(new SpecialSkipListFactory(2));
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // third iteration triggers the second memtable's flush
  for (int i = 0; i < 3; ++i) {
    ASSERT_OK(Put("key0", "val"));
    if (i > 0) {
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    ASSERT_OK(Put("key1", "val"));
  }
  ASSERT_EQ(2, NumTableFilesAtLevel(0));

  // Make the env drop writes, then re-enable auto compaction so that the
  // compaction attempted next fails and reports a background error.
  env_->drop_writes_.store(true, std::memory_order_release);
  env_->SetMockSleep();
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(1, listener->counter());

  // trigger flush so compaction is triggered again; this time it succeeds
  // The previous failed compaction may get retried automatically, so we may
  // be left with 0 or 1 files in level 1, depending on when the retry gets
  // scheduled
  // NOTE(review): the comment above talks about level 1, while the assertion
  // below checks for at least one file in level 0 — confirm which is intended.
  ASSERT_OK(Put("key0", "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_LE(1, NumTableFilesAtLevel(0));
}
982 
983 class TestFileOperationListener : public EventListener {
984  public:
TestFileOperationListener()985   TestFileOperationListener() {
986     file_reads_.store(0);
987     file_reads_success_.store(0);
988     file_writes_.store(0);
989     file_writes_success_.store(0);
990     file_flushes_.store(0);
991     file_flushes_success_.store(0);
992     file_closes_.store(0);
993     file_closes_success_.store(0);
994     file_syncs_.store(0);
995     file_syncs_success_.store(0);
996     file_truncates_.store(0);
997     file_truncates_success_.store(0);
998   }
999 
OnFileReadFinish(const FileOperationInfo & info)1000   void OnFileReadFinish(const FileOperationInfo& info) override {
1001     ++file_reads_;
1002     if (info.status.ok()) {
1003       ++file_reads_success_;
1004     }
1005     ReportDuration(info);
1006   }
1007 
OnFileWriteFinish(const FileOperationInfo & info)1008   void OnFileWriteFinish(const FileOperationInfo& info) override {
1009     ++file_writes_;
1010     if (info.status.ok()) {
1011       ++file_writes_success_;
1012     }
1013     ReportDuration(info);
1014   }
1015 
OnFileFlushFinish(const FileOperationInfo & info)1016   void OnFileFlushFinish(const FileOperationInfo& info) override {
1017     ++file_flushes_;
1018     if (info.status.ok()) {
1019       ++file_flushes_success_;
1020     }
1021     ReportDuration(info);
1022   }
1023 
OnFileCloseFinish(const FileOperationInfo & info)1024   void OnFileCloseFinish(const FileOperationInfo& info) override {
1025     ++file_closes_;
1026     if (info.status.ok()) {
1027       ++file_closes_success_;
1028     }
1029     ReportDuration(info);
1030   }
1031 
OnFileSyncFinish(const FileOperationInfo & info)1032   void OnFileSyncFinish(const FileOperationInfo& info) override {
1033     ++file_syncs_;
1034     if (info.status.ok()) {
1035       ++file_syncs_success_;
1036     }
1037     ReportDuration(info);
1038   }
1039 
OnFileTruncateFinish(const FileOperationInfo & info)1040   void OnFileTruncateFinish(const FileOperationInfo& info) override {
1041     ++file_truncates_;
1042     if (info.status.ok()) {
1043       ++file_truncates_success_;
1044     }
1045     ReportDuration(info);
1046   }
1047 
ShouldBeNotifiedOnFileIO()1048   bool ShouldBeNotifiedOnFileIO() override { return true; }
1049 
1050   std::atomic<size_t> file_reads_;
1051   std::atomic<size_t> file_reads_success_;
1052   std::atomic<size_t> file_writes_;
1053   std::atomic<size_t> file_writes_success_;
1054   std::atomic<size_t> file_flushes_;
1055   std::atomic<size_t> file_flushes_success_;
1056   std::atomic<size_t> file_closes_;
1057   std::atomic<size_t> file_closes_success_;
1058   std::atomic<size_t> file_syncs_;
1059   std::atomic<size_t> file_syncs_success_;
1060   std::atomic<size_t> file_truncates_;
1061   std::atomic<size_t> file_truncates_success_;
1062 
1063  private:
ReportDuration(const FileOperationInfo & info) const1064   void ReportDuration(const FileOperationInfo& info) const {
1065     ASSERT_GT(info.duration.count(), 0);
1066   }
1067 };
1068 
// Verifies that a listener opted in to file I/O notifications observes
// writes and flushes during a memtable flush, and reads, closes and syncs by
// the time the DB has been closed and reopened. When direct I/O for flush and
// compaction is usable, truncate notifications are checked as well.
TEST_F(EventListenerTest, OnFileOperationTest) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;

  auto listener = std::make_shared<TestFileOperationListener>();
  options.listeners.push_back(listener);

  // Try direct I/O first. If the open is rejected with InvalidArgument, fall
  // back to buffered I/O; the truncate checks at the end are then skipped.
  options.use_direct_io_for_flush_and_compaction = true;
  Status s = TryReopen(options);
  if (s.IsInvalidArgument()) {
    options.use_direct_io_for_flush_and_compaction = false;
  } else {
    ASSERT_OK(s);
  }
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "aaa"));
  ASSERT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  // For each kind: successes never exceed the total, and at least one
  // operation of that kind was observed.
  ASSERT_GE(listener->file_writes_.load(),
            listener->file_writes_success_.load());
  ASSERT_GT(listener->file_writes_.load(), 0U);
  ASSERT_GE(listener->file_flushes_.load(),
            listener->file_flushes_success_.load());
  ASSERT_GT(listener->file_flushes_.load(), 0U);
  Close();

  Reopen(options);
  ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
  ASSERT_GT(listener->file_reads_.load(), 0U);
  ASSERT_GE(listener->file_closes_.load(),
            listener->file_closes_success_.load());
  ASSERT_GT(listener->file_closes_.load(), 0U);
  ASSERT_GE(listener->file_syncs_.load(), listener->file_syncs_success_.load());
  ASSERT_GT(listener->file_syncs_.load(), 0U);
  if (options.use_direct_io_for_flush_and_compaction) {
    // Truncate notifications are only checked when direct I/O was accepted.
    ASSERT_GE(listener->file_truncates_.load(),
              listener->file_truncates_success_.load());
    ASSERT_GT(listener->file_truncates_.load(), 0U);
  }
}
1110 
1111 }  // namespace ROCKSDB_NAMESPACE
1112 
1113 #endif  // ROCKSDB_LITE
1114 
main(int argc,char ** argv)1115 int main(int argc, char** argv) {
1116   ::testing::InitGoogleTest(&argc, argv);
1117   return RUN_ALL_TESTS();
1118 }
1119