1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/blob/blob_index.h"
7 #include "db/db_impl/db_impl.h"
8 #include "db/db_test_util.h"
9 #include "db/dbformat.h"
10 #include "db/version_set.h"
11 #include "db/write_batch_internal.h"
12 #include "file/filename.h"
13 #include "memtable/hash_linklist_rep.h"
14 #include "monitoring/statistics.h"
15 #include "rocksdb/cache.h"
16 #include "rocksdb/compaction_filter.h"
17 #include "rocksdb/db.h"
18 #include "rocksdb/env.h"
19 #include "rocksdb/filter_policy.h"
20 #include "rocksdb/options.h"
21 #include "rocksdb/perf_context.h"
22 #include "rocksdb/slice.h"
23 #include "rocksdb/slice_transform.h"
24 #include "rocksdb/table.h"
25 #include "rocksdb/table_properties.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "test_util/testutil.h"
29 #include "util/hash.h"
30 #include "util/mutexlock.h"
31 #include "util/rate_limiter.h"
32 #include "util/string_util.h"
33 #include "utilities/merge_operators.h"
34
35 #ifndef ROCKSDB_LITE
36
37 namespace ROCKSDB_NAMESPACE {
38
39 class EventListenerTest : public DBTestBase {
40 public:
EventListenerTest()41 EventListenerTest() : DBTestBase("/listener_test", /*env_do_fsync=*/true) {}
42
BlobStr(uint64_t blob_file_number,uint64_t offset,uint64_t size)43 static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
44 uint64_t size) {
45 std::string blob_index;
46 BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
47 kNoCompression);
48 return blob_index;
49 }
50
51 const size_t k110KB = 110 << 10;
52 };
53
54 struct TestPropertiesCollector
55 : public ROCKSDB_NAMESPACE::TablePropertiesCollector {
AddUserKeyROCKSDB_NAMESPACE::TestPropertiesCollector56 ROCKSDB_NAMESPACE::Status AddUserKey(
57 const ROCKSDB_NAMESPACE::Slice& /*key*/,
58 const ROCKSDB_NAMESPACE::Slice& /*value*/,
59 ROCKSDB_NAMESPACE::EntryType /*type*/,
60 ROCKSDB_NAMESPACE::SequenceNumber /*seq*/,
61 uint64_t /*file_size*/) override {
62 return Status::OK();
63 }
FinishROCKSDB_NAMESPACE::TestPropertiesCollector64 ROCKSDB_NAMESPACE::Status Finish(
65 ROCKSDB_NAMESPACE::UserCollectedProperties* properties) override {
66 properties->insert({"0", "1"});
67 return Status::OK();
68 }
69
NameROCKSDB_NAMESPACE::TestPropertiesCollector70 const char* Name() const override { return "TestTablePropertiesCollector"; }
71
GetReadablePropertiesROCKSDB_NAMESPACE::TestPropertiesCollector72 ROCKSDB_NAMESPACE::UserCollectedProperties GetReadableProperties()
73 const override {
74 ROCKSDB_NAMESPACE::UserCollectedProperties ret;
75 ret["2"] = "3";
76 return ret;
77 }
78 };
79
80 class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
81 public:
CreateTablePropertiesCollector(TablePropertiesCollectorFactory::Context)82 TablePropertiesCollector* CreateTablePropertiesCollector(
83 TablePropertiesCollectorFactory::Context /*context*/) override {
84 return new TestPropertiesCollector;
85 }
Name() const86 const char* Name() const override { return "TestTablePropertiesCollector"; }
87 };
88
89 class TestCompactionListener : public EventListener {
90 public:
TestCompactionListener(EventListenerTest * test)91 explicit TestCompactionListener(EventListenerTest* test) : test_(test) {}
92
OnCompactionCompleted(DB * db,const CompactionJobInfo & ci)93 void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override {
94 std::lock_guard<std::mutex> lock(mutex_);
95 compacted_dbs_.push_back(db);
96 ASSERT_GT(ci.input_files.size(), 0U);
97 ASSERT_EQ(ci.input_files.size(), ci.input_file_infos.size());
98
99 for (size_t i = 0; i < ci.input_file_infos.size(); ++i) {
100 ASSERT_EQ(ci.input_file_infos[i].level, ci.base_input_level);
101 ASSERT_EQ(ci.input_file_infos[i].file_number,
102 TableFileNameToNumber(ci.input_files[i]));
103 }
104
105 ASSERT_GT(ci.output_files.size(), 0U);
106 ASSERT_EQ(ci.output_files.size(), ci.output_file_infos.size());
107
108 ASSERT_TRUE(test_);
109 ASSERT_EQ(test_->db_, db);
110
111 std::vector<std::vector<FileMetaData>> files_by_level;
112 test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[ci.cf_id],
113 &files_by_level);
114 ASSERT_GT(files_by_level.size(), ci.output_level);
115
116 for (size_t i = 0; i < ci.output_file_infos.size(); ++i) {
117 ASSERT_EQ(ci.output_file_infos[i].level, ci.output_level);
118 ASSERT_EQ(ci.output_file_infos[i].file_number,
119 TableFileNameToNumber(ci.output_files[i]));
120
121 auto it = std::find_if(
122 files_by_level[ci.output_level].begin(),
123 files_by_level[ci.output_level].end(), [&](const FileMetaData& meta) {
124 return meta.fd.GetNumber() == ci.output_file_infos[i].file_number;
125 });
126 ASSERT_NE(it, files_by_level[ci.output_level].end());
127
128 ASSERT_EQ(ci.output_file_infos[i].oldest_blob_file_number,
129 it->oldest_blob_file_number);
130 }
131
132 ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
133 ASSERT_GT(ci.thread_id, 0U);
134
135 for (auto fl : {ci.input_files, ci.output_files}) {
136 for (auto fn : fl) {
137 auto it = ci.table_properties.find(fn);
138 ASSERT_NE(it, ci.table_properties.end());
139 auto tp = it->second;
140 ASSERT_TRUE(tp != nullptr);
141 ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
142 }
143 }
144 }
145
146 EventListenerTest* test_;
147 std::vector<DB*> compacted_dbs_;
148 std::mutex mutex_;
149 };
150
// Flushes and manually compacts each of seven column families of a single DB
// and checks that the listener saw exactly one compaction per column family,
// all of them for db_.
TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 100;
  const int kNumL0Files = 4;

  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base = options.target_file_size_base * 2;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  auto* listener = new TestCompactionListener(this);
  options.listeners.emplace_back(listener);
  const std::vector<std::string> cf_names = {
      "pikachu",  "ilya",    "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(cf_names, options);
  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));

  // Also store a blob index in the first column family.
  WriteBatch batch;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
                                             BlobStr(123, 0, 1 << 10)));
  ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));

  // Column families 2..7 each get one 90000-byte value keyed by the CF name
  // and filled with the first letter of that name.
  for (int cf = 2; cf <= 7; ++cf) {
    const std::string& cf_name = cf_names[static_cast<size_t>(cf - 1)];
    ASSERT_OK(Put(cf, cf_name, std::string(90000, cf_name[0])));
  }

  for (int cf = 1; cf < 8; ++cf) {
    ASSERT_OK(Flush(cf));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[cf],
                                     nullptr, nullptr));
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }

  ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
  for (size_t idx = 0; idx < cf_names.size(); ++idx) {
    ASSERT_EQ(listener->compacted_dbs_[idx], db_);
  }
}
206
207 // This simple Listener can only handle one flush at a time.
208 class TestFlushListener : public EventListener {
209 public:
TestFlushListener(Env * env,EventListenerTest * test)210 TestFlushListener(Env* env, EventListenerTest* test)
211 : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
212 db_closed = false;
213 }
214
~TestFlushListener()215 virtual ~TestFlushListener() {
216 prev_fc_info_.status.PermitUncheckedError(); // Ignore the status
217 }
OnTableFileCreated(const TableFileCreationInfo & info)218 void OnTableFileCreated(
219 const TableFileCreationInfo& info) override {
220 // remember the info for later checking the FlushJobInfo.
221 prev_fc_info_ = info;
222 ASSERT_GT(info.db_name.size(), 0U);
223 ASSERT_GT(info.cf_name.size(), 0U);
224 ASSERT_GT(info.file_path.size(), 0U);
225 ASSERT_GT(info.job_id, 0);
226 ASSERT_GT(info.table_properties.data_size, 0U);
227 ASSERT_GT(info.table_properties.raw_key_size, 0U);
228 ASSERT_GT(info.table_properties.raw_value_size, 0U);
229 ASSERT_GT(info.table_properties.num_data_blocks, 0U);
230 ASSERT_GT(info.table_properties.num_entries, 0U);
231 ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
232 ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
233
234 #ifdef ROCKSDB_USING_THREAD_STATUS
235 // Verify the id of the current thread that created this table
236 // file matches the id of any active flush or compaction thread.
237 uint64_t thread_id = env_->GetThreadID();
238 std::vector<ThreadStatus> thread_list;
239 ASSERT_OK(env_->GetThreadList(&thread_list));
240 bool found_match = false;
241 for (auto thread_status : thread_list) {
242 if (thread_status.operation_type == ThreadStatus::OP_FLUSH ||
243 thread_status.operation_type == ThreadStatus::OP_COMPACTION) {
244 if (thread_id == thread_status.thread_id) {
245 found_match = true;
246 break;
247 }
248 }
249 }
250 ASSERT_TRUE(found_match);
251 #endif // ROCKSDB_USING_THREAD_STATUS
252 }
253
OnFlushCompleted(DB * db,const FlushJobInfo & info)254 void OnFlushCompleted(
255 DB* db, const FlushJobInfo& info) override {
256 flushed_dbs_.push_back(db);
257 flushed_column_family_names_.push_back(info.cf_name);
258 if (info.triggered_writes_slowdown) {
259 slowdown_count++;
260 }
261 if (info.triggered_writes_stop) {
262 stop_count++;
263 }
264 // verify whether the previously created file matches the flushed file.
265 ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
266 ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
267 ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
268 ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
269 ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
270
271 // Note: the following chunk relies on the notification pertaining to the
272 // database pointed to by DBTestBase::db_, and is thus bypassed when
273 // that assumption does not hold (see the test case MultiDBMultiListeners
274 // below).
275 ASSERT_TRUE(test_);
276 if (db == test_->db_) {
277 std::vector<std::vector<FileMetaData>> files_by_level;
278 test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[info.cf_id],
279 &files_by_level);
280
281 ASSERT_FALSE(files_by_level.empty());
282 auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
283 [&](const FileMetaData& meta) {
284 return meta.fd.GetNumber() == info.file_number;
285 });
286 ASSERT_NE(it, files_by_level[0].end());
287 ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
288 }
289
290 ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
291 ASSERT_GT(info.thread_id, 0U);
292 ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
293 "1");
294 }
295
296 std::vector<std::string> flushed_column_family_names_;
297 std::vector<DB*> flushed_dbs_;
298 int slowdown_count;
299 int stop_count;
300 bool db_closing;
301 std::atomic_bool db_closed;
302 TableFileCreationInfo prev_fc_info_;
303
304 protected:
305 Env* env_;
306 EventListenerTest* test_;
307 };
308
// Flushes seven column families of a single DB one after another and checks
// that the listener is notified once per flush, in flush order.
TEST_F(EventListenerTest, OnSingleDBFlushTest) {
  Options options;
  options.env = CurrentOptions().env;
  options.write_buffer_size = k110KB;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  auto* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  const std::vector<std::string> cf_names = {
      "pikachu",  "ilya",    "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  CreateAndReopenWithCF(cf_names, options);

  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));

  // Also store a blob index in the first column family.
  WriteBatch batch;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
                                             BlobStr(456, 0, 1 << 10)));
  ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));

  // Column families 2..7 each get one 90000-byte value keyed by the CF name
  // and filled with the first letter of that name.
  for (int cf = 2; cf <= 7; ++cf) {
    const std::string& cf_name = cf_names[static_cast<size_t>(cf - 1)];
    ASSERT_OK(Put(cf, cf_name, std::string(90000, cf_name[0])));
  }

  for (int cf = 1; cf < 8; ++cf) {
    ASSERT_OK(Flush(cf));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    const size_t flushes_so_far = static_cast<size_t>(cf);
    ASSERT_EQ(listener->flushed_dbs_.size(), flushes_so_far);
    ASSERT_EQ(listener->flushed_column_family_names_.size(), flushes_so_far);
  }

  // make sure callback functions are called in the right order
  for (size_t idx = 0; idx < cf_names.size(); ++idx) {
    ASSERT_EQ(listener->flushed_dbs_[idx], db_);
    ASSERT_EQ(listener->flushed_column_family_names_[idx], cf_names[idx]);
  }
}
351
// Writes one large value into each of seven column families, flushes them in
// order, and checks that the flush notifications arrive in the same order.
TEST_F(EventListenerTest, MultiCF) {
  Options options;
  options.env = CurrentOptions().env;
  options.write_buffer_size = k110KB;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  auto* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  const std::vector<std::string> cf_names = {
      "pikachu",  "ilya",    "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(cf_names, options);

  // Column family N (1-based) gets one 90000-byte value keyed by its name and
  // filled with the first letter of that name.
  for (int cf = 1; cf <= 7; ++cf) {
    const std::string& cf_name = cf_names[static_cast<size_t>(cf - 1)];
    ASSERT_OK(Put(cf, cf_name, std::string(90000, cf_name[0])));
  }

  for (int cf = 1; cf < 8; ++cf) {
    ASSERT_OK(Flush(cf));
    const size_t flushes_so_far = static_cast<size_t>(cf);
    ASSERT_EQ(listener->flushed_dbs_.size(), flushes_so_far);
    ASSERT_EQ(listener->flushed_column_family_names_.size(), flushes_so_far);
  }

  // make sure callback functions are called in the right order
  for (size_t idx = 0; idx < cf_names.size(); ++idx) {
    ASSERT_EQ(listener->flushed_dbs_[idx], db_);
    ASSERT_EQ(listener->flushed_column_family_names_[idx], cf_names[idx]);
  }
}
387
// Opens kNumDBs databases that all share the same kNumListeners listeners,
// flushes every column family of every DB, and checks that each listener saw
// the flushes in the order they were issued.
TEST_F(EventListenerTest, MultiDBMultiListeners) {
  Options options;
  options.env = CurrentOptions().env;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  std::vector<TestFlushListener*> listeners;
  const int kNumDBs = 5;
  const int kNumListeners = 10;
  for (int i = 0; i < kNumListeners; ++i) {
    listeners.emplace_back(new TestFlushListener(options.env, this));
  }

  std::vector<std::string> cf_names = {
      "pikachu",  "ilya",    "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};

  options.create_if_missing = true;
  for (auto* listener : listeners) {
    // options.listeners takes ownership of the raw pointer.
    options.listeners.emplace_back(listener);
  }
  ColumnFamilyOptions cf_opts(options);

  std::vector<DB*> dbs;
  std::vector<std::vector<ColumnFamilyHandle*>> vec_handles;

  for (int d = 0; d < kNumDBs; ++d) {
    ASSERT_OK(DestroyDB(dbname_ + ToString(d), options));
    DB* db;
    std::vector<ColumnFamilyHandle*> handles;
    ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db));
    for (size_t c = 0; c < cf_names.size(); ++c) {
      ColumnFamilyHandle* handle;
      ASSERT_OK(db->CreateColumnFamily(cf_opts, cf_names[c], &handle));
      handles.push_back(handle);
    }

    vec_handles.push_back(std::move(handles));
    dbs.push_back(db);
  }

  for (int d = 0; d < kNumDBs; ++d) {
    for (size_t c = 0; c < cf_names.size(); ++c) {
      ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c], cf_names[c],
                            cf_names[c]));
    }
  }

  // Flush column-family-major: CF c of every DB before CF c + 1 of any DB.
  for (size_t c = 0; c < cf_names.size(); ++c) {
    for (int d = 0; d < kNumDBs; ++d) {
      ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
      ASSERT_OK(
          static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable());
    }
  }

  const size_t kExpectedFlushes =
      cf_names.size() * static_cast<size_t>(kNumDBs);
  for (auto* listener : listeners) {
    // Guard the indexed reads below against missing notifications.
    ASSERT_GE(listener->flushed_dbs_.size(), kExpectedFlushes);
    ASSERT_GE(listener->flushed_column_family_names_.size(), kExpectedFlushes);
    size_t pos = 0;
    for (size_t c = 0; c < cf_names.size(); ++c) {
      for (int d = 0; d < kNumDBs; ++d) {
        ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]);
        ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]);
        pos++;
      }
    }
  }

  // Iterate by reference so the handle vectors are not copied and clear()
  // acts on the stored vectors.
  for (auto& handles : vec_handles) {
    for (auto* h : handles) {
      delete h;
    }
    handles.clear();
  }
  vec_handles.clear();

  for (auto* db : dbs) {
    delete db;
  }
}
471
// With compaction disabled, keeps flushing small memtables so that L0 files
// pile up, then checks how many flushes reported a write slowdown.
TEST_F(EventListenerTest, DisableBGCompaction) {
  Options options;
  options.env = CurrentOptions().env;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  auto* listener = new TestFlushListener(options.env, this);
  const int kCompactionTrigger = 1;
  const int kSlowdownTrigger = 5;
  const int kStopTrigger = 100;
  options.level0_file_num_compaction_trigger = kCompactionTrigger;
  options.level0_slowdown_writes_trigger = kSlowdownTrigger;
  options.level0_stop_writes_trigger = kStopTrigger;
  options.max_write_buffer_number = 10;
  options.listeners.emplace_back(listener);
  // BG compaction is disabled, so the number of L0 files simply keeps
  // increasing in this test.
  options.compaction_style = kCompactionStyleNone;
  options.compression = kNoCompression;
  options.write_buffer_size = 100000;  // Small write buffer
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  CreateAndReopenWithCF({"pikachu"}, options);
  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);

  // Write and flush one key at a time until the column family holds
  // kSlowdownTrigger * 10 files.
  const int kTargetFileCount = kSlowdownTrigger * 10;
  int next_key = 0;
  while (static_cast<int>(cf_meta.file_count) < kTargetFileCount) {
    ASSERT_OK(
        Put(1, ToString(next_key), std::string(10000, 'x'), WriteOptions()));
    FlushOptions flush_opts;
    flush_opts.allow_write_stall = true;
    ASSERT_OK(db_->Flush(flush_opts, handles_[1]));
    db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
    ++next_key;
  }
  ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
}
510
511 class TestCompactionReasonListener : public EventListener {
512 public:
OnCompactionCompleted(DB *,const CompactionJobInfo & ci)513 void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
514 std::lock_guard<std::mutex> lock(mutex_);
515 compaction_reasons_.push_back(ci.compaction_reason);
516 }
517
518 std::vector<CompactionReason> compaction_reasons_;
519 std::mutex mutex_;
520 };
521
// Checks the CompactionReason reported to listeners under level compaction
// in three scenarios: kLevelL0FilesNum, kLevelMaxLevelSize and
// kManualCompaction.
TEST_F(EventListenerTest, CompactionReasonLevel) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.level0_file_num_compaction_trigger = 4;
  options.compaction_style = kCompactionStyleLevel;

  DestroyAndReopen(options);
  Random rnd(301);

  // Write 4 files in L0
  for (int i = 0; i < 4; i++) {
    GenerateNewRandomFile(&rnd);
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ(listener->compaction_reasons_.size(), 1);
  ASSERT_EQ(listener->compaction_reasons_[0],
            CompactionReason::kLevelL0FilesNum);

  DestroyAndReopen(options);

  // Write 3 non-overlapping files in L0
  for (int k = 1; k <= 30; k++) {
    ASSERT_OK(Put(Key(k), Key(k)));
    if (k % 10 == 0) {
      // Check the flush status instead of silently dropping it.
      ASSERT_OK(Flush());
    }
  }

  // Do a trivial move from L0 -> L1
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Reopen with a 1-byte level base and expect only kLevelMaxLevelSize
  // compactions.
  options.max_bytes_for_level_base = 1;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_GT(listener->compaction_reasons_.size(), 1);

  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kLevelMaxLevelSize);
  }

  // With auto compactions disabled, only the manual compaction below should
  // be reported.
  options.disable_auto_compactions = true;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  ASSERT_OK(Put("key", "value"));
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
  }
}
587
// Checks the CompactionReason reported to listeners under universal
// compaction in three scenarios: kUniversalSizeRatio,
// kUniversalSizeAmplification and kManualCompaction.
TEST_F(EventListenerTest, CompactionReasonUniversal) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  auto* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.compaction_style = kCompactionStyleUniversal;

  Random rnd(301);

  // Scenario 1: very large size ratio and size amplification limit.
  options.level0_file_num_compaction_trigger = 8;
  options.compaction_options_universal.max_size_amplification_percent = 100000;
  options.compaction_options_universal.size_ratio = 100000;
  DestroyAndReopen(options);
  listener->compaction_reasons_.clear();

  // Write 8 files in L0
  for (int file = 0; file < 8; ++file) {
    GenerateNewRandomFile(&rnd);
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (const CompactionReason reason : listener->compaction_reasons_) {
    ASSERT_EQ(reason, CompactionReason::kUniversalSizeRatio);
  }

  // Scenario 2: same as above but with a size amplification limit of 1%.
  options.level0_file_num_compaction_trigger = 8;
  options.compaction_options_universal.max_size_amplification_percent = 1;
  options.compaction_options_universal.size_ratio = 100000;

  DestroyAndReopen(options);
  listener->compaction_reasons_.clear();

  // Write 8 files in L0
  for (int file = 0; file < 8; ++file) {
    GenerateNewRandomFile(&rnd);
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (const CompactionReason reason : listener->compaction_reasons_) {
    ASSERT_EQ(reason, CompactionReason::kUniversalSizeAmplification);
  }

  // Scenario 3: auto compactions off, then one manual full-range compaction.
  options.disable_auto_compactions = true;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (const CompactionReason reason : listener->compaction_reasons_) {
    ASSERT_EQ(reason, CompactionReason::kManualCompaction);
  }
}
649
// Under FIFO compaction with max_table_files_size set to 1, every reported
// compaction must carry the reason kFIFOMaxSize.
TEST_F(EventListenerTest, CompactionReasonFIFO) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  auto* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.level0_file_num_compaction_trigger = 4;
  options.compaction_style = kCompactionStyleFIFO;
  options.compaction_options_fifo.max_table_files_size = 1;

  DestroyAndReopen(options);
  Random rnd(301);

  // Write 4 files in L0
  for (int file = 0; file < 4; ++file) {
    GenerateNewRandomFile(&rnd);
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (const CompactionReason reason : listener->compaction_reasons_) {
    ASSERT_EQ(reason, CompactionReason::kFIFOMaxSize);
  }
}
678
679 class TableFileCreationListener : public EventListener {
680 public:
681 class TestEnv : public EnvWrapper {
682 public:
TestEnv(Env * t)683 explicit TestEnv(Env* t) : EnvWrapper(t) {}
684
SetStatus(Status s)685 void SetStatus(Status s) { status_ = s; }
686
NewWritableFile(const std::string & fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)687 Status NewWritableFile(const std::string& fname,
688 std::unique_ptr<WritableFile>* result,
689 const EnvOptions& options) override {
690 if (fname.size() > 4 && fname.substr(fname.size() - 4) == ".sst") {
691 if (!status_.ok()) {
692 return status_;
693 }
694 }
695 return target()->NewWritableFile(fname, result, options);
696 }
697
698 private:
699 Status status_;
700 };
701
TableFileCreationListener()702 TableFileCreationListener() {
703 for (int i = 0; i < 2; i++) {
704 started_[i] = finished_[i] = failure_[i] = 0;
705 }
706 }
707
Index(TableFileCreationReason reason)708 int Index(TableFileCreationReason reason) {
709 int idx;
710 switch (reason) {
711 case TableFileCreationReason::kFlush:
712 idx = 0;
713 break;
714 case TableFileCreationReason::kCompaction:
715 idx = 1;
716 break;
717 default:
718 idx = -1;
719 }
720 return idx;
721 }
722
CheckAndResetCounters(int flush_started,int flush_finished,int flush_failure,int compaction_started,int compaction_finished,int compaction_failure)723 void CheckAndResetCounters(int flush_started, int flush_finished,
724 int flush_failure, int compaction_started,
725 int compaction_finished, int compaction_failure) {
726 ASSERT_EQ(started_[0], flush_started);
727 ASSERT_EQ(finished_[0], flush_finished);
728 ASSERT_EQ(failure_[0], flush_failure);
729 ASSERT_EQ(started_[1], compaction_started);
730 ASSERT_EQ(finished_[1], compaction_finished);
731 ASSERT_EQ(failure_[1], compaction_failure);
732 for (int i = 0; i < 2; i++) {
733 started_[i] = finished_[i] = failure_[i] = 0;
734 }
735 }
736
OnTableFileCreationStarted(const TableFileCreationBriefInfo & info)737 void OnTableFileCreationStarted(
738 const TableFileCreationBriefInfo& info) override {
739 int idx = Index(info.reason);
740 if (idx >= 0) {
741 started_[idx]++;
742 }
743 ASSERT_GT(info.db_name.size(), 0U);
744 ASSERT_GT(info.cf_name.size(), 0U);
745 ASSERT_GT(info.file_path.size(), 0U);
746 ASSERT_GT(info.job_id, 0);
747 }
748
OnTableFileCreated(const TableFileCreationInfo & info)749 void OnTableFileCreated(const TableFileCreationInfo& info) override {
750 int idx = Index(info.reason);
751 if (idx >= 0) {
752 finished_[idx]++;
753 }
754 ASSERT_GT(info.db_name.size(), 0U);
755 ASSERT_GT(info.cf_name.size(), 0U);
756 ASSERT_GT(info.file_path.size(), 0U);
757 ASSERT_GT(info.job_id, 0);
758 ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
759 ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
760 if (info.status.ok()) {
761 ASSERT_GT(info.table_properties.data_size, 0U);
762 ASSERT_GT(info.table_properties.raw_key_size, 0U);
763 ASSERT_GT(info.table_properties.raw_value_size, 0U);
764 ASSERT_GT(info.table_properties.num_data_blocks, 0U);
765 ASSERT_GT(info.table_properties.num_entries, 0U);
766 } else {
767 if (idx >= 0) {
768 failure_[idx]++;
769 }
770 }
771 }
772
773 int started_[2];
774 int finished_[2];
775 int failure_[2];
776 };
777
// Drives flushes and compactions, with and without injected ".sst" creation
// failures, and checks the started/finished/failure counters after each step.
TEST_F(EventListenerTest, TableFileCreationListenersTest) {
  auto listener = std::make_shared<TableFileCreationListener>();
  Options options;
  std::unique_ptr<TableFileCreationListener::TestEnv> test_env(
      new TableFileCreationListener::TestEnv(CurrentOptions().env));
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  options.env = test_env.get();
  DestroyAndReopen(options);

  // Successful flush.
  ASSERT_OK(Put("foo", "aaa"));
  ASSERT_OK(Put("bar", "bbb"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);

  // Flush that fails because the env refuses to create the .sst file.
  ASSERT_OK(Put("foo", "aaa1"));
  ASSERT_OK(Put("bar", "bbb1"));
  test_env->SetStatus(Status::NotSupported("not supported"));
  ASSERT_NOK(Flush());
  listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
  test_env->SetStatus(Status::OK());

  // Successful flush after reopening.
  Reopen(options);
  ASSERT_OK(Put("foo", "aaa2"));
  ASSERT_OK(Put("bar", "bbb2"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);

  // Successful manual compaction over ["a", "z"].
  const Slice kRangeStart = "a";
  const Slice kRangeEnd = "z";
  ASSERT_OK(
      dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);

  // Successful flush followed by a compaction that fails on .sst creation.
  ASSERT_OK(Put("foo", "aaa3"));
  ASSERT_OK(Put("bar", "bbb3"));
  ASSERT_OK(Flush());
  test_env->SetStatus(Status::NotSupported("not supported"));
  ASSERT_NOK(
      dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
  ASSERT_NOK(dbfull()->TEST_WaitForCompact());
  listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
  Close();
}
824
825 class MemTableSealedListener : public EventListener {
826 private:
827 SequenceNumber latest_seq_number_;
828 public:
MemTableSealedListener()829 MemTableSealedListener() {}
OnMemTableSealed(const MemTableInfo & info)830 void OnMemTableSealed(const MemTableInfo& info) override {
831 latest_seq_number_ = info.first_seqno;
832 }
833
OnFlushCompleted(DB *,const FlushJobInfo & flush_job_info)834 void OnFlushCompleted(DB* /*db*/,
835 const FlushJobInfo& flush_job_info) override {
836 ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
837 }
838 };
839
// Runs ten write-then-flush rounds with a MemTableSealedListener attached;
// the checks themselves live in the listener's callbacks.
TEST_F(EventListenerTest, MemTableSealedListenerTest) {
  auto listener = std::make_shared<MemTableSealedListener>();
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  DestroyAndReopen(options);

  for (unsigned int round = 0; round < 10; ++round) {
    const std::string suffix = std::to_string(round);
    ASSERT_OK(Put("foo" + suffix, "aaa"));
    ASSERT_OK(Put("bar" + suffix, "bbb"));

    ASSERT_OK(Flush());
  }
}
856
857 class ColumnFamilyHandleDeletionStartedListener : public EventListener {
858 private:
859 std::vector<std::string> cfs_;
860 int counter;
861
862 public:
ColumnFamilyHandleDeletionStartedListener(const std::vector<std::string> & cfs)863 explicit ColumnFamilyHandleDeletionStartedListener(
864 const std::vector<std::string>& cfs)
865 : cfs_(cfs), counter(0) {
866 cfs_.insert(cfs_.begin(), kDefaultColumnFamilyName);
867 }
OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle * handle)868 void OnColumnFamilyHandleDeletionStarted(
869 ColumnFamilyHandle* handle) override {
870 ASSERT_EQ(cfs_[handle->GetID()], handle->GetName());
871 counter++;
872 }
getCounter()873 int getCounter() { return counter; }
874 };
875
// Creates three extra column families, deletes their handles from the highest
// index down, and expects the listener to have been notified once per handle.
TEST_F(EventListenerTest, ColumnFamilyHandleDeletionStartedListenerTest) {
  std::vector<std::string> cf_names{"pikachu", "eevee", "Mewtwo"};
  auto listener =
      std::make_shared<ColumnFamilyHandleDeletionStartedListener>(cf_names);

  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  CreateAndReopenWithCF(cf_names, options);
  ASSERT_EQ(handles_.size(), 4);

  // Drop every handle except the first one, last handle first.
  while (handles_.size() > 1) {
    delete handles_.back();
    handles_.pop_back();
  }
  ASSERT_EQ(listener->getCounter(), 3);
}
892
893 class BackgroundErrorListener : public EventListener {
894 private:
895 SpecialEnv* env_;
896 int counter_;
897
898 public:
BackgroundErrorListener(SpecialEnv * env)899 BackgroundErrorListener(SpecialEnv* env) : env_(env), counter_(0) {}
900
OnBackgroundError(BackgroundErrorReason,Status * bg_error)901 void OnBackgroundError(BackgroundErrorReason /*reason*/,
902 Status* bg_error) override {
903 if (counter_ == 0) {
904 // suppress the first error and disable write-dropping such that a retry
905 // can succeed.
906 *bg_error = Status::OK();
907 env_->drop_writes_.store(false, std::memory_order_release);
908 env_->SetMockSleep(false);
909 }
910 ++counter_;
911 }
912
counter()913 int counter() { return counter_; }
914 };
915
// Makes a flush fail by dropping writes, expects the listener to be notified
// exactly once, then checks that a later flush produces one level-0 file.
TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
  auto listener = std::make_shared<BackgroundErrorListener>(env_);

  Options options;
  options.create_if_missing = true;
  options.env = env_;
  options.listeners.push_back(listener);
  options.memtable_factory.reset(new SpecialSkipListFactory(1));
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // TEST_WaitForFlushMemTable() cannot be used to wait for a flush that
  // fails, so block on a sync point that fires when the background flush
  // work is done instead.
  auto* const sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->LoadDependency(
      {{"DBImpl::BGWorkFlush:done",
        "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
  sync_point->EnableProcessing();

  // Start dropping writes; the listener turns this back off when it sees the
  // first background error.
  env_->drop_writes_.store(true, std::memory_order_release);
  env_->SetMockSleep();

  ASSERT_OK(Put("key0", "val"));
  ASSERT_OK(Put("key1", "val"));
  TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
  ASSERT_EQ(1, listener->counter());

  ASSERT_OK(Put("key2", "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
944
// Builds two level-0 files with auto compaction off, then enables compaction
// while writes are being dropped and expects exactly one background error
// notification. Finishes by writing again and waiting for flush/compaction.
TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
  auto listener = std::make_shared<BackgroundErrorListener>(env_);

  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.push_back(listener);
  options.memtable_factory.reset(new SpecialSkipListFactory(2));
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // Three rounds of two writes each. From the second round on, wait for the
  // memtable flush between the two writes; the third round triggers the
  // second memtable's flush.
  for (int round = 0; round != 3; ++round) {
    ASSERT_OK(Put("key0", "val"));
    if (round != 0) {
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    ASSERT_OK(Put("key1", "val"));
  }
  ASSERT_EQ(2, NumTableFilesAtLevel(0));

  // Drop writes and only then allow auto compaction. The listener suppresses
  // the first background error and turns write dropping back off.
  env_->drop_writes_.store(true, std::memory_order_release);
  env_->SetMockSleep();
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(1, listener->counter());

  // Another write and flush so that compaction is triggered again; this time
  // it succeeds. The earlier failed compaction may also be retried
  // automatically, so the final file layout depends on when that retry gets
  // scheduled. Only require that level 0 still holds at least one file.
  ASSERT_OK(Put("key0", "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_LE(1, NumTableFilesAtLevel(0));
}
982
983 class TestFileOperationListener : public EventListener {
984 public:
TestFileOperationListener()985 TestFileOperationListener() {
986 file_reads_.store(0);
987 file_reads_success_.store(0);
988 file_writes_.store(0);
989 file_writes_success_.store(0);
990 file_flushes_.store(0);
991 file_flushes_success_.store(0);
992 file_closes_.store(0);
993 file_closes_success_.store(0);
994 file_syncs_.store(0);
995 file_syncs_success_.store(0);
996 file_truncates_.store(0);
997 file_truncates_success_.store(0);
998 }
999
OnFileReadFinish(const FileOperationInfo & info)1000 void OnFileReadFinish(const FileOperationInfo& info) override {
1001 ++file_reads_;
1002 if (info.status.ok()) {
1003 ++file_reads_success_;
1004 }
1005 ReportDuration(info);
1006 }
1007
OnFileWriteFinish(const FileOperationInfo & info)1008 void OnFileWriteFinish(const FileOperationInfo& info) override {
1009 ++file_writes_;
1010 if (info.status.ok()) {
1011 ++file_writes_success_;
1012 }
1013 ReportDuration(info);
1014 }
1015
OnFileFlushFinish(const FileOperationInfo & info)1016 void OnFileFlushFinish(const FileOperationInfo& info) override {
1017 ++file_flushes_;
1018 if (info.status.ok()) {
1019 ++file_flushes_success_;
1020 }
1021 ReportDuration(info);
1022 }
1023
OnFileCloseFinish(const FileOperationInfo & info)1024 void OnFileCloseFinish(const FileOperationInfo& info) override {
1025 ++file_closes_;
1026 if (info.status.ok()) {
1027 ++file_closes_success_;
1028 }
1029 ReportDuration(info);
1030 }
1031
OnFileSyncFinish(const FileOperationInfo & info)1032 void OnFileSyncFinish(const FileOperationInfo& info) override {
1033 ++file_syncs_;
1034 if (info.status.ok()) {
1035 ++file_syncs_success_;
1036 }
1037 ReportDuration(info);
1038 }
1039
OnFileTruncateFinish(const FileOperationInfo & info)1040 void OnFileTruncateFinish(const FileOperationInfo& info) override {
1041 ++file_truncates_;
1042 if (info.status.ok()) {
1043 ++file_truncates_success_;
1044 }
1045 ReportDuration(info);
1046 }
1047
ShouldBeNotifiedOnFileIO()1048 bool ShouldBeNotifiedOnFileIO() override { return true; }
1049
1050 std::atomic<size_t> file_reads_;
1051 std::atomic<size_t> file_reads_success_;
1052 std::atomic<size_t> file_writes_;
1053 std::atomic<size_t> file_writes_success_;
1054 std::atomic<size_t> file_flushes_;
1055 std::atomic<size_t> file_flushes_success_;
1056 std::atomic<size_t> file_closes_;
1057 std::atomic<size_t> file_closes_success_;
1058 std::atomic<size_t> file_syncs_;
1059 std::atomic<size_t> file_syncs_success_;
1060 std::atomic<size_t> file_truncates_;
1061 std::atomic<size_t> file_truncates_success_;
1062
1063 private:
ReportDuration(const FileOperationInfo & info) const1064 void ReportDuration(const FileOperationInfo& info) const {
1065 ASSERT_GT(info.duration.count(), 0);
1066 }
1067 };
1068
// Exercises the file-operation callbacks: writes and flushes one key, closes
// and reopens the DB, and checks that each tracked operation kind was seen at
// least once and that success counts never exceed total counts.
TEST_F(EventListenerTest, OnFileOperationTest) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;

  // Owned by a shared_ptr from the moment it is created (no raw `new` handed
  // to the vector); the local copy keeps the counters reachable below.
  auto listener = std::make_shared<TestFileOperationListener>();
  options.listeners.push_back(listener);

  // Direct I/O for flush and compaction is left disabled here, including in
  // the InvalidArgument fallback, so the truncate assertions at the end of
  // this test are currently skipped.
  options.use_direct_io_for_flush_and_compaction = false;
  Status s = TryReopen(options);
  if (s.IsInvalidArgument()) {
    options.use_direct_io_for_flush_and_compaction = false;
  } else {
    ASSERT_OK(s);
  }
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "aaa"));
  ASSERT_OK(dbfull()->Flush(FlushOptions()));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_GE(listener->file_writes_.load(),
            listener->file_writes_success_.load());
  ASSERT_GT(listener->file_writes_.load(), 0);
  ASSERT_GE(listener->file_flushes_.load(),
            listener->file_flushes_success_.load());
  ASSERT_GT(listener->file_flushes_.load(), 0);
  Close();

  Reopen(options);
  ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
  ASSERT_GT(listener->file_reads_.load(), 0);
  ASSERT_GE(listener->file_closes_.load(),
            listener->file_closes_success_.load());
  ASSERT_GT(listener->file_closes_.load(), 0);
  ASSERT_GE(listener->file_syncs_.load(), listener->file_syncs_success_.load());
  ASSERT_GT(listener->file_syncs_.load(), 0);
  if (options.use_direct_io_for_flush_and_compaction) {
    ASSERT_GE(listener->file_truncates_.load(),
              listener->file_truncates_success_.load());
    ASSERT_GT(listener->file_truncates_.load(), 0);
  }
}
1110
1111 } // namespace ROCKSDB_NAMESPACE
1112
1113 #endif // ROCKSDB_LITE
1114
main(int argc,char ** argv)1115 int main(int argc, char** argv) {
1116 ::testing::InitGoogleTest(&argc, argv);
1117 return RUN_ALL_TESTS();
1118 }
1119