//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>
#include <cinttypes>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>

#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "logging/logging.h"
#include "memtable/hash_linklist_rep.h"
#include "monitoring/statistics.h"
#include "monitoring/thread_status_util.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain/plain_table_factory.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/compression.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

#if !defined(IOS_CROSS_COMPILE)
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {

static std::string RandomString(Random* rnd, int len, double ratio) {
  std::string r;
  test::CompressibleString(rnd, ratio, len, &r);
  return r;
}

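// Formats "key" as a zero-padded decimal string of "length" digits (the
// width is capped at the internal buffer size); e.g., Key(123, 10) yields
// "0000000123".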
std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  if (length > kBufSize) {
    length = kBufSize;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}

class CompactionJobStatsTest : public testing::Test,
                               public testing::WithParamInterface<bool> {
 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  Env* env_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;
  uint32_t max_subcompactions_;

  Options last_options_;

  CompactionJobStatsTest() : env_(Env::Default()) {
    env_->SetBackgroundThreads(1, Env::LOW);
    env_->SetBackgroundThreads(1, Env::HIGH);
    dbname_ = test::PerThreadDBPath("compaction_job_stats_test");
    alternative_wal_dir_ = dbname_ + "/wal";
    Options options;
    options.create_if_missing = true;
    max_subcompactions_ = GetParam();
    options.max_subcompactions = max_subcompactions_;
    auto delete_options = options;
    delete_options.wal_dir = alternative_wal_dir_;
    EXPECT_OK(DestroyDB(dbname_, delete_options));
    // Destroy again in case the previous run did not use the alternative
    // WAL dir.
    EXPECT_OK(DestroyDB(dbname_, options));
    db_ = nullptr;
    Reopen(options);
  }

  ~CompactionJobStatsTest() override {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    Close();
    Options options;
    options.db_paths.emplace_back(dbname_, 0);
    options.db_paths.emplace_back(dbname_ + "_2", 0);
    options.db_paths.emplace_back(dbname_ + "_3", 0);
    options.db_paths.emplace_back(dbname_ + "_4", 0);
    EXPECT_OK(DestroyDB(dbname_, options));
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  DBImpl* dbfull() {
    return reinterpret_cast<DBImpl*>(db_);
  }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options) {
    ColumnFamilyOptions cf_opts(options);
    size_t cfi = handles_.size();
    handles_.resize(cfi + cfs.size());
    for (auto cf : cfs) {
      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
    }
  }

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options) {
    CreateColumnFamilies(cfs, options);
    std::vector<std::string> cfs_plus_default = cfs;
    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
    ReopenWithColumnFamilies(cfs_plus_default, options);
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  Status TryReopenWithColumnFamilies(
      const std::vector<std::string>& cfs,
      const std::vector<Options>& options) {
    Close();
    EXPECT_EQ(cfs.size(), options.size());
    std::vector<ColumnFamilyDescriptor> column_families;
    for (size_t i = 0; i < cfs.size(); ++i) {
      column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
    }
    DBOptions db_opts = DBOptions(options[0]);
    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options) {
    Close();
    std::vector<Options> v_opts(cfs.size(), options);
    return TryReopenWithColumnFamilies(cfs, v_opts);
  }

  void Reopen(const Options& options) {
    ASSERT_OK(TryReopen(options));
  }

  void Close() {
    for (auto h : handles_) {
      delete h;
    }
    handles_.clear();
    delete db_;
    db_ = nullptr;
  }

  void DestroyAndReopen(const Options& options) {
    // Destroy using last options
    Destroy(last_options_);
    ASSERT_OK(TryReopen(options));
  }

  void Destroy(const Options& options) {
    Close();
    ASSERT_OK(DestroyDB(dbname_, options));
  }

  Status ReadOnlyReopen(const Options& options) {
    return DB::OpenForReadOnly(options, dbname_, &db_);
  }

  Status TryReopen(const Options& options) {
    Close();
    last_options_ = options;
    return DB::Open(options, dbname_, &db_);
  }

  Status Flush(int cf = 0) {
    if (cf == 0) {
      return db_->Flush(FlushOptions());
    } else {
      return db_->Flush(FlushOptions(), handles_[cf]);
    }
  }

  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, k, v);
  }

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, handles_[cf], k, v);
  }

  Status Delete(const std::string& k) {
    return db_->Delete(WriteOptions(), k);
  }

  Status Delete(int cf, const std::string& k) {
    return db_->Delete(WriteOptions(), handles_[cf], k);
  }

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, handles_[cf], k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  int NumTableFilesAtLevel(int level, int cf = 0) {
    std::string property;
    if (cf == 0) {
      // default cfd
      EXPECT_TRUE(db_->GetProperty(
          "rocksdb.num-files-at-level" + NumberToString(level), &property));
    } else {
      EXPECT_TRUE(db_->GetProperty(
          handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
          &property));
    }
    return atoi(property.c_str());
  }

  // Return spread of files per level
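  // e.g., "2,4" means 2 files in L0 and 4 files in L1; trailing levels
  // with zero files are trimmed from the string.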
  std::string FilesPerLevel(int cf = 0) {
    int num_levels =
        (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
    std::string result;
    size_t last_non_zero_offset = 0;
    for (int level = 0; level < num_levels; level++) {
      int f = NumTableFilesAtLevel(level, cf);
      char buf[100];
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
      result += buf;
      if (f > 0) {
        last_non_zero_offset = result.size();
      }
    }
    result.resize(last_non_zero_offset);
    return result;
  }

  uint64_t Size(const Slice& start, const Slice& limit, int cf = 0) {
    Range r(start, limit);
    uint64_t size;
    if (cf == 0) {
      db_->GetApproximateSizes(&r, 1, &size);
    } else {
      db_->GetApproximateSizes(handles_[1], &r, 1, &size);
    }
    return size;
  }

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id) {
    CompactRangeOptions compact_options;
    compact_options.target_path_id = target_path_id;
    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
  }

  void Compact(int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
  }

  void Compact(const Slice& start, const Slice& limit) {
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
  }

  void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
                                          true /* disallow trivial move */));
  }

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0) {
    for (int i = 0; i < n; i++) {
      ASSERT_OK(Put(cf, small, "begin"));
      ASSERT_OK(Put(cf, large, "end"));
      ASSERT_OK(Flush(cf));
    }
  }

  static void SetDeletionCompactionStats(
      CompactionJobStats* stats, uint64_t input_deletions,
      uint64_t expired_deletions, uint64_t records_replaced) {
    stats->num_input_deletion_records = input_deletions;
    stats->num_expired_deletion_records = expired_deletions;
    stats->num_records_replaced = records_replaced;
  }

  void MakeTableWithKeyValues(
    Random* rnd, uint64_t smallest, uint64_t largest,
    int key_size, int value_size, uint64_t interval,
    double ratio, int cf = 0) {
    for (auto key = smallest; key < largest; key += interval) {
      ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
                        Slice(RandomString(rnd, value_size, ratio))));
    }
    ASSERT_OK(Flush(cf));
  }

  // This function assumes that two rounds of keys have already been
  // inserted into the database, matching the setup in DeletionStatsTest.
  void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
    uint64_t interval, int deletion_interval, int key_size,
    uint64_t cutoff_key_num, CompactionJobStats* stats, int cf = 0) {

    // interval must be >= 2 so that deletion entries can be inserted for
    // keys offset by 1 from existing keys; such deletions do not remove
    // any real key.
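    // (Keys are inserted only at multiples of "interval" starting from
    // "smallest", so the key at smallest+1 is never a real key.)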
    ASSERT_GE(interval, 2);

    uint64_t ctr = 1;
    uint32_t deletions_made = 0;
    uint32_t num_deleted = 0;
    uint32_t num_expired = 0;
    for (auto key = smallest; key <= largest; key += interval, ctr++) {
      if (ctr % deletion_interval == 0) {
        ASSERT_OK(Delete(cf, Key(key, key_size)));
        deletions_made++;
        num_deleted++;

        if (key > cutoff_key_num) {
          num_expired++;
        }
      }
    }

    // Insert some deletions for keys that don't exist that
    // are both in and out of the key range
    ASSERT_OK(Delete(cf, Key(smallest + 1, key_size)));
    deletions_made++;

    ASSERT_OK(Delete(cf, Key(smallest - 1, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Delete(cf, Key(smallest - 9, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Flush(cf));
    SetDeletionCompactionStats(stats, deletions_made, num_expired,
      num_deleted);
  }
};

// An EventListener which helps verify the compaction results in
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
 public:
  CompactionJobStatsChecker()
      : compression_enabled_(false), verify_next_comp_io_stats_(false) {}

  size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }

  void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }

  // Once a compaction completes, this function verifies the returned
  // CompactionJobInfo against the oldest CompactionJobInfo added earlier
  // to "expected_stats_" that has not yet been used for verification.
  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    if (verify_next_comp_io_stats_) {
      ASSERT_GT(ci.stats.file_write_nanos, 0);
      ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
      ASSERT_GT(ci.stats.file_fsync_nanos, 0);
      ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
      verify_next_comp_io_stats_ = false;
    }

    std::lock_guard<std::mutex> lock(mutex_);
    if (expected_stats_.size()) {
      Verify(ci.stats, expected_stats_.front());
      expected_stats_.pop();
    }
  }

  // A helper function which verifies whether two CompactionJobStats
  // match.  The verification of all compaction stats is done with
  // ASSERT_EQ except for the total input / output bytes, which are
  // checked with ASSERT_GE and ASSERT_LE under a reasonable bias ---
  // 10% in the uncompressed case and 20% when compression is used.
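  // For example, with the 10% bias an expected total_input_bytes of
  // 1,000,000 accepts observed values roughly in [909,091, 1,100,000].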
  virtual void Verify(const CompactionJobStats& current_stats,
                      const CompactionJobStats& stats) {
    // time
    ASSERT_GT(current_stats.elapsed_micros, 0U);

    ASSERT_EQ(current_stats.num_input_records,
        stats.num_input_records);
    ASSERT_EQ(current_stats.num_input_files,
        stats.num_input_files);
    ASSERT_EQ(current_stats.num_input_files_at_output_level,
        stats.num_input_files_at_output_level);

    ASSERT_EQ(current_stats.num_output_records,
        stats.num_output_records);
    ASSERT_EQ(current_stats.num_output_files,
        stats.num_output_files);

    ASSERT_EQ(current_stats.is_manual_compaction,
        stats.is_manual_compaction);

    // file size
    double kFileSizeBias = compression_enabled_ ? 0.20 : 0.10;
    ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
              stats.total_input_bytes);
    ASSERT_LE(current_stats.total_input_bytes,
              stats.total_input_bytes * (1.00 + kFileSizeBias));
    ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
              stats.total_output_bytes);
    ASSERT_LE(current_stats.total_output_bytes,
              stats.total_output_bytes * (1.00 + kFileSizeBias));
    ASSERT_EQ(current_stats.total_input_raw_key_bytes,
              stats.total_input_raw_key_bytes);
    ASSERT_EQ(current_stats.total_input_raw_value_bytes,
              stats.total_input_raw_value_bytes);

    ASSERT_EQ(current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
        stats.num_corrupt_keys);

    ASSERT_EQ(
        std::string(current_stats.smallest_output_key_prefix),
        std::string(stats.smallest_output_key_prefix));
    ASSERT_EQ(
        std::string(current_stats.largest_output_key_prefix),
        std::string(stats.largest_output_key_prefix));
  }

  // Adds an expected CompactionJobStats record, which will be used to
  // verify the CompactionJobStats returned by the OnCompactionCompleted()
  // callback.
  void AddExpectedStats(const CompactionJobStats& stats) {
    std::lock_guard<std::mutex> lock(mutex_);
    expected_stats_.push(stats);
  }

  void EnableCompression(bool flag) {
    compression_enabled_ = flag;
  }

  bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }

 private:
  std::mutex mutex_;
  std::queue<CompactionJobStats> expected_stats_;
  bool compression_enabled_;
  bool verify_next_comp_io_stats_;
};

// An EventListener which helps verify the compaction statistics in
// the test DeletionStatsTest.
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
 public:
  // Verifies whether two CompactionJobStats match.
  void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) override {
    ASSERT_EQ(
      current_stats.num_input_deletion_records,
      stats.num_input_deletion_records);
    ASSERT_EQ(
        current_stats.num_expired_deletion_records,
        stats.num_expired_deletion_records);
    ASSERT_EQ(
        current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
        stats.num_corrupt_keys);
  }
};

namespace {

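// Rough size model: data bytes plus footer, filter-block, and index-block
// estimates.  As an illustration (numbers assumed, not tied to any test):
// 100 records with 10-byte keys and 1000-byte values at ratio 1.0 give
// data_size = 100 * (10 + 1000 + 8) = 101800 bytes, a filter block of
// 100 * 10 / 8 = 125 bytes, and an index block of roughly
// 101800 * 18 / 4096 = 447 bytes, so the estimate is about 102884 bytes
// once the 512-byte footer is added.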
uint64_t EstimatedFileSize(
    uint64_t num_records, size_t key_size, size_t value_size,
    double compression_ratio = 1.0,
    size_t block_size = 4096,
    int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;

  uint64_t data_size =
    static_cast<uint64_t>(
      num_records * (key_size + value_size * compression_ratio +
                     kPerKeyOverhead));

  return data_size + kFooterSize
         + num_records * bloom_bits_per_key / 8      // filter block
         + data_size * (key_size + 8) / block_size;  // index block
}

namespace {

void CopyPrefix(
    const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}

}  // namespace

CompactionJobStats NewManualCompactionJobStats(
    const std::string& smallest_key, const std::string& largest_key,
    size_t num_input_files, size_t num_input_files_at_output_level,
    uint64_t num_input_records, size_t key_size, size_t value_size,
    size_t num_output_files, uint64_t num_output_records,
    double compression_ratio, uint64_t num_records_replaced,
    bool is_manual = true) {
  CompactionJobStats stats;
  stats.Reset();

  stats.num_input_records = num_input_records;
  stats.num_input_files = num_input_files;
  stats.num_input_files_at_output_level = num_input_files_at_output_level;

  stats.num_output_records = num_output_records;
  stats.num_output_files = num_output_files;

  stats.total_input_bytes =
      EstimatedFileSize(
          num_input_records / num_input_files,
          key_size, value_size, compression_ratio) * num_input_files;
  stats.total_output_bytes =
      EstimatedFileSize(
          num_output_records / num_output_files,
          key_size, value_size, compression_ratio) * num_output_files;
  stats.total_input_raw_key_bytes =
      num_input_records * (key_size + 8);
  stats.total_input_raw_value_bytes =
      num_input_records * value_size;

  stats.is_manual_compaction = is_manual;

  stats.num_records_replaced = num_records_replaced;

  CopyPrefix(smallest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.smallest_output_key_prefix);
  CopyPrefix(largest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.largest_output_key_prefix);

  return stats;
}

CompressionType GetAnyCompression() {
  if (Snappy_Supported()) {
    return kSnappyCompression;
  } else if (Zlib_Supported()) {
    return kZlibCompression;
  } else if (BZip2_Supported()) {
    return kBZip2Compression;
  } else if (LZ4_Supported()) {
    return kLZ4Compression;
  } else if (XPRESS_Supported()) {
    return kXpressCompression;
  }

  return kNoCompression;
}

}  // namespace

TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
  Random rnd(301);
  const int kBufSize = 100;
  char buf[kBufSize];
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 100;
  const int kTestScale = 8;
  const int kKeySize = 10;
  const int kValueSize = 1000;
  const double kCompressionRatio = 0.5;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.  The expected CompactionJobStats is added
  // via AddExpectedStats().
  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  // just enough settings to hold off auto-compaction.
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options.bytes_per_sync = 512 * 1024;

  options.report_bg_io_stats = true;
  for (int test = 0; test < 2; ++test) {
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // 1st Phase: generate "num_L0_files" L0 files.
    int num_L0_files = 0;
    for (uint64_t start_key = key_base;
                  start_key <= key_base * kTestScale;
                  start_key += key_base) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base - 1,
          kKeySize, kValueSize, key_interval,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d", ++num_L0_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }
    ASSERT_EQ(ToString(num_L0_files), FilesPerLevel(1));

    // 2nd Phase: perform L0 -> L1 compaction.
    int L0_compaction_count = 6;
    int count = 1;
    std::string smallest_key;
    std::string largest_key;
    for (uint64_t start_key = key_base;
         start_key <= key_base * L0_compaction_count;
         start_key += key_base, count++) {
      smallest_key = Key(start_key, 10);
      largest_key = Key(start_key + key_base - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              1, 0, num_keys_per_L0_file,
              kKeySize, kValueSize,
              1, num_keys_per_L0_file,
              compression_ratio, 0));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      TEST_Compact(0, 1, smallest_key, largest_key);
      snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // compact two files into one in the last L0 -> L1 compaction
    int num_remaining_L0 = num_L0_files - L0_compaction_count;
    smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            smallest_key, largest_key,
            num_remaining_L0,
            0, num_keys_per_L0_file * num_remaining_L0,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * num_remaining_L0,
            compression_ratio, 0));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    TEST_Compact(0, 1, smallest_key, largest_key);

    int num_L1_files = num_L0_files - num_remaining_L0 + 1;
    num_L0_files = 0;
    snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
    ASSERT_EQ(std::string(buf), FilesPerLevel(1));

    // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
    int sparseness = 2;
    for (uint64_t start_key = key_base;
                  start_key <= key_base * kTestScale;
                  start_key += key_base * sparseness) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base * sparseness - 1,
          kKeySize, kValueSize,
          key_base * sparseness / num_keys_per_L0_file,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 4th Phase: perform L0 -> L1 compaction again; expect higher write amp.
    // When subcompactions are enabled, the number of output files increases
    // by 1 because multiple threads consume the input and generate output
    // files without coordinating to see whether the output could fit into a
    // smaller number of files, as happens when the compaction runs
    // sequentially.
    int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
    for (uint64_t start_key = key_base;
         num_L0_files > 1;
         start_key += key_base * sparseness) {
      smallest_key = Key(start_key, 10);
      largest_key =
          Key(start_key + key_base * sparseness - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              3, 2, num_keys_per_L0_file * 3,
              kKeySize, kValueSize,
              num_output_files,
              num_keys_per_L0_file * 2,  // 1/3 of the data will be updated.
              compression_ratio,
              num_keys_per_L0_file));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      Compact(1, smallest_key, largest_key);
      if (options.max_subcompactions == 1) {
        --num_L1_files;
      }
      snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 5th Phase: Do a full compaction, which involves two sub-compactions.
    // Here we expect to have 1 L0 file and 4 L1 files.
    // In the first sub-compaction, we expect an L0 compaction.
    smallest_key = Key(key_base, 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key,
            2, 1, num_keys_per_L0_file * 3,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * 2,
            compression_ratio,
            num_keys_per_L0_file));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    Compact(1, smallest_key, largest_key);

    num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
    char L1_buf[4];
    snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
    std::string L1_files(L1_buf);
    ASSERT_EQ(L1_files, FilesPerLevel(1));
    options.compression = GetAnyCompression();
    if (options.compression == kNoCompression) {
      break;
    }
    stats_checker->EnableCompression(true);
    compression_ratio = kCompressionRatio;

    for (int i = 0; i < 5; i++) {
      ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
                    Slice(RandomString(&rnd, 512 * 1024, 1))));
    }

    ASSERT_OK(Flush(1));
    reinterpret_cast<DBImpl*>(db_)->TEST_WaitForCompact();

    stats_checker->set_verify_next_comp_io_stats(true);
    std::atomic<bool> first_prepare_write(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void* /*arg*/) {
          if (first_prepare_write.load()) {
            options.env->SleepForMicroseconds(3);
            first_prepare_write.store(false);
          }
        });

    std::atomic<bool> first_flush(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Flush:BeforeAppend", [&](void* /*arg*/) {
          if (first_flush.load()) {
            options.env->SleepForMicroseconds(3);
            first_flush.store(false);
          }
        });

    std::atomic<bool> first_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::SyncInternal:0", [&](void* /*arg*/) {
          if (first_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_sync.store(false);
          }
        });

    std::atomic<bool> first_range_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) {
          if (first_range_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_range_sync.store(false);
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Compact(1, smallest_key, largest_key);

    ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
    ASSERT_TRUE(!first_prepare_write.load());
    ASSERT_TRUE(!first_flush.load());
    ASSERT_TRUE(!first_sync.load());
    ASSERT_TRUE(!first_range_sync.load());
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
  Random rnd(301);
  uint64_t key_base = 100000l;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 20;
  const int kTestScale = 8;  // make sure this is even
  const int kKeySize = 10;
  const int kValueSize = 100;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;
  uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval;
  uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval;
  const std::string smallest_key = Key(key_base - 10, kKeySize);
  const std::string largest_key = Key(largest_key_num + 10, kKeySize);

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.
  auto* stats_checker = new CompactionJobDeletionStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_bytes_for_level_multiplier = 2;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Stage 1: Generate several L0 files and then send them to L2 by
  // using CompactRangeOptions and CompactRange(). These files will
  // have a strict subset of the keys from the full key-range
  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale / 2;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  CompactRangeOptions cr_options;
  cr_options.change_level = true;
  cr_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(cr_options, handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

  // Stage 2: Generate files including keys from the entire key range
  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  // Send these L0 files to L1
  TEST_Compact(0, 1, smallest_key, largest_key);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

  // Add a new record and flush so that there is now an L0 file
  // with a value too (not just deletions from the next step)
  ASSERT_OK(Put(1, Key(key_base - 6, kKeySize), "test"));
  ASSERT_OK(Flush(1));

  // Stage 3: Generate L0 files with some deletions so now
  // there are files with the same key range in L0, L1, and L2
  int deletion_interval = 3;
  CompactionJobStats first_compaction_stats;
  SelectivelyDeleteKeys(key_base, largest_key_num,
      key_interval, deletion_interval, kKeySize, cutoff_key_num,
      &first_compaction_stats, 1);

  stats_checker->AddExpectedStats(first_compaction_stats);

  // Stage 4: Trigger compaction and verify the stats
  TEST_Compact(0, 1, smallest_key, largest_key);
}

namespace {
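// Returns the expected number of input units (sorted runs) for the next
// universal compaction after num_flushes flushes, or 0 if no compaction is
// expected.  Mathematically this is the lowest set bit of num_flushes,
// treated as 0 when that bit is bit 0.  For example, 6 flushes (binary 110)
// yield a 2-unit compaction, while 5 flushes (binary 101) yield none.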
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  uint32_t compaction_input_units;
  for (compaction_input_units = 1;
       num_flushes >= compaction_input_units;
       compaction_input_units *= 2) {
    if ((num_flushes & compaction_input_units) != 0) {
      return compaction_input_units > 1 ? compaction_input_units : 0;
    }
  }
  return 0;
}
}  // namespace

TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
  Random rnd(301);
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_table
  int num_keys_per_table = 100;
  const uint32_t kTestScale = 6;
  const int kKeySize = 10;
  const int kValueSize = 900;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_table;

  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = num_keys_per_table * 1000;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 1;
  options.compaction_options_universal.max_size_amplification_percent = 1000;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Generates the expected CompactionJobStats for each compaction
  for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) {
    // Here we treat one newly flushed file as a unit.
    //
    // For example, if a newly flushed file is 100k, and a compaction has
    // 4 input units, then this compaction inputs 400k.
    uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes);
    if (num_input_units == 0) {
      continue;
    }
    // The following statement determines the expected smallest key
    // based on whether it is a full compaction.  A full compaction only
    // happens when the number of flushes equals the number of compaction
    // input runs.
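    // e.g., with num_flushes == 6 and num_input_units == 2, the two most
    // recent runs (from the 5th and 6th flushes) are merged, so the
    // smallest key comes from the 5th flush: key_base * 5.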
    uint64_t smallest_key =
        (num_flushes == num_input_units) ?
            key_base : key_base * (num_flushes - 1);

    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(smallest_key, 10),
            Key(smallest_key + key_base * num_input_units - key_interval, 10),
            num_input_units,
            num_input_units > 2 ? num_input_units / 2 : 0,
            num_keys_per_table * num_input_units,
            kKeySize, kValueSize,
            num_input_units,
            num_keys_per_table * num_input_units,
            1.0, 0, false));
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);

  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
    reinterpret_cast<DBImpl*>(db_)->TEST_WaitForCompact();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
                        ::testing::Values(1, 4));
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr, "SKIPPED, not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE

#else

int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif  // !defined(IOS_CROSS_COMPILE)