//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE

#ifndef GFLAGS
#include <cstdio>
int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
#else
#include <atomic>
#include <cstdio>
#include <cstring>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <sstream>
#include <unordered_map>

#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/system_clock.h"
#include "table/block_based/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"

DEFINE_int32(nsec, 10, "Duration of the test in seconds");
DEFINE_int32(nthread_write, 1, "Insert threads");
DEFINE_int32(nthread_read, 1, "Lookup threads");
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cache file");
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
DEFINE_int32(iosize, 4 * 1024, "IO size for cache reads and writes");
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
DEFINE_string(cache_type, "block_cache",
              "Cache type (block_cache, volatile, tiered)");
DEFINE_bool(benchmark, false, "Benchmark mode (skip read verification)");
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");
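
// Example invocation (binary name and values are illustrative):
//   ./persistent_cache_bench --cache_type=block_cache \
//       --path=/tmp/microbench/blkcache --cache_size=1073741824 \
//       --nsec=30 --nthread_write=4 --nthread_read=4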

namespace ROCKSDB_NAMESPACE {

std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
  assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max());
  std::unique_ptr<PersistentCacheTier> pcache(
      new VolatileCacheTier(FLAGS_cache_size));
  return pcache;
}

std::unique_ptr<PersistentCacheTier> NewBlockCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    return nullptr;
  }

  PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
  Status status = cache->Open();
  if (!status.ok()) {
    fprintf(stderr, "Error opening cache: %s\n", status.ToString().c_str());
    return nullptr;
  }
  return cache;
}

// create a new cache tier
// construct a tiered RAM+Block cache
std::unique_ptr<PersistentTieredCache> NewTieredCache(
    const size_t mem_size, const PersistentCacheConfig& opt) {
  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
  // create primary tier
  assert(mem_size);
  auto pcache =
      std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
  tcache->AddTier(pcache);
  // create secondary tier
  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
  tcache->AddTier(scache);

  Status s = tcache->Open();
  assert(s.ok());
  return tcache;
}

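// Build a tiered cache: --volatile_cache_pct percent of --cache_size goes to
// the in-memory tier, the remainder to the on-disk block cache tier.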
std::unique_ptr<PersistentTieredCache> NewTieredCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    abort();
  }

  auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
  PersistentCacheConfig opt(Env::Default(), FLAGS_path,
                            (1 - pct) * FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  return NewTieredCache(FLAGS_cache_size * pct, opt);
}

//
// Benchmark driver
//
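// The benchmark pre-populates the cache (when lookup threads are configured),
// spawns --nthread_write insert threads and --nthread_read lookup threads,
// runs them for --nsec seconds, and then reports latency/size histograms
// along with the cache's own stats.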
class CacheTierBenchmark {
 public:
  explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
      : cache_(cache) {
    if (FLAGS_nthread_read) {
      fprintf(stdout, "Pre-populating\n");
      Prepop();
      fprintf(stdout, "Pre-population completed\n");
    }

    stats_.Clear();

    // Start IO threads
    std::list<port::Thread> threads;
    Spawn(FLAGS_nthread_write, &threads,
          std::bind(&CacheTierBenchmark::Write, this));
    Spawn(FLAGS_nthread_read, &threads,
          std::bind(&CacheTierBenchmark::Read, this));

    // Wait till FLAGS_nsec and then signal to quit
    StopWatchNano t(SystemClock::Default().get(), /*auto_start=*/true);
    size_t sec = t.ElapsedNanos() / 1000000000ULL;
    while (!quit_) {
      sec = t.ElapsedNanos() / 1000000000ULL;
      quit_ = sec > size_t(FLAGS_nsec);
      /* sleep override */ sleep(1);
    }

    // Wait for threads to exit
    Join(&threads);
    // Print stats
    PrintStats(sec);
    // Close the cache
    cache_->TEST_Flush();
    cache_->Close();
  }


 private:
  void PrintStats(const size_t sec) {
    std::ostringstream msg;
    msg << "Test stats" << std::endl
        << "* Elapsed: " << sec << " s" << std::endl
        << "* Write Latency:" << std::endl
        << stats_.write_latency_.ToString() << std::endl
        << "* Read Latency:" << std::endl
        << stats_.read_latency_.ToString() << std::endl
        << "* Bytes written:" << std::endl
        << stats_.bytes_written_.ToString() << std::endl
        << "* Bytes read:" << std::endl
        << stats_.bytes_read_.ToString() << std::endl
        << "Cache stats:" << std::endl
        << cache_->PrintStats() << std::endl;
    fprintf(stderr, "%s\n", msg.str().c_str());
  }

  //
  // Insert implementation and corresponding helper functions
  //
  void Prepop() {
    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
      InsertKey(i);
      insert_key_limit_++;
      read_key_limit_++;
    }

    // Wait until data is flushed
    cache_->TEST_Flush();
    // warmup the cache
    for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) {
    }
  }

  void Write() {
    while (!quit_) {
      InsertKey(insert_key_limit_++);
    }
  }

  void InsertKey(const uint64_t key) {
    // construct key
    uint64_t k[3];
    Slice block_key = FillKey(k, key);

    // construct value
    auto block = NewBlock(key);

    // insert
    StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true);
    while (true) {
      Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
      if (status.ok()) {
        break;
      }

      // transient error is possible if we run without pipelining
      assert(!FLAGS_enable_pipelined_writes);
    }

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.write_latency_.Add(elapsed_micro);
    stats_.bytes_written_.Add(FLAGS_iosize);
  }

  //
  // Read implementation
  //
  void Read() {
    while (!quit_) {
      ReadKey(random() % read_key_limit_);
    }
  }

  void ReadKey(const uint64_t val) {
    // construct key
    uint64_t k[3];
    Slice key = FillKey(k, val);

    // Lookup in cache
    StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true);
    std::unique_ptr<char[]> block;
    size_t size;
    Status status = cache_->Lookup(key, &block, &size);
    if (!status.ok()) {
      fprintf(stderr, "%s\n", status.ToString().c_str());
    }
    assert(status.ok());
    assert(size == static_cast<size_t>(FLAGS_iosize));

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.read_latency_.Add(elapsed_micro);
    stats_.bytes_read_.Add(FLAGS_iosize);

    // verify content
    if (!FLAGS_benchmark) {
      auto expected_block = NewBlock(val);
      assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
    }
  }

  // create data for a key by filling with a certain pattern
  std::unique_ptr<char[]> NewBlock(const uint64_t val) {
    std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
    memset(data.get(), val % 255, FLAGS_iosize);
    return data;
  }

  // spawn threads
  void Spawn(const size_t n, std::list<port::Thread>* threads,
             const std::function<void()>& fn) {
    for (size_t i = 0; i < n; ++i) {
      threads->emplace_back(fn);
    }
  }

  // join threads
  void Join(std::list<port::Thread>* threads) {
    for (auto& th : *threads) {
      th.join();
    }
  }

  // construct key
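  // Keys are 3 * sizeof(uint64_t) = 24 bytes: two zero words followed by the
  // 64-bit key value, returned as a Slice over the caller-provided array.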
  Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
    k[0] = k[1] = 0;
    k[2] = val;
    void* p = static_cast<void*>(&k);
    return Slice(static_cast<char*>(p), sizeof(k));
  }

  // benchmark stats
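  // Latencies are recorded in microseconds; the bytes_* histograms record the
  // per-operation IO size.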
  struct Stats {
    void Clear() {
      bytes_written_.Clear();
      bytes_read_.Clear();
      read_latency_.Clear();
      write_latency_.Clear();
    }

    HistogramImpl bytes_written_;
    HistogramImpl bytes_read_;
    HistogramImpl read_latency_;
    HistogramImpl write_latency_;
  };

  std::shared_ptr<PersistentCacheTier> cache_;  // cache implementation
  std::atomic<uint64_t> insert_key_limit_{0};   // data inserted up to
  std::atomic<uint64_t> read_key_limit_{0};     // data can be read safely up to
  std::atomic<bool> quit_{false};               // signal threads to quit
  mutable Stats stats_;                         // Stats
};

}  // namespace ROCKSDB_NAMESPACE

//
// main
//
int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
                                    std::string(argv[0]) + " [OPTIONS]...");
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);

  std::ostringstream msg;
  msg << "Config" << std::endl
      << "======" << std::endl
      << "* nsec=" << FLAGS_nsec << std::endl
      << "* nthread_write=" << FLAGS_nthread_write << std::endl
      << "* nthread_read=" << FLAGS_nthread_read << std::endl
      << "* path=" << FLAGS_path << std::endl
      << "* cache_size=" << FLAGS_cache_size << std::endl
      << "* iosize=" << FLAGS_iosize << std::endl
      << "* writer_iosize=" << FLAGS_writer_iosize << std::endl
      << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
      << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
      << std::endl
      << "* cache_type=" << FLAGS_cache_type << std::endl
      << "* benchmark=" << FLAGS_benchmark << std::endl
      << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;

  fprintf(stderr, "%s\n", msg.str().c_str());

  std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache;
  if (FLAGS_cache_type == "block_cache") {
    fprintf(stderr, "Using block cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewBlockCache();
  } else if (FLAGS_cache_type == "volatile") {
    fprintf(stderr, "Using volatile cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewVolatileCache();
  } else if (FLAGS_cache_type == "tiered") {
    fprintf(stderr, "Using tiered cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewTieredCache();
  } else {
    fprintf(stderr, "Unknown cache type: %s\n", FLAGS_cache_type.c_str());
  }

  assert(cache);
  if (!cache) {
    fprintf(stderr, "Error creating cache\n");
    abort();
  }

  std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark(
      new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache)));

  return 0;
}
#endif  // #ifndef GFLAGS
#else
int main(int, char**) { return 0; }
#endif  // #ifndef ROCKSDB_LITE