//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifdef GFLAGS
#include <cinttypes>
#include <cstdio>
#include <limits>
#include <set>
#include <sstream>

#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/system_clock.h"
#include "table/block_based/cachable_entry.h"
#include "util/coding.h"
#include "util/gflags_compat.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

using GFLAGS_NAMESPACE::ParseCommandLineFlags;

static constexpr uint32_t KiB = uint32_t{1} << 10;
static constexpr uint32_t MiB = KiB << 10;
static constexpr uint64_t GiB = MiB << 10;

DEFINE_uint32(threads, 16, "Number of concurrent threads to run.");
DEFINE_uint64(cache_size, 1 * GiB,
              "Number of bytes to use as a cache of uncompressed data.");
DEFINE_uint32(num_shard_bits, 6,
              "Number of cache shard bits (cache is split into 2^num_shard_bits shards).");

DEFINE_double(resident_ratio, 0.25,
              "Ratio of keys fitting in cache to keyspace.");
DEFINE_uint64(ops_per_thread, 2000000U, "Number of operations per thread.");
DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added.");

DEFINE_uint32(skew, 5, "Degree of skew in key selection");
DEFINE_bool(populate_cache, true, "Populate cache before operations");

DEFINE_uint32(lookup_insert_percent, 87,
              "Ratio of lookup (+ insert on not found) to total workload "
              "(expressed as a percentage)");
DEFINE_uint32(insert_percent, 2,
              "Ratio of insert to total workload (expressed as a percentage)");
DEFINE_uint32(lookup_percent, 10,
              "Ratio of lookup to total workload (expressed as a percentage)");
DEFINE_uint32(erase_percent, 1,
              "Ratio of erase to total workload (expressed as a percentage)");
DEFINE_bool(gather_stats, false,
            "Whether to periodically simulate gathering block cache stats, "
            "using one more thread.");
DEFINE_uint32(
    gather_stats_sleep_ms, 1000,
    "How many milliseconds to sleep between each gathering of stats.");

DEFINE_uint32(gather_stats_entries_per_lock, 256,
              "For Cache::ApplyToAllEntries");
DEFINE_bool(skewed, false, "If true, skew the key access distribution");
#ifndef ROCKSDB_LITE
DEFINE_string(secondary_cache_uri, "",
              "Full URI for creating a custom secondary cache object");
static class std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;
#endif  // ROCKSDB_LITE

DEFINE_bool(use_clock_cache, false, "");
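
// Illustrative invocation (a sketch only; the flag values below are arbitrary
// examples, not recommended settings):
//
//   ./cache_bench -threads=16 -cache_size=1073741824 -value_bytes=8192 \
//       -ops_per_thread=2000000 -gather_stats
//
// All flags used above are defined in this file; anything left unspecified
// falls back to the defaults declared above.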

namespace ROCKSDB_NAMESPACE {

class CacheBench;
namespace {
// State shared by all concurrent executions of the same benchmark.
class SharedState {
 public:
  explicit SharedState(CacheBench* cache_bench)
      : cv_(&mu_),
        num_initialized_(0),
        start_(false),
        num_done_(0),
        cache_bench_(cache_bench) {}

  ~SharedState() {}

  port::Mutex* GetMutex() { return &mu_; }

  port::CondVar* GetCondVar() { return &cv_; }

  CacheBench* GetCacheBench() const { return cache_bench_; }

  void IncInitialized() { num_initialized_++; }

  void IncDone() { num_done_++; }

  bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; }

  bool AllDone() const { return num_done_ >= FLAGS_threads; }

  void SetStart() { start_ = true; }

  bool Started() const { return start_; }

 private:
  port::Mutex mu_;
  port::CondVar cv_;

  uint64_t num_initialized_;
  bool start_;
  uint64_t num_done_;

  CacheBench* cache_bench_;
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;
  Random64 rnd;
  SharedState* shared;
  HistogramImpl latency_ns_hist;
  uint64_t duration_us = 0;

  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rnd(1000 + index), shared(_shared) {}
};

struct KeyGen {
  char key_data[27];

  Slice GetRand(Random64& rnd, uint64_t max_key, int max_log) {
    uint64_t key = 0;
    if (!FLAGS_skewed) {
      uint64_t raw = rnd.Next();
      // Skew according to setting
      for (uint32_t i = 0; i < FLAGS_skew; ++i) {
        raw = std::min(raw, rnd.Next());
      }
      key = FastRange64(raw, max_key);
    } else {
      key = rnd.Skewed(max_log);
      if (key > max_key) {
        key -= max_key;
      }
    }
    // Variable size and alignment: the key is encoded three times with
    // distinct marker bytes, and the returned slice starts at offset
    // (key % 8) so callers see keys of varying length and alignment.
    size_t off = key % 8;
    key_data[0] = char{42};
    EncodeFixed64(key_data + 1, key);
    key_data[9] = char{11};
    EncodeFixed64(key_data + 10, key);
    key_data[18] = char{4};
    EncodeFixed64(key_data + 19, key);
    return Slice(&key_data[off], sizeof(key_data) - off);
  }
};

char* createValue(Random64& rnd) {
  char* rv = new char[FLAGS_value_bytes];
  // Fill with some filler data, and take some CPU time
  for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) {
    EncodeFixed64(rv + i, rnd.Next());
  }
  return rv;
}

// Callbacks for secondary cache
size_t SizeFn(void* /*obj*/) { return FLAGS_value_bytes; }

Status SaveToFn(void* obj, size_t /*offset*/, size_t size, void* out) {
  memcpy(out, obj, size);
  return Status::OK();
}

// Different deleters to simulate using deleter to gather
// stats on the code origin and kind of cache entries.
void deleter1(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}
void deleter2(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}
void deleter3(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}

Cache::CacheItemHelper helper1(SizeFn, SaveToFn, deleter1);
Cache::CacheItemHelper helper2(SizeFn, SaveToFn, deleter2);
Cache::CacheItemHelper helper3(SizeFn, SaveToFn, deleter3);
}  // namespace

class CacheBench {
  static constexpr uint64_t kHundredthUint64 =
      std::numeric_limits<uint64_t>::max() / 100U;
 public:
  CacheBench()
      : max_key_(static_cast<uint64_t>(FLAGS_cache_size / FLAGS_resident_ratio /
                                       FLAGS_value_bytes)),
        lookup_insert_threshold_(kHundredthUint64 *
                                 FLAGS_lookup_insert_percent),
        insert_threshold_(lookup_insert_threshold_ +
                          kHundredthUint64 * FLAGS_insert_percent),
        lookup_threshold_(insert_threshold_ +
                          kHundredthUint64 * FLAGS_lookup_percent),
        erase_threshold_(lookup_threshold_ +
                         kHundredthUint64 * FLAGS_erase_percent),
        skewed_(FLAGS_skewed) {
    if (erase_threshold_ != 100U * kHundredthUint64) {
      fprintf(stderr, "Percentages must add to 100.\n");
      exit(1);
    }

    max_log_ = 0;
    if (skewed_) {
      uint64_t max_key = max_key_;
      while (max_key >>= 1) max_log_++;
      if (max_key > (1u << max_log_)) max_log_++;
    }

    if (FLAGS_use_clock_cache) {
      cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
      if (!cache_) {
        fprintf(stderr, "Clock cache not supported.\n");
        exit(1);
      }
    } else {
      LRUCacheOptions opts(FLAGS_cache_size, FLAGS_num_shard_bits, false, 0.5);
#ifndef ROCKSDB_LITE
      if (!FLAGS_secondary_cache_uri.empty()) {
        Status s = SecondaryCache::CreateFromString(
            ConfigOptions(), FLAGS_secondary_cache_uri, &secondary_cache);
        if (secondary_cache == nullptr) {
          fprintf(
              stderr,
              "No secondary cache registered matching string: %s status=%s\n",
              FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
          exit(1);
        }
        opts.secondary_cache = secondary_cache;
      }
#endif  // ROCKSDB_LITE

      cache_ = NewLRUCache(opts);
    }
  }

  ~CacheBench() {}

  void PopulateCache() {
    Random64 rnd(1);
    KeyGen keygen;
    for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) {
      cache_->Insert(keygen.GetRand(rnd, max_key_, max_log_), createValue(rnd),
                     &helper1, FLAGS_value_bytes);
    }
  }

  bool Run() {
    const auto clock = SystemClock::Default().get();

    PrintEnv();
    SharedState shared(this);
    std::vector<std::unique_ptr<ThreadState> > threads(FLAGS_threads);
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      threads[i].reset(new ThreadState(i, &shared));
      std::thread(ThreadBody, threads[i].get()).detach();
    }

    HistogramImpl stats_hist;
    std::string stats_report;
    std::thread stats_thread(StatsBody, &shared, &stats_hist, &stats_report);

    uint64_t start_time;
    {
      MutexLock l(shared.GetMutex());
      while (!shared.AllInitialized()) {
        shared.GetCondVar()->Wait();
      }
      // Record start time
      start_time = clock->NowMicros();

      // Start all threads
      shared.SetStart();
      shared.GetCondVar()->SignalAll();
      // Wait for threads to complete
      while (!shared.AllDone()) {
        shared.GetCondVar()->Wait();
      }
    }

    // Stats gathering is considered background work. This time measurement
    // is for foreground work, and not really ideal for that. See below.
    uint64_t end_time = clock->NowMicros();
    stats_thread.join();

    // Wall clock time - includes idle time if threads
    // finish at different times (not ideal).
    double elapsed_secs = static_cast<double>(end_time - start_time) * 1e-6;
    uint32_t ops_per_sec = static_cast<uint32_t>(
        1.0 * FLAGS_threads * FLAGS_ops_per_thread / elapsed_secs);
    printf("Complete in %.3f s; Rough parallel ops/sec = %u\n", elapsed_secs,
           ops_per_sec);

    // Total time in each thread (more accurate throughput measure)
    elapsed_secs = 0;
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      elapsed_secs += threads[i]->duration_us * 1e-6;
    }
    ops_per_sec = static_cast<uint32_t>(1.0 * FLAGS_threads *
                                        FLAGS_ops_per_thread / elapsed_secs);
    printf("Thread ops/sec = %u\n", ops_per_sec);

    printf("\nOperation latency (ns):\n");
    HistogramImpl combined;
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      combined.Merge(threads[i]->latency_ns_hist);
    }
    printf("%s", combined.ToString().c_str());

    if (FLAGS_gather_stats) {
      printf("\nGather stats latency (us):\n");
      printf("%s", stats_hist.ToString().c_str());
    }

    printf("\n%s", stats_report.c_str());

    return true;
  }

 private:
  std::shared_ptr<Cache> cache_;
  const uint64_t max_key_;
  // Cumulative thresholds in the space of a random uint64_t
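  // (For illustration, with the default flags 87/2/10/1: each operation draws
  // a uniform random uint64_t, so lookup_insert_threshold_ covers ~87% of
  // that range, insert_threshold_ the next ~2%, lookup_threshold_ the next
  // ~10%, and erase_threshold_ the final ~1%, leaving only a tiny residual
  // that falls through to a no-op.)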
  const uint64_t lookup_insert_threshold_;
  const uint64_t insert_threshold_;
  const uint64_t lookup_threshold_;
  const uint64_t erase_threshold_;
  const bool skewed_;
  int max_log_;

  // A benchmark version of gathering stats on an active block cache by
  // iterating over it. The primary purpose is to measure the impact of
  // gathering stats with ApplyToAllEntries on throughput- and
  // latency-sensitive Cache users. Performance of stats gathering is
  // also reported. The last set of gathered stats is also reported, for
  // manual sanity checking for logical errors or other unexpected
  // behavior of cache_bench or the underlying Cache.
  static void StatsBody(SharedState* shared, HistogramImpl* stats_hist,
                        std::string* stats_report) {
    if (!FLAGS_gather_stats) {
      return;
    }
    const auto clock = SystemClock::Default().get();
    uint64_t total_key_size = 0;
    uint64_t total_charge = 0;
    uint64_t total_entry_count = 0;
    std::set<Cache::DeleterFn> deleters;
    StopWatchNano timer(clock);

    for (;;) {
      uint64_t time;
      time = clock->NowMicros();
      uint64_t deadline = time + uint64_t{FLAGS_gather_stats_sleep_ms} * 1000;

      {
        MutexLock l(shared->GetMutex());
        for (;;) {
          if (shared->AllDone()) {
            std::ostringstream ostr;
            ostr << "Most recent cache entry stats:\n"
                 << "Number of entries: " << total_entry_count << "\n"
                 << "Total charge: " << BytesToHumanString(total_charge) << "\n"
                 << "Average key size: "
                 << (1.0 * total_key_size / total_entry_count) << "\n"
                 << "Average charge: "
                 << BytesToHumanString(1.0 * total_charge / total_entry_count)
                 << "\n"
                 << "Unique deleters: " << deleters.size() << "\n";
            *stats_report = ostr.str();
            return;
          }
          if (clock->NowMicros() >= deadline) {
            break;
          }
          uint64_t diff = deadline - std::min(clock->NowMicros(), deadline);
          shared->GetCondVar()->TimedWait(diff + 1);
        }
      }

      // Now gather stats, outside of mutex
      total_key_size = 0;
      total_charge = 0;
      total_entry_count = 0;
      deleters.clear();
      auto fn = [&](const Slice& key, void* /*value*/, size_t charge,
                    Cache::DeleterFn deleter) {
        total_key_size += key.size();
        total_charge += charge;
        ++total_entry_count;
        // Something slightly more expensive as in (future) stats by category
        deleters.insert(deleter);
      };
      timer.Start();
      Cache::ApplyToAllEntriesOptions opts;
      opts.average_entries_per_lock = FLAGS_gather_stats_entries_per_lock;
      shared->GetCacheBench()->cache_->ApplyToAllEntries(fn, opts);
      stats_hist->Add(timer.ElapsedNanos() / 1000);
    }
  }

  static void ThreadBody(ThreadState* thread) {
    SharedState* shared = thread->shared;

    {
      MutexLock l(shared->GetMutex());
      shared->IncInitialized();
      if (shared->AllInitialized()) {
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->Started()) {
        shared->GetCondVar()->Wait();
      }
    }
    thread->shared->GetCacheBench()->OperateCache(thread);

    {
      MutexLock l(shared->GetMutex());
      shared->IncDone();
      if (shared->AllDone()) {
        shared->GetCondVar()->SignalAll();
      }
    }
  }

  void OperateCache(ThreadState* thread) {
    // To use looked-up values
    uint64_t result = 0;
    // To hold handles for a non-trivial amount of time
    Cache::Handle* handle = nullptr;
    KeyGen gen;
    const auto clock = SystemClock::Default().get();
    uint64_t start_time = clock->NowMicros();
    StopWatchNano timer(clock);

    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
      timer.Start();
      Slice key = gen.GetRand(thread->rnd, max_key_, max_log_);
      uint64_t random_op = thread->rnd.Next();
      Cache::CreateCallback create_cb =
          [](void* buf, size_t size, void** out_obj, size_t* charge) -> Status {
        *out_obj = reinterpret_cast<void*>(new char[size]);
        memcpy(*out_obj, buf, size);
        *charge = size;
        return Status::OK();
      };

      if (random_op < lookup_insert_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do lookup
        handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
                                true);
        if (handle) {
          // do something with the data
          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
                             FLAGS_value_bytes);
        } else {
          // do insert
          cache_->Insert(key, createValue(thread->rnd), &helper2,
                         FLAGS_value_bytes, &handle);
        }
      } else if (random_op < insert_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do insert
        cache_->Insert(key, createValue(thread->rnd), &helper3,
                       FLAGS_value_bytes, &handle);
      } else if (random_op < lookup_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do lookup
        handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
                                true);
        if (handle) {
          // do something with the data
          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
                             FLAGS_value_bytes);
        }
      } else if (random_op < erase_threshold_) {
        // do erase
        cache_->Erase(key);
      } else {
        // Should be extremely unlikely (noop)
        assert(random_op >= kHundredthUint64 * 100U);
      }
      thread->latency_ns_hist.Add(timer.ElapsedNanos());
    }
    if (handle) {
      cache_->Release(handle);
      handle = nullptr;
    }
    // Ensure computations on `result` are not optimized away.
    if (result == 1) {
      printf("You are extremely unlucky(2). Try again.\n");
      exit(1);
    }
    thread->duration_us = clock->NowMicros() - start_time;
  }

  void PrintEnv() const {
    printf("RocksDB version     : %d.%d\n", kMajorVersion, kMinorVersion);
    printf("Number of threads   : %u\n", FLAGS_threads);
    printf("Ops per thread      : %" PRIu64 "\n", FLAGS_ops_per_thread);
    printf("Cache size          : %s\n",
           BytesToHumanString(FLAGS_cache_size).c_str());
    printf("Num shard bits      : %u\n", FLAGS_num_shard_bits);
    printf("Max key             : %" PRIu64 "\n", max_key_);
    printf("Resident ratio      : %g\n", FLAGS_resident_ratio);
    printf("Skew degree         : %u\n", FLAGS_skew);
    printf("Populate cache      : %d\n", int{FLAGS_populate_cache});
    printf("Lookup+Insert pct   : %u%%\n", FLAGS_lookup_insert_percent);
    printf("Insert percentage   : %u%%\n", FLAGS_insert_percent);
    printf("Lookup percentage   : %u%%\n", FLAGS_lookup_percent);
    printf("Erase percentage    : %u%%\n", FLAGS_erase_percent);
    std::ostringstream stats;
    if (FLAGS_gather_stats) {
      stats << "enabled (" << FLAGS_gather_stats_sleep_ms << "ms, "
            << FLAGS_gather_stats_entries_per_lock << "/lock)";
    } else {
      stats << "disabled";
    }
    printf("Gather stats        : %s\n", stats.str().c_str());
    printf("----------------------------\n");
  }
};
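
// For reference: a minimal standalone entry point for this benchmark might
// look like the sketch below (hypothetical; the real main() is defined in a
// separate tools source file in the RocksDB tree):
//
//   int main(int argc, char** argv) {
//     return ROCKSDB_NAMESPACE::cache_bench_tool(argc, argv);
//   }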

int cache_bench_tool(int argc, char** argv) {
  ParseCommandLineFlags(&argc, &argv, true);

  if (FLAGS_threads <= 0) {
    fprintf(stderr, "threads number <= 0\n");
    exit(1);
  }

  ROCKSDB_NAMESPACE::CacheBench bench;
  if (FLAGS_populate_cache) {
    bench.PopulateCache();
    printf("Population complete\n");
    printf("----------------------------\n");
  }
  if (bench.Run()) {
    return 0;
  } else {
    return 1;
  }
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // GFLAGS