//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>
#include <cinttypes>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>

#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "memtable/hash_linklist_rep.h"
#include "monitoring/statistics.h"
#include "monitoring/thread_status_util.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain/plain_table_factory.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

#if !defined(IOS_CROSS_COMPILE)
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {
static std::string RandomString(Random* rnd, int len, double ratio) {
  std::string r;
  test::CompressibleString(rnd, ratio, len, &r);
  return r;
}

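// Formats "key" as a zero-padded decimal string of the given length so
// that lexicographic key order matches numeric order, e.g.
// Key(5, 4) == "0005".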
std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  if (length > kBufSize) {
    length = kBufSize;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}

class CompactionJobStatsTest : public testing::Test,
                               public testing::WithParamInterface<bool> {
 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  Env* env_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;
  uint32_t max_subcompactions_;

  Options last_options_;

  CompactionJobStatsTest() : env_(Env::Default()) {
    env_->SetBackgroundThreads(1, Env::LOW);
    env_->SetBackgroundThreads(1, Env::HIGH);
    dbname_ = test::PerThreadDBPath("compaction_job_stats_test");
    alternative_wal_dir_ = dbname_ + "/wal";
    Options options;
    options.create_if_missing = true;
    max_subcompactions_ = GetParam();
    options.max_subcompactions = max_subcompactions_;
    auto delete_options = options;
    delete_options.wal_dir = alternative_wal_dir_;
    EXPECT_OK(DestroyDB(dbname_, delete_options));
    // Destroy again for the case where no alternative WAL dir was used.
    EXPECT_OK(DestroyDB(dbname_, options));
    db_ = nullptr;
    Reopen(options);
  }

  ~CompactionJobStatsTest() override {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    Close();
    Options options;
    options.db_paths.emplace_back(dbname_, 0);
    options.db_paths.emplace_back(dbname_ + "_2", 0);
    options.db_paths.emplace_back(dbname_ + "_3", 0);
    options.db_paths.emplace_back(dbname_ + "_4", 0);
    EXPECT_OK(DestroyDB(dbname_, options));
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_); }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options) {
    ColumnFamilyOptions cf_opts(options);
    size_t cfi = handles_.size();
    handles_.resize(cfi + cfs.size());
    for (auto cf : cfs) {
      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
    }
  }

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options) {
    CreateColumnFamilies(cfs, options);
    std::vector<std::string> cfs_plus_default = cfs;
    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
    ReopenWithColumnFamilies(cfs_plus_default, options);
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  Status TryReopenWithColumnFamilies(
      const std::vector<std::string>& cfs,
      const std::vector<Options>& options) {
    Close();
    EXPECT_EQ(cfs.size(), options.size());
    std::vector<ColumnFamilyDescriptor> column_families;
    for (size_t i = 0; i < cfs.size(); ++i) {
      column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
    }
    DBOptions db_opts = DBOptions(options[0]);
    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options) {
    Close();
    std::vector<Options> v_opts(cfs.size(), options);
    return TryReopenWithColumnFamilies(cfs, v_opts);
  }

  void Reopen(const Options& options) {
    ASSERT_OK(TryReopen(options));
  }

  void Close() {
    for (auto h : handles_) {
      delete h;
    }
    handles_.clear();
    delete db_;
    db_ = nullptr;
  }

  void DestroyAndReopen(const Options& options) {
    // Destroy using last options
    Destroy(last_options_);
    ASSERT_OK(TryReopen(options));
  }

  void Destroy(const Options& options) {
    Close();
    ASSERT_OK(DestroyDB(dbname_, options));
  }

  Status ReadOnlyReopen(const Options& options) {
    return DB::OpenForReadOnly(options, dbname_, &db_);
  }

  Status TryReopen(const Options& options) {
    Close();
    last_options_ = options;
    return DB::Open(options, dbname_, &db_);
  }

  Status Flush(int cf = 0) {
    if (cf == 0) {
      return db_->Flush(FlushOptions());
    } else {
      return db_->Flush(FlushOptions(), handles_[cf]);
    }
  }

  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, k, v);
  }

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, handles_[cf], k, v);
  }

  Status Delete(const std::string& k) {
    return db_->Delete(WriteOptions(), k);
  }

  Status Delete(int cf, const std::string& k) {
    return db_->Delete(WriteOptions(), handles_[cf], k);
  }

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, handles_[cf], k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  int NumTableFilesAtLevel(int level, int cf = 0) {
    std::string property;
    if (cf == 0) {
      // default cfd
      EXPECT_TRUE(db_->GetProperty(
          "rocksdb.num-files-at-level" + NumberToString(level), &property));
    } else {
      EXPECT_TRUE(db_->GetProperty(
          handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
          &property));
    }
    return atoi(property.c_str());
  }

  // Returns the spread of files per level, e.g. "3,0,7" means 3 files at
  // L0, none at L1, and 7 at L2; trailing empty levels are trimmed.
  std::string FilesPerLevel(int cf = 0) {
    int num_levels =
        (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
    std::string result;
    size_t last_non_zero_offset = 0;
    for (int level = 0; level < num_levels; level++) {
      int f = NumTableFilesAtLevel(level, cf);
      char buf[100];
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
      result += buf;
      if (f > 0) {
        last_non_zero_offset = result.size();
      }
    }
    result.resize(last_non_zero_offset);
    return result;
  }

  Status Size(uint64_t* size, const Slice& start, const Slice& limit,
              int cf = 0) {
    Range r(start, limit);
    if (cf == 0) {
      return db_->GetApproximateSizes(&r, 1, size);
    } else {
      return db_->GetApproximateSizes(handles_[1], &r, 1, size);
    }
  }

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id) {
    CompactRangeOptions compact_options;
    compact_options.target_path_id = target_path_id;
    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
  }

  void Compact(int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
  }

  void Compact(const Slice& start, const Slice& limit) {
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
  }

  void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
                                          true /* disallow trivial move */));
  }

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0) {
    for (int i = 0; i < n; i++) {
      ASSERT_OK(Put(cf, small, "begin"));
      ASSERT_OK(Put(cf, large, "end"));
      ASSERT_OK(Flush(cf));
    }
  }

  static void SetDeletionCompactionStats(
      CompactionJobStats* stats, uint64_t input_deletions,
      uint64_t expired_deletions, uint64_t records_replaced) {
    stats->num_input_deletion_records = input_deletions;
    stats->num_expired_deletion_records = expired_deletions;
    stats->num_records_replaced = records_replaced;
  }

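  // Writes keys in [smallest, largest) at a stride of "interval" and then
  // flushes, producing a single L0 file with compressible random values.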
  void MakeTableWithKeyValues(
    Random* rnd, uint64_t smallest, uint64_t largest,
    int key_size, int value_size, uint64_t interval,
    double ratio, int cf = 0) {
    for (auto key = smallest; key < largest; key += interval) {
      ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
                        Slice(RandomString(rnd, value_size, ratio))));
    }
    ASSERT_OK(Flush(cf));
  }

  // This function assumes that two rounds of keys have already been
  // inserted into the database, matching the setup of DeletionStatsTest.
  void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
    uint64_t interval, int deletion_interval, int key_size,
    uint64_t cutoff_key_num, CompactionJobStats* stats, int cf = 0) {
    // interval needs to be >= 2 so that we can insert deletion entries
    // offset by 1 from existing keys; such entries target non-existent
    // keys and therefore do not result in an actual key deletion.
    ASSERT_GE(interval, 2);

    uint64_t ctr = 1;
    uint32_t deletions_made = 0;
    uint32_t num_deleted = 0;
    uint32_t num_expired = 0;
    for (auto key = smallest; key <= largest; key += interval, ctr++) {
      if (ctr % deletion_interval == 0) {
        ASSERT_OK(Delete(cf, Key(key, key_size)));
        deletions_made++;
        num_deleted++;

        if (key > cutoff_key_num) {
          num_expired++;
        }
      }
    }

    // Insert deletions for keys that do not exist, both inside and
    // outside the key range.
    ASSERT_OK(Delete(cf, Key(smallest + 1, key_size)));
    deletions_made++;

    ASSERT_OK(Delete(cf, Key(smallest - 1, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Delete(cf, Key(smallest - 9, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Flush(cf));
    SetDeletionCompactionStats(stats, deletions_made, num_expired,
      num_deleted);
  }
};

// An EventListener which helps verify the compaction results in
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
 public:
  CompactionJobStatsChecker()
      : compression_enabled_(false), verify_next_comp_io_stats_(false) {}

  size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }

  void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }

  // Once a compaction completes, this function verifies the returned
  // CompactionJobStats against the oldest expected stats added earlier
  // to "expected_stats_" that has not yet been used for verification.
  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    if (verify_next_comp_io_stats_) {
      ASSERT_GT(ci.stats.file_write_nanos, 0);
      ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
      ASSERT_GT(ci.stats.file_fsync_nanos, 0);
      ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
      verify_next_comp_io_stats_ = false;
    }

    std::lock_guard<std::mutex> lock(mutex_);
    if (expected_stats_.size()) {
      Verify(ci.stats, expected_stats_.front());
      expected_stats_.pop();
    }
  }

  // A helper function which verifies whether two CompactionJobStats
  // match.  All stats are compared with ASSERT_EQ except the total
  // input / output bytes, which are checked with ASSERT_GE / ASSERT_LE
  // against a reasonable bias: 10% in the uncompressed case and 20%
  // when compression is used.
  virtual void Verify(const CompactionJobStats& current_stats,
                      const CompactionJobStats& stats) {
    // time
    ASSERT_GT(current_stats.elapsed_micros, 0U);

    ASSERT_EQ(current_stats.num_input_records,
        stats.num_input_records);
    ASSERT_EQ(current_stats.num_input_files,
        stats.num_input_files);
    ASSERT_EQ(current_stats.num_input_files_at_output_level,
        stats.num_input_files_at_output_level);

    ASSERT_EQ(current_stats.num_output_records,
        stats.num_output_records);
    ASSERT_EQ(current_stats.num_output_files,
        stats.num_output_files);

    ASSERT_EQ(current_stats.is_full_compaction, stats.is_full_compaction);
    ASSERT_EQ(current_stats.is_manual_compaction,
        stats.is_manual_compaction);

    // file size
    double kFileSizeBias = compression_enabled_ ? 0.20 : 0.10;
    ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
              stats.total_input_bytes);
    ASSERT_LE(current_stats.total_input_bytes,
              stats.total_input_bytes * (1.00 + kFileSizeBias));
    ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
              stats.total_output_bytes);
    ASSERT_LE(current_stats.total_output_bytes,
              stats.total_output_bytes * (1.00 + kFileSizeBias));
    ASSERT_EQ(current_stats.total_input_raw_key_bytes,
              stats.total_input_raw_key_bytes);
    ASSERT_EQ(current_stats.total_input_raw_value_bytes,
              stats.total_input_raw_value_bytes);

    ASSERT_EQ(current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
        stats.num_corrupt_keys);

    ASSERT_EQ(
        std::string(current_stats.smallest_output_key_prefix),
        std::string(stats.smallest_output_key_prefix));
    ASSERT_EQ(
        std::string(current_stats.largest_output_key_prefix),
        std::string(stats.largest_output_key_prefix));
  }

  // Adds an expected compaction stats entry, which will later be used to
  // verify the CompactionJobStats passed to the OnCompactionCompleted()
  // callback.
  void AddExpectedStats(const CompactionJobStats& stats) {
    std::lock_guard<std::mutex> lock(mutex_);
    expected_stats_.push(stats);
  }

  void EnableCompression(bool flag) {
    compression_enabled_ = flag;
  }

  bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }

 private:
  std::mutex mutex_;
  std::queue<CompactionJobStats> expected_stats_;
  bool compression_enabled_;
  bool verify_next_comp_io_stats_;
};

// An EventListener which helps verify the compaction statistics in
// the test DeletionStatsTest.
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
 public:
  // Verifies whether two CompactionJobStats match.
  void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) override {
    ASSERT_EQ(
        current_stats.num_input_deletion_records,
        stats.num_input_deletion_records);
    ASSERT_EQ(
        current_stats.num_expired_deletion_records,
        stats.num_expired_deletion_records);
    ASSERT_EQ(
        current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
        stats.num_corrupt_keys);
  }
};

namespace {

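// A rough model of an SST file's size: compressed data blocks plus a
// fixed-size footer, a bloom filter block, and one index entry of
// (key_size + 8) bytes per data block.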
uint64_t EstimatedFileSize(
    uint64_t num_records, size_t key_size, size_t value_size,
    double compression_ratio = 1.0,
    size_t block_size = 4096,
    int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;

  uint64_t data_size =
    static_cast<uint64_t>(
      num_records * (key_size + value_size * compression_ratio +
                     kPerKeyOverhead));

  return data_size + kFooterSize
         + num_records * bloom_bits_per_key / 8      // filter block
         + data_size * (key_size + 8) / block_size;  // index block
}

namespace {

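// Copies at most prefix_length bytes of src into *dst.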
void CopyPrefix(
    const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}

}  // namespace

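// Builds the CompactionJobStats that a manual compaction over
// [smallest_key, largest_key] is expected to report, using
// EstimatedFileSize() to approximate the total input / output bytes.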
CompactionJobStats NewManualCompactionJobStats(
    const std::string& smallest_key, const std::string& largest_key,
    size_t num_input_files, size_t num_input_files_at_output_level,
    uint64_t num_input_records, size_t key_size, size_t value_size,
    size_t num_output_files, uint64_t num_output_records,
    double compression_ratio, uint64_t num_records_replaced,
    bool is_full = false, bool is_manual = true) {
  CompactionJobStats stats;
  stats.Reset();

  stats.num_input_records = num_input_records;
  stats.num_input_files = num_input_files;
  stats.num_input_files_at_output_level = num_input_files_at_output_level;

  stats.num_output_records = num_output_records;
  stats.num_output_files = num_output_files;

  stats.total_input_bytes =
      EstimatedFileSize(
          num_input_records / num_input_files,
          key_size, value_size, compression_ratio) * num_input_files;
  stats.total_output_bytes =
      EstimatedFileSize(
          num_output_records / num_output_files,
          key_size, value_size, compression_ratio) * num_output_files;
  stats.total_input_raw_key_bytes =
      num_input_records * (key_size + 8);
  stats.total_input_raw_value_bytes =
      num_input_records * value_size;

  stats.is_full_compaction = is_full;
  stats.is_manual_compaction = is_manual;

  stats.num_records_replaced = num_records_replaced;

  CopyPrefix(smallest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.smallest_output_key_prefix);
  CopyPrefix(largest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.largest_output_key_prefix);

  return stats;
}

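// Returns any compression type supported by this build, or
// kNoCompression when none is available.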
CompressionType GetAnyCompression() {
  if (Snappy_Supported()) {
    return kSnappyCompression;
  } else if (Zlib_Supported()) {
    return kZlibCompression;
  } else if (BZip2_Supported()) {
    return kBZip2Compression;
  } else if (LZ4_Supported()) {
    return kLZ4Compression;
  } else if (XPRESS_Supported()) {
    return kXpressCompression;
  }

  return kNoCompression;
}

}  // namespace

TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
  Random rnd(301);
  const int kBufSize = 100;
  char buf[kBufSize];
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 100;
  const int kTestScale = 8;
  const int kKeySize = 10;
  const int kValueSize = 1000;
  const double kCompressionRatio = 0.5;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.  The expected CompactionJobStats is added
  // via AddExpectedStats().
  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  // Just enough settings to hold off auto-compaction.
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options.bytes_per_sync = 512 * 1024;

  options.report_bg_io_stats = true;
  for (int test = 0; test < 2; ++test) {
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // 1st Phase: generate "num_L0_files" L0 files.
    int num_L0_files = 0;
    for (uint64_t start_key = key_base;
                  start_key <= key_base * kTestScale;
                  start_key += key_base) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base - 1,
          kKeySize, kValueSize, key_interval,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d", ++num_L0_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }
    ASSERT_EQ(ToString(num_L0_files), FilesPerLevel(1));

    // 2nd Phase: perform L0 -> L1 compaction.
    int L0_compaction_count = 6;
    int count = 1;
    std::string smallest_key;
    std::string largest_key;
    for (uint64_t start_key = key_base;
         start_key <= key_base * L0_compaction_count;
         start_key += key_base, count++) {
      smallest_key = Key(start_key, 10);
      largest_key = Key(start_key + key_base - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              1, 0, num_keys_per_L0_file,
              kKeySize, kValueSize,
              1, num_keys_per_L0_file,
              compression_ratio, 0));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      TEST_Compact(0, 1, smallest_key, largest_key);
      snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // Compact the remaining L0 files into one file in the last
    // L0 -> L1 compaction.
    int num_remaining_L0 = num_L0_files - L0_compaction_count;
    smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            smallest_key, largest_key,
            num_remaining_L0,
            0, num_keys_per_L0_file * num_remaining_L0,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * num_remaining_L0,
            compression_ratio, 0));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    TEST_Compact(0, 1, smallest_key, largest_key);

    int num_L1_files = num_L0_files - num_remaining_L0 + 1;
    num_L0_files = 0;
    snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
    ASSERT_EQ(std::string(buf), FilesPerLevel(1));

    // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
    int sparseness = 2;
    for (uint64_t start_key = key_base;
                  start_key <= key_base * kTestScale;
                  start_key += key_base * sparseness) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base * sparseness - 1,
          kKeySize, kValueSize,
          key_base * sparseness / num_keys_per_L0_file,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 4th Phase: perform L0 -> L1 compaction again, and expect a higher
    // write amplification. When subcompactions are enabled, the number of
    // output files increases by 1 because multiple threads consume the
    // input and generate output files without coordinating on whether the
    // output could fit into fewer files, as it does when run sequentially.
    int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
    for (uint64_t start_key = key_base;
         num_L0_files > 1;
         start_key += key_base * sparseness) {
      smallest_key = Key(start_key, 10);
      largest_key =
          Key(start_key + key_base * sparseness - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              3, 2, num_keys_per_L0_file * 3,
              kKeySize, kValueSize,
              num_output_files,
              num_keys_per_L0_file * 2,  // 1/3 of the data will be updated.
              compression_ratio,
              num_keys_per_L0_file));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      Compact(1, smallest_key, largest_key);
      if (options.max_subcompactions == 1) {
        --num_L1_files;
      }
      snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 5th Phase: Do a full compaction, which involves two sub-compactions.
    // At this point we expect 1 L0 file and 4 L1 files; in the first
    // sub-compaction, we expect an L0 compaction.
    smallest_key = Key(key_base, 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key,
            2, 1, num_keys_per_L0_file * 3,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * 2,
            compression_ratio,
            num_keys_per_L0_file));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    Compact(1, smallest_key, largest_key);

    num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
    char L1_buf[4];
    snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
    std::string L1_files(L1_buf);
    ASSERT_EQ(L1_files, FilesPerLevel(1));
    options.compression = GetAnyCompression();
    if (options.compression == kNoCompression) {
      break;
    }
    stats_checker->EnableCompression(true);
    compression_ratio = kCompressionRatio;

    for (int i = 0; i < 5; i++) {
      ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
                    Slice(RandomString(&rnd, 512 * 1024, 1))));
    }

    ASSERT_OK(Flush(1));
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());

    stats_checker->set_verify_next_comp_io_stats(true);
    std::atomic<bool> first_prepare_write(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void* /*arg*/) {
          if (first_prepare_write.load()) {
            options.env->SleepForMicroseconds(3);
            first_prepare_write.store(false);
          }
        });

    std::atomic<bool> first_flush(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Flush:BeforeAppend", [&](void* /*arg*/) {
          if (first_flush.load()) {
            options.env->SleepForMicroseconds(3);
            first_flush.store(false);
          }
        });

    std::atomic<bool> first_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::SyncInternal:0", [&](void* /*arg*/) {
          if (first_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_sync.store(false);
          }
        });

    std::atomic<bool> first_range_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) {
          if (first_range_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_range_sync.store(false);
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Compact(1, smallest_key, largest_key);

    ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
    ASSERT_TRUE(!first_prepare_write.load());
    ASSERT_TRUE(!first_flush.load());
    ASSERT_TRUE(!first_sync.load());
    ASSERT_TRUE(!first_range_sync.load());
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
  Random rnd(301);
  uint64_t key_base = 100000l;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 20;
  const int kTestScale = 8;  // make sure this is even
  const int kKeySize = 10;
  const int kValueSize = 100;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;
  uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval;
  uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval;
  const std::string smallest_key = Key(key_base - 10, kKeySize);
  const std::string largest_key = Key(largest_key_num + 10, kKeySize);

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.
  auto* stats_checker = new CompactionJobDeletionStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_bytes_for_level_multiplier = 2;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Stage 1: Generate several L0 files and then send them to L2 by
  // using CompactRangeOptions and CompactRange(). These files will
  // have a strict subset of the keys from the full key range.
  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale / 2;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  CompactRangeOptions cr_options;
  cr_options.change_level = true;
  cr_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(cr_options, handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

  // Stage 2: Generate files including keys from the entire key range
  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  // Send these L0 files to L1
  TEST_Compact(0, 1, smallest_key, largest_key);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

  // Add a new record and flush so that there is now an L0 file
  // with a value too (not just the deletions from the next step).
  ASSERT_OK(Put(1, Key(key_base - 6, kKeySize), "test"));
  ASSERT_OK(Flush(1));

  // Stage 3: Generate L0 files with some deletions, so that there are now
  // files with the same key range in L0, L1, and L2.
  int deletion_interval = 3;
  CompactionJobStats first_compaction_stats;
  SelectivelyDeleteKeys(key_base, largest_key_num,
      key_interval, deletion_interval, kKeySize, cutoff_key_num,
      &first_compaction_stats, 1);

  stats_checker->AddExpectedStats(first_compaction_stats);

  // Stage 4: Trigger compaction and verify the stats
  TEST_Compact(0, 1, smallest_key, largest_key);
}

namespace {
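// Returns the expected number of input sorted runs for the universal
// compaction triggered by the num_flushes-th flush. With these settings,
// universal compaction merges equally-sized runs the way a binary counter
// carries, so the answer is the lowest set bit of num_flushes; a lowest
// set bit of 1 means no compaction is triggered, reported as 0. For
// example, num_flushes = 6 (binary 110) yields 2 input units.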
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  uint32_t compaction_input_units;
  for (compaction_input_units = 1;
       num_flushes >= compaction_input_units;
       compaction_input_units *= 2) {
    if ((num_flushes & compaction_input_units) != 0) {
      return compaction_input_units > 1 ? compaction_input_units : 0;
    }
  }
  return 0;
}
}  // namespace

TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
  Random rnd(301);
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_table
  int num_keys_per_table = 100;
  const uint32_t kTestScale = 6;
  const int kKeySize = 10;
  const int kValueSize = 900;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_table;

  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = num_keys_per_table * 1000;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 1;
  options.compaction_options_universal.max_size_amplification_percent = 1000;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Generates the expected CompactionJobStats for each compaction
  for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) {
    // Here we treat one newly flushed file as a unit.
    //
    // For example, if a newly flushed file is 100k, and a compaction has
    // 4 input units, then this compaction inputs 400k.
    uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes);
    if (num_input_units == 0) {
      continue;
    }
    // A full compaction only happens when the number of flushes equals
    // the number of compaction input runs.
    bool is_full = num_flushes == num_input_units;
    // The following statement determines the expected smallest key
    // based on whether it is a full compaction.
    uint64_t smallest_key = is_full ? key_base : key_base * (num_flushes - 1);

    stats_checker->AddExpectedStats(NewManualCompactionJobStats(
        Key(smallest_key, 10),
        Key(smallest_key + key_base * num_input_units - key_interval, 10),
        num_input_units, num_input_units > 2 ? num_input_units / 2 : 0,
        num_keys_per_table * num_input_units, kKeySize, kValueSize,
        num_input_units, num_keys_per_table * num_input_units, 1.0, 0, is_full,
        false));
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);

  for (uint64_t start_key = key_base;
                start_key <= key_base * kTestScale;
                start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
                        ::testing::Values(1, 4));
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr, "SKIPPED, not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE

#else

int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif  // !defined(IOS_CROSS_COMPILE)