//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#ifdef GFLAGS
#include <cinttypes>
#include <cstdio>
#include <limits>
#include <set>
#include <sstream>

#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/object_registry.h"
#include "table/block_based/cachable_entry.h"
#include "util/coding.h"
#include "util/gflags_compat.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

using GFLAGS_NAMESPACE::ParseCommandLineFlags;

static constexpr uint32_t KiB = uint32_t{1} << 10;
static constexpr uint32_t MiB = KiB << 10;
static constexpr uint64_t GiB = MiB << 10;

DEFINE_uint32(threads, 16, "Number of concurrent threads to run.");
DEFINE_uint64(cache_size, 1 * GiB,
              "Number of bytes to use as a cache of uncompressed data.");
DEFINE_uint32(num_shard_bits, 6, "shard_bits.");

DEFINE_double(resident_ratio, 0.25,
              "Ratio of keys fitting in cache to keyspace.");
DEFINE_uint64(ops_per_thread, 2000000U, "Number of operations per thread.");
DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added.");

DEFINE_uint32(skew, 5, "Degree of skew in key selection");
DEFINE_bool(populate_cache, true, "Populate cache before operations");

DEFINE_uint32(lookup_insert_percent, 87,
              "Ratio of lookup (+ insert on not found) to total workload "
              "(expressed as a percentage)");
DEFINE_uint32(insert_percent, 2,
              "Ratio of insert to total workload (expressed as a percentage)");
DEFINE_uint32(lookup_percent, 10,
              "Ratio of lookup to total workload (expressed as a percentage)");
DEFINE_uint32(erase_percent, 1,
              "Ratio of erase to total workload (expressed as a percentage)");
DEFINE_bool(gather_stats, false,
            "Whether to periodically simulate gathering block cache stats, "
            "using one more thread.");
DEFINE_uint32(
    gather_stats_sleep_ms, 1000,
    "How many milliseconds to sleep between each gathering of stats.");

DEFINE_uint32(gather_stats_entries_per_lock, 256,
              "For Cache::ApplyToAllEntries");
DEFINE_bool(skewed, false, "If true, skew the key access distribution");
#ifndef ROCKSDB_LITE
DEFINE_string(secondary_cache_uri, "",
              "Full URI for creating a custom secondary cache object");
static class std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;
#endif  // ROCKSDB_LITE

DEFINE_bool(use_clock_cache, false, "");

namespace ROCKSDB_NAMESPACE {

class CacheBench;
namespace {
// State shared by all concurrent executions of the same benchmark.
class SharedState {
 public:
  explicit SharedState(CacheBench* cache_bench)
      : cv_(&mu_),
        num_initialized_(0),
        start_(false),
        num_done_(0),
        cache_bench_(cache_bench) {}

  ~SharedState() {}

  port::Mutex* GetMutex() { return &mu_; }

  port::CondVar* GetCondVar() { return &cv_; }

  CacheBench* GetCacheBench() const { return cache_bench_; }

  void IncInitialized() { num_initialized_++; }

  void IncDone() { num_done_++; }

  bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; }

  bool AllDone() const { return num_done_ >= FLAGS_threads; }

  void SetStart() { start_ = true; }

  bool Started() const { return start_; }

 private:
  port::Mutex mu_;
  port::CondVar cv_;

  uint64_t num_initialized_;
  bool start_;
  uint64_t num_done_;

  CacheBench* cache_bench_;
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;
  Random64 rnd;
  SharedState* shared;
  HistogramImpl latency_ns_hist;
  uint64_t duration_us = 0;

  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rnd(1000 + index), shared(_shared) {}
};

struct KeyGen {
  char key_data[27];

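  // Produces the key for the next operation (valid only until the next
  // GetRand call, since the returned Slice points into key_data). Two modes:
  // * Default: take the min of (FLAGS_skew + 1) uniform draws, which biases
  //   selection toward the low end of the keyspace, then map the result into
  //   [0, max_key) with FastRange64.
  // * --skewed: use Random64::Skewed(max_log), which picks a random bit width
  //   up to max_log, giving an exponentially skewed distribution.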
  Slice GetRand(Random64& rnd, uint64_t max_key, int max_log) {
    uint64_t key = 0;
    if (!FLAGS_skewed) {
      uint64_t raw = rnd.Next();
      // Skew according to setting
      for (uint32_t i = 0; i < FLAGS_skew; ++i) {
        raw = std::min(raw, rnd.Next());
      }
      key = FastRange64(raw, max_key);
    } else {
      key = rnd.Skewed(max_log);
      if (key > max_key) {
        key -= max_key;
      }
    }
    // Variable size and alignment
    size_t off = key % 8;
    key_data[0] = char{42};
    EncodeFixed64(key_data + 1, key);
    key_data[9] = char{11};
    EncodeFixed64(key_data + 10, key);
    key_data[18] = char{4};
    EncodeFixed64(key_data + 19, key);
    return Slice(&key_data[off], sizeof(key_data) - off);
  }
};

char* createValue(Random64& rnd) {
  char* rv = new char[FLAGS_value_bytes];
  // Fill with some filler data, and take some CPU time
  for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) {
    EncodeFixed64(rv + i, rnd.Next());
  }
  return rv;
}

// Callbacks for secondary cache
size_t SizeFn(void* /*obj*/) { return FLAGS_value_bytes; }

Status SaveToFn(void* obj, size_t /*offset*/, size_t size, void* out) {
  memcpy(out, obj, size);
  return Status::OK();
}

// Different deleters to simulate using deleter to gather
// stats on the code origin and kind of cache entries.
void deleter1(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}
void deleter2(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}
void deleter3(const Slice& /*key*/, void* value) {
  delete[] static_cast<char*>(value);
}

Cache::CacheItemHelper helper1(SizeFn, SaveToFn, deleter1);
Cache::CacheItemHelper helper2(SizeFn, SaveToFn, deleter2);
Cache::CacheItemHelper helper3(SizeFn, SaveToFn, deleter3);
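// In this file, helper1 tags entries inserted by PopulateCache, helper2 tags
// the lookup paths (including inserts on lookup miss), and helper3 tags
// standalone inserts, so the "Unique deleters" count reported by StatsBody
// indicates which of these origins are currently represented in the cache.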
}  // namespace

class CacheBench {
  static constexpr uint64_t kHundredthUint64 =
      std::numeric_limits<uint64_t>::max() / 100U;
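  // The *_threshold_ members below are cumulative multiples of this value;
  // comparing a uniformly random uint64_t against them partitions operations
  // according to the configured percentages. E.g. with the default 87/2/10/1
  // split, any draw below 87 * kHundredthUint64 selects lookup+insert.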

 public:
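  // Sizing note: max_key_ is derived so that roughly FLAGS_resident_ratio of
  // the keyspace fits in the cache. With the defaults (1 GiB cache, 0.25
  // resident ratio, 8 KiB values) that is 1 GiB / 0.25 / 8 KiB = 524288
  // distinct keys, i.e. a keyspace about 4x the cache capacity.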
  CacheBench()
      : max_key_(static_cast<uint64_t>(FLAGS_cache_size / FLAGS_resident_ratio /
                                       FLAGS_value_bytes)),
        lookup_insert_threshold_(kHundredthUint64 *
                                 FLAGS_lookup_insert_percent),
        insert_threshold_(lookup_insert_threshold_ +
                          kHundredthUint64 * FLAGS_insert_percent),
        lookup_threshold_(insert_threshold_ +
                          kHundredthUint64 * FLAGS_lookup_percent),
        erase_threshold_(lookup_threshold_ +
                         kHundredthUint64 * FLAGS_erase_percent),
        skewed_(FLAGS_skewed) {
    if (erase_threshold_ != 100U * kHundredthUint64) {
      fprintf(stderr, "Percentages must add to 100.\n");
      exit(1);
    }

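    // For --skewed key generation, max_log_ is approximately log2(max_key_);
    // Random64::Skewed uses it as the upper bound on the number of random
    // bits in a generated key.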
    max_log_ = 0;
    if (skewed_) {
      uint64_t max_key = max_key_;
      while (max_key >>= 1) max_log_++;
      // Round up so Skewed() can cover the whole keyspace.
      if (max_key_ > (uint64_t{1} << max_log_)) max_log_++;
    }

    if (FLAGS_use_clock_cache) {
      cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
      if (!cache_) {
        fprintf(stderr, "Clock cache not supported.\n");
        exit(1);
      }
    } else {
      LRUCacheOptions opts(FLAGS_cache_size, FLAGS_num_shard_bits, false, 0.5);
#ifndef ROCKSDB_LITE
      if (!FLAGS_secondary_cache_uri.empty()) {
        Status s =
            ObjectRegistry::NewInstance()->NewSharedObject<SecondaryCache>(
                FLAGS_secondary_cache_uri, &secondary_cache);
        if (secondary_cache == nullptr) {
          fprintf(
              stderr,
              "No secondary cache registered matching string: %s status=%s\n",
              FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
          exit(1);
        }
        opts.secondary_cache = secondary_cache;
      }
#endif  // ROCKSDB_LITE

      cache_ = NewLRUCache(opts);
    }
  }

  ~CacheBench() {}

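  // Warms the cache before the timed run: inserting about 2x the cache's byte
  // capacity in value_bytes-sized entries ensures the benchmark starts from a
  // full cache that is already evicting, rather than measuring ramp-up.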
  void PopulateCache() {
    Random64 rnd(1);
    KeyGen keygen;
    for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) {
      cache_->Insert(keygen.GetRand(rnd, max_key_, max_log_), createValue(rnd),
                     &helper1, FLAGS_value_bytes);
    }
  }

  bool Run() {
    const auto clock = SystemClock::Default().get();

    PrintEnv();
    SharedState shared(this);
    std::vector<std::unique_ptr<ThreadState> > threads(FLAGS_threads);
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      threads[i].reset(new ThreadState(i, &shared));
      std::thread(ThreadBody, threads[i].get()).detach();
    }

    HistogramImpl stats_hist;
    std::string stats_report;
    std::thread stats_thread(StatsBody, &shared, &stats_hist, &stats_report);

    uint64_t start_time;
    {
      MutexLock l(shared.GetMutex());
      while (!shared.AllInitialized()) {
        shared.GetCondVar()->Wait();
      }
      // Record start time
      start_time = clock->NowMicros();

      // Start all threads
      shared.SetStart();
      shared.GetCondVar()->SignalAll();

      // Wait threads to complete
      while (!shared.AllDone()) {
        shared.GetCondVar()->Wait();
      }
    }

    // Stats gathering is considered background work. This time measurement
    // is for foreground work, and not really ideal for that. See below.
    uint64_t end_time = clock->NowMicros();
    stats_thread.join();

    // Wall clock time - includes idle time if threads
    // finish at different times (not ideal).
    double elapsed_secs = static_cast<double>(end_time - start_time) * 1e-6;
    uint32_t ops_per_sec = static_cast<uint32_t>(
        1.0 * FLAGS_threads * FLAGS_ops_per_thread / elapsed_secs);
    printf("Complete in %.3f s; Rough parallel ops/sec = %u\n", elapsed_secs,
           ops_per_sec);

    // Total time in each thread (more accurate throughput measure)
    elapsed_secs = 0;
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      elapsed_secs += threads[i]->duration_us * 1e-6;
    }
    ops_per_sec = static_cast<uint32_t>(1.0 * FLAGS_threads *
                                        FLAGS_ops_per_thread / elapsed_secs);
    printf("Thread ops/sec = %u\n", ops_per_sec);

    printf("\nOperation latency (ns):\n");
    HistogramImpl combined;
    for (uint32_t i = 0; i < FLAGS_threads; i++) {
      combined.Merge(threads[i]->latency_ns_hist);
    }
    printf("%s", combined.ToString().c_str());

    if (FLAGS_gather_stats) {
      printf("\nGather stats latency (us):\n");
      printf("%s", stats_hist.ToString().c_str());
    }

    printf("\n%s", stats_report.c_str());

    return true;
  }

 private:
  std::shared_ptr<Cache> cache_;
  const uint64_t max_key_;
  // Cumulative thresholds in the space of a random uint64_t
  const uint64_t lookup_insert_threshold_;
  const uint64_t insert_threshold_;
  const uint64_t lookup_threshold_;
  const uint64_t erase_threshold_;
  const bool skewed_;
  int max_log_;

  // A benchmark version of gathering stats on an active block cache by
  // iterating over it. The primary purpose is to measure the impact of
  // gathering stats with ApplyToAllEntries on throughput- and
  // latency-sensitive Cache users. Performance of stats gathering is
  // also reported. The last set of gathered stats is also reported, for
  // manual sanity checking for logical errors or other unexpected
  // behavior of cache_bench or the underlying Cache.
  static void StatsBody(SharedState* shared, HistogramImpl* stats_hist,
                        std::string* stats_report) {
    if (!FLAGS_gather_stats) {
      return;
    }
    const auto clock = SystemClock::Default().get();
    uint64_t total_key_size = 0;
    uint64_t total_charge = 0;
    uint64_t total_entry_count = 0;
    std::set<Cache::DeleterFn> deleters;
    StopWatchNano timer(clock);

    for (;;) {
      uint64_t time;
      time = clock->NowMicros();
      uint64_t deadline = time + uint64_t{FLAGS_gather_stats_sleep_ms} * 1000;

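      // Wait (in bounded TimedWait slices on the shared condition variable)
      // until the deadline, waking early if all worker threads have finished,
      // in which case the final stats summary is emitted and this thread
      // exits.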
      {
        MutexLock l(shared->GetMutex());
        for (;;) {
          if (shared->AllDone()) {
            std::ostringstream ostr;
            ostr << "Most recent cache entry stats:\n"
                 << "Number of entries: " << total_entry_count << "\n"
                 << "Total charge: " << BytesToHumanString(total_charge) << "\n"
                 << "Average key size: "
                 << (1.0 * total_key_size / total_entry_count) << "\n"
                 << "Average charge: "
                 << BytesToHumanString(1.0 * total_charge / total_entry_count)
                 << "\n"
                 << "Unique deleters: " << deleters.size() << "\n";
            *stats_report = ostr.str();
            return;
          }
          if (clock->NowMicros() >= deadline) {
            break;
          }
          uint64_t diff = deadline - std::min(clock->NowMicros(), deadline);
          shared->GetCondVar()->TimedWait(diff + 1);
        }
      }

      // Now gather stats, outside of mutex
      total_key_size = 0;
      total_charge = 0;
      total_entry_count = 0;
      deleters.clear();
      auto fn = [&](const Slice& key, void* /*value*/, size_t charge,
                    Cache::DeleterFn deleter) {
        total_key_size += key.size();
        total_charge += charge;
        ++total_entry_count;
        // Something slightly more expensive as in (future) stats by category
        deleters.insert(deleter);
      };
      timer.Start();
      Cache::ApplyToAllEntriesOptions opts;
      opts.average_entries_per_lock = FLAGS_gather_stats_entries_per_lock;
      shared->GetCacheBench()->cache_->ApplyToAllEntries(fn, opts);
      stats_hist->Add(timer.ElapsedNanos() / 1000);
    }
  }

  static void ThreadBody(ThreadState* thread) {
    SharedState* shared = thread->shared;

    {
      MutexLock l(shared->GetMutex());
      shared->IncInitialized();
      if (shared->AllInitialized()) {
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->Started()) {
        shared->GetCondVar()->Wait();
      }
    }
    thread->shared->GetCacheBench()->OperateCache(thread);

    {
      MutexLock l(shared->GetMutex());
      shared->IncDone();
      if (shared->AllDone()) {
        shared->GetCondVar()->SignalAll();
      }
    }
  }

  void OperateCache(ThreadState* thread) {
    // To use looked-up values
    uint64_t result = 0;
    // To hold handles for a non-trivial amount of time
    Cache::Handle* handle = nullptr;
    KeyGen gen;
    const auto clock = SystemClock::Default().get();
    uint64_t start_time = clock->NowMicros();
    StopWatchNano timer(clock);

    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
      timer.Start();
      Slice key = gen.GetRand(thread->rnd, max_key_, max_log_);
      uint64_t random_op = thread->rnd.Next();
      Cache::CreateCallback create_cb =
          [](void* buf, size_t size, void** out_obj, size_t* charge) -> Status {
        *out_obj = reinterpret_cast<void*>(new char[size]);
        memcpy(*out_obj, buf, size);
        *charge = size;
        return Status::OK();
      };
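      // create_cb reconstructs an in-memory value from saved bytes (roughly
      // the inverse of SaveToFn); it is only expected to be invoked on the
      // secondary cache promotion path, so with no secondary cache configured
      // it should go unused.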

      if (random_op < lookup_insert_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do lookup
        handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
                                true);
        if (handle) {
          // do something with the data
          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
                             FLAGS_value_bytes);
        } else {
          // do insert
          cache_->Insert(key, createValue(thread->rnd), &helper2,
                         FLAGS_value_bytes, &handle);
        }
      } else if (random_op < insert_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do insert
        cache_->Insert(key, createValue(thread->rnd), &helper3,
                       FLAGS_value_bytes, &handle);
      } else if (random_op < lookup_threshold_) {
        if (handle) {
          cache_->Release(handle);
          handle = nullptr;
        }
        // do lookup
        handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
                                true);
        if (handle) {
          // do something with the data
          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
                             FLAGS_value_bytes);
        }
      } else if (random_op < erase_threshold_) {
        // do erase
        cache_->Erase(key);
      } else {
        // Should be extremely unlikely (noop)
        assert(random_op >= kHundredthUint64 * 100U);
      }
      thread->latency_ns_hist.Add(timer.ElapsedNanos());
    }
    if (handle) {
      cache_->Release(handle);
      handle = nullptr;
    }
    // Ensure computations on `result` are not optimized away.
    if (result == 1) {
      printf("You are extremely unlucky(2). Try again.\n");
      exit(1);
    }
    thread->duration_us = clock->NowMicros() - start_time;
  }

  void PrintEnv() const {
    printf("RocksDB version     : %d.%d\n", kMajorVersion, kMinorVersion);
    printf("Number of threads   : %u\n", FLAGS_threads);
    printf("Ops per thread      : %" PRIu64 "\n", FLAGS_ops_per_thread);
    printf("Cache size          : %s\n",
           BytesToHumanString(FLAGS_cache_size).c_str());
    printf("Num shard bits      : %u\n", FLAGS_num_shard_bits);
    printf("Max key             : %" PRIu64 "\n", max_key_);
    printf("Resident ratio      : %g\n", FLAGS_resident_ratio);
    printf("Skew degree         : %u\n", FLAGS_skew);
    printf("Populate cache      : %d\n", int{FLAGS_populate_cache});
    printf("Lookup+Insert pct   : %u%%\n", FLAGS_lookup_insert_percent);
    printf("Insert percentage   : %u%%\n", FLAGS_insert_percent);
    printf("Lookup percentage   : %u%%\n", FLAGS_lookup_percent);
    printf("Erase percentage    : %u%%\n", FLAGS_erase_percent);
    std::ostringstream stats;
    if (FLAGS_gather_stats) {
      stats << "enabled (" << FLAGS_gather_stats_sleep_ms << "ms, "
            << FLAGS_gather_stats_entries_per_lock << "/lock)";
    } else {
      stats << "disabled";
    }
    printf("Gather stats        : %s\n", stats.str().c_str());
    printf("----------------------------\n");
  }
};

int cache_bench_tool(int argc, char** argv) {
  ParseCommandLineFlags(&argc, &argv, true);

  if (FLAGS_threads <= 0) {
    fprintf(stderr, "threads number <= 0\n");
    exit(1);
  }

  ROCKSDB_NAMESPACE::CacheBench bench;
  if (FLAGS_populate_cache) {
    bench.PopulateCache();
    printf("Population complete\n");
    printf("----------------------------\n");
  }
  if (bench.Run()) {
    return 0;
  } else {
    return 1;
  }
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // GFLAGS