// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>
#include <cinttypes>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>

#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "logging/logging.h"
#include "memtable/hash_linklist_rep.h"
#include "monitoring/statistics.h"
#include "monitoring/thread_status_util.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain/plain_table_factory.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/compression.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

#if !defined(IOS_CROSS_COMPILE)
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {

static std::string RandomString(Random* rnd, int len, double ratio) {
  std::string r;
  test::CompressibleString(rnd, ratio, len, &r);
  return r;
}

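// Generates a fixed-width, zero-padded key string; e.g. Key(123, 10)
// yields "0000000123" (width is capped at the internal buffer size).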
std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  if (length > kBufSize) {
    length = kBufSize;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}

class CompactionJobStatsTest : public testing::Test,
                               public testing::WithParamInterface<uint32_t> {
 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  Env* env_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;
  uint32_t max_subcompactions_;

  Options last_options_;

  CompactionJobStatsTest() : env_(Env::Default()) {
    env_->SetBackgroundThreads(1, Env::LOW);
    env_->SetBackgroundThreads(1, Env::HIGH);
    dbname_ = test::PerThreadDBPath("compaction_job_stats_test");
    alternative_wal_dir_ = dbname_ + "/wal";
    Options options;
    options.create_if_missing = true;
    max_subcompactions_ = GetParam();
    options.max_subcompactions = max_subcompactions_;
    auto delete_options = options;
    delete_options.wal_dir = alternative_wal_dir_;
    EXPECT_OK(DestroyDB(dbname_, delete_options));
    // Destroy the DB again in case the alternative WAL dir was not used.
    EXPECT_OK(DestroyDB(dbname_, options));
    db_ = nullptr;
    Reopen(options);
  }

  ~CompactionJobStatsTest() override {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    Close();
    Options options;
    options.db_paths.emplace_back(dbname_, 0);
    options.db_paths.emplace_back(dbname_ + "_2", 0);
    options.db_paths.emplace_back(dbname_ + "_3", 0);
    options.db_paths.emplace_back(dbname_ + "_4", 0);
    EXPECT_OK(DestroyDB(dbname_, options));
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  DBImpl* dbfull() {
    return reinterpret_cast<DBImpl*>(db_);
  }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options) {
    ColumnFamilyOptions cf_opts(options);
    size_t cfi = handles_.size();
    handles_.resize(cfi + cfs.size());
    for (auto cf : cfs) {
      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
    }
  }

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options) {
    CreateColumnFamilies(cfs, options);
    std::vector<std::string> cfs_plus_default = cfs;
    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
    ReopenWithColumnFamilies(cfs_plus_default, options);
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  Status TryReopenWithColumnFamilies(
      const std::vector<std::string>& cfs,
      const std::vector<Options>& options) {
    Close();
    EXPECT_EQ(cfs.size(), options.size());
    std::vector<ColumnFamilyDescriptor> column_families;
    for (size_t i = 0; i < cfs.size(); ++i) {
      column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
    }
    DBOptions db_opts = DBOptions(options[0]);
    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options) {
    Close();
    std::vector<Options> v_opts(cfs.size(), options);
    return TryReopenWithColumnFamilies(cfs, v_opts);
  }

  void Reopen(const Options& options) {
    ASSERT_OK(TryReopen(options));
  }

  void Close() {
    for (auto h : handles_) {
      delete h;
    }
    handles_.clear();
    delete db_;
    db_ = nullptr;
  }

  void DestroyAndReopen(const Options& options) {
    // Destroy using last options
    Destroy(last_options_);
    ASSERT_OK(TryReopen(options));
  }

  void Destroy(const Options& options) {
    Close();
    ASSERT_OK(DestroyDB(dbname_, options));
  }

  Status ReadOnlyReopen(const Options& options) {
    return DB::OpenForReadOnly(options, dbname_, &db_);
  }

  Status TryReopen(const Options& options) {
    Close();
    last_options_ = options;
    return DB::Open(options, dbname_, &db_);
  }

  Status Flush(int cf = 0) {
    if (cf == 0) {
      return db_->Flush(FlushOptions());
    } else {
      return db_->Flush(FlushOptions(), handles_[cf]);
    }
  }

  Status Put(const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, k, v);
  }

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, handles_[cf], k, v);
  }

  Status Delete(const std::string& k) {
    return db_->Delete(WriteOptions(), k);
  }

  Status Delete(int cf, const std::string& k) {
    return db_->Delete(WriteOptions(), handles_[cf], k);
  }

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, handles_[cf], k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  int NumTableFilesAtLevel(int level, int cf = 0) {
    std::string property;
    if (cf == 0) {
      // default cfd
      EXPECT_TRUE(db_->GetProperty(
          "rocksdb.num-files-at-level" + NumberToString(level), &property));
    } else {
      EXPECT_TRUE(db_->GetProperty(
          handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
          &property));
    }
    return atoi(property.c_str());
  }

  // Return spread of files per level
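  // e.g. "3,2" means three files at L0 and two at L1; trailing levels
  // that hold no files are trimmed from the string.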
  std::string FilesPerLevel(int cf = 0) {
    int num_levels =
        (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
    std::string result;
    size_t last_non_zero_offset = 0;
    for (int level = 0; level < num_levels; level++) {
      int f = NumTableFilesAtLevel(level, cf);
      char buf[100];
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
      result += buf;
      if (f > 0) {
        last_non_zero_offset = result.size();
      }
    }
    result.resize(last_non_zero_offset);
    return result;
  }

  uint64_t Size(const Slice& start, const Slice& limit, int cf = 0) {
    Range r(start, limit);
    uint64_t size;
    if (cf == 0) {
      db_->GetApproximateSizes(&r, 1, &size);
    } else {
      db_->GetApproximateSizes(handles_[1], &r, 1, &size);
    }
    return size;
  }

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id) {
    CompactRangeOptions compact_options;
    compact_options.target_path_id = target_path_id;
    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
  }

  void Compact(int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
  }

  void Compact(const Slice& start, const Slice& limit) {
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
  }

  void TEST_Compact(int level, int cf, const Slice& start,
                    const Slice& limit) {
    ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
                                          true /* disallow trivial move */));
  }

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0) {
    for (int i = 0; i < n; i++) {
      ASSERT_OK(Put(cf, small, "begin"));
      ASSERT_OK(Put(cf, large, "end"));
      ASSERT_OK(Flush(cf));
    }
  }

  static void SetDeletionCompactionStats(
      CompactionJobStats* stats, uint64_t input_deletions,
      uint64_t expired_deletions, uint64_t records_replaced) {
    stats->num_input_deletion_records = input_deletions;
    stats->num_expired_deletion_records = expired_deletions;
    stats->num_records_replaced = records_replaced;
  }

  void MakeTableWithKeyValues(
      Random* rnd, uint64_t smallest, uint64_t largest,
      int key_size, int value_size, uint64_t interval,
      double ratio, int cf = 0) {
    for (auto key = smallest; key < largest; key += interval) {
      ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
                    Slice(RandomString(rnd, value_size, ratio))));
    }
    ASSERT_OK(Flush(cf));
  }

  // This function assumes that two rounds of keys have already been
  // inserted into the database, following the pattern used in
  // DeletionStatsTest.
  void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
                             uint64_t interval, int deletion_interval,
                             int key_size, uint64_t cutoff_key_num,
                             CompactionJobStats* stats, int cf = 0) {
    // interval needs to be >= 2 so that deletion entries can be inserted
    // for keys that do not exist, and therefore do not result in an
    // actual key deletion, by using an offset of 1 from an existing key.
    ASSERT_GE(interval, 2);
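    // For instance, if smallest were 100 and interval 10, the
    // Delete(Key(smallest + 1, ...)) below would target key 101, which
    // was never written, so it adds a deletion entry without deleting
    // any real data.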
    uint64_t ctr = 1;
    uint32_t deletions_made = 0;
    uint32_t num_deleted = 0;
    uint32_t num_expired = 0;
    for (auto key = smallest; key <= largest; key += interval, ctr++) {
      if (ctr % deletion_interval == 0) {
        ASSERT_OK(Delete(cf, Key(key, key_size)));
        deletions_made++;
        num_deleted++;

        if (key > cutoff_key_num) {
          num_expired++;
        }
      }
    }

    // Insert some deletions for keys that don't exist that
    // are both in and out of the key range
    ASSERT_OK(Delete(cf, Key(smallest + 1, key_size)));
    deletions_made++;

    ASSERT_OK(Delete(cf, Key(smallest - 1, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Delete(cf, Key(smallest - 9, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Flush(cf));
    SetDeletionCompactionStats(stats, deletions_made, num_expired,
                               num_deleted);
  }
};

// An EventListener which helps verify the compaction results in
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
 public:
  CompactionJobStatsChecker()
      : compression_enabled_(false), verify_next_comp_io_stats_(false) {}

  size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }

  void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }

  // Once a compaction completes, this function verifies the returned
  // CompactionJobInfo against the oldest CompactionJobInfo added earlier
  // to "expected_stats_" that has not yet been used for verification.
  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    if (verify_next_comp_io_stats_) {
      ASSERT_GT(ci.stats.file_write_nanos, 0);
      ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
      ASSERT_GT(ci.stats.file_fsync_nanos, 0);
      ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
      verify_next_comp_io_stats_ = false;
    }

    std::lock_guard<std::mutex> lock(mutex_);
    if (expected_stats_.size()) {
      Verify(ci.stats, expected_stats_.front());
      expected_stats_.pop();
    }
  }

  // A helper function which verifies whether two CompactionJobStats
  // match. The verification of all compaction stats is done by
  // ASSERT_EQ, except for the total input / output bytes, for which we
  // use ASSERT_GE and ASSERT_LE with a reasonable bias ---
  // 10% in the uncompressed case and 20% when compression is used.
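  // e.g. with the 10% bias and an expected total of 1000 bytes, any
  // actual value in roughly [910, 1100] bytes passes the two GE/LE
  // assertions below.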
  virtual void Verify(const CompactionJobStats& current_stats,
                      const CompactionJobStats& stats) {
    // time
    ASSERT_GT(current_stats.elapsed_micros, 0U);

    ASSERT_EQ(current_stats.num_input_records,
              stats.num_input_records);
    ASSERT_EQ(current_stats.num_input_files,
              stats.num_input_files);
    ASSERT_EQ(current_stats.num_input_files_at_output_level,
              stats.num_input_files_at_output_level);

    ASSERT_EQ(current_stats.num_output_records,
              stats.num_output_records);
    ASSERT_EQ(current_stats.num_output_files,
              stats.num_output_files);

    ASSERT_EQ(current_stats.is_manual_compaction,
              stats.is_manual_compaction);

    // file size
    double kFileSizeBias = compression_enabled_ ? 0.20 : 0.10;
    ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
              stats.total_input_bytes);
    ASSERT_LE(current_stats.total_input_bytes,
              stats.total_input_bytes * (1.00 + kFileSizeBias));
    ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
              stats.total_output_bytes);
    ASSERT_LE(current_stats.total_output_bytes,
              stats.total_output_bytes * (1.00 + kFileSizeBias));
    ASSERT_EQ(current_stats.total_input_raw_key_bytes,
              stats.total_input_raw_key_bytes);
    ASSERT_EQ(current_stats.total_input_raw_value_bytes,
              stats.total_input_raw_value_bytes);

    ASSERT_EQ(current_stats.num_records_replaced,
              stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
              stats.num_corrupt_keys);

    ASSERT_EQ(
        std::string(current_stats.smallest_output_key_prefix),
        std::string(stats.smallest_output_key_prefix));
    ASSERT_EQ(
        std::string(current_stats.largest_output_key_prefix),
        std::string(stats.largest_output_key_prefix));
  }

  // Queues an expected CompactionJobStats, which will be used to
  // verify the CompactionJobStats returned by the OnCompactionCompleted()
  // callback.
  void AddExpectedStats(const CompactionJobStats& stats) {
    std::lock_guard<std::mutex> lock(mutex_);
    expected_stats_.push(stats);
  }

  void EnableCompression(bool flag) {
    compression_enabled_ = flag;
  }

  bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }

 private:
  std::mutex mutex_;
  std::queue<CompactionJobStats> expected_stats_;
  bool compression_enabled_;
  bool verify_next_comp_io_stats_;
};

// An EventListener which helps verify the compaction statistics in
// the test DeletionStatsTest.
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
 public:
  // Verifies whether two CompactionJobStats match.
  void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) override {
    ASSERT_EQ(
        current_stats.num_input_deletion_records,
        stats.num_input_deletion_records);
    ASSERT_EQ(
        current_stats.num_expired_deletion_records,
        stats.num_expired_deletion_records);
    ASSERT_EQ(
        current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
              stats.num_corrupt_keys);
  }
};

namespace {

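// Back-of-the-envelope model of an SST file's size; the numbers are a
// rough sketch, not exact on-disk sizes. For example, 100 records with
// 10-byte keys and 1000-byte uncompressed values give
// data_size = 100 * (10 + 1000 + 8) = 101800 bytes, plus a 512-byte
// footer, a 125-byte filter block (10 bits/key), and a ~447-byte index
// block: roughly 103 KB in total.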
uint64_t EstimatedFileSize(
    uint64_t num_records, size_t key_size, size_t value_size,
    double compression_ratio = 1.0,
    size_t block_size = 4096,
    int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;

  uint64_t data_size =
      static_cast<uint64_t>(
          num_records * (key_size + value_size * compression_ratio +
                         kPerKeyOverhead));

  return data_size + kFooterSize
         + num_records * bloom_bits_per_key / 8      // filter block
         + data_size * (key_size + 8) / block_size;  // index block
}

namespace {

void CopyPrefix(
    const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}

}  // namespace

CompactionJobStats NewManualCompactionJobStats(
    const std::string& smallest_key, const std::string& largest_key,
    size_t num_input_files, size_t num_input_files_at_output_level,
    uint64_t num_input_records, size_t key_size, size_t value_size,
    size_t num_output_files, uint64_t num_output_records,
    double compression_ratio, uint64_t num_records_replaced,
    bool is_manual = true) {
  CompactionJobStats stats;
  stats.Reset();

  stats.num_input_records = num_input_records;
  stats.num_input_files = num_input_files;
  stats.num_input_files_at_output_level = num_input_files_at_output_level;

  stats.num_output_records = num_output_records;
  stats.num_output_files = num_output_files;

  stats.total_input_bytes =
      EstimatedFileSize(
          num_input_records / num_input_files,
          key_size, value_size, compression_ratio) * num_input_files;
  stats.total_output_bytes =
      EstimatedFileSize(
          num_output_records / num_output_files,
          key_size, value_size, compression_ratio) * num_output_files;
  stats.total_input_raw_key_bytes =
      num_input_records * (key_size + 8);
  stats.total_input_raw_value_bytes =
      num_input_records * value_size;

  stats.is_manual_compaction = is_manual;

  stats.num_records_replaced = num_records_replaced;

  CopyPrefix(smallest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.smallest_output_key_prefix);
  CopyPrefix(largest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.largest_output_key_prefix);

  return stats;
}

CompressionType GetAnyCompression() {
  if (Snappy_Supported()) {
    return kSnappyCompression;
  } else if (Zlib_Supported()) {
    return kZlibCompression;
  } else if (BZip2_Supported()) {
    return kBZip2Compression;
  } else if (LZ4_Supported()) {
    return kLZ4Compression;
  } else if (XPRESS_Supported()) {
    return kXpressCompression;
  }

  return kNoCompression;
}

}  // namespace

TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
  Random rnd(301);
  const int kBufSize = 100;
  char buf[kBufSize];
  uint64_t key_base = 100000000l;
  // Note: key_base must be multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 100;
  const int kTestScale = 8;
  const int kKeySize = 10;
  const int kValueSize = 1000;
  const double kCompressionRatio = 0.5;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect. The expected CompactionJobStats is added
  // via AddExpectedStats().
  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  // just enough settings to hold off auto-compaction.
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options.bytes_per_sync = 512 * 1024;

  options.report_bg_io_stats = true;
  for (int test = 0; test < 2; ++test) {
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // 1st Phase: generate "num_L0_files" L0 files.
    int num_L0_files = 0;
    for (uint64_t start_key = key_base;
         start_key <= key_base * kTestScale;
         start_key += key_base) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base - 1,
          kKeySize, kValueSize, key_interval,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d", ++num_L0_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }
    ASSERT_EQ(ToString(num_L0_files), FilesPerLevel(1));

    // 2nd Phase: perform L0 -> L1 compaction.
    int L0_compaction_count = 6;
    int count = 1;
    std::string smallest_key;
    std::string largest_key;
    for (uint64_t start_key = key_base;
         start_key <= key_base * L0_compaction_count;
         start_key += key_base, count++) {
      smallest_key = Key(start_key, 10);
      largest_key = Key(start_key + key_base - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              1, 0, num_keys_per_L0_file,
              kKeySize, kValueSize,
              1, num_keys_per_L0_file,
              compression_ratio, 0));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      TEST_Compact(0, 1, smallest_key, largest_key);
      snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // compact two files into one in the last L0 -> L1 compaction
    int num_remaining_L0 = num_L0_files - L0_compaction_count;
    smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            smallest_key, largest_key,
            num_remaining_L0,
            0, num_keys_per_L0_file * num_remaining_L0,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * num_remaining_L0,
            compression_ratio, 0));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    TEST_Compact(0, 1, smallest_key, largest_key);

    int num_L1_files = num_L0_files - num_remaining_L0 + 1;
    num_L0_files = 0;
    snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
    ASSERT_EQ(std::string(buf), FilesPerLevel(1));

    // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
    int sparseness = 2;
    for (uint64_t start_key = key_base;
         start_key <= key_base * kTestScale;
         start_key += key_base * sparseness) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base * sparseness - 1,
          kKeySize, kValueSize,
          key_base * sparseness / num_keys_per_L0_file,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 4th Phase: perform L0 -> L1 compaction again, expecting higher
    // write amplification. When subcompactions are enabled, the number
    // of output files increases by 1 because multiple threads consume
    // the input and generate output files without coordinating with
    // each other, so the output cannot be packed into as few files as
    // a sequential compaction would produce.
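    // Concretely, num_output_files below is 2 when this test instance
    // runs with max_subcompactions = 4 and 1 when it runs with
    // max_subcompactions = 1.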
    int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
    for (uint64_t start_key = key_base;
         num_L0_files > 1;
         start_key += key_base * sparseness) {
      smallest_key = Key(start_key, 10);
      largest_key =
          Key(start_key + key_base * sparseness - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              3, 2, num_keys_per_L0_file * 3,
              kKeySize, kValueSize,
              num_output_files,
              num_keys_per_L0_file * 2,  // 1/3 of the data will be updated.
              compression_ratio,
              num_keys_per_L0_file));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      Compact(1, smallest_key, largest_key);
      if (options.max_subcompactions == 1) {
        --num_L1_files;
      }
      snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 5th Phase: Do a full compaction, which involves two subcompactions.
    // Here we expect to have 1 L0 file and 4 L1 files.
    // In the first subcompaction, we expect an L0 compaction.
    smallest_key = Key(key_base, 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key,
            2, 1, num_keys_per_L0_file * 3,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * 2,
            compression_ratio,
            num_keys_per_L0_file));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    Compact(1, smallest_key, largest_key);

    num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
    char L1_buf[4];
    snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
    std::string L1_files(L1_buf);
    ASSERT_EQ(L1_files, FilesPerLevel(1));
    options.compression = GetAnyCompression();
    if (options.compression == kNoCompression) {
      break;
    }
    stats_checker->EnableCompression(true);
    compression_ratio = kCompressionRatio;

    for (int i = 0; i < 5; i++) {
      ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
                    Slice(RandomString(&rnd, 512 * 1024, 1))));
    }

    ASSERT_OK(Flush(1));
    dbfull()->TEST_WaitForCompact();

    stats_checker->set_verify_next_comp_io_stats(true);
    std::atomic<bool> first_prepare_write(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void* /*arg*/) {
          if (first_prepare_write.load()) {
            options.env->SleepForMicroseconds(3);
            first_prepare_write.store(false);
          }
        });

    std::atomic<bool> first_flush(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Flush:BeforeAppend", [&](void* /*arg*/) {
          if (first_flush.load()) {
            options.env->SleepForMicroseconds(3);
            first_flush.store(false);
          }
        });

    std::atomic<bool> first_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::SyncInternal:0", [&](void* /*arg*/) {
          if (first_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_sync.store(false);
          }
        });

    std::atomic<bool> first_range_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) {
          if (first_range_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_range_sync.store(false);
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Compact(1, smallest_key, largest_key);

    ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
    ASSERT_TRUE(!first_prepare_write.load());
    ASSERT_TRUE(!first_flush.load());
    ASSERT_TRUE(!first_sync.load());
    ASSERT_TRUE(!first_range_sync.load());
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
  Random rnd(301);
  uint64_t key_base = 100000l;
  // Note: key_base must be multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 20;
  const int kTestScale = 8;  // make sure this is even
  const int kKeySize = 10;
  const int kValueSize = 100;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;
  uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval;
  uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval;
  const std::string smallest_key = Key(key_base - 10, kKeySize);
  const std::string largest_key = Key(largest_key_num + 10, kKeySize);

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.
  auto* stats_checker = new CompactionJobDeletionStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_bytes_for_level_multiplier = 2;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Stage 1: Generate several L0 files and then send them to L2 by
  // using CompactRangeOptions and CompactRange(). These files will
  // have a strict subset of the keys from the full key-range
  for (uint64_t start_key = key_base;
       start_key <= key_base * kTestScale / 2;
       start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  CompactRangeOptions cr_options;
  cr_options.change_level = true;
  cr_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(cr_options, handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

  // Stage 2: Generate files including keys from the entire key range
  for (uint64_t start_key = key_base;
       start_key <= key_base * kTestScale;
       start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  // Send these L0 files to L1
  TEST_Compact(0, 1, smallest_key, largest_key);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

  // Add a new record and flush so now there is a L0 file
  // with a value too (not just deletions from the next step)
  ASSERT_OK(Put(1, Key(key_base - 6, kKeySize), "test"));
  ASSERT_OK(Flush(1));

  // Stage 3: Generate L0 files with some deletions so now
  // there are files with the same key range in L0, L1, and L2
  int deletion_interval = 3;
  CompactionJobStats first_compaction_stats;
  SelectivelyDeleteKeys(key_base, largest_key_num,
                        key_interval, deletion_interval, kKeySize,
                        cutoff_key_num, &first_compaction_stats, 1);

  stats_checker->AddExpectedStats(first_compaction_stats);

  // Stage 4: Trigger compaction and verify the stats
  TEST_Compact(0, 1, smallest_key, largest_key);
}

namespace {
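// Computes the lowest set bit of num_flushes, returning it when it is
// greater than 1 and 0 otherwise. Per the comments in the test below,
// this models how many newly flushed files ("units") the universal
// compaction triggered by the num_flushes-th flush is expected to
// consume. With kTestScale = 6: num_flushes = 2 -> 2, 3 -> 0 (skipped),
// 4 -> 4, 5 -> 0 (skipped), 6 -> 2, so exactly three expected stats are
// queued before the flushes begin.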
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  uint32_t compaction_input_units;
  for (compaction_input_units = 1;
       num_flushes >= compaction_input_units;
       compaction_input_units *= 2) {
    if ((num_flushes & compaction_input_units) != 0) {
      return compaction_input_units > 1 ? compaction_input_units : 0;
    }
  }
  return 0;
}
}  // namespace

TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
  Random rnd(301);
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_table
  int num_keys_per_table = 100;
  const uint32_t kTestScale = 6;
  const int kKeySize = 10;
  const int kValueSize = 900;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_table;

  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = num_keys_per_table * 1000;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 1;
  options.compaction_options_universal.max_size_amplification_percent = 1000;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Generates the expected CompactionJobStats for each compaction
  for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) {
    // Here we treat one newly flushed file as a unit.
    //
    // For example, if a newly flushed file is 100k, and a compaction has
    // 4 input units, then this compaction inputs 400k.
    uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes);
    if (num_input_units == 0) {
      continue;
    }
    // The following statement determines the expected smallest key
    // based on whether it is a full compaction. A full compaction only
    // happens when the number of flushes equals the number of compaction
    // input runs.
    uint64_t smallest_key =
        (num_flushes == num_input_units) ?
            key_base : key_base * (num_flushes - 1);
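    // e.g. num_flushes = 4 triggers a full compaction of 4 units, so
    // smallest_key is key_base; num_flushes = 6 compacts only the last
    // two flushed files, so smallest_key is key_base * 5.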

    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(smallest_key, 10),
            Key(smallest_key + key_base * num_input_units - key_interval, 10),
            num_input_units,
            num_input_units > 2 ? num_input_units / 2 : 0,
            num_keys_per_table * num_input_units,
            kKeySize, kValueSize,
            num_input_units,
            num_keys_per_table * num_input_units,
            1.0, 0, false));
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);

  for (uint64_t start_key = key_base;
       start_key <= key_base * kTestScale;
       start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

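// Run the whole suite twice: with max_subcompactions = 1 (sequential
// compactions) and with max_subcompactions = 4.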
INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
                        ::testing::Values(1, 4));
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr, "SKIPPED, not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE

#else

int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif  // !defined(IOS_CROSS_COMPILE)